Spark将计算结果写入到Mysql中

今天主要来谈谈如何将Spark计算的结果写入到Mysql或者其他的关系型数据库里面。其实方式也很简单,代码如下:

package scala

import java.sql.{DriverManager, PreparedStatement, Connection}
import org.apache.spark.{SparkContext, SparkConf}

object RDDtoMysql {

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark",
    "root", "123456")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      }
      )
    } catch {
      case e: Exception => println("Mysql Exception")
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
    data.foreachPartition(myFun)
  }
}

其实是通过foreachPartition遍历RDD的每个分区,并调用普通的Scala方法来写数据库。在运行程序之前需要确保数据库里面存在blog表,可以通过下面语句创建:

CREATE TABLE `blog` (
  `name` varchar(255) NOT NULL,
  `count` int(10) unsigned DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf-8

然后直接运行上述的代码即可。运行完成你就可以在数据库里面查询结果:

SELECT * FROM blog b;
www  10
iteblog  20
com  30

需要注意的是:

  1、你最好使用foreachPartition 函数来遍历RDD,并且在每台Work上面创建数据库的connection。

  2、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。

  3、在插入Mysql的时候最好使用批量插入。

  4、确保你写入数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能会导致数据插入到数据库失败。

  5、不建议将你的RDD数据写入到Mysql等关系型数据库中。

时间: 2024-10-11 14:26:45

Spark将计算结果写入到Mysql中的相关文章

Spark操作dataFrame进行写入mysql,自定义sql的方式

业务场景: 现在项目中需要通过对spark对原始数据进行计算,然后将计算结果写入到mysql中,但是在写入的时候有个限制: 1.mysql中的目标表事先已经存在,并且当中存在主键,自增长的键id 2.在进行将dataFrame写入表的时候,id字段不允许手动写入,因为其实自增长的 要求: 1.写入数据库的时候,需要指定字段写入,也就是说,只指定部分字段写入 2.在写入数据库的时候,对于操作主键相同的记录要实现更新操作,非插入操作 分析: spark本身提供了对dataframe的写入数据库的操作

mysql中计算两个日期的时间差函数TIMESTAMPDIFF用法

mysql中计算两个日期的时间差函数TIMESTAMPDIFF用法: 语法: TIMESTAMPDIFF(interval,datetime_expr1,datetime_expr2) 说明: 返回日期或日期时间表达式datetime_expr1 和datetime_expr2the 之间的整数差.其结果的单位由interval 参数给出.interval 的法定值同TIMESTAMPADD()函数说明中所列出的相同. mysql> SELECT TIMESTAMPDIFF(MONTH,'200

第96课: 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

本期内容 技术实现解析 实现实战 SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统.然而,重要的是要了解如何正确有效地使用这种原始方法.一些常见的错误,以避免如下: 写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统.为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据.例如如下scala

mysql中max_allowed_packet参数的配置方法(避免大数据写入或者更新失败)

这篇文章主要介绍了mysql中max_allowed_packet参数的配置方法,以及查看max_allowed_packet参数当前值的方法,需要的朋友可以参考下 MySQL根据配置文件会限制Server接受的数据包大小.有时候大的插入和更新会受 max_allowed_packet 参数限制,导致写入或者更新失败. 查看目前配置: 复制代码 代码如下: show VARIABLES like '%max_allowed_packet%'; 显示的结果为: 复制代码 代码如下: +------

php读取excel内容并写入mysql中

实现功能:上传EXCEL文件,并按照格式导入到数据库中 在网上查找到phpecel 但是phpexcel实在是太大,自身能力有限不能很好的精简,最终找到了phpexcelreader这个类:非常小又能实现我需要的功能 第一步.下载phpexcelReader类(后面给出) 第二步.创建add.php写入如下代码: $str=$_FILES['excelname']['tmp_name']; $data = new Spreadsheet_Excel_Reader($str); $data_arr

mysql中的日期处理 计算 字符串截取

一.MySQL 获得当前日期时间 函数 1.1 获得当前日期+时间(date + time)函数:now() mysql> select now(); +---------------------+ | now() | +---------------------+ | 2008-08-08 22:20:46 | +---------------------+ 除了 now() 函数能获得当前的日期时间外,MySQL 中还有下面的函数: current_timestamp() ,current_

基于Spark GraphX计算二度关系

关系计算问题描述 二度关系是指用户与用户通过关注者为桥梁发现到的关注者之间的关系.目前微博通过二度关系实现了潜在用户的推荐.用户的一度关系包含了关注.好友两种类型,二度关系则得到关注的关注.关注的好友.好友的关注.好友的好友四种类型. 如果要为全站亿级用户根据二度关系和四种桥梁类型推荐桥梁权重最高 TopN 个用户,大致估算了下总关系量在千亿级别,按照原有的 Mapreduce 模式计算整个二度关系,需要以桥梁用户为 Key,把它的关注和粉丝两个亿级的表做 Join,如果活跃用户按照亿计,平均关

第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

本篇博文的目标是: Driver的ReceiverTracker接收到数据之后,下一步对数据是如何进行管理 一:ReceiverTracker的架构设计 1. Driver在Executor启动Receiver方式,每个Receiver都封装成一个Task,此时一个Job中就一个Task,而Task中就一条数据,也就是Receiver数据.由此,多少个Job也就可以启动多少个Receiver. 2. ReceiverTracker在启动Receiver的时候他有ReceiverSuperviso

MYSQL中的BlackHole引擎

MYSQL中的BlackHole引擎 http://blog.csdn.net/ylspirit/article/details/7234021 http://blog.chinaunix.net/uid-22646981-id-3271711.html MySQL在5.x系列提供了Blackhole引擎–“黑洞”.  其作用正如其名字一样:任何写入到此引擎的数据均会被丢弃掉, 不做实际存储:Select语句的内容永远是空. 和Linux中的 /dev/null 文件完成的作用完全一致. 那么,