1.1.1 Total Sort
(1) Overview of total sort
A total sort means that the entire output is ordered. The simplest approach is to use a single reduce task, but for large files this is extremely inefficient and throws away the benefit of the parallel architecture. Instead, a global sort can be achieved by sorting within groups. For example, to sort globally by key, the key range can be divided into n groups; with temperatures and n = 4, the ranges might be <-10℃, -10℃~0℃, 0℃~10℃, >10℃. Implement a Partitioner class that creates 4 partitions and assigns each temperature to a partition by its range; each partition is sorted, and concatenating the 4 partition outputs yields a globally ordered result.
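As an illustration of this idea, here is a minimal sketch of such a fixed-range Partitioner (the class name is made up for this example, and it assumes the map output key is an IntWritable temperature in ℃ with Text values; run with 4 reduce tasks):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Assigns each record to one of the four fixed temperature ranges above.
public class TemperatureRangePartitioner extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        int t = key.get();        // temperature in ℃
        if (t < -10) return 0;    // < -10℃
        if (t < 0)   return 1;    // [-10℃, 0℃)
        if (t < 10)  return 2;    // [0℃, 10℃)
        return 3;                 // >= 10℃
    }
}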
(2) Group sort
Group sorting divides the data into groups by value so that every record in group i is smaller than every record in group i+1; sorting each group and then concatenating them gives a globally ordered result. With the fixed ranges above, however, the amount of data falling into each range can differ widely: some ranges may hold a very large share of the records and others very little, so some reduce tasks process far more data than others. Ideally each partition should hold roughly the same number of records, so that the job's overall running time is not dominated by an individual reducer. To spread the data reasonably evenly across the ranges without scanning the whole dataset (too expensive), the input can be sampled and the partition boundaries derived from the sample.
(3) Sampling-based grouping
An InputSampler object samples the input data to obtain the boundary values that split it into intervals, and writes those values to a file. The TotalOrderPartitioner class then reads the boundary values and uses them as the partitioning criterion. Sampling-based grouping thus derives reasonably even intervals, each holding about the same amount of data, from only a fraction of the input. InputSampler offers three sampling strategies (see the table and the construction sketch after it): first-n-records sampling (SplitSampler), random sampling (RandomSampler), and fixed-interval sampling (IntervalSampler).
| Class (constructor) | Sampling strategy | Constructor parameters | Efficiency |
| --- | --- | --- | --- |
| SplitSampler(int numSamples, int maxSplitsSampled) | Samples the input splits uniformly, taking the first n records of each split | total number of samples; number of splits to sample | highest |
| RandomSampler(double freq, int numSamples, int maxSplitsSampled) | Scans all the data and samples at random | sampling frequency; total number of samples; number of splits to sample | lowest |
| IntervalSampler(double freq, int maxSplitsSampled) | Samples at fixed intervals; well suited to sorted data | sampling frequency; number of splits to sample | medium |
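As a usage sketch (the parameter values here are illustrative, not prescribed), the three samplers are constructed like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

public class SamplerExamples {
    public static void main(String[] args) {
        // SplitSampler: 10000 samples in total, from at most 10 splits
        InputSampler.Sampler<IntWritable, Text> split =
                new InputSampler.SplitSampler<IntWritable, Text>(10000, 10);
        // RandomSampler: frequency 0.1, 10000 samples, at most 10 splits
        InputSampler.Sampler<IntWritable, Text> random =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        // IntervalSampler: roughly one record in every 100, from at most 10 splits
        InputSampler.Sampler<IntWritable, Text> interval =
                new InputSampler.IntervalSampler<IntWritable, Text>(0.01, 10);
    }
}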
(4) How InputSampler works
1) InputSampler is a Hadoop job class: it extends Configured and implements the Tool interface, with main() as the entry point and run() executing the job. InputSampler also has a writePartitionFile() function, which sorts the sampled keys, divides them according to the number of partitions, and writes the resulting boundary values to the partition file. Its definition is as follows:
public class InputSampler<K, V> extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(InputSampler.class);

  static int printUsage() {
    System.out.println("sampler -r <reduces>\n" +
        "      [-inFormat <input format class>]\n" +
        "      [-keyClass <map input & output key class>]\n" +
        "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
        "// Sample from random splits at random (general)\n" +
        "       -splitSample <numSamples> <maxsplits> | " +
        "// Sample from first records in splits (random data)\n" +
        "       -splitInterval <double pcnt> <maxsplits>]" +
        " // Sample from splits at intervals (sorted data)");
    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public InputSampler(Configuration conf) {
    setConf(conf);
  }

  public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler)
      throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = job.getConfiguration();
    InputFormat inf = (InputFormat) ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // numPartitions reduce tasks mean numPartitions partitions and numPartitions output files
    int numPartitions = job.getNumReduceTasks();
    // collect the sampled keys
    K[] samples = (K[]) sampler.getSample(inf, job);
    LOG.info("Using " + samples.length + " samples");
    // get the sort comparator and sort the sampled keys
    RawComparator<K> comparator = job.getSortComparator();
    Arrays.sort(samples, comparator);
    // path of the partition file
    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
    FileSystem fs = dst.getFileSystem(conf);
    // delete the old partition file if it exists
    if (fs.exists(dst)) {
      fs.delete(dst, false);
    }
    // create a writer for the new partition file
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst,
        job.getMapOutputKeyClass(), NullWritable.class);
    NullWritable nullValue = NullWritable.get();
    // step size between boundaries: once the samples are sorted, every stepSize-th
    // value becomes a partition boundary (skipping duplicates of the previous boundary)
    float stepSize = (float) samples.length / (float) numPartitions;
    int last = -1;
    for (int i = 1; i < numPartitions; ++i) {
      int k = Math.round(stepSize * (float) i);
      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
        ++k;
      }
      writer.append(samples[k], nullValue);
      last = k;
    }
    writer.close();
  }

  // run() executes the sampling job; the arguments select the sampler type
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    ArrayList<String> otherArgs = new ArrayList<String>();
    InputSampler.Sampler<K, V> sampler = null;
    for (int i = 0; i < args.length; ++i) {
      try {
        if ("-r".equals(args[i])) {
          job.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else if ("-inFormat".equals(args[i])) {
          job.setInputFormatClass(Class.forName(args[++i]).asSubclass(InputFormat.class));
        } else if ("-keyClass".equals(args[i])) {
          job.setMapOutputKeyClass(Class.forName(args[++i]).asSubclass(WritableComparable.class));
        } else if ("-splitSample".equals(args[i])) {
          int numSamples = Integer.parseInt(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) {
            maxSplits = Integer.MAX_VALUE;
          }
          // split sampling: first records of each split
          sampler = new InputSampler.SplitSampler(numSamples, maxSplits);
        } else if ("-splitRandom".equals(args[i])) {
          double pcnt = Double.parseDouble(args[++i]);
          int numSamples = Integer.parseInt(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) {
            maxSplits = Integer.MAX_VALUE;
          }
          // random sampling
          sampler = new InputSampler.RandomSampler(pcnt, numSamples, maxSplits);
        } else if ("-splitInterval".equals(args[i])) {
          double pcnt = Double.parseDouble(args[++i]);
          int maxSplits = Integer.parseInt(args[++i]);
          if (0 >= maxSplits) {
            maxSplits = Integer.MAX_VALUE;
          }
          // interval sampling
          sampler = new InputSampler.IntervalSampler(pcnt, maxSplits);
        } else {
          otherArgs.add(args[i]);
        }
      } catch (NumberFormatException e) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException e) {
        System.out.println("ERROR: Required parameter missing from " + args[i - 1]);
        return printUsage();
      }
    }
    // more than one reduce task is required, otherwise partitioning is pointless
    if (job.getNumReduceTasks() <= 1) {
      System.err.println("Sampler requires more than one reducer");
      return printUsage();
    } else if (otherArgs.size() < 2) {
      System.out.println("ERROR: Wrong number of parameters: ");
      return printUsage();
    } else {
      if (null == sampler) {
        // default: random sampling
        sampler = new InputSampler.RandomSampler(0.1D, 10000, 10);
      }
      // the last argument is the partition file; the rest are input paths
      Path outf = new Path((String) otherArgs.remove(otherArgs.size() - 1));
      TotalOrderPartitioner.setPartitionFile(getConf(), outf);
      for (String s : otherArgs) {
        FileInputFormat.addInputPath(job, new Path(s));
      }
      // sample the input and write the partition file
      writePartitionFile(job, (InputSampler.Sampler) sampler);
      return 0;
    }
  }

  public static void main(String[] args) throws Exception {
    InputSampler<?, ?> sampler = new InputSampler(new Configuration());
    int res = ToolRunner.run(sampler, args);
    System.exit(res);
  }

  // the sampling interface implemented by the three samplers
  public interface Sampler<K, V> {
    K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException;
  }
}
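To make the boundary-selection step of writePartitionFile() concrete, here is a standalone sketch with made-up numbers (the duplicate-skipping loop is omitted): 12 sorted samples divided into 4 partitions give stepSize = 3, so the samples at indices 3, 6 and 9 become the three boundary keys.

// Standalone sketch of boundary selection, not Hadoop code.
public class BoundaryDemo {
    public static void main(String[] args) {
        int[] samples = {1, 2, 4, 5, 7, 9, 12, 15, 18, 21, 25, 30}; // already sorted
        int numPartitions = 4;
        float stepSize = samples.length / (float) numPartitions;   // 3.0
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            System.out.println("boundary " + i + " = " + samples[k]);
        }
        // prints: boundary 1 = 5, boundary 2 = 12, boundary 3 = 21
    }
}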
2) InputSampler defines a sampling interface, Sampler, with a single method getSample(). The SplitSampler, RandomSampler and IntervalSampler classes all implement this interface, each collecting sample keys in a different way. All three are static nested classes of InputSampler; each is described below.
The SplitSampler class
Dividing the total number of samples by the number of splits to sample gives the per-split sample count n; the sampler takes the first n records of each split.
public static class SplitSampler<K, V> implements InputSampler.Sampler<K, V> {
  protected final int numSamples;        // total number of samples
  protected final int maxSplitsSampled;  // number of splits to sample, at most the actual split count

  public SplitSampler(int numSamples) {
    this(numSamples, Integer.MAX_VALUE);
  }

  public SplitSampler(int numSamples, int maxSplitsSampled) {
    this.numSamples = numSamples;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
    // get the input splits
    List<InputSplit> splits = inf.getSplits(job);
    // array sized for the total number of samples
    ArrayList<K> samples = new ArrayList<K>(this.numSamples);
    // number of splits actually sampled
    int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
    // number of records to take from each split
    int samplesPerSplit = this.numSamples / splitsToSample;
    long records = 0L;
    for (int i = 0; i < splitsToSample; ++i) {
      TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
          job.getConfiguration(), new TaskAttemptID());
      // create a reader for the split
      RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
      reader.initialize(splits.get(i), samplingContext);
      while (reader.nextKeyValue()) {
        // copy the current key into the sample array
        samples.add(ReflectionUtils.copy(job.getConfiguration(),
            reader.getCurrentKey(), null));
        ++records;
        // only the first samplesPerSplit records of each split are sampled;
        // stop once the running total reaches the quota
        if ((long) ((i + 1) * samplesPerSplit) <= records) {
          break;
        }
      }
      reader.close();
    }
    return (K[]) samples.toArray();
  }
}
The IntervalSampler class
This sampler walks the records of the sampled splits and selects them at a fixed interval derived from the sampling frequency; for example, with a frequency of 0.1 it takes one record in every 10.
public static class IntervalSampler<K, V> implements InputSampler.Sampler<K, V> {
  protected final double freq;           // sampling frequency
  protected final int maxSplitsSampled;  // number of splits to sample

  public IntervalSampler(double freq) {
    this(freq, Integer.MAX_VALUE);
  }

  public IntervalSampler(double freq, int maxSplitsSampled) {
    this.freq = freq;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
    List<InputSplit> splits = inf.getSplits(job);
    ArrayList<K> samples = new ArrayList<K>();
    int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
    long records = 0L; // records visited
    long kept = 0L;    // records sampled
    for (int i = 0; i < splitsToSample; ++i) {
      TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
          job.getConfiguration(), new TaskAttemptID());
      RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
      reader.initialize(splits.get(i), samplingContext);
      while (reader.nextKeyValue()) {
        // Suppose freq is 0.1. On the 1st record, records=1 and kept=0, so
        // 0/1 < 0.1 and the record is sampled; kept becomes 1. On the 2nd
        // record, 1/2 >= 0.1, so it is skipped, and kept/records stays >= 0.1
        // through 1/10. On the 11th record, 1/11 < 0.1, so it is sampled and
        // kept becomes 2; the next sample is then taken at 2/21, and so on.
        // One record is taken every 10 records, i.e. at a fixed interval.
        ++records;
        if ((double) kept / (double) records < this.freq) {
          samples.add(ReflectionUtils.copy(job.getConfiguration(),
              reader.getCurrentKey(), null));
          ++kept;
        }
      }
      reader.close();
    }
    return (K[]) samples.toArray();
  }
}
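The kept/records rule can be traced outside Hadoop with a standalone sketch (plain integers standing in for records; freq = 0.1 is just an example):

// Standalone sketch of IntervalSampler's selection rule, not Hadoop code.
public class IntervalDemo {
    public static void main(String[] args) {
        double freq = 0.1;
        long records = 0, kept = 0;
        for (int record = 1; record <= 30; ++record) {
            ++records;
            if ((double) kept / records < freq) {
                System.out.println("sampled record " + record);
                ++kept;
            }
        }
        // prints: sampled record 1, sampled record 11, sampled record 21
    }
}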
The RandomSampler class
The random sampler's inputs are the sampling frequency, the total number of samples, and the number of splits to sample. It walks the records of the sampled splits; whenever a random number falls below the sampling frequency, the record is sampled, either appended to the sample array or, once the array is full, written over a randomly chosen existing entry. The frequency is also reduced after each replacement, so the further the scan proceeds, the less likely a record is to be sampled.
public static class RandomSampler<K, V> implements InputSampler.Sampler<K, V> {
  protected double freq;                 // sampling frequency
  protected final int numSamples;        // total number of samples
  protected final int maxSplitsSampled;  // number of splits to sample

  public RandomSampler(double freq, int numSamples) {
    this(freq, numSamples, Integer.MAX_VALUE);
  }

  public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
    this.freq = freq;
    this.numSamples = numSamples;
    this.maxSplitsSampled = maxSplitsSampled;
  }

  public K[] getSample(InputFormat<K, V> inf, Job job) throws IOException, InterruptedException {
    List<InputSplit> splits = inf.getSplits(job);
    // reserve space for the samples
    ArrayList<K> samples = new ArrayList<K>(this.numSamples);
    // number of splits actually sampled
    int splitsToSample = Math.min(this.maxSplitsSampled, splits.size());
    Random r = new Random();
    long seed = r.nextLong();
    r.setSeed(seed);
    InputSampler.LOG.debug("seed: " + seed);
    // shuffle the splits: swap each split i with a randomly chosen split j
    for (int i = 0; i < splits.size(); ++i) {
      InputSplit tmp = splits.get(i);
      int j = r.nextInt(splits.size());
      splits.set(i, splits.get(j));
      splits.set(j, tmp);
    }
    // read records from the sampled splits until either the chosen splits are
    // exhausted (possibly with fewer than numSamples samples collected) or
    // numSamples samples have been collected
    for (int i = 0; i < splitsToSample ||
        (i < splits.size() && samples.size() < this.numSamples); ++i) {
      TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
          job.getConfiguration(), new TaskAttemptID());
      RecordReader<K, V> reader = inf.createRecordReader(splits.get(i), samplingContext);
      reader.initialize(splits.get(i), samplingContext);
      while (reader.nextKeyValue()) {
        // sample the current record if a random double falls below the sampling
        // frequency (so a full scan may still end with fewer than numSamples records)
        if (r.nextDouble() <= this.freq) {
          if (samples.size() < this.numSamples) {
            // the sample array is not full yet: just add the key
            samples.add(ReflectionUtils.copy(job.getConfiguration(),
                reader.getCurrentKey(), null));
          } else {
            // numSamples keys already collected: overwrite one at a random index
            int ind = r.nextInt(this.numSamples);
            if (ind != this.numSamples) {
              samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
                  reader.getCurrentKey(), null));
            }
            // shrink the frequency after each replacement, so later records are
            // less and less likely to pass the r.nextDouble() <= freq test
            this.freq *= (double) (this.numSamples - 1) / (double) this.numSamples;
          }
        }
      }
      reader.close();
    }
    return (K[]) samples.toArray();
  }
}
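The fill-then-replace behaviour and the shrinking frequency can likewise be traced with a standalone sketch (made-up record stream and parameters, fixed seed for repeatability):

// Standalone sketch of RandomSampler's selection rule, not Hadoop code.
import java.util.ArrayList;
import java.util.Random;

public class RandomSamplerDemo {
    public static void main(String[] args) {
        double freq = 0.1;
        int numSamples = 5;
        ArrayList<Integer> samples = new ArrayList<Integer>(numSamples);
        Random r = new Random(42); // fixed seed for repeatability
        for (int record = 1; record <= 1000; ++record) {
            if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                    samples.add(record);                       // fill phase
                } else {
                    samples.set(r.nextInt(numSamples), record); // replace phase
                    freq *= (numSamples - 1) / (double) numSamples; // decay
                }
            }
        }
        System.out.println("sampled records: " + samples);
    }
}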
(5) TotalOrderPartitioner
TotalOrderPartitioner is the partitioner class for globally ordered output. After calling job.setPartitionerClass(TotalOrderPartitioner.class), each input key is passed to TotalOrderPartitioner's getPartition() function to obtain its partition number. The partitions are the intervals derived from the sampling results.
public class TotalOrderPartitioner<K extends WritableComparable<?>, V>
    extends Partitioner<K, V> implements Configurable {
  // ... (reads the boundary keys from the partition file and maps each key to a partition number)
}
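Conceptually, getPartition() returns the index of the first boundary greater than the key (a key equal to a boundary goes to the partition above it); the real class reads the boundaries from the partition file and, for speed, may build a trie over them rather than binary-searching. A simplified standalone sketch with int keys (temperatures scaled by 10; made-up values):

import java.util.Arrays;

// Simplified sketch of what getPartition() computes, not the real implementation.
public class GetPartitionDemo {
    public static void main(String[] args) {
        int[] boundaries = {56, 139, 220}; // 5.6℃, 13.9℃, 22.0℃ scaled by 10
        int[] keys = {-3, 56, 100, 219, 300};
        for (int key : keys) {
            int pos = Arrays.binarySearch(boundaries, key);
            // binarySearch returns the match index, or (-(insertion point) - 1)
            int partition = pos >= 0 ? pos + 1 : -pos - 1;
            System.out.println("key " + key + " -> partition " + partition);
        }
        // partitions: -3 -> 0, 56 -> 1, 100 -> 1, 219 -> 2, 300 -> 3
    }
}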
(6) A total sort example using random sampling
The example below sorts an input file by key. It first samples at random with a frequency of 0.1, collecting the keys of 10000 records from 10 input splits, and sorts the samples. To produce 4 partitions, it takes the values at positions 2500, 5000 and 7500 of the sorted sample, here 5.6℃, 13.9℃ and 22.0℃, as the boundary points, dividing the temperatures into the 4 intervals below, and writes the boundary values to the partition file. TotalOrderPartitioner reads those values and uses them as partition boundaries, so each partition receives roughly the same number of records. While the data is processed, getPartition() is called on each temperature and returns the number of the partition it belongs to, and the record is handed to that partition's reducer. The result is four output files, each internally sorted and sorted with respect to one another; concatenating the four files yields one globally ordered sequence file.
| Partition | 0 | 1 | 2 | 3 |
| --- | --- | --- | --- | --- |
| Temperature range | <5.6 | [5.6, 13.9) | [13.9, 22.0) | >=22.0 |
package Temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.net.URI;

public class SortTempetatureTotalOrder extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        // input format, output key type, output format, and block compression
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job,
                SequenceFile.CompressionType.BLOCK);
        // use the total-order partitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // random sampler: frequency 0.1, 10000 samples, at most 10 splits
        InputSampler.Sampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        // sample the input and write the partition boundaries to the partition
        // file (default path set by mapreduce.totalorderpartitioner.path)
        InputSampler.writePartitionFile(job, sampler);
        Configuration conf = job.getConfiguration();
        // add the partition file to the distributed cache so TotalOrderPartitioner
        // can read it; getPartition() compares each key against the boundary
        // intervals and returns the partition number
        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile);
        job.addCacheFile(partitionUri);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class JobBuilder {
        public static Job parseInputAndOutput(Tool tool, Configuration conf,
                String[] args) throws IOException {
            if (args.length != 2) {
                return null;
            }
            Job job = new Job(conf, tool.getClass().getName());
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SortTempetatureTotalOrder(), args);
        System.exit(exitCode);
    }
}
The hadoop command that runs the job is shown below; input/ncdc/all-seq is the input path and output-totalsort is the output path (the program takes exactly these two path arguments):
% hadoop jar Hadoop-example.jar SortTempetatureTotalOrder -D mapreduce.job.reduces=4 input/ncdc/all-seq output-totalsort