Spark 编程模型(中)

先在IDEA新建一个maven项目

我这里用的是jdk1.8,选择相应的骨架

这里选择本地在window下安装的maven

新的项目创建成功

我的开始pom.xml文件配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.gong.spark</groupId>
  <artifactId>learning-spark</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.4</scala.version>
    <spark.version>1.6.1</spark.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <!--spark-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <!--
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        -->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!-- add Main-Class to manifest file -->
                <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!--<mainClass>com.dajiang.MyDriver</mainClass>-->
                </transformer>
              </transformers>
              <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

先在终端下试下打包

可以看到失败了!!!

把这几个生成默认的东西删除掉

再次测试

成功了

把他clean一下

进入自己在虚拟机安装的centos里面的spark

先测试一下spark环境有没有问题

OK运行成,环境没问题!

1.创建RDD

从集合创建RDD

回到idea,在main路径下新建java目录,并且对其以下操作:

在test路径下新建java目录,对其以下操作:

建包

在当前包下起个名字,其实也就是在这个包的路径下再建下一级目录

因为我们现在要写的是java程序,所以新建一个java类

写个简单的程序测下运行一下

OK没问题,可以运行!

把这里的这个插件由原来的1.5改成1.8,因为刚刚跑的时候有警告

在这里新建一个包,具体怎么操作这里就不重复了

插入之前写好的MyJavaWordCount.java的代码

MyJavaWordCount.java参考代码

package com.gong.spark.chap2_3;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class MyJavaWordCount {
     public  static  void main(String[] args){
         //参数检查
  if(args.length<2) {
      System.err.println("Usage:MyJavaCount <input> <output>");
      System.exit(1);
  }

         //获取参数
     String input=args[0];
     String output=args[1];

         //创建java版本的SparkContext
         SparkConf conf=new SparkConf().setAppName("MyJavaWordCount");
         JavaSparkContext sc = new JavaSparkContext(conf);

         //读取数据
         JavaRDD<String> inputRdd=sc.textFile(input);

         //进行相关计算
        JavaRDD<String> words=inputRdd.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairRDD<String,Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
             public Tuple2<String,Integer> call(String word) throws Exception{
                 return new Tuple2(word,1);
             }
         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
             public Integer call(Integer x, Integer y) throws Exception {
                 return x+y;
             }
         });

         //保存结果
        result.saveAsTextFile(output);

         //关闭sc
     sc.stop();
     }
}
时间: 2024-12-24 01:16:23

Spark 编程模型(中)的相关文章

Spark机器学习:Spark 编程模型及快速入门

http://blog.csdn.net/pipisorry/article/details/52366356 Spark编程模型 SparkContext类和SparkConf类 我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象.val sc = new SparkContext("local[4]", "Test Spark App") 这段代码会创建一个4线程的 SparkContext 对象,并将其相应的任务命

Spark入门实战系列--3.Spark编程模型(上)--概念及SparkShell实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark编程模型 1.1 术语定义 应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor: 驱动程序(Driver Program):运行Application的main()函数并且创建SparkContext,通常用SparkContext代表Driver Program: 执行单元(Executor): 是为某

Spark入门实战系列--3.Spark编程模型(下)--IDEA搭建及实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 安装IntelliJ IDEA IDEA 全称 IntelliJ IDEA,是java语言开发的集成环境,IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手.代码自动提示.重构.J2EE支持.Ant.JUnit.CVS整合.代码审查. 创新的GUI设计等方面的功能可以说是超常的.IDEA是JetBrains公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨

Spark编程模型(核心篇 一)

目录 RDD概述 RDD实现 RDD运行流程 RDD分区 RDD操作分类 RDD编程接口说明 一.RDD概述 RDD:是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类 DAG: Spark将计算转换为一个有向无环图(DAG)的任务集合,通过为RDD提供一种基于粗粒度变换(如map, filter, join等)的接口 RDD类型:mappedRDD, SchemaRDD RDD操作分类:转换操作(又分为创建操作.转换操作)

Spark编程模型及RDD操作

转载自:http://blog.csdn.net/liuwenbo0920/article/details/45243775 1. Spark中的基本概念 在Spark中,有下面的基本概念.Application:基于Spark的用户程序,包含了一个driver program和集群中多个executorDriver Program:运行Application的main()函数并创建SparkContext.通常SparkContext代表driver programExecutor:为某App

Spark 编程模型(上)

初识RDD 什么是RDD? 定义:Resilient distributed datasets (RDD), an efficient, general-purpose and fault-tolerant abstraction for sharing data in cluster applications. RDD 是只读的. RDD 是分区记录的集合. RDD 是容错的.--- lineage RDD 是高效的. RDD 不需要物化.---物化:进行实际的变换并最终写入稳定的存储器上 R

Spark 编程模型(下)

创建Pair RDD 什么是Pair RDD 创建Pair RDD Pair RDD的转化操作 Pair RDD的转化操作1 在xshell启动

简单地使用线程之一:使用异步编程模型

.NetFramework的异步编程模型从本质上来说是使用线程池来完成异步的任务,异步委托.HttpWebRequest等都使用了异步模型. 这里我们使用异步委托来说明异步编程模型. 首先,我们来明确一下,对于多线程来说,我们需要关注哪些问题. “线程是一段执行中的代码流”,从这句话中,我们可以关注这段代码流何时开始执行.何时结束.从主线程如何传递参数至从子线程.从子线程如何返回结果至主线程? 问题1:在异步编程模型中,子线程何时开始执行? 对于异步编程模型来说,使用BeginXXX来开始执行线

CUDA编程模型

p { margin-bottom: 0.1in; direction: ltr; color: #00000a; line-height: 120%; text-align: justify; orphans: 0; widows: 0 } p.western { font-family: "Calibri", serif; font-size: 10pt } p.cjk { font-family: ; font-size: 10pt } p.ctl { font-family: