记一次埋点日志采集方案
一、Flume采集
Flume使用CDH自带的,需要调整Flume Java 堆栈的大小,具体的大小需要根据数据量情况调整,这里设置为4G
source:kafka source
channel:允许存在数据丢失的情况下,可以使用memory channel,性能较高,但在机器宕机等情况下会存在数据丢失;生产上建议使用File Channel
sink:hdfs
- 完整的配置文件
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092 a1.sources.r1.kafka.topics = topic-event-data a1.sources.r1.kafka.consumer.group.id = flume_collection a1.sources.r1.kafka.consumer.auto.offset.reset = latest a1.sources.r1.batchSize = 10000 # 每次拉取10000条 a1.sources.r1.batchDurationMillis = 2000 # 间隔为2000ms # 配置拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.shydow.bigdata.interceptor.CustomerInterceptor$Builder # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity = 20000 # 配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/topic-millstone-event-data/p_dymd=%{log_time} a1.sinks.k1.hdfs.filePrefix = event a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.rollSize=1342177280 a1.sinks.k1.hdfs.rollInterval=7200 a1.sinks.k1.hdfs.minBlockReplicas = 1 a1.sinks.k1.hdfs.threadsPoolSize = 10 a1.sinks.k1.hdfs.hdfs.callTimeout = 20000 a1.sinks.k1.hdfs.codeC = bzip2 a1.sinks.k1.hdfs.fileType = CompressedStream # 配置关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
File Channel
a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/event/ckp/ a1.channels.c1.dataDirs = /opt/module/event/data/ a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 20000000 a1.channels.c1.keep-alive = 3 a1.channels.c1.parseAsFlumeEvent = false
- flume 拦截器
package com.shydow.bigdata.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @author Shydow * @date 2021-04-14
* @desc 解析日志里面的server_time添加到header中,用来hdfs路径命名 */ public class CustomerInterceptor implements Interceptor { private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); @Override public void initialize() { } @Override public Event intercept(Event event) { // 获取event主体 String eventBody = new String(event.getBody(), Charsets.UTF_8); // 获取header Mapheaders = event.getHeaders(); try { JSONObject jsonObject = JSON.parseObject(eventBody); String server_time = jsonObject.getString("server_time"); headers.put("log_time", format.format(Long.parseLong(server_time))); event.setHeaders(headers); } catch (Exception e){ headers.put("log_time", "unknown"); event.setHeaders(headers); } return event; } @Override public List intercept(List list) { ArrayList out = new ArrayList<>(); for (Event event : list) { Event outEvent = intercept(event); if (null != outEvent){ out.add(outEvent); } } return out; } @Override public void close() {} public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new CustomerInterceptor(); } @Override public void configure(Context context) { } } }
二、Logstash
- 安装logstash
tar -zxvf logstash-7.7.0.tar.gz
# 配置jvm, 修改jvm大小:-Xms4g -Xms4g
vim jvm.options -
read kafka sink hdfs
input { kafka { bootstrap_servers => "node:9092" topics => ["click"] group_id => "click" consumer_threads => 4 auto_offset_reset => "latest" } } filter { ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } } output { webhdfs { host => "hadoop001" port => 9870 user => "root" path => "/user/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log" compression => "snappy" codec => line { format => "%{message}" } } }
-
启动
# 检查配置文件是否正确 ./bin/logstash -f ./scripts/marmot_collect.conf --config.test_and_exit # 后台启动 nohup ./bin/logstash -f ./scripts/marmot_collect.conf &
三、Flink StreamingFileSink / Flink SQL
1)Stream Api:行编码格式
- 代码
package org.shydow; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * @author Shydow * @date 2022/1/15 13:44 */ public class StreamingFileLauncher { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 需要开启checkpoint设置 env.enableCheckpointing(10 * 6000L); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000L)); env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop001:8020/ckp"); // kafka source Properties props = new Properties(); props.setProperty("bootstrap.servers", "node:9092"); props.setProperty("group.id", "click"); FlinkKafkaConsumer
consumer = new FlinkKafkaConsumer<>("click", new SimpleStringSchema(), props); consumer.setStartFromGroupOffsets(); DataStreamSource source = env.addSource(consumer); StreamingFileSink sink = StreamingFileSink .forRowFormat(new Path("hdfs://hadoop001:8020/click/sink"), new SimpleStringEncoder ("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(10)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build() ) .build(); source.addSink(sink); env.execute(); } } -
Pom依赖
<dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-kafka_${scala.binary.version}artifactId> <version>${flink.version}version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-clientartifactId> <version>2.7.5version> dependency> <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-shaded-hadoop-2-uberartifactId> <version>2.7.5-10.0version> dependency>
- Flink On Yarn
# 需要添加hadoop环境变量 export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop export HADOOP_CONF_DIR=/etc/hadoop/conf export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath` # 启动 ./bin/flink run -m yarn-cluster -ys 1 -yjm 1g -ytm 1g -yqu root.flink -c org.shydow.StreamingFileLauncher ./scripts/streaming-file-sink-1.0-SNAPSHOT.jar
2)Flink Sql:Streamx上执行
- kafka2file.sql
-- 保留原始日志格式,format:row, 将整条记录当作是一个字符串 create table click_log( `message` string, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK FOR `event_time` AS `event_time` ) with ( 'connector' = 'kafka', 'topic' = 'click', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'marmot', 'scan.startup.mode' = 'latest-offset', 'format' = 'raw' ); create table t_file_sink( `log` string, `dt` string, `hour` string ) PARTITIONED BY (dt, `hour`) with ( 'connector'='filesystem', 'path'='file:///kafkaSink', -- 这里写本地了,写hdfs需要:hdfs://hadoop001:8020/marmot/sink 'format'='parquet', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='success-file', 'auto-compaction' = 'true', -- 自动合并小文件,在每次checkpoint的时候进行合并 'compaction.file-size' = '1024M' ); insert into t_file_sink select `message`, DATE_FORMAT(`event_time`, 'yyyy-MM-dd'), DATE_FORMAT(`event_time`, 'HH') from click_log;