实现一个mapreduce的job

介绍

Hadoop安装好后,有人会想做一个mapreduce的job跑一跑,mapreduce其实是两个功能,一个是mapper,一个是reducer,废话不多说,现在开始。

正文

1 环境

1.1 部署hadoop

单机版即可,namenode,datanode,resourcemanager, nodemanager,secondnamenode都部署在同一台机器上。

创建hadoop用户

生成ssh公钥私钥,保证ssh localhost能通

配置文件core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.6.0/tmp</value>
    </property>

<property>
      <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
        </property>

</configuration>

配置文件hdfs-site.xml

<configuration>
<property>
             <name>dfs.namenode.name.dir</name>
                     <value>file:/opt/hadoop-2.6.0/hdfs/name</value>
                         </property>
    <property>
            <name>dfs.datanode.data.dir</name>
                    <value>file:/opt/hadoop-2.6.0/hdfs/data</value>
                        </property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

配置文件mapred-site.xml

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8000</value>
</property>
</configuration>

具体步骤清参见我的博客的文章:

linux上部署hadoop集群 基础篇

1.2 安装eclipse

这个自己想办法搞定吧,只要能启动就行了,这里就不一一赘述了。

2 写java程序

2.1 新建一个java project,再新建一个类:MaxTemperatureMapper

代码如下:

package mapreduce_maxtempature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper extends
 Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
 public void map(LongWritable key, Text value, Context context)
 throws IOException, InterruptedException {
String line = value.toString();
 String year = line.substring(15, 19);
 int airTemperature;
 if (line.charAt(87) == ‘+‘) { // parseInt doesn‘t like leading plus
 // signs
 airTemperature = Integer.parseInt(line.substring(88, 92));
 } else {
 airTemperature = Integer.parseInt(line.substring(87, 92));
 }
 String quality = line.substring(92, 93);
 if (airTemperature != MISSING && quality.matches("[01459]")) {
 context.write(new Text(year), new IntWritable(airTemperature));
 }
 }
}

2.2 再建一个类:MaxTemperatureReducer

代码如下:

package mapreduce_maxtempature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
 Context context)
 throws IOException, InterruptedException {
 int maxValue = Integer.MIN_VALUE;
 for (IntWritable value : values) {
 maxValue = Math.max(maxValue, value.get());
 }
 context.write(key, new IntWritable(maxValue));
}
}

2.3 再建一个类:MaxTemperatureDriver

代码如下:

