Scala: Apply UDF to multiple columns in Spark Dataframe

Disclaimer: this page is a translation of a popular Stack Overflow Q&A, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/45189085/


Apply UDF to multiple columns in Spark Dataframe

Tags: scala, apache-spark, user-defined-functions

Asked by Giridhar Karnik

I have a dataframe which looks like below

+---+----+------+-----+---+---+-----+---+---+--------------+
| id| age|   rbc|  bgr| dm|cad|appet| pe|ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|  3|48.0|normal|117.0| no| no| poor|yes|yes|           ckd|
....
....
....

I have written a UDF to convert the categorical values yes, no, poor, normal into binary 0s and 1s.

import org.apache.spark.sql.functions.udf

def stringToBinary(stringValue: String): Int = {
    stringValue match {
        case "yes" => 1
        case "no" => 0
        case "present" => 1
        case "notpresent" => 0
        case "normal" => 1
        case "abnormal" => 0
        // note: this match is not exhaustive; any other value (e.g. "poor")
        // will throw a scala.MatchError at runtime
    }
}

val stringToBinaryUDF = udf(stringToBinary _)

I am applying this to the dataframe as follows

val newCol = stringToBinaryUDF.apply(col("pc")) //creates the new column with formatted value
val refined1 = noZeroDF.withColumn("dm", newCol) //adds the new column to original

How can I pass multiple columns into the UDF so that I don't have to repeat myself for other categorical columns?

Answered by Ramesh Maharjan

udf functions should not be the choice when built-in Spark functions can do the same job, as udf functions serialize and deserialize the column data.

Given a dataframe as

+---+----+------+-----+---+---+-----+---+---+--------------+
|id |age |rbc   |bgr  |dm |cad|appet|pe |ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|3  |48.0|normal|117.0|no |no |poor |yes|yes|ckd           |
+---+----+------+-----+---+---+-----+---+---+--------------+

You can achieve your requirement with the when function as

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def applyFunction(column: Column) = when(column === "yes" || column === "present" || column === "normal", lit(1))
  .otherwise(when(column === "no" || column === "notpresent" || column === "abnormal", lit(0)).otherwise(column))

df.withColumn("dm", applyFunction(col("dm")))
  .withColumn("cad", applyFunction(col("cad")))
  .withColumn("rbc", applyFunction(col("rbc")))
  .withColumn("pe", applyFunction(col("pe")))
  .withColumn("ane", applyFunction(col("ane")))
  .show(false)

The result is

+---+----+---+-----+---+---+-----+---+---+--------------+
|id |age |rbc|bgr  |dm |cad|appet|pe |ane|classification|
+---+----+---+-----+---+---+-----+---+---+--------------+
|3  |48.0|1  |117.0|0  |0  |poor |1  |1  |ckd           |
+---+----+---+-----+---+---+-----+---+---+--------------+

Now, since the question clearly says you don't want to repeat the procedure for all the columns, you can do the following:

val columnsToMap = df.select("dm", "cad", "rbc", "pe", "ane").columns

var tempdf = df
columnsToMap.foreach(column => {
  tempdf = tempdf.withColumn(column, applyFunction(col(column)))
})

tempdf.show(false)
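
As a side note, the same result can be produced without the mutable tempdf variable by building all the projected columns in a single select. This is only a sketch that reuses the applyFunction defined above (toMap and mapped are illustrative names, not part of the original answer):

// Project every column, rewriting only the categorical ones in one pass
val toMap = Set("dm", "cad", "rbc", "pe", "ane")
val mapped = df.select(df.columns.map { c =>
  if (toMap.contains(c)) applyFunction(col(c)).as(c) else col(c)
}: _*)
mapped.show(false)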

Answered by Haroun Mohammedi

A UDF can take many parameters, i.e. many columns, but it should return one result, i.e. one column.

In order to do so, just add parameters to your stringToBinary function and it's done.

If you want it to take two columns, it will look like this:

def stringToBinary(stringValue: String, secondValue: String): Int = {
    // secondValue is accepted here only to illustrate the signature;
    // this example does not use it
    stringValue match {
        case "yes" => 1
        case "no" => 0
        case "present" => 1
        case "notpresent" => 0
        case "normal" => 1
        case "abnormal" => 0
    }
}

val stringToBinaryUDF = udf(stringToBinary _)
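
To apply it, pass both columns when calling the UDF; the result is still a single column. A minimal sketch (the output column name peBinary and the choice of pe and ane are just for illustration):

import org.apache.spark.sql.functions.col

// A two-argument UDF is invoked with two Column arguments
val withBinary = df.withColumn("peBinary", stringToBinaryUDF(col("pe"), col("ane")))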

Hope this helps

Answered by Iraj Hedayati

You could also use the foldLeft function. Assuming your UDF is called stringToBinaryUDF:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val categoricalColumns = Seq("dm", "cad", "rbc", "pe", "ane")
val refinedDF = categoricalColumns
    .foldLeft(noZeroDF) { (accumulatorDF: DataFrame, columnName: String) =>
         accumulatorDF
            .withColumn(columnName, stringToBinaryUDF(col(columnName)))
     }

This approach respects immutability and functional programming style.
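
As a side note, the same foldLeft pattern works with any Column => Column transformation, not just a UDF. For example, it could be combined with the when-based applyFunction from the first answer to avoid the UDF serialization cost entirely (a sketch, with refinedWithWhen as an illustrative name):

// Fold over the column names, rewriting each with the built-in when/otherwise logic
val refinedWithWhen = categoricalColumns.foldLeft(noZeroDF) { (accumulatorDF, columnName) =>
  accumulatorDF.withColumn(columnName, applyFunction(col(columnName)))
}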