Hadoop text classification: finally running end to end

Training

Entry point

package org.wordCount;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WordMain {

    // private static List<String> secondDir = new ArrayList<String>();

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        // The next two lines are important
        conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar");
        conf.set("mapred.job.tracker", "192.168.190.128:9001");

        // Path where the class prior probabilities are saved
        String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbability.txt";
        conf.set("priorProbality", priorProbality);

        // Path where the total number of distinct words is saved
        String totalWordsPath = "hdfs://192.168.190.128:9000/user/hadoop/output/totalwords.txt";
        conf.set("totalWordsPath", totalWordsPath);

        // Total number of words in each class
        String wordsInClassPath = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence/_wordsInClass/wordsInClass-r-00000";
        conf.set("wordsInClassPath", wordsInClassPath);

        // Input path and output path of the word-frequency job
        // "/user/hadoop/input/NBCorpus/Country"
        String input = "hdfs://192.168.190.128:9000/user/hadoop/input/NBCorpus/Country";
        String wordsOutput = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence";
        conf.set("input", input);
        conf.set("wordsOutput", wordsOutput);

        // Output path for the per-class word probabilities;
        // the input of that job is the word-frequency output above

        String freqOutput = "hdfs://192.168.190.128:9000/user/hadoop/output/probability/";
        conf.set("freqOutput", freqOutput);

        FileCount.run(conf);
        WordCount.run(conf);
        Probability.run(conf);
/*
        System.out.print("----------");

        String[] otherArgs = new String[] { "hdfs://192.168.190.128:9000/user/hadoop/test/",
                "hdfs://192.168.190.128:9000/user/hadoop/wordcount/output2/" };
        conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar");

        Job job = new Job(conf, "word count");
        job.setJarByClass(WordMain.class);

        job.setInputFormatClass(MyInputFormat.class);

        job.setMapperClass(WordMapper.class);
//        job.setCombinerClass(WordReducer.class);
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // MyUtils.addInputPath(job, new Path(otherArgs[0]), conf);

        List<Path> inputPaths = getSecondDir(conf, otherArgs[0]);
        for (Path path : inputPaths) {
            System.out.println("path = " + path.toString());
            MyInputFormat.addInputPath(job, path);

        }
        System.out.println("addinputpath     ok" );
//        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);*/

    }

    // Returns the second-level directories under a folder
    static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
        Path path = new Path(folder);
        FileSystem fs = path.getFileSystem(conf);
        FileStatus[] stats = fs.listStatus(path);
        List<Path> folderPath = new ArrayList<Path>();
        for (FileStatus stat : stats) {
            if (stat.isDir()) {
                if (fs.listStatus(stat.getPath()).length > 10) { // use only classes with more than
                                                                 // 10 files as input paths
                    folderPath.add(stat.getPath());
                }
            }
        }
        return folderPath;
    }

}
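
getSecondDir expects one sub-directory per class under the corpus root, with one document per file, and keeps only classes that contain more than 10 files. For the NBCorpus/Country input configured above, the layout looks roughly like this (the file names are only illustrative):

/user/hadoop/input/NBCorpus/Country/
    ALB/
        0001.txt
        0002.txt
        ...
    USA/
        ...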

Counting the number of documents in each class

package org.wordCount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 *
 * Counts the files per class and computes the prior probabilities; the priors are saved under /user/hadoop/output/priorP/
 *
 */

public class FileCount {

    public static void run(Configuration conf) throws Exception {

        int sum = 0;
        String in = conf.get("input");

        Map<String, Integer> map = new HashMap<>();
        Map<String, Double> priorMap = new HashMap<>();

        // Get the per-class file counts (classes with too few documents are removed below)
        map = FileCount.getFileNumber(in);

        // Print each class with its file count and accumulate the total
        Iterator<Map.Entry<String, Integer>> itrs = map.entrySet().iterator();
        while (itrs.hasNext()) {
//            System.out.println("ok");
            Map.Entry<String, Integer> it = itrs.next();
            if (it.getValue() <= 10) {    // skip classes with 10 or fewer documents
                itrs.remove();
            }else{
                sum += it.getValue();
                System.out.println(it.getKey() + "\t" + it.getValue());
            }
        }

        System.out.println("sum = " + sum);

        String output = conf.get("priorProbality");

        Path outputPath = new Path(output);
        FileSystem fs = outputPath.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(outputPath);

        // Compute each class's share of all documents, i.e. its prior probability
        String ctx = "";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            Double result = 0.0;
            result = Double.parseDouble(entry.getValue().toString()) / sum;
            priorMap.put(entry.getKey(), result); // keep it in priorMap as well
            ctx += entry.getKey() + "\t" + result + "\n";
        }
        outputStream.writeBytes(ctx);
        IOUtils.closeStream(outputStream);

        // Print the prior probabilities (they have already been written to the file above)
        // (another way of iterating over a map)
        Iterator<Map.Entry<String, Double>> iterators = priorMap.entrySet().iterator();
        while (iterators.hasNext()) {
            Map.Entry<String, Double> iterator = iterators.next();
            System.out.println(iterator.getKey() + "\t" + iterator.getValue());
        }

    }

    // Counts the files in each second-level (class) directory
    public static Map<String, Integer> getFileNumber(String folderPath) throws Exception {

        Map<String, Integer> fileMap = new HashMap<>();
        Configuration conf = new Configuration();

        Path path = new Path(folderPath);
        FileSystem hdfs = path.getFileSystem(conf);
        FileStatus[] status = hdfs.listStatus(path);
//        System.out.println(folderPath);
//        System.out.println("status.length = " + status.length);

        for (FileStatus stat : status) {
            if (stat.isDir()) {
                int length = hdfs.listStatus(stat.getPath()).length;
                String name = stat.getPath().getName();
                fileMap.put(name, length);
            }
        }

        return fileMap;
    }

}
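
FileCount writes one tab-separated line per class to the priorProbality path, in the form class name, a tab, then the prior. With made-up counts, a class holding 12 of 600 kept documents would produce the line:

ALB    0.02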

Counting the words in the documents

package org.wordCount;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class WordCount {

    private static MultipleOutputs<Text, IntWritable> mos;
    // static String baseOutputPath = "/user/hadoop/test_out";

    // Two maps originally meant to hold the document count and the word count per class (unused)
    // private static Map<String, List<String>> fileCountMap = new
    // HashMap<String, List<String>>();
    // private static Map<String, Integer> fileCount = new HashMap<String,
    // Integer>();
    // static Map<String, List<String>> wordsCountInClassMap = new
    // HashMap<String, List<String>>();

    static enum WordsNature {
        CLSASS_NUMBER, CLASS_WORDS, TOTALWORDS
    }

    // map
    static class First_Mapper extends Mapper<Text, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final static IntWritable zero = new IntWritable(0);

        private Text countryName = new Text();

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String word = itr.nextToken();
                if (!(MyUtils.hasDigit(word) || word.contains("."))) { // 去掉无意义词
                    countryName.set(key.toString() + "\t" + word);

                    context.write(countryName, one); // 统计每个类别中的单词个数 ABL have 1
                    context.write(key, one); // 统计类别中的单词总数
                    context.write(new Text(word), zero); // 统计单词总数
                }
            }

        }
    }

    // Reducer
    static class First_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // result holds the count of each word in each class
        IntWritable result = new IntWritable();
        Map<String, List<String>> classMap = new HashMap<String, List<String>>();
        Map<String, List<String>> fileMap = new HashMap<String, List<String>>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                        throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            // sum == 0 means the key is a bare word emitted with value zero:
            // increment the counter of distinct words
            if (sum == 0) {
                context.getCounter(WordsNature.TOTALWORDS).increment(1);
            } else { // otherwise decide by the structure of the key
                String[] temp = key.toString().split("\t");
                if (temp.length == 2) { // class and word separated by a tab
                    result.set(sum);
                    context.write(key, result);
                    // mos.write(new Text(temp[1]), result, temp[0]);
                } else { // bare class name: total number of words in the class
                    result.set(sum);
                    mos.write(key, result, "_wordsInClass" + "/" + "wordsInClass");
                }

            }

        }

        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            mos.close();
        }

        @Override
        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

    }

    public static int run(Configuration conf) throws Exception {
//        Configuration conf = new Configuration();
        // System.out.print("---run-------");
        // Paths of the various files
        // document-count (prior) path
//        String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbality.txt";
//        conf.set("priorProbality", priorProbality);

        Job job = new Job(conf, "file count");

        job.setJarByClass(WordCount.class);

        job.setInputFormatClass(MyInputFormat.class);

        job.setMapperClass(WordCount.First_Mapper.class);
        job.setReducerClass(WordCount.First_Reducer.class);
        // System.out.println("---job-------");
        // Classes with 10 or fewer documents are filtered out

        String input = conf.get("input");

        List<Path> inputPaths = MyUtils.getSecondDir(conf, input);
        for (Path path : inputPaths) {
            System.out.println("path = " + path.toString());
            MyInputFormat.addInputPath(job, path);
        }

        String wordsOutput = conf.get("wordsOutput");
        FileOutputFormat.setOutputPath(job, new Path(wordsOutput));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        // Read the counter
        Counters counters = job.getCounters();
        Counter c1 = counters.findCounter(WordsNature.TOTALWORDS);
        System.out.println("-------------->>>>: " + c1.getDisplayName() + ":" + c1.getName() + ": " + c1.getValue());

        // Write the number of distinct words to a file
        Path totalWordsPath = new Path(conf.get("totalWordsPath"));
        FileSystem fs = totalWordsPath.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(totalWordsPath);
        outputStream.writeBytes(c1.getDisplayName() + ":" + c1.getValue());
        IOUtils.closeStream(outputStream);

        // Next time, try passing the total number of distinct words through the Configuration instead
        //
        // conf.set("TOTALWORDS", totalWords.toString());

        return exitCode;

    }

}
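
A sketch of what this job leaves behind (the numbers are made up):

part-r-* files under mid/wordsFrequence (per-class word frequencies, the input of the Probability job):
    ALB    weekend    1
    ALB    weeks    3

_wordsInClass/wordsInClass-r-00000 (total number of words per class):
    ALB    5322

totalwords.txt (the TOTALWORDS counter value, i.e. the number of distinct words):
    TOTALWORDS:81234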

MyInputFormat

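MyInputFormat hands each document to the mapper as a single record, with the class name (the parent directory of the file) as the Text key and the whole file contents as the Text value, which is what First_Mapper's key.toString() relies on. A minimal sketch, assuming the same whole-file reading approach as the WholeFileInputFormat shown at the end of the post (the record-reader class name here is an assumption):

package org.wordCount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ClassFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; // one file = one record
    }

    static class ClassFileRecordReader extends RecordReader<Text, Text> {

        private FileSplit fileSplit;
        private Configuration conf;
        private Text key = new Text();      // class name = parent directory name
        private Text value = new Text();    // whole file contents
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                Path file = fileSplit.getPath();
                key.set(file.getParent().getName()); // e.g. "ALB"
                byte[] contents = new byte[(int) fileSplit.getLength()];
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
        }
    }

}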

Two small utilities

package org.wordCount;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MyUtils {

    // Returns the second-level directories under a folder
        static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
//            System.out.println("----getSencondDir----" + folder);
            Path path = new Path(folder);

            FileSystem fs = path.getFileSystem(conf);
            FileStatus[] stats = fs.listStatus(path);
            System.out.println("stats.length = " + stats.length);
            List<Path> folderPath = new ArrayList<Path>();
            for (FileStatus stat : stats) {
                if (stat.isDir()) {
//                    System.out.println("----stat----" + stat.getPath());
                    if (fs.listStatus(stat.getPath()).length > 10) { // use only classes with more than
                                                                     // 10 files as input paths
                        folderPath.add(stat.getPath());
                    }
                }
            }
//            System.out.println("----folderPath----" + folderPath.size());
            return folderPath;
        }

        // Checks whether a string contains any digit
        static boolean hasDigit(String content) {
            boolean flag = false;
            Pattern p = Pattern.compile(".*\\d+.*");
            Matcher m = p.matcher(content);
            if (m.matches())
                flag = true;
            return flag;
        }

}

Computing the probabilities

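The mapper below applies add-one (Laplace) smoothing. Writing count(w, c) for the frequency of word w in class c (one line of the word-frequency output), N_c for the total number of word occurrences in class c (loaded from wordsInClassPath in setup), and |V| for the number of distinct words (the TOTALWORDS value), the probability it emits for each seen word is

    P(w | c) = (count(w, c) + 1) / (N_c + |V|)

and, for a word never seen in class c (written to the _notFound output in cleanup),

    P(unseen | c) = 1 / (N_c + |V|)
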
package org.wordCount;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class Probability {

    private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
    public static int total = 0;
    private static MultipleOutputs<Text, DoubleWritable> mos;

    // Client
    public static void run(Configuration conf) throws Exception {

        // Read the total number of distinct words and store it in the Configuration
        String totalWordsPath = conf.get("totalWordsPath");
//        String wordsInClassPath = conf.get("wordsInClassPath");

        // First read the number of distinct words
        FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
        FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
        BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
        String strLine = buffer.readLine();
        String[] temp = strLine.split(":");
        if (temp.length == 2) {
            // temp[0] = TOTALWORDS
            conf.set(temp[0], temp[1]); // both key and value are Strings
        }

        total = Integer.parseInt(conf.get("TOTALWORDS"));
        LOG.info("------>total = " + total);

        System.out.println("total ==== " + total);

        Job job = new Job(conf, "file count");

        job.setJarByClass(Probability.class);

        job.setMapperClass(WordsOfClassCountMapper.class);
        job.setReducerClass(WordsOfClassCountReducer.class);

        String input = conf.get("wordsOutput");
        String output = conf.get("freqOutput");

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    // Mapper
    static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        private static DoubleWritable number = new DoubleWritable();
        private static Text className = new Text();

        // Total number of words per class, loaded in setup()
        private static Map<String, Integer> filemap = new HashMap<String, Integer>();

        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                        throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            int tot = Integer.parseInt(conf.get("TOTALWORDS"));

            System.out.println("total = " + total);
            System.out.println("tot = " + tot);

            // Each input line looks like:
            // ALB weekend 1
            // ALB weeks 3
            Map<String, Map<String, Integer>> baseMap = new HashMap<String, Map<String, Integer>>(); // class -> (word -> count)
            // Map<String, Map<String, Double>> priorMap = new HashMap<String,
            // Map<String, Double>>(); // per-word probabilities

            String[] temp = value.toString().split("\t");
            // store the parsed line in baseMap first
            if (temp.length == 3) {
                // temp[0] is the directory name, i.e. the class
                if (baseMap.containsKey(temp[0])) {
                    baseMap.get(temp[0]).put(temp[1], Integer.parseInt(temp[2]));
                } else {
                    Map<String, Integer> oneMap = new HashMap<String, Integer>();
                    oneMap.put(temp[1], Integer.parseInt(temp[2]));
                    baseMap.put(temp[0], oneMap);
                }

            } // the line has now been parsed into baseMap

            int allWordsInClass = 0;

            for (Map.Entry<String, Map<String, Integer>> entries : baseMap.entrySet()) { // iterate over the classes
                allWordsInClass = filemap.get(entries.getKey());
                for (Map.Entry<String, Integer> entry : entries.getValue().entrySet()) { // compute the probability of each word in the class
                    double p = (entry.getValue() + 1.0) / (allWordsInClass + tot);

                    className.set(entries.getKey() + "\t" + entry.getKey());
                    number.set(p);
                    LOG.info("------>p = " + p);
                    mos.write(new Text(entry.getKey()), number, entries.getKey() /*+ "\\" + entries.getKey()*/); // the last argument makes MultipleOutputs write one file per class

//                    context.write(className, number);
                }
            }

        }

        // Finally compute the probability for words not present in a class; it is one constant per class
        protected void cleanup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {

            Configuration conf = context.getConfiguration();
            int tot = Integer.parseInt(conf.get("TOTALWORDS"));
            for (Map.Entry<String, Integer> entry : filemap.entrySet()) { // iterate over the classes

                double notFind = (1.0) / (entry.getValue() + tot);
                number.set(notFind);
                mos.write(new Text(entry.getKey()), number, "_notFound" + "/" + "notFound");

            }
            mos.close();
        }

        protected void setup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
            String filePath = conf.get("wordsInClassPath");
            FileSystem fs = FileSystem.get(URI.create(filePath), conf);
            FSDataInputStream inputStream = fs.open(new Path(filePath));
            BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
            String strLine = null;
            while ((strLine = buffer.readLine()) != null) {
                String[] temp = strLine.split("\t");
                filemap.put(temp[0], Integer.parseInt(temp[1]));
            }
        }

    }

    // Reducer
    static class WordsOfClassCountReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        // result holds the summed value for each key
        DoubleWritable result = new DoubleWritable();
        // Configuration conf = new Configuration();
        // int total = conf.getInt("TOTALWORDS", 1);

        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                        throws IOException, InterruptedException {

            double sum = 0L;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            result.set(sum);

            context.write(key, result);
        }

    }

}

Prediction

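Given the model produced by the training jobs (the class priors from priorProbality, the per-class word probabilities from freqOutput, and the per-class constant for unseen words from the _notFound output), classifying a test document is a Naive Bayes argmax over summed log probabilities. A minimal sketch of that scoring step, assuming the model has already been loaded into plain maps (the class and method names below are placeholders, not the original prediction code):

package org.wordCount.predict;

import java.util.Map;
import java.util.StringTokenizer;

public class NaiveBayesScorer {

    /**
     * prior    : class -> P(c), read from priorProbability.txt
     * wordProb : class -> (word -> P(w|c)), read from the per-class probability files
     * notFound : class -> probability used for words not seen in that class
     */
    public static String classify(String text,
                                  Map<String, Double> prior,
                                  Map<String, Map<String, Double>> wordProb,
                                  Map<String, Double> notFound) {
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> cls : prior.entrySet()) {
            String c = cls.getKey();
            double score = Math.log(cls.getValue()); // log P(c)
            StringTokenizer itr = new StringTokenizer(text);
            while (itr.hasMoreTokens()) {
                String w = itr.nextToken();
                if (w.matches(".*\\d+.*") || w.contains(".")) { // same filtering as the training mapper
                    continue;
                }
                Map<String, Double> probs = wordProb.get(c);
                Double p = (probs == null) ? null : probs.get(w);
                score += Math.log(p != null ? p : notFound.get(c)); // unseen word: fall back to the per-class constant
            }
            if (score > bestScore) {
                bestScore = score;
                best = c;
            }
        }
        return best;
    }
}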

The InputFormat used for prediction

package org.wordCount.predict;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {

        return false;
    }

}

class WholeFileRecordReader extends RecordReader<LongWritable, Text>{

    private FileSplit fileSplit;    // the input split; it is turned into a single <key, value> record
    private Configuration conf;        // configuration object
    private Text value = new Text();
    private LongWritable key = new LongWritable();    // key object, left empty
    private boolean processed = false;    // whether this record has already been processed

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;        // cast the input split to a FileSplit
        this.conf = context.getConfiguration();

    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if(!processed){
            byte[] contents = new byte[(int)fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try{
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            }finally{
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {

        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {

        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {

    }

}
