Spark读HBase写MySQL

1 Spark读HBase

Spark读HBase黑名单数据,过滤出当日新增userid,并与mysql黑名单表内userid去重后,写入mysql。

def main(args: Array[String]): Unit = {
  @volatile var broadcastMysqlUserids: Broadcast[Array[String]] = null

  val today = args(0)
  val sourceHBaseTable = PropertiesUtil.getProperty("anticheat.blacklist.hbase.tbale")
  val sinkMysqlTable = PropertiesUtil.getProperty("anticheat.blacklist.mysql.dbtable")
  val zookeeper = PropertiesUtil.getProperty("anticheat.blacklist.zookeeper.quorum")
  val zkport = PropertiesUtil.getProperty("anticheat.blacklist.zookeeper.port")
  val znode = PropertiesUtil.getProperty("anticheat.blacklist.zookeeper.znode")

  //创建SparkSession
  val sparkconf = new SparkConf().setAppName("").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(sparkconf)
  val spark = AnticheatUtil.SparkSessionSingleton.getInstance(sc.getConf)

  //配置hbase参数
  val conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", zookeeper)
  conf.set("hbase.zookeeper.property.clientPort", zkport)
  conf.set("zookeeper.znode.parent", znode)
  conf.set(TableInputFormat.INPUT_TABLE, sourceHBaseTable)

  // 从数据源获取数据
  val hbaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

  //读取mysql表,并将mysql表中的userid广播出去,用于去重
  broadcastMysqlUserids = get_mysql_user_blacklist(spark,sinkMysqlTable)

  //获取当日新增userid数据组装成与mysql表结构一致的对象rdd
  val records_userid_rdd = get_new_blacklist_rdd(hbaseRDD,today,broadcastMysqlUserids)

  //将当日新增userid数据存入mysql
  save_blacklist_to_mysql(records_userid_rdd,today,spark,sinkMysqlTable)
}

2 Spark读MySQL表广播出去

/**
  * Spark读Mysql用户黑名单表,将黑名单中所有userid赋予广播变量
  * @param spark
  * @return
  */
def get_mysql_user_blacklist(spark: SparkSession,table :String) :Broadcast[Array[String]] = {
  @volatile var broadcastMysqlUserids: Broadcast[Array[String]] = null
  val url = PropertiesUtil.getProperty("anticheat.blacklist.mysql.url")
  val user = PropertiesUtil.getProperty("anticheat.blacklist.mysql.user")
  val password = PropertiesUtil.getProperty("anticheat.blacklist.mysql.password")

  import spark.implicits._
  val mysql_userids_rdd = spark.sqlContext.read
    .format("jdbc")
    .option("url",url)
    .option("dbtable",table)
    .option("user",user)
    .option("password",password)
    .load()
    .map(record => {
     val userid = record.getString(0)
     userid
  })

  if(broadcastMysqlUserids !=null){
    broadcastMysqlUserids.unpersist()
  }
  broadcastMysqlUserids = spark.sparkContext.broadcast(mysql_userids_rdd.collect())
  println(s"broadcastMysqlUserids.size= ${broadcastMysqlUserids.value.size}")
  broadcastMysqlUserids
}

3 构建黑名单数据对象rdd

/**
  * 构建新增userid数据写入mysql
  * @param hbaseRDD
  * @param today
  * @return
  */
def get_new_blacklist_rdd(hbaseRDD: RDD[(ImmutableBytesWritable, Result)],today: String,broadcastMysqlUserids: Broadcast[Array[String]]): RDD[BlackList] = {

  val records_userid_rdd : RDD[BlackList] = hbaseRDD.filter(line =>{
    //过滤出当日新增userid
    var flag = false  //默认非当日新增
    val userid = Bytes.toString(line._2.getRow)
    val dt = Bytes.toString(line._2.getValue(Bytes.toBytes("user"), Bytes.toBytes("dt")))
    val did_dt = Bytes.toString(line._2.getValue(Bytes.toBytes("user"), Bytes.toBytes("did_dt")))

    /* 判断为当日新增userid同时需满足三个条件:
    1. 用户维度加入时间dt=today
    2. 或者用户维度加入时间dt=null 且设备维度加入时间did_dt=today
    3. 并且不在mysql黑名单表中
     */
    if(today.equals(dt) || (dt==null && today.equals(did_dt))){
      //broadcastMysqlUserids.value.search(userid).isInstanceOf[InsertionPoint]调用scala 二分查找函数,注意此函数找到返回false
      if(broadcastMysqlUserids.value.search(userid).isInstanceOf[InsertionPoint]){
        //以上三个条件全满足,表示为当日新增,flag 赋值为 true
        flag = true
      }
    }
    flag
  }).map(record =>{
    //获取新增用户userid,加入黑名单时间设为today,其余字段设为默认值
    val userid = Bytes.toString(record._2.getRow)
    val day = Integer.parseInt(today)
    BlackList(userid,day,null,0,"system")
  })
  records_userid_rdd
}

case class BlackList(userid: String, dt: Int, update_time: Timestamp,delete_flag: Int,operator : String)

4 Spark写MySQL

/**
  * 将userid黑名单数据写入mysql
  * @param blacklist_rdd
  * @param today
  * @param spark
  */
