通过itchat调用zabbixAPI实现微信确认zabbix告警

环境:python3.6.1 ,pip 9.0.1

1,通过itchat,实现微信的扫码登录,和关键定识别

#coding=utf-8
import urllib.request, urllib.error, urllib.parse
import sys
import json
import argparse
from login import *
from function import *
########################################################################

import itchat
import time
import re
import requests
import threading
import shutil
import os

‘‘‘
reload(sys)
sys.setdefaultencoding(‘utf-8‘)
‘‘‘
self_name=‘PH‘

@itchat.msg_register(itchat.content.TEXT)
def cmd(revmsg):
    msg = revmsg[‘Text‘]

    if msg == "注销登录":
        quit()
        print("已注销")

#    elif msg == "检索事件":
#         getevents()

    elif str.isdigit("".join(re.findall(r‘查询事件(\d+)‘,msg))):
         print(msg)
         eventid = "".join(re.findall(r‘查询事件(\d+)‘,msg))
         print(eventid)
         itchat.send(‘识别到事件id,开始查询事件‘+eventid,toUserName = revmsg[‘FromUserName‘])
         getevent(eventid,revmsg[‘FromUserName‘])      

    elif str.isdigit("".join(re.findall(r‘确认事件(\d+)‘,msg))):
        eventid = "".join(re.findall(r‘确认事件(\d+)‘,msg))
        #messages = "".join(re.findall(r‘确认消息<(.+)>‘,msg))
        #有确认消息,不关闭问题
        if "".join(re.findall(r‘(确认消息)<.+>‘,msg)) == "确认消息" and "".join(re.findall(r‘【关闭】‘,msg)) == "":
            messages = "".join(re.findall(r‘确认消息<(.+)>‘,msg))
            itchat.send("识别到事件id,开始确认事件"+eventid+",确认消息为:"+messages,toUserName = revmsg[‘FromUserName‘])
            ackevent(eventid,revmsg[‘FromUserName‘],messages)
        #无确认消息,关闭问题
        elif "".join(re.findall(r‘(确认消息)<.+>‘,msg)) == "" and "".join(re.findall(r‘【关闭】‘,msg)) == "【关闭】":
            itchat.send("识别到事件id,开始确认事件"+eventid+",无确认消息。关闭问题。",toUserName = revmsg[‘FromUserName‘])
            ackevent(eventid,revmsg[‘FromUserName‘],action=1)
        #有确认消息,关闭问题
        elif "".join(re.findall(r‘(确认消息)<.+>‘,msg)) == "确认消息" and "".join(re.findall(r‘【关闭】‘,msg)) == "【关闭】":
            messages = "".join(re.findall(r‘确认消息<(.+)>‘,msg))
            itchat.send("识别到事件id,开始确认事件"+eventid+",确认消息为:"+messages+"。关闭问题。",toUserName = revmsg[‘FromUserName‘])
            ackevent(eventid,revmsg[‘FromUserName‘],messages,1)
        #无确认消息,不关闭问题。
        else:
            print("识别到事件id:"+eventid)
            itchat.send("识别到事件id,开始确认事件:"+eventid,toUserName = revmsg[‘FromUserName‘])
            ackevent(eventid,revmsg[‘FromUserName‘])      

    elif msg == "查询告警":
         gettrigger(revmsg[‘FromUserName‘])
         print("查询问题触发器")

  #  elif msg == "告警":
  #       gaojing()
  #       print("执行告警")

    else:
        welcome = "您好,欢迎使用zabbix微信告警系统!你可以回复关键字\n(查询告警、确认事件、查询事件等)实现功能\n如:确认事件12345,确认消息<已解决>【关闭】(确认消息以“<>”为分隔符)\n如:查询事件1234\n更多功能正在研发中,敬请期待!"
        itchat.send(welcome,toUserName = revmsg[‘FromUserName‘])
#######################################
#######################################
def quit():
    itchat.logout()
#######################################
def gaojing(fromuser):
    users = itchat.search_friends(name=u‘Feiger‘)
    userName = users[0][‘UserName‘]
    #登陆zabbix获取auth
    auth = authenticate(url, username, password)
    #状态0是启用监控,1是禁用监控
    status=1
    #定义操作ip
    hostip=‘192.168.15.139‘
    #通过hostip获取zabbix
    hostids=ipgetHostsid(hostip,url,auth)
    hostid=hostids[0][‘hostid‘]
    alerts=actionidgetalert(url,auth)
    xiaoxi=str(alerts[0])
    itchat.send(xiaoxi,toUserName = userName)

