MapReduce -- TF-IDF

通过MapReduce实现 TF-IDF值的统计

数据:文章ID  文件内容

3823890378201539    今天约了姐妹去逛街吃美食,周末玩得很开心啊!
......
......

结果数据:

3823890378201539    开心:0.28558719539400335    吃:0.21277211221173534    了:0.1159152517783012    美食:0.29174432675350614    去:0.18044286652763497    玩:0.27205714412756765    啊:0.26272169358877784    姐妹:0.3983823545319593    逛街:0.33320559604063593    得很:0.45170136842118586    周末:0.2672478858982343    今天:0.16923426566752778    约:0.0946874743049455
......
......

在整个的处理过程中通过两步来完成

第一步主要生成三种格式的文件

1、使用分词工具将文章内容进行拆分成多个词条;并记录文章的总词条数 关于分词工具的使用请参考  TF-IDF
第一步处理后结果:

今天_3823890378201539    A:1,B:13,
周末_3823890378201539    A:1,B:13,
得很_3823890378201539    A:1,B:13,
约_3823890378201539    B:13,A:1,
......

2、记录词条在多少篇文章中出现过

处理后结果:

今天    118
周末    33
约    311
......

3、记录文章总数

处理后结果:

counter    1065

第二步将文件2,3的内容加载到缓存,利用2,3文件的内容对文件1的内容通过mapreduce进行计算

针对数据量不是很大的数据可以加载到缓存,如果数据量过大,不考虑这种方式;

源码

Step1.java:

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.Text;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Mapper;
  8 import org.apache.hadoop.mapreduce.Reducer;
  9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 import org.wltea.analyzer.core.IKSegmenter;
 12 import org.wltea.analyzer.core.Lexeme;
 13
 14 import java.io.IOException;
 15 import java.io.StringReader;
 16 import java.util.HashMap;
 17 import java.util.Map;
 18 import java.util.Map.Entry;
 19
 20 /**
 21  * Created by Edward on 2016/7/21.
 22  */
 23 public class Step1 {
 24
 25     public static void main(String[] args)
 26     {
 27         //access hdfs‘s user
 28         //System.setProperty("HADOOP_USER_NAME","root");
 29
 30         Configuration conf = new Configuration();
 31         conf.set("fs.defaultFS", "hdfs://node1:8020");
 32
 33         try {
 34             FileSystem fs = FileSystem.get(conf);
 35
 36             Job job = Job.getInstance(conf);
 37             job.setJarByClass(RunJob.class);
 38             job.setMapperClass(MyMapper.class);
 39             job.setReducerClass(MyReducer.class);
 40             job.setPartitionerClass(FilterPartition.class);
 41
 42             //需要指定 map out 的 key 和 value
 43             job.setOutputKeyClass(Text.class);
 44             job.setOutputValueClass(Text.class);
 45
 46             //设置reduce task的数量
 47             job.setNumReduceTasks(4);
 48
 49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/input"));
 50
 51             Path path = new Path("/test/tfidf/output");
 52             if(fs.exists(path))//如果目录存在,则删除目录
 53             {
 54                 fs.delete(path,true);
 55             }
 56             FileOutputFormat.setOutputPath(job, path);
 57
 58             boolean b = job.waitForCompletion(true);
 59             if(b)
 60             {
 61                 System.out.println("OK");
 62             }
 63
 64         } catch (Exception e) {
 65             e.printStackTrace();
 66         }
 67     }
 68
 69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
 70         @Override
 71         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 72             Map<String, Integer> map = new HashMap<String, Integer>();
 73
 74             String[] str = value.toString().split("\t");
 75             StringReader stringReader = new StringReader(str[1]);
 76             IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
 77             Lexeme lexeme = null;
 78             Long count = 0l;
 79             while((lexeme = ikSegmenter.next())!=null) {
 80                 String word = lexeme.getLexemeText();
 81                 if(map.containsKey(word)) {
 82                     map.put(word, map.get(word)+1);
 83                 }
 84                 else{
 85                     map.put(word, 1);
 86                 }
 87                 count++;
 88             }
 89             for(Entry<String, Integer> entry: map.entrySet())
 90             {
 91                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("A:"+entry.getValue()));//tf词条在此文章中的个数
 92                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("B:"+count));//此文章中的总词条数
 93                 context.write(new Text(entry.getKey()),new Text("1"));//词条在此文章中出现+1,计算词条在那些文章中出现过
 94             }
 95             context.write(new Text("counter"), new Text(1+""));//文章数累加器
 96         }
 97     }
 98
 99     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
