Hadoop Bloom filter应用示例

Hadoop0.20.2 Bloom filter应用示例

2014-06-04 11:55 451人阅读 评论(0) 收藏 举报

1. 简介

参见《Hadoop in Action》P102 以及 《Hadoop实战(第2版)》(陆嘉恒)P69

2. 案例

网上大部分的说明仅仅是按照《Hadoop in Action》中的示例代码给出,这里是Hadoop0.20.2版本,在该版本中已经实现了BloomFilter。

案例文件如下:

customers.txt

1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000

-----------------------------------------------------------------

orders.txt

3,A,12.95,02-Jun-2008
    1,B,88.25,20-May-2008
    2,C,32.00,30-Nov-2007
    3,D,25.02,22-Jan-2009
    5,E,34.59,05-Jan-2010
    6,F,28.67,16-Jan-2008
    7,G,49.82,24-Jan-2009

两个文件通过customer ID关联。

3. 代码

[java] view plaincopy

  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.util.ArrayList;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.FSDataInputStream;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  18. import org.apache.hadoop.util.GenericOptionsParser;
  19. import org.apache.hadoop.util.bloom.BloomFilter;
  20. import org.apache.hadoop.util.bloom.Key;
  21. import org.apache.hadoop.util.hash.Hash;
  22. public class BloomMRMain {
  23. public static class BloomMapper extends Mapper<Object, Text, Text, Text> {
  24. BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH);
  25. protected void setup(Context context) throws IOException ,InterruptedException {
  26. Configuration conf = context.getConfiguration();
  27. String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt";
  28. Path file = new Path(path);
  29. FileSystem hdfs = FileSystem.get(conf);
  30. FSDataInputStream dis = hdfs.open(file);
  31. BufferedReader reader = new BufferedReader(new InputStreamReader(dis));
  32. String temp;
  33. while ((temp = reader.readLine()) != null) {
  34. //              System.out.println("bloom filter temp:" + temp);
  35. String[] tokens = temp.split(",");
  36. if (tokens.length > 0) {
  37. bloomFilter.add(new Key(tokens[0].getBytes()));
  38. }
  39. }
  40. }
  41. protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException {
  42. //获得文件输入路径
  43. String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
  44. if (pathName.contains("customers")) {
  45. String data = value.toString();
  46. String[] tokens = data.split(",");
  47. if (tokens.length == 3) {
  48. String outKey = tokens[0];
  49. String outVal = "0" + ":" + tokens[1] + "," + tokens[2];
  50. context.write(new Text(outKey), new Text(outVal));
  51. }
  52. } else if (pathName.contains("orders")) {
  53. String data = value.toString();
  54. String[] tokens = data.split(",");
  55. if (tokens.length == 4) {
  56. String outKey = tokens[0];
  57. System.out.println("in map and outKey:" + outKey);
  58. if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) {
  59. String outVal = "1" + ":" + tokens[1] + "," + tokens[2]+ "," + tokens[3];
  60. context.write(new Text(outKey), new Text(outVal));
  61. }
  62. }
  63. }
  64. }
  65. }
  66. public static class BloomReducer extends Reducer<Text, Text, Text, Text> {
  67. ArrayList<Text> leftTable = new ArrayList<Text>();
  68. ArrayList<Text> rightTable = new ArrayList<Text>();
  69. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException ,InterruptedException {
  70. leftTable.clear();
  71. rightTable.clear();
  72. for (Text val : values) {
  73. String outVal = val.toString();
  74. System.out.println("key: " + key.toString() + " : " + outVal);
  75. int index = outVal.indexOf(":");
  76. String flag = outVal.substring(0, index);
  77. if ("0".equals(flag)) {
  78. leftTable.add(new Text(outVal.substring(index+1)));
  79. } else if ("1".equals(flag)) {
  80. rightTable.add(new Text(outVal.substring(index + 1)));
  81. }
  82. }
  83. if (leftTable.size() > 0 && rightTable.size() > 0) {
  84. for(Text left : leftTable) {
  85. for (Text right : rightTable) {
  86. context.write(key, new Text(left.toString() + "," + right.toString()));
  87. }
  88. }
  89. }
  90. }
  91. }
  92. public static void main(String[] args) throws Exception {
  93. Configuration conf = new Configuration();
  94. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  95. if (otherArgs.length != 2) {
  96. System.err.println("Usage: BloomMRMain <in> <out>");
  97. System.exit(2);
  98. }
  99. Job job = new Job(conf, "BloomMRMain");
  100. job.setJarByClass(BloomMRMain.class);
  101. job.setMapperClass(BloomMapper.class);
  102. job.setReducerClass(BloomReducer.class);
  103. job.setInputFormatClass(TextInputFormat.class);
  104. job.setOutputFormatClass(TextOutputFormat.class);
  105. job.setMapOutputKeyClass(Text.class);
  106. job.setMapOutputValueClass(Text.class);
  107. job.setOutputKeyClass(Text.class);
  108. job.setOutputValueClass(Text.class);
  109. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  110. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  111. System.exit(job.waitForCompletion(true) ? 0 : 1);
  112. }
  113. }
