Spark SQL Table Join(Python)

示例

Spark SQL注册“临时表”执行“Join”(Inner Join、Left Outer Join、Right Outer Join、Full Outer Join)

代码

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row

conf = SparkConf().setAppName("spark_sql_table_join")

sc = SparkContext(conf=conf)

sqlCtx = SQLContext(sc)

line1 = sc.parallelize(["name1 a", "name3 c", "name4 d"])

line2 = sc.parallelize(["name1 1", "name2 2", "name3 3"])

word1 = line1.map(lambda line: line.split(" "))

word2 = line2.map(lambda line: line.split(" "))

table1 = word1.map(lambda words: Row(name=words[0], title=words[1]))

table2 = word2.map(lambda words: Row(name=words[0], fraction=words[1]))

tableSchema1 = sqlCtx.inferSchema(table1)

tableSchema2 = sqlCtx.inferSchema(table2)

tableSchema1.registerTempTable("table1")

tableSchema2.registerTempTable("table2")

def printRows(rows):
    if rows:
        for row in rows:
            print row

# inner join
rows = sqlCtx.sql(
    "select table1.name, table1.title, table2.fraction from table1 join table2 on table1.name = table2.name").collect()

printRows(rows)

print "============================================="

# left outer join
rows = sqlCtx.sql(
    "select table1.name, table1.title, table2.fraction from table1 left outer join table2 on table1.name = table2.name").collect()

printRows(rows)

# right outer join
rows = sqlCtx.sql(
    "select table1.name, table1.title, table2.fraction from table1 right outer join table2 on table1.name = table2.name").collect()

print "============================================="

printRows(rows)

# full outer join
rows = sqlCtx.sql(
    "select table1.name, table1.title, table2.fraction from table1 full outer join table2 on table1.name = table2.name").collect()

print "============================================="

printRows(rows)

"""
Row(name=u‘name1‘, title=u‘a‘, fraction=u‘1‘)
Row(name=u‘name3‘, title=u‘c‘, fraction=u‘3‘)
=============================================
Row(name=u‘name1‘, title=u‘a‘, fraction=u‘1‘)
Row(name=u‘name3‘, title=u‘c‘, fraction=u‘3‘)
Row(name=u‘name4‘, title=u‘d‘, fraction=None)
=============================================
Row(name=u‘name1‘, title=u‘a‘, fraction=u‘1‘)
Row(name=None, title=None, fraction=u‘2‘)
Row(name=u‘name3‘, title=u‘c‘, fraction=u‘3‘)
=============================================
Row(name=u‘name1‘, title=u‘a‘, fraction=u‘1‘)
Row(name=None, title=None, fraction=u‘2‘)
Row(name=u‘name3‘, title=u‘c‘, fraction=u‘3‘)
Row(name=u‘name4‘, title=u‘d‘, fraction=None)
"""

sc.stop()
时间: 2024-10-13 16:01:07

Spark SQL Table Join(Python)的相关文章

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

Spark SQL 源码分析之 In-Memory Columnar Storage 之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

第九篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

Spark SQL 优化策略

查询优化是传统数据库中最为重要的一环,这项技术在传统数据库中已经很成熟.除了查询优化, Spark SQL 在存储上也进行了优化,从以下几点查看 Spark SQL 的一些优化策略. (1)内存列式存储与内存缓存表       Spark SQL 可以通过 cacheTable 将数据存储转换为列式存储,同时将数据加载到内存进行缓存. cacheTable 相当于在分布式集群的内存物化视图,将数据进行缓存,这样迭代的或者交互式的查询不用再从 HDFS 读数据,直接从内存读取数据大大减少了 I/O

Spark SQL中的broadcast join分析

在Spark-1.6.2中,执行相同join查询语句,broadcast join模式下,DAG和执行时间如下图所示: 1.broadcast join (1)DAG (2)执行时间 122 rows selected (22.709 seconds) 2.非broadcast join (1)DAG (2)执行时间 122 rows selected (55.512 seconds) 对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThres

Spark SQL  inferSchema实现原理探微(Python)

使用Spark SQL的基础是“注册”(Register)若干表,表的一个重要组成部分就是模式,Spark SQL提供两种选项供用户选择: (1)applySchema applySchema的方式需要用户编码显示指定模式,优点:数据类型明确,缺点:多表时有一定的代码工作量. (2)inferSchema inferSchema的方式无需用户编码显示指定模式,而是系统自动推断模式,代码比较简洁,但既然是推断,就可能出现推断错误(即与用户期望的数据类型不匹配的情况),所以我们需要对其推断过程有清晰

Spark SQL中的几种join

1.小表对大表(broadcast join) 将小表的数据分发到每个节点上,供大表使用.executor存储小表的全部数据,一定程度上牺牲了空间,换取shuffle操作大量的耗时,这在SparkSQL中称作Broadcast Join Broadcast Join的条件有以下几个: *被广播的表需要小于 spark.sql.autoBroadcastJoinThreshold 所配置的值,默认是10M (或者加了broadcast join的hint) *基表不能被广播,比如 left out

Spark SQL inferSchema实现原理探微(Python)【转】

使用Spark SQL的基础是"注册"(Register)若干表,表的一个重要组成部分就是模式,Spark SQL提供两种选项供用户选择: (1)applySchema applySchema的方式需要用户编码显示指定模式,优点:数据类型明确,缺点:多表时有一定的代码工作量. (2)inferSchema inferSchema的方式无需用户编码显示指定模式,而是系统自动推断模式,代码比较简洁,但既然是推断,就可能出现推断错误(即与用户期望的数据类型不匹配的情况),所以我们需要对其推断

Spark SQL编程指南(Python)

前言 Spark SQL允许我们在Spark环境中使用SQL或者Hive SQL执行关系型查询.它的核心是一个特殊类型的Spark RDD:SchemaRDD. SchemaRDD类似于传统关系型数据库的一张表,由两部分组成: Rows:数据行对象 Schema:数据行模式:列名.列数据类型.列可否为空等 Schema可以通过四种方式被创建: (1)Existing RDD (2)Parquet File (3)JSON Dataset (4)By running Hive SQL 考虑到Par