100         @Override
101         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
102
103             //计算总文章数
104             if(key.toString().equals("conter")) {
105                 long sum = 0l;
106                 for(Text v :values)
107                 {
108                     sum += Long.parseLong(v.toString());
109                 }
110                 context.write(key, new Text(sum+""));
111             }
112             else{
113                 if(key.toString().contains("_")) {
114                     StringBuilder stringBuilder = new StringBuilder();
115                     for (Text v : values) {
116                         stringBuilder.append(v.toString());
117                         stringBuilder.append(",");
118                     }
119                     context.write(key, new Text(stringBuilder.toString()));
120                 }
121                 else {//计算词条在那些文章中出现过
122                     long sum = 0l;
123                     for(Text v :values)
124                     {
125                         sum += Long.parseLong(v.toString());
126                     }
127                     context.write(key, new Text(sum+""));
128                 }
129             }
130         }
131     }
132 }

FilterPartition.java

 1 import org.apache.hadoop.io.Text;
 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 3
 4 /**
 5  * Created by Edward on 2016/7/22.
 6  */
 7 public class FilterPartition extends HashPartitioner<Text, Text> {
 8
 9     @Override
10     public int getPartition(Text key, Text value, int numReduceTasks) {
11
12         if(key.toString().contains("counter"))
13         {
14             return numReduceTasks-1;
15         }
16
17         if(key.toString().contains("_"))
18         {
19             return super.getPartition(key, value, numReduceTasks-2);
20         }
21         else
22         {
23             return numReduceTasks-2;
24         }
25     }
26 }

