LogStash的Filter的使用

最近在项目中使用LogStash做日志的采集和过滤,感觉LogStash还是很强大的。

input {
     file{
         path => "/XXX/syslog.txt"
         start_position => beginning
         codec => multiline{
             patterns_dir => ["/XX/logstash-1.5.3/patterns"]
             pattern => "^%{MESSAGE}"
             negate => true
             what => "previous"
         }
     }
}
filter{
    mutate{
     split => ["message","|"]
        add_field =>   {
            "tmp" => "%{[message][0]}"
        }
        add_field =>   {
            "DeviceProduct" => "%{[message][2]}"
        }
        add_field =>   {
            "DeviceVersion" => "%{[message][3]}"
        }
        add_field =>   {
            "Signature ID" => "%{[message][4]}"
        }
        add_field =>   {
            "Name" => "%{[message][5]}"
        }
    }

    mutate{
     split => ["tmp",":"]
        add_field =>   {
            "tmp1" => "%{[tmp][1]}"
        }
        add_field =>   {
            "Version" => "%{[tmp][2]}"
        }
        remove_field => [ "tmp" ]
    }

    grok{
       patterns_dir => ["/XXX/logstash-1.5.3/patterns"]
       match => {"tmp1" => "%{TYPE:type}"}
       remove_field => [ "tmp1"]
    }

    kv{
       include_keys => ["eventId", "msg", "end", "mrt", "modelConfidence", "severity", "relevance","assetCriticality","priority","art","rt","cs1","cs2","cs3","locality","cs2Label","cs3Label","cs4Label","flexString1Label","ahost","agt","av","atz","aid","at","dvc","deviceZoneID","deviceZoneURI","dtz","eventAnnotationStageUpdateTime","eventAnnotationModificationTime","eventAnnotationAuditTrail","eventAnnotationVersion","eventAnnotationFlags","eventAnnotationEndTime","eventAnnotationManagerReceiptTime","_cefVer","ad.arcSightEventPath"]
    }
    mutate{
     split => ["ad.arcSightEventPath",","]
        add_field =>   {
            "arcSightEventPath" => "%{[ad.arcSightEventPath][0]}"
        }
        remove_field => [ "ad.arcSightEventPath" ]
        remove_field => [ "message" ]
    }

}
output{
    kafka{
        topic_id => "rawlog"
        batch_num_messages => 20
        broker_list => "10.3.162.193:39192,10.3.162.194:39192,10.3.162.195:39192"
        codec => "json"
    }
    stdout{
       codec => rubydebug
    }

input:接入数据源

filter:对数据源进行过滤

output: 输出的

其中最重要的是filter的处理,目前我们的需求是需要对字符串进行key-value的提取

1、使用了mutate中的split,能通过分割符对分本处理。

2、通过grok使用正则对字符串进行截取处理。

3、使用kv 提取所有的key-value

时间: 2024-11-09 05:16:43

LogStash的Filter的使用的相关文章

ELK 学习笔记之 Logstash之filter配置

Logstash之filter: json filter: input{ stdin{ } } filter{ json{ source => "message" } } output{ stdout{ codec => json } 输入: {"name": "CSL", "age": 20} 输出: Grok filter: pattern: https://github.com/logstash-plugin

logstash之filter处理中括号包围的内容

如题,logstash之filter处理中括号包围的内容: $grep -v "#" config/logstash-nlp.yml input { kafka { bootstrap_servers => "datacollect-1:9092,datacollect-2:9092,datacollect-3:9092" codec => "json" group_id => "logstash-newtrace-n

【记录】logstash 的filter 使用

概述 logstash 之所以强大和流行,与其丰富的过滤器插件是分不开的 过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的新事件到后续流程中 强大的文本解析工具 -- Grok grok 是一个十分强大的 logstash filter 插件,他可以解析任何格式的文本,他是目前 logstash 中解析非结构化日志数据最好的方式 基本用法 Grok 的语法规则是: %{语法 : 语义} “语法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹

logstash实战filter插件之grok(收集apache日志)

有些日志(比如apache)不像nginx那样支持json可以使用grok插件 grok利用正则表达式就行匹配拆分 预定义的位置在 /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns apache的在文件grok-patterns 查看官方文档 https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.ht

Logstash 常用 filter 插件介绍(二)

Filter是Logstash功能强大的主要原因,它可以对Logstash Event进行丰富的处理,比如说解析数据.删除字段.类型转换等等,常见的有如下几个: date:日志解析 grok:正则匹配解析 dissect:分割符解析 mutate:对字段做处理,比如重命名.删除.替换等 json:按照 json 解析字段内容到指定字段中 geoip:增加地理位置数据 ruby: 利用 ruby 代码来动态修改 Logstash Event data 从字段解析日期以用作事件的Logstash时间

Logstash使用grok解析IIS日志

Logstash使用grok解析IIS日志 1. 安装配置 安装Logstash前请确认Elasticsearch已经安装正确,参见RedHat6.4安装Elasticsearch5.2.0. 下载链接为:logstash-5.2.0.rpm. 下载完成后,rpm -i logstash-5.2.0.rpm即可安装. Logstash默认的配置文件位置为./config和/etc/logstash/,后者已经存在,但直接运行依然会报错: WARNING: Could not find logst

使用logstash+elasticsearch+kibana快速搭建日志平台

日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数据自动触发消息通知 基于日志的数据挖掘 很多团队在日志方面可能遇到的一些问题有: 开发人员不能登录线上服务器查看详细日志,经过运维周转费时费力 日志数据分散在多个系统,难以查找 日志数据量大,查询速度慢 一个调用会涉及多个系统,难以在这些系统的日志中快速定位数据 数据不够实时 常见的一些重量级的开源

logstash 中多行合并

这里我之前是在input里面配置的多行合并,合并语法为: input { beats { type => beats port => 7001 codec => multiline { patterns_dir => ["/data/package/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns"] pattern => ".*#ELK#.*&

ELK 之Filebeat 结合Logstash 过滤出来你想要的日志

先扯点没用的 收集日志的目的是有效的利用日志,有效利用日志的前提是日志经过格式化符合我们的要求,这样才能真正的高效利用收集到elasticsearch平台的日志.默认的日志到达elasticsearch 是原始格式,乱的让人抓狂,这个时候你会发现Logstash filter的可爱之处,它很像一块橡皮泥,如果我们手巧的话就会塑造出来让自己舒舒服服的作品,but 如果你没搞好的话那就另说了,本文的宗旨就是带你一起飞,搞定这块橡皮泥.当你搞定之后你会觉得kibana 的世界瞬间清爽了很多!FIleb