【Logstash】Elastic Stack


一、Logstash 入门及运行机制

1-1 入门及架构简介

  • 架构简介

主要分为三个阶段

Input【数据采集】-> Filter【数据解析/转换】-> Output【数据输出】

  • Pipeline

input-filter-output 的三阶段处理流程

队列管理

插件生命周期管理

  • Logstash Event

内部流转的数据表现形式

原始数据在input被转换为Event,在output被转换为目标格式数据

在配置文件中可以对Event中的属性进行增删改查

数据输入的时候,Data在Input中被decode转换成 Logstash Event,在内部流转

数据输出,Logstash Event被 encode转换成Data

  • 一个简单的配置文件
#file codec.conf
input {
	stdin {
	#按每一行切割数据
		codec => line
	}
}
filter {}
output {
	stdout {
	#将每个logstash event 转换成json输出
	codec => json
	}
}

执行

bin/logstash -f codec.conf

也可以使用管道将一条数据输入做测试

echo "foo
bar"|bin/logstash -f codec.conf

Logstash架构简介

1-2 Life_of_an_Event

1-3 queue 简介

  • logstash有两种queue ,In Memory和 Persistent Queue In Disk

  • In Memory

无法处理进程Crash、机器宕机等情况,会导致数据丢失

  • Persistent Queue In Disk【建议使用】

可处理进程Crash 等情况,保证数据不丢失

保证数据至少消费一次

充当缓冲区,可以代替kafka等消息队列的作用

配置修改

#默认是memory
queue.type:persisted
#队列存储最大数据量,默认1gb
queue.max_bytes:8gb

持久队列的工作机制:数据先从Input进来,进入Persistent Queue ,Persistent Queue会将它在磁盘上备份一份,然后告诉input这条数据我已经收到了。

然后数据就会被发送到 filter/Output,当filter/Output 将这个事件处理完以后就会向PQ发送ACK,当PQ收到后就会在磁盘上删除备份,这条数据就被完整的处理完了,解决了一个数据容灾的问题。

1-4 线程简介

  • Input Thread 【Input -> Codec】

  • Pipeline Worker Thread 【Batcher -> Filter-> Output】

我们调优的时候主要调整线程数,Pipeline Worker Thread,相关配置如下

  • pipeline.workers | -w

pipeline 线程数,即filter_output的处理线程数,默认cpu核数,可以worker配置比cpu高,当线程闲置的时候也不会占用cpu的资源

  • pipeline.batch.size | -b

Batcher 一次批量获取的待处理文档数,默认125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整

  • Pipeline.batch.delay | -u

Batcher 等待的时长,单位为ms

1-5 配置简介

线上环境推荐采用配置文件的方式来设定logstash的相关配置,这样可以减少犯错的机会,而且文件便于进行版本化管理

命令行形式多用来进行快速的配置测试/验证/检查等

1-6 多实例运行

bin/logstash --path.setting instance1
bin/logstash --path.setting instance2
不同instance中修改logstash.yml ,自定义path.data ,确保其不相同即可

我一般执行logstash项目运行如下:

bin/logstash -f 配置文件目录/配置文件 --path.data=数据目录  >日志文件目录 2>&1 &

