filebeat + logstash + elasticsearch + granfa

filebeat + logstash + elasticsearch + granfa

https://www.cnblogs.com/wenchengxiaopenyou/p/9034213.html

一。背景

  前端web服务器为nginx,采用filebeat + logstash + elasticsearch + granfa 进行数据采集与展示,对客户端ip进行地域统计,监控服务器响应时间等。

二。业务整体架构:

  nginx日志落地——》filebear——》logstash——》elasticsearch——》grafna(展示)

三。先上个效果图,慢慢去一步步实现

如上只是简单的几个实用的例子,实际上有多维度的数据之后还可以定制很多需要的内容,如终端ip访问数,国家、地区占比,访问前十的省份,请求方法占比,referer统计,user_agent统计,慢响应时间统计,更有世界地图坐标展示等等,只要有数据就能多维度进行展示。这里提供模板搜索位置大家可以查找参考:https://grafana.com/dashboards

四,准备条件

需要具备如下条件:

1.nginx日志落地,需要主要落地格式,以及各个字段对应的含义。

2.安装filebeat。 filebeat轻量,功能相比较logstash而言比较单一。

3.安装logstash 作为中继服务器。这里需要说明一下的是,起初设计阶段并没有计划使用filebeat,而是直接使用logstash发往elasticsearch,但是当前端机数量增加之后logstash数量也随之增加,同时发往elasticsearch的数量增大,logstash则会抛出由于elasticsearch 限制导致的错误,大家遇到后搜索相应错误的代码即可。为此只采用logstash作为中继。

4.elasticsearch 集群。坑点是index templates的创建会影响新手操作 geoip模块。后文会有。

5.grafana安装,取代传统的kibana,grafana有更友好、美观的展示界面。

五。实现过程

1.nginx日志落地配置

nginx日志格式、字段的内容和顺序都是高度可定制化的,将需要收集的字段内容排列好。定义一个log_format

定义的形势实际上直接决定了logstash配置中对于字段抽取的模式,这里有两种可用,一种是直接在nginx日志中拼接成json的格式,在logstash中用codec => "json"来转换,

一种是常规的甚至是默认的分隔符的格式,在logstash中需要用到grok来进行匹配,这个会是相对麻烦些。两种方法各有优点。直接拼接成json的操作比较简单,但是在转码过程中

会遇到诸如 \x 无法解析的情况。 这里我也遇到过,如有必要后续会详谈。采用grok来匹配的方法相对难操作,但是准确性有提升。我们这里采用的是第一种方法,下面logstash部分

也会给出采用grok的例子。 nginx日志中日志格式定义如下:

1
2
3
4
5
6
7
8
9
10
log_format access_json ‘{"timestamp":"$time_iso8601",‘
‘"hostname":"$hostname",‘
‘"ip":"$remote_addrx",‘
‘"request_method":"$request_method",‘
‘"domain":"XXXX",‘
‘"size":$body_bytes_sent,‘
‘"status": $status,‘
‘"responsetime":$request_time,‘
‘"sum":"1"‘
‘}‘;

2.filebeat配置文件

关于filebeat更多内容请参考https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html

配置文件内容:filebeat.yml 这里应该不会遇到坑。

