MapReduce案例----影评分析(年份,电影id,电影名字,平均评分)

题目:

 1 现有如此三份数据:(这里只需用后两份)
 2 1、users.dat    数据格式为:  2::M::56::16::70072
 3 对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
 4 对应字段中文解释:用户id,性别,年龄,职业,邮政编码
 5
 6 2、movies.dat        数据格式为:1::Toy Story (1995)::Animation|Children‘s|Comedy  ;  2::Jumanji (1995)::Adventure|Children‘s|Fantasy  ;  3::Grumpier Old Men (1995)::Comedy|Romance
 7 对应字段为:MovieID BigInt, Title String, Genres String
 8 对应字段中文解释:电影ID,电影名字,电影类型
 9
10 3、ratings.dat        数据格式为:  1::1193::5::978300760  ;  1::661::3::978302109  ;  1::914::3::978301968
11 对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
12 对应字段中文解释:用户ID,电影ID,评分,评分时间戳
13
14 用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
15 userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
16 需求:
17     关联两张表。
18     计算每部电影的平均评分,并按评分大小进行排序。评分一样,按照电影名排序。
19     (1):按照年份进行分组,要求结果展示形式:
20         年份,电影id,电影名字,平均分。

思路:

  首先从 ratings.dat 中计算出电影id,平均评分。得出一个中间表。

  通过分析,中间表比 movis.dat 要小,所以优先考虑将中间表加载到内存中,写入到一个hashmap中,做 map join。

  Map 端处理movies.dat 中的数据,根据电影 id 关联 hashmap,得到该电影的平均评分,并提取出电影的年份。

  将年份,电影id,电影名字,平均评分封装到一个对象中,然后自定义排序规则。按照电影平均评分大小排序。

  然后自定义分区,将相同年份的分到一个分区中。使得相同年份的数据出现在一个文件中。

求出平均评分代码:

 1 package com.lhb.demo;
 2 import org.apache.hadoop.conf.Configuration;
 3 import org.apache.hadoop.fs.FileSystem;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.DoubleWritable;
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13 import java.io.IOException;
14
15 public class Test1AvgRate {
16     //map端
17     public static class Test1AvgRateMapper extends Mapper<LongWritable, Text, LongWritable, DoubleWritable> {
18         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
19             String[] split = value.toString().split("::");
20             if (split.length >= 4) {
21                 context.write(new LongWritable(Long.valueOf(split[1])), new DoubleWritable(Double.valueOf(split[2])));
22             }
23         }
24     }
25     //reducer端
26     public static class Test1AvgRateReducer extends Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {
27         protected void reduce(LongWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
28             double sum = 0.0;
29             int num = 0;
30             for (DoubleWritable value : values) {
31                 sum += value.get();
32                 num++;
33             }
34             Double avg = sum / num;
35             context.write(key, new DoubleWritable(avg));
36         }
37     }
38     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
39         Configuration conf = new Configuration();
40         Job job = Job.getInstance(conf);
41         job.setJarByClass(Test1AvgRate.class);
42         job.setMapperClass(Test1AvgRateMapper.class);
43         job.setReducerClass(Test1AvgRateReducer.class);
44
45         //指定map和reduce输出数据的类型
46         job.setMapOutputKeyClass(LongWritable.class);
47         job.setMapOutputValueClass(DoubleWritable.class);
48         job.setOutputKeyClass(LongWritable.class);
49         job.setOutputValueClass(DoubleWritable.class);
50
51         FileInputFormat.setInputPaths(job, new Path("文件所在路径"));
52         FileSystem fs = FileSystem.get(conf);
53         Path outPath = new Path("输出路径");
54         //判断文件是否存在
55         if (fs.exists(outPath)) {
56             fs.delete(outPath, true);
57         }
58         FileOutputFormat.setOutputPath(job, outPath);
59         boolean b = job.waitForCompletion(true);
60         System.exit(b ? 0 : 1);
61     }
62 }        

平均评分部分显示结果:

