sparkStraming存储数据到mysql

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import spark.bean.orders
import java.util.Properties
import java.sql.{DriverManager, PreparedStatement, Connection}
import org.apache.spark.{SparkContext, SparkConf}  

object WebPagePopularityValueCalculator {

  def main(args: Array[String]) {

    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_, 2)).toMap

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val popularityData = lines.map { msgLine =>
      {
        val dataArr: Array[String] = msgLine.split("\\|")
        val pageID = dataArr(0)
        val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
        (pageID, popValue)
      }
    }
    //sum the previous popularity value and current value
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap(t => {
        val newValue: Double = t._2.sum
        val stateValue: Double = t._3.getOrElse(0);
        Some(newValue + stateValue)
      }.map(sumedValue => (t._1, sumedValue)))
    }
    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    //set the checkpoint interval to avoid too frequently data checkpoint which may
    //may significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * 2 * 1000))
    //after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD { rdd =>
      {
        val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
        val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
        topKData.foreach{ case (k, v) =>
          if(v != 0) {
             println("page" + k + "  " + "value" + v)
             val itb = Iterator((k, v))
             toMySql(itb)
          }

        }

      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  def toMySql(iterator: Iterator[(String, Double)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into userbehavior(page, number) values (?, ?)"
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "[email protected]")
            iterator.foreach(dataIn => {
                ps = conn.prepareStatement(sql)
                ps.setString(1, dataIn._1)
                ps.setDouble(2, dataIn._2)
                ps.executeUpdate()
            }
            )
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (ps != null) {
                ps.close()
        }
            if (conn != null) {
                conn.close()
            }
        }
  }

}
时间: 2024-12-15 01:53:43

sparkStraming存储数据到mysql的相关文章

一个系统存储由memcache+mysql组成,写一条数据的时候,更新memcache有几种方式,

一个系统存储由memcache+mysql组成,写一条数据的时候,更新memcache有几种方式,优缺点是什么? 缓存更新(不仅仅是memceche)有2种策略 一种是写时更新 一种是读时更新 一.写时更新是指  写db成功以后  同时更新缓存 , 能有效减少穿透  但是  容易引起数据的不一致 二.读时更新是说 写完db  删除缓存,等到 需要读得时候  在重建缓存, 一致性可以保证  但是  穿透大,容易给db造成压力

python爬取微博图片数据存到Mysql中遇到的各种坑\python Mysql存储图片

本人长期出售超大量微博数据,并提供特定微博数据打包,Message to [email protected] 前言   由于硬件等各种原因需要把大概170多万2t左右的微博图片数据存到Mysql中.之前存微博数据一直用的非关系型数据库mongodb,由于对Mysql的各种不熟悉,踩了无数坑,来来回回改了3天才完成. 挖坑填坑之旅 建表 存数据的时候首先需要设计数据库,我准备设计了3个表 微博表:[id, userid, blog_text, lat, lng, created_time, res

将SpringCloud ConfigServer持久化存储改为MySQL

原文发布于:http://www.gufeng.tech/ 1.背景 SpringCloud的ConfigServer默认是持久化使用的是git.git有它天然的优势,比如多版本管理.分支管理.提交审核策略等等,但是如果相对其中存储的数据做细粒度的权限控制,就力不从心了.当然,也可以改变使用方式以适应这种特点,但是今天我们要做的是将持久化从git迁移到MySQL上. 2.查询配置信息 ConfigServer有个接口:org.springframework.cloud.config.server

etcd安装部署及数据同步MySQL

一.etcd说明及原理 二.etcd安装部署说明 三.etcd操作说明 四.python安装etcd 五.python-etcd使用说明 六.通过脚本获取本地的信息上传到etcd 七.通过脚本将etc的数据同步到mysql 一.etcd 简介 etcd是用于共享配置和服务发现的分布式,一致的键值存储,重点是: 简单:定义明确,面向用户的API(gRPC) 安全:使用可选的客户端证书认证的自动TLS 快速:基准测试10,000写/秒 可靠:使用Raft协议来进行合理的分布式 etcd是在Go中编写

QString内部仍采用UTF-16存储数据且不会改变(一共9种不同情况下的编码)

出处:https://blog.qt.io/cn/2012/05/16/source-code-must-be-utf-8-and-qstring-wants-it/ 但是注意,这只是QT运行(RunTime)过程中采用的编码,并不代表源码文件里也是这样的.恰恰相反,源码文件必须是UTF8,带不带BOM都可以.我认为,虽然存储在源代码里的中文字是UTF8,但是QT在编译过程中,遇到中文会立即转换成UTF-16从而对源码里的中文字符进行编码,并存储在EXE文件里(这里对编译器来说,都是静态字符,必

使用嵌入式关系型SQLite数据库存储数据

除了可以使用文件或SharedPreferences存储数据,还可以选择使用SQLite数据库存储数据. 在Android平台上,集成了一个嵌入式关系型数据库—SQLite, 1.SQLite3支持 NULL.INTEGER.REAL(浮点数字).TEXT(字符串文本)和BLOB(二进制对象)数据类型,虽然它支持的类型虽然只有五种,但实际上sqlite3也接受varchar(n).char(n).decimal(p,s) 等数据类型,只不过在运算或保存时会转成对应的五种数据类型. 2.SQLit

使用Merge存储引擎实现MySQL分表

一.使用场景 Merge表有点类似于视图.使用Merge存储引擎实现MySQL分表,这种方法比较适合那些没有事先考虑分表,随着数据的增多,已经出现了数据查询慢的情况. 这个时候如果要把已有的大数据量表分开比较痛苦,最痛苦的事就是改代码.所以使用Merge存储引擎实现MySQL分表可以避免改代码. Merge引擎下每一张表只有一个MRG文件.MRG里面存放着分表的关系,以及插入数据的方式.它就像是一个外壳,或者是连接池,数据存放在分表里面. 对于增删改查,直接操作总表即可. 二.建表 1.用户1表

将Excel数据导入mysql数据库的几种方法

将Excel数据导入mysql数据库的几种方法 “我的面试感悟”有奖征文大赛结果揭晓! 前几天需要将Excel表格中的数据导入到mysql数据库中,在网上查了半天,研究了半天,总结出以下几种方法,下面和大家分享一下: 一.用java来将Excel表格中的数据转到mysql中 这是我们用的第一种方法,就是在java找你感谢个类,然后这个类会将Excel表格中的数据存储到内存里,然后再从内存中读出来插入到数据库中,但是要 注意了,这里是存储到String[ ]数组里面,所以取出来的数据也是Strin

Excel连接到MySQL,将Excel数据导入MySql,MySQL for Excel,,

Excel连接到MySQL 即使当今时代我们拥有了类似微软水晶报表之类的强大报表工具和其他一些灵活的客户管 理应用工具,众多企业在分析诸如销售统计和收入信息的时候,微软的Excel依然是最常用的工具.这当然不是没有理由的:Excel以其强大丰富的各种功 能,已经成为办公环境中不可或缺的工具. 然而,现在公司正在逐渐地将数据开始存储在远程数据库中,这样可以供企业员工从不同的地方来阅读和修改数据.但是,以前固有的工作流程习惯是很难打破的.当你的老板需要从远端使用Excel以饼图的形势来看一下最近的销