运用mapreduce计算tf-idf

问题描述:给定一个大文件,文件中的内容每一行为:文档名,文档内容。

input

文档名1,word1 word2 .......

文档名2,word1 word2 .......

output

word  文档名  tfidf值

package com.elex.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.elex.mapreduce.TFIDF_4.IDFMap;
import com.elex.mapreduce.TFIDF_4.IDFReduce;
import com.elex.utils.DataClean;
import com.google.common.io.Closeables;

public class TFIDF_5 {
	public static String hdfsURL = "hdfs://namenode:8020";
	public static String fileURL = "/tmp/usercount";

	public static class TFMap extends Mapper<Object, Text, Text, Text> {
		/**
		 * Computes the term frequency (tf) of every word in one document.
		 * Input line format: "docName,word1 word2 ...". Emits
		 * key = docName, value = "word1 tf1,word2 tf2,...".
		 */
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String userWordstmp = value.toString();
			StringTokenizer userWords = new StringTokenizer(userWordstmp, "\n");
			while (userWords.hasMoreTokens()) {
				String userWordFragtmp = userWords.nextToken();
				StringTokenizer userWordFrag = new StringTokenizer(
						userWordFragtmp, ",");
				String user = userWordFrag.nextToken();
				// Accumulate "word tf," pairs in a StringBuilder instead of
				// appending raw bytes to the Text. This also avoids the
				// platform-default-charset pitfall of String.getBytes()
				// (Hadoop Text is always UTF-8).
				StringBuilder valueBuilder = new StringBuilder();
				while (userWordFrag.hasMoreTokens()) {
					String words = userWordFrag.nextToken();
					// DataClean.clean() tallies word occurrences; the special
					// "!total" key holds the fragment's total token count.
					// NOTE(review): exact contract of DataClean.clean is not
					// visible here — confirm against com.elex.utils.DataClean.
					HashMap<String, Integer> wordMap = DataClean.clean(words,
							"!total");
					int wordTotal = wordMap.get("!total");
					wordMap.remove("!total");
					for (Map.Entry<String, Integer> wordEntry : wordMap
							.entrySet()) {
						String word = wordEntry.getKey();
						int wordCount = wordEntry.getValue();
						// tf = occurrences of this word / total words in doc
						float tf = (float) wordCount / (float) wordTotal;
						valueBuilder.append(word).append(' ').append(tf)
								.append(',');
					}
				}
				Text outputKey = new Text(user);
				Text outputValue = new Text(valueBuilder.toString());
				context.write(outputKey, outputValue);
			}
		}
	}

	public static class TFReduce extends Reducer<Text, Text, Text, Text> {
		/** Identity reducer: forwards each (docName, tfList) pair unchanged. */
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				context.write(key, value);
			}
		}
	}

	public static class IDFMap extends Mapper<Object, Text, Text, Text> {
		/**
		 * Inverts the tf-job output. Input line:
		 * "docName\tword1 tf1,word2 tf2,...". Emits key = word,
		 * value = "docName\ttf" so the reducer sees, per word, every
		 * document that contains it.
		 */
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String valuesTmp = value.toString();
			StringTokenizer userWordFrag = new StringTokenizer(valuesTmp, "\n");
			while (userWordFrag.hasMoreTokens()) {
				StringTokenizer userWords = new StringTokenizer(
						userWordFrag.nextToken(), "\t");
				if (!userWords.hasMoreTokens()) {
					continue; // skip blank lines
				}
				String user = userWords.nextToken();
				while (userWords.hasMoreTokens()) {
					StringTokenizer wordTFs = new StringTokenizer(
							userWords.nextToken(), ",");
					while (wordTFs.hasMoreTokens()) {
						StringTokenizer wordTF = new StringTokenizer(
								wordTFs.nextToken());
						// Guard against malformed fragments: the original
						// called nextToken() twice unconditionally and threw
						// NoSuchElementException on a fragment missing its tf.
						if (wordTF.countTokens() < 2) {
							continue;
						}
						String word = wordTF.nextToken();
						String tf = wordTF.nextToken();
						Text outputKey = new Text(word);
						Text outputValue = new Text(user + "\t" + tf);
						context.write(outputKey, outputValue);
					}
				}
			}
		}
	}

	public static class IDFReduce extends Reducer<Text, Text, Text, Text> {
		// Total number of documents, read in setup() from the temp HDFS file
		// written by main() (or UserCountReduce in the retired job chain).
		long userCount = 0;

		public void setup(Context context) throws IOException {
			Configuration conf = context.getConfiguration();
			Path path = new Path(fileURL);
			FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
			if (!fs.isFile(path)) {
				// Ensure the file exists so fs.open() below does not fail.
				FSDataOutputStream output = fs.create(path, true);
				output.close();
			}
			FSDataInputStream input = fs.open(path);
			try {
				StringBuilder sb = new StringBuilder();
				byte[] bytes = new byte[1024];
				int status = input.read(bytes);
				while (status != -1) {
					// BUG FIX: only the first `status` bytes of the buffer are
					// valid. The original appended the entire 1024-byte buffer
					// (new String(bytes)), padding the number with stale NUL
					// bytes, which could break Long.parseLong below. Also
					// decode explicitly as UTF-8, not the platform charset.
					sb.append(new String(bytes, 0, status,
							StandardCharsets.UTF_8));
					status = input.read(bytes);
				}
				if (!"".equals(sb.toString())) {
					userCount = Long.parseLong(sb.toString().trim());
				}
			} finally {
				input.close();
			}
		}

		/**
		 * For one word: counts the documents containing it (df), computes
		 * idf = ln(N / (df + 1)) and emits (word, "docName\ttf*idf") for
		 * every document that contains the word.
		 */
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			LinkedList<String> userList = new LinkedList<String>();
			long wordCount = 0;
			// Text instances from the iterator are reused by Hadoop, so
			// snapshot them as Strings before iterating a second time.
			for (Text value : values) {
				wordCount++;
				userList.add(value.toString());
			}
			// +1 smoothing keeps the denominator non-zero.
			float idf = (float) Math.log((float) userCount
					/ (float) (wordCount + 1));
			Text outputValue = new Text();
			for (String usertftmp : userList) {
				StringTokenizer usertf = new StringTokenizer(usertftmp, "\t");
				String user = usertf.nextToken();
				float tf = Float.parseFloat(usertf.nextToken().trim());
				float tfidf = tf * idf;
				outputValue.set(user + "\t" + tfidf);
				context.write(key, outputValue);
			}
		}
	}

	public static class UserCountMap extends Mapper<Object, Text, Text, Text> {

		/** Emits ("usercount", "1") once per input line, to tally documents. */
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer lines = new StringTokenizer(value.toString(), "\n");
			while (lines.hasMoreTokens()) {
				lines.nextToken();
				Text outputKey = new Text();
				outputKey.set("usercount");
				Text one = new Text();
				one.set("1");
				context.write(outputKey, one);
			}
		}
	}

	public static class UserCountCombine extends
			Reducer<Text, Text, Text, Text> {
		/** Map-side pre-aggregation of the "1" counts from UserCountMap. */
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			long total = 0;
			for (Text value : values) {
				total += Long.parseLong(value.toString());
			}
			Text outputValue = new Text();
			outputValue.set(Long.toString(total));
			context.write(key, outputValue);
		}
	}

	public static class UserCountReduce extends Reducer<Text, Text, Text, Text> {
		// BUG FIX: was `int`, yet accumulated Long.parseLong results via
		// compound assignment, which silently narrows on overflow.
		long userCount = 0;

		/** Sums all partial document counts into userCount. */
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				userCount += Long.parseLong(value.toString());
			}
		}

		/** Writes the final document count to the temp HDFS file. */
		public void cleanup(Context context) throws IOException {
			Configuration conf = context.getConfiguration();
			FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
			Path path = new Path(fileURL);
			FSDataOutputStream output = fs.create(path, true);
			try {
				// Encode explicitly as UTF-8 rather than the platform charset;
				// close() flushes, so no separate flush() call is needed.
				output.write(Long.toString(userCount)
						.getBytes(StandardCharsets.UTF_8));
			} finally {
				output.close();
			}
		}
	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		// conf.set("mapred.child.java.opts", "-Xmx4096m");
		Job tfJob = Job.getInstance(conf, "tfjob");
		tfJob.setJarByClass(TFIDF_5.class);
		tfJob.setMapperClass(TFMap.class);
		// tfJob.setCombinerClass(TFCombine.class);
		tfJob.setReducerClass(TFReduce.class);
		tfJob.setOutputKeyClass(Text.class);
		tfJob.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(tfJob, new Path(args[0]));
		FileOutputFormat.setOutputPath(tfJob, new Path(args[1]));
		tfJob.waitForCompletion(true);

		// Job userCountJob = Job.getInstance(conf, "usercountjob");
		// userCountJob.setJarByClass(TFIDF_5.class);
		// userCountJob.setMapperClass(UserCountMap.class);
		// userCountJob.setCombinerClass(UserCountCombine.class);
		// userCountJob.setReducerClass(UserCountReduce.class);
		// userCountJob.setOutputKeyClass(Text.class);
		// userCountJob.setOutputValueClass(Text.class);
		// FileInputFormat.setInputPaths(userCountJob, new Path(args[1]));
		// FileOutputFormat.setOutputPath(userCountJob, new Path(args[2]));
		// userCountJob.waitForCompletion(true);
