Logstash配置
?
?
input {
kafka {
zk_connect => "127.0.0.1:2181"
topic_id => "cluster"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
?
?
?
?
output {
if [type]=="cluster3" or [type]=="cluster2" or [type]=="clusterjson"
{
elasticsearch {
hosts => ["localhost:9200"]
index => "test-kafka-%{type}-%{+YYYY-MM}"
}
}
?
?
stdout { codec => rubydebug }
}
?
?
Server.properties 主要内容
broker.id=0
?
?
############################# Socket Server Settings #############################
?
?
listeners=PLAINTEXT://:9092
?
?
# The port the socket server listens on
port=9092
?
?
?
?
Server-1.properties 主要内容
?
?
broker.id=1
?
?
############################# Socket Server Settings #############################
?
?
listeners=PLAINTEXT://:9093
?
?
# The port the socket server listens on
port=9093
?
?
?
?
启动kafka,server.properties和server-1.properties
E:\soft\elk\kafka_2.11-0.9.0.1>bin\windows\kafka-server-start.bat config\custom\server-1.properties
?
?
创建topic,并查看状态
?
?
?
?
查找端口占用情况,并删除一个节点
?
?
?
?
查看 topic状态
?
?
?
?
?
?
生产者输入json数据,如下
?
?
{"Name":"BULK 7","Data":7,"CreateTime":"2016-04-05T10:16:22Z","type":"cluster3"}
?
?
Kibana 4显示如下
?
?
?
?
?
?
?
?
?
?
?
?
?
?