Spark与mysql整合

一、需求:把最终结果存储在mysql中

1、UrlGroupCount1类

import java.net.URL
import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 把最终结果存储在mysql中
  */
object UrlGroupCount1 {
  def main(args: Array[String]): Unit = {
    //1.创建spark程序入口
    val conf: SparkConf = new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    //2.加载数据
    val rdd1: RDD[String] = sc.textFile("e:/access.log")

    //3.将数据切分
    val rdd2: RDD[(String, Int)] = rdd1.map(line => {
      val s: Array[String] = line.split("\t")
      //元组输出
      (s(1), 1)
    })

    //4.累加求和
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_+_)

    //5.取出分组的学院
    val rdd4: RDD[(String, Int)] = rdd3.map(x => {
      val url = x._1
      val host: String = new URL(url).getHost.split("[.]")(0)
      //元组输出
      (host, x._2)
    })

    //6.根据学院分组
    val rdd5: RDD[(String, List[(String, Int)])] = rdd4.groupBy(_._1).mapValues(it => {
      //根据访问量排序 倒序
      it.toList.sortBy(_._2).reverse.take(1)
    })

    //7.把计算结果保存到mysql中
    rdd5.foreach(x => {
      //把数据写到mysql
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urlcount?charactorEncoding=utf-8","root", "root")
      //把spark结果插入到mysql中
      val sql = "INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"
      //执行sql
      val statement = conn.prepareStatement(sql)

      statement.setString(1, x._1)
      statement.setString(2, x._2.toString())
      statement.executeUpdate()
      statement.close()
      conn.close()
    })

    //8.关闭资源
    sc.stop()
  }
}

2、mysql创建数据库和表

CREATE DATABASE urlcount;
USE urlcount;

CREATE TABLE url_data(
    uid INT PRIMARY KEY AUTO_INCREMENT,
    xueyuan VARCHAR(50),
    number_one VARCHAR(200)
)

3、结果

二、Spark提供的连接mysql的方式--jdbcRDD

1、JdbcRDDDemo类

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * spark提供的连接mysql的方式
  * jdbcRDD
  */
object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    //1.创建spark程序入口
    val conf: SparkConf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    //匿名函数
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://localhost:3306/urlcount?characterEncoding=utf-8","root", "root")
    }

    //查询数据
    val jdbcRdd: JdbcRDD[(Int, String, String)] = new JdbcRDD(
      //指定sparkContext
      sc,
      connection,
      "SELECT * FROM url_data where uid >= ? AND uid <= ?",
       //2个任务并行
      1, 4, 2,
      r => {
        val uid = r.getInt(1)
        val xueyuan = r.getString(2)
        val number_one = r.getString(3)
        (uid, xueyuan, number_one)
      }
    )

    val result: Array[(Int, String, String)] = jdbcRdd.collect()
    println(result.toBuffer)
    sc.stop()
  }
}

2、结果

原文地址:https://www.cnblogs.com/areyouready/p/10274953.html

时间: 2024-10-10 05:37:47

Spark与mysql整合的相关文章

Spring MVC+Mybatis+Maven+Velocity+Mysql整合实例

本篇文章将通过一个简单显示用户信息的实例整合Spring mvc+mybatis+Maven+velocity+mysql. 对于实现整合的重点在于以下几个配置文件的实现 1.Maven依赖包 2.spring配置文件(springContext-user.xml) 3.mybatis配置文件(MyBatis-User-Configuration.xml) 4.spring-mvc配置文件(spring-mvc.xml) 5.web.xml配置文件 源码下载地址:http://download.

使用Apache Spark 对 mysql 调优 查询速度提升10倍以上

在这篇文章中我们将讨论如何利用 Apache Spark 来提升 MySQL 的查询性能. 介绍 在我的前一篇文章Apache Spark with MySQL 中介绍了如何利用 Apache Spark 实现数据分析以及如何对大量存放于文本文件的数据进行转换和分析.瓦迪姆还做了一个基准测试用来比较 MySQL 和 Spark with Parquet 柱状格式 (使用空中交通性能数据) 二者的性能. 这个测试非常棒,但如果我们不希望将数据从 MySQL 移到其他的存储系统中,而是继续在已有的

日常开发系列——Maven+Spring+Spring MVC+MyBatis+MySQL整合SSM框架

进入公司开发已经3个多月了,项目用的是Maven+Spring+Spring MVC+MyBatis+MySQL,趁这个周末有空,仔细研读一下公司项目的基本框架,学习一下这个环境是怎么搭建起来的,经过自己的研究终于是成功地试验出来.自己亲手做的才算是自己学到的,决定将其记录下来,以便日后查询,源码同时也欢迎大家拍砖. 一.数据库的准备 这次整合试验想着做个简单的,就决定做一个普通的用户登陆,就一张表吧 我新建的数据库名字是test,然后新建了一张表 DROP TABLE IF EXISTS `u

记录一次spark连接mysql遇到的问题

在使用spark连接mysql的过程中报错了,错误如下 08:51:32.495 [main] ERROR - Error loading factory org.apache.calcite.jdbc.CalciteJdbc41Factory java.lang.NoClassDefFoundError: org/apache/calcite/linq4j/QueryProvider at java.lang.ClassLoader.defineClass1(Native Method) ~[

hadoop+spark+mongodb+mysql+c#

一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存档和通信系统(PACS).电子病历系统(EMR)和区域医疗卫生服务(GMIS)等成功实施与普及推广,而且随着日新月异的计算机技术和网络技术的革新,进一步为数字化医院带来新的交互渠道譬如:远程医疗服务,网上挂号预约. 随着IT技术的飞速发展,80%以上的三级医院都相继建立了自己的医院信息系统

大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存档和通信系统(PACS).电子病历系统(EMR)和区域医疗卫生服务(GMIS)等成功实施与普及推广,而且随着日新月异的计算机技术和网络技术的革新,进一步为数字化医院带来新的交互渠道譬如:远程医疗服务,网上挂号预约. 随着IT技术的飞速发展,80%以上的三级医院都相继建立了自己的医院信息系统

Spring+SpringMVC+Mybatis+Mysql整合实例【转】

本文要实现Spring+SpringMVC+Mybatis+Mysql的一个整合,实现了SpringMVC控制访问的页面,将得到的页面参数传递给Spring中的Mybatis的bean类,然后查找Mysql数据的功能,并通过JSP显示出来.建议可以先看笔者另一文章Mybatis与Spring整合创建Web项目 .笔者觉得整合过程中问题比较多的还是Spring+Mybatis的整合,SpringMVC的整合还是比较简单. Spring        Spring 是一个开源框架, Spring 是

spark往mysql里写东西

如果报错no suitable driver found for jdbc 是spark没有合适的jdbc连接驱动,需要下一个jdbc connector的jar包放到spark路径下并把次路径放到spark/conf/spark-env.sh的SPARK_CLASSPATH里 如:export SPARK_CLASSPATH=$SPARK_CLASSPATH:/root/spark/mysql-connector-java-5.1.35.jar 如果报ERROR 1130: Host ’x′

storm 与mysql整合问题

首先说明下问题的情况, 1.我storm 环境已经搭建完成,在本地测试wordcount是没问题的, 2.我在wordcount中加入一个MysqlBolt,此Bolt只是简单的把 wordcount的结果存入mysql数据库中,在本地模式测试测试时,完全可以把结果插入指定表. 3.我的每个storm 节点都已经把mysql-connector-java-5.1.23.jar 放到storm的lib目录下. 4.每个节点均可以访问指定数据库,都已经开通相应权限 5.并且在远程模式下执行原始wor