21-hadoop-weibo推送广告

1, tf-idf

计算每个人的词条中的重要度

需要3个mapreduce 的 job执行, 第一个计算 TF 和 n, 第二个计算 DF, 第三个代入公式计算结果值

1, 第一个job

package com.wenbronk.weibo;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * 第一个map, 计算 TF 和 N
 *
 * @author root
 *
 */
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * TF 在一个文章中出现的词频 N 总共多少文章
     * 按行传入
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        String[] values = value.toString().trim().split("\t");

        if (values.length >= 2) {
            String id = values[0].trim();
            String content = values[1].trim();

            // 分词
            StringReader stringReader = new StringReader(content);
            IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
            Lexeme word = null;
            while ((word = ikSegmenter.next()) != null ) {
                String w = word.getLexemeText();
                context.write(new Text(w + "_" + id), new IntWritable(1));
            }
            context.write(new Text("count"), new IntWritable(1));
        }else {
            System.out.println(values.toString() + "---");
        }

    }

}

reduce

package com.wenbronk.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 统计tf, n
 * @author root
 *
 */
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,
            Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable intWritable : arg1) {
            sum += intWritable.get();
        }
        if (arg0.equals(new Text("count"))) {
            System.err.println(arg0.toString() + "---");
        }
        arg2.write(arg0, new IntWritable(sum));
    }

}

partition

package com.wenbronk.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 决定分区, 计划分4个, n一个, tf三个
 * @author root
 *
 */
public class FirstPartition extends HashPartitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (key.equals(new Text("count"))) {
            return 3;
        }else {
            return super.getPartition(key, value, numReduceTasks - 1);
        }

    }

}

mainJob

package com.wenbronk.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FirstJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaults", "hdfs://192.168.208.106:8020");
        config.set("yarn.resourcemanager.hostname", "192.168.208.106");
//        config.set("maper.jar", "E:\\sxt\\target\\weibo1.jar");

        try {

            Job job = Job.getInstance(config);
            job.setJarByClass(FirstJob.class);
            job.setJobName("first");

            job.setPartitionerClass(FirstPartition.class);
            job.setMapperClass(FirstMapper.class);
            job.setNumReduceTasks(4);
            job.setCombinerClass(FirstReducer.class);
            job.setReducerClass(FirstReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo2.txt"));

            FileSystem fileSystem = FileSystem.get(config);

            Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo1");
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath);
            }
            FileOutputFormat.setOutputPath(job, outPath);

            boolean waitForCompletion = job.waitForCompletion(true);
            if (waitForCompletion) {
                System.out.println("first success");
            }

        }catch (Exception e) {
            e.printStackTrace();
        }

    }

}

2, 第二个

package com.wenbronk.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 计算 DFi的值, 在多少个文章中出现过
 *
 */
public class SecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        // 获取当前maptask的数据片段
        FileSplit inputSplit = (FileSplit) context.getInputSplit();

        // count不被统计
        if (!inputSplit.getPath().getName().contains("part-r-00003")) {

            String[] values = value.toString().trim().split("\t");

            if (values.length >= 2) {
                String[] split = values[0].trim().split("_");
                if (split.length >= 2) {
                    String id = split[0];
                    context.write(new Text(id), new IntWritable(1));
                }
            }
        }else {
            System.out.println(value.toString() + "----");
        }

    }

}

reduce

package com.wenbronk.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 *
 * @author root
 *
 */
public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text arg0, Iterable<IntWritable> arg1,
            Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable intWritable : arg1) {
            sum += intWritable.get();
        }
        arg2.write(new Text(arg0), new IntWritable(sum));
    }

}

mainjob

package com.wenbronk.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.default", "hdfs://192.168.208.106:8020");
        config.set("yarn.resourcemanager.hostname", "192.168.208.106");

        try {

            Job job = Job.getInstance(config);
            job.setJarByClass(SecondJob.class);
            job.setJobName("second");

            job.setMapperClass(SecondMapper.class);
            job.setCombinerClass(SecondReducer.class);
            job.setReducerClass(SecondReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));

            FileSystem fileSystem = FileSystem.get(config);
            Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo2");
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath);
            }
            FileOutputFormat.setOutputPath(job, outPath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job2 success");
            }

        }catch(Exception e) {
            e.printStackTrace();
        }

    }

}

3, 第三个Job