1
2
3
4
5
6
7
filebeat.prospectors:

  • input_type: log
    paths:

    • /data0/logs/log_json/*.log #nginx日志路径

output.logstash:
hosts: ["xxxx.xxxx.xxxx.xxx:12121"] #logstash 服务器地址
  

3.logstahs配置文件内容:

这里是针对json已经拼接号,直接进行json转码的情况:

需要注意如下:

1)date模块必须有,否则会造成数据无法回填导致最终的图像出现锯齿状影响稳定性(原因是排列时间并不是日志产生的时间,而是进入logstash的时间)。这里后面的 yyyy-MM-dd‘T‘HH:mm:ssZZ 需要根据你日志中的日期格式进行匹配匹配规则见:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

 2)需要说明的是我下面去掉了好多的字段(remove_field),原因是我们数据量大,es服务器有限。 可以根据需要随时调整收集的字段。

 3) geoip 仅需要指定源ip字段名称即可,fields并不是必须的,我加入的原因还是由于资源有限导致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
input {
beats {
port => 12121
host => "10.13.0.80"
codec => "json"
}
}

filter {
date {
match => [ "timestamp", "yyyy-MM-dd‘T‘HH:mm:ssZZ" ]
#timezone => "Asia/Shanghai"
#timezone => "+00:00"
}
mutate {
convert => [ "status","integer" ]
convert => [ "sum","integer" ]
convert => [ "size","integer" ]
remove_field => "message"
remove_field => "source"
remove_field => "tags"
remove_field => "beat"
remove_field => "offset"
remove_field => "type"
remove_field => "@source"
remove_field => "input_type"
remove_field => "@version"
remove_field => "host"
remove_field => "client"
#remove_field => "request_method"
remove_field => "size"
remove_field => "timestamp"
#remove_field => "domain"
#remove_field => "ip"
}
geoip {
source => "ip"
fields => ["country_name", "city_name", "timezone","region_name","location"]
}
}

output {
elasticsearch {
hosts => ["xxx:19200","xxx:19200","xxx:19200"]
user => "xxx"
password => "xxx"
index => "logstash-suda-alllog-%{+YYYY.MM.dd}"
flush_size => 10000
idle_flush_time => 35
}
}
  下面给出一个采用grok的例子:

   其中match内的内容不用参考我的,需要根据你的字段个数,以及格式来定制。 这里其实是正则表达式。自带了一部分几乎就可以满足所有的需求了无需自己写了,可以参考:https://github.com/elastic/logstash/blob/v1.4.0/patterns/grok-patterns  直接用即可。

复制代码
input {
file {
type => "access"
path => ["/usr/local/nginx/logs/main/*.log"]
}
}

filter {
if [type] == "access" {
if [message] =~ "^#" {
drop {}
}

grok {
match => ["message", "[%{HTTPDATE:log_timestamp}] %{HOSTNAME:server_name} "%{WORD:request_method} %{NOTSPACE:query_string} HTTP/%{NUMBER:httpversion}" "%{GREEDYDATA:http_user_agent}" %{NUMBER:status} %{IPORHOST:server_addr} "%{IPORHOST:remote_addr}" "%{NOTSPACE:http_referer}" %{NUMBER:body_bytes_sent} %{NUMBER:time_taken} %{GREEDYDATA:clf_body_bytes_sent} %{NOTSPACE:uri} %{NUMBER:m_request_time}"]
}

date {
match => [ "log_timestamp", "dd/MMM/yyyy:mm:ss:SS Z" ]
timezone => "Etc/UTC"
}

mutate {
convert => [ "status","integer" ]
convert => [ "body_bytes_sent","integer" ]
convert => [ "m_request_time","float" ]
}
复制代码
在提供一个高级的用法:

ruby:强大的模块, 可以进行诸如时间转换,单位计算等,多个可以用分号隔开。

复制代码
ruby {
code => "event.set(‘logdateunix‘,event.get(‘@timestamp‘).to_i);event.set(‘request_time‘, event.get(‘m_request_time‘) / 1000000 )"
}

mutate {
add_field => {
"http_host" => "%{server_name}"
"request_uri" => "%{uri}"
}
复制代码
在windowns上使用lgostsh需要注意的是:(win上收集iis的日志,我想正常环境是不会用到的,但是我确实用到了。。。。。。)

path路径一定要采用 linux中的分割符来拼接路径,用win的格式则正则不能实现,大家可以测试下。其他配置则无区别。

复制代码
input {
file {
type => "access"
path => ["C:/WINDOWS/system32/LogFiles/W3SVC614874788/*.log"]
}
}
复制代码

4.elasticsearch配置,集群的安装以及启动,调优这里不多说(说不完),需要注意的一个是,geoip location的格式,我这里采用的是index templates来实现的如下:

最重要的是 "location" 定义 (否则geoip_location字段格式有问题,无法拼接成坐标),其他可以根据情况自定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
{
"order": 0,
"version": 50001,
"index_patterns": [
"suda-*"
],
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "1",
"refresh_interval": "200s"
}
},
"mappings": {
"default": {
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string"
}
},
{
"string_fields": {
"mapping": {
"norms": false,
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"@version": {
"type": "keyword"
}
}
}
},
"aliases": {}
}
  

5 grafana 安装以及模板创建,这个比较简单,安装完直接写语句即可附一个例子如下:

这里的变量需要自定义:

通过上面应该能够完成一个完整的收集和展示情况,这里实际上提供了一种可行的方法,并么有太大的具体操作。

希望多多交流

推荐内容:kibana中文指南(有ibook版本,看着挺方便) 三斗室著

     https://www.elastic.co/cn/

     https://grafana.com/dashboards

     https://grafana.com/

原文地址:https://www.cnblogs.com/Leo_wl/p/9060098.html

时间: 2024-10-02 18:09:18

filebeat + logstash + elasticsearch + granfa的相关文章

filebeat+logstash+elasticsearch收集haproxy日志

filebeat用于是日志收集,感觉和 flume相同,但是用go开发,性能比较好 在2.4版本中, 客户机部署logstash收集匹配日志,传输到 kafka,在用logstash 从消息队列中抓取日志存储到elasticsearch中. 但是在 5.5版本中,使用filebeat 收集日志,减少对客户机的性能影响, filebeat 收集日志 传输到 logstash的 5044端口, logstash接收日志,然后传输到es中 实验 filebeat ---- kafka ------lo

ELK 做日志分析(filebeat+logstash+elasticsearch)配置

利用 Filebeat去读取日志发送到 Logstash ,再由 Logstash 处理后发送给 Elasticsearch . 一.Filebeat 项目日志文件: 利用 Filebeat 去读取文件,paths 下面配置路径地址,Filebeat 会自动去读取 /data/share/business_log/TA-*/debug.log 文件 #=========================== Filebeat prospectors ========================

