通过Spark SQL关联查询两个HDFS上的文件操作

order_created.txt   订单编号  订单创建时间

10703007267488  2014-05-01 06:01:12.334+01
10101043505096  2014-05-01 07:28:12.342+01
10103043509747  2014-05-01 07:50:12.33+01
10103043501575  2014-05-01 09:27:12.33+01
10104043514061  2014-05-01 09:03:12.324+01

order_picked.txt   订单编号  订单提取时间

10703007267488  2014-05-01 07:02:12.334+01
10101043505096  2014-05-01 08:29:12.342+01
10103043509747  2014-05-01 10:55:12.33+01

上传上述两个文件到HDFS:

hadoop fs -put order_created.txt /data/order_created.txt
hadoop fs -put order_picked.txt /data/order_picked.txt

通过Spark SQL关联查询两个文件

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

case class OrderCreated(order_no:String,create_date:String)
case class OrderPicked(order_no:String,picked_date:String)

val order_created = sc.textFile("/data/order_created.txt").map(_.split("\t")).map( d => OrderCreated(d(0),d(1)))
val order_picked = sc.textFile("/data/order_picked.txt").map(_.split("\t")).map( d => OrderPicked(d(0),d(1)))

order_created.registerTempTable("t_order_created")
order_picked.registerTempTable("t_order_picked")

#手工设置Spark SQL task个数
hiveContext.setConf("spark.sql.shuffle.partitions","10")
hiveContext.sql("select a.order_no, a.create_date, b.picked_date from t_order_created a join t_order_picked b on a.order_no = b.order_no").collect.foreach(println)

执行结果如下:

[10101043505096,2014-05-01 07:28:12.342+01,2014-05-01 08:29:12.342+01]
[10703007267488,2014-05-01 06:01:12.334+01,2014-05-01 07:02:12.334+01]
[10103043509747,2014-05-01 07:50:12.33+01,2014-05-01 10:55:12.33+01]
时间: 2024-12-14 16:57:23

通过Spark SQL关联查询两个HDFS上的文件操作的相关文章

(一)SQL关联查询的使用技巧 (各种 join)

---恢复内容开始--- (一)SQL关联查询的使用技巧 (各种 join) 这几天因为工作的时候,发现自己的sql语句基础不是很好,特意研究了一下,发现sql语句真的是博大精深,sql语句不仅是要查出来你想要的数据,更讲究查询的效率,因为在查询大量数据时往往会因为数据量大,造成效率很低,再加上前后台数据的交互,造成了访问延迟等等的一系列问题. 在我们的日常工作中往往用到很多的查询方式,例如 嵌套查询,关联查询,子查询等等,就我而言,我感觉关联查询是最容易学习,和效率最高的.下面就我总结的关联查

SQL如何查询两个表的数据

在进行查询操作时,我们通常需要查询两个关联表的数据,我们可以使用where语句进行查询,如: select Emp.E_Id,Company.C_OraName from Emp,Company where Companey.C_Id=Emp.C_Id 但是我们往往会碰到比较复杂的语句,这时候使用where就不太合适了,其实SQL可以用较为直接的形式进行连接操作,可以在From子句中以直接的形式指出: select top 10 E_Id,E_Name,C_Name from Emp join

[Spark][Python]对HDFS 上的文件,采用绝对路径,来读取获得 RDD

对HDFS 上的文件,采用绝对路径,来读取获得 RDD: In [102]: mydata=sc.textFile("file:/home/training/test.txt")17/09/24 06:31:04 INFO storage.MemoryStore: Block broadcast_30 stored as values in memory (estimated size 230.5 KB, free 2.4 MB)17/09/24 06:31:04 INFO stora

hadoop的API对HDFS上的文件访问

这篇文章主要介绍了使用hadoop的API对HDFS上的文件访问,其中包括上传文件到HDFS上.从HDFS上下载文件和删除HDFS上的文件,需要的朋友可以参考下hdfs文件操作操作示例,包括上传文件到HDFS上.从HDFS上下载文件和删除HDFS上的文件,大家参考使用吧 复制代码 代码如下:import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*; import java.io.File;import ja

Eclipse 上传 删除 下载 分析 hdfs 上的文件

本篇讲解如何通过Eclipse 编写代码去操作分析hdfs 上的文件. 1.在eclipse 下新建Map/Reduce Project项目.如图:  项目建好后,会默认加载一系列相应的jar包. 下面还有很多包. 2.我们新建Java 类就可以了.代码如下: package org.hadoop.examples; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOExcep

【转载】HDFS 上传文件不均衡和Balancer太慢的问题

向HDFS上传文件,如果是从某个datanode开始上传文件,会导致上传的数据优先写满当前datanode的磁盘,这对于运行分布式程序是非常不利的. 解决的办法: 1.从其他非datanode节点上传 可以将hadoop的安装目录复制一份到一个不在集群中的节点(直接从非datanode的namenode上传也可以,但是这样不太好,会增加namenode的负担,并且时间长了会让namenode上放了各种乱七八糟的文件),在这个节点上不启动任何hadoop进程,但是可以当作客户端使用.上传文件到集群

HDFS 上传文件不均衡和Balancer太慢的问题

向HDFS上传文件,如果是从某个datanode开始上传文件,会导致上传的数据优先写满当前datanode的磁盘,这对于运行分布式程序是非常不利的. 解决的办法: 1.从其他非datanode节点上传 可以将hadoop的安装目录复制一份到一个不在集群中的节点(直接从非datanode的namenode上传也可以,但是这样不太好,会增加namenode的负担,并且时间长了会让namenode上放了各种乱七八糟的文件),在这个节点上不启动任何hadoop进程,但是可以当作客户端使用.上传文件到集群

eclipse通过maven进行打包并且对hdfs上的文件进行wordcount

在eclipse中配置自己的maven仓库 1.安装maven(用于管理仓库,jar包的管理) -1.解压maven安装包 -2.把maven添加到环境变量/etc/profile -3.添加maven目录下的conf/setting.xml文件到-/.m2文件夹下 2.安装eclipse -1.解压eclipse安装文件 -2.执行eclipse.inst文件 -3.按步骤操作 3.在eclipse中配置自己的maven仓库 1.window>>perfoemence>>mave

在spark udf中读取hdfs上的文件

某些场景下,我们在写UDF实现业务逻辑时候,可能需要去读取某个配置文件. 大多时候我们都会将此文件上传个hdfs某个路径下,然后通过hdfs api读取该文件,但是需要注意: UDF中读取文件部分最好放在静态代码块中(只会在类加载时候读取一次),尤其在处理的数据量比较大的时候,否则会反反复复的读取,造成不必要的开销,甚至任务失败,示例代码如下: package cn.com.dtmobile.udf; import java.util.HashMap; import org.apache.spa