- Review and demo of the case: dynamically computing the hottest items per category online
- Walking through the Spark Streaming runtime source code based on this case
- We use Spark Streaming + Spark SQL to compute, online and continuously, the hottest items in each category of an e-commerce site, for example the three hottest phones in the "phone" category. MySQL serves as the metastore database, Hive as the storage engine, and Spark SQL as the query engine. The database connection-pool code is as follows:
package com.dt.spark.com.dt.spark.streaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {

    // A simple static pool of JDBC connections shared by all tasks in this JVM.
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Lazily create five connections on first use, then hand one out per caller.
    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://Master:3306/sparkstreaming",
                            "root", "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Return a borrowed connection to the pool for reuse.
    public static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
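A minimal usage sketch in Scala (assuming the ConnectionPool class above is on the classpath; the record type and object name are illustrative): borrow one connection per partition and return it in a finally block, so that a failed insert does not leak the connection — something the example program later in this section does not guard against.

import java.sql.Connection
import com.dt.spark.com.dt.spark.streaming.ConnectionPool

object ConnectionPoolUsageSketch {
  def savePartition(records: Iterator[(String, String, Int)]): Unit = {
    val connection: Connection = ConnectionPool.getConnection()
    try {
      val stmt = connection.createStatement()
      records.foreach { case (category, item, clickCount) =>
        stmt.executeUpdate(
          s"insert into categorytop3(category,item,client_count) values('$category','$item',$clickCount)")
      }
      stmt.close()
    } finally {
      // Always give the connection back, even if an insert above throws.
      ConnectionPool.returnConnection(connection)
    }
  }
}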
The streaming application code is as follows:
package com.dt.spark.com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Uses Spark Streaming + Spark SQL to compute online, and continuously, the hottest items in each
 * category of an e-commerce site, e.g. the three hottest phones in the "phone" category and the
 * three hottest TVs in the "TV" category. This pattern is extremely valuable in real production
 * environments.
 *
 * @author DT大数据梦工厂
 * Sina Weibo: http://weibo.com/ilovepains/
 *
 * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use SQL, ML, GraphX
 * and the rest of Spark is that interfaces such as foreachRDD and transform operate directly on
 * RDDs. With the RDD as the common foundation, the rest of Spark's functionality can be used as
 * easily as calling an ordinary API.
 *
 * The input data format is assumed to be: user item category, e.g. "Rocky Samsung Android".
 */
object OnlineTheTop3ItemForEachCategory2DB {

  def main(args: Array[String]) {
    /**
     * Step 1: Create the SparkConf object to configure the Spark application, e.g. use setMaster
     * to set the URL of the Spark cluster master to connect to. Setting it to "local" runs the
     * program locally, which suits beginners on machines with very limited resources
     * (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf()                              // create the SparkConf object
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")  // application name shown in the monitoring UI
    // conf.setMaster("spark://Master:7077")                // run on a Spark cluster
    conf.setMaster("local[6]")

    // Set the batchDuration to control how often jobs are generated, and create the
    // StreamingContext, the entry point of a Spark Streaming program.
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/resource/checkpoint/")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(
      _ + _, _ - _, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
        categoryItemDF.registerTempTable("categoryItemTable")

        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM " +
          "(SELECT category,item,click_count,row_number() " +
          "OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          "WHERE rank <= 3")
        reseltDataFram.show()

        val resultRowRDD = reseltDataFram.rdd
        resultRowRDD.foreachPartition { partitionOfRecords =>
          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values('" +
                record.getAs("category") + "','" +
                record.getAs("item") + "'," +
                record.getAs("click_count") + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
      }
    }

    /**
     * When StreamingContext.start() is called, it internally calls JobScheduler's start method,
     * which begins the message loop. Inside JobScheduler.start(), a JobGenerator and a
     * ReceiverTracker are constructed and their start methods are called:
     * 1. Once started, JobGenerator keeps generating jobs according to the batchDuration.
     * 2. Once started, ReceiverTracker launches the Receivers in the Spark cluster (in fact a
     *    ReceiverSupervisor is started first in each Executor). When a Receiver receives data,
     *    the ReceiverSupervisor stores it on the Executor and sends the metadata of the data to
     *    the ReceiverTracker on the Driver, where the received metadata is managed by a
     *    ReceivedBlockTracker.
     * Each batch interval produces a concrete Job. This "Job" is not a Job in the Spark Core
     * sense; it is merely the RDD DAG generated from the DStreamGraph — roughly an instance of
     * the Runnable interface from a Java point of view. To actually run it, the Job must be
     * submitted to the JobScheduler, which uses a thread pool to find a separate thread that
     * submits the Job to the cluster (the real work is triggered in that thread by the RDD
     * action). Why a thread pool?
     * 1. Jobs are generated continuously, so a thread pool improves efficiency — much like the
     *    way an Executor runs Tasks through a thread pool.
     * 2. FAIR scheduling may be configured for the jobs, which also requires multi-thread support.
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
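To drive the example locally, the socket on port 9999 must produce lines in the assumed "user item category" format. Below is a hedged sketch of a tiny data generator (entirely hypothetical, not part of the original program) that can be used instead of a netcat session; run it on the host named Master to match socketTextStream("Master", 9999), or adjust the hostname in the application.

import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

// Writes random "user item category" click logs to the first client that connects on port 9999.
object ClickLogGenerator {
  def main(args: Array[String]): Unit = {
    val users = Array("Rocky", "Spark", "Flink", "Hadoop")
    val items = Map(
      "Android" -> Array("Samsung", "Huawei", "Xiaomi", "Sony"),
      "TV"      -> Array("TCL", "Hisense", "Skyworth", "Sharp"))

    val server = new ServerSocket(9999)
    println("Waiting for Spark Streaming to connect on port 9999 ...")
    val socket = server.accept()
    val out = new PrintWriter(socket.getOutputStream, true)
    val rand = new Random()

    while (true) {
      val category = items.keys.toSeq(rand.nextInt(items.size))
      val item = items(category)(rand.nextInt(items(category).length))
      val user = users(rand.nextInt(users.length))
      out.println(s"$user $item $category") // matches the assumed "user item category" format
      Thread.sleep(100)
    }
  }
}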
Next, let's walk through the Spark Streaming runtime source code based on this case, starting from the StreamingContext constructor:
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) = {
  this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
       null, batchDuration)
}
When a StreamingContext is created, it internally builds a SparkContext from the SparkConf, so a StreamingContext always sits on top of a SparkContext instance. This also shows that Spark Streaming runs on top of Spark Core.
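A minimal sketch of the two common ways to construct a StreamingContext (object and variable names are illustrative): either hand it a SparkConf, in which case it creates the SparkContext itself, or wrap an existing SparkContext.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingContextCtorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingContextCtorSketch")

    // Option 1: pass a SparkConf; the StreamingContext creates the SparkContext internally.
    // val ssc = new StreamingContext(conf, Seconds(5))

    // Option 2: pass an existing SparkContext; Spark Streaming simply runs on top of it.
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.stop(stopSparkContext = true)
  }
}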
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
Under the hood, socketStream creates a SocketInputDStream instance, which extends ReceiverInputDStream; the SocketInputDStream then receives data through a Receiver:
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
When the ReceiverInputDStream is constructed, a ReceiverRateController is initialized:
override protected[streaming] val rateController: Option[RateController] = {
  if (RateController.isBackPressureEnabled(ssc.conf)) {
    Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
  } else {
    None
  }
}

private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
This mechanism balances the data-receiving rate against the processing capacity (back pressure): the computed rate is published back to the ReceiverTracker via sendRateUpdate, as shown above.
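Back pressure is disabled by default. A minimal configuration sketch (the object name is illustrative; the property itself is the standard Spark setting) for turning it on so that the RateController can throttle receivers based on how fast previous batches were processed:

import org.apache.spark.SparkConf

object BackPressureConfSketch {
  val conf = new SparkConf()
    .setAppName("BackPressureConfSketch")
    .setMaster("local[4]")
    // Let the RateController/RateEstimator limit the receiving rate dynamically.
    .set("spark.streaming.backpressure.enabled", "true")
}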
The SocketReceiver receives the socket data by starting a dedicated thread:
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while (!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
Now let's look at what happens inside ssc.start():
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          // ... (exception handling and the remaining cases are omitted here)
scheduler.start() invokes JobScheduler's start method. JobScheduler is defined as follows:
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Mapping from batch time to the JobSet generated for that batch.
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  private val jobGenerator = new JobGenerator(this) // creates the job generator
  val clock = jobGenerator.clock                    // the time source that drives job generation
  val listenerBus = new StreamingListenerBus()      // listener bus for streaming events

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Events (e.g. job started/completed, errors) are processed through this event loop
  // in an event-driven fashion.
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
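A hedged note on the numConcurrentJobs / "streaming-job-executor" pool shown earlier: its size comes from spark.streaming.concurrentJobs, which defaults to 1, so jobs from successive batches run one at a time. A sketch of raising it (this is an undocumented setting; behaviour with values greater than 1, e.g. output ordering, should be verified for your workload; the object name is illustrative):

import org.apache.spark.SparkConf

object ConcurrentJobsConfSketch {
  // Allow the JobScheduler's thread pool to run two streaming jobs concurrently.
  val conf = new SparkConf()
    .setAppName("ConcurrentJobsConfSketch")
    .set("spark.streaming.concurrentJobs", "2")
}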
Events are handled in processEvent:
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0,
      jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}
private def handleError(msg: String, e: Throwable) {
  logError(msg, e)
  ssc.waiter.notifyError(e)
}
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
}
private[streaming] object JobScheduler {
  val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
  val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
}
Now for the JobGenerator side: generateJobs builds the JobSet for a batch, submits it, and posts the corresponding events onto the event queue:
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
generateJobs gathers the input information and calls submitJobSet, which distributes the jobs: inside JobScheduler each job is wrapped in a JobHandler, which implements the Runnable interface, and handed to the job executor thread pool. Also note that foreachRDD is what registers the resulting DStream as an output stream in the DStreamGraph, as the sketch below illustrates.
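A small illustration of this point (the stream and object names are illustrative): output operations such as foreachRDD and print register output DStreams in the DStreamGraph; a pipeline consisting only of transformations gives the JobGenerator nothing to generate, and start() fails validation.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OutputOperationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OutputOperationSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val clicks = ssc.socketTextStream("localhost", 9999)
    val counts = clicks.map(line => (line.split(" ")(2), 1)).reduceByKey(_ + _)

    // print(), like foreachRDD, registers an output DStream in the DStreamGraph.
    // Without at least one output operation, ssc.start() fails validation with
    // "No output operations registered, so nothing to execute".
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}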
JobScheduler instantiates the receiverTracker and inputInfoTracker fields; inside ReceiverTracker a ReceiverTrackerEndpoint is created. ReceiverTracker's start method:
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
Looking into launchReceivers we find:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
launchReceivers collects the Receiver of every input stream; looking into runDummySparkJob (covered in an earlier lesson), it uses makeRDD to run a dummy job so that the receivers end up evenly distributed across the worker nodes (load balancing).
When ReceiverTrackerEndpoint receives the StartAllReceivers message, it handles it as follows:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
Inside startReceiver we find that a ReceiverSupervisorImpl is created on the executor to actually run the receiver. Turning to the job-generation side, JobGenerator.start() leads to startFirstTime:
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
If checkpoint data exists, JobGenerator restarts from it; otherwise startFirstTime is called. Inside startFirstTime we find graph.start and timer.start:
def start(time: Time) {
this.synchronized {
require(zeroTime == null, "DStream graph computation already started")
zeroTime = time
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart)
inputStreams.par.foreach(_.start())
}
}
The above is DStreamGraph's start method, invoked from startFirstTime: it initializes the output streams and starts the input streams.
Going back to the submitJob mentioned earlier: its second argument is a function whose role is to start the Receiver on the worker node:
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
It is important here to distinguish ReceiverInputDStream from Receiver: the Receiver is what actually receives the data, while ReceiverInputDStream is a layer of encapsulation around the Receiver.
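To make that distinction concrete, here is a hedged sketch of a custom Receiver (all names are illustrative): the Receiver only knows how to pull data and hand it to store(); wrapping it into an input stream is done by ssc.receiverStream, which returns a ReceiverInputDStream.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal line-oriented socket Receiver: it only receives and stores data.
class SimpleLineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  override def onStart(): Unit = {
    // Receive on a separate thread so onStart returns immediately.
    new Thread("simple-line-receiver") {
      override def run(): Unit = receiveLines()
    }.start()
  }

  override def onStop(): Unit = { /* the receiving loop exits once isStopped becomes true */ }

  private def receiveLines(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    var line = reader.readLine()
    while (!isStopped && line != null) {
      store(line) // hand the data to Spark Streaming (via the ReceiverSupervisor)
      line = reader.readLine()
    }
    reader.close()
    socket.close()
    restart("Trying to reconnect")
  }
}

// Usage: the wrapping into a ReceiverInputDStream happens here, not in the Receiver itself.
// val lines = ssc.receiverStream(new SimpleLineReceiver("localhost", 9999))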
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
DStream's getOrCompute method generates (or retrieves) the RDD for the given batch time.
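Since getOrCompute decides per batch whether to persist and checkpoint the generated RDD, the application's checkpoint settings matter. A minimal sketch (object name illustrative, checkpoint directory taken from the example above): ssc.checkpoint sets the checkpoint directory, DStream.checkpoint sets the checkpointDuration consulted in getOrCompute, and the inverse-reduce form of reduceByKeyAndWindow used in the example requires checkpointing to be enabled.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("CheckpointSketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/resource/checkpoint/") // directory used for checkpointing, as in the example

    val clicks = ssc.socketTextStream("localhost", 9999)
    val pairs = clicks.map(line => (line.split(" ")(2) + "_" + line.split(" ")(1), 1))

    // The inverse-reduce form of reduceByKeyAndWindow is incremental and requires checkpointing.
    val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))
    // Checkpoint the windowed DStream every 20 seconds (a multiple of its slide duration);
    // this sets the checkpointDuration checked in getOrCompute above.
    windowedCounts.checkpoint(Seconds(20))

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}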
Notes:
1. DT大数据梦工厂 WeChat public account: DT_Spark
2. IMF 8 p.m. big-data hands-on sessions, free YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains