本文聚焦 Apache Spark 入门,了解其在大数据领域的地位,覆盖 Apache Spark 的安装及应用程序的建立,并解释一些常见的行为和操作。
一、 为什么要使用 Apache Spark
时下,我们正处在一个“大数据”的时代,每时每刻,都有各种类型的数据被生产。而在此紫外,数据增幅的速度也在显著增加。从广义上看,这些数据包含交易数据、社交媒体内容(比如文本、图像和视频)以及传感器数据。那么,为什么要在这些内容上投入如此多精力,其原因无非就是从海量数据中提取洞见可以对生活和生产实践进行很好的指导。
在几年前,只有少部分公司拥有足够的技术力量和资金去储存和挖掘大量数据,并对其挖掘从而获得洞见。然而,被雅虎 2009 年开源的 Apache Hadoop 对这一状况产生了颠覆性的冲击——通过使用商用服务器组成的集群大幅度地降低了海量数据处理的门槛。因此,许多行业(比如 Health care、Infrastructure、Finance、Insurance、Telematics、Consumer、Retail、Marketing、E-commerce、Media、 Manufacturing 和 Entertainment)开始了 Hadoop 的征程,走上了海量数据提取价值的道路。着眼 Hadoop ,其主要提供了两个方面的功能:
- 通过水平扩展商用主机,HDFS提供了一个廉价的方式对海量数据进行容错存储。
- MapReduce 计算范例,提供了一个简单的编程模型来挖掘数据并获得洞见。
下图展示了 MapReduce 的数据处理流程,其中一个 Map-Reduce step 的输出将作为下一个典型 Hadoop job 的输入结果。
在整个过程中,中间结果会借助磁盘传递,因此对比计算,大量的 Map-Reduced 作业都受限于 IO 。然而对于 ETL 、数据整合和清理这样的用例来说,IO 约束并不会产生很大的影响,因为这些场景对数据处理时间往往不会有较高的需求。然而,在现实世界中,同样存在许多对延时要求较为苛刻的用例,比如:
- 对流数据进行处理来做近实时分析。举个例子,通过分析点击流数据做视频推荐,从而提高用户的参与度。在这个用例中,开发者必须在精度和延时之间做平衡。
- 在大型数据集上进行交互式分析,数据科学家可以在数据集上做 ad-hoc 查询。
毫无疑问,历经数年发展,Hadoop 生态圈中的丰富工具已深受用户喜爱,然而这里仍然存在众多问题给使用带来了挑战:
- 每个用例都需要多个不同的技术堆栈来支撑,在不同使用场景下,大量的解决方案往往捉襟见肘。
- 在生产环境中机构往往需要精通数门技术。
- 许多技术存在版本兼容性问题。
- 无法在并行 job 中更快地共享数据。
而通过 Apache Spark,上述问题迎刃而解!Apache Spark 是一个轻量级的内存集群计算平台,通过不同的组件来支撑批、流和交互式用例,如下图。
二、 关于 Apache Spark
Apache Spark 是个开源和兼容 Hadoop 的集群计算平台。由加州大学伯克利分校的 AMPLabs 开发,作为 Berkeley Data Analytics Stack(BDAS) 的一部分,当下由大数据公司 Databricks 保驾护航,更是 Apache 旗下的顶级项目,下图显示了 Apache Spark 堆栈中的不同组件。
Apache Spark 的5大优势:
- 更高的性能,因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。很多对 Spark 感兴趣的朋友可能也会听过这样一句话——在数据全部加载到内存的情况下, Spark 可以比 Hadoop 快 100 倍,在内存不够存放所有数据的情况下快 Hadoop 10 倍。
- 通过建立在 Java、Scala、Python、SQL (应对交互式查询)的标准 API 以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
- 与现有 Hadoop v1 ( SIMR ) 和 2.x (YARN) 生态兼容,因此机构可以进行无缝迁移。
- 方便下载和安装。方便的 shell(REPL: Read-Eval-Print-Loop)可以对 API 进行交互式的学习。
- 借助高等级的架构提高生产力,从而可以讲精力放到计算上。
同时, Apache Spark 由 Scala 实现,代码非常简洁。
三、安装Apache Spark
下表列出了一些重要链接和先决条件:
如上图所示,Apache Spark 的部署方式包括 standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos 。Apache Spark 需求一定的 Java、Scala 或 Python 知识。这里,我们将专注 standalone 配置下的安装和运行。
- 安装 JDK 1.6+、Scala 2.10+、Python [2.6,3] 和 sbt
- 下载 Apache Spark 1.0.1 Release
- 在指定目录下 Untar 和 Unzip spark-1.0.1.tgz
[email protected]~/Downloads$ pwd /Users/akuntamukkala/Downloads [email protected]~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark
- 运行 sbt 建立 Apache Spark
[email protected]~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 [email protected]~/spark/spark-1.0.1$ sbt/sbt assembly
- 发布 Scala 的 Apache Spark standalone REPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell
如果是 Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark
- 查看 SparkUI @ http://localhost:4040
四、Apache Spark 的工作模式
Spark 引擎提供了在集群中所有主机上进行分布式内存数据处理的能力,下图显示了一个典型 Spark job 的处理流程。
下图显示了 Apache Spark 如何在集群中执行一个作业。
Master 控制数据如何被分割,利用了数据本地性,并在 Slaves 上跟踪所有分布式计算。在某个Slave不可用时,其存储的数据会分配给其他可用的 Slaves 。虽然当下( 1.0.1 版本) Master 还存在单点故障,但后期必然会被修复。
五、弹性分布式数据集(Resilient Distributed Dataset,RDD)
弹性分布式数据集(RDD,从 Spark 1.3 版本开始已被 DataFrame 替代)是 Apache Spark 的核心理念。它是由数据组成的不可变分布式集合,其主要进行两个操作:transformation 和 action 。Transformation 是类似在 RDD 上做 filter()、map() 或 union() 以生成另一个 RDD 的操作,而 action 则是 count()、first()、take(n)、collect() 等促发一个计算并返回值到 Master 或者稳定存储系统的操作。Transformations 一般都是 lazy 的,直到 action 执行后才会被执行。Spark Master/Driver 会保存 RDD 上的 Transformations 。这样一来,如果某个 RDD 丢失(也就是 salves 宕掉),它可以快速和便捷地转换到集群中存活的主机上。这也就是 RDD 的弹性所在。
下图展示了 Transformation 的 lazy :
我们可以通过下面示例来理解这个概念:从文本中发现 5 个最常用的 word 。下图显示了一个可能的解决方案。
在上面命令中,我们对文本进行读取并且建立字符串的 RDD 。每个条目代表了文本中的 1 行。
scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”) hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12 scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false) topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
- 通过上述命令我们可以发现这个操作非常简单——通过简单的 Scala API 来连接 transformations 和 actions 。
- 可能存在某些 words 被 1 个以上空格分隔的情况,导致有些 words 是空字符串,因此需要使用 filter(!_.isEmpty) 将它们过滤掉。
- 每个 word 都被映射成一个键值对:map(word=>(word,1))。
- 为了合计所有计数,这里需要调用一个 reduce 步骤—— reduceByKey(+) 。 + 可以非常便捷地为每个 key 赋值。
- 我们得到了 words 以及各自的 counts,下一步需要做的是根据 counts 排序。在 Apache Spark ,用户只能根据 key 排序,而不是值。因此,这里需要使用 map{case (word, count) => (count, word)} 将 (word, count) 流转到 (count, word)。
- 需要计算最常用的 5 个 words ,因此需要使用 sortByKey(false) 做一个计数的递减排序。
上述命令包含了一个 .take(5) (an action operation, which triggers computation) 和在 /Users/akuntamukkala/temp/gutenburg.txt 文本中输出 10 个最常用的 words 。在 Python shell 中用户可以实现同样的功能。
RDD lineage 可以通过 toDebugString (一个值得记住的操作)来跟踪。
scala> topWordCount.take(5).foreach(x=>println(x)) (1044,the) (730,and) (679,of) (648,to) (511,I)
常用的 Transformations:
常见集合操作
更多 transformations 信息,请查看 http://spark.apache.org/docs/latest/programming-guide.html#transformations
常用的 actions
更多 actions 参见 http://spark.apache.org/docs/latest/programming-guide.html#actions
六、RDD持久性
Apache Spark 中一个主要的能力就是在集群内存中持久化/缓存 RDD 。这将显著地提升交互速度。下表显示了 Spark 中各种选项。
上面的存储等级可以通过 RDD. cache() 操作上的 persist () 操作访问,可以方便地指定 MEMORY_ONLY 选项。关于持久化等级的更多信息,可以访问这里 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。
Spark 使用 Least Recently Used (LRU) 算法来移除缓存中旧的、不常用的 RDD ,从而释放出更多可用内存。同样还提供了一个 unpersist() 操作来强制移除缓存/持久化的 RDD 。
七、变量共享
Accumulators。Spark 提供了一个非常便捷地途径来避免可变的计数器和计数器同步问题—— Accumulators 。Accumulators 在一个 Spark context 中通过默认值初始化,这些计数器在 Slaves 节点上可用,但是 Slaves 节点不能对其进行读取。它们的作用就是来获取原子更新,并将其转发到 Master 。 Master 是唯一可以读取和计算所有更新合集的节点。举个例子:
[email protected]~/temp$ cat output.log error warning info trace error info info scala> val nErrors=sc.accumulator(0.0) scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”) scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1) scala> nErrors.value Result:Int = 2
Broadcast Variables。实际生产中,通过指定 key 在 RDDs 上对数据进行合并的场景非常常见。在这种情况下,很可能会出现给 slave nodes 发送大体积数据集的情况,让其负责托管需要做 join 的数据。因此,这里很可能存在巨大的性能瓶颈,因为网络 IO 比内存访问速度慢 100 倍。为了解决这个问题,Spark 提供了 Broadcast Variables,如其名称一样,它会向 slave nodes 进行广播。因此,节点上的 RDD 操作可以快速访问 Broadcast Variables 值。举个例子,期望计算一个文件中所有路线项的运输成本。通过一个 look-up table指定每种运输类型的成本,这个look-up table 就可以作为 Broadcast Variables 。
[email protected]~/temp$ cat packagesToShip.txt ground express media priority priority ground express media scala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect().toMap map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10) scala> val bcMailRates = sc.broadcast(map)
上述命令中,我们建立了一个 broadcast variable,基于服务类别成本的 map 。
scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)
在上述命令中,我们通过 broadcast variable 的 mailing rates 来计算运输成本。
scala> pts.map(shipType=>(shipType,1)).reduceByKey(+). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()
通过上述命令,我们使用 accumulator 来累加所有运输的成本。详细信息可通过下面的 PDF 查看 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。
八、Spark SQL
通过 Spark Engine,Spark SQL 提供了一个便捷的途径来进行交互式分析,使用一个被称为 SchemaRDD 类型的 RDD 。SchemaRDD 可以通过已有 RDDs 建立,或者其他外部数据格式,比如 Parquet files、JSON 数据,或者在 Hive 上运行 HQL。SchemaRDD 非常类似于 RDBMS 中的表格。一旦数据被导入 SchemaRDD,Spark 引擎就可以对它进行批或流处理。Spark SQL 提供了两种类型的 Contexts——SQLContext 和 HiveContext,扩展了 SparkContext 的功能。
SparkContext 提供了到简单 SQL parser 的访问,而 HiveContext 则提供了到 HiveQL parser 的访问。HiveContext 允许企业利用已有的 Hive 基础设施。
这里看一个简单的 SQLContext 示例。
下面文本中的用户数据通过 “ | ” 来分割。
John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854
定义 Scala case class 来表示每一行:
case class Customer(name:String,age:Int,gender:String,address: String)
下面的代码片段体现了如何使用 SparkContext 来建立 SQLContext ,读取输入文件,将每一行都转换成 SparkContext 中的一条记录,并通过简单的 SQL 语句来查询 30 岁以下的男性用户。
val sparkConf = new SparkConf().setAppName(“Customers”) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’)) val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”) sqlContext.sql(“select * from customers where gender=’M’ and age < 30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris, TX,75461]
更多使用 SQL 和 HiveQL 的示例请访问下面链接 https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。
九、Spark Streaming
Spark Streaming 提供了一个可扩展、容错、高效的途径来处理流数据,同时还利用了 Spark 的简易编程模型。从真正意义上讲,Spark Streaming 会将流数据转换成 micro batches,从而将 Spark 批处理编程模型应用到流用例中。这种统一的编程模型让 Spark 可以很好地整合批量处理和交互式流分析。下图显示了 Spark Streaming 可以从不同数据源中读取数据进行分析。
Spark Streaming 中的核心抽象是 Discretized Stream(DStream)。DStream 由一组 RDD 组成,每个 RDD 都包含了规定时间(可配置)流入的数据。上图很好地展示了 Spark Streaming 如何通过将流入数据转换成一系列的 RDDs,再转换成 DStream 。每个 RDD 都包含两秒(设定的区间长度)的数据。在 Spark Streaming 中,最小长度可以设置为 0.5 秒,因此处理延时可以达到 1 秒以下。
Spark Streaming 同样提供了 window operators ,它有助于更有效率在一组 RDD ( a rolling window of time)上进行计算。同时,DStream 还提供了一个 API ,其操作符(transformations 和 output operators)可以帮助用户直接操作 RDD 。下面不妨看向包含在 Spark Streaming 下载中的一个简单示例。示例是在 Twitter 流中找出趋势 hashtags ,详见下面代码。
spark- 1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”) val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters)
上述代码用于建立 Spark Streaming Context 。Spark Streaming 将在 DStream 中建立一个 RDD ,包含了每 2 秒流入的 tweets 。
val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))
上述代码片段将 Tweet 转换成一组 words ,并过滤出所有以 a# 开头的。
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))
上述代码展示了如何整合计算 60 秒内一个 hashtag 流入的总次数。
topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println(“\nPopular topics in last 60 seconds (%s total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s tweets)”.format(tag, count))} })
上面代码将找出 top 10 趋势 tweets ,然后将其打印。
ssc.start()
上述代码让 Spark Streaming Context 开始检索 tweets 。一起聚焦一些常用操作,假设我们正在从一个 socket 中读入流文本。
al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)
更多 operators 请访问 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations
Spark Streaming 拥有大量强大的 output operators ,比如上文提到的 foreachRDD(),了解更多可访问 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations。
十、附加学习资源
- Wikipedia article (good): http://en.wikipedia.org/wiki/Apache_Spark
- Launching a Spark cluster on EC2: http://ampcamp.berkeley.edu/exercises-strata-conf-2013/launching-a-cluster.html
- Quick start: https://spark.apache.org/docs/1.0.1/quick-start.html
- The Spark platform provides MLLib(machine learning) and GraphX(graph algorithms). The following links provide more information:https://spark.apache.org/docs/latest/mllib-guide.html、https://spark.apache.org/docs/1.0.1/graphx-programming-guide.html、https://dzone.com/refcardz/apache-spark
- Apache Spark在线编程练习 http://www.hubwiz.com/course/5449c691e564e50960f1b7a9/
原文链接:Apache Spark:An Engine for Large-Scale Data Processing