保存Key/Value对的RDD叫做Pair RDD。
1.创建Pair RDD:
1.1 创建Pair RDD的方式:
很多数据格式在导入RDD时,会直接生成Pair RDD。我们也可以使用map()来将之前讲到的普通RDD转化为Pair RDD。
1.2 Pair RDD转化实例:
下面例子中,把原始RDD,修改成首单词做Key,整行做Value的Pair RDD。
Java中没有tuple类型,所以使用scala的scala.Tuple2类来创建tuple。创建tuple: new Tuple2(elem1,elem2) ; 访问tuple的元素: 使用._1()和._2()方法来访问。
而且,在Python和Scala实现中使用基本的map()函数即可,java需要使用函数mapToPair():
/** * 将普通的基本RDD转化成一个Pair RDD,业务逻辑: 将每一行的首单词作为Key,整个句子作为Value 返回Key/Value PairRDD。 * @param JavaRDD<String> * @return JavaPairRDD<String,String> */ public JavaPairRDD<String,String> firstWordKeyRdd(JavaRDD<String> input){ JavaPairRDD<String,String> pair_rdd = input.mapToPair( new PairFunction<String,String,String>(){ @Override public Tuple2<String, String> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String,String>(arg0.split(" ")[0],arg0); } } ); return pair_rdd; }
当从内存中的集合创建PairRDD时,Python和Scala需要使用函数SparkContext.parallelize();而Java使用函数SparkContext.parallelizePairs()。
2.Pair RDD的转化操作:
2.1 Pair RDD常见的转化操作列表:
基础RDD使用的转化操作也可以在Pair RDD中使用。因为Pair RDD中使用tuple,所以需要传递操作tuple的函数给Pair RDD.
下表列出Pair RDD常用的转化操作(事例RDD内容:{(1, 2), (3, 4), (3, 6)})
函数名 | 作用 | 调用例子 | 返回结果 |
reduceByKey(func) | Combine values with the same key. | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | Group values with the same key. | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner,mergeValue, mergeCombiners,partitioner) | Combine values with the same key using a different result type. | ||
mapValues(func) | Apply a function to each value of a pair RDD without changing the key. | rdd.mapValues(x =>x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) |
Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization. |
rdd.flatMapValues(x=> (x to 5) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | Return an RDD of just the keys. | rdd.keys() | {1, 3, 3} |
values() | Return an RDD of just the values. | rdd.values() | {2, 4, 6} |
sortByKey() | Return an RDD sorted by the key. | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
下表列举2个RDD之间的转化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3,9)}):
函数名 | 作用 | 调用例子 | 返回结果 |
subtractByKey | Remove elements with a key present in the other RDD. | rdd.subtractByKey(other) | {(1, 2)} |
join | Perform an inner join between two RDDs. | rdd.join(other) | {(3, (4, 9)),(3, (6, 9))} |
rightOuterJoin | Perform a join between two RDDs where the key must be present in the first RDD. | rdd.rightOuterJoin(other) | {(3,(Some(4),9)), (3,(Some(6),9))} |
leftOuterJoin | Perform a join between two RDDs where the key must be present in the other RDD. | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))} |
cogroup | Group data from both RDDs sharing the same key. | rdd.cogroup(other) | {(1,([2],[])),(3,([4, 6],[9]))} |
2.2 Pair RDD筛选操作:
Pair RDD也还是RDD,所以之前介绍的操作(例如filter)也同样适用于PairRDD。下面程序,筛选长度大于20的行:
/** * PairRDD筛选长度大于20的行。 * @param JavaPairRDD<String,String> * @return JavaPairRDD<String,String> */ public JavaPairRDD<String,String> filterMoreThanTwentyLines (JavaPairRDD<String,String> input){ JavaPairRDD<String,String> filter_rdd = input.filter( new Function<Tuple2<String, String>,Boolean>(){ @Override public Boolean call(Tuple2<String, String> arg0) throws Exception { // TODO Auto-generated method stub return (arg0._2.length()>20); } } ); return filter_rdd; }
2.3 聚合操作: