Kafka 通过python简单的生产消费实现

使用CentOS6.5、python3.6、kafkaScala 2.10  - kafka_2.10-0.8.2.2.tgz (asc, md5)

一、下载kafka

下载地址

https://kafka.apache.org/downloads

里面包含zookeeper

二、安装Kafka

1、安装zookeeper

mkdir /root/kafka/

tar -vzxf kafka_2.10-0.8.2.2

cd /root/kafka/kafka_2.10-0.8.2.2

cat  config/zookeeper.properties | grep -v ‘#‘ >> config/zk.properties

mkdir -p /home/kafka/zk

vi zk.properties
dataDir=/home/kafka/zk  #因为zookeeper变更为zk,所以需要在这里修改一下

启动zookeeper(后台启动)

/root/kafka/kafka_2.10-0.8.2.2/bin/zookeeper-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/zk.properties &

2、安装Kafka

cd /root/kafka/kafka_2.10-0.8.2.2

cat config/server.properties | grep -v ‘#‘  >> config/kafka_01.properties

启动Kafka(后台启动)

/root/kafka/kafka_2.10-0.8.2.2/bin/kafka-server-start.sh /root/kafka/kafka_2.10-0.8.2.2/config/kafka_01.properties &

三、新建Kafka topic

1、新建topic

cd /root/kafka/kafka_2.10-0.8.2.2

./bin/kafka-topics.sh --create --zookeeper 192.168.50.33:2181 --replication-factor 1 --partitions 1 --topic test

2、查看topic

./bin/kafka-topics.sh --list --zookeeper 192.168.50.33:2181

四、kafka生产者脚本

1、安装python的Kafka模块

pip3 install kafka-python(之前已安装)

2、kafka生产者脚本

cat kafka_pro.py
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import time

class Kafka_producer():
    ‘‘‘
    使用kafka的生产模块
    ‘‘‘

def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers=‘{kafka_host}:{kafka_port}‘.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        ))

def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, parmas_message.encode(‘utf-8‘))
            producer.flush()
        except KafkaError as e:
            print(e)

def main():
    ‘‘‘
    测试consumer和producer
    :return:
    ‘‘‘
    # 测试生产模块
    producer = Kafka_producer("127.0.0.1",9092,"test")
    for i in range(1000000000000):
        params = ‘test---‘ + str(i)
        print(params)
        producer.sendjsondata(params)
        time.sleep(1)

if __name__ == ‘__main__‘:
    main()
    import os
    print(os.uname)

五、kafka消费者脚本

cat kafka_cust.py
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import time

class Kafka_consumer():
    ‘‘‘
    使用Kafka—python的消费模块
    ‘‘‘

def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id=self.groupid,
                                      bootstrap_servers=‘{kafka_host}:{kafka_port}‘.format(
                                          kafka_host=self.kafkaHost,
                                          kafka_port=self.kafkaPort))

def consume_data(self):
        try:
            for message in self.consumer:
                # print json.loads(message.value)
                yield message
        except KeyboardInterrupt as e:
            print(e)

def main():
    ‘‘‘
    测试consumer和producer
    :return:
    ‘‘‘
    # 测试消费模块
    # 消费模块的返回格式为ConsumerRecord(topic=u‘ranktest‘, partition=0, offset=202, timestamp=None,
    # \timestamp_type=None, key=None, value=‘"{abetst}:{null}---0"‘, checksum=-1868164195,
    # \serialized_key_size=-1, serialized_value_size=21)
    consumer = Kafka_consumer(‘127.0.0.1‘,
                              9092,
                              "test",
                              ‘test-python-test‘)
    message = consumer.consume_data()
    for i in message:
        print(i.value)

if __name__ == ‘__main__‘:
    main()

整理自:

https://www.cnblogs.com/hunttown/p/9041036.html

https://gitee.com/jalright/scriptstodo/blob/master/kafka/producer.py

https://gitee.com/jalright/scriptstodo/blob/master/kafka/cunsumer.py

原文地址:https://www.cnblogs.com/xibuhaohao/p/11721187.html

时间: 2024-10-11 12:25:09

Kafka 通过python简单的生产消费实现的相关文章

