Flume-Kafka-Logstash-ElasticSearch-Kibana流程说明

首先说明,各工具的安装就不在这说明了,网上很多,可自行查看。

我们在这里用实例说明各个工具的配置以及最后展示的效果。

假如我们有一批tracklog日志需要用ELK实时展示出来:

一、收集日志,我们使用flume工具

在日志服务器端布置agent发往收集collect,配置如下:

agent(可多个)


agent.sources = s1

agent.channels = m1

agent.sinks = k1

agent.sources.s1.interceptors=i1

agent.sources.s1.interceptors.i1.type=org.apache.flume.interceptor.HostBodyInterceptor$Builder

# For each one of the sources, the type is defined

agent.sources.s1.type = com.source.tailDir.TailDirSourceNG

agent.sources.s1.monitorPath=D:\\trackloguc

agent.sources.s1.channels = m1

agent.sources.s1.fileEncode=gb2312

# Each sink‘s type must be defined

agent.sinks.k1.type = avro

agent.sinks.k1.hostname=10.130.2.249

agent.sinks.k1.port=26003

#agent.sinks.k1.type = logger

agent.sinks.k1.channel = m1

# Each channel‘s type is defined.

#agent.channels.m1.type = memory

#agent.channels.m1.capacity=100000

agent.channels.m1.type = file

agent.channels.m1.checkpointDir=..\\mobilecheck

agent.channels.m1.dataDirs=..\\mobiledata

agent.channels.m1.transactionCapacity=3000000

agent.channels.m1.capacity=10000000

collect


agent.sources = s1

agent.channels = m1 m2

agent.sinks = k1 k2

agent.source.s1.selector.type=replicating

# For each one of the sources, the type is defined

agent.sources.s1.type = avro

agent.sources.s1.bind=10.130.2.249

agent.sources.s1.port=26002

agent.sources.s1.channels = m1 m2

#放入Kafka

agent.sinks.k1.type = org.apache.flume.plugins.KafkaSink

agent.sinks.k1.metadata.broker.list=bdc53.hexun.com:9092,bdc54.hexun.com:9092,bdc46.hexun.com:9092

agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.request.required.acks=0

agent.sinks.k1.max.message.size=100

agent.sinks.k1.producer.type=sync

agent.sinks.k1.custom.encoding=UTF-8

agent.sinks.k1.custom.topic.name=TrackLogT

agent.sinks.k1.channel = m2

#channel采用file方式,因为日志太大

agent.channels.m1.type = file

agent.channels.m1.checkpointDir=/opt/modules/apache-flume-1.5.2-bin/tracklog-kafka/checkPoint

agent.channels.m1.dataDirs=/opt/modules/apache-flume-1.5.2-bin/tracklog-kafka/dataDir

agent.channels.m1.transactionCapacity = 1000000

agent.channels.m1.capacity=1000000

agent.channels.m1.checkpointInterval = 30000

二、数据入Kafka

上面collect中的topic需要再Kafka中预先建立,其他的步骤入Kafka已经在collect中配置完毕.

创建topic语句参考:


%{Kafka_HOME}/bin/kafka-topics.sh --create --zookeeper bdc41.hexun.com  --replication-factor 3 --partitions 3 --topic TrackLogT

查看topic数据语句参考:


%{Kafka_HOME}/bin/kafka-console-consumer.sh --zookeeper bdc46.hexun.com:2181,bdc40.hexun.com:2181,bdc41.hexun.com:2181  --topic TrackLogT

三、从Kafka到ElasticSearch

我们使用logstash工具取kafka的数据入ES,主要原因是logstash工具与es和kibana比较贴合。

假如我们现在想获取topic为TrackLogT的数据入ES,logstash的配置如下:


input{

kafka {

zk_connect => "bdc41.hexun.com:2181,bdc40.hexun.com:2181,bdc46.hexun.com:2181,bdc54.hexun.com:2181,bdc53.hexun.com:2181"

group_id => "logstash"

topic_id => "TrackLogT"

reset_beginning => false # boolean (optional)

consumer_threads => 5  # number (optional)

decorate_events => true

}

}

