python3.5读取kafka中的数据

安装包 pykafka

代码如下:

from pykafka import KafkaClient

client = KafkaClient(hosts="test43:9092")
print(client.topics)
topic = client.topics[b‘rokid‘]    #topic名称
consumer = topic.get_simple_consumer()
for record in consumer:
    if record is not None:
        valuestr = record.value.decode()   #从bytes转为string类型
        valuedict = eval(valuestr)
        message = valuedict["message"]
        fields = message.split("\u0001")
        for field in fields:
            kv = field.split("\u0002")
            if len(kv) == 2:
                print(kv[0],‘----‘,kv[1])
        print(‘-‘*100)

以上仅供开发测试使用,真正发布到线上需要多地方加固。。。

mark

时间: 2024-08-14 16:28:41

python3.5读取kafka中的数据的相关文章

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v

读取文件中的数据(以结构体存放)

/* *读取文件中的数据(数据以结构体存放) */ #include<iostream> #include <fstream> //#define Field 31 //field_anal number #define Field 15 //field_post number using namespace std; //the level restore certain level data //level_z show the level struct Level { int

JAVA写个东西读取TXT中的数据 且要计算出平均值和总值 最后还要按总值排序

AVA写个东西读取TXT中的数据 且要计算出平均值和总值 最后还要按总值排序 例如:要计算a.txt文档中内容可如下: 学号 姓名    语文 数学 英语 平均值 总值 排序 1    肯德基   90   98   97 2    经典款   98   97   92 3    肯德的   93   92   97 import java.io.*; import java.io.File; import java.util.ArrayList; import java.util.Iterat

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

读取Excel中的数据到DataSet

读取Excel中的数据到DataSet 1.引用命名空间 using System.Data.OleDb; 2.输入Excel文件,输出DataSet public DataSet ExecleDs()    {        string strConn = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=D:\\data-for-source-apportionment\\PM-SO2-NOx-CO-O3-201311-20140324.xlsx

C# 读取Excel中的数据

#region 读取Excel中的数据 /// <summary> /// 读取Excel中的数据 /// </summary> /// <param name="excelFile">Excel文件名及路径,EG:C:\Users\JK\Desktop\导入测试.xls</param> /// <returns>Excel中的数据</returns> private DataTable GetTable(stri

IDEA,SparkSql读取HIve中的数据

传统Hive计算引擎为MapReduce,在Spark1.3版本之后,SparkSql正式发布,并且SparkSql与apache hive基本完全兼容,基于Spark强大的计算能力,使用Spark处理hive中的数据处理速度远远比传统的Hive快.在idea中使用SparkSql读取HIve表中的数据步骤如下1.首先,准备测试环境,将hadoop集群conf目录下的core-site.xml.hdfs-site.xml和Hive中conf目录下hive-site.xml拷贝在resources

Saiku数据库迁移后的刷新脚本-Shell脚本读取数据库中的数据(二十三)

Saiku数据库迁移后的刷新脚本 之前有谈过对saiku中的数据进行刷新,因为saiku默认会从缓存中查询数据,但是配置不使用缓存又会效率低下... 所以这里就需要做一个数据刷新,每次ETL之后都需要执行一遍数据刷新脚本. 刷新脚本主要分为两部分 1.使用shell命令从数据库中读取已有的用户信息 (因为已经做过数据迁移,数据库已经从h2转为我们自己的mysql) saikuRefresh.sh #!/bin/bash #数据库连接信息 HOSTNAME="10.11.22.33" #

java读取请求中body数据

/** * 获取request中body数据 * * @author lifq * * 2017年2月24日 下午2:29:06 * @throws IOException */ public static String getRequestBodyData(HttpServletRequest request) throws IOException{ BufferedReader bufferReader = new BufferedReader(request.getReader()); S