大数据Spark实时处理--数据收集1(Flume)


  • 基于Flume构建分布式日志收集
  • 0)Flume是一个日志数据的收集工具
  • 1)前提是数据采集,落在了log server的磁盘上,这一步已经完成。然后呢,我们需要通过什么样的技术/框架,将这些分散在log server上的一些日志,收集到统一的地方,比如说是HDFS上或者是kafka,最后来进行后续的操作。
  • 2)大数据离线/实时处理,比如将log server上的日志数据落在大数据平台上HDFS。要使用Hadoop的话,必须在log server的服务器上配置Hadoop相关get权限,才能使用hadoop fs -put命令。实际上给这些服务器统一配置Hadoop相关权限,首先考虑安全性,配置Hadoop的话数据会对外全部暴露,不安全;其次日志数据在HDFS上进行压缩,节省空间,而压缩需写代码进行压缩,太繁琐了。最后容错问题。若传输数据的put挂掉了,如何做容错。这些都是没有分布式日志收集框架存在的问题。
  • 3)目前存在将磁盘上的日志数据收集到大数据平台的方式:使用hdaoop的put、cp、然后定时调度crontab/azkaban,这种方式比较麻烦。而现在推荐的是HADOOP的Flume框架。
  • 4)离线数据收集存到HDFS,没问题。实时数据收集使用Flume,然存到HDFS或者kafka上,也是没问题的。数据存储ok后,再进行分布式数据的处理。
  • 5)大数据处理过程
  • 数据收集----->某个地方(HDFS)----->分布式数据的处理
  • 6)数据来源问题:业务数据:订单(一般是关系型数据库里)。爬虫数据。购买数据。用户产生的日志数据,采集过来的日志数据。针对业务数据和用户产生的日志数据收集的方法是不一样的。因为针对不同的数据库采用的方法不一样,针对实时和离线采用的方法不一样。
  • 7)采集VS收集
  • 采集偏向用户生成的数据。收集偏向于移动,A端移至B端,把存放在各个日志服务器上的数据收集到HDFS上。
  • Flume概述
  • 1)官网Welcome to Apache Flume — Apache Flume
  • 2)收集、聚合、移动海量日志数据
  • Flume ==》 HDFS ==》 离线处理
  • Flume ==》kafka ==》实时处理
  • 3)体系架构和核心三大组件
  • Flume的使用很简单,就是文件配置、查字典Documentation — Apache Flume
  • 三大核心组件--Source Flume 1.9.0 User Guide — Apache Flume
  • 从哪里收集数据?Source就可以从指定的源端,把数据收集过来。
  • 都有哪些Source:taildir(*****)、avro、exec、kafka、spooling
  • 三大核心组件--Channel
  • 是一个数据流通平台,可以看做数据存储池,将数据进行一个简单的汇总。
  • 有哪些数据存储:Memory(内存)、file(磁盘)、kafka
  • 三大核心组件--Sink
  • 读取Channel里的数据,写到目的地。
  • HDFS、Logger、kafka、HBase、Avro
  • Flume-NG版本中,有一个Agent概念
  • Agent是Flume中最小的独立运行单位,对应一个JVM
  • Agent是由Source、Channel、Sink组成
  • 业界数据收集框架对比
  • Apache Flume
  • Alibaba DataX
  • Apache Chukwa
  • Alibaba Canal
  • Elastic LogStash
  • Fluented
  • Flume环境部署及前置要求
  • 1)环境部署的系统要求:1.8的JRE、内存大小、磁盘空间、路径的读写权限。
  • 2)部署操作:
  • 将flume-ng的解压路径添加到系统环境变量里,并启动;
  • 更改配置文件flume-env.sh中的JAVA_HOME
  • 然后主要使用的是flume-ng