1-7 pipeline 配置简介

  • 主要有如下的数值类型:

  • 布尔类型 Boolen

    • isFailed => true
  • 数值类型 Number

    • port => 33
  • 字符串类型 String

    • name => “hello world”
  • 数组 Array/List

    • users => [{id =>1,name =>bob},{id =>2,name =>Jane}]
    • Path => [“/val/long/messages”,”/var/log/*.log”]
  • 哈希类型Hash

    match => {
    	"field1" => "value1"
    	"field2" => "value2"
    }
    
  • 注释

    • 井号

在配置中可以引用Logstash Event的属性(字段),主要有如下两种方式:

  • 直接引用字段值
  • 在字符串中以sprintf方式引用
{
"request":"/index.html"
"name":"张三"
"ua":{
	"os":"windows 7"
}
}

直接引用字段值Field Reference

if[request] = ~ "index"{}
if[ua][os] = ~ "windows"{}

字符串中引用字段值sprintf,使用%{}来实现

%{request}
%{[ua][os]}

pipeline配置语法

  • 支持条件判断语法,从而扩展了配置的多样性
if EXPRESSION{
...
}else if EXPRESSION {
...
}else{
...
}
  • 表达式主要包括如下的操作符:
比较:== 、!=、<、>、<=、>=
正则是否匹配:=~、!~
包含(字符串或数组):in、not in
布尔操作符:and、or、nand、xor、!
分组操作符:()

二、Logstash 插件详解

2-1 input插件详解及glob讲解

1、Input Plugin - stdin

  • 最简单的输入,从标准输入读取数据,通用配置为:
    • codec类型为codec
    • type类型为string,自定义该事件类型,可用于事件判断
    • tags类型为arry,自定义该事件的tag,可用于后续判断
    • Add_field类型为hash,为该事件添加字段

测试案例一:

执行

[elastic@node01 stu]$ echo "test" | ../bin/logstash -f input-stdin.conf 

2、Input Plugin - file

思考??:从文件读取数据,如常见的日志文件。文件读取通常要解决几个问题:

  • 文件内容如何只被读取一次?即重启LS时,从上次读取的位置继续
    • sincedb
  • 如何即时读取到文件内容的新内容?
    • 定时检查文件是否有更新
  • 如何发现新的文件并读取?
    • 可以,定时检查新文件
  • 如果文件发生了归档(rotation)操作,是否影响当前的内容读取?
    • 不影响,被归档的文件可以继续读取

配置

  • path 类型为数组,指明读取的文件路径,基于glob匹配语法
path => ["/var/log/**/*.log","/var/log/message"]
  • exclue 类型为数组排除不想监听的文件规则,基于glob匹配语法
exclude => "*.gz"
  • sincedb_path 类型为字符串,记录sincedb文件路径
  • start_postion 类型为字符串,begining or end 是否从头读取文件
  • stat_interval 类型为数值型,单位秒。定时检查文件是否有更新,默认1秒
  • discover_interval 类型为数值,单位秒,定时检查是否有新文件待读取,默认15s
  • ignore_older 类型为数值,单位秒,扫描文件列表时,如果该文件上传更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭
  • close_older 类型为数值,单位秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600即1h

匹配语法

  • 主要包含以下几种匹配符
* 匹配任意字符,但不匹配以 . 开头的隐藏文件,匹配这类文件时要使用 .*来匹配
** 递归匹配子目录
? 匹配单一字符
[] 匹配多个字符,比如[a-z]、[^a-z]
{} 匹配多个单词,比如{foo,bar,hello}
\转义符号

实际使用中的例子如下:

input {
		file {
			path => ["/var/log/access_log","/var/log/err_log"]
			type => "web"
			start_position => "begining"
		}
}

2-2 codec插件详解

  • Codec Plugin 作用于input和output plugin,负责将数据在原始与Logstash Event之间转换,常见的codec有

plain 读取原始内容

dots 将内容简化为点进行输出

rubydebyg 将 Logstash Events按照ruby格式输出,方便调试

line 处理带有换行符的内容

json 处理json格式的内容

multiline 处理多行数据的内容【读多行比如java报错信息】

测试

#按行读入数据,将数据都放入message中,','分割
bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => rubydebug}}"
#按行读入数据,输入的是一个点,当我们做压测的时候会使用它
bin/logstash -e "input{stdin{codec => line}} output{stdout{codec => dots}}"
#读json数据,输出kv数据
bin/logstash -e "input{stdin{codec => json}} output{stdout{codec => rubydebug}}"

2-3 filter 插件简介及date插件讲解

  • Filter是Logstash功能强大的主要原因,他可以对Logstash Event进行丰富的处理,比如解析数据、删除字段/类型转换等等,常见的有如下几个:

date 日志解析

grok 正则匹配解析

dissect 分割符解析

mutate 对字段做处理,比如重命名、删除、替换等

json 按照json解析字段内容到指定字段中

geoip 增加地理位置数据

ruby 利用ruby代码来动态修改Logstash Event

1、date字段

  • 将日期字符串解析为日期类型,然后替换@timestamp字段或者指定的其它字段
filter {
	date {
	#{"logdate":"Jan 01 2018 12:02:08"}
		match => ["logdate","MMM dd yyyy HH:mm:ss"]
	}

}
  • match

类型为数组,用于指定日期匹配的格式,可以一次指定多种日期格式

match => ["logdate","MMM dd yyyy HH:mm:ss","MMM d yyyy HH:mm:ss","ISO8601"]
  • target

类型为字符串,用于指定赋值的字段名,默认是@timestamp

  • timezone

