同步工具之Vector
官网: https://vector.dev/
用于构建可观察性管道的轻量级、超快速工具
示例:
# Vector pipeline configuration example. NOTE: this is YAML, not TOML.
#
# Flow: Kafka source -> shared remap (parse + type coercion) -> route by topic
#       -> per-topic remap -> one ClickHouse sink per topic.
---
sources:
  # Consumes the two event topics from Kafka, starting from the earliest
  # offset when the consumer group has no committed offset.
  kafka_app_events:
    type: "kafka"
    bootstrap_servers: "kafka1:9092,kafka2:9092,kafka3:9092"
    group_id: vector-sink-beta
    topics:
      - login_test
      - button_click_test
    auto_offset_reset: earliest

transforms:
  # Parses the JSON payload in `.message`, copies the Kafka metadata
  # (offset / partition / topic) onto it, coerces the fields shared by all
  # event types, and replaces the whole event with the parsed object.
  # `channel` falls back to 0 when it cannot be converted. `main_channel`
  # equals `channel`, except when `channel` is greater than 10000000, in which
  # case it is `channel / 10000` converted to an integer.
  # With `drop_on_error: true`, events whose program fails at runtime
  # (any of the `!` calls) are dropped instead of being forwarded.
  remap_public_fields:
    type: remap
    drop_on_error: true
    inputs:
      - kafka_app_events
    source: |-
      msg = parse_json!(.message)
      msg.kafka_offset = .offset
      msg.kafka_partition = .partition
      msg.kafka_topic = .topic
      msg.app_id = to_int!(msg.app_id)
      msg.number_id = to_int!(msg.number_id)
      msg.player_id = to_string!(msg.player_id)
      msg.player_type = to_int!(msg.player_type)
      msg.platform = to_int!(msg.platform)
      msg.params = to_string!(msg.params)
      msg.client_version = to_string!(msg.client_version)
      msg.reg_channel = to_int!(msg.reg_channel)
      msg.channel = to_int(msg.channel) ?? 0
      msg.main_channel = msg.channel
      if msg.channel > 10000000 {
        msg.main_channel = to_int(msg.channel / 10000 ?? 0)
      }
      . = msg

  # Splits the stream by originating Kafka topic. Each route is exposed as
  # `route_events.<name>` and consumed by the matching remap below.
  route_events:
    type: "route"
    inputs:
      - remap_public_fields
    route:
      login: .kafka_topic == "login_test"
      button_click: .kafka_topic == "button_click_test"

  # Field coercion specific to button-click events.
  remap_button_click_test:
    type: remap
    drop_on_error: true
    inputs:
      - route_events.button_click
    source: |-
      .button_id = to_int!(.button_id)

  # Field coercion specific to login events.
  remap_login_test:
    type: remap
    drop_on_error: true
    inputs:
      - route_events.login
    source: |-
      .is_new = to_int!(.is_new)
      .longitude = to_float!(.longitude)
      .latitude = to_float!(.latitude)

sinks:
  # Writes button-click events to ClickHouse; only the fields listed under
  # `encoding.only_fields` are sent.
  clickhouse_button_click_test:
    type: clickhouse
    auth:
      user: vector_beta
      password: xxx  # placeholder value
      strategy: basic
    inputs:
      - remap_button_click_test
    compression: gzip
    database: events_beta
    endpoint: http://xxx.com:8123  # placeholder host
    table: button_click_all
    encoding:
      only_fields:
        - kafka_partition
        - kafka_offset
        - data_time
        - app_id
        - tags
        - player_id
        - number_id
        - player_type
        - params
        - platform
        - reg_channel
        - channel
        - main_channel
        - client_version
        - button_id
    healthcheck:
      enabled: true

  # Writes login events to ClickHouse; only the fields listed under
  # `encoding.only_fields` are sent.
  clickhouse_login_test:
    type: clickhouse
    auth:
      user: vector_beta
      password: xxx  # placeholder value
      strategy: basic
    inputs:
      - remap_login_test
    compression: gzip
    database: events_beta
    endpoint: http://xxx.com:8123  # placeholder host
    table: login_all
    encoding:
      only_fields:
        - kafka_partition
        - kafka_offset
        - data_time
        - app_id
        - tags
        - player_id
        - number_id
        - player_type
        - params
        - platform
        - reg_channel
        - channel
        - main_channel
        - client_version
        - is_new
        - ip
        - device_id
        - device_os
        - device_brand
        - device_model
        - ppi
        - longitude
        - latitude
    healthcheck:
      enabled: true
参考文档: https://vector.dev/docs/