[hadoop@spark000 ~]$ cd software/
[hadoop@spark000 software]$ ls
flume-ng-1.6.0-cdh5.16.2.tar.gz
[hadoop@spark000 ~]$ cd app/
[hadoop@spark000 app]$ ls
apache-flume-1.6.0-cdh5.16.2-bin
[hadoop@spark000 software]$ cd
[hadoop@spark000 ~]$ cd app/
[hadoop@spark000 app]$ ls
apache-flume-1.6.0-cdh5.16.2-bin
[hadoop@spark000 app]$ cd apache-flume-1.6.0-cdh5.16.2-bin/
[hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ pwd
/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin

[hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ vi ~/.bash_profile
[hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ source ~/.bash_profile

export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin
export PATH=$FLUME_HOME/bin:$PATH


[hadoop@spark000 conf]$ cp flume-env.sh.template flume-env.sh
[hadoop@spark000 conf]$ vi flume-env.sh
# export JAVA_HOME=/usr/lib/jvm/java-6-sun 
export JAVA_HOME
=/home/hadoop/app/jdk1.8.0_202
  • 3)怎么用flume呢?
[hadoop@spark000 ~]$ cd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin
[hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ cd bin
[hadoop@spark000 bin]$ pwd
/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/bin
[hadoop@spark000 bin]$ ls
flume-ng
[hadoop@spark000 bin]$ ./flume-ng version
Flume 1.6.0-cdh5.16.2
  • Flume经典部署案例
  • 1)单节点Flume部署
  • 放置在配置文件中:
[hadoop@spark000 ~]$ cd /home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin
[hadoop@spark000 apache-flume-1.6.0-cdh5.16.2-bin]$ cd conf
[hadoop@spark000 conf]$ vi example.conf [hadoop@spark000 conf]$
# 作用:监听44444端口上的数据,输出到logger控制台
# example.conf: A single-node Flume configuration

#a1是agent的名字
#r1、k1、c1分别是agent三大核心组件的名字
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1 
# 配置source的type、bind、port
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置channel的type
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# 配置sink的type
# Describe the sink
a1.sinks.k1.type = logger

# source关联的是哪个channel, sink从哪个channel里取数据
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 2)启动flume
  • 命令行
flume-ng agent\
--conf 指向$FLUME_HOME下的conf 
--conf-file 指向自己开发的example.conf配置文件的路径$FLUME_HOME/config/example.conf
--name 指向agent的名字a1 
-Dflume.root.logger=INFO,console 指日志使用Info的级别在控制台可以看到
[hadoop@spark000 conf]$ flume-ng agent \
> --conf $FLUME_HOME/conf \
> --conf-file $FLUME_HOME/config/example.conf \
> --name a1 \
> -Dflume.root.logger=INFO,console
  • 3)完成了官方最基本的操作,监听指定的端口,将数据采过来
  • 在spark000界面运行上述命令行,再新打开一个spark000命令行界面,输入数据
  •  然后可以在第一个spark000界面,看到接收的数据
[hadoop@spark000 config]$ telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
jieqiong
OK
jiqiong^H
OK
1
OK
2
OK
3
OK
4
OK
  • 其中接收的信息
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6A 69 65 71 69 6F 6E 67 0D                      jieqiong. }
  • 4)进程Application就是agent的JVM,用完之后,就可以停掉了。
