Hadoop日记Day16---命令行运行MapReduce程序

一、代码编写

1.1 单词统计

  回顾我们以前单词统计的例子,如代码1.1所示。

 1 package counter;
 2
 3 import java.net.URI;
 4
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out";
23
24     public static void main(String[] args) throws Exception {
25
26         Configuration conf = new Configuration();
27
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }
34
35         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
36
37         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
38
39         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
40
41         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
43         job.setMapOutputValueClass(LongWritable.class);
44
45         job.setPartitionerClass(HashPartitioner.class);//1.3 分区
46         job.setNumReduceTasks(1);//有一个reduce任务运行
47
48         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
49         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
50         job.setOutputValueClass(LongWritable.class);
51
52         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里
53
54         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
55
56         job.waitForCompletion(true);//把job提交给JobTracker运行
57     }
58
59     /**
60      * KEYIN    即k1        表示行的偏移量
61      * VALUEIN    即v1        表示行文本内容
62      * KEYOUT    即k2        表示行中出现的单词
63      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
64      */
65     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
66         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
67         //    final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
68
69             final String line = v1.toString();
70         /*    if(line.contains("hello")){
71                 //记录敏感词出现在一行中
72                 helloCounter.increment(1L);
73             }*/
74             final String[] splited = line.split(" ");
75             for (String word : splited) {
76                 context.write(new Text(word), new LongWritable(1));
77             }
78         };
79     }
80
81     /**
82      * KEYIN    即k2        表示行中出现的单词
83      * VALUEIN    即v2        表示行中出现的单词的次数
84      * KEYOUT    即k3        表示文本中出现的不同单词
85      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
86      *
87      */
88     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
89         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
90             long times = 0L;
91             for (LongWritable count : v2s) {
92                 times += count.get();
93             }
94             ctx.write(k2, new LongWritable(times));
95         };
96     }
97
98 }

代码 1.1

  分析上面代码,我们会发现该单词统计方法的输入输出路径都已经写死了,比如输入路径:INPUT_PATH = "hdfs://hadoop:9000/hello"输出路径:OUT_PATH = "hdfs://hadoop:9000/out"。这样一来,这个算法的输入出路径也就固定死了,想要使用这个算法,相应的数据就必须满足这个固定的路径要求,从而算法的灵活性和可操作性也就大大降低了,也就是说我们的算法,目前还不算是一个通用的算法。所以为了提高算法灵活性和可操作性,应该通过指令运行时参数来指定输入输出路径。

1.2 在命令行运行的单词统计

  在命令行运行的单词统计程序,如代码1.2所示。

  1 package cmd;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19 import org.apache.hadoop.util.Tool;
 20 import org.apache.hadoop.util.ToolRunner;
 21
 22 public class WordCountApp extends Configured implements Tool{
 23     static String INPUT_PATH = "";
 24     static String OUT_PATH = "";
 25
 26     @Override
 27     public int run(String[] arg0) throws Exception {
 28         INPUT_PATH = arg0[0];
 29         OUT_PATH = arg0[1];
 30
 31         Configuration conf = new Configuration();
 32         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 33         final Path outPath = new Path(OUT_PATH);
 34         if(fileSystem.exists(outPath)){
 35             fileSystem.delete(outPath, true);
 36         }
 37
 38         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
 39
 40         job.setJarByClass(WordCountApp.class);//打包运行必须执行的秘密方法
 41         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
 42         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
 43
 44         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
 45         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
 46         job.setMapOutputValueClass(LongWritable.class);
 47
 48
 49         job.setPartitionerClass(HashPartitioner.class);//1.3 分区
 50         job.setNumReduceTasks(1);//有一个reduce任务运行
 51
 52
 53         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
 54         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
 55         job.setOutputValueClass(LongWritable.class);
 56
 57         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里
 58         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
 59
 60         job.waitForCompletion(true);//把job提交给JobTracker运行
 61         return 0;
 62     }
 63
 64     public static void main(String[] args) throws Exception {
 65         ToolRunner.run(new WordCountApp(), args);
 66     }
 67
 68     /**
 69      * KEYIN    即k1        表示行的偏移量
 70      * VALUEIN    即v1        表示行文本内容
 71      * KEYOUT    即k2        表示行中出现的单词
 72      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
 73      */
 74     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 75         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 76             final String[] splited = v1.toString().split("\t");
 77             for (String word : splited) {
 78                 context.write(new Text(word), new LongWritable(1));
 79             }
 80         };
 81     }
 82
 83     /**
 84      * KEYIN    即k2        表示行中出现的单词
 85      * VALUEIN    即v2        表示行中出现的单词的次数
 86      * KEYOUT    即k3        表示文本中出现的不同单词
 87      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
 88      *
 89      */
 90     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 91         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 92             long times = 0L;
 93             for (LongWritable count : v2s) {
 94                 times += count.get();
 95             }
 96             ctx.write(k2, new LongWritable(times));
 97         };
 98     }
 99
100
101 }