案例1代码:

 1 package com.lhb.demo;
 2
 3 import org.apache.hadoop.io.WritableComparable;
 4 import java.io.DataInput;
 5 import java.io.DataOutput;
 6 import java.io.IOException;
 7
 8 public class MovieBean1 implements WritableComparable<MovieBean1> {
 9     private int movie_year;
10     private long movie_id;
11     private String movie_name;
12     private double movie_avg_rae;
13
14     public MovieBean1() {
15     }
16
17     public MovieBean1(int movie_year, long movie_id, String movie_name, double movie_avg_rae) {
18         this.movie_year = movie_year;
19         this.movie_id = movie_id;
20         this.movie_name = movie_name;
21         this.movie_avg_rae = movie_avg_rae;
22     }
23
24     public int getMovie_year() {
25         return movie_year;
26     }
27
28     public void setMovie_year(int movie_year) {
29         this.movie_year = movie_year;
30     }
31
32     public long getMovie_id() {
33         return movie_id;
34     }
35
36     public void setMovie_id(long movie_id) {
37         this.movie_id = movie_id;
38     }
39
40     public String getMovie_name() {
41         return movie_name;
42     }
43
44     public void setMovie_name(String movie_name) {
45         this.movie_name = movie_name;
46     }
47
48     public double getMovie_avg_rae() {
49         return movie_avg_rae;
50     }
51
52     public void setMovie_avg_rae(double movie_avg_rae) {
53         this.movie_avg_rae = movie_avg_rae;
54     }
55
56     public String toString() {
57         return "movie{" +
58                 "year=" + movie_year +
59                 ", id=" + movie_id +
60                 ", name=‘" + movie_name + ‘\‘‘ +
61                 ", avg=" + movie_avg_rae +
62                 ‘}‘;
63     }
64
65     public int compareTo(MovieBean1 o) {
66         if (o.movie_year == this.movie_year) {
67             return o.movie_avg_rae > this.movie_avg_rae ? 1 : -1;
68         } else {
69             return o.movie_year > this.movie_year ? 1 : -1;
70         }
71     }
72
73     public void write(DataOutput dataOutput) throws IOException {
74         dataOutput.writeInt(this.movie_year);
75         dataOutput.writeLong(this.movie_id);
76         dataOutput.writeUTF(this.movie_name);
77         dataOutput.writeDouble(this.movie_avg_rae);
78     }
79
80     public void readFields(DataInput dataInput) throws IOException {
81         this.movie_year = dataInput.readInt();
82         this.movie_id = dataInput.readLong();
83         this.movie_name = dataInput.readUTF();
84         this.movie_avg_rae = dataInput.readDouble();
85     }
86 }
 1 package com.lhb.test.homework.test;
 2 import org.apache.hadoop.io.NullWritable;
 3 import org.apache.hadoop.mapreduce.Partitioner;
 4
 5 public class YearPartitioner extends Partitioner<MovieBean1, NullWritable> {
 6     public int getPartition(MovieBean1 movieBean1, NullWritable nullWritable, int i) {
 7         int movie_year = movieBean1.getMovie_year();
 8         return movie_year % i;
 9     }
10 }
 1 package com.lhb.demo;
 2
 3 import org.apache.commons.lang.StringUtils;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.FileSystem;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.NullWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import java.io.BufferedReader;
15 import java.io.FileReader;
16 import java.io.IOException;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.regex.Matcher;
20 import java.util.regex.Pattern;
21
22 public class Test01 {
23     public static class Test01Mapper extends Mapper<LongWritable, Text, MovieBean1, NullWritable> {
24         Map<Long, Double> rateMap;
25
26         protected void setup(Context context) throws IOException, InterruptedException {
27             rateMap = new HashMap<Long, Double>();
28
29             BufferedReader br = new BufferedReader(new FileReader("求出平均评分的目录"));
30             String line = "";
31             while (StringUtils.isNotBlank((line = br.readLine()))) {
32                 String[] split = line.split("\t");
33                 if (split.length >= 2) {
34                     rateMap.put(Long.valueOf(split[0]), Double.valueOf(split[1]));
35                 }
36             }
37             System.out.println(rateMap);
38         }
39
40         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
41             String[] split = value.toString().split("::");
42             String pattern = "\\(\\d{4}\\)";
43             String line = value.toString();
44             Pattern r = Pattern.compile(pattern);
45             Matcher matcher = r.matcher(line);
46             String s = "";
47             if (matcher.find()) {
48                 s = matcher.group(0);
49                 s = s.replaceAll("\\(", "").replaceAll("\\)", "");
50             }
51             if (split.length >= 3) {
52                 Double avg_score = rateMap.getOrDefault(Long.valueOf(split[0]), 0.0);
53                 MovieBean1 movieBean1 = new MovieBean1();
54                 movieBean1.setMovie_avg_rae(avg_score);
55                 movieBean1.setMovie_name(split[1]);
56                 movieBean1.setMovie_year(Integer.valueOf(s));
57                 movieBean1.setMovie_id(Long.valueOf(split[0]));
58                 context.write(movieBean1, NullWritable.get());
59             }
60         }
61     }
62
63     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
64         Configuration conf = new Configuration();
65         Job job = Job.getInstance(conf);
66         job.setJarByClass(Test01.class);
67         job.setMapperClass(Test01Mapper.class);
68
69         //指定map输出数据的类型
70         job.setMapOutputKeyClass(MovieBean1.class);
71         job.setMapOutputValueClass(NullWritable.class);
72
73         //局部优化
74         job.setPartitionerClass(YearPartitioner.class);
75         //分区
76         job.setNumReduceTasks(20);
77
78         FileInputFormat.setInputPaths(job, new Path("movie的目录"));
79         FileSystem fs = FileSystem.get(conf);
80         Path outPath = new Path("输出目录");
81         if (fs.exists(outPath)) {
82             fs.delete(outPath, true);
83         }
84
85         FileOutputFormat.setOutputPath(job, outPath);
86         boolean b = job.waitForCompletion(true);
87         System.exit(b ? 0 : 1);
88     }
89 }

