Introduction to Morphlines
Morphlines
Cloudera Morphlines is a new command-based framework that simplifies data preparation for Apache Hadoop workloads.
A “morphline” is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data, and loads the results into a Hadoop component.
A morphline is an in-memory container of transformation commands.
Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record.
A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments.
- An open-source, configuration-driven framework for defining chains of transformations that consume data of any format from any data source and load the results into Hadoop components. A simple configuration file replaces custom Java development, reducing the effort of integration, development and maintenance.
- Morphlines is a library that can be embedded into any Java codebase. A morphline is an in-memory container of transformation commands (see the sketch below).
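A rough embedding sketch follows (assuming the kite-morphlines-core artifact is on the classpath; the file name morphline.conf, the morphline id morphline1, and the Collector class are placeholders chosen for this example, not part of the library):

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.nio.charset.StandardCharsets;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Notifications;

public class MorphlineEmbeddingSketch {

  // Final child of the command chain: it receives the records emitted by the
  // last command of the morphline.
  static final class Collector implements Command {
    @Override public Command getParent() { return null; }
    @Override public void notify(Record notification) { }
    @Override public boolean process(Record record) {
      System.out.println("output record: " + record);
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    // Compile the morphline with id "morphline1" from a local config file.
    MorphlineContext context = new MorphlineContext.Builder().build();
    Command morphline = new Compiler().compile(
        new File("morphline.conf"), "morphline1", context, new Collector());

    // Wrap the raw input in a record; binary input travels in _attachment_body.
    Record record = new Record();
    record.put("_attachment_body",
        new ByteArrayInputStream("hello world".getBytes(StandardCharsets.UTF_8)));

    // Push the record through the command chain.
    Notifications.notifyStartSession(morphline);
    boolean success = morphline.process(record);
    Notifications.notifyShutdown(morphline);
    System.out.println("success: " + success);
  }
}
```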
Processing model
- A morphline processes a continuous or arbitrarily large stream of records; each command transforms one record into zero or more records.
Data model
- A record is a set of named fields, where each field has an ordered list of one or more values; a value can be any Java object. In essence, a record is a hash table in which every entry maps a String key to a list of Java objects.
- Not only structured data but also binary data can flow through a morphline: a record may contain an optional field named _attachment_body holding a java.io.InputStream or a Java byte[]. Such binary input can be characterized in more detail by setting the fields named _attachment_mimetype, _attachment_charset and _attachment_name, which helps with detecting and parsing the data type, as sketched below.
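A minimal sketch of this data model (the field names fruit and count are arbitrary examples; only the _attachment_* names are defined by Morphlines):

```java
import java.io.ByteArrayInputStream;

import org.kitesdk.morphline.api.Record;

public class RecordDataModelSketch {
  public static void main(String[] args) {
    Record record = new Record();

    // A field maps a name to an ordered list of one or more values.
    record.put("fruit", "apple");
    record.put("fruit", "pear");   // same field name, second value
    record.put("count", 2);        // a value can be any Java object
    System.out.println(record.get("fruit"));           // [apple, pear]
    System.out.println(record.getFirstValue("count")); // 2

    // Binary payloads travel in the optional attachment fields.
    record.put("_attachment_body", new ByteArrayInputStream(new byte[] {1, 2, 3}));
    record.put("_attachment_mimetype", "application/octet-stream");
    record.put("_attachment_charset", "UTF-8");
    record.put("_attachment_name", "example.bin");
    System.out.println(record);
  }
}
```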
Usage example
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#morphline-interceptor
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple
    # morphlines in a morphline config file
    id : morphline1

    # Import all morphline commands in these java packages and their
    # subpackages. Other commands that may be present on the classpath are
    # not visible to this morphline.
    # With the Kite SDK the import should be importCommands : ["org.kitesdk.**"];
    # with the older CDK use importCommands : ["com.cloudera.**"] instead,
    # otherwise the commands cannot be resolved.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          # Consume the output record of the previous command and pipe another
          # record downstream.
          #
          # A grok-dictionary is a config file that contains prefabricated
          # regular expressions that can be referred to by name. grok patterns
          # specify such a regex name, plus an optional output field name.
          # The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
          # The input line is expected in the "message" input field.
          dictionaryFiles : [src/test/resources/grok-dictionaries]
          expressions : {
            message : """%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:msg}"""
          }
        }
      }

      # Consume the output record of the previous command, convert
      # the timestamp, and pipe another record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : timestamp
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "MMM d HH:mm:ss"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      # Consume the output record of the previous command, transform it
      # and pipe the record downstream.
      #
      # This command deletes record fields that are unknown to Solr
      # schema.xml. Recall that Solr throws an exception on any attempt to
      # load a document that contains a field that isn't specified in
      # schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : {
            collection : collection1       # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }

      # log the record at INFO level to SLF4J
      { logInfo { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            collection : collection1       # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }
    ]
  }
]
Explanation: the commands discussed here are ① readLine ② grok ③ convertTimestamp ④ logInfo (sanitizeUnknownSolrFields and loadSolr in the config above are Solr-specific and only matter when indexing into Solr).
- ①: reads the input attachment as UTF-8 and emits one record per line, placing each line into the message field of the record.
- ②: extracts data into the corresponding fields using the prefabricated regex dictionaries. dictionaryFiles points to a directory from which dictionary files are loaded, so custom regex dictionaries can be placed in that directory; a dictionary can also be supplied inline through the dictionaryString parameter.
- ③: converts the value of the timestamp field.
- ④: logs the final record.
Input: Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.
Output: {hostname=[syslog], message=[Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.], msg=[listening on 0.0.0.0 port 22.], pid=[607], program=[sshd], timestamp=[1970-02-04T18:46:14.000Z]}
(The year defaults to 1970 because the syslog timestamp carries no year, and 10:46 America/Los_Angeles becomes 18:46 UTC.)
Flume configuration
# Component names
agent.sources = r1
agent.channels = c1
agent.sinks = s1

# Spooling-directory source with the morphline interceptor
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /tmp/flume
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.r1.interceptors.i1.morphlineFile = /home/user1/morphline.conf
agent.sources.r1.interceptors.i1.morphlineId = morphline1

# Kafka sink receiving the intercepted events
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = tutorials
agent.sinks.s1.brokerList = host1:6667
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20

# Memory channel wiring source and sink together
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.sinks.s1.channel = c1
agent.sources.r1.channels = c1
Configuration file syntax
HOCON format (Human-Optimized Config Object Notation)
HOCON GitHub page
HOCON stands for Human-Optimized Config Object Notation, a structure adapted from the JSON format for human-written configuration.
Note
A morphline interceptor must not generate more than one output record for each input event.
Commands
grok
- Extracts structured fields from unstructured log data using regular expressions.
Dictionaries
INT (?:[+-]?(?:[0-9]+))
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
- Dictionaries can be loaded from files, from classpath resources, or from an inline string in the config file.
Syntax
%{REGEX_NAME:GROUP_NAME}
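Conceptually, each %{REGEX_NAME:GROUP_NAME} reference is replaced by the dictionary entry's regex wrapped in a named capturing group. The following plain java.util.regex sketch mimics what %{INT:pid} does for an illustrative input (the pid= prefix is only an example, and grok itself uses its own regex engine):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GrokExpansionSketch {
  public static void main(String[] args) {
    // %{INT:pid} roughly expands to the INT dictionary regex inside a named group.
    Pattern pattern = Pattern.compile("pid=(?<pid>(?:[+-]?(?:[0-9]+)))");

    // With findSubstrings=false (the default) the whole field value must match,
    // i.e. Matcher.matches(); findSubstrings=true behaves more like Matcher.find().
    Matcher matcher = pattern.matcher("pid=607");
    if (matcher.matches()) {
      System.out.println(matcher.group("pid")); // 607 -> becomes the output field "pid"
    }
  }
}
```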
Parameters
| Property Name | Default | Description |
|---|---|---|
| dictionaryFiles | [] | A list of zero or more local files or directory trees from which to load dictionaries. |
| dictionaryResources | [] | A list of zero or more classpath resources (i.e. dictionary files on the classpath) from which to load dictionaries. Unlike "dictionaryFiles" it is not possible to specify directories. |
| dictionaryString | null | An optional inline string from which to load a dictionary. |
| extract | true | Can be "false", "true", or "inplace". Add the content of named capturing groups to the input record ("inplace"), to a copy of the input record ("true"), or to no record ("false"). |
| numRequiredMatches | atLeastOnce | Indicates the minimum and maximum number of field values that must match a given grok expression for each input field name. Can be "atLeastOnce" (default), "once", or "all". |
| findSubstrings | false | Indicates whether the grok expression must match the entire input field value or merely a substring within. |
| addEmptyStrings | false | Indicates whether zero-length strings stemming from empty (but matching) capturing groups shall be added to the output record. |
MorphlineSolrSink
- Solr is a fast, highly available open-source enterprise search platform built on Apache Lucene.
- This sink fills the body of each Flume event into the _attachment_body field of the morphline record and copies the headers of the Flume event into record fields of the same name (sketched below).
- It is suited to streaming raw data into HDFS or ETL-ing the same data into Solr; the ETL behaviour can be customized through the morphline configuration file.
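The mapping described above can be sketched as follows (a simplified illustration, not the sink's actual implementation; org.apache.flume.Event is Flume's standard event interface and EventToRecordSketch is a hypothetical helper):

```java
import java.io.ByteArrayInputStream;
import java.util.Map;

import org.apache.flume.Event;
import org.kitesdk.morphline.api.Record;

public final class EventToRecordSketch {
  // Event body -> _attachment_body; each event header -> record field of the same name.
  static Record toRecord(Event event) {
    Record record = new Record();
    record.put("_attachment_body", new ByteArrayInputStream(event.getBody()));
    for (Map.Entry<String, String> header : event.getHeaders().entrySet()) {
      record.put(header.getKey(), header.getValue());
    }
    return record;
  }
}
```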
Flume + Morphlines
Since 1.4.0, Flume by default no longer packages the dependencies required by morphlines into its lib directory; the steps below work around this.
For versions newer than 1.4.0, download the Flume source and edit flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml: adjust the "cdk-morphlines-all" dependency and add a maven-dependency-plugin execution that copies the runtime dependencies into target/lib, along these lines:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>copy-dependencies</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/lib</outputDirectory>
        <includeScope>runtime</includeScope>
        <excludeScope>provided</excludeScope>
      </configuration>
    </execution>
  </executions>
</plugin>
Then run mvn -D"hadoop.profile=2" clean package -pl flume-ng-sinks/flume-ng-morphline-solr-sink.
Depending on the Flume version, "hadoop.profile=2" may vary; use whichever profile builds against Hadoop 2.x.
When the build finishes, copy all of the dependency jars found under the target directory into Flume's lib directory, after which the morphline features can be used.