Hadoop实现全排序

1、1TB(或1分钟)排序的冠军 
作为分布式数据处理的框架,集群的数据处理能力究竟有多快?或许1TB排序可以作为衡量的标准之一。

1TB排序,就是对1TB(1024GB,大约100亿行数据)的数据进行排序。2008年,Hadoop赢得1TB排序基准评估第一名,排序1TB数据耗时209秒。后来,1TB排序被1分钟排序所取代,1分钟排序指的是在一分钟内尽可能多的排序。2009年,在一个1406个节点组成的hadoop集群,在59秒里对500GB完成了排序;而在1460个节点的集群,排序1TB数据只花了62秒

这么惊人的数据处理能力,是不是让你印象深刻呢?呵呵

下面我们来看看排序的过程吧。

2、排序的过程

1TB的数据?100亿条数据?都是什么样的数据呢?让我们来看几条:

  1. .t^#\|v$2\         0AAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFFFFGGGGGGGGGGHHHHHHHH
  2. [email protected]~?‘WdUF         1IIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMNNNNNNNNNNOOOOOOOOOOPPPPPPPP
  3. w[o||:N&H,         2QQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTUUUUUUUUUUVVVVVVVVVVWWWWWWWWWWXXXXXXXX
  4. ^Eu)<n#kdP         3YYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFF
  5. +l-$$OE/ZH         4GGGGGGGGGGHHHHHHHHHHIIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMNNNNNNNN
  6. LsS8)|.ZLD         5OOOOOOOOOOPPPPPPPPPPQQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTUUUUUUUUUUVVVVVVVV
  7. le5awB.$sm         6WWWWWWWWWWXXXXXXXXXXYYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDD
  8. q__[fwhKFg         7EEEEEEEEEEFFFFFFFFFFGGGGGGGGGGHHHHHHHHHHIIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLL
  9. ;L+!2rT~hd         8MMMMMMMMMMNNNNNNNNNNOOOOOOOOOOPPPPPPPPPPQQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTT
  10. M^*dDE;6^<         9UUUUUUUUUUVVVVVVVVVVWWWWWWWWWWXXXXXXXXXXYYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBB

.t^#\|v$2\ 0AAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFFFFGGGGGGGGGGHHHHHHHH [email protected]~?‘WdUF 1IIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMNNNNNNNNNNOOOOOOOOOOPPPPPPPP w[o||:N&H, 2QQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTUUUUUUUUUUVVVVVVVVVVWWWWWWWWWWXXXXXXXX ^Eu)<n#kdP 3YYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDDDDEEEEEEEEEEFFFFFFFF +l-$$OE/ZH 4GGGGGGGGGGHHHHHHHHHHIIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLLLLMMMMMMMMMMNNNNNNNN LsS8)|.ZLD 5OOOOOOOOOOPPPPPPPPPPQQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTTTTUUUUUUUUUUVVVVVVVV le5awB.$sm 6WWWWWWWWWWXXXXXXXXXXYYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBBBBCCCCCCCCCCDDDDDDDD q__[fwhKFg 7EEEEEEEEEEFFFFFFFFFFGGGGGGGGGGHHHHHHHHHHIIIIIIIIIIJJJJJJJJJJKKKKKKKKKKLLLLLLLL ;L+!2rT~hd 8MMMMMMMMMMNNNNNNNNNNOOOOOOOOOOPPPPPPPPPPQQQQQQQQQQRRRRRRRRRRSSSSSSSSSSTTTTTTTT M^*dDE;6^< 9UUUUUUUUUUVVVVVVVVVVWWWWWWWWWWXXXXXXXXXXYYYYYYYYYYZZZZZZZZZZAAAAAAAAAABBBBBBBB

描述一下:每一行,是一条数据。每一条,由2部分组成,前面是一个由10个随即字符组成的key,后面是一个80个字符组成的value。

排序的任务:按照key的顺序排。

那么1TB的数据从何而来?答案是用程序随即生成的,用一个只有map,没有reduce的MapReduce job,在整个集群上先随即生成100亿行数据。然后,在这个基础上,再运行排序的MapReduce job,以测试集群排序性能。

3、排序的原理

先说明一点,熟悉MapReduce的人都知道:排序是MapReduce的天然特性!在数据达到reducer之前,mapreduce框架已经对这些数据按键排序了。

所以,在这个排序的job里,不需要特殊的Mapper和Reducer类。用默认的 
IdentityMapper和IdentityReducer
即可。

既然排序是天然特性,那么1TB排序的难点在哪里呢??答:100亿行的数据随即分散在1000多台机器上,mapper和reducer都是Identity的,这个难点就在MapReduce的shuffle阶段!关键在如何取样和怎么写Partitioner。

好在这个排序的源代码已近包含在hadoop的examples里了,下面我们就来分析一下。

4、取样和partition的过程

面对对这么大量的数据,为了partition的更均匀。要先“取样”

1) 对Math.min(10, splits.length)个split(输入分片)进行随机取样,总共10万个样,对每个split取10000个(split数目达到10个及以上时候, added by jiwan)。
2) 10万个样排序,根据reducer的数量(n),取出将所有10万个样平均分隔的n-1个样 
3) 将这个n-1个样写入partitionFile(_partition.lst,是一个SequenceFile),key是取的样,值是nullValue 
4) 将partitionFile写入DistributedCache

