SparkSql系列(2/25)-使用withColumn

32,495次阅读
8 条评论

共计 5989 个字符,预计需要花费 15 分钟才能阅读完成。

Spark withColumn()是一个DataFrame函数,用于向DataFrame中添加新列,更改现有列的值,转换列的数据类型,从现有列派生新列。

Spark withColumn()语法和用法

  • 向DataFrame添加新列
  • 更改现有列的值
  • 从现有列派生新列
  • 更改列数据类型
  • 添加,替换或更新多列
  • 重命名列名
  • 从DataFrame删除列
  • 将列拆分为多列

SparkwithColumn()是DataFrame的转换函数,用于处理DataFrame上所有行或选定行的列值。

在执行诸如添加新列,更新现有列的值,从现有列派生新列等操作之后,withColumn() 函数将返回一个新的 DataFrame。

下面是withColumn()函数的语法。

withColumn(colName : String, col : Column) : DataFrame

colName:Stirng–指定要创建的新列。使用现有的列来更新值。

col:Column –列表达式。

由于withColumn() 是一个转换函数,它只有在调用action时才执行。

Spark withColumn() 方法在内部引入了一个投影。

Spark文档
首先,让我们创建一个要使用的DataFrame。

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
val data = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)

val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

1.向DataFrame添加一个新列
要创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。在下面的代码片段中,lit()函数用于向DataFrame列添加常量值。我们还可以链接以添加多个列。

import org.apache.spark.sql.functions.lit
df.withColumn("Country", lit("USA"))

//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
.withColumn("anotherColumn",lit("anotherValue"))

如果要处理几个列,则上述方法很好,但是当您要添加或更新多个列时,请勿使用withColumn()链接,因为这会导致性能问题,而应使用select()来更新多个列。

2.更改现有列的值
withColumn()DataFrame的Spark函数也可以用于更新现有列的值。为了更改该值,请将现有的列名作为第一个参数传递,将值分配为第二个列。请注意,第二个参数应为Columntype。

import org.apache.spark.sql.functions.col
df.withColumn("salary",col("salary")*100)

此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。

3.从现有列派生新列
要创建新列,请使用您希望新列使用的名称指定第一个参数,并通过对现有列进行操作来使用第二个参数来分配值。

df.withColumn("CopiedColumn",col("salary")* -1)

此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。

4.更改列数据类型
通过在DataFrame上使用Spark withColumn并在列上使用强制转换功能,我们可以更改DataFrame列的数据类型。下面的语句将“工资”列的数据类型从字符串更改为整数。

df.withColumn("salary",col("salary").cast("Integer"))

5.添加,替换或更新多个列
当您想在Spark DataFrame中添加,替换或更新多列时,建议不要链接withColumn()函数,因为这会导致性能问题,并建议在DataFrame上创建临时视图后使用select()

df2.createOrReplaceTempView("PERSON")

spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()

6.重命名列名
尽管6,7和8中的示例未使用withColumn()函数,但我仍然想解释如何重命名,删除和拆分列,因为这些对您很有用。

要重命名现有列,请在DataFrame上使用“ withColumnRenamed ”功能。

df.withColumnRenamed("gender","sex")

7.放置一列
使用drop()函数从DataFrame中删除特定的列。

df.drop("CopiedColumn")

8.将列拆分为多列
尽管此示例未使用withColumn()函数,但我仍然觉得用转换函数将一个DataFrame列拆分为多个列还是很好的解释map()。

import spark.implicits._

val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
var dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()

val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name",
"Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)

此代码段将“名称”列拆分为“名字”,“姓氏”,并将“地址”列拆分为“地址行1”,“城市”,“州”和“邮政编码”。产量低于产出:

root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Address Line1: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- zipCode: string (nullable = true)

+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert | Smith |1 Main st | Newark| NJ | 92537 |
|Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 |
+----------+---------+--------------+-------+-----+-------+

注意:请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。

Spark withColumn完整示例

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.functions._
object WithColumn {

def main(args:Array[String]):Unit= {

val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val dataRows = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
Row(Row("Michael","Rose",""),"40288","M","4000"),
Row(Row("Robert","","Williams"),"42114","M","4000"),
Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
Row(Row("Jen","Mary","Brown"),"","F","-1")
)

val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("dob",StringType)
.add("gender",StringType)
.add("salary",StringType)

val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema)

//Change the column data type
df2.withColumn("salary",df2("salary").cast("Integer"))

//Derive a new column from existing
val df4=df2.withColumn("CopiedColumn",df2("salary")* -1)

//Transforming existing column
val df5 = df2.withColumn("salary",df2("salary")*100)

//You can also chain withColumn to change multiple columns

//Renaming a column.
val df3=df2.withColumnRenamed("gender","sex")
df3.printSchema()

//Droping a column
val df6=df4.drop("CopiedColumn")
println(df6.columns.contains("CopiedColumn"))

//Adding a literal value
df2.withColumn("Country", lit("USA")).printSchema()

//Retrieving
df2.show(false)
df2.select("name").show(false)
df2.select("name.firstname").show(false)
df2.select("name.*").show(false)

import spark.implicits._

val columns = Seq("name","address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732"))
var dfFromData = spark.createDataFrame(data).toDF(columns:_*)
dfFromData.printSchema()

val newDF = dfFromData.map(f=>{
val nameSplit = f.getAs[String](0).split(",")
val addSplit = f.getAs[String](1).split(",")
(nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3))
})
val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode")
finalDF.printSchema()
finalDF.show(false)

df2.createOrReplaceTempView("PERSON")
spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
}
}
正文完
请博主喝杯咖啡吧!
post-qrcode
 13
admin
版权声明:本站原创文章,由 admin 2021-03-02发表,共计5989字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(8 条评论)
验证码
ManchiYY 评论达人 LV.1
2022-02-23 10:10:32 回复

拆分列哪里要是数据量很大的话,通过这样索引是不是难以实现

 Windows  Edge  中国江西省赣州市电信
    admin 博主
    2022-02-23 22:28:44 回复

    @ManchiYY 如果拆分列很大,那么我觉得在一开始设计表的时候就应该考虑这个问题

     Macintosh  Chrome  中国广东省深圳市电信
ManchiYY 评论达人 LV.1
2022-02-23 10:16:12 回复

val result = indata.select(“city_name”,explode(split(“city_rst_info”,”/t”))as “info”)这个方法可能会简单一点

 Windows  Edge  中国江西省赣州市电信
    admin 博主
    2022-02-23 22:29:57 回复

    @ManchiYY 使用explode方法比较优雅,文章中例子使用get方式获取是想通过map的方式来表达,你提到的这个方法也是很赞的 :smile:

     Macintosh  Chrome  中国广东省深圳市电信
      ManchiYY 评论达人 LV.1
      2022-02-24 08:22:42 回复

      @admin 最近遇到一个问题2022.2.22这种以点分隔的时间字符串使用to_date()函数转换不成日期类型,不知道为咋办了 :cry:

       Windows  Edge  中国江西省赣州市电信
        admin 博主
        2022-02-24 09:32:28 回复

        @ManchiYY to_date(col(“time”),”yyyy.M.dd”) 需要你构建一个pattern去识别

         Windows  Chrome  中国广东省广州市电信
          ManchiYY 评论达人 LV.1
          2022-02-24 20:14:08 回复

          @admin 能请问一下你用的spark是那个版本的吗?我的to_date()函数只能接收一个参数

           Windows  Edge  中国江西省上饶市联通