MapReduce实现等值连接,左外连接,右外连接,全外连接

#测试数据:

# more user.txt(用户ID,用户名)

[java] view plain copy

  1. 1   lavimer
  2. 2   liaozhongmin
  3. 3   liaozemin

#more post.txt(用户ID,帖子ID,标题)

[java] view plain copy

  1. 1   1   java
  2. 1   2   c
  3. 2   3   hadoop
  4. 4   4   hive
  5. 5   5   hbase
  6. 5   6   pig
  7. 5   7   flume

#等值连接结果如下:

[java] view plain copy

  1. 1   lavimer 1   1   java
  2. 1   lavimer 1   2   c
  3. 2   liaozhongmin    2   3   hadoop

#左外连接结果如下:

[java] view plain copy

  1. 1   lavimer 1   1   java
  2. 1   lavimer 1   2   c
  3. 2   liaozhongmin    2   3   hadoop
  4. 3   liaozemin   NULL


#右外连接结果如下:

[java] view plain copy

  1. 1   lavimer 1   1   java
  2. 1   lavimer 1   2   c
  3. 2   liaozhongmin    2   3   hadoop
  4. NULL    4   4   hive
  5. NULL    5   5   hbase
  6. NULL    5   6   pig
  7. NULL    5   7   flume

#全外连接结果如下:

[java] view plain copy

  1. 1   lavimer 1   1   java
  2. 1   lavimer 1   2   c
  3. 2   liaozhongmin    2   3   hadoop
  4. 3   liaozemin   NULL
  5. NULL    4   4   hive
  6. NULL    5   5   hbase
  7. NULL    5   6   pig
  8. NULL    5   7   flume

实现代码如下:

