Zabbix与ELK整合实现对日志数据的实时监控

一、 ELK与zabbix有什么关系?

ELK大家应该比较熟悉了,zabbix应该也不陌生,那么将ELK和zabbix放到一起的话,可能大家就有疑问了?这两个放到一起是什么目的呢,听我细细道来。

ELK是一套日志收集套件,它其实有由Elasticsearch、Logstash和Kibana三个软件组成,通过ELK可以收集系统日志、网站日志、应用系统日志等各种日志数据,并且还可以对日志进行过滤、清洗,然后进行集中存放并可用于实时检索、分析。这是ELK的基础功能。

但是有些时候,我们希望在收集日志的时候,能够将日志中的异常信息(警告、错误、失败等信息)及时的提取出来,因为日志中的异常信息意味着操作系统、应用程序可能存在故障,如果能将日志中的故障信息及时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以避免很多故障的发生。

那么如何才能做到将ELK收集的日志数据中出现的异常信息及时的告知运维人员呢,这就需要用到zabbix了,ELK(更确切的说应该是logstash)可以实时的读取日志的内容,并且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些异常关键字(error、failed、OutOff、Warning)过滤出来,然后通过logstash的zabbix插件将这个错误日志信息发送给zabbix,那么zabbix在接收到这个数据后,结合自身的机制,然后发起告警动作,这样就实现了日志异常zabbix实时告警的功能了。

二、Logstash与zabbix插件的使用

Logstash支持多种输出介质,比如syslog、HTTP、TCP、elasticsearch、kafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstash与zabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。

logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很容易,直接在logstash中运行如下命令即可:

/usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix

其中,/usr/local/logstash是Logstash的安装目录。

此外,logstash-plugin命令还有多种用法,我们来看一下:

2.1、列出目前已经安装的插件

将列出所有已安装的插件

/usr/local/logstash/bin/logstash-plugin list

#将列出已安装的插件及版本信息

/usr/local/logstash/bin/logstash-plugin list --verbose

#将列出包含namefragment的所有已安装插件

/usr/local/logstash/bin/logstash-plugin list "http"

#将列出特定组的所有已安装插件( input,filter,codec,output)

/usr/local/logstash/bin/logstash-plugin list --group input

2.2、安装插件

要安装某个插件,例如安装kafka插件,可执行如下命令:

/usr/local/logstash/bin/logstash-plugin install logstash-output-kafka
要使用此命令安装插件,需要你的电脑可以访问互联网。此插件安装方法,会检索托管在公共存储库(RubyGems.org)上的插件,然后下载到本地机器并在Logstash安装之上进行自动安装。

2.3、更新插件

每个插件有自己的发布周期和版本更新,这些更新通常是独立于Logstash的发布周期的。因此,有时候需要单独更新插件,可以使用update子命令获得最新版本的插件。

将更新所有已安装的插件

/usr/local/logstash/bin/logstash-plugin update

将仅更新指定的插件

/usr/local/logstash/bin/logstash-plugin update logstash-output-kafka

2.4、删除插件

如果需要从Logstash插件中删除插件,可执行如下命令:

/usr/local/logstash/bin/logstash-plugin remove logstash-output-kafka

这样就删除了logstash-output-kafka插件。

三、logstash-output-zabbix插件的使用

logstash-output-zabbix安装好之后,就可以在logstash配置文件中使用了,
下面是一个logstash-output-zabbix使用的例子:

zabbix {
        zabbix_host => "[@metadata][zabbix_host]"
        zabbix_key => "[@metadata][zabbix_key]"
        zabbix_server_host => "x.x.x.x"
        zabbix_server_port => "xxxx"
        zabbix_value => "xxxx"
        }

其中:

zabbix_host:表示Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必需的设置,没有默认值。

zabbix_key:表示Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。

zabbix_server_host:表示Zabbix服务器的IP或可解析主机名,默认值是 "localhost",需要设置为zabbix server服务器所在的地址。

zabbix_server_port:表示Zabbix服务器开启的监听端口,默认值是10051。

zabbix_value:表示要发送给zabbix item监控项的值对应的字段名称,默认值是 "message",也就是将"message"字段的内容发送给上面zabbix_key定义的zabbix item监控项,当然也可以指定一个具体的字段内容发送给zabbix item监控项。

四、将logstash与zabbix进行整合

这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。

先说明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,例如ERR、error、ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。

对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,例如对http系统,可能需要过滤500、403、503等这些错误码,对于java相关的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能还有一些自定义的错误信息,那么这些也需要作为过滤关键字来使用。

4.1、配置logstash事件配置文件

接下来就是创建一个logstash事件配置文件,这里将配置文件分成三个部分来介绍,首先是input部分,内容如下:

input {
        file {
        path => ["/var/log/secure"]
        type => "system"
        start_position => "beginning"
        }
}

