Disclaimer: this page is a Chinese-English translated StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me) on StackOverflow.
Original URL: http://stackoverflow.com/questions/45189085/
Apply UDF to multiple columns in Spark Dataframe
Asked by Giridhar Karnik
I have a dataframe that looks like this:
+---+----+------+-----+---+---+-----+---+---+--------------+
| id| age|   rbc|  bgr| dm|cad|appet| pe|ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|  3|48.0|normal|117.0| no| no| poor|yes|yes|           ckd|
....
....
....
I have written a UDF to convert the categorical values yes, no, poor, normal into binary 0s and 1s:
import org.apache.spark.sql.functions.udf

def stringToBinary(stringValue: String): Int = {
  stringValue match {
    case "yes"        => 1
    case "no"         => 0
    case "present"    => 1
    case "notpresent" => 0
    case "normal"     => 1
    case "abnormal"   => 0
    // note: any value outside these cases throws a scala.MatchError
  }
}
val stringToBinaryUDF = udf(stringToBinary _)
I am applying this to the dataframe as follows:
val newCol = stringToBinaryUDF.apply(col("pc")) //creates the new column with formatted value
val refined1 = noZeroDF.withColumn("dm", newCol) //adds the new column to original
How can I pass multiple columns into the UDF so that I don't have to repeat myself for the other categorical columns?
Answered by Ramesh Maharjan
UDFs should not be your first choice when built-in Spark functions can do the same job, because a UDF forces Spark to serialize and deserialize the column data.
Given a dataframe as
+---+----+------+-----+---+---+-----+---+---+--------------+
|id |age |rbc |bgr |dm |cad|appet|pe |ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|3 |48.0|normal|117.0|no |no |poor |yes|yes|ckd |
+---+----+------+-----+---+---+-----+---+---+--------------+
you can achieve your requirement with the when function as follows:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def applyFunction(column: Column) =
  when(column === "yes" || column === "present" || column === "normal", lit(1))
    .otherwise(when(column === "no" || column === "notpresent" || column === "abnormal", lit(0))
    .otherwise(column))
df.withColumn("dm", applyFunction(col("dm")))
.withColumn("cad", applyFunction(col("cad")))
.withColumn("rbc", applyFunction(col("rbc")))
.withColumn("pe", applyFunction(col("pe")))
.withColumn("ane", applyFunction(col("ane")))
.show(false)
The result is:
+---+----+---+-----+---+---+-----+---+---+--------------+
|id |age |rbc|bgr |dm |cad|appet|pe |ane|classification|
+---+----+---+-----+---+---+-----+---+---+--------------+
|3 |48.0|1 |117.0|0 |0 |poor |1 |1 |ckd |
+---+----+---+-----+---+---+-----+---+---+--------------+
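As a sanity check, the when/otherwise chain above computes a simple per-value mapping. Here is a plain-Scala sketch of that mapping (the name applyFunctionModel is hypothetical, and no Spark session is needed):

```scala
// Plain-Scala model of the when/otherwise chain:
// "yes"/"present"/"normal" map to 1, "no"/"notpresent"/"abnormal" map to 0,
// anything else is passed through unchanged, like otherwise(column).
def applyFunctionModel(value: String): String = value match {
  case "yes" | "present" | "normal"     => "1"
  case "no" | "notpresent" | "abnormal" => "0"
  case other                            => other
}

println(applyFunctionModel("yes"))  // 1
println(applyFunctionModel("poor")) // poor
```

Note that, unlike the MatchError-prone UDF in the question, the final pass-through case makes this mapping total, which is what `otherwise(column)` gives you on the Spark side.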
Now, the question clearly says that you don't want to repeat the procedure for every column; for that you can do the following:
val columnsToMap = df.select("dm", "cad", "rbc", "pe", "ane").columns
var tempdf = df
columnsToMap.foreach { column =>
  tempdf = tempdf.withColumn(column, applyFunction(col(column)))
}
tempdf.show(false)
Answered by Haroun Mohammedi
A UDF can take many parameters, i.e. many columns, but it must return one result, i.e. one column.
In order to do so, just add parameters to your stringToBinary function and it's done.
If you want it to take two columns it will look like this:
import org.apache.spark.sql.functions.udf

// Note: as written, the match inspects only stringValue; secondValue is ignored.
def stringToBinary(stringValue: String, secondValue: String): Int = {
  stringValue match {
    case "yes"        => 1
    case "no"         => 0
    case "present"    => 1
    case "notpresent" => 0
    case "normal"     => 1
    case "abnormal"   => 0
  }
}
val stringToBinaryUDF = udf(stringToBinary _)
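Since the body above never looks at its second parameter, here is a sketch that actually combines both values; the combination rule (result is 1 only when both columns map to 1) is purely an assumption for illustration, as the original answer doesn't specify one:

```scala
// Hypothetical two-column variant: map each value independently, then
// combine them (here: 1 only if BOTH columns map to 1 -- an assumed rule).
def toBin(s: String): Int = s match {
  case "yes" | "present" | "normal" => 1
  case _                            => 0
}

def stringPairToBinary(first: String, second: String): Int =
  toBin(first) min toBin(second)

println(stringPairToBinary("yes", "normal")) // 1
println(stringPairToBinary("yes", "no"))     // 0
```

On the Spark side this registers the same way — `udf(stringPairToBinary _)` — and is applied with two column arguments, e.g. `stringPairToBinaryUDF(col("dm"), col("cad"))`.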
Hope this helps.
Answered by Iraj Hedayati
You could also use the foldLeft function. With your UDF called stringToBinaryUDF:
import org.apache.spark.sql.functions._
val categoricalColumns = Seq("dm", "cad", "rbc", "pe", "ane")
val refinedDF = categoricalColumns
.foldLeft(noZeroDF) { (accumulatorDF: DataFrame, columnName: String) =>
accumulatorDF
.withColumn(columnName, stringToBinaryUDF(col(columnName)))
}
This respects immutability and functional-programming style.
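The same foldLeft accumulation pattern can be sketched in plain Scala, with a Map standing in for a DataFrame row; each step returns a new "frame" with one column rewritten, so no var is needed (the names binarize, row, and refined are illustrative, not from the original answer):

```scala
// Per-value mapping, mirroring the categorical-to-binary conversion.
def binarize(v: String): String = v match {
  case "yes" | "present" | "normal"     => "1"
  case "no" | "notpresent" | "abnormal" => "0"
  case other                            => other
}

val categoricalColumns = Seq("dm", "cad", "rbc", "pe", "ane")
val row = Map("id" -> "3", "dm" -> "no", "rbc" -> "normal", "appet" -> "poor")

// Fold over the column names, threading the immutable Map through as the
// accumulator; columns absent from the row are simply skipped.
val refined = categoricalColumns.foldLeft(row) { (acc, name) =>
  acc.get(name).fold(acc)(v => acc.updated(name, binarize(v)))
}

println(refined("dm"))    // 0
println(refined("rbc"))   // 1
println(refined("appet")) // poor
```

This is exactly the shape of the DataFrame version above: the accumulator is the dataframe, and each fold step is one withColumn call.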

