00、Wordcount

1、pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>sparkcore</groupId>

<artifactId>sparkcore-java</artifactId>

<version>1.0</version>

<packaging>jar</packaging>

<name>sparkcore-java</name>

<url>http://maven.apache.org</url>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

</properties>

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.11</artifactId>

<version>2.1.1</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.1.1</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_2.11</artifactId>

<version>2.1.1</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_2.11</artifactId>

<version>2.1.1</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.7.3</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka_2.11</artifactId>

<version>1.6.3</version>

<scope>compile</scope>

</dependency>

<dependency>

<groupId>com.fasterxml.jackson.core</groupId>

<artifactId>jackson-core</artifactId>

<version>2.8.9</version>

</dependency>

</dependencies>

</project>

2、Windows本地模拟运行

无需启动Spark服务即可在Windows环境下的Eclipse中运行此代码

2.1、Java版

package sparkcore;

import java.util.Arrays;

import java.util.Iterator;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**

* 使用java开发本地测试的wordcount程序

*

* @author Administrator

*

*/

public class WordCountLocal {

public static void main(String[] args) {

// 编写Spark应用程序

// 本地执行,是可以执行在eclipse中的main方法中,执行的

// 第一步:创建SparkConf对象,设置Spark应用的配置信息

// 使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url

// 但是如果设置为local则代表,在本地运行

SparkConf conf = new SparkConf().setAppName("WordCountLocal").setMaster("local");

// 第二步:创建JavaSparkContext对象

// 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写

// 都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括

// 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等

// 一句话,SparkContext,是Spark应用中,可以说是最最重要的一个对象

// 但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala,

// 使用的就是原生的SparkContext对象

// 但是如果使用Java,那么就是JavaSparkContext对象

// 如果是开发Spark SQL程序,那么就是SQLContext、HiveContext

// 如果是开发Spark Streaming程序,那么就是它独有的SparkContext

// 以此类推

JavaSparkContext sc = new JavaSparkContext(conf);

// 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD

// 输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集

// 我们这里呢,因为是本地测试,所以呢,就是针对本地文件

// SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法

// 在Java中,创建的普通RDD,都叫做JavaRDD

// 在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件呢,创建的RDD,每一个元素就相当于

// 是文件里的一行

JavaRDD<String> lines = sc.textFile("test.txt");

// 第四步:对初始RDD进行transformation操作,也就是一些计算操作

// 通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行

// function,通常,如果比较简单,则创建指定Function的匿名内部类

// 但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类

// 先将每一行拆分成单个的单词

// FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型

// 我们这里呢,输入肯定是String,因为是一行一行的文本,输出,其实也是String,只不过是多个放在集合中

// 这里先简要介绍flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

public Iterator<String> call(String line) throws Exception {

return Arrays.asList(line.split(" ")).iterator();

}

});

// 接着,需要将每一个单词,映射为(单词, 1)的这种格式

// 因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加

// mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素

// 如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值

// mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型

// 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型

// JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型

JavaPairRDD<String, Integer> pairs = words.mapToPair(

new PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(String word) throws Exception {

return new Tuple2<String, Integer>(word, 1);

}

});

// 接着,需要以单词作为key,统计每个单词出现的次数

// 这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作

// 比如JavaPairRDD中有几个元素,分别为(hello, 1) (hello, 1) (hello, 1) (world, 1)

// reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算

// 比如这里的hello,那么就相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3

// 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value

// reduce之后的结果,相当于就是每个单词出现的次数

JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

// 第一与第二个参数为输入类型(为两个Tuple2的第二个元素类型),第三个为输出类型

new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

// 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数

// 但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作

// 一个Spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action

// 接着,最后,可以使用一种叫做action操作的,比如说,foreach,来触发程序的执行

wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

private static final long serialVersionUID = 1L;

public void call(Tuple2<String, Integer> wordCount) throws Exception {

System.out.println(wordCount._1 + " : " + wordCount._2);

}

});

sc.close();

}

}

2.2、Scala版:

新建Scala工程,然后将其转换为Maven工程(右击工程--》Configure--》Covert to Maven Project),pom.xm文件与Java版一样

package sparkcore

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object WordCount {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("WordCount").setMaster("local")

val sc = new SparkContext(conf)

val lines = sc.textFile("test.txt", 1);

val words = lines.flatMap { line => line.split(" ") }

val pairs = words.map { word => (word, 1) }

val wordCounts = pairs.reduceByKey { _ + _ }

wordCounts.foreach(wordCount => println(wordCount._1 + " : " + wordCount._2 ))

}

}

时间: 2024-10-05 19:03:20

00、Wordcount的相关文章

[lushengduan]【基于Android N详解HelloWorld系列】00、简述和HelloWorld实现

