结合上次ELK实验,这次增加ZK+Kafka。通过kafka的9092端口通信(9200是Elasticsearch的端口),filebeat作为kafka的生产者,logstash作为kafka的消费者。
filebeat 主机配置:
[23:13:55 root@noise ~]#cat /etc/filebeat/filebeat.yml
# Filebeat inputs: tail the nginx access and error logs. Each input adds a
# `log_type` field that the Logstash output conditionals match on.
filebeat.inputs:
  # nginx access log.
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      log_type: "access"
    # Place `log_type` at the event root rather than under `fields.`.
    fields_under_root: true
    # Decode each line as JSON and lift the decoded keys to the event root.
    # The option is spelled `json.keys_under_root` (underscore between
    # `keys` and `under`); `json.keys.under_root` is not a recognised setting.
    # NOTE(review): assumes nginx writes access.log in a JSON log_format —
    # confirm against the nginx configuration.
    json.keys_under_root: true
    # On a name clash, let decoded JSON keys replace Filebeat's own fields.
    json.overwrite_keys: true
  # nginx error log, shipped as plain text lines.
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    fields:
      log_type: "error"
    fields_under_root: true
# Disabled outputs, kept for reference.
#output.elasticsearch:
#  hosts: ["10.0.0.201:9200"]
#output.logstash:
#  hosts: ["10.0.0.204:5044"]
#  template.name: "filebeat"

# Active output: publish events to the Kafka cluster. Every setting below
# must be nested under `output.kafka`; at column 0 they would be parsed as
# unrelated top-level keys and the Kafka output would be left empty.
output.kafka:
  # Kafka brokers (port 9092).
  hosts: ["10.0.0.207:9092", "10.0.0.208:9092", "10.0.0.209:9092"]
  # Topic the Logstash kafka input subscribes to.
  topic: "nginx-kafka-log"
  # Round-robin partitioning; `reachable_only` belongs to this strategy.
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
logstash主机配置:
[23:14:31 root@noise ~]#cat /etc/logstash/conf.d/logstash.conf
input {
  # Beats listener, left disabled.
  # beats {
  #   port => 5044
  # }

  # Consume the "nginx-kafka-log" topic from the three brokers and decode
  # each message as JSON.
  kafka {
    codec             => json
    topics            => ["nginx-kafka-log"]
    bootstrap_servers => "10.0.0.207:9092,10.0.0.208:9092,10.0.0.209:9092"
  }
}
output {
  # Route events to a daily Elasticsearch index chosen by `log_type`.
  # The two values are mutually exclusive, so an if / else-if chain routes
  # exactly as two independent `if` blocks would. Events with any other
  # `log_type` (or none) are not written anywhere.
  if [log_type] == "access" {
    elasticsearch {
      index => "kafka-nginx-access-%{+YYYY.MM.dd}"
      hosts => ["http://10.0.0.201:9200"]
      #user => "elastic"
      #password => "changeme"
    }
  } else if [log_type] == "error" {
    elasticsearch {
      index => "kafka-nginx-error-%{+YYYY.MM.dd}"
      hosts => ["http://10.0.0.201:9200"]
      #user => "elastic"
      #password => "changeme"
    }
  }
}