Skywalking调用链数据接入其他平台方案(如logstash)


一般来说,做性能分析和AIOPS领域分析,要用到metrics、trace和log数据,因此需要考虑数据集成问题,本文描述一种将skywalking的trace数据接入logstash的方案。

接入前提:

  • skywalking的agent日志级别 至少开启到INFO级别
  • 已有部署skywalking Agent的应用产生调用
  • 已有基于logstash的日志平台(如ELK或arcana),同时在logstash上开启对应UDP收数实例

实现思路:

  将skywalking Agent log中记录的包含 “The content of data is: ” 的json消息,基于python做预处理,然后基于UDP发送至logstash平台。

原始skywalking agent日志段落:

ERROR 2022-01-06 15:57:48:350 UDPSender :  Failed to Sending data to collector: /127.0.0.1:10800, err: ICMP Port Unreachable 
INFO 2022-01-06 15:57:49:070 UDPSender :  The content of data is: {"spanID":"1775166584.39.16414558690680006.0","traceID":"1775166584.39.16414558690680007","operationName":"/","references":[{"refType":"CHILD_OF","spanID":"-1","traceID":"1775166584.39.16414558690680007"}],"startTime":1641455869068000,"duration":1000,"tags":[{"key":"url","value":"http://192.168.100.209:16080/","type":"string"},{"key":"http.method","value":"GET","type":"string"},{"key":"serviceType","value":"Tomcat","type":"string"},{"key":"layer","value":"HTTP","type":"string"}],"process":{"serviceName":"Sky_hello","tags":[{"key":"skywalking.version","value":"5.0","type":"string"},{"key":"hostname","value":"huangning","type":"string"},{"key":"ip","value":"192.168.100.209","type":"string"},{"key":"client-uuid","value":"1775166584","type":"string"}]},"logs":null} 
INFO 2022-01-06 15:59:00:973 UDPSender :  The content of data is: {"spanID":"1775166584.32.16414559409710006.0","traceID":"1775166584.32.16414559409710007","operationName":"/","references":[{"refType":"CHILD_OF","spanID":"-1","traceID":"1775166584.32.16414559409710007"}],"startTime":1641455940971000,"duration":2000,"tags":[{"key":"url","value":"http://192.168.100.209:16080/","type":"string"},{"key":"http.method","value":"GET","type":"string"},{"key":"serviceType","value":"Tomcat","type":"string"},{"key":"layer","value":"HTTP","type":"string"}],"process":{"serviceName":"Sky_hello","tags":[{"key":"skywalking.version","value":"5.0","type":"string"},{"key":"hostname","value":"huangning","type":"string"},{"key":"ip","value":"192.168.100.209","type":"string"},{"key":"client-uuid","value":"1775166584","type":"string"}]},"logs":null} 
ERROR 2022-01-06 15:59:00:974 UDPSender :  Failed to Sending data to collector: /127.0.0.1:10800, err: ICMP Port Unreachable 

python预处理代码段:

依赖:pyinotify,python3环境

pip3 install pyinotify

代码:

#!/usr/bin/env python
import pyinotify
import time
import os
import re
import socket

## 定义变量
key_word="The content of data is:"
pattern=r'[\{]'

s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

class ProcessTransientFile(pyinotify.ProcessEvent):
  def process_IN_MODIFY(self, event):
    line = file.readline()
    if key_word in line:
      #print(line)
      linedict = re.split(pattern, line, maxsplit=1)
      line = "{" + linedict[1]
      s.sendto(line.encode(),("192.168.100.209",10514))

     

filename = '/data/tmp/spring-demo-pkg/apache-tomcat-8.5.34-hello/agent/logs/skywalking-api.log'
file = open(filename,'r')

#找到文件的大小并移动到末尾
st_results = os.stat(filename)
st_size = st_results[6]
file.seek(st_size)

wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
wm.watch_transient_file(filename, pyinotify.IN_MODIFY, ProcessTransientFile)

notifier.loop()

结构化之后发送至平台的结果:

 注1:logstash一侧需要做解析引擎,主要是对于标准化json的解析,以及指定startTime作为event的_time。(arcana可以直接导入)

{"id":null,"createTime":1641458823302,"updateTime":1641458823302,"createBy":"admin","updateBy":"admin","name":"skywalking","dataType":"skywalking","description":"skywalking","extractType":"json","config":{"pattern":null,"delimiter":",","delimiterKv":":"},"schema":["spanID","traceID","operationName","references.refType","references.spanID","references.traceID","startTime","duration","tags.key","tags.value","tags.type","process.serviceName","process.tags.key","process.tags.value","process.tags.type","_raw"],"timeModel":"current","timeFormat":"","timeRegex":"","exampleData":"{\"spanID\":\"1775166584.35.16414580409060006.0\",\"traceID\":\"1775166584.35.16414580409060007\",\"operationName\":\"/\",\"references\":[{\"refType\":\"CHILD_OF\",\"spanID\":\"-1\",\"traceID\":\"1775166584.35.16414580409060007\"}],\"startTime\":1641458040906000,\"duration\":1000,\"tags\":[{\"key\":\"url\",\"value\":\"http://192.168.100.209:16080/\",\"type\":\"string\"},{\"key\":\"http.method\",\"value\":\"GET\",\"type\":\"string\"},{\"key\":\"serviceType\",\"value\":\"Tomcat\",\"type\":\"string\"},{\"key\":\"layer\",\"value\":\"HTTP\",\"type\":\"string\"}],\"process\":{\"serviceName\":\"Sky_hello\",\"tags\":[{\"key\":\"skywalking.version\",\"value\":\"5.0\",\"type\":\"string\"},{\"key\":\"hostname\",\"value\":\"huangning\",\"type\":\"string\"},{\"key\":\"ip\",\"value\":\"192.168.100.209\",\"type\":\"string\"},{\"key\":\"client-uuid\",\"value\":\"1775166584\",\"type\":\"string\"}]},\"logs\":null}","regexSchema":[],"condition":[],"calculation":[{"type":"eval","fieldsName":"_time","fieldsValue":"strftime(startTime/1000000,\"%Y-%m-%d %H:%M:%S\")"}],"regexOn":false}

注2:针对单台主机上存在多个instance的agent情况,修改python脚本中agent日志路径,新启动python实例即可,

建议启动时采用nohup

nohup python3 skywalking-trans.py &