基于大数据开发套件定时调度带资源文件的MapReduce作业

MaxCompute里的MR作业,很少是只要跑一次就好了的。如果需要周期性调度,目前MaxCompute(原名ODPS)只提供了计算引擎,任务调度可以使用大数据开发套件来实现。这篇帖子从基础开始,介绍了3种周期性调度的方法。同时还介绍了如何使用资源文件。

代码开发

代码以文档里的WordCount 作为例子。
在这个基础上,增加资源文件的读取方法,修改Reduce类。主要的逻辑是读取资源文件,资源文件里的数据格式是字符串1,字符串2。代码逻辑是如果word count里的word如果有在字符串1里出现的话,就替换成字符串2。

    public static class SumReducer extends ReducerBase {
        private Record result = null;
        private Map<String,String> maps = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
            maps = new HashMap<String,String>();
            StringBuilder importdata = new StringBuilder();
            BufferedInputStream bufferedInput = null;
            try {
                byte[] buffer = new byte[1024];
                int bytesRead = 0;
                //读取资源文件的内容
                bufferedInput = context.readResourceFileAsStream("resource.txt");

                while ((bytesRead = bufferedInput.read(buffer)) != -1) {
                    String chunk = new String(buffer, 0, bytesRead);
                    importdata.append(chunk);
                }
                //解析资源文件的内容,把替换前,替换后的数据放到map里
                String lines[] = importdata.toString().split("\n");
                for (int i = 0; i < lines.length; i++) {
                    String[] ss = lines[i].split(",");
                    maps.put(ss[0].trim(), ss[1].trim());
                    System.out.println(ss[0]+"->"+ss[1]);
                }
            } catch (FileNotFoundException ex) {
                throw new IOException(ex);
            } catch (IOException ex) {
                throw new IOException(ex);
            } finally {
            }
        }

        @Override
        public void reduce(Record key, Iterator<Record> values,
                TaskContext context) throws IOException {
            long count = 0;
            while (values.hasNext()) {
                Record val = values.next();
                count += (Long) val.get(0);
            }
            String value = key.get(0).toString();
            if(maps.containsKey(value)){
                System.out.println(value+"->"+maps.get(value));
                value = maps.get(value);

            }
            result.set(0, value);
            result.set(1, count);
            context.write(result);
        }
    }

具体资源文件的用法可以参考文档 ,这里就不再多解释了。

客户端调用

对于测试数据,源文件的内容为

odps,MaxCompute
hello,Hello

我们先用手工调度来跑这个MR,这里跑通了后后面的所有的配置就很容易明白了。
首先需要把代码打出的jar包,和这个resource.txt文件上传到服务器上

>add jar D:\cx_word_count.jar -f;
OK: Resource ‘cx_word_count.jar‘ have been updated.
>add file D:\resource.txt -f;
OK: Resource ‘resource.txt‘ have been updated.

然后通过命令行来调用

jar -resources cx_word_count.jar,resource.txt -classpath D:\cx_word_count.jar com.aliyun.odps.mr.WordCount;

这里的-resources引用的是跑在服务器上的,-classpath是用来找到main方法的。理解这个对后面配置同步任务很有帮助。可以参阅文档

Crontab调用

odpscmd客户端有一个参数,是-e,可以在shell里直接调用jar命令来跑MR,当然也可以使用odpscmd -f来再调用一个脚本文件,但是这样有点麻烦了。这里就直接用-e来做。

你可以先用

/odps/cmd/bin/odpscmd  -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar com.aliyun.odps.mr.WordCount;"

在Linux服务器上运行任务。注意安装odpscmd配置前需要先配置好java环境。然后后面的Crontab的配置就不展开了。

MR作业

配置DataIDE的MR作业的界面,很容易就让人想到MR任务的main方法。其实就是DataIDE会根据配置自己生成main方法,然后去调用MaxCompute上的任务。具体的配置可以参考这个截图:

可以在右边看到可以配置任务的调度周期和上下游依赖,从而实现每天的定时调度,而且还能是保证上游的数据导入、预处理完成后才开始做MR操作,非常好用。

Shell任务

上述的MR任务简单方便,但是DataIDE出于安全考虑,不让用户自己写main方法。如果需要用到诸如传参数之类的功能,可以自己写Shell任务,但是调度让DataIDE来做。这样就集上面两个方法之长了。

Shell任务需要先参考文档 先配置调度的ECS信息,这里不再展开。完成后写一个Shell脚本,内容为

##@resource_reference{"cx_word_count.jar,resource.txt"}
/opt/taobao/tbdpapp/odpswrapper/odpsconsole/bin/odpscmd   -u  testid  -p  testkey  --project=testproject --endpoint=http://service.odps.aliyun.com/api  -e "jar -resources cx_word_count.jar,resource.txt  -classpath /odps/cx_word_count.jar  com.aliyun.odps.mr.WordCount"

要把里面的Access id/key,Project 替换成你自己的,然后开始测试代码。需要特别注意的是,** Shell任务是在机器上的admin账号下运行的** ,如果发现各种奇怪的错误,比如明明存在的文件找不到一类的错误,可以先su - admin,调试下Shell命令,或者访问下对应的文件,看看是否是环境变量,文件目录权限的问题。另外也可以把错误日志重定向到某个文件里,比如/tmp文件夹下的某个临时日志文件里,方便事后调试。大家可以在admin账号下把shell调试通过后再放到数加上去调用。

另外Shell任务可以调整调度的机器,可以参考

