Apache Crunch的设计 (上)

背景

Apache Crunch是FlumeJava的实现,为不太方便直接开发和使用的MapReduce程序,开发一套MR流水线,具备数据表示模型,提供基础原语和高级原语,根据底层执行引擎对MR Job的执行进行优化。从分布式计算角度看,Crunch提供的许多计算原语,可以在Spark、Hive、Pig等地方找到很多相似之处,而本身的数据读写,序列化处理,分组、排序、聚合的实现,类似MapReduce各阶段的拆分都可以在Hadoop里找到影子。

本文介绍Crunch在数据表示模型、操作原语、序列化处理方面的设计和实现类,关于Pipeline的不同种实现以及与hadoop MR、Spark引擎的对接将在之后的文章里介绍。就像之前说的,很多内容可以在hadoop、spark、pig等地方找到相似之处。

阅读Crunch的设计和源码结构,可以更好地理解FlumeJava论文的描述,更好地剖析MapReduce的计算和各阶段组成,熟悉Hadoop MR Job的API等,可以提供很好的实现思路。

参考资料:User Guide 和 源码。

下图为七种Hadoop之上的计算表示层对比:

数据模型和基础类

三种分布式数据集的抽象接口:PCollection,PTable,PGroupedTable

?  PCollection<T>代表分布式、不可变的数据集,提供parallelDo和union方法,触发对每个元素进行DoFn操作,返回新的PCollection<U>

?  PTable<K, V>是PCollection<Pair<K,V>>实现,代表分布式、未排序的multimap。除了继承自PCollection 的parallelDo,还复写了union方法,提供了groupByKey方法。groupByKey方法对应MapReduce job里的排序阶段。在groupByKey操作里,开发者可以在shuffle过程里(参见GroupingOptions类)做细粒度的reducer数目、分区策略、分组策略以及排序策略控制

?  PGroupedTable<K, V>是groupByKey操作的结果,代表分布式、排过序的map,具备迭代器,其实现是PCollection<Pair<K,Iterable<V>>>。除了继承自PCollection的parallelDo、union,提供combineValues方法,允许在shuffle的map端或reduce端使用满足交换律和结合律的聚合算子(参见Aggregator类)作用于PGroupedTable实例的values上

PCollection里的两种基本原语接口:

org.apache.crunch.lib里 面的其他数据转换操作都来自于上面四种原语。

PCollection提供的其他方法:

count(), min(), max(),aggregate(Aggregator)

filter(), cache()

PTable提供的方法:

PObject<T>,同FlumeJava设计,用于存储Java对象,物化过了之后可以使用getValue()方法获得PObject的值。

数据从Source流入,经过pipeline处理,最后从Target输出。

提供了三种pipeline,分别是MRPipeline,MemPipeline,SparkPipeline

DoFn处理数据

hadoop的MapReduce job,通过配置job.xml,set本次job的map和reduce class,反射出具体类。Crunch的做法是使用java的序列化把DoFn序列化(DoFn实现了java.io.Serializable),在pipeline里传输到task上并被调用。使用的时候要注意可序列化(某些情况下使用transient,static等方式),特别是在MRPipeline和SparkPipeline环境下。

DoFn允许访问TaskInputOutputContext(Hadoop里task的一个上下文类)里的内容,且DoFn可以是在map里,也可以在reduce里。在执行的时候,首先触发initialize方法,类似Mapper, Reducer里的setup方法,比如如果使用了第三方非序列化的类,就可以在此处先实例化出来(声明为transient)。然后是process方法,结果被Emitter发射出去,比如传递给下一个DoFn。最后当所有输入被处理后,执行cleanup,一方面可以最后把一些状态传递给下一个stage,另一方面用于释放资源。

DoFn还有一些和hadoop mr比较类似的地方,比如increment,context和configuration的set/get,还有scaleFactor这个设置,用于估计处理完后数据量的大小,可以影响任务执行的优化(比如判断Reduce个数、I/O量)。默认scaleFactor是0.99,子类会复写这个值,在下面会提到。

FilterFn, MapFn, CombineFn子类

常见的DoFn有FilterFn,MapFn,CombineFn,比较方便使用和测试,几个基础抽象类里都有使用到。

DoFn<S, T>的Process方法做实际的执行逻辑,

public abstract void process(S input, Emitter<T> emitter);

Emitter对应输出,子类体系如下