接下来,正式开始执行MapReduce job: 
5) 每个map节点: 
a.根据n-1个样,build一棵类似于B-数的“索引树”: 
* 每个非叶子节点,都有256个子节点(应为2^8=256, added by jiwan)。 
* 不算根节点的非叶子节点有1层,加上根节点和叶子节点,共3层。 
* 非叶子节点代表key的“byte path” 
* 每个叶子节点代表key的前2个bytes path 
* 叶子节点上,保存的是partition number的范围,有多少个reducer就有多少partition number

b.前缀相同的key,被分配到同一个叶子节点。 
c.一个子节点上,可能有多个reducer 
d.比第i个样小的key,被分配到第i个reducer,剩下的被分配到最后一个reducer。

6) 针对一个key,partition的过程:

a. 首选判断key的第1个byte,找到第1层非叶子节点 
b. 再根据key的第2个byte,叶子节点 
c. 每个叶子节点可能对应多个取样(即多个reducer),再逐个和每个样比较,确定分配给哪一个reducer

5、图解partition的“索引树”

对上面的文字描述可能比较难理解,etongg 同学建议我画个图。所有才有了下面这些文字。感谢etongg和大家对本帖的关注。

“索引树”的作用是为了让key快速找到对应的reducer。下图是我画的索引树示意图:

对上面的图做一点解释: 
1、为了简单,我只画了A、B、C三个节点,实际的是有256个节点的。 
2、这个图假设有20个reducer(下标0到19),那么我们最终获得n-1个样,即19个样(下标为18的为最后一个样) 
3、图中的圆圈,代表索引树上的节点,索引树共3层。 
4、叶子节点下面的长方形代表取样数组。红色的数字代表取样的下标。 
5、每个节点都对应取样数组上的一个下标范围(更准备的说,是对应一个partition number的范围,每个partition number代表一个reducer)。这个范围在途中用蓝色的文字标识。

前面文中有一句话: 
比第i个样小的key,被分配到第i个reducer,剩下的被分配到最后一个reducer

这里做一个小小的纠正,应该是: 
小于或者等于第i个样的key,被分配到第i个reducer,剩下的被分配到最后一个reducer。

下面开始partition: 
如果key以"AAA"开头,被分配到第“0”个reducer。 
如果key以"ACA"开头,被分配到第“4”个reducer。 
如果key以"ACD"开头,被分配到第“4”个reducer。 
如果key以"ACF"开头,被分配到第“5”个reducer。