package com.wenbronk.weibo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ThirdMapper extends Mapper<LongWritable, Text, Text, Text>{

    //存放微博总数, 将小数据缓存进内存, 预加载
        public static Map<String, Integer> cmap = null;
        //存放df
        public static Map<String, Integer> df = null;

        // 在初始化类时执行, 将数据预加载进map
        protected void setup(Context context)
                throws IOException, InterruptedException {

            System.out.println("*****");
            if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
                URI[] cacheFiles = context.getCacheFiles();
                if (cacheFiles != null) {
                    for (URI uri : cacheFiles) {
                        if (uri.getPath().endsWith("part-r-00003")) {
                            Path path = new Path(uri.getPath());
                            // 获取文件
                            Configuration configuration = context.getConfiguration();
                            FileSystem fs = FileSystem.get(configuration);
                            FSDataInputStream open = fs.open(path);
                            BufferedReader reader = new BufferedReader(new InputStreamReader(open));

//                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
                            String line = reader.readLine();
                            if (line.startsWith("count")) {
                                String[] split = line.split("\t");
                                cmap = new HashMap<>();
                                cmap.put(split[0], Integer.parseInt(split[1].trim()));
                            }
                            reader.close();
                        }else if (uri.getPath().endsWith("part-r-00000")) {
                            df = new HashMap<>();
                            Path path = new Path(uri.getPath());

                            // 获取文件
                            Configuration configuration = context.getConfiguration();
                            FileSystem fs = FileSystem.get(configuration);
                            FSDataInputStream open = fs.open(path);
                            BufferedReader reader = new BufferedReader(new InputStreamReader(open));
//                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));

                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                String[] ls = line.split("\t");
                                df.put(ls[0], Integer.parseInt(ls[1].trim()));
                            }
                            reader.close();
                        }
                    }
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // 获取分片
            FileSplit inputSplit = (FileSplit) context.getInputSplit();

            if (!inputSplit.getPath().getName().contains("part-r-00003")) {
                String[] values = value.toString().trim().split("\t");

                if (values.length >= 2) {

                    int tf = Integer.parseInt(values[1].trim());

                    String[] ss = values[0].split("_");
                    if (ss.length >= 2) {
                        String word = ss[0];
                        String id = ss[1];

                        // 公式
                        Double s = tf * Math.log(cmap.get("count")) / df.get(word);
                        NumberFormat format = NumberFormat.getInstance();
                        // 取小数点后5位
                        format.setMaximumFractionDigits(5);

                        context.write(new Text(id), new Text(word + ": " + format.format(s)));
                    }else {
                        System.out.println(value.toString() + "------");
                    }
                }
            }
        }
}

reduce

package com.wenbronk.weibo;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ThirdReducer extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
            throws IOException, InterruptedException {

        StringBuffer sb = new StringBuffer();
        for (Text text : arg1) {
            sb.append(text.toString() + "\t");
        }
        arg2.write(arg0, new Text(sb.toString()));
    }

}

mainJob