filter {

#multiline可以多行合一行,采用的行头正则匹配的方式。

multiline {

pattern => "^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}\s\d{4}-\d{1,2}-\d{1,2}\s\d{2}:\d{2}:\d{2}"

negate => true

what => "previous"

}

#下面是用空格分隔每一行

ruby {

init =>"@kname =[‘hostIP‘,‘dateday‘,‘datetime‘,‘ip‘,‘cookieid‘,‘userid‘,‘logserverip‘,‘referer‘,‘requesturl‘,‘remark1‘,‘remark2‘,‘alexaflag‘,‘ua‘,‘wirelessflag‘]"

code =>"event.append(Hash[@kname.zip(event[‘message‘].split(/ /))])"

remove_field => ["message"]

add_field=>{

"logsdate"=>"%{dateday}"

}

}

#下面是替换logsdate字段中的-为空

mutate{

gsub=>["logsdate","-",""]

# convert=>{"dateday"=>"integer"}

}

#对于logsdate的格式不合规范的数据drop

if [logsdate] !~ /\d{8}/ {

drop{}

}

#对外网ip进行解析,自动得出地理位置信息

geoip {

source => "ip"

# type => "linux-syslog"

add_tag => [ "geoip" ]

}

#对ua进行解析

useragent {

source => "ua"

# type => "linux-syslog"

add_tag => [ "useragent" ]

}

}

output{

#入es

elasticsearch{

hosts => [ "10.130.2.53:9200","10.130.2.46:9200","10.130.2.54:9200" ]

flush_size=>50000

workers => 5

index=> "logstash-tracklog"

}

}

需要注意:

1.上面之所以对logsdate进行替换,原因在于:譬如2016-01-01形式的字段,入ES的时候,会被认为是时间格式,被自动补全为:2016-01-01 08:00:00 ,导致kibana需要按天展示字段不对。

2.对于一些异常数据,譬如logsdate列应为时间数字,如20160101,如果出现一些字母汉字的异常数据,在kibana中显示就会有问题,所以对这些数据drop掉。

3.因为不同的业务数据有不同的格式,需要对数据在filter中处理 ,需要用到的相关插件、相关语法,建议多看看logstash官方文档.

四、数据在Kibana展示

下面的是使用实例,仅供参考:

1.首先进入kibana页面,点击菜单【Setting】-【Indices】,

@:可以填入名称的通配形式,这样可以监控多个索引(一般是按天分索引的数据)

点击Create即可。

2.点击菜单【Discover】,选择你刚刚建立的Setting映射,可以发现如下:

@然后点击右上角的保存,输入名称即可。

@这个是后面展示图所要用到的数据源,当然你也可以在这里搜索你的数据,注意字符串两边最好加双引号。

3.点击【Visualize】,进行各种图标制作。

可以选择制作哪种展示图,例如制作日统计量的柱状图,点击最后一个。

@order by的字段类型必须是date或int型的,这就是为什么前面导数据的时候要强调,数据类型的重要性。

4.最后点击【DashBoard】菜单,进行仪表盘的制作;可以把前面的discover和Visualize报存的数据和图集合在此仪表盘。

时间: 2024-08-29 04:20:01

Flume-Kafka-Logstash-ElasticSearch-Kibana流程说明的相关文章

Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案

前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产生协同作用,而且 logstash比filebeat功能更强大一点,2个都使用是因为:Beats 是一个轻量级的采集器,支持从边缘机器向 Logstash 和 Elasticsearch 发送数据.考虑到 Logstash 占用系 统资源较多,我们采用 Filebeat 来作为我们的日志采集器.并且

logstash+elasticsearch +kibana 日志管理系统

