Spark Streaming (Python): read Kafka data and write the results to a single specified local file

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

# filename readFromKafkaStreamingGetLocation.py

import IP  # third-party IP-to-location library; IP.find() returns tab-separated location fields
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import datetime

class KafkaMessageParse:

    def extractFromKafka(self, kafkainfo):
        # a record from KafkaUtils.createStream is a (key, value) tuple; keep the value
        if type(kafkainfo) is tuple and len(kafkainfo) == 2:
            return kafkainfo[1]

    def lineFromLines(self, lines):
        # split one Kafka value into individual lines
        if lines is not None and len(lines) > 0:
            return lines.strip().split("\n")

    def messageFromLine(self, line):
        # each line is expected to be a dict carrying a "message" field
        if line is not None and "message" in line.keys():
            return line.get("message")

    def ip2location(self, ip):
        # resolve an IP to [ip, country, province, city] with the IP library;
        # the placeholder strings below are kept when the lookup returns fewer fields
        result = []
        country = 'country'
        province = 'province'
        city = 'city'
        ipinfo = IP.find(ip.strip())
        try:
            location = ipinfo.split("\t")
            if len(location) == 3:
                country = location[0]
                province = location[1]
                city = location[2]
            elif len(location) == 2:
                country = location[0]
                province = location[1]
            else:
                pass
        except Exception:
            pass
        result.append(ip)
        result.append(country)
        result.append(province)
        result.append(city)
        return result

    def vlistfromkv(self, strori, sep1, sep2):
        # strori is a sep1-separated string whose fields look like "key<sep2>value";
        # return the values in field order (each field is assumed to contain sep2)
        resultlist = []
        fields = strori.split(sep1)
        for field in fields:
            kv = field.split(sep2)
            resultlist.append(kv[1])
        return resultlist

    def extractFromMessage(self, message):
        # a message is expected to hold 8 "\x01"-separated "key\x02value" fields,
        # the last two being the client IP and the source; the IP is expanded into
        # ip/country/province/city before the fields are re-joined with "\x01"
        result = None
        if message is not None and len(message) > 1:
            if len(message.split("\x01")) == 8:
                resultlist = self.vlistfromkv(message, "\x01", "\x02")
                source = resultlist.pop()
                ip = resultlist.pop()
                resultlist.extend(self.ip2location(ip))
                resultlist.append(source)
                result = "\x01".join(resultlist)
        return result

def tpprint(val, num=10000):
    """
    Print the first num elements of each RDD generated in this DStream
    and append them to a dated local file.
    @param num: the number of elements to take from the head of each RDD.
    """
    def takeAndPrint(time, rdd):
        taken = rdd.take(num + 1)
        print("########################")
        print("Time: %s" % time)
        print("########################")
        DATEFORMAT = '%Y%m%d'
        today = datetime.datetime.now().strftime(DATEFORMAT)
        # append this batch to a single local file named by the current date
        myfile = open("/data/speech/speech." + today, "a")
        for record in taken[:num]:
            print(record)
            myfile.write(str(record) + "\n")
        myfile.close()
        if len(taken) > num:
            print("...")
        print("")

    val.foreachRDD(takeAndPrint)

if __name__ == '__main__':
    zkQuorum = 'datacollect-1:2181,datacollect-2:2181,datacollect-3:2181'
    topic = {'speech-1': 1, 'speech-2': 1, 'speech-3': 1, 'speech-4': 1, 'speech-5': 1}
    groupid = "rokid-speech-get-location"
    master = "local[*]"
    appName = "SparkStreamingRokid"
    timecell = 5

    sc = SparkContext(master=master, appName=appName)
    ssc = StreamingContext(sc, timecell)
    # ssc.checkpoint("checkpoint_"+time.strftime("%Y-%m-%d", time.localtime(time.time())))

    # Receiver-based stream: topics given as a {topic: partition count} dict, consumed through ZooKeeper
    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    kmp = KafkaMessageParse()
    lines = kvs.map(lambda x: kmp.extractFromKafka(x))
    lines1 = lines.flatMap(lambda x: kmp.lineFromLines(x))
    # each line is expected to be a Python dict literal, e.g. {"message": "..."}
    valuedict = lines1.map(lambda x: eval(x))
    message = valuedict.map(lambda x: kmp.messageFromLine(x))
    rdd2 = message.map(lambda x: kmp.extractFromMessage(x))

    # rdd2.pprint()

    tpprint(rdd2)
    # rdd2.fileprint(filepath="result.txt")

    # rdd2.foreachRDD().saveAsTextFiles("/home/admin/agent/spark/result.txt")

    # sc.parallelize(rdd2.cache()).saveAsTextFile("/home/admin/agent/spark/result", "txt")

    # rdd2.repartition(1).saveAsTextFiles("/home/admin/agent/spark/result.txt")

    ssc.start()
    ssc.awaitTermination()

The main point is rewriting the pprint() function: tpprint() above follows pprint()'s take-and-print logic but additionally appends each batch's records to a single date-named local file.
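As a variation on the same idea, the take-and-append logic can be pulled out into a helper whose output path and element count are parameters instead of hard-coded values. This is only an illustrative sketch; the name append_dstream_to_file and the example path are made up here and are not part of the original script.

def append_dstream_to_file(dstream, outpath, num=10000):
    # take at most num records from each batch RDD and append them to outpath
    def take_and_append(time, rdd):
        taken = rdd.take(num + 1)
        with open(outpath, "a") as f:
            for record in taken[:num]:
                f.write(str(record) + "\n")
        if len(taken) > num:
            print("...")
    dstream.foreachRDD(take_and_append)

# usage (hypothetical path): append_dstream_to_file(rdd2, "/data/speech/speech.log")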

Reference: https://stackoverflow.com/questions/37864526/append-spark-dstream-to-a-single-file-in-python
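The linked question deals with the same problem of appending a DStream to a single file. One further variant, not from the original post, writes every record of each batch rather than only the first num; note that it funnels all batch data through the driver, so it only suits small per-batch volumes (the name append_all_to_file is made up here).

def append_all_to_file(dstream, outpath):
    # append every record of every batch to one local file, iterating on the driver
    def write_batch(time, rdd):
        with open(outpath, "a") as f:
            for record in rdd.toLocalIterator():
                f.write(str(record) + "\n")
    dstream.foreachRDD(write_batch)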

