ELK 之Filebeat 结合Logstash 过滤出来你想要的日志

先扯点没用的

收集日志的目的是有效的利用日志,有效利用日志的前提是日志经过格式化符合我们的要求,这样才能真正的高效利用收集到elasticsearch平台的日志。默认的日志到达elasticsearch 是原始格式,乱的让人抓狂,这个时候你会发现Logstash filter的可爱之处,它很像一块橡皮泥,如果我们手巧的话就会塑造出来让自己舒舒服服的作品,but 如果你没搞好的话那就另说了,本文的宗旨就是带你一起飞,搞定这块橡皮泥。当你搞定之后你会觉得kibana 的世界瞬间清爽了很多!
FIlebeat 的4大金刚
Filebeat 有4个非常重要的概念需要我们知道,
Prospector(矿工);
Harvest (收割者);
libeat (汇聚层);
registry(注册计入者);
Prospector 负责探索日志所在地,就如矿工一样要找矿,而Harvest如矿主一样的收割者矿工们的劳动成果,哎,世界无处不剥削啊!每个Prospector 都有一个对应的Harvest,然后他们有一个共同的老大叫做Libbeat,这个家伙会汇总所有的东西,然后把所有的日志传送给指定的客户,这其中还有个非常重要的角色”registry“,这个家伙相当于一个会计,它会记录Harvest 都收割了些啥,收割到哪里了,这样一但有问题了之后,harvest就会跑到会计哪里问:“上次老子的活干到那块了”?Registry 会告诉Harvest 你Y的上次干到哪里了,去哪里接着干就行了。这样就避免了数据重复收集的问题!

FIlebeat 详细配置:

filebeat.prospectors:
- input_type: log
  enabled: True
  paths:
    - /var/log/mysql-slow-*
#这个地方是关键,我们给上边日志加上了tags,方便在logstash里边通过这个tags 过滤并格式化自己想要的内容;
  tags: ["mysql_slow_logs"]
#有的时候日志不是一行输出的,如果不用multiline的话,会导致一条日志被分割成多条收集过来,形成不完整日志,这样的日志对于我们来说是没有用的!通过正则匹配语句开头,这样multiline 会在匹配开头
之后,一直到下一个这样开通的语句合并成一条语句。
#pattern:多行日志开始的那一行匹配的pattern
#negate:是否需要对pattern条件转置使用,不翻转设为true,反转设置为false
#match:匹配pattern后,与前面(before)还是后面(after)的内容合并为一条日志
#max_lines:合并的最多行数(包含匹配pattern的那一行 默认值是500行
#timeout:到了timeout之后,即使没有匹配一个新的pattern(发生一个新的事件),也把已经匹配的日志事件发送出去
  multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘  (2018\05\01  我的匹配是已这样的日期开头的)
  multiline.negate: true
  multiline.match: after
  multiline.Max_lines:20
  multiline.timeout: 10s
- input_type: log
  paths:
    - /var/log/mysql-sql-*
  tags: ["mysql_sql_logs"]
  multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 10s
  encoding: utf-8
  document_type: mysql-proxy
  scan_frequency: 20s
  harverster_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
 #tail_files:如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送,而不是从文件开始处重新发送所有内容。默认是false;

Logstash 的详细配置

input {
  kafka {
    topics_pattern => "mysql.*"
    bootstrap_servers => "x.x.x.x:9092"
    #auto_offset_rest => "latest"
    codec => json
    group_id => "logstash-g1"
   }
}
#终于到了关键的地方了,logstash的filter ,使用filter 过滤出来我们想要的日志,
filter {
     #if 还可以使用or 或者and 作为条件语句,举个栗子: if “a” or “b”  or “c” in [tags],这样就可以过滤多个tags 的标签了,我们这个主要用在同型号的交换设备的日志正规化,比如说你有5台交换机,把日志指定到了同一个syslog-ng 上,收集日志的时候只能通过同一个filebeat,多个prospector加不同的tags。这个时候过滤就可以通过判断相应的tags来完成了。
    if "mysql_slow_logs" in [tags]{
    grok {
     #grok 里边有定义好的现场的模板你可以用,但是更多的是自定义模板,规则是这样的,小括号里边包含所有一个key和value,例子:(?<key>value),比如以下的信息,第一个我定义的key是data,表示方法为:?<key> 前边一个问号,然后用<>把key包含在里边去。value就是纯正则了,这个我就不举例子了。这个有个在线的调试库,可以供大家参考,http://grokdebug.herokuapp.com/
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
#过滤完成之后把之前的message 移出掉,节约磁盘空间。
        remove_field => ["message"]
    }
  }
    else if "mysql_sql_logs" in [tags]{
    grok {
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
        remove_field => ["message"]}
  }
}

我要过滤的日志样本:

