spark dataFrame withColumn

说明:withColumn用于在原有DF新增一列

1. 初始化sqlContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

2.导入sqlContext隐式转换

import sqlContext.implicits._

3.  创建DataFrames

val df = sqlContext.read.json("file:///usr/local/spark-2.3.0/examples/src/main/resources/people.json")

4. 显示内容

df.show()

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

5. 为原有df新加一列

df.withColumn("id2", monotonically_increasing_id()+1)

6. 显示添加列后的内容

res6.show()

+----+-------+---+

| age|   name|id2|

+----+-------+---+

|null|Michael|  1|

|  30|   Andy|  2|

|  19| Justin|  3|

+----+-------+---+

完成的过程如下:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details

sqlContext: org.apache.spark.sql.SQLContext = [email protected]

scala> import sqlContext.implicits._

import sqlContext.implicits._

scala> val df = sqlContext.read.json("file:///usr/local/spark-2.3.0/examples/src/main/resources/people.json")

2018-06-25 18:55:30 WARN  ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0

2018-06-25 18:55:30 WARN  ObjectStore:568 - Failed to get database default, returning NoSuchObjectException

2018-06-25 18:55:32 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

scala> df.withColumn("id2", monotonically_increasing_id()+1)

res6: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> res6.show()

+----+-------+---+

| age|   name|id2|

+----+-------+---+

|null|Michael|  1|

|  30|   Andy|  2|

|  19| Justin|  3|

+----+-------+---+

原文地址:https://www.cnblogs.com/abcdwxc/p/9225855.html

时间: 2024-10-18 00:57:47

spark dataFrame withColumn的相关文章

spark dataframe操作集锦(提取前几行,合并,入库等)

Spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到Hive中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

spark DataFrame 常见操作

spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

Spark DataFrame ETL教程

前言 ETL是 Extract-Transform-Load的缩写,也就是抽取-转换-加载,在数据工作中是非常重要的部分.实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成我们需要的形式. 每个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已.按照ETL过程的框架来重新认识数据批处理,有利于我们更清晰地编写批处理脚本. 在单机范围内的数据量下,使用python的pandas包就可以非常方便地完成数据批处理工作.但当数据量达到1

spark dataframe unionall

今天本来想写一个spark dataframe unionall的demo,由于粗心报下面错误: Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 3 columns and the right has 4; at o

Spark DataFrame列的合并与拆分

版本说明:Spark-2.3.0 使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列.这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法. 1 DataFrame列数据的合并例如:我们有如下数据,想要将三列数据合并为一列,并以","分割 +----+---+-----------+ |name|age| phone| +----+---+-----------+ |Ming| 20|15552211521| |

spark dataframe函数编程

DataFrame 的函数 Action 操作 1. collect() ,返回值是一个数组,返回dataframe集合所有的行 2. collectAsList() 返回值是一个Java类型的数组,返回dataframe集合所有的行 3. count() 返回一个number类型的,返回dataframe集合的行数 4. describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用

Spark DataFrame 数据框空值判断和处理

scala> val data1 = data.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating") data1: org.apache.spark

spark&dataframe

1.今天,我们来介绍spark以及dataframe的相关的知识点,但是在此之前先说一下对以前的hadoop的一些理解 当我启动hadoop的时候,上面有hdfs的存储结构,由于这个是分布式存储,所以当一个节点挂了之后,此后由于 还有别的机器上存储这些block块(这里面你肯定要问了,我们怎么知道它挂了,其实我前面关于akaka的时候rpc 通信的机制,心跳机制),所以这个是我们选择它的理由之一,还有一个原因我们可以进行无限扩容,是因为当我们 使用zookeeper进行管理这些datanode的

spark dataframe 类型转换

读一张表,对其进行二值化特征转换.可以二值化要求输入类型必须double类型,类型怎么转换呢? 直接利用spark column 就可以进行转换: DataFrame dataset = hive.sql("select age,sex,race from hive_race_sex_bucktizer "); /** * 类型转换 */ dataset = dataset.select(dataset.col("age").cast(DoubleType).as(