package mapreduce_maxtempature;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*This class is responsible for running map reduce job*/
public class MaxTemperatureDriver extends Configured implements Tool{
public int run(String[] args) throws Exception
 {
 if(args.length !=2) {
 System.err.println("Usage: MaxTemperatureDriver <input path> <outputpath>");
 System.exit(-1);
 }
 @SuppressWarnings("deprecation")
Job job = new Job();
 job.setJarByClass(MaxTemperatureDriver.class);
 job.setJobName("Max Temperature");
 FileInputFormat.addInputPath(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job,new Path(args[1]));
 job.setMapperClass(MaxTemperatureMapper.class);
 job.setReducerClass(MaxTemperatureReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 System.exit(job.waitForCompletion(true) ? 0:1); 
 boolean success = job.waitForCompletion(true);
 return success ? 0 : 1;
 }
public static void main(String[] args) throws Exception {
 MaxTemperatureDriver driver = new MaxTemperatureDriver();
 int exitCode = ToolRunner.run(driver, args);
 System.exit(exitCode);
 }
}

最后一个调用ToolRunner来运行job。保证三个java程序都没有错误。

2.4 打包成jar包

如何打jar包,这里不再赘述,不会的请自己搞定。

2.5 生成sample.txt

新建一个文件sample.txt,内容如下:

0035227070999991902122213004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01721+99999101221ADDGF107991999999999999999999MW1721
0029227070999991902122220004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-02001+99999101551ADDGF100991999999999999999999
0035227070999991902122306004+62167+030650FM-12+010299999V0201401N002119999999N0000001N9-01611+99999101161ADDGF100991999999999999999999MW1721
0035227070999991902122313004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00781+99999100191ADDGF108991999999999999999999MW1721
0029227070999991902122320004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-00281+99999099601ADDGF108991999999999999999999
0029227070999991902122406004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-00111+99999098601ADDGF100991999999999999999999
0029227070999991902122413004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00281+99999098711ADDGF108991999999999999999999
0029227070999991902122420004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00501+99999098831ADDGF100991999999999999999999
0029227070999991902122506004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00281+99999097351ADDGF108991999999999999999999
0029227070999991902122513004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00221+99999095821ADDGF108991999999999999999999
0029227070999991902122520004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00331+99999095751ADDGF108991999999999999999999
0029227070999991902122606004+62167+030650FM-12+010299999V0201801N006219999999N0000001N9-00891+99999095401ADDGF100991999999999999999999
0029227070999991902122613004+62167+030650FM-12+010299999V0202301N002119999999N0000001N9-00891+99999095281ADDGF107991999999999999999999
0029227070999991902122620004+62167+030650FM-12+010299999V0202301N001019999999N0000001N9-01331+99999095581ADDGF100991999999999999999999
0029227070999991902122706004+62167+030650FM-12+010299999V0203201N001019999999N0000001N9-01111+99999095801ADDGF108991999999999999999999
0035227070999991902122713004+62167+030650FM-12+010299999V0203601N002119999999N0000001N9-01061+99999096221ADDGF108991999999999999999999MW1721
0029227070999991902122720004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01171+99999096711ADDGF108991999999999999999999
0029227070999991902122806004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-01171+99999097441ADDGF100991999999999999999999
0029227070999991902122813004+62167+030650FM-12+010299999V0202901N001019999999N0000001N9-00891+99999097791ADDGF108991999999999999999999
0029227070999991902122820004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01061+99999097671ADDGF100991999999999999999999
0029227070999991902122906004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01281+99999097601ADDGF100991999999999999999999
0029227070999991902122913004+62167+030650FM-12+010299999V0203601N004119999999N0000001N9-01221+99999097721ADDGF108991999999999999999999
0029227070999991902122920004+62167+030650FM-12+010299999V0203601N001019999999N0000001N9-01441+99999097821ADDGF100991999999999999999999
0029227070999991902123006004+62167+030650FM-12+010299999V0203601N006219999999N0000001N9-01561+99999098041ADDGF106991999999999999999999
0029227070999991902123013004+62167+030650FM-12+010299999V0200501N001019999999N0000001N9-01561+99999097981ADDGF108991999999999999999999
0029227070999991902123020004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01501+99999098461ADDGF100991999999999999999999
0029227070999991902123106004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01171+99999098641ADDGF108991999999999999999999
0029227070999991902123113004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01281+99999099551ADDGF108991999999999999999999
0029227070999991902123120004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01831+99999100111ADDGF100991999999999999999999

2.6 上传sample.txt到hdfs文件系统,即hdfs://localhost:9000/

hadoop dfs -put sample.txt hdfs://localhost:9000/

查看结果:

hadoop dfs -ls hdfs://localhost:9000/

2.7 执行job

hadoop jar mapreduce_maxtempature.jar     /sample.txt /output

成功后会在hdfs文件系统中自动生成output文件夹,里面有内容,是job执行结果。

若有错误请根据具体的结果调试。

我的结果如下:

hdfs dfs -cat hdfs://localhost:9000/output/part-r-00000
1901    317
1902    244
时间: 2024-10-08 10:03:00

实现一个mapreduce的job的相关文章

Hadoop学习---第三篇Hadoop的第一个Mapreduce程序

Mapreducer程序写了好几个了,但是之前一直都没有仔细的测试过本地运行和集群上运行的区别,今天写了一个Mapreduce程序,在此记录下来. 本地运行注意事项有以下几点: 1.本地必须配置好Hadoop的开发环境 2.在src里不加入配置文件运行,或者如果本地的src里有mapred-site.xml和yarn-site.xml配置文件,那么mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local 测试说明:sr

用一个MapReduce job实现去重,多目录输出功能

总结之前工作中遇到的一个问题. 背景: 运维用scribe从apache服务器推送过来的日志有重复记录,所以这边的ETL处理要去重,还有个需求是要按业务类型多目录输出,方便挂分区,后面的使用. 这两个需求单独处理都没有问题,但要在一个mapreduce里完成,需要一点技巧. 1.map输入数据,经过一系列处理,输出时: if(ttype.equals("other")){ file = (result.toString().hashCode() & 0x7FFFFFFF)%40

RHadoop教程翻译系列 _Mapreduce(1)_第一个Mapreduce任务

如果单从概念上来说,Mapreduce和R中的函数lapply, tapply并无差别,它们都是把元素转化成列,然后计算索引(Mapreduce中的键),最后合并成一个定义好的组合.首先,让我们看一个简单的lappy的例子. small.ints = 1:1000 sapply(small.ints, function(x) x^2) 这个例子比较简单,只是计算了前1000个整数的平方,不过我们可以从这个例子中对lappy这个函数有个基本的认知,接下来关于这个函数还有更多有意思的例子.现在让我们

hadoop下实现kmeans算法——一个mapreduce的实现方法

写mapreduce程序实现kmeans算法,我们的思路可能是这样的 1. 用一个全局变量存放上一次迭代后的质心 2. map里,计算每个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出 3. reduce里,输入的key是质心,value是其他的样本,这时重新计算聚类中心,将聚类中心put到一个全部变量t中. 4. 在main里比较前一次的质心和本次的质心是否发生变化,如果变化,则继续迭代,否则退出. 本文的思路基本上是按照上面的步骤来做的,只不过

第六章 开发一个MapReduce应用 第一节 配置

在第二章中,我们介绍了MapReduce模式.在本章中,我们看看在实际中开发一个MapReduce 应用. 写一个MapReduce程序要遵循一个特定的模式.开始时你要写map和reduce函数,最好把单元 测试也写上,确保程序做了你想做的.然后你写一个驱动程序来运行一个job,它可以使用数据的一 小部分在你的集成开发环境中运行以检查它是否正常工作.如果失败,你需要使用你的IDE调试器来 找到程序的问题.然后你可以扩展你的单元测试来覆盖这种情况,促使你的map或reduce可以正确 处理这种输入

hadoop在实现kmeans算法——一个mapreduce实施

写mapreduce程序实现kmeans算法.我们的想法可能是 1. 次迭代后的质心 2. map里.计算每一个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出 3. reduce里,输入的key是质心,value是其它的样本,这时又一次计算聚类中心,将聚类中心put到一个所有变量t中. 4. 在main里比較前一次的质心和本次的质心是否发生变化,假设变化,则继续迭代,否则退出. 本文的思路基本上是依照上面的步骤来做的,仅仅只是有几个问题须要解决 1

HDFS设计思路,HDFS使用,查看集群状态,HDFS,HDFS上传文件,HDFS下载文件,yarn web管理界面信息查看,运行一个mapreduce程序,mapreduce的demo

26 集群使用初步 HDFS的设计思路 l 设计思想 分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析: l 在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务 l 重点概念:文件切块,副本存放,元数据 26.1 HDFS使用 1.查看集群状态 命令:   hdfs  dfsadmin –report 可以看出,集群共有3个datanode可用 也可打开web控制台查看HDFS集群

第一个MapReduce程序

计算文件中每个单词的频数 wordcount 程序调用 wordmap 和 wordreduce 程序. 1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduc

Hadoop 6、第一个mapreduce程序 WordCount

1.程序代码 Map: import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; public