阅读原文请点击

时间: 2024-11-15 23:55:34

基于大数据开发套件定时调度带资源文件的MapReduce作业的相关文章

云享团——基于大数据开发套件的增量同步策略

免费开通大数据服务:https://www.aliyun.com/product/odps 转载自云享团 因为近期遇到用户在做ETL操作导入数据到MaxCompute的时候,对如何设置数据同步策略有疑惑,所以今天第一波我们来聊一下数据的同步策略,根据数据的特性,看看哪些数据适合增量同步,哪些适合全量同步,又是如何实现的?请认真看完下面的介绍,这些问题都不是事儿. 我们把需要同步的数据,根据数据写入后是否会发生变化分为:会变化的数据(人员表比如说,人员的状态会发生变化)和不会发生变化的数据(一般是

大数据开发初学者该怎么做?

经常有初学者在问,自己想往大数据方向发展,该学哪些技术,学习路线是什么样的,觉得大数据很火,就业很好,薪资很高.如果自己很迷茫,为了这些原因想往大数据方向发展,也可以,那么我就想问一下,你的专业是什么,对于计算机/软件,你的兴趣是什么?是计算机专业,对操作系统.硬件.网络.服务器感兴趣?是软件专业,对软件开发.编程.写代码感兴趣?还是数学.统计学专业,对数据和数字特别感兴趣.. 其实这就是想告诉你的大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘.请不要问

做了五年大数据开发工程师总结的的大数据学习路线

先扯一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等: 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来: 处理时效性高,海量数据的处理需求不再局限在离线计算当中. 现如今,正式为了应对大数据的这几个特点,开源的大数据框架越来越多,越来越强,先列举一些常见的: 文件存储:Hadoop HDFS.Tachyon.KFS 离线计算:Hadoop MapReduce.Spark 流式.实时计算:Storm

大数据开发学习路线整理

参考博客:做了五年大数据开发工程师总结的的大数据学习路线 大数据的4V特征: 1.        数据量大,TB->PB 2.        数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等: 3.        商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来: 4.        处理时效性高,海量数据的处理需求不再局限在离线计算当中. 常见的大数据的开源框架: l  文件存储:Hadoop HDFS.Tachyon.KFS l  离线计算:

大数据开发学习步骤

经常有初学者 问我,自己想往大数据方向发展,该学哪些技术,学习路线是什么样的,觉得大数据很火,就业很好,薪资很高.如果自己很迷茫,为了这些原因想往大数据方向发展,也可以,那么我就想问一下,你的专业是什么,对于计算机/软件,你的兴趣是什么?是计算机专业,对操作系统.硬件.网络.服务器感兴趣?是软件专业,对软件开发.编程.写代码感兴趣?还是数学.统计学专业,对数据和数字特别感兴趣. 其实这就是想告诉你的大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘.请不要问

数据仓库工程师、大数据开发工程师、BI工程师、ETL工程师之间有什么区别?

商务智能.商务智能工程师是商业智能行业的工程师.从需求分析师到数据仓库架构师.ETL工程师.数据分析工程师.报表开发工程师.数据挖掘工程师等,都可以称为BI工程师. ETL工程师:从事系统编程.数据库编程和设计,掌握各种常用编程语言的专业技术人员.也称为数据库工程师. 盲目地解释数据仓库的概念可能并不有趣.让我们从不同的角色开始. 老板:我是一家手机公司的老板.今天我要向董事会汇报.我将准备一份关于用户增长.用户保留率.用户活动以及过去三年中我手机中每个应用程序的使用率的报告.如果下面没有BI,

联合国“全球脉动”计划 《大数据开发:机遇与挑战》

联合国"全球脉动"计划发布<大数据开发:机遇与挑战>2012 年 5 月 29 日,联合国"全球脉动"( Global Pulse)计划发布<大数据开发:机遇与挑战>报告,阐述了各国特别是发展中国家在运用大数据促进社会发展方面所面临的历史机遇和挑战,并为正确运用大数据提出了策略建议.1. 引言技术创新和数字设备的普及带来了"数据的产业革命".对日益扩大的数字数据的分析将揭示关于集体行为的潜在联系,并有可能改进决策方式.大数

从0-1体验大数据开发

觉得裸用MaxCompute(原ODPS)门槛较高?想做数据开发,却苦于没有好的管理工具? 想体验下数加(阿里云大数据)推出的Data IDE,却苦于: 开通云账号—实名认证—购买MaxCompute—创建AK—创建/绑定项目, 经过5步,最后终于能开始体验了,是不是瞬间: 想从开通云账号后立即进入体验吗? 为此,我们提供了大数据开发免费体验环境,点击进入:https://data.aliyun.com/experience 我们还提供了使用教程,手把手教您基于MaxCompute做数据开发:点

Spark修炼之道(基础篇)——Linux大数据开发基础:第一节、Linux介绍、安装及使用初步

本节主要内容 Linux简史 Linux特点 Ubuntu Linux安装 Linux使用初步 1. Linux简史 要讲述大名鼎鼎的Linux,必然要先从UNIX系统谈起,下面这幅图给出了Unix系统的进化图: 图片来源:http://baike.baidu.com/link?url=QfoqWtWGs-BjpnfEy_AUk7Bm3XHuf6JbN92HCOoUBfFfj8BuSDkbwmldtmUEmGRDUwqsQMIV4jCKHvdkSPr3Lq 从进化图中可以看到,目前所有的主流操作