那么, 
如果key以"ACZ"开头,被分配到第几个reducer?? 
答案是:被分配到第“6”个reducer。

同理, 
如果key以"CCZ"开头,被分配到第“19”个reducer,也就是最后一个reducer。

6、为什么不用HashPartitioner?

还需要再说明的一点: 
上面自定义的Partitinoner的作用除了快速找到key对应的reducer,更重要的一点是:这个Partitioner控制了排序的总体有序!

上文中提到的“排序是MapReduce的天然特性!”这句话有点迷惑性。更准确的说,这个“天然特性”只保证了:a) 每个map的输出结果是有序的; b) 每个reduce的输入是有序的(参考下面的图)。而1TB的整体有序还需要靠Partitioner的帮助!

Partitioner控制了相似的key(即前缀相同)落在同一个reducer里,然后mapreduce的“天然特性”再保证每个reducer的输入(在正式执行reduce函数前,有一个排序的动作)是有序的!

这样就理解了为什么不能用HashPartitioiner了。因为自定义的Partitioner要保证排序的“整体有序”大方向。

另外,推荐一篇关于partitioner博文:Hadoop Tutorial Series, Issue #2: Getting Started With (Customized) Partitioning

再贴《Hadoop.The.Definitive.Guide》中一张图,更有利于理解了:

 具体实现:

分为两步:取样+Partition对每条数据做标记(即发往哪个reducer做处理)

1. 取样

原理:取样工作在JobClient端进行,目的是取出n-1个、排序好的样本(可以划分出n个reducer),在partition的过程中,通过将当前keyvalue对的key跟样本中数据作比较,就可以知道该keyvalue对发往哪个reducer了。

以此我们需要写自己的“取样类”:

 static class TextSampler implements IndexedSortable {  
  
    public ArrayList<IntWritable> records = new ArrayList<IntWritable>();//全部样本数据   
  
    @Override  
    public int compare(int arg0, int arg1) {  
        IntWritable right = records.get(arg0);  
        IntWritable left = records.get(arg1);  
        return right.compareTo(left);  
    }  
  
    @Override  
    public void swap(int arg0, int arg1) {  
        IntWritable right = records.get(arg0);  
        IntWritable left = records.get(arg1);  
        records.set(arg0, left);  
        records.set(arg1, right);  
    }  
  
    public void addKey(IntWritable key) {  
        records.add(key);  
    }  
  
    public IntWritable[] createPartitions(int numPartitions) {  
        int numRecords = records.size();  
        if (numPartitions > numRecords) {  
            throw new IllegalArgumentException("Requested more partitions than input keys (" + numPartitions +  
                    " > " + numRecords + ")");  
        }  
        new QuickSort().sort(this, 0, records.size());  
        float stepSize = numRecords / (float) numPartitions;//取数的步长   
        IntWritable[] result = new IntWritable[numPartitions - 1];  
        for (int i = 1; i < numPartitions; ++i) {  
            result[i - 1] = records.get(Math.round(stepSize * i));//从全部样本数据中再抽出n-1个样本   
        }  
        return result;  
    }  
}

说明:实现了IndexedSortable接口,IndexedSortable接口是Hadoop中的排序器,Hadoop关于可排序的数据集定义了一个抽象接口IndexedSortable,也就是说任何能够排序的数据集必须要实现两个方法,一是能够比较它的数据集中任意两项的大小,二是能够交换它的数据集中任意两项的位置。实现了这个接口我们就可以使用hadoop预定义的快排进行排序。如上:new QuickSort().sort(this, 0, records.size());

那么样本怎么得来的呢?

我们需要从分片中获得,在Job启动前必须得到n-1个取样数据——>需要对输入的数据进行控制——>需要自定义实现InputFormat接口的类。InputFormat做了2件事:

(1)InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 得到划分