FilterFn<T>继承DoFn<T, T>,需要实现其accept(T input)方法,返回boolean,它的process方法会调用accept来判断是否输出。

  public void process(T input, Emitter<T> emitter) {
    if (accept(input)) {
      emitter.emit(input);
    }
  }

PCollection的Filter方法就是传入一个FilterFn的实现。FilterFn有and,or,not等子类,定义在FilterFns里。scaleFactor为0.5,很好理解。

MapFn<T>继承DoFn<S, T>,需要实现其map(S input)方法,返回T,process方法里调用如下:

  public void process(S input, Emitter<T> emitter) {
    emitter.emit(map(input));
  }

scaleFactor为1.0。

CombineFn继承DoFn<Pair<S, Iterable<T>>, Pair<S, T>>,用于在reduce执行前处理map输出,减少shuffle过程的网络开销,与PGroupedTable里的combineValues()绑定使用。CombineFn常常和Aggregator的实现子类结合使用。

利用PTypes序列化数据

本节内容对应的package为org.apache.crunch.types,都是和类型相关的类。

PType<T>定义了数据的序列化和反序列化方式,在PCollection的parallelDo里面使用,如最简单的:

<T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);

由于PCollection<T>范型的设计,T被类型擦除(type erasure)了,以上的output类型需要和指定的PType相符,类似于这样:

PCollection<String> lines = ...;
lines.parallelDo(new DoFn<String, Integer>() { ... }, Writables.ints());

Crunch设置了两种PTypeFamily,一种是hadoop的writable,另一种是Avro。 Crunch还是比较靠拢Apache Hadoop的MR的(至少在Spark出现之前,也只能为Hadoop MR做pipeline吧)。

PTypeFamily提供一些基础的类型是这样的,

  PType<Void> nulls();
  PType<String> strings();
  PType<Long> longs();
  PType<Integer> ints();
  PType<Float> floats();
  PType<Double> doubles();
  PType<Boolean> booleans();
  PType<ByteBuffer> bytes();

为了适应PTable,PType有另一个子类体系,PTableType<K, V>,继承PType<Pair<K,V>>。

PType,PTableType的Avro、Writable类型的构造、扩展方式就不说明了。

数据读写

大部分数据读写格式是hadoopinputFormat/outputFormat那套,简单介绍下主要类和类型。

本节内容对应的package为org.apache.crunch.io,都是和数据读写相关的类。

Source

Source<T>和TableSource<K, V>代表数据源,分别对应PCollection和PTable。

在Pipelie的read方法里使用。

<T> PCollection<T> read(Source<T> source);
<K, V> PTable<K, V> read(TableSource<K, V> tableSource);

有一个org.apache.crunch.io.From类,定义了一些静态方法,用于读取数据源的时候指定数据格式和类型,比如Writable,然后返回Source或TableSource。

比较常用的Source对应的Input类型如下:

Target

Target的定义和Source类似,主要是在Pipeline的write方法里使用,常用的类型如下:

Target具备一些不同的WriteMode,是个枚举类,如下例子:

PCollection<String> lines = ...;
// The default option is to fail if the output path already exists.
lines.write(At.textFile("/user/crunch/out"), WriteMode.DEFAULT);

// Delete the output path if it already exists.
lines.write(At.textFile("/user/crunch/out"), WriteMode.OVERWRITE);

// Add the output of the given PCollection to the data in the path
// if it already exists.
lines.write(At.textFile("/user/crunch/out"), WriteMode.APPEND);

// Use this directory as a checkpoint location, which requires that this
// be a SourceTarget, not just a Target:
lines.write(At.textFile("/user/crunch/out"), WriteMode.CHECKPOINT);

有一个SourceTarget<T>类比较特殊,同时继承了Source<T>和Target,既可充当输入源,又可充当输出地。

数据的物化

PCollection有一个物化的方法,

  /**
   * Returns a reference to the data set represented by this PCollection that
   * may be used by the client to read the data locally.
   */
  Iterable<S> materialize();

是延迟触发的。

数据处理原语

本节介绍org.apache.crunch.lib包下的数据处理模型类,算是advanced原语。

groupByKey

PTable的三个groupByKey方法控制了数据的shuffle和处理过程,

  PGroupedTable<K, V> groupByKey();
  PGroupedTable<K, V> groupByKey(int numPartitions);
  PGroupedTable<K, V> groupByKey(GroupingOptions options);

第一个是最简单的shuffle,输出的paritition数目会由planner估计数据大小而设置。

