背景:一个文件内有多条数据记录,每条记录为一行,记录按时间字段升序排序。
需求1:将多个这样的文件合并成一个按时间排序的文件
需求2:将一个按数据记录时间字段排好序的大文件分割成几个小文件
代码:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Command-line front end for merging several time-sorted record files into one, or for splitting
 * one large record file into several smaller ones.
 *
 * <p>Every input file is assumed to be already sorted in ascending order of its time field.
 *
 * @author zsm
 * @date 2017-03-09 14:50:26
 */
public class Main_MultiFileMergeSort {

    /**
     * Dispatches to the merge or split operation according to the first argument.
     *
     * <ul>
     *   <li>{@code 1 threadNum fileParentPath containedStr} - merge sort</li>
     *   <li>{@code 2 fileParentPath srcFileName splitedFileNum} - file split</li>
     * </ul>
     *
     * Any other argument shape, including non-numeric values where a number is expected, prints
     * the usage text instead of failing with a {@link NumberFormatException}.
     *
     * @param args command-line arguments as described above
     * @throws IOException if the merged result cannot be read back for verification
     */
    public static void main(String[] args) throws IOException {
        if (args.length == 4) {
            Integer operation = parseIntOrNull(args[0]);
            if (operation != null && operation == 1) { // merge sort
                Integer threadNum = parseIntOrNull(args[1]);
                if (threadNum != null) {
                    runMerge(threadNum, args[2], args[3]);
                    return;
                }
            } else if (operation != null && operation == 2) { // file split
                Integer splitedFileNum = parseIntOrNull(args[3]);
                if (splitedFileNum != null) {
                    runSplit(args[1], args[2], splitedFileNum);
                    return;
                }
            }
        }
        printUsage();
    }

    /** Manual smoke test of the split operation against a hard-coded local file. */
    public static void fileSplitTest() {
        runSplit("F:/", "17915_main_acttmp.txt", 10);
    }

    /**
     * Manual smoke test of the merge operation against a hard-coded local directory.
     *
     * @throws IOException if the merged result cannot be read back for verification
     */
    public static void fileSortTest() throws IOException {
        runMerge(4, "F:/2016-11-10", "gps.txt");
    }

    /** Runs a timed merge and reports the result file together with an ordering check. */
    private static void runMerge(int threadNum, String fileParentPath, String containedStr) throws IOException {
        FileSort fileSort = new FileSort(true);
        System.out.println("file mergeing...");
        long startTime = System.currentTimeMillis();

        String resultFileName = fileSort.mergeSort(threadNum, fileParentPath, containedStr);

        System.out.println("done.time used:" + (System.currentTimeMillis() - startTime) + " ms");
        System.out.println("resultFileName: " + resultFileName + ", is sorted correct: "
                + FileSort.isAscendingOrder(fileParentPath, resultFileName));
    }

    /** Runs a timed split that keeps the source file. */
    private static void runSplit(String fileParentPath, String srcFileName, int splitedFileNum) {
        System.out.println("file spliting...");
        long startTime = System.currentTimeMillis();

        FileSort.splitFile(fileParentPath, srcFileName, false, splitedFileNum);

        System.out.println("done.time used:" + (System.currentTimeMillis() - startTime) + " ms");
    }

    /** Prints the expected argument layout of both operations. */
    private static void printUsage() {
        System.out.println("\n*************");
        System.out.println("arguments of merge sort operation: 1 threadNum fileParentPath containedStr");
        System.out.println("arguments of file split operation: 2 fileParentPath srcFileName splitedFileNum");
        System.out.println("*************\n");
    }

