Weibo content recommendation with MapReduce (MR)

 
 

First iteration

package com.laoxiao.mr.weibo;

import java.io.StringReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * First MR job: compute TF and N (the total number of weibo posts)
 * @author root
 *
 */
public class firstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // Input line format: weiboId \t content
        String[] temp = StringUtils.split(value.toString(), "\t");
        if(temp.length >= 2){
            String id = temp[0].trim();
            String str = temp[1].trim();
            StringReader sr = new StringReader(str);
            IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
            Lexeme word = null;
            while((word = ikSegmenter.next()) != null){
                String w = word.getLexemeText();
                // One record per term occurrence, keyed as term_weiboId
                context.write(new Text(w + "_" + id), new IntWritable(1));
            }
            // One "count" record per weibo; summing these gives N
            context.write(new Text("count"), new IntWritable(1));
        }else{
            System.out.println("value is error:" + value.toString());
        }
    }
}
package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class firstReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Sums the 1s per key: TF for term_weiboId keys, N for the "count" key
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class firstRepartition extends HashPartitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Route the single "count" key to the last reducer (partition 3 of 4);
        // all term_weiboId keys are hashed across the remaining partitions 0-2.
        if(key.toString().equals("count")){
            return 3;
        }else{
            return super.getPartition(key, value, numReduceTasks - 1);
        }
    }
}
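With the four reduce tasks configured in firstJob below, this partitioner puts the single "count" key into partition 3, so the total weibo count ends up alone in part-r-00003, while the term_weiboId TF records are hashed over part-r-00000 through part-r-00002. This is why the later jobs skip the file named part-r-00003 when reading job 1's output, and why job 3 loads exactly that file to obtain N.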

package com.laoxiao.mr.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class firstJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(firstJob.class);
            job.setJobName("weibo1");

            job.setMapperClass(firstMapper.class);
            job.setReducerClass(firstReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setPartitionerClass(firstRepartition.class);
            //job.setCombinerClass(firstReducer.class);
            job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/root/input/data/weibo.txt"));

            Path path = new Path("/usr/output/weibo1");
            if(fs.exists(path)){
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean f = job.waitForCompletion(true);
            if(f){
                System.out.println("first job run finished!!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
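To make the first job's output concrete, the following stand-alone sketch (not part of the job itself) tokenizes one invented input line the same way firstMapper does and prints the key/value pairs it would emit. It assumes the IK Analyzer jar used above (org.wltea) is on the classpath; the sample id and text are made up for illustration.

import java.io.IOException;
import java.io.StringReader;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class FirstMapperDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical input line: weiboId \t content
        String id = "3823890210294392";
        String content = "今天我约了豆浆,油条";

        // Smart segmentation, exactly as in firstMapper
        IKSegmenter seg = new IKSegmenter(new StringReader(content), true);
        Lexeme word;
        while ((word = seg.next()) != null) {
            // Same key/value the mapper emits: term_weiboId -> 1
            System.out.println(word.getLexemeText() + "_" + id + "\t1");
        }
        // Plus one "count" record per weibo; summing these in the reducer gives N
        System.out.println("count\t1");
    }
}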

Second iteration

package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Second MR job: compute DF, the number of weibo posts in which each term appears.
public class secondMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Get the input split handled by the current map task
        FileSplit fs = (FileSplit) context.getInputSplit();

        // Skip part-r-00003, which only holds the total weibo count from job 1
        if (!fs.getPath().getName().contains("part-r-00003")) {

            String[] v = value.toString().trim().split("\t");
            if (v.length >= 2) {
                String[] ss = v[0].split("_");
                if (ss.length >= 2) {
                    String w = ss[0];
                    // Each term_weiboId key appears exactly once in job 1's output,
                    // so emitting (term, 1) here counts distinct posts per term
                    context.write(new Text(w), new IntWritable(1));
                }
            } else {
                System.out.println(value.toString() + "-------------");
            }
        }
    }
}
package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class secondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Each value is 1, so the sum is the term's DF
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
package com.laoxiao.mr.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class secondJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(secondJob.class);
            job.setJobName("weibo2");

            job.setMapperClass(secondMapper.class);
            job.setReducerClass(secondReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //job.setPartitionerClass(firstRepartition.class);
            //job.setCombinerClass(firstReducer.class);
            //job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));

            Path path = new Path("/usr/output/weibo2");
            if(fs.exists(path)){
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean f = job.waitForCompletion(true);
            if(f){
                System.out.println("second job run finished!!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
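At this point job 1 has produced tf(w, id), the frequency of term w in weibo post id (files part-r-00000 to part-r-00002 of /usr/output/weibo1), plus the total number of posts N (the "count" line in part-r-00003), and job 2 has produced df(w), the number of posts containing w. The third job below combines them into a TF-IDF weight per term and post; with Java's Math.log (natural logarithm) the score it computes is

    weight(w, id) = tf(w, id) * ln(N / df(w))

so a term scores high when it is frequent in one post but rare across the whole corpus.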

Third iteration

package com.laoxiao.mr.weibo;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * Third MR job: compute the TF-IDF weight of every term in every weibo post
 * @author root
 *
 */
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Holds the total number of weibo posts (N), read from part-r-00003 of job 1
    public static Map<String, Integer> cmap = null;
    // Holds the DF of each term, read from part-r-00000 of job 2
    public static Map<String, Integer> df = null;

    // Runs once per map task, before any map() call, to load the cache files
    protected void setup(Context context) throws IOException,
            InterruptedException {
        System.out.println("******************");
        if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {

            URI[] ss = context.getCacheFiles();
            if (ss != null) {
                for (int i = 0; i < ss.length; i++) {
                    URI uri = ss[i];
                    if (uri.getPath().endsWith("part-r-00003")) { // holds the total weibo count (N)
                        Path path =new Path(uri.getPath());
//                        FileSystem fs =FileSystem.get(context.getConfiguration());
//                        fs.open(path);
                        // Cache files are localized into the task's working directory,
                        // so they can be opened by file name with a plain FileReader
                        BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                        String line = br.readLine();
                        if (line.startsWith("count")) {
                            String[] ls = line.split("\t");
                            cmap = new HashMap<String, Integer>();
                            cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
                        }
                        br.close();
                    } else if (uri.getPath().endsWith("part-r-00000")) { // holds the per-term DF
                        df = new HashMap<String, Integer>();
                        Path path =new Path(uri.getPath());
                        BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                        String line;
                        while ((line = br.readLine()) != null) {
                            String[] ls = line.split("\t");
                            df.put(ls[0], Integer.parseInt(ls[1].trim()));
                        }
                        br.close();
                    }
                }
            }
        }
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        // Skip part-r-00003, which only holds the total weibo count
        if (!fs.getPath().getName().contains("part-r-00003")) {

            String[] v = value.toString().trim().split("\t");
            if (v.length >= 2) {
                int tf = Integer.parseInt(v[1].trim()); // TF value computed by job 1
                String[] ss = v[0].split("_");
                if (ss.length >= 2) {
                    String w = ss[0];
                    String id=ss[1];

                    // TF-IDF weight: tf * ln(N / df). Cast to double so the division
                    // is not truncated to an integer before taking the log.
                    double s = tf * Math.log(cmap.get("count") / (double) df.get(w));
                    NumberFormat nf =NumberFormat.getInstance();
                    nf.setMaximumFractionDigits(5);
                    context.write(new Text(id), new Text(w+":"+nf.format(s)));
                }
            } else {
                System.out.println(value.toString() + "-------------");
            }
        }
    }
}
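As a quick sanity check of the scoring and formatting done in map() above, the following stand-alone snippet computes one weight outside Hadoop. The tf, N and df values (and the sample term and id) are made-up numbers purely for illustration; only the formula and the NumberFormat settings mirror LastMapper.

import java.text.NumberFormat;

public class TfIdfFormatDemo {
    public static void main(String[] args) {
        int tf = 3;     // hypothetical: the term occurs 3 times in this weibo
        int n = 1065;   // hypothetical total weibo count (the "count" value)
        int df = 91;    // hypothetical number of posts containing the term

        // Same formula as LastMapper: tf * ln(N / df), computed in double precision
        double s = tf * Math.log((double) n / df);
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMaximumFractionDigits(5);
        // LastMapper writes key = weibo id, value = term:score
        System.out.println("3823890210294392" + "\t" + "豆浆:" + nf.format(s)); // roughly 豆浆:7.37961
    }
}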
package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReduce extends Reducer<Text, Text, Text, Text>{

    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Concatenate every term:weight pair of this weibo id, tab separated
        StringBuilder sb = new StringBuilder();

        for (Text i : values) {
            sb.append(i.toString()).append("\t");
        }

        context.write(key, new Text(sb.toString()));
    }

}
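Because LastMapper keys its output by weibo id, this reducer sees every term:weight pair of one post and simply concatenates them, so each line of the final output in /usr/output/weibo3 has the form weiboId \t term1:weight1 \t term2:weight2 ... — a weighted term vector per post, which is the basis for the content recommendation.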
package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LastJob {

    public static void main(String[] args) {
        Configuration config =new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        //config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
        try {
            FileSystem fs =FileSystem.get(config);
            //JobConf job =new JobConf(config);
            Job job =Job.getInstance(config);
            job.setJarByClass(LastJob.class);
            job.setJobName("weibo3");

//            DistributedCache.addCacheFile(uri, conf);   // older, pre-2.x API
            // Hadoop 2.5 API: load the total weibo count (N) onto each map task
            job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
            // Load the per-term DF onto each map task
            job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());

            // Set the job's output key/value types (the map output types default to these)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(LastMapper.class);
            job.setReducerClass(LastReduce.class);

            // HDFS directory the job reads its input from (job 1's output)
            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
            Path outpath =new Path("/usr/output/weibo3");
            if(fs.exists(outpath)){
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job,outpath );

            boolean f= job.waitForCompletion(true);
            if(f){
                System.out.println("执行job成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
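The three jobs must run in this order, because weibo2 is computed from weibo1 and weibo3 reads weibo1 together with the two cache files. A minimal driver sketch that simply calls the three entry points in sequence is shown below; the driver class name is invented here, and since each main() catches its own exceptions, a more robust driver would check each job's outcome before launching the next.

package com.laoxiao.mr.weibo;

// Hypothetical driver chaining the three iterations; only the job class names
// come from the code above.
public class WeiboTfIdfDriver {
    public static void main(String[] args) {
        firstJob.main(args);   // job 1: TF per term_weiboId plus the total count N
        secondJob.main(args);  // job 2: DF per term, read from /usr/output/weibo1
        LastJob.main(args);    // job 3: TF-IDF per weibo, with weibo1/weibo2 as cache files
    }
}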