Kafka创建&查看topic,生产&消费指定topic消息

启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创建Topic 1)运行命令: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1 2181 是zookeeper 端口 图示为创建成

【Apache Kafka】Kafka安装及简单示例

(一)Apache Kafka安装 1.安装环境与前提条件 ??安装环境:Ubuntu16.04 ??前提条件: ubuntu系统下安装好jdk 1.8以上版本,正确配置环境变量 ubuntu系统下安装好scala 2.11版本 安装ZooKeeper(注:kafka自带一个Zookeeper服务,如果不单独安装,也可以使用自带的ZK) 2.安装步骤 ??Apache基金会开源的这些软件基本上安装都比较方便,只需要下载.解压.配置环境变量三步即可完成,kafka也一样,官网选择对应版本下载后直接

【JAVA】wait和notify用法,附生产/消费模型

关于wait和notify的用法,网上已经有很多详细解释了,我只是简单的总结下. wait用于释放锁A,并让wait所在的线程阻塞.除非被持有锁A的其它线程执行notify来唤醒,它才能重新"活"过来. notify用于唤醒因为等待锁A而阻塞的线程,让它们做好竞争锁A的准备.如果有多个线程因等待锁A而被阻塞,notify只唤醒一个,唤醒所有用notifyAll. 参考下面的线程状态图,对理解wait和notify有很大的帮助. 总结: wait和notify通常和synchronize

Python 简单爬虫案例

Python 简单爬虫案例 import requests url = "https://www.sogou.com/web" # 封装参数 wd = input('enter a word') param = { 'query':wd } response = requests.get(url=url,params=param) page_text = response.content fileName = wd+'.html' with open(fileName,'wb') as

Python简单操作笔记

Python 类型转换 str(),repr()|format() : 将非字符类型转成子串 int() : 转为整形 float() : 转为浮点型 list(s) : 将字串s转成列表 tuple(s) : 将字串s转成元组 set(s) : 将字串s转成集合 frozenset(s) : 将字串s转成不可变集合 dict(s) : 创建字典 其d必须是(key,value)的元组序列; chr(x) : 将整形转成字符 ord(x) : 将字符转成整形 hex(x) : 将整形转换成16进

Python简单实现基于VSM的余弦相似度计算

在知识图谱构建阶段的实体对齐和属性值决策.判断一篇文章是否是你喜欢的文章.比较两篇文章的相似性等实例中,都涉及到了向量空间模型(Vector Space Model,简称VSM)和余弦相似度计算相关知识.        这篇文章主要是先叙述VSM和余弦相似度相关理论知识,然后引用阮一峰大神的例子进行解释,最后通过Python简单实现百度百科和互动百科Infobox的余弦相似度计算. 一. 基础知识 第一部分参考我的文章: 基于VSM的命名实体识别.歧义消解和指代消解 第一步,向量空间模型VSM 

3、传统线程同步与通信--生产消费例子

核心点: 1.锁对象必须是同一个. 2.wait()和notify()方法必须是调用锁对象的方法,而非this(线程)的. 3.在多生产多消费的时候注意使用notifyAll而不是notifyAll,否则会造成死锁 测试代码: 1 import java.util.LinkedList; 2 import java.util.Queue; 3 import java.util.Random; 4 5 /** 6 * 多个生产 - 消费 线程同步通信 7 * 核心点: 8 * 1.锁对象必须是同一

Python 简单爬虫

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import os import time import webbrowser as web import random count = random.randint(20,40) j = 0 while j < count:     i = 0     while i <= 5:         web.open_new_tab('http://www.cnblogs.com/evilxr/p/37642

python网络爬虫入门(二)——用python简单实现调用谷歌翻译

最近在看国外的文档,有些生词不认识.就用谷歌翻译来理解,用着用着闲来无事就按F12查看了下页面的源代码.发现可以用python简单的实现下谷歌翻译的页面功能.于是先上网搜下有没有类似的文章博客,发现几篇不错的,于是参考其他代码与自己的思路,简单的实现了下翻译的功能,代码如下: import re import urllib,urllib2 #----------模拟浏览器的行为,向谷歌翻译发送数据,然后抓取翻译结果,这就是大概的思路------- def Gtranslate(text): #t