scala - How to check the schema of a DataFrame?
Disclaimer: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): Stack Overflow
Original URL: http://stackoverflow.com/questions/52760911/
How to check the schema of DataFrame?
Asked by ScalaBoy
I have a DataFrame df with some data that is the result of a calculation process. I then store this DataFrame in the database for further usage.
For example:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowsRDD: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

val df = spark.createDataFrame(rowsRDD, schema)
I need to check that all columns in the final DataFrame correspond to specific data types. Of course, one way is to create the DataFrame from a schema (as in the example above). However, in some cases changes to the data types can occasionally be introduced during the calculation process, after the initial DataFrame is created (for example, when some formula applied to the DataFrame is changed).
Therefore, I want to double-check that the final DataFrame corresponds to the initial schema. If it does not, I would like to apply the corresponding casting. Is there any way to do it?
Accepted answer by BlueSheepToken
You can get the schema of a DataFrame with the schema method:
df.schema
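From there, one way to double-check it is to compare name/type pairs against the schema you expect (a minimal sketch, not from the original answer; expectedSchema stands for whatever StructType the DataFrame was originally built with):
import org.apache.spark.sql.types._

// Minimal sketch: expectedSchema is assumed to be the StructType the DataFrame was built with.
val expectedSchema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

// Compare (name, dataType) pairs, ignoring nullability and metadata.
val schemaMatches = df.schema.fields.map(f => (f.name, f.dataType))
  .sameElements(expectedSchema.fields.map(f => (f.name, f.dataType)))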
Define a castColumn method:
def castColumn(df: DataFrame, colName: String, randomDataType: DataType): DataFrame =
  df.withColumn(colName, df.col(colName).cast(randomDataType))
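For example, a single cast might look like this (hypothetical usage on the df from the question):
// Hypothetical usage: force val1 to DoubleType on the df from the question.
val dfFixed = castColumn(df, "val1", DoubleType)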
Then apply this method to all the columns you need to cast.
First, get an Array of tuples with the column name and the targeted dataType:
// Assume your DataFrames have the same column names; you need to sortBy in case they are not in the same order
// You can also iterate through dfOrigin.schema only and compare its dataTypes with the target dataTypes instead of zipping
val differences = (dfOrigin.schema.fields.sortBy(_.name) zip dfTarget.schema.fields.sortBy(_.name)).collect {
  case (origin: StructField, target: StructField) if origin.dataType != target.dataType =>
    (origin.name, target.dataType)
}
Then
differences.foldLeft(df) {
  case (acc, value) => castColumn(acc, value._1, value._2)
}
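Note that the fold starts from the DataFrame whose types drifted (df here, i.e. dfOrigin with the names used above). Putting it together with a quick check afterwards might look like this (a sketch, not part of the original answer, assuming dfOrigin and dfTarget as above):
// Sketch: cast the drifted columns, then verify every column now has the type from dfTarget.
val dfCasted = differences.foldLeft(dfOrigin) {
  case (acc, (colName, dataType)) => castColumn(acc, colName, dataType)
}

val allTypesMatch = dfCasted.schema.fields.sortBy(_.name).map(f => (f.name, f.dataType))
  .sameElements(dfTarget.schema.fields.sortBy(_.name).map(f => (f.name, f.dataType)))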
Answer by Tree DR
Based on Untyped Dataset Operations from https://spark.apache.org/docs/2.2.0/sql-programming-guide.html, it should be:
df.printSchema()
Answer by Kuldip Puri Tejaswi
You can try:
> df.printSchema
root
 |-- id: string (nullable = true)
 |-- val1: double (nullable = true)
 |-- val2: double (nullable = true)
This prints the schema in a tree format. Hope this helps.
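As a small aside (not from the original answer), if you want that tree as a value rather than console output, StructType also exposes a treeString method:
// The same tree rendering, but returned as a String instead of printed.
val tree: String = df.schema.treeString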
Answer by Leo C
If I understand your requirement correctly, the following example illustrates how to revert a DataFrame with changed column types to its original version:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import spark.implicits._

val df1 = Seq(
  (1, "a", 100L, 10.0), (2, "b", 200L, 20.0)
).toDF("c1", "c2", "c3", "c4")

val df2 = Seq(
  (1, "a", 100, 10.0f), (2, "b", 200, 20.0f)
).toDF("c1", "c2", "c3", "c4")
df2.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: integer (nullable = false)
// |-- c4: float (nullable = false)
val fieldsDiffType = (df1.schema.fields zip df2.schema.fields).collect {
  case (a: StructField, b: StructField) if a.dataType != b.dataType =>
    (a.name, a.dataType)
}
// fieldsDiffType: Array[(String, org.apache.spark.sql.types.DataType)] =
// Array((c3,LongType), (c4,DoubleType))
val df2To1 = fieldsDiffType.foldLeft(df2)( (accDF, field) =>
  accDF.withColumn(field._1, col(field._1).cast(field._2))
)
df2To1.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: long (nullable = false)
// |-- c4: double (nullable = false)
Note that this solution works only if the DataFrame columns remain the same in size and order, and does not cover types such as Array or Struct.
[UPDATE]
If there is concern that the column order might have changed, you can first sort df1.schema.fields and df2.schema.fields before performing zip:
df1.schema.fields.sortBy(_.name) zip df2.schema.fields.sortBy(_.name)
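Going one step further, a name-keyed variant (not part of the original answer) also tolerates columns that exist in only one of the two DataFrames:
// Sketch: build a name -> DataType map from the reference schema (df1) and
// cast only those columns of df2 whose types differ; other columns are left untouched.
val targetTypes = df1.schema.fields.map(f => f.name -> f.dataType).toMap

val restored = df2.columns.foldLeft(df2) { (acc, c) =>
  targetTypes.get(c) match {
    case Some(t) if acc.schema(c).dataType != t => acc.withColumn(c, col(c).cast(t))
    case _ => acc
  }
}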

