【记录】logstash 的filter 使用

概述

logstash 之所以强大和流行,与其丰富的过滤器插件是分不开的

过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的新事件到后续流程中

强大的文本解析工具 -- Grok

grok 是一个十分强大的 logstash filter 插件,他可以解析任何格式的文本,他是目前 logstash 中解析非结构化日志数据最好的方式

基本用法

Grok 的语法规则是:

%{语法 : 语义}

“语法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹配出数字,IP 则会匹配出 127.0.0.1 这样的 IP 地址:

%{NUMBER:lasttime}%{IP:client}

默认情况下,所有“语义”都被保存成字符串,你也可以添加转换到的数据类型

%{NUMBER:lasttime:int}%{IP:client}

目前转换类型只支持 int 和 float

覆盖 -- overwrite

使用 Grok 的 overwrite 参数也可以覆盖日志中的信息

filter {
    grok {
        match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
        overwrite => [ "message" ]
    }
}

日志中的 message 字段将会被覆盖

示例

对于下面的log,事实上是一个 HTTP 请求行:

55.3.244.1 GET /index.html 15824 0.043

我们可以使用下面的 logstash 配置:

input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}

可以看到收集结果:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

将无结构的数据通过这样的方式实现了结构化输出

Grok 使用正则表达式

grok 是在正则表达式的基础上实现的(使用 Oniguruma 库),因此他可以解析任何正则表达式

创建模式

提取日志字段和正则表达式提取字段的规则一样:

(?<field_name>the pattern here)

首先,创建一个模式文件,写入你需要的正则表达式:

# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}

然后配置你的 Logstash:

filter {
    grok {
        patterns_dir => "./patterns"
            match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
    }
}

针对日志:

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<[email protected]>

可以匹配出:

timestamp: Jan 1 06:25:43

logsource: mailserver14

program: postfix/cleanup

pid: 21403

queue_id: BEF25A72965

syslog_message: message-id=<[email protected]>

IP 位置插件 -- Geoip

Logstash 1.3.0 以上可以使用 geoip 插件获取 IP 对应的地理位置,对于 accesslog 等的统计来说,IP 来源是非常有用的一个信息

使用方法

geoip {
    source => ...
}

示例

filter {
    geoip {
        source => "message"
    }
}

运行结果:

{
    "message" => "183.60.92.253",
    "@version" => "1",
    "@timestamp" => "2014-08-07T10:32:55.610Z",
    "host" => "raochenlindeMacBook-Air.local",
    "geoip" => {
        "ip" => "183.60.92.253",
        "country_code2" => "CN",
        "country_code3" => "CHN",
        "country_name" => "China",
        "continent_code" => "AS",
        "region_name" => "30",
        "city_name" => "Guangzhou",
        "latitude" => 23.11670000000001,
        "longitude" => 113.25,
        "timezone" => "Asia/Chongqing",
        "real_region_name" => "Guangdong",
        "location" => [
            [0] 113.25,
            [1] 23.11670000000001
        ]
    }
}

可以看到,logstash 通过提取到的 message 字段中的 IP,解析到了地理位置相关的一系列信息

当然,对于解析出的众多数据,你也可以通过 fields 选项进行筛选

filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}

选项

上面我们看到了 source 和 fields 两个选项,geoip 还提供了下列选项:

geoip 提供的可选选项
选项 类型 是否必须 默认值 意义
add_field hash {} 为当前事件增加一个字段
add_tag array [] 为当前事件增加一个用于标识的tag
database path 位置信息库所在文件
fields array 在 geoip 的返回结果中筛选部分字段
lru_cashe_size int 1000 geoip 占用的缓存大小
periodic_flush bool false 是否定期调用刷新方e
remove_field array [] 从结果集中删除字段
remove_tag array [] 从结果集中删除tag
source string 需要解析的存有 IP 的字段名称
target string "geoip" 返回的结果中保存 geoip 解析结果的字段名

json

