Hadoop0.20.2 Bloom filter应用演示样例

1. 简单介绍

參见《Hadoop in Action》P102 以及 《Hadoop实战(第2版)》(陆嘉恒)P69

2. 案例

网上大部分的说明不过依照《Hadoop in Action》中的演示样例代码给出。这里是Hadoop0.20.2版本号,在该版本号中已经实现了BloomFilter。

案例文件例如以下:

customers.txt

1,Stephanie Leung,555-555-5555

2,Edward Kim,123-456-7890

3,Jose Madriz,281-330-8004

4,David Stork,408-555-0000

-----------------------------------------------------------------

orders.txt

3,A,12.95,02-Jun-2008

1,B,88.25,20-May-2008

2,C,32.00,30-Nov-2007

3,D,25.02,22-Jan-2009

5,E,34.59,05-Jan-2010

6,F,28.67,16-Jan-2008

7,G,49.82,24-Jan-2009

两个文件通过customer ID关联。

3. 代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomMRMain {
	public static class BloomMapper extends Mapper<Object, Text, Text, Text> {
		BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);

		protected void setup(Context context) throws IOException ,InterruptedException {
			Configuration conf = context.getConfiguration();

			String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";
			Path file = new Path(path);

			FileSystem hdfs = FileSystem.get(conf);
			FSDataInputStream dis = hdfs.open(file);
			BufferedReader reader = new BufferedReader(new InputStreamReader(dis));
			String temp;
			while ((temp = reader.readLine()) != null) {
//				System.out.println("bloom filter temp:" + temp);
				String[] tokens = temp.split(",");
				if (tokens.length > 0) {
					bloomFilter.add(new Key(tokens[0].getBytes()));
				}
			}
		}

		protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException {
			//获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("customers")) {
            	String data = value.toString();
            	String[] tokens = data.split(",");
            	if (tokens.length == 3) {
            		String outKey = tokens[0];
            		String outVal = "0" + ":" + tokens[1] + "," + tokens[2];
            		context.write(new Text(outKey), new Text(outVal));
            	}
            } else if (pathName.contains("orders")) {
            	String data = value.toString();
            	String[] tokens = data.split(",");
            	if (tokens.length == 4) {
            		String outKey = tokens[0];
            		System.out.println("in map and outKey:" + outKey);
            		if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) {
            			String outVal = "1" + ":" + tokens[1] + "," + tokens[2]+ "," + tokens[3];
            			context.write(new Text(outKey), new Text(outVal));
            		}
            	}
            }
		}
	}

	public static class BloomReducer extends Reducer<Text, Text, Text, Text> {
		ArrayList<Text> leftTable = new ArrayList<Text>();
		ArrayList<Text> rightTable = new ArrayList<Text>();

		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException ,InterruptedException {

			 leftTable.clear();
	         rightTable.clear();

			for (Text val : values) {
				String outVal = val.toString();
				System.out.println("key: " + key.toString() + " : " + outVal);
				int index = outVal.indexOf(":");
				String flag = outVal.substring(0, index);
				if ("0".equals(flag)) {
					leftTable.add(new Text(outVal.substring(index+1)));
				} else if ("1".equals(flag)) {
					rightTable.add(new Text(outVal.substring(index + 1)));
				}
			}

			if (leftTable.size() > 0 && rightTable.size() > 0) {
				for(Text left : leftTable) {
					for (Text right : rightTable) {
						context.write(key, new Text(left.toString() + "," + right.toString()));
					}
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

	    if (otherArgs.length != 2) {
	      System.err.println("Usage: BloomMRMain <in> <out>");
	      System.exit(2);
	    }	    

	    Job job = new Job(conf, "BloomMRMain");
	    job.setJarByClass(BloomMRMain.class);

	    job.setMapperClass(BloomMapper.class);
	    job.setReducerClass(BloomReducer.class);

	    job.setInputFormatClass(TextInputFormat.class);
	    job.setOutputFormatClass(TextOutputFormat.class);

	    job.setMapOutputKeyClass(Text.class);
	    job.setMapOutputValueClass(Text.class);

	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);	

	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
时间: 2024-10-28 20:59:20

Hadoop0.20.2 Bloom filter应用演示样例的相关文章

Hadoop0.20.2 Bloom filter应用示例

1. 简介 参见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明仅仅是按照<Hadoop in Action>中的示例代码给出,这里是Hadoop0.20.2版本,在该版本中已经实现了BloomFilter. 案例文件如下: customers.txt 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose Madriz

Java 8 时间日期库的20个使用演示样例

除了lambda表达式,stream以及几个小的改进之外,Java 8还引入了一套全新的时间日期API,在本篇教程中我们将通过几个简单的任务演示样例来学习怎样使用Java 8的这套API.Java对日期,日历及时间的处理一直以来都饱受诟病.尤其是它决定将java.util.Date定义为可改动的以及将SimpleDateFormat实现成非线程安全的. 看来Java已经意识到须要为时间及日期功能提供更好的支持了,这对已经习惯使用Joda时间日期库的社区而言也是件好事. 关于这个新的时间日期库的最

java设计模式演示样例

创建模式 1.工厂方法模式(Factory Method)  将程序中创建对象的操作,单独出来处理,创建一个产品的工厂接口,把实际的工作转移到详细的子类.大大提高了系统扩展的柔性,接口的抽象化处理给相互依赖的对象创建提供了最好的抽象模式. public class TestFactoryMethod { public static void main(String[] args) { AnimalFactory af=new DogFactory(); Animal1 a=af.getAnima

kqueue演示样例

网络server通常都使用epoll进行异步IO处理,而开发人员通常使用mac,为了方便开发.我把自己的handy库移植到了mac平台上. 移植过程中,网上竟然没有搜到kqueue的使用样例.让我吃惊不已.为了让大家不用像我一样再次花费大力气搞定kqueue,我整理了一个简单清晰可执行的kqueue样例,供大家參考. kqueue一共同拥有几个函数: int kqueue(void); //相似epoll_create int kevent(int kq, const struct kevent

C编程规范, 演示样例代码。

/*************************************************************** *Copyright (c) 2014,TianYuan *All rights reserved. * *文件名: standard.h *文件标识: 编程规范演示样例代码 * *当前版本号:V1.0 *作者:wuyq *完毕日期:20140709 * *改动记录1: //改动历史记录.包含改动日期.版本号号.改动人及改动内容等 *改动日期 版本号号 改动人 改动内

TreeSet排序,存储自己定义对象,自己定义比較器演示样例

Set:无序.不能够反复元素. |--HashSet:数据结构是哈希表.线程是非同步的. 保证元素唯一性的原理:推断元素的hashCode值是否同样. 假设同样,还会继续推断元素的equals方法.是否为true. |--TreeSet:能够对Set集合中的元素进行排序. 底层数据结构是二叉树. 保证元素唯一性的根据:compareTo方法return 0. TreeSet排序的第一种方式:让元素自身具备比較性. 元素须要实现Comparable接口,覆盖compareTo方法. 也种方式也成为

Eureka 的 Application Service client的注冊以及执行演示样例

Eureka 服务器架起来了(关于架设步骤參考博客<Linux 下 Eureka 服务器的部署>),如今怎样把我们要负载均衡的服务器(也就是从 Application Client 接收请求并返回一个响应的 Application Service)注冊到 Eureka?本文以一个演示样例介绍 Eureka Application Service 客户端的 Eureka 生命周期(包含启动时的注冊.侍服演示样例.关闭时的取消注冊)情况.相信读完本文之后,读者能够对 Eureka 的 Applic

图标插件--jqplot实现柱状图及饼图,表盘图演示样例

柱状图 在jqPlot图表插件使用说明(一)中,我们已经能够通过jqPlot绘制出比較简单的线形图.通过查看源码.我们也能够看出,线形图是jqPlot默认的图表类型: [javascript] view plaincopy /** * Class: Series * An individual data series object.  Cannot be instantiated directly, but created * by the Plot oject.  Series propert

Cocos2d-x 3.2 Lua演示样例 ClickAndMoveTest(点击移动測试)

Cocos2d-x 3.2 Lua演示样例 ClickAndMoveTest(点击移动測试) 本篇博客介绍Cocos2d-x 3.2Lua演示样例中点击移动的样例,在这个样例你能够得到怎样创建单点触摸的事件和注冊事件监听回调方法. 演示样例代码: --[[ ClickAndMoveTest.lua 点击与移动 ]]-- -- 获取屏幕尺寸 local size = cc.Director:getInstance():getWinSize() local layer = nil -- 层 loca