常见计算框架算子层对比

背景

前段时间在为内部自研的计算框架设计算子层,参考对比了一些开源的计算框架的算子层,本文做一个粗粒度的梳理。

下面这张图是我对计算框架抽象层次的一个拆分,具体可以参考上周日杭州Spark meetup上我做的Spark SQL分享 slides

Pig-latin

Hadoop MR上的DSL,面向过程,适用于large-scale的数据分析。语法很美,可惜只适合CLI 。

A = load ‘xx‘ AS (c1:int, c2:chararray, c3:float)
B = GROUP A BY c1
C = FOREACH B GENERATE group, COUNT(A)
C = FOREACH B GENERATE $0. $1.c2

X = COGROUP A by a1, B BY b1
Y = JOIN A by a1 (LEFT|FULL|LEFT OUTER), B BY b1

Cascading

Hadoop MR上的封装,Twitter Summingbird正是基于Cascading的。 每个算子都是new出来的,Pipe实例被"迭代式"地传入新的算子里 。

// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );
Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the ‘head‘ of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new HadoopFlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();

Trident

在Storm上提供高级的抽象原语,延续Transactional Topology的exactly-once的语义,满足事务性。 原语过于抽象,构造过程充斥重复性的字段定义。

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

RDD

Spark上的分布式弹性数据集,具备丰富的原语。 RDD原语的灵活性归功于Scala语言本身的FP性质以及语法糖,而其丰富性又源自Scala语言本身API的丰富性。Java难以实现如此强大的表达能力。但RDD确实是非常有参考价值的。

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

SchemaRDD

Spark SQL里的"Table"型RDD,额外为SQL提供了一套DSL。 但是这套DSL只适合SQL,表达能力不够,偏"垂直"。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

// Define the schema using a case class.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// DSL: where(), select(), as(), join(), limit(), groupBy(), orderBy() etc.
val teenagers = people.where(‘age >= 10).where(‘age <= 19).select(‘name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Apache Crunch

Google FlumeJava论文的开源实现,是一个标准的算子层,现在支持Hadoop任务和Spark任务。
Crunch符合FlumeJava的设定,实现了PCollection和PTable这样的分布式、不可变数据表示集,实现了parallelDo(),groupByKey(),combineValues(),flattern()四种基本原语,且基于此原语可以衍生出count(),join(),top()。也实现了Deffered Evalution 以及 针对MSCR(MapShuffleCombineReduce) Operation的优化。
Crunch的任务编写严重依赖Hadoop,其本质是为了在批量计算框架上写MapReduce Pipeline。原语方面不够丰富,且parallelDo()不太适合流式语境。此外,其很多特性和功能是我们不需要具备的,但是抽象数据表示、接口模型、流程控制是可以参考的。

public class WordCount extends Configured implements Tool, Serializable {
  public int run(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    PTable<String, Long> counts = words.count();
    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(result);
  }
}

总结

最后这张图展示了Hadoop之上各种Data Pipeline项目的实现层次对比:

全文完 :)

时间: 2024-10-22 04:20:10

常见计算框架算子层对比的相关文章

谈谈分布式计算的算子层

本文是我对分布式计算的算子这层的一些认识和想法.因为最近自己的开发任务也是这方面相关的,公司内部有自研的类流式计算框架需要做一层算子层.我主要分析的是流式系统上实现算子这一点入手,对比现有计算框架和业界正在开展的项目,分析分析这件事的表面和背后深层的含义,以及可想象空间. 趋势 Yahoo! 的Pig on Storm项目,让Pig-latin能够执行在Storm这种流式引擎上,最终使得Pig-latin能够混用于流式计算和批量计算场景下.应该说,无论是Spark,Summingbird,还是P

说说阿里增量计算框架Galaxy :增量计算模型 (二)

背景 在前一篇文章中,介绍到了Galaxy的增量计算性质,其state是框架内部管理的,以及与Storm的简单对比.这篇文章将讲述更多Galaxy增量模型的事情,并介绍这套增量模型之上实现的Galaxy SQL和Galaxy Operator,同时会从增量角度对比Spark Streaming. Galaxy MRM增量与Spark Streaming MRM模型全称为MapReduceMerge,比MapReduce做了一个Merge操作.merge阶段可与state交互,读写某个key的ol

说说阿里增量计算框架Galaxy (一)

