zabbix配合脚本监控Kafka

简介:

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka如下特性,受到诸多公司的青睐。

1、高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息(核心目标之一)。

2、支持通过Kafka服务器和消费机集群来分区消息

…………

场景:

Kafka的作用我就不在这BB了,大家可以瞅瞅http://blog.jobbole.com/75328/,总结的非常好。

Kafka监控的几个指标

1、lag:多少消息没有消费

2、logsize:Kafka存的消息总数

3、offset:已经消费的消息

lag = logsize - offset, 主要监控lag是否正常

脚本:

  • spoorer.py文件,获取Kafka中的监控指标内容,并将监控结果写到spooer.log文件中

crontab设置每分钟执行spoorer.py

# -*- coding:utf-8 -*-

import os, sys, time, json, yaml
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import (KafkaClient, KafkaConsumer)

class spoorerClient(object):

    def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url=‘/‘, timeout=3, log_dir=‘/tmp/spoorer‘):
        self.zookeeper_hosts = zookeeper_hosts
        self.kafka_hosts = kafka_hosts
        self.timeout = timeout
        self.log_dir = log_dir
        self.log_file = log_dir + ‘/‘ + ‘spoorer.log‘
        self.kafka_logsize = {}
        self.result = []
    self.log_day_file = log_dir + ‘/‘ + ‘spoorer_day.log.‘ + str(time.strftime("%Y-%m-%d", time.localtime()))
        self.log_keep_day = 1

        try:
            f = file(os.path.dirname(os.path.abspath(__file__)) + ‘/‘ + ‘spoorer.yaml‘)
            self.white_topic_group = yaml.load(f)
        except IOError as e:
            print ‘Error, spoorer.yaml is not found‘
            sys.exit(1)
        else:
            f.close()
            if self.white_topic_group is None:
                self.white_topic_group = {}

        if not os.path.exists(self.log_dir):     os.mkdir(self.log_dir)

    for logfile in [x for x in os.listdir(self.log_dir) if x.split(‘.‘)[-1] != ‘log‘ and x.split(‘.‘)[-1] != ‘swp‘]:
        if int(time.mktime(time.strptime(logfile.split(‘.‘)[-1], ‘%Y-%m-%d‘))) < int(time.time()) - self.log_keep_day * 86400:
            os.remove(self.log_dir + ‘/‘ + logfile)

    if zookeeper_url == ‘/‘:
        self.zookeeper_url = zookeeper_url
    else:
        self.zookeeper_url = zookeeper_url + ‘/‘

def spoorer(self):
    try:
        kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)
    except Exception as e:
        print "Error, cannot connect kafka broker."
        sys.exit(1)
    else:
        kafka_topics = kafka_client.topics
    finally:
        kafka_client.close()

    try:
        zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)
        zookeeper_client.start()
    except Exception as e:
        print "Error, cannot connect zookeeper server."
        sys.exit(1)

    try:
        groups = map(str,zookeeper_client.get_children(self.zookeeper_url + ‘consumers‘))
    except NoNodeError as e:
        print "Error, invalid zookeeper url."
        zookeeper_client.stop()
        sys.exit(2)
    else:
        for group in groups:
            if ‘offsets‘ not in zookeeper_client.get_children(self.zookeeper_url + ‘consumers/%s‘ % group): continue
            topic_path = ‘consumers/%s/offsets‘ % (group)
            topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path))
            if len(topics) == 0: continue

            for topic in topics:
                if topic not in self.white_topic_group.keys():
                    continue
                elif group not in self.white_topic_group[topic].replace(‘ ‘,‘‘).split(‘,‘):
                    continue
                partition_path = ‘consumers/%s/offsets/%s‘ % (group,topic)
                partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path))

                for partition in partitions:
                    base_path = ‘consumers/%s/%s/%s/%s‘ % (group, ‘%s‘, topic, partition)
                    owner_path, offset_path = base_path % ‘owners‘, base_path % ‘offsets‘
                    offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]

                    try:
                        owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]
                    except NoNodeError as e:
                        owner = ‘null‘

                    metric = {‘datetime‘:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), ‘topic‘:topic, ‘group‘:group, ‘partition‘:int(partition), ‘logsize‘:None, ‘offset‘:int(offset), ‘lag‘:None, ‘owner‘:owner}
                    self.result.append(metric)
    finally:
        zookeeper_client.stop()

    try:
        kafka_consumer = KafkaConsumer(bootstrap_servers=self.kafka_hosts)
    except Exception as e:
        print "Error, cannot connect kafka broker."
        sys.exit(1)
    else:
        for kafka_topic in kafka_topics:
            self.kafka_logsize[kafka_topic] = {}
            partitions = kafka_client.get_partition_ids_for_topic(kafka_topic)

            for partition in partitions:
                offset = kafka_consumer.get_partition_offsets(kafka_topic, partition, -1, 1)[0]
                self.kafka_logsize[kafka_topic][partition] = offset

        with open(self.log_file,‘w‘) as f1, open(self.log_day_file,‘a‘) as f2:

            for metric in self.result:
                logsize = self.kafka_logsize[metric[‘topic‘]][metric[‘partition‘]]
                metric[‘logsize‘] = int(logsize)
                metric[‘lag‘] = int(logsize) - int(metric[‘offset‘])

                f1.write(json.dumps(metric,sort_keys=True) + ‘\n‘)
                f1.flush()
                f2.write(json.dumps(metric,sort_keys=True) + ‘\n‘)
                f2.flush()
    finally:
        kafka_consumer.close()

    return ‘‘