    /** Returns the parsed integer, or {@code null} when the text is not a valid integer. */
    private static Integer parseIntOrNull(String text) {
        try {
            return Integer.valueOf(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}

/**
 * Merges and splits line-oriented record files.
 *
 * <p>A record is one comma-separated line whose LAST field is an integer time value. The merge
 * and ordering-check operations treat the first line of every file as a header; the split
 * operation treats all lines alike.
 */
class FileSort {
    /**
     * Marker contained in the name of every file produced by a merge. It is how intermediate
     * files are told apart from original data files when cleaning up, so original data file
     * names must not contain it.
     */
    private static final String INTERMEDIATE_FILE_MARKER = "_acttmpf";

    /** Whether intermediate files produced while merging are deleted once consumed. */
    private final boolean isDeleteIntermediateFile;

    /** Source of the unique number that prefixes every merge output file name; starts at 0. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * @param isDeleteIntermediateFile whether intermediate merge results are deleted after they
     *        have been merged into a later result
     */
    public FileSort(boolean isDeleteIntermediateFile) {
        this.isDeleteIntermediateFile = isDeleteIntermediateFile;
    }

    /**
     * Merges two sorted files into a new file in the same directory.
     *
     * <p>The header of the first file (or of the second one when the first file is empty) is
     * written once, followed by all records of both files in ascending time order. On equal
     * times the record of the second file is written first. An {@link IOException} is reported
     * on standard error and the (possibly missing or incomplete) output name is still returned;
     * in that case no source file is deleted.
     *
     * @param fileParentPath directory containing both source files
     * @param srcFileName1 name of the first source file
     * @param srcFileName2 name of the second source file
     * @return name of the merge output file
     */
    public String mergeSort(String fileParentPath, String srcFileName1, String srcFileName2) {
        String tmpOutPutFileName = count.getAndIncrement() + "_" + Thread.currentThread().getName()
                + INTERMEDIATE_FILE_MARKER + ".txt";
        File file1 = new File(fileParentPath + "/" + srcFileName1);
        File file2 = new File(fileParentPath + "/" + srcFileName2);

        try (BufferedReader reader1 = new BufferedReader(new InputStreamReader(new FileInputStream(file1)));
                BufferedReader reader2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2)));
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                        new FileOutputStream(fileParentPath + "/" + tmpOutPutFileName)))) {

            // Consume both headers but emit only one; never emit a literal "null" line.
            String header1 = reader1.readLine();
            String header2 = reader2.readLine();
            String header = (header1 != null) ? header1 : header2;
            if (header != null) {
                writeLine(writer, header);
            }

            String line1 = reader1.readLine();
            String line2 = reader2.readLine();
            long time1 = 0;
            long time2 = 0;
            boolean stale1 = true; // time1 does not yet describe line1
            boolean stale2 = true; // time2 does not yet describe line2
            while (line1 != null && line2 != null) {
                // Parse a record only when it is new, and only when it is actually compared.
                if (stale1) {
                    time1 = parseTimeField(line1);
                    stale1 = false;
                }
                if (stale2) {
                    time2 = parseTimeField(line2);
                    stale2 = false;
                }
                if (time1 < time2) {
                    writeLine(writer, line1);
                    line1 = reader1.readLine();
                    stale1 = true;
                } else {
                    writeLine(writer, line2);
                    line2 = reader2.readLine();
                    stale2 = true;
                }
            }
            // At most one of the two inputs still has records; copy them through unchanged.
            copyRemaining(reader1, line1, writer);
            copyRemaining(reader2, line2, writer);
        } catch (IOException e) {
            e.printStackTrace();
            return tmpOutPutFileName;
        }

        // Reached only after the output was completely written and closed.
        if (isDeleteIntermediateFile) {
            deleteIfIntermediate(srcFileName1, file1);
            deleteIfIntermediate(srcFileName2, file2);
        }
        return tmpOutPutFileName;
    }

    /**
     * Merges the files {@code fileNameList[posStart..posEnd]} (both inclusive) by recursively
     * merging the two halves of the range.
     *
     * @param fileParentPath directory containing the files
     * @param fileNameList names of the candidate files
     * @param posStart index of the first file taking part
     * @param posEnd index of the last file taking part
     * @return name of the fully merged file; the single file's own name when the range holds one
     *         file; {@code null} when the arguments do not describe a valid, non-empty range
     */
    public String mergeSort(String fileParentPath, String[] fileNameList, int posStart, int posEnd) {
        if (fileNameList == null || posStart < 0 || posEnd >= fileNameList.length || posStart > posEnd) {
            System.err.println("error:one of the following condition is satified:");
            System.err
                    .println("fileNameList == null || posStart<0 || posEnd >= fileNameList.length || posStart>posEnd");
            return null;
        }
        if (posEnd == posStart) { // a single file is already "merged"
            return fileNameList[posStart];
        }
        if (posEnd - posStart == 1) { // exactly two files
            return mergeSort(fileParentPath, fileNameList[posStart], fileNameList[posEnd]);
        }
        int posMid = posStart + (posEnd - posStart) / 2;
        String srcFileName1 = mergeSort(fileParentPath, fileNameList, posStart, posMid);
        String srcFileName2 = mergeSort(fileParentPath, fileNameList, posMid + 1, posEnd);
        return mergeSort(fileParentPath, srcFileName1, srcFileName2);
    }

    /**
     * Merges all regular files of a directory. Sub-directories are ignored.
     *
     * @param fileParentPath directory containing the files to merge
     * @return name of the fully merged file, or {@code null} when the directory cannot be listed
     *         or contains no regular file
     */
    public String mergeSort(String fileParentPath) {
        File[] fileList = new File(fileParentPath).listFiles();
        ArrayList<String> names = new ArrayList<>();
        if (fileList != null) { // null: not a directory or not readable
            for (File file : fileList) {
                if (file.isFile()) {
                    names.add(file.getName());
                }
            }
        }
        System.out.println(names.size() + " files in " + fileParentPath);
        String[] fileNameList = names.toArray(new String[0]);
        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /**
     * Merges the files of a directory whose name contains the given text.
     *
     * @param fileParentPath directory containing the files to merge
     * @param containedStr only files whose name contains this text take part
     * @return name of the fully merged file, or {@code null} when no file matches
     */
    public String mergeSort(String fileParentPath, String containedStr) {
        String[] fileNameList = getMatchedFileNames(fileParentPath, containedStr);
        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /**
     * Merges the files of a directory whose name contains the given text, using several threads.
     *
     * <p>Threads are used only when at least three files match and at least two threads are
     * requested. The number of threads is capped at the number of matched files so that every
     * thread receives a non-empty range. Each thread merges a contiguous range of files; the
     * per-thread results are then merged on the calling thread.
     *
     * @param threadNum requested number of threads
     * @param fileParentPath directory containing the files to merge
     * @param containedStr only files whose name contains this text take part
     * @return name of the fully merged file, or {@code null} when no file matches
     */
    public String mergeSort(int threadNum, String fileParentPath, String containedStr) {
        String[] fileNameList = getMatchedFileNames(fileParentPath, containedStr);

        int workerCount = Math.min(threadNum, fileNameList.length);
        if (workerCount > 1 && fileNameList.length > 2) {
            SortThread[] workers = new SortThread[workerCount];
            int fileCountPerThread = fileNameList.length / workerCount; // >= 1 thanks to the cap
            for (int i = 0; i < workerCount; i++) {
                int rangeStart = i * fileCountPerThread;
                // The last worker also takes the files left over by the integer division.
                int rangeEnd = (i == workerCount - 1) ? (fileNameList.length - 1)
                        : (rangeStart + fileCountPerThread - 1);
                workers[i] = new SortThread(isDeleteIntermediateFile, fileParentPath, fileNameList, rangeStart,
                        rangeEnd);
                workers[i].start();
            }

            // Wait for EVERY worker, even if interrupted: merging files that are still being
            // written would corrupt the result. The interrupt is restored afterwards.
            boolean interrupted = false;
            for (SortThread worker : workers) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            // Continue with the per-thread results instead of the original files.
            fileNameList = new String[workerCount];
            for (int i = 0; i < workerCount; i++) {
                fileNameList[i] = workers[i].getResultFileName();
            }
        }

        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /** Worker thread that merges one contiguous range of files with its own {@link FileSort}. */
    class SortThread extends Thread {
        private boolean isDeleteIntermediateFile;
        private String fileParentPath;
        private String[] fileNameList;
        private int posStart;
        private int posEnd;

        /** Name of the merged file; set when {@link #run()} finishes normally. */
        private String resultFileName;

        public SortThread(boolean isDeleteIntermediateFile, String fileParentPath, String[] fileNameList, int posStart,
                int posEnd) {
            super();
            this.isDeleteIntermediateFile = isDeleteIntermediateFile;
            this.fileParentPath = fileParentPath;
            this.fileNameList = fileNameList;
            this.posStart = posStart;
            this.posEnd = posEnd;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": [" + posStart + "," + posEnd + "]");
            this.resultFileName = (new FileSort(isDeleteIntermediateFile)).mergeSort(fileParentPath, fileNameList,
                    posStart, posEnd);
        }

        /** @return the merged file name; read it only after joining this thread */
        public String getResultFileName() {
            return this.resultFileName;
        }
    }

    /**
     * Lists the regular files of a directory whose name contains the given text.
     *
     * @return the matching file names; empty when nothing matches or the directory cannot be listed
     */
    private String[] getMatchedFileNames(String fileParentPath, String containedStr) {
        File[] fileList = new File(fileParentPath).listFiles();
        ArrayList<String> selectedFileNameList = new ArrayList<>();
        if (fileList != null) { // null: not a directory or not readable
            for (File file : fileList) {
                String fileName = file.getName();
                if (file.isFile() && fileName.indexOf(containedStr) != -1) {
                    selectedFileNameList.add(fileName);
                }
            }
        }
        System.out.println(selectedFileNameList.size() + " files in " + fileParentPath);
        if (selectedFileNameList.isEmpty()) {
            System.err.println("no file matched in " + fileParentPath);
        }
        return selectedFileNameList.toArray(new String[0]);
    }

    /** Writes the text followed by a single {@code '\n'}. */
    private void writeLine(BufferedWriter bufferedWriter, String msg) throws IOException {
        bufferedWriter.write(msg + "\n");
    }

    /** Writes {@code pendingLine} (if any) and every line still unread in {@code reader}. */
    private void copyRemaining(BufferedReader reader, String pendingLine, BufferedWriter writer)
            throws IOException {
        for (String line = pendingLine; line != null; line = reader.readLine()) {
            writeLine(writer, line);
        }
    }

    /** Deletes the file if its name marks it as an intermediate merge result. */
    private void deleteIfIntermediate(String fileName, File file) {
        if (fileName.indexOf(INTERMEDIATE_FILE_MARKER) != -1 && file.exists()) {
            file.delete();
        }
    }

    /**
     * Extracts the time value, i.e. the last comma-separated field, of a record.
     *
     * @throws NumberFormatException if the last field is not an integer
     */
    private static long parseTimeField(String record) {
        String[] fields = record.split(",");
        return Long.parseLong(fields[fields.length - 1]);
    }

    /**
     * Checks whether the records of a file are in ascending (non-decreasing) time order. The
     * first line is skipped as a header.
     *
     * @param fileParentPath directory containing the file
     * @param fileName name of the file to check
     * @return {@code true} when either argument is {@code null} or the records are ordered;
     *         {@code false} when the file holds no record after the header or a record has a
     *         smaller time than its predecessor
     * @throws IOException if the file cannot be read
     */
    public static boolean isAscendingOrder(String fileParentPath, String fileName) throws IOException {
        if (fileParentPath == null || fileName == null) {
            return true;
        }
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(fileParentPath + "/" + fileName)))) {
            reader.readLine(); // skip the header
            String line = reader.readLine(); // first record
            if (line == null) {
                return false;
            }
            long lastTime = parseTimeField(line);
            while ((line = reader.readLine()) != null) {
                long currentTime = parseTimeField(line);
                if (lastTime > currentTime) {
                    return false;
                }
                lastTime = currentTime;
            }
            return true;
        }
    }

    /**
     * Splits a file into several files of (almost) equal line count, written next to the source
     * as {@code <name>_splited_<i><extension>}, where name and extension are separated at the
     * FIRST dot of the source file name.
     *
     * <p>Each output receives {@code lines / splitedFileNum} lines; the last one additionally
     * receives the remainder. When the source has fewer lines than requested outputs, one output
     * per line is produced. An empty source produces no output. I/O errors are reported on
     * standard error.
     *
     * @param fileParentPath directory containing the source file
     * @param srcFileName name of the source file
     * @param isDeleteSrcFile whether the source file is deleted after a successful split
     * @param splitedFileNum requested number of output files, at least 1
     */
    public static void splitFile(String fileParentPath, String srcFileName, boolean isDeleteSrcFile,
            int splitedFileNum) {
        if (splitedFileNum < 1) {
            System.err.println("splitedFileNum " + splitedFileNum + " is less than 1");
            return;
        }
        File srcFile = new File(fileParentPath + "/" + srcFileName);
        try {
            int srcFileLines = getFileLines(srcFile);
            if (srcFileLines == 0) {
                System.err.println(srcFileName + " is empty, nothing to split");
                return;
            }
            if (srcFileLines < splitedFileNum) {
                System.out.println("splitedFileNum " + splitedFileNum + " is set to be srcFileLines " + srcFileLines);
                splitedFileNum = srcFileLines;
            }

            // A name without a dot has no extension; keep the whole name as the base.
            int dotIndex = srcFileName.indexOf('.');
            String srcFileNameWithoutExtension = (dotIndex == -1) ? srcFileName : srcFileName.substring(0, dotIndex);
            String srcFileExtension = (dotIndex == -1) ? "" : srcFileName.substring(dotIndex);

            int splitedFileLines = srcFileLines / splitedFileNum;
            System.out.println(srcFileLines + " lines are splited into " + splitedFileNum + " files, each with "
                    + splitedFileLines + " lines.");

            try (BufferedReader br = new BufferedReader(new FileReader(srcFile))) {
                for (int i = 0; i < splitedFileNum; i++) {
                    String targetPath = fileParentPath + "/" + srcFileNameWithoutExtension + "_splited_" + i
                            + srcFileExtension;
                    try (BufferedWriter bw = new BufferedWriter(new FileWriter(targetPath))) {
                        String line;
                        for (int j = 0; j < splitedFileLines; j++) {
                            line = br.readLine();
                            if (line != null) {
                                bw.write(line + "\n");
                            }
                        }
                        if (i == splitedFileNum - 1) { // lines left over by the division go to the last file
                            while ((line = br.readLine()) != null) {
                                bw.write(line + "\n");
                            }
                        }
                    }
                }
            }

            if (isDeleteSrcFile && srcFile.exists()) {
                srcFile.delete();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the lines of a file exactly as {@link BufferedReader#readLine()} delivers them: a
     * trailing line terminator does not start an additional line, and an empty file has 0 lines.
     *
     * @param srcFile file to inspect
     * @return number of lines in the file
     * @throws IOException if the file cannot be read
     */
    public static int getFileLines(File srcFile) throws IOException {
        try (LineNumberReader reader = new LineNumberReader(new FileReader(srcFile))) {
            while (reader.readLine() != null) {
                // readLine() advances the line counter for every line it returns
            }
            return reader.getLineNumber();
        }
    }
}
时间: 2024-10-25 06:57:54