底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1、一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交;2、Scala学习角度讲,比Java难。找Scala的高手比Java难,项目的维护和二次开发比较困难;3、很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combationHadoop以后作为基础设施,Spark作为灵魂1、下载最新版的Eclipse2、解压并启动Eclipse3、创建Maven工程4、建立quick-start5、通过buildpath把默认的java5变成java86、配置pom.xml程序,添加程序开发时候的相关依赖,并配置具体打包的buildhttp://maven.outofmemory.cn/org.apache.sparkpackage com.dt.spark.SparkApps.cores;
import java.util.Arrays;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/** * 使用JAVA的方式进行本地测试开发Spark的Word Count * * @author DT大数据梦工厂 * */
public class WordCount {
        public static void main(String[] args) {               // TODO Auto-generated method stub               /**               * 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息               * 例如:通过setMaster来设置程序要连接的Spark集群的 url,               * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者               */              SparkConf conf = new SparkConf().setAppName("Spark Word Count In JAVA").setMaster("local");
               /**               * 2、创建SparkContext对象               * SparkContext是Spark程序所有功能的唯一入口,无论采用 Scala、Java、Python、R等都必须要(               * 不同的语言具体的类名称不同,如果是JAVA,则为JavaSparkContext)               * SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler               * 、SchedulerBackend 同时还会负责Spark程序往Master注册程序等               * SparkContext是整个Spark应用程序中最为至关重要的一个对象               */              JavaSparkContext sc = new JavaSparkContext(conf );// 其地产曾实际上就是 Scala的SparkContext
               /**               * 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD               * RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据 Scala集合、由其它的RDD操作               * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴               */              JavaRDD<String> lines = sc.textFile( "F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md" );
               /**               * 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算               * 4.1、将每一行的字符串拆分成单个的单词 FlatMapFunction<String,               * String>是匿名内部类第二个参数泛型,因为知道是String,所以用String               */              JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() {// 如果是 Scala,由于SAM转换,所以可以写成1行代码 val                                                                                                                                                       // words                                                                                                                                                       // =                                                                                                                                                       // lines.flatMap                                                                                                                                                       // {                                                                                                                                                       // line                                                                                                                                                       // =>                                                                                                                                                       // line.split("                                                                                                                                                       // ")                                                                                                                                                       // }
                      @Override                      public Iterable<String> call(String line ) throws Exception {                            // TODO Auto-generated method stub                            return Arrays.asList( line.split( " "));                     }              });
               /**               * 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1)               */              JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() {
                      @Override                      public Tuple2<String, Integer> call(String word) throws Exception {                            // TODO Auto-generated method stub                            return new Tuple2<String, Integer>(word, 1);                     }              });
               /**               * 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数               */
              JavaPairRDD<String, Integer> wordCounts = pairs .reduceByKey(new Function2<Integer, Integer, Integer>() {
                      @Override                      public Integer call(Integer v1 , Integer v2) throws Exception {                            // TODO Auto-generated method stub                            return v1 + v2 ;                     }              });                             wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {                                           @Override                      public void call(Tuple2<String, Integer> wordNumberPair) throws Exception {                            // TODO Auto-generated method stub                            System. out.println(wordNumberPair ._1 +":" +wordNumberPair ._2 );                     }              });                             /**            * 5、关闭Spark上下文释放相关资源            */           sc.close();
       }
}
关于在Windows下找不winutils.exe
https://github.com/amihalik/hadoop-common-2.6.0-bin/blob/master/bin/winutils.exe

作业:用JAVA方式采用maven方式开发WordCount并运行在集群上

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

时间: 2024-10-20 05:46:07

底层战详解使用Java开发Spark程序(DT大数据梦工厂)的相关文章

Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等(DT大数据梦工厂)

内容: 1.Spark 1.6 RPC解析: 2.RPCEnv源码解析: 3.RPCEndpoint等源码解析: 以前和现在的RPC都是采用Akka,以前和现在的不同就在于RPCEnv,现在就是基于RPCEnv去做RPC通信的 ==========Spark 1.6 RPC解析============ 1.Spark 1.6推出了以RPCEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka: 2.Akka是基于Sc

从物理执行的角度透视Spark Job(DT大数据梦工厂)

内容: 1.再次思考pipeline: 2.窄依赖物理执行内幕: 3.宽依赖物理执行内幕: 4.Job提交流程: 物理执行是更深层次的角度. ==========再次思考pipeline ============ 即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录: 2.f(records), f一次性作用于集合的全部数据: Spark运行的时候用的是第一种方式,为什么呢? 1.无需等待,

DT大数据梦工厂-Scala学习笔记(1):Scala开发环境搭建和HelloWorld解析

一.scala是函数式编程和面向对象编程结合的语言,这两种编程的特点分别是什么? 答:函数式编程擅长数值的计算:面向对象编程特别适合于大型工程或项目的组织,以及团队的分工协作. 二.Scala的特点 Scala结构优雅.计算精致.富有表现力 三.scala的安装需要什么资源? Java,推荐安装Java8(Java7也可以) 支持scala 2.10.*以上(可以安装2.10.4,http://scala-lang.org/download) 四.设置环境变量(系统变量): (1)JAVA_HO

DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容: 1.     TaskScheduler工作原理 2.     TaskScheduler源码 一.TaskScheduler工作原理 总体调度图: 通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理. 回顾: DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程:运行时从前往后运行的.每个Stage中有很多任务Task,Task是可以并行执行的.它们的执行逻辑完

DT大数据梦工厂Spark机器学习相关视频资料

大数据未来几年发展的重点方向,大数据战略已经在十八届五中全会上作为重点战略方向,中国在大数据方面才刚刚起步,但是在美国已经产生了上千亿的市场价值.举个例子,美国通用公司是一个生产飞机发动机的一个公司,这家公司在飞机发动机的每一个零部件上都安装了传感器,这些传感器在飞机发动机运作的同时不断的把发动机状态的数据传到通用公司的云平台上,通用公司又有很多数据分析中心专门接受这些数据,根据大数据的分析可以随时掌握每一家航空公司发动机的飞行状况,可以告知这些航空公司发动机的哪些部件需要检修或保养,避免飞机事

IDEA下Spark的开发(DT大数据梦工厂)

IDEA越使用效果越好,快捷键方便,阅读源码方便 一般阅读Spark或者Scala的源码都采用IDEA使用 下载IDEA最新版本的社区版本即可, 安装的时候必须安装Scala,这个过程是IDEA自动化的插件管理,所以点击后会自动下载(跳过在setting plugins里面也可以安装) 本地JAVA8和Scala2.10.4软件套件的安装和Eclipse不同 打开 打开之后点击File->Project Structure来设置工程的Libraries 核心是添加Spark的jar依赖 代码拷贝

Eclipse下开发Scala(DT大数据梦工厂)

本讲主要内容:环境安装.配置.本地模式.集群模式.自动化脚本.web状态监控 ==========单机============ 开发工具开发 下载最新版Scala For Eclipse 1.建立工程,修改scala编译版本 2.加入Spark1.6.0的jar文件依赖 下载 http://apache.opencas.org/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz spark-assembly-1.6.0-hadoop2.6.0.jar

Spark运行原理和RDD解析(DT大数据梦工厂)

Spark一般基于内存,一些情况下也会基于磁盘 Spark优先会把数据放到内存中,如果内存实在放不下,也会放到磁盘里面的 不单能计算内存放的下的数据,也能计算内存放不下的数据 实际如果数据大于内存,则要考虑数据放置策略和优化算法,因为Spark初衷是一寨式处理 小到5~10台的分布式大到8000台的规模,Spark都能运行 大数据计算问题:交互式查询(基于shell.sparkSQL).批处理.机器学习和计算等等 底层基于RDD,分布式弹性数据级,支持各种各样的比如流处理.SQL.SparkR等

CacheManager彻底解密:CacheManager运行原理流程图和源码详解(DT大数据梦工厂)

内容: 1.CacheManager重大价值: 2.CacheManager运行原理图: 3.CacheManager源码解析: BlockManager针对Cache这样的行为做了CacheManager Spark出色的原因: 1.Spark基于RDD构成了一体化.多元化的大数据处理中心(不需要再处理多种范式来部署多种框架,只要Spark!!!降低成本投入获得更高的产出): 2.迭代,因为在计算的时候迭代,在构建复杂算法的时候非常方便(图计算.机器学习.数据仓库),而CacheManager