对于 json 格式的 log,可以通过 codec 的 json 编码进行解析,但是如果记录中只有一部分是 json,这时候就需要在 filter 中使用 json 解码插件

示例

filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}

运行结果:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}

上面的例子中,解析结果被放到了 target 所指向的节点下,如果希望将解析结果与 log 中其他字段保持在同一层级输出,那么只需要去掉 target 即可:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "uid": 3081609001,
    "type": "signal"
}

时间分割 -- split

mutiline 让 logstash 将多行数据变成一个事件,当然了,logstash 同样支持将一行数据变成多个事件

logstash 提供了 split 插件,用来把一行数据拆分成多个事件

示例:

filter {
    split {
        field => "message"
        terminator => "#"
    }
}

运行结果:

对于 "test1#test2",上述 logstash 配置将其变成了下面两个事件:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test1"
}
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test2"
}

需要注意的是,当 split 插件执行结束后,会直接进入 output 阶段,其后的所有 filter 都将不会被执行

数据修改 -- mutate

logstash 还支持在 filter 中对事件中的数据进行修改

重命名 -- rename

对于已经存在的字段,重命名其字段名称

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}

更新字段内容 -- update

更新字段内容,如果字段不存在,不会新建

filter {
    mutate {
        update => { "sample" => "My new message" }
    }
}

替换字段内容 -- replace

与 update 功能相同,区别在于如果字段不存在则会新建字段

filter {
    mutate {
        replace => { "message" => "%{source_host}: My new message" }
    }
}

数据类型转换 -- convert

filter {
    mutate {
        convert => ["request_time", "float"]
    }
}

文本替换 -- gsub

gsub 提供了通过正则表达式实现文本替换的功能

filter {
    mutate {
        gsub => [
            # replace all forward slashes with underscore
            "fieldname", "/", "_",
            # replace backslashes, question marks, hashes, and minuses
            # with a dot "."
            "fieldname2", "[\\?#-]", "."
        ]
    }
}

大小写转换 -- uppercase、lowercase

filter {
    mutate {
        uppercase => [ "fieldname" ]
    }
}

去除空白字符 -- strip

类似 php 中的 trim,只去除首尾的空白字符

filter {
    mutate {
        strip => ["field1", "field2"]
    }
}

删除字段 -- remove、remove_field

remove 不推荐使用,推荐使用 remove_field

filter {
    mutate {
        remove_field => [ "foo_%{somefield}" ]
    }
}

删除字段 -- remove、remove_field

remove 不推荐使用,推荐使用 remove_field

filter {
    mutate {
        remove_field => [ "foo_%{somefield}" ]
    }
}

分割字段 -- split

将提取到的某个字段按照某个字符分割

filter {
    mutate {
        split => ["message", "|"]
    }
}

针对字符串 "123|321|adfd|dfjld*=123",可以看到输出结果:

{
    "message" => [
        [0] "123",
        [1] "321",
        [2] "adfd",
        [3] "dfjld*=123"
    ],
    "@version" => "1",
    "@timestamp" => "2014-08-20T15:58:23.120Z",
    "host" => "raochenlindeMacBook-Air.local"
}

聚合数组 -- join

将类型为 array 的字段中的 array 元素使用指定字符为分隔符聚合成一个字符串

如我们可以将 split 分割的结果再重新聚合起来:

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        join => ["message", ","]
    }
}

输出:

{
    "message" => "123,321,adfd,dfjld*=123",
    "@version" => "1",
    "@timestamp" => "2014-08-20T16:01:33.972Z",
    "host" => "raochenlindeMacBook-Air.local"
}

合并数组 -- merge

对于几个类型为 array 或 hash 或 string 的字段,我们可以使用 merge 合并

filter {
    mutate {
        merge => [ "dest_field", "added_field" ]
    }
}

需要注意的是,array 和 hash 两个字段是不能 merge 的

原文地址;https://www.cnblogs.com/dyh004/p/9699813.html

原文地址:https://www.cnblogs.com/wbl001/p/11674791.html

时间: 2024-10-22 23:42:35