if __name__ == ‘__main__‘:
    check = spoorerClient(zookeeper_hosts=‘zookeeperIP地址:端口‘, zookeeper_url=‘znode节点‘, kafka_hosts=‘kafkaIP:PORT‘, log_dir=‘/tmp/log/spoorer‘, timeout=3)
    print check.spoorer()
  • spoorer.py读取同一目录的spoorer.yaml配置文件

格式:

kafka_topic_name:
    group_name1,
    group_name2,
(group名字缩进4个空格,严格按照yaml格式)
  • spoorer.log数据格式

{"datetime": "2016-03-18 11:36:02", "group": "group_name1", "lag": 73, "logsize": 28419259, "offset": 28419186, "owner": "消费partition线程", "partition": 3, "topic": "kafka_topic_name"}

monitor_kafka.sh脚本检索spoorer.log文件,并配合zabbix监控

#!/bin/bash

    topic=$1
group=$2
#$3可取值lag、logsize、offset
class=$3

case $3 in
lag)
echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F‘[ ,]‘ ‘{sum+=$9}‘END‘{print sum}‘`"
;;
logsize)
echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F‘[ ,]‘ ‘{sum+=$12}‘END‘{print sum}‘`"
;;
offset)
echo "`cat /tmp/log/spoorer/spoorer.log | grep -w \\"${topic}\\" | grep -w \\"${group}\\" |awk -F‘[ ,]‘ ‘{sum+=$15}‘END‘{print sum}‘`"
;;
*)
echo "Error input:"
;;
esac
exit 0

zabbix_agentd.conf扩展配置

