【kafka】celery与kafka的联用问题

背景:一个小应用,用celery下发任务,任务内容为kafka生产一些数据。

问题:使用confluent_kafka模块时,单独启用kafka可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了。

解决:换了个pykafka模块,结果问题就没有了。

我很疑惑啊,是我调用confluent_kafka的方法不对吗,怎么套上celery就不行了呢?

可以用的pykafka代码:

tasks.py

from celery import Celery
from pykafka import KafkaClient
import json

app = Celery(‘tasks‘, backend=‘amqp‘, broker=‘amqp://xxx:[email protected]/xxxhost‘)

@app.task
def produce():
    client = KafkaClient(hosts="localhost:9092")
    print client.topics
    topic = client.topics[‘test_result‘]
    with topic.get_sync_producer() as producer:
        for i in range(3):
            data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
            print(‘Producing message: %s‘ % data)
            producer.produce(json.dumps(data))
        print "finish produce"
        producer.stop()
        print "stop"                 

run_worker.py

from tasks import produce

for i in range(1000):
    result = produce.delay()
    print result.status

无法正常生产数据的confluent_kafka代码:

tasks.py

from celery import Celery
from kafka_producer import p
import json

app = Celery(‘tasks‘, backend=‘amqp‘, broker=‘amqp://xxx:[email protected]/xxxhost‘)

@app.task
def produce():
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        print(‘Producing message: %s‘ % data)
        p.produce(‘test_result3‘, json.dumps(data))
    print "finish produce"
    p.flush()
    print "finish flush"

run_worker.py

from tasks import produce
result = produce.delay()
print result.status
print result.ready()
print result.get()
print result.status
时间: 2024-10-23 01:10:43

【kafka】celery与kafka的联用问题的相关文章

Kafka剖析:Kafka背景及架构介绍

<Kafka剖析:Kafka背景及架构介绍> <Kafka设计解析:Kafka High Availability(上)> <Kafka设计解析:Kafka High Availability (下)> <Kafka设计解析:Replication工具> <Kafka设计解析:Kafka Consumer解析> Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

kafka学习(三)-kafka集群搭建

kafka集群搭建 下面简单的介绍一下kafka的集群搭建,单个kafka的安装更简单,下面以集群搭建为例子. 我们设置并部署有三个节点的 kafka 集合体,必须在每个节点上遵循下面的步骤来启动 kafka 服务器,kafka集群需要依赖zookeeper集群,上一篇已经说道了zookeeper的搭建,方法请参考:http://www.cnblogs.com/chushiyaoyue/p/5615267.html 1.环境准备 测试服务器(2n+1)奇数台 192.168.181.128 ce

Zookeeper 集群+kafka集群+kafka manager搭建

软件需求,软件包都上传到 /usr/local/src目录: jdk-8u101-linux-x64.tar.gz kafka.2.11-0.8.22.tar.gz zookeeper-3.4.9.tar.gz kafka-manager-1.3.0.7.zip * kafka-manager是通过scala打包获取一个编译完的项目,需要提前编译好,参考 https://github.com/yahoo/kafka-manager 硬件需求,四个主机: 192.168.100.100 : kaf

[异常处理]class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)

在kafka.out日志里出现大量 ERROR [ReplicaFetcherThread-0-1], Error for partition [FLAG_DATA_SYC,1] to broker 1:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) 这是因为删除topic没删干净 在zookeeper里删除下列路径下的数据:/brokers/topics/[topic

Kafka教程(一)Kafka入门教程

1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件负责处理连接服务.消息的路由和传送.持久性.安全性以及日志记录.消息服务器可以使用一个或多个代理实例. JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生.发送.接收消息的接口简化企业应用的开发,翻译为Java

Kafka实战系列--Kafka API使用体验

前言: kafka是linkedin开源的消息队列, 淘宝的metaq就是基于kafka而研发. 而消息队列作为一个分布式组件, 在服务解耦/异步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要讲解kafka的一些基本概念和api的使用. *) 准备工作1) 配置maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</

向spark集群提交消费kafka应用时kafka鉴权配置问题

提交消费kafka应用里面包含sasl.jaas.config,通常需要配置文件.但是打成jar包后的应用,通过classload读不到jar包中配置文件. 需要初始化kafka时增加properties属性. kafkaParams.put("sasl.jaas.config", "xxxx required\n"+                 " accessKey=\"xxxx\"\n"+              

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

如何配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 安装spark的服务器: 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(八)安装zookeeper-3.4.12

如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 如何安装spark2.2.1请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1