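The Kafka cluster enforces a permission check, so a client.id has to be supplied when connecting. pykafka does not let you configure that option, though. After a quick search, I found that confluent-kafka has to be used instead.
Link: https://blog.csdn.net/lanyang123456/article/details/80639625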
#coding:utf-8
from confluent_kafka import Consumer, KafkaError

mybroker = "127.0.0.1:9092"  # broker host:port
client_id = "your-client-id"  # the id checked by the cluster
my_topic = "your-topic"

c = Consumer({
    'bootstrap.servers': mybroker,
    'group.id': 'mygroup',
    'client.id': client_id,
    'default.topic.config': {
        # start from the oldest message when the group has no committed offset
        'auto.offset.reset': 'smallest'
    }
})
c.subscribe([my_topic])

while True:
    msg = c.poll(1.0)  # wait up to 1 second for a message
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # reached the end of a partition; not a real error
            continue
        else:
            print(msg.error())
            break
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
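
The script above can also be split into a small config helper and a consumer loop. The following is only a sketch under a few assumptions: `build_consumer_config` and `run_consumer` are hypothetical names, not part of confluent-kafka, and the broker, group, client id and topic values are placeholders. It also assumes a reasonably recent confluent-kafka, where `auto.offset.reset` can be set at the top level of the config instead of inside `default.topic.config`, and where `earliest` is accepted as an equivalent of `smallest`.

```python
def build_consumer_config(broker, group_id, client_id, offset_reset="earliest"):
    """Hypothetical helper: build the dict passed to confluent_kafka.Consumer."""
    if not client_id:
        # The cluster described above rejects connections without a client.id.
        raise ValueError("client.id must be set")
    return {
        "bootstrap.servers": broker,
        "group.id": group_id,
        "client.id": client_id,
        # Assumption: top-level form of the topic-level setting used above.
        "auto.offset.reset": offset_reset,
    }


def run_consumer(topic, config):
    """Hypothetical helper: poll `topic` and print messages until an error occurs."""
    # Imported here so build_consumer_config works without the library installed.
    from confluent_kafka import Consumer, KafkaError

    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break
            print("Received message: {}".format(msg.value().decode("utf-8")))
    finally:
        # Close the consumer even if the loop exits with an exception.
        consumer.close()
```

It would be used as `run_consumer("your-topic", build_consumer_config("127.0.0.1:9092", "mygroup", "your-client-id"))`. Wrapping the loop in `try`/`finally` means the consumer is closed even when the loop is interrupted, which the original script does not guarantee.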
Original article: https://www.cnblogs.com/huim/p/10673302.html
Time: 2024-11-09 00:36:24