<span style="white-space: pre;">		</span>//计算文档数,并临时储存到hdfs上
		Counter ct = tfJob.getCounters().findCounter(
				"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS");
		System.out.println(ct.getValue());
		Iterable<String> groupNames = tfJob.getCounters().getGroupNames();
		for (String groupName : groupNames) {
			System.out.println(groupName);
		}
		FileSystem fs = FileSystem.get(URI.create(hdfsURL), conf);
		Path path = new Path(fileURL);
		FSDataOutputStream output = fs.create(path, true);
		String content = Long.toString(ct.getValue());
		output.write(content.getBytes());
		output.flush();
		output.close();

		Job idfJob = Job.getInstance(conf, "idfjob");
		idfJob.setJarByClass(TFIDF_5.class);
		idfJob.setMapperClass(IDFMap.class);
		idfJob.setReducerClass(IDFReduce.class);
		idfJob.setOutputKeyClass(Text.class);
		idfJob.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(idfJob, new Path(args[1]));
		FileOutputFormat.setOutputPath(idfJob, new Path(args[3]));
		System.exit(idfJob.waitForCompletion(true) ? 0 : 1);

	}

}

最初运用了一个单独的job计算文档数,后面经过公司前辈的指点,可以通过计算tf的时候运用输入数据的条数来巧妙的计算文档数。

时间: 2024-07-30 19:56:13

运用mapreduce计算tf-idf的相关文章

使用sklearn进行中文文本的tf idf计算

Created by yinhongyu at 2018-4-28 email: [email protected] 使用jieba和sklearn实现了tf idf的计算 import jieba import jieba.posseg as pseg from sklearn import feature_extraction from sklearn.feature_extraction.text import TfidfTransformer from sklearn.feature_e

关于使用Filter减少Lucene tf idf打分计算的调研

将query改成filter,lucene中有个QueryWrapperFilter性能比较差,所以基本上都需要自己写filter,包括TermFilter,ExactPhraseFilter,ConjunctionFilter,DisjunctionFilter. 这几天验证下来,还是or改善最明显,4个termfilter,4508个返回结果,在我本机上性能提高1/3.ExactPhraseFilter也有小幅提升(5%-10%). 最令人不解的是and,原来以为跟结果数和子查询数相关,但几

tf–idf算法解释及其python代码实现(下)

tf–idf算法python代码实现 这是我写的一个tf-idf的核心部分的代码,没有完整实现,当然剩下的事情就非常简单了,我们知道tfidf=tf*idf,所以可以分别计算tf和idf值在相乘,首先我们创建一个简单的语料库,作为例子,只有四句话,每句表示一个文档 copus=['我正在学习计算机','它正在吃饭','我的书还在你那儿','今天不上班'] 由于中文需要分词,jieba分词是python里面比较好用的分词工具,所以选用jieba分词,文末是jieba的链接.首先对文档进行分词: i

