Change nullable property of column in spark dataframe

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/33193958/
Asked by J Calbreath
I'm manually creating a dataframe for some testing. The code to create it is:
case class input(id: Long, var1: Int, var2: Int, var3: Double)

val inputDF = sqlCtx
  .createDataFrame(List(
    input(1110, 0, 1001, -10.00),
    input(1111, 1, 1001, 10.00),
    input(1111, 0, 1002, 10.00)))
So the schema looks like this:
root
|-- id: long (nullable = false)
|-- var1: integer (nullable = false)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
I want to make nullable = true for each one of these variables. How do I declare that from the start, or switch it in a new dataframe after it's been created?
Answered by Martin Senne
With the imports
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
you can use
/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is either nullable or not
 */
def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
  // get schema
  val schema = df.schema
  // modify [[StructField]] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
directly.
You can also make the method available via the "pimp my library" pattern (see my SO post What is the best way to define custom methods on a DataFrame?), so that you can call
val df = ....
val df2 = df.setNullableStateOfColumn("id", true)
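For reference, a minimal sketch of that enrichment ("pimp my library") pattern, reusing the setNullableStateOfColumn logic from above; the object and class names here are illustrative, not part of any Spark API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

object DataFrameExtensions {
  // Implicit wrapper so the helper reads like a method on DataFrame itself.
  implicit class NullableOps(df: DataFrame) {
    def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
      val newSchema = StructType(df.schema.map {
        case StructField(c, t, _, m) if c == cn => StructField(c, t, nullable, m)
        case field => field
      })
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }
  }
}

// usage:
//   import DataFrameExtensions._
//   val df2 = df.setNullableStateOfColumn("id", true)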
Edit
Alternative solution 1
Use a slightly modified version of setNullableStateOfColumn:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  // get schema
  val schema = df.schema
  // modify all [[StructField]]s
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) => StructField(c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}
Alternative solution 2
Explicitly define the schema. (Use reflection to create a solution that is more general.)
// additional imports needed for this snippet
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id: Long, var1: Int, var2: Int, var3: Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._

  // use this to set the schema explicitly, or
  // use reflection on the case class members to construct the schema
  val schema = StructType(Seq(
    StructField("id", LongType, true),
    StructField("var1", IntegerType, true),
    StructField("var2", IntegerType, true),
    StructField("var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001, -10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] = sparkContext.parallelize(is)
  val rowRDD: RDD[Row] = rdd.map((i: Input) => Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame(rowRDD, schema)

  inputDF.printSchema
  inputDF.show()
}
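On the "use reflection" remark in the comment above, here is a hedged sketch using Spark's ScalaReflection helper to derive the schema from the case class and then relax nullability. Note that ScalaReflection lives in the catalyst package and is an internal API, so it may change between Spark versions:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Derive the schema from the case class instead of spelling it out by hand,
// then mark every top-level field as nullable.
val derivedSchema = ScalaReflection.schemaFor[Input].dataType.asInstanceOf[StructType]
val nullableSchema = StructType(derivedSchema.map(_.copy(nullable = true)))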
Answered by Sidd Singal
This is a late answer, but I wanted to give an alternative solution for people who come here. You can automatically make a DataFrame column nullable from the start with the following modification to your code:
case class input(id: Option[Long], var1: Option[Int], var2: Int, var3: Double)

val inputDF = sqlContext
  .createDataFrame(List(
    input(Some(1110), Some(0), 1001, -10.00),
    input(Some(1111), Some(1), 1001, 10.00),
    input(Some(1111), Some(0), 1002, 10.00)))
inputDF.printSchema
This will yield:
root
|-- id: long (nullable = true)
|-- var1: integer (nullable = true)
|-- var2: integer (nullable = false)
|-- var3: double (nullable = false)
defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]
Essentially, if you declare a field as an Option, using Some([element]) or None as the actual inputs, then that field will be nullable. Otherwise, the field will not be nullable. I hope this helps!
Answered by Rayan Ral
Another option: if you need to change the dataframe in place, and recreating it is impossible, you can do something like this:
.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
Spark will then think that this column may contain null, and nullability will be set to true. Also, you can use a udf to wrap your values in Option. This works fine even for streaming cases.
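A minimal, self-contained sketch of that snippet with the imports it needs (df and "col_name" are placeholders):

import org.apache.spark.sql.functions.{col, lit, when}

// Rewriting the column through when/otherwise makes Spark treat it as nullable,
// without rebuilding the DataFrame from an RDD.
val relaxed = df.withColumn("col_name",
  when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))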
Answered by matemaciek
A more compact version of setting the nullable parameter for all columns
Instead of case StructField(c, t, _, m) => StructField(c, t, nullable = nullable, m) one can use _.copy(nullable = nullable). Then the whole function can be written as:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
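A quick usage sketch, assuming the inputDF from the question:

val allNullable = setNullableStateForAllColumns(inputDF, nullable = true)
allNullable.printSchema() // every column should now report nullable = true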
Answered by echo
Just use java.lang.Integer instead of scala.Int in your case class.
case class input(id: Long, var1: java.lang.Integer, var2: java.lang.Integer, var3: java.lang.Double)
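A sketch of the expected result (untested here): the boxed Java types are reference types, so Spark's reflection should mark those fields as nullable, while the scala.Long field stays non-nullable:

// assuming the case class above and the sqlCtx from the question
val df = sqlCtx.createDataFrame(List(
  input(1110L, 0, 1001, -10.00),
  input(1111L, 1, 1001, 10.00)))
df.printSchema()
// root
//  |-- id: long (nullable = false)
//  |-- var1: integer (nullable = true)
//  |-- var2: integer (nullable = true)
//  |-- var3: double (nullable = true)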
Answered by skotlov
Thanks Martin Senne. Just a little addition: in the case of inner struct types, you may need to set nullable recursively, like this:
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  def set(st: StructType): StructType = {
    StructType(st.map {
      case StructField(name, dataType, _, metadata) =>
        val newDataType = dataType match {
          case t: StructType => set(t)
          case _ => dataType
        }
        StructField(name, newDataType, nullable = nullable, metadata)
    })
  }
  df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}
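A usage sketch on a hypothetical nested schema (sqlContext and sparkContext are assumed to be in scope):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Nested schema where everything starts out non-nullable.
val innerType = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false)))
val nestedSchema = StructType(Seq(
  StructField("inner", innerType, nullable = false)))

val nestedDF = sqlContext.createDataFrame(
  sparkContext.parallelize(Seq(Row(Row(1L, "a")))), nestedSchema)

// After the recursive helper, both "inner" and its fields report nullable = true.
setNullableStateForAllColumns(nestedDF, nullable = true).printSchema()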
Answered by Devendra Singh
Is there a way to achieve the same with a streaming dataframe? It looks like the above method doesn't support a streaming df. Here is the stacktrace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; kafka at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at this line - df.sqlContext.createDataFrame(df.rdd, newSchema)
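One workaround, sketched from Rayan Ral's answer above: the when/otherwise trick avoids the df.rdd round-trip that triggers this exception, so it should also work on streaming DataFrames (streamingDF is a placeholder for a DataFrame obtained via readStream):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// Relax nullability on every column without converting to an RDD.
val relaxedStream: DataFrame = streamingDF.columns.foldLeft(streamingDF) { (acc, c) =>
  acc.withColumn(c, when(col(c).isNotNull, col(c)).otherwise(lit(null)))
}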

