python-kafka实现produce与consumer

1.python-kafka:

api送上:https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaConsumer.html

2.实现一个broker、topic可配置的生产者与消费者:

#coding=utf-8

import time
import logging
import sys
import json
import etc.config as conf
sys.path.append(‘***********/kafka-python-1.3.3‘)
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from kafka import TopicPartition

def log_name():
    base_name = conf.kafka_logDir
    date = time.strftime(‘%Y%m%d‘,time.localtime(time.time())) + ‘.log‘
    return base_name + date

logging.basicConfig(level=logging.DEBUG,
        format=‘%(asctime)-15s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s‘,
        datefmt=‘%Y-%m-%d %H:%M:%S‘,
        filename=log_name(),
        filemode=‘a‘
        )
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger(‘‘).addHandler(console)

class kfkProducer(object):

    # producer = None

    def __init__(self, broker, kafkaPort, kafkaTopic=‘‘):
        self._broker = broker
        self._kafkaPort = kafkaPort
        self._kafkaTopic = kafkaTopic

    def __str__(self):
        logging.info("--------------------------------")
        logging.info("kafka-producer params ...")
        logging.info("[KAFKA-BROKER]:%s" %self._broker)
        logging.info("[KAFKA-PORT]:%s" %self._kafkaPort)
        logging.info("[KAFKA-TOPIC]:%s" %self._kafkaTopic)
        logging.info("--------------------------------")

    def registerKfkProducer(self):
        try:
            producer = KafkaProducer(bootstrap_servers = ‘{kafka_host}:{kafka_port}‘.format(
                kafka_host=self._broker,
                kafka_port=self._kafkaPort
                ))
        except KafkaError as e:
            logging.info(e)
        return producer

    def produceMsg(self, topic, msg, partition=0):
        # 自动将输入字符串转化为json格式,产出消息
        if(topic in (‘‘, None)):
            logging.error("topic is None, plz check!")
        else:
            try:
                # parmas_message = json.dumps(msg)#转化为json格式
                producer = self.registerKfkProducer()
                producer.send(topic, value=msg, partition=partition)
                producer.flush()
                # time.sleep(1)
            except KafkaError as e:
                logging.info(e)

class kfkConsumer(object):

    # consumer = None

    def __init__(self, broker, kafkaPort, kafkaTopic=‘‘):
        self._broker = broker
        self._kafkaPort = kafkaPort
        self._kafkaTopic = kafkaTopic

    def __str__(self):
        logging.info("--------------------------------")
        logging.info("kafka-consumer params ...")
        logging.info("[KAFKA-BROKER]:%s" %self._broker)
        logging.info("[KAFKA-PORT]:%s" %self._kafkaPort)
        logging.info("[KAFKA-TOPIC]:%s" %self._kafkaTopic)
        logging.info("--------------------------------")

    def registerConsumer(self):
        try:
            consumer = KafkaConsumer(
                bootstrap_servers=[self._broker+‘:‘+self._kafkaPort],
                auto_offset_reset=‘earliest‘)
        except KafkaError as e:
            logging.info(e)
        return consumer

    def consumerMsg(self, topic, partition=0):
        if(topic in (‘‘, None)):
            logging.error("topic is None, plz check!")
        else:
            try:
                v_consumer = self.registerConsumer()
                v_consumer.assign([TopicPartition(topic,partition)])
                # self.registerConsumer.subscribe([self._kafkaTopic])
                for message in v_consumer:
                    # message value and key are raw bytes -- decode if necessary!
                    # e.g., for unicode: `message.value.decode(‘utf-8‘)
                    logging.info("%s:%d:%d: msg=%s" % (message.topic, message.partition,
                                                            message.offset, message.value.decode(‘utf-8‘)))
            except KafkaError as e:
                logging.info(e)

3.实现命令行输入topic和partition,即可生产消息:

#coding=utf-8

import os
import sys
import json
import etc.config as conf
from PykafkaMgr import kfkProducer

#从json文件获取消息
def getMsgFromJsonfile(filePath):
    if(not os.path.isfile(filePath)):
        print(u"[%s] 输入的json文件路径有误,请检查..." %filePath)
    else:
        with open(filePath) as json_file:
            return json.load(json_file)

def except4v():
    if(len(sys.argv) <= 1):
        print(u"未输入topic和partition!\n你可以--help查看具体使用方法...")
    elif(sys.argv[1].startswith("--")):
        option = sys.argv[1][2:]
        # print(option)
        if(option in ("version", "Version")):
            print("Version 1.0 \nPython 2.7.3 (default, Nov  6 2015, 14:11:14)                     \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2")
        elif(option == "help"):
            print(u"produceMsg.py 接收两个参数, 第一个是topic, 第二个是partition \neg:python produceMsg.py test 0 \n向topic名为test第0分区生产消息")