[hadoop@spark000 config]$ jps
8033 Jps
7804 Application
  • Event
  • 1)上述接收的信息,其中Event
  • 表层理解为:从Source端采集过来一条数据,为一个Event
  • Event是Flume数据传输的基本单元。
  • 数据从Source到Sink,这个过程Event就是作为单位进行处理的。
  • 以Event的形式将数据从源头传送到最终的目的地。
  • Event是由headers+body来构成的。
  • 目前headers是空的,默认情况下为空。headers很重要,在后面会用到。
  • body是由字节数组构成的(byte array)
  • Flume经典部署方案
  • 0)Flume 1.9.0 User Guide — Apache Flume
  • 1)在部署案例1中的方案在做测试学习的时候,单层agent的采集方案是没问题。
  • 如果Source、Channel、Sink任意一节点挂掉的话,整一个流程不能实现高可用高可靠的。
  • 2)多agent的串联
  • 两个agent的,第一个是foo,第二个是bar,这个是如何构建的呢?
  • 工作当中可能会有多个agent串联起来,形成一条Event数据线,然后进行传输数据
  • 注意:前面的agent和后面的agent通信的时候,左面agent输出采用avro方式,右面接收的时候,也要采用avro的方式。
  • 在不同的agent/机器上,要经过一个RPC的一个传输,这里是使用的AVRO的方式来进行交互的。
  • 即,前面的Sink和后面的Source务必要采用AVRO的方式

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channels = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
  • 3)多agent的串并联
  • 采集收集webserver上日志方案时,一般采用这种方式。
  • 每台机器上都要对应一个agent。每一层的每个机器上的数据通过avro source的方式收集到agent,收集到之后,再通过channel、sink。agent1、2、3是一样的。
  • 按照常理,之前我们是将日志落在webserver上的磁盘上,即一个文件。若采用单节点的agent方式,理论上是行的通的,但这里是有安全隐患的。HDFS的信息是直接暴露出来的,无安全性可言。若此时HDFS升级,升级的过程中,sink是写不过来的,无容错性。
  • 采用多agent的串并联,在agent4的source处再做一次聚合,即第二层的flume的agent。通过sink和source的关系,将数据聚合在第二层的agent里,再通过channel和sink到HDFS上。
  • 这样带来的好处:若前面有10000+台机器,同时访问HDFS的话,并发量、吞吐量太大。中间多做了一次聚合操作,安全性提高,直接访问HDFS的并发量减少。实际上,第二层的flume和HDFS是在一个机房的,第一层的flume和webserver是不在大数据的范围之内的。
  • 第一层的flume的agent是在一个配置文件中的。

 

  •  4)一个source出来到多个channel
  • 每个组合都是根据实际的业务场景配置的。
  • 从source进来数据123,三个channel里的数据是根据选择器决定三个channel里通过的数据是什么。
  • 若选择器是复制属性replicating,则三个channel里的数据相同。
  • 若是multiplexing,则三个channel里通过不同的数据。

  • Flume收集文件数据到HDFS需求分析
  • 0)具体参数设置:Flume 1.9.0 User Guide — Apache Flume
  • 1)若webserver实时产生数据,在/home/hadoop/logs中的access.log实时接收数据,如何将access.log收集到HDFS之上呢?
  • 2)文件数据收集到HDFS上,当拿到一个Flume这种agent配置的功能的时候,首先考虑Agent选型:
    • Source
    • 监听文件时,type值为exec,命令行是tail -f access.log
    • Channel
    • 对于数据量不大的时候,type值为memory;要确保数据不丢失type值为file
    • Sink
    • type值为hdfs,可以配置多长时间滚一次数据,或多少数据量滚一次数据
    • hdfs.path值为hdfs://namenode/flume/webdata/
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
  • Flume收集文件数据到HDFS
  • 1)写Flume配置文件
[hadoop@spark000 config]$ ls
flume-exec-hdfs.conf
[hadoop@spark000 config]$ pwd
/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config
[hadoop@spark000 config]$ vi flume-exec-hdfs.conf
#define agent
#agent 名字即 exec-hdfs-agent exec-hdfs-agent.sources = exec-source exec-hdfs-agent.channels = exec-memory-channel exec-hdfs-agent.sinks = hdfs-sink #define source exec-hdfs-agent.sources.exec-source.type = exec exec-hdfs-agent.sources.exec-source.command = tail -F ~/data/data.log exec-hdfs-agent.sources.exec-source.shell = /bin/sh -c #define channel exec-hdfs-agent.channels.exec-memory-channel.type = memory #define sink exec-hdfs-agent.sinks.hdfs-sink.type = hdfs exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://spark000:8020/data/flume/tail exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text exec-hdfs-agent.sinks.hdfs-sink.hdfs.batchSize = 10 #bind source and sink to channel exec-hdfs-agent.sources.exec-source.channels = exec-memory-channel exec-hdfs-agent.sinks.hdfs-sink.channel = exec-memory-channel
  • 2) 在data目录下创建data.log文件:touch data.log
  • 3)启动hdfs、flume配置文件
[hadoop@spark000 sbin]$ ./start-dfs.sh
[hadoop@spark000 sbin]$ ./start-yarn.sh
[hadoop@spark000 config]$ jps
7840 ResourceManager
10849 Jps
8170 NodeManager
7308 NameNode
7661 SecondaryNameNode
7470 DataNode
[hadoop@spark000 config]$ flume-ng agent \
> --conf $FLUME_HOME/conf \
> --conf-file $FLUME_HOME/config/flume-exec-hdfs.conf \
> --name exec-hdfs-agent \
> -Dflume.root.logger=INFO,console
  • 4)往日志里追加信息
[hadoop@spark000 data]$ pwd
/home/hadoop/data
[hadoop@spark000 data]$ echo jieqiong >> data.log
[hadoop@spark000 data]$ echo hello >> data.log
  • 5)在启动的界面里可以查看在hadoop上的地址
