Kafka(八)Python生产者和消费者API使用

单线程生产者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import sys
from kafka import KafkaProducer
from kafka.client import log
import time
import json

__metaclass__ = type

class Producer:
    def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', ClientId="Procucer01", Topic='Test'):
        """
        用于设置生产者配置信息,这些配置项可以从源码中找到,下面为必要参数。
        :param KafkaServer: kafka服务器IP
        :param KafkaPort: kafka工作端口
        :param ClientId: 生产者名称
        :param Topic: 主题
        """
        self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort)
        self._topic = Topic
        self._clientId = ClientId

        """
        初始化一个生产者实例,生产者是线程安全的,多个线程共享一个生产者实例效率比每个线程都使用一个生产者实例要高
        acks: 消费者只能消费被提交的,而只有消息在所有副本中都有了才算提交,生产者发送了消息是否要等待所有副本都同步了该消息呢?这个值就是控制这个的。默认是1,表示只要该分区的Leader副本成功写入日志就返回。
              0表示生产者无需等待,发送完就返回;all是所有副本都写入该消息才返回。 all可靠性最高但是效率最低,0效率最高但是可靠性最低,所以一般用1。
        retries: 表示请求重试次数,默认是0,上面的acks配置请求完成的标准,如果请求失败,生产者将会自动重试,如果配置为0则不重试。但是如果重试则有可能发生重复发送消息。
        key_serializer: 键的序列化器,默认不设置,采用字节码
        value_serializer: 值得序列化器,默认不设置,采用字节码,因为可以发送单一字符,也可以发送键值型消息
        """
        try:
            self._producer = KafkaProducer(bootstrap_servers=self._bootstrap_server, client_id=self._clientId, acks=1,
                                           value_serializer=lambda m: json.dumps(m).encode('utf-8'))
        except Exception as err:
            print err.message

    def _TIMESTAMP(self):
        t = time.time()
        return int((round(t * 1000)))

    # 时间戳转换为普通时间
    def getNormalTime(self, temp_timeStamp, timeSize=10):
        timeStamp = temp_timeStamp
        if timeSize == 13:
            timeStamp = int(temp_timeStamp / 1000)
        timeArray = time.localtime(timeStamp)
        otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
        return otherStyleTime

    # 发送成功的回调函数
    def _on_send_success(self, record_metadata):
        print "Topic: %s Partition: %d Offset: %s" % (record_metadata.topic, record_metadata.partition, record_metadata.offset)

    # 发送失败的回调函数
    def _on_send_error(self, excp):
        log.error('I am an errback', exc_info=excp)

    def sendMsg(self, msg, partition=None):
        """
        发送消息
        :param msg: 消息
        :param partition: 分区也可以不指定
        :return:
        """
        if not msg:
            print "消息不能为空。"
            return None

        # 发送的消息必须是序列化后的,或者是字节
        message = json.dumps(msg, encoding='utf-8', ensure_ascii=False)
        try:
            TIMESTAMP = self._TIMESTAMP()
            # 发送数据,异步方式,调用之后立即返回,因为这里其实是发送到缓冲区,所以你可以多次调用,然后一起flush出去。
            self._producer.send(self._topic, partition=partition, key=self._clientId, value=message, timestamp_ms=TIMESTAMP).add_callback(self._on_send_success).add_errback(self._on_send_error)
            # 下面的 flush是阻塞的,只有flush才会真正通过网络把缓冲区的数据发送到对端,如果不调用flush,则等到时间或者缓冲区满了就会发送。
            self._producer.flush()
            print self.getNormalTime(TIMESTAMP, timeSize=13) + " send msg: " + message
        except Exception as err:
            print err

def main():
    p = Producer(KafkaServer="172.16.48.171", KafkaPort="9092", Topic='AAA')
    for i in range(10):
        time.sleep(1)
        closePrice = random.randint(1, 500)
        msg = {
            "股票代码": 60000 + i,
            "昨日收盘价": closePrice,
            "今日开盘价": 0,
            "今日收盘价": 0,
        }
        p.sendMsg(msg)

if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

消费者

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from kafka import KafkaConsumer
import json

__metaclass__ = type

