Scala: Create new column with function in Spark Dataframe

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me), citing the original question: http://stackoverflow.com/questions/30219592/

Date: 2020-10-22 07:09:06 · Source: igfitidea

Create new column with function in Spark Dataframe

scala, apache-spark, dataframe

Asked by J Calbreath

I'm trying to figure out the new dataframe API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a dataframe with 2 columns, "ID" and "Amount". As a generic example, say I want to return a new column called "code" that returns a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

I get type mismatch errors

found   : org.apache.spark.sql.Column
required: Integer

I've tried changing the input type on my function to org.apache.spark.sql.Column, but then I start getting errors with the function compiling, because it wants a boolean in the if statement.

Am I doing this wrong? Is there a better/another way to do this than using withColumn?

Thanks for your help.

Answered by yjshen

Let's say you have an "Amt" column in your schema:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
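As a side note, on newer Spark versions (2.0+), where SparkSession replaces SQLContext and parquetFile is deprecated, the same UDF can be written inline. A minimal sketch, assuming a `spark` session and a parquet file with an integer "Amt" column:

```scala
import org.apache.spark.sql.functions.{col, udf}

// spark is the SparkSession; read.parquet replaces the deprecated parquetFile
val myDF = spark.read.parquet("hdfs:/to/my/file.parquet")

// Inline UDF: Spark infers input and return types from the lambda itself
val coder = udf((amt: Int) => if (amt < 100) "little" else "big")

myDF.withColumn("Code", coder(col("Amt")))
```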

I think withColumn is the right way to add a column.

Answered by Ramesh Maharjan

We should avoid defining udf functions as much as possible, due to the overhead of serialization and deserialization of columns.

You can achieve the solution with the simple when Spark function, as below:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
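The when approach also chains cleanly if more than two buckets are ever needed. A sketch with hypothetical cutoffs (100 and 1000 are illustrative, not from the question):

```scala
import org.apache.spark.sql.functions.when

myDF.withColumn("Code",
  when(myDF("Amt") < 100, "Little")
    .when(myDF("Amt") < 1000, "Medium")   // the first matching branch wins
    .otherwise("Big"))
```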

Answered by imran

Another way of doing this: you can create any function, but per the error above, you should define the function as a variable.

Example:

val coder = udf((myAmt:Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})

Now this statement works perfectly:

myDF.withColumn("Code", coder(myDF("Amt")))
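One caveat worth knowing with this boxed-Integer version: for rows where "Amt" is null, the lambda is invoked with null, so myAmt > 100 can throw a NullPointerException (a primitive Int parameter has its own null pitfalls, which vary by Spark version). A defensive sketch, assuming the same "Amt" column:

```scala
val coder = udf((myAmt: Integer) =>
  if (myAmt == null) null        // guard the boxed null explicitly
  else if (myAmt > 100) "Little"
  else "Big")

myDF.withColumn("Code", coder(myDF("Amt")))
```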