1 介绍组件
Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)。
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch 中的 Index 是一组具有相似特征的文档集合,类似于关系数据库模型中的数据库实例,Index 中可以指定 Type 区分不同的文档,类似于数据库实例中的关系表,Document 是存储的基本单位,都是 JSON 格式,类似于关系表中行级对象。我们处理后的 JSON 文档格式的日志都要在 Elasticsearch 中做索引,相应的 Logstash 有 Elasticsearch output 插件,对于用户是透明的。
Hadoop 生态圈为大规模数据集的处理提供多种分析功能,但实时搜索一直是 Hadoop 的软肋。如今,Elasticsearch for Apache Hadoop(ES-Hadoop)弥补了这一缺陷,为用户整合了 Hadoop 的大数据分析能力以及 Elasticsearch 的实时搜索能力。
Logstash 是一种功能强大的信息采集工具,类似于 Hadoop 生态圈里的 Flume。通常在其配置文件规定 Logstash 如何处理各种类型的事件流,一般包含 input、filter、output 三个部分。Logstash 为各个部分提供相应的插件,因而有 input、filter、output 三类插件完成各种处理和转换;另外 codec 类的插件可以放在 input 和 output 部分通过简单编码来简化处理过程。
Kibana是ElasticSearch的用户界面。
在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了。
2 下载安装包
下载 elasticsearch、logstash、kibana、filebeat 的压缩包,并将四个压缩包上传到 /data/elk 目录下
官方文档:
Filebeat:
https://www.elastic.co/cn/products/beats/filebeat
Logstash:
https://www.elastic.co/cn/products/logstash
Kibana:
https://www.elastic.co/cn/products/kibana
Elasticsearch:
https://www.elastic.co/cn/products/elasticsearch
elasticsearch中文社区:
https://elasticsearch.cn/
3 安装ELK
3.1 安装jdk1.8
略
3.2 安装ElasticSearch
3.2.1 安装配置es
说明:ElasticSearch的运行不能用root执行,自己用useradd命令新建一个用户如下所示:
添加普通用户elk并设置密码
useradd elk
passwd elk 然后根据提示输入密码即可
修改文件所有者
chown -R elk:elk /data/elk
解压
tar -zxvf elasticsearch-6.7.0.tar.gz
修改 es 的配置文件
cd /data/elk/elasticsearch-6.7.0/config
vim elasticsearch.yml
修改方法参考如下:
cluster.name: my-applicationnode.name: node-1node.attr.rack: r1path.data: /data/elk/elasticsearch-6.7.0/datapath.logs: /data/elk/elasticsearch-6.7.0/logsnetwork.host: localhosthttp.port: 9200
3.2.2 启动
su elk
./bin/elasticsearch -d
启动时间有点慢,耐心等待10-20s或者更长,查看9200,9300端口是否开启
netstat -tnlp|grep 9[23]00
tcp6 0 0 localhost:9200 :::* LISTEN 30155/java tcp6 0 0 localhost:9300 :::* LISTEN 30155/java
访问地址:http://localhost:9200
停止服务:
ps -ef |grep elasticsearch kill PID
3.3 安装filebeat
3.3.1 安装配置
filebeat:部署在具体的业务机器上,通过定时监控的方式获取增量的日志,并转发到logstash、elasticsearch、kafka等等。
解压
tar –zxvf filebeat-6.7.0-linux-x86_64.tar.gz
cd filebeat-6.7.0-linux-x86_64
修改kibana 的配置文件
vim filebeat.yml
===================Filebeat prospectors ===========
filebeat.prospectors:
# Each - is a prospector. Most options can be set at the prospector level, so# you can use different prospectors for various configurations.# Below are the prospector specific configurations.
- input_type: log
# Paths that should be crawled and fetched. Glob based paths. paths: - /data/elk/logfile#- c:\programdata\elasticsearch\logs\*
#输出到logstash#-------------------- Logstash output --------------------------------output.logstash: # The Logstash hosts hosts: ["localhost:5044"]
3.3.2 启动
./filebeat -e -c filebeat.yml -d "publish"
3.4 安装logstash
3.4.1 安装配置
解压
tar -zxvf logstash-6.7.0.tar.gz
cd logstash-6.7.0
配置
在logstash文件夹的下bin目录创建配置文件logstash.conf ,内容如下:
input:对应输入的配置,其中path是监控的文**件路劲, codec编码个格式
output:elasticsearch : { hosts => "localhost:9200" }
对应输出到elasticsearch,hosts是elasticsearch安装地址
input { beats { host => "localhost" port => 5044 codec => json }}
output {
stdout { codec => rubydebug }
elasticsearch { hosts => "localhost:9200" index="test_index" }}
3.4.2 启动
测试你的配置文件 是否正确( 解析配置文件并报告任何错误。)
./logstash -f logstash.conf --config.test_and_exit
Sending Logstash logs to /data/elk/logstash-6.7.0/logs which is now configured via log4j2.properties[2019-03-28T17:55:34,114][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/data/elk/logstash-6.7.0/data/queue"}[2019-03-28T17:55:34,169][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/data/elk/logstash-6.7.0/data/dead_letter_queue"}[2019-03-28T17:55:34,856][WARN ][logstash.config.source.multilocal] Ignoring the ‘pipelines.yml‘ file because modules or command line options are specifiedConfiguration OK[2019-03-28T17:55:43,327][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
启动命令(启用自动配置重新加载,这样就不必每次修改配置文件时都停止并重新启动Logstash )
./logstash -f logstash.conf --config.reload.automatic
停止服务
ps aux | grep logstash
kill -9 pid
3.5 安装kibana
3.5.1 安装配置
解压
tar –zxvf kibana-6.7.0-linux-x86_64.tar.gz
cd kibana-6.7.0-linux-x86_64/config
配置
vim kibana.yml
server.port: 5601server.host: "localhost"
#elasticsearch.username: "elastic"#elasticsearch.password: "changeme"
elasticsearch.hosts: ["http://localhost:9200"]kibana.index: ".kibana"
3.5.2 启动
./bin/kibana
访问地址:
http://localhost:5601
netstat -anltp|grep 5601查找对应端口
kill -9 pid
4 测试
4.1 监测指定文件,Kibana展示
4.1.1 Logstash监控文件
我们通过echo往这文件/data/elk/test.log追加内容,来测试整个日志收集系统是否可行
vim test2.conf
input { file { type =>"syslog" path => ["/data/elk/test.log" ] } syslog { type =>"syslog" port =>"5544" }}output { stdout { codec=> rubydebug } elasticsearch {hosts => "http:// localhost:9200"}}
启动
./logstash -f test2.conf --config.reload.automatic
向监控文件中写入数据
echo "{ "firstName": "1", "lastName":"McLaughlin", "email": "aaaa" }" >> test.log
Logstash打印接收到的数据:
{ "message" => "{ firstName: 1, lastName:McLaughlin, email: aaaa }", "@timestamp" => 2019-03-29T02:39:35.229Z, "@version" => "1", "host" => "node223", "path" => "/data/elk/test.log"}
4.1.2 es 查看接收数据
查看es所有index
curl -X GET ‘http://localhost:9200/_cat/indices?v‘
health status index uuid pri rep docs.count docs.deleted store.size pri.store.sizegreen open .kibana_task_manager ZYN7zZl3QLuB1d-EE_ysHA 1 0 2 0 12.5kb 12.5kbyellow open logstash-2019.03.29 rfkkxW7rQDKnl40_Ekaf7g 5 1 2 0 10.8kb 10.8kbyellow open test-index hFBQF-jZSkqajPRYzegfwg 5 1 0 0 1.2kb 1.2kbgreen open .kibana_1 sEUUnVMxQ0amHbQGcOedOQ 1 0 4 1 19.8kb 19.8kb
查看单个index数据
curl ‘localhost:9200/logstash-2019.03.29/_search?pretty=true‘
{ "took" : 4, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "logstash-2019.03.29", "_type" : "doc", "_id" : "W85Rx2kBAxlZ_OvPIlZf", "_score" : 1.0, "_source" : { "message" : "{ firstName: 1, lastName:McLaughlin, email: aaaa }", "@timestamp" : "2019-03-29T02:39:35.229Z", "@version" : "1", "host" : "node223", "path" : "/data/elk/test.log" } } ] }}
4.1.3 用ES连接到Kibana
在你开始用Kibana之前,你需要告诉Kibana你想探索哪个Elasticsearch索引。第一次访问Kibana是,系统会提示你定义一个索引模式以匹配一个或多个索引的名字。
1、访问Kibana UI。例如,localhost:56011 或者 http://YOURDOMAIN.com:5601
2、指定一个索引模式来匹配一个或多个你的Elasticsearch索引。当你指定了你的索引模式以后,任何匹配到的索引都将被展示出来。
(画外音:*匹配0个或多个字符; 指定索引默认是为了匹配索引,确切的说是匹配索引名字)
3、点击“Next Step”以选择你想要用来执行基于时间比较的包含timestamp字段的索引。如果你的索引没有基于时间的数据,那么选择“I don’t want to use the Time Filter”选项。
4、点击“Create index pattern”按钮来添加索引模式。第一个索引模式自动配置为默认的索引默认,以后当你有多个索引模式的时候,你就可以选择将哪一个设为默认。(提示:Management > Index Patterns)
现在,Kibana已经连接到你的Elasticsearch数据。Kibana展示了一个只读的字段列表,这些字段是匹配到的这个索引配置的字段。
4.1.4 Kibana展示
http://localhost:5601
4.2 监控Spring Boot服务日志
要求日志格式为json格式
4.2.1 json日志格式配置
在Spring Boot工程resources目录下配置logback.xml文件,旨在将log4j日志按json格式输出到指定目录,logback.xml文件内容如下:
<?xml version="1.0" encoding="UTF-8"?><configuration debug="false"> <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径 <property name="LOG_HOME" value="E:/demo/home" /> -->
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 --> <!--<property name="LOG_PATTERN" value="[ %d{yyyy-MM-dd HH:mm:ss.SSS}] [UUID:%X{requestId}] [%level]|[${HOSTNAME}] [%thread]|[%logger{36}] | %msg|%n " />--> <property name="TIME_PATTERN" value="yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘" /> <property name="LOG_PATTERN" value=‘{"event":"log","timestamp":"%d{${TIME_PATTERN}}","level":"%level","serviceName":"${springAppName:-}","pid": "${PID:-}","host":"${HOSTNAME}","class": "%logger{40}","thread":"%thread","message":"%msg"}%n ‘ />
<!-- Console 输出设置 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${LOG_PATTERN}</pattern> <charset>utf8</charset> </encoder> </appender>
<!-- 按照每天生成日志文件 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>logs/newsinfo-dataservice.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <FileNamePattern>logs/newsinfo-dataservice.%d{yyyy-MM-dd}.log</FileNamePattern> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>${LOG_PATTERN}</pattern> </encoder> </appender>
<!--log4jdbc --> <logger name="com.sohu" level="debug" additivity="false"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE" /> </logger>
<!-- 日志输出级别 --> <root level="info"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE" /> </root></configuration>
输出日志示例如下:
{"event":"log","timestamp":"2019-04-09T19:47:07.908Z","level":"INFO","serviceName":"springAppName_IS_UNDEFINED","pid": "7196","host":"node1","class": "o.s.b.web.servlet.FilterRegistrationBean","thread":"localhost-startStop-1","message":"Mapping filter: ‘MyFilter‘ to urls: [/getUser, /hello]"}
4.2.2 DomeOS启动Spring Boot服务
启动服务的同事,配置日志收集到kafka,然后调用ELK读取kafka日志存入ES,进行日志的全文检索和实时告警分析。
创建新版本时,如上配置下日志收集模块,日志就自动通过Flume接入Kafka了。
路径:/code/logs/newsinfo-dataservice.log自动收集 :自动将日志收集到 KafkaKafka Topic:newsinfo_appdata_service_log
原文地址:https://www.cnblogs.com/xiaodf/p/10679336.html