Logstash是一个完全开源的工具,他可以对你的日志进行收集.分析,并将其存储供以后使用(如,搜索),您可以使用它.说到搜索,logstash带有一个web界面,搜索和展示所有日志.kibana 也是一个开源和免费的工具,他可以帮助您汇总.分析和搜索重要数据日志并提供友好的web界面.他可以为 Logstash 和 ElasticSearch 提供的日志分析的 Web 界面. 目的就是为了运维.研发很方便的进行日志的查询.Kibana一个免费的web壳:Logstash集成各种收集日志插件,还

使用logstash+elasticsearch+kibana快速搭建日志平台

日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数据自动触发消息通知 基于日志的数据挖掘 很多团队在日志方面可能遇到的一些问题有: 开发人员不能登录线上服务器查看详细日志,经过运维周转费时费力 日志数据分散在多个系统,难以查找 日志数据量大,查询速度慢 一个调用会涉及多个系统,难以在这些系统的日志中快速定位数据 数据不够实时 常见的一些重量级的开源

【转载】使用logstash+elasticsearch+kibana快速搭建日志平台

原文链接:http://www.cnblogs.com/buzzlight/p/logstash_elasticsearch_kibana_log.html 日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数据自动触发消息通知 基于日志的数据挖掘 很多团队在日志方面可能遇到的一些问题有: 开发人员不能登录线上服务器查看详细日志,经过运维周转费时费力

logstash+elasticsearch+kibana+redis 实战

写此文章和就是为了记录logstash+elasticsearch+kibana+redis搭建过程.所有程序都是运行在windows 平台下. 1. 下载 1.1 logstash, elasticsearch, kinana 从官方站点下载: https://www.elastic.co/ 1.2 redis 官方的没有windows平台的.可以从github上下载windows平台版: https://github.com/MSOpenTech/redis/releases 2. 启动各部

Logstash+Elasticsearch+Kibana 联合使用搭建日志分析系统(Windows系统)

最近在做日志分析这块儿,要使用 Logstash+Elasticsearch+Kibana 实现日志的导入.过滤及可视化管理,官方文档写的不够详细,网上的文章大多要么是针对Linux系统的用法,要么就是抄袭别人的配置大都没法运行.费了很大劲才搞定了这仨东西,写一篇用法心得,废话不多说,进入主题. 首先,你的电脑上要装Java 的JDK环境,要使用  Logstash+Elasticsearch+Kibana,需要下载这三个软件和一些必要的插件,列表如下 : 1.Java JDK (最新版Logs

Logstash+Elasticsearch+Kibana+Nginx set up our private log query system

Logstash+Elasticsearch+Kibana+S3+Nginx build our private log query system System structure How to setup Logstash-index (Logstash server) yum -y install java-1.7.0-openjdk Install and configure Elasticsearch ( Logstash 1.4.2 recommends Elasticsearch 1

(原)logstash-forwarder + logstash + elasticsearch + kibana

[logstash-forwarder + logstash + elasticsearch + kibana]------------------------------------------------------------------------------------------------------------------------------------------------摘要:logstash-forwarder搜集日志,汇总给logstash,然后输出到elastic

安装logstash+elasticsearch+kibana

系统环境 # cat /etc/redhat-release CentOS release 6.4 (Final) # uname -a Linux localhost.localdomain 2.6.32-358.el6.x86_64 #1 SMP Fri Feb 22 00:31:26 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux 1.下载软件包 # curl -O https://download.elasticsearch.org/logstash/lo

安装logstash,elasticsearch,kibana三件套

原文地址:http://www.cnblogs.com/yjf512/p/4194012.html logstash,elasticsearch,kibana三件套 elk是指logstash,elasticsearch,kibana三件套,这三件套可以组成日志分析和监控工具 注意: 关于安装文档,网络上有很多,可以参考,不可以全信,而且三件套各自的版本很多,差别也不一样,需要版本匹配上才能使用.推荐直接使用官网的这一套:elkdownloads. 比如我这里下载的一套是logstash 1.4