Filebeat 关键字多行匹配日志采集(multiline与include_lines)

很多同事认为filebeat采集日志不能做到多行处理,今天这里讨论下filebeat的multiline与include_lines。

先来个案例,以下日志,我们只要求采集error的字段,

2017/06/22 11:26:30 [error] 26067#0: *17918 connect() failed (111: Connection refused) while connecting to upstream, client: 192.168.32.17, server: localhost, request: "GET /wss/ HTTP/1.1", upstream: "http://192.168.12.106:8010/", host: "192.168.12.106"
2017/06/22 11:26:30 [info] 26067#0:
2017/06/22 12:05:10 [error] 26067#0: *17922 open() "/data/programs/nginx/html/ws" failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: "GET /ws HTTP/1.1", host: "192.168.12.106"

filebeat.yml文件配置如下:

filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: [‘error‘]
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log

查看下kafka队列

果然只有“error”关键字的日志被采集了

{"@timestamp":"2017-06-23T08:57:25.227Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":30926,"source":"/tmp/test.log","type":"log"}
{"@timestamp":"2017-06-23T08:57:32.228Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":31342,"source":"/tmp/test.log","type":"log"}

再来多行案例

[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}
MapperParsingException[Field name [events.created] cannot contain ‘.‘]
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
    at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
    at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)
    at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
    at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
    at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
    at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

filebeat.yml文件配置如下:

filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
   multiline:
        pattern: ‘^\[‘
        negate:  true
        match:   after
  fields:
    beat.name: 192.168.12.106
  fields_under_root: true
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
kafka队列如下:

{"@timestamp":"2017-06-23T09:09:02.887Z","beat":{"name":"192.168.12.106"},"input_type":"log",
"message":"[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}\n
MapperParsingException[Field name [events.created] cannot contain ‘.‘]\n    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)\n    
at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)\n    
at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)\n    
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)\n    
at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)\n    
at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)\n    
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)\n    
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)\n   
at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)\n    
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)\n    
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)\n    
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)\n   
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n    
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n    
at java.lang.Thread.run(Thread.java:745)\n\n\n\n","offset":35737,"source":"/tmp/test.log","type":"log"}

可以看出multiline将多行日志汇总。

multiline与include_lines,结合使用。

filebeat.yml文件配置如下:

filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: [‘error‘]
  multiline:
        pattern: ‘^\[‘
        negate:  true
        match:   after
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log

即日志中如果有"error"关键字的日志,进行多行合并,发送至kafka.

经验证,在日志不断输入的情况,会把不含"error"的行也进行合并,日志有间隔的情况输入,过滤效果比较好,具体结合业务情况实用吧。

总之一句话,filebeat可以多行合并和进行关键字日志采集。

时间: 2024-11-08 18:59:26

Filebeat 关键字多行匹配日志采集(multiline与include_lines)的相关文章

Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案

前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产生协同作用,而且 logstash比filebeat功能更强大一点,2个都使用是因为:Beats 是一个轻量级的采集器,支持从边缘机器向 Logstash 和 Elasticsearch 发送数据.考虑到 Logstash 占用系 统资源较多,我们采用 Filebeat 来作为我们的日志采集器.并且

ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台(转)

参考:http://www.tuicool.com/articles/R77fieA 我在做ELK日志平台开始之初选择为ELK+Redis直接构建,在采集nginx日志时一切正常,当我采集我司业务报文日志类后,logstash会报大量的redis connect timeout.换成redis cluster后也是同样的情况后,就考虑对消息中间件进行替换重新选型,经过各种刷文档,决定选用kafka来替换redis.根据网上找的一篇参考文档中的架构图如下: 注:由于环境有限,在对该架构图中的ela

ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://tchuairen.blog.51cto.com/3848118/1861167 什么要做日志分析平台? 随着业务量的增长,每天业务服务器将会产生上亿条的日志,单个日志文件达几个GB,这时我们发现用Linux自带工具,cat grep awk 分析越来越力不从心了,而且除了服务器日志,还有程序报错日志,分布在不同的服务器,查阅繁琐. 待解决的痛点: 1.大量不同种类的日志成为了运

Filebeat7 Kafka Gunicorn Flask Web应用程序日志采集

本文的内容 如何用filebeat kafka es做一个好用,好管理的日志收集工具 放弃logstash,使用elastic pipeline gunicron日志格式与filebeat/es配置 flask日志格式与异常日志采集与filebeat/es配置 以上的配置 概况 我有一个HTTP请求,经过的路径为 Gateway(kong)-->WebContainer(gunicorn)-->WebApp(flask) 我准备以下流向处理我的日志 file --> filebeat -

Rancher体系下容器日志采集

引言 一个完整的容器平台,容器日志都是很重要的一环.尤其在微服务架构大行其道状况下,程序的访问监控健康状态很多都依赖日志信息的收集,由于Docker的存在,让容器平台中的日志收集和传统方式很多不一样,日志的输出和采集点收集和以前大有不同.本文就探讨一下,Rancher平台内如何做容器日志收集. 当前现状 纵览当前容器日志收集的场景,无非就是两种方式:一是直接采集Docker标准输出,容器内的服务将日志信息写到标准输出,这样通过Docker的log driver可以发送到相应的收集程序中:二是延续

filebeat+logstash+elasticsearch收集haproxy日志

filebeat用于是日志收集,感觉和 flume相同,但是用go开发,性能比较好 在2.4版本中, 客户机部署logstash收集匹配日志,传输到 kafka,在用logstash 从消息队列中抓取日志存储到elasticsearch中. 但是在 5.5版本中,使用filebeat 收集日志,减少对客户机的性能影响, filebeat 收集日志 传输到 logstash的 5044端口, logstash接收日志,然后传输到es中 实验 filebeat ---- kafka ------lo

sed中的多行匹配

sed中的多行匹配,可以用N,N的意思是把下一行写入保持空间,保持空间可能不好理解,可以理解为机器的内存一样,把一下行也写入内存,可能会好理解多了,当把一下行也写入保持空间的时间,转行符"\n",也可以用.*匹配到了,前几天在帮开发做全局替换,在一对php标签中,替换里面的两行内容,其他有两个字段组合起来就是整个页面都是唯一的 <?php $_web = str_replace(array('s-','www.'),'',$_web);\$_webcontents = file_

LC3视角:Kubernetes下日志采集、存储与处理技术实践

摘要: 在Kubernetes服务化.日志处理实时化以及日志集中式存储趋势下,Kubernetes日志处理上也遇到的新挑战,包括:容器动态采集.大流量性能瓶颈.日志路由管理等问题.本文介绍了"Logtail + 日志服务 + 生态"架构,介绍了:Logtail客户端在Kubernetes日志采集场景下的优势:日志服务作为基础设施一站式解决实时读写.HTAP两大日志强需求:日志服务数据的开放性以及与云产品.开源社区相结合,在实时计算.可视化.采集上为用户提供的丰富选择. Kubernet

使用日志服务进行Kubernetes日志采集

阿里云容器服务Kubernetes集群集成了日志服务(SLS),您可在创建集群时启用日志服务,快速采集Kubernetes 集群的容器日志,包括容器的标准输出以及容器内的文本文件. 新建 Kubernetes 集群 如果您尚未创建任何的 Kubernetes 集群,可以按照本节的步骤来进行操作: 登录 容器服务管理控制台. 单击左侧导航栏中集群,单击右上角创建Kubernetes集群. 进入创建页面后,参见创建Kubernetes集群进行配置. 拖动到页面底部,勾选日志服务配置项,表示在新建的