Spark 1.4: loading MySQL data, creating DataFrames, and problems with the join methods

First, let's connect to MySQL with the new API, load the data, and create the DataFrames:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.hive.HiveContext
import java.sql.DriverManager
import java.sql.Connection
val sqlContext = new HiveContext(sc)
val mySQLUrl = "jdbc:mysql://10.180.211.100:3306/appcocdb?user=appcoc&password=Asia123"

val CI_MDA_SYS_TABLE = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

val CI_MDA_SYS_TABLE_COLUMN = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

val CI_LABEL_EXT_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_EXT_INFO").cache()

val CI_LABEL_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_INFO").cache()

val CI_APPROVE_STATUS = sqlContext.jdbc(mySQLUrl,"CI_APPROVE_STATUS").cache()

val DIM_COC_LABEL_COUNT_RULES = sqlContext.jdbc(mySQLUrl,"DIM_COC_LABEL_COUNT_RULES").cache()
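
As an aside, in Spark 1.4 `sqlContext.jdbc` is deprecated in favor of the DataFrameReader. A minimal sketch of the equivalent call (assuming the MySQL JDBC driver jar is already on the classpath; not what I actually ran here):

import java.util.Properties

// Equivalent load via the Spark 1.4 DataFrameReader (sketch only).
val CI_MDA_SYS_TABLE_r = sqlContext.read.jdbc(mySQLUrl, "CI_MDA_SYS_TABLE", new Properties()).cache()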

Now join the tables on their ID columns:

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner").cache()
labels.join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner").cache()
labels.join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner").cache()
labels.join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner").cache()
labels.filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 and (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) and (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull) and CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1).cache()

And then it blew up with errors: at the third join the ID column could no longer be resolved, which seemed really strange.
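
In hindsight, one possible explanation (a guess, not something I verified): DataFrames are immutable, so every `labels.join(...)` and `labels.filter(...)` above returns a new DataFrame that is immediately thrown away — `labels` itself never gains the CI_LABEL_EXT_INFO columns that the third join refers to. A sketch of the same joins chained into a single assignment:

// Sketch only (not re-run against this data): keep each join result by chaining.
val labelsChained = CI_MDA_SYS_TABLE
  .join(CI_MDA_SYS_TABLE_COLUMN, CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"), "inner")
  .join(CI_LABEL_EXT_INFO, CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"), "inner")
  .join(CI_LABEL_INFO, CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"), "inner")
  .join(CI_APPROVE_STATUS, CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"), "inner")
  .filter(CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 &&
    (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) &&
    CI_LABEL_EXT_INFO("COUNT_RULES_CODE").isNotNull &&
    CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1)
  .cache()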

No luck, so I tried the join method specified in the official Spark 1.4 API docs:

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,"TABLE_ID")
labels.join(CI_LABEL_EXT_INFO,"COLUMN_ID")
labels.join(CI_LABEL_INFO,"LABEL_ID")
labels.join(CI_APPROVE_STATUS).where($"LABEL_ID" === $"RESOURCE_ID")

Again it blew up the same way: the ID still could not be found.
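
The same issue applies to this second attempt: each `labels.join(...)` result is discarded again. A chained sketch of the usingColumn form (the `$` column syntax needs `import sqlContext.implicits._`; also untested against this data):

import sqlContext.implicits._

// Sketch only: chain the usingColumn joins and keep the final result.
val labelsByColumn = CI_MDA_SYS_TABLE
  .join(CI_MDA_SYS_TABLE_COLUMN, "TABLE_ID")
  .join(CI_LABEL_EXT_INFO, "COLUMN_ID")
  .join(CI_LABEL_INFO, "LABEL_ID")
  .join(CI_APPROVE_STATUS)                       // no join condition here, so this is a cartesian join
  .where($"LABEL_ID" === $"RESOURCE_ID")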

In the end I gave up and went back to the old way of registering temporary tables over the JDBC data source and loading the data through SQL, and that worked, which I still don't understand...

val CI_MDA_SYS_TABLE_DDL = s"""
             CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE
             USING org.apache.spark.sql.jdbc
             OPTIONS (
               url    '${mySQLUrl}',
               dbtable     'CI_MDA_SYS_TABLE'
             )""".stripMargin

     sqlContext.sql(CI_MDA_SYS_TABLE_DDL)
     val CI_MDA_SYS_TABLE = sqlContext.sql("SELECT * FROM CI_MDA_SYS_TABLE").cache()
    //val CI_MDA_SYS_TABLE  = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

    val CI_MDA_SYS_TABLE_COLUMN_DDL = s"""
            CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE_COLUMN
            USING org.apache.spark.sql.jdbc
            OPTIONS (
              url    '${mySQLUrl}',
              dbtable     'CI_MDA_SYS_TABLE_COLUMN'
            )""".stripMargin

    sqlContext.sql(CI_MDA_SYS_TABLE_COLUMN_DDL)
    val CI_MDA_SYS_TABLE_COLUMN = sqlContext.sql("SELECT * FROM CI_MDA_SYS_TABLE_COLUMN").cache()
    //val CI_MDA_SYS_TABLE_COLUMN  = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

.........

So the problem was eventually solved this way, but why the direct load does not work still needs to be investigated.

As a bonus, here is the fix for another problem. If you hit an error like this:

15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp9:49897 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp5:45978 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.176.238.11:38968 in memory (size: 8.2 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on cbg6aocdp4:55199 in memory (size: 8.2 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO ContextCleaner: Cleaned shuffle 0
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.176.238.11:38968 in memory (size: 6.5 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on cbg6aocdp8:55706 in memory (size: 6.5 KB, free: 1060.3 MB)
TARGET_TABLE_CODE:========================IT03
Exception in thread "main" java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
        at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:218)
        at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:137)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at main.asiainfo.coc.impl.IndexMakerObj$.makeIndexsAndLabels(IndexMakerObj.scala:137)
        at main.asiainfo.coc.CocDss$.main(CocDss.scala:23)
        at main.asiainfo.coc.CocDss.main(CocDss.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
        ... 71 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 76 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
        ... 78 more

The last "Caused by" tells the story: the Hadoop data is LZO-compressed, so for Spark to read it the hadoop-lzo jar must be on the classpath.
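
One way to supply it (a sketch only; the jar path below is a placeholder, and where hadoop-lzo actually lives depends on the cluster) is to put the jar on both the executor and driver classpaths. In client mode the driver entry is usually better passed via spark-submit's --driver-class-path, since the driver JVM is already running by the time SparkConf is read:

// Sketch: '/path/to/hadoop-lzo.jar' is a placeholder for wherever hadoop-lzo is installed.
val conf = new SparkConf()
  .setAppName("CocDss")
  .set("spark.executor.extraClassPath", "/path/to/hadoop-lzo.jar")
  .set("spark.driver.extraClassPath", "/path/to/hadoop-lzo.jar")   // in client mode, prefer --driver-class-path on spark-submit
val sc = new SparkContext(conf)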
