Source code analysis of Mahout's SparseVectorsFromSequenceFiles
Goal: convert a given collection of sequence files into SparseVectors.
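Before diving into the source, here is a minimal sketch (not Mahout code) of the kind of input the tool expects: a SequenceFile whose key is a document id (Text) and whose value is the raw document text (Text), typically produced by Mahout's seqdirectory tool. The path and document contents below are made up for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class WriteToyInput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path input = new Path("toy-input/part-00000"); // hypothetical path
    SequenceFile.Writer writer =
        new SequenceFile.Writer(fs, conf, input, Text.class, Text.class);
    try {
      // key: document id, value: raw document text
      writer.append(new Text("/doc1"), new Text("a rose is a red rose"));
      writer.append(new Text("/doc2"), new Text("red sky at night"));
    } finally {
      writer.close();
    }
  }
}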
1. Tokenize the documents
1.1) Create an Analyzer using the most recent {@link org.apache.lucene.util.Version}; it is used for the tokenization in 1.2 below.
Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
if (cmdLine.hasOption(analyzerNameOpt)) {
String className = cmdLine.getValue(analyzerNameOpt).toString();
analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
// try instantiating it, b/c there isn't any point in setting it if
// you can't instantiate it
AnalyzerUtils.createAnalyzer(analyzerClass);
}
1.2) Use a {@link StringTuple} to turn the input documents into arrays of tokens (the input documents must be in {@link org.apache.hadoop.io.SequenceFile} format).
DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);
Input: inputDir  Output: tokenizedPath
SequenceFileTokenizerMapper:
// tokenize the input documents with the Analyzer and put the resulting terms into a StringTuple
TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));
CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
stream.reset();
StringTuple document = new StringTuple(); // StringTuple is an ordered List of Strings that can be used in a Hadoop Map/Reduce job
while (stream.incrementToken()) {
if (termAtt.length() > 0) {
document.add(new String(termAtt.buffer(), 0, termAtt.length()));
}
}
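The same tokenization loop can be run outside Map/Reduce. The following is a small standalone sketch on a toy string (note that the StandardAnalyzer constructor differs between Lucene versions, so adapt it to the version on your classpath).
import java.io.StringReader;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.mahout.common.StringTuple;

public class TokenizeSketch {
  public static void main(String[] args) throws Exception {
    Analyzer analyzer = new StandardAnalyzer(); // adapt to your Lucene version
    TokenStream stream = analyzer.tokenStream("text", new StringReader("A rose is a red rose."));
    CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
    stream.reset();
    StringTuple document = new StringTuple();
    while (stream.incrementToken()) {
      if (termAtt.length() > 0) {
        document.add(new String(termAtt.buffer(), 0, termAtt.length()));
      }
    }
    stream.end();
    stream.close();
    // prints the extracted tokens, e.g. [rose, red, rose] if English stop words are removed
    System.out.println(document.getEntries());
  }
}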
2. Create TF vectors (Term Frequency Vectors): several Map/Reduce jobs
DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,
outputDir,
tfDirName,
conf,
minSupport,
maxNGramSize,
minLLRValue,
-1.0f,
false,
reduceTasks,
chunkSize,
sequentialAccessOutput,
namedVectors);
2.1) Global term counting (TF)
startWordCounting(input, dictionaryJobPath, baseConf, minSupport);
Count the global term frequencies in parallel with Map/Reduce; only the unigram case (maxNGramSize == 1) is considered here.
Input: tokenizedPath  Output: wordCountPath
TermCountMapper:
// count the term frequencies within a single text document
OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
for (String word : value.getEntries()) {
if (wordCount.containsKey(word)) {
wordCount.put(word, wordCount.get(word) + 1);
} else {
wordCount.put(word, 1);
}
}
wordCount.forEachPair(new ObjectLongProcedure<String>() {
@Override
public boolean apply(String first, long second) {
try {
context.write(new Text(first), new LongWritable(second));
} catch (IOException e) {
context.getCounter("Exception", "Output IO Exception").increment(1);
} catch (InterruptedException e) {
context.getCounter("Exception", "Interrupted Exception").increment(1);
}
return true;
}
});
TermCountCombiner: (same as TermCountReducer)
TermCountReducer:
// aggregate all terms and their weights, summing the weights of identical terms
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
if (sum >= minSupport) { // the TermCountCombiner does not apply this minSupport filter
context.write(key, new LongWritable(sum));
}
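To make the mapper's local aggregation concrete, here is a runnable sketch of the same OpenObjectLongHashMap pattern applied to one toy document; the mapper would emit each (word, count) pair as (Text, LongWritable) instead of printing it.
import org.apache.mahout.math.function.ObjectLongProcedure;
import org.apache.mahout.math.map.OpenObjectLongHashMap;

public class LocalTermCountSketch {
  public static void main(String[] args) {
    String[] tokens = {"a", "rose", "is", "a", "rose"};
    OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
    for (String word : tokens) {
      if (wordCount.containsKey(word)) {
        wordCount.put(word, wordCount.get(word) + 1);
      } else {
        wordCount.put(word, 1);
      }
    }
    wordCount.forEachPair(new ObjectLongProcedure<String>() {
      @Override
      public boolean apply(String word, long count) {
        System.out.println(word + "\t" + count); // a 2, rose 2, is 1
        return true;
      }
    });
  }
}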
2.2) Build the dictionary
List<Path> dictionaryChunks;
dictionaryChunks =
createDictionaryChunks(dictionaryJobPath, output, baseConf, chunkSizeInMegabytes, maxTermDimension);
Read the feature frequency list produced by the word count job in 2.1 and assign ids to the features.
Input: wordCountPath  Output: dictionaryPathBase (the dictionary chunk files)
/**
* Read the feature frequency List which is built at the end of the Word Count Job and assign ids to them.
* This will use constant memory and will run at the speed of your disk read
*/
private static List<Path> createDictionaryChunks(Path wordCountPath,
Path dictionaryPathBase,
Configuration baseConf,
int chunkSizeInMegabytes,
int[] maxTermDimension) throws IOException {
List<Path> chunkPaths = Lists.newArrayList();
Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);
long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L; // 64 MB by default
int chunkIndex = 0;
Path chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
try {
long currentChunkSize = 0;
Path filesPattern = new Path(wordCountPath, OUTPUT_FILES_PATTERN);
int i = 0;
for (Pair<Writable,Writable> record
: new SequenceFileDirIterable<Writable,Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
if (currentChunkSize > chunkSizeLimit) { // roll over to a new dictionary chunk file
Closeables.close(dictWriter, false);
chunkIndex++;
chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
currentChunkSize = 0;
}
Writable key = record.getFirst();
int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
currentChunkSize += fieldSize;
dictWriter.append(key, new IntWritable(i++)); // assign the id
}
maxTermDimension[0] = i; // record the total number of terms (the vector dimension)
} finally {
Closeables.close(dictWriter, false);
}
return chunkPaths;
}
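The chunk rollover is driven purely by the per-entry size accounting above. A worked sketch follows; the value of DICTIONARY_BYTE_OVERHEAD is assumed here for illustration, the real constant is defined in DictionaryVectorizer.
public class ChunkSizeSketch {
  private static final int DICTIONARY_BYTE_OVERHEAD = 4; // assumed value for illustration

  public static void main(String[] args) {
    String term = "cluster"; // 7 characters
    int fieldSize = DICTIONARY_BYTE_OVERHEAD
        + term.length() * 2   // UTF-16 chars: 14 bytes
        + Integer.SIZE / 8;   // the int id:    4 bytes
    // roughly 22 bytes are charged against the 64 MB chunk limit for this entry
    System.out.println(fieldSize);
  }
}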
2.3) Build partial vectors (TF)
int partialVectorIndex = 0;
Collection<Path> partialVectorPaths = Lists.newArrayList();
for (Path dictionaryChunk : dictionaryChunks) {
Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
partialVectorPaths.add(partialVectorOutputPath);
makePartialVectors(input, baseConf, maxNGramSize, dictionaryChunk, partialVectorOutputPath,
maxTermDimension[0], sequentialAccess, namedVectors, numReducers);
}
Create a partial vector for the input documents from a chunk of features
(the dictionary is split across several chunk files, so each chunk can only build part of the full vector; each such part is called a partial vector).
Input: tokenizedPath  Output: partialVectorPaths
Mapper: (the default identity Mapper)
TFPartialVectorReducer:
// read the dictionary chunk file
//MAHOUT-1247
Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
// key is word value is id
for (Pair<Writable, IntWritable> record
: new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {
dictionary.put(record.getFirst().toString(), record.getSecond().get());
}
// convert a document into a sparse vector
StringTuple value = it.next();
Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
for (String term : value.getEntries()) {
if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
int termId = dictionary.get(term);
vector.setQuick(termId, vector.getQuick(termId) + 1);
}
}
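In isolation, this vectorization step is just a dictionary lookup per token. A self-contained sketch with a toy dictionary and token list (not Mahout output):
import java.util.HashMap;
import java.util.Map;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class TfVectorSketch {
  public static void main(String[] args) {
    Map<String, Integer> dictionary = new HashMap<String, Integer>();
    dictionary.put("rose", 0);
    dictionary.put("red", 1);

    String[] tokens = {"a", "rose", "is", "a", "red", "rose"};
    int dimension = dictionary.size();
    Vector vector = new RandomAccessSparseVector(dimension, tokens.length);
    for (String term : tokens) {
      if (!term.isEmpty() && dictionary.containsKey(term)) {
        int termId = dictionary.get(term);
        vector.setQuick(termId, vector.getQuick(termId) + 1);
      }
    }
    // prints the nonzero entries, e.g. {0:2.0,1:1.0} ("rose" twice, "red" once)
    System.out.println(vector);
  }
}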
2.4) Merge partial vectors (TF)
Configuration conf = new Configuration(baseConf);
Path outputDir = new Path(output, tfVectorsFolderName);
PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, conf, normPower, logNormalize,
maxTermDimension[0], sequentialAccess, namedVectors, numReducers);
Merge all the partial {@link org.apache.mahout.math.RandomAccessSparseVector}s into the complete {@link org.apache.mahout.math.RandomAccessSparseVector}.
Input: partialVectorPaths  Output: tfVectorsFolder
Mapper: (the default identity Mapper)
PartialVectorMergeReducer:
// merge the partial vectors into the complete TF vector
Vector vector = new RandomAccessSparseVector(dimension, 10);
for (VectorWritable value : values) {
vector.assign(value.get(), Functions.PLUS); // add the vectors covering different subsets of terms into one
}
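Because each partial vector only covers the term ids of its dictionary chunk, summing them reconstructs the full TF vector. A toy sketch of that merge:
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

public class MergePartialSketch {
  public static void main(String[] args) {
    int dimension = 4;
    Vector partial1 = new RandomAccessSparseVector(dimension, 10);
    partial1.setQuick(0, 2); // term ids covered by dictionary chunk 0
    Vector partial2 = new RandomAccessSparseVector(dimension, 10);
    partial2.setQuick(3, 1); // term ids covered by dictionary chunk 1

    Vector merged = new RandomAccessSparseVector(dimension, 10);
    merged.assign(partial1, Functions.PLUS);
    merged.assign(partial2, Functions.PLUS);
    System.out.println(merged); // contains the nonzeros of both partial vectors
  }
}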
3. Compute document frequencies (DF vectors) for the IDF weighting: several Map/Reduce jobs
Pair<Long[], List<Path>> docFrequenciesFeatures = null;
// Should document frequency features be processed
if (shouldPrune || processIdf) {
log.info("Calculating IDF");
docFrequenciesFeatures =
TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
}
3.1) Count document frequencies (DF)
Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);
startDFCounting(input, wordCountPath, baseConf);
Input: tfDir  Output: featureCountPath
TermDocumentCountMapper:
// emit a 1 for every distinct term in a document, plus a 1 for the total document count
Vector vector = value.get();
for (Vector.Element e : vector.nonZeroes()) {
out.set(e.index());
context.write(out, ONE);
}
context.write(TOTAL_COUNT, ONE);
Combiner: (TermDocumentCountReducer)
TermDocumentCountReducer:
// sum each term's document-frequency counts and the total document count
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
3.2) Chunk the DF counts
return createDictionaryChunks(wordCountPath, output, baseConf, chunkSizeInMegabytes);
Store the DF counts in several chunk files and record the total number of terms and the total number of documents.
Input: featureCountPath  Output: dictionaryPathBase
/**
* Read the document frequency List which is built at the end of the DF Count Job. This will use constant
* memory and will run at the speed of your disk read
*/
private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,
Path dictionaryPathBase,
Configuration baseConf,
int chunkSizeInMegabytes) throws IOException {
List<Path> chunkPaths = Lists.newArrayList();
Configuration conf = new Configuration(baseConf);
FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);
long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
int chunkIndex = 0;
Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
SequenceFile.Writer freqWriter =
new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
try {
long currentChunkSize = 0;
long featureCount = 0;
long vectorCount = Long.MAX_VALUE;
Path filesPattern = new Path(featureCountPath, OUTPUT_FILES_PATTERN);
for (Pair<IntWritable,LongWritable> record
: new SequenceFileDirIterable<IntWritable,LongWritable>(filesPattern,
PathType.GLOB,
null,
null,
true,
conf)) {
if (currentChunkSize > chunkSizeLimit) {
Closeables.close(freqWriter, false);
chunkIndex++;
chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);
chunkPaths.add(chunkPath);
freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);
currentChunkSize = 0;
}
int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;
currentChunkSize += fieldSize;
IntWritable key = record.getFirst();
LongWritable value = record.getSecond();
if (key.get() >= 0) {
freqWriter.append(key, value);
} else if (key.get() == -1) { // key -1 carries the total number of documents
vectorCount = value.get();
}
featureCount = Math.max(key.get(), featureCount);
}
featureCount++;
Long[] counts = {featureCount, vectorCount}; // number of terms, number of documents
return new Pair<Long[], List<Path>>(counts, chunkPaths);
} finally {
Closeables.close(freqWriter, false);
}
}
4. Create TF-IDF vectors (Term Frequency-Inverse Document Frequency (Tf-Idf) Vectors)
TFIDFConverter.processTfIdf(
new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),
outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,
sequentialAccessOutput, namedVectors, reduceTasks);
4.1) Build partial vectors (TF-IDF)
int partialVectorIndex = 0;
List<Path> partialVectorPaths = Lists.newArrayList();
List<Path> dictionaryChunks = datasetFeatures.getSecond();
for (Path dictionaryChunk : dictionaryChunks) {
Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);
partialVectorPaths.add(partialVectorOutputPath);
makePartialVectors(input,
baseConf,
datasetFeatures.getFirst()[0],
datasetFeatures.getFirst()[1],
minDf,
maxDF,
dictionaryChunk,
partialVectorOutputPath,
sequentialAccessOutput,
namedVector);
}
Create a partial tf-idf vector from a chunk of features.
Input: tfVectorsFolder  Output: partialVectorOutputPath
DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf); // cache the DF chunk file
Mapper: (the default identity Mapper)
TFIDFPartialVectorReducer:
// compute the TF-IDF weight of every term in each document
Vector value = it.next().get();
Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
for (Vector.Element e : value.nonZeroes()) {
if (!dictionary.containsKey(e.index())) {
continue;
}
long df = dictionary.get(e.index());
if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
continue;
}
if (df < minDf) {
df = minDf;
}
vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
}
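For reference, a worked sketch of the weight that tfidf.calculate(...) produces. Mahout's default TFIDF weight delegates to Lucene's DefaultSimilarity, which (as assumed here) amounts to sqrt(tf) * (ln(numDocs / (df + 1)) + 1).
public class TfIdfWeightSketch {
  static double tfidf(int tf, int df, int numDocs) {
    // assumed formula; the actual value comes from Lucene's similarity implementation
    return Math.sqrt(tf) * (Math.log((double) numDocs / (df + 1)) + 1.0);
  }

  public static void main(String[] args) {
    // a term occurring 4 times in a document and appearing in 10 of 1000 documents
    System.out.println(tfidf(4, 10, 1000)); // = 2 * (ln(90.9) + 1), roughly 11.0
  }
}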
4.2) Merge partial vectors (TF-IDF)
Configuration conf = new Configuration(baseConf);
Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);
PartialVectorMerger.mergePartialVectors(partialVectorPaths,
outputDir,
baseConf,
normPower,
logNormalize,
datasetFeatures.getFirst()[0].intValue(),
sequentialAccessOutput,
namedVector,
numReducers);
Merge all the partial vectors into one complete document vector.
Input: partialVectorOutputPath  Output: outputDir
Mapper: (the default identity Mapper)
PartialVectorMergeReducer:
// sum the partial TF-IDF vectors
Vector vector = new RandomAccessSparseVector(dimension, 10);
for (VectorWritable value : values) {
vector.assign(value.get(), Functions.PLUS);
}
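The merge step also receives normPower and logNormalize (see the mergePartialVectors calls above). Assuming the reducer applies a p-norm after summing, a normPower of 2 scales each document vector to unit Euclidean length, as in this toy sketch:
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class NormalizeSketch {
  public static void main(String[] args) {
    Vector v = new RandomAccessSparseVector(3, 3);
    v.setQuick(0, 3);
    v.setQuick(1, 4);
    // the L2 norm is 5, so the nonzero entries become 0.6 and 0.8
    System.out.println(v.normalize(2));
  }
}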