第三个方法里的GroupingOptions对groupByKey提供了更多细粒度的控制,包括数据如何分区、如何排序、如何分组。

如果下面执行引擎是hadoop,那么会使用hadoop的Partitiner、RawComparator来做分区和排序。

GroupingOptions是不可变的,通过GroupingOptions.Builder构建出来使用:

GroupingOptions opts = GroupingOptions.builder()
      .groupingComparatorClass(MyGroupingComparator.class)
      .sortComparatorClass(MySortingComparator.class)
      .partitionerClass(MyPartitioner.class)
      .numReducers(N)
      .conf("key", "value")
      .conf("other key", "other value")
      .build();
PTable<String, Long> kv = ...;
PGroupedTable<String, Long> kv.groupByKey(opts);

combineValues

PTable通过groupByKey得到PGroupedTable,它的combineValues可以让planner控制在shuffle的前后对数据做一些聚合函数的处理。

利用Aggregators的静态方法,使用简单聚合函数的实现类:

PTable<String, Double> data = ...;

// Sum the values of the doubles for each key.
PTable<String, Double> sums =
  data.groupByKey().combineValues(Aggregators.SUM_DOUBLES());
// Find the ten largest values for each key.
PTable<String, Double> maxes =
data.groupByKey().combineValues(Aggregators.MAX_DOUBLES(10));

PTable<String, String> text = ...;
// Get a random sample of 100 unique elements for each key.
PTable<String, String> samp =
text.groupByKey().combineValues(Aggregators.SAMPLE_UNIQUE_ELEMENTS(100));

simple aggregations

参考Aggregator的实现类。

Joins

支持inner join, leftouter join, right outer join, full outer join。定义在枚举类JoinType里。由JoinStrategy执行join动作,

  PTable<K, Pair<U,V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType);

JoinStrategy实现类有

Reduce-sideJoins

对应DefaultStrategy类,是hadoop里比较简单和鲁棒的join,来自两处input的处理后的数据都shuffle到同一个reducer上,比较小的那份数据被收集起来,与流进来的比较大的那份数据进行join。

Map-sideJoins

对应MapsideJoinStrategy类,比较小的那份数据需要load到内存里,需要保证比较小的那个table能够被缓存在各task的内存里。

ShardedJoins

对应ShardedJoinStrategy类,允许把相同key的数据,分区到多个reducer上,避免某些reducer上数据量过大,因为很多分布式join会有数据倾斜的问题,导致某些reducer会出现内存不够的情况。

BloomFilter Joins

对应BloomFilterStrategy类,适合左侧table数据量太大,但仍远小于右侧table数据量,且右侧table的大多数key无法匹配左侧table数据的情况。

cogroups

Crunch的cogroup与Pig里的cogroup类似,接受多份PTable,根据相同的key,输出一个个bag。cogroup处理类似join的第一步。

sorting

others

Cartisian、Coalescing、Distinct、Sampling、Set Operations等。

全文完 :)

Apache Crunch的设计 (上)

时间: 2024-10-12 03:28:45

Apache Crunch的设计 (上)的相关文章

分布式日志收集系统Apache Flume的设计详细介绍

问题导读: 1.Flume传输的数据的基本单位是是什么? 2.Event是什么,流向是怎么样的? 3.Source:完成对日志数据的收集,分成什么打入Channel中? 4.Channel的作用是什么? 5.取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器,由谁来完成? 6.Flume支那些数据格式? 7.对于直接读取文件Source,有两种方式,分别是什么? 8.Channel有多种方式有哪些方式? 概述Flume是Cloudera公司的一款高性能.高可能的分布

C#进阶系列——MEF实现设计上的“松耦合”(四):构造函数注入

前言:今天十一长假的第一天,本因出去走走,奈何博主最大的乐趣是假期坐在电脑前看各处堵车,顺便写写博客,有点收获也是好的.关于MEF的知识,之前已经分享过三篇,为什么有今天这篇?是因为昨天分享领域服务的时候,用到MEF的注入有参构造函数的方法,博主好奇心重,打算稍微深挖一下,这篇来对此知识点做个总结. 还是将前面三篇的目录列出来,对MEF没有了解的朋友,可以先看看: C#进阶系列——MEF实现设计上的“松耦合”(一) C#进阶系列——MEF实现设计上的“松耦合”(二) C#进阶系列——MEF实现设

MEF实现设计上的“松耦合”

