kafka-3python生产者和消费者实用demo

程序分为productor.py是发送消息端,consumer为消费消息端,

启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,

productor.py

#!/usr/bin/env python2.7
#_*_coding: utf-8 _*_
from kafka import KafkaProducer

kafka_host = ‘192.168.1.200‘  # kafka服务器地址
kafka_port = 9092  # kafka服务器的端口

producer = KafkaProducer(bootstrap_servers=[‘{kafka_host}:{kafka_port}‘.format(
    kafka_host = kafka_host,
    kafka_port = kafka_port
)])

#简单for循环10次,发送10条消息
for i in range(1,10):
    message_string = ‘some message‘.format(i)

    #调用send方法,发送名字为‘topic1‘的topicid ,发送的消息为message_string
    response = producer.send(‘topic1‘, message_string.encode(‘utf-8‘))
    print response

consumer.py

#!/usr/bin/env python
#_*_coding: utf-8 _*_
import json
from kafka import *

kafka_host = ‘192.168.1.200‘  # kafka服务器地址
kafka_port = 9092  # kafka服务器端口

#消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id,
# 如果想一条消费多次消费,可以换一个group_id,会从头开始消费
consumer = KafkaConsumer(
    ‘topic1‘,
    group_id = ‘my-group‘,
    bootstrap_servers = [‘{kafka_host}:{kafka_port}‘.format(kafka_host=kafka_host, kafka_port=kafka_port)]
)
for message in consumer:
    #json读取kafka的消息
    content = json.loads(message.value)
    print content
时间: 2024-12-07 10:56:48

kafka-3python生产者和消费者实用demo的相关文章

使用java创建kafka的生产者和消费者

创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111        [[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3    查看Kafka的主题详情        [[email protected] kafka]# bin/kafka-topics.sh

kafka中生产者和消费者API

使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools&

java_Thread生产者与消费者 Demo

1 package com.bjsxt.Thread.Demo; 2 public class ProducerConsumer { 3 /** 4 * 生产者与消费者 5 * @param args 6 */ 7 public static void main(String[] args) {// 模拟线程 8 SyncStack ss = new SyncStack(); 9 Producer p = new Producer(ss); 10 Consumer c = new Consume

Kafka之生产者消费者示例

本例以kafka2.10_0.10.0.0为例,不同版本的kafka Java api有些区别! 增加maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.0.0</version> </dependency> 生产者 package com.zns.k

[Golang] kafka集群搭建和golang版生产者和消费者

一.kafka集群搭建 至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了. 1. 下载zookeeper  https://zookeeper.apache.org/releases.html 2. 下载kafka http://kafka.apache.org/downloads 3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台) 1)配置zookeeper. 修改复制一份 zookeeper-3.4.13/c

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka使用总结与生产消费Demo实现

什么是kafka Kafka官网自己的介绍是:一个可支持分布式的流平台.kafka官网介绍 kafka三个关键能力: 1.发布订阅记录流,类似于消息队列与企业信息系统 2.以容错的持久方式存储记录流 3.对流进行处理 kafka通常应用再两大类应用中: 1.构建实时流数据管道,在系统或应用程序之间可靠地获取数据 2.构建转换或响应数据流的实时流应用程序 kafka的一些基本概念: 1.Kafka作为一个集群运行在一个或多个服务器上,这些服务器可以跨越多个数据中心. 2.Kafka集群将记录流存储

Java中的生产者、消费者问题

Java中的生产者.消费者问题描述: 生产者-消费者(producer-consumer)问题, 也称作有界缓冲区(bounded-buffer)问题, 两个进程共享一个公共的固定大小的缓冲区(仓库). 其中一个是生产者, 用于将产品放入仓库: 另外一个是消费者, 用于从仓库中取出产品消费. 问题出现在当仓库已经满了, 而此时生产者还想向其中放入一个新的产品的情形, 其解决方法是让生产者此时进行等待, 等待消费者从仓库中取走了一个或者多个产品后再去唤醒它. 同样地, 当仓库已经空了, 而消费者还

多线程编程之生产者和消费者之间的问题

前段时间没事研究了一些生产者和消费者之间的问题,期间也查看了不少资料.又重新有了新的认识.特别作为一个IT农民工,必须要掌握的技能啊. 个人理解,这个应该说是一种模型吧,学会它,可以应用到多个方面的技术上去.数据流文件的读写,程序中的数据缓冲技术,播放缓冲技术等等. 废话不多说...直接上代码.下面是个C# 写的代码.比较粗糙,请谅解,有问题大家可以一起讨论研究. 1 using System; 2 using System.Threading; 3 4 namespace Consumer 5