前面有说过使用redis来缓解elk的数据接受压力,但是呢,如果redis面对突发情况也会承受不住的,这里需要借助两个工具,zookeeper和kafka
Zookeeper主要值借助分布式锁,保证事务的不变,原子性隔离性。。。
Kafka消息队列,从生产这到filebeta再到消费这logstash接受到es中,起到缓存,减缓压力
来吧开始往上怼了
首先下载zookeeper和卡夫卡
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
这里需要注意我是单台服务器安装的,要添加hosts
这里zookeeper和卡夫卡的安装可以参考文档:
http://www.cnblogs.com/saneri/p/8822116.html
只需要zookeeper和kafka参考就行了,注意修改ip和hostname
完成后验证:
Zookeeper+Kafka集群测试
创建topic:
显示topic:
行了这个成功之后开始配置filebeat,这里换是收集dockerswarm集群的tomat和nginx容器的日志
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/docker-nginx/access_json.log
fields:
log_topics: 192.168.9.36-nginx
- type: log
enabled: true
paths:
- /var/log/docker-tomcat/catalina.out
fields:
log_topics: 192.168.9.36-tomcat
# include_lines: ['ERROR','WARN']
# exclude_lines: ['DEBUG']
output.kafka:
enabled: true
hosts: ["node1:9092"]
topic: '%{[fields][log_topics]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
接下来配置logstash
input {
kafka{
bootstrap_servers => "node1:9092"
topics => ["192.168.9.36-nginx","192.168.9.36-tomcat"]
codec => "json"
consumer_threads => 1
decorate_events => true
auto_offset_reset => "latest"
}
}
filter{
date{
match=>["logdate","MMM dd HH:mm:ss yyyy"]
target=>"@timestamp"
timezone=>"Asia/Shanghai"
}
ruby{
code =>"event.timestamp.time.localtime+8*60*60"
}
}
output {
if [fields][log_topics] == "192.168.9.36-nginx" {
elasticsearch {
hosts => ["http://192.168.9.142:9200"]
index => "192.168.9.36-nginx-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
if [fields][log_topics] == "192.168.9.36-tomcat" {
elasticsearch {
hosts => ["http://192.168.9.142:9200"]
index => "192.168.9.36-tomcat-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
}
完成后启动,然后测试一下
去查看一下
原文地址:http://blog.51cto.com/xiaorenwutest/2153575