Step2.java

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.Text;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Mapper;
  8 import org.apache.hadoop.mapreduce.Reducer;
  9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11
 12 import java.io.BufferedReader;
 13 import java.io.FileReader;
 14 import java.io.IOException;
 15 import java.net.URI;
 16 import java.util.HashMap;
 17 import java.util.Map;
 18
 19 /**
 20  * Created by Edward on 2016/7/22.
 21  */
 22 public class Step2 {
 23     public static void main(String[] args)
 24     {
 25         //access hdfs‘s user
 26         //System.setProperty("HADOOP_USER_NAME","root");
 27
 28         Configuration conf = new Configuration();
 29         conf.set("fs.defaultFS", "hdfs://node1:8020");
 30
 31         try {
 32             FileSystem fs = FileSystem.get(conf);
 33
 34             Job job = Job.getInstance(conf);
 35             job.setJarByClass(RunJob.class);
 36             job.setMapperClass(MyMapper.class);
 37             job.setReducerClass(MyReducer.class);
 38
 39             //需要指定 map out 的 key 和 value
 40             job.setOutputKeyClass(Text.class);
 41             job.setOutputValueClass(Text.class);
 42
 43             //分布式缓存,每个slave都能读到数据
 44                 //词条在多少文章中出现过
 45             job.addCacheFile(new Path("/test/tfidf/output/part-r-00002").toUri());
 46                 //文章的总数
 47             job.addCacheFile(new Path("/test/tfidf/output/part-r-00003").toUri());
 48
 49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/output"));
 50
 51             Path path = new Path("/test/tfidf/output1");
 52             if(fs.exists(path))//如果目录存在,则删除目录
 53             {
 54                 fs.delete(path,true);
 55             }
 56             FileOutputFormat.setOutputPath(job, path);
 57
 58             boolean b = job.waitForCompletion(true);
 59             if(b)
 60             {
 61                 System.out.println("OK");
 62             }
 63         } catch (Exception e) {
 64             e.printStackTrace();
 65         }
 66     }
 67
 68
 69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
 70
 71         public static Map<String, Double> dfmap = new HashMap<String, Double>();
 72
 73         public static Map<String, Double> totalmap = new HashMap<String, Double>();
 74
 75         @Override
 76         protected void setup(Context context) throws IOException, InterruptedException {
 77             URI[] cacheFiles = context.getCacheFiles();
 78             Path pArtNum = new Path(cacheFiles[0].getPath());
 79             Path pArtTotal = new Path(cacheFiles[1].getPath());
 80
 81             //加载词条在多少篇文章中出现过
 82             BufferedReader buffer = new BufferedReader(new FileReader(pArtNum.getName()));
 83             String line = null;
 84             while((line = buffer.readLine()) != null){
 85                 String[] str = line.split("\t");
 86                 dfmap.put(str[0], Double.parseDouble(str[1]));
 87             }
 88
 89             //加载文章总数
 90             buffer = new BufferedReader(new FileReader(pArtTotal.getName()));
 91             line = null;
 92             while((line = buffer.readLine()) != null){
 93                 String[] str = line.split("\t");
 94                 totalmap.put(str[0], Double.parseDouble(str[1]));
 95             }
 96         }
 97
 98         @Override
 99         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
100
101             String[] strings = value.toString().split("\t");
102             String k = strings[0];
103
104             if(k.contains("counter")) {
105                 //过滤掉 文章总数
106             }
107             else if(k.contains("_")){
108                 String word = k.split("_")[0];
109                 String[] info = strings[1].split(",");
110                 String n=null;
111                 String num=null;
112                 if(info[0].contains("A")){
113                     n = info[0].substring(info[0].indexOf(":")+1);
114                     num = info[1].substring(info[0].indexOf(":")+1);
115                 }
116                 if(info[0].contains("B")){
117                     num = info[0].substring(info[0].indexOf(":")+1);
118                     n = info[1].substring(info[0].indexOf(":")+1);
119                 }
120                 double result = 0l;
121
122                 result = (Double.parseDouble(n)/Double.parseDouble(num)) * Math.log( totalmap.get("counter")/dfmap.get(word));
123                 System.out.println("n=" + Double.parseDouble(n));
124                 System.out.println("num=" + Double.parseDouble(num));
125                 System.out.println("counter=" + totalmap.get("counter"));
126                 System.out.println("wordnum=" + dfmap.get(word));
127                 context.write(new Text(k.split("_")[1]), new Text(word+":"+result));
128             }
129             else{
130                 //过滤掉 词条在多少篇文章中出现过
131             }
132         }
133     }
134
135     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
136         @Override
137         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
138
139             StringBuilder stringBuilder = new StringBuilder();
140             for(Text t: values){
141                 stringBuilder.append(t.toString());
142                 stringBuilder.append("\t");
143             }
144             context.write(key, new Text(stringBuilder.toString()) );
145         }
146     }
147 }
时间: 2024-10-02 22:40:53

MapReduce -- TF-IDF的相关文章

tf–idf算法解释及其python代码实现(下)

tf–idf算法python代码实现 这是我写的一个tf-idf的核心部分的代码,没有完整实现,当然剩下的事情就非常简单了,我们知道tfidf=tf*idf,所以可以分别计算tf和idf值在相乘,首先我们创建一个简单的语料库,作为例子,只有四句话,每句表示一个文档 copus=['我正在学习计算机','它正在吃饭','我的书还在你那儿','今天不上班'] 由于中文需要分词,jieba分词是python里面比较好用的分词工具,所以选用jieba分词,文末是jieba的链接.首先对文档进行分词: i

[Elasticsearch] 控制相关度 (四) - 忽略TF/IDF

本章翻译自Elasticsearch官方指南的Controlling Relevance一章. 忽略TF/IDF 有时我们不需要TF/IDF.我们想知道的只是一个特定的单词是否出现在了字段中.比如我们正在搜索度假酒店,希望它拥有的卖点越多越好: WiFi 花园(Garden) 泳池(Pool) 而关于度假酒店的文档类似下面这样: { "description": "A delightful four-bedroomed house with ... " } 可以使用

tf–idf算法解释及其python代码实现(上)

