python抓取系统metrics吐给kafka

本篇介绍用python写脚本,抓取系统metrics,然后调用kafka client library把metrics吐给kafka的案例分享。对于用kafka的同学实用性很高。

在运行本实例前,需要先下载两个 Python 库到本地:six 和 kafka-python。

cat config_system_metrics.json 

{

"env": {

"site": "cluster",

"component": "namenode",

"metric_prefix": "system"

},

"output": {

"kafka": {

"topic": "system_metrics_cluster",

"brokerList": ["10.10.10.1:9092", "10.10.10.2:9092", "10.10.10.3:9092"]

}

}

}

cat system_metrics.python

#!/usr/bin/env python
# Collect system metrics and publish them to Kafka as JSON messages.

import sys
import os
import json
import socket
import re
import time
import logging
import threading

# Make the bundled "six" package importable (required by kafka-python).
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib/six'))
import six

# Make the bundled kafka-python library importable.
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib/kafka-python'))
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
                    datefmt='%m-%d %H:%M')

# Name of the JSON configuration file expected in the working directory.
CONFIG_FILE = 'config_system_metrics.json'

class LoadConfig(object):
    """Load the JSON configuration file from the working directory.

    The parsed configuration is exposed as ``self.config``.  Any failure
    to read or to parse the file terminates the process with exit code 1,
    matching the original behavior.
    """

    def __init__(self):
        config_file = "./" + CONFIG_FILE
        try:
            # `with` guarantees the file handle is closed even on error;
            # read first so read errors and parse errors report separately.
            with open(config_file, 'r') as f:
                raw = f.read()
        except Exception as e:
            print("Load config file %s Error !" % config_file)
            sys.exit(1)
        try:
            self.config = json.loads(raw)
        except Exception as e:
            print("Convert config file to Json format Error !")
            sys.exit(1)

class Kafka(LoadConfig):
    """Thin wrapper around the legacy kafka-python SimpleProducer API."""

    def __init__(self):
        LoadConfig.__init__(self)
        # List of "host:port" broker addresses from the config file.
        self.broker = self.config["output"]["kafka"]["brokerList"]

    def kafka_connect(self):
        """Connect to the brokers and build a synchronous batch producer.

        Returns the (client, producer) pair so the caller can close both.
        """
        kc = KafkaClient(self.broker, timeout=30)
        # `async=False` is the SimpleProducer default, and `async` is a
        # reserved word in modern Python — rely on the default instead.
        producer = SimpleProducer(kc, batch_send=True)
        return kc, producer

    def kafka_produce(self, producer, topic, kafka_json):
        """Send one message to `topic`.

        The application is responsible for encoding messages to type str.
        Sample payload:
        {'timestamp': 1463710, 'host': 'xxx', 'metric': 'system.nic.receivedbytes',
         'value': '4739', 'component': 'namenode', 'site': 'apolloqa'}
        """
        producer.send_messages(topic, kafka_json)

class Metric(LoadConfig):
    """Base class holding the fields shared by every metric datapoint.

    ``self.datapoint`` is a template dict; subclasses fill in "metric" and
    "value" and append copies of it to ``self.data``.
    """

    def __init__(self):
        LoadConfig.__init__(self)
        try:
            self.fqdn = socket.getfqdn()
        except Exception as e:
            print("Could not get hostname ! %s " % e)
            sys.exit(1)
        self.data = []
        self.datapoint = {}
        # Timestamp in milliseconds since the epoch, fixed at construction.
        self.datapoint["timestamp"] = int(round(time.time() * 1000))
        self.datapoint["host"] = self.fqdn
        self.datapoint["component"] = self.config['env']['component']
        self.datapoint["site"] = self.config['env']['site']

class Metric_Uptime(Metric):
    """Collect system uptime and idle time (in days) from /proc/uptime."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ["uptime.day", "idletime.day"]
        # /proc/uptime holds two space-separated counters, in seconds.
        self.result = os.popen('cat /proc/uptime').readlines()
        self.data = []

    def metric_collect(self):
        """Return one datapoint per dimension, with values converted to days."""
        for line in self.result:
            values = re.split(r"\s+", line.rstrip())
            for dim, value in zip(self.demensions, values):
                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'uptime' + '.' + dim
                # Seconds -> days, rounded to 2 decimal places, as a string.
                self.datapoint["value"] = str(round(float(value) / 86400, 2))
                self.data.append(self.datapoint.copy())
        return self.data

class Metric_Loadavg(Metric):
    """Collect the 1/5/15-minute load averages from /proc/loadavg."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
        self.result = os.popen('cat /proc/loadavg').readlines()
        self.data = []

    def metric_collect(self):
        """Return one datapoint per load-average window."""
        for line in self.result:
            values = re.split(r"\s+", line.strip())
            # Only the first three fields of /proc/loadavg are used; zip
            # pairs each with its dimension name.
            for dim, value in zip(self.demensions, values):
                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'loadavg' + '.' + dim
                self.datapoint["value"] = value
                self.data.append(self.datapoint.copy())
        return self.data

