09、高级编程之基于排序机制的wordcount程序

package sparkcore.java;

import java.util.Arrays;

import java.util.Iterator;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**

* 排序的wordcount程序:根据单词出现的次数进行排序

*/

public class SortWordCount {

public static void main(String[] args) {

// 创建SparkConf和JavaSparkContext

SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 创建lines RDD

JavaRDD<String> lines = sc.textFile("test.txt");

// 执行我们之前做过的单词计数

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

@Override

public Iterator<String> call(String t) throws Exception {

return Arrays.asList(t.split(" ")).iterator();

}

});

JavaPairRDD<String, Integer> pairs = words.mapToPair(

new PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<String, Integer> call(String t) throws Exception {

return new Tuple2<String, Integer>(t, 1);

}

});

JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

// 到这里为止,就得到了每个单词出现的次数

// 但是,问题是,我们的新需求,是要按照每个单词出现次数的顺序,降序排序

// wordCounts RDD内的元素是什么?应该是这种格式的吧:(hello, 3) (you, 2)

// 我们需要将RDD转换成(3, hello) (2, you)的这种格式,才能根据单词出现次数进行排序把!

// 进行key-value的反转映射

JavaPairRDD<Integer, String> countWords = wordCounts.mapToPair(

new PairFunction<Tuple2<String, Integer>, Integer, String>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {

return new Tuple2<Integer, String>(t._2, t._1);

}

});

// 按照key进行排序。注:其实可以使用sortBy()函数来根据自定义排序规则来进行排序,而不用像这里在排序前后进行Key与Value对调

JavaPairRDD<Integer, String> sortedCountWords = countWords.sortByKey(false);

// 再次将value-key进行反转映射

JavaPairRDD<String, Integer> sortedWordCounts = sortedCountWords.mapToPair(

new PairFunction<Tuple2<Integer, String>, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {

return new Tuple2<String, Integer>(t._2, t._1);

}

});

// 到此为止,我们获得了按照单词出现次数排序后的单词计数

// 打印出来

sortedWordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

private static final long serialVersionUID = 1L;

@Override

public void call(Tuple2<String, Integer> t) throws Exception {

System.out.println(t._1 + " : " + t._2);

}

});

// 关闭JavaSparkContext

sc.close();

}

}

package sparkcore.scala

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object SortWordCount {

def main(args: Array[String]) {

val conf = new SparkConf()

.setAppName("SortWordCount")

.setMaster("local")

val sc = new SparkContext(conf)

val lines = sc.textFile("test.txt", 1)

val words = lines.flatMap { line => line.split(" ") }

val pairs = words.map { word => (word, 1) }

val wordCounts = pairs.reduceByKey(_ + _)

val countWords = wordCounts.map(wordCount => (wordCount._2, wordCount._1))

val sortedCountWords = countWords.sortByKey(false)

val sortedWordCounts = sortedCountWords.map(sortedCountWord => (sortedCountWord._2, sortedCountWord._1))

sortedWordCounts.foreach(sortedWordCount => println(sortedWordCount._1 + " : " + sortedWordCount._2))

}

}

时间: 2024-11-08 15:40:55

09、高级编程之基于排序机制的wordcount程序的相关文章

《C#高级编程》读书笔记

<C#高级编程>读书笔记 C#类型的取值范围 名称 CTS类型 说明 范围 sbyte System.SByte 8位有符号的整数 -128~127(−27−27~27−127−1) short System.Int16 16位有符号的整数 -32 768~32 767(−215−215~215−1215−1) int System.Int32 32位有符号的整数 -2 147 483 648~2 147 483 647(−231−231~231−1231−1) long System.Int

Linux C高级编程——网络编程基础(1)

Linux高级编程--BSD socket的网络编程 宗旨:技术的学习是有限的,分享的精神是无限的. 一网络通信基础 TCP/IP协议簇基础:之所以称TCP/IP是一个协议簇,是由于TCP/IP包括TCP .IP.UDP.ICMP等多种协议.下图是OSI模型与TCP/IP模型的对照.TCP/IP将网络划分为4层模型:应用层.传输层.网络层和网络接口层(有些书籍将其分为5层,即网络接口层由链路层和物理层组成) (1)网络接口层:模型的基层.负责数据帧的发送已接收(帧是独立的网络信息传输单元).网络

