总结自己在Hadoop开发中遇到的问题,主要在mapreduce代码执行方面。大部分来自日常代码执行错误的解决方法,还有一些是对Java、Hadoop剖析。对于问题,通过查询stackoverflow、csdn找到了解决方法。汇总出来以后查询方便。内容将不定期更新。
1、jar包执行出错,提示“class wordcount.WordCountMapper not found”
错误原因:在run()代码中没有定义setJarByClass
解决方法:在wordcountJob.java中增加 job.setJarByClass(getClass());
2、jar包执行出错,提示“exception in thread “main” java.lang.ClassNotFound"
错误原因:
1)build.gradle文件project.ext.mainclass配置错误
解决方法:重新配置文件夹的build.gradle文件,把project.ext.mainclass配置为packageName.mainClassName(packageName与mainClassName用实际名称替换,如’wordcount.WordCountJob’)
2)源代码的source folder文件夹结构错误。
解决方法:source folder需要严格配置为src/main/java,package放在source folder里面,class放在package下面。
注意:
- eclipse编译jar包时候,需要读取build.gradle内容。在build.gradle的mainclass中,packageName与mainClassName需要与实际包、类名称一致。
- source folder文件夹结构必须为src/main/java。
- 各class(非inner class)名称需要与该代码的文件名一致。
3、jar包执行出错,提示“Java.lang.NullPointerException”
错误原因:相关类没有实例化。在错误日志中,“at wordcount.WordCount.Mapper.Map(WordCountMapper.java:26)”指示第26行outputValue没有初始化(代码中仅仅声明private IntWritable outputValue;,没有用new IntWritable()分配存储空间),但后面代码直接使用set方法(outputValue.set),导致意外。
解决方法:在outputValue声明时候,增加new内存分配(private intWritable outputValue = new IntWritable();)
4、jar包执行结果关键counter
FILE_BYTES_READ //job读取本地文件系统的文件字节数。假定我们当前map的输入数据都来自于HDFS,那么在map阶段,这个数据应该是0。但reduce在执行前,它的输入数据是经过shuffle的merge后存储在reduce端本地磁盘中,所以这个数据就是所有reduce的总输入字节数。
FILE_BYTES_WRITTEN //map的中间结果都会spill到本地磁盘中,在map执行完后,形成最终的spill文件。所以map端这里的数据就表示map task往本地磁盘中总共写了多少字节。与map端相对应的是,reduce端在shuffle时,会不断地拉取map端的中间结果,然后做merge并不断spill到自己的本地磁盘中。最终形成一个单独文件,这个文件就是reduce的输入文件。
HDFS_BYTES_READ //整个job执行过程中,只有map端运行时,才从HDFS读取数据,这些数据不限于源文件内容,还包括所有map的split元数据。所以这个值应该比FileInputFormatCounters.BYTES_READ 要略大些。
HDFS_BYTES_WRITTEN //Reduce的最终结果都会写入HDFS,就是一个job执行结果的总量。
Combine input records //Combiner是为了减少尽量减少需要拉取和移动的数据,所以combine输入条数与map的输出条数是一致的。
Combine output records //经过Combiner后,相同key的数据经过压缩,在map端自己解决了很多重复数据,表示最终在map端中间文件中的所有条目数
Failed Shuffles //copy线程在抓取map端中间数据时,如果因为网络连接异常或是IO异常,所引起的shuffle错误次数
GC time elapsed(ms) //通过JMX获取到执行map与reduce的子JVM总共的GC时间消耗
Map input records //所有map task从HDFS读取的文件总行数
Map output records //map task的直接输出record是多少,就是在map方法中调用context.write的次数,也就是未经过Combine时的原生输出条数
Map output bytes //Map的输出结果key/value都会被序列化到内存缓冲区中,所以这里的bytes指序列化后的最终字节之和
Merged Map outputs //记录着shuffle过程中总共经历了多少次merge动作
Reduce input groups //Reduce总共读取了多少个这样的groups
Reduce input records //如果有Combiner的话,那么这里的数值就等于map端Combiner运算后的最后条数,如果没有,那么就应该等于map的输出条数
5、在类的方法中定义的变量,不能加访问控制(private/protect/public),否则出错。
类内部 | 本包 | 子类 | 外部包 | |
public | √ | √ | √ | √ |
protected | √ | √ | √ | × |
default | √ | √ | × | × |
private | √ | × | × | × |
(1)public:可以被所有其他类所访问。可以访问任何一个在CLASSPATH下的类、接口、异常等。它往往用于对外的情况,也就是对象或类对外的一种接口的形式。
(2)private:只能被自己访问和修改。
(3)protected:自身,子类及同一个包中类可以访问。
(4)default(默认):同一包中的类可以访问,声明时没有加修饰符,认为是friendly。
static:如果将类中的域定义为static,则这个域属于这个类,而不属于这个类的某个对象,每个类中只有一个这样的域,而每一个类对象对于所有的实例域(即没有定义为static的域)都有自己的一份拷贝。
静态常量:如果一个域被定义为static final,则这个域就是一个静态常量。不能省略任何一个关键字,若是少了static,则该域变成了一个实例域,需要由类对象对其进行访问。若是省略了final,则该域变成了静态域,静态方法可以对其进行修改。
静态方法:静态方法是一种不能向对象实施操作的方法。Math的pow方法就是一个静态方法,在运算时,不使用任何Math对象,换句话说,没有隐式的参数this。因为静态方法不能操作对象,所以不能在静态方法中访问实例域,但是静态方法可以访问自身类中的静态域。可以使用对象调用静态方法,但是这样容易引起混淆,因为计算的结果与对象毫无关系,建议还是使用类名,而不是类对象调用静态方法。
在下面两种情况下使用静态方法:
1.一个方法不需要访问对象的状态,其所需的参数都是通过显式的提供
2.一个方法只需访问类的静态域
6、Java泛型(generics)说明
1)总体介绍
在引入范型之前,Java类型分为原始类型、复杂类型,其中复杂类型分为数组和类。引入范型后,一个复杂类型就可以在细分成更多的类型。例如原先的类型List,现在在细分成List<Object>, List<String>等更多的类型。注意,现在List<Object>, List<String>是两种不同的类型,他们之间没有继承关系,即使String继承了Object。下面的代码是非法的
List<String> ls = new ArrayList<String>();
List<Object> lo = ls;
这样设计的原因在于,根据lo的声明,编译器允许你向lo中添加任意对象(例如Integer),但是此对象是List<String>,破坏了数据类型的完整性。
在引入范型之前,要在类中的方法支持多个数据类型,就需要对方法进行重载,在引入范型后,可以解决此问题(多态),更进一步可以定义多个参数以及返回值之间的关系。例如
public void write(Integer i, Integer[] ia);
public void write(Double d, Double[] da);
的范型版本为
public <T> void write(T t, T[] ta);
2)定义&使用
类型参数的命名风格为:推荐你用简练的名字作为形式类型参数的名字(如果可能,单个字符)。最好避免小写字母,这使它和其他的普通的形式参数很容易被区分开来。使用T代表类型,无论何时都没有比这更具体的类型来区分它。这经常见于泛型方法。如果有多个类型参数,我们可能使用字母表中T的临近的字母,比如S。如果一个泛型函数在一个泛型类里面出现,最好避免在方法的类型参数和类的类型参数中使用同样的名字来避免混淆。对内部类也是同样。
- 定义带类型参数的类
在定义带类型参数的类时,在紧跟类命之后的<>内,指定一个或多个类型参数的名字,同时也可以对类型参数的取值范围进行限定,多个类型参数之间用,号分隔。定义完类型参数后,可以在定义位置之后的类的几乎任意地方(静态块,静态属性,静态方法除外)使用类型参数,就像使用普通的类型一样。注意,父类定义的类型参数不能被子类继承。
public class TestClassDefine<T, S extends T> {
....
}
- 定义带类型参数方法
在定义带类型参数的方法时,在紧跟可见范围修饰(例如public)之后的<>内,指定一个或多个类型参数的名字,同时也可以对类型参数的取值范围进行限定,多个类型参数之间用,号分隔。定义完类型参数后,可以在定义位置之后的方法的任意地方使用类型参数,就像使用普通的类型一样。
例如:
public <T, S extends T> T testGenericMethodDefine(T t, S s){
...
}
注意:定义带类型参数的方法,主要目的是为了表达多个参数以及返回值之间的关系。例如本例子中T和S的继
承关系, 返回值的类型和第一个类型参数的值相同。如果仅仅是想实现多态,请优先使用通配符解决。通配符的内容见下面章节。
public <T> void testGenericMethodDefine2(List<T> s){
...
}
应改为
public void testGenericMethodDefine2(List<?> s){
...
}
3.)类型参数赋值
当对类或方法的类型参数进行赋值时,要求对所有的类型参数进行赋值。否则,将得到一个编译错误。
- 对带类型参数的类进行类型参数赋值
对带类型参数的类进行类型参数赋值有两种方式
第一声明类变量或者实例化时。例如
List<String> list;
list = new ArrayList<String>;
第二继承类或者实现接口时。例如
public class MyList<E> extends ArrayList<E> implements List<E> {...}
- 对带类型参数方法进行赋值
当调用范型方法时,编译器自动对类型参数进行赋值,当不能成功赋值时报编译错误。例如
public <T> T testGenericMethodDefine3(T t, List<T> list){
...
}
public <T> T testGenericMethodDefine4(List<T> list1, List<T> list2){
...
}
Number n = null;
Integer i = null;
Object o = null;
testGenericMethodDefine(n, i);//此时T为Number, S为Integer
testGenericMethodDefine(o, i);//T为Object, S为Integer
List<Number> list1 = null;
testGenericMethodDefine3(i, list1)//此时T为Number
List<Integer> list2 = null;
testGenericMethodDefine4(list1, list2)//编译报错
- 通配符
在上面两小节中,对是类型参数赋予具体的值,除此,还可以对类型参数赋予不确定值。例如
List<?> unknownList;
List<? extends Number> unknownNumberList;
List<? super Integer> unknownBaseLineIntgerList;
注意: 在Java集合框架中,对于参数值是未知类型的容器类,只能读取其中元素,不能像其中添加元素,因为,其类型是未知,所以编译器无法识别添加元素的类型和容器的类型是否兼容,唯一的例外是NULL
List<String> listString;
List<?> unknownList2 = listString;
unknownList = unknownList2;
listString = unknownList;//编译错误
4)数组范型
可以使用带范型参数值的类声明数组,却不可有创建数组
List<Integer>[] iListArray;
new ArrayList<Integer>[10];//编译时错误
5)实现原理
Java范型时编译时技术,在运行时不包含范型信息,仅仅Class的实例中包含了类型参数的定义信息。泛型是通过java编译器的称为擦除(erasure)的前端处理来实现的。你可以(基本上就是)把它认为是一个从源码到源码的转换,它把泛型版本转换成非泛型版本。基本上,擦除去掉了所有的泛型类型信息。所有在尖括号之间的类型信息都被扔掉了,因此,比如说一个List<String>类型被转换为List。所有对类型变量的引用被替换成类型变量的上限(通常是Object)。而且,无论何时结果代码类型不正确,会插入一个到合适类型的转换。
<T> T badCast(T t, Object o) {
return (T) o; // unchecked warning
}
类型参数在运行时并不存在。这意味着它们不会添加任何的时间或者空间上的负担,这很好。不幸的是,这也意味着你不能依靠他们进行类型转换。
- 一个泛型类被其所有调用共享
下面的代码打印的结果是什么?
List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass());
或许你会说false,但是你想错了。它打印出true。因为一个泛型类的所有实例在运行时具有相同的运行时类(class),而不管他们的实际类型参数。事实上,泛型之所以叫泛型,就是因为它对所有其可能的类型参数,有同样的行为;同样的类可以被当作许多不同的类型。作为一个结果,类的静态变量和方法也在所有的实例间共享。这就是为什么在静态方法或静态初始化代码中或者在静态变量的声明和初始化时使用类型参数(类型参数是属于具体实例的)是不合法的原因。
- 转型和instanceof
泛型类被所有其实例(instances)共享的另一个暗示是检查一个实例是不是一个特定类型的泛型类是没有意义的。
Collection cs = new ArrayList<String>();
if (cs instanceof Collection<String>) { ...} // 非法
类似的,如下的类型转换
Collection<String> cstr = (Collection<String>) cs;
得到一个unchecked warning,因为运行时环境不会为你作这样的检查。
6)Class的范型处理
Java 5之后,Class变成范型化了。JDK1.5中一个变化是类 java.lang.Class是泛型化的。这是把泛型扩展到容器类之外的一个很有意思的例子。现在,Class有一个类型参数T, 你很可能会问,T 代表什么?它代表Class对象代表的类型。比如说,String.class类型代表 Class<String>,Serializable.class代表 Class<Serializable>。这可以被用来提高你的反射代码的类型安全。
特别的,因为 Class的 newInstance() 方法现在返回一个T, 你可以在使用反射创建对象时得到更精确的类型。比如说,假定你要写一个工具方法来进行一个数据库查询,给定一个SQL语句,并返回一个数据库中符合查询条件的对象集合(collection)。一个方法是显式的传递一个工厂对象,像下面的代码:
interface Factory<T> {
public T[] make();
}
public <T> Collection<T> select(Factory<T> factory, String statement) {
Collection<T> result = new ArrayList<T>();
/* run sql query using jdbc */
for ( int i=0; i<10; i++ ) { /* iterate over jdbc results */
T item = factory.make();
/* use reflection and set all of item’s fields from sql results */
result.add( item );
}
return result;
}
你可以这样调用:
select(new Factory<EmpInfo>(){
public EmpInfo make() {
return new EmpInfo();
}
} , ”selection string”);
也可以声明一个类 EmpInfoFactory 来支持接口 Factory:
class EmpInfoFactory implements Factory<EmpInfo> { ...
public EmpInfo make() { return new EmpInfo();}
}
然后调用:
select(getMyEmpInfoFactory(), "selection string");
这个解决方案的缺点是它需要下面的二者之一:调用处那冗长的匿名工厂类,或为每个要使用的类型声明一个工厂类并传递其对象给调用的地方,这很不自然。使用class类型参数值是非常自然的,它可以被反射使用。没有泛型的代码可能是:
Collection emps = sqlUtility.select(EmpInfo.class, ”select * from emps”); ...
public static Collection select(Class c, String sqlStatement) {
Collection result = new ArrayList();
/* run sql query using jdbc */
for ( /* iterate over jdbc results */ ) {
Object item = c.newInstance();
/* use reflection and set all of item’s fields from sql results */
result.add(item);
}
return result;
}
但是这不能给我们返回一个我们要的精确类型的集合。现在Class是泛型的,我们可以写:
Collection<EmpInfo> emps=sqlUtility.select(EmpInfo.class, ”select * from emps”); ...
public static <T> Collection<T> select(Class<T>c, String sqlStatement) {
Collection<T> result = new ArrayList<T>();
/* run sql query using jdbc */
for ( /* iterate over jdbc results */ ) {
T item = c.newInstance();
/* use reflection and set all of item’s fields from sql results */
result.add(item);
}
return result;
}
来通过一种类型安全的方式得到我们要的集合。这项技术是一个非常有用的技巧,它已成为一个在处理注释(annotations)的新API中被广泛使用的习惯用法。
7)新老代码兼容
- 为了保证代码的兼容性,下面的代码编译器(javac)允许,类型安全有你自己保证
List l = new ArrayList<String>();
List<String> l = new ArrayList();
- 在将你的类库升级为范型版本时,慎用协变式返回值。
例如,将代码
public class Foo {
public Foo create(){
return new Foo();
}
}
public class Bar extends Foo {
public Foo create(){
return new Bar();
}
}
采用协变式返回值风格,将Bar修改为
public class Bar extends Foo {
public Bar create(){
return new Bar();
}
}
要小心你类库的客户端。
7、独立创建Hadoop代码操作说明
1)本地新建文件夹
2)把build.gradle拷贝到该文件夹中,并修改该文件的projectName、mainClassName
3)在eclipse中import该文件夹,build模型即可
8、在eclipse执行run as -> gradle时候,弹出空窗口
解决方法:在空白输入框(提示需要ctrl + space)中,输入“clean”,换行后再输入”build“。点击apply - run。
9、Mapper类说明
1)InputFormat 产生 InputSplit,并且调用RecordReader将这些逻辑单元(InputSplit)转化为map task的输入。其中InputSplit是map task处理的最小输入单元的逻辑表示。
2)在客户端代码中调用Job类来设置参数,并执行在hadoop集群的上的MapReduce程序。
3)Mapper类在Job中被实例化,并且通过MapContext对象来传递参数设置。可以调用Job.getConfiguration().set(“myKey”, “myVal”)来设置参数。
4)Mapper的run()方法调用了自身的setup()来设置参数。
5)Mapper的run()方法调用map(KeyInType, ValInType, Context)方法来依次处理InputSplit中输入的key/value数据对。并且可以通过Context参数的Context.write(KeyOutType, ValOutType)方法来发射处理的结果。用户程序还可以用Context来设置状态信息等。同时,用户还可以用Job.setCombinerClass(Class)来设置Combiner,将map产生的中间态数据在Mapper本地进行汇聚,从而减少传递给Reducer的数据。
6)Mapper的run()方法调用cleanup()方法。
一些说明:
1)所有的中间结果都会被MapReduce框架自动的分组,然后传递给Reducer(s)去产生最后的结果。用户可以通过Job.setGroupingComparatorClass(Class)来设置Comparator。如果这个Comparator没有被设置,那么所有有一样的key的数据不会被排序。
2)Mapper的结果都是被排序过的,并被划分为R个区块(R是Reducer的个数)。用户可以通过实现自定义的Partitioner类来指定哪些数据被划分给哪个Reducer。
3)中间态的数据往往用(key-len, key, value-len, value)的简单格式存储。用户程序可以指定CompressionCodec来压缩中间数据。
4)Map的数目由输入数据的总大小决定。一般来说,一个计算节点10-100个map任务有较好的并行性。如果cpu的计算量很小,那么平均每个计算节点300个map任务也是可以的。但是每个任务都是需要时间来初始化的,因此每个任务不能划分的太小,至少也要平均一个任务执行个一分钟。可以通过mapreduce.job.maps参数来设置map的数目。更进一步说,任务的数量是有InputFormat.getSplits()方法来控制的,用户可以重写这个方法。
Reducer类说明
Reducer主要分三个步骤:
1)Shuffle洗牌:这步骤是Reducer从相关的Mapper节点获取中间态的数据。
2)Sort排序:在洗牌的同时,Reducer对获取的数据进行排序。在这个过程中,可以用Job.setGroupingComparatorClass(Class)来对同一个key的数据进行Secondary Sort二次排序。
3)Reduce:Reduce的调用顺序和Map差不多,也是通过run()方法调用setup(),reduce(),cleanup()来实现的。Reducer输出的数据是没有经过排序的。
- Reduce 的数目可以通过Job.setNumReduceTasks(int)来设置。一般来说,Reduce的数目是节点数的0.95到1.75倍。
- Reduce的数目也可以设置为0,那么这样map的输出会直接写到文件系统中。
- Reduce中还可以使用Mark-Reset的功能。简而言之就是可以在遍历map产生的中间态的数据的时候可以进行标记,然后在后面适当的时候用reset回到最近标记的位置。当然这是有一点限制的,如下面的例子,必须自己在Reduce方法中对reduce的Iterator重新new 一个MarkableIterator才能使用。
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
MarkableIterator<IntWritable> mitr = new MarkableIterator<IntWritable>(values.iterator());
// Mark the position
mitr.mark();
while (mitr.hasNext()) {
i = mitr.next();
// Do the necessary processing
}
// Reset
mitr.reset();
// Iterate all over again. Since mark was called before the first
// call to mitr.next() in this example, we will iterate over all
// the values now
while (mitr.hasNext()) {
i = mitr.next();
// Do the necessary processing
}
}
Partitioner 对map输出的中间态数据按照reduce的数目进行分区,一般通过hash来进行划分。默认的实现是HashPartitioner。
MapReduce程序可以通过mapper或者reducer的Context来汇报应用程序的状态。
当我们在写MapReduce程序的时候,通常,在main函数里。建立一个Job对象,设置它的JobName,然后配置输入输出路径,设置我们的Mapper类和Reducer类,设置InputFormat和正确的输出类型等等。然后我们会使用job.waitForCompletion()提交到JobTracker,等待job运行并返回,这就是一般的Job设置过程。JobTracker会初始化这个Job,获取输入分片,然后将一个一个的task任务分配给TaskTrackers执行。TaskTracker获取task是通过心跳的返回值得到的,然后TaskTracker就会为收到的task启动一个JVM来运行。
Job其实就是提供配置作业、获取作业配置、以及提交作业的功能,以及跟踪作业进度和控制作业。Job类继承于JobContext类。JobContext提供了获取作业配置的功能,如作业ID,作业的Mapper类,Reducer类,输入格式,输出格式等等,它们除了作业ID之外,都是只读的。 Job类在JobContext的基础上,提供了设置作业配置信息的功能、跟踪进度,以及提交作业的接口和控制作业的方法。
一个Job对象有两种状态,DEFINE和RUNNING,Job对象被创建时的状态时DEFINE,当且仅当Job对象处于DEFINE状态,才可以用来设置作业的一些配置,如Reduce task的数量、InputFormat类、工作的Mapper类,Partitioner类等等,这些设置是通过设置配置信息conf来实现的;当作业通过submit()被提交,就会将这个Job对象的状态设置为RUNNING,这时候作业以及提交了,就不能再设置上面那些参数了,作业处于调度运行阶段。处于RUNNING状态的作业我们可以获取作业、maptask和reduce task的进度,通过代码中的*Progress()获得,这些函数是通过info来获取的,info是RunningJob对象,它是实际在运行的作业的一组获取作业情况的接口,如Progress。
在waitForCompletion()中,首先用submit()提交作业,然后等待info.waitForCompletion()返回作业执行完毕。verbose参数用来决定是否将运行进度等信息输出给用户。submit()首先会检查是否正确使用了new API,这通过setUseNewAPI()检查旧版本的属性是否被设置来实现的,接着就connect()连接JobTracker并提交。实际提交作业的是一个JobClient对象,提交作业后返回一个RunningJob对象,这个对象可以跟踪作业的进度以及含有由JobTracker设置的作业ID。
getCounter()函数是用来返回这个作业的计数器列表的,计数器被用来收集作业的统计信息,比如失败的map task数量,reduce输出的记录数等等。它包括内置计数器和用户定义的计数器,用户自定义的计数器可以用来收集用户需要的特定信息。计数器首先被每个task定期传输到TaskTracker,最后TaskTracker再传到JobTracker收集起来。这就意味着,计数器是全局的。
10、job.setoutputFormat(TextoututFormat)提示出错
解决方法:import org.apache.hadoop.mapred.TextoutputFormat修改为import org.apache.hadoop.mapreduce.lib.output.TextoututFormat
org.apache.hadoop.mapred.TextoutputFormat为旧MapReduce包,新包为org.apache.hadoop.mapreduce.lib.output.TextoutputFormat
11、Mapper类的方法说明
Mapper类有四个方法:setup()、cleanup()、run()、map()。在run方法中调用了上面的三个方法:setup方法,map方法,cleanup方法。
- setup方法和cleanup方法默认是不做任何操作,且它们只被执行一次。setup方法一般会在map函数之前执行一些准备工作,如作业的一些配置信息等;
- cleanup方法则是在map方法运行完之后最后执行 的,该方法是完成一些结尾清理的工作,如:资源释放等。如果需要做一些配置和清理的工作,需要在Mapper/Reducer的子类中进行重写来实现相应的功能。
- map方法会在对应的子类中重新实现,就是我们自定义的map方法。该方法在一个while循环里面,表明该方法是执行很多次的。run方法就是每个maptask调用的方法。
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
/**
* Maps input key/value pairs to a set of intermediate key/value pairs.
*
* <p>Maps are the individual tasks which transform input records into a
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.</p>
*
* <p>The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* <code>Mapper</code> implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
*
* <p>The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, Context)}
* for each key/value pair in the <code>InputSplit</code>. Finally
* {@link #cleanup(Context)} is called.</p>
*
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
*
* <p>The <code>Mapper</code> outputs are partitioned per
* <code>Reducer</code>. Users can control which keys (and hence records) go to
* which <code>Reducer</code> by implementing a custom {@link Partitioner}.
*
* <p>Users can optionally specify a <code>combiner</code>, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the <code>Mapper</code> to the <code>Reducer</code>.
*
* <p>Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the <code>Configuration</code>.</p>
*
* <p>If the job has zero
* reduces then the output of the <code>Mapper</code> is directly written
* to the {@link OutputFormat} without sorting by keys.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class TokenCounterMapper
* extends Mapper<Object, Text, Text, IntWritable>{
*
* private final static IntWritable one = new IntWritable(1);
* private Text word = new Text();
*
* public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
* StringTokenizer itr = new StringTokenizer(value.toString());
* while (itr.hasMoreTokens()) {
* word.set(itr.nextToken());
* context.write(word, one);
* }
* }
* }
* </pre></blockquote></p>
*
* <p>Applications may override the {@link #run(Context)} method to exert
* greater control on map processing e.g. multi-threaded <code>Mapper</code>s
* etc.</p>
*
* @see InputFormat
* @see JobContext
* @see Partitioner
* @see Reducer
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
12、Reducer类说明
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
import java.util.Iterator;
/**
* Reduces a set of intermediate values which share a key to a smaller set of
* values.
*
* <p><code>Reducer</code> implementations
* can access the {@link Configuration} for the job via the
* {@link JobContext#getConfiguration()} method.</p>
* <p><code>Reducer</code> has 3 primary phases:</p>
* <ol>
* <li>
*
* <h4 id="Shuffle">Shuffle</h4>
*
* <p>The <code>Reducer</code> copies the sorted output from each
* {@link Mapper} using HTTP across the network.</p>
* </li>
*
* <li>
* <h4 id="Sort">Sort</h4>
*
* <p>The framework merge sorts <code>Reducer</code> inputs by
* <code>key</code>s
* (since different <code>Mapper</code>s may have output the same key).</p>
*
* <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
* being fetched they are merged.</p>
*
* <h5 id="SecondarySort">SecondarySort</h5>
*
* <p>To achieve a secondary sort on the values returned by the value
* iterator, the application should extend the key with the secondary
* key and define a grouping comparator. The keys will be sorted using the
* entire key, but will be grouped using the grouping comparator to decide
* which keys and values are sent in the same call to reduce.The grouping
* comparator is specified via
* {@link Job#setGroupingComparatorClass(Class)}. The sort order is
* controlled by
* {@link Job#setSortComparatorClass(Class)}.</p>
*
*
* For example, say that you want to find duplicate web pages and tag them
* all with the url of the "best" known example. You would set up the job
* like:
* <ul>
* <li>Map Input Key: url</li>
* <li>Map Input Value: document</li>
* <li>Map Output Key: document checksum, url pagerank</li>
* <li>Map Output Value: url</li>
* <li>Partitioner: by checksum</li>
* <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
* <li>OutputValueGroupingComparator: by checksum</li>
* </ul>
* </li>
*
* <li>
* <h4 id="Reduce">Reduce</h4>
*
* <p>In this phase the
* {@link #reduce(Object, Iterable, Context)}
* method is called for each <code><key, (collection of values)></code> in
* the sorted inputs.</p>
* <p>The output of the reduce task is typically written to a
* {@link RecordWriter} via
* {@link Context#write(Object, Object)}.</p>
* </li>
* </ol>
*
* <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
* Key,IntWritable> {
* private IntWritable result = new IntWritable();
*
* public void reduce(Key key, Iterable<IntWritable> values,
* Context context) throws IOException, InterruptedException {
* int sum = 0;
* for (IntWritable val : values) {
* sum += val.get();
* }
* result.set(sum);
* context.write(key, result);
* }
* }
* </pre></blockquote></p>
*
* @see Mapper
* @see Partitioner
*/
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Reducer} implementations.
*/
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
13、StringUtils类说明
- IsEmpty/IsBlank - checks if a String contains text
- Trim/Strip - removes leading and trailing whitespace
- Equals - compares two strings null-safe
- startsWith - check if a String starts with a prefix null-safe
- endsWith - check if a String ends with a suffix null-safe
- IndexOf/LastIndexOf/Contains - null-safe index-of checks
- IndexOfAny/LastIndexOfAny/IndexOfAnyBut/LastIndexOfAnyBut - index-of any of a set of Strings
- ContainsOnly/ContainsNone/ContainsAny - does String contains only/none/any of these characters
- Substring/Left/Right/Mid - null-safe substring extractions
- SubstringBefore/SubstringAfter/SubstringBetween - substring extraction relative to other strings
- Split/Join - splits a String into an array of substrings and vice versa
- Remove/Delete - removes part of a String
- Replace/Overlay - Searches a String and replaces one String with another
- Chomp/Chop - removes the last part of a String
- LeftPad/RightPad/Center/Repeat - pads a String
- UpperCase/LowerCase/SwapCase/Capitalize/Uncapitalize - changes the case of a String
- CountMatches - counts the number of occurrences of one String in another
- IsAlpha/IsNumeric/IsWhitespace/IsAsciiPrintable - checks the characters in a String
- DefaultString - protects against a null input String
- Reverse/ReverseDelimited - reverses a String
- Abbreviate - abbreviates a string using ellipsis
- Difference - compares Strings and reports on their differences
- LevensteinDistance - the number of changes needed to change one String into another
14、StringBuilder类说明
String对象是不可改变的。每次使用 System.String类中的方法之一时,都要在内存中创建一个新的字符串对象,这就需要为该新对象分配新的空间。在需要对字符串执行重复修改的情况下,与创建新的 String对象相关的系统开销可能会非常昂贵。如果要修改字符串而不创建新的对象,则可以使用System.Text.StringBuilder类。例如,当在一个循环中将许多字符串连接在一起时,使用 StringBuilder类可以提升性能。
通过用一个重载的构造函数方法初始化变量,可以创建 StringBuilder类的新实例,正如以下示例中所阐释的那样。
StringBuilder MyStringBuilder = new StringBuilder("Hello World!");
(一)设置容量和长度
虽然 StringBuilder对象是动态对象,允许扩充它所封装的字符串中字符的数量,但是您可以为它可容纳的最大字符数指定一个值。此值称为该对象的容量,不应将它与当前 StringBuilder对象容纳的字符串长度混淆在一起。例如,可以创建 StringBuilder类的带有字符串“Hello”(长度为 5)的一个新实例,同时可以指定该对象的最大容量为 25。当修改 StringBuilder时,在达到容量之前,它不会为其自己重新分配空间。当达到容量时,将自动分配新的空间且容量翻倍。可以使用重载的构造函数之一来指定 StringBuilder类的容量。以下代码示例指定可以将 MyStringBuilder对象扩充到最大 25个空白。
StringBuilderMyStringBuilder = new StringBuilder("Hello World!", 25);
另外,可以使用读/写 Capacity属性来设置对象的最大长度。以下代码示例使用 Capacity属性来定义对象的最大长度。
MyStringBuilder.Capacity= 25;
(二)下面列出了此类的几个常用方法:
(1)Append 方法可用来将文本或对象的字符串表示形式添加到由当前 StringBuilder对象表示的字符串的结尾处。以下示例将一个 StringBuilder对象初始化为“Hello World”,然后将一些文本追加到该对象的结尾处。将根据需要自动分配空间。
StringBuilderMyStringBuilder = new StringBuilder("Hello World!");
MyStringBuilder.Append(" What a beautiful day.");
Console.WriteLine(MyStringBuilder);
此示例将 Hello World! What abeautiful day.显示到控制台。
(2)AppendFormat 方法将文本添加到 StringBuilder的结尾处,而且实现了 IFormattable接口,因此可接受格式化部分中描述的标准格式字符串。可以使用此方法来自定义变量的格式并将这些值追加到 StringBuilder的后面。以下示例使用 AppendFormat方法将一个设置为货币值格式的整数值放置到 StringBuilder的结尾。
int MyInt= 25;
StringBuilder MyStringBuilder = new StringBuilder("Your total is ");
MyStringBuilder.AppendFormat("{0:C} ", MyInt);
Console.WriteLine(MyStringBuilder);
此示例将 Your total is $25.00显示到控制台。
(3)Insert 方法将字符串或对象添加到当前 StringBuilder中的指定位置。以下示例使用此方法将一个单词插入到 StringBuilder的第六个位置。
StringBuilderMyStringBuilder = new StringBuilder("Hello World!");
MyStringBuilder.Insert(6,"Beautiful ");
Console.WriteLine(MyStringBuilder);
此示例将 Hello BeautifulWorld!显示到控制台。
(4)可以使用 Remove方法从当前 StringBuilder中移除指定数量的字符,移除过程从指定的从零开始的索引处开始。以下示例使用 Remove方法缩短 StringBuilder。
StringBuilderMyStringBuilder = new StringBuilder("Hello World!");
MyStringBuilder.Remove(5,7);
Console.WriteLine(MyStringBuilder);
此示例将 Hello显示到控制台。
(5)使用 Replace方法,可以用另一个指定的字符来替换 StringBuilder对象内的字符。以下示例使用 Replace方法来搜索 StringBuilder对象,查找所有的感叹号字符 (!),并用问号字符 (?)来替换它们。
StringBuilderMyStringBuilder = new StringBuilder("Hello World!");
MyStringBuilder.Replace(‘!‘, ‘?‘);
Console.WriteLine(MyStringBuilder);
此示例将 Hello World?显示到控制台
getSqlMapClientTemplate().queryForList((new StringBuilder()).append(entityClass.getName()).append(".select").toString(), null);
Java的StringBuilder类
如果程序对附加字符串的需求很频繁,不建议使用+来进行字符串的串联。可以考虑使用java.lang.StringBuilder 类,使用这个类所产生的对象默认会有16个字符的长度,您也可以自行指定初始长度。如果附加的字符超出可容纳的长度,则StringBuilder 对象会自动增加长度以容纳被附加的字符。如果有频繁作字符串附加的需求,使用StringBuilder 类能使效率大大提高。如下代码:
Java代码
public class AppendStringTest
{
public static void main(String[] args)
{
String text = "" ;
long beginTime = System.currentTimeMillis();
for ( int i= 0 ;i< 10000 ;i++)
text = text + i;
long endTime = System.currentTimeMillis();
System.out.println("执行时间:" +(endTime-beginTime));
StringBuilder sb = new StringBuilder ( "" );
beginTime = System.currentTimeMillis();
for ( int i= 0 ;i< 10000 ;i++)
sb.append(String.valueOf(i));
endTime = System.currentTimeMillis();
System.out.println("执行时间:" +(endTime-beginTime));
}
}
public class AppendStringTest
{
public static void main(String[] args)
{
String text = "";
long beginTime = System.currentTimeMillis();
for(int i=0;i<10000;i++)
text = text + i;
long endTime = System.currentTimeMillis();
System.out.println("执行时间:"+(endTime-beginTime));
StringBuilder sb = new StringBuilder ("");
beginTime = System.currentTimeMillis();
for(int i=0;i<10000;i++)
sb.append(String.valueOf(i));
endTime = System.currentTimeMillis();
System.out.println("执行时间:"+(endTime-beginTime));
}
} 此段代码输出:
执行时间:3188
执行时间:15
StringBuilder 是j2se1.5.0才新增的类,在此之前的版本若有相同的需求,则使用java.util.StringBuffer。事实上StringBuilder 被设计为与StringBuffer具有相同的操作接口。在单机非线程(MultiThread)的情况下使用StringBuilder 会有较好的效率,因为StringBuilder 没有处理同步的问题。StringBuffer则会处理同步问题,如果StringBuilder 会有多线程下被操作,则要改用StringBuffer,让对象自行管理同步问题。
15、jar包执行出错,提示“java.lang.NoSuch Method Exception:grep.Grep$Grep Mapper.<init>()”
错误原因:Grep为inner class,在代码中缺少static
解决方法:在class申明中,增加static
16、Mapper<LongWritable, Text>.Context context修改为Context context
17、combiner设置
- 在run()函数中,设置job.setCombinerClass(xxSumReducer)
- xxSumReducer可以设置FieldSelectionReducer, IntSumReducer, LongSumReducer, SecondarySort.Reduce, WordCount.IntSumReducer
18、在Eclipse中编译jar包,提示警告“compile Java 警告. [options]未与 -source1.7 一起设置引导类路径”
错误原因:代码从其他机器上拷贝过来执行。这个是因为使用的JDK不是1.6的版本,假如使用的是1.7的版本或更高的版本就会出现这个警告。
解决办法:generate JavaDoc在生成Doc文档时,先选择好要生成文档的文件以及生成的路径,单击【Next】,在这个界面下给文档取个名字(Document title),然后单击【Next】。然后,在这个界面中找到:“JRE source compatibility:”这个选项,选择你所用的JDK版本(如:1.7),最后单击【Finish】就OK了。
不处理该警告,jar包也可以正常执行。
19、partitionv中hashcode含义
hash code是一种编码方式,在Java中,每个对象都会有一个hashcode,Java可以通过这个hashcode来识别一个对象。至于hashcode的具体编码方式,比较复杂(事实上这个编码是可以由程序员通过继承和接口的实现重写的),可以参考数据结构书籍。而hashtable等结构,就是通过这个哈希实现快速查找键对象。这是他们的内部联系,但一般编程时无需了解这些,只要知道hashtable实现了一种无顺序的元素排列就可以了。
两个对象值相同(x.equals(y) == true),则一定有相同的hash code。
因为:Hash,一般翻译做“散列”,也有直接音译为"哈希"的,就是把任意长度的输入(又叫做预映射, pre-image),通过散列算法,变换成固定长度的输出,该输出就是散列值。这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,而不可能从散列值来唯一的确定输入值。
public int hashCode() {
return super.hashCode();
}
public int hashCode() {
return WritableComparator.hashBytes(getBytes(), getLength());
}
/** Compute hash for binary data. */
public static int hashBytes(byte[] bytes, int length) {
return hashBytes(bytes, 0, length);
}
/** Compute hash for binary data. */
public static int hashBytes(byte[] bytes, int offset, int length) {
int hash = 1;
for (int i = offset; i < offset + length; i++)
hash = (31 * hash) + (int)bytes[i];
return hash;
}
20、partition的getpartitioner()的方法
1)如果自定义partitioner,需要在run()方法中设定job.setPartitionClass(xx.class)。
2)再创建partitioner的子类,继承getPartitioner()方法。
3)在getPartitioner()方法中,实现自定义partition。
配置total oerder partition方法:
job.setPartitionerClass(TotalOrderPartitioner.class); //在job中配置Partitioner类
InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 200, 3); //创建采样器
InputSampler.writePartitionFile(job, sampler);
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
job.addCacheArchive(partitionUri);
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
final InputFormat inf =ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = (K[])sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator();
Arrays.sort(samples, comparator);
Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) {
fs.delete(dst, false);
}
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
int last = -1;
for(int i = 1; i < numPartitions; ++i) {
int k = Math.round(stepSize * i);
while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
22、Eclipse创建helloworld
1)点击菜单,file - new - project - Java - Java Project
2)填写project name - next
3)在package explorer中,展开src,右键new - class
4)创建带有main的类
23、注意,对于Text()的对象,取值没有get()方法,而是用toString()。
24、totalOrderPartition中,setInputFormatClass()用KeyValueTextInputFormat.class。
说明:
TextInputFormat
用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。
KeyValueTextInputFormat
同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为 key,value为空
SequenceFileInputFormat
用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;
SequenceFileAsTextInputFormat,将key和value以 Text的类型读出。
SequenceFileInputFilter
根据filter从sequence文件中取得部分满足条件的数据,通过setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值满足指定的正则表达式的记录;PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。
NLineInputFormat
0.18.x新加入,可以将文件以行为单位进行split,比如文件的每一行对应一个map。得到的key是每一行的位置(偏移量,LongWritable类型),value是每一行的内容,Text类型。
CompositeInputFormat,用于多个数据源的join。
TextOutputFormat,输出到纯文本文件,格式为 key + ” ” + value。
NullOutputFormat,hadoop中的/dev/null,将输出送进黑洞。
SequenceFileOutputFormat, 输出到sequence file格式文件。
MultipleSequenceFileOutputFormat, MultipleTextOutputFormat,根据key将记录输出到不同的文件。
DBInputFormat和DBOutputFormat,从DB读取,输出到DB,预计将在0.19版本加入。
25、secondary sort实现说明
主要步骤:
1)【排序】定制key class,实现WritableComparable,作为Map outputKey。重点需要实现compareTo()方法。
key class必须实现WritableComparable,这需要定义compareTo方法。
如果没有定制key class,将无法实现部分outputvalue参与排序过程。
public int compareTo(Stock arg0) {
// TODO Auto-generated method stub
int response = this.symbol.compareTo(arg0.symbol);
if(response==0){
response = this.date.compareTo(arg0.date);
}
return response;
}
Compares this object with the specified object for order. Returns a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object.
The implementor must ensure sgn(x.compareTo(y)) == -sgn(y.compareTo(x)) for all x and y. (This implies that x.compareTo(y) must throw an exception iff y.compareTo(x) throws an exception.)
The implementor must also ensure that the relation is transitive: (x.compareTo(y)>0 && y.compareTo(z)>0) implies x.compareTo(z)>0.
Finally, the implementor must ensure that x.compareTo(y)==0 implies that sgn(x.compareTo(z)) == sgn(y.compareTo(z)), for all z.
It is strongly recommended, but not strictly required that (x.compareTo(y)==0) == (x.equals(y)). Generally speaking, any class that implements the Comparable interface and violates this condition should clearly indicate this fact. The recommended language is "Note: this class has a natural ordering that is inconsistent with equals."
In the foregoing description, the notation sgn(expression) designates the mathematical signum function, which is defined to return one of -1, 0, or 1 according to whether the value of expression is negative, zero or positive.
2)【分区】定制custom partition,实现按照key class的部分属性进行分区。
char firstLetter = key.getSymbol().trim().charAt(0);
return (firstLetter - ‘A‘) % numPartitions;
在run()中需要增加 job.setPartitionerClass(xxx.class);
如果没有定制custom partition,将使用默认hashpartition,将无法实现按照outputKey进行group。
3)【合并】编写定制 grouping comparator class,继承WritableComparator。重点实现compare()方法,确定两个key class如何实现group
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
Stock lhs = (Stock) a;
Stock rhs = (Stock) b;
return lhs.getSymbol().compareTo(rhs.getSymbol());
}
在run()中需要增加 job.setGroupingComparatorClass(xx.class);
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setOutputValueGroupingComparator(cls);
}
setGroupingComparatorClass说明:Define the comparator that controls which keys are grouped together for a single call to Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
void org.apache.hadoop.mapred.JobConf.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)
Set the user defined RawComparator comparator for grouping keys in the input to the reduce.
This comparator should be provided if the equivalence rules for keys for sorting the intermediates are
different from those for grouping keys before each call to Reducer.reduce(Object, java.util.Iterator,
OutputCollector, Reporter).
For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed in a single call to the reduce
function if K1 and K2 compare as equal.
Since setOutputKeyComparatorClass(Class) can be used to control how keys are sorted, this can be used
in conjunction to simulate secondary sort on values.
Note: This is not a guarantee of the reduce sort being stable in any sense. (In any case, with the order of
available map-outputs to the reduce being non-deterministic, it wouldn‘t make that much sense.)
Parameters:
theClass the comparator class to be used for grouping keys. It should implement RawComparator.
See Also:
setOutputKeyComparatorClass(Class)
setCombinerKeyGroupingComparator(Class)
4)定义Reducer的outputValue class,实现Writable虚类,重点实现write()、readFields()、toString()等方法。
26、执行jar包错误
15/12/01 03:27:34 INFO mapreduce.Job: Task Id : attempt_1447915042936_0028_r_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "dividends"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
at java.lang.Double.parseDouble(Double.java:540)
at customSort.Customsort$CustomReducer.reduce(Customsort.java:46)
at customSort.Customsort$CustomReducer.reduce(Customsort.java:35)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
错误原因:格式转换错误,提示输入数据有“dividends”这样的英文字符串,无法格式转换。
解决方法:代码中有如下内容,String类型对比不能直接使用!=,而应该使用equals()方法。
String [] words = StringUtils.split(value.toString(), ‘,‘);
if(words[0]!="exchange”) //需要修改为words[0].equals(“exchange")
java中判断字符串是否相等有两种方法:
1、用“==”运算符,该运算符表示指向字符串的引用是否相同,比如: String a="abc";String b="abc",那么a==b将返回true。这是因为在java中字符串的值是不可改变的,相同的字符串在内存中只会存一份,所以a和b指向的是同一个对象;再比如:String a=new String("abc"); String b=new String("abc");那么a==b将返回false,因为a和b指向不同的对象。
2、用equals方法,该方法比较的是字符串的内容是否相同,比如:String a=new String("abc"); String b=new String("abc"); a.equals(b);将返回true。所以通常情况下,为了避免出现上述问题,判断字符串是否相等使用equals方法。
equals()方法说明:
/**
* Compares this string to the specified object. The result is {@code
* true} if and only if the argument is not {@code null} and is a {@code
* String} object that represents the same sequence of characters as this
* object.
*
* @param anObject
* The object to compare this {@code String} against
*
* @return {@code true} if the given object represents a {@code String}
* equivalent to this string, {@code false} otherwise
*
* @see #compareTo(String)
* @see #equalsIgnoreCase(String)
*/
public boolean equals(Object anObject) {
if (this == anObject) {
return true;
}
if (anObject instanceof String) {
String anotherString = (String)anObject;
int n = value.length;
if (n == anotherString.value.length) {
char v1[] = value;
char v2[] = anotherString.value;
int i = 0;
while (n-- != 0) {
if (v1[i] != v2[i])
return false;
i++;
}
return true;
}
}
return false;
}
27、执行jar包出错,namenode为safemode状态
[[email protected] jar]# yarn jar customsort.jar /xuefei/dividends /xuefei/output
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot delete /xuefei/output. Name node is in safe mode.
Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE: If you turn off safe mode before adding resources, the NN will immediately return to safe mode. Use "hdfs dfsadmin -safemode leave" to turn safe mode off.
错误原因:磁盘空间不足,导致namenode无法写文件(原因之一)
解决方法:
1)尝试执行
[[email protected] jar]# hdfs dfsadmin -safemode leave
safemode: Access denied for user root. Superuser privilege is required
2)删除大文件,清理磁盘,尝试重启后恢复
28、Java的this用法
/**
* @author fengzhi-neusoft
*
* 本示例为了说明this的三种用法!
*/
package test;
public class ThisTest {
private int i=0;
//第一个构造器:有一个int型形参
ThisTest(int i){
this.i=i+1;//此时this表示引用成员变量i,而非函数参数i
System.out.println("Int constructor i——this.i: "+i+"——"+this.i);
System.out.println("i-1:"+(i-1)+"this.i+1:"+(this.i+1));
//从两个输出结果充分证明了i和this.i是不一样的!
}
// 第二个构造器:有一个String型形参
ThisTest(String s){
System.out.println("String constructor: "+s);
}
// 第三个构造器:有一个int型形参和一个String型形参
ThisTest(int i,String s){
this(s);//this调用第二个构造器
//this(i);
/*此处不能用,因为其他任何方法都不能调用构造器,只有构造方法能调用他。
但是必须注意:就算是构造方法调用构造器,也必须为于其第一行,构造方法也只能调
用一个且仅一次构造器!*/
this.i=i++;//this以引用该类的成员变量
System.out.println("Int constructor: "+i+"/n"+"String constructor: "+s);
}
public ThisTest increment(){
this.i++;
return this;//返回的是当前的对象,该对象属于(ThisTest)
}
public static void main(String[] args){
ThisTest tt0=new ThisTest(10);
ThisTest tt1=new ThisTest("ok");
ThisTest tt2=new ThisTest(20,"ok again!");
System.out.println(tt0.increment().increment().increment().i);
//tt0.increment()返回一个在tt0基础上i++的ThisTest对象,
//接着又返回在上面返回的对象基础上i++的ThisTest对象!
}
}
运行结果:
Int constructor i——this.i: 10——11
String constructor: ok
String constructor: ok again!
Int constructor: 21
String constructor: ok again!
14
细节问题注释已经写的比较清楚了,这里不在赘述,只是总结一下,其实this主要要三种用法:
1、表示对当前对象的引用!
2、表示用类的成员变量,而非函数参数,注意在函数参数和成员变量同名是进行区分!其实这是第一种用法的特例,比较常用,所以那出来强调一下。
3、用于在构造方法中引用满足指定参数类型的构造器(其实也就是构造方法)。但是这里必须非常注意:只能引用一个构造方法且必须位于开始!
还有就是注意:this不能用在static方法中!所以甚至有人给static方法的定义就是:没有this的方法!虽然夸张,但是却充分说明this不能在static方法中使用!
29、执行jar包出错
15/12/03 23:41:28 INFO mapreduce.Job: Task Id : attempt_1449157066412_0001_r_000000_0, Status : FAILED
Error: java.lang.NullPointerException at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:158)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
错误原因:WritableComparator类没有重构构造函数
解决方法:重构构造函数,在类中增加如下代码
public CustomCompator() {
super(Stock.class, true);
// TODO Auto-generated constructor stub
}
30、NullWritable类说明
NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。
31、InputFormat说明
1)继承RecordReader类,重点实现initialize()、nextKeyValue()、getCurrentKey()、getCurrentValue()、getProgress()等方法
RecordRead类:The record reader breaks the data into key/value pairs for input to the Mapper。相关方法被mapper调用,获取key/value作为mapper输入
- initialize() Called once at initialization.
- nextKeyValue() Read the next key, value pair, return true if a key/value pair was read
- getCurrentKey() Get the current value, return the current key or null if there is no current key
- getCurrentValue() Get the current value, return the object that was read
- getProgress() The current progress of the record reader through its data, return a number between 0.0 and 1.0 that is the fraction of the data read
- close() close the record reader
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
2)继承FileInputFormat类,重点实现createRecordReader()方法
Create a record reader for a given split. The framework will call RecordReader.initialize(InputSplit, TaskAttemptContext) before the split is used.
Overrides: createRecordReader(...) in InputFormat
Parameters:
split the split to be read
context the information about the task
Returns:
a new record reader
Throws:
IOException
InterruptedException
3)设置job.setInputFormatClass(xx)
32、outputFormatClass为sequenceFileOutputFormat,查看class出错
[[email protected] jar]# hdfs dfs -text /xuefei/output/custominput/part-r-00000
-text: Fatal internal error
java.lang.RuntimeException: java.io.IOException: WritableName can‘t load class: customInput.Stock
at org.apache.hadoop.io.SequenceFile$Reader.getKeyClass(SequenceFile.java:2016)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1947)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1813)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1762)
错误原因:sequenceFileOutputFormat不可以直接查看,需要load相关类
解决方法:把bin目录下的class文件放到hadoop classpath中
33、CombineFileSplit类说明
A sub-collection of input files. Unlike FileSplit, CombineFileSplit class does not represent a split of a file, but a split of input files into smaller sets. A split may contain blocks from different file but all the blocks in the same split are probably local to some rack .
CombineFileSplit can be used to implement RecordReader‘s, with reading one record per file.
34、LineRecordReader类说明
Treats keys as offset in file and value as line.
35、MultipleInputs类说明
This class supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path
有以下4个方法:
- public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass)
- public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)
- static Map<Path, InputFormat> getInputFormatMap(JobContext job)
- static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobContext job)
36、下面几个区别
implements WritableComparable
implement Writable
implement Comparable
extends WritableComparator
extends Comparator
简单说:
1.extends是继承父类,只要那个类不是声明为final或者那个类定义为abstract的就能继承,
2.JAVA中不支持多重继承(一个类可以同时继承多个父类的行为和特征功能),但是可以用接口来实现,这样就要用到implements,
3.继承只能继承一个类,但implements可以实现多个接口,用逗号分开就行了 ,
比如 class A extends B implements C,D,E
术语话来说:
extends 继承类;implements 实现接口。
类和接口是不同的:类里是有程序实现的;而接口无程序实现,只可以预定义方法
Java也提供继承机制﹐但还另外提供一个叫interface的概念。由于Java的继承机制只能提供单一继承(就是只能继承一种父类别)﹐所以就以Java的interface来代替C++的多重继承。interface就是一种介面﹐规定欲沟通的两物件﹐其通讯该有的规范有哪些。
如以Java程式语言的角度来看﹐Java的interface则表示:
一些函数或资料成员为另一些属于不同类别的物件所需共同拥有,则将这些函数与资料成员定义在一个interface中,然后让所有不同类别的Java物件可以共同操作使用之。
Java的class只能继承一个父类别(用extends关键字), 但可以拥有(或称实作)许多interface(用implements关键字)。
extends和implements有什么不同?
对于class而言,extends用于(单)继承一个类(class),而implements用于实现一个接口(interface)。
interface的引入是为了部分地提供多继承的功能。在interface中只需声明方法头,而将方法体留给实现的class来做。 这些实现的class的实例完全可以当作interface的实例来对待。 在interface之间也可以声明为extends(多继承)的关系。
注意: 一个interface可以extends多个其他interface。
WritableComparable
org.apache.hadoop.io.Writable
@Public
@Stable
A serializable object which implements a simple, efficient, serialization protocol, based on DataInput and DataOutput.
Any key or value type in the Hadoop Map-Reduce framework implements this interface.
Implementations typically implement a static read(DataInput) method which constructs a new instance, calls readFields(DataInput) and returns the instance.
Example:
public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
}
}
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
writable
org.apache.hadoop.io.Writable
@Public
@Stable
A serializable object which implements a simple, efficient, serialization protocol, based on DataInput and DataOutput.
Any key or value type in the Hadoop Map-Reduce framework implements this interface.
Implementations typically implement a static read(DataInput) method which constructs a new instance, calls readFields(DataInput) and returns the instance.
Example:
public class MyWritable implements Writable {
// Some data
private int counter;
private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public static MyWritable read(DataInput in) throws IOException {
MyWritable w = new MyWritable();
w.readFields(in);
return w;
}
}
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
Comparable
java.lang.Comparable<Stock>
This interface imposes a total ordering on the objects of each class that implements it. This ordering is referred to as the class‘s natural ordering, and the class‘s compareTo method is referred to as its natural comparison method.
Lists (and arrays) of objects that implement this interface can be sorted automatically by Collections.sort (and Arrays.sort). Objects that implement this interface can be used as keys in a sorted map or as elements in a sorted set, without the need to specify a comparator.
The natural ordering for a class C is said to be consistent with equals if and only if e1.compareTo(e2) == 0 has the same boolean value as e1.equals(e2) for every e1 and e2 of class C. Note that null is not an instance of any class, and e.compareTo(null) should throw a NullPointerException even though e.equals(null) returns false.
It is strongly recommended (though not required) that natural orderings be consistent with equals. This is so because sorted sets (and sorted maps) without explicit comparators behave "strangely" when they are used with elements (or keys) whose natural ordering is inconsistent with equals. In particular, such a sorted set (or sorted map) violates the general contract for set (or map), which is defined in terms of the equals method.
For example, if one adds two keys a and b such that (!a.equals(b) && a.compareTo(b) == 0) to a sorted set that does not use an explicit comparator, the second add operation returns false (and the size of the sorted set does not increase) because a and b are equivalent from the sorted set‘s perspective.
Virtually all Java core classes that implement Comparable have natural orderings that are consistent with equals. One exception is java.math.BigDecimal, whose natural ordering equates BigDecimal objects with equal values and different precisions (such as 4.0 and 4.00).
For the mathematically inclined, the relation that defines the natural ordering on a given class C is:
{(x, y) such that x.compareTo(y) <= 0}.
The quotient for this total order is:
{(x, y) such that x.compareTo(y) == 0}.
It follows immediately from the contract for compareTo that the quotient is an equivalence relation on C, and that the natural ordering is a total order on C. When we say that a class‘s natural ordering is consistent with equals, we mean that the quotient for the natural ordering is the equivalence relation defined by the class‘s equals(Object) method:
{(x, y) such that x.equals(y)}.
This interface is a member of the Java Collections Framework.
Type Parameters:
<T> the type of objects that this object may be compared to
Since:
1.2
Author:
Josh Bloch
See Also:
java.util.Comparator
public interface Comparable<T> {
public int compareTo(T o);
}
WritableComparator
org.apache.hadoop.io.WritableComparator
@Public
@Stable
A Comparator for WritableComparables.
This base implemenation uses the natural ordering. To define alternate orderings, override compare(WritableComparable, WritableComparable).
One may optimize compare-intensive operations by overriding compare(byte [], int, int, byte [], int, int). Static utility methods are provided to assist in optimized implementations of this method.
public class WritableComparator implements RawComparator {
private static final ConcurrentHashMap<Class, WritableComparator> comparators
= new ConcurrentHashMap<Class, WritableComparator>(); // registry
/** Get a comparator for a {@link WritableComparable} implementation. */
public static WritableComparator get(Class<? extends WritableComparable> c) {
WritableComparator comparator = comparators.get(c);
if (comparator == null) {
// force the static initializers to run
forceInit(c);
// look to see if it is defined now
comparator = comparators.get(c);
// if not, use the generic one
if (comparator == null) {
comparator = new WritableComparator(c, true);
}
}
return comparator;
}
/**
* Force initialization of the static members.
* As of Java 5, referencing a class doesn‘t force it to initialize. Since
* this class requires that the classes be initialized to declare their
* comparators, we force that initialization to happen.
* @param cls the class to initialize
*/
private static void forceInit(Class<?> cls) {
try {
Class.forName(cls.getName(), true, cls.getClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Can‘t initialize class " + cls, e);
}
}
/** Register an optimized comparator for a {@link WritableComparable}
* implementation. Comparators registered with this method must be
* thread-safe. */
public static void define(Class c, WritableComparator comparator) {
comparators.put(c, comparator);
}
private final Class<? extends WritableComparable> keyClass;
private final WritableComparable key1;
private final WritableComparable key2;
private final DataInputBuffer buffer;
protected WritableComparator() {
this(null);
}
/** Construct for a {@link WritableComparable} implementation. */
protected WritableComparator(Class<? extends WritableComparable> keyClass) {
this(keyClass, false);
}
protected WritableComparator(Class<? extends WritableComparable> keyClass,
boolean createInstances) {
this.keyClass = keyClass;
if (createInstances) {
key1 = newKey();
key2 = newKey();
buffer = new DataInputBuffer();
} else {
key1 = key2 = null;
buffer = null;
}
}
/** Returns the WritableComparable implementation class. */
public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
/** Construct a new {@link WritableComparable} instance. */
public WritableComparable newKey() {
return ReflectionUtils.newInstance(keyClass, null);
}
/** Optimization hook. Override this to make SequenceFile.Sorter‘s scream.
*
* <p>The default implementation reads the data into two {@link
* WritableComparable}s (using {@link
* Writable#readFields(DataInput)}, then calls {@link
* #compare(WritableComparable,WritableComparable)}.
*/
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);
buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
return compare(key1, key2); // compare them
}
/** Compare two WritableComparables.
*
* <p> The default implementation uses the natural ordering, calling {@link
* Comparable#compareTo(Object)}. */
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
return a.compareTo(b);
}
@Override
public int compare(Object a, Object b) {
return compare((WritableComparable)a, (WritableComparable)b);
}
/** Lexicographic order of binary data. */
public static int compareBytes(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
}
/** Compute hash for binary data. */
public static int hashBytes(byte[] bytes, int offset, int length) {
int hash = 1;
for (int i = offset; i < offset + length; i++)
hash = (31 * hash) + (int)bytes[i];
return hash;
}
/** Compute hash for binary data. */
public static int hashBytes(byte[] bytes, int length) {
return hashBytes(bytes, 0, length);
}
/** Parse an unsigned short from a byte array. */
public static int readUnsignedShort(byte[] bytes, int start) {
return (((bytes[start] & 0xff) << 8) +
((bytes[start+1] & 0xff)));
}
/** Parse an integer from a byte array. */
public static int readInt(byte[] bytes, int start) {
return (((bytes[start ] & 0xff) << 24) +
((bytes[start+1] & 0xff) << 16) +
((bytes[start+2] & 0xff) << 8) +
((bytes[start+3] & 0xff)));
}
/** Parse a float from a byte array. */
public static float readFloat(byte[] bytes, int start) {
return Float.intBitsToFloat(readInt(bytes, start));
}
/** Parse a long from a byte array. */
public static long readLong(byte[] bytes, int start) {
return ((long)(readInt(bytes, start)) << 32) +
(readInt(bytes, start+4) & 0xFFFFFFFFL);
}
/** Parse a double from a byte array. */
public static double readDouble(byte[] bytes, int start) {
return Double.longBitsToDouble(readLong(bytes, start));
}
/**
* Reads a zero-compressed encoded long from a byte array and returns it.
* @param bytes byte array with decode long
* @param start starting index
* @throws java.io.IOException
* @return deserialized long
*/
public static long readVLong(byte[] bytes, int start) throws IOException {
int len = bytes[start];
if (len >= -112) {
return len;
}
boolean isNegative = (len < -120);
len = isNegative ? -(len + 120) : -(len + 112);
if (start+1+len>bytes.length)
throw new IOException(
"Not enough number of bytes for a zero-compressed integer");
long i = 0;
for (int idx = 0; idx < len; idx++) {
i = i << 8;
i = i | (bytes[start+1+idx] & 0xFF);
}
return (isNegative ? (i ^ -1L) : i);
}
/**
* Reads a zero-compressed encoded integer from a byte array and returns it.
* @param bytes byte array with the encoded integer
* @param start start index
* @throws java.io.IOException
* @return deserialized integer
*/
public static int readVInt(byte[] bytes, int start) throws IOException {
return (int) readVLong(bytes, start);
}
}
Comparator
org.apache.hadoop.io.Text.Comparator
/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);
return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}
}
37、Map side Join说明(replicated Join)
步骤:
1)把小数据集放在LocalResource
job.addCacheFile(new URI(“afile.txt”));
job.addCacheArchive(new URI(“helper.zip”));
2)在mapper的setup()方法中,获取小数据集,并加载到内存中(使用hashmap、treemap等数据结构)
3)当大数据集记录被map()方法读取时,使用小数据集进行匹配过滤
4)如果匹配成功,把两个数据集的记录拼接,并作为output输出
LocalResource表示运行container需要的文件、库,典型的LocalResource有:运行container所需要的jar包、container运行前所需要的配置(远程服务URLs、程序配置等)、静态字典文件等。NodeManager负责在运行container之前,Localize相关资源。
38、Hash Map说明
了解HashMap之前,我们需要知道Object类的两个方法hashCode和equals,我们先来看一下这两个方法的默认实现:
[java] view plaincopyprint?在CODE上查看代码片派生到我的代码片
/** JNI,调用底层其它语言实现 */
public native int hashCode();
/** 默认同==,直接比较对象 */
public boolean equals(Object obj) {
return (this == obj);
}
equals方法我们太熟悉了,我们经常用于字符串比较,String类中重写了equals方法,比较的是字符串值,看一下源码实现:
[java] view plaincopyprint?在CODE上查看代码片派生到我的代码片
public boolean equals(Object anObject) {
if (this == anObject) {
return true;
}
if (anObject instanceof String) {
String anotherString = (String) anObject;
int n = value.length;
if (n == anotherString.value.length) {
char v1[] = value;
char v2[] = anotherString.value;
int i = 0;
// 逐个判断字符是否相等
while (n-- != 0) {
if (v1[i] != v2[i])
return false;
i++;
}
return true;
}
}
return false;
}
重写equals要满足几个条件:
自反性:对于任何非空引用值 x,x.equals(x) 都应返回 true。
对称性:对于任何非空引用值 x 和 y,当且仅当 y.equals(x) 返回 true 时,x.equals(y) 才应返回 true。
传递性:对于任何非空引用值 x、y 和 z,如果 x.equals(y) 返回 true,并且 y.equals(z) 返回 true,那么 x.equals(z) 应返回 true。
一致性:对于任何非空引用值 x 和 y,多次调用 x.equals(y) 始终返回 true 或始终返回 false,前提是对象上 equals 比较中所用的信息没有被修改。
对于任何非空引用值 x,x.equals(null) 都应返回 false。
Object 类的 equals 方法实现对象上差别可能性最大的相等关系;即,对于任何非空引用值 x 和 y,当且仅当 x 和 y 引用同一个对象时,此方法才返回 true(x == y 具有值 true)。 当此方法被重写时,通常有必要重写 hashCode 方法,以维护 hashCode 方法的常规协定,该协定声明相等对象必须具有相等的哈希码。
下面来说说hashCode方法,这个方法我们平时通常是用不到的,它是为哈希家族的集合类框架(HashMap、HashSet、HashTable)提供服务,hashCode 的常规协定是:
在 Java 应用程序执行期间,在同一对象上多次调用 hashCode 方法时,必须一致地返回相同的整数,前提是对象上 equals 比较中所用的信息没有被修改。从某一应用程序的一次执行到同一应用程序的另一次执行,该整数无需保持一致。
如果根据 equals(Object) 方法,两个对象是相等的,那么在两个对象中的每个对象上调用 hashCode 方法都必须生成相同的整数结果。
以下情况不 是必需的:如果根据 equals(java.lang.Object) 方法,两个对象不相等,那么在两个对象中的任一对象上调用 hashCode 方法必定会生成不同的整数结果。但是,程序员应该知道,为不相等的对象生成不同整数结果可以提高哈希表的性能。
当我们看到实现这两个方法有这么多要求时,立刻凌乱了,幸好有IDE来帮助我们,Eclipse中可以通过快捷键alt+shift+s调出快捷菜单,选择Generate hashCode() and equals(),根据业务需求,勾选需要生成的属性,确定之后,这两个方法就生成好了,我们通常需要在JavaBean对象中重写这两个方法。
好了,这两个方法介绍完之后,我们回到HashMap。HashMap是最常用的集合类框架之一,它实现了Map接口,所以存储的元素也是键值对映射的结构,并允许使用null值和null键,其内元素是无序的,如果要保证有序,可以使用LinkedHashMap。HashMap是线程不安全的,下篇文章会讨论。HashMap的类结构如下:
java.util
类 HashMap<K,V>
java.lang.Object
继承者 java.util.AbstractMap<K,V>
继承者 java.util.HashMap<K,V>
所有已实现的接口:
Serializable,Cloneable,Map<K,V>
直接已知子类:
LinkedHashMap,PrinterStateReasons
HashMap中我们最长用的就是put(K, V)和get(K)。我们都知道,HashMap的K值是唯一的,那如何保证唯一性呢?我们首先想到的是用equals比较,没错,这样可以实现,但随着内部元素的增多,put和get的效率将越来越低,这里的时间复杂度是O(n),假如有1000个元素,put时需要比较1000次。实际上,HashMap很少会用到equals方法,因为其内通过一个哈希表管理所有元素,哈希是通过hash单词音译过来的,也可以称为散列表,哈希算法可以快速的存取元素,当我们调用put存值时,HashMap首先会调用K的hashCode方法,获取哈希码,通过哈希码快速找到某个存放位置,这个位置可以被称之为bucketIndex,通过上面所述hashCode的协定可以知道,如果hashCode不同,equals一定为false,如果hashCode相同,equals不一定为true。所以理论上,hashCode可能存在冲突的情况,有个专业名词叫碰撞,当碰撞发生时,计算出的bucketIndex也是相同的,这时会取到bucketIndex位置已存储的元素,最终通过equals来比较,equals方法就是哈希码碰撞时才会执行的方法,所以前面说HashMap很少会用到equals。HashMap通过hashCode和equals最终判断出K是否已存在,如果已存在,则使用新V值替换旧V值,并返回旧V值,如果不存在 ,则存放新的键值对<K, V>到bucketIndex位置。
现在我们知道,执行put方法后,最终HashMap的存储结构会有这三种情况,情形3是最少发生的,哈希码发生碰撞属于小概率事件。到目前为止,我们了解了两件事:
HashMap通过键的hashCode来快速的存取元素。
当不同的对象hashCode发生碰撞时,HashMap通过单链表来解决,将新元素加入链表表头,通过next指向原有的元素。单链表在Java中的实现就是对象的引用(复合)。
来鉴赏一下HashMap中put方法源码:
public V put(K key, V value) {
// 处理key为null,HashMap允许key和value为null
if (key == null)
return putForNullKey(value);
// 得到key的哈希码
int hash = hash(key);
// 通过哈希码计算出bucketIndex
int i = indexFor(hash, table.length);
// 取出bucketIndex位置上的元素,并循环单链表,判断key是否已存在
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
// 哈希码相同并且对象相同时
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
// 新值替换旧值,并返回旧值
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
// key不存在时,加入新元素
modCount++;
addEntry(hash, key, value, i);
return null;
}
到这里,我们了解了HashMap工作原理的一部分,那还有另一部分,如,加载因子及rehash,HashMap通常的使用规则,多线程并发时HashMap存在的问题等等,这些会留在下一章说明。
本文来自:高爽|Coder,原文地址:http://blog.csdn.net/ghsau/article/details/16843543,转载请注明。
java.util.HashMap<Stock, Double>
Hash table based implementation of the Map interface. This implementation provides all of the optional map operations, and permits null values and the null key. (The HashMap class is roughly equivalent to Hashtable, except that it is unsynchronized and permits nulls.) This class makes no guarantees as to the order of the map; in particular, it does not guarantee that the order will remain constant over time.
This implementation provides constant-time performance for the basic operations (get and put), assuming the hash function disperses the elements properly among the buckets. Iteration over collection views requires time proportional to the "capacity" of the HashMap instance (the number of buckets) plus its size (the number of key-value mappings). Thus, it‘s very important not to set the initial capacity too high (or the load factor too low) if iteration performance is important.
An instance of HashMap has two parameters that affect its performance: initial capacity and load factor. The capacity is the number of buckets in the hash table, and the initial capacity is simply the capacity at the time the hash table is created. The load factor is a measure of how full the hash table is allowed to get before its capacity is automatically increased. When the number of entries in the hash table exceeds the product of the load factor and the current capacity, the hash table is rehashed (that is, internal data structures are rebuilt) so that the hash table has approximately twice the number of buckets.
As a general rule, the default load factor (.75) offers a good tradeoff between time and space costs. Higher values decrease the space overhead but increase the lookup cost (reflected in most of the operations of the HashMap class, including get and put). The expected number of entries in the map and its load factor should be taken into account when setting its initial capacity, so as to minimize the number of rehash operations. If the initial capacity is greater than the maximum number of entries divided by the load factor, no rehash operations will ever occur.
If many mappings are to be stored in a HashMap instance, creating it with a sufficiently large capacity will allow the mappings to be stored more efficiently than letting it perform automatic rehashing as needed to grow the table. Note that using many keys with the same hashCode() is a sure way to slow down performance of any hash table. To ameliorate impact, when keys are Comparable, this class may use comparison order among keys to help break ties.
Note that this implementation is not synchronized. If multiple threads access a hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally. (A structural modification is any operation that adds or deletes one or more mappings; merely changing the value associated with a key that an instance already contains is not a structural modification.) This is typically accomplished by synchronizing on some object that naturally encapsulates the map. If no such object exists, the map should be "wrapped" using the Collections.synchronizedMap method. This is best done at creation time, to prevent accidental unsynchronized access to the map:
Map m = Collections.synchronizedMap(new HashMap(...));
The iterators returned by all of this class‘s "collection view methods" are fail-fast: if the map is structurally modified at any time after the iterator is created, in any way except through the iterator‘s own remove method, the iterator will throw a ConcurrentModificationException. Thus, in the face of concurrent modification, the iterator fails quickly and cleanly, rather than risking arbitrary, non-deterministic behavior at an undetermined time in the future.
Note that the fail-fast behavior of an iterator cannot be guaranteed as it is, generally speaking, impossible to make any hard guarantees in the presence of unsynchronized concurrent modification. Fail-fast iterators throw ConcurrentModificationException on a best-effort basis. Therefore, it would be wrong to write a program that depended on this exception for its correctness: the fail-fast behavior of iterators should be used only to detect bugs.
This class is a member of the Java Collections Framework.
Type Parameters:
<K> the type of keys maintained by this map
<V> the type of mapped values
Since:
1.2
Author:
Doug Lea
Josh Bloch
Arthur van Hoff
Neal Gafter
See Also:
Object.hashCode()
Collection
Map
TreeMap
Hashtable
39、URI Class说明
URIs, URLs, and URNs
首先,URI,是uniform resource identifier,统一资源标识符,用来唯一的标识一个资源。而URL是uniform resource locator,统一资源定位器,它是一种具体的URI,即URL可以用来标识一个资源,而且还指明了如何locate这个资源。而URN,uniform resource name,统一资源命名,是通过名字来标识资源,比如mailto:[email protected]。也就是说,URI是以一种抽象的,高层次概念定义统一资源标识,而URL和URN则是具体的资源标识的方式。URL和URN都是一种URI。
在Java的URI中,一个URI实例可以代表绝对的,也可以是相对的,只要它符合URI的语法规则。而URL类则不仅符合语义,还包含了定位该资源的信息,因此它不能是相对的,schema必须被指定。
ok,现在回答文章开头提出的问题,到底是imgUrl好呢,还是imgUri好?显然,如果说imgUri是肯定没问题的,因为即使它实际上是url,那它也是uri的一种。那么用imgUrl有没有问题呢?此时则要看它的可能取值,如果是绝对路径,能够定位的,那么用imgUrl是没问题的,而如果是相对路径,那还是不要用ImgUrl的好。总之,用imgUri是肯定没问题的,而用imgUrl则要视实际情况而定。
第二个,从HttpServletRequest的javadoc中可以看出,getRequestURI返回一个String,“the part of this request’s URL from the protocol name up to the query string in the first line of the HTTP request”,比如“POST /some/path.html?a=b HTTP/1.1”,则返回的值为”/some/path.html”。现在可以明白为什么是getRequestURI而不是getRequestURL了,因为此处返回的是相对的路径。而getRequestURL返回一个StringBuffer,“The returned URL contains a protocol, server name, port number, and server path, but it does not include query string parameters.”,完整的请求资源路径,不包括querystring。
总结一下:URL是一种具体的URI,它不仅唯一标识资源,而且还提供了定位该资源的信息。URI是一种语义上的抽象概念,可以是绝对的,也可以是相对的,而URL则必须提供足够的信息来定位,所以,是绝对的,而通常说的relative URL,则是针对另一个absolute URL,本质上还是绝对的。
注:这里的绝对(absolute)是指包含scheme,而相对(relative)则不包含scheme。
URI抽象结构 [scheme:]scheme-specific-part[#fragment]
[scheme:][//authority][path][?query][#fragment]
authority为[[email protected]]host[:port]
参考资料:
http://docs.oracle.com/javase/1.5.0/docs/api/java/net/URI.html
http://en.wikipedia.org/wiki/Uniform_Resource_Identifier
http://docs.oracle.com/javaee/5/api/javax/servlet/http/HttpServletRequest.html
ps:
java.net.URL类不提供对标准RFC2396规定的特殊字符的转义,因此需要调用者自己对URL各组成部分进行encode。而java.net.URI则会提供转义功能。因此The recommended way to manage the encoding and decoding of URLs is to use java.net.URI. 可以使用URI.toURL()和URL.toURI()方法来对两个类型的对象互相转换。对于HTML FORM的url encode/decode可以使用java.net.URLEncoder和java.net.URLDecoder来完成,但是对URL对象不适用。
Constructs a URI by parsing the given string.
This constructor parses the given string exactly as specified by the grammar in RFC 2396, Appendix A, except for the following deviations:
An empty authority component is permitted as long as it is followed by a non-empty path, a query component, or a fragment component. This allows the parsing of URIs such as "file:///foo/bar", which seems to be the intent of RFC 2396 although the grammar does not permit it. If the authority component is empty then the user-information, host, and port components are undefined.
Empty relative paths are permitted; this seems to be the intent of RFC 2396 although the grammar does not permit it. The primary consequence of this deviation is that a standalone fragment such as "#foo" parses as a relative URI with an empty path and the given fragment, and can be usefully resolved against a base URI.
IPv4 addresses in host components are parsed rigorously, as specified by RFC 2732: Each element of a dotted-quad address must contain no more than three decimal digits. Each element is further constrained to have a value no greater than 255.
Hostnames in host components that comprise only a single domain label are permitted to start with an alphanum character. This seems to be the intent of RFC 2396 section 3.2.2 although the grammar does not permit it. The consequence of this deviation is that the authority component of a hierarchical URI such as s://123, will parse as a server-based authority.
IPv6 addresses are permitted for the host component. An IPv6 address must be enclosed in square brackets (‘[‘ and ‘]‘) as specified by RFC 2732. The IPv6 address itself must parse according to RFC 2373. IPv6 addresses are further constrained to describe no more than sixteen bytes of address information, a constraint implicit in RFC 2373 but not expressible in the grammar.
Characters in the other category are permitted wherever RFC 2396 permits escaped octets, that is, in the user-information, path, query, and fragment components, as well as in the authority component if the authority is registry-based. This allows URIs to contain Unicode characters beyond those in the US-ASCII character set.
Parameters:
str The string to be parsed into a URI
Throws:
NullPointerException - If str is null
URISyntaxException - If the given string violates RFC 2396, as augmented by the above deviations
39、执行jar包出错,nullPointerException
15/12/15 22:50:46 INFO mapreduce.Job: Task Id : attempt_1450188830584_0013_m_000000_1, Status : FAILED
Error: java.lang.NullPointerException
at mapsidejoin.Mapsidejoin$MapsideMapper.map(Mapsidejoin.java:37)
at mapsidejoin.Mapsidejoin$MapsideMapper.map(Mapsidejoin.java:26)
错误原因:hashmap使用containKey方法,需要调用key class的equals()方法
解决方法:做stock class(implements WritableComparable)漏重构方法equal()和hashCode()
Java code
/** * Returns the entry associated with the specified key in the * HashMap. Returns null if the HashMap contains no mapping * for the key. */
final Entry<K,V> getEntry(Object key) {
int hash = (key == null) 0 : hash(key.hashCode());
for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null; }
40、执行jar包出错,unknowHostexception
[email protected]_Java:~/java/labs/Solutions/Lab7.1/MapSideJoin# yarn jar mapsidejoin.jar AIT
15/12/15 23:07:36 INFO impl.TimelineClientImpl: Timeline service address: http://resourcemanager:8188/ws/v1/timeline/
15/12/15 23:07:36 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.17.0.3:8050
15/12/15 23:07:37 INFO mapreduce.JobSubmitter: Cleaning up the staging area /user/root/.staging/job_1450188830584_0017
java.lang.IllegalArgumentException: java.net.UnknownHostException: sandbox
41、在Map Side Join中,使用hashmap存储小表的时候,需要对key的对象重载hashCode()函数,否则hashmap的containsKey()函数找不到对应内容
42、StringUtils.split,对于分列为空字符串,代码会自动删除(因StringUtils.split中,默认separatorChars为false)。如果需要保留,建议使用String.split,或者使用StringUtils.splitWorker(str,separatorchar,-1,true)。
public static String[] split(String str, String separatorChars) {
return splitWorker(str, separatorChars, -1, false);
}
/**
* Performs the logic for the <code>split</code> and
* <code>splitPreserveAllTokens</code> methods that do not return a
* maximum array length.
*
* @param str the String to parse, may be <code>null</code>
* @param separatorChar the separate character
* @param preserveAllTokens if <code>true</code>, adjacent separators are
* treated as empty token separators; if <code>false</code>, adjacent
* separators are treated as one separator.
* @return an array of parsed Strings, <code>null</code> if null String input
*/
private static String[] splitWorker(String str, char separatorChar, boolean preserveAllTokens) {
43、reduce side join,需要在map outputKey和map outputValuel里面都设置flag。
outputKey:用于排序数据集;
outputValue flag:用于剔除数据集中记录不存在的情况。
44、设置Partitioner()出错,提示非法分区
16/06/10 13:07:58 INFO mapreduce.Job: Task Id : attempt_1465527536456_0018_m_000000_0, Status : FAILED
Error: java.io.IOException: Illegal partition for Theoldmanandsea,!termnum (-2)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1082)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at tfidf.Tfidf$Tfmapper.cleanup(Tfidf.java:111)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:149)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
错误原因:在设置Partitioner函数中,使用key.getYear(String类型)与numPartitions取模作为返回值。由于取模函数支持负数运算,并且运算符号与被除数一致。当key.getYear.hashCode()返回负数时候, (key.getYear.hashCode()%numPartitions也是负数,导致程序出错。
解决方法:修改为 Math.abs(key.getYear.hashCode()%numPartitions);
public static class Exampartitioner extends Partitioner<Airport, DoubleWritable> {
@Override
public int getPartition(Airport key, DoubleWritable value,
int numPartitions) {
// TODO Auto-generated method stub
return key.getYear.hashCode()%numPartitions;
}
}
45、partitioner的判断条件需等于Group的条件,或者是Group条件的子集。
46、TotalOrderPartitioner出错,提示如下,
16/06/09 14:01:20 INFO mapreduce.Job: Task Id : attempt_1465443169491_0015_m_000000_2, Status : FAILED
Error: java.lang.IllegalArgumentException: Can‘t read partitions file
at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:701)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: Wrong number of partitions in keyset
at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:90)
... 10 more
问题原因:job.setNumReduceTask(3);放在了InputSampler.Sampler的后面。
解决方法:把job.setNumReduceTask(3)放在前面。
根据网络资料总结。