Kafka服务器部署


Kafka服务器部署

一、 部署环境

  1. JDK 1.8
  2. Zookeeper-3.6.3 (可选,单独部署)

二、 名词解释

名词 解释
Producer 生产者,消息的产生者,是消息的入口
Broker Broker是Kafka实例,每个服务器上有一个或多个Kafka的实例
Topic 消息的主题,可以理解为消息的分类
Partition Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高Kafka的吞吐量
Replication Replication是给每个partition提供备份功能,当一个Broker挂掉时可以迅速实现故障切换(failover)。可以在创建或修改topic时指定replica factor,来设定备份数目
Message 每一条发送的消息主体
Consumer 消费者,即消息的消费方,是消息的出口
Consumer Group 多个消费组组成一个消费者组,在Kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这是为了提高Kafka的吞吐量
Zookeeper Kafka集群依赖zookeeper来保存集群的元信息,来保证系统的可用性。

三、参数配置

参数 说明 配置值
broker.id Broke为集群中的唯一标识,必须为正数,kafka根据id识别broker机器。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 0
num.network.threads 处理网络请求的线程数量 3
num.io.threads 处理磁盘IO的线程数量 8
socket.send.buffer.bytes 发送套接字的缓冲区大小 102400
socket.receive.buffer.bytes 接受套接字的缓冲区大小 102400
socket.request.max.bytes 请求套接字的缓冲区大小 104857600
log.dirs Kafka数据的存放地址,多个地址则用逗号分割:/kafka/kafka-logs-1/kafka/kafka-logs-2 根据自己的情况选定
num.partitions topic在当前broker上的分片个数 1
num.recovery.threads.per.data.dir 用来恢复和清理data下数据的线程数量 1
offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数 1
transaction.state.log.replication.factor 事务主题的复制因子 1
transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置 1
log.retention.hours 一个日志的最小存活时间,可以被删除 168
log.retention.check.interval.ms 文件大小检查的周期时间,是否触发则在log.cleanup.policy中设置的策略 1073741824
zookeeper.connect zookeeper集群的地址,可以是多个,多个之间用逗号分割,一般端口都为2181;例:hostname1:port1,hostname2:port2,hostname3:port3 300000
zookeeper.connection.timeout.ms zookeeper链接超时时间 2181
group.initial.rebalance.delay.ms 分组协调设置 18000

四、Kafka安装步骤

  1. 获取Kafka并解压 https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
#输入命令:
$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0
  1. 启动Zookeeper环境(本地环境中必须安装JDK1.8+,本步骤采用内置zookeeper)

    # 输入命令:
    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    
  2. 启动Kafka环境( 新终端会话 )

    # 输入命令:
    $ bin/kafka-server-start.sh config/server.properties
    
  3. 创建一个主题topic来存储事件(新终端会话 )

    # 输入命令:
    $ bin/kafka-topics.sh --create --partitions 1 -replication-factor 1 --topic quickstart-events--bootstrap-server localhost:9092
    
  4. 主题中写入一些事件

    # 输入命令:
    $ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
    > This is my first event
    > This is my second event
    

    停止生产者客户端Ctrl+C

  5. 阅读事件(新终端会话 )

    # 输入命令:
    
    $ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
    # 显示
    This is my first event
    
    This is my second event
    

    停止消费者客户端Ctrl+C

五、Zookeeper安装步骤

? 安装独立的Zookeeper服务

  1. 下载zookeeper:

https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

  1. 解压安装

    # 输入命令:
    
    $ tar -xzf apache-zookeeper-3.6.3-bin.tar.gz
    
    $ cd apache-zookeeper-3.6.3-bin/conf
    
  2. 复制并修改配置文件

    # 输入命令:
    cp zoo_sample.cfg zoo.cfg
    
    更改zookeeper的数据保存地址dataDir= (数据保存文件夹地址)
    新增日志保存地址dataLogDir=(日志保存文件夹地址)
    
  3. 启动Zookeeper环境

    # 输入命令:
    
    ./zkServer.sh start --启动服务
    
    ./zkServer.sh status --检查启动情况(选)
    
    ps -ef | grep zookeeper --可查看进程判断是否启动(选)
    
    /zkServer.sh stop --关闭服务(选)
    
  4. 编辑profile文件,并加入zookeeper的环境变量

    输入命令:
    
    export ZOOKEEPER_HOME=/usr/local/zookeeper/ apache-zookeeper-3.6.3-bin /
    
    export PATH=$ZOOKEEPER_HOME/bin:$PATH
    
  5. 防火墙中添加允许zookeeper访问的端口

    # 输入命令:
    firewall-cmd --permanent --zone=public --add-port=2181/tcp
    
  6. 重启防火墙

    # 输入命令:
    systemctl restart firewalld.service
    
    

注意事项

  1. 注意安装和配置的路径,根据自己的实际情况设置。

  2. 注意防火墙,需将端口号设置成可访问。

  3. 使用独立的zookeeper时将内置的zookeeper关闭。

SDK参考版本

  1. JDK1.8

  2. kafka_2.13-3.0.0:https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz

  3. zookeeper-3.6.3:https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz