Hadoop第5周练习—MapReduce计算气象温度等例子

1    运行环境说明... 4

1.1     硬软件环境... 4

1.2     机器网络环境... 4

2    书面作业1:对云计算的看法... 4

2.1     书面作业1内容... 4

2.2     回答... 5

3    书面作业2:使用MapReduce求每年最低温度... 6

3.1     书面作业2内容... 6

3.2     运行代码... 6

3.2.1   MinTemperature. 6

3.2.2   MinTemperatureMapper6

3.2.3   MinTemperatureReducer7

3.3     实现过程... 8

3.3.1   编写代码... 8

3.3.2   编译代码... 9

3.3.3   打包编译文件... 9

3.3.4   创建目录... 10

3.3.5   解压气象数据并上传到HDFS中... 10

3.3.6   运行程序... 12

3.3.7   查看结果... 12

3.3.8   通过页面结果... 12

4    书面作业3:求温度平均值能使用combiner吗?... 15

4.1     书面作业3内容... 15

4.2     回答... 15

4.3     程序代码... 15

4.3.1   AvgTemperature.java. 15

4.3.2   AvgTemperatureMapper.java. 16

4.3.3   AvgTemperatureCombiner.java. 17

4.3.4   AvgTemperatureReducer.java. 17

4.4     实现过程... 18

4.4.1   编写代码... 18

4.4.2   编译代码... 20

4.4.3   打包编译文件... 20

4.4.4   运行程序... 21

4.4.5   查看结果... 22

5    书面作业4:使用Hadoop流求最高温度(awk脚本)... 22

5.1     书面作业4内容... 22

5.2     程序代码... 22

5.2.1   执行代码... 22

5.2.2   mapper.sh. 22

5.2.3   reducer.sh. 23

5.3     实现过程... 23

5.3.1   编写代码... 23

5.3.2   运行程序... 24

5.3.3   查看结果... 24

6    书面作业4:使用Hadoop流求最高温度(Python语言)... 25

6.1     书面作业4内容... 25

6.2     程序代码... 25

6.2.1   执行代码... 25

6.2.2   mapper.py. 25

6.2.3   reducer.py. 25

6.3     实现过程... 26

6.3.1   编写代码... 26

6.3.2   运行程序... 27

6.3.3   查看结果... 27

7    书面作业5:MapReduce是否可以自动识别新增节点?... 28

7.1     书面作业5内容... 28

7.2     程序代码... 28

7.3     实现过程... 28

7.3.1   环境准备... 28

7.3.2   准备数据... 32

7.3.3   不启动Hadoop4,运行任务... 33

7.3.4   启动Hadoop4,运行任务... 34

7.3.5   重启Hadoop集群,运行任务... 35

7.3.6   结论... 37

8    书面作业6:使用Hadoop公平调度器... 38

8.1     书面作业6内容... 38

8.2     程序代码... 38

8.3     实现过程... 38

8.3.1   打开公平调度器... 38

8.3.2   验证启动公平调度器... 39

8.3.3   准备数据... 40

8.3.4   运行分析MapReduce. 41

8.3.5   观察结果... 42

9    问题解决... 43

9.1     在作业5中新增节点后,DataNode无法启动... 43

1 运行环境说明

1.1  硬软件环境

l  主机操作系统:Windows 64 bit,双核4线程,主频2.2G,6G内存

l  虚拟软件:VMware® Workstation 9.0.0 build-812388

l  虚拟机操作系统:CentOS 64位,单核,1G内存

l  JDK:1.7.0_55 64 bit

l  Hadoop:1.1.2

1.2  机器网络环境

集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。节点IP地址和主机名分布如下:


序号


IP地址


机器名


类型


用户名


运行进程


1


10.88.147.221


hadoop1


名称节点


hadoop


NN、SNN、JobTracer


2


10.88.147.222


hadoop2


数据节点


hadoop


DN、TaskTracer


3


10.88.147.223


hadoop3


数据节点


hadoop


DN、TaskTracer

所有节点均是CentOS6.5 64bit系统,防火墙均禁用,所有节点上均创建了一个hadoop用户,用户主目录是/usr/hadoop。所有节点上均创建了一个目录/usr/local/hadoop,并且拥有者是hadoop用户。

2 书面作业1:对云计算的看法

2.1  书面作业1内容

说说你对云计算的看法,是忽悠?还是能带来真实价值的东西?

2.2  回答

云计算是对现有资源集中优化后,对客户提供服务,从现在的情况来看云计算真实的为大家提供了服务,比如:网盘等。至于云计算更为准确的定义为美国国家标准与技术研究院(NIST)定义:云计算是一种按使用量付费的模式,这种模式提供可用的、便捷的、按需的网络访问,进入可配置的计算资源共享池(资源包括网络,服务器,存储,应用软件,服务),这些资源能够被快速提供,只需投入很少的管理工作或与服务供应商进行很少的交互。

云计算特点如下:

(1) 超大规模:“云”具有相当的规模,赋予用户前所未有的计算能力;

(2) 虚拟化:云计算支持用户在任意位置、使用各种终端获取应用服务;

(3) 高可靠性:“云”使用了数据多副本容错、计算节点同构可互换等措施来保障服务的高可靠性;

(4) 通用性:云计算不针对特定的应用,同一个“云”可以同时支撑不同的应用运行;

(5) 高可扩展性:“云”的规模可以动态伸缩,满足应用和用户规模增长的需要;

(6) 按需服务:“云”是一个庞大的资源池,可以需购买;

(7) 极其廉价:由于“云”的特殊容错措施可以采用极其廉价的节点来构成云,“云”的自动化集中式管理使大量企业无需负担日益高昂的数据中心管理成本;

(8) 潜在的危险性

云计算可以认为包括以下几个层次的服务:基础设施即服务(IaaS),平台即服务(PaaS)和软件即服务(SaaS)。

l  IaaS(Infrastructure-as-a-Service):基础设施即服务。消费者通过Internet可以从完善的计算机基础设施获得服务。例如:硬件服务器租用。

l  PaaS:PaaS(Platform-as-a- Service):平台即服务。PaaS实际上是指将软件研发的平台作为一种服务,以SaaS的模式提交给用户。因此,PaaS也是SaaS模式的一种应用。但是,PaaS的出现可以加快SaaS的发展,尤其是加快SaaS应用的开发速度。例如:软件的个性化定制开发。

l  SaaS:SaaS(Software-as-a- Service):软件即服务。它是一种通过Internet提供软件的模式,用户无需购买软件,而是向提供商租用基于Web的软件,来管理企业经营活动。例如:阳光云服务器。

3  书面作业2:使用MapReduce求每年最低温度

3.1  书面作业2内容

下载气象数据集部分数据,写一个Map-Reduce作业,求每年的最低温度,部署并运行之,抓图过程

3.2  运行代码

3.2.1MinTemperature

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass MinTemperature {

publicstaticvoid main(String[] args) throws Exception {

if(args.length != 2) {

System.err.println("Usage: MinTemperature<input path> <output path>");

System.exit(-1);

}

Job job = new Job();

job.setJarByClass(MinTemperature.class);

job.setJobName("Min temperature");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MinTemperatureMapper.class);

job.setReducerClass(MinTemperatureReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

 

 

3.2.2MinTemperatureMapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

publicclass MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

privatestatic final intMISSING = 9999;

@Override

publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature;

if(line.charAt(87) == ‘+‘) {

airTemperature = Integer.parseInt(line.substring(88, 92));

} else {

airTemperature = Integer.parseInt(line.substring(87, 92));

}

String quality = line.substring(92, 93);

if(airTemperature != MISSING && quality.matches("[01459]")) {

context.write(new Text(year), new IntWritable(airTemperature));

}

}

}

3.2.3MinTemperatureReducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

publicclass MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override

publicvoid reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int minValue = Integer.MAX_VALUE;

for(IntWritable value : values) {

minValue = Math.min(minValue, value.get());

}

context.write(key, new IntWritable(minValue));

}

}

3.3  实现过程

3.3.1编写代码

进入/usr/local/hadoop-1.1.2/myclass目录,在该目录中建立MinTemperature.java、MinTemperatureMapper.java和MinTemperatureReducer.java代码文件,代码内容为3.2所示,执行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi MinTemperature.java

