MapReduce作业读取文件
Map任务数量由InputSplit决定,InputSplit分片大小默认是HDFS块大小(hadoop1.x=64mb,hadoop2.x是128mb)。
例如:
MapReduce作业读取HDFS上(hadoop2.x)两个文件,一个是200MB,一个是100MB,这时候就有3个InputSplit,每个InputSplit会起一个Mapper任务读取,通过
RecordReader转换为key,value提供Mapper函数使用
MapReduce作业调优:
1、关闭推测执行
MapReduce模型将作业分解成任务,然后并行的运行任务是作业整体执行时间少于各个任务顺序执行时间。这使作业执行时间多运行缓慢的任务很敏感,因为运行一个缓慢
的任务会使整个作业所用的时间远远长于其他任务的时间。当一个作业由成百上千时可能会出现拖后腿的任务。
任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但检测问题具体原因很难,因为任务总能成功执行完,尽管比预计时间长。hadoop不会尝试诊断或修复执行
慢的任务。在一个任务运行比预期慢时,他会尽量检测,并启动另一个相同的任务作为备份,这就是推测执行(speculative execution);如果同时启动两个相同的任务他们
会相互竞争,导致推测执行无法正常工作,这对集群资源是一种浪费。只有在一个作业的所有任务都启动之后才会启动推测任务,并只针对已经运行一段时间(至少一分钟)且
比作业中其他任务平均进度慢的任务。一个任务成功完成后任何正在运行的重复任务都将中止。如果原任务在推测任务之前完成则推测任务就会被中止,同样,如果推测任务先
完成则原任务就会被中止。
推测任务只是一种优化措施,它并不能使作业运行的更加可靠。如果有一些软件缺陷造成的任务挂起或运行速度慢,依靠推测执行是不能成功解决的。默认情况下推测执行
是启用的,可根据集群或每个作业,单独为map任务或reduce任务启用或禁用该功能。
job.getConfiguration().setBoolean(job.MAP_SPECULATIVE, false); //是否对Map Task启用推测执行机制,默认是true
job.getConfiguration().setBoolean(job.REDUCE_SPECULATIVE, false); //是否对Reduce Task启用推测执行机制,默认是true
2、开启jvm重用
属性:mapred.job.reuse.jvm.num.tasks,默认值是1,在一个taskTracker上对于给定的作业的每个jvm上可以运行任务最大数。-1表示无限制,即同一个jvm可以被该作
业的所有任务使用。
conf.set("mapreduce.job.jvm.numtasks", "-1"); //开启jvm重用
job.getConfiguration().setInt(job.JVM_NUMTASKS_TORUN, -1); //开启jvm重用
3、跳过坏记录
在MRv1中可通过启用skipping mode来跳过损坏的记录(格式错误或字段丢失),当启用后任务将正在处理的记录报告给tasktracker,任务失败时tasktracker会重新运行
该任务,跳过导致任务失败的记录。由于额外的网络流量和记录错误以维护失败记录范围,所有只有在任务失败两次以上才会启用skipping mode。
在MRv2中不再支持skipping mode选项。权威指南中建议在mapper和reducer代码中处理被损坏的记录。运行时检测到坏记录根据业务需求选择忽略或通过抛异常来终止作业
运行;如果业务要求必须处理则可以通过计数器来计算作业中总的坏记录数,看问题影响的范围进行处理。
conf.set("mapreduce.map.skip.maxrecords", "0"); //Map Task跳过损坏记录的最大数,默认0,如果应用能接受任何坏记录则取Long.MAX_VALUE
job.getConfiguration().setLong(job.MAP_SKIP_MAX_RECORDS, Long.MAX_VALUE); //Map Task跳过损坏记录的最大数,默认0,如果应用能接受任何坏记录则取Long.MAX_VALUE(9223372036854775807)个坏记录
4、设置Map和Reduce任务失败重试次数
conf.set("mapreduce.map.maxattempts", "4"); //Map Task最大失败尝试次数,默认是4次
conf.set("mapreduce.reduce.maxattempts", "4"); //Reduce Task最大失败尝试次数,默认是4次