def gettrigger(fromuser): #获取当前触发器
    auth = authenticate(url, username, password)
    trigerIDs = gettrigetID(url,auth)    #获取触发器列表
    for trigerIDresault in trigerIDs:    #遍历触发器
        eventlist = triggergetevents(trigerIDresault[‘triggerid‘],url,auth)  #获取事件列表
        #for event in eventlist:        #遍历事件
        #   if event[‘acknowledged‘] == ‘0‘ and event[‘value‘] == ‘0‘:
        #       print(event)
        #       print(event[‘eventid‘])
        #   else:
        #       continue
    xiaoxi=str(trigerIDs)
    itchat.send(xiaoxi,toUserName = fromuser)

def ackevent(eventid,fromuser,message="已确认(微信默认消息)",action=0):  #确认事件
    auth = authenticate(url, username, password)
    #执行事件确认操作,并返回已确认事件的对象
    eventobj=eventackknowledge(eventid,url,auth,message,action)  #调取确认事件的方法
    if ‘eventids‘ in eventobj: #eventobj这事件ID的字典
        itchat.send(str(eventobj[‘eventids‘])+"事件确认成功",toUserName = fromuser)
    else:
        itchat.send("确认事件失败:"+eventobj,toUserName = fromuser)

def getevent(eventid,fromuser):
    auth = authenticate(url, username, password)
    event=eventget(eventid,url,auth)  #查询事件
    itchat.send(str(event),toUserName = fromuser)

def getevents():   #通过时间获取事件
    auth = authenticate(url, username, password)
    users = itchat.search_friends(name=u‘Feiger‘)
    userName = users[0][‘UserName‘]
    events = timegetevents("1523030400","1523116800",url,auth)
    print(events)
    #xiaoxi=str(events[0])
    #itchat.send(xiaoxi,toUserName = userName)

#######################################
if __name__ == ‘__main__‘:
    itchat.auto_login(enableCmdQR=2)
    itchat.run()

2,zabbix登录逻辑

#!/usr/bin/env python
#coding:utf-8
import urllib.request, urllib.error, urllib.parse
import json
#定义URL账户密码
url = ‘http://192.168.165.179:8027/api_jsonrpc.php‘
username = ‘admin‘
password = ‘zabbix‘
#定义通过HTTP方式访问API地址的函数,后面每次请求API的各个方法都会调用这个函数
def requestJson(url,values):
    data = json.dumps(values)
    data=bytes(data,‘utf8‘)
    req = urllib.request.Request(url, data, {‘Content-Type‘: ‘application/json-rpc‘})
    response = urllib.request.urlopen(req, data)
    output = json.loads(response.read())
    print(output)
    try:
        message = output[‘result‘]
    except:
        message = output[‘error‘][‘data‘]
        print(message)

    return message

#API接口认证的函数,登录成功会返回一个Token
def authenticate(url, username, password):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘user.login‘,
              ‘params‘: {
                  ‘user‘: username,
                  ‘password‘: password
              },
              ‘id‘: ‘0‘
              }
    idvalue = requestJson(url,values)
    return idvalue

3,zabbixAPI,此段代码也是网上找到的,自己看了Zabbix的官方文档添加了些许功能。理论上来说可以通过微信触发关键字的方式实现一切对zabbix的操作。

#coding=utf-8
import sys
import argparse
import urllib.request, urllib.error, urllib.parse
import json
from login import *
#定义检索问题状态下所有触发器的ID,名称和严重性,并按降序对其进行严重性排序。
def gettrigetID(url,auth):
    values = {
        "jsonrpc" : "2.0" ,
        "method" : "trigger.get" ,
        "params" : {
            "output" : [
                "triggerid" ,
                "description" ,
                "priority"
                      ] ,
            "filter" : {
            "value" : 1
             } ,
        "sortfield" : "priority" ,
        "sortorder" : "DESC"
    } ,
    "auth" : auth ,
    "id" : 1
    }
    output = requestJson(url,values)
    return output
#触发器检索事件,返回一系列以事件为元素的列表。
def triggergetevents(triggerID,url,auth):
    values = {
    "jsonrpc": "2.0",
    "method": "event.get",
    "params": {
        "output": ["eventid","objectid"],
        "select_acknowledges":"extend",
        #"output": "extend",
        "objectids": triggerID,
        "sortfield": ["clock", "eventid"],
        "sortorder": "DESC"
    },
    "auth": auth,
    "id": 1
    }
    output = requestJson(url,values)
    return output