def save_blacklist_to_mysql(blacklist_rdd: RDD[BlackList],today: String,spark: SparkSession,table :String): Unit ={
  val url = PropertiesUtil.getProperty("anticheat.blacklist.mysql.url")
  val user = PropertiesUtil.getProperty("anticheat.blacklist.mysql.user")
  val password = PropertiesUtil.getProperty("anticheat.blacklist.mysql.password")

  import spark.implicits._
  val records_userid_dataset = blacklist_rdd.toDS()
  records_userid_dataset.write
    .format("jdbc")
    .option("url",url)
    .option("dbtable",table)
    .option("user",user)
    .option("password",password)
    .mode(SaveMode.Append)
    .save()
}

5 注意问题

数据存入Mysql注意事项
尽量先设置好存储模式
默认为SaveMode.ErrorIfExists模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库.另外三种模式如下:
SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

原文地址:https://www.cnblogs.com/xiaodf/p/10710831.html

时间: 2024-11-05 18:35:56

Spark读HBase写MySQL的相关文章

Spark(四): Spark-sql 读hbase

SparkSQL是指整合了Hive的spark-sql cli, 本质上就是通过Hive访问HBase表,具体就是通过hive-hbase-handler, 具体配置参见:Hive(五):hive与hbase整合 目录: SparkSql 访问 hbase配置 测试验证 SparkSql 访问 hbase配置:  拷贝HBase的相关jar包到Spark节点上的$SPARK_HOME/lib目录下,清单如下: guava-14.0.1.jar htrace-core-3.1.0-incubati

Mysql 读与写函数利用学习

语句简单记忆:         select load_file();         select '一句话' into outfile '网站路径' ; 1.Mysql 读与写函数 (1) 读取函数 load_file() (2) 写入函数 into outfile '' into dumpfile '' 2.Mysql读函数使用 (1)读配置文件语句 select load_file('/etc/httpd/conf/httpd.conf') select load_file('/etc/

hbase内存规划(读多写少型和写多读少型)

//简单说来主要包括读多写少型和写多读少型),内存的相关配置却完全不同. 1.针对不同应用场景,对多种工作模式下的参数进行详细说明,并结合相关示例对集群规划中最核心模块-内存规划进行介绍.2.HBase中内存规划直接涉及读缓存BlockCache.写缓存MemStore,影响系统内存利用率.IO利用率等资源以及读写性能等,重要性不言而喻.3.主要配置也是针对BlockCache和MemStore进行,4.然而针对不同业务类型(简单说来主要包括读多写少型和写多读少型),内存的相关配置却完全不同.5

Spark学习笔记——读写MySQL

1.使用Spark读取MySQL中某个表中的信息 build.sbt文件 name := "spark-hbase" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.1.0", "mysql" %

Spark:将DataFrame写入Mysql

Spark将DataFrame进行一些列处理后,需要将之写入mysql,下面是实现过程 1.mysql的信息 mysql的信息我保存在了外部的配置文件,这样方便后续的配置添加. 1 //配置文件示例: 2 [[email protected] tmp_lillcol]$ cat job.properties 3 #mysql数据库配置 4 mysql.driver=com.mysql.jdbc.Driver 5 mysql.url=jdbc:mysql://127.0.0.1:3306/data

Hadoop、Spark、HBase与Redis的适用性讨论(全文)

最近在网上又看到有关于Hadoop适用性的讨论[1].想想今年大数据技术开始由互联网巨头走向中小互联网和传统行业,估计不少人都在考虑各种"纷繁复杂"的大数据技术的适用性的问题.这儿我就结合我这几年在Hadoop等大数据方向的工作经验,与大家讨论一下Hadoop.Spark.HBase及Redis等几个主流大数据技术的使用场景(首先声明一点,本文中所指的Hadoop,是很"狭义"的Hadoop,即在HDFS上直接跑MapReduce的技术,下同). 我这几年实际研究和

Hbase写数据,存数据,读数据的详细过程

转自:http://www.aboutyun.com/thread-10886-1-1.html 附HBase 0.94之后Split策略: http://www.aboutyun.com/thread-11211-1-1.html 1.Client写入需要哪些过程?2.Hbase是如何读取数据的? Client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 出发Compact合并操作 -> 多个Sto

让CodeIgniter支持数据库读、写分离

前言 CodeIgniter默认是不支持读.写分离的,网上流传的一般做法是在CI_Model层修改,但这有几个问题: 首先使用CodeIgniter的用户都是用过之后才发现不支持的,然后要修改大量的旧代码,产生的影响较多: 其次,在Model层修改,如果有代码在Controller操作数据库,将不能支持读.写分离(虽然在Controller直接操作数据库不是好方法): 最后,在CI_Model层的修改都要让用户去使用不同的数据库实例,如写用$this->write_db->query(),读用

WPF程序中App.Config文件的读与写

原文:WPF程序中App.Config文件的读与写 WPF程序中的App.Config文件是我们应用程序中经常使用的一种配置文件,System.Configuration.dll文件中提供了大量的读写的配置,所以它是一种高效的程序配置方式,那么今天我就这个部分来做一次系统性的总结. App.Config文件是系统默认的应用程序配置文件,在我们使用后进行编译时会生成"程序集名称+.exe.config"文件,其本质上也是一个XML文件,在我们的应用程序中添加应用程序配置文件后,默认生成下