Kafka学习笔记1——Kafka的安装和启动
一、准备工作
1. 安装JDK
可以用命令 java -version 查看版本2.下载Kafka
这里下载的是二进制版本(V2.3.1)。kafka自带打包和配置好 zookeeper,无需单独安装zookeeper。解压后,可以看到目录结构如下: kafka Kafka 根目录 ├─bin Kafka 运行的脚本 │ ├─connect-distributed.sh 连接 kafka 集群模式 │ ├─connect-standalone.sh 连接 kafka 单机模式 │ ├─kafka-acls.sh │ ├─kafka-broker-api-versions.sh │ ├─kafka-configs.sh 配置管理脚本 │ ├─kafka-console-consumer.sh kafka 消费者控制台 │ ├─kafka-console-producer.sh kafka 生产者控制台 │ ├─kafka-consumer-groups.sh kafka 消费者组相关信息 │ ├─kafka-consumer-perf-test.sh kafka 消费者性能测试 │ ├─kafka-delegation-tokens.sh │ ├─kafka-delete-records.sh 删除低水位的日志文件 │ ├─kafka-dump-log.sh │ ├─kafka-log-dirs.sh kafka消息日志目录 │ ├─kafka-mirror-maker.sh 不同数据中心 kafka 集群复制工具 │ ├─kafka-preferred-replica-election.sh 触发 preferred replica 选举 │ ├─kafka-producer-perf-test.sh kafka 生产者性能测试脚本 │ ├─kafka-reassign-partitions.sh 分区重分配脚本 │ ├─kafka-replica-verification.sh 复制进度验证脚本 │ ├─kafka-run-class.sh │ ├─kafka-server-start.sh 启动 kafka 服务 │ ├─kafka-server-stop.sh 停止 kafka 服务 │ ├─kafka-streams-application-reset.sh │ ├─kafka-topics.sh kafka主题 │ ├─kafka-verifiable-consumer.sh 可检验的 kafka 消费者 │ ├─kafka-verifiable-producer.sh 可检验的 kafka 生产者 │ └─trogdor.sh │ ├─windows 在 Windows 系统下执行的脚本目录 │ │ ├─connect-distributed.bat │ │ └─ … 更多 windows 下执行的脚本文件 │ ├─zookeeper-security-migration.sh │ ├─zookeeper-server-start.sh 启动 zk 服务 │ ├─zookeeper-server-stop.sh 停止 zk 服务 │ └─zookeeper-shell.sh zk 客户端脚本 │ ├─config Kafka、zookeeper 等配置文件 │ ├─connect-console-sink.properties │ ├─connect-console-source.properties │ ├─connect-distributed.properties │ ├─connect-file-sink.properties │ ├─connect-file-source.properties │ ├─connect-log4j.properties │ ├─connect-standalone.properties │ ├─consumer.properties 消费者配置 │ ├─log4j.properties │ ├─producer.properties 生产者配置 │ ├─server.properties kafka 服务配置 │ ├─tools-log4j.properties │ ├─trogdor.conf │ └─zookeeper.properties zk 服务配置 │ ├─libs Kafka 运行的依赖库 │ ├─activation-1.1.1.jar │ └─... │ ├─site-docs/ Kafka 相关文档 │ ├─kafka_2.12-2.3.1-site-docs.tgz二、启动
1. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2. 启动kafka
bin/kafka-server-start.sh config/server.properties
可以通过 jps 命令查看 Kafka 服务进程是否已经启动,
jps -l
三、生产和消费
1. 主题
在前面的目录结构中,我们可以知道 bin 下提供的 kafka-topics.sh 脚本,与主题相关。- 创建主题
bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
--zookeeper 指定了 Kafka 所连接的 ZooKeeper 服务地址,
--create 是创建主题的动作指令,
--bootstrap-server 指定了连接的 Kafka 集群地址,
--topic 指定了所要创建主题的名称,--replication-factor 指定了副本因子,
--partitions 指定了分区个数。
即创建了一个分区为 4、副本因子为 3 的主题 topic-demo。- 查看主题
bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-demo
- 查看所有的主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
2. 生产和消费
bin 目录下提供了两个脚本 kafka-console-producer.sh 和 kafka-console-consumer.sh,通过控制台收发消息。- 生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo--broker-list 指定了连接的 Kafka 集群地址, --topic 指定了发送消息时的主题。 上述命令报错:Error while fetching metadata with correlation id 1 : {topic-demo=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 这是因为无法识别 hostname。
listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://your.host.name:9092
改为
listeners=PLAINTEXT://localhost:9092 advertised.listeners=PLAINTEXT://localhost:9092
保存、重启 kafka 即可。再次发送消息没有报错了。
- 消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
--bootstrap-server 指定了连接的 Kafka 集群地址,
--topic 指定了消费者订阅的主题。 可以看到,当使用 kafka-console-producer.sh 脚本发送消息至主题 topic-demo后,当前终端窗口会同步刚刚输入的消息内容,这说明,消费者消费了消息(这话怎么这么拗口~)