input部分是从/var/log/secure文件中读取数据,start_position 表示从secure文件开头读取内容。

接着是filter部分,内容如下:

filter {
    grok {
             match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:messag
e_pid}\])?: %{GREEDYDATA:message_content}" }        #这里通过grok对message字段的数据进行字段划分,这里将message字段划分了5个子字段。其中,message_content字段会在output中用到。
        }

      mutate {
             add_field => [ "[zabbix_key]", "oslogs" ]      #新增的字段,字段名是zabbix_key,值为oslogs。
             add_field => [ "[zabbix_host]", "%{host}" ]   #新增的字段,字段名是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{host}获取的就是日志数据的主机名,这个主机名与zabbix web中“主机名称”需要保持一致。
         }
        mutate {        #这里是删除不需要的字段
            remove_field => "@version"
            remove_field => "message"
        }
        date {      #这里是对日志输出中的日期字段进行转换,其中message_timestamp字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp字段。
                match => [ "message_timestamp","MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
        }
}

filter部分是个重点,在这个部分中,重点关注的是message_timestamp字段、message_content字段。

最后是output部分,内容如下:

output {
        if [message_content]  =~ /(ERR|error|ERROR|Failed)/  {      #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix。
              zabbix {
                        zabbix_host => "[zabbix_host]"      #这个zabbix_host将获取上面filter部分定义的字段变量%{host}的值
                        zabbix_key => "[zabbix_key]"        #这个zabbix_key将获取上面filter部分中给出的值
                        zabbix_server_host => "172.16.213.140"  #这是指定zabbix server的IP地址
                        zabbix_server_port => "10051"           #这是指定zabbix server的监听端口
                        zabbix_value => "message_content"              #这个很重要,指定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容。
                        }
                    }
              #stdout { codec => rubydebug }   #这里是开启调试模式,当第一次配置的时候,建议开启,这样过滤后的日志信息直接输出的屏幕,方便进行调试,调试成功后,即可关闭。
}

将上面三部分内容合并到一个文件file_to_zabbix.conf中,然后启动logstash服务:

[[email protected] ~]#cd /usr/local/logstash
[[email protected] logstash]#nohup bin/logstash -f config/file_to_zabbix.conf --path.data /data/osdata &

这里的–path.data是指定此logstash进程的数据存储目录,用于在一个服务器上启动多个logstash进程的环境中。

4.2、zabbix平台配置日志告警

登录zabbix web平台,选择配置—>模板—>创建模板,名称定为logstash-output-zabbix,如下图所示:

接着,在此模块下创建一个应用集,点击应用集—–>创建应用集,如下图所示:

然后,在此模块下创建一个监控项,点击监控项—–>创建监控项,如下图所示:

到此为止,zabbix监控logstash的日志数据配置完成。

这里我们以客户端172.16.213.157主机为例,也就是监控172.16.213.157主机上的系统日志数据,当发现日志异常就进行告警。

在上面创建了一个模板和监控项,接着还需要将此模板链接到172.16.213.157主机上,选择“配置”—“主机”,然后选中172.16.213.157主机,选择“模板”标签,将上面创建的模板加入到172.16.213.157主机上,如下图所示:

这样,上面创建的监控项在172.16.213.157主机上就自动生效了。

下面我们模拟一个故障,在任意主机通过ssh登录172.16.213.157主机,然后输入一个错误密码,让系统的/var/log/secure文件产生错误日志,然后看看logstash是否能够过滤到,是否能够发送到zabbix中。

首先让系统文件/var/log/secure产生类似如下内容:

Sep  5 16:01:04 localhost sshd[27159]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=172.16.213.127  user=root
Sep  5 16:01:06 localhost sshd[27159]: Failed password for root from 172.16.213.127 port 59913 ssh2

这里面有我们要过滤的关键字Failed,因此logstash会将此内容过滤出来,发送到zabbix上。

接着,登录zabbix web平台,点击监测中—–>最新数据,如果zabbix能够接收到日志,就可以看到下图的最新数据:

点击历史记录,可以查看详细内容,如下图所示:

可以看到,红框中的内容就是在logstash中定义的message_content字段的内容。

到这里为止,zabbix已经可以收到logstash的发送过来的数据了,但是要实现报警,还需要在zabbix中创建一个触发器,进入配置—–>模板,选择logstash-output-zabbix这个模板,然后点击上面的触发器,继续点击右上角的创建触发器,如下图所示:

这里注意触发器创建中,表达式的写法,这里触发器的触发条件是:如果接收到logstash发送过来的数据,就进行告警,也就是说接收到的数据,如果长度大于0就告警。

触发器配置完成后,如果配置正常,就会进行告警了,告警内容如下图所示:

原文地址:https://www.cnblogs.com/flytor/p/11440799.html