[Elasticsearch] 控制相关度 (四) - 忽略TF/IDF

本章翻译自Elasticsearch官方指南的Controlling Relevance一章. 忽略TF/IDF 有时我们不需要TF/IDF.我们想知道的只是一个特定的单词是否出现在了字段中.比如我们正在搜索度假酒店,希望它拥有的卖点越多越好: WiFi 花园(Garden) 泳池(Pool) 而关于度假酒店的文档类似下面这样: { "description": "A delightful four-bedroomed house with ... " } 可以使用

tf–idf算法解释及其python代码实现(上)

tf–idf算法解释 tf–idf, 是term frequency–inverse document frequency的缩写,它通常用来衡量一个词对在一个语料库中对它所在的文档有多重要,常用在信息检索和文本挖掘中. 一个很自然的想法是在一篇文档中词频越高的词对这篇文档越重要,但同时如果这个词又在非常多的文档中出现的话可能就是很普通的词,没有多少信息,对所在文档贡献不大,例如‘的’这种停用词.所以要综合一个词在所在文档出现次数以及有多少篇文档包含这个词,如果一个词在所在文档出现次数很多同时整个

Elasticsearch学习之相关度评分TF&IDF

relevance score算法,简单来说,就是计算出,一个索引中的文本,与搜索文本,他们之间的关联匹配程度 Elasticsearch使用的是 term frequency/inverse document frequency算法,简称为TF/IDF算法 Term frequency(TF):搜索文本中的各个词条在field文本中出现了多少次,出现次数越多,就越相关 Inverse document frequency(IDF):搜索文本中的各个词条在整个索引的所有文档中出现了多少次,出现的

55.TF/IDF算法

主要知识点: TF/IDF算法介绍 查看es计算_source的过程及各词条的分数 查看一个document是如何被匹配到的 一.算法介绍 relevance score算法,简单来说,就是计算出,一个索引中的文本,与搜索文本,他们之间的关联匹配程度.Elasticsearch使用的是 term frequency/inverse document frequency算法,简称为TF/IDF算法 1.Term frequency 搜索文本中的各个词条在field文本中出现了多少次,出现次数越多,

25.TF&IDF算法以及向量空间模型算法

主要知识点: boolean model IF/IDF vector space model 一.boolean model 在es做各种搜索进行打分排序时,会先用boolean model 进行初步的筛选,boolean model类似and这种逻辑操作符,先过滤出包含指定term的doc.must/must not/should(过滤.包含.不包含 .可能包含)这几种情况,这一步不会对各个doc进行打分,只分过滤,为下一步的IF/IDF算法筛选数据. 二.TF/IDF 这一步就是es为boo

文本分类学习(三) 特征权重(TF/IDF)和特征提取

上一篇中,主要说的就是词袋模型.回顾一下,在进行文本分类之前,我们需要把待分类文本先用词袋模型进行文本表示.首先是将训练集中的所有单词经过去停用词之后组合成一个词袋,或者叫做字典,实际上一个维度很大的向量.这样每个文本在分词之后,就可以根据我们之前得到的词袋,构造成一个向量,词袋中有多少个词,那这个向量就是多少维度的了.然后就把这些向量交给计算机去计算,而不再需要文本啦.而向量中的数字表示的是每个词所代表的权重.代表这个词对文本类型的影响程度. 在这个过程中我们需要解决两个问题:1.如何计算出适