C#进阶系列——MEF实现设计上的“松耦合”(二) 前言:前篇 C#进阶系列——MEF实现设计上的“松耦合”(一) 介绍了下MEF的基础用法,让我们对MEF有了一个抽象的认识.当然MEF的用法可能不限于此,比如MEF的目录服务.目录筛选.重组部件等高级应用在这里就不做过多讲解,因为博主觉得这些用法只有在某些特定的环境下面才会用到,着实不太普遍,感觉没有钻下去的必要.如果你有兴趣也可以去了解下.这篇打算将MEF和仓储模式结合起来谈谈MEF在项目中的使用. 1.仓储模式:也叫Repository模式

全心全意为人民服务体现在我们软件设计上

我们这里管理是用的今目标平台,这个平台的网页端效果也在慢慢进步.但另我感触最深的是他们对用户需求的挖掘. 这也是我们系统上线后引发的思考:用户是否喜欢你的软件,不是取决于你的软件技术多么牛B,架构多么先进,而是你是否抓住了他们的心. 我们是为某公司做一个员工打分系统,当上线后,发现了下几个问题: 1.用户进来后不知道干嘛 2.打分的选项之间没有区别,很容易眼花 3.打过分和没有打过分的选项也没有区别 4.打完分之后,员工会问你,我打完分了,然后干嘛,这也是我们的失误 现在来看上面的几条,有哪条是

C#进阶系列——MEF实现设计上的“松耦合”(二)

前言:前篇 C#进阶系列——MEF实现设计上的“松耦合”(一) 介绍了下MEF的基础用法,让我们对MEF有了一个抽象的认识.当然MEF的用法可能不限于此,比如MEF的目录服务.目录筛选.重组部件等高级应用在这里就不做过多讲解,因为博主觉得这些用法只有在某些特定的环境下面才会用到,着实不太普遍,感觉没有钻下去的必要.如果你有兴趣也可以去了解下.这篇打算将MEF和仓储模式结合起来谈谈MEF在项目中的使用. 1.仓储模式:也叫Repository模式.Repository是一个独立的层,介于领域层与数

Apache RocketMQ在linux上的常用命令

Apache RocketMQ在linux上的常用命令 进入maven安装后的rocketmq的bin目录  1.启动Name Server  2.启动Broker 3.关闭Name Server 4.关闭Broker 5.查看Name Server日志 6.查看Broker日志 原文地址:https://www.cnblogs.com/stm32stm32/p/9997972.html

面试挂在了 LRU 缓存算法设计上

好吧,有人可能觉得我标题党了,但我想告诉你们的是,前阵子面试确实挂在了 RLU 缓存算法的设计上了.当时做题的时候,自己想的太多了,感觉设计一个 LRU(Least recently used) 缓存算法,不会这么简单啊,于是理解错了题意(我也是服了,还能理解成这样,,,,),自己一波操作写了好多代码,后来卡住了,再去仔细看题,发现自己应该是理解错了,就是这么简单,设计一个 LRU 缓存算法. 不过这时时间就很紧了,按道理如果你真的对这个算法很熟,十分钟就能写出来了,但是,自己虽然理解 LRU

一些设计上的基本常识

http://javatar.iteye.com/blog/706098 最近给团队新人讲了一些设计上的常识,可能会对其它的新人也有些帮助,把暂时想到的几条,先记在这里. API 与 SPI 分离 框架或组件通常有两类客户,一个是使用者,一个是扩展者.API (Application Programming Interface) 是给使用者用的,而 SPI (Service Provide Interface) 是给扩展者用的.在设计时,尽量把它们隔离开,而不要混在一起.也就是说,使用者是看不到

安晓辉:程序员在公司没事干时候,做什么好?(产品上想多一点,设计上想多一点,技术上做深一点做宽一点,思维框架上学多一点)

(一)项目相关 做下面这些事情,可以让你更了解项目和所用技术: 看看项目的需求文档.设计文档,不要局限于你负责那个模块的,看整个项目的. 看看你在项目中用到的技术,自己掌握得如何,能否进一步提高,比如了解原理.阅读源码,重构自己的代码. 看看其他人的代码,尝试理解他的设计和所实现的功能. 看看别人用到的技术点.技术栈,尝试去了解. (二)个人成长 思考下面的问题,可以让你找到更多事情来做: 我个人想在技术上做到什么程度? 这个技术,团队里哪个人用得最好,好在哪里?我该如何做到像他那样? 产品预期