二次排序工具类:
import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version 创建时间:2017年8月30日 下午3:48:11 * 类说明 */ //二次排序key public class SecondeIndexSort implements Ordered<SecondeIndexSort>, Serializable{ private static final long serialVersionUID = -2366006422945129991L; // 首先在自定义key里面,定义需要进行排序的列 private int first; private int second; public SecondeIndexSort(int first, int second) { this.first = first; this.second = second; } @Override public boolean $greater(SecondeIndexSort other) { if(this.first > other.getFirst()) { return true; } else if(this.first == other.getFirst() && this.second > other.getSecond()) { return true; } return false; } @Override public boolean $greater$eq(SecondeIndexSort other) { if(this.$greater(other)) { return true; } else if(this.first == other.getFirst() && this.second == other.getSecond()) { return true; } return false; } @Override public boolean $less(SecondeIndexSort other) { if(this.first < other.getFirst()) { return true; } else if(this.first == other.getFirst() && this.second < other.getSecond()) { return true; } return false; } @Override public boolean $less$eq(SecondeIndexSort other) { if(this.$less(other)) { return true; } else if(this.first == other.getFirst() && this.second == other.getSecond()) { return true; } return false; } @Override public int compare(SecondeIndexSort other) { if(this.first - other.getFirst() != 0) { return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } @Override public int compareTo(SecondeIndexSort other) { if(this.first - other.getFirst() != 0) { return this.first - other.getFirst(); } else { return this.second - other.getSecond(); } } // 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法 public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + first; result = prime * result + second; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; SecondeIndexSort other = (SecondeIndexSort) obj; if (first != other.first) return false; if (second != other.second) return false; return true; } }
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * @author 作者 E-mail: * @version 创建时间:2017年8月30日 下午4:26:10 * 类说明 */ public class SortIndexByKey { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("sortAction").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> tf = sc.textFile("G://122.txt"); JavaPairRDD<SecondeIndexSort, String> mapToPair = tf.mapToPair(new PairFunction<String, SecondeIndexSort, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<SecondeIndexSort, String> call(String paramT) throws Exception { String[] lines = paramT.trim().split(" "); System.out.println("keys..."+Integer.valueOf(lines[0])); System.out.println("keys1..."+Integer.valueOf(lines[1])); SecondeIndexSort keys = new SecondeIndexSort( Integer.valueOf(lines[0]), Integer.valueOf(lines[1])); return new Tuple2<SecondeIndexSort, String>(keys, paramT); } }); JavaPairRDD<SecondeIndexSort, String> sortByKey = mapToPair.sortByKey(); JavaRDD<String> map = sortByKey.map(new Function<Tuple2<SecondeIndexSort,String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(Tuple2<SecondeIndexSort, String> paramT1) throws Exception { return paramT1._2; } }); map.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; @Override public void call(String paramT) throws Exception { System.out.println("..."+paramT); } }); sc.close(); } }
测试数据:
1 4
3 4
4 6
4 9
4 18
6 6
5 9
8 9
8 9
4 6
时间: 2024-10-03 13:38:43