Spark Streaming结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收实时数据与关系型数据库中的表进行相关的查询操作;

使用技术:Spark Streaming + Spark JDBC External DataSources

代码雏形:

package com.luogankun.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext

case class Student(id: Int, name: String, cityId: Int)

object HDFSStreaming {
  def main(args: Array[String]) {

    val location = args(0)  //HDFS文件路径

    val sparkConf = new SparkConf().setAppName("HDFS JDBC Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val sqlContext = new HiveContext(sc)
    import sqlContext.createSchemaRDD

    import  com.luogankun.spark.jdbc._

    //使用External Data Sources处理MySQL中的数据
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root","root","select id, name from city") 

    //将cities RDD注册成city临时表
    cities.registerTempTable("city")

    val inputs = ssc.textFileStream(location)
    inputs.foreachRDD(rdd => {
      if (rdd.partitions.length > 0) {        //将Streaming中接收到的数据注册成student临时表
        rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student"); 

        //关联Streaming和MySQL表进行查询操作
        sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city_table c on s.cityId=c.id").collect().foreach(println)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

提交到Spark集群处理脚本:

spark-submit --name SparkSubmit_Demo --class com.luogankun.spark.streaming.HDFSStreaming --master spark://hadoop000:7077 \
--executor-memory 1G --total-executor-cores 1 /home/spark/lib/streaming.jar hdfs://hadoop000:8020/data/hdfs
时间: 2024-08-25 04:56:33

Spark Streaming结合Spark JDBC External DataSouces处理案例的相关文章

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s

第97课:Spark Streaming 结合Spark SQL 案例

代码如下: package com.dt.spark.streaming import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{StreamingContext, Duration} /**  * 使用SparkStreaming结合SparkSQL对日志进行分析.  * 假设电商网站点击日志格式(简化)

使用Spark Streaming + Kudu + Impala构建一个预测引擎

随着用户使用天数的增加,不管你的业务是扩大还是缩减了,为什么你的大数据中心架构保持线性增长的趋势?很明显需要一个稳定的基本架构来保障你的业务线.当你的客户处在休眠期,或者你的业务处在淡季,你增加的计算资源就处在浪费阶段:相对应地,当你的业务在旺季期,或者每周一每个人对上周的数据进行查询分析,有多少次你忒想拥有额外的计算资源. 根据需求水平动态分配资源 VS 固定的资源分配方式,似乎不太好实现.幸运的是,借助于现今强大的开源技术,可以很轻松的实现你所愿.在这篇文章中,我将给出一个解决例子,基于流式

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用.训练机器学习模型的应用, 还有自动检测异常的应用.Spark Streaming 是 Spark 为这些应用而设计的模型.它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码. Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream. DStream 是随时间推移而收到的数据的序列.在内部,每个时间区间收到

Spark Streaming源代码学习总结(一)

1.Spark Streaming 代码分析: 1.1 演示样例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def mai

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea

Spark Streaming源码学习总结(一)

1.Spark Streaming 代码分析: 1.1 示例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def main(

Spark Streaming容错的改进和零数据丢失

本文来自Spark Streaming项目带头人 Tathagata Das的博客文章,他现在就职于Databricks公司.过去曾在UC Berkeley的AMPLab实验室进行大数据和Spark Streaming的研究工作.本文主要谈及了Spark Streaming容错的改进和零数据丢失. 以下为原文: 实时流处理系统必须要能在24/7时间内工作,因此它需要具备从各种系统故障中恢复过来的能力.最开始,Spark Streaming就支持从driver和worker故障恢复的能力.然而有些