生产环境filebeat logstash配置模板

filebeat logstash配置模板说明 为了让不同类型的日志记录到不同index,实现日志分类,需要更改默认的配置文件,ELK更新迭代速度很快,网上以前的文档适用于之前的版本 filebeat的docment_type配置项已经在6版本中弃用,请使用本文配置 filebeat配置模板 [email protected]:~$ cat /etc/filebeat/filebeat.yml filebeat.prospectors: - type: log   enabled: true  

海量日志下的日志架构优化:filebeat+logstash+kafka+ELK

前言: 实验需求说明 在前面的日志收集中,都是使用的filebeat+ELK的日志架构.但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题.因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将存储从elasticsearch转移给消息中间件,减少海量数据引起的宕机,降低elasticsearch的压力,这里的elasticsearch主要进行数据的分析处理,然后交给kibana进行界面展示 实验架构图:

logstash+elasticsearch +kibana 日志管理系统

Logstash是一个完全开源的工具,他可以对你的日志进行收集.分析,并将其存储供以后使用(如,搜索),您可以使用它.说到搜索,logstash带有一个web界面,搜索和展示所有日志.kibana 也是一个开源和免费的工具,他可以帮助您汇总.分析和搜索重要数据日志并提供友好的web界面.他可以为 Logstash 和 ElasticSearch 提供的日志分析的 Web 界面. 目的就是为了运维.研发很方便的进行日志的查询.Kibana一个免费的web壳:Logstash集成各种收集日志插件,还

logstash+elasticsearch+kibana+redis 实战

写此文章和就是为了记录logstash+elasticsearch+kibana+redis搭建过程.所有程序都是运行在windows 平台下. 1. 下载 1.1 logstash, elasticsearch, kinana 从官方站点下载: https://www.elastic.co/ 1.2 redis 官方的没有windows平台的.可以从github上下载windows平台版: https://github.com/MSOpenTech/redis/releases 2. 启动各部

日志收集分析工具logstash + elasticsearch

Your logs are your data: logstash + elasticsearch by Andrey Redko on February 25th, 2013 | Filed in: Enterprise Java Tags: Elasticsearch, Logging, Logstash Topic of today's post stays a bit aside from day-to-day coding and development but nonetheless

使用logstash+elasticsearch+kibana快速搭建日志平台

日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数据自动触发消息通知 基于日志的数据挖掘 很多团队在日志方面可能遇到的一些问题有: 开发人员不能登录线上服务器查看详细日志,经过运维周转费时费力 日志数据分散在多个系统,难以查找 日志数据量大,查询速度慢 一个调用会涉及多个系统,难以在这些系统的日志中快速定位数据 数据不够实时 常见的一些重量级的开源

Logstash+Elasticsearch+Kibana 联合使用搭建日志分析系统(Windows系统)

最近在做日志分析这块儿,要使用 Logstash+Elasticsearch+Kibana 实现日志的导入.过滤及可视化管理,官方文档写的不够详细,网上的文章大多要么是针对Linux系统的用法,要么就是抄袭别人的配置大都没法运行.费了很大劲才搞定了这仨东西,写一篇用法心得,废话不多说,进入主题. 首先,你的电脑上要装Java 的JDK环境,要使用  Logstash+Elasticsearch+Kibana,需要下载这三个软件和一些必要的插件,列表如下 : 1.Java JDK (最新版Logs