Python3操作Kafka


前言

操作Kafka之前,先启动Kafka:

方式一:进入Kafka安装目录,常规模式启动:
bin/kafka-server-start.sh config/server.properties

方式二:进入Kafka安装目录,进程守护模式启动kafka:
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & 

另外,Kafka的关闭命令:

进入Kafka安装目录,执行:
bin/kafka-server-stop.sh

Python操作Kafka发送字符串

编写Kafka生产者Producer:

# kafka_producer.py

from kafka import KafkaProducer
from time import sleep

def start_producer():
    producer = KafkaProducer(bootstrap_servers='192.168.0.157:9092')
    for i in range(0,100000):
        msg = 'msg is ' + str(i)
        producer.send('topic_test', msg.encode('utf-8'))
        sleep(3)

if __name__ == '__main__':
    start_producer()

编写Kafka消费者KafkaConsumer:

# kafka_consumer.py

from kafka import KafkaConsumer
import time

def start_consumer():
    consumer = KafkaConsumer('topic_test', bootstrap_servers = '192.168.0.157:9092')
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic) # topic default is string
        print("partition = %d" % msg.offset)
        print("value = %s" % msg.value.decode()) # bytes to string
        print("timestamp = %d" % msg.timestamp)
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )

if __name__ == '__main__':
    start_consumer()

以上两个代码,先运行消费者,后运行生产者,就看到消费者在监听生产者发信息。

发送结构化json待续。。

以上。