class Metric_Memory(Metric):
    """Collect every field of /proc/meminfo as a kB-valued metric."""

    def __init__(self):
        Metric.__init__(self)
        self.result = os.popen('cat /proc/meminfo').readlines()
        self.data = []

    def metric_collect(self):
        """Return one datapoint per meminfo line (e.g. "MemTotal: 123 kB")."""
        for line in self.result:
            # Splitting on ":?\s+" yields [name, value, ...unit].
            fields = re.split(r":?\s+", line.rstrip())
            self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + 'memory' + '.' + fields[0] + '.kB'
            self.datapoint["value"] = fields[1]
            self.data.append(self.datapoint.copy())
        return self.data

class Metric_CpuTemp(Metric):
    """Collect CPU temperatures from `ipmitool sdr` (requires sudo)."""

    def __init__(self):
        Metric.__init__(self)
        self.result = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
        self.data = []

    def metric_collect(self):
        """Return one datapoint per CPU temperature sensor row."""
        for line in self.result:
            # ipmitool rows look like "CPU1 Temp | 42 degrees C | ok".
            fields = re.split(r"\|", line.strip())
            # Sensor name is the token before the space, e.g. "CPU1".
            self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + re.split(" ", fields[0])[0] + '.Temp'
            # The reading is the second space-separated token of column 2.
            self.datapoint["value"] = re.split(" ", fields[1])[1]
            self.data.append(self.datapoint.copy())
        # Debug `print self.data` removed: it polluted stdout on every run.
        return self.data

class Metric_Net(Metric):
    """Collect per-interface network counters from /proc/net/dev."""

    def __init__(self):
        Metric.__init__(self)
        self.demensions = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop',
                           'transmitbytes', 'transmitpackets', 'transmiterrs', 'transmitdrop']
        self.result = os.popen("cat /proc/net/dev").readlines()
        self.data = []

    def metric_collect(self):
        """Return receive/transmit counters for each non-loopback interface."""
        for line in self.result:
            # Skip the two header lines and the loopback interface.
            if re.match(r'^(Inter|\s+face|\s+lo)', line):
                continue
            # Split the line once instead of three times.
            fields = re.split(r':?\s+', line)
            interface = fields[1]
            # Columns 2-5 are receive stats; 9-12 the matching transmit stats.
            values = fields[2:6] + fields[9:13]
            for dim, value in zip(self.demensions, values):
                self.datapoint["metric"] = self.config['env']['metric_prefix'] + "." + interface + "." + dim
                self.datapoint["value"] = value
                self.data.append(self.datapoint.copy())
        # Debug `print self.data` removed: it polluted stdout on every run.
        return self.data

class Collect(LoadConfig):
    """Drive the pipeline: connect to Kafka, send collected metrics, close."""

    def __init__(self):
        LoadConfig.__init__(self)
        self.topic = self.config['output']['kafka']['topic']

    def connect(self):
        """Open the shared Kafka client/producer pair and return both."""
        self.kafkaclient = Kafka()
        self.kc, self.producer = self.kafkaclient.kafka_connect()
        return self.kc, self.producer

    def send(self, kafka_producer, msg):
        """JSON-encode `msg` and publish it to the configured topic."""
        self.kafkaclient.kafka_produce(kafka_producer, self.topic, json.dumps(msg))

    def close(self, kafka_producer, kafka_client):
        """Stop the producer and close the client, tolerating None for either."""
        if kafka_producer is not None:
            kafka_producer.stop()
        if kafka_client is not None:
            kafka_client.close()

    def run(self, kafka_client, kafka_producer, metric):
        """Collect one metric group, send it, then close the connection.

        NOTE(review): when several threads share the same client/producer
        (as the script's main loop does), each thread closing them here
        races the others — confirm whether close() should instead happen
        once after all threads finish.
        """
        data = metric.metric_collect()
        self.send(kafka_producer, data)
        self.close(kafka_producer, kafka_client)

collector = Collect()

metrics = [Metric_Uptime(), Metric_Loadavg(), Metric_Memory(), Metric_CpuTemp(), Metric_Net()]

# Establish the TCP connection once and share it across all sender threads.
kafka_client, kafka_producer = collector.connect()

# One thread per metric group; keep references so we can join them.
threads = []
for metric in metrics:
    t = threading.Thread(target=collector.run, args=(kafka_client, kafka_producer, metric))
    t.start()
    threads.append(t)

