Flume对接Kafka详细过程
一、为什么要集成Flume和Kafka
一般使用Flume+Kafka来完成实时流式的日志处理,后面再连接上Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,可以把它理解为一个数据库,可以存放一段时间的数据。
因此数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。
二、flume 与 kafka 的关系及区别
- Flume
是一个分布式、高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也可以将数据写到各种数据接受方,用于转发数据。Flume的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具
- flume分为sources,channels,sinks三部分,每一部分都可以根据需求定制。
- 与kafka相比,flume 的数据采集部分做的很好,可以定制很多数据源,减少开发量。
- Kafka
是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn
的活跃数据,及日志数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。
- kafka 是分布式消息中间件,自带存储,提供 push 和 pull
存取数据的功能,是一个非常通用消息缓存的系统,可以有许多生产者和很多的消费者共享多个主题
三、Flume 对接 Kafka(详细步骤)
1. 增加flume文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# netcat 监听端口
a1.sources.r1.type = netcat
a1.sources.r1.bind =master1
a1.sources.r1.port = 10000
a1.sources.r1.channels = c1
# 一行的最大字节数
a1.sources.r1.max-line-length = 1024000
# channels具体配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# KAFKA_sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hello
a1.sinks.k1.brokerList = master1:9092,slave1:9092,slave2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
2. 启动zookeeper集群
[hadoop@master1 ~]# zkServer.sh start
3. 启动kafka集群
[hadoop@master1 ~]# kafka-server-start.sh /usr/local/src/kafka/config/server.properties
kafka后台运行:kafka-server-start.sh /usr/local/src/kafka/config/server.properties 1>/dev/null 2>&1 &
4.创建topic
[hadoop@master1 ~]# kafka-topics.sh --create --zookeeper master1:2181,slave1:2181,slave2:2181 --replication-factor 2 --topic hello --partitions 1
5. 查看 topic是否创建成功
[hadoop@master1 ~]# kafka-topics.sh --list --zookeeper master1:2181,slave1:2181,slave2:2181
6. 创建kafka消费者
[hadoop@master1 ~]# kafal-console-consumer.sh --zookeeper master1:2181,slave1:2181,slave2:2181 --topic hello --from-beginning
7. 启动flume
[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console
8. 向flume端口发送消息
[hadoop@master1 ~]# telnet master1 10000
9. 在kafka消费者观测接收到的信息
如有错误,欢迎私信纠正,谢谢支持!