背景 Galaxy是阿里数据平台事业部,实时计算组自研的增量计算框架.今年双十一,阿里直播大屏就是Galaxy支持和保障的重要业务之一,相信大家可能看过双十一之后网上一些介绍性的文章了,比如阿里研发实时计算平台 每秒运算量将超千万,不过这篇文章面向非技术人员,最后的比喻也是有点醉.还这篇比较新的 阿里巴巴实时数据公共层助力双11媒体直播.本文我会介绍一些我认为可以公开出来说的galaxy技术上的特点,让技术人员对该计算框架有个更准确的认识. 计算模型 首先明确根本的一点,Galaxy是增量计算模

一文读懂大数据计算框架与平台

1.前言 计算机的基本工作就是处理数据,包括磁盘文件中的数据,通过网络传输的数据流或数据包,数据库中的结构化数据等.随着互联网.物联网等技术得到越来越广泛的应用,数据规模不断增加,TB.PB量级成为常态,对数据的处理已无法由单台计算机完成,而只能由多台机器共同承担计算任务.而在分布式环境中进行大数据处理,除了与存储系统打交道外,还涉及计算任务的分工,计算负荷的分配,计算机之间的数据迁移等工作,并且要考虑计算机或网络发生故障时的数据安全,情况要复杂得多. 举一个简单的例子,假设我们要从销售记录中统

开源图计算框架GraphLab介绍

GraphLab介绍 GraphLab 是由CMU(卡内基梅隆大学)的Select 实验室在2010 年提出的一个基于图像处理模型的开源图计算框架,框架使用C++语言开发实现.该框架是面向机器学习(ML)的流处理并行计算框架,可以运行在多处理机的单机系统.集群或是亚马逊的EC2 等多种环境下.框架的设计目标是,像MapReduce一样高度抽象,可以高效执行与机器学习相关的.具有稀疏的计算依赖特性的迭代性算法,并且保证计算过程中数据的高度一致性和高效的并行计算性能.该框架最初是为处理大规模机器学习

hadoop之魂--mapreduce计算框架,让收集的数据产生价值 (第4篇)

  通过前面的学习,大家已经了解了HDFS文件系统.有了数据,下一步就要分析计算这些数据,产生价值.接下来我们介绍Mapreduce计算框架,学习数据是怎样被利用的. Mapreduce计算框架 如果将Hadoop比做一头大象,那么MapReduce就是那头大象的电脑.MapReduce是Hadoop核心编程模型.在Hadoop中,数据处理核心就是MapReduce程序设计模型. 本章内容: 1) MapReduce编程模型 2) MapReduce执行流程 3) MapReduce数据本地化

志愿计算框架与论坛

志愿计算,是一种利用计算机闲置资源参与公益类分布式计算的方法. 志愿计算的框架: 1 [email protected] [email protected]是一个研究蛋白质折叠,误折,聚合及由此引起的相关疾病的分布式计算工程.蛋白质是一个生物体系的网络基础,它们是一个个纳米级计算机.在蛋白质实现它的生物功能之前,它们会把自己装配起来,或者说是折叠:折叠过程对人类而言仍是未解之谜.当蛋白质没有正确折叠(误折)无疑会产生严重的后果,包括许多知名的疾病,比方阿兹海默症(Alzheimer's),疯牛病

[.NET网格计算框架] Alchemi

Alchemi [.NET网格计算框架] 是 一个以使用简易为目的的Windows下的网格计算框架.它提供了:a)开发网格软件的编程环境 和 b)建造网格和运行网格软件的运行机制.       Alchemi提供了软件合成的弹性.你可以使用强劲的网格线型模式以任何.NET支援的语言写网格软件. 或者把现有的软件以编程或宣布的方式改成网格软件. 建造同一水平网格(捆绑群)只要在一台电脑上安装Alchemi Manager和在每一台网格电脑上安装Alchemi Executor. 这一弹性的模式使E

Android 测试 Appium、Robotium、monkey等框架或者工具对比

1. Appium测试 (功能测试,用户接受度测试,黑盒测试) - Rating: 8 Website: http://appium.io/ Appium测试相当于黑盒测试.只是测试UI逻辑正确性.所以Appium测试框架提供的方法有限.获取一个AppiumDriver对象.该对象只是有很多findElements()的方法,获取到UI元素.UI元素是WebElement,这个类提供的方法基本是获取信息为主,比如获取name,class,tagName,location,text,isSlect