时间: 2024-10-12 09:03:41

Hadoop Bloom filter应用示例的相关文章

Hadoop0.20.2 Bloom filter应用示例

1. 简介 参见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明仅仅是按照<Hadoop in Action>中的示例代码给出,这里是Hadoop0.20.2版本,在该版本中已经实现了BloomFilter. 案例文件如下: customers.txt 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose Madriz

Bloom Filter布隆过滤器

http://blog.csdn.net/pipisorry/article/details/64127666 Bloom Filter简介 Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合.布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的.它实际上是一个很长的二进制向量和一系列随机映射函数.布隆过滤器可以用于检索一个元素是否在一个集合中.它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定

Hadoop0.20.2 Bloom filter应用演示样例

1. 简单介绍 參见<Hadoop in Action>P102 以及 <Hadoop实战(第2版)>(陆嘉恒)P69 2. 案例 网上大部分的说明不过依照<Hadoop in Action>中的演示样例代码给出.这里是Hadoop0.20.2版本号,在该版本号中已经实现了BloomFilter. 案例文件例如以下: customers.txt 1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose

布隆过滤器(Bloom Filter)的原理和实现

什么情况下需要布隆过滤器? 先来看几个比较常见的例子 字处理软件中,需要检查一个英语单词是否拼写正确 在 FBI,一个嫌疑人的名字是否已经在嫌疑名单上 在网络爬虫里,一个网址是否被访问过 yahoo, gmail等邮箱垃圾邮件过滤功能 这几个例子有一个共同的特点: 如何判断一个元素是否存在一个集合中? 常规思路 数组 链表 树.平衡二叉树.Trie Map (红黑树) 哈希表 虽然上面描述的这几种数据结构配合常见的排序.二分搜索可以快速高效的处理绝大部分判断元素是否存在集合中的需求.但是当集合里

[转载] 布隆过滤器(Bloom Filter)详解

转载自http://www.cnblogs.com/haippy/archive/2012/07/13/2590351.html   布隆过滤器[1](Bloom Filter)是由布隆(Burton Howard Bloom)在1970年提出的.它实际上是由一个很长的二进制向量和一系列随机映射函数组成,布隆过滤器可以用于检索一个元素是否在一个集合中.它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率(假正例False positives,即Bloom Filter报告某一

redis 和 bloom filter

今天打算使用redis 的bitset搞一个 bloom filter, 这样的好处是可以节省内存,坏处是可能在会有一些数据因为提示重复而无法保存. bloom filter 的大体原理就是通过不同的hash函数将一个字符串映射到几个不同的位,并将这几个不同的位设置为1. 如果在查找某个字符串的时候,发现通过hash映射后的位有的不为1,说明该字符串不存在. 如果发现所有的位都为1,那该字符串有一定的概率不存在,通常这个概率会很小. 相关内容可以查看: http://olylakers.itey

Bloom Filter(布隆过滤器)

布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的.它实际上是一个很长的二进制矢量和一系列随机映射函数.布隆过滤器可以用于检索一个元素是否在一个集合中.它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难. 如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定.链表.树.散列表(又叫哈希表,Hash table)等等数据结构都是这种思路.但是随着集合中元素的增加,我们需要的存储空间越来越大.同时检索速度也越来

Bloom Filter算法

<?php /*Bloom Filter算法来去重过滤. 介绍下Bloom Filter的基本处理思路:申请一批空间用于保存0 1信息,再根据一批哈希函数确定元素对应的位置,如果每个哈希函数对应位置的值为全部1,说明此元素存在.相反,如果为0,则要把对应位置的值设置为1.由于不同的元素可能会有相同的哈希值,即同一个位置有可能保存了多个元素的信息,从而导致存在一定的误判率. 如果申请空间太小,随着元素的增多,1会越来越多,各个元素冲突的机会越来越来大,导致误判率会越来越大.另外哈希函数的选择及个数

Bloom filter的实现以及常用的hash函数

bloom filter利用时间换空间的思想,利用多个哈希函数,将一个元素的存在状态映射到多个bit中,特别是在网络环境中,BF具有广泛的用途,关键问题就是要减少false positive rate(可以设置参数来调节),扩展有 counting BF.这里选用的hash函数是表现较好的 BKDRHash , SDBMHash, DJBHash . Bloom-filter代码: bloom_filter.h #ifndef __BLOOM_FILTER_H__ #define __BLOOM