时间: 2024-10-13 19:38:20

Zabbix与ELK整合实现对日志数据的实时监控的相关文章

mysql数据库数据变化实时监控

对于二次开发来说,很大一部分就找找文件和找数据库的变化情况 对于数据库变化.还没有发现比较好用的监控数据库变化监控软件. 今天,我就给大家介绍一个如何使用mysql自带的功能监控数据库变化 1.打开数据库配置文件my.ini (一般在数据库安装目录)(D:\MYSQL) 2.在数据库的最后一行添加 log=log.txt 代码 3.重启mysql数据库 4.去数据库数据目录 我的是(D:\MYSQL\data) 你会发现多了一个log.txt文件 我的是在C:\Documents and Set

ELK整合Filebeat监控nginx日志

ELK 日志分析 1. 为什么用到 ELK 一般我们需要进行日志分析场景:直接在日志文件中 grep. awk 就可以获得自己想要的信息.但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档.文本搜索太慢怎么办.如何多维度查询.需要集中化的日志管理,所有服务器上的日志收集汇总.常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问. 一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位

开源分布式搜索平台ELK+Redis+Syslog-ng实现日志实时搜索

logstash + elasticsearch + Kibana+Redis+Syslog-ng ElasticSearch是一个基于Lucene构建的开源,分布式,RESTful搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便.支持通过HTTP使用JSON进行数据索引. logstash是一个应用程序日志.事件的传输.处理.管理和搜索的平台.你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计.其实logstash是可以被别的替换,比如常见

ELK+Filebeat 集中式日志解决方案详解

原文:ELK+Filebeat 集中式日志解决方案详解 链接:https://www.ibm.com/developerworks/cn/opensource/os-cn-elk-filebeat/index.html?ca=drs- ELK Stack 简介 ELK 不是一款软件,而是 Elasticsearch.Logstash 和 Kibana 三种软件产品的首字母缩写.这三者都是开源软件,通常配合使用,而且又先后归于 Elastic.co 公司名下,所以被简称为 ELK Stack.根据

logstash将Kafka中的日志数据订阅到HDFS

前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式.但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉.那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年.一年甚至更长时间的大数据分析.下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs. 一:安装logstash(下载tar包安装也行,我直接yum装了) #y

万能日志数据收集器 Fluentd - 每天5分钟玩转 Docker 容器技术(91)

前面的 ELK 中我们是用 Filebeat 收集 Docker 容器的日志,利用的是 Docker 默认的 logging driver json-file,本节我们将使用 fluentd 来收集容器的日志. Fluentd 是一个开源的数据收集器,它目前有超过 500 种的 plugin,可以连接各种数据源和数据输出组件.在接下来的实践中,Fluentd 会负责收集容器日志,然后发送给 Elasticsearch.日志处理流程如下: 这里我们用 Filebeat 将 Fluentd 收集到的

使用开源工具ELK可视化 Azure NSG日志

国内的Azure最近上线了网络观察程序服务,可以帮助用户监控和分析VNET虚拟网络.其中一个很重要的功能就是可以记录NSG的安全访问日志了.但是如果用户设置了NSG流日志,并下载日志想要分析一下的话,会发现日志其实并不是很友好,NSG流日志是以json格式记录的,可以看到的内容大致如下图所示:日志会记录NSG规则名,系统时间,源地址,目的地址,源端口,目的端口,协议类型,流量方向和处理规则. 不过所有的记录都连在一起,如果要查找某个具体的安全访问记录,非常困难. 不过我们可以使用多个多种开源工具

ELK之方便的日志收集、搜索、展示工具

大家在做分部署系统开发的时候是不是经常因为查找日志而头疼,因为各服务器各应用都有自己日志,但比较分散,查找起来也比较麻烦,今天就给大家推荐一整套方便的工具ELK,ELK是Elastic公司开发的一整套完整的日志分析技术栈,它们是Elasticsearch,Logstash,和Kibana,简称ELK.Logstash做日志收集分析,Elasticsearch是搜索引擎,而Kibana是Web展示界面. 1.日志收集分析Logstash LogstashLogstash 是一个接收,处理,转发日志

Elastic Stack实战学习教程~日志数据的收集、分析与可视化

Elastic Stack介绍 近几年,互联网生成数据的速度不断递增,为了便于用户能够更快更精准的找到想要的内容,站内搜索或应用内搜索成了不可缺少了的功能之一.同时,企业积累的数据也再不断递增,对海量数据分析处理.可视化的需求也越来越高. 在这个领域里,开源项目ElasticSearch赢得了市场的关注,比如,去年Elastic公司与阿里云达成合作伙伴关系提供阿里云 Elasticsearch 的云服务.今年10月Elastic公司上市,今年11月举行了Elastic 中国开发者大会.目前各大云