Python # pykafka - Producer and Consume

###

python版本:2.7.13

pykafka版本:2.6.0

注明:python 3.6.2版本会报错。

备注:这个是一个通过pykafka模块向kafka生产数据

Github地址:https://github.com/Parsely/pykafka

Pykafka Doc:http://pykafka.readthedocs.io/en/latest/usage.html

producer.py

# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.0.100:9092")
# 可接受多个client

print client.topics
# 查看所有的topic

topic = client.topics[‘test‘]
# 选择一个topic

message = "test message test message"
# 当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式
with topic.get_sync_producer() as producer:
    for i in range(4):
            producer.produce(‘test message ‘ + str(i ** 2))

###

consume.py

# -*- coding:utf-8 -*-
from pykafka import KafkaClient

client = KafkaClient(hosts=‘192.168.0.100:9092‘)

topic=client.topics[‘test‘]

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=‘test_kafka_group‘,
    auto_commit_enable=False,
    # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息
    zookeeper_connect=‘192.168.0.100:2181‘#这里可以连接多个zk
)

for message in balanced_consumer:
    # print message
    if message is not None:
        print message.offset, message.value
        #打印接收到的消息体的偏移个数和值

###

时间: 2024-11-06 21:19:39

Python # pykafka - Producer and Consume的相关文章

ReferenceError: weakly-referenced object no longer exists Python kafka

Python存入kafka报错,ReferenceError: weakly-referenced object no longer exists. Exception in thread 14: pykafka.OwnedBroker.queue_reader for broker 101: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_

python全栈开发 * 进程之间的通信,进程之间数据共享 * 180726

进程之间的通信(IPC)队列和管道一.队列 基于管道实现 管道 + 锁 数据安全(一).队列 队列遵循先进先出原则(FIFO) 多用于维护秩序,买票,秒杀 队列的所有方法: put()(给队列里添加数据),put_nowait(), get()(从队列中获取数据),get_nowait(), 相同点:有值的时候取值 区别:get()没有值时会阻塞 get_nowait() 没有值时会报错 full()(返回布尔值),empty()(返回bool值), qsize()(队列大小) 示例: from

Python之路(第三十九篇)管道、进程间数据共享Manager

一.管道 概念 管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信. 先画一幅图帮助大家理解下管道的基本原理 现有2个进程A和B,他们都在内存中开辟了空间,那么我们在内存中再开辟一个空间C,作用是连接这两个进程的.对于进程来说内存空间是可以共享的(任何一个进程都可以使用内存,内存当中的空间是用地址来标记的,我们通过查找某一个地址就能找到这个内存)A进程可以不断的向C空间输送东西,B进程可以不断的从C空间读取东西,这

安装Kafka集群

本文将介绍如何安装消息队列系统,Kafka集群: 1 安装Java yum install -y java-1.8.0-openjdk-devel 2 安装Zookeeper 下载.安装.启动Zookeeper wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz tar vxf zookeeper-3.4.11.tar.gz mv zookeeper

kafka使用 SASL/PLAIN 认证服务端/客户端配置

使用 SASL/PLAIN 认证 1.配置kafka server端(每个broker) vi $KAFKA_HOME/server.properties listeners=SASL_PLAINTEXT://x-x-x-x:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN 在$KAFKA_HOME路

2016-09-01

RabbitMQ消息队列(一): Detailed Introduction 详细介绍 RabbitMQ消息队列(二):”Hello, World“ global 全局,全球的super  超级的,全球priority  优先级load balance  负载平衡cluster  集群subscribe  订阅filter  滤波器producer  生产者consume 消费者payload  有效载荷merge  合并reject  拒绝declare 声明guard  警卫 cache 隐

2016-09-08单词

global               全局,全球priority             优先load                 平衡 balance            负载 cluster             集群subscribe         订阅filter                过滤,滤除,滤波器producer          生产者consume          消费者payload           有效负载merge             合

管道,进程间数据共享,进程池

一:管道   (了解) 使用:from multiprocessing import Process,Pipe 知识: 1 创建管道时候:Pipe()默认是双工的,如改成False,那么conn1只能接收,conn2只能发送. conn1,conn2=Pipe() 2 Pipe模块发送字符串不用bytes类型,直接是字符串类型. Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1, conn2表示管道两端的连接对象,强调一点:必须在产生Pr

进程---管道、数据共享Manager、进程池和回调函数(重要)(六)

#   管道 from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() #子进程只关闭conn2时会抛出一个EOFError(没数据可取时recv),根据EOFError结束循环 while True: try : msg = conn1.recv()#不判断话会阻塞 print(msg) except EOFError: conn1.close() break if __name__ == '__