83课:Scala和Java二种方式实战Spark Streaming开发

一、Java方式开发

1、开发前准备:假定您以搭建好了Spark集群。

2、开发环境采用eclipse maven工程,需要添加Spark Streaming依赖。

3、Spark streaming 基于Spark Core进行计算,需要注意事项:

设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有线程用于处理数据),随着时间的推移,内存和磁盘都会不堪重负)。

温馨提示:

对于集群而言,每隔exccutor一般肯定不只一个Thread,那对于处理Spark Streaming应用程序而言,每个executor一般分配多少core比较合适?根据我们过去的经验,5个左右的core是最佳的(段子:分配为奇数个core的表现最佳,例如:分配3个、5个、7个core等)

接下来,让我们开始动手写写Java代码吧!

第一步:创建SparkConf对象

第二步:创建SparkStreamingContext

我们采用基于配置文件的方式创建SparkStreamingContext对象:

第三步,创建Spark Streaming输入数据来源:

  我们将数据来源配置为本地端口9999(注意端口要求没有被占用):

第四步:我们就像对RDD编程一样,基于DStream进行编程,原因是DStream是RDD产生的模板,在Spark Streaming发生计算前,其实质是把每个Batch的DStream的操作翻译成为了RDD操作。

1、flatMap操作:

2、 mapToPair操作:

3、reduceByKey操作:

4、print等操作:

温馨提示:

除了print()方法将处理后的数据输出之外,还有其他的方法也非常重要,在开发中需要重点掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最为重要的是foreachRDD方法,这个方法可以将数据写入Redis,DB,DashBoard等,甚至可以随意的定义数据放在哪里,功能非常强大。

一、Scala方式开发

第一步,接收数据源:

第二步,flatMap操作:

第三步,map操作:

第四步,reduce操作:

第五步,print()等操作:

第六步:awaitTermination操作

总结:

使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。

编写人:IMF-Spark Steaming企业级开发实战小组(李林贵、姜伟等)

主编辑:王家林

备注:

资料来源于:DT_大数据梦工厂(IMF传奇行动绝密课程)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
Life is short,you need to Spark!

时间: 2024-10-25 02:26:07

83课:Scala和Java二种方式实战Spark Streaming开发的相关文章

第83课:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有

第83讲:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s

Scala学习2 ———— 三种方式完成HelloWorld程序

三种方式完成HelloWorld程序 分别采用在REPL,命令行(scala脚本)和Eclipse下运行hello world. 一.Scala REPL. 按照第一篇在windows下安装好scala后,直接Ctrl+R,然后在运行命令窗里输入scala,或者输入cmd后,进入命令行在输入scala. 然后我们输入 print("Hello World!") 看下结果: 第一种方式运行完毕. 注意:前两行命令使用了Tab键,可以像bash一样有补全的功能哦! 二.Scala脚本完成H

大数据基础教程:创建RDD的二种方式

大数据基础教程:创建RDD的二种方式 1.从集合中创建RDD val conf = new SparkConf().setAppName("Test").setMaster("local")      val sc = new SparkContext(conf)      //这两个方法都有第二参数是一个默认值2  分片数量(partition的数量)      //scala集合通过makeRDD创建RDD,底层实现也是parallelize      val 

SpringJUnit4加载类目录下(src)和WEF-INF目录下的配置文件二--获取注入的bean的二种方式

前言: spring容器以xml的形式注入bean,然后可以在类中获取,获取的形式主要有二种:第一种最简单--采用@Resource 或@Autowired关键字在加载spring文件时将bean注入到相应的类中:第二种方式是先用FileSystemXmlApplicationContext.ClassPathXmlApplicationContext 实例化ApplicationContext(Spring容器),然后调用其getBean方法获取. 下面直接代码说明: 注意:是在springJ

修改内核选项 重命名网卡名称的二种方式

第一种方式 修改Centos os 在安装的时候配置网卡名称为eth0 1.在系统启动过程中,出现引导安装界面的同时,按下TAB键 2.在底部修改内核选项 net.ifnames=0 biosdevname=0 回车,安装OS 第二种方式 安装系统后的修改方法 1.编辑网卡信息 cd /etc/sysconfig/network-scripts/ #进入网卡目录 mv ifcfg-en067761 ifcfg-eth0 #重命名网卡 cat ifcfg-eth0 TYPE=Ethernet BO

java几种方式实现单例设计模式

单例模式的几种实现方式: 一:饿汉式单例 方式一:枚举方式获得单例对象 方式二:静态属性获得单例对象 方式三:静态方法获得单例对象 二:懒汉式单例 方式一:静态方法获得单例对象(线程安全) 方式二:内部类方式去获取单例对象 示例: 恶汉式:方式一 enum Singleton{ INSTANCE;//单例 } 恶汉式:方式二 class Singleton{ public static final Singleton INSTANCE = new Singleton();//单例 private

Java 两种方式实现Token校验

方法一:AOP 代码如下定义一个权限注解 [java] view plain copy package com.thinkgem.jeesite.common.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Targe