同步工具之Vector


官网: https://vector.dev/

Vector 是一个用于构建可观测性（observability）数据管道的轻量级、超高性能工具。

示例：从 Kafka 消费埋点事件，经 VRL 清洗字段、按 topic 分流后写入 ClickHouse。

YAML 配置格式（下方示例为 YAML；Vector 也支持 TOML 和 JSON 配置）



---

sources:
  # Kafka consumer reading the two test topics from a three-broker cluster.
  # auto_offset_reset controls where the consumer group starts when it has
  # no committed offset (Kafka auto.offset.reset semantics).
  kafka_app_events:
    type: kafka
    bootstrap_servers: "kafka1:9092,kafka2:9092,kafka3:9092"
    group_id: vector-sink-beta
    auto_offset_reset: earliest
    topics:
      - login_test
      - button_click_test

transforms:
  # Parse each Kafka message body as a JSON object, attach the Kafka
  # metadata (offset/partition/topic), and coerce the fields common to all
  # event types to fixed types. drop_on_error: true means an event whose
  # VRL program fails at runtime is dropped instead of forwarded.
  remap_public_fields:
    type: remap
    drop_on_error: true
    inputs:
      - kafka_app_events
    source: |-
      # object! rejects JSON that is valid but not an object (array,
      # string, number), so such messages are dropped like malformed JSON.
      msg = object!(parse_json!(.message))
      msg.kafka_offset = .offset
      msg.kafka_partition = .partition
      msg.kafka_topic = .topic

      msg.app_id = to_int!(msg.app_id)
      msg.number_id = to_int!(msg.number_id)
      msg.player_id = to_string!(msg.player_id)
      msg.player_type = to_int!(msg.player_type)
      msg.platform = to_int!(msg.platform)
      msg.params = to_string!(msg.params)
      msg.client_version = to_string!(msg.client_version)
      msg.reg_channel = to_int!(msg.reg_channel)
      # A missing or non-integer channel falls back to 0 instead of
      # dropping the event.
      msg.channel = to_int(msg.channel) ?? 0
      # For channel values above 10000000, main_channel is channel / 10000
      # truncated to an integer (the last four digits removed). Dividing by
      # a non-zero literal and to_int on the resulting float are both
      # infallible, so no "??" fallback is used: VRL rejects unnecessary
      # error coalescing at compile time.
      msg.main_channel = msg.channel
      if msg.channel > 10000000 {
        msg.main_channel = to_int(msg.channel / 10000)
      }
      . = msg
  
  # Split the remapped stream by originating Kafka topic. Each route name
  # is exposed as an output named route_events.<name>.
  route_events:
    type: route
    inputs:
      - remap_public_fields
    route:
      login: '.kafka_topic == "login_test"'
      button_click: '.kafka_topic == "button_click_test"'

  # Button-click events: coerce button_id to an integer. If the conversion
  # fails, drop_on_error discards the event.
  remap_button_click_test:
    type: remap
    inputs:
      - route_events.button_click
    drop_on_error: true
    source: |-
      .button_id = to_int!(.button_id)
  
  # Login events: coerce is_new to an integer and the coordinates to
  # floats. If any conversion fails, drop_on_error discards the event.
  remap_login_test:
    type: remap
    drop_on_error: true
    inputs:
      - route_events.login
    source: |-
      .is_new = to_int!(.is_new)
      .longitude = to_float!(.longitude)
      .latitude = to_float!(.latitude)

sinks:
  # Write the processed button-click events to ClickHouse over HTTP.
  clickhouse_button_click_test:
    type: clickhouse
    inputs:
      - remap_button_click_test
    endpoint: http://xxx.com:8123
    database: events_beta
    table: button_click_all
    compression: gzip
    auth:
      strategy: basic
      user: vector_beta
      # NOTE(review): placeholder credential; avoid committing the real
      # password in this file and inject it at deploy time instead.
      password: xxx
    healthcheck:
      enabled: true
    # Restrict the encoded rows to the fields listed below.
    encoding:
      only_fields:
        # Kafka metadata attached by remap_public_fields
        - kafka_offset_placeholder_never_used_do_not_apply

  # Write the processed login events to ClickHouse over HTTP.
  clickhouse_login_test:
    type: clickhouse
    inputs:
      - remap_login_test
    endpoint: http://xxx.com:8123
    database: events_beta
    table: login_all
    compression: gzip
    auth:
      strategy: basic
      user: vector_beta
      # NOTE(review): placeholder credential; avoid committing the real
      # password in this file and inject it at deploy time instead.
      password: xxx
    healthcheck:
      enabled: true
    # Restrict the encoded rows to the fields listed below. Fields such as
    # data_time, tags, ip, device_* and ppi are not set by the remaps;
    # presumably they arrive in the Kafka JSON payload (confirm).
    encoding:
      only_fields:
        # Kafka metadata attached by remap_public_fields
        - kafka_partition
        - kafka_offset
        # Fields shared by all event types
        - data_time
        - app_id
        - tags
        - player_id
        - number_id
        - player_type
        - params
        - platform
        - reg_channel
        - channel
        - main_channel
        - client_version
        # Login-specific fields
        - is_new
        - ip
        - device_id
        - device_os
        - device_brand
        - device_model
        - ppi
        - longitude
        - latitude

参考文档: https://vector.dev/docs/