类型为字符串,用于指定时区

2-4 filter 插件值grok简介上

推荐grok解析方式:

kibana 的 Grok Debugger

或者使用grok在线解析工具

2-5 filter 插件值grok简介下

#特殊匹配
(%{GREEDYDATA:json1}|-)  匹配所有数据
%{QS:agent}   匹配"-"数据
# 匹配规则
LogFile="%{WORD:logfile}";Time="%{DATA:time}";SIP="%{IPV4:sip}";
#关键字
WORD   匹配单词
DATA     匹配任意数据
IPV4       匹配IP
NUMBER  匹配数字
BASE10NUM     匹配10进制的数字
URIPATHPARAM  匹配url
# 关键字实例
%{NUMBER:duration} — 匹配浮点数
%{IP:client} — 匹配IP
(?([\S+]*)),自定义正则
(?([\S+]*)), 自定义正则匹配多个字符
\s*或者\s+,代表多个空格
\S+或者\S*,代表多个字符
大括号里面:xxx,相当于起别名
%{UUID},匹配类似091ece39-5444-44a1-9f1e-019a17286b48
%{WORD}, 匹配请求的方式
%{GREEDYDATA},匹配所有剩余的数据
%{LOGLEVEL:loglevel} ---- 匹配日志级别
自定义类型
  • 也可以在logstash目录下
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/
grok-patterns这个文件可以查看匹配规则
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?(?"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]

2-6 filter插件之dissect

  • 基于分割符原理解析数据,解决grok解析时消耗过多的cpu资源的问题

优势:dissect 解析比 grok效率高三倍左右

不足:使用局限,需要字段分割符格式相同

#需要解析的字段
hi - hello - 12
#如果不写名称,即%{},表名忽略该值
%{a} - %{b} - %{c}

解析结果如下

{
	"a":"hi",
	"b":"hello",
	"c":"12"
}

2-7 filter插件之mutate【最常用】

  • 使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新

convert 类型转换

gsub 字符串替换

split/json/merge 字符串切割、数组合并为字符串、数据合并为数组

rename 字段重名名

update/replace 字段内容更新或替换

remove_field 删除字段

convert

  • 实现字段类型转换,类型为hash,仅支持转换为integer、float、string和boolean
filter {
	mutate {
		convert => {"age" => "integer"}
	}
}

gsub

  • 对字段内容进行替换,类型为数组,每,项为一个替换配置
filter {
	mutate {
		gsub => [
			#将path中的“/”替换为“_”
			"path","/","_",
			#将urlparams中的“\?#-”替换为“.”
			"urlparams","[\\?#-]","."
		]
	}
}

split

  • 将字符串切割为数组
filter {
	mutate {
		split => {"jobs" => ","}
	}
}

join

  • 将数组拼接为字符串
filter {
	mutate {
		join => {"params" => ","}
	}
}

merge

  • 将两个数组合并成1和数组,字符串会被转为1个元素的数组进行操作
filter {
	mutate {
		merge => {"dest_arr" => "source_arr"}
	}
}

remove

  • 删除字段
filter {
	mutate {
		remove_field => ["message"]
	}
}

2-8 filter 插件之json

  • 将字段内容为json格式的数据进行解析
filter {
	json {
		#要解析的字段名
		source => "message"
		#解析后的存储字段,默认和message同级别
		target => "msg_json"
	}
}

2-9 filter 插件之geoip和ruby

geoip

  • 常用的插件,根据ip地址提供的队员的地理信息,比如经纬度、城市名等,方便进行地理数据分析
filter {
	geoip{
		source => "ip"
	}
}

ruby

  • 最灵活的插件,可以以ruby语言来随心所欲的修改Logstash Event对象
filter {
	ruby {
		#记录description的字符串的长度
		code => 'size = event.get("description").size;
		event.set("descripition_size",size)'
	}
}

2-10 output 插件简介

1、stdout

  • 输出到标准输出,多用于调试
output {
	stdout {
		codec => rubydebug
	}
}

2、file

  • 输出到文件,实现将分散在多地的文件统一到一处的需求,比如将所有的web机器的web 日志收集到1个文件中,从而方便查阅信息
output {
  file {
  	path => "/var/log/web.log"
  	#默认输出json格式的数据,通过format可以输出原始格式
  	codec => line{dormat => "%{message}"}
  }
}

3、elasticsearch

  • 输出到elasticsearch ,是最常用的插件,基于http协议实现
output {
	elasticsearch {
		#默认端口是9200,我自己改成7920了
		hosts => ["node01:7920,node02:7920,node03:7920"]
		#索引可以按时间写入
		index => "nginx-%{+YYYY.MM.dd}"
		template => "./nginx_template.json"
		template_name => "nginx_template"
		template_overwrite => true
	}
}

附录一、生产Input

1、kafka

kafka Input 官网

插件下载地址

离线安装方式

#在logstash目录下执行
./bin/logstash-plugin install file:///opt/app/logstash/logstash-output-jdbc.zip
kafka {
	#消费kafka地址
	bootstrap_servers => ["node01:9092,node02:9092,node03:9092"]
	#消费位置策略 【earliest】从头消费 【latest】从当前位置开始消费
	auto_offset_reset => "latest"
	#可以自己指定,但是要满足 `从头消费` 需要改变group_id
	group_id => "kafka_2021-11-24"
	#消费线程数量,建议与kafka分区数量一致最佳
	consumer_threads => 3
	#kafka的两个优化条件,单位毫米 
	fetch_max_wait_ms => 1000
	fetch_min_bytes => 1000000
}

优化

fetch_max_wait_ms

  • Value type is number
  • Default value is 500 milliseconds

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch_min_bytes. This should be less than or equal to the timeout used in poll_timeout_ms

  • 数值型是数字
  • 默认值为500毫秒

最大间隔刷新时间,如果没有足够的数据立即满足 fetch_min_bytes,服务器在响应 fetch 请求之前将阻塞的最长时间。这应该小于或等于 poll_timeout_ms 中使用的超时

fetch_min_bytes

  • Value type is number

  • There is no default value for this setting

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.

  • 数值型是数字
  • 此设没有默认值

最小批发送数量服务器应为获取请求返回的最小数据量。如果没有足够的数据可用,则请求将在回答请求之前等待积累足够多的数据。

2、file

input{
	file {
		path => ["/opt/app/wsy/a.csv"]
		start_position => "beginning"
	}
}

在filter中需要把数据解析出来,可以转成json格式

张三|$|12|$|男
李四|$|13|$|女
王五|$|14|$|男
filter {
	csv {
		separator => "|$|"
		columns => ["name","age","sex"]
	}
	
	#其它业务逻辑
}

附录二、filter

1、@timestamp时区相差8小时

#自定义一个字段“timestamp”,并将logstash中的“@timestamp”时间戳赋给该值
ruby {
  code => "event.set('timestamp',event.get('@timestamp').time.localtime +8*60*60)"
}
#将自定义字段的值重新赋值给“@timestamp”
ruby {
  code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
  remove_field => ["timestamp"]
}

2、业务时间处理

test.ruby文件

def register(params)
  @message = params["message"]
end
def filter(event)
  require 'json'
  txt = event.get('message')
  begin
    #将每条event事件转换成json
    obj1 = JSON.parse(txt)
    #获取json中的嵌套json字段所有内容
    txt1 = obj1['properties']
    event_id = obj1['event']
    if event_id == 'APP' and event_id != nil
      time = obi1['time']
      #将业务时间戳转换成需要的时间格式
      parseDate = Time.at(time/1000,(time%1000)*1000)
      parseDateStr = parseDate.to_s()
      event_date = parseDateStr[0,10]
      event.set('event_date',event_date)
      tradedate = parseDateStr[0,10]
      event.set('tradedate',tradedate)
      event_time = parseDateStr[0,19]
      event.set('event_time',event_time)
      #将获取json中的嵌套json字段所有内容转成kv形式
      txt1.each do |k,v|
        event.set(k,v)
      end
      return [event]
    end
    return []
  end
end
      

附录三、生产Output

1、jdbc

jdbc {
	#jdbc 的jar目录
	driver_jar_path => "/opt/app/logstash/my_lib/mysql-connector-java-5.1.41.jar"
	driver_class => "com.mysql.jdbc.Driver"
	#配置jdbc的链接信息
	connection_string => "jdbc:mysql://node01:10072/数据库名?user=账户&password=密码"
	#插入表信息
	statement => ["INSERT INTO 表名 (send_data,send_time,app_name) VALUES(?,?,?)","send_data","send_time","app_name"]
	max_pool_size => 5
}