###
python版本:2.7.13
pykafka版本:2.6.0
注明:python 3.6.2版本会报错。
备注:这个是一个通过pykafka模块向kafka生产数据
Github地址:https://github.com/Parsely/pykafka
Pykafka Doc:http://pykafka.readthedocs.io/en/latest/usage.html
producer.py
# -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts="192.168.0.100:9092") # 可接受多个client print client.topics # 查看所有的topic topic = client.topics[‘test‘] # 选择一个topic message = "test message test message" # 当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式 with topic.get_sync_producer() as producer: for i in range(4): producer.produce(‘test message ‘ + str(i ** 2))
###
consume.py
# -*- coding:utf-8 -*- from pykafka import KafkaClient client = KafkaClient(hosts=‘192.168.0.100:9092‘) topic=client.topics[‘test‘] balanced_consumer = topic.get_balanced_consumer( consumer_group=‘test_kafka_group‘, auto_commit_enable=False, # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息 zookeeper_connect=‘192.168.0.100:2181‘#这里可以连接多个zk ) for message in balanced_consumer: # print message if message is not None: print message.offset, message.value #打印接收到的消息体的偏移个数和值
###
时间: 2024-11-06 21:19:39