Spark withColumn()是一个DataFrame函数,用于向DataFrame中添加新列,更改现有列的值,转换列的数据类型,从现有列派生新列。
Spark withColumn()语法和用法
- 向DataFrame添加新列
- 更改现有列的值
- 从现有列派生新列
- 更改列数据类型
- 添加,替换或更新多列
- 重命名列名
- 从DataFrame删除列
- 将列拆分为多列
SparkwithColumn()是DataFrame的转换函数,用于处理DataFrame上所有行或选定行的列值。
在执行诸如添加新列,更新现有列的值,从现有列派生新列等操作之后,withColumn() 函数将返回一个新的 DataFrame。
下面是withColumn()函数的语法。
withColumn(colName : String, col : Column) : DataFrame
colName:Stirng–指定要创建的新列。使用现有的列来更新值。
col:Column –列表达式。
由于withColumn() 是一个转换函数,它只有在调用action时才执行。
Spark withColumn() 方法在内部引入了一个投影。
Spark文档 首先,让我们创建一个要使用的DataFrame。
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructType} val data = Seq(Row(Row("James;","","Smith"),"36636","M","3000"), Row(Row("Michael","Rose",""),"40288","M","4000"), Row(Row("Robert","","Williams"),"42114","M","4000"), Row(Row("Maria","Anne","Jones"),"39192","F","4000"), Row(Row("Jen","Mary","Brown"),"","F","-1") ) val schema = new StructType() .add("name",new StructType() .add("firstname",StringType) .add("middlename",StringType) .add("lastname",StringType)) .add("dob",StringType) .add("gender",StringType) .add("salary",StringType) val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
1.向DataFrame添加一个新列 要创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。在下面的代码片段中,lit()函数用于向DataFrame列添加常量值。我们还可以链接以添加多个列。
import org.apache.spark.sql.functions.lit df.withColumn("Country", lit("USA")) //chaining to operate on multiple columns df.withColumn("Country", lit("USA")) .withColumn("anotherColumn",lit("anotherValue"))
如果要处理几个列,则上述方法很好,但是当您要添加或更新多个列时,请勿使用withColumn()链接,因为这会导致性能问题,而应使用select()来更新多个列。
2.更改现有列的值 withColumn()DataFrame的Spark函数也可以用于更新现有列的值。为了更改该值,请将现有的列名作为第一个参数传递,将值分配为第二个列。请注意,第二个参数应为Columntype。
import org.apache.spark.sql.functions.col df.withColumn("salary",col("salary")*100)
此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。
3.从现有列派生新列 要创建新列,请使用您希望新列使用的名称指定第一个参数,并通过对现有列进行操作来使用第二个参数来分配值。
df.withColumn("CopiedColumn",col("salary")* -1)
此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。
4.更改列数据类型 通过在DataFrame上使用Spark withColumn并在列上使用强制转换功能,我们可以更改DataFrame列的数据类型。下面的语句将“工资”列的数据类型从字符串更改为整数。
df.withColumn("salary",col("salary").cast("Integer"))
5.添加,替换或更新多个列 当您想在Spark DataFrame中添加,替换或更新多列时,建议不要链接withColumn()函数,因为这会导致性能问题,并建议在DataFrame上创建临时视图后使用select()
df2.createOrReplaceTempView("PERSON") spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
6.重命名列名 尽管6,7和8中的示例未使用withColumn()函数,但我仍然想解释如何重命名,删除和拆分列,因为这些对您很有用。
要重命名现有列,请在DataFrame上使用“ withColumnRenamed ”功能。
df.withColumnRenamed("gender","sex")
7.放置一列 使用drop()函数从DataFrame中删除特定的列。
df.drop("CopiedColumn")
8.将列拆分为多列 尽管此示例未使用withColumn()函数,但我仍然觉得用转换函数将一个DataFrame列拆分为多个列还是很好的解释map()。
import spark.implicits._ val columns = Seq("name","address") val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732")) var dfFromData = spark.createDataFrame(data).toDF(columns:_*) dfFromData.printSchema() val newDF = dfFromData.map(f=>{ val nameSplit = f.getAs[String](0).split(",") val addSplit = f.getAs[String](1).split(",") (nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3)) }) val finalDF = newDF.toDF("First Name","Last Name", "Address Line1","City","State","zipCode") finalDF.printSchema() finalDF.show(false)
此代码段将“名称”列拆分为“名字”,“姓氏”,并将“地址”列拆分为“地址行1”,“城市”,“州”和“邮政编码”。产量低于产出:
root |-- First Name: string (nullable = true) |-- Last Name: string (nullable = true) |-- Address Line1: string (nullable = true) |-- City: string (nullable = true) |-- State: string (nullable = true) |-- zipCode: string (nullable = true) +----------+---------+--------------+-------+-----+-------+ |First Name|Last Name|Address Line1 |City |State|zipCode| +----------+---------+--------------+-------+-----+-------+ |Robert | Smith |1 Main st | Newark| NJ | 92537 | |Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 | +----------+---------+--------------+-------+-----+-------+
注意:请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。
Spark withColumn完整示例
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.functions._ object WithColumn { def main(args:Array[String]):Unit= { val spark: SparkSession = SparkSession.builder() .master("local[1]") .appName("SparkByExamples.com") .getOrCreate() val dataRows = Seq(Row(Row("James;","","Smith"),"36636","M","3000"), Row(Row("Michael","Rose",""),"40288","M","4000"), Row(Row("Robert","","Williams"),"42114","M","4000"), Row(Row("Maria","Anne","Jones"),"39192","F","4000"), Row(Row("Jen","Mary","Brown"),"","F","-1") ) val schema = new StructType() .add("name",new StructType() .add("firstname",StringType) .add("middlename",StringType) .add("lastname",StringType)) .add("dob",StringType) .add("gender",StringType) .add("salary",StringType) val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema) //Change the column data type df2.withColumn("salary",df2("salary").cast("Integer")) //Derive a new column from existing val df4=df2.withColumn("CopiedColumn",df2("salary")* -1) //Transforming existing column val df5 = df2.withColumn("salary",df2("salary")*100) //You can also chain withColumn to change multiple columns //Renaming a column. val df3=df2.withColumnRenamed("gender","sex") df3.printSchema() //Droping a column val df6=df4.drop("CopiedColumn") println(df6.columns.contains("CopiedColumn")) //Adding a literal value df2.withColumn("Country", lit("USA")).printSchema() //Retrieving df2.show(false) df2.select("name").show(false) df2.select("name.firstname").show(false) df2.select("name.*").show(false) import spark.implicits._ val columns = Seq("name","address") val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"), ("Maria, Garcia","3456 Walnut st, Newark, NJ, 94732")) var dfFromData = spark.createDataFrame(data).toDF(columns:_*) dfFromData.printSchema() val newDF = dfFromData.map(f=>{ val nameSplit = f.getAs[String](0).split(",") val addSplit = f.getAs[String](1).split(",") (nameSplit(0),nameSplit(1),addSplit(0),addSplit(1),addSplit(2),addSplit(3)) }) val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode") finalDF.printSchema() finalDF.show(false) df2.createOrReplaceTempView("PERSON") spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show() } }