sparkStreaming实现wordcount

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Seconds

import org.apache.spark.streaming.StreamingContext

object  WordCount extends App {

val conf=new SparkConf().setMaster("local[2]").setAppName("wordcount")

val streamContext=new StreamingContext(conf,Seconds(5))

val lines=streamContext.socketTextStream("myhadoop1", 9999)

//监控主机的一个端口  用命令nc -lk 9999

lines.flatMap {_.split(" ")}.map {(_,1)}.reduceByKey(_+_).print()

streamContext.start()

streamContext.awaitTermination()

}

时间: 2024-08-02 17:52:21

sparkStreaming实现wordcount的相关文章

6.SparkStreaming之WordCount(UpdateStateByKey)

代码: import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext} object UpdateStateByKeyWordCount { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel

SparkStreaming

Spark Streaming用于流式数据的处理.Spark Streaming支持的数据输入源很多,例如:Kafka.Flume.Twitter.ZeroMQ和简单的TCP套接字等等.数据输入后可以用Spark的高度抽象原语如:map.reduce.join.window等进行运算.而结果也能保存在很多地方,如HDFS,数据库等 和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream.DStream

Spark之SparkStreaming案例

一.Spark Streaming的介绍 ??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理. 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理. 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板. 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法. ??在内部,它的工作原理如下. S

python3+spark2.1+kafka0.8+sparkStreaming

python代码: import time from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from operator import add sc = SparkContext(master="local[1]",appName="PythonSparkStreamingR

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一.Hadoop配置安装 注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库, 所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译 1.修改Linux主机名 2.修改IP 3.修改主机名和IP的映射关系 ######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机.阿里云主机等) /etc/hosts里面要配置的是内网IP地址和主机名的映射关系 4.关闭防火墙 5.s

Spark版本定制第1天:通过案例对SparkStreaming透彻理解之一

本期内容: 1 Spark Streaming另类在线实验 2 瞬间理解Spark Streaming本质 在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下.这里选择Spark Streaming作为版本定制的切入点也是大势所趋. 小技巧:将Batch interval放大,相当于看到了Streaming的慢放版本,可以更清楚它的各个环节,这里以黑名单过滤程序

SparkStreaming之基本数据源输入

输入DStreams表示从数据源获取的原始数据流.Spark Streaming拥有两类数据源 (1)基本源(Basic sources):这些源在StreamingContext API中直接可用.例如文件系统.套接字连接. Akka的actor等. (2)高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等. 1.基本数据源输入源码 SparkStream 对于外部的数据输入源,一共有下面几种: (1)用户自定义的数据源:recei

通过案例对SparkStreaming透彻理解之一

本期内容: 1 Spark Streaming另类在线实验 2 瞬间理解Spark Streaming本质 在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的 诸如SQL,MLlib等强大框架,它必将一统天下.这里选择Spark Streaming作为版本定制的切入点也是大势所趋. 小技巧:将Batch interval放大,相当于看到了Streaming的慢放版本,可以更清楚它的各个环节,这里以黑名单过滤程

SparkStreaming入门及例子

看书大概了解了下Streaming的原理,但是木有动过手啊...万事开头难啊,一个wordcount 2小时怎么都运行不出结果.是我太蠢了,好了言归正传. SparkStreaming是一个批处理的流式计算框架,适合处理实时数据与历史数据混合处理的场景(比如,你用streaming将实时数据读入处理,再使用sparkSQL提取历史数据,与之关联处理).Spark Streaming将数据流以时间片为单位分割形成RDD,使用RDD操作处理每一块数据,没块数据都会生成一个spark JOB进行处理,