运行部分结果如下:

数据如下:

链接: https://pan.baidu.com/s/1hc84MTWm5xosl4o_LrGoSw 提取码: z59t

原文地址:https://www.cnblogs.com/hong-bo/p/11448207.html

时间: 2024-07-29 07:28:06

MapReduce案例----影评分析(年份,电影id,电影名字,平均评分)的相关文章

python 爬虫分析30年香港电影

前言 上个礼拜接触爬虫,本身对香港电影比较感兴趣,这2天就去拿豆瓣数据做了份香港近30年电影的分析 正文 数据来源豆瓣 这些路径是有规律 ,设置好循环条件,拿到电影url在进行下一步,在这之前有一部分电影我们过滤掉,典型的就是没有评分的电影,没有评分电影大部分是演出晚会,B级片等电影, 读者有兴趣可以查下,这些电影的数据可用性差我不来拿分析,香港上世纪90-99电影总数2700+,过滤后拿到的1100+.近30年电影初步得到是2500+ 然后分析页面结构,电影名会有多的 ,我是空格分开后取的第一

2-1 尝试对豆瓣上的演员参演电影的电影名和上映日期进行抓取

1 step1_actorsDate.py 2 # -*- coding: utf-8 -*- 3 import requests 4 import pandas as pd 5 import lxml.html 6 import time 7 from pandas import DataFrame 8 9 headers = {"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML,

MapReduce总体架构分析

转自:http://blog.csdn.net/Androidlushangderen/article/details/41051027 继前段时间分析Redis源码一段时间之后,我即将开始接下来的一段技术学习的征程,研究的技术就是当前非常火热的Hadoop,但是一个Hadoop生态圈是非常庞大的,所以首先我的打算是挑选其中的一部分模块,去学习,研究,我就选中了MapReduce.MapReduce最早是由Google公司在04年发布的论文中提出的一种思想,后来被人实现出来,才有了后面的Hado

MapReduce源码分析之JobSubmitter(一)

JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter. 首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 private FileSystem

MapReduce案例运行

从<Hadoop权威指南>选取了一个小案例,在Hadoop集群环境中运行. 1.新建JAVA类,保存书中源代码. [huser@master bin]$ vi URLCat.java import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URL

MapReduce源码分析之MapTask分析(二)

SpillThread分析 为什么需要Spill 内存大小总是有效,因此在Mapper在处理过程中,数据持续输出到内存中时,必然需要有机制能将内存中的数据换出,合理的刷出到磁盘上.SpillThread就是用来完成这部分工作. SpillThread的线程处理函数只是做一层封装,当索引表中的kvstart和kvend指向一样的索引位置时,会持续处于等待过程,等待外部通知需要触发spill动作,当有spill请求时,会触发StartSpill来唤醒SpillThread线程,进入到sortAndS

一个测试案例的分析

案例: 某软件公司在开发一个城镇居民保险系统时,在单元测试.集成测试阶段,为了追赶进度,开发人员与测试人员都没有介入测试工作. 系统测试阶段,测试小组借助缺陷管理工具和开发人员交互进行测试与缺陷修复工作.期间,发现"扭转文档无法归档"的严重错误,开发人员在修改时,认为难度太大,决定暂停修改,得到测试人员认可.在产品发布前,该问题在开发环境下得到解决. 回归测试结束后,开发人员把开发环境下的产品打包,发送给客户. 分析:在案例中,有几处显然不合理的地方: 1.测试介入太晚 2.回归测试做

MapReduce源码分析之MapTask分析

前言 MapReduce的源码分析是基于Hadoop1.2.1基础上进行的代码分析. 该章节会分析在MapTask端的详细处理流程以及MapOutputCollector是如何处理map之后的collect输出的数据. map端的主要处理流程 图1 MapTask处理流程 图1所示为MapTask的主要代码执行流程,在MapTask启动后会进入入口run函数,根据是否使用新的api来决定选择运行新的mapper还是旧的mapper,最后完成执行向外汇报. 在这,我们选择分析旧的api,也就是ru

机器学习 - 文本分析案例 - 新闻分析

文本分析概念 停用词 语料中大量出现, 无用数据, 如下类似的这种词语 Tf  - 词频统计 TF 的计算方式有很多, 最常见的用 某词文章中出现次数 / 文章总词数 idf  - 逆文档频率 TF - idf   关键词提取 相似度 分词 语料库 词频 词频向量 整体流程 语料清洗 (去掉停用词, 去掉大量重复的非正常用语等) 计算公式 文本分析案例 - 新闻分析 样本数据 数据来源于 搜狗实验室新闻数据   数据需要处理成  pandas 便于读取的数据才可以使用 import pandas