package com.wenbronk.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ThirdJob {

    public static void main(String[] args) {

        Configuration config = new Configuration();
        config.set("fs.defaults", "hdfs://192.168.208.106:8020");
        config.set("yarn.resourcemanager.hostname", "192.168.208.106");
        try {
            Job job = Job.getInstance(config);
            job.setJarByClass(ThirdJob.class);
            job.setJobName("third");
//            job.setInputFormatClass(KeyValueTextInputFormat.class);

            //把微博总数加载到内存
            job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo1\\part-r-00003").toUri());
            //把df加载到内存
            job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo2\\part-r-00000").toUri());

            job.setMapperClass(ThirdMapper.class);
            job.setReducerClass(ThirdReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(config);
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));
            Path path = new Path("E:\\sxt\\1-MapReduce\\data\\weibo3");
            if (fs.exists(path)) {
                fs.delete(path);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean waitForCompletion = job.waitForCompletion(true);
            if(waitForCompletion) {
                System.out.println("执行job成功");
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

}

系列来自尚学堂视频

时间: 2024-10-09 10:28:06

21-hadoop-weibo推送广告的相关文章

无良斐讯路由器强制推送广告

2017年7月以来,经常发现网页右下被强制插入iframe广告,广告内容多为假借“今日头条.东方头条”的名义,其实内容与其无关. 最初弹出次数较少,不以为然.以为是运营商的DNS劫持.后来愈加频繁,尤其是晚间,每5次访问网页就会被强插1次iframe广告. 多方排查,最终确定为斐讯路由器强制推送广告. 早有耳闻一些国产智能路由器会搜集用户信息,没想到竟然会在不告知用户的情况下,强制插入广告.实在缺德. 我还如何信任你?智能路由. 以下为转载,原文地址:http://www.right.com.c

android极光推送

版权声明:本文为博主原创文章,未经博主允许不得转载. Android开发记录18-集成推送服务的一点说明 关于推送服务,国内有很多选择,笔者也对它们进行了一个详细的对比,一般我们产品选择推送服务主要考量以下几个要素: 1.是否收费,如何收费? 2.推送内容是是什么(是否包含通知.消息.富媒体等等) 3.稳定性.及时性如何? 4.集成难度是否简单 5.支持平台有哪些(主流Android.IOS) 6.服务端支持语言(Java.C#.PHP.Python等) 下面笔者例举国内主要的一些推送服务: 来

Ionic JPush极光推送 插件实例

1.需要去这里注册https://www.jiguang.cn 注册成功获取AppKey 备注填写应用包名规范点,在项目还要用那 2.创建ionic 项目 指定你注册时候的包名(假如:com.ionicframework.ltapp) ionic start  -i com.ionicframework.ltapp ltapp blank 3.添加JPush 插件 进入 项目目录下 cd  ltapp git clone https://github.com/jpush/jpush-phoneg

推送 从入门到放弃

推送 推送简直就是一种轻量级的骚扰方式 自从有了推送,各个公司基本上都在使用推送,这确实是一个比较好的提醒方式,Android较iOS强的一个部分,也就是在于Android的Notification.Google教育我们利用好Android的通知模块,做更多友好的交互,可这句话,翻译成中文,不知不觉,就变成了在Notification中推送各种广告,而且仅仅就是一些广告,Notification各种牛逼的功能,完全不需要,这也违背了Google设计Notification的初衷. 更关键的是,现

Android开发记录18-集成极光推送的一点说明

Android开发记录18-集成推送服务的一点说明 关于推送服务,国内有很多选择,笔者也对它们进行了一个详细的对比,一般我们产品选择推送服务主要考量以下几个要素: 1.是否收费,如何收费? 2.推送内容是是什么(是否包含通知.消息.富媒体等等) 3.稳定性.及时性如何? 4.集成难度是否简单 5.支持平台有哪些(主流Android.IOS) 6.服务端支持语言(Java.C#.PHP.Python等) 下面笔者例举国内主要的一些推送服务: 来自Devstore的统计,共收录了国内21家推送服务,

MIUI(ADUI)关闭广告推送步骤方法

MIUI自从到了版本MIUI8之后,系统增加了各种推送,让人们所诟病.很多消费者因为这个原因,不再考虑小米手机,尽管小米手机确实很便宜. 下面就说一下如何关闭所有的MIUI 8的广告推送.方法源自MIUI官方论坛. 1.应用商店.我的->设置->接收推送通知 2.浏览器.右下角三条横杠->设置->消息通知管理->接收通知栏消息 3.音乐.我的->设置->更多高级设置->资讯广告推荐 4.下载管理.右上角三个点->设置->信息流设置->资源推

09_用户行为分析_广告精准推送项目介绍

1.用户行为分析 用户行为分析,是指在获得网站访问量基本数据的情况下,对有关数据进行统计.分析,从中发现用户访问网站的规律, 并将这些规律与网络营销策略等相结合,从而发现目前网络营销活动中可能存在的问题,并为进一步修正或重新制定网络营 销策略提供依据.这是狭义的只指网络上的用户行为分析 意义:通过对用户行为监测获得的数据进行分析,可以让企业更加详细.清楚地了解用户的行为习惯,从而找出网站.推 广渠道等企业营销环境存在的问题 2.用户行为分析与个性化推送服务技术: 用户的行为分析是基于数据的,它利

针对微信的一篇推送附有的数据链接进行MapReduce统计

原推送引用:https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg 版权归原作者所有,如有侵权请及时联系本人,见谅! 原文采用Excel进行统计数据,这里采用刚学习的工具进行练习. 1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.a

iOS10推送必看UNNotificationServiceExtension

转:http://www.cocoachina.com/ios/20161017/17769.html (收录供个人学习用) iOS10推送UNNotificationServic 招聘信息: 产品经理/Product Manager Unity开发工程师 高级iOS工程师 SDK产品经理 cocos2d-x游戏开发工程师 iOS高级开发工程师 [成都 | 远程办公] 招聘Android工程师 技术合伙人 移动应用开发项目经理 iOS开发工程师 移动APP研发工程师 如果大家还没有看我的这两篇文