How do I call a UDF on a Spark DataFrame using Java?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.

Original question: http://stackoverflow.com/questions/35348058/
Asked by Kai
Similar question as here, but I don't have enough points to comment there.

According to the latest Spark documentation, a udf can be used in two different ways, one with SQL and another with a DataFrame. I found multiple examples of how to use a udf with SQL, but have not been able to find any on how to use a udf directly on a DataFrame.
The solution provided by the OP on the question linked above uses __callUDF()__, which is _deprecated_ and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:
"since it's redundant with udf()"
so this means I should be able to use __udf()__ to call my udf, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java-Spark programs. What am I missing?
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
// ...

UDF1<String[], String> mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);

df.????????  // how do I call my udf (mode) on a given column of my DataFrame df?
Accepted answer by zero323
Spark >= 2.3
A Scala-style udf can be invoked directly:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import scala.collection.Seq;

UserDefinedFunction mode = udf(
    (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();
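If you also want to call the same function from SQL strings, a Scala-style UserDefinedFunction can additionally be registered by name (a sketch, assuming an active Spark >= 2.3 SparkSession named `spark` and the `mode` function and `vs` column from above):

```java
// Register the Scala-style UDF under a name so it can also be used in SQL/selectExpr.
spark.udf().register("mode", mode);

// Both invocation styles now resolve to the same function:
df.select(mode.apply(col("vs"))).show();  // direct apply
df.selectExpr("mode(vs)").show();         // by registered name
```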
Spark < 2.3
Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call, it has an incorrect signature. Array columns are exposed using a Scala WrappedArray, not a plain Java array, so you have to adjust the signature:
UDF1<Seq<String>, String> mode = new UDF1<Seq<String>, String>() {
    public String call(final Seq<String> types) throws Exception {
        // headOption() returns a scala.Option, so unwrap to a plain String (or null)
        return types.isEmpty() ? null : types.head();
    }
};
If UDF is already registered:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
you can simply use callUDF (which was introduced in 1.5) to call it by name:
df.select(callUDF("mode", col("vs"))).show();
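callUDF composes with the rest of the DataFrame API as well; for instance, a sketch of keeping the original columns and adding the UDF result as a new one (the column name `first_type` is hypothetical, and `df`/`vs` are taken from the question):

```java
// Add the UDF result as a new column instead of projecting it;
// callUDF refers to the function by its registered name.
df.withColumn("first_type", callUDF("mode", col("vs"))).show();
```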
You can also use it in selectExpr:
df.selectExpr("mode(vs)").show();