使用map端连接结合分布式缓存机制实现Join算法

前面我们介绍了MapReduce中的Join算法,我们提到了可以通过map端连接或reduce端连接实现join算法,在文章中,我们只给出了reduce端连接的例子,下面我们说说使用map端连接结合分布式缓存机制实现Join算法

1、介绍

我们使用频道类型数据集和机顶盒用户数据集,进行连接,统计出每天、每个频道、每分钟的收视人数

2、数据集

频道类型数据集就是channelType.csv文件,如下示例

机顶盒用户数据集来源于“08.统计电视机顶盒中无效用户数据,并以压缩格式输出有效数据”这个实战项目处理后的结果,数据集如下所示

3、分析

基于项目的需求,我们通过以下几步完成:

1、编写Mapper类,连接用户数据和频道类型数据,按需求将数据解析为key=频道类别+日期+每分钟,value=机顶盒号,然后将结果输出。

2、编写Combiner类,先将Mapper输出结果合并一次,然后输出给Reducer。

3、编写Reducer类,统计出收视率,然后使用MultipleOutputs类将每分钟的收视率,按天输出到不同文件路径下

4、编写驱动方法 run,运行MapReduce程序

4、实现

1、编写Mapper、Reducer

package com.buaa;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* @ProjectName CountViewers
* @PackageName com.buaa
* @ClassName CountViews
* @Description 通过map端连接,最后统计出 每天 每个类别 每分钟的收视人数 并按天分别输出不同的文件下
* @Author 刘吉超
* @Date 2016-06-01 16:12:08
*/
@SuppressWarnings("deprecation")
public class CountViews extends Configured implements Tool {
	/*
	 * 解析tv用户数据
	 */
	public static class ViewsMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
		// 定义全局 Hashtable 对象
		private Hashtable<String, String> table = new Hashtable<String, String>();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// 返回本地文件路径
            Path[] localPaths = (Path[]) context.getLocalCacheFiles();
            if (localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }  

