Spark Streaming + socket wordCount: a small example

Consumer code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetWorkStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf object
    val conf = new SparkConf().setMaster("spark://192.168.177.120:7077").setAppName("netWorkStream")
    // Create the StreamingContext: the main entry point for all streaming functionality.
    // Seconds(1) means results are computed in one-second batches.
    val ssc = new StreamingContext(conf, Seconds(1))
    // Receive input data from 192.168.99.143
    val lines = ssc.socketTextStream("192.168.99.143", 9999)
    // Count the occurrences of each incoming word
    val words = lines.flatMap { line => line.split(" ") }
    val wordCount = words.map { w => (w, 1) }.reduceByKey(_ + _)
    // Print the result
    wordCount.print()
    ssc.start()            // start the computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}
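
Note that the example is configured against a standalone cluster master (spark://192.168.177.120:7077). To try it on a single machine instead, the master must provide at least two threads, because the socket receiver permanently occupies one of them; with only one thread, none would be left to process the data. A local variant of the configuration line (a sketch, not part of the original example):

val conf = new SparkConf().setMaster("local[2]").setAppName("netWorkStream") // one thread for the receiver, at least one for processing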

On another machine, start a netcat server (-l listens, -k keeps it listening across connections) and type some words:

nc -lk 9999
zhang xing sheng zhang

The consumer terminal then shows the processed result:

17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 128.0 (TID 134) in 30 ms on 192.168.177.120 (1/1)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 128.0, whose tasks have all completed, from pool
17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 128 (print at NetWorkStream.scala:18) finished in 0.031 s
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 64 finished: print at NetWorkStream.scala:18, took 0.080836 s
17/03/25 14:10:33 INFO spark.SparkContext: Starting job: print at NetWorkStream.scala:18
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Got job 65 (print at NetWorkStream.scala:18) with 1 output partitions
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 130 (print at NetWorkStream.scala:18)
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 129)
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Missing parents: List()
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17), which has no missing parents
17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67 stored as values in memory (estimated size 2.8 KB, free 366.2 MB)
17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67_piece0 stored as bytes in memory (estimated size 1711.0 B, free 366.2 MB)
17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on 192.168.177.120:37341 (size: 1711.0 B, free: 366.3 MB)
17/03/25 14:10:33 INFO spark.SparkContext: Created broadcast 67 from broadcast at DAGScheduler.scala:1012
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Adding task set 130.0 with 1 tasks
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 130.0 (TID 135, 192.168.177.120, partition 1, NODE_LOCAL, 6468 bytes)
17/03/25 14:10:33 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 135 on executor id: 0 hostname: 192.168.177.120.
17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on 192.168.177.120:45262 (size: 1711.0 B, free: 366.3 MB)
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 130.0 (TID 135) in 14 ms on 192.168.177.120 (1/1)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 130.0, whose tasks have all completed, from pool
17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 130 (print at NetWorkStream.scala:18) finished in 0.014 s
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 65 finished: print at NetWorkStream.scala:18, took 0.022658 s
-------------------------------------------
Time: 1490422233000 ms
-------------------------------------------
(xing,1)
(zhang,2)
(sheng,1)
 

Notes:

val conf = new SparkConf()

val ssc = new StreamingContext(conf, Seconds(1)) // create the context

After a context is defined, you have to do the following (steps 4 and 5 are sketched in code right after this list):

  1. Define the input sources by creating input DStreams.
  2. Define the streaming computations by applying transformation and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  5. The processing can be manually stopped using streamingContext.stop().
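
Steps 1 through 4 are exactly what the example above does; the manual stop in step 5 is not shown there. A minimal sketch of one way to combine waiting and stopping, using awaitTerminationOrTimeout (the 60-second timeout is an arbitrary illustrative value):

// Wait up to 60 seconds; if the job has not terminated by then, stop it gracefully,
// i.e. finish processing the data already received before shutting down.
if (!ssc.awaitTerminationOrTimeout(60000)) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}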

Key points

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional stop() parameter stopSparkContext to false, e.g. ssc.stop(false).
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created; see the sketch after this list.
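
A minimal sketch of points 4 and 5 combined: stop only the StreamingContext, keep the SparkContext alive, and reuse it for the next context. The names and batch intervals here are illustrative, not from the original example:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("reuseSparkContext").setMaster("local[2]"))

val ssc1 = new StreamingContext(sc, Seconds(1))
// ... define sources and computations, call ssc1.start(), run for a while ...
ssc1.stop(stopSparkContext = false) // stop the streaming job, keep the SparkContext alive

// Only after ssc1 is stopped may a second StreamingContext be created on the same SparkContext.
val ssc2 = new StreamingContext(sc, Seconds(5))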