2021-11-04 14:15:41,591 (hdfs-hdfs-sink-call-runner-3) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/tail/FlumeData.1636006511532.tmp to hdfs://spark000:8020/data/flume/tail/FlumeData.1636006511532
2021-11-04 14:14:20,492 (hdfs-hdfs-sink-call-runner-8) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/tail/FlumeData.1636006430434.tmp to hdfs://spark000:8020/data/flume/tail/FlumeData.1636006430434
  • 6)查看在hadoop上的存储
[hadoop@spark000 data]$ hadoop fs -text /data/flume/tail/FlumeData.1636006511532
[hadoop@spark000 data]$ hadoop fs -text /data/flume/tail/FlumeData.1636006430434
  • 数据查到了,说明数据可以顺利写道HDFS上的,但这种方式并不好
  • 我们现在使用的是使用linux命令来添加数据的:tail -F ~/data/data.log
  • tail -F这种可靠性并不是很好的,程序容易挂掉。而且只能监控一个文件,不能监控一个文件夹。
  • 所以现在采用exec这种方式,来收集文件的数据到HDFS上的局限性很大。所以接下来调整一种方式。
  • Flume收集文件夹数据到HDFS
  • 1)具体参数设置:Flume 1.9.0 User Guide — Apache Flume
  • 2)文件夹数据收集到HDFS上,首先考虑Agent选型:
    • Source
    • 监听文件夹时,type值为spooldir
    • Channel
    • 对于数据量不大的时候,type值为memory;要确保数据不丢失type值为file
    • Sink
    • type值为hdfs,可以配置多长时间滚一次数据,或多少数据量滚一次数据
    • hdfs.path值为hdfs://namenode/flume/webdata/
  • 3)Flume配置文件
[hadoop@spark000 config]$ vi flume-spooling.conf
[hadoop@spark000 config]$ pwd
/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config
#define agent = spooling-hdfs-agent
spooling-hdfs-agent.sources = spooling-source
spooling-hdfs-agent.channels = spooling-memory-channel
spooling-hdfs-agent.sinks = hdfs-sink

#define source
spooling-hdfs-agent.sources.spooling-source.type = spooldir
spooling-hdfs-agent.sources.spooling-source.spoolDir = /home/hadoop/data/spool_data
#先不添加这句话,后续添加。只要是以.txt格式结尾的文件都不收集
#spooling-hdfs-agent.sources.spooling-source.ignorePattern = ^(.)*\\.txt$

#define channel 
spooling-hdfs-agent.channels.spooling-memory-channel.type = memory

#define sink 
#这里输出到文件系统 spooling-hdfs-agent.sinks.hdfs-sink.type = hdfs spooling-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://spark000:8020/data/flume/spooling #配置压缩的文件类型
spooling-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream #当使用了压缩的文件类型,一定要配置Codec
spooling
-hdfs-agent.sinks.hdfs-sink.hdfs.codeC=org.apache.hadoop.io.compress.GzipCodec #配置显示的文件前缀为events
spooling
-hdfs-agent.sinks.hdfs-sink.hdfs.filePrefix = events- #以时间间隔30s或者是数据量到达1000000条来监控收集数据,不以数据大小来监控数据
spooling
-hdfs-agent.sinks.hdfs-sink.hdfs.rollSize = 0 spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollCount = 1000000 spooling-hdfs-agent.sinks.hdfs-sink.hdfs.rollInterval = 30 #bind source and sink to channel spooling-hdfs-agent.sources.spooling-source.channels = spooling-memory-channel spooling-hdfs-agent.sinks.hdfs-sink.channel = spooling-memory-channel
  • 4)启动Flume
[hadoop@spark000 config]$ flume-ng agent \
> --conf $FLUME_HOME/conf \
> --conf-file $FLUME_HOME/config/flume-spooling.conf \
> --name spooling-hdfs-agent \
> -Dflume.root.logger=INFO,console
  • 5)启动成功
  • 当看到Component type: SOURCE, name: spooling-source started 即成功
  • 6)测试传数据
  • 打开浏览器界面Browsing HDFS