(2)RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; 处理每个划分,对每个划分的数据生成KeyValue对

分片不用重写。需要自定义实现RecordReader接口的类。

static class TeraRecordReader implements RecordReader<IntWritable, Text> {  
  
        private LineRecordReader in;  
        private LongWritable junk = new LongWritable();  
        private Text line = new Text();  
  
        public TeraRecordReader(Configuration job, FileSplit split) throws IOException {  
            in = new LineRecordReader(job, split);  
        }  
  
        @Override  
        public void close() throws IOException {  
            in.close();  
        }  
  
        @Override  
        public IntWritable createKey() {  
            return new IntWritable();  
        }  
  
        @Override  
        public Text createValue() {  
            return new Text();  
        }  
  
        @Override  
        public long getPos() throws IOException {  
            // TODO Auto-generated method stub   
            return in.getPos();  
        }  
  
        @Override  
        public float getProgress() throws IOException {  
            // TODO Auto-generated method stub   
            return in.getProgress();  
        }  
  
        @Override  
        public boolean next(IntWritable key, Text value) throws IOException {  
            if (in.next(junk, line)) {  
                    key.set(Integer.parseInt(line.toString()));  
                    value.clear();  
                return true;  
            } else {  
                return false;  
            }  
        }  
    }//end RecordReader

默认情况下会对每个分片中的每行数据得到一个形如<Key=该行的起始位置:LongWritable,Value=该行的内容的:Text>的KeyValue对,我们需要将这个KeyValue对转化成我们想要的形式<Key=该行内容:IntWritable,Value=空字符串:Text>,所以如上重写了next函数。

到此我们可以按格式读到RecordReader提供的KeyValue对了。那么接下来我们就要找到读到的数据中你认为可以当做样本的数据:

public static void writePartitionFile(JobConf conf, Path partFile) throws IOException {  
    SamplerInputFormat inputFormat = new SamplerInputFormat();  
    TextSampler sampler = new TextSampler();  
    int partitions = conf.getNumReduceTasks(); // Reducer任务的个数   
    long sampleSize = conf.getLong(SAMPLE_SIZE, 100); // 采集数据-键值对的个数   
    InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks());// 获得数据分片   
    int samples = Math.min(10, splits.length);// 采集分片的个数   
    long recordsPerSample = sampleSize / samples;// 每个分片采集的键值对个数   
    int sampleStep = splits.length / samples; // 采集分片的步长   
    long records = 0;  
    IntWritable key = new IntWritable();  
    Text value = new Text();  
    for (int i = 0; i < samples; i++) {  
        //to particular split construct a record_reader   
        RecordReader<IntWritable, Text> reader = inputFormat.getRecordReader(splits[sampleStep * i], conf, null);  
        while (reader.next(key, value)) {  
            sampler.addKey(key);  
            key=new IntWritable();  
            value = new Text();  
            records += 1;  
            if ((i + 1) * recordsPerSample <= records) {  
                break;  
            }  
        }  
    }  
    FileSystem outFs = partFile.getFileSystem(conf);  
    if (outFs.exists(partFile)) {  
        outFs.delete(partFile, false);  
    }  
    SequenceFile.Writer writer = SequenceFile.createWriter(outFs, conf, partFile, IntWritable.class, NullWritable.class);  
    NullWritable nullValue = NullWritable.get();  
    for (IntWritable split : sampler.createPartitions(partitions)) {  
        writer.append(split, nullValue);  
    }  
    writer.close();  
}

如上所示,我们通过writer将(n-1)个样本写入到了临时的样本文件中。接下来可以启动Job了。

3. Partition对每条数据做标记(即发往哪个reducer做处理)

在map-reduce流程中,partitioner会负责“告知”每条数据的归属地reducer,这里我们要根据上面写好的临时样本文件判断每天数据的归属,因此需要自定义实现Partitioner接口的类:

