1.1.1 map端连接- DistributedCache分布式缓存小数据集
当一个数据集非常小时,可以将小数据集发送到每个节点,节点缓存到内存中,这个数据集称为边数据。用map函数将小数据集中的数据按键聚合到大的数据集中,输出连接数据集,进行连接操作。
(1) 分布式缓存指定缓存文件
执行命令行时,采用hadoop jar hadoop-example.jar MapSideJoinMain -files input/cityfile/tb_dim_city.dat input/data/all output
-files input/cityfile/tb_dim_city.dat指定需要缓存的文件,会被复制到各个节任务点。
(2)指定缓存文件的三种类型
Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中,任务运行前,节点管理器从分布式文件系统中复制文件到本地。
1) -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。
2) -archives 选项向自己的任务中复制存档(压缩)文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。
3) -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。
(3)缓存文件删除机制
节点管理器为缓存中的文件各维护一个计数器,任务运行时,文件计数器加1,任务完成后,计数器减1,计数器为0时才能删除文件,当节点缓存容量大于一定值(yarn.nodemanger.localizer.cache.target-size-mb设置,默认10GB),才会删除最近最少使用的文件。
(4)Job的分布式缓存API
除了可以用命令行参数指定缓存文件外,还以通过Job的API指定缓存文件;即通过job对象调用下面的函数设置缓存文件。
//以下两组方法将文件或存档添加到分布式缓存
public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);
//以下两组方法将一次性向分布式缓存中添加一组文件或存档
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);
//以下两组方法将文件或存档添加到 MapReduce 任务的类路径
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);
public void createSymlink();
(6)DistributedCache缓存小数据集实现hadoop map端连接实例
下面的实例是将城市名称的数据集和用户信息的数据集进行连接,城市名称的数据集很小,而用户信息的数据集很大,所以可以采用缓存文件的方式,将城市信息数据集发送到任务,map任务通过setup方法从缓存中读取小数据集文件tb_dim_city.dat,在内存中形成map映射,map函数处理用户信息数据,根据用户信息中的城市id去map映射中找到城市名称,然后合并输出。
package Temperature; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * 用途说明: * Map side join中的left outer join * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show), * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|": * id name orderid city_code is_show * 0 其他 9999 9999 0 * 1 长春 1 901 1 * 2 吉林 2 902 1 * 3 四平 3 903 1 * 4 松原 4 904 1 * 5 通化 5 905 1 * 6 辽源 6 906 1 * 7 白城 7 907 1 * 8 白山 8 908 1 * 9 延吉 9 909 1 * -------------------------风骚的分割线------------------------------- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) * tb_user_profiles.dat文件内容,分隔符为"|": * userID network flow cityID * 1 2G 123 1 * 2 3G 333 2 * 3 3G 555 1 * 4 2G 777 3 * 5 3G 666 4 * -------------------------风骚的分割线------------------------------- * 结果: * 1 长春 1 901 1 1 2G 123 * 1 长春 1 901 1 3 3G 555 * 2 吉林 2 902 1 2 3G 333 * 3 四平 3 903 1 4 2G 777 * 4 松原 4 904 1 5 3G 666 */ public class MapSideJoinMain extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); public static class LeftOutJoinMapper extends Mapper { private HashMap city_info = new HashMap<String,String>(); private Text outPutKey = new Text(); private Text outPutValue = new Text(); private String mapInputStr = null; private String mapInputSpit[] = null; private String city_secondPart = null; /** * 此方法在每个task开始之前执行,这里主要用作从DistributedCache * 中取到tb_dim_city文件,并将里边记录取出放到内存中。 */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; //获得当前作业的DistributedCache相关文件 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String cityInfo = null; for(Path p : distributePaths){ if(p.toString().endsWith("tb_dim_city.dat")){ //读缓存文件,并放到mem中 br = new BufferedReader(new FileReader(p.toString())); while(null!=(cityInfo=br.readLine())){ String[] cityPart = cityInfo.split("\\|",5); if(cityPart.length ==5){ city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]); } } } } } /** * Map端的实现相当简单,直接判断tb_user_profiles.dat中的 * cityID是否存在我的map中就ok了,这样就可以实现Map Join了 */ protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { //排掉空行 if(value == null || value.toString().equals("")){ return; } mapInputStr = value.toString(); mapInputSpit = mapInputStr.split("\\|",4); //过滤非法记录 if(mapInputSpit.length != 4){ return; } //判断链接字段是否在map中存在 city_secondPart = (String) city_info.get((Object) mapInputSpit[3]); if(city_secondPart != null){ this.outPutKey.set(mapInputSpit[3]); this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]); context.write(outPutKey, outPutValue); } } } public int run(String[] args) throws Exception { Configuration conf=getConf(); //获得配置文件对象 DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件 Job job=new Job(conf,"MapJoinMR"); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径 job.setJarByClass(MapSideJoinMain.class); job.setMapperClass(LeftOutJoinMapper.class); job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式 //设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); //设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new MapSideJoinMain(),args); System.exit(returnCode); } catch (Exception e) { // TODO Auto-generated catch block logger.error(e.getMessage()); } } }
实例参考文献:
https://www.cnblogs.com/cssdongl/p/6018806.html
自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:
https://www.cnblogs.com/bclshuai/p/11380657.html
原文地址:https://www.cnblogs.com/bclshuai/p/12319471.html