2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[];
过滤后的结果如下:
{
  "date": [
    [
      "2018/05/01 16:16:01.892"
    ]
  ],
  "datetime": [
    [
      "16:16:01.892"
    ]
  ],
  "TIME": [
    [
      "16:16:01.892"
    ]
  ],
  "HOUR": [
    [
      "16"
    ]
  ],
  "MINUTE": [
    [
      "16"
    ]
  ],
  "SECOND": [
    [
      "01.892"
    ]
  ],
  "status": [
    [
      "OK"
    ]
  ],
  "respond_time": [
    [
      "759"
    ]
  ],
  "client": [
    [
      "172.29.1.7"
    ]
  ],
  "IPV6": [
    [
      null,
      null
    ]
  ],
  "IPV4": [
    [
      "172.29.1.7",
      "172.7.1.39"
    ]
  ],
  "client-port": [
    [
      "35184"
    ]
  ],
  "server": [
    [
      "172.7.1.39"
    ]
  ],
  "server-port": [
    [
      "3306"
    ]
  ],
  "databases": [
    [
      "<DB>"
    ]
  ],
  "SQL": [
    [
      "select count(*) from test[];"
    ]
  ]
}

#有图有真相:

原文地址:http://blog.51cto.com/seekerwolf/2110174

时间: 2024-08-01 14:34:26

ELK 之Filebeat 结合Logstash 过滤出来你想要的日志的相关文章

ELK 之数据收集传输过滤 Filebeat+Logstash 部署

本文与前文是有关联的,之前的两篇文章客官可以抬腿出门右转 导读,ELK 之前端,ELK 之分布式发#前端和消息队列搞定之后,我们需要安装数据采集工具filebeats和数据过滤机运输工具Logstash,一般情况我们都使用filebeats 用来收集日志文件,我自定义了一个log文件,文件内容如下:55.3.244.1 GET /index.html 15824 0.04355.3.244.1 GET /index.html 15824 0.043文件位置放到/tmp/test.log cd /

Ubuntu 16.04安装Elasticsearch,Logstash和Kibana(ELK)Filebeat

https://www.howtoing.com/how-to-install-elasticsearch-logstash-and-kibana-elk-stack-on-ubuntu-16-04 在本教程中,我们将在安装Elasticsearch ELK在Ubuntu 16.04(即,Elasticsearch 2.3.x版本,Logstash 2.3.x版本,并Kibana 4.5.x).我们也将告诉你如何配置它收集和可视化你的系统的系统日志进行集中... 分类:UbuntuUbuntu

ELK教程3:logstash的部署、SpringBoot整合ELK+Filebeat

本篇文章主要讲解如下安装Logstash,logstash依赖于Java环境,首先安装Java,安装脚本如下: 1 yum install java logstash安装 Logstash的安装脚本如下: 1 # 下载logstash的压缩包 2 wget https://artifacts.elastic.co/downloads/logstash/logstash-7.2.0.zip 3 # 解压压缩包 4 upzip logstash-7.2.0.zip 5 # 将解压的包移到/usr/s

0415关于通过FILEBEAT,LOGSTASH,ES,KIBNA实现数据的采集

如何通过FILEBEAT,LOGSTASH,ES,KIBNA实现数据的采集总体参考网址:https://www.olinux.org.cn/elk/1157.html官方网址:https://www.elastic.co/guide/en/beats/filebeat/6.2/filebeat-getting-started.html 第一步 启动ES,ES的安装请自行百度第二步 启动LOGSTASH,LOGSTASH的安装请自行百度启动命令../bin/logstash -f logstash

ELK + Kafka + Filebeat

ELK + Kafka + Filebeat学习 https://blog.csdn.net/qq_21383435/article/details/79463832 https://blog.csdn.net/xiangyuan1988/article/details/78977471 https://www.jianshu.com/p/f149a76ea5b5 https://blog.csdn.net/qq_21383435/article/category/7486820 ELK + K

6.3.1版本elk+redis+filebeat收集docker+swarm日志分析

最近公司比较忙,没来的及更新博客,今天为大家更新一篇文章,elk+redis+filebeat,这里呢主要使用与中小型公司的日志收集,如果大型公司 可以参考上面的kafka+zookeeper配合elk收集,好了开始往上怼了: Elk为了防止数据量突然键暴增,吧服务器搞奔溃,这里需要添加一个redis,让数据输入到redis当中,然后在输入到es当中 Redis安装: #!/bin/bash # 6379  Redis-Server tar zxf redis-3.0.0-rc5.tar.gz

Logstash过滤分析日志数据/kibanaGUI调试

[Logstash] [[email protected] ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz [[email protected] ~]# tar zxvf logstash-6.3.2.tar.gz -C /usr/local/ [[email protected] ~]# mv /usr/local/logstash-6.3.2/ /usr/local/logstash

Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案

前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产生协同作用,而且 logstash比filebeat功能更强大一点,2个都使用是因为:Beats 是一个轻量级的采集器,支持从边缘机器向 Logstash 和 Elasticsearch 发送数据.考虑到 Logstash 占用系 统资源较多,我们采用 Filebeat 来作为我们的日志采集器.并且

ELK系列(1) - Elasticsearch + Logstash + Kibana + Log4j2快速入门与搭建用例

前言 最近公司分了个ELK相关的任务给我,在一边学习一边工作之余,总结下这些天来的学习历程和踩坑记录. 首先介绍下使用ELK的项目背景:在项目的数据库里有个表用来存储消息队列的消费日志,这些日志用于开发者日后的维护.每当客户端生产一条消息并发送到消息队列后,就会插入一条对应的记录到数据库里.当这条消息被消费之后,又会更新数据库里对应的记录的几个column的值,比如status.updated_on这些常用的column. 由于客户每天生产消费的消息很多,导致数据库里的这个表里的数据很多,长年累