想必大家对HelloWorld并不陌生,堪称"编程入门经典"!我们知道,HelloWorld是一个最简单的小程序,但是,要运行这个简单小程序,Android系统框架可做了不少事情,这涉及到AMS.PMS.WMS等各种系统服务,系统服务之间相互协作,有条不紊地完成应用程序的安装.运行等操作:网上也有很多文章对Android框架进行了分析,大部分讲得也很精彩.很透彻,但是,确实是这些资料有些老旧,因此,想围绕着HelloWorld,基于Android N(Android 7.0/7.1)整

00、生物芯片笔记前言

基因芯片主要分为双通道cDNA芯片和高密度寡核苷酸芯片. 双通道cDNA芯片:每个微阵列产生两个探针水平的数据集(红色通道和绿色通道). 高密度寡核苷酸芯片:每个微阵列产生一个探针水平的数据集.一些探针是匹配探针(Perfect match,PM),一些探针是错配探针(Mismatch,MM.不过有些芯片无MM探针). 本博客主要介绍高密度寡核苷酸芯片,以Affymetrix芯片为例.这些芯片有多种型号,如常见的Affymetrix 3’ -biased Arrays 有HG_U95Av2.HG

00、Java概述(Dos命令、JDK1.8环境配置、第三方记事本、MarkDown语法)

1.基本的Windows快捷键和常用DOS命令 1.1.Windows快捷键 1.2.DOS命令 1.3.计算机语言发展史 2.Jdk1.8的安装.环境配置 2.1.Jdk1.8的下载安装 2.1.Jdk1.8的环境配置 2.3.JDK.JRE.JVM 3.HelloWorld和第三方记事本 3.1.HelloWorld程序 3.2.注释 3.3.EditPlus和Notepad++的安装 4.关键字.标识符 4.1.关键字 4.2.标识符 5.MarkDown语法 1.基本Windows快捷键

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

[转载]关于标准电阻阻值的说明(E6、E12、E24、E48、E96、E192)

关于电容电阻的使用,只知道有一些所谓的典型值,有助于选型,却不知道这些值是怎么来的.今天查了一下,才发现所谓的“E”规格. “ E ”表示“指数间距”(Exponential Spacing) 在上个20世纪的电子管时代,电子元器件厂商为了便于元件规格的管理和选用.大规模生产的电阻符合标准化的要求,同时也为了使电阻的规格不致太多,协商采用了统一的标准组成元件的数值. 它的基础是宽容一部定的误差,并以指数间距为标准规格. 这种标准已在国际上广泛采用,这一系列的阻值就叫做电阻的标称阻值.      

第十四天、保护模式开启

loader的任务有两个:开启保护模式,将核心 kernel 载入内存.保护模式照抄上次的代码就行, kernel 嘛,先读个文本文件到内存,然后显示出来--检查下效果就行. 常量里加上 kernel.bin 载入内存的段基址 ; Constant.inc ; 常量 ; 四彩 ; 2015-11-17 %ifndef _CONSTANT_INC %define _CONSTANT_INC ; ====================================================

[PCB制作] 1、记录一个简单的电路板的制作过程——四线二项步进电机驱动模块(L6219)

前言 现在,很多人手上都有一两个电子设备,但是却很少有人清楚其中比较关键的部分(PCB电路板)是如何制作出来的.我虽然懂点硬件,但是之前设计的简单系统都是自己在万能板上用导线自己焊接的(如下图左),复杂的都是模块拼接的(如下图右):      工作中原理图和PCB也有专门的工程师来制作,因此我对这一块了解比较少.而最近闲来无事,又因为手头上确实少一个四线二项步进电机驱动模块.起初是在淘宝上找了很久才找到一个适合的,结果实验了一下午还是不行:又考虑自己在万能板上焊接,可是发现该模块外围需要10个左

Redis 一、数据结构与对象--五大数据类型的底层结构实现

redis上手比较简单,但是它的底层实现原理一直很让人着迷.具体来说的话,它是怎么做到如此高的效率的?阅读Redis设计与实现这本书,可以很好的理解Redis的五种基本类型:String,List,Hash,Set,ZSet是如何在底层实现的.还可以了解Redis的其他机制的原理.我们现在来看看Redis中的基本的数据结构吧. 简单动态字符串 Redis的简单动态字符串,通常被用来存储字符串值,几乎所有的字符串类型都是SDS的.另外,SDS还常被用作缓冲区(buffer):AOF模块中的AOF缓

JStorm第一个程序WordCount详解

一.Strom基本知识(回顾) 1,首先明确Storm各个组件的作用,包括Nimbus,Supervisor,Spout,Bolt,Task,Worker,Tuple nimbus是整个storm任务的管理者,并不实际进行工作.负责在集群中分发代码,对节点分配任务,并监视主机故障. supervisor是实际进行工作的节点,负责监听工作节点上已经分配的主机作业,启动和停止Nimbus已经分配的工作进程. Worker是具体处理Spout/Bolt逻辑的进程,worker数量由拓扑中的conf.s