# Wait for every sender thread so the process does not exit mid-send.
for t in threads:
    t.join()

时间: 2024-10-09 17:28:00

python抓取系统metrics吐给kafka的相关文章

Python抓取网页&批量下载文件方法初探(正则表达式+BeautifulSoup) (转)

Python抓取网页&批量下载文件方法初探(正则表达式+BeautifulSoup) 最近两周都在学习Python抓取网页方法,任务是批量下载网站上的文件.对于一个刚刚入门python的人来说,在很多细节上都有需要注意的地方,以下就分享一下我在初学python过程中遇到的问题及解决方法. 一.用Python抓取网页 基本方法: [python] view plaincopyprint? import urllib2,urllib url = 'http://www.baidu.com' req 

房东要给我涨800房租,生气的我用Python抓取帝都几万套房源信息,我主动涨了1000。

老猫我在南五环租了一个80平两居室,租房合同马上到期,房东打电话问续租的事,想要加房租:我想现在国家正在也在抑制房价,房子价格没怎么涨,房租应该也不会涨,于是霸气拒绝了,以下是聊天记录:确认不续租之后,两三年没找过房的我上网搜索租房,没想到出来一坨自如,蛋壳,贝壳等中介网站:进去看看,各种房照非常漂亮,但是一看房租,想送给自己一首<凉凉>:附近房租居然比我当前房租高1000多RMB:自我安慰下,这些网站房源价格不是真实的,于是切换到我爱我家,链家等大中介平台,结果发现房租价格都差不多:心想这才

手把手教你使用Python抓取QQ音乐数据!

[一.项目目标] 通过手把手教你使用Python抓取QQ音乐数据(第一弹)我们实现了获取 QQ 音乐指定歌手单曲排行指定页数的歌曲的歌名.专辑名.播放链接. 通过手把手教你使用Python抓取QQ音乐数据(第二弹)我们实现了获取 QQ 音乐指定歌曲的歌词和指定歌曲首页热评. 通过手把手教你使用Python抓取QQ音乐数据(第三弹)我们实现了获取更多评论并生成词云图. 此次我们将将三个项目封装在一起,通过菜单控制爬取不同数据. [二.需要的库] 主要涉及的库有:requests.openpyxl.

Python抓取页面乱码问题的解决

import urllib2 response=urllib2.urlopen('http://house.focus.cn/') html=response.read() print html.decode('gbk') Python抓取页面乱码问题的解决,布布扣,bubuko.com

使用python抓取CSDN关注人的所有发布的文章

# -*- coding: utf-8 -*- """ @author: jiangfuqiang """ import re import urllib2 import cookielib import time def startParser(author,page=1): reg = r'<a href="/\w+/article/details/\d+">\s*\t*\n*\s*\t*\s*.*?\t*\n

运用python抓取博客园首页的所有数据,而且定时持续抓取新公布的内容存入mongodb中

原文地址:运用python抓取博客园首页的所有数据,而且定时持续抓取新公布的内容存入mongodb中 依赖包: 1.jieba 2.pymongo 3.HTMLParser # -*- coding: utf-8 -*- """ @author: jiangfuqiang """ from HTMLParser import HTMLParser import re import time from datetime import date im

Python抓取需要cookie的网页

Python抓取需要cookie的网页 在仿照<Python小练习:可视化人人好友关系>一文时,需要登录模拟登录人人网.然而自从CSDN事件之后,人人网开始使用加密方式处理登录名和密码,直接使用post方式已经无法登陆人人网.这时,从豆瓣讨论中找到了解决方法: 1. 首先使用浏览器登陆人人,然后找到浏览器中关于登陆的Cookie: 2. 将Cookie记录下来,在Python中使用cookie模块模拟浏览器的行为: 3. 取得并解析数据. 1. HTTP协议与Cookie 抓取网页的过程跟浏览

使用python抓取CSDN关注人的全部公布的文章

# -*- coding: utf-8 -*- """ @author: jiangfuqiang """ import re import urllib2 import cookielib import time def startParser(author,page=1): reg = r'<a href="/\w+/article/details/\d+">\s*\t*\n*\s*\t*\s*.*?\t*\n

python抓取百度彩票的双色球数据

最近在学习<机器学习实战>这本书,在学习的过程中不免要自己去实践,写些练习.这写练习的第一步就需要收集数据,所以为了写好自己的练习程序,我得先学会收集一些网络数据.了解到用python抓取网页数据的一些方法后,我就根据别人的demo,自己实践了一下,学着从百度彩票网站上抓取双色球的历史数据.以下我就介绍一下自己的小程序. 大致思路如下 找到相关url和其参数 找出页面上你要抓取的数据的位置,也就是说这个数据在那些标签下 将每页中学要的数据取下来按一定格式存放在自己本地 需要的环境: pytho