SparkSql系列(3/25) withColumnRnamed 函数

6,036次阅读
没有评论

共计 3086 个字符,预计需要花费 8 分钟才能阅读完成。

在SparkwithColumnRenamed()中用于重命名一列或多个DataFrame列名。取决于DataFrame架构,重命名列可能变得简单到复杂,尤其是当列嵌套有struct类型时,它会变得复杂。

在本文中,我将说明如何使用多个用例来重命名DataFrame列,例如重命名所选的多个列,嵌套的struct列,所有带有Scala示例的列。

  • 使用withColumnRenamed –重命名Spark DataFrame列名
  • 与withColumnRenamed一起使用–重命名多个列
  • 使用StructType –重命名Spark DataFrame上的嵌套列
  • 使用选择–重命名嵌套的列
  • 使用withColumn –重命名嵌套的列
  • 使用col()函数–动态重命名所有或多个列
  • 使用toDF()–重命名所有或多个列

首先,让我们为示例创建数据,我们将数据转换为Spark DataFrame时使用Row类。

val data = Seq(Row(Row("James ","","Smith"),"36636","M",3000),
Row(Row("Michael ","Rose",""),"40288","M",4000),
Row(Row("Robert ","","Williams"),"42114","M",4000),
Row(Row("Maria ","Anne","Jones"),"39192","F",4000),
Row(Row("Jen","Mary","Brown"),"","F",-1)
)

我们的基本架构具有嵌套结构。

val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",IntegerType)

让我们通过使用parallelize创建DataFrame并提供上述架构。

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.printSchema()

以下是我们的架构结构。我不在这里打印数据,因为我们的示例没有必要。该模式具有嵌套结构。

1.使用Spark withColumnRenamed –重命名DataFrame列名

SparkwithColumnRenamed()在DataFrame上具有更改列名称的功能。这是最直接的方法;该函数有两个参数;第一个是您现有的列名,第二个是您想要的新列名。返回的是新的DataFrame。

句法:

def withColumnRenamed(existingName: String, newName: String): DataFrame

existingName –您要更改的现有列名

newName –列的新名称

返回具有重命名列的新DataFrame(Dataset [Row])。如果架构不包含,则此操作不可操作existingName。

范例:

df.withColumnRenamed("dob","DateOfBirth")
.printSchema()

上面的示例将spark DataFrame上的列名从“ dob”更改为“ DateOfBirth”。请注意,该withColumnRenamed函数返回一个新的DataFrame,并且不会修改当前的DataFrame。

2.使用withColumnRenamed –重命名多个列

要更改多个列名,我们应该withColumnRenamed按如下所示链接函数。

val df2 = df.withColumnRenamed("dob","DateOfBirth")
.withColumnRenamed("salary","salary_amount")

df2.printSchema()
重命名dob和salary列后,这将创建一个新的DataFrame“ df2”。

3.使用Spark StructType –重命名Dataframe中的嵌套列

更改嵌套数据上的列名并非一帆风顺,我们可以通过使用StructType使用新的DataFrame列创建一个新的架构,并使用如下所示的cast函数来使用它来实现。

val schema2 = new StructType()
.add("fname",StringType)
.add("middlename",StringType)
.add("lname",StringType)

df.select(col("name").cast(schema2),
col("dob"),
col("gender"),
col("salary"))
.printSchema()

该语句在名称结构中将firstname重命名为fname,将lastname重命名为lname。

重命名多列

4.使用选择–重命名嵌套元素。

让我们看看通过将结构转置为平面来更改嵌套列的另一种方法。

df.select(col("name.firstname").as("fname"),
col("name.middlename").as("mname"),
col("name.lastname").as("lname"),
col("dob"),col("gender"),col("salary"))
.printSchema()

Spark重命名嵌套列

5.使用Spark DataFrame withColumn –重命名嵌套的列

当您在Spark DatFrame上嵌套了列并且想要重命名它时,请使用withColumn数据框对象从现有对象创建新列,我们将需要删除现有列。下面的示例从“ name.firstname”创建一个“ fname”列,并删除“ name”列

val df4 = df.withColumn("fname",col("name.firstname"))
.withColumn("mname",col("name.middlename"))
.withColumn("lname",col("name.lastname"))
.drop("name")
df4.printSchema()

6.使用col()函数–动态重命名所有或多个列

更改数据框上所有列名称的另一种方法是使用col()函数。

val old_columns = Seq("dob","gender","salary","fname","mname","lname")
val new_columns = Seq("DateOfBirth","Sex","salary","firstName","middleName","lastName")
val columnsList = old_columns.zip(new_columns).map(f=>{col(f._1).as(f._2)})
val df5 = df4.select(columnsList:_*)
df5.printSchema()

7.使用toDF()–更改Spark DataFrame中的所有列

当我们的数据具有平面结构(无嵌套)时,请使用toDF()新的架构来更改所有列名。

val newColumns = Seq("newCol1","newCol2","newCol3")
val df = df.toDF(newColumns:_*)
正文完
请博主喝杯咖啡吧!
post-qrcode
 
admin
版权声明:本站原创文章,由 admin 2021-03-02发表,共计3086字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码