class Consumer:
    def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', GroupID='TestGroup', ClientId="Test", Topic='Test'):
        """
        用于设置消费者配置信息,这些配置项可以从源码中找到,下面为必要参数。
        :param KafkaServer: kafka服务器IP
        :param KafkaPort: kafka工作端口
        :param GroupID: 消费者组ID
        :param ClientId: 消费者名称
        :param Topic: 主题
        """
        self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort)
        self._groupId = GroupID
        self._topic = Topic
        self._clientId = ClientId

    def consumeMsg(self):

        try:
            """
            初始化一个消费者实例,消费者不是线程安全的,所以建议一个线程实现一个消费者,而不是一个消费者让多个线程共享
            下面这些是可选参数,可以在初始化KafkaConsumer实例的时候传递进去
            enable_auto_commit 是否自动提交,默认是true
            auto_commit_interval_ms 自动提交间隔毫秒数
            """
            consumer = KafkaConsumer(self._topic, bootstrap_servers=self._bootstrap_server,
                                     group_id=self._groupId, client_id=self._clientId, enable_auto_commit=True,
                                     auto_commit_interval_ms=5000, value_deserializer=lambda m: json.loads(m.decode('utf-8')))

            """
            这里不需要显示的调用订阅函数,在初始化KafkaConsumer对象的时候已经指定了主题,如果主题字段不为空则会自动调用订阅函数,至于
            这个线程消费哪个分区则是自动分配的。如果你希望手动指定分区则就需要使用 assign() 函数,并且在初始的时候不输入主题。
            """
            # consumer.subscribe(self._topicList)

            # 返回一个集合
            print "当前消费的分区为:", consumer.partitions_for_topic(self._topic)
            print "当前订阅的主题为:", consumer.subscription()

            while True:
                for msg in consumer:
                    if msg:
                        print "Topic: %s Partition: %d Offset: %s Key: %s Message: %s " % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        except Exception as err:
            print err

def main():
    try:
        c = Consumer(KafkaServer='172.16.48.171', Topic='AAA')
        c.consumeMsg()
    except Exception as err:
        print err.message

if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

执行效果

原文地址:http://blog.51cto.com/littledevil/2148729

时间: 2024-10-06 02:37:36

Kafka(八)Python生产者和消费者API使用的相关文章

kafka中生产者和消费者API

使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools&

python 生产者与消费者模式

生产者与消费者模式 1. 队列 先进先出 2. 栈 先进后出 Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用.可以使用队列来实现线程间的同步. 用FIFO队列实现上述生产者与消费者问题的代码如下: import threading import time from q

Python 生产者与消费者模型

定义: 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度.     为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式.     什么是

python生产者和消费者模式实现(一)

import timeimport randomfrom multiprocessing import Queue # 生产者def producer(q, num): for i in range(1, num + 1): food = 'Spam-%d' % i # time.sleep(random.uniform(1, 2)) timeVal = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print('时间:%s\

Python 生产者和消费者模型

import timedef consumer(name): print ("%s consumer one product" %name) while True: baozi = yield print ("baozi[%s] bei [%s] chile" %(baozi,name)) def producter(name): c = consumer('A') c2 = consumer('B') c.__next__() c2.__next__() prin

Python生产者和消费者

# Author:XiangLiangimport threading,timeimport queue q = queue.Queue(maxsize=10)def Producer(name): count = 1 while True: q.put("骨头 %s" %count) print("生产了骨头",count) count += 1 time.sleep(0.1) def Consumer(name): while True: print("

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

rabbitmq的安装和命令介绍及python程序模拟生产者和消费者

[介绍] RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. RabbitMQ是流行的开源消息队列系统,用erlang语言开发 RabbitMQ是AMQP(高级消息队列协议)的标准实现 官网:http://www.rabbitmq.com/ [安装] 方式:yum/rpm 系统环境 [[email protected]_server scripts]# ifconfig | sed -n 's#.*inet addr:

黑马程序员_日记18_Java多线程(八)--生产者消费者问题JDK1.5特性

--- android培训.java培训.期待与您交流! ---- 生产者消费者问题JDK1.5特性 一.概述 在JDK1.5之前,解决生产者和消费者问题, 用的是synchronized同步+while+notify(): 但是这种方法很不安全,很容易让线程全部陷入无限等待状态. 于是我们改用notiyfyAll();来解决. 这样虽然解决了安全问题,但还是存在不足和安全隐患. notifyAll方法唤醒了线程池中全部的线程, 这并不是我们想要的! 而且,同步套同步很容易发生死锁! 在JDK1