[hadoop@spark000 spool_data]$ cp /home/hadoop/logs/access.log 01
[hadoop@spark000 spool_data]$ ls
01.COMPLETED 
2021-11-05 10:05:12,807 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/hadoop/data/spool_data/01 to /home/hadoop/data/spool_data/01.COMPLETED
2021-11-05 10:05:42,579 (hdfs-hdfs-sink-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://spark000:8020/data/flume/spooling/events-.1636077911675.gz.tmp to hdfs://spark000:8020/data/flume/spooling/events-.1636077911675.gz
  • 7)这种方式也是存在问题的:
  • 程序有没有处理,或者处理到哪里都是没有记录的
  • 这不过比前一种tail -F的方式好一点而已
  • 这种方式在生产上,也要慎用。
  • TailDirSource实战(*****一定要掌握)
  • 1)TailDirSource是Flume收集文件tail及文件夹spoolingdir的结合体。
  • 2)这种source不会丢失数据,因为在处理数据的过程中,会周期性的将每一个收集过来的文件数据的偏移量写入到一个JSON文件中。也就是说这批次的数据处理到哪里了,会将这个偏移量写进到JSON里,如果再进行下一个批次文件数据的处理,就从这个JSON里将上次的偏移量取出来。即使Flume挂掉了,现在继续将数据往Flume里灌入,启动Flume,也会从指定的偏移量继续向后获取到数据。这就是比较厉害的一点。
  • 3)taildir-memory-logger.conf配置文件
  • 这里做测试的主要目的是,看taildir是不是一个高可用的
[hadoop@spark000 spool_data]$ jps
6912 DataNode
7281 ResourceManager
8215 Application
9304 Jps
6745 NameNode
7101 SecondaryNameNode
7599 NodeManager
[hadoop@spark000 spool_data]$ kill -9 8215
[hadoop@spark000 config]$ ls
taildir-memory-logger.conf
[hadoop@spark000 config]$ pwd
/home/hadoop/app/apache-flume-1.6.0-cdh5.16.2-bin/config
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/tmp/position/taildir_position.json
a1.sources.r1.filegroups = f1 f2
#f1的路径
a1.sources.r1.filegroups.f1
= /home/hadoop/tmp/flume/test1/example.log a1.sources.r1.headers.f1.headerKey1 = value1 #f2的路径 a1.sources.r1.filegroups.f2 = /home/hadoop/tmp/flume/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 a1.sources.r1.fileHeader = true # Describe the sink # 这里是输出到磁盘
a1.sinks.k1.type
= logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 创建f1、f2路径及文件
[hadoop@spark000 config]$ mkdir -p /home/hadoop/tmp/flume/test1
[hadoop@spark000 config]$ cd /home/hadoop/tmp/flume/test1
[hadoop@spark000 test1]$ ls
[hadoop@spark000 test1]$ touch example.log
[hadoop@spark000 test1]$ cd ..
[hadoop@spark000 flume]$ mkdir test2
  • 启动Flume
  • 当出现Component type: SOURCE, name: r1 started 即启动成功
  • 目前是一个无数据的状态
[hadoop@spark000 config]$ flume-ng agent \
> --conf $FLUME_HOME/conf \
> --conf-file $FLUME_HOME/config/taildir-memory-logger.conf \
> --name a1 \
> -Dflume.root.logger=INFO,console
  • 切入到test1目录下,并在example.log中放入数据
  • 切入到test2目录下,并在1.log、2.log中放入数据
[hadoop@spark000 test1]$ echo >> example.log
[hadoop@spark000 test1]$ echo aaa >> example.log
[hadoop@spark000 test1]$ echo bbb >> example.log
[hadoop@spark000 test1]$ cd ..
[hadoop@spark000 flume]$ cd test2
[hadoop@spark000 test2]$ echo 111 >> 1.log
[hadoop@spark000 test2]$ echo 222 >> 2.log
[hadoop@spark000 test2]$ echo 333 >> 1.log
2021-11-05 14:22:15,835 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 61 61 61      aaa }
2021-11-05 14:22:30,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 62 62 62    bbb }
2021-11-05 14:24:28,156 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/hadoop/tmp/flume/test1/example.log, inode: 68630857, pos: 9
2021-11-05 14:36:54,493 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:290)] Opening file: /home/hadoop/tmp/flume/test2/1.log, inode: 35924199, pos: 0
2021-11-05 14:36:54,493 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 31 31 31      111 }
2021-11-05 14:37:01,504 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:290)] Opening file: /home/hadoop/tmp/flume/test2/2.log, inode: 35924200, pos: 0
2021-11-05 14:37:03,499 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/2.log} body: 32 32 32      222 }
2021-11-05 14:38:05,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 33 33 33      333 }
2021-11-05 14:39:03,738 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/hadoop/tmp/flume/test2/2.log, inode: 35924200, pos: 4
  • 查看偏移量