代码 1.2

  在编写能够在命令行运行的单词统计程序时,我们的类要继承Configured类实现Tool接口,实现Tool接口就要添加一个run()方法。在run()方法中执行我们原来在Main()方法中运行的配置代码。然而run方法如何运行呢?那就要在Main方法中调用run方法,调用方式如代码1.3所示。

1 public static void main(String[] args) throws Exception {
2         ToolRunner.run(new WordCountApp(), args);
3     }

代码 1.3

  我们看一下run方法的参数,ToolRunner.run(Tool tool, String[] args),第一参数为Tool接口,我们知道该程序的类就是Tool的实现类所以我们可以,用该程序类的对象来作为参数。他的第二个参数,是一个字符数组args。在这里我们先讲一下,main函数的args参数。这个参数是运行程序前给它的参数。如果你在你程序要用这个参数的话,就需要在运行前指定。比如一个打印helloworld的程序如下:

public class HelloWorld{
    public static void main(String[] args) {
        System.out.println(args[0]);
    }
}

  执行命令java HelloWorld ceshi ceshi1 ceshi2,那么在HelloWorld的main方法里面 args就是{"ceshi", "ceshi1", "ceshi2"},打印的结果就是creshi。

  经过对main方法的分析,我应该就知道了,run方法的第二个参数就应该是main函数的参数,这样就能够接受命令行所指定的参数了。那么既然输入输出路径由运行时的命令行的参数指定,那么就不需要在代码中指定路径了,所以将INPUT_PATH和OUT_PATH初始化为空。然后在run方法中通过由命令行传过来的参数来进行赋值,如下所示。

INPUT_PATH = arg0[0];//表示输入路径
OUT_PATH = arg0[1];//表示输出路径

  而为了我们的程序能够在命令行运行,必须添加“job.setJarByClass(WordCountApp.class);”代码,表示我们的程序以打包的方式运行。

二、运行方式

2.1 将程序以.jar类型导出到桌面

<1> 选择WordCountApp右击选择Export,如图2.1所示。

图 2.1

<2>  选择JAR file,选择Next,如图2.2所示。

图 2.2

<3> 选择Next后,弹出如下界面,如图2.3,再次选择Next。

图 2.3

<4> 选择Next之后,弹出如图2.4的界面,选择Browse。

图 2.4

<5> 选择Browse后,在弹出的界面选择ok,如图2.5所示。

图 2.5

<6> 选择Ok后,直接选择finish即可,如图2.6所示。

图 2.6

2.2 将jar包传到Linux

  使用WinScp将程序的jar包传到Linux,如图2.7所示。

图 2.7

2.3 在Linux命令行执行jar包

2.3.1 创建输入输出路径

  执行命令:

hadoop fs -mkdir /input
hadoop fs -mkdir /output

2.3.2 编写、上传file文件

  执行命令:vi file1

  输入内容:

        hello    word
        hello    me

  执行命令:hadoop fs -put file1 /

2.3.3 执行程序

执行命令:hadoop jar jar.jar hdfs://hadoop:9000/input  hdfs: //hadoop:9000/output

运行过程:

14/09/28 20:08:08 WARN mapred.JobClient: Use GenericOptionsParser for parsi                                                                                             ng the arguments. Applications should implement Tool for the same.
14/09/28 20:08:09 INFO input.FileInputFormat: Total input paths to process                                                                                              : 1
14/09/28 20:08:09 INFO util.NativeCodeLoader: Loaded the native-hadoop libr                                                                                             ary
14/09/28 20:08:09 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/28 20:08:11 INFO mapred.JobClient: Running job: job_201409281916_0001
14/09/28 20:08:12 INFO mapred.JobClient:  map 0% reduce 0%
14/09/28 20:09:03 INFO mapred.JobClient:  map 100% reduce 0%
14/09/28 20:09:14 INFO mapred.JobClient:  map 100% reduce 100%
14/09/28 20:09:14 INFO mapred.JobClient: Job complete: job_201409281916_0001
14/09/28 20:09:14 INFO mapred.JobClient: Counters: 29
14/09/28 20:09:14 INFO mapred.JobClient:   Job Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Launched reduce tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=47675
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all reduces waiting after rese                                                                         rving slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all maps waiting after reservi                                                                         ng slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Launched map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     Data-local map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10902
14/09/28 20:09:14 INFO mapred.JobClient:   File Output Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Written=21
14/09/28 20:09:14 INFO mapred.JobClient:   FileSystemCounters
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_READ=67
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_READ=116
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=105834
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/09/28 20:09:14 INFO mapred.JobClient:   File Input Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Read=21
14/09/28 20:09:14 INFO mapred.JobClient:   Map-Reduce Framework
14/09/28 20:09:14 INFO mapred.JobClient:     Map output materialized bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Map input records=2
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce shuffle bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Spilled Records=8
14/09/28 20:09:14 INFO mapred.JobClient:     Map output bytes=53
14/09/28 20:09:14 INFO mapred.JobClient:     CPU time spent (ms)=35140
14/09/28 20:09:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=131665920
14/09/28 20:09:14 INFO mapred.JobClient:     Combine input records=0
14/09/28 20:09:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input records=4
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input groups=3
14/09/28 20:09:14 INFO mapred.JobClient:     Combine output records=0
14/09/28 20:09:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=181952512
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce output records=3
14/09/28 20:09:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=752697344
14/09/28 20:09:14 INFO mapred.JobClient:     Map output records=4

