Logstash——解析各类日志文件

原理

使用filebeat来上传日志数据,logstash进行日志收集与处理,elasticsearch作为日志存储与搜索引擎,最后使用kibana展现日志的可视化输出。所以不难发现,日志解析主要还是logstash做的事情。

从上图中可以看到,logstash主要包含三大模块:

  1. INPUTS: 收集所有数据源的日志数据([源有file、redis、beats等,filebeat就是使用了beats源*);
  2. FILTERS: 解析、整理日志数据(本文重点);
  3. OUTPUTS: 将解析的日志数据输出至存储器([elasticseach、file、syslog等);

FILTERS是重点,来看看它常用到的几个插件:

  1. grok:采用正则的方式,解析原始日志格式,使其结构化;
  2. geoip:根据IP字段,解析出对应的地理位置、经纬度等;
  3. date:解析选定时间字段,将其时间作为logstash每条记录产生的时间(若没有指定该字段,默认使用read line的时间作为该条记录时间);

*注意:codec也是经常会使用到的,它主要作用在INPUTS和OUTPUTS中,[提供有json的格式转换、multiline的多行日志合并等

配置文件

一个简单的配置文件:

input {
    log4j {
        port => "5400"
    }
    beats {
        port => "5044"
    }
}
filter {  # 多个过滤器会按声明的先后顺序执行
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        action => "index"
        hosts => "127.0.0.1:9200" # 或者 ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] ,支持均衡的写入ES的多个节点,一般为非master节点
        index  => "logstash-%{+YYYY-MM}"
    }
    stdout {
        codec=> rubydebug
    }
    file {
        path => "/path/to/target/file"
    }
}

场景

1. NodeJS 日志

  • 日志格式
$time - $remote_addr $log_level $path - $msg
  • 日志内容
2017-03-15 18:34:14.535 - 112.65.171.98 INFO /root/ws/socketIo.js - xxxxxx与ws server断开连接
  • filebeat配置建议filebeat使用rpm安装,以systemctl start filebeat方式启动
filebeat:
  prospectors:
    - document_type: nodejs #申明type字段为nodejs,默认为log
      paths:
        - /var/log/nodejs/log #日志文件地址
      input_type: log #从文件中读取
      tail_files: true #以文件末尾开始读取数据
output:
  logstash:
      hosts: ["${LOGSTASH_IP}:5044"]

#General Setting
name: "server1" #设置beat的名称,默认为主机hostname
  • logstash中FILTERS配置
filter {
    if [type] == "nodejs" { #根据filebeat中设置的type字段,来过滤不同的解析规则
        grok{
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} - %{IPORHOST:clientip} %{LOGLEVEL:level} %{PATH:path} - %{GREEDYDATA:msg}" }
        }
        geoip {
            source => "clientip" #填写IP字段
        }
    }
}
  • 结果为方便演示,数据有删减

  • Filter配置讲解
  1. grok中的match内容:

    1. key:表示所需解析的内容;
    2. value:表示解析的匹配规则,提取出对应的字段;
    3. 解析语法:%{正则模板:自定义字段},其中TIMESTAMP_ISO8601、IPORHOST等都是grok提供的正则模板;
  2. geoip:通过分析IP值,产生IP对应的地理位置信息;

这里是否发现@timestamp与timestamp不一致,@timestamp表示该日志的读取时间,在elasticsearch中作为时间检索索引。下面讲解Nginx日志时,会去修正这一问题。

2. Nginx 访问日志

  • 日志格式
$remote_addr - $remote_user [$time_local]
"$request" $status $body_bytes_sent "$http_referer"
"$http_user_agent" "$http_x_forwarded_for"
  • 日志内容
112.65.171.98 - - [15/Mar/2017:18:18:06 +0800] "GET /index.html HTTP/1.1" 200 1150 "http://www.yourdomain.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" "-"
  • filebeat中prospectors的配置
- document_type: nginx
  paths:
    - /var/log/nginx/access.log #日志文件地址
  input_type: log #从文件中读取
  tail_files: true #以文件末尾开始读取数据
  • logstash中FILTERS配置
filter {
    if [type] == "nginx" {
        grok{
           match => { "message" => "%{COMBINEDAPACHELOG}" }
        }

       date {
            match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ]
            target => "@timestamp" #可省略
        }
    }

}
  • 结果

  • Filter配置讲解
  1. grok:

    1. 是不是很不可思议,上一示例中我们匹配规则写了一长串,这个仅仅一个COMBINEDAPACHELOG就搞定了!
    2. grok除了提供上面那种基础的正则规则,还对常用的日志(java,http,syslog等)提供的相应解析模板,本质还是那么一长串正则,[详情见grok的120中正则模板;
  2. date:
    1. match:数组中第一个值为要匹配的时间字段,后面的n个是匹配规则,它们的关系是or的关系,满足一个即可;
    2. target:将match中匹配的时间替换该字段,默认替换@timestamp;

目前为止我们解析的都是单行的日志,向JAVA这样的,若果是多行的日志我们又该怎么做呢?

3. JAVA Log4j 日志

  • 日志内容
‘2017-03-16 15:52:39,580 ERROR TestController:26 - test:
java.lang.NullPointerException
    at com.test.TestController.tests(TestController.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)‘
  • filebeat中prospectors的配置
- document_type: tomcat
  paths:
    - /var/log/java/log #日志文件地址
  input_type: log #从文件中读取
  tail_files: true #以文件末尾开始读取数据
  multiline:
    pattern: ^\d{4}
    match: after
    negate: true
  • logstash中FILTERS配置
filter {
    if [type] == "tomcat" {
        grok{
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVALOGMESSAGE:msg}" }
        }

        date {
            match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ]
        }
    }
}
  • 结果

  • Filebeat配置讲解

    1. multiline 合并多行日志:

      1. pattern:匹配规则,这里指匹配每条日志开始的年份;
      2. match:有before与after,这里指从该行开始向后匹配;
      3. negate:是否开始一个新记录,这里指当pattern匹配后,结束之前的记录,创建一条新日志记录;

      当然在logstash input中使用codec multiline设置是一样的