#按时间检索事件
def timegetevents(time_from,time_till,url,auth):
    values = {
    "jsonrpc": "2.0",
    "method": "event.get",
    "params": {
        "output": "extend",
        "time_from": time_from,
        "time_till": time_till,
        "sortfield": ["clock", "eventid"],
        "sortorder": "desc"
    },
    "auth": auth,
    "id": 1
    }
    output = requestJson(url,values)
    return output
#通过eventID查询事件
def eventget(eventid,url,auth):
    values = {
            "jsonrpc": "2.0",
            "method": "event.get",
            "params": {
                    "output": ["eventid","acknowledged","objectid"],
                    "select_acknowledges":["eventid","message","action","alias"],
                    "eventids": eventid,
                        },
            "auth": auth,
            "id": 1
            }
    output = requestJson(url,values)
    return output
#定义确认事件方法

def eventackknowledge(eventid,url,auth,message="已确认(微信默认确认消息)",action=0):
   values = {
            ‘jsonrpc‘ : ‘2.0‘,
            ‘method‘ : ‘event.acknowledge‘,
            ‘params‘ : {
                      ‘eventids‘ : eventid,
                      ‘message‘ : message,
                      ‘action‘ : action
                      },
            ‘auth‘ : auth,
            ‘id‘ : 1
           }
   print("调用了eventacknowledge方法,正在确认事件:%s"%(eventid))
   output = requestJson(url,values)
   if ‘eventids‘ in output:
        print("确认事件成功")
   else:
        print("确认事件失败")
   return output

#通过动作ID检索警报
def actionidgetalert(url,auth):
    values = {
    "jsonrpc": "2.0",
    "method": "alert.get",
    "params": {
        "output": "extend",
        "actionids": ‘7‘
    },
    "auth":auth,
    "id": 1
    }
    output = requestJson(url,values)
    return output
#定义更新action函数
def mediatypeupdate(mediatypeid,status,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘mediatype.update‘,
              ‘params‘: {
                  "mediatypeid": mediatypeid,
                  "status": status
              },
              ‘auth‘: auth,
              ‘id‘: ‘1‘
              }
    output = requestJson(url,values)
#定义读取状态函数
def triggerget(auth):
    values = {‘jsonrpc‘: ‘2.0‘,
           "method":"trigger.get",
               "params": {
                        "output": [
                        "triggerid",
                        "description",
                        "priority"
                        ],
              "filter": {
                         "value": 1
                         },
              "expandData":"hostname",
              "sortfield": "priority",
              "sortorder": "DESC"
            },
              ‘auth‘: auth,
              ‘id‘: ‘2‘
              }
    output = requestJson(url,values)
    return output

#定义通过ip获取主机id的函数
def ipgetHostsid(ip,url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘host.get‘,
              ‘params‘: {
                  ‘output‘: [ "host" ],
                  ‘filter‘: {
                      ‘ip‘: ip
                  },
              },
              ‘auth‘: auth,
              ‘id‘: ‘3‘
              }
    output = requestJson(url,values)
    return output

#定义通过主机id获取开启关闭监控函数
def idupdatehost(status,hostid,url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘host.update‘,
              ‘params‘: {
                  "hostid": hostid,
                  "status": status
              },
              ‘auth‘: auth,
              ‘id‘: ‘4‘
              }
    output = requestJson(url,values)
    return output
#定义通过项目hostid获取itemid函数
def getHostsitemsid(hostid,itemsname,url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: "item.get",
              "params": {
                    "output": ["itemids"],
                    "hostids": hostid,
            "filter":{
                    "key_": itemsname,
                },
                },

              ‘auth‘: auth,
              ‘id‘: ‘5‘
              }
    output = requestJson(url,values)
    if len(output)==0:
        return output
    else:
        return output[0][‘itemid‘]

#定义通过项目id获取监控项目最近值信息的函数
def getHostsitemsvalue(itemid,url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: "history.get",
              "params": {
                    "output": "extend",
                    "history":3,
                    "itemids":itemid,
                    "sortfield": "clock",
                    "sortorder": "DESC",
                    "limit":1,
                },

              ‘auth‘: auth,
              ‘id‘: ‘6‘
              }
    output = requestJson(url,values)
    if len(output)==0:
        return output
    else:
        return output[0]["value"]