[java] view plain copy

    1. /**
    2. *
    3. * @author 廖钟民
    4. * time : 2015年1月30日下午1:23:36
    5. * @version
    6. */
    7. public class UserPostJoin {
    8. // 定义输入路径
    9. private static final String INPUT_PATH1 = "hdfs://liaozhongmin:9000/user_post_join/user.txt";
    10. private static final String INPUT_PATH2 = "hdfs://liaozhongmin:9000/user_post_join/post.txt";
    11. // 定义输出路径
    12. private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
    13. public static void main(String[] args) {
    14. try {
    15. // 创建配置信息
    16. Configuration conf = new Configuration();
    17. // 创建文件系统
    18. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
    19. // 如果输出目录存在,我们就删除
    20. if (fileSystem.exists(new Path(OUT_PATH))) {
    21. fileSystem.delete(new Path(OUT_PATH), true);
    22. }
    23. // 创建任务
    24. Job job = new Job(conf, UserPostJoin.class.getName());
    25. // 设置连接类型
    26. job.getConfiguration().set("joinType", "allOuter");
    27. // 设置多路径输入
    28. MultipleInputs.addInputPath(job, new Path(INPUT_PATH1), TextInputFormat.class, UserMapper.class);
    29. MultipleInputs.addInputPath(job, new Path(INPUT_PATH2), TextInputFormat.class, PostMapper.class);
    30. //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
    31. job.setMapOutputKeyClass(Text.class);
    32. job.setMapOutputValueClass(UserPost.class);
    33. //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
    34. job.setPartitionerClass(HashPartitioner.class);
    35. job.setNumReduceTasks(1);
    36. //1.4 排序
    37. //1.5 归约
    38. //2.1 Shuffle把数据从Map端拷贝到Reduce端。
    39. //2.2 指定Reducer类和输出key和value的类型
    40. job.setReducerClass(UserPostReducer.class);
    41. job.setOutputKeyClass(Text.class);
    42. job.setOutputValueClass(Text.class);
    43. //2.3 指定输出的路径和设置输出的格式化类
    44. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
    45. job.setOutputFormatClass(TextOutputFormat.class);
    46. // 提交作业 退出
    47. System.exit(job.waitForCompletion(true) ? 0 : 1);
    48. } catch (Exception e) {
    49. e.printStackTrace();
    50. }
    51. }
    52. /**
    53. * 自定义Mapper类用于处理来自user.txt文件的数据
    54. * @author 廖钟民
    55. * time : 2015年1月30日下午1:22:12
    56. * @version
    57. */
    58. public static class UserMapper extends Mapper<LongWritable, Text, Text, UserPost> {
    59. @Override
    60. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
    61. // 对字符串进行切分
    62. String[] arr = value.toString().split("\t");
    63. // 创建UserId
    64. Text userId = new Text(arr[0]);
    65. // 把结果写出去
    66. context.write(userId, new UserPost("U", value.toString()));
    67. }
    68. }
    69. /**
    70. * 自定义Mapper类用于处理来自post.txt文件的数据
    71. * @author 廖钟民
    72. * time : 2015年1月30日下午1:22:16
    73. * @version
    74. */
    75. public static class PostMapper extends Mapper<LongWritable, Text, Text, UserPost> {
    76. @Override
    77. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
    78. // 对数据进行切分
    79. String[] arr = value.toString().split("\t");
    80. // 创建用户ID
    81. Text userId = new Text(arr[0]);
    82. context.write(userId, new UserPost("P", value.toString()));
    83. }
    84. }
    85. /**
    86. * 自定义Reducer类用于处理不同Mapper类的输出
    87. * @author 廖钟民
    88. * time : 2015年1月30日下午1:23:05
    89. * @version
    90. */
    91. public static class UserPostReducer extends Reducer<Text, UserPost, Text, Text> {
    92. // 定义List集合用于存放用户
    93. private List<Text> users = new ArrayList<Text>();
    94. private List<Text> posts = new ArrayList<Text>();
    95. // 定义连接类型
    96. private String joinType;
    97. @Override
    98. protected void setup(Reducer<Text, UserPost, Text, Text>.Context context) throws IOException, InterruptedException {
    99. this.joinType = context.getConfiguration().get("joinType");
    100. System.out.println(joinType);
    101. }
    102. /**
    103. * 经过Shuffle后数据会分组,每一组数据都会调用一次reduce()函数
    104. *第一组数据:
    105. *1 lavimer
    106. *1 1   java
    107. *1 2   c
    108. *
    109. *第二组数据:
    110. *2 3   hadoop
    111. *2 liaozhongmin
    112. *
    113. *第三组数据:
    114. *3 liaozemin
    115. *
    116. *第四组数据:
    117. *4 4   hive
    118. *
    119. *第五组数据:
    120. *5 5   hbase
    121. *5 6   pig
    122. *5 7   flume
    123. *
    124. *每一组数据都会调用一次reduce()函数,我们以第一组数据为例进行讲解:
    125. *
    126. *进入reduce函数后,<1,lavimer>会被添加到users集合中
    127. *<1 1   java>和<1  2   c>会被添加到posts集合中
    128. *
    129. *然后是判断当前操作是什么类型的连接,我们以等值连接为例:
    130. *遍历两个集合得到的数据为:
    131. *【1    lavimer    1         1    java】
    132. *【1    lavimer    1         2    c】
    133. *
    134. *这是第一组数据的执行轨迹,其他依次类推就可以得到相关的操作
    135. */
    136. protected void reduce(Text key, Iterable<UserPost> values, Reducer<Text, UserPost, Text, Text>.Context context) throws IOException,
    137. InterruptedException {
    138. // 清空集合
    139. users.clear();
    140. posts.clear();
    141. // 迭代values集合把当前穿进来的某个组中的数据分别添加到对应的集合中
    142. for (UserPost val : values) {
    143. System.out.println("实际值:" + key + "===>" + values.toString());
    144. if (val.getType().equals("U")) {
    145. users.add(new Text(val.getData()));
    146. } else {
    147. posts.add(new Text(val.getData()));
    148. }
    149. }
    150. // 根据joinType关键字做对应的连接操作
    151. if (joinType.equals("innerJoin")) {// 内连接
    152. if (users.size() > 0 && posts.size() > 0) {
    153. for (Text user : users) {
    154. for (Text post : posts) {
    155. context.write(new Text(user), new Text(post));
    156. }
    157. }
    158. }
    159. } else if (joinType.equals("leftOuter")) {// 左外连接
    160. for (Text user : users) {
    161. if (posts.size() > 0) {
    162. for (Text post : posts) {
    163. context.write(new Text(user), new Text(post));
    164. }
    165. } else {
    166. context.write(new Text(user), createEmptyPost());
    167. }
    168. }
    169. } else if (joinType.equals("rightOuter")) {// 右外连接
    170. for (Text post : posts) {
    171. if (users.size() > 0) {
    172. for (Text user : users) {
    173. context.write(new Text(user), new Text(post));
    174. }
    175. } else {
    176. context.write(createEmptyUser(), post);
    177. }
    178. }
    179. } else if (joinType.equals("allOuter")) {// 全外连接
    180. if (users.size() > 0) {
    181. for (Text user : users) {
    182. if (posts.size() > 0) {
    183. for (Text post : posts) {
    184. context.write(new Text(user), new Text(post));
    185. }
    186. } else {
    187. context.write(new Text(user), createEmptyPost());
    188. }
    189. }
    190. } else {
    191. for (Text post : posts) {
    192. if (users.size() > 0) {
    193. for (Text user : users) {
    194. context.write(new Text(user), new Text(post));
    195. }
    196. } else {
    197. context.write(createEmptyUser(), post);
    198. }
    199. }
    200. }
    201. }
    202. }
    203. /**
    204. * 用户为空时用制表符代替
    205. *
    206. * @return
    207. */
    208. private Text createEmptyUser() {
    209. return new Text("NULL");
    210. }
    211. /**
    212. * 帖子为空时用制表符代替
    213. *
    214. * @return
    215. */
    216. private Text createEmptyPost() {
    217. return new Text("NULL");
    218. }
    219. }
    220. }
    221. /**
    222. * 自定义实体类封装两个表的数据
    223. * @author 廖钟民
    224. * time : 2015年1月30日下午1:23:50
    225. * @version
    226. */
    227. class UserPost implements Writable {
    228. // 类型(U表示用户,P表示帖子)
    229. private String type;
    230. private String data;
    231. public UserPost() {
    232. }
    233. public UserPost(String type, String data) {
    234. this.type = type;
    235. this.data = data;
    236. }
    237. public String getType() {
    238. return type;
    239. }
    240. public void setType(String type) {
    241. this.type = type;
    242. }
    243. public String getData() {
    244. return data;
    245. }
    246. public void setData(String data) {
    247. this.data = data;
    248. }
    249. public void write(DataOutput out) throws IOException {
    250. out.writeUTF(this.type);
    251. out.writeUTF(this.data);
    252. }
    253. public void readFields(DataInput in) throws IOException {
    254. this.type = in.readUTF();
    255. this.data = in.readUTF();
    256. }
    257. }
时间: 2024-08-07 08:40:23

MapReduce实现等值连接,左外连接,右外连接,全外连接的相关文章

数据库左连接、右连接、全联接、左外、右外、全外

内联 SELECT*FROMtemployee employees0INNER JOIN tcustomer customer1 ON ( customer1.id = employees0.id ); 左联 SELECT*FROMtemployee employees0LEFT OUTER JOIN tcustomer customer1 ON ( customer1.id = employees0.id ); 右联 SELECT*FROMtemployee employees0RIGHT O

oracle 内连接 左外连接 右外连接的用法,(+)符号用法

1. 内连接很简单 select A.*, B.* from A,B where A.id = B.id select A.*, B.* from A inner join B on A.id = B.id 以上两句是完全等价的 2. 左外连接 select * from emp a left join dept d on a.deptno=d.deptno select * from emp a,dept d where a.deptno=d.deptno(+) 以上两句是完全等价的 3. 右

oracle sql 内连接 左外连接 右外连接 全外连接

1.创建测试表并准备测试数据[email protected]> create table a (a number(1),b number(1),c number(1));[email protected]> create table b (a number(1),d number(1),e number(1));[email protected]> insert into a values(1,1,1);[email protected]> insert into a value

深入理解SQL的四种连接,左外连接,右外连接,内连接,全连接

