Hadoop0.20.2 Bloom filter应用示例

1. 简介

参见《Hadoop in Action》P102 以及 《Hadoop实战(第2版)》(陆嘉恒)P69

2. 案例

网上大部分的说明仅仅是按照《Hadoop in Action》中的示例代码给出,这里是Hadoop0.20.2版本,在该版本中已经实现了BloomFilter。

案例文件如下:

customers.txt

1,Stephanie Leung,555-555-5555

2,Edward Kim,123-456-7890

3,Jose Madriz,281-330-8004

4,David Stork,408-555-0000

-----------------------------------------------------------------

orders.txt

3,A,12.95,02-Jun-2008

1,B,88.25,20-May-2008

2,C,32.00,30-Nov-2007

3,D,25.02,22-Jan-2009

5,E,34.59,05-Jan-2010

6,F,28.67,16-Jan-2008

7,G,49.82,24-Jan-2009

两个文件通过customer ID关联。

3. 代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomMRMain {
	public static class BloomMapper extends Mapper<Object, Text, Text, Text> {
		BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);

		protected void setup(Context context) throws IOException ,InterruptedException {
			Configuration conf = context.getConfiguration();

			String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";
			Path file = new Path(path);

			FileSystem hdfs = FileSystem.get(conf);
			FSDataInputStream dis = hdfs.open(file);
			BufferedReader reader = new BufferedReader(new InputStreamReader(dis));
			String temp;
			while ((temp = reader.readLine()) != null) {
//				System.out.println("bloom filter temp:" + temp);
				String[] tokens = temp.split(",");
				if (tokens.length > 0) {
					bloomFilter.add(new Key(tokens[0].getBytes()));
				}
			}
		}

		protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException {
			//获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("customers")) {
            	String data = value.toString();
            	String[] tokens = data.split(",");
            	if (tokens.length == 3) {
            		String outKey = tokens[0];
            		String outVal = "0" + ":" + tokens[1] + "," + tokens[2];
            		context.write(new Text(outKey), new Text(outVal));
            	}
            } else if (pathName.contains("orders")) {
            	String data = value.toString();
            	String[] tokens = data.split(",");
            	if (tokens.length == 4) {
            		String outKey = tokens[0];
            		System.out.println("in map and outKey:" + outKey);
            		if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) {
            			String outVal = "1" + ":" + tokens[1] + "," + tokens[2]+ "," + tokens[3];
            			context.write(new Text(outKey), new Text(outVal));
            		}
            	}
            }
		}
	}

	public static class BloomReducer extends Reducer<Text, Text, Text, Text> {
		ArrayList<Text> leftTable = new ArrayList<Text>();
		ArrayList<Text> rightTable = new ArrayList<Text>();

		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException ,InterruptedException {

			 leftTable.clear();
	         rightTable.clear();

			for (Text val : values) {
				String outVal = val.toString();
				System.out.println("key: " + key.toString() + " : " + outVal);
				int index = outVal.indexOf(":");
				String flag = outVal.substring(0, index);
				if ("0".equals(flag)) {
					leftTable.add(new Text(outVal.substring(index+1)));
				} else if ("1".equals(flag)) {
					rightTable.add(new Text(outVal.substring(index + 1)));
				}
			}

			if (leftTable.size() > 0 && rightTable.size() > 0) {
				for(Text left : leftTable) {
					for (Text right : rightTable) {
						context.write(key, new Text(left.toString() + "," + right.toString()));
					}
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

	    if (otherArgs.length != 2) {
	      System.err.println("Usage: BloomMRMain <in> <out>");
	      System.exit(2);
	    }	    

	    Job job = new Job(conf, "BloomMRMain");
	    job.setJarByClass(BloomMRMain.class);

	    job.setMapperClass(BloomMapper.class);
	    job.setReducerClass(BloomReducer.class);

	    job.setInputFormatClass(TextInputFormat.class);
	    job.setOutputFormatClass(TextOutputFormat.class);

	    job.setMapOutputKeyClass(Text.class);
	    job.setMapOutputValueClass(Text.class);

	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);	

	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Hadoop0.20.2 Bloom filter应用示例

时间: 2024-10-28 13:39:56

Hadoop0.20.2 Bloom filter应用示例的相关文章

Hadoop0.20.2 Bloom filter应用演示样例

1. 简单介绍 參见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明不过依照<Hadoop in Action>中的演示样例代码给出.这里是Hadoop0.20.2版本号,在该版本号中已经实现了BloomFilter. 案例文件例如以下: customers.txt 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose

Hadoop Bloom filter应用示例

Hadoop0.20.2 Bloom filter应用示例 2014-06-04 11:55 451人阅读 评论(0) 收藏 举报 1. 简介 参见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明仅仅是按照<Hadoop in Action>中的示例代码给出,这里是Hadoop0.20.2版本,在该版本中已经实现了BloomFilter. 案例文件如下: customers.txt 1,St

Bloom Filter布隆过滤器

http://blog.csdn.net/pipisorry/article/details/64127666 Bloom Filter简介 Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合.布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的.它实际上是一个很长的二进制向量和一系列随机映射函数.布隆过滤器可以用于检索一个元素是否在一个集合中.它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定

Bloom filter的实现以及常用的hash函数

bloom filter利用时间换空间的思想,利用多个哈希函数,将一个元素的存在状态映射到多个bit中,特别是在网络环境中,BF具有广泛的用途,关键问题就是要减少false positive rate(可以设置参数来调节),扩展有 counting BF.这里选用的hash函数是表现较好的 BKDRHash , SDBMHash, DJBHash . Bloom-filter代码: bloom_filter.h #ifndef __BLOOM_FILTER_H__ #define __BLOOM

bloom filter 详解[转]

Bloom Filter概念和原理 焦萌 2007年1月27日 Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合.Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive).因此,Bloom Filter不适合那些“零错误”的应用场合.而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空

bloom filter与dawgdic(一种trie树)

我有一个做了一款移动浏览器的朋友. 他有这样一个需求:当用户输入一个网站的url时候,移动浏览器需要识别这个网址是否是一个恶意网址.另外,他有一个恶意网址库. 也许这样的解决方法有多种. 其中一种就是把恶意网址库放在本地,移动浏览器拿到一个网址的时候就把它与网址库中的每个地址匹配一下,根据匹配与否来判断网址的是否为一个恶意地址. 哦,我忘了补充的情况就是这个网址库中有150万条数据,压缩后23M,如果一个浏览器为了识别恶意网址这么一个功能而附加这么大的库,你会没有用户的. 我刚开始给出的解决方法

基于BitSet的布隆过滤器(Bloom Filter)

布隆过滤器 Bloom Filter 是由Howard Bloom 在 1970 年提出的二进制向量数据结构,它具有很好的空间和时间效率,被用来检测一个元素是不是集合中的一个成员.如果检测结果为是,该元素不一定在集合中:但如果检测结果为否,该元素一定不在集合中.因此Bloom filter具有100%的召回率.这样每个检测请求返回有"在集合内(可能错误)"和"不在集合内(绝对不在集合内)"两种情况,可见 Bloom filter 是牺牲了正确率和时间以节省空间. 当

Bloom Filter概念和原理

Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合.Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive).因此,Bloom Filter不适合那些“零错误”的应用场合.而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省. 集合表示和元素查询 下面我们具体来看Bloom

笔试算法题(43):布隆过滤器(Bloom Filter)

议题:布隆过滤器(Bloom Filter) 分析: BF由一个很长的二进制向量和一系列随机映射的函数组成,通过多个Hash函数将一个元素映射到一个Bit Array中的多个点,查询的时候仅当所有的映射点都为1才能判断元素存在于集合内:BF用于检索一个元素是否在一个集合中,记忆集合求交集:优点是空间 和时间效率都超过一般查询算法,缺点是有一定的误判概率和删除困难: 如下图,使用三个哈希函数对每个元素进行映射,这样每个元素对应HashTable中的三个位置,如果查找w是否在HashTable中则仍