            // 获取本地 FileSystem实例
            FileSystem fs = FileSystem.getLocal(context.getConfiguration());
            // 打开输入流
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));  

			// 创建BufferedReader读取器
			BufferedReader br = new BufferedReader(new InputStreamReader(in));

			String infoAddr = null;
			// 按行读取文件
			while (null != (infoAddr = br.readLine())) {
				// 将每行数据解析成数组 records
				String[] records = infoAddr.split("\t");
				/*
				 * records[0]为频道名称,records[1]为频道类别
				 * 世界地理	4
				 */
				table.put(records[0], records[1]);
			}
		}

		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		    /*
		     * 数据格式:机顶盒 + "@" + 日期 + "@" + 频道名称 + "@" + 开始时间+ "@" + 结束时间
		     * [email protected]@CCTV-1 综合@02:21:[email protected]:21:06
		     */
			String[] records = value.toString().split("@");
			// 机顶盒
			String stbNum = records[0];
			// 日期
			String date = records[1];
			// 频道名称
			String sn = records[2];
			// 开始时间
			String s = records[3];
			// 结束时间
			String e = records[4];

			// 如果开始时间或结束时间为空,直接返回
			if(StringUtils.isEmpty(s) || StringUtils.isEmpty(e)){
				return ;
			}

			// 按每条记录的起始时间、结束时间 计算出分钟列表List
			List<String> list = ParseTime.getTimeSplit(s, e);

			if(list == null){
				return;
			}

			// 频道类别
			String channelType = StringUtils.defaultString(table.get(sn),"0");

			// 循环所有分钟,拆分数据记录并输出
			for (String min : list) {
				MapWritable avgnumMap = new MapWritable();
				avgnumMap.put(new Text(stbNum), new Text());

				/*
			     * [email protected]@02:59
			     */
				context.write(new Text(channelType + "@" + date+ "@" + min), avgnumMap);
			}
		}
	}

	/*
	 * 定义Combiner,合并 Mapper 输出结果
	 */
	public static class ViewsCombiner extends Reducer<Text, MapWritable, Text, MapWritable> {
		protected void reduce(Text key, Iterable<MapWritable> values,Context context) throws IOException, InterruptedException {
			MapWritable avgnumMap = new MapWritable();
			for (MapWritable val : values) {
				// 合并相同的机顶盒号
				avgnumMap.putAll(val);
			}
			context.write(key, avgnumMap);
		}
	}

	/*
	 * 统计每个频道类别,每分钟的收视人数,然后按日期输出到不同文件路径下
	 */
	public static class ViewsReduce extends Reducer<Text, MapWritable, Text, Text> {
		// 声明多路径输出对象
		private MultipleOutputs<Text, Text> mos;

		protected void setup(Context context) throws IOException,InterruptedException {
			mos = new MultipleOutputs<Text, Text>(context);
		}

		protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
			// 数据格式:key=channelType+date+min  value=map(stbNum)
			String[] kv = key.toString().split("@");
			// 频道类别
			String channelType = kv[0];
			// 日期
			String date = kv[1];
			// 分钟
			String min = kv[2];

			MapWritable avgnumMap = new MapWritable();
			for (MapWritable m : values) {
				avgnumMap.putAll(m);
			}

			// 按日期将数据输出到不同文件路径下
			mos.write(new Text(channelType), new Text(min + "\t" + avgnumMap.size()), date.replaceAll("-", ""));
		}

		protected void cleanup(Context context) throws IOException, InterruptedException {
			mos.close();
		}
	}

	@Override
	public int run(String[] arg) throws Exception {
		// 读取配置文件
		Configuration conf = new Configuration();

		// 判断路径是否存在,如果存在,则删除
		Path mypath = new Path(arg[1]);
		FileSystem hdfs = mypath.getFileSystem(conf);
		if (hdfs.isDirectory(mypath)) {
			hdfs.delete(mypath, true);
		}

		Job job = Job.getInstance(conf,"CountViews");
		// 设置主类
		job.setJarByClass(CountViews.class);

		// 输入路径
		FileInputFormat.addInputPaths(job, arg[0]+"20120917,"+arg[0]+"20120918,"+arg[0]+
				"20120919,"+arg[0]+"20120920,"+arg[0]+"20120921,"+arg[0]+"20120922,"+arg[0]+"20120923");
		// 输出路径
		FileOutputFormat.setOutputPath(job, new Path(arg[1]));
		// 去part-r-00000空文件
		LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  

		// Mapper
		job.setMapperClass(ViewsMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(MapWritable.class);

		// 设置Combiner
		job.setCombinerClass(ViewsCombiner.class);

		// Reducer
		job.setReducerClass(ViewsReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// 指定分布式缓存文件
		job.addCacheFile(new URI(arg[2]));

		//提交任务
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		String[] arg = {
				"hdfs://hadoop1:9000/buaa/tv/out/",
				"hdfs://hadoop1:9000/buaa/ctype/",
				"hdfs://hadoop1:9000/buaa/channel/channelType.csv"
			};
		int ec = ToolRunner.run(new Configuration(), new CountViews(), arg);
		System.exit(ec);
	}
}

2、提取开始时间~结束时间之间的分钟数

package com.buaa;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

/**
* @ProjectName CountViewers
* @PackageName com.buaa
* @ClassName ParseTime
* @Description TODO
* @Author 刘吉超
* @Date 2016-06-01 16:11:10
*/
public class ParseTime {
	/**
	 * 提取start~end之间的分钟数
	 *
	 * @param start
	 * @param end
	 * @return List
	 */
	public static List<String> getTimeSplit(String start, String end) {
		List<String> list = new ArrayList<String>();
		// SimpleDateFormat
		SimpleDateFormat formatDate = new SimpleDateFormat("HH:mm");
		SimpleDateFormat parseDate = new SimpleDateFormat("HH:mm:ss");

		/*
		 * 开始时间格式:02:21:03
		 */
		Calendar startCalendar = Calendar.getInstance();

		/*
		 * 结束时间格式:02:21:06
		 */
		Calendar endCalendar = Calendar.getInstance();

		try {
			startCalendar.setTime(parseDate.parse(start));
			endCalendar.setTime(parseDate.parse(end));
		} catch (ParseException e1) {
			return null;
		}

		while (startCalendar.compareTo(endCalendar) <= 0) {
			list.add(formatDate.format(startCalendar.getTime()));
			startCalendar.add(Calendar.MINUTE, 1);
		}

		return list;
	}

	public static void main(String[] args) {
		String start = "12:59:24";
		String end = "13:03:45";
		List<String> list1  = getTimeSplit(start, end);
		for(String st1 : list1){
			System.out.println(st1);
		}
	}
}

5、运行结果

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

时间: 2024-11-03 03:24:18

使用map端连接结合分布式缓存机制实现Join算法的相关文章

9.3.1 map端连接- DistributedCache分布式缓存小数据集

1.1.1         map端连接- DistributedCache分布式缓存小数据集 当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据.用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作. (1)   分布式缓存指定缓存文件 执行命令行时,采用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat

分布式缓存的一致性hash算法

基本场景 比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache : 常规取余的hash算法 hash(key) % N 对于N台缓存服务器构成的集群缓存,依次编号为0 - (N-1)先对要存储的key进行hash取值,然后用hash值对N取余,得到一个在缓存服务器编号区间的一个数字,则将当前key存到这台服务器

[.NET领域驱动设计实战系列]专题八:DDD案例:网上书店分布式消息队列和分布式缓存的实现

一.引言 在上一专题中,商家发货和用户确认收货功能引入了消息队列来实现的,引入消息队列的好处可以保证消息的顺序处理,并且具有良好的可扩展性.但是上一专题消息队列是基于内存中队列对象来实现,这样实现有一个弊端,就是一旦服务重启或出现故障时,此时消息队列中的消息会丢失,并且也记录不了日志.所以就会出现,商家发货成功后,用户并没有收到邮件通知,并且也没有日志让我们发现是否发送了邮件通知.为了解决这个问题,就需要引入一种可恢复的消息队列.目前有很多开源的消息队列都支持可恢复的,例如TibcoEms.ne

缓存:本地缓存和分布式缓存及缓存过期时间设置

1.首先对于本地内存缓存,就是把数据缓存在本机的内存中,如下图1所示: 2. 分布式缓存机制:可能存在跨进程,跨域访问缓存数据 对于分布式的缓存,此时因为缓存的数据是放在缓存服务器中的,或者说,此时应用程序需要跨进程的去访问分布式缓存服务器,如图2: 当我们在应用中使用跨进程的缓存机制,例如分布式缓存memcached或者微软的AppFabric,此时数据被缓存在应用程序之外的进程中.每次,当我们要把一些数据缓存起来的时候,缓存的API就会把数据首先序列化为字节的形式,然后把这些字节发送给缓存服

大型网站技术架构,6网站的伸缩性架构之分布式缓存集群的伸缩性设计

和所有服务器都部署相同应用的应用服务器集群不同,分布式缓存服务器集群中不同的服务器中缓存的数据各不相同,缓存访问请求不可以在缓存服务器集群中的任意一台处理,必须先找到缓存有需要数据的服务器,然后才能访问. 这个特点制约了分布式缓存集群的伸缩性设计,因为新上线的缓存服务器没有缓存任何数据,而已下线的缓存服务器还缓存这网站的许多热点数据. 必须让新上线的缓存服务器对整个分布式缓存集群影响最小,也就是说新加入的缓存服务器应使整个缓存服务器集群中已经缓存的数据尽可能还被访问到,这是分布式缓存集群伸缩性设

简单的Map缓存机制实现

简单的Map缓存机制实现 大致思路是用一个单例的Map实现,当然此Map得是线程安全的--ConcurrentHashMap 原本项目需求是缓存十条消息,所以打算用Map实现缓存机制.中途夭折下面具体尚未实现... 当然此代码仞为半成品,具体得根据项目需求采用不同的原则清除缓存 package per.zww.util; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class CacheP

分布式理论(4):Leases 一种解决分布式缓存一致性的高效容错机制(转)

作者:Cary G.Gray and David R. Cheriton 1989 译者:[email protected] 2011-5-7 出处:http://duanple.blog.163.com/blog/static/70971767201141111440789/ [ 序:所谓租约(leases),其实就是一个合同,即服务端给予客户端在一定期限内可以控制修改操作的权力.如果服务端要修改数据,首先要征求拥有这块数据的租约的客户端的同意,之后才可以修改.客户端从服务端读取数据时往往就同

java缓存机制(上) map和spring注解@Cacheable

借鉴于   https://www.cnblogs.com/ms-grf/p/7249220.html 缓存的目的在于节省访问时间以及减轻大并发量访问带来资源上的消耗. 一.外存 除计算机内存和CPU缓存以外的存储器,如常见的C.D.E.F盘,还有U盘,软盘,硬盘,光盘之类.断电后仍能保存数据的完整性. 二.内存 用于与CPU沟通.计算机中所有程序的进行都是在内存中进行,CPU中所有的运算数据以及和外存之间交换的数据都存储在其中.数据断电不保存. 三.高速缓冲 一般情况下,CPU的处理数据速度非

第八章 企业项目开发--分布式缓存memcached

注意:本节代码基于<第七章 企业项目开发--本地缓存guava cache> 1.本地缓存的问题 本地缓存速度一开始高于分布式缓存,但是随着其缓存数量的增加,所占内存越来越大,系统运行内存越来越小,最后系统会被拖慢(这一点与第二点联系起来) 本地缓存存于本机,其缓存数量与大小受本机内存大小限制 本地缓存存于本机,其他机器的访问不到这样的缓存 解决方案:分布式缓存 Jboss cache:缓存还存于本机,但是会同步更新到其他机器(解决了第三个问题,解决不了第一和第二个问题),如果缓存机器数量很多