小技巧:关于grok的正则匹配,官方有给出Grok Constructor方法,在这上面提供了debugger、自动匹配等工具,方便大家编写匹配规则

ES Output插件

主要的选项包括:

# action,默认是index,索引文档(logstash的事件)(ES架构与核心概念参考)。
# host,声明ES服务器地址端口
# index,事件写入的ES index,默认是logstash-%{+YYYY.MM.dd},按天分片index,一般来说我们会按照时间分片,时间格式参考http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。

详情请参考我的另外一篇文章:https://www.cnblogs.com/caoweixiong/p/11791396.html

参考:

https://www.jianshu.com/p/cc8dbd3bb401

原文地址:https://www.cnblogs.com/caoweixiong/p/12576375.html

时间: 2024-07-30 16:25:10

Logstash——解析各类日志文件的相关文章

Logstash处理json格式日志文件的三种方法

假设日志文件中的每一行记录格式为json的,如: {"Method":"JSAPI.JSTicket","Message":"JSTicket:kgt8ON7yVITDhtdwci0qeZg4L-Dj1O5WF42Nog47n_0aGF4WPJDIF2UA9MeS8GzLe6MPjyp2WlzvsL0nlvkohw","CreateTime":"2015/10/13 9:39:59",&

Logstash使用grok解析IIS日志

Logstash使用grok解析IIS日志 1. 安装配置 安装Logstash前请确认Elasticsearch已经安装正确,参见RedHat6.4安装Elasticsearch5.2.0. 下载链接为:logstash-5.2.0.rpm. 下载完成后,rpm -i logstash-5.2.0.rpm即可安装. Logstash默认的配置文件位置为./config和/etc/logstash/,后者已经存在,但直接运行依然会报错: WARNING: Could not find logst

logstash对nginx日志进行解析

logstash对nginx日志进行解析过滤转换等操作:配置可以用于生产环境,架构为filebeat读取日志放入redis,logstash从redis读取日志后进行操作:对user_agent和用户ip也进行了解析操作,便于统计: input { redis { host => "192.168.1.109" port => 6379 db => "0" data_type => "list" key => &qu

logstash+elasticsearch+kibana日志收集

一. 环境准备 角色 SERVER IP logstash agent 10.1.11.31 logstash agent 10.1.11.35 logstash agent 10.1.11.36 logstash central 10.1.11.13 elasticsearch  10.1.11.13 redis 10.1.11.13 kibana 10.1.11.13 架构图如下: 整个流程如下: 1) 远程节点的logstash agent收集本地日志后发送到远程redis的list队列

logstash解析嵌套json格式数据

logstash解析嵌套json格式数据 1.源文件 1.原日志文件为 2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,&q

PHP error_log()将错误信息写入日志文件

error_log() 是发送错误信息到某个地方的一个函数,在程序编程中比较常见,尤其是在程序调试阶段. bool error_log ( string $message [, int $message_type = 0 [, string $destination [, string $extra_headers ]]] ) 把错误信息发送到 web 服务器的错误日志,或者到一个文件里. message 应该被记录的错误信息.信息长度限制:The default seem to be 1024

logstash+elasticsearch +kibana 日志管理系统

Logstash是一个完全开源的工具,他可以对你的日志进行收集.分析,并将其存储供以后使用(如,搜索),您可以使用它.说到搜索,logstash带有一个web界面,搜索和展示所有日志.kibana 也是一个开源和免费的工具,他可以帮助您汇总.分析和搜索重要数据日志并提供友好的web界面.他可以为 Logstash 和 ElasticSearch 提供的日志分析的 Web 界面. 目的就是为了运维.研发很方便的进行日志的查询.Kibana一个免费的web壳:Logstash集成各种收集日志插件,还

logresolve - 解析Apache日志中的IP地址为主机名

logresolve是一个解析Apache访问日志中IP地址的后处理程序. 为了使对名称服务器的影响降到最低,logresolve拥有极为自主的内部散列表缓存, 使每个IP值仅仅在第一次从日志文件中读出时才被解析一次. 此程序从标准输入设备上获得需要解析的Apache日志文件, 其中,IP地址必须在每行的开始处,行中其余信息必须以空格分隔. 概要 logresolve [ -s filename ] [ -c ] < access_log > access_log.new 选项 -s file

GlusterFS源码解析 —— GlusterFS日志解析

Logging.c: /* Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or