【记录】logstash 的filter 使用的相关文章

ELK 学习笔记之 Logstash之filter配置

Logstash之filter: json filter: input{ stdin{ } } filter{ json{ source => "message" } } output{ stdout{ codec => json } 输入: {"name": "CSL", "age": 20} 输出: Grok filter: pattern: https://github.com/logstash-plugin

logstash之filter处理中括号包围的内容

如题,logstash之filter处理中括号包围的内容: $grep -v "#" config/logstash-nlp.yml input { kafka { bootstrap_servers => "datacollect-1:9092,datacollect-2:9092,datacollect-3:9092" codec => "json" group_id => "logstash-newtrace-n

logstash实战filter插件之grok(收集apache日志)

有些日志(比如apache)不像nginx那样支持json可以使用grok插件 grok利用正则表达式就行匹配拆分 预定义的位置在 /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns apache的在文件grok-patterns 查看官方文档 https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.ht

LogStash的Filter的使用

最近在项目中使用LogStash做日志的采集和过滤,感觉LogStash还是很强大的. input { file{ path => "/XXX/syslog.txt" start_position => beginning codec => multiline{ patterns_dir => ["/XX/logstash-1.5.3/patterns"] pattern => "^%{MESSAGE}" negat

Logstash 常用 filter 插件介绍(二)

Filter是Logstash功能强大的主要原因,它可以对Logstash Event进行丰富的处理,比如说解析数据.删除字段.类型转换等等,常见的有如下几个: date:日志解析 grok:正则匹配解析 dissect:分割符解析 mutate:对字段做处理,比如重命名.删除.替换等 json:按照 json 解析字段内容到指定字段中 geoip:增加地理位置数据 ruby: 利用 ruby 代码来动态修改 Logstash Event data 从字段解析日期以用作事件的Logstash时间

使用logstash+elasticsearch+kibana快速搭建日志平台

日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数据自动触发消息通知 基于日志的数据挖掘 很多团队在日志方面可能遇到的一些问题有: 开发人员不能登录线上服务器查看详细日志,经过运维周转费时费力 日志数据分散在多个系统,难以查找 日志数据量大,查询速度慢 一个调用会涉及多个系统,难以在这些系统的日志中快速定位数据 数据不够实时 常见的一些重量级的开源

ELK 之Filebeat 结合Logstash 过滤出来你想要的日志

先扯点没用的 收集日志的目的是有效的利用日志,有效利用日志的前提是日志经过格式化符合我们的要求,这样才能真正的高效利用收集到elasticsearch平台的日志.默认的日志到达elasticsearch 是原始格式,乱的让人抓狂,这个时候你会发现Logstash filter的可爱之处,它很像一块橡皮泥,如果我们手巧的话就会塑造出来让自己舒舒服服的作品,but 如果你没搞好的话那就另说了,本文的宗旨就是带你一起飞,搞定这块橡皮泥.当你搞定之后你会觉得kibana 的世界瞬间清爽了很多!FIleb

kibana应用 logstash应用

Top NSD ARCHITECTURE DAY04 案例1:导入数据 案例2:综合练习 1 案例1:导入数据 1.1 问题 本案例要求批量导入数据: 批量导入数据并查看 1.2 步骤 实现此案例需要按照如下步骤进行. 步骤一:导入数据 使用POST方式批量导入数据,数据格式为json,url 编码使用data-binary导入含有index配置的json文件 [[email protected] ~]# scp /var/ftp/elk/*.gz 192.168.1.66:/root/ [[e

六、Kibana使用、logstash扩展插件

1.导入数据 批量导入数据并查看 1.1 导入数据 1) 使用POST方式批量导入数据,数据格式为json,url 编码使用data-binary导入含有index配置的json文件 ]# scp /var/ftp/elk/*.gz 192.168.1.66:/root/ kibana ~]# gzip -d logs.jsonl.gz kibana ~]# gzip -d accounts.json.gz kibana ~]# gzip -d shakespeare.json.gz kiban