vi MinTemperatureMapper.java

vi MinTemperatureReducer.java

MinTemperature.java:

MinTemperatureMapper.java:

MinTemperatureReducer.java:

3.3.2编译代码

在/usr/local/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java

ls

3.3.3打包编译文件

把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:

jar cvf ./MinTemperature.jar ./*.class

ls

mv *.jar ..

rm *.class

3.3.4创建目录

进入/usr/local/hadoop-1.1.2/bin目录,在HDFS中创建气象数据存放路径/usr/hadoop/in,执行命令如下:

cd /usr/local/hadoop-1.1.2/bin

hadoop fs -mkdir /usr/hadoop/in

hadoop fs -ls /usr/hadoop                

3.3.5解压气象数据并上传到HDFS中

使用SSH工具(参见第1、2周2.1.3.1Linux文件传输工具所描述)把从NCDC下载的气象数据上传到上步骤创建的目录/usr/local/hadoop-1.1.2/input中。

使用zcat命令把这些数据文件解压并合并到一个sample.txt文件中,合并后把这个文件上传到HDFS文件系统的/usr/hadoop/in目录中:

cd /usr/local/hadoop-1.1.2/input

zcat *.gz > sample.txt

hadoop fs -copyFromLocal sample.txt /usr/hadoop/in

气象数据具体的下载地址为 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,该数据包括1900年到现在所有年份的气象数据,大小大概有70多个G。为了测试简单,我们这里选取一部分的数据进行测试

3.3.6运行程序

以jar的方式启动MapReduce任务,执行输出目录为/usr/hadoop/out:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt  /usr/hadoop/out

3.3.7查看结果

执行成功后,查看/usr/hadoop/out目录中是否存在运行结果,使用cat查看结果:

hadoop fs -ls /usr/hadoop/out

hadoop fs -cat /usr/hadoop/out/part-r-00000

3.3.8通过页面结果

1.     查看jobtracker.jsp

http://10.88.147.221:50030/jobtracker.jsp

已经完成的作业任务:

任务的详细信息:

2.     查看dfshealth.jsp

http://10.88.147.221:50070/dfshealth.jsp

分别查看HDFS文件系统和日志

4    书面作业3:求温度平均值能使用combiner吗?

4.1  书面作业3内容

(选作)如果求温度的平均值,能使用combiner吗?有没有变通的方法?说说你的看法

4.2  回答

不能使用,因为求平均值和前面求最值存在差异,各局部最值的最值还是等于整体的最值的,但是对于平均值而言,各局部平均值的平均值将不再是整体的平均值了,所以不能用combiner。可以通过变通的办法使用combiner来计算平均值,即在combiner的键值对中不直接存储最后的平均值,而是存储所有值的和个数,最后在reducer输出时再用和除以个数得到平均值。

4.3  程序代码

4.3.1AvgTemperature.java

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass AvgTemperature {

publicstaticvoid main(String[] args) throws Exception {

if(args.length != 2) {

System.out.println("Usage: AvgTemperatrue <input path><output path>");

System.exit(-1);

}

Job job = new Job();

job.setJarByClass(AvgTemperature.class);

job.setJobName("Avg Temperature");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(AvgTemperatureMapper.class);

job.setCombinerClass(AvgTemperatureCombiner.class);

job.setReducerClass(AvgTemperatureReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

4.3.2AvgTemperatureMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {

privatestaticfinalintMISSING = 9999;

@Override

publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

String line = value.toString();

String year = line.substring(15, 19);

int airTemperature;

if(line.charAt(87) == ‘+‘) {

airTemperature = Integer.parseInt(line.substring(88, 92));

} else {

airTemperature =  Integer.parseInt(line.substring(87, 92));

}

String quality = line.substring(92, 93);

if(airTemperature != MISSING && !quality.matches("[01459]")) {

context.write(new Text(year), new Text(String.valueOf(airTemperature)));

}

}

}

4.3.3AvgTemperatureCombiner.java

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

publicclass AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{

@Override

publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

double sumValue = 0;

long numValue = 0;

for(Text value : values) {

sumValue += Double.parseDouble(value.toString());

numValue ++;

}

context.write(key, new Text(String.valueOf(sumValue) + ‘,‘ + String.valueOf(numValue)));

}

}

4.3.4AvgTemperatureReducer.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

publicclass AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{

@Override

publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

double sumValue = 0;

long numValue = 0;

int avgValue = 0;

for(Text value : values) {

String[] valueAll = value.toString().split(",");

sumValue += Double.parseDouble(valueAll[0]);

numValue += Integer.parseInt(valueAll[1]);

}

avgValue  = (int)(sumValue/numValue);

context.write(key, new IntWritable(avgValue));

}

}

4.4  实现过程

4.4.1编写代码

进入/usr/local/hadoop-1.1.2/myclass目录,在该目录中建立AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代码文件,代码内容为4.3所示,执行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi AvgTemperature.java

vi AvgTemperatureMapper.java

vi AvgTemperatureCombiner.java

vi AvgTemperatureReducer.java

AvgTemperature.java:

AvgTemperatureMapper.java:

AvgTemperatureCombiner.java:

AvgTemperatureReducer.java:

4.4.2编译代码

在/usr/local/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java

ls

4.4.3打包编译文件

把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:

jar cvf ./AvgTemperature.jar ./*.class

ls

mv *.jar ..

rm *.class

4.4.4运行程序

数据使用作业2求每年最低温度的气象数据,数据在HDFS位置为/usr/ hadoop/in/sample.txt,以jar的方式启动MapReduce任务,执行输出目录为/usr/hadoop/out1:

cd /usr/local/hadoop-1.1.2

hadoop jar AvgTemperature.jar AvgTemperature /usr/hadoop/in/sample.txt  /usr/hadoop/out1

4.4.5查看结果

执行成功后,查看/usr/hadoop/out目录中是否存在运行结果,使用cat查看结果:

hadoop fs -ls /usr/hadoop/out1

hadoop fs -cat /usr/hadoop/out1/part-r-00000

5    书面作业4:使用Hadoop流求最高温度(awk脚本)

5.1  书面作业4内容

(选作)使用hadoop流的方法来实现对气象数据集求最高温度的分析任务(可能要使用awk这类脚本工具)

5.2  程序代码

5.2.1执行代码

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_awk \

-mapper myclass/map.sh \

-reducer myclass/reducer.sh \

-file myclass/map.sh \

-file myclass/reducer.sh

5.2.2mapper.sh

#!/usr/bin/awk -f

BEGIN { FS = "," }

{ year = substr($0, 16, 4) + 0;

temp = substr($0, 88, 5) + 0;

q = substr($0, 93, 1);

LF="\n";

if(temp != 9999 && q ~ /[01459]/)

{

printf "%s %s %s",  year, temp,LF;

}

}

END {

printf "\n"

}

5.2.3reducer.sh

#!/usr/bin/awk -f

{

temp[$1] =  $2

if ( temp[$1] > maxtemp[$1] )

maxtemp[$1] = temp[$1];

}

END {

for(num in maxtemp)

printf "%s %s\n", num, maxtemp[num];

}

5.3  实现过程

5.3.1编写代码

进入/usr/local/hadoop-1.1.2/myclass目录,在该目录中建立mapper.sh和reducer.sh代码文件,代码内容为5.2所示,执行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi mapper.sh

vi reducer.sh

mapper.sh:

reducer.sh:

5.3.2运行程序

数据使用作业2求每年最低温度的气象数据,数据在HDFS位置为/usr /hadoop/in/sample.txt,启动MapReduce任务,执行输出目录为/usr/hadoop/out_awk:

cd /usr/local/hadoop-1.1.2

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_awk \

-mapper myclass/mapper.sh \

-reducer myclass/reducer.sh \

-file myclass/mapper.sh \

-file myclass/reducer.sh

5.3.3查看结果

执行成功后,查看/usr/hadoop/out_awk目录中是否存在运行结果,使用cat查看结果:

hadoop fs -ls /usr/hadoop/out_awk

hadoop fs -cat /usr/hadoop/out_awk/part-00000

6    书面作业4:使用Hadoop流求最高温度(Python语言)

6.1  书面作业4内容

(选作)使用hadoop流的方法来实现对气象数据集求最高温度的分析任务(可能要使用awk这类脚本工具)

6.2  程序代码

6.2.1执行代码

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar  \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_python \

-mapper myclass/mapper.py \

-reducer myclass/reduce.py \

-file myclass/mapper.py \

-file myclass/reduce.py

6.2.2mapper.py

#!/usr/bin/env python

import re

import sys

for line in sys.stdin:

val = line.strip()

(year, temp, q) = (val[15:19], val[87:92], val[92:93])

if (temp != "+9999" and re.match("[01459]", q)):

print "%s\t%s" % (year, temp)

6.2.3reducer.py

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxint)

for line in sys.stdin:

(key, val) = line.strip().split("\t")

if last_key and last_key != key:

print "%s\t%s" % (last_key, max_val)

(last_key, max_val) = (key, int(val))

else:

(last_key, max_val) = (key, max(max_val, int(val)))

if last_key:

print "%s\t%s" % (last_key, max_val)

6.3  实现过程

6.3.1编写代码

进入/usr/local/hadoop-1.1.2/myclass目录,在该目录中建立mapper.py和reducer.py代码文件,代码内容为6.2所示,执行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi mapper.py

vi reducer.py

mapper.py:

reducer.py:

6.3.2运行程序

数据使用作业2求每年最低温度的气象数据,数据在HDFS位置为/usr/hadoop/in/sample.txt,启动MapReduce任务,执行输出目录为/usr/hadoop/out_py:

cd /usr/local/hadoop-1.1.2

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_py \

-mapper myclass/mapper.py \

-reducer myclass/reducer.py \

-file myclass/mapper.py \

-file myclass/reducer.py

6.3.3查看结果

执行成功后,查看/usr/hadoop/out_py目录中是否存在运行结果,使用cat查看结果:

hadoop fs -ls /usr/hadoop/out_py

hadoop fs -cat /usr/hadoop/out_py/part-00000

7    书面作业5:MapReduce是否可以自动识别新增节点?

7.1  书面作业5内容

(选作)如果向正在运行的Hadoop集群增加一个新节点,Map-Reduce体系是否可以自动识别并使用这个新节点?如果不能怎样才可以将其加入到Map-Reduce体系?用实验进行验证

7.2  程序代码

参见作业2代码并打成jar包

7.3  实现过程

在这里将运行本周作业2求每年最低温度的MapReduce(当然也可以运行求最高温度等例子),为了能够更清楚的观察实验的效果,在该作业中运行的数据尽可能大,以下实验中将使用1970~1972年各年份前400个气象站数据,解压后大概480M左右。在本实验中将观察两个情况:一种是运行过程新增节点,另一种是重启集群观察作业是否运行在新增节点上?

新增节点信息:


序号


IP地址


机器名


类型


用户名


运行进程


4


10.88.147.224


Hadoop4


数据节点


hadoop


DN、TaskTracer

7.3.1环境准备

参考第三周第二题第四个问题--SNN与NN分离--进行环境准备,增加一个新节点并确保各节点能够免密码SSH访问

7.3.1.1复制虚拟机

复制DataNode节点所在虚拟机,新虚拟机作为新增节点

7.3.1.2设置新增节点虚拟机IP地址

设置该节点IP地址为:10.88.147.224

7.3.1.3设置新增节点虚拟机名称

设置新增节点虚拟机名称为:hadoop4

sudo vi /etc/sysconfig/network

7.3.1.4所有节点hosts 文件加入新增节点的 IP对应信息

在所有节点/etc/hosts文件中加入新增节点的IP地址10.88.147.224对应hadoop4

sudo vi /etc/hosts

7.3.1.5所有节点slavers文件加入新增节点信息

在新增节点hadoop4机器的slaves文件中加入新增节点机器名信息,使用如下命令:

sudo vi /usr/local/hadoop-1.1.2/conf/slaves

在slaves文件中加入新增节点机器名

7.3.1.6重启所有虚拟机

7.3.1.7配置ssh免密码登录

1.     在hadoop4(10.88.147.244)节点中使用ssh-keygen -t rsa生成私钥和公钥;

2.     把hadoop4(10.88.147.244)节点中公钥信息加入到authorized_keys文件中;

chmod 400 -R /home/hadoop/.ssh

cat id_rsa.pub >> authorized_keys

cat authorized_keys

3.     把authorized_keys分发到各个节点上;

scp authorized_keys [email protected]:/home/hadoop/.ssh

4.     验证是否能够免登录到各个节点;

7.3.2准备数据

参考作业2,为了能够更清楚的观察实验的效果,在该作业中运行的数据尽可能大,在这里上传数据大概480M左右。使用SSH工具(参见第1、2周2.1.3.1    Linux文件传输工具所描述)把从NCDC下载的气象数据上传到上步骤创建的目录/usr/local/hadoop-1.1.2/input中。

先把这三年的数据放到/usr/local/hadoop-1.1.2/input目录下,然后使用zcat命令把这些数据文件解压并合并到一个newnode.txt文件中,合并后把这个文件上传到HDFS文件系统的/usr/hadoop/in目录中:

cd /usr/local/hadoop-1.1.2/input

mv 1971/*.gz .

mv 1972/*.gz .

mv 1973/*.gz .

zcat *.gz > newnode.txt

ll newnode.txt

hadoop fs -copyFromLocal newnode.txt /usr/hadoop/in

hadoop fs -ls /usr/hadoop/in

7.3.3不启动Hadoop4,运行任务

使用本周作业2打包的jar包,启动作业,执行输出目录为/usr/hadoop/out_newnode1:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode1

查看运行作业节点信息,Hadoop集群由两个DataNode节点,分别为Hadoop2和Hadoop3:

7.3.4启动Hadoop4,运行任务

再次启动作业,执行输出目录为/usr/hadoop/out_newnode2:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode2

运行作业后,启动新增节点Hadoop4中的DataNode和TaskTracker进程

cd /usr/local/hadoop-1.1.2/bin

hadoop-daemon.sh start datanode

hadoop-daemon.sh start tasktracker

jps

观察结果,可以看到Hadoop集群识别到Hadoop4节点并发现该节点运行的TaskTracker:

在新增节点Hadoop4正在运行task任务

7.3.5重启Hadoop集群,运行任务

停止Hadoop集群运行:

cd /usr/local/hadoop-1.1.2/bin

stop-all.sh

在Hadoop4节点中结束DataNode和TaskTracker进程:

cd /usr/local/hadoop-1.1.2/bin

hadoop-daemon.sh stop datanode

hadoop-daemon.sh stop tasktracker

在Hadoop1节点中HADOOP_HOME/conf/salvers中加入Hadoop4节点

cd /usr/local/hadoop-1.1.2/conf

sudo vi /usr/local/hadoop-1.1.2/conf/slaves

启动Hadoop集群,从下图可以看见Hadoop4节点已经随Hadoop集群启动:

cd /usr/local/hadoop-1.1.2/bin

start-all.sh

再次启动运行作业,可以观察到Hadoop4节点得到运行任务:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode3

7.3.6结论

综上,当Hadoop集群有新节点加入时,正在运行的MapReduce作业自动识别并使用新节点;当新节点加入Hadoop集群后,再启动MapReduce作业,MapReduce作业也能自动识别并使用新节点。

8    书面作业6:使用Hadoop公平调度器

8.1  书面作业6内容

(选作)怎样打开Hadoop的公平调度器?怎样证实当前处于公平调度下。设计一个实验方案验证FIFO调度器和公平调度器的异同,最好能实践之

8.2  程序代码

参见作业2代码并打成jar包

8.3  实现过程

在这里将运行本周作业2求每年最低温度的MapReduce(当然也可以运行求最高温度等例子),为了能够更加明显看到实验效果该实验将准备两份实验数据:数据A为1970~1972年各年份前10个气象站合并数据,数据B为1970~1972年各年份前240个气象站合并数据,那么分析数据A的MapReduceA时间将大大少于分析数据B的MapReduceB时间。在实验过程中将让分析数据B的MapReduceB先启动,然后启动分析数据A的MapReduceA,观察MapReduceA和MapReduceB哪一个先结束?

8.3.1打开公平调度器

要让公平调度器能在Hadoop中运行,需要把相应的jar放到CLASSPATH中。在Hadoop1.X以前需要把hadoop-*-fairscheduler.jar从HADOOP_HOME/build/contrib/fairscheduler拷贝到HADOOP_HOME/lib,在Hadoop1.1.2版本在HADOOP_HOME/lib已经包含了公平调度器和能力调度器两个jar包

并需要在Hadoop的配置文件HADOOP_CONF_DIR/mapred-site.xml中设置下列属性让Hadoop启用公平调度器:

  <property>

    <name>mapred.jobtracker.taskScheduler</name>

    <value>org.apache.hadoop.mapred.FairScheduler</value>

  </property>

8.3.2验证启动公平调度器

重启集群

cd /usr/local/hadoop-1.1.2/bin

stop-all.sh

start-all.sh

通过jobtracker web UI查看:http://<JobTracker URL>/scheduler,在该实验地址为http://10.88.147.221:50030/scheduler,如下图所示

8.3.3准备数据

使用SSH工具(参见第1、2周2.1.3.1Linux文件传输工具所描述)把从NCDC下载的气象数据上传到上步骤创建的目录/usr/local/hadoop-1.1.2/input中。

使用zcat命令把这些数据文件分别解压并合并到shortData.txt和longData.txt文件中,合并后把这两个文件上传到HDFS文件系统的/usr/hadoop/in目录中:

cd /usr/local/hadoop-1.1.2/input

zcat *.gz > shortData.txt

zcat *.gz > longData.txt

hadoop fs -copyFromLocal shortData.txt /usr/hadoop/in

hadoop fs -copyFromLocal longData.txt /usr/hadoop/in

hadoop fs -ls /usr/hadoop/in

8.3.4运行分析MapReduce

使用本周作业2打包的jar包,先启动MapReduceB,然后在另外一个登录Session中启动MapReduceA,执行输出目录为/usr/hadoop/out_long 和/usr/hadoop/out_short:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/longData.txt  /usr/hadoop/out_long

在另外一个登录Session运行情况:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/shortData.txt  /usr/hadoop/out_short

8.3.5观察结果

可以从上步骤结果图观察到:

MapReduceB 开始执行时间2014-10-14 16:51:13 结束时间:2014-10-14 16:53:36

MapReduceA 开始执行时间2014-10-14 16:51:20 结束时间:2014-10-14 16:53:28

MapReduceB这个job先运行,但因为这个job处理的数据量较大,而后一个MapReduceA的 job处理的数据量较小,所以MapReduceA的 job反而较早运行结束,可知公平调度器设置成功。

MapReduceB 作业ID为:job_201410141618_0009

MapReduceA 作业ID为:job_201410141618_0010

9    问题解决

9.1  在作业5中新增节点后,DataNode无法启动

通过拷贝Hadoop3(10.88.147.223)节点,修改名称后成为Hadoop4(10.88.147.224)节点,启动Hadoop3或者Hadoop4的DataNode会出现如下错误:

2014-10-15 16:28:40,344 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DataNode is shutting down: org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException: Data node 10.88.147.223:50010 is attempting to report storage ID DS-60030049-10.88.147.223-50010-1411456729315. Node 10.88.147.224:50010 is expected to serve this storage.

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDatanode(FSNamesystem.java:4776)

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.processReport(FSNamesystem.java:3628)

at org.apache.hadoop.hdfs.server.namenode.NameNode.blockReport(NameNode.java:1041)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:578)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1393)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1389)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1387)

at org.apache.hadoop.ipc.Client.call(Client.java:1107)

at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)

at com.sun.proxy.$Proxy5.blockReport(Unknown Source)

at org.apache.hadoop.hdfs.server.datanode.DataNode.offerService(DataNode.java:1026)

at org.apache.hadoop.hdfs.server.datanode.DataNode.run(DataNode.java:1527)

at java.lang.Thread.run(Thread.java:745)

解决办法是修改Hadoop4节点中HADOOP_HOME/hdfs/data/current/VERSION文件,把storageID由DS-60030049-10.88.147.223-50010-1411456729315修改成DS-60030049-10.88.147.224-50010-1411456729315即可

时间: 2024-07-31 07:49:28

Hadoop第5周练习—MapReduce计算气象温度等例子的相关文章

Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)

1  运行环境说明 1.1 硬软件环境 1.2 机器网络环境 2  书面作业1:计算员工相关 2.1 书面作业1内容 2.2  实现过程 2.2.1   准备测试数据 2.2.2   问题1:求各个部门的总工资 2.2.3   问题2:求各个部门的人数和平均工资 2.2.4   问题3:求每个部门最早进入公司的员工姓名 2.2.5   问题4:求各个城市的员工的总工资 2.2.6   问题5:列出工资比上司高的员工姓名及其工资 2.2.7   问题6:列出工资比公司平均工资要高的员工姓名及其工资

hadoop之魂--mapreduce计算框架,让收集的数据产生价值 (第4篇)

  通过前面的学习,大家已经了解了HDFS文件系统.有了数据,下一步就要分析计算这些数据,产生价值.接下来我们介绍Mapreduce计算框架,学习数据是怎样被利用的. Mapreduce计算框架 如果将Hadoop比做一头大象,那么MapReduce就是那头大象的电脑.MapReduce是Hadoop核心编程模型.在Hadoop中,数据处理核心就是MapReduce程序设计模型. 本章内容: 1) MapReduce编程模型 2) MapReduce执行流程 3) MapReduce数据本地化

Hadoop MapReduce计算框架

1.MapReduce理论 1.1.MapReduce是什么? MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据):MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法. 1.2.MapReduce概述 MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是"分而治之":整个MapReduce计算过程可以分为Map(映射)阶段

使用mapreduce计算环比的实例

最近做了一个小的mapreduce程序,主要目的是计算环比值最高的前5名,本来打算使用spark计算,可是本人目前spark还只是简单看了下,因此就先改用mapreduce计算了,今天和大家分享下这个例子,也算是对自己写的程序的总结了. 首先解释下环比,例如我们要算本周的环比,那么计算方式就是本周的数据和上周数字的差值除以上周数值就是环比了,如果是月的环比就是本月和上月数据的差值除以上月数字就是本月环比了.不过本mapreduce实例不会直接算出比值,只是简单求出不同时间段数值的差值,最终环比结

Hadoop学习笔记(2) 关于MapReduce

1. 查找历年最高的温度. MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型.程序员还需具体定义两个函数:map函数和reduce函数. 对应的Java MapReduce代码如下: public class MaxTemperature{ static class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable>

Hadoop学习基础之三:MapReduce

现在是讨论这个问题的不错的时机,因为最近媒体上到处充斥着新的革命所谓“云计算”的信息.这种模式需要利用大量的(低端)处理器并行工作来解决计算问题.实际上,这建议利用大量的低端处理器来构建数据中心,而不是利用数目少的多的高端服务器来构建. 举例来说,IBM和Google已经宣布计划用1000台处理器构建的集群提供给部分大学,传授学生们如何使用MapReduce工具在这些集群上编程.加利福尼亚大学伯克利分校甚至打算开设使用MapReduce框架编程的课程.我们对MapReduce支持者大肆炒作它如何

从 WordCount 到 MapReduce 计算模型

概述 虽然现在都在说大内存时代,不过内存的发展怎么也跟不上数据的步伐吧.所以,我们就要想办法减小数据量.这里说的减小可不是真的减小数据量,而是让数据分散开来.分开存储.分开计算.这就是 MapReduce 分布式的核心. 版权说明 著作权归作者所有. 商业转载请联系作者获得授权,非商业转载请注明出处. 本文作者:Coding-Naga 发表日期: 2016年5月10日 本文链接:http://blog.csdn.net/lemon_tree12138/article/details/513677

MapReduce计算模型二

之前写过关于Hadoop方面的MapReduce框架的文章MapReduce框架Hadoop应用(一) 介绍了MapReduce的模型和Hadoop下的MapReduce框架,此文章将进一步介绍mapreduce计算模型能用于解决什么问题及有什么巧妙优化. MapReduce到底解决什么问题? MapReduce准确的说,它不是一个产品,而是一种解决问题的思路,能够用分治策略来解决问题.例如:网页抓取.日志处理.索引倒排.查询请求汇总等等问题.通过分治法,将一个大规模的问题,分解成多个小规模的问

Hadoop二次排序及MapReduce处理流程实例详解

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map.