执行结果:

[root@hadoop Downloads]# hadoop fs -ls /output
Found 3 items
-rw-r--r--   1 root supergroup          0 2014-09-28 20:09 /output/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-09-28 20:08 /output/_logs
-rw-r--r--   1 root supergroup         21 2014-09-28 20:09 /output/part-r-00000
[root@hadoop Downloads]# hadoop fs -cat /output/part-r-00000
hello   2
me      1
world   1
[root@hadoop Downloads]#
时间: 2024-09-29 04:21:33

Hadoop日记Day16---命令行运行MapReduce程序的相关文章

使用命令行运行Java程序

我现在身边有好多人在学习java,只要一提到学习java语言,我们马上想到的工具是eclipse,MyEclipse,NetBeans等等. 也许是我们用惯了vc,对集成开发环境有太多的依赖.但是,我个人觉得,对于一个开发人员有必要熟悉各种开发工具,更要尝试最原始的开发工具,即命令行. 同时我还发现用命令行更对java的语法有更深刻的理解,而不是仅仅停留在背诵记忆的层次上. 我们先介绍一下常用的命令: ①cd命令---切换目录:   e.g. cd src ②javac命令---编译java源程

Unbun命令行运行后台程序方法

假设要用命令运行easystroke程序,并且不依赖命令行进程,有如下2种方法: 法一: $easystroke & $exit 法二: $nohup easystroke 以上2种方法中,法一在手动广播shell窗口时,会使程序退出;法二在<ctrl-c>时,会使程序退出. 若想程序不受以上2种可能退出的操作影响,可如下使用: $nohup easystroke &

Java 命令行运行java程序,出现“找不到或无法加载主类 ”的注意事项

引用:http://blog.chinaunix.net/uid-27106528-id-5209914.html 要在CMD命令行中使用java 运行java程序,关于出现 “找不到或无法加载主类 ”错误的解决办法,   网络上基本都是在说关于"classpath"路径的配置问题,要加入 ” .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;“    前面加入”.;",表面要在当前目录下面寻找类.       如果你试过了

Ubuntu命令行运行C程序和C++程序

首先Ctrl + T 打开一个终端,cd到你建立C/C++文件的目录下. 下面以建立 helloc.c 和 hellocpp.cpp 进行演示 vim helloc.c 按 i 进入插入操作,然后写C代码: #include<stdio.h> int main() { printf("hello,world!\n"); return 0; } 按 Ctrl + C 停止插入操作,按 Shift + :后,输入 wq 回到终端界面. 下面运行 helloc.c 程序 : gc

从命令行及java程序运行MyBatis Generator 1.3.x生成MyBatis3.x代码

近期因为项目需要,调研了myBatis 3.x的使用,当然,顺便也就研究了一下使用Generator来通过逆向工程生成pojo,mapper等文件.使用这个工具之前,要先下载相关的jar包,我使用的是最新的mybatis-generator-core-1.3.2.jar.下面将generatorConfig.xml列出来: <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE generatorConf

使用Eclipse编译运行MapReduce程序 Hadoop2.6.0_Ubuntu/CentOS

文章来源:http://www.powerxing.com/hadoop-build-project-using-eclipse/ 使用Eclipse编译运行MapReduce程序 Hadoop2.6.0_Ubuntu/CentOS 本教程介绍的是如何在 Ubuntu/CentOS 中使用 Eclipse 来开发 MapReduce 程序,在 Hadoop 2.6.0 下验证通过.虽然我们可以使用命令行编译打包运行自己的MapReduce程序,但毕竟编写代码不方便.使用 Eclipse,我们可以

Java命令行运行参数说明大全(偷来的)

Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOME"bin"java –option 来启动,-option为虚拟机参数,JAVA_HOME为JDK安装路径,通过这些参数可对虚拟机的运行状态进行调整,掌握参数的含义可对虚拟机的运行模式有更深入理解. 一.         查看参数列表:虚拟机参数分为基本和扩展两类,在命令行中输入JAVA_HOME"bin"java 就可得到基本参数列表,在命令行输入J

dos命令下运行java程序(链接mysql为例子)

1 说明 使用了阿里云的Windows版的服务器,想在上面运行连接数据库的Java程序,然后就不知道,怎么运行起来.我是直接把eclipse中的文件拷到服务器上的.所有的.class 文件已经编译完成,只是不知道怎么在服务器上运行起来. 问题点1 :找不到要运行的类 问题点2 :找不到驱动,也就是无法运行导入的jar包 问题1的解决: –因为我的DBHelper文件中包含有package package mydatahelper; 所以运行时类名称则写为 mydatahelper.DBHelpe

命令行运行python

doctest:  python -m doctest XXX.py -m pdb:       python -m pdb myscript.py -o :        python -O XXX.py      不使用assert 查看版本: python -V 命令行运行程序: python -c 'print "hello" '