UserParameter=kafka.lag[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 lag
UserParameter=kafka.offset[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 offset
UserParameter=kafka.logsize[*],/usr/local/zabbix-2.4.5/script/monitor_kafka.sh $1 $2 logsize

zabbix设置Key

kafka.lag[kafka_topic_name,group_name1]
kafka.logsize[kafka_topic_name,group_name1]
kafka.offset[kafka_topic_name,group_name1]
  • 出现问题第一时间发送报警消息。

报警的Trigger触发规则也是对lag的值做报警,具体阀值设置为多少,还是看大家各自业务需求了。

接收告警消息可以选择邮件和短信、网上教程也比较多,教程帖子:
http://www.iyunv.com/thread-22904-1-1.html 10 http://www.iyunv.com/thread-40998-1-1.html 12

如果觉得自己搞这些比较麻烦的话,也可以试试 OneAlert 一键集成zabbix,短信、电话、微信、APP啥都能搞定,还免费,用着不错。
http://www.onealert.com/activity/zabbix.html 37



原文地址:https://www.cnblogs.com/xionggeclub/p/9087141.html

时间: 2024-07-31 06:11:37

zabbix配合脚本监控Kafka的相关文章

zabbix 自定义脚本监控配置之网卡

注:要添加自定义脚本监控,必须升级zabbix agent版本至2.0.0以上, 一:配置步骤 1. 完成自定义监控脚本的编写(windows或linux脚本) 脚本要求: (1)既然是监控,那必然要有输出结果值(字符串,数字皆可) (2)必须要求zabbix用户有执行权限,当然可以直接设置所有用户都有执行权限(chmod 777 脚本文件) (3)若脚本需要传入参数,按照参数传入的顺序,在脚本中可用$1-$9来引用传入的参数 2 找到zabbix agent的配置文件zabbix_agentd

zabbix自定义脚本监控pps(Packets per Second,包转发率)

一:介绍 网络的性能通常用吞吐率(throughput)这个指标来衡量.常用的网络吞吐率的单位有:PPS(即每秒发送多少个分组数据包).BPS(Bytes Per Second;即每秒发送多少字节).bPS (bits Per Second;即每秒发送多少比特).TPS(TransactionsPer Second;即每秒完成多少次发送过程). pps:(包每秒)包转发率标志了交换机转发数据包能力的大小.一般交换机的包转发率在几十Kpps到几百Mpps.包转发速率是指交换机每秒可以转发多少百万个

zabbix运行脚本监控ggsci报错

/u01/app/oracle/oracle/ogg/ggsci: error while loading shared libraries: libdb-6.1.so: cannot open shared object file: No such file or directory 增加脚本环境变量设置 PATH=$PATH:$HOME/bin export ORACLE_BASE=/u01/app/oracle export ORACLE_HOME=$ORACLE_BASE/11/db_1

zabbix自定义脚本监控pps(Packets per Second,包转发率)_下

接上篇,Linux的做完了,再搞windows的. 首先查了下windows的在哪里获取,在windows的性能计数器中可以获取,叫做Packets Received/sec(获取出来的值就可以直接使用,不像Linux还得需要求差) 那如何获取呢: 1,新建个ITEM,使用这个KEY,perf_counter[]来实现,这个key直接获取性能计数器的数值. 那么只需要找到需要监控的网卡的包转发率的名称 2,获取windows性能计数器上的值 方法一:在win 的 命令cmd窗口下,运行  typ

zabbix调用脚本监控 或者调用程序

zabbix监控kafka消费

一.Kafka监控的几个指标 1.lag:多少消息没有消费 lag=logsize-offset 2.logsize:Kafka存的消息总数 3.offset:已经消费的消息 Kafka管理工具 介绍: https://www.iteblog.com/archives/1605.html 二.查看zookeeper配置 cat /home/app/zookeeper/zookeeper/conf/zoo.cfg | egrep -v "^$|^#" clientPort=2181 三.

zabbix自定义脚本做监控及自制模板初探

一.说明 zabbix监控支持自定义脚本以及自制模板来扩展监控,换句话说就是对业务自定义监控;因此通过撰写脚本完成自定义监控十分有必要;这里的脚本既可以用shell也可以用python等语言;另外自定义了脚本主要目的是获取业务相关的监控数据;还需要结合zabbix web GUI上的模板才能生效;本文的目的就是基于之前的zabbix相关部署操作之后的补充! 任务:通过撰写脚本获取tcp 的各种状态,添加tcp状态模板,添加触发器;添加图形:完整实现脚本自定义监控的整个步聚流程! 二.agent端

关于在zabbix监测脚本中使用ps命令监控进程CPU使用率和内存使用率,获得数据为0的情况描述

前提:想自己编写zabbix监测脚本,然后通过配置模板的方式,实现对资源(cpu和内存)使用率高的进程进行监控. 过程描述:zabbix版本为2.21,被监控主机操作系统为CentOS 6.4.脚本中主要命令如下:percent=0; #通过脚本输入参数process=$1; #通过ps aux参数,获取CPU%和MEM%值,使用awk将第四行的MEM%值筛选出来percent=ps aux | grep $process | grep -v grep | head -1 | awk '{pri

Zabbix对客户端监控+报警

环境说明: node1:zabbix服务器 IP地址:172.16.4.100 node2:zabbix客户端 IP地址:172.16.4.101 配置环境:监控node2主机的网卡流量(流入.流入),以及报警和报警升级 文章概览 1.使用zabbix监控客户端主机    1.1 客户端设置    1.2 定义主机组    1.3 定义主机    1.4 定义应用集    1.5 定义监控项目    1.6 定义出图2.报警设置    2.1 触发器    2.2 示警媒介Medias: