ELK + Kafka


结合上次ELK实验,这次增加ZK+Kafka。用kafka的9092端口通信(9200是Elasticsearch的端口),filebeat作为kafka的生产者,logstash作为kafka的消费者

filebeat 主机配置:
[23:13:55 root@noise ~]#cat /etc/filebeat/filebeat.yml
filebeat.inputs:
# Nginx access log: JSON-formatted lines, decoded into top-level event fields.
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    log_type: "access"
  fields_under_root: true
  # Fixed option name: the correct key is json.keys_under_root
  # (was "json.keys.under_root", which Filebeat does not recognize and
  # silently ignores, so the JSON was never lifted to the event root).
  json.keys_under_root: true
  json.overwrite_keys: true
# Nginx error log: plain text, only tagged with log_type for routing.
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  fields:
    log_type: "error"
  fields_under_root: true


#output.elasticsearch:
#  hosts: ["10.0.0.201:9200"]

#output.logstash:
#  hosts: ["10.0.0.204:5044"]
#  template.name: "filebeat"

# Produce events to the Kafka cluster; Logstash consumes this topic.
output.kafka:
  hosts: ["10.0.0.207:9092","10.0.0.208:9092","10.0.0.209:9092"]
  topic: "nginx-kafka-log"
  partition.round_robin:
    reachable_only: false  # keep round-robin even over unreachable partitions
  required_acks: 1         # wait for leader ack only
  compression: gzip
  max_message_bytes: 1000000



logstash主机配置:
[23:14:31 root@noise ~]#cat /etc/logstash/conf.d/logstash.conf 
input {
#  beats {
#    port => 5044
#  }
  # Consume Filebeat-produced events from the Kafka cluster;
  # payloads are JSON, so decode them with the json codec.
  kafka {
    bootstrap_servers => "10.0.0.207:9092,10.0.0.208:9092,10.0.0.209:9092"
    topics => ["nginx-kafka-log"]
    codec => "json"
  }
}

output {
  # Route each event to a per-type daily index, keyed on the log_type
  # field that Filebeat attaches (access vs. error). The two conditions
  # are mutually exclusive, so if/else-if matches the original routing.
  if [log_type] == "access" {
    elasticsearch {
      hosts => ["http://10.0.0.201:9200"]
      index => "kafka-nginx-access-%{+YYYY.MM.dd}"
      #user => "elastic"
      #password => "changeme"
    }
  } else if [log_type] == "error" {
    elasticsearch {
      hosts => ["http://10.0.0.201:9200"]
      index => "kafka-nginx-error-%{+YYYY.MM.dd}"
      #user => "elastic"
      #password => "changeme"
    }
  }
}