How do I call a UDF on a Spark DataFrame using JAVA?

Disclaimer: this is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original StackOverflow post: http://stackoverflow.com/questions/35348058/



Tags: java, apache-spark, apache-spark-sql, user-defined-functions

Asked by Kai

Similar question as here, but don't have enough points to comment there.

According to the latest Spark documentation, a udf can be used in two different ways, one with SQL and another with a DataFrame. I found multiple examples of how to use a udf with SQL, but have not been able to find any on how to use a udf directly on a DataFrame.

The solution provided by the o.p. on the question linked above uses callUDF(), which is deprecated and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:

"since it's redundant with udf()"

so this means I should be able to use udf() to call my udf, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java-Spark programs. What am I missing?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

Accepted answer by zero323

Spark >= 2.3

Scala-style udf can be invoked directly:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.Seq;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.isEmpty() ? null : ss.apply(0), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();
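For completeness, the same Scala-style function can also be exposed to SQL by name. A minimal sketch, assuming a SparkSession named spark and the mode function defined above (register(String, UserDefinedFunction) is available since Spark 2.2):

    // Assumes a SparkSession `spark` and the `mode` UserDefinedFunction above.
    spark.udf().register("mode", mode);        // name it for SQL / callUDF use
    df.createOrReplaceTempView("df");          // expose df as a temp view
    spark.sql("SELECT mode(vs) FROM df").show();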

Spark < 2.3

Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call, it has an incorrect signature. Array columns are exposed using Scala WrappedArray, not plain Java arrays, so you have to adjust the signature:

UDF1<Seq<String>, String> mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.isEmpty() ? null : types.apply(0);
  }
};

If UDF is already registered:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

you can simply use callUDF (a new function introduced in 1.5) to call it by name:

df.select(callUDF("mode", col("vs"))).show();

You can also use it in selectExpr:

df.selectExpr("mode(vs)").show();
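Putting the pieces together, a self-contained sketch of the pre-2.3 approach might look like the following; the session setup, class name, and sample data are illustrative assumptions, not part of the original answer:

    import static org.apache.spark.sql.functions.*;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import scala.collection.Seq;

    public class ModeUdfExample {                     // hypothetical class name
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]").appName("mode-udf").getOrCreate();

        // Sample DataFrame with a single array<string> column "vs".
        StructType schema = new StructType()
            .add("vs", DataTypes.createArrayType(DataTypes.StringType));
        List<Row> rows = Arrays.asList(
            RowFactory.create((Object) new String[] {"a", "b"}),
            RowFactory.create((Object) new String[] {}));
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Null-safe "first element" UDF; note the Seq<String> input type.
        UDF1<Seq<String>, String> mode =
            ss -> ss.isEmpty() ? null : ss.apply(0);
        spark.udf().register("mode", mode, DataTypes.StringType);

        df.select(callUDF("mode", col("vs"))).show();
        df.selectExpr("mode(vs)").show();

        spark.stop();
      }
    }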