SparkSQL系列(1/25)–创建DataFrame

475次阅读
没有评论

博主日常在工作中使用spark sql进行数据分析比较多,常见的还会借助hive。从Tfrecord里面读数据出来为DataSet,如果你会使用pandas 那么它跟 pandas 的形式很像,那么你操作起来会更贱容易上手。

在Spark中,使用 createDataFrame() 和 toDF() 方法创建一个 DataFrame,使用这些方法,您可以从已经存在的RDD,DataFrame,Dataset,List,Seq数据对象中创建一个Spark DataFrame,在这里我将用Scala示例进行说明。 。

您还可以从不同的来源(例如TextCSVJSONXMLParquetAvroORC二进制文件,RDBMS表,Hive,HBase等)创建一个DataFrame 。

DataFrame是组织为命名列的分布式数据集合。从概念上讲,它等效于关系数据库中的表或R / Python中的数据框,但是在后台进行了更丰富的优化。可以从多种来源构造DataFrame,例如:结构化数据文件,Hive中的表,外部数据库或现有的RDD。

val spark:SparkSession = SparkSession.builder()
   .master("local[1]").appName("SparkByExamples.com")
   .getOrCreate()

import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))

-数据块

  • Spark从RDD创建DataFrame
  • 从列表和Seq集合创建DataFrame
  • 从CSV文件创建Spark DataFrame
  • 从TXT文件创建
  • 从JSON文件创建
  • 从XML文件创建
  • 从HIVE创造
  • 从RDBMS数据库创建表
  • 从HBase表创建
  • 其他来源(Avro,镶木地板等)

首先,让我们在示例中(例如,当需要使用时.toDF() function)导入所需的spark隐式数据,并为示例创建数据。

1. Spark从RDD创建DataFrame

创建Spark DataFrame的一种简单方法是从现有的RDD中进行。首先,让我们通过调用parallelize()从集合Seq创建一个RDD

我将在下面的所有示例中使用该rdd对象。

val rdd = spark.sparkContext.parallelize(data)

1.1使用toDF()函数

一旦有了RDD,就可以使用toDF()它在Spark中创建DataFrame。默认情况下,它创建的列名称分别为“ _1”和“ _2”,因为每一行有两列。

val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

由于RDD是无模式的,没有列名和数据类型,因此从RDD转换为DataFrame会为您提供默认的列名,如_1,_2等,数据类型则为String。printSchema就是将DataFrame列和数据类型等打印出来。就是表格式。

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

toDF() 还有另一个签名来分配列名,该签名为列名采用了可变数量的参数,如下所示。

产量低于产出。记住这里我们只是分配了列名,但它仍然将所有数据类型都当作字符串。

默认情况下,这些列的数据类型分配给String。我们可以通过提供模式来更改此行为–<a href="https://sparkbyexamples.com/spark/spark-sql-structtype-on-dataframe/">我们可以</a>在其中<a href="https://sparkbyexamples.com/spark/spark-sql-structtype-on-dataframe/">为每个字段/列指定列名,数据类型和可为null</a>。

1.2从SparkSession使用Spark createDataFrame

createDataFrame()从SparkSession使用是另一种创建方法,它使用rdd对象作为参数。并与toDF()链接以为列指定名称。

val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

1.3在行类型中使用createDataFrame()

createDataFrame()具有另一个签名,该签名采用RDD [Row]类型和列名称的架构作为参数。要首先使用它,我们需要将“ rdd”对象从转换为RDD[T],RDD[Row]并使用StructType和StructField定义模式。这种方式相当于定义好每条数据,但是也要提前定义好数据的格式也就是Schema。

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType( Array(
                 StructField("language", StringType,true),
                 StructField("language", StringType,true)
             ))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

2.从列表和序列集合创建Spark DataFrame

在本部分中,我们将看到几种通过collectionSeq[T]或创建Spark DataFrame的方法List[T]。这些示例与我们在上一节中使用RDD看到的示例相似,但是我们使用“数据”对象而不是“ rdd”对象。

2.1在List或Seq集合上使用toDF()

toDF()在集合(Seq,List)对象上创建一个DataFrame。确保导入import spark.implicits._以使用toDF()

val dfFromData1 = data.toDF() 

2.2从SparkSession使用createDataFrame()

调用createDataFrame()fromSparkSession是另一种创建方法,它使用集合对象(Seq或List)作为参数。并与toDF()链接以为列指定名称。

//From Data (USING createDataFrame)
var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)

2.3在行类型中使用createDataFrame()

createDataFrame()在Spark中具有另一个签名,该签名将列类型的行类型和模式的集合作为参数。要首先使用它,我们需要将“数据”对象从Seq [T]转换为Seq [Row]。

//From Data (USING createDataFrame and Adding schema using StructType)
val data = Seq(Row("Java", "20000"), 
               Row("Python", "100000"), 
               Row("Scala", "3000"))
var dfFromData3 = spark.createDataFrame(rowData,schema)

以下 3-6都是从文件中读取相应的数据然后生成DataFrame

3.从CSV创建Spark DataFrame

在以上所有示例中,您已经学习了Spark通过RDD和数据收集对象创建DataFrame。实时这些功能较少使用。在本节和以下各节中,您将学习如何从数据源(如CSV,文本,JSON,Avro等)创建DataFrame。

默认情况下,Spark提供了一个API以读取定界符文件,例如逗号,管道,制表符分隔的文件,并且还提供了几种处理标头,不包含标头,双引号,数据类型等的选项。

有关详细示例,请参阅从CSV文件创建DataFrame。

val df2 = spark.read.csv("/src/resources/file.csv")

4.从文本创建(TXT)文件

在这里,将看到如何从TXT文件创建。

val df2 = spark.read
.text("/src/resources/file.txt")

5.从JSON文件创建

在这里,将看到如何从JSON文件创建。

val df2 = spark.read
.json("/src/resources/file.json")

6.从XML文件创建

要通过解析XML创建DataFrame,我们应该使用”com.databricks.spark.xml”Databricks的DataSource spark-xml API。

val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "person")
      .xml("src/main/resources/persons.xml")

7.从HDFS读取

val df =spark.read.format("tfrecords").load("hdfs://****/data/test_tf/all/2020-03-26/12")

7.从Hive创建

处理大数据大部分会从hive或者hdfs读取

val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql(“select * from emp”)

8. Spark从RDBMS数据库创建DataFrame

8.1)从Mysql表

确保您的pom.xml文件或类路径中的MySQL jar都具有MySQL库作为依赖项。

val df_mysql = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:mysql://localhost:port/db”)
   .option(“driver”, “com.mysql.jdbc.Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

8.2从DB2表

确保在pom.xml文件或类路径中的DB2 jar中将DB2库作为依赖项。

val df_db2 = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:db2://localhost:50000/dbname”)
   .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

同样,我们可以从大多数关系数据库中在Spark中创建DataFrame,在这里我没有介绍,我将把它留给您研究。

9.从HBase表创建DataFrame

要从HBase表创建Spark DataFrame,我们应该使用Spark HBase连接器中定义的DataSource 。例如,使用org.apache.spark.sql.execution.datasources.hbaseHortonworks的DataSource“ ”或使用org.apache.hadoop.hbase.sparkspark HBase连接器的“ ”。

    val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

从HBase表生成DataFrame中解释的详细示例

10.其他来源(Avro,Parquet,Kafka)

我们还可以从Avro,Parquet,HBase创建DataFrame并从Kafka读取数据,这在下面的文章中已经进行了解释,我建议您在有空的时候阅读这些内容。

admin
版权声明:本站原创文章,由admin2021-02-26发表,共计4977字。
转载提示:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)