1,统计词出现的次数
1/计算类别的先验概率
*输入格式:类别+文档id+文档词(切分成A,b,c)
*输出格式:类别+文档出现次数+文档出现的词的总数
2/计算每个词的条件概率
*输入格式:类别+文档id+文档词(切分成A,b,c)
*输出格式:类别+词+词的总数
3/假设二分类问题-计算概率值
* 1类别+文档出现次数+文档出现的词的总数
* 2类别+词+词的总数
* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))
* 输入格式:类别+词+词的总数
* 输出格式:"词","类别+log()值概率"+1,2+类别的先验概率
4/假设二分类问题-测试
* 1类别+文档出现次数+文档出现的词的总数
* 2类别+词+词的总数
* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))
*输入格式:新文档id+文档词(切分成A,b,c)
*输出格式:新文档id+类别
这个版本基本写了MapReduce的朴素贝叶斯思路--具体优化和修改以后再弄
Python版实现
http://blog.csdn.net/q383700092/article/details/51773364
R语言版调用函数
http://blog.csdn.net/q383700092/article/details/51774069
MapReduce简化实现版
http://blog.csdn.net/q383700092/article/details/51778765
spark版
后续添加
Bayes1
package com.ml.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 1 of the naive Bayes pipeline: per-class totals used for the class priors.
 *
 * <p>Input line format: {@code class,docId,word-word-...} (comma-separated fields, the
 * document's words joined with {@code -}).
 *
 * <p>Output line format: {@code class,documentCount,wordCount}, i.e. how many documents
 * carry the class and how many words those documents contain in total. Per the original
 * author's note the result is collected into {@code dict1.txt}.
 */
public class Bayes1 extends Configured implements Tool {

    /** Job counters. */
    public static enum Counter {
        /** Number of input lines skipped because they had fewer than three fields. */
        PARSER_ERR
    }

    /** Emits {@code class -> "1,<number of words in the document>"} for every input line. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text mykey = new Text(); // class id
        private final Text myval = new Text(); // "1,<document length>"

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                // Malformed line: count it and move on instead of failing the whole task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            String[] words = fields[2].split("-");
            mykey.set(fields[0]);
            myval.set("1," + words.length);
            context.write(mykey, myval);
        }
    }

    /**
     * Sums the document counts and word counts of one class.
     *
     * <p>The output value has the same {@code count,count} shape as the input values, which
     * is what allows this class to double as the job's combiner.
     */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private final Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long docCount = 0;  // documents in this class
            long wordCount = 0; // words in those documents
            for (Text value : values) {
                String[] parts = value.toString().split(",");
                docCount += Long.parseLong(parts[0]);
                wordCount += Long.parseLong(parts[1]);
            }
            val.set(docCount + "," + wordCount);
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output>}
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Reuse the configuration prepared by ToolRunner so generic options (-D, -fs, ...)
        // are honoured; fall back to a fresh one when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            return 2; // usage has already been printed
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The reducer is an associative sum over its own output format, so it is a valid combiner.
        job.setCombinerClass(MyReduce.class);
        job.setReducerClass(MyReduce.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the job and wires its input and output paths.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf job configuration
     * @param args {@code <input> <output>}
     * @return the job, or {@code null} (after printing usage) when {@code args} does not
     *         contain exactly two entries
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage:%s [genneric options]<input><output>\n",
                    tool.getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /**
     * Entry point. Command-line arguments are used when given; the author's development
     * paths are only a fallback for an argument-less run.
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/" };
        }
        int status = ToolRunner.run(new Bayes1(), args);
        System.exit(status);
    }
}
Bayes2
package com.ml.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 2 of the naive Bayes pipeline: per-class word frequencies used for the
 * conditional probabilities.
 *
 * <p>Input line format: {@code class,docId,word-word-...} (comma-separated fields, the
 * document's words joined with {@code -}).
 *
 * <p>Output line format: {@code class,word,count}. Per the original author's note the
 * result is collected into {@code dict2.txt}.
 */
public class Bayes2 extends Configured implements Tool {

    /** Job counters. */
    public static enum Counter {
        /** Number of input lines skipped because they had fewer than three fields. */
        PARSER_ERR
    }

    /** Emits {@code "class,word" -> "1"} for every word occurrence in a document. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text mykey = new Text(); // "class,word"
        private final Text myval = new Text(); // occurrence count

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                // Malformed line: count it and move on instead of failing the whole task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            myval.set("1");
            for (String word : fields[2].split("-")) {
                mykey.set(fields[0] + "," + word);
                context.write(mykey, myval);
            }
        }
    }

    /**
     * Sums the occurrence counts of one {@code class,word} pair.
     *
     * <p>Input and output values are both plain integer strings, which is what allows this
     * class to double as the job's combiner.
     */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private final Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (Text value : values) {
                sum += Long.parseLong(value.toString());
            }
            val.set(Long.toString(sum));
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output>}
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Reuse the configuration prepared by ToolRunner so generic options (-D, -fs, ...)
        // are honoured; fall back to a fresh one when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output

        Job job = this.parseInputAndOutput(this, conf, args);
        if (job == null) {
            return 2; // usage has already been printed
        }

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The mapper emits one record per word; pre-summing on the map side cuts shuffle volume.
        job.setCombinerClass(MyReduce.class);
        job.setReducerClass(MyReduce.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the job and wires its input and output paths.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf job configuration
     * @param args {@code <input> <output>}
     * @return the job, or {@code null} (after printing usage) when {@code args} does not
     *         contain exactly two entries
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage:%s [genneric options]<input><output>\n",
                    tool.getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return null;
        }
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /**
     * Entry point. Command-line arguments are used when given; the author's development
     * paths are only a fallback for an argument-less run.
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/pword/" };
        }
        int status = ToolRunner.run(new Bayes2(), args);
        System.exit(status);
    }
}
Bayes3
package com.ml.mapreduce;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 3 of the naive Bayes pipeline (two-class case): turns the per-class word counts
 * into log conditional probabilities.
 *
 * <p>Job input (output of step 2): {@code class,word,count}.
 *
 * <p>Distributed-cache file (output of step 1): {@code class,documentCount,wordCount}.
 *
 * <p>Output line format: {@code word,tab,class,logP[,class,logP]} where
 * {@code logP = log(count / wordCount of that class)}. The order of the two
 * {@code class,logP} pairs is not fixed.
 *
 * <p>NOTE(review): the original header also mentions the class prior
 * {@code log(documentCount / sum(documentCount))}; this job does not compute it, and no
 * smoothing is applied to the conditional probabilities.
 */
public class Bayes3 extends Configured implements Tool {

    /** Job counters. */
    public static enum Counter {
        /** Input rows skipped: too few fields, non-numeric count, or no word total for the class. */
        PARSER_ERR
    }

    /** Emits {@code word -> "class,logP"} for every {@code class,word,count} row. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text mykey = new Text(); // word
        private final Text myval = new Text(); // "class,logP"

        // Total number of words per class, read once per task from the cached step-1 output.
        private double class1WordTotal = 0.0;
        private double class0WordTotal = 0.0;

        /**
         * Loads the per-class word totals once per task rather than once per record.
         *
         * @throws IOException if the job has no distributed-cache files or one cannot be read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (cacheFiles == null) {
                throw new IOException("No distributed cache file: the step-1 (Bayes1) output is required");
            }
            for (Path p : cacheFiles) {
                // NOTE(review): the file is recognised by its local parent directory ending in
                // "bayes"; this depends on how the cluster lays out localized files - confirm.
                if (p.getParent().toString().endsWith("bayes")) {
                    loadClassTotals(p);
                }
            }
        }

        /** Reads {@code class,documentCount,wordCount} rows; class "1" is class 1, anything else class 0. */
        private void loadClassTotals(Path file) throws IOException {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(file.toString()), "UTF-8"));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");
                    if (parts.length < 3) {
                        continue; // not a class-total row
                    }
                    double wordTotal = Double.parseDouble(parts[2]);
                    if ("1".equals(parts[0])) {
                        class1WordTotal = wordTotal;
                    } else {
                        class0WordTotal = wordTotal;
                    }
                }
            } finally {
                reader.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            double classTotal = "1".equals(fields[0]) ? class1WordTotal : class0WordTotal;
            double count;
            try {
                count = Double.parseDouble(fields[2]);
            } catch (NumberFormatException e) {
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            if (classTotal <= 0.0) {
                // No total known for this class: log(count / 0) would silently emit Infinity.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            double plog = Math.log(count / classTotal);
            mykey.set(fields[1]);
            myval.set(fields[0] + "," + plog);
            context.write(mykey, myval);
        }
    }

    /** Joins all {@code class,logP} pairs of a word into one value prefixed with {@code tab}. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private final Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder("tab");
            for (Text value : values) {
                joined.append(',').append(value.toString());
            }
            val.set(joined.toString());
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output> <classTotalsFile>}; the third entry is the step-1
     *             output file that is shipped through the distributed cache
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf("Usage:%s [generic options] <input> <output> <classTotalsFile>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        // Reuse the configuration prepared by ToolRunner so generic options (-D, -fs, ...)
        // are honoured; fall back to a fresh one when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output
        // Must happen before the Job is created: the Job takes its own copy of conf.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

        Job job = this.parseInputAndOutput(this, conf, args);

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the job and wires its input and output paths.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf job configuration
     * @param args at least {@code <input> <output>}; not validated here
     * @return the configured job
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /**
     * Entry point. Command-line arguments are used when given; the author's development
     * paths are only a fallback for an argument-less run.
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/bayes/pword/part-r-00000",
                "hdfs://192.168.192.129:9000/ml/bayes/pall/",
                "hdfs://192.168.192.129:9000/ml/bayes/part-r-00000" };
        }
        int status = ToolRunner.run(new Bayes3(), args);
        System.exit(status);
    }
}
Bayes4
package com.ml.mapreduce;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 4 of the naive Bayes pipeline (two-class case): classifies new documents.
 *
 * <p>Job input: {@code docId,word-word-...}.
 *
 * <p>Distributed-cache file (output of step 3): {@code word,tab,class,logP[,class,logP]}.
 *
 * <p>Output line format: {@code docId,<label>}, the label being the string the reducer
 * writes for class 1 or class 0. A document none of whose words is in the dictionary
 * produces no output line.
 */
public class Bayes4 extends Configured implements Tool {

    /** Job counters. */
    public static enum Counter {
        /** Number of input lines skipped because they had fewer than two fields. */
        PARSER_ERR
    }

    /** Emits {@code docId -> "1,logP1,0,logP0"} for every document word found in the dictionary. */
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text mykey = new Text(); // document id
        private final Text myval = new Text(); // "1,logP1,0,logP0"

        // word -> "1,<logP for class 1>,0,<logP for class 0>". The dictionary is assumed to
        // fit in memory (original author's assumption).
        private final Map<String, String> wordLogProbs = new HashMap<String, String>();

        /**
         * Loads the word dictionary once per task rather than once per record.
         *
         * @throws IOException if the job has no distributed-cache files or one cannot be read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (cacheFiles == null) {
                throw new IOException("No distributed cache file: the step-3 (Bayes3) output is required");
            }
            for (Path p : cacheFiles) {
                // NOTE(review): the file is recognised by its local parent directory ending in
                // "pall"; this depends on how the cluster lays out localized files - confirm.
                if (p.getParent().toString().endsWith("pall")) {
                    loadDictionary(p);
                }
            }
        }

        /** Reads the step-3 output as UTF-8 (words may be non-ASCII) into {@link #wordLogProbs}. */
        private void loadDictionary(Path file) throws IOException {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(file.toString()), "UTF-8"));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] pall = line.split(",");
                    String entry = toEntry(pall);
                    if (entry != null) {
                        wordLogProbs.put(pall[0], entry);
                    }
                }
            } finally {
                reader.close();
            }
        }

        /**
         * Normalises one dictionary row to {@code "1,logP1,0,logP0"}.
         *
         * @param pall the row split on commas: word, "tab", class, logP and optionally a
         *             second class, logP pair
         * @return the normalised entry, or {@code null} when the row has fewer than four fields
         */
        private static String toEntry(String[] pall) {
            if (pall.length >= 6) {
                // Both classes present; the pairs may arrive in either order.
                if (pall[2].equals("1")) {
                    return pall[2] + "," + pall[3] + "," + pall[4] + "," + pall[5];
                }
                return pall[4] + "," + pall[5] + "," + pall[2] + "," + pall[3];
            }
            if (pall.length >= 4) {
                // Only one class has seen this word; the other side is filled with 0.0.
                // NOTE(review): a log-likelihood of 0.0 means probability 1, which favours the
                // class that never saw the word. Fixing it needs smoothing in step 3.
                if (pall[2].equals("1")) {
                    return pall[2] + "," + pall[3] + "," + "0" + "," + "0.0";
                }
                return "1" + "," + "0.0" + "," + pall[2] + "," + pall[3];
            }
            return null;
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 2) {
                // Malformed line: count it and move on instead of failing the whole task.
                context.getCounter(Counter.PARSER_ERR).increment(1);
                return;
            }
            mykey.set(fields[0]);
            for (String word : fields[1].split("-")) {
                String entry = wordLogProbs.get(word);
                if (entry != null) {
                    myval.set(entry);
                    context.write(mykey, myval);
                }
            }
        }
    }

    /** Adds up the per-word log-likelihoods of each class and writes the winning class label. */
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private final Text val = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Both scores start from the same hard-coded 0.5, so the start value never changes
            // the outcome. NOTE(review): the author meant these as class priors to be read from
            // the step-1 output; real priors would have to be added as logarithms.
            double class1Score = 0.5;
            double class0Score = 0.5;
            for (Text value : values) {
                String[] parts = value.toString().split(",");
                class1Score += Double.parseDouble(parts[1]); // log-likelihood under class 1
                class0Score += Double.parseDouble(parts[3]); // log-likelihood under class 0
            }
            // Ties go to class 0.
            val.set(class1Score > class0Score ? "类别1" : "类别0");
            context.write(key, val);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code <input> <output> <dictionaryFile>}; the third entry is the step-3
     *             output file that is shipped through the distributed cache
     * @return 0 on success, 1 if the job failed, 2 if the arguments were invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf("Usage:%s [generic options] <input> <output> <dictionaryFile>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        // Reuse the configuration prepared by ToolRunner so generic options (-D, -fs, ...)
        // are honoured; fall back to a fresh one when run() is invoked directly.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        conf.set("mapred.textoutputformat.separator", ","); // key/value separator in the output
        // Must happen before the Job is created: the Job takes its own copy of conf.
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

        Job job = this.parseInputAndOutput(this, conf, args);

        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReduce.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Creates the job and wires its input and output paths.
     *
     * @param tool the tool whose class names the job and locates the jar
     * @param conf job configuration
     * @param args at least {@code <input> <output>}; not validated here
     * @return the configured job
     */
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws Exception {
        Job job = new Job(conf, tool.getClass().getSimpleName());
        job.setJarByClass(tool.getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job;
    }

    /**
     * Entry point. Command-line arguments are used when given; the author's development
     * paths are only a fallback for an argument-less run.
     */
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = new String[] {
                "hdfs://192.168.192.129:9000/ml/test.txt",
                "hdfs://192.168.192.129:9000/ml/bayes/result/",
                "hdfs://192.168.192.129:9000/ml/bayes/pall/part-r-00000" };
        }
        int status = ToolRunner.run(new Bayes4(), args);
        System.exit(status);
    }
}
时间: 2024-10-16 22:22:40