通过SDK提交MapReduce作业

大数据计算服务(MaxCompute)

快速、完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。

了解更多

通过SDK提交MR作业的步骤如下:

步骤一:
      编写MR程序,导出jar包,jar包可以不包含main方法(main方法是在本地执行)
        
步骤二:
       上传jar包及所需的资源
       (1) 通过console上传jar包到server端: add jar xxx.jar
       (2)也可以通过SDK写程序上传,参考相关方法:com.aliyun.odps.ODPS.resources().create(xxx,xxx)

步骤三:

对main方法进行改进 ,主要包括两部分:
   (1)设置账户信息(accessId/accessKey/endpoint),充当console/conf/odps_conf.ini中的配置功能
   (2)设置MR中使用的资源,充当jar -resources xxx1.jar,xxx2.jar的功能
           通过方法job.setResources("test13.jar");设置
 
注:本地用户Mapper类和Reducer类方法是空的(本地并不会执行这份代码),存在的目的是保证main方法编译通过

package com.aliyun.odps.examples.mr;

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.conf.SessionState;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/*
 * 该示例展示了MapReduce程序中的基本结构
 *
 */
public class WordCount {

  public static class TokenizerMapper extends MapperBase {
  }

  /**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class SumReducer extends ReducerBase {

  }

  public static void main(String[] args) throws Exception {

    // /////////////额外添加的代码//////////
    String endpoint = "your_endpoint";
    String accessId = "your_access_id";
    String accessKey = "your_access_key";
    String project = "your_project";

    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    odps.setEndpoint(endpoint);

    SessionState.get().setOdps(odps);
    SessionState.get().setLocalRun(false);
    // ///////////////////////////////

    JobConf job = new JobConf();
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class);
    job.setReducerClass(SumReducer.class);

    // /////////////额外添加的代码//////////
    // 资源名称列表,多个资源用逗号分隔
    job.setResources("test13.jar");
    // //////////////////////////////////

    job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
    job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);

    RunningJob rj = JobClient.runJob(job);
    rj.waitForCompletion();
  }

}

阅读原文请点击

时间: 2024-10-29 19:12:07

通过SDK提交MapReduce作业的相关文章

Hadoop之 - 剖析 MapReduce 作业的运行机制(MapReduce 2)

在0.20版本及更早期的系列中,mapred.job.tracker 决定了执行MapReduce程序的方式.如果这个配置属性被设置为local(默认值),则使用本地的作业运行器.运行器在耽搁JVM上运行整个作业.它被设计用来在小的数据集上测试和运行MapReduce程序. 如果 mapred.job.tracker 被设置为用冒号分开的主机和端口对(主机:端口),那么该配置属性就被解释为一个jobtracker地址,运行器则将作业提交给该地址的jobtracker. Hadoop 2.x引入了

剖析MapReduce 作业运行机制

包含四个独立的实体: ·  Client Node 客户端:编写 MapReduce代码,配置作业,提交MapReduce作业. ·  JobTracker :初始化作业,分配作业,与 TaskTracker通信,协调整个作业的运行. jobtracker是一个Java 应用程序,它的主类是 JobTracker. ·  TaskTracker :保持与 JobTracker通信,在分配的数据片段上执行 Map或Reduce 任务.tasktracker是 Java应用程序,它的主类是TaskT

MapReduce作业的map task和reduce task调度参数

MapReduce作业可以细分为map task和reduce task,而MRAppMaster又将map task和reduce task分为四种状态: 1.pending:刚启动但尚未向resourcemanager发送资源请求: 2.scheduled:已经向resourceManager发送资源请求,但尚未分配到资源: 3.assigned:已经分配到了资源且正在运行: 4.completed:已经运行完成. map task的生命周期为:scheduled -> assigned -

MapReduce作业运行第三方配置文件的共享方法

其实MapReduce作业运行第三方配置文件的共享方法往小了说其实就是参数在MapReduce作业中的传递,往大了说其实就是DistributedCache的应用. 在MapReduce中传递参数普遍用Configuration,Configuration是一个键值对,将所需的参数值表示成键值对(键值对为字符串类型),调用Configuration的set方法就保存进去了,用的时候调用get方法. 这是最基础的,在工作中难免遇到一些特殊的情况,比如,如何传递一个对象型参数?当你的MapReduc

Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)

v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了四件事: 1.通过设置作业Job的成员变量setupProgress为1,标记作业setup已完成: 2.调度作业Job的Map Task: 3.调度作业的JobReduce Task: 4.如果没有task了,则生成J

Hadoop学习三十二:Win7下无法提交MapReduce Job到集群环境

一. 对hadoop eclipse plugin认识不足 http://zy19982004.iteye.com/blog/2024467曾经说到我最hadoop eclipse plugin作用的认识.但事实上我犯了一个错误,Win7 Eclipse里的MyWordCount程序一直在本地运行,没有提交到集群环境上运行(查看192.168.1.200:50030)没有这个Job.运行方式分为两种,右键Run As Java Application Run on Hadoop 如果说Run A

[转]hadoop运行mapreduce作业无法连接0.0.0.0/0.0.0.0:10020

14/04/04 17:15:12 INFO mapreduce.Job:  map 0% reduce 0% 14/04/04 17:19:42 INFO mapreduce.Job:  map 41% reduce 0% 14/04/04 17:19:53 INFO mapreduce.Job:  map 64% reduce 0% 14/04/04 17:19:55 INFO mapreduce.Job:  map 52% reduce 0% 14/04/04 17:19:57 INFO 

Win7下无法提交MapReduce Job到集群环境(转)

一. 对hadoop eclipse plugin认识不足 http://zy19982004.iteye.com/blog/2024467曾经说到我最hadoop eclipse plugin作用的认识.但事实上我犯了一个错误,Win7 Eclipse里的MyWordCount程序一直在本地运行,没有提交到集群环境上运行(查看192.168.1.200:50030)没有这个Job.运行方式分为两种,右键Run As Java Application Run on Hadoop 如果说Run A

MapReduce 作业的生命周期

这个过程分为以下 5 个步骤: 步骤 1 作业提交与初始化.用户提交作业后,首先由 JobClient 实例将作业相关信 息,比如将程序 jar 包.作业配置文件.分片元信息文件等上传到分布式文件系统(一般为HDFS)上,其中,分片元信息文件记录了每个输入分片的逻辑位置信息.然后 JobClient 通过 RPC通知JobTracker.JobTracker收到新作业提交请求后,由作业调度模块对作业进 行初始化 :为作业创建一个JobInProgress对象以跟踪作业运行状况,而 obInPro