#定义更新读取状态action函数
def mediatypeget(mediatypeid,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘mediatype.get‘,
              ‘params‘: {
                "output": "extend",

              "filter": {
                        "mediatypeid":mediatypeid,
                         },
              },

              ‘auth‘: auth,
              ‘id‘: ‘7‘
              }
    output = requestJson(url,values)
    if len(output)==0:
        return output
    else:
        return output[0][‘status‘]

#定义maintenance维修模式host函数
def maintenancecreate(maintenancename,active_since,active_till,hostid,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘maintenance.create‘,
              ‘params‘: {
              "name": maintenancename,
              "active_since": active_since,
              "active_till": active_till,
              "hostids": [
                    hostid
                ],
                "timeperiods": [
                            {
                "timeperiod_type": 0,
                "every": 1,
                "dayofweek": 64,
                "start_time": 64800,
                "period": 3600
                            }
                                ]
              },
              ‘auth‘: auth,
              ‘id‘: ‘8‘
              }
    output = requestJson(url,values)

#定义通过模糊获取关闭主机信息函数
def disabledhostget(url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘host.get‘,
            "params": {
                "output": ["host"],
                ‘selectInterfaces‘: [ "ip" ],
                "filter": {
                    "status":1
        }
    },
              ‘auth‘: auth,
              ‘id‘: ‘9‘
              }
    output = requestJson(url,values)
    return output

#定义maintenance维修模式group函数
def maintenancecreategroup(maintenancename,active_since,active_till,groupid,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘maintenance.create‘,
              ‘params‘: {
              "name": maintenancename,
              "active_since": active_since,
              "active_till": active_till,
              "groupids": [
                    groupid
                ],
                "timeperiods": [
                            {
                "timeperiod_type": 0,
                "every": 1,
                "dayofweek": 64,
                "start_time": 64800,
                "period": 3600
                            }
                                ]
              },
              ‘auth‘: auth,
              ‘id‘: ‘10‘
              }
    output = requestJson(url,values)

#定义通过host groups named 获取groupid
def groupnameGroupid(groupname,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘hostgroup.get‘,
              "params": {
                    "output": "extend",
                    "filter": {
                        "name": [
                            groupname
                        ]
                    }
                },
              ‘auth‘: auth,
              ‘id‘: ‘11‘
              }
    output = requestJson(url,values)
    return output

#定义模糊查询维护主机
def maintenanceget(url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘maintenance.get‘,
              "params": {
                    "output": "extend",
                },
              ‘auth‘: auth,
              ‘id‘: ‘12‘
              }
    output = requestJson(url,values)
    return output

#定义批量恢复处于维护主机
def maintenancedelete(maintenanceid,url,auth):
    values = {‘jsonrpc‘: ‘2.0‘,
              ‘method‘: ‘maintenance.delete‘,
              "params": [
                    maintenanceid
                ],
              ‘auth‘: auth,
              ‘id‘: ‘13‘
              }
    output = requestJson(url,values)
    return output

#定义通过hostid获取graphid的函数
def getgraphid(hostid,graphname,url,auth):
        values = {‘jsonrpc‘: ‘2.0‘,
                          ‘method‘: ‘graph.get‘,
                          ‘params‘: {
                                  "output": "name",
                                  "hostids": hostid,
                                  "sortfield": "name",
                          ‘filter‘: {
                                        "name": graphname
                                  },

                          },
                          ‘auth‘: auth,
                          ‘id‘: ‘14‘
                          }
        output = requestJson(url,values)
        return output

09:57:03

原文地址:https://www.cnblogs.com/feigerlan/p/9054699.html

时间: 2025-01-13 11:15:44

通过itchat调用zabbixAPI实现微信确认zabbix告警的相关文章

微信企业号升级企业微信后zabbix告警发不出去

微信企业号升级企业微信后便没有zabbix告警发出,单独运行脚本报错: 进入接口调试页面http://qydev.weixin.qq.com/debug,输入CorpID和Secret,得到access_token: 之前的格式是: 于是修改告警脚本,之前获取access_token的语句是: 由于现在格式变了(由第四列变成了第10列),于是将$4改为$10,再次运行脚本还有错误: 提示"Invalid input",单独运行输出格式: 发现唯一有异常的地方是agentid列有空格,试

Zabbix 实现微信短信告警