// 自定义的Partitioner     
public static class TotalOrderPartitioner implements Partitioner<IntWritable, NullWritable> {    
      
    private IntWritable[] splitPoints;    
      
    public TotalOrderPartitioner() {    
    }    
      
    @Override    
    public int getPartition(IntWritable key, NullWritable value, int numReduceTasks) {    
        // TODO Auto-generated method stub     
        return findPartition(key);    
    }    
      
    public void configure(JobConf conf) {    
        try {    
            FileSystem fs = FileSystem.get(conf);  
            Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);    
            splitPoints = readPartitions(fs, partFile, conf,splitPoints); // 读取采集文件    
        } catch (IOException ie) {    
            throw new IllegalArgumentException("can‘t read paritions file", ie);    
        }    
    }  
    //通过找区间的方式定位partition   
    public int findPartition(IntWritable key) {    
        int len = splitPoints.length;    
        for (int i = 0; i < len; i++) {    
            int res = key.compareTo(splitPoints[i]);    
            if (res > 0 && i < len - 1) {    
                continue;    
            } else if (res == 0) {    
                return i;    
            } else if (res < 0) {    
                return i;    
            } else if (res > 0 && i == len - 1) {    
                return i + 1;    
            }    
        }   
        return 0;    
    }    
      
    private static IntWritable[] readPartitions(FileSystem fs, Path p, JobConf job, IntWritable[] splitPoints) throws IOException {   
        URI[] uris = DistributedCache.getCacheFiles(fs.getConf());  
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(uris[0]), job);    
        ArrayList<IntWritable> parts = new ArrayList<IntWritable>();    
        IntWritable key = new IntWritable();             
        NullWritable value = NullWritable.get();   
        while (reader.next(key, value)) {    
            parts.add(key);     
            key=new IntWritable();  
            value = NullWritable.get();  
        }    
        reader.close();    
        splitPoints = new IntWritable[parts.size()];  
        for(int i=0;i<parts.size();i++) {  
            splitPoints[i] = parts.get(i);  
        }  
        return splitPoints;  
    }    
}

如上所示,一个自定义的Partitioner只需要实现两个功能:getPartition()和configure()。

(1)getPartition()函数返回一个0到(Reducer数目-1)之间的int值来确定将<key,value>键值对送到哪一个Reducer中。

(2)configure()使用Hadoop Job Configuration来配置partitioner,并读取样本数据。

至此,我们控制了哪些数据发往哪些reducer,且这种控制是有序的控制,在每个reducer中的数据,hadoop会自动实现排序,因此整体上实现了全排序。

以上是整形的全排序,字符串的全排序与此大同小异。

注意:伪分布式reducer的个数只能是0或1,无法设置reducer的个数。

无论字符串排序还是整型排序都是在job启动前先把采样的样本放到SequenceFile中,然后job开始后,读取SequenceFile中的样本数据到一维数组中。之后,

(1)如果是字符串排序,既可以使用字符串比较的方法通过查找区间来定位partition,也可以通过构建2层字典树(terasort中使用的方法)定位partition;

(2)如果是整形排序,就直接按找区间的方法定位partition;

时间: 2024-10-03 21:28:20

Hadoop实现全排序的相关文章

hadoop —— Reducer全排序

目录 一.关于Reducer全排序 1.1. 什么叫全排序 1.2. 分区的标准是什么 二.全排序的三种方式 2.1. 一个Reducer 2.2. 自定义分区函数 2.3. 采样 一.关于Reducer全排序 1.1.什么叫全排序? 在所有的分区(Reducer)中,KEY都是有序的: 正确举例:如Reducer分区1中的KEY是1.3.4,分区2中的key是5.8.9 错误举例:如Reducer分区1中的KEY是1.3.4,分区2中的key是2.7.9 1.2.数据分区的标准是什么? 默认的

9.2.2 hadoop全排序实例详解

