Notes on an Event-Tracking Log Collection Setup


1. Flume Collection

  We use the Flume agent that ships with CDH. The Flume Java heap size has to be tuned to the data volume; here it is set to 4 GB.

  source: Kafka source

  channel: a memory channel is acceptable when some data loss can be tolerated; it is faster, but events are lost if the machine crashes. In production a File Channel is recommended.

  sink: HDFS sink

  • Full configuration file
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
    a1.sources.r1.kafka.topics = topic-event-data
    a1.sources.r1.kafka.consumer.group.id = flume_collection
    a1.sources.r1.kafka.consumer.auto.offset.reset = latest
    # write a batch to the channel every 10000 records, or every 2000 ms if the batch is not full
    a1.sources.r1.batchSize = 10000
    a1.sources.r1.batchDurationMillis = 2000
    
    # interceptor: put log_time into the event header
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.shydow.bigdata.interceptor.CustomerInterceptor$Builder
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 20000
    a1.channels.c1.transactionCapacity = 20000
    
    # sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/topic-millstone-event-data/p_dymd=%{log_time}
    a1.sinks.k1.hdfs.filePrefix = event
    a1.sinks.k1.hdfs.rollCount=0
    a1.sinks.k1.hdfs.rollSize=1342177280
    a1.sinks.k1.hdfs.rollInterval=7200
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.threadsPoolSize = 10
    a1.sinks.k1.hdfs.callTimeout = 20000
    a1.sinks.k1.hdfs.codeC = bzip2
    a1.sinks.k1.hdfs.fileType = CompressedStream
    
    # wire the source and the sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • File Channel (use this in place of the memory channel above in production)

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/event/ckp/
    a1.channels.c1.dataDirs = /opt/module/event/data/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 20000000
    a1.channels.c1.keep-alive = 3
  • Flume interceptor (a small usage sketch follows the class)
    package com.shydow.bigdata.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author Shydow
     * @date 2021-04-14
     * @desc Parse server_time from the log body into the log_time header,
     *       which the HDFS sink uses to build the output path.
     */
    public class CustomerInterceptor implements Interceptor {

        private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        @Override
        public void initialize() {
        }

        @Override
        public Event intercept(Event event) {
            // decode the event body
            String eventBody = new String(event.getBody(), Charsets.UTF_8);
            // existing headers
            Map<String, String> headers = event.getHeaders();
            try {
                JSONObject jsonObject = JSON.parseObject(eventBody);
                String server_time = jsonObject.getString("server_time");
                headers.put("log_time", format.format(Long.parseLong(server_time)));
            } catch (Exception e) {
                // malformed record or missing server_time: route to an "unknown" partition
                headers.put("log_time", "unknown");
            }
            event.setHeaders(headers);
            return event;
        }

        @Override
        public List<Event> intercept(List<Event> list) {
            List<Event> out = new ArrayList<>();
            for (Event event : list) {
                Event outEvent = intercept(event);
                if (null != outEvent) {
                    out.add(outEvent);
                }
            }
            return out;
        }

        @Override
        public void close() {
        }

        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new CustomerInterceptor();
            }

            @Override
            public void configure(Context context) {
            }
        }
    }
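  • Quick local check of the interceptor: a minimal sketch, assuming the class above is on the classpath; the InterceptorSmokeTest class name and the sample JSON body are made up for illustration. It prints the log_time header that the HDFS sink later substitutes into %{log_time}.
    package com.shydow.bigdata.interceptor;

    import com.google.common.base.Charsets;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class InterceptorSmokeTest {
        public static void main(String[] args) {
            CustomerInterceptor interceptor = new CustomerInterceptor();
            interceptor.initialize();

            // sample body: server_time is an epoch-millis string, as in the production logs
            Event event = EventBuilder.withBody(
                    "{\"event\":\"click\",\"server_time\":\"1618358400000\"}", Charsets.UTF_8);

            Event out = interceptor.intercept(event);
            // prints the yyyy-MM-dd date derived from server_time
            // (2021-04-14 for a UTC+8 JVM time zone), i.e. the %{log_time} value
            System.out.println(out.getHeaders().get("log_time"));

            interceptor.close();
        }
    }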

2. Logstash

  • Install Logstash
    tar -zxvf logstash-7.7.0.tar.gz

    # configure the JVM heap size: -Xms4g -Xmx4g
    vim jvm.options
  • Read from Kafka, write to HDFS via webhdfs (the note after the config explains the 8-hour timestamp shift)

    input {
        kafka {
            bootstrap_servers => "node:9092"
            topics => ["click"]
            group_id => "click"
            consumer_threads => 4
            auto_offset_reset => "latest"
        }
    }
    
    filter {
        ruby { 
            code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
        }
        ruby {
            code => "event.set('@timestamp',event.get('timestamp'))"
        }
    }
    
    output {
        webhdfs {
            host => "hadoop001"
            port => 9870
            user => "root"
            path => "/user/logstash/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log"
            compression => "snappy"
            codec => line {
                format => "%{message}"
            }
        }
    }
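  • Why @timestamp is shifted by 8 hours: Logstash builds the %{+YYYY-MM-dd}/%{+HH} placeholders in the webhdfs path from @timestamp, which is always stored in UTC, so without the two ruby filters the dt/hour directories would lag Beijing time by 8 hours. Below is a minimal Java illustration of the same bucketing arithmetic (the PathBucketDemo class name and the sample instant are made up):
    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class PathBucketDemo {
        public static void main(String[] args) {
            // a sample event time; Logstash's @timestamp is always UTC
            Instant ts = Instant.parse("2022-01-15T18:30:00Z");

            DateTimeFormatter day = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            DateTimeFormatter hour = DateTimeFormatter.ofPattern("HH");

            // without the filters the path is built from UTC: dt=2022-01-15/logstash-18.log
            System.out.println("dt=" + day.format(ts.atOffset(ZoneOffset.UTC))
                    + "/logstash-" + hour.format(ts.atOffset(ZoneOffset.UTC)) + ".log");

            // with the +8h shift the same event lands in dt=2022-01-16/logstash-02.log
            System.out.println("dt=" + day.format(ts.atOffset(ZoneOffset.ofHours(8)))
                    + "/logstash-" + hour.format(ts.atOffset(ZoneOffset.ofHours(8))) + ".log");
        }
    }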
  • Start

    # check that the config file is valid
    ./bin/logstash -f ./scripts/marmot_collect.conf --config.test_and_exit
    
    # start in the background
    nohup ./bin/logstash -f ./scripts/marmot_collect.conf &

3. Flink StreamingFileSink / Flink SQL

 1) Stream API: row-encoded format

  • Code
    package org.shydow;
    
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Shydow
     * @date 2022/1/15 13:44
     */
    
    public class StreamingFileLauncher {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    
            // checkpointing must be enabled: StreamingFileSink only moves pending part files to the finished state when a checkpoint completes
            env.enableCheckpointing(10 * 6000L);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000L));
            env.setStateBackend(new HashMapStateBackend());
            env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop001:8020/ckp");
    
            // kafka source
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "node:9092");
            props.setProperty("group.id", "click");
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("click", new SimpleStringSchema(), props);
            consumer.setStartFromGroupOffsets();
            DataStreamSource<String> source = env.addSource(consumer);

            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("hdfs://hadoop001:8020/click/sink"), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(10))
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                    .withMaxPartSize(1024 * 1024 * 1024)
                                    .build()
                    )
                    .build();
            source.addSink(sink);
    
            env.execute();
        }
    }
  • POM dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
  • Flink on YARN
    # the Hadoop environment variables must be exported first
    export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    
    # submit the job
    ./bin/flink run -m yarn-cluster -ys 1 -yjm 1g -ytm 1g -yqu root.flink -c org.shydow.StreamingFileLauncher ./scripts/streaming-file-sink-1.0-SNAPSHOT.jar

 2) Flink SQL: run on StreamX (a plain Table API submission sketch follows the SQL)

  • kafka2file.sql
    -- keep the raw log line: 'format' = 'raw' treats the whole record as a single string column
    create table click_log(
        `message` string,
        `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
        WATERMARK FOR `event_time` AS `event_time`
    ) with (
        'connector' = 'kafka',
        'topic' = 'click',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'marmot',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'raw'
    );
    
    create table t_file_sink(
        `log` string,
        `dt` string,
        `hour` string
    ) PARTITIONED BY (dt, `hour`) with (
        'connector'='filesystem',
        'path'='file:///kafkaSink', -- local path here for testing; for HDFS use hdfs://hadoop001:8020/marmot/sink
        'format'='parquet',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.policy.kind'='success-file',
        'auto-compaction' = 'true',  -- compact small files automatically at each checkpoint
        'compaction.file-size' = '1024M'
    );
    
    insert into t_file_sink
    select
        `message`,
        DATE_FORMAT(`event_time`, 'yyyy-MM-dd'),
        DATE_FORMAT(`event_time`, 'HH')
    from click_log;
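  • Submitting without StreamX: the same script can also be run through the plain Table API. A minimal sketch, assuming the statements are kept in ./scripts/kafka2file.sql; the Kafka2FileSqlLauncher class name, the script path and the checkpoint interval are assumptions, not part of the original deployment.
    package org.shydow;

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class Kafka2FileSqlLauncher {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the filesystem sink only commits partitions on checkpoints, so checkpointing is required
            env.enableCheckpointing(60 * 1000L);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // read kafka2file.sql and submit each statement; splitting on ';' is enough here
            // because the script contains no semicolons inside literals or comments
            String script = new String(
                    Files.readAllBytes(Paths.get("./scripts/kafka2file.sql")), StandardCharsets.UTF_8);
            for (String stmt : script.split(";")) {
                if (!stmt.trim().isEmpty()) {
                    tEnv.executeSql(stmt);
                }
            }
        }
    }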