def calcMsg(jsonMsg):
    sumMsg, sumAcct = 0, 0
    msgNum = len(jsonMsg)
    print("------------------------------------------")
    for i in range(msgNum):
        acct_num = len(jsonMsg[i]["MSGBODY"])
        print(u"第[%d]条消息,包含ACCT_ID账户数:[%d]个"%(i+1, acct_num))
        sumMsg = i+1
        sumAcct += acct_num
        acct_num = 0
    print(u"本次生产消息总共[%d]条, 总共账户数:[%d]个"%(sumMsg, sumAcct))
    print("------------------------------------------")

if __name__ == ‘__main__‘:

    except4v()

    if(len(sys.argv) == 3):
        topic = sys.argv[1]
        partition = int(sys.argv[2])
        produce = kfkProducer(conf.kafka_mgr["broker"], conf.kafka_mgr["port"], topic)
        produce.__str__()
        jsonMsg = getMsgFromJsonfile(conf.kafka_produce)
        for i in range(len(jsonMsg)):
            produce.produceMsg(topic, (‘%s‘%jsonMsg[i]).encode(‘utf-8‘), partition)
        calcMsg(jsonMsg)

4.设置两个配置文件:

第一个是config.py

#coding=utf-8

#broker配置还有一种方式是:kafka_mgr={"broker":‘ip1:port,ip2:port,...,ipn:port‘},就是改为kafka集群,不过代码要稍微作调整(参数列表改下就行了)。当然配置两种,通过一个开关去控制也可以。自选
kafka_mgr = {
    "broker" : ‘10.***.***.***‘,
    "port" : 6667,
}

kafka_logDir = r"/*******/log/****"

#生产者输入json文件
kafka_produce = r"/**********/data/input/produceMsg.json"
生产者输入json文件:produceMsg.json
json文件附上说明,具体可以按照说明配置

hi, welcome here~

produceMsg.json
=================================
输入json格式数据,作为生产者消息的输入。
1.支持多条json数据输入。格式如下:
    [
    json1,
    json2,
    ...,
    jsonN
    ]
总体结构是:[  ,  ]

2.此json文件不能加注释,因为会破坏json文件格式,导致无法解析
3.输入只要是json格式,不需要关注是不是一行或多行,多换行、空格等都不影响解析

消费者也是利用以上两个配置文件去实现即可。此处代码略

时间: 2024-10-04 00:04:40

python-kafka实现produce与consumer的相关文章

python kafka

kafka简介(摘自百度百科) 简介: kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费.

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

.net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

依据Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代码块,还原本地环境,跟踪调试,发现自定义Consumer Group 的Consumer并没有分配到PartionID,如下图. 据此,基本就可以定位到不同组Consumer无法覆盖Partition的问题根源了. 仔细阅读Rebalance代码,发现Kafka.Client 在获取consumer时,并没有根据Group做筛选,获取到的是所有组的Consumer,如下图 (此处只

ReferenceError: weakly-referenced object no longer exists Python kafka

Python存入kafka报错,ReferenceError: weakly-referenced object no longer exists. Exception in thread 14: pykafka.OwnedBroker.queue_reader for broker 101: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_

Kafka 学习笔记之 Consumer API

Kafka提供了两种Consumer API High Level Consumer API Low Level Consumer API(Kafka诡异的称之为Simple Consumer API,实际上非常复杂) 1. High Level Consumer API概述 High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Consumer group

设计Kafka的High Level Consumer

原文:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example 为什么使用High Level Consumer 在某些应用场景,我们希望通过多线程读取消息,而我们并不关心从Kafka消费消息的顺序,我们只关心数据能被消费即可.High Level 就是用于抽象这类消费动作的. 消息消费已Consumer Group为单位,每一个Consumer Group中能够有多个consumer.每一个consumer

Kafka的Producer和Consumer源码学习

先解释下两个概念: high watermark (HW) 它表示已经被commited的最后一个message offset(所谓commited, 应该是ISR中所有replica都已写入),HW以下的消息都已被ISR中各个replica同步,从而保持一致.HW以上的消息可能是脏数据:部分replica写成功,但最终失败了. Kafka Partition:  1> 均衡各个Broker之间的数据和请求压力: 2> 分摊处理不同的消费者进程: 3> 在partition内可以保证局部

Kafka的Producer和Consumer的示例

我使用的kafka版本是:0.7.2 jdk版本是:1.6.0_20 http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的. 分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm A

python kafka权限校验client.id

kafka集群有权限校验,在连接时需要加入client.id.但pykafka不能配置该选项.搜索了一下,需要使用confluent-kafka 链接: https://blog.csdn.net/lanyang123456/article/details/80639625 #coding:utf-8 from confluent_kafka import Consumer, KafkaError mybroker = "127.0.0.1:9092" #host client_id