1.1.1         全排序 (1)全排序概述 指的是让所有的输出结果都是有序的,最简单的方法就是用一个reduce任务,但是这样处理大型文件时效率极低,失去的并行架构的意义.所以可以采用分组排序的方法来实现全局排序,例如现在要实现按键的全局的排序,可以将键值按照取值范围分为n个分组,<-10℃,-10℃~0℃, 0℃~10℃,>10℃.实现partitioner类,创建4个分区,将温度按照取值范围分类到四个分区中,每个分区进行排序,然后将4个分区结果合并成一个,既是一个全局有序的输出.

hive全排序

全排序 hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序. 1.1.1     例1 set mapred.reduce.tasks=2; 原值 select cookie_id,page_id,id fromc02_clickstat_fatdt1 where cookie_idIN('1.193.131.218.1288611279693.0','1.193.148.164.1288609861509.2

Hadoop---mapreduce排序和二次排序以及全排序

自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable 2.Hadoop的key排序逻辑 3.全排序 4.如何自定义自己的Writable类型 5.如何实现二次排序 1.Hadoop的序列化格式介绍:Writable 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.更多的可能是要关注他的Subinterfaces:WritableComparable<T>.他是继承Writable和Co

五 数据组织模式 4)全排序、混排。

前面讲的 分区.分箱模式 都是不关心数据的顺序. 接下来 全排序.混排序模式 关心的是数据按照指定键进行并行排序. 全排序解释: 排序在顺序结构程序中容易实现, 但是在MapReduce 中,或者说在并行编程中不易实现.这是典型的 "分治法". 每个 reduce 将按照键对他的数据排序,但这种排序并不是全局意义上的排序. 这里想做的是全排序,记录是整个数据集按照顺序排列好的. 作用: 排序号的数据有很多有用的特性,比如时间排序可以提供一个基于时间轴的视图.在一个已排序好的数据集中查找

M/R全排序

例如:1KW数据,200个map,100个reduce. (1)map阶段,每个map分别局部排序,得到200个排好顺序的结果 (2)对所有的数据进行99个抽样s1,s2...s99(按照顺序排列) (3)根据每个map中数据在抽样数据的前后,将每个map划分成(最多)100个部分m1-1,m1-2....m1-100,m2-1,m2-2...m2-100...m200-1,m200-2,m200-100,所有map(最多)2W个部分 (4)这2W个部分每个部分按照抽样和抽样的比较分别放在每个抽

全排序与康拓展开

全排列: n=3 123 132 213 231 312 321 (由1~n组成,且出现一次,从小到大排序(或从大到小排序,或不排序)) 求全排序: I. Dfs1 //全排序没有顺序 #include <iostream> using namespace std; long a[100],n; void swap(long &a,long &b) { long temp; temp=a; a=b; b=temp; } void dfs(long pos) { long i;

全排序之字典排序

字典序全排列算法研究 一. 非递归算法(字典序法) 对给定的字符集中的字符规定了一个先后关系,在此基础上规定两个全排列的先后是从左到右逐个比较对应的字符的先后. 例如:字符集{1,2,3},较小的数字位置较先,这样按字典序生成的全排列是 123,132,213,231,312,321 ※ 一个全排列可看做一个字符串,字符串可有前缀.后缀. 生成给定全排列的下一个排列.所谓一个的下一个就是这一个与下一个之间没有其他的.这就要求这一个与下一个有尽可能长的共同前缀,也即变化限制在尽可能短的后缀上. 对

全排序的两种想法

字典序全排序: 1 import java.util.Arrays; 2 import java.util.Scanner; 3 public class Cao41 { 4 /** 5 * @param 第一行输入个数N,第二行输入序列1-9, 6 * 输出字典序排列 具体做法如下: 首先要将数组a从小到大排序,输出第一个排序,然后进入循环 先用getindex方法找到最靠后的,后面有比啊a[index]大的index,当index不存在设index=-1,结束循环. 然后从 index到en