
Flink On Yarn

Required components (must already be configured)

  • SFTP
  • YARN
  • HDFS

Before you configure Flink, the YARN and HDFS components must be configured correctly and saved.

tip

Two deployment modes are supported: perjob and session.

Download ChunJun

Requires ChunJun version 1.12 (see "Building ChunJun from source").

Parameter reference

Parameters common to perjob and session

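| Parameter | Default | Description | Required |
| --- | --- | --- | --- |
| clusterMode | perjob, session | Task execution mode: perjob or session | |
| flinkLibDir | /data/insight_plugin1.12/flink_lib | Flink lib path (local directory on the Taier host) | |
| remoteFlinkLibDir | /data/insight_plugin/flink110_lib | Remote (SFTP) path of the Flink lib directory | |
| chunjunDistDir | /data/insight_plugin1.12/chunjun-dist/ | Local parent directory of the ChunJun plugins (on the Taier host) | |
| remoteChunjunDistDir | /data/insight_plugin1.12/chunjun-dist/ | Remote parent directory of the ChunJun plugins | |
| pluginLoadMode | shipfile | Plugin loading mode | |
| monitorAcceptedApp | false | Whether to monitor tasks in the YARN ACCEPTED state | |
| yarnAccepterTaskNumber | 3 | Number of tasks allowed in the YARN ACCEPTED state; once this count is reached, no further tasks can be submitted | |
| prometheusHost | | Prometheus address, used to fetch data-sync metrics | |
| prometheusPort | 9090 | Prometheus port, used to fetch data-sync metrics | |
| classloader.dtstack-cache | true | Whether to cache the classloader | |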
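The monitorAcceptedApp and yarnAccepterTaskNumber rows together describe a submission gate. A minimal Python sketch of that rule, assuming the number of applications in the YARN ACCEPTED state is obtained elsewhere; `may_submit` is a hypothetical name, not a Taier function:

```python
def may_submit(accepted_count: int,
               monitor_accepted_app: bool = False,
               yarn_accepter_task_number: int = 3) -> bool:
    """Return True if a new task may be submitted.

    Hypothetical illustration of the documented rule: when monitoring is
    enabled, submission is blocked once the number of applications in the
    YARN ACCEPTED state reaches yarn_accepter_task_number.
    """
    if not monitor_accepted_app:
        # Monitoring disabled (the documented default): the count is ignored.
        return True
    # Blocked as soon as the threshold is reached, not only when exceeded.
    return accepted_count < yarn_accepter_task_number


if __name__ == "__main__":
    print(may_submit(2, monitor_accepted_app=True))  # True
    print(may_submit(3, monitor_accepted_app=True))  # False
    print(may_submit(5))                             # True
```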

Session-specific parameters

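| Parameter | Default | Description | Required |
| --- | --- | --- | --- |
| checkSubmitJobGraphInterval | 60 | Session check interval (60 × 10 s) | |
| flinkSessionSlotCount | 10 | Maximum number of slots allowed in the Flink session | |
| sessionRetryNum | 5 | Number of session retries; once reached, retries become less frequent | |
| sessionStartAuto | true | Whether Taier may start the Flink session automatically | |
| flinkSessionName | flink_session | Name of the Flink session job | |
| jobmanager.heap.mb | 2048 | JobManager memory size (MB) | |
| taskmanager.heap.mb | 1024 | TaskManager memory size (MB) | |

Flink parameters

| Parameter | Default | Description | Required |
| --- | --- | --- | --- |
| env.java.opts | -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSIncrementalMode -XX:+CMSIncrementalPacing -XX:MaxMetaspaceSize=300m -Dfile.encoding=UTF-8 | JVM options | |
| classloader.resolve-order | child-first for perjob, parent-first for session | Class loading order | |
| high-availability | ZOOKEEPER | Flink HA type | |
| high-availability.zookeeper.quorum | | ZooKeeper address; required when the HA type is ZooKeeper | |
| high-availability.zookeeper.path.root | /flink110 | HA node path | |
| high-availability.storageDir | hdfs://ns1/dtInsight/flink110/ha | HA metadata storage path | |
| jobmanager.archive.fs.dir | hdfs://ns1/dtInsight/flink110/completed-jobs | Path where job information is stored after a job finishes | Yes |
| metrics.reporter.promgateway.class | org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter | Class used to push metrics | |
| metrics.reporter.promgateway.host | | PushGateway address | |
| metrics.reporter.promgateway.port | 9091 | PushGateway port | |
| metrics.reporter.promgateway.deleteOnShutdown | true | Whether to delete metrics after the job finishes | Yes |
| metrics.reporter.promgateway.jobName | 110job | Metrics job name | |
| metrics.reporter.promgateway.randomJobNameSuffix | true | Whether to append a random suffix to the job name | |
| state.backend | RocksDB | State backend | |
| state.backend.incremental | true | Whether to enable incremental checkpoints | |
| state.checkpoints.dir | hdfs://ns1/dtInsight/flink110/checkpoints | Checkpoint path | |
| state.checkpoints.num-retained | 11 | Number of checkpoints to retain | |
| state.savepoints.dir | hdfs://ns1/dtInsight/flink110/savepoints | Savepoint path | |
| yarn.application-attempts | 3 | Number of application attempts (retries) | |
| yarn.application-attempt-failures-validity-interval | 3600000 | Retry window size (ms) | |
| akka.ask.timeout | 60 s | Akka RPC ask timeout | |
| akka.tcp.timeout | 60 s | Akka TCP connection timeout | |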
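Two rules in these tables can be checked mechanically: clusterMode accepts only perjob or session, and high-availability.zookeeper.quorum is required when the HA type is ZooKeeper. The sketch below also derives the default classloader.resolve-order for each mode. The helpers `check_params` and `default_resolve_order` are hypothetical illustrations, and representing the configuration as a flat Python dict is an assumption.

```python
VALID_MODES = {"perjob", "session"}


def default_resolve_order(cluster_mode: str) -> str:
    """Default classloader.resolve-order per mode, as listed in the table."""
    if cluster_mode not in VALID_MODES:
        raise ValueError(f"unknown clusterMode: {cluster_mode!r}")
    return "child-first" if cluster_mode == "perjob" else "parent-first"


def check_params(params: dict) -> list:
    """Return a list of problems found in a parameter dict (empty if none)."""
    problems = []
    mode = params.get("clusterMode")
    if mode not in VALID_MODES:
        problems.append(
            f"clusterMode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    # The HA type defaults to ZOOKEEPER per the table; compare case-insensitively.
    ha = str(params.get("high-availability", "ZOOKEEPER")).upper()
    if ha == "ZOOKEEPER" and not params.get("high-availability.zookeeper.quorum"):
        problems.append("high-availability.zookeeper.quorum is required "
                        "when high-availability is ZOOKEEPER")
    return problems


if __name__ == "__main__":
    params = {"clusterMode": "session", "high-availability": "ZOOKEEPER"}
    for problem in check_params(params):
        print("config problem:", problem)
    print(default_resolve_order("session"))  # parent-first
```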

For more Flink parameters, see the official Flink documentation.

tip

To adjust how tasks are submitted, add official Flink configuration options to the custom parameters of the Flink component.
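A sketch of the idea in the tip above: official Flink options supplied as custom parameters are layered over the defaults. It assumes that custom values override defaults and renders the result in flink-conf.yaml `key: value` syntax; `merge_flink_conf` and `render_flink_conf` are hypothetical helpers, not Taier code.

```python
def merge_flink_conf(defaults: dict, custom: dict) -> dict:
    """Overlay custom options on the defaults (assumption: custom values win)."""
    merged = dict(defaults)  # copy so the defaults dict is left untouched
    merged.update(custom)
    return merged


def render_flink_conf(conf: dict) -> str:
    """Render options as flink-conf.yaml style `key: value` lines, sorted by key."""
    return "\n".join(f"{key}: {value}" for key, value in sorted(conf.items()))


if __name__ == "__main__":
    defaults = {
        "state.backend": "RocksDB",
        "state.checkpoints.num-retained": 11,
        "akka.ask.timeout": "60 s",
    }
    # Example overrides: one default changed, one extra official Flink option.
    custom = {"state.checkpoints.num-retained": 3,
              "taskmanager.numberOfTaskSlots": 2}
    print(render_flink_conf(merge_flink_conf(defaults, custom)))
```

taskmanager.numberOfTaskSlots is a standard Flink option, used here only as an example override.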

File structure

tip

flinkLibDir is the directory that holds the Flink jars; set it to a local path on the machine where Taier is deployed.

For example, if flinkLibDir is set to /opt/dtstack/flink110_lib, that directory contains:

├── flink-csv-1.12.7.jar
├── flink-dist_2.12-1.12.7.jar
├── flink-json-1.12.7.jar
├── flink-metrics-prometheus-1.12.7.jar
├── flink-parquet_2.12-1.12.7.jar
├── flink-python_2.12-1.12.7.jar
├── flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
├── flink-shaded-zookeeper-3.4.14.jar
├── flink-sql-avro-1.12.7.jar
├── flink-table_2.12-1.12.7.jar
├── flink-table-blink_2.12-1.12.7.jar
├── iceberg-flink-runtime-0.12.0.jar
├── log4j-1.2-api-2.16.0.jar
├── log4j-api-2.16.0.jar
├── log4j-core-2.16.0.jar
├── log4j-slf4j-impl-2.16.0.jar
├── logback-classic-1.2.11.jar
└── logback-core-1.2.11.jar

caution

If a configured data-sync task fails at run time with `Could not read ch.qos.logback.classic.Logger`, check that the logback jars (and the other jars listed above) are present in flinkLibDir.
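The check described above can be scripted before rerunning the task. This minimal sketch looks in a given directory for jars whose names start with logback-classic- and logback-core- (the names in the listing above); the helper names are hypothetical, and the default path /opt/dtstack/flink110_lib is taken from the example.

```python
import sys
from pathlib import Path

# Jar name prefixes to look for, taken from the flinkLibDir listing above.
REQUIRED_PREFIXES = ("logback-classic-", "logback-core-")


def missing_prefixes(jar_names, prefixes=REQUIRED_PREFIXES) -> list:
    """Return the prefixes that no name in jar_names starts with."""
    names = list(jar_names)  # materialize so it can be scanned once per prefix
    return [pre for pre in prefixes if not any(n.startswith(pre) for n in names)]


def missing_jars(lib_dir: str, prefixes=REQUIRED_PREFIXES) -> list:
    """Check the *.jar files in lib_dir; a missing directory counts as all missing."""
    lib = Path(lib_dir)
    if not lib.is_dir():
        return list(prefixes)
    return missing_prefixes([p.name for p in lib.glob("*.jar")], prefixes)


if __name__ == "__main__":
    lib = sys.argv[1] if len(sys.argv) > 1 else "/opt/dtstack/flink110_lib"
    missing = missing_jars(lib)
    if missing:
        print(f"{lib} is missing jars starting with: {', '.join(missing)}")
    else:
        print(f"{lib}: logback jars present")
```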

tip

FlinkPluginRoot is the ChunJun plugin package directory; set it to a path on the CentOS machine where Taier is deployed.

For example, if flinkPluginRoot is set to /data/insight_plugin1.12/chunjun-dist, that directory contains:

/data/insight_plugin1.12/chunjun-dist
├── chunjun-core-master.jar
├── connector
│   ├── binlog
│   │   └── chunjun-connector-binlog-master.jar
│   ├── cassandra
│   │   └── chunjun-connector-cassandra-master.jar
│   ├── clickhouse
│   │   └── chunjun-connector-clickhouse-master.jar
│   ├── db2
│   │   └── chunjun-connector-db2-master.jar
│   ├── dm
│   │   └── chunjun-connector-dm-master.jar
│   ├── doris
│   │   └── chunjun-connector-doris-master.jar
│   ├── elasticsearch7
│   │   └── chunjun-connector-elasticsearch7-master.jar
│   ├── emqx
│   │   └── chunjun-connector-emqx-master.jar
│   ├── file
│   │   └── chunjun-connector-file-master.jar
│   ├── filesystem
│   │   └── chunjun-connector-filesystem-master.jar
│   ├── ftp
│   │   └── chunjun-connector-ftp-master.jar
│   ├── gbase
│   │   └── chunjun-connector-gbase-master.jar
│   ├── greenplum
│   │   └── chunjun-connector-greenplum-master.jar
│   ├── hbase14
│   │   └── chunjun-connector-hbase-1.4-master.jar
│   ├── hdfs
│   │   └── chunjun-connector-hdfs-master.jar
│   ├── hive
│   │   └── chunjun-connector-hive-master.jar
│   ├── http
│   │   └── chunjun-connector-http-master.jar
│   ├── influxdb
│   │   └── chunjun-connector-influxdb-master.jar
│   ├── kingbase
│   │   └── chunjun-connector-kingbase-master.jar
│   ├── kudu
│   │   └── chunjun-connector-kudu-master.jar
│   ├── mongodb
│   │   └── chunjun-connector-mongodb-master.jar
│   ├── mysql
│   │   └── chunjun-connector-mysql-master.jar
│   ├── mysqld
│   │   └── chunjun-connector-mysqld-master.jar
│   ├── oceanbase
│   │   └── chunjun-connector-oceanbase-master.jar
│   ├── oracle
│   │   └── chunjun-connector-oracle-master.jar
│   ├── oraclelogminer
│   │   └── chunjun-connector-oraclelogminer-master.jar
│   ├── pgwal
│   │   └── chunjun-connector-pgwal-master.jar
│   ├── postgresql
│   │   └── chunjun-connector-postgresql-master.jar
│   ├── redis
│   │   └── chunjun-connector-redis-master.jar
│   ├── saphana
│   │   └── chunjun-connector-saphana-master.jar
│   ├── socket
│   │   └── chunjun-connector-socket-master.jar
│   ├── solr
│   │   └── chunjun-connector-solr-master.jar
│   ├── sqlserver
│   │   └── chunjun-connector-sqlserver-master.jar
│   ├── sqlservercdc
│   │   └── chunjun-connector-sqlservercdc-master.jar
│   ├── starrocks
│   │   └── chunjun-connector-starrocks-master.jar
│   └── stream
│       └── chunjun-connector-stream-master.jar
├── ddl
│   └── mysql
│       └── chunjun-ddl-mysql-master.jar
├── dirty-data-collector
│   ├── log
│   │   └── chunjun-dirty-log-master.jar
│   └── mysql
│       └── chunjun-dirty-mysql-master.jar
├── formats
│   └── pbformat
│       └── flinkx-protobuf-master.jar
├── metrics
│   ├── mysql
│   │   └── chunjun-metrics-mysql-master.jar
│   └── prometheus
│       └── chunjun-metrics-prometheus-master.jar
└── restore-plugins
    └── mysql
        └── chunjun-restore-mysql-master.jar
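Given the layout above, where each connector lives in connector/<name>/, a small script can list the installed connectors and locate a connector's jar. This is a sketch that assumes every connector follows that directory convention; `available_connectors` and `connector_jars` are hypothetical helpers.

```python
from pathlib import Path


def available_connectors(dist_dir: str) -> list:
    """Names of the subdirectories under <dist_dir>/connector, sorted."""
    root = Path(dist_dir) / "connector"
    if not root.is_dir():
        return []
    return sorted(p.name for p in root.iterdir() if p.is_dir())


def connector_jars(dist_dir: str, connector: str) -> list:
    """Paths of the jar files under <dist_dir>/connector/<connector>/, sorted."""
    folder = Path(dist_dir) / "connector" / connector
    if not folder.is_dir():
        return []
    return sorted(str(p) for p in folder.glob("*.jar"))


if __name__ == "__main__":
    dist = "/data/insight_plugin1.12/chunjun-dist"
    print(available_connectors(dist))
    print(connector_jars(dist, "mysql"))
```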

Flink configuration

caution

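The first time a Flink session task runs, Taier has to start the session, so submission is slower.
If a configured data-sync task keeps waiting to run with `No flink session found on yarn cluster`, first make sure a session can be started manually on the cluster, then check flink_monitor.log for errors that point to misconfigured parameters.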
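To confirm from the command line whether the session is up, the YARN CLI command `yarn application -list -appStates RUNNING` lists running applications. The sketch below parses that output and looks for the session by name. It assumes the YARN application name equals flinkSessionName (default flink_session); if Taier names the application differently, check the name shown in the YARN ResourceManager UI. `running_app_names` and `session_running` are hypothetical helpers.

```python
import subprocess


def running_app_names(yarn_list_output: str) -> list:
    """Extract application names from `yarn application -list` output.

    Data rows start with an application id (application_<timestamp>_<n>);
    columns are tab-separated and padded, with the name in the second column.
    """
    names = []
    for line in yarn_list_output.splitlines():
        line = line.strip()
        if line.startswith("application_"):
            cols = [c.strip() for c in line.split("\t")]
            if len(cols) > 1:
                names.append(cols[1])
    return names


def session_running(session_name: str = "flink_session") -> bool:
    """Run the YARN CLI and report whether an app named session_name is RUNNING."""
    out = subprocess.run(
        ["yarn", "application", "-list", "-appStates", "RUNNING"],
        capture_output=True, text=True, check=True,
    ).stdout
    return session_name in running_app_names(out)


if __name__ == "__main__":
    print("session running:", session_running())
```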