TF-IDF Document Vectorization with Mahout MapReduce

              Source-code analysis of Mahout's SparseVectorsFromSequenceFiles

Goal: convert a given collection of sequence files into SparseVectors.
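
For orientation, here is a minimal sketch of driving the whole pipeline programmatically. The paths are placeholders, and the class and flag names follow recent Mahout releases (where SparseVectorsFromSequenceFiles implements Tool); verify them against your version.

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles;

public class Seq2SparseDriver {
  public static void main(String[] args) throws Exception {
    // Placeholder HDFS paths: the input must already be a SequenceFile<Text,Text> corpus.
    String[] seq2sparseArgs = {
        "-i", "/tmp/corpus-seqfiles",   // input SequenceFiles of raw documents
        "-o", "/tmp/corpus-vectors",    // output directory for dictionary + vectors
        "-wt", "tfidf",                 // weighting: tf or tfidf
        "--maxNGramSize", "1",          // unigrams only, matching the analysis below
        "--namedVector",                // keep document ids inside the vectors
        "-ow"                           // overwrite any previous output
    };
    ToolRunner.run(new SparseVectorsFromSequenceFiles(), seq2sparseArgs);
  }
}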

1. Tokenize the documents

1.1) Create an Analyzer with the latest {@link org.apache.lucene.util.Version}; it is used for the tokenization in step 1.2.


Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
if (cmdLine.hasOption(analyzerNameOpt)) {
  String className = cmdLine.getValue(analyzerNameOpt).toString();
  analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
  // try instantiating it, b/c there isn't any point in setting it if
  // you can't instantiate it
  AnalyzerUtils.createAnalyzer(analyzerClass);
}

1.2) Convert the input documents into token arrays held in a {@link StringTuple} (the input documents must be in {@link org.apache.hadoop.io.SequenceFile} format).

DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);

Input: inputDir     Output: tokenizedPath

SequenceFileTokenizerMapper:


// Tokenize the input document with the Analyzer and collect the tokens in a StringTuple
TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
stream.reset();
// StringTuple is an ordered List of Strings that can be used in a Hadoop Map/Reduce job
StringTuple document = new StringTuple();
while (stream.incrementToken()) {
  if (termAtt.length() > 0) {
    document.add(new String(termAtt.buffer(), 0, termAtt.length()));
  }
}
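
The tokenizer job writes one (Text document id, StringTuple token list) pair per document under tokenizedPath. A quick sketch for inspecting that output; the "tokenized-documents" folder and the "part-*" glob are the usual defaults and should be adjusted to your layout.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;

public class DumpTokenizedDocs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // tokenizedPath is the output directory of DocumentProcessor.tokenizeDocuments above
    Path tokenized = new Path("/tmp/corpus-vectors/tokenized-documents/part-*");
    for (Pair<Text, StringTuple> record
         : new SequenceFileDirIterable<Text, StringTuple>(tokenized, PathType.GLOB, null, null, true, conf)) {
      System.out.println(record.getFirst() + " -> " + record.getSecond().getEntries());
    }
  }
}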

2. Create TF vectors (Term Frequency Vectors) --- multiple Map/Reduce jobs


DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
    outputDir,
    tfDirName,
    conf,
    minSupport,
    maxNGramSize,
    minLLRValue,
    -1.0f,
    false,
    reduceTasks,
    chunkSize,
    sequentialAccessOutput,
    namedVectors);

2.1) Global word count (TF)

startWordCounting(input, dictionaryJobPath, baseConf, minSupport);

Count the global term frequencies in parallel with Map/Reduce; only the unigram case (maxNGramSize == 1) is considered here.

Input: tokenizedPath   Output: wordCountPath

TermCountMapper:


// Count the term frequencies within a single document
OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
for (String word : value.getEntries()) {
  if (wordCount.containsKey(word)) {
    wordCount.put(word, wordCount.get(word) + 1);
  } else {
    wordCount.put(word, 1);
  }
}
wordCount.forEachPair(new ObjectLongProcedure<String>() {
  @Override
  public boolean apply(String first, long second) {
    try {
      context.write(new Text(first), new LongWritable(second));
    } catch (IOException e) {
      context.getCounter("Exception", "Output IO Exception").increment(1);
    } catch (InterruptedException e) {
      context.getCounter("Exception", "Interrupted Exception").increment(1);
    }
    return true;
  }
});

TermCountCombiner: (same as TermCountReducer)

TermCountReducer:


// Aggregate all words and their weights, summing the weights of the same word
long sum = 0;
for (LongWritable value : values) {
  sum += value.get();
}
if (sum >= minSupport) { // the TermCountCombiner does not apply this filter
  context.write(key, new LongWritable(sum));
}

2.2) Build the dictionary

List<Path> dictionaryChunks;
dictionaryChunks =
    createDictionaryChunks(dictionaryJobPath, output, baseConf, chunkSizeInMegabytes, maxTermDimension);

Read the feature frequency list produced by the word count job in 2.1 and assign an id to each feature.

Input: wordCountPath (the word count output, dictionaryJobPath)   Output: dictionary chunk files (DICTIONARY_FILE + chunkIndex) under dictionaryPathBase


/**
 * Read the feature frequency List which is built at the end of the Word Count Job and assign ids to them.
 * This will use constant memory and will run at the speed of your disk read
 */
private static List<Path> createDictionaryChunks(Path wordCountPath,
                                                 Path dictionaryPathBase,
                                                 Configuration baseConf,
                                                 int chunkSizeInMegabytes,
                                                 int[] maxTermDimension) throws IOException {
  List<Path> chunkPaths = Lists.newArrayList();

  Configuration conf = new Configuration(baseConf);

  FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);

  long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L; // defaults to 64 MB
  int chunkIndex = 0;
  Path chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
  chunkPaths.add(chunkPath);

  SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);

  try {
    long currentChunkSize = 0;
    Path filesPattern = new Path(wordCountPath, OUTPUT_FILES_PATTERN);
    int i = 0;
    for (Pair<Writable,Writable> record
         : new SequenceFileDirIterable<Writable,Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
      if (currentChunkSize > chunkSizeLimit) { // start a new dictionary chunk file
        Closeables.close(dictWriter, false);
        chunkIndex++;

        chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
        chunkPaths.add(chunkPath);

        dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
        currentChunkSize = 0;
      }

      Writable key = record.getFirst();
      int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
      currentChunkSize += fieldSize;
      dictWriter.append(key, new IntWritable(i++)); // assign the id
    }
    maxTermDimension[0] = i; // record the total number of terms (the vector dimension)
  } finally {
    Closeables.close(dictWriter, false);
  }

  return chunkPaths;
}

2.3) Build partial vectors (TF)


int partialVectorIndex = 0;
Collection<Path> partialVectorPaths = Lists.newArrayList();
for (Path dictionaryChunk : dictionaryChunks) {
  Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
  partialVectorPaths.add(partialVectorOutputPath);
  makePartialVectors(input, baseConf, maxNGramSize, dictionaryChunk, partialVectorOutputPath,
      maxTermDimension[0], sequentialAccess, namedVectors, numReducers);
}

Create a partial vector for each input document using one chunk of features.

(Because the dictionary is split across multiple chunk files, each chunk can only build one part of the full vector; each such part is called a partial vector.)

Input: tokenizedPath   Output: partialVectorPaths

Mapper: (the default identity Mapper)

TFPartialVectorReducer:


// Load the dictionary chunk file
// MAHOUT-1247
Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
// key is word, value is id
for (Pair<Writable, IntWritable> record
     : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
  dictionary.put(record.getFirst().toString(), record.getSecond().get());
}


// Convert a document into a sparse vector
StringTuple value = it.next();

Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size

for (String term : value.getEntries()) {
  if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
    int termId = dictionary.get(term);
    vector.setQuick(termId, vector.getQuick(termId) + 1);
  }
}
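
To make the chunking concrete, here is a tiny self-contained sketch of the same loop with a hypothetical two-word dictionary chunk and a hand-made token list; the dimension 3 stands for the full dictionary size (maxTermDimension[0]), not the chunk size.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class PartialTfSketch {
  public static void main(String[] args) {
    // Hypothetical dictionary chunk: word -> id (real chunks come from step 2.2).
    Map<String, Integer> chunk = new HashMap<String, Integer>();
    chunk.put("mahout", 0);
    chunk.put("vector", 1);

    // Hypothetical tokenized document (the entries of a StringTuple in the real job).
    List<String> tokens = Arrays.asList("mahout", "vector", "mahout", "lucene");

    // Full dictionary size across all chunks is 3 ("lucene" has id 2 in another chunk).
    Vector partial = new RandomAccessSparseVector(3, tokens.size());
    for (String term : tokens) {
      Integer termId = chunk.get(term);
      if (termId != null) { // "lucene" is handled by another chunk's partial vector
        partial.setQuick(termId, partial.getQuick(termId) + 1);
      }
    }
    System.out.println(partial); // non-zeros: 2.0 at index 0, 1.0 at index 1
  }
}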

2.4) Merge partial vectors (TF)


Configuration conf = new Configuration(baseConf);

Path outputDir = new Path(output, tfVectorsFolderName);
PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, conf, normPower, logNormalize,
    maxTermDimension[0], sequentialAccess, namedVectors, numReducers);

Merge all partial {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete {@link org.apache.mahout.math.RandomAccessSparseVector}.

Input: partialVectorPaths   Output: tfVectorsFolder

Mapper: (the default identity Mapper)

PartialVectorMergeReducer:

// Merge the partial vectors into the complete TF vector
Vector vector = new RandomAccessSparseVector(dimension, 10);
for (VectorWritable value : values) {
  vector.assign(value.get(), Functions.PLUS); // combine the vectors covering different words into one
}
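
Continuing the toy example from 2.3: two partial vectors of the same document, built from disjoint dictionary chunks, sum element-wise into the complete TF vector exactly as the reducer does with Functions.PLUS.

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class PartialVectorMergeSketch {
  public static void main(String[] args) {
    int dimension = 3; // full dictionary size
    // Partial vectors of the same document, from two different dictionary chunks.
    Vector fromChunk0 = new RandomAccessSparseVector(dimension, 2);
    fromChunk0.setQuick(0, 2); // "mahout" x2
    fromChunk0.setQuick(1, 1); // "vector" x1
    Vector fromChunk1 = new RandomAccessSparseVector(dimension, 1);
    fromChunk1.setQuick(2, 1); // "lucene" x1

    // The same element-wise sum the reducer applies.
    Vector full = new RandomAccessSparseVector(dimension, 10);
    full.assign(fromChunk0, Functions.PLUS);
    full.assign(fromChunk1, Functions.PLUS);
    System.out.println(full); // non-zeros: 2.0, 1.0, 1.0 at indices 0, 1, 2
  }
}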

3. Compute document frequencies (DF) for the IDF weighting --- multiple Map/Reduce jobs


Pair<Long[], List<Path>> docFrequenciesFeatures = null;
// Should document frequency features be processed
if (shouldPrune || processIdf) {
  log.info("Calculating IDF");
  docFrequenciesFeatures =
      TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
}

3.1) Count document frequencies (DF)

Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);

startDFCounting(input, wordCountPath, baseConf);

Input: tfDir   Output: featureCountPath

TermDocumentCountMapper:


// For each word in a document emit a count of 1, and emit 1 toward the total document count
Vector vector = value.get();
for (Vector.Element e : vector.nonZeroes()) {
  out.set(e.index());
  context.write(out, ONE);
}
context.write(TOTAL_COUNT, ONE);

Combiner: (TermDocumentCountReducer)

TermDocumentCountReducer:

// Sum each word's document frequency, as well as the total number of documents
long sum = 0;
for (LongWritable value : values) {
  sum += value.get();
}

3.2) Chunk the DF counts

 return createDictionaryChunks(wordCountPath, output, baseConf, chunkSizeInMegabytes);

Split the DF counts across multiple chunk files, and record the total number of words and the total number of documents.

Input: featureCountPath    Output: dictionaryPathBase


/**
 * Read the document frequency List which is built at the end of the DF Count Job. This will use constant
 * memory and will run at the speed of your disk read
 */
private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
                                                               Path dictionaryPathBase,
                                                               Configuration baseConf,
                                                               int chunkSizeInMegabytes) throws IOException {
  List<Path> chunkPaths = Lists.newArrayList();
  Configuration conf = new Configuration(baseConf);

  FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);

  long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
  int chunkIndex = 0;
  Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
  chunkPaths.add(chunkPath);
  SequenceFile.Writer freqWriter =
      new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);

  try {
    long currentChunkSize = 0;
    long featureCount = 0;
    long vectorCount = Long.MAX_VALUE;
    Path filesPattern = new Path(featureCountPath, OUTPUT_FILES_PATTERN);
    for (Pair<IntWritable,LongWritable> record
         : new SequenceFileDirIterable<IntWritable,LongWritable>(filesPattern,
                                                                 PathType.GLOB,
                                                                 null,
                                                                 null,
                                                                 true,
                                                                 conf)) {

      if (currentChunkSize > chunkSizeLimit) {
        Closeables.close(freqWriter, false);
        chunkIndex++;

        chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
        chunkPaths.add(chunkPath);

        freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
        currentChunkSize = 0;
      }

      int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
      currentChunkSize += fieldSize;
      IntWritable key = record.getFirst();
      LongWritable value = record.getSecond();
      if (key.get() >= 0) {
        freqWriter.append(key, value);
      } else if (key.get() == -1) { // the total number of documents
        vectorCount = value.get();
      }
      featureCount = Math.max(key.get(), featureCount);

    }
    featureCount++;
    Long[] counts = {featureCount, vectorCount}; // number of words, number of documents
    return new Pair<Long[], List<Path>>(counts, chunkPaths);
  } finally {
    Closeables.close(freqWriter, false);
  }
}

4. Create TF-IDF vectors (Term Frequency-Inverse Document Frequency)

TFIDFConverter.processTfIdf(
    new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
    outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
    sequentialAccessOutput, namedVectors, reduceTasks);

4.1) Build partial vectors (TF-IDF)


int partialVectorIndex = 0;
List<Path> partialVectorPaths = Lists.newArrayList();
List<Path> dictionaryChunks = datasetFeatures.getSecond();
for (Path dictionaryChunk : dictionaryChunks) {
  Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
  partialVectorPaths.add(partialVectorOutputPath);
  makePartialVectors(input,
      baseConf,
      datasetFeatures.getFirst()[0],
      datasetFeatures.getFirst()[1],
      minDf,
      maxDF,
      dictionaryChunk,
      partialVectorOutputPath,
      sequentialAccessOutput,
      namedVector);
}

Create a partial TF-IDF vector using one chunk of features.

Input: tfVectorsFolder   Output: partialVectorOutputPath

DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf); // cache the DF chunk file
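
In the reducer's setup the cached frequency chunk is read back into a map from feature id to document frequency; that map is the dictionary used in the snippet below. A sketch of that loading step, following the same pattern as the dictionary loading in 2.3 (the helper name is mine, not Mahout's):

// Load a cached frequency chunk: key = feature id, value = document frequency
private static OpenIntLongHashMap loadFrequencyChunk(Path frequencyFile, Configuration conf) {
  OpenIntLongHashMap dictionary = new OpenIntLongHashMap();
  for (Pair<IntWritable, LongWritable> record
       : new SequenceFileIterable<IntWritable, LongWritable>(frequencyFile, true, conf)) {
    dictionary.put(record.getFirst().get(), record.getSecond().get());
  }
  return dictionary;
}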

Mapper: (the default identity Mapper)

TFIDFPartialVectorReducer:


// Compute the TF-IDF value of each word in each document
Vector value = it.next().get();
Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
for (Vector.Element e : value.nonZeroes()) {
  if (!dictionary.containsKey(e.index())) {
    continue;
  }
  long df = dictionary.get(e.index());
  if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
    continue;
  }
  if (df < minDf) {
    df = minDf;
  }
  vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
}
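
The weight itself comes from tfidf.calculate(tf, df, featureCount, vectorCount), where tfidf is Mahout's TFIDF class. As far as I can tell it delegates to Lucene's DefaultSimilarity and ignores the featureCount argument, so the weight works out to roughly sqrt(tf) * (1 + ln(numDocs / (df + 1))). A plain-Java sketch of that formula, for illustration only; verify against your Mahout and Lucene versions.

public class TfIdfWeightSketch {
  // Approximation of TFIDF.calculate(tf, df, featureCount, numDocs), assuming it uses
  // Lucene's DefaultSimilarity: tf = sqrt(termFreq), idf = 1 + ln(numDocs / (docFreq + 1)).
  static double weight(int termFreq, int docFreq, int numDocs) {
    return Math.sqrt(termFreq) * (1.0 + Math.log((double) numDocs / (docFreq + 1)));
  }

  public static void main(String[] args) {
    // A term appearing 3 times in a document and in 10 of 1000 documents overall.
    System.out.println(weight(3, 10, 1000)); // sqrt(3) * (1 + ln(1000/11)) ~ 9.5
  }
}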

4.2) Merge partial vectors (TF-IDF)


Configuration conf = new Configuration(baseConf);

Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);

PartialVectorMerger.mergePartialVectors(partialVectorPaths,
    outputDir,
    baseConf,
    normPower,
    logNormalize,
    datasetFeatures.getFirst()[0].intValue(),
    sequentialAccessOutput,
    namedVector,
    numReducers);

Merge all partial vectors into a single complete document vector.

Input: partialVectorOutputPath   Output: outputDir

Mapper: (the default identity Mapper)

PartialVectorMergeReducer:

// Sum the partial TF-IDF vectors
Vector vector = new RandomAccessSparseVector(dimension, 10);
for (VectorWritable value : values) {
  vector.assign(value.get(), Functions.PLUS);
}
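
Once the merge finishes, the final document vectors can be inspected directly; keys are the document ids and values are VectorWritables. A minimal sketch, assuming the default "tfidf-vectors" output folder and a placeholder output path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VectorWritable;

public class DumpTfIdfVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "tfidf-vectors" is the usual DOCUMENT_VECTOR_OUTPUT_FOLDER; adjust if you renamed it
    Path vectors = new Path("/tmp/corpus-vectors/tfidf-vectors/part-*");
    for (Pair<Writable, VectorWritable> record
         : new SequenceFileDirIterable<Writable, VectorWritable>(vectors, PathType.GLOB, null, null, true, conf)) {
      System.out.println(record.getFirst() + " => " + record.getSecond().get());
    }
  }
}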

scikit learn 模块 调参 pipeline+girdsearch 数据举例:文档分类数据集 fetch_20newsgroups #-*- coding: UTF-8 -*- import numpy as np from sklearn.pipeline import Pipeline from sklearn.linear_model import SGDClassifier from sklearn.grid_search import GridSearchCV from sk