kafka spark streaming例子——TODO 没有成功



python3+spark2.1+kafka0.8+sparkStreaming


python代码:

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add

sc = SparkContext(master="local[1]",appName="PythonSparkStreamingRokidDtSnCount")
ssc = StreamingContext(sc, 2)
zkQuorum = ‘localhost:2181‘
topic = {‘rokid‘:1}
groupid = "test-consumer-group"
lines = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
lines1 = lines.flatMap(lambda x: x.split("\n"))
valuestr = lines1.map(lambda x: x.value.decode())
valuedict = valuestr.map(lambda x:eval(x))
message = valuedict.map(lambda x: x["message"])
rdd2 = message.map(lambda x: (time.strftime("%Y-%m-%d",time.localtime(float(x.split("\u0001")[0].split("\u0002")[1])/1000))+"|"+x.split("\u0001")[1].split("\u0002")[1],1)).map(lambda x: (x[0],x[1]))
rdd3 = rdd2.reduceByKey(add)
rdd3.saveAsTextFiles("/tmp/wordcount")
rdd3.pprint()
ssc.start()
ssc.awaitTermination()

执行SparkStreaming:

spark/bin/spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar ReadFromKafkaStreaming.py

其中spark-streaming-kafka-0.98-assembly_2.11-2.1.0.jar从以下网站下载
http://search.maven.org

作为入门参考。

"""
 Counts words in UTF8 encoded, ‘\n‘ delimited text received from the network every second.
 Usage: kafka_wordcount.py <zk> <topic>
 To run this on your local machine, you need to setup Kafka and create a producer first, see
 http://kafka.apache.org/documentation.html#quickstart
 and then run the example
    `$ bin/spark-submit --jars       external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar       examples/src/main/python/streaming/kafka_wordcount.py       localhost:2181 test`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" "))         .map(lambda word: (word, 1))         .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test

 

kafka 使用:

1.3 Quick Start

This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data. Since Kafka console scripts are different for Unix-based and Windows platforms, on Windows platforms use bin\windows\ instead of bin/, and change the script extension to .bat.

Step 1: Download the code

Download the 0.11.0.0 release and un-tar it.


1

2

> tar -xzf kafka_2.11-0.11.0.0.tgz

> cd kafka_2.11-0.11.0.0

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don‘t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.


1

2

3

> bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

...

Now start the Kafka server:


1

2

3

4

> bin/kafka-server-start.sh config/server.properties

[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

...

Step 3: Create a topic

Let‘s create a topic named "test" with a single partition and only one replica:


1

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We can now see that topic if we run the list topic command:


1

2

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.

Run the producer and then type a few messages into the console to send to the server.


1

2

3

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.


1

2

3

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

时间: 2024-11-07 09:17:15

kafka spark streaming例子——TODO 没有成功的相关文章

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

如何配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 安装spark的服务器: 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(八)安装zookeeper-3.4.12

如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 如何安装spark2.2.1请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 如何安装hadoop2.9.0请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)安装hadoop2.9.0> 如何安装spark2.2.1请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(三)安装spark2.2.1

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。

Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.>配置好虚拟机,正在使用中,让它强制断电后,启动起来发现ip无法访问,而且重启网络失败: 执行:systemctl restart network.service 出现异常:Failed to start LSB: Br

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)kafka+spark streaming打包好的程序提交时提示虚拟内存不足(Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 G)

异常问题:Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container. spark-submit提交脚本: [[email protected] work]$ more submit.sh #! /bin/bash jars=""

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave2 具体请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 2. 安装zookeeper分布式集群具体请参考<Kafka:ZK+Kafka+Spark Streaming集群

kafka + spark Streaming + Tranquility Server发送数据到druid

花了很长时间尝试druid官网上说的Tranquility嵌入代码进行实时发送数据到druid,结果失败了,各种各样的原因造成了失败,现在还没有找到原因,在IDEA中可以跑起,放到线上就死活不行,有成功了的同仁希望贴个链接供我来学习学习:后来又尝试了从kafka实时发送到druid,还是有些错误,感觉不太靠谱:最后没办法呀,使用Tranquility Server呗 _ _! Tranquility Server的配置和启动请移步:https://github.com/druid-io/tran

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

一.前言 在使用Spark Streaming中的Kafka Direct API进行Kafka消费的过程中,通过spark-submit的方式提交jar包,会出现如下错误信息,提示无法找到KafkaUtils. Exceptionin thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at com.zhkmxx.scala.app.KafkaStream

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十七)待整理

redis按照正则批量删除key redis客户端--jedis 在Spark结构化流readStream.writeStream 输入输出,及过程ETL Spark Structured Streaming入门编程指南 Structured Streaming 实现思路与实现概述 Spark结构式流编程指南 spark streaming重复消费kafka记录,需要删除checkpoint保存目录. 原文地址:https://www.cnblogs.com/yy3b2007com/p/9315