Zabbix简介 Zabbix 近几年得到了各大互联网公司的认可,当然第一点归功与它强大的监控功能,第二点免费开源也得到了广大用户的青睐.Zabbix 能将操作系统中的绝大部分指标进行监控,比如(CPU 负荷,内存使用,网络状况,端口监视,日志监视等等等等指标!).监控指标的广度是一方面,它强大的功能特点也省去了很多的配置操作. Zabbix 功能特点: 自动发现服务器和网络设备 分布式监控网络,集中式管理(agent .server 分开) 监控指标模版丰富 可灵活地分配用户权限 系统各个指标

Zabbix微信个人账号告警

前言: 最近研究zabbix告警,网上看了帖子有各式各样姿势:电话语音告警,邮件告警,短信告警,微信公众号告警等等等..姿势五花八门,真是纠结. 电话语音告警,短信告警首先pass 前者花钱,后者通过设置139邮箱,就可以实现伪短信告警效果. 剩下邮件告警与微信公众号告警.邮件告警已经在部署的时候配置完毕,剩下这个微信公众号告警,查一下帖子,申请各种麻烦.那么有没有基于微信个人账号的告警呢?想到这个点,马上github一番. 搜索到一些优秀的开源代码:https://github.com/0x5

利用微信公众号实现zabbix告警

之前觉得没必要写这个,这两天有同学问到zabbix关于微信告警的相关问题,于是昨天就注册了一个微信公众号,当做学习交流一下: 首先:我们要明白我们创建微信公众号发送消息到底需要哪些参数,这样我们再创建时候注意生成就可以了,需要的有如下几个参数: 1.通讯用户:touser 2.用于生成token的:corpid,secret 3.用于ID 开始操作:企业号注册连接:https://qy.weixin.qq.com/cgi-bin/loginpage 1.点击注册,然后选择团队主从即可: 2.创建

Zabbix告警集成 实时接收Zabbix告警,提供微信、移动APP、短信邮件提醒。

1.下载agent软件包 请在Zabbix服务器中,使用root或zabbix用户下载软件.下载agent 2.添加应用 创建Zabbix应用,并获取appkey,见下图04b9832b-14b4-4c61-343d-5926ff8af672 3.安装Agent 将agent更新到zabbix的外部告警脚本目录alertscripts,如果是源码安装的请自行更改目录. tar xvf alert-agent-4.0.1-RC2.tar.gz cp -R alert-agent /usr/lib/

zabbix告警通知

之前使用邮件和短信发送zabbix告警信息,但告警信息无法实时查看或者无法发送,故障无法及时通知运维人员. 后来使用第三方微信接口发送信息,愉快地用了一年多,突然收费了. zabbix告警一直是我的痛点,近期发现一个基于个人微信号的信息发送工具-lykchat. lykchat信息发送系统是Python3开发的,通过模拟微信网页端,基于个人微信号,为系统管理人员提供信息发送工具. 实现的功能有用户登录管理.微信登陆管理和微信信息发送功能. 代码地址:https://github.com/lyko

一个简单好用的zabbix告警信息发送工具

之前使用邮件和短信发送zabbix告警信息,但告警信息无法实时查看或者无法发送,故障无法及时通知运维人员. 后来使用第三方微信接口发送信息,愉快地用了一年多,突然收费了. zabbix告警一直是我的痛点,近期发现一个基于个人微信号的信息发送工具-lykchat. 引用:http://blog.csdn.net/liyingke112/article/details/68955298 lykchat信息发送系统是Python3开发的,通过模拟微信网页端,基于个人微信号,为系统管理人员提供信息发送工

微信公众号告警

微信公众号告警 首先在配置文件里修改AlertScriptsPath [[email protected] alertscripts]# vim /usr/local/zabbix/etc/zabbix_server.conf AlertScriptsPath=/usr/local/zabbix/alertscripts (1)先注册一个企业号.然后扫码登录: 地址:https://qy.weixin.qq.com/cgi-bin/loginpage (注册过程就不一一展示了非常简单) (2)之

CMDB机柜平台结合zabbix告警展示

最近看了刘天斯老师的机柜展示平台,非常绚丽,而且有大屏显示的话也是能够体现运维价值的,这里就说下我最近在做的CMDB平台的一些数据: CMDB数据: 机房,机柜,机柜电源,机柜位置,机房合同,合同到期时间,机房联系人. 服务器,CPU,硬盘,是否虚拟化,宿主机,raid类型,内存. 资产ID,上架日期,下架记录,服务器代理商,代理商联系方式,服务器到保日期. IP地址,MAC地址,业务线,产品线,操作系统. 通信这块主要技术json-rpc,然后提供Api接口给程序调用,按照固定格式导入即可:硬