tf–idf算法解释 tf–idf, 是term frequency–inverse document frequency的缩写,它通常用来衡量一个词对在一个语料库中对它所在的文档有多重要,常用在信息检索和文本挖掘中. 一个很自然的想法是在一篇文档中词频越高的词对这篇文档越重要,但同时如果这个词又在非常多的文档中出现的话可能就是很普通的词,没有多少信息,对所在文档贡献不大,例如‘的’这种停用词.所以要综合一个词在所在文档出现次数以及有多少篇文档包含这个词,如果一个词在所在文档出现次数很多同时整个

关于使用Filter减少Lucene tf idf打分计算的调研

将query改成filter,lucene中有个QueryWrapperFilter性能比较差,所以基本上都需要自己写filter,包括TermFilter,ExactPhraseFilter,ConjunctionFilter,DisjunctionFilter. 这几天验证下来,还是or改善最明显,4个termfilter,4508个返回结果,在我本机上性能提高1/3.ExactPhraseFilter也有小幅提升(5%-10%). 最令人不解的是and,原来以为跟结果数和子查询数相关,但几

Elasticsearch学习之相关度评分TF&amp;IDF

relevance score算法,简单来说,就是计算出,一个索引中的文本,与搜索文本,他们之间的关联匹配程度 Elasticsearch使用的是 term frequency/inverse document frequency算法,简称为TF/IDF算法 Term frequency(TF):搜索文本中的各个词条在field文本中出现了多少次,出现次数越多,就越相关 Inverse document frequency(IDF):搜索文本中的各个词条在整个索引的所有文档中出现了多少次,出现的

使用solr的函数查询,并获取tf*idf值

1. 使用函数df(field,keyword) 和idf(field,keyword). http://118.85.207.11:11100/solr/mobile/select?q={!func}product%28idf%28title,%E9%97%AE%E9%A2%98%29,tf%28title,%E9%97%AE%E9%A2%98%29%29&fl=title,score,product%28idf%28title,%E9%97%AE%E9%A2%98%29,tf%28title

55.TF/IDF算法

主要知识点: TF/IDF算法介绍 查看es计算_source的过程及各词条的分数 查看一个document是如何被匹配到的 一.算法介绍 relevance score算法,简单来说,就是计算出,一个索引中的文本,与搜索文本,他们之间的关联匹配程度.Elasticsearch使用的是 term frequency/inverse document frequency算法,简称为TF/IDF算法 1.Term frequency 搜索文本中的各个词条在field文本中出现了多少次,出现次数越多,

25.TF&IDF算法以及向量空间模型算法

主要知识点: boolean model IF/IDF vector space model 一.boolean model 在es做各种搜索进行打分排序时,会先用boolean model 进行初步的筛选,boolean model类似and这种逻辑操作符,先过滤出包含指定term的doc.must/must not/should(过滤.包含.不包含 .可能包含)这几种情况,这一步不会对各个doc进行打分,只分过滤,为下一步的IF/IDF算法筛选数据. 二.TF/IDF 这一步就是es为boo

文本分类学习(三) 特征权重(TF/IDF)和特征提取

上一篇中,主要说的就是词袋模型.回顾一下,在进行文本分类之前,我们需要把待分类文本先用词袋模型进行文本表示.首先是将训练集中的所有单词经过去停用词之后组合成一个词袋,或者叫做字典,实际上一个维度很大的向量.这样每个文本在分词之后,就可以根据我们之前得到的词袋,构造成一个向量,词袋中有多少个词,那这个向量就是多少维度的了.然后就把这些向量交给计算机去计算,而不再需要文本啦.而向量中的数字表示的是每个词所代表的权重.代表这个词对文本类型的影响程度. 在这个过程中我们需要解决两个问题:1.如何计算出适

使用sklearn进行中文文本的tf idf计算

Created by yinhongyu at 2018-4-28 email: [email protected] 使用jieba和sklearn实现了tf idf的计算 import jieba import jieba.posseg as pseg from sklearn import feature_extraction from sklearn.feature_extraction.text import TfidfTransformer from sklearn.feature_e