Kafka cluster (with ZooKeeper)


Environment preparation

Kafka cluster deployment layout
IP address        Hostname          Software installed
10.0.0.131        mcwkafka01        zookeeper, kafka
10.0.0.132        mcwkafka02        zookeeper, kafka
10.0.0.133        mcwkafka03        zookeeper, kafka
10.0.0.131        mcwkafka01        kafka-manager

Check the firewall; it is already disabled on all nodes
[root@mcwkafka01 ~]$ /etc/init.d/iptables status
-bash: /etc/init.d/iptables: No such file or directory
[root@mcwkafka01 ~]$ getenforce
Disabled
[root@mcwkafka01 ~]$ systemctl status firewalld.service 
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; enabled; vendor preset: enabled)
   Active: inactive (dead) since Tue 2022-02-08 01:35:17 CST; 35s ago

Configure /etc/hosts resolution
[root@mcwkafka01 ~]$ tail -3 /etc/hosts
10.0.0.131      mcwkafka01
10.0.0.132    mcwkafka02
10.0.0.133    mcwkafka03


Deploy the JDK on all three nodes
[root@mcwkafka02 ~]$ unzip jdk1.8.0_202.zip
[root@mcwkafka02 ~]$ ls
anaconda-ks.cfg  jdk1.8.0_202  jdk1.8.0_202.zip  mcw4
[root@mcwkafka02 ~]$ 
[root@mcwkafka02 ~]$ mv jdk1.8.0_202 jdk
[root@mcwkafka02 ~]$ ls
anaconda-ks.cfg  jdk  jdk1.8.0_202.zip  mcw4
[root@mcwkafka02 ~]$ mv jdk /opt/
[root@mcwkafka02 ~]$ vim /etc/profile 
[root@mcwkafka02 ~]$ source /etc/profile
[root@mcwkafka02 ~]$ java -version
java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
[root@mcwkafka02 ~]$ 
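The /etc/profile edit above is not shown. A minimal sketch of the environment variables it typically appends, assuming the JDK was moved to /opt/jdk as in the steps above (the CLASSPATH matches the paths visible later in the ZooKeeper process command line):

# Appended to /etc/profile (sketch; the actual edit is not shown in the transcript)
export JAVA_HOME=/opt/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar

After sourcing /etc/profile, java -version should report 1.8.0_202 as shown above.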

Install and configure ZooKeeper (perform the same steps on all three machines)

[root@mcwkafka01 /usr/local/src]$ cd /usr/local/src/
[root@mcwkafka01 /usr/local/src]$ wget http://apache.forsale.plus/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

1) Install ZooKeeper on the three nodes
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ tar xf apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ mkdir /data
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin  apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ mv apache-zookeeper-3.5.9-bin /data/zk
[root@mcwkafka01 /usr/local/src]$ 

Modify the ZooKeeper configuration file on all three nodes as shown below:
[root@mcwkafka01 /usr/local/src]$ mkdir -p /data/zk/data
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/
bin  conf  data  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.txt
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/conf/
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@mcwkafka01 /usr/local/src]$ ls /data/bin
ls: cannot access /data/bin: No such file or directory
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/bin/
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd            zkServer.sh          zkTxnLogToolkit.sh
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer-initialize.sh  zkTxnLogToolkit.cmd
[root@mcwkafka01 /usr/local/src]$ cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo_sample.cfg.bak
[root@mcwkafka01 /usr/local/src]$ cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo.cfg
[root@mcwkafka01 /usr/local/src]$ vim /data/zk/conf/zoo.cfg    ## clear the original contents and replace them with the following
[root@mcwkafka01 /usr/local/src]$ cat /data/zk/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=10.0.0.131:2888:3888
server.2=10.0.0.132:2888:3888
server.3=10.0.0.133:2888:3888
[root@mcwkafka01 /usr/local/src]$ 
===============
Configuration notes:
server.id=host:port:port identifies each ZooKeeper server. As part of the ensemble, every server must know about the other servers, and it reads that information from these "server.id=host:port:port" lines.
Under the directory specified by dataDir, create a file named myid whose only line is the server's own id. For example, server "1" writes "1" into its myid file. The id must be unique within the ensemble and must be between 1 and 255.
In the configuration above, the host part is each server's IP address. The first port is the one followers use to connect to the leader; the second port is used for leader election.
So three ports matter in a cluster setup: clientPort=2181, the follower port 2888, and the election port 3888.
===============
 
Note: to change the log output location, besides adding "dataLogDir=/data/zk/data/logs" to zoo.cfg, the zkServer.sh script also needs to be modified, at around line 125, as follows:
[root@mcwkafka03 /usr/local/src]$ cp /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
[root@mcwkafka03 /usr/local/src]$ vim /data/zk/bin/zkServer.sh
.......
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"     # add this line
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi
[root@mcwkafka03 /usr/local/src]$ diff /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
125d124
<   ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"

Before starting the ZooKeeper service, create the myid file on each of the three ZooKeeper nodes:
[root@mcwkafka01 /usr/local/src]$ mkdir /data/zk/data/zookeeper/
[root@mcwkafka01 /usr/local/src]$ echo 1 >  /data/zk/data/zookeeper/myid


On the other two nodes the myid values are 2 and 3 respectively (the myid must be different on each of the three nodes; the configuration files and everything else are identical)
[root@mcwkafka02 /usr/local/src]$ echo 2 >  /data/zk/data/zookeeper/myid
[root@mcwkafka03 /usr/local/src]$ echo 3 >  /data/zk/data/zookeeper/myid
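If passwordless SSH from one node to all three has been set up (an assumption; it is not part of this walkthrough), the data directory and the per-node myid can be created in a single loop instead of logging into each machine; a sketch:

# Sketch: create a unique myid on each node over SSH (assumes passwordless SSH as root)
id=1
for host in 10.0.0.131 10.0.0.132 10.0.0.133; do
    ssh root@$host "mkdir -p /data/zk/data/zookeeper && echo $id > /data/zk/data/zookeeper/myid"
    id=$((id+1))
done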

Start the ZooKeeper service on all three nodes
[root@mcwkafka01 /usr/local/src]$  /data/zk/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@mcwkafka01 /usr/local/src]$ ps -ef|grep zookeeper
root       6997      1 58 21:53 pts/0    00:00:25 /opt/jdk/bin/java -Dzookeeper.log.dir=/data/zk/bin/../logs -Dzookeeper.log.file=zookeeper-root-server-mcwkafka01.log -Dzookeepe.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /data/zk/bin/../zookeeper-server/target/classes:/data/zk/bin/../build/classes:/data/zk/bin/../zookeeper-server/target/lib/*.jar:/data/zk/bin/../build/lib/*.jar:/data/zk/bin/../lib/zookeeper-jute-3.5.9.jar:/data/zk/bin/../lib/zookeeper-3.5.9.jar:/data/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/zk/bin/../lib/slf4j-api-1.7.25.jar:/data/zk/bin/../lib/netty-transport-native-unix-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-native-epoll-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-4.1.50.Final.jar:/data/zk/bin/../lib/netty-resolver-4.1.50.Final.jar:/data/zk/bin/../lib/netty-handler-4.1.50.Final.jar:/data/zk/bin/../lib/netty-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-codec-4.1.50.Final.jar:/data/zk/bin/../lib/netty-buffer-4.1.50.Final.jar:/data/zk/bin/../lib/log4j-1.2.17.jar:/data/zk/bin/../lib/json-simple-1.1.1.jar:/data/zk/bin/../lib/jline-2.14.6.jar:/data/zk/bin/../lib/jetty-util-ajax-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-util-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-servlet-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-server-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-security-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-io-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-http-9.4.35.v20201120.jar:/data/zk/bin/../lib/javax.servlet-api-3.1.0.jar:/data/zk/bin/../lib/jackson-databind-2.10.5.1.jar:/data/zk/bin/../lib/jackson-core-2.10.5.jar:/data/zk/bin/../lib/jackson-annotations-2.10.5.jar:/data/zk/bin/../lib/commons-cli-1.2.jar:/data/zk/bin/../lib/audience-annotations-0.5.0.jar:/data/zk/bin/../zookeeper-*.jar:/data/zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/data/zk/bin/../conf:.:/opt/jdk/lib:/opt/jdk/jre/lib:/opt/jdk/lib/tools.jar -Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /data/zk/bin/../conf/zoo.cfg
root       7033   4049  0 21:54 pts/0    00:00:00 grep --color=auto zookeeper
[root@mcwkafka01 /usr/local/src]$ lsof -i:2181
-bash: lsof: command not found
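Since lsof is not installed here, ss (already used above) can check the ZooKeeper ports instead. Every node should listen on 2181 (client port) and 3888 (election port), and whichever node is currently the leader should also listen on 2888:

# Check the ZooKeeper ports on each node
ss -lntp | egrep '2181|2888|3888'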

Check the ZooKeeper role of each of the three nodes
[root@mcwkafka01 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@mcwkafka02 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@mcwkafka03 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
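With the roles confirmed, a quick read/write sanity check can be done with the bundled zkCli.sh: create a znode through one node and read it back through another. The znode name /mcwtest below is just an example:

# On mcwkafka01: create a test znode
/data/zk/bin/zkCli.sh -server 10.0.0.131:2181 create /mcwtest "hello"
# On mcwkafka02: the znode should be visible through any member of the ensemble
/data/zk/bin/zkCli.sh -server 10.0.0.132:2181 get /mcwtest
# Clean up
/data/zk/bin/zkCli.sh -server 10.0.0.133:2181 delete /mcwtest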

2) Install Kafka (same steps on all three nodes)

Download URL: http://mirrors.shu.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ wget http://mirrors.shu.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz  kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ tar xf kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ mv kafka_2.11-1.0.1 /data/kafka

Go to the config directory under the Kafka installation and modify server.properties:
[root@mcwkafka03 /usr/local/src]$ cp /data/kafka/config/server.properties /data/kafka/config/server.properties.bak
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/
bin  config  libs  LICENSE  NOTICE  site-docs
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/config/
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties    tools-log4j.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties      zookeeper.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               server.properties.bak
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/bin/
connect-distributed.sh        kafka-consumer-groups.sh             kafka-reassign-partitions.sh    kafka-streams-application-reset.sh  zookeeper-server-start.sh
connect-standalone.sh         kafka-consumer-perf-test.sh          kafka-replay-log-producer.sh    kafka-topics.sh                     zookeeper-server-stop.sh
kafka-acls.sh                 kafka-delete-records.sh              kafka-replica-verification.sh   kafka-verifiable-consumer.sh        zookeeper-shell.sh
kafka-broker-api-versions.sh  kafka-log-dirs.sh                    kafka-run-class.sh              kafka-verifiable-producer.sh
kafka-configs.sh              kafka-mirror-maker.sh                kafka-server-start.sh           trogdor.sh
kafka-console-consumer.sh     kafka-preferred-replica-election.sh  kafka-server-stop.sh            windows
kafka-console-producer.sh     kafka-producer-perf-test.sh          kafka-simple-consumer-shell.sh  zookeeper-security-migration.sh
[root@mcwkafka03 /usr/local/src]$ 
[root@mcwkafka01 /usr/local/src]$ vim /data/kafka/config/server.properties
[root@mcwkafka01 /usr/local/src]$ cat /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://10.0.0.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0



On the other two nodes, only the following two lines of server.properties need to change; everything else is identical (a sed sketch for applying these two changes follows the node outputs below)
[root@mcwkafka02 /usr/local/src]$ egrep  "listeners|broker" /data/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.0.0.132:9092


[root@mcwkafka03 /usr/local/src]$ egrep  "listeners|broker" /data/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.0.0.133:9092
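If the first node's server.properties is simply copied to the other two nodes, the two differing lines can be patched in place with sed; a sketch for mcwkafka02 (use broker.id=2 and 10.0.0.133 on mcwkafka03):

# On mcwkafka02: set a unique broker.id and the local listener address
sed -i 's/^broker.id=.*/broker.id=1/' /data/kafka/config/server.properties
sed -i 's#^listeners=.*#listeners=PLAINTEXT://10.0.0.132:9092#' /data/kafka/config/server.properties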


Start the Kafka service on all three nodes
[root@mcwkafka01 /usr/local/src]$ nohup /data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties >/dev/null 2>&1 &
[1] 7224
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz  kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ lsof -i:9092
[root@mcwkafka01 /usr/local/src]$ ss -lntup|grep 9092
[root@mcwkafka01 /usr/local/src]$ 
[root@mcwkafka03 /usr/local/src]$ ss -lntup|grep 9092
tcp    LISTEN     0      50        ::ffff:10.0.0.133:9092                 :::*                   users:(("java",pid=5992,fd=103))
[root@mcwkafka02 /usr/local/src]$ ss -lntup|grep 9092
tcp    LISTEN     0      50        ::ffff:10.0.0.132:9092                 :::*                   users:(("java",pid=6766,fd=103))
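In the output above mcwkafka01 did not show a listener on 9092 yet, most likely because the check ran before the broker had finished starting; re-running ss a little later should show it. Another way to confirm that all three brokers have joined the cluster is to check their registrations in ZooKeeper with the bundled zookeeper-shell.sh; the listed ids should match the broker.id values 0, 1 and 2:

# List the broker ids registered in ZooKeeper
/data/kafka/bin/zookeeper-shell.sh 10.0.0.131:2181 ls /brokers/ids
# The last line of output should look something like: [0, 1, 2]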

Verify the service

Run the following on any one of the nodes:
/data/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181 --replication-factor 1 --partitions 1 --topic test

[root@mcwkafka01 /usr/local/src]$ /data/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

/data/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181

[root@mcwkafka02 /usr/local/src]$ /data/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181
test
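Beyond listing topics, an end-to-end check can be done with the console producer and consumer shipped in /data/kafka/bin: type a few lines into the producer on one node, and they should appear in the consumer on another node:

# On mcwkafka01: produce messages to the test topic (type lines, Ctrl+C to exit)
/data/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.131:9092,10.0.0.132:9092,10.0.0.133:9092 --topic test

# On mcwkafka02: consume the topic from the beginning (Ctrl+C to exit)
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.131:9092,10.0.0.132:9092,10.0.0.133:9092 --topic test --from-beginning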

At this point, the Kafka cluster deployment is complete!

Client connection GUI tool

Connection settings: (screenshot)

Connecting and viewing the 'test' topic created earlier from the command line (screenshot)

Reference: https://www.cnblogs.com/kevingrace/p/9021508.html
