大数据Spark实时处理--数据收集1(Flume)
- 基于Flume构建分布式日志收集
- 0)Flume是一个日志数据的收集工具
- 1)前提是数据采集,落在了log server的磁盘上,这一步已经完成。然后呢,我们需要通过什么样的技术/框架,将这些分散在log server上的一些日志,收集到统一的地方,比如说是HDFS上或者是kafka,最后来进行后续的操作。
- 2)大数据离线/实时处理,比如将log server上的日志数据落在大数据平台上HDFS。要使用Hadoop的话,必须在log server的服务器上配置Hadoop相关get权限,才能使用hadoop fs -put命令。实际上给这些服务器统一配置Hadoop相关权限,首先考虑安全性,配置Hadoop的话数据会对外全部暴露,不安全;其次日志数据在HDFS上进行压缩,节省空间,而压缩需写代码进行压缩,太繁琐了。最后容错问题。若传输数据的put挂掉了,如何做容错。这些都是没有分布式日志收集框架存在的问题。
- 3)目前存在将磁盘上的日志数据收集到大数据平台的方式:使用hdaoop的put、cp、然后定时调度crontab/azkaban,这种方式比较麻烦。而现在推荐的是HADOOP的Flume框架。
- 4)离线数据收集存到HDFS,没问题。实时数据收集使用Flume,然存到HDFS或者kafka上,也是没问题的。数据存储ok后,再进行分布式数据的处理。
- 5)大数据处理过程
- 数据收集----->某个地方(HDFS)----->分布式数据的处理
- 6)数据来源问题:业务数据:订单(一般是关系型数据库里)。爬虫数据。购买数据。用户产生的日志数据,采集过来的日志数据。针对业务数据和用户产生的日志数据收集的方法是不一样的。因为针对不同的数据库采用的方法不一样,针对实时和离线采用的方法不一样。
- 7)采集VS收集
- 采集偏向用户生成的数据。收集偏向于移动,A端移至B端,把存放在各个日志服务器上的数据收集到HDFS上。
- Flume概述
- 1)官网Welcome to Apache Flume — Apache Flume
- 2)收集、聚合、移动海量日志数据
- Flume ==》 HDFS ==》 离线处理
- Flume ==》kafka ==》实时处理
- 3)体系架构和核心三大组件
- Flume的使用很简单,就是文件配置、查字典Documentation — Apache Flume
- 三大核心组件--Source Flume 1.9.0 User Guide — Apache Flume
- 从哪里收集数据?Source就可以从指定的源端,把数据收集过来。
- 都有哪些Source:taildir(*****)、avro、exec、kafka、spooling
- 三大核心组件--Channel
- 是一个数据流通平台,可以看做数据存储池,将数据进行一个简单的汇总。
- 有哪些数据存储:Memory(内存)、file(磁盘)、kafka
- 三大核心组件--Sink
- 读取Channel里的数据,写到目的地。
- HDFS、Logger、kafka、HBase、Avro
- Flume-NG版本中,有一个Agent概念
- Agent是Flume中最小的独立运行单位,对应一个JVM
- Agent是由Source、Channel、Sink组成
- 业界数据收集框架对比
- Apache Flume
- Alibaba DataX
- Apache Chukwa
- Alibaba Canal
- Elastic LogStash
- Fluented
- Flume环境部署及前置要求
- 1)环境部署的系统要求:1.8的JRE、内存大小、磁盘空间、路径的读写权限。
- 2)部署操作:
- 将flume-ng的解压路径添加到系统环境变量里,并启动;
- 更改配置文件flume-env.sh中的JAVA_HOME
- 然后主要使用的是flume-ng
[hadoop@spark000 ~]$ cd software/ [hadoop@spark000 software]$ ls flume-ng-1.6.0-cdh5.16.2.tar.gz [hadoop@spark000 ~]$ cd app/ [hadoop@spark000 app]$ ls apache-flume-1.6.0-cdh5.16.2-bin [hadoop@spark000 software]$ cd [hadoop@spark000 ~]$ cd app/ [hadoop@spark000 app]$ ls apache-flume-1.6.0-cdh5.16.2-bin [hadoop@spark000 app]$ cd apache-flume-1.6.0-cdh5.16.2-bin/ [hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ pwd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin [hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ vi ~/.bash_profile [hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ source ~/.bash_profile export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin export PATH=$FLUME_HOME/bin:$PATH [hadoop@spark000 conf]$ cp flume-env.sh.template flume-env.sh
[hadoop@spark000 conf]$ vi flume-env.sh
# export JAVA_HOME=/usr/lib/jvm/java-6-sun
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_202
- 3)怎么用flume呢?
[hadoop@spark000 ~]$ cd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin [hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ cd bin [hadoop@spark000 bin]$ pwd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/bin [hadoop@spark000 bin]$ ls flume-ng [hadoop@spark000 bin]$ ./flume-ng version Flume 1.6.0-cdh5.16.2
- Flume经典部署案例
- 1)单节点Flume部署
- 放置在配置文件中:
[hadoop@spark000 ~]$ cd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin [hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ cd conf
[hadoop@spark000 conf]$ vi example.conf [hadoop@spark000 conf]$
# 作用:监听44444端口上的数据,输出到logger控制台 # example.conf: A single-node Flume configuration #a1是agent的名字 #r1、k1、c1分别是agent三大核心组件的名字 # Name the components on this agent a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source的type、bind、port # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 配置channel的type # Use a channel which buffers events in memory a1.channels.c1.type = memory # 配置sink的type # Describe the sink a1.sinks.k1.type = logger # source关联的是哪个channel, sink从哪个channel里取数据 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 2)启动flume
- 命令行
flume-ng agent\ --conf 指向$FLUME_HOME下的conf --conf-file 指向自己开发的example.conf配置文件的路径$FLUME_HOME/config/example.conf --name 指向agent的名字a1 -Dflume.root.logger=INFO,console 指日志使用Info的级别在控制台可以看到
[hadoop@spark000 conf]$ flume-ng agent \ > --conf $FLUME_HOME/conf \ > --conf-file $FLUME_HOME/config/example.conf \ > --name a1 \ > -Dflume.root.logger=INFO,console
- 3)完成了官方最基本的操作,监听指定的端口,将数据采过来
- 在spark000界面运行上述命令行,再新打开一个spark000命令行界面,输入数据
- 然后可以在第一个spark000界面,看到接收的数据
[hadoop@spark000 config]$ telnet localhost 44444 Trying ::1... Connected to localhost. Escape character is '^]'. jieqiong OK jiqiong^H OK 1 OK 2 OK 3 OK 4 OK
- 其中接收的信息
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6A 69 65 71 69 6F 6E 67 0D jieqiong. }
- 4)进程Application就是agent的JVM,用完之后,就可以停掉了。
[hadoop@spark000 config]$ jps 8033 Jps 7804 Application
- Event
- 1)上述接收的信息,其中Event
- 表层理解为:从Source端采集过来一条数据,为一个Event
- Event是Flume数据传输的基本单元。
- 数据从Source到Sink,这个过程Event就是作为单位进行处理的。
- 以Event的形式将数据从源头传送到最终的目的地。
- Event是由headers+body来构成的。
- 目前headers是空的,默认情况下为空。headers很重要,在后面会用到。
- body是由字节数组构成的(byte array)
- Flume经典部署方案
- 0)Flume 1.9.0 User Guide — Apache Flume
- 1)在部署案例1中的方案在做测试学习的时候,单层agent的采集方案是没问题。
- 如果Source、Channel、Sink任意一节点挂掉的话,整一个流程不能实现高可用高可靠的。
- 2)多agent的串联
- 两个agent的,第一个是foo,第二个是bar,这个是如何构建的呢?
- 工作当中可能会有多个agent串联起来,形成一条Event数据线,然后进行传输数据
- 注意:前面的agent和后面的agent通信的时候,左面agent输出采用avro方式,右面接收的时候,也要采用avro的方式。
- 在不同的agent/机器上,要经过一个RPC的一个传输,这里是使用的AVRO的方式来进行交互的。
- 即,前面的Sink和后面的Source务必要采用AVRO的方式
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channels = c1 a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545 a1.sources = r1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
- 3)多agent的串并联
- 采集收集webserver上日志方案时,一般采用这种方式。
- 每台机器上都要对应一个agent。每一层的每个机器上的数据通过avro source的方式收集到agent,收集到之后,再通过channel、sink。agent1、2、3是一样的。
- 按照常理,之前我们是将日志落在webserver上的磁盘上,即一个文件。若采用单节点的agent方式,理论上是行的通的,但这里是有安全隐患的。HDFS的信息是直接暴露出来的,无安全性可言。若此时HDFS升级,升级的过程中,sink是写不过来的,无容错性。
- 采用多agent的串并联,在agent4的source处再做一次聚合,即第二层的flume的agent。通过sink和source的关系,将数据聚合在第二层的agent里,再通过channel和sink到HDFS上。
- 这样带来的好处:若前面有10000+台机器,同时访问HDFS的话,并发量、吞吐量太大。中间多做了一次聚合操作,安全性提高,直接访问HDFS的并发量减少。实际上,第二层的flume和HDFS是在一个机房的,第一层的flume和webserver是不在大数据的范围之内的。
- 第一层的flume的agent是在一个配置文件中的。
- 4)一个source出来到多个channel
- 每个组合都是根据实际的业务场景配置的。
- 从source进来数据123,三个channel里的数据是根据选择器决定三个channel里通过的数据是什么。
- 若选择器是复制属性replicating,则三个channel里的数据相同。
- 若是multiplexing,则三个channel里通过不同的数据。
- Flume收集文件数据到HDFS需求分析
- 0)具体参数设置:Flume 1.9.0 User Guide — Apache Flume
- 1)若webserver实时产生数据,在/home/hadoop/logs中的access.log实时接收数据,如何将access.log收集到HDFS之上呢?
- 2)文件数据收集到HDFS上,当拿到一个Flume这种agent配置的功能的时候,首先考虑Agent选型:
- Source
- 监听文件时,type值为exec,命令行是tail -f access.log
- Channel
- 对于数据量不大的时候,type值为memory;要确保数据不丢失type值为file
- Sink
- type值为hdfs,可以配置多长时间滚一次数据,或多少数据量滚一次数据
- hdfs.path值为hdfs://namenode/flume/webdata/
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute
- Flume收集文件数据到HDFS
- 1)写Flume配置文件
[hadoop@spark000 config]$ ls flume-exec-hdfs.conf [hadoop@spark000 config]$ pwd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config [hadoop@spark000 config]$ vi flume-exec-hdfs.conf
#define agent
#agent 名字即 exec-hdfs-agent exec-hdfs-agent.sources = exec-source exec-hdfs-agent.channels = exec-memory-channel exec-hdfs-agent.sinks = hdfs-sink #define source exec-hdfs-agent.sources.exec-source.type = exec exec-hdfs-agent.sources.exec-source.command = tail -F ~/data/data.log exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c #define channel exec-hdfs-agent.channels.exec-memory-channel.type = memory #define sink exec-hdfs-agent.sinks.hdfs-sink.type = hdfs exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://spark000:8020/data/flume/tail exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text exec-hdfs-agent.sinks.hdfs-sink.hdfs.batchSize = 10 #bind source and sink to channel exec-hdfs-agent.sources.exec-source.channels = exec-memory-channel exec-hdfs-agent.sinks.hdfs-sink.channel = exec-memory-channel
- 2) 在data目录下创建data.log文件:touch data.log
- 3)启动hdfs、flume配置文件
[hadoop@spark000 sbin]$ ./start-dfs.sh [hadoop@spark000 sbin]$ ./start-yarn.sh [hadoop@spark000 config]$ jps 7840 ResourceManager 10849 Jps 8170 NodeManager 7308 NameNode 7661 SecondaryNameNode 7470 DataNode [hadoop@spark000 config]$ flume-ng agent \ > --conf $FLUME_HOME/conf \ > --conf-file $FLUME_HOME/config/flume-exec-hdfs.conf \ > --name exec-hdfs-agent \ > -Dflume.root.logger=INFO,console
- 4)往日志里追加信息
[hadoop@spark000 data]$ pwd /home/hadoop/data [hadoop@spark000 data]$ echo jieqiong >> data.log [hadoop@spark000 data]$ echo hello >> data.log
- 5)在启动的界面里可以查看在hadoop上的地址
2021-11-04 14:15:41,591 (hdfs-hdfs-sink-call-runner-3) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/tail/FlumeData.1636006511532.tmp to hdfs://spark000:8020/data/flume/tail/FlumeData.1636006511532 2021-11-04 14:14:20,492 (hdfs-hdfs-sink-call-runner-8) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/tail/FlumeData.1636006430434.tmp to hdfs://spark000:8020/data/flume/tail/FlumeData.1636006430434
- 6)查看在hadoop上的存储
[hadoop@spark000 data]$ hadoop fs -text /data/flume/tail/FlumeData.1636006511532 [hadoop@spark000 data]$ hadoop fs -text /data/flume/tail/FlumeData.1636006430434
- 数据查到了,说明数据可以顺利写道HDFS上的,但这种方式并不好
- 我们现在使用的是使用linux命令来添加数据的:tail -F ~/data/data.log
- tail -F这种可靠性并不是很好的,程序容易挂掉。而且只能监控一个文件,不能监控一个文件夹。
- 所以现在采用exec这种方式,来收集文件的数据到HDFS上的局限性很大。所以接下来调整一种方式。
- Flume收集文件夹数据到HDFS
- 1)具体参数设置:Flume 1.9.0 User Guide — Apache Flume
- 2)文件夹数据收集到HDFS上,首先考虑Agent选型:
- Source
- 监听文件夹时,type值为spooldir
- Channel
- 对于数据量不大的时候,type值为memory;要确保数据不丢失type值为file
- Sink
- type值为hdfs,可以配置多长时间滚一次数据,或多少数据量滚一次数据
- hdfs.path值为hdfs://namenode/flume/webdata/
- 3)Flume配置文件
[hadoop@spark000 config]$ vi flume-spooling.conf [hadoop@spark000 config]$ pwd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config
#define agent = spooling-hdfs-agent spooling-hdfs-agent.sources = spooling-source spooling-hdfs-agent.channels = spooling-memory-channel spooling-hdfs-agent.sinks = hdfs-sink #define source spooling-hdfs-agent.sources.spooling-source.type = spooldir spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data #先不添加这句话,后续添加。只要是以.txt格式结尾的文件都不收集 #spooling-hdfs-agent.sources.spooling-source.ignorePattern = ^(.)*\\.txt$ #define channel spooling-hdfs-agent.channels.spooling-memory-channel.type = memory #define sink
#这里输出到文件系统 spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://spark000:8020/data/flume/spooling #配置压缩的文件类型
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream #当使用了压缩的文件类型,一定要配置Codec
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.codeC=org.apache.hadoop.io.compress.GzipCodec #配置显示的文件前缀为events
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = events- #以时间间隔30s或者是数据量到达1000000条来监控收集数据,不以数据大小来监控数据
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0 spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000 spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30 #bind source and sink to channel spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel
- 4)启动Flume
[hadoop@spark000 config]$ flume-ng agent \ > --conf $FLUME_HOME/conf \ > --conf-file $FLUME_HOME/config/flume-spooling.conf \ > --name spooling-hdfs-agent \ > -Dflume.root.logger=INFO,console
- 5)启动成功
- 当看到Component type: SOURCE, name: spooling-source started 即成功
- 6)测试传数据
- 打开浏览器界面Browsing HDFS
[hadoop@spark000 spool_data]$ cp /home/hadoop/logs/access.log 01 [hadoop@spark000 spool_data]$ ls 01.COMPLETED
2021-11-05 10:05:12,807 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/hadoop/data/spool_data/01 to /home/hadoop/data/spool_data/01.COMPLETED 2021-11-05 10:05:42,579 (hdfs-hdfs-sink-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/spooling/events-.1636077911675.gz.tmp to hdfs://spark000:8020/data/flume/spooling/events-.1636077911675.gz
- 7)这种方式也是存在问题的:
- 程序有没有处理,或者处理到哪里都是没有记录的
- 这不过比前一种tail -F的方式好一点而已
- 这种方式在生产上,也要慎用。
- TailDirSource实战(*****一定要掌握)
- 1)TailDirSource是Flume收集文件tail及文件夹spoolingdir的结合体。
- 2)这种source不会丢失数据,因为在处理数据的过程中,会周期性的将每一个收集过来的文件数据的偏移量写入到一个JSON文件中。也就是说这批次的数据处理到哪里了,会将这个偏移量写进到JSON里,如果再进行下一个批次文件数据的处理,就从这个JSON里将上次的偏移量取出来。即使Flume挂掉了,现在继续将数据往Flume里灌入,启动Flume,也会从指定的偏移量继续向后获取到数据。这就是比较厉害的一点。
- 3)taildir-memory-logger.conf配置文件
- 这里做测试的主要目的是,看taildir是不是一个高可用的
[hadoop@spark000 spool_data]$ jps 6912 DataNode 7281 ResourceManager 8215 Application 9304 Jps 6745 NameNode 7101 SecondaryNameNode 7599 NodeManager [hadoop@spark000 spool_data]$ kill -9 8215 [hadoop@spark000 config]$ ls taildir-memory-logger.conf [hadoop@spark000 config]$ pwd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /home/hadoop/tmp/position/taildir_position.json a1.sources.r1.filegroups = f1 f2 #f1的路径
a1.sources.r1.filegroups.f1 = /home/hadoop/tmp/flume/test1/example.log a1.sources.r1.headers.f1.headerKey1 = value1 #f2的路径 a1.sources.r1.filegroups.f2 = /home/hadoop/tmp/flume/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 a1.sources.r1.fileHeader = true # Describe the sink # 这里是输出到磁盘
a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 创建f1、f2路径及文件
[hadoop@spark000 config]$ mkdir -p /home/hadoop/tmp/flume/test1 [hadoop@spark000 config]$ cd /home/hadoop/tmp/flume/test1 [hadoop@spark000 test1]$ ls [hadoop@spark000 test1]$ touch example.log [hadoop@spark000 test1]$ cd .. [hadoop@spark000 flume]$ mkdir test2
- 启动Flume
- 当出现Component type: SOURCE, name: r1 started 即启动成功
- 目前是一个无数据的状态
[hadoop@spark000 config]$ flume-ng agent \ > --conf $FLUME_HOME/conf \ > --conf-file $FLUME_HOME/config/taildir-memory-logger.conf \ > --name a1 \ > -Dflume.root.logger=INFO,console
- 切入到test1目录下,并在example.log中放入数据
- 切入到test2目录下,并在1.log、2.log中放入数据
[hadoop@spark000 test1]$ echo >> example.log [hadoop@spark000 test1]$ echo aaa >> example.log [hadoop@spark000 test1]$ echo bbb >> example.log [hadoop@spark000 test1]$ cd .. [hadoop@spark000 flume]$ cd test2 [hadoop@spark000 test2]$ echo 111 >> 1.log [hadoop@spark000 test2]$ echo 222 >> 2.log [hadoop@spark000 test2]$ echo 333 >> 1.log
2021-11-05 14:22:15,835 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 61 61 61 aaa } 2021-11-05 14:22:30,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 62 62 62 bbb } 2021-11-05 14:24:28,156 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/hadoop/tmp/flume/test1/example.log, inode: 68630857, pos: 9 2021-11-05 14:36:54,493 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:290)] Opening file: /home/hadoop/tmp/flume/test2/1.log, inode: 35924199, pos: 0 2021-11-05 14:36:54,493 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 31 31 31 111 } 2021-11-05 14:37:01,504 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:290)] Opening file: /home/hadoop/tmp/flume/test2/2.log, inode: 35924200, pos: 0 2021-11-05 14:37:03,499 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/2.log} body: 32 32 32 222 } 2021-11-05 14:38:05,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 33 33 33 333 } 2021-11-05 14:39:03,738 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/hadoop/tmp/flume/test2/2.log, inode: 35924200, pos: 4
- 查看偏移量
[hadoop@spark000 test2]$ cat /home/hadoop/tmp/position/taildir_position.json [{"inode":68630857,"pos":9,"file":"/home/hadoop/tmp/flume/test1/example.log"},{"inode":35924199,"pos":8,"file":"/home/hadoop/tmp/flume/test2/1.log"},{"inode":35924200,"pos":4,"file":"/home/hadoop/tmp/flume/test2/2.log"}]
- 先停止Flume,但日志端还不停的往日志服务目录中sink数据。
- 若Flume重启之后,在上次挂掉,到这次重启成功之后,这一波儿数据会不会丢失,就是测试这个功能的。
- 重启Flume,看一下这一波儿的数据能否收到,收到了!
- 这是因为example.log,1.log,2.log的文件收集到哪里,偏移量就记录在哪里(记录在json文件中)
- json中的pos是记录flume挂掉之前的数据,当flume重新启动后,会从新的pos位置继续重新记录,会自然而然开始记录新的数据。生产上就不会丢失数据。
#先停止Flume,再添加数据 [hadoop@spark000 test2]$ echo jieqiong >> 1.log [hadoop@spark000 test2]$ echo jieqiong >> 2.log [hadoop@spark000 test1]$ echo jieiqong >> example.log #启动Flume,查看这三组数据是否加载成功,并查看json [hadoop@spark000 test1]$ cat /home/hadoop/tmp/position/taildir_position.json [{"inode":68630857,"pos":18,"file":"/home/hadoop/tmp/flume/test1/example.log"},{"inode":35924199,"pos":17,"file":"/home/hadoop/tmp/flume/test2/1.log"},{"inode":35924200,"pos":13,"file":"/home/hadoop/tmp/flume/test2/2.log"}][hadoop@spark000 test1]$
2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 6A 69 65 69 71 6F 6E 67 jieiqong } 2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 6A 69 65 71 69 6F 6E 67 jieqiong } 2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/2.log} body: 6A 69 65 71 69 6F 6E 67 jieqiong }
- Flume实战之拦截器的二次开发
- 1)这里拦截器要用多agent来实现
- 2)flume interceptors Flume 1.9.0 User Guide — Apache Flume
- 将数据打标,过滤。
- 要实现拦截器,必须要有一个接口,用于修改和删除数据。
- 3)需求分析
- Agent1
- source:netcat
- 将imooc.com的数据抓取出来,写到一个地方hdfs或者其他地方。
- 将gifshow.com的数据抓取出来,写到另外一个地方。
- 数据1:imooc.com
- 数据2:imooc.com
- 数据3:gifshow.com
- .....
- 这里的话,是一个source进来。然后channel1、channel2,分别至avro-sink1、avro-sink2。
- 目前我们采用一台机器模拟,所以在sink出来后,先采用不同的端口,例:hadoop000+44445,hadoop000+44446
- 接下来使用Agent2的avro-source1接44445数据,使用avro-source2接44446数据。
- Agent2接到数据后,输出到控制台。
- channel
- channel的选择器:flume channel selectors
- channel可以多个,使用multiplexing channel selector来标识,即一个source出来到多个channel
- sink
- 多Agent之间的通信,一般采用avro的sink,然后采用avro的source来接上一个sink,中间的hostname+port只要相等即可。
- source:netcat
- Agent1
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
- 4)每一条数据即为一个Event:imooc.com
- Event = header + body
- Interceptor 接口:
- Event: body
- 内容 contains("imooc.com")
- 打标 header: type = imooc
- 打标 header:type = other