Spark: Writing a DataFrame to MySQL

After Spark runs a series of transformations on a DataFrame, the result needs to be written to MySQL. The implementation is as follows.

1. MySQL connection information

I keep the MySQL connection details in an external configuration file, which makes later configuration changes easier.

# Example configuration file:
$ cat job.properties
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456

2. Required jar dependencies (sbt format; for Maven, adapt accordingly). Only the Spark artifacts and mysql-connector-java are strictly needed for the MySQL write itself; the HBase, streaming, and metrics entries carry over from the surrounding project.

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

3. Complete implementation

import java.io.FileInputStream
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author Administrator
  *         2018/10/16-10:15
  *
  */
object SaveDataFrameASMysql {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)
    // read without filtering
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    dim_sys_city_dict.show(10)

    // save to MySQL
    saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
  }

  /**
    * Save a DataFrame as a MySQL table
    *
    * @param dataFrame the DataFrame to save
    * @param tableName the target MySQL table name
    * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
    * @param proPath   path of the configuration file
    */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
    var table = tableName
    val properties: Properties = getProPerties(proPath)
    val prop = new Properties // the keys in the config file differ from the keys Spark expects, so build a new Properties object in Spark's format
    prop.setProperty("user", properties.getProperty("mysql.username"))
    prop.setProperty("password", properties.getProperty("mysql.password"))
    prop.setProperty("driver", properties.getProperty("mysql.driver"))
    prop.setProperty("url", properties.getProperty("mysql.url"))
    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved
        conn.close()
      }
      catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
  }

  /**
    * Read a MySQL table
    *
    * @param sqlContext
    * @param tableName the MySQL table to read
    * @param proPath   path of the configuration file
    * @return the table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      //        .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()

  }

  /**
    * Read a MySQL table with a filter condition
    *
    * @param sqlContext
    * @param table           the MySQL table to read
    * @param filterCondition filter condition (a SQL WHERE clause)
    * @param proPath         path of the configuration file
    * @return the filtered table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
    tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Load the configuration file
    *
    * @param proPath
    * @return
    */
  def getProPerties(proPath: String) = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
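The filtered overload of readMysqlTable defined above is not exercised in main. For completeness, a minimal usage sketch (the filter condition is hypothetical):

// Push the WHERE clause down to MySQL instead of filtering in Spark
val city240: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id = '240'", proPath)
city240.show(10)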

4. Test

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // read without filtering
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // save to MySQL
  saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
}

5. Run results (sensitive data has been masked)

+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code|           bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|      1|    249|       **|    **_ab|     100|      **按时|    **-查到|           xcaasd...|  21|       张三公司|
|      2|    240|       **|    **_ab|     300|      **按时|    **-查到|           xcaasd...|  21|       张三公司|
|      3|    240|       **|    **_ab|     100|      **按时|    **-查到|           xcaasd...|  21|       张三公司|
|      4|    242|       **|    **_ab|     300|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      5|    246|       **|    **_ab|     100|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      6|    246|       **|    **_ab|     300|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      7|    248|       **|    **_ab|     200|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      8|    242|       **|    **_ab|     400|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      9|    247|       **|    **_ab|     200|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
|      0|    243|       **|    **_ab|     400|      **按时|    **-查到|           xcaasd...|  01|       张三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)


mysql> select count(1) from TestMysqlTble1;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)

mysql> select count(1) from TestMysqlTble2;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)
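The readMysqlTable helper also doubles as a quick sanity check after the write; a minimal sketch:

// Read the target table back and confirm the row count matches the source
val written: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble2", proPath)
println(s"TestMysqlTble2 row count: ${written.count()}")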

6. Performance

When I first used this directly, small datasets were fine, but once the data volume grew the write speed became unacceptable. I tried several optimizations with little visible effect, then dug into the Spark source code and found two key snippets:

/**
 * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
 * table already exists in the external database, behavior of this function depends on the
 * save mode, specified by the `mode` function (default to throwing an exception).
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included.
 *
 * @since 1.4.0
 */
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table)
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props) // ----------------------------- key point 1
}


/**
 * Saves the RDD to the database in a single transaction.
 */
def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    getJdbcType(field.dataType, dialect).jdbcNullType
  }

  val rddSchema = df.schema
  val getConnection: () => Connection = createConnectionFactory(url, properties)
  val batchSize = properties.getProperty("batchsize", "1000").toInt
  df.foreachPartition { iterator => // ------------------------------------ key point 2
    savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
  }
}

In other words, the built-in method already writes partition by partition, opening one MySQL connection per partition. The simplest optimization is therefore to repartition the DataFrame before saving; watch out for data skew, though, or the repartitioning may bring no improvement.
The fastest approach I have tested so far is to dump the data to files and import them with MySQL's LOAD DATA command, but that is rather cumbersome.
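The saveTable source above also exposes a second tunable: savePartition batches inserts according to a "batchsize" connection property, defaulting to 1000. Because saveASMysqlTable hands its Properties object straight to .jdbc(...), a larger batch size can be set in the same place. A sketch, with an illustrative value worth benchmarking against your own data:

// In saveASMysqlTable, alongside the user/password/driver/url settings:
prop.setProperty("batchsize", "5000") // read by JdbcUtils.saveTable, default 1000

Together with the rewriteBatchedStatements=true parameter already present in mysql.url, this lets the MySQL driver send batched inserts as multi-row statements.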

Here is a repartitioning example (each of the 10 partitions below will open its own MySQL connection):

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // read without filtering
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // save to MySQL, repartitioning first
  saveASMysqlTable(dim_sys_city_dict.repartition(10), "TestMysqlTble2", SaveMode.Append, proPath)
}

7. Summary

A few points deserve attention when writing a DataFrame to MySQL:

  • The target table should ideally be created in advance; otherwise every column gets a default type, and TEXT columns are genuinely wasteful. Compare the two tables below: the source table TestMysqlTble1, and TestMysqlTble2 as created by the DataFrame save.
mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)
  • About SaveMode.Overwrite
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table) // ---------------------------------------- key point 1
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props)
}

/**
 * Drops a table from the JDBC database.
 */
def dropTable(conn: Connection, table: String): Unit = {
  val statement = conn.createStatement
  try {
    statement.executeUpdate(s"DROP TABLE $table") // ------------------------------------- key point 2
  } finally {
    statement.close()
  }
}

As the two key snippets above show, the writer first checks whether the table exists; with SaveMode.Overwrite it calls dropTable(conn: Connection, table: String) to delete the existing table. That means you lose your table structure, and the recreated table runs into the previous problem of all-default column types. This is why my save method performs the following step instead:

if (saveMode == SaveMode.Overwrite) {
  var conn: Connection = null
  try {
    conn = DriverManager.getConnection(
      prop.getProperty("url"),
      prop.getProperty("user"),
      prop.getProperty("password")
    )
    val stmt = conn.createStatement
    table = table.toUpperCase
    stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved
    conn.close()
  }
  catch {
    case e: Exception =>
      println("MySQL Error:")
      e.printStackTrace()
  }
}
TRUNCATE only removes the data; the table structure stays intact.

What if the table does not exist to begin with?

Two cases need to be distinguished:

1. Modes other than SaveMode.Overwrite

No problem: the table is created automatically, with default column types.

2. SaveMode.Overwrite

This fails. Below is the error when SaveMode.Overwrite is used while TestMysqlTble2 does not exist:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'iptv.TESTMYSQLTBLE2' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2505)
        at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
        at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.main(SaveDataFrameASMysql.scala:33)
        at com.iptv.job.basedata.SaveDataFrameASMysql.main(SaveDataFrameASMysql.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Error details

The key frame is:

at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)

The line it points to in the code above is:

stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved

In other words, TRUNCATE requires the table to exist.
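One way to guard against this (a sketch, not part of the original code) is to check for the table through the standard JDBC DatabaseMetaData API inside the Overwrite branch, so that Overwrite on a missing table simply falls through to the create-and-append path. Name matching here may be case-sensitive depending on MySQL's lower_case_table_names setting:

// Inside the SaveMode.Overwrite branch, before the truncate:
val rs = conn.getMetaData.getTables(null, null, table, null)
val tableAlreadyExists = rs.next()
rs.close()
if (tableAlreadyExists) {
  stmt.execute(s"truncate table $table") // safe: the table is known to exist
}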

With that, writing a DataFrame to MySQL is fully working.

This post is a summary of my own work; please credit the source when reposting!

Original post: https://www.cnblogs.com/lillcol/p/9796935.html