oracle sql 高级编程 历史笔记整理

20130909 周一 oracle sql 开发指南 第7章 高级查询 1.层次化查询select level,ttt.*,sys_connect_by_path(ttt.col1,',') from ttt start with .. Connect by prior -因为先建立树,再进行where过滤的.在where中过滤和在cooonect by中过滤是不一样的. 2.rollup cube高级查询 select grouping(col1) .. From ttt group by

《Hadoop高级编程》之为Hadoop实现构建企业级安全解决方案

本章内容提要 ●    理解企业级应用的安全顾虑 ●    理解Hadoop尚未为企业级应用提供的安全机制 ●    考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hadoop中用于提供安全控制的机制.当构建企业级安全解决方案(它可能会围绕着与Hadoop数据集交互的许多应用程序和企业级服务)时,保证Hadoop自身的安全仅仅是安全解决方案的一个方面.各种组织努力对数据采用一致的安全机制,而数据是从采用了不同安全策略的异构数据源中提取的.当这些组织从多个源获取数据,接

Linux C高级编程——网络编程(1)

Linux高级编程--BSD socket的网络编程 宗旨:技术的学习是有限的,分享的精神的无限的. 一网络通信基础 TCP/IP协议簇基础:之所以称TCP/IP是一个协议簇,是因为TCP/IP包含TCP .IP.UDP.ICMP等多种协议.下图是OSI模型与TCP/IP模型的对比,TCP/IP将网络划分为4层模型:应用层.传输层.网络层和网络接口层(有些书籍将其分为5层,即网络接口层由链路层和物理层组成) (1)网络接口层:模型的基层,负责数据帧的发送已接收(帧是独立的网络信息传输单元).网络

读《C#高级编程》第1章

读<C#高级编程>第1章 .Net机构体系笔记 网红的话:爸爸说我将来会是一个牛逼的程序员,因为我有一个梦,虽然脑壳笨但是做事情很能坚持. 本章主要是了解.Net的结构,都是一些概念,并没有可操作的代码.但是这些概念有联系,但深刻理解必须要有全局思维和实践基础. 目录: C#与.Net的关系 公共语言运行库(CLR) 中间语言 程序集 .NET Framework类 名称空间(命名空间):避免类名冲突的一种方式. 用C#创建.Net应用程序(介绍一堆产品概念) C#在.Net企业体系结构中的重

python高级编程:有用的设计模式1

# -*- coding: utf-8 -*-__author__ = 'Administrator'#python高级编程:有用的设计模式#设计械是可复用的,某种程序上它对软件设计中觉问题提供的语言相关解决识方案,最近最流行的书籍:"""gamma.heim.johson和vlissides a.k.a"四人组(gof)"编写的elements of reusable object-oriented software(中文:<设计模式:可复用面向对

python高级编程之迭代器与生成器

# -*- coding: utf-8 -*- # python:2.x __author__ = 'Administrator' #迭代器与生成器 #--------------------------------------- #迭代器基于2个方法 """ next:返回容器下一个项目 __iter__:返回迭代器本身 """ #通过内建函数和序列来创建 i=iter('abc') print i.next()#a print i.next(

iOS网络高级编程:iPhone和iPad的企业应用开发之错误处理

本章内容 ●    iOS应用中的网络错误源 ●    检测网络的可达性 ●    错误处理的经验法则 ●    处理网络错误的设计模式 到目前为止,我们所介绍的iPhone与其他系统的网络交互都是基于一切正常这个假设.本章将会放弃这个假设,并深入探究网络的真实世界.在真实世界中,事情是会出错的,有时可能是非常严重的错误:手机进入与离开网络.包丢掉或是延迟:网络基础设施出错:偶尔用户还会出错.如果一切正常,那么编写iOS应用就会简单不少,不过遗憾的是现实并非如此.本章将会探讨导致网络操作失败的几