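Feeding Skywalking trace data into other platforms (e.g. logstash)
Performance analysis and AIOps work generally rely on metrics, trace, and log data, so data integration has to be addressed. This article describes one way to feed skywalking trace data into logstash.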
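Prerequisites:
- The skywalking agent log level is set to INFO or a more verbose level
- An application with the skywalking Agent deployed is already producing calls
- A logstash-based log platform (such as ELK or arcana) already exists, and a matching UDP input instance is enabled on logstash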
Approach:
The JSON messages in the skywalking Agent log that contain "The content of data is: " are preprocessed with Python and then sent to the logstash platform over UDP.
Raw skywalking agent log excerpt:
ERROR 2022-01-06 15:57:48:350 UDPSender : Failed to Sending data to collector: /127.0.0.1:10800, err: ICMP Port Unreachable
INFO 2022-01-06 15:57:49:070 UDPSender : The content of data is: {"spanID":"1775166584.39.16414558690680006.0","traceID":"1775166584.39.16414558690680007","operationName":"/","references":[{"refType":"CHILD_OF","spanID":"-1","traceID":"1775166584.39.16414558690680007"}],"startTime":1641455869068000,"duration":1000,"tags":[{"key":"url","value":"http://192.168.100.209:16080/","type":"string"},{"key":"http.method","value":"GET","type":"string"},{"key":"serviceType","value":"Tomcat","type":"string"},{"key":"layer","value":"HTTP","type":"string"}],"process":{"serviceName":"Sky_hello","tags":[{"key":"skywalking.version","value":"5.0","type":"string"},{"key":"hostname","value":"huangning","type":"string"},{"key":"ip","value":"192.168.100.209","type":"string"},{"key":"client-uuid","value":"1775166584","type":"string"}]},"logs":null}
INFO 2022-01-06 15:59:00:973 UDPSender : The content of data is: {"spanID":"1775166584.32.16414559409710006.0","traceID":"1775166584.32.16414559409710007","operationName":"/","references":[{"refType":"CHILD_OF","spanID":"-1","traceID":"1775166584.32.16414559409710007"}],"startTime":1641455940971000,"duration":2000,"tags":[{"key":"url","value":"http://192.168.100.209:16080/","type":"string"},{"key":"http.method","value":"GET","type":"string"},{"key":"serviceType","value":"Tomcat","type":"string"},{"key":"layer","value":"HTTP","type":"string"}],"process":{"serviceName":"Sky_hello","tags":[{"key":"skywalking.version","value":"5.0","type":"string"},{"key":"hostname","value":"huangning","type":"string"},{"key":"ip","value":"192.168.100.209","type":"string"},{"key":"client-uuid","value":"1775166584","type":"string"}]},"logs":null}
ERROR 2022-01-06 15:59:00:974 UDPSender : Failed to Sending data to collector: /127.0.0.1:10800, err: ICMP Port Unreachable
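To make the filtering step concrete, here is a minimal sketch of how the JSON payload could be pulled out of one agent log line. The helper name extract_span and the shortened sample line are illustrative assumptions, not part of the original script; only the marker text comes from the log above.

```python
import json

KEY_WORD = "The content of data is:"


def extract_span(line):
    """Return the span dict embedded in an agent log line, or None.

    extract_span is a hypothetical helper used only for illustration.
    """
    if KEY_WORD not in line:
        return None
    # Everything after the marker is expected to be one JSON object.
    payload = line.split(KEY_WORD, 1)[1].strip()
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        # Truncated or malformed payloads are skipped rather than forwarded.
        return None


# Shortened sample lines modelled on the excerpt above (fields trimmed for brevity)
info_line = (
    'INFO 2022-01-06 15:57:49:070 UDPSender : The content of data is: '
    '{"traceID":"1775166584.39.16414558690680007","operationName":"/",'
    '"startTime":1641455869068000,"duration":1000,"logs":null}'
)
error_line = (
    "ERROR 2022-01-06 15:57:48:350 UDPSender : Failed to Sending data to "
    "collector: /127.0.0.1:10800, err: ICMP Port Unreachable"
)

span = extract_span(info_line)
skipped = extract_span(error_line)
print(span["operationName"], span["duration"], skipped)
```

Validating with json.loads before sending is optional, but it keeps half-written log lines from reaching the platform as unparseable events.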
Python preprocessing code:
Dependencies: pyinotify and a Python 3 environment (pyinotify relies on Linux inotify)
pip3 install pyinotify
Code:
#!/usr/bin/env python3
import os
import socket

import pyinotify

# Settings
key_word = "The content of data is:"
target = ("192.168.100.209", 10514)  # logstash UDP input
filename = '/data/tmp/spring-demo-pkg/apache-tomcat-8.5.34-hello/agent/logs/skywalking-api.log'

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Open the log and move to the end so that only newly appended lines are processed
file = open(filename, 'r')
file.seek(0, os.SEEK_END)


class ProcessTransientFile(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        # One IN_MODIFY event may cover several appended lines, so read them all
        for line in iter(file.readline, ''):
            if key_word in line:
                # Keep only the JSON that follows the marker
                payload = line.split(key_word, 1)[1].strip()
                s.sendto(payload.encode(), target)


wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
wm.watch_transient_file(filename, pyinotify.IN_MODIFY, ProcessTransientFile)
notifier.loop()
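Before pointing the script at logstash, the UDP leg can be checked on its own. The sketch below is an assumption-laden stand-in, not the logstash input: a loopback socket bound to an ephemeral port plays the receiver, and send_span is a hypothetical helper that sends one span as a single datagram.

```python
import json
import socket


def send_span(sock, span, addr):
    """Serialize one span as compact JSON and send it as a single UDP datagram.

    send_span is a hypothetical helper; the real script sends the raw log payload.
    """
    data = json.dumps(span, separators=(",", ":")).encode("utf-8")
    sock.sendto(data, addr)
    return len(data)


# Loopback receiver standing in for the logstash UDP input (assumption)
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
receiver.settimeout(2.0)          # avoid blocking forever if nothing arrives
addr = receiver.getsockname()

sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sent_bytes = send_span(sender, {"traceID": "demo", "duration": 1000}, addr)

data, _ = receiver.recvfrom(65535)
received = json.loads(data)

sender.close()
receiver.close()
print(sent_bytes, received)
```

UDP gives no delivery guarantee, so a span that exceeds the datagram size limit or arrives while the receiver is down is lost silently; the "ICMP Port Unreachable" errors in the agent log above are the same effect seen from the agent side.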
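Result sent to the platform after structuring:
Note 1: The logstash side needs a parsing rule, mainly to parse the standard JSON and to use startTime as the event's _time. (arcana can import it directly.)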
{"id":null,"createTime":1641458823302,"updateTime":1641458823302,"createBy":"admin","updateBy":"admin","name":"skywalking","dataType":"skywalking","description":"skywalking","extractType":"json","config":{"pattern":null,"delimiter":",","delimiterKv":":"},"schema":["spanID","traceID","operationName","references.refType","references.spanID","references.traceID","startTime","duration","tags.key","tags.value","tags.type","process.serviceName","process.tags.key","process.tags.value","process.tags.type","_raw"],"timeModel":"current","timeFormat":"","timeRegex":"","exampleData":"{\"spanID\":\"1775166584.35.16414580409060006.0\",\"traceID\":\"1775166584.35.16414580409060007\",\"operationName\":\"/\",\"references\":[{\"refType\":\"CHILD_OF\",\"spanID\":\"-1\",\"traceID\":\"1775166584.35.16414580409060007\"}],\"startTime\":1641458040906000,\"duration\":1000,\"tags\":[{\"key\":\"url\",\"value\":\"http://192.168.100.209:16080/\",\"type\":\"string\"},{\"key\":\"http.method\",\"value\":\"GET\",\"type\":\"string\"},{\"key\":\"serviceType\",\"value\":\"Tomcat\",\"type\":\"string\"},{\"key\":\"layer\",\"value\":\"HTTP\",\"type\":\"string\"}],\"process\":{\"serviceName\":\"Sky_hello\",\"tags\":[{\"key\":\"skywalking.version\",\"value\":\"5.0\",\"type\":\"string\"},{\"key\":\"hostname\",\"value\":\"huangning\",\"type\":\"string\"},{\"key\":\"ip\",\"value\":\"192.168.100.209\",\"type\":\"string\"},{\"key\":\"client-uuid\",\"value\":\"1775166584\",\"type\":\"string\"}]},\"logs\":null}","regexSchema":[],"condition":[],"calculation":[{"type":"eval","fieldsName":"_time","fieldsValue":"strftime(startTime/1000000,\"%Y-%m-%d %H:%M:%S\")"}],"regexOn":false}
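The _time calculation in the configuration above divides startTime by 1000000, which implies the value is in microseconds since the Unix epoch. A small sketch of the same conversion in Python follows; the function name is hypothetical and UTC is assumed here, whereas the platform may apply its own time zone.

```python
from datetime import datetime, timezone


def start_time_to_str(start_time_us):
    """Format a startTime given in microseconds since the epoch.

    Hypothetical helper; UTC is an assumption made for this sketch.
    """
    seconds = start_time_us // 1_000_000  # microseconds -> whole seconds
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


formatted = start_time_to_str(1641455869068000)
print(formatted)  # 2022-01-06 07:57:49
```

The agent log line for this span is stamped 15:57:49, which is consistent with the same instant viewed from a host running at UTC+8.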
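Note 2: When a single host runs agents for several instances, change the agent log path in the Python script and start one more Python instance per agent.
Starting the script with nohup is recommended: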
nohup python3 skywalking-trans.py &
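Instead of editing the script for every agent, the log path and target could be passed on the command line. The flags --log, --host, and --port below are hypothetical names chosen for this sketch and do not exist in the original script.

```python
import argparse


def parse_args(argv=None):
    """Parse the per-instance settings (all flag names are illustrative assumptions)."""
    parser = argparse.ArgumentParser(
        description="Forward skywalking agent spans to a UDP input"
    )
    parser.add_argument("--log", required=True,
                        help="path to the agent's skywalking-api.log")
    parser.add_argument("--host", default="192.168.100.209",
                        help="address of the logstash UDP input")
    parser.add_argument("--port", type=int, default=10514,
                        help="port of the logstash UDP input")
    return parser.parse_args(argv)


# Example: settings for one agent instance (placeholder path)
args = parse_args(["--log", "/path/to/agent/logs/skywalking-api.log",
                   "--port", "10514"])
print(args.log, args.host, args.port)
```

With such a wrapper, each agent would get its own background process, each started with nohup and a different --log value.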