3.Flink实时项目之流程分析及环境搭建
1. 流程分析
前面已经将日志数据(ods_base_log)及业务数据(ods_base_db_m)发送到kafka,作为ods层,接下来要做的就是通过flink消费kafka 的ods数据,进行简单的处理作为dwd层,然后再写回到kafka。
每层职能
分层 | 数据描述 | 计算工具 | 存储介质 |
---|---|---|---|
ODS | 原始数据,日志和业务 | 日志服务器,maxwell | kafka |
DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等。 | flink | kafka |
DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。 | flink | kafka |
DIM | 维度数据 | flink | hbase |
DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。 | flink | clickhouse |
ADS | 把 Clickhouse 中的数据根据可视化需要进行筛选聚合。 | clickhouse,sql | 可视化展示 |
目前进行到的阶段
2. 环境搭建
https://github.com/zhangbaohpu/gmall-flink-parent
环境
- jdk-1.8
- flink-1.12
- scala-2.12
- hadoop-2.7.7
在项目中新建maven模块gmall-realtime,没有父工程,pom文件如下
点击查看代码
<?xml version="1.0" encoding="UTF-8"?>
4.0.0
com.zhangbao.gmall
gmall-realtime
1.0-SNAPSHOT
1.8
8
8
1.12.0
2.12
2.7.7
org.apache.flink
flink-java
${flink.version}
org.apache.flink
flink-streaming-java_${scala.version}
${flink.version}
org.apache.flink
flink-connector-kafka_${scala.version}
${flink.version}
org.apache.flink
flink-clients_${scala.version}
${flink.version}
org.apache.flink
flink-cep_${scala.version}
${flink.version}
org.apache.flink
flink-json
${flink.version}
com.alibaba
fastjson
1.2.68
org.apache.hadoop
hadoop-client
${hadoop.version}
org.slf4j
slf4j-api
1.7.25
org.slf4j
slf4j-log4j12
1.7.25
org.apache.logging.log4j
log4j-to-slf4j
2.14.0
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
项目结构
log4j.properties
点击查看代码
log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
接下来的准备:
我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。