1.内联接(典型的联接运算,使用像 =  或 <> 之类的比较运算符).包括相等联接和自然联接.     内联接使用比较运算符根据每个表共有的列的值匹配两个表中的行.例如,检索 students和courses表中学生标识号相同的所有行.       2.外联接.外联接可以是左向外联接.右向外联接或完整外部联接.     在 FROM子句中指定外联接时,可以由下列几组关键字中的一组指定:     1)LEFT  JOIN或LEFT OUTER JOIN     左向外联接的结果集包括  LEF

左外连接,右外连接,全外连接

左外连接 用在查询块的from短语中 又称左连接,列出左边所有元组,A left join B on 条件表达式中的on决定了B表中符合条件表达式的数据才保留,不符合的右边字段为null where短语的条件等到外连接结束后才使用,对外连接结果进行过滤 例子: create table t1(c1 int primary key, c2 int); create table t2(cc1 int primary key, cc2 int); insert into t1 values (1,1)

Oracle内连接、外连接、右外连接、全外连接小总结

数据库版本:Oracle 9i 表TESTA,TESTB,TESTC,各有A, B两列 A B 001 10A 002 20A A B 001 10B 003 30B A B 001 10C 004 40C 连接分为两种:内连接与外连接. A.内连接 内连接,即最常见的等值连接,例: SELECT * FROM TESTA,TESTBWHERE TESTA.A=TESTB.A 结果 A B A B 001 10A 001 10B B.外连接 外连接分为左外连接,右外连接和全外连接. 1.  左外

左连接、右连接、交叉连接、全外连接

第一部分.连接查询 一.内连接 内连接查询操作列出与连接条件匹配的数据行,它使用比较运算符比较被连接列的列值.内连接分三种: 1.等值连接:在连接条件中使用等于号(=)运算符比较被连接列的列值,其查询结果中列出被连接表中的所有列,包括其中的重复列. 2.不等连接: 在连接条件使用除等于运算符以外的其它比较运算符比较被连接的列的列值.这些运算符包括>.>=.<=.<.!>.!<和<>. 3.自然连接:在连接条件中使用等于(=)运算符比较被连接列的列值,但它使用

Oracle左连接、右连接、全外连接以及(+)号用法(转)

Oracle  外连接(OUTER JOIN) 左外连接(左边的表不加限制) 右外连接(右边的表不加限制) 全外连接(左右两表都不加限制) 对应SQL:LEFT/RIGHT/FULL OUTER JOIN. 通常省略OUTER关键字, 写成:LEFT/RIGHT/FULL JOIN. 在左连接和右连接时都会以一张A表为基础表,该表的内容会全部显示,然后加上A表和B表匹配的内容. 如果A表的数据在B表中没有记录. 那么在相关联的结果集行中列显示为空值(NULL). 对于外连接, 也可以使用“(+)

Oracle左连接、右连接、全外连接以及(+)号用法

阅读目录 1.准备工作 2.左外连接(LEFT OUTER JOIN/ LEFT JOIN) 3.右外连接(RIGHT OUTER JOIN/RIGHT JOIN) 4.全外连接(FULL OUTER JOIN/FULL JOIN) 1.准备工作 Oracle  外连接(OUTER JOIN)包括以下: 左外连接(左边的表不加限制) 右外连接(右边的表不加限制) 全外连接(左右两表都不加限制) 对应SQL:LEFT/RIGHT/FULL OUTER JOIN. 通常省略OUTER关键字, 写成: