spark示例——WordCount修改版

java代码:

注:打包的时候一个依赖jar都不要。

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        if (args != null && args.length < 2) {
            System.err.println("Usage: JavaWordCount <inputfile> <output>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = ctx.textFile(args[0], 1);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = -5362343744041430068L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 3338227964635666047L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        // output方式1:保存到文件中:
        //counts.saveAsTextFile(args[1]);

        // output方式2:保存到hdfs中:
        counts.saveAsHadoopFile(args[1], Text.class, IntWritable.class, TextOutputFormat.class);

        ctx.stop();
    }
}

spark on yarn 运行脚本:

#!/bin/sh

echo $SPARK_JAR
spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn-client --num-executors 3 --driver-memory 400m --executor-memory 500m --executor-cores 1 /usr/local/spark/spark-example-1.0.0-SNAPSHOT.jar hdfs://cluster1:9000/tmp/1.txt hdfs://cluster1:9000/out/spark/wordcount

执行结果:

时间: 2024-10-13 19:47:20

spark示例——WordCount修改版的相关文章

spark 的 wordcount

记录spark的Wordcount小程序: 前提:hdfs已经打开 创建一个name为wc.input的文件,上传到hdfs中的/user/hadoop/spark/中,内容如上图 [[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -put wc.input /user/hadoop/spark/            上传 [[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs

SevenZip.pas BUG修改版

本来用的是Henri Gourvest <[email protected]> 1.2版本 然后发现了2个问题: 1.对于文件名中带有空格的文件, 无法压缩, 原因是1488行, 压缩调用的是TStringList.Delimiter 来拆分文件字符串, 而空格是默认分行符, 导致文件名错误 2.解压缩函数, 如果目标文件已存在并且为只读属性时, 报错, 原因是1105行 创建文件流的时候直接使用了TFileStream.Create(path, fmCreate)导致 针对以上2个问题, 对

VB程序逆向反汇编常见的函数(修改版)

VB程序逆向常用的函数 1) 数据类型转换: a) __vbaI2Str    将一个字符串转为8 位(1个字节)的数值形式(范围在 0 至 255 之间) 或2 个字节的数值形式(范围在 -32,768 到 32,767 之间). b)__vbaI4Str   将一个字符串转为长整型(4个字节)的数值形式(范围从-2,147,483,6482,147,483,647) c)__vbar4Str  将一个字符串转为单精度单精度浮点型(4个字节)的数值形式 d)__vbar8Str   将一个字符

Spark的wordcount程序产生多少个RDD?

val rdd = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collectrdd.saveAsTextFile("hdfs://Master.hdp:9000/out01") 思考:在spark的wordcount过程一共产生多少个RDD? 通过该命令(scala>

Delphi版的Base64转换函数(修改版)

Delphi版的Base64转换函数(修改版) 重新组织编写Delphi的MD2.MD4.MD5类

Android 仿美团网,大众点评购买框悬浮效果之修改版

我之前写了一篇关于美团网,大众点评的购买框效果的文章Android对ScrollView滚动监听,实现美团.大众点评的购买悬浮效果,我自己感觉效果并不是很好,如果快速滑动界面,显示悬浮框的时候会出现一卡的现象,有些朋友说有时候会出现两个布局的情况,特别是对ScrollView滚动的Y值得监听,我还使用了Handler来获取,还有朋友给我介绍了Scrolling Tricks这个东西,我下载试了下,确实美团网,大众点评的购买框用的是这种效果,但是Scrolling Tricks只能在API11以上

1.spark的wordcount解析

一.Eclipse(scala IDE)开发local和cluster (一). 配置开发环境 要在本地安装好java和scala. 由于spark1.6需要scala 2.10.X版本的.推荐 2.10.4,java版本最好是1.8.所以提前我们要需要安装好java和scala并在环境变量中配置好. 下载scala IDE for eclipse安装 连接:http://scala-ide.org/download/sdk.html  打开ide新建scala project 点击file -

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6.2 Hadoop 2.6.4 IntelliJ IDEA 2016.1.1 2. 创建项目1) 新建Maven项目 2) 在pom文件中导入依赖pom.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> &l

jsorder 第三方修改版 修正bug 增加总价

我主要运用这个jsorder,修正了它的不足//1.0版本bug:刷新页面 无法增加或者删除原来添加的商品//1.1版本:修正了1.0版本  新增bug 能够修改原来的商品 但出现产品数量为0 仍然保留在购物车中.//1.2版本:修正了1.1版本的产品为0 并且增加了购物车总价原文:代码一共6k,基于jquery的购物车实现,实现订单的本地cookie存储,支持购物车自定义样式,金额的计算.通过json与后台交互.实现可配置化的购物车系统,可应用于电子商务平台. ?1. [代码]使用代码