【Logstash】Elastic Stack
一、Logstash 入门及运行机制
1-1 入门及架构简介
- 架构简介
主要分为三个阶段
Input【数据采集】-> Filter【数据解析/转换】-> Output【数据输出】
- Pipeline
input-filter-output 的三阶段处理流程
队列管理
插件生命周期管理
- Logstash Event
内部流转的数据表现形式
原始数据在input被转换为Event,在output被转换为目标格式数据
在配置文件中可以对Event中的属性进行增删改查
数据输入的时候,Data在Input中被decode转换成 Logstash Event,在内部流转
数据输出,Logstash Event被 encode转换成Data
- 一个简单的配置文件
#file codec.conf
input {
stdin {
#按每一行切割数据
codec => line
}
}
filter {}
output {
stdout {
#将每个logstash event 转换成json输出
codec => json
}
}
执行
bin/logstash -f codec.conf
也可以使用管道将一条数据输入做测试
echo "foo
bar"|bin/logstash -f codec.conf
Logstash架构简介
1-2 Life_of_an_Event
1-3 queue 简介
-
logstash有两种queue ,In Memory和 Persistent Queue In Disk
-
In Memory
无法处理进程Crash、机器宕机等情况,会导致数据丢失
- Persistent Queue In Disk【建议使用】
可处理进程Crash 等情况,保证数据不丢失
保证数据至少消费一次
充当缓冲区,可以代替kafka等消息队列的作用
配置修改
#默认是memory
queue.type:persisted
#队列存储最大数据量,默认1gb
queue.max_bytes:8gb
持久队列的工作机制:数据先从Input进来,进入Persistent Queue ,Persistent Queue会将它在磁盘上备份一份,然后告诉input这条数据我已经收到了。
然后数据就会被发送到 filter/Output,当filter/Output 将这个事件处理完以后就会向PQ发送ACK,当PQ收到后就会在磁盘上删除备份,这条数据就被完整的处理完了,解决了一个数据容灾的问题。
1-4 线程简介
-
Input Thread 【Input -> Codec】
-
Pipeline Worker Thread 【Batcher -> Filter-> Output】
我们调优的时候主要调整线程数,Pipeline Worker Thread,相关配置如下
- pipeline.workers | -w
pipeline 线程数,即filter_output的处理线程数,默认cpu核数,可以worker配置比cpu高,当线程闲置的时候也不会占用cpu的资源
- pipeline.batch.size | -b
Batcher 一次批量获取的待处理文档数,默认125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整
- Pipeline.batch.delay | -u
Batcher 等待的时长,单位为ms
1-5 配置简介
线上环境推荐采用配置文件的方式来设定logstash的相关配置,这样可以减少犯错的机会,而且文件便于进行版本化管理
命令行形式多用来进行快速的配置测试/验证/检查等
1-6 多实例运行
bin/logstash --path.setting instance1
bin/logstash --path.setting instance2
不同instance中修改logstash.yml ,自定义path.data ,确保其不相同即可
我一般执行logstash项目运行如下:
bin/logstash -f 配置文件目录/配置文件 --path.data=数据目录 >日志文件目录 2>&1 &
1-7 pipeline 配置简介
-
主要有如下的数值类型:
-
布尔类型 Boolen
- isFailed => true
-
数值类型 Number
- port => 33
-
字符串类型 String
- name => “hello world”
-
数组 Array/List
- users => [{id =>1,name =>bob},{id =>2,name =>Jane}]
- Path => [“/val/long/messages”,”/var/log/*.log”]
-
哈希类型Hash
match => { "field1" => "value1" "field2" => "value2" }
-
注释
- 井号
在配置中可以引用Logstash Event的属性(字段),主要有如下两种方式:
- 直接引用字段值
- 在字符串中以sprintf方式引用
{
"request":"/index.html"
"name":"张三"
"ua":{
"os":"windows 7"
}
}
直接引用字段值Field Reference
if[request] = ~ "index"{}
if[ua][os] = ~ "windows"{}
字符串中引用字段值sprintf,使用%{}来实现
%{request}
%{[ua][os]}
pipeline配置语法
- 支持条件判断语法,从而扩展了配置的多样性
if EXPRESSION{
...
}else if EXPRESSION {
...
}else{
...
}
- 表达式主要包括如下的操作符:
比较:== 、!=、<、>、<=、>=
正则是否匹配:=~、!~
包含(字符串或数组):in、not in
布尔操作符:and、or、nand、xor、!
分组操作符:()
二、Logstash 插件详解
2-1 input插件详解及glob讲解
1、Input Plugin - stdin
- 最简单的输入,从标准输入读取数据,通用配置为:
- codec类型为codec
- type类型为string,自定义该事件类型,可用于事件判断
- tags类型为arry,自定义该事件的tag,可用于后续判断
- Add_field类型为hash,为该事件添加字段
测试案例一:
执行
[elastic@node01 stu]$ echo "test" | ../bin/logstash -f input-stdin.conf
2、Input Plugin - file
思考??:从文件读取数据,如常见的日志文件。文件读取通常要解决几个问题:
- 文件内容如何只被读取一次?即重启LS时,从上次读取的位置继续
- sincedb
- 如何即时读取到文件内容的新内容?
- 定时检查文件是否有更新
- 如何发现新的文件并读取?
- 可以,定时检查新文件
- 如果文件发生了归档(rotation)操作,是否影响当前的内容读取?
- 不影响,被归档的文件可以继续读取
配置
- path 类型为数组,指明读取的文件路径,基于glob匹配语法
path => ["/var/log/**/*.log","/var/log/message"]
- exclue 类型为数组排除不想监听的文件规则,基于glob匹配语法
exclude => "*.gz"
- sincedb_path 类型为字符串,记录sincedb文件路径
- start_postion 类型为字符串,begining or end 是否从头读取文件
- stat_interval 类型为数值型,单位秒。定时检查文件是否有更新,默认1秒
- discover_interval 类型为数值,单位秒,定时检查是否有新文件待读取,默认15s
- ignore_older 类型为数值,单位秒,扫描文件列表时,如果该文件上传更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭
- close_older 类型为数值,单位秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600即1h
匹配语法
- 主要包含以下几种匹配符
* 匹配任意字符,但不匹配以 . 开头的隐藏文件,匹配这类文件时要使用 .*来匹配
** 递归匹配子目录
? 匹配单一字符
[] 匹配多个字符,比如[a-z]、[^a-z]
{} 匹配多个单词,比如{foo,bar,hello}
\转义符号
实际使用中的例子如下:
input {
file {
path => ["/var/log/access_log","/var/log/err_log"]
type => "web"
start_position => "begining"
}
}
2-2 codec插件详解
- Codec Plugin 作用于input和output plugin,负责将数据在原始与Logstash Event之间转换,常见的codec有
plain 读取原始内容
dots 将内容简化为点进行输出
rubydebyg 将 Logstash Events按照ruby格式输出,方便调试
line 处理带有换行符的内容
json 处理json格式的内容
multiline 处理多行数据的内容【读多行比如java报错信息】
测试
#按行读入数据,将数据都放入message中,','分割
bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => rubydebug}}"
#按行读入数据,输入的是一个点,当我们做压测的时候会使用它
bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => dots}}"
#读json数据,输出kv数据
bin/logstash -e "input{stdin{codec => json}} output{stdout{codec => rubydebug}}"
2-3 filter 插件简介及date插件讲解
- Filter是Logstash功能强大的主要原因,他可以对Logstash Event进行丰富的处理,比如解析数据、删除字段/类型转换等等,常见的有如下几个:
date 日志解析
grok 正则匹配解析
dissect 分割符解析
mutate 对字段做处理,比如重命名、删除、替换等
json 按照json解析字段内容到指定字段中
geoip 增加地理位置数据
ruby 利用ruby代码来动态修改Logstash Event
1、date字段
- 将日期字符串解析为日期类型,然后替换@timestamp字段或者指定的其它字段
filter {
date {
#{"logdate":"Jan 01 2018 12:02:08"}
match => ["logdate","MMM dd yyyy HH:mm:ss"]
}
}
- match
类型为数组,用于指定日期匹配的格式,可以一次指定多种日期格式
match => ["logdate","MMM dd yyyy HH:mm:ss","MMM d yyyy HH:mm:ss","ISO8601"]
- target
类型为字符串,用于指定赋值的字段名,默认是@timestamp
- timezone
类型为字符串,用于指定时区
2-4 filter 插件值grok简介上
推荐grok解析方式:
kibana 的 Grok Debugger
或者使用grok在线解析工具
2-5 filter 插件值grok简介下
#特殊匹配
(%{GREEDYDATA:json1}|-) 匹配所有数据
%{QS:agent} 匹配"-"数据
# 匹配规则
LogFile="%{WORD:logfile}";Time="%{DATA:time}";SIP="%{IPV4:sip}";
#关键字
WORD 匹配单词
DATA 匹配任意数据
IPV4 匹配IP
NUMBER 匹配数字
BASE10NUM 匹配10进制的数字
URIPATHPARAM 匹配url
# 关键字实例
%{NUMBER:duration} — 匹配浮点数
%{IP:client} — 匹配IP
(?([\S+]*)),自定义正则
(?([\S+]*)), 自定义正则匹配多个字符
\s*或者\s+,代表多个空格
\S+或者\S*,代表多个字符
大括号里面:xxx,相当于起别名
%{UUID},匹配类似091ece39-5444-44a1-9f1e-019a17286b48
%{WORD}, 匹配请求的方式
%{GREEDYDATA},匹配所有剩余的数据
%{LOGLEVEL:loglevel} ---- 匹配日志级别
自定义类型
- 也可以在logstash目录下
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/
grok-patterns这个文件可以查看匹配规则
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?(?"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]
2-6 filter插件之dissect
- 基于分割符原理解析数据,解决grok解析时消耗过多的cpu资源的问题
优势:dissect 解析比 grok效率高三倍左右
不足:使用局限,需要字段分割符格式相同
#需要解析的字段
hi - hello - 12
#如果不写名称,即%{},表名忽略该值
%{a} - %{b} - %{c}
解析结果如下
{
"a":"hi",
"b":"hello",
"c":"12"
}
2-7 filter插件之mutate【最常用】
- 使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新
convert 类型转换
gsub 字符串替换
split/json/merge 字符串切割、数组合并为字符串、数据合并为数组
rename 字段重名名
update/replace 字段内容更新或替换
remove_field 删除字段
convert
- 实现字段类型转换,类型为hash,仅支持转换为integer、float、string和boolean
filter {
mutate {
convert => {"age" => "integer"}
}
}
gsub
- 对字段内容进行替换,类型为数组,每,项为一个替换配置
filter {
mutate {
gsub => [
#将path中的“/”替换为“_”
"path","/","_",
#将urlparams中的“\?#-”替换为“.”
"urlparams","[\\?#-]","."
]
}
}
split
- 将字符串切割为数组
filter {
mutate {
split => {"jobs" => ","}
}
}
join
- 将数组拼接为字符串
filter {
mutate {
join => {"params" => ","}
}
}
merge
- 将两个数组合并成1和数组,字符串会被转为1个元素的数组进行操作
filter {
mutate {
merge => {"dest_arr" => "source_arr"}
}
}
remove
- 删除字段
filter {
mutate {
remove_field => ["message"]
}
}
2-8 filter 插件之json
- 将字段内容为json格式的数据进行解析
filter {
json {
#要解析的字段名
source => "message"
#解析后的存储字段,默认和message同级别
target => "msg_json"
}
}
2-9 filter 插件之geoip和ruby
geoip
- 常用的插件,根据ip地址提供的队员的地理信息,比如经纬度、城市名等,方便进行地理数据分析
filter {
geoip{
source => "ip"
}
}
ruby
- 最灵活的插件,可以以ruby语言来随心所欲的修改Logstash Event对象
filter {
ruby {
#记录description的字符串的长度
code => 'size = event.get("description").size;
event.set("descripition_size",size)'
}
}
2-10 output 插件简介
1、stdout
- 输出到标准输出,多用于调试
output {
stdout {
codec => rubydebug
}
}
2、file
- 输出到文件,实现将分散在多地的文件统一到一处的需求,比如将所有的web机器的web 日志收集到1个文件中,从而方便查阅信息
output {
file {
path => "/var/log/web.log"
#默认输出json格式的数据,通过format可以输出原始格式
codec => line{dormat => "%{message}"}
}
}
3、elasticsearch
- 输出到elasticsearch ,是最常用的插件,基于http协议实现
output {
elasticsearch {
#默认端口是9200,我自己改成7920了
hosts => ["node01:7920,node02:7920,node03:7920"]
#索引可以按时间写入
index => "nginx-%{+YYYY.MM.dd}"
template => "./nginx_template.json"
template_name => "nginx_template"
template_overwrite => true
}
}
附录一、生产Input
1、kafka
kafka Input 官网
插件下载地址
离线安装方式
#在logstash目录下执行
./bin/logstash-plugin install file:///opt/app/logstash/logstash-output-jdbc.zip
kafka {
#消费kafka地址
bootstrap_servers => ["node01:9092,node02:9092,node03:9092"]
#消费位置策略 【earliest】从头消费 【latest】从当前位置开始消费
auto_offset_reset => "latest"
#可以自己指定,但是要满足 `从头消费` 需要改变group_id
group_id => "kafka_2021-11-24"
#消费线程数量,建议与kafka分区数量一致最佳
consumer_threads => 3
#kafka的两个优化条件,单位毫米
fetch_max_wait_ms => 1000
fetch_min_bytes => 1000000
}
优化
fetch_max_wait_ms
- Value type is number
- Default value is 500 milliseconds
The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy
fetch_min_bytes
. This should be less than or equal to the timeout used inpoll_timeout_ms
- 数值型是数字
- 默认值为500毫秒
最大间隔刷新时间
,如果没有足够的数据立即满足 fetch_min_bytes,服务器在响应 fetch 请求之前将阻塞的最长时间。这应该小于或等于 poll_timeout_ms 中使用的超时
fetch_min_bytes
Value type is number
There is no default value for this setting
The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
- 数值型是数字
- 此设没有默认值
最小批发送数量
服务器应为获取请求返回的最小数据量。如果没有足够的数据可用,则请求将在回答请求之前等待积累足够多的数据。
2、file
input{
file {
path => ["/opt/app/wsy/a.csv"]
start_position => "beginning"
}
}
在filter中需要把数据解析出来,可以转成json格式
张三|$|12|$|男
李四|$|13|$|女
王五|$|14|$|男
filter {
csv {
separator => "|$|"
columns => ["name","age","sex"]
}
#其它业务逻辑
}
附录二、filter
1、@timestamp时区相差8小时
#自定义一个字段“timestamp”,并将logstash中的“@timestamp”时间戳赋给该值
ruby {
code => "event.set('timestamp',event.get('@timestamp').time.localtime +8*60*60)"
}
#将自定义字段的值重新赋值给“@timestamp”
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
2、业务时间处理
test.ruby文件
def register(params)
@message = params["message"]
end
def filter(event)
require 'json'
txt = event.get('message')
begin
#将每条event事件转换成json
obj1 = JSON.parse(txt)
#获取json中的嵌套json字段所有内容
txt1 = obj1['properties']
event_id = obj1['event']
if event_id == 'APP' and event_id != nil
time = obi1['time']
#将业务时间戳转换成需要的时间格式
parseDate = Time.at(time/1000,(time%1000)*1000)
parseDateStr = parseDate.to_s()
event_date = parseDateStr[0,10]
event.set('event_date',event_date)
tradedate = parseDateStr[0,10]
event.set('tradedate',tradedate)
event_time = parseDateStr[0,19]
event.set('event_time',event_time)
#将获取json中的嵌套json字段所有内容转成kv形式
txt1.each do |k,v|
event.set(k,v)
end
return [event]
end
return []
end
end
附录三、生产Output
1、jdbc
jdbc {
#jdbc 的jar目录
driver_jar_path => "/opt/app/logstash/my_lib/mysql-connector-java-5.1.41.jar"
driver_class => "com.mysql.jdbc.Driver"
#配置jdbc的链接信息
connection_string => "jdbc:mysql://node01:10072/数据库名?user=账户&password=密码"
#插入表信息
statement => ["INSERT INTO 表名 (send_data,send_time,app_name) VALUES(?,?,?)","send_data","send_time","app_name"]
max_pool_size => 5
}