[hadoop@spark000 test2]$ cat /home/hadoop/tmp/position/taildir_position.json
[{"inode":68630857,"pos":9,"file":"/home/hadoop/tmp/flume/test1/example.log"},{"inode":35924199,"pos":8,"file":"/home/hadoop/tmp/flume/test2/1.log"},{"inode":35924200,"pos":4,"file":"/home/hadoop/tmp/flume/test2/2.log"}]
  • 先停止Flume,但日志端还不停的往日志服务目录中sink数据。
  • 若Flume重启之后,在上次挂掉,到这次重启成功之后,这一波儿数据会不会丢失,就是测试这个功能的。
  • 重启Flume,看一下这一波儿的数据能否收到,收到了!
  • 这是因为example.log,1.log,2.log的文件收集到哪里,偏移量就记录在哪里(记录在json文件中)
  • json中的pos是记录flume挂掉之前的数据,当flume重新启动后,会从新的pos位置继续重新记录,会自然而然开始记录新的数据。生产上就不会丢失数据。
#先停止Flume,再添加数据
[hadoop@spark000 test2]$ echo jieqiong >> 1.log
[hadoop@spark000 test2]$ echo jieqiong >> 2.log
[hadoop@spark000 test1]$ echo jieiqong >> example.log
#启动Flume,查看这三组数据是否加载成功,并查看json
[hadoop@spark000 test1]$ cat /home/hadoop/tmp/position/taildir_position.json
[{"inode":68630857,"pos":18,"file":"/home/hadoop/tmp/flume/test1/example.log"},{"inode":35924199,"pos":17,"file":"/home/hadoop/tmp/flume/test2/1.log"},{"inode":35924200,"pos":13,"file":"/home/hadoop/tmp/flume/test2/2.log"}][hadoop@spark000 test1]$ 
2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value1, file=/home/hadoop/tmp/flume/test1/example.log} body: 6A 69 65 69 71 6F 6E 67                         jieiqong }
2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/1.log} body: 6A 69 65 71 69 6F 6E 67                         jieqiong }
2021-11-05 15:09:37,684 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{headerKey1=value2, headerKey2=value2-2, file=/home/hadoop/tmp/flume/test2/2.log} body: 6A 69 65 71 69 6F 6E 67                         jieqiong }
  • Flume实战之拦截器的二次开发
  • 1)这里拦截器要用多agent来实现
  • 2)flume interceptors Flume 1.9.0 User Guide — Apache Flume
  • 将数据打标,过滤。
  • 要实现拦截器,必须要有一个接口,用于修改和删除数据。
  • 3)需求分析
    • Agent1
      • source:netcat
        • 将imooc.com的数据抓取出来,写到一个地方hdfs或者其他地方。
        • 将gifshow.com的数据抓取出来,写到另外一个地方。
        • 数据1:imooc.com
        • 数据2:imooc.com
        • 数据3:gifshow.com
        • .....
        • 这里的话,是一个source进来。然后channel1、channel2,分别至avro-sink1、avro-sink2。
        • 目前我们采用一台机器模拟,所以在sink出来后,先采用不同的端口,例:hadoop000+44445,hadoop000+44446
        • 接下来使用Agent2的avro-source1接44445数据,使用avro-source2接44446数据。
        • Agent2接到数据后,输出到控制台。
      • channel
        • channel的选择器:flume channel selectors
        • channel可以多个,使用multiplexing channel selector来标识,即一个source出来到多个channel
      • sink
        • 多Agent之间的通信,一般采用avro的sink,然后采用avro的source来接上一个sink,中间的hostname+port只要相等即可。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

     

  • 4)每一条数据即为一个Event:imooc.com
  • Event =  header + body
  • Interceptor 接口:
  • Event: body
  • 内容 contains("imooc.com")
  • 打标 header: type = imooc
  • 打标 header:type = other