spark 分析sql内容再插入到sql表中

package cn.spark.study.core.mycode_dataFrame;

import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.sql.Connection;
import java.sql.Statement;

import scala.Tuple2;

public class JDBCDataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("JDBCDataSource")
.setMaster("local")
;
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String,String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/testdb");
options.put("dbtable","student_infos");
//加载表信息
DataFrame studentInfosDF = sqlContext.read().format("jdbc")
.options(options).load();
options.put("dbtable", "student_scores");
DataFrame studentScoreDF = sqlContext.read().format("jdbc")
.options(options).load();
JavaPairRDD<String, Tuple2<Integer, Integer>> result_RDD = studentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0),Integer.valueOf(String.valueOf(row.get(1))));
}
})
.join(studentScoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0),Integer.valueOf(String.valueOf(row.get(1))));
}
}));

// result_RDD.foreach(new VoidFunction<Tuple2<String,Tuple2<Integer,Integer>>>() {
//
// @Override
// public void call(Tuple2<String, Tuple2<Integer, Integer>> tuple)
// throws Exception {
// System.out.println(tuple._1 + ":" + tuple._2._1 + ":" + tuple._2._2);
// }
// });

JavaRDD<Row> result_RDD_ROW= result_RDD.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
@Override
public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple)
throws Exception {
return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
}
});
List<StructField> list = new ArrayList<StructField>();
list.add(DataTypes.createStructField("name", DataTypes.StringType, true));
list.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
list.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
StructType st = DataTypes.createStructType(list);
DataFrame result_df = sqlContext.createDataFrame(result_RDD_ROW, st);
result_df.javaRDD().foreach(new VoidFunction<Row>() {
private static final long serialVersionUID = 1L;

@Override
public void call(Row row) throws Exception {
String sql = "insert into good_student_infos values("
+ "‘" + String.valueOf(row.getString(0)) + "‘,"
+ Integer.valueOf(String.valueOf(row.get(1))) + ","
+ Integer.valueOf(String.valueOf(row.get(2))) + ")";
System.out.println("sql:" + sql);
Class.forName("com.mysql.jdbc.Driver");
Connection conn = null;
Statement stmt = null;
try {
conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/testdb","","");
stmt = conn.createStatement();
stmt.execute(sql);
} catch (Exception e) {
e.printStackTrace();
}finally
{
if(stmt != null)
{
stmt.close();
}
if(conn != null)
{
conn.close();
}
}
}
});
}
}

数据库准备

-- create table student_info(name VARCHAR(20),age INTEGER);
-- create table studnet_scores(name VARCHAR(20),score INTEGER);
-- insert into student_info values("leo",18),("marry",17),("jack",19)
-- insert into student_scores values("leo",88),("marry",99),("jack",60)
-- ALTER TABLE student_info RENAME to student_infos;
-- create table good_student_infos(name VARCHAR(20),age INTEGER,score INTEGER)

最终生成的表

时间: 2024-10-14 12:57:10

spark 分析sql内容再插入到sql表中的相关文章

ORACLE 查询一个数据表后通过遍历再插入另一个表中的两种写法

ORACLE 查询一个数据表后通过遍历再插入另一个表中的两种写法 语法 第一种: 通过使用Oracle语句块  --指定文档所有部门都能查看 declare cursor TABLE_DEPT is SELECT ID,UNAME from g_users where utype=2 and STATUS>-1; begin for c in TABLE_DEPT loop INSERT INTO G_KNOWDOCRIGHT(RID,DIRID,DOCID,USERID) VALUES(SYS

SQL语句:关于复制表结构和内容到另一张表中的SQL语句

1.复制新表结构及数据到新表create table 新表 select * from 旧表 2.只复制表结构到新表 create table 新表 select * from 旧表 where 1=2 3.复制旧表的数据到新表(假设两个表结构一样) insert into 新表 select * from 旧表 4.复制旧表的数据到新表(假设两个表结构不一样) insert into 新表(字段1,字段2,......) select 字段1, 字段2,... from 旧表 SQL语句:关于

sql语句,怎么查看一个表中的所有约束

sql语句,怎么查看一个表中的所有约束,比如,一个student表,有唯一,外键,主键,用sql语句怎么查看student表中的所有约束呢? select * from sysobjects where parent_obj in(select id from sysobjects where name='student')

C# 将DataTable表中的数据批量插入到数据库表中的方法

C#中有时候需要将内存中的数据批量插入到数据库表中,使用for循环进行批量插入不但耗时而且会频繁操作数据库. 针对数据量很少的可以使用for循环插入,但是针对于数据量大的则不推荐使用for循环插入,推荐使用sql的块处理插入. 块处理不但耗时少而且不会频繁对数据库进行操作,只是需要注意的一点是DataTable中的列必须与表的列完全一致. 如下代码是批量插入的一个函数,自测可用. 1 #region 使用SqlBulkCopy将DataTable中的数据批量插入数据库中 2 /// <summa

Mysql之将一张表内容导入另一张表中

类别一. 如果两张张表(导出表和目标表)的字段一致,并且希望插入全部数据,可以用这种方法: INSERT INTO 目标表 SELECT  * FROM 来源表 ; 例如,要将A 表插入到B 表中,则可以通过如下SQL语句实现: INSERT INTO A SELECT * FROM B ; 类别二. 如果只希望导入指定字段,可以用这种方法: INSERT INTO 目标表 (字段1, 字段2, ...) SELECT 字段1, 字段2, ...FROM 来源表 ; 请注意以上两表的字段类型必须

新建表需要原表的数据,mysql 如何把查询到的结果插入到新表中

1. 如果两张张表(导出表和目标表)的字段一致,并且希望插入全部数据,可以用这种方法: INSERT INTO  目标表  SELECT  * FROM  来源表 ; 例如,要将 articles 表插入到 newArticles 表中,则可以通过如下SQL语句实现: INSERT INTO  newArticles  SELECT  * FROM  articles ; 2. 如果只希望导入指定字段,可以用这种方法: INSERT INTO  目标表 (字段1, 字段2, ...)  SELE

c#用NPOI将excel文件内容读取到datatable数据表中

将excel文件内容读取到datatable数据表中,支持97-2003和2007两种版本的excel 1.第一种是根据excel文件路径读取excel并返回datatable 1 /// <summary> 2 /// 将excel文件内容读取到DataTable数据表中 3 /// </summary> 4 /// <param name="fileName">文件完整路径名</param> 5 /// <param name=

根据目录名将某个指定目录下的所有文件名以一定的规则插入到一个表中

最近项目遇到一个需求,就是把某个目录的所有文件的名称以一定的规则插入到一个表中,首先来看下badge表结构: 目录名称为'Picture/Badge/',文件的命名也是以一定的规则来命名的,例如: balldate_1_1_sl.png,match_1_10_1_1_gr.png,apply_1_50_sl.png,,,如果把balldate_1_1插入到 badge表,那么type=1,first_index=1,second_index=1,path='Picture/Badge/balld

sql实现从两个表获取字段组成表数据再插入到函数表中

实现此效果说起来比较难以说明,我这里还是先将实现的效果已图的形式展示一下吧. 这是两个表的设计.我想实现的效果举个例子,以查询secretaryCharge为例: 点击"市级",我将查询到市级一下所有"区级"的secretaryCharge数目,并通过organizations中的fullName来作为名字,查到的统计数目作为值.得到的效果便是如下图所示: 具体实现sql语句就是用到join on,sql语句具体如下: insert @result(fullName