Spark DataFrame列的合并与拆分

版本说明:Spark-2.3.0

使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。

1 DataFrame列数据的合并
例如:我们有如下数据,想要将三列数据合并为一列,并以“,”分割

+----+---+-----------+
|name|age|      phone|
+----+---+-----------+
|Ming| 20|15552211521|
|hong| 19|13287994007|
| zhi| 21|15552211523|
+----+---+-----------+

1.1 使用map方法重写

使用map方法重写就是将DataFrame使用map取值之后,然后使用toSeq方法转成Seq格式,最后使用Seq的foldLeft方法拼接数据,并返回,如下所示:

//方法1:利用map重写
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.2 使用内置函数concat_ws

合并多列数据也可以使用SparkSQL的内置函数concat_ws()

//方法2: 使用内置函数 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.3 使用自定义UDF函数

自己编写UDF函数,实现多列合并

 //方法3:使用自定义UDF函数

    // 编写udf函数
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

完整代码:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * DataFrame 合并列
  */
object MergeColsTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //从内存创建一组DataFrame数据
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
      .toDF("name", "age", "phone")
    df.show()
    /**
      * +----+---+-----------+
      * |name|age|      phone|
      * +----+---+-----------+
      * |Ming| 20|15552211521|
      * |hong| 19|13287994007|
      * | zhi| 21|15552211523|
      * +----+---+-----------+
      */
    //方法1:利用map重写
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法2: 使用内置函数 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法3:使用自定义UDF函数

    // 编写udf函数
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

    /**
      * /**
      * * +-------------------+
      * * |              value|
      * * +-------------------+
      * * |Ming,20,15552211521|
      * * |hong,19,13287994007|
      * * | zhi,21,15552211523|
      * * +-------------------+
      **/
      */
  }
}

2 DataFrame列数据的拆分

上面我们将DataFrame的多列数据合并为一列如下所示,有时候我们也需要将单列数据,以某种拆分规则,拆分为多列。下面提供几种将一列拆分为多列的方法。

+-------------------+
|              value|
+-------------------+
|Ming,20,15552211521|
|hong,19,13287994007|
| zhi,21,15552211523|
+-------------------+

2.1 使用内置函数split,然后遍历添加列

该方法,先利用内置函数split将单列的数据拆分,然后遍历使用getItem(角标)方法获取拆分后的数据,依次使用withColumn方法添加新列,代码如下所示:

  //方法1: 使用内置函数split,然后遍历添加列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()
  /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+

2.2 使用UDF函数创建多列数据,然后合并
该方法是使用udf函数,生成多个列,然后合并到原来的数据。该方法参考了VectorDisassembler(与spark ml官网提供的VectorAssembler相反),这是一个第三方的spark ml向量拆分算法,该方法github地址:https://github.com/jamesbconner/VectorDisassembler。代码如下所示:

//方法2:使用udf函数创建多列,然后合并
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //创建多列数据
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合并数据
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */

完整代码:

import org.apache.spark.ml.attribute.{Attribute, NumericAttribute}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * 拆分列
  */
object SplitColTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //从内存中创建DataFrame
    import spark.implicits._
    val df = Seq("Ming,20,15552211521", "hong,19,13287994007", "zhi,21,15552211523")
      .toDF("value")
    df.show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

    import org.apache.spark.sql.functions._
    //方法1: 使用内置函数split,然后遍历添加列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()

    /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */

    //方法2:使用udf函数创建多列,然后合并
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //创建多列数据
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合并数据
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */
  }
}

原文地址:https://www.cnblogs.com/itboys/p/9813934.html

时间: 2024-10-30 21:29:28

Spark DataFrame列的合并与拆分的相关文章

spark dataframe操作集锦(提取前几行,合并,入库等)

Spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到Hive中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

spark DataFrame 常见操作

spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

Pandas: 如何将一列中的文本拆分为多行? | Python

Pandas: 如何将一列中的文本拆分为多行? 在数据处理过程中,经常会遇到以下类型的数据: 在同一列中,本该分别填入多行中的数据,被填在一行里了,然而在分析的时候,需要拆分成为多行. 在上图中,列名为”Country” ,index为4和5的单元格内,值为”UK/Australia”和”UK/Netherland”. 今天,我们来介绍将含有多值的内容分拆成多行的几种方法. 加载数据 PS:可以通过左右滑动来查看代码 import pandas as pd df = pd.DataFrame({

Spark DataFrame ETL教程

前言 ETL是 Extract-Transform-Load的缩写,也就是抽取-转换-加载,在数据工作中是非常重要的部分.实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成我们需要的形式. 每个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已.按照ETL过程的框架来重新认识数据批处理,有利于我们更清晰地编写批处理脚本. 在单机范围内的数据量下,使用python的pandas包就可以非常方便地完成数据批处理工作.但当数据量达到1

04. 字符串合并与拆分写法小结

原文:04. 字符串合并与拆分写法小结 一. 字符合并 if OBJECT_ID('ConcatStr') is not null drop table ConcatStr GO create table ConcatStr ( ID int, Code varchar(10) ) GO insert into ConcatStr select 1,'XXX' union all select 1,'YYY' union all select 2,'PPP' union all select 2

Goldengate进程的合并与拆分规范

Goldengate抽取进程的合并与拆分原则 1.    文档综述 1.1.  文档说明 本文档描述了对GoldenGate的抽取进程进行拆分和合并的基本原则和详细步骤.  1.2.  读者范围 本文档主要容灾相关人员.纳入数据级容灾范围的应用系统相关人员使用,在Goldengate实施.运维的整个生命周期中,必须严格遵循本系列文档. 1.3.  术语说明 序号 完整说法 缩略说法 1 GoldenGate GG或OGG         2.      抽取进程的拆分 2.1.  拆分原则  

【峰回路转】Excel技巧百例 07.汉字列的合并

如果您看到如下图这样的表格,需要把内容合并到一列,您会如何处理呢? 这里介绍一下汉字列的合并 常用的方式有2种: 1.用&连接符 E1=A1&B1&C1&D1 2.用CONCATENATE函数 版权声明:本文为博主原创文章,未经博主允许不得转载.

MSSQL—列记录合并成一行

在项目开发中,有时会碰到将列记录合并为一行的情况,例如根据地区将人员姓名合并,或根据拼音首字母合并城市等,下面就以根据地区将人员姓名合并为例,详细讲一下合并的方法. 首先,先建一个表,并添加一些数据,建表代码如下: If OBJECT_ID(N'Demo') Is Not Null    Begin        Drop Table Demo    EndElse    Begin        Create Table Demo(        Area nvarchar(30),     

spark dataframe unionall

今天本来想写一个spark dataframe unionall的demo,由于粗心报下面错误: Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 3 columns and the right has 4; at o