scala Spark Build Custom Column Function, user defined function

Note: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36546456/


Spark Build Custom Column Function, user defined function

scala, apache-spark, apache-spark-sql

Asked by other15

I'm using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate through each element and make a calculation.

To start off, I'm trying to implement my own getMax method. So column x would have the values [3,8,2,5,9], and the expected output of the method would be 9.

Here is what it looks like in Scala

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

This is what I have so far, and I get this error:

"value length is not a member of org.apache.spark.sql.column", 

and I don't know how else to iterate through the column.

def getMax(col: Column): Column = {
  var maxValue = col(0)
  for (i <- 1 until col.length if col(i) > maxValue) {
    maxValue = col(i)
  }
  maxValue
}

Once I am able to implement my own method, I will create a column function

val value_max: org.apache.spark.sql.Column = getMax(df.col("value")).as("value_max")

And then I hope to be able to use this in a SQL statement, for example

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

and the expected output would be 9, given input column [3,8,2,5,9]

I am following an answer from another thread, Spark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frame, where they create a private method for standard deviation. The calculations I will do will be more complex than this (e.g. I will be comparing each element in the column), so am I going in the correct direction, or should I be looking more into User Defined Functions?

Answered by Daniel de Paula

In a Spark DataFrame, you can't iterate through the elements of a Column using the approaches you thought of because a Column is not an iterable object.

However, to process the values of a column, you have some options and the right one depends on your task:

1) Using the existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them you can find in the functions package (documentation here). Some others (binary functions in general) you can find directly in the Column object (documentation here). So, if you can use them, it's usually the best option. Note: don't forget the Window Functions.

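For the getMax goal in the question, for example, the built-ins already cover it: max is an aggregate over the rows of a column, and array_max (added in Spark 2.4, so newer than this answer) handles an array-typed column. A minimal sketch, assuming a numeric column named "value":

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_max, col, max}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// aggregate: maximum over all rows of a numeric column
val df = Seq(3, 8, 2, 5, 9).toDF("value")
df.agg(max(col("value")).as("value_max")).show()  // 9

// per-row maximum of an array-typed column (Spark 2.4+)
val arrDf = Seq(Seq(3, 8, 2, 5, 9)).toDF("values")
arrDf.select(array_max(col("values")).as("value_max")).show()  // 9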

2) Creating a UDF

If you can't complete your task with the built-in functions, you may consider defining a UDF (User Defined Function). They are useful when you can process each item of a column independently and you expect to produce a new column with the same number of rows as the original one (not an aggregated column). This approach is quite simple: first, you define a simple function, then you register it as a UDF, then you use it. Example:

def myFunc: (String => String) = { s => s.toLowerCase }

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

val newDF = df.withColumn("newCol", myUDF(df("oldCol")))
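
If the end goal is to call it from a SQL string, as in the question, the same kind of function can also be registered under a name that Spark SQL can see. A small sketch, assuming a SparkSession named spark and a hypothetical temp view name my_table:

spark.udf.register("myLower", (s: String) => s.toLowerCase)

df.createOrReplaceTempView("my_table")
val lowered = spark.sql("SELECT myLower(oldCol) AS newCol FROM my_table")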

For more information, here's a nice article.

3) Using a UDAF

If your task is to create aggregated data, you can define a UDAF (User Defined Aggregation Function). I don't have a lot of experience with this, but I can point you to a nice tutorial:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

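That tutorial predates Spark 3, where the usual route is now an Aggregator registered through functions.udaf (the older UserDefinedAggregateFunction API is deprecated); this would also give the question exactly the value_max(...) it wants in SQL. A hedged sketch, with my_table as a made-up view name:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// keeps the largest Int seen so far; buffer and output are both plain Ints
object MaxAgg extends Aggregator[Int, Int, Int] {
  def zero: Int = Int.MinValue
  def reduce(buffer: Int, value: Int): Int = math.max(buffer, value)
  def merge(b1: Int, b2: Int): Int = math.max(b1, b2)
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(3, 8, 2, 5, 9).toDF("value")
spark.udf.register("value_max", udaf(MaxAgg))
df.createOrReplaceTempView("my_table")
spark.sql("SELECT value_max(value) FROM my_table").show()  // 9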

4) Fall back to RDD processing

If you really can't use the options above, or if your processing task depends on different rows to process one (and it's not an aggregation), then I think you would have to select the column you want and process it using the corresponding RDD. Example:

val singleColumnDF = df.select("column")

val myRDD = singleColumnDF.rdd

// process myRDD
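
For the question's use case, the "process myRDD" step could look like the sketch below, which pulls the Int out of each Row and takes the maximum on the RDD side (again with a hypothetical single-column DataFrame named df):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(3, 8, 2, 5, 9).toDF("value")

// select gives a one-column DataFrame; .rdd drops down to an RDD[Row]
val myRDD = df.select("value").rdd

// extract the value from each Row, then take the maximum
val maxValue = myRDD.map(_.getInt(0)).max()  // 9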


So, those were the options I could think of. I hope it helps.

Answered by Boern

An easy example is given in the excellent documentation, where a whole section is dedicated to UDFs:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.callUDF

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._  // needed for toDF and the $ column syntax

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
spark.udf.register("simpleUDF", (v: Int) => v * v)
df.select($"id", callUDF("simpleUDF", $"value"))