ExternalSort

code:

  import java.io.BufferedInputStream;
  import java.io.BufferedOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.File;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.StandardCopyOption;
  import java.util.Arrays;

 11 public class ExternalSort {
 12
 13     private static final int MAX_ARRAY_SIZE = 100000;
 14     private static final int BUFFER_SIZE = 100000;
 15
 16     private static int initSegment(String originalFile, String f1, int segmentSize) {
 17         int numOfSegment = 0;
 18         try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(originalFile)));
 19                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f1)));) {
 20             int[] array = new int[segmentSize];
 21             while (in.available() > 0) {
 22                 numOfSegment++;
 23                 int i = 0;
 24                 for (; in.available() > 0 && i < segmentSize; i++)
 25                     array[i] = in.readInt();
 26
 27                 Arrays.sort(array, 0, i);
 28
 29                 for (int j = 0; j < i; j++)
 30                     out.writeInt(array[j]);
 31             }
 32         } catch (Exception e) {
 33             e.printStackTrace();
 34         }
 35
 36         return numOfSegment;
 37     }
 38
 39     private static void copyHalfToF2(DataInputStream f1, DataOutputStream f2, int segmentSize, int numOfSegment) {
 40         try {
 41             for (int i = 0; i < (numOfSegment / 2) * segmentSize; i++)
 42                 f2.writeInt(f1.readInt());
 43         } catch (IOException e) {
 44             e.printStackTrace();
 45         }
 46     }
 47
 48     private static void mergeTwoSegment(DataInputStream f1, DataInputStream f2, DataOutputStream f3, int segmentSize)
 49             throws Exception {
 50         int int1 = f1.readInt(), f1C = 1;
 51         int int2 = f2.readInt(), f2C = 1;
 52
 53         while (true) {
 54             if (int1 < int2) {
 55                 f3.writeInt(int1);
 56                 if (f1.available() == 0 || f1C++ >= segmentSize) {
 57                     f3.writeInt(int2);
 58                     break;
 59                 }
 60                 int1 = f1.readInt();
 61             } else {
 62                 f3.writeInt(int2);
 63                 if (f2.available() == 0 || f2C++ >= segmentSize) {
 64                     f3.writeInt(int1);
 65                     break;
 66                 }
 67                 int2 = f2.readInt();
 68             }
 69         }
 70
 71         while (f1.available() > 0 && f1C++ < segmentSize)
 72             f3.writeInt(f1.readInt());
 73
 74         while (f2.available() > 0 && f2C++ < segmentSize)
 75             f3.writeInt(f2.readInt());
 76     }
 77
 78     private static void mergeSegment(DataInputStream f1, DataInputStream f2, DataOutputStream f3, int segmentSize,
 79             int numOfSegment) throws Exception {
 80         for (int i = 0; i < numOfSegment; i++)
 81             mergeTwoSegment(f1, f2, f3, segmentSize);
 82
 83         while (f1.available() > 0)
 84             f3.writeInt(f1.readInt());
 85     }
 86
 87     private static void mergeOneStep(String f1, String f2, String f3, int segmentSize, int numOfSegment)
 88             throws Exception {
 89         DataInputStream f1In = new DataInputStream(new BufferedInputStream(new FileInputStream(f1), BUFFER_SIZE));
 90         DataOutputStream f2Out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f2), BUFFER_SIZE));
 91
 92         copyHalfToF2(f1In, f2Out, segmentSize, numOfSegment);
 93         f2Out.close();
 94
 95         DataInputStream f2In = new DataInputStream(new BufferedInputStream(new FileInputStream(f2), BUFFER_SIZE));
 96         DataOutputStream f3Out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f3), BUFFER_SIZE));
 97
 98         mergeSegment(f1In, f2In, f3Out, segmentSize, numOfSegment >>> 1);
 99
100         f1In.close();
101         f2In.close();
102         f3Out.close();
103     }
104
105     private static void merge(String f1, String f2, String f3, String targetFile, int segmentSize, int numOfSegment)
106             throws Exception {
107         if (numOfSegment > 1) {
108             mergeOneStep(f1, f2, f3, segmentSize, numOfSegment);
109             merge(f1, f2, f3, targetFile, segmentSize * 2, (numOfSegment + 1) >>> 1);
110         } else {
111             File sortedFile = new File(targetFile);
112             if (sortedFile.exists())
113                 sortedFile.delete();
114             new File(f1).renameTo(sortedFile);
115         }
116     }
117
118     public static void sort(String originalFile, String targetFile) throws Exception {
119         int numOfSegment = initSegment(originalFile, "f1.dat", MAX_ARRAY_SIZE);
120
121         merge("f1.dat", "f2.dat", "f3.dat", targetFile, MAX_ARRAY_SIZE, numOfSegment);
122     }
123
124 }
时间: 2024-10-12 12:47:23

ExternalSort的相关文章

spark 性能优化

1. 内存：spark.storage.memoryFraction 是 Spark 用于缓存（持久化 RDD）的 Java 堆内存比例，默认 0.6；spark.shuffle.memoryFraction 是 shuffle 过程中用于聚合（aggregation）和 cogroup 的 Java 堆内存比例，默认 0.2。关于这两个参数的设置，一个常见的场景是操作关系数据库：Spark 可以通过 JDBC 操作关系数据库，但若没有用于分散数据的依据，所有数据都会被读到 driver 节点上。这时，强烈建议先查看表的数据量以及集群中 Spark 的内存参数设置。假设 exe……

Homework 2: UDF Caching in Spark

为 Spark 编写 UDF 缓存（UDF cache）。作业介绍：https://github.com/cs186-spring15/course/tree/master/hw2 。我花了些时间完成了这份作业，觉得它是学习 Spark SQL 和 Scala 的好材料，现将我的实现记录如下：Task #1: Implementing DiskPartition and GeneralDiskHashedRelation；Task #2: Implementing object DiskHashedRelation；DiskPa……