Flink Cluster Setup
Summary of Common Commands
Start/stop the Flink cluster
./bin/start-cluster.sh
./bin/stop-cluster.sh
Start or stop the JobManager
bin/jobmanager.sh start
bin/jobmanager.sh stop
Add a new TaskManager node or restart a TaskManager node
bin/taskmanager.sh start
bin/taskmanager.sh stop
Planning
Host 01 | Host 02 | Host 03 |
---|---|---|
hadoop01 | hadoop02 | hadoop03 |
ip01 | ip02 | ip03 |
Notes
The Flink cluster is built on three hosts.
Extract the archive
tar -zxvf /home/flink-1.11.1-bin-scala_2.11.tgz -C /usr/local/
Tip: download the version you need from the official Flink website.
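The extraction step can be wrapped in a small guard so that a missing or incomplete download fails early. This is a minimal sketch rather than part of the original procedure; the function name `extract_flink` is hypothetical, and the example call reuses the archive path and target directory from the command above.

```shell
# extract_flink ARCHIVE TARGET_DIR
# Hypothetical helper: checks that the archive is readable and is a valid
# gzip tarball before extracting it into TARGET_DIR.
extract_flink() {
  local archive="$1" target="$2"
  if [ ! -r "$archive" ]; then
    echo "archive not found: $archive" >&2
    return 1
  fi
  # 'tar -tzf' lists the contents without extracting; it fails on a corrupt file.
  if ! tar -tzf "$archive" >/dev/null 2>&1; then
    echo "archive is not a valid .tgz: $archive" >&2
    return 1
  fi
  mkdir -p "$target" && tar -zxf "$archive" -C "$target"
}

# Example call with the paths from this guide:
# extract_flink /home/flink-1.11.1-bin-scala_2.11.tgz /usr/local/
```

Listing the archive first costs one extra pass over the file, but it avoids leaving a half-extracted directory under /usr/local/ when the download was truncated.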
Configure environment variables
vi /etc/profile
Append the following:
export FLINK_HOME=/usr/local/flink-1.11.1/
export PATH=$PATH:$JAVA_HOME/bin:$ZK_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$FLINK_HOME/bin
Reload the environment variables:
source /etc/profile
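After reloading the profile, it can help to confirm that the variables took effect before moving on. The check below is a sketch under stated assumptions: `check_flink_env` is a hypothetical name, and it only assumes that the Flink distribution ships its command-line launcher as `bin/flink`.

```shell
# check_flink_env
# Hypothetical helper: confirms that FLINK_HOME is set, points at a Flink
# installation, and that its bin directory is on PATH.
check_flink_env() {
  if [ -z "${FLINK_HOME:-}" ]; then
    echo "FLINK_HOME is not set" >&2
    return 1
  fi
  # Drop a trailing slash, as in /usr/local/flink-1.11.1/
  local home="${FLINK_HOME%/}"
  if [ ! -x "$home/bin/flink" ]; then
    echo "no executable flink launcher under $home/bin" >&2
    return 1
  fi
  # Accept both the normalized entry and the literal $FLINK_HOME/bin entry
  # (the latter contains a double slash when FLINK_HOME ends with '/').
  case ":$PATH:" in
    *":$home/bin:"*|*":$FLINK_HOME/bin:"*) ;;
    *) echo "$home/bin is not on PATH" >&2; return 1 ;;
  esac
  echo "FLINK_HOME OK: $home"
}
```

Running `check_flink_env` on each host after `source /etc/profile` gives a quick yes/no answer without starting any Flink process.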
Configuration
./conf/flink-conf.yaml
#==============================================================================
# Common
#==============================================================================
#jobmanager.rpc.address: hadoop01    # not needed in HA mode
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
#==============================================================================
# Rest & web frontend
#==============================================================================
# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8081
# The address to which the REST client will connect to
#
rest.address: hadoop01
# HA settings
high-availability: zookeeper
high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_flink
high-availability.storageDir: hdfs://hadoop01:9000/flink/recovery
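Because a commented-out or misspelled key silently disables HA, a quick check of the file can save a failed start. The following is a minimal sketch; `check_ha_conf` is a hypothetical helper, and the three keys it looks for are the ones set above.

```shell
# check_ha_conf CONF_FILE
# Hypothetical helper: reports which of the HA keys used in this guide are
# missing (or commented out) in a flink-conf.yaml file.
# Returns non-zero if any key is missing.
check_ha_conf() {
  local conf="$1" key missing=0
  for key in high-availability \
             high-availability.zookeeper.quorum \
             high-availability.storageDir; do
    # Match only active "key: value" lines that start at column one.
    if ! grep -Eq "^${key}:[[:space:]]*[^[:space:]]" "$conf"; then
      echo "missing: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example:
# check_ha_conf /usr/local/flink-1.11.1/conf/flink-conf.yaml
```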
./conf/slaves
hadoop01
hadoop02
hadoop03
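Tip: list the IP address or hostname of each cluster host. Flink 1.11 renamed this file to conf/workers, so use that name if conf/slaves is absent from your distribution.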
./conf/masters
hadoop01:8081
hadoop02:8081
Distribute the installation, then update the IP address or hostname on hadoop02 and hadoop03
Distribute:
scp -r /etc/profile hadoop02:/etc
scp -r /etc/profile hadoop03:/etc
scp -r ../flink-1.11.1/ hadoop02:/usr/local/
scp -r ../flink-1.11.1/ hadoop03:/usr/local/
Modify the configuration:
hadoop02: ./conf/flink-conf.yaml
Change as follows:
rest.address: hadoop02
hadoop03: ./conf/flink-conf.yaml
Change as follows:
rest.address: hadoop03
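The per-host change to rest.address can also be scripted instead of edited by hand. This is a sketch, not the author's method: `set_rest_address` is a hypothetical helper, and the commented loop assumes passwordless SSH from hadoop01 (which the scp steps already rely on) and GNU sed on the remote hosts.

```shell
# set_rest_address CONF_FILE HOST
# Hypothetical helper: rewrites the active "rest.address:" line in place and
# leaves every other line, including commented ones, untouched.
set_rest_address() {
  local conf="$1" host="$2" tmp
  tmp="$(mktemp)" || return 1
  # Write the edited copy to a temp file first, then copy it back so the
  # original file keeps its ownership and permissions.
  sed "s/^rest\.address:.*/rest.address: ${host}/" "$conf" > "$tmp" && cat "$tmp" > "$conf"
  local rc=$?
  rm -f "$tmp"
  return "$rc"
}

# Example, run from hadoop01:
# for h in hadoop02 hadoop03; do
#   ssh "$h" "sed -i 's/^rest\.address:.*/rest.address: $h/' /usr/local/flink-1.11.1/conf/flink-conf.yaml"
# done
```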
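Startup
Startup order: start ZooKeeper and HDFS first, then Flink.
Start the cluster
start-cluster.sh
Stop the standalone cluster
stop-cluster.sh
Verify the processes
Check the jps output on every host.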
Web UI address: http://hadoop01:8081
Web UI address: http://hadoop02:8081
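Checking jps on three hosts by eye is easy to get wrong, so the check can be reduced to a small filter. The sketch below assumes the main-class names that a Flink 1.11 standalone session cluster shows in jps (`StandaloneSessionClusterEntrypoint` for the JobManager, `TaskManagerRunner` for the TaskManager); `has_process` is a hypothetical helper name.

```shell
# has_process NAME
# Reads `jps` output ("<pid> <main class>") on stdin and succeeds if a JVM
# whose main class is exactly NAME is listed.
has_process() {
  awk -v name="$1" '$2 == name { found = 1 } END { exit !found }'
}

# Example loops (assume passwordless SSH and jps on the remote PATH):
# for h in hadoop01 hadoop02 hadoop03; do
#   ssh "$h" jps | has_process TaskManagerRunner || echo "no TaskManager on $h"
# done
# for h in hadoop01 hadoop02; do
#   ssh "$h" jps | has_process StandaloneSessionClusterEntrypoint || echo "no JobManager on $h"
# done
```

The second loop covers only hadoop01 and hadoop02 because those are the hosts listed in ./conf/masters.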
Job HistoryServer configuration:
Edit: vi ./conf/flink-conf.yaml
Append the following:
# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
# If this directory cannot be created automatically, create it manually.
jobmanager.archive.fs.dir: hdfs://hadoop01:9000/flink_completed_jobs/
# The address under which the web-based HistoryServer listens.
historyserver.web.address: 192.168.216.111
# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082
# Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs://hadoop01:9000/flink_completed_jobs/
# Interval in milliseconds for refreshing the monitored directories.
historyserver.archive.fs.refresh-interval: 10000
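If the archive directory has to be created by hand, the path can be read from the configuration file rather than retyped, which keeps the two in sync. This is a minimal sketch: `archive_dir` is a hypothetical helper, and the commented command requires the hdfs client and a running HDFS.

```shell
# archive_dir CONF_FILE
# Hypothetical helper: prints the value of jobmanager.archive.fs.dir from a
# flink-conf.yaml file (first active occurrence only).
archive_dir() {
  sed -n 's/^jobmanager\.archive\.fs\.dir:[[:space:]]*//p' "$1" | head -n 1
}

# Example:
# hdfs dfs -mkdir -p "$(archive_dir /usr/local/flink-1.11.1/conf/flink-conf.yaml)"
```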
Start the history server (restart the Flink cluster first):
historyserver.sh start
Check the processes:
jps
Web UI: http://hadoop01:8082/
Note:
Starting the Flink cluster may fail with the following error:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:447)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:359)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:443)
... 13 more
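The second "Caused by" line states the root cause: Hadoop is not on Flink's classpath. One common remedy, described in the Flink documentation, is to export HADOOP_CLASSPATH from the local Hadoop installation before starting the cluster. The sketch below assumes the `hadoop` command is on PATH on every node; the function name `export_hadoop_classpath` is hypothetical.

```shell
# export_hadoop_classpath
# Hypothetical helper: sets HADOOP_CLASSPATH from the output of
# `hadoop classpath`, and fails if the hadoop command is unavailable.
export_hadoop_classpath() {
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop command not found on PATH" >&2
    return 1
  fi
  local cp
  cp="$(hadoop classpath)" || return 1
  export HADOOP_CLASSPATH="$cp"
}

# Example: run on each node before start-cluster.sh
# export_hadoop_classpath && start-cluster.sh
```

To make the setting permanent, the equivalent export line can be appended to /etc/profile on each node alongside FLINK_HOME, followed by a cluster restart.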