Walking Through the Spark Streaming Framework's Runtime Source Code via a Case Study (in One Lesson)



  1. Review and demonstration of the case: dynamically computing the hottest items per category online
  2. Walking through the Spark Streaming runtime source code based on this case

  1. We use Spark Streaming + Spark SQL to dynamically compute, online, the ranking of the hottest items in each category of an e-commerce site, for example the three hottest phones in the phone category.

    MySQL is used as the metadata database, Hive as the storage engine, and Spark SQL as the query engine.

    The database connection-pool code is as follows:

package com.dt.spark.com.dt.spark.streaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {

    // A simple, statically held pool of JDBC connections
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                // Lazily create an initial batch of 5 connections
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://Master:3306/sparkstreaming",
                            "root", "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    public static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}

The main application code is as follows:

package com.dt.spark.com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Uses Spark Streaming + Spark SQL to dynamically compute, online, the hottest items in each
 * category of an e-commerce site, e.g. the three hottest phones in the phone category or the
 * three hottest TVs in the TV category. This example is of great practical value in production.
 *
 * @author DT大数据梦工厂
 * Sina Weibo: http://weibo.com/ilovepains/
 *
 * Implementation: Spark Streaming + Spark SQL. Spark Streaming can use ML, SQL, GraphX and so on
 * because interfaces such as foreachRDD and transform operate directly on RDDs. With the RDD as
 * the foundation, all other Spark functionality can be used as easily as calling an API.
 *
 * Assumed input format: user item category, e.g. "Rocky Samsung Android".
 */
object OnlineTheTop3ItemForEachCategory2DB {

  def main(args: Array[String]) {
    /**
     * Step 1: Create the SparkConf object to configure the Spark program at runtime.
     * For example, setMaster sets the URL of the Master of the Spark cluster to connect to;
     * setting it to "local" runs the program locally, which is convenient for beginners on
     * machines with very limited resources (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf() // create the SparkConf object
    // Set the application name, which is visible in the monitoring UI while the program runs
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
    // conf.setMaster("spark://Master:7077") // run on the Spark cluster
    conf.setMaster("local[6]")

    // Set the batchDuration, which controls how often Jobs are generated, and create the
    // StreamingContext, the entry point of Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/resource/checkpoint/")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(
      _ + _, _ - _, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
        categoryItemDF.registerTempTable("categoryItemTable")

        val resultDataFrame = hiveContext.sql(
          "SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        resultDataFrame.show()

        val resultRowRDD = resultDataFrame.rdd

        resultRowRDD.foreachPartition { partitionOfRecords =>
          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values('" +
                record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
      }
    }

    /**
     * StreamingContext.start() internally starts the JobScheduler's start method, which begins the
     * message loop. Inside JobScheduler.start, a JobGenerator and a ReceiverTracker are
     * constructed, and the start methods of both are called:
     *   1. Once started, the JobGenerator keeps generating Jobs according to the batchDuration.
     *   2. Once started, the ReceiverTracker first launches Receivers in the Spark cluster (in
     *      fact a ReceiverSupervisor is started first on the Executor). When a Receiver gets
     *      data, it stores it on the Executor through the ReceiverSupervisor and sends the
     *      data's metadata to the ReceiverTracker on the Driver, which manages the received
     *      metadata through the ReceivedBlockTracker.
     * Each batchInterval produces a concrete Job. This is not a Job in the Spark Core sense; it
     * is only the RDD DAG generated from the DStreamGraph -- in Java terms, roughly an instance
     * of Runnable. To actually run it, the Job has to be submitted to the JobScheduler, which
     * uses a thread pool to find a separate thread and submit the Job to the cluster (the real
     * job is triggered in that thread by an RDD action). Why a thread pool?
     *   1. Jobs are generated continuously, so a thread pool improves efficiency -- much like
     *      executing Tasks through a thread pool on an Executor.
     *   2. The Jobs may be scheduled with the FAIR scheduler, which also needs multi-thread
     *      support.
     */
    ssc.start()
    ssc.awaitTermination()
  }
}


  def this(
      master: String,
      appName: String,
      batchDuration: Duration,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()) = {
    this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
         null, batchDuration)
  }

When a StreamingContext is created, it internally builds a SparkContext from the SparkConf. A StreamingContext is therefore always built on top of a SparkContext instance, which also shows that Spark Streaming runs on top of Spark Core.
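To make this concrete, the same StreamingContext can also be built explicitly on top of an existing SparkContext. A minimal sketch using the standard public API (the names mirror the example above and are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build the SparkContext first, then layer Spark Streaming on top of it
val sc = new SparkContext(
  new SparkConf().setAppName("OnlineTheTop3ItemForEachCategory2DB").setMaster("local[6]"))
val ssc = new StreamingContext(sc, Seconds(5)) // reuses the existing SparkContext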

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }
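For comparison, the more commonly used socketTextStream is essentially a thin wrapper around socketStream with a line-oriented converter. Roughly, paraphrasing the Spark 1.6-era source (some scoping code omitted):

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = {
  // SocketReceiver.bytesToLines turns the raw InputStream into an Iterator[String] of lines
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}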

Under the hood, socketStream creates a SocketInputDStream instance, which extends ReceiverInputDStream; the SocketInputDStream then receives data through a Receiver:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

When the ReceiverInputDStream is constructed, a rateController (a ReceiverRateController) is initialized:

  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
    } else {
      None
    }
  }

  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
      extends RateController(id, estimator) {
    override def publish(rate: Long): Unit =
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
  }

This rate controller publishes updated rate limits back to the ReceiverTracker via sendRateUpdate, throttling how fast the receiver ingests data (backpressure).
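Whether this controller is created at all is governed by the backpressure setting checked above. A small configuration sketch using the standard Spark configuration key (disabled by default):

// With backpressure enabled, the rate controller can throttle the receiver's ingestion rate
val conf = new SparkConf()
  .setAppName("OnlineTheTop3ItemForEachCategory2DB")
  .set("spark.streaming.backpressure.enabled", "true") // default is false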

The SocketReceiver starts a thread to receive the socket data over the network:

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }

Now look at what ssc.start() does:

  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()
            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            // ...

start() in turn calls the JobScheduler's start method:

class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Mapping from batch time to its JobSet
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  private val jobGenerator = new JobGenerator(this) // creates the Job generator
  val clock = jobGenerator.clock                    // time source used to generate jobs
  val listenerBus = new StreamingListenerBus()      // listener bus for streaming events

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    // Events such as data-reception events are handled through this event loop, i.e. in an
    // event-driven way; under the hood they are presumably delivered via the RPC framework
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

Events are handled in processEvent:

  private def processEvent(event: JobSchedulerEvent) {
    try {
      event match {
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }

  private def handleJobStart(job: Job, startTime: Long) {
    val jobSet = jobSets.get(job.time)
    val isFirstJobOfJobSet = !jobSet.hasStarted
    jobSet.handleJobStart(job)
    if (isFirstJobOfJobSet) {
      // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
      // correct "jobSet.processingStartTime".
      listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
    }
    job.setStartTime(startTime)
    listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
    logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
  }

  private def handleJobCompletion(job: Job, completedTime: Long) {
    val jobSet = jobSets.get(job.time)
    jobSet.handleJobCompletion(job)
    job.setEndTime(completedTime)
    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
    if (jobSet.hasCompleted) {
      jobSets.remove(jobSet.time)
      jobGenerator.onBatchCompletion(jobSet.time)
      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
        jobSet.totalDelay / 1000.0, jobSet.time.toString,
        jobSet.processingDelay / 1000.0
      ))
      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
    }
    job.result match {
      case Failure(e) =>
        reportError("Error running job " + job, e)
      case _ =>
    }
  }

  private def handleError(msg: String, e: Throwable) {
    logError(msg, e)
    ssc.waiter.notifyError(e)
  }

  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}

private[streaming] object JobScheduler {
  val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
  val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
}

Now the JobGenerator side: generateJobs builds a JobSet and submits it via submitJobSet; the corresponding events are then placed on the event queue:

  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

It obtains the input stream information and then triggers submitJobSet to dispatch the jobs; inside the JobScheduler each Job is wrapped in a JobHandler (which implements the Runnable interface) and run on the job executor thread pool, as sketched below.
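For reference, submitJobSet looks roughly like this in the Spark 1.6-era source (sketch; minor details may differ across versions):

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      // Each Job is wrapped in a JobHandler (a Runnable) and executed on the job thread pool
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }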

foreachRDD adds the resulting DStream to the DStreamGraph as an output stream, as sketched below.
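A sketch of how foreachRDD does this, paraphrasing the Spark 1.x (around 1.6) DStream source with some overloads and scoping code omitted:

  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = {
    // The user function becomes an output DStream (ForEachDStream), which is then registered
    new ForEachDStream(this,
      context.sparkContext.clean((r: RDD[T], t: Time) => foreachFunc(r), false),
      displayInnerRDDOps = true).register()
  }

  private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this) // this is how the DStream ends up in the DStreamGraph
    this
  }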

In the JobScheduler, the receiverTracker and inputInfoTracker variables are instantiated; inside the ReceiverTracker a ReceiverTrackerEndpoint is created:

  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

Looking into launchReceivers:

  /** ... worker nodes as a parallel collection, and runs them. */
  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

This gathers the Receiver from each ReceiverInputDStream. Looking at runDummySparkJob (as mentioned in Lesson 1), it uses makeRDD to run a dummy job across the cluster, a form of load balancing so that receivers end up evenly distributed over the worker nodes; a sketch of it follows.
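runDummySparkJob itself is short; roughly, as a sketch of the Spark 1.6-era source:

  // Run a dummy job so that all executors have registered, which keeps the receivers
  // from all being scheduled onto the same node
  private def runDummySparkJob(): Unit = {
    if (!ssc.sparkContext.isLocal) {
      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }
    assert(getExecutors.nonEmpty)
  }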

The ReceiverTrackerEndpoint receives the StartAllReceivers message and handles it as follows:

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors)
      }

The handler above calls startReceiver, inside which (in the function shipped to the worker) a new ReceiverSupervisorImpl is created; this is shown further below. Switching back to the JobGenerator: when there is no checkpoint, its start() ends up in startFirstTime:

  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

If a checkpoint is present, the JobGenerator restarts from it; otherwise startFirstTime is called. Inside startFirstTime we find graph.start and timer.start; a sketch of the surrounding JobGenerator.start logic follows, and DStreamGraph.start is shown after it.
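The branch between restarting from a checkpoint and startFirstTime sits in JobGenerator.start; a trimmed sketch based on the Spark 1.6-era source (checkpoint-writer setup omitted):

  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
      override protected def onError(e: Throwable): Unit =
        jobScheduler.reportError("Error in job generator", e)
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()        // recover from the checkpoint
    } else {
      startFirstTime() // first start: shown above
    }
  }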

  def start(time: Time) {
    this.synchronized {
      require(zeroTime == null, "DStream graph computation already started")
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())
    }
  }

The code above is DStreamGraph's start method, reached via graph.start. The timer started alongside it is a RecurringTimer that drives job generation, sketched next.
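This timer posts a GenerateJobs event to the JobGenerator's event loop once per batchDuration, which is what eventually invokes the generateJobs method shown earlier; its definition is roughly (Spark 1.6-era sketch):

  // Fires once per batch interval and schedules job generation for that batch time
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")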

Returning to startReceiver in the ReceiverTracker: the second argument of submitJob is a function whose job is to start the Receiver on a worker node:

        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()

Here it is important to distinguish ReceiverInputDStream from Receiver: the Receiver is what actually receives the data, while the ReceiverInputDStream is a layer of encapsulation over the Receiver. A minimal custom Receiver is sketched below for illustration.
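To see the division of labour, here is a minimal hypothetical custom Receiver built on the documented Receiver API (the name ConstantReceiver is made up for illustration); the corresponding ReceiverInputDStream would only need to return it from getReceiver():

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A toy receiver that emits the same string once per second
class ConstantReceiver(msg: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Constant Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(msg)       // hand one record to Spark; the supervisor takes care of storage
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // nothing to do: the receiving thread exits once isStopped() returns true
  }
}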

  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {
        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }

        rddOption
      } else {
        None
      }
    }
  }

The getOrCompute method produces the RDD for a given batch time; DStream.generateJob, sketched below, is what wraps that RDD in a Job.
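The link from getOrCompute to an actual Job is roughly the following, paraphrasing the Spark 1.6-era source (output DStreams such as ForEachDStream override it to run the user's output function instead of an empty one):

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc) // the RDD action that triggers execution
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }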

Notes:

1. DT大数据梦工厂 WeChat public account: DT_Spark

2. IMF big-data hands-on free live broadcast at 8 p.m., YY channel: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
