测开之路七十四:python处理kafka

kafka-python地址:https://github.com/dpkp/kafka-python

安装kafka-python:pip install kafka-python

接收消息

from kafka import KafkaConsumerconsumer = KafkaConsumer(‘test‘, bootstrap_servers=[‘localhost:9092‘])  # 定义消费者for msg in consumer:    print(msg)

发条消息尝试接收,结果是byte类型

发送消息

from kafka import KafkaProducerimport json

data = ‘123qweasd45‘producer = KafkaProducer(bootstrap_servers=[‘localhost:9092‘],                         value_serializer=lambda v: json.dumps(v).encode(‘utf-8‘))# value_serializer=lambda v: json.dumps(v).encode(‘utf-8‘):由于默认发送的数据是byte,这里把要发送的数据序列化成jsonproducer.send(‘test‘, json.dumps(data), partition=0)producer.close()

执行投递消息,查看消费者的输出

窗口也可以展示

原文地址:https://www.cnblogs.com/zhongyehai/p/11285963.html

时间: 2024-10-05 04:53:46

测开之路七十四:python处理kafka的相关文章

测开之路七十六:性能测试蓝图之html

<!-- 继承base模板 -->{% extends 'base.html' %} {% block script %} <!-- 从cdn引入ace edter的js --> <script src="https://cdn.bootcss.com/ace/1.4.5/ace.js"></script> <script src="https://cdn.bootcss.com/ace/1.4.5/mode-python

测开之路七十八:性能测试蓝图之视图层

from flask import requestfrom flask import jsonifyfrom flask import Blueprintfrom flask import render_templatefrom performance.logic import Logic performance = Blueprint('performance', __name__, static_folder='static', template_folder='templates', ur

测开之路七十九:性能测试蓝图之执行逻辑

import osimport csvimport json from kafka import KafkaProducerfrom kafka import KafkaConsumer from common.mongo import Mongofrom common import get_case_id class Logic(object): def __init__(self): pass def start(self, data): """ 这是投递消息函数,只负责

测开之路七十六:linux变量和环境变量

变量 赋值 variable=0,访问 $var或${var} 参数 $n 用``引住的会先执行(~键) 位置参数 环境变量/etc/profile:全局的环境变量 . bash_profile:用户的环境变量,在哪个用户的home下,就对哪个用户生效~/ .bashrc:用户的环境变量,在哪个用户的home下,就对哪个用户生效export:导出变量,即用户在外面使用source:修改环境变量后让变量文件生效 添加a到环境变量 也可以export其他的变量 在脚本中使用环境变量 管道 | 管道操

测开之路七十七:性能测试蓝图之js

//定义全局的editor = nullvar editor = null; //ace_editor的初始化函数function ace_editor() { var editor = ace.edit("editor"); //初始化对象,"editor"为前端页面的id //设置风格和语言(更多风格和语言,请到github上相应目录查看) //editor.setTheme("ace/theme/clouds"); //编辑界面的主题--云

测开之路二十五:彩票游戏

玩法 import random money = random.randint(0, 999) def generate_code(money): return list(set([ money[0] + money[1] + money[2], money[0]+money[2]+money[1], money[1] + money[0] + money[2], money[1] + money[2] + money[0], money[2] + money[0] + money[1], mo

测开之路七十三:用kafka实现消息队列之环境搭建

一:装java环境,确保java能正确调用 kafka下载地址:http://kafka.apache.org/downloads 下载并解压kafka: 新建两个文件夹,用于存放zookeeper和kafka的log数据 修改配置: 1.新建zookeeper_data\zookeeper文件夹,用于存放zookeeper的数据 编辑config--zookeeper.properties的dataDir为刚刚创建的文件夹,clientPort为zookeeper的默认端口号 2.新建一个文件

测开之路七十七:shell之if、case、for、while

选择语句(if语句) 大于:-gt判断目录是否存在:-d if [ 判断条件 ]; then statement1 Statement2elif [ 判断条件 ]; then statement1 Statement2.......else statement3 statement4fi 判断一个目录是否存在,如果存在,就删除,如果不存在,就创建 分支语句(case语句) case 值 inval1)    command1    command2    ...    commandN ;;va

测开之路七:列表

查看有哪些函数可以用 append # 在列表末尾添加新的对象count    # 统计某个元素在列表中出现的次数extend   # 两个列表合并为一个index    # 找出某个值第一个匹配项的索引位置insert     # 将对象插入列表pop    # 移除一个元素并且返回该元素的值remove     # 移除列表中某个值的第一个匹配项reverse        # 反向列表中元素sort       # 对原列表进行排序pop        # 按照索引位置删除元素clear