Calling Java/Scala function from a task

Warning: this content is taken from a StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31684842/



python, scala, apache-spark, pyspark, apache-spark-mllib

Asked by zero323

Background


My original question here was Why using DecisionTreeModel.predict inside map function raises an exception? and it is related to How to generate tuples of (original label, predicted label) on Spark with MLlib?


When we use the Scala API, the recommended way of getting predictions for an RDD[LabeledPoint] using a DecisionTreeModel is to simply map over the RDD:


val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Unfortunately, a similar approach in PySpark doesn't work so well:


labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features)))
labelsAndPredictions.first()

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


Instead of that, the official documentation recommends something like this:


predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
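
For completeness, the zipped RDD is then consumed on the driver like any other RDD; for example, a small sketch of computing the test error from it (variable names follow the snippet above):

# Sketch only: evaluate the (label, prediction) pairs produced above.
# Plain RDD operations; nothing here touches the JVM model from a worker.
testErr = (labelsAndPredictions
           .filter(lambda vp: vp[0] != vp[1])
           .count()) / float(testData.count())
print("Test error = %s" % testErr)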

So what is going on here? There is no broadcast variable here, and the Scala API defines predict as follows:


/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

So, at least at first glance, calling predict from an action or transformation should not be a problem, since prediction seems to be a local operation.


Explanation


After some digging I figured out that the source of the problem is the JavaModelWrapper.call method invoked from DecisionTreeModel.predict. It accesses the SparkContext, which is required to call the Java function:


callJavaFunc(self._sc, getattr(self._java_model, name), *a)
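
For context, the surrounding wrapper looks roughly like this (a paraphrase of pyspark.mllib.common, not the verbatim source; details differ between Spark versions):

# Paraphrased sketch of the wrapper pattern used by pyspark.mllib.
from pyspark import SparkContext
from pyspark.mllib.common import callJavaFunc

class JavaModelWrapperSketch(object):
    def __init__(self, java_model):
        # Driver-side SparkContext plus a Py4J handle to the JVM model.
        self._sc = SparkContext._active_spark_context
        self._java_model = java_model

    def call(self, name, *a):
        # Every call goes through the SparkContext, so it can only succeed
        # on the driver, never inside a task running on a worker.
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)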

Question


In the case of DecisionTreeModel.predict there is a recommended workaround, and all the required code is already part of the Scala API, but is there any elegant way to handle a problem like this in general?


The only solutions I can think of right now are rather heavyweight:


  • pushing everything down to the JVM, either by extending Spark classes through implicit conversions or by adding some kind of wrappers
  • using the Py4J gateway directly (a minimal driver-side sketch follows this list)
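
To make the second option concrete, here is a minimal driver-only sketch of reaching the JVM through the Py4J gateway that SparkContext exposes (sc._jvm is an internal, unsupported handle):

# Driver-only sketch: the Py4J gateway is reachable through SparkContext internals.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Call an arbitrary JVM method through the gateway; this works on the driver only.
jvm_millis = sc._jvm.java.lang.System.currentTimeMillis()
print(jvm_millis)

# Doing the same inside rdd.map(...) would fail: workers have no gateway and
# the SparkContext cannot be shipped to them (see SPARK-5063).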

Answered by zero323

Communication using the default Py4J gateway is simply not possible. To understand why, we have to take a look at the following diagram from the PySpark Internals document [1]:


[Figure: PySpark internals diagram from [1] — the Python driver talks to the JVM through a local Py4J gateway, while Python workers exchange data with JVM executors over sockets]

Since the Py4J gateway runs on the driver, it is not accessible to the Python interpreters, which communicate with JVM workers through sockets (see for example PythonRDD / rdd.py).


Theoretically, it could be possible to create a separate Py4J gateway for each worker, but in practice it is unlikely to be useful. Ignoring issues like reliability, Py4J is simply not designed to perform data-intensive tasks.


Are there any workarounds?


  1. Using the Spark SQL Data Sources API to wrap JVM code.

    Pros: Supported, high level, doesn't require access to the internal PySpark API

    Cons: Relatively verbose and not very well documented, limited mostly to the input data

  2. Operating on DataFrames using Scala UDFs (a minimal sketch follows this list).

    Pros: Easy to implement (see Spark: How to map Python with Scala or Java User Defined Functions?), no data conversion between Python and Scala if data is already stored in a DataFrame, minimal access to Py4J

    Cons: Requires access to Py4J gateway and internal methods, limited to Spark SQL, hard to debug, not supported

  3. Creating a high-level Scala interface in a similar way to how it is done in MLlib (a second sketch follows the list).

    Pros: Flexible, ability to execute arbitrarily complex code. It can be done either directly on RDDs (see for example the MLlib model wrappers) or with DataFrames (see How to use a Scala class inside Pyspark). The latter solution seems to be much more friendly, since all ser-de details are already handled by the existing API.

    Cons: Low level, requires data conversion, and like the UDF approach needs access to Py4J and internal APIs; not supported

    Some basic examples can be found in Transforming PySpark RDD with Scala.

  4. Using an external workflow management tool to switch between Python and Scala / Java jobs and passing data via a DFS.

    Pros: Easy to implement, minimal changes to the code itself

    Cons: Cost of reading / writing data (Alluxio?)

  5. Using a shared SQLContext (see for example Apache Zeppelin or Livy) to pass data between guest languages using registered temporary tables.

    Pros: Well suited for interactive analysis

    Cons: Not so well suited for batch jobs (Zeppelin), or may require additional orchestration (Livy)
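
As a rough illustration of option 2: assuming a Scala UDF has already been registered under the name scalaDouble in the underlying JVM SQLContext (for example by handing sqlContext._ssql_ctx to the Scala code; both the UDF and table names here are made up), the Python side only needs Spark SQL to reach it:

# Sketch for option 2: call a Scala UDF assumed to be registered in the
# underlying JVM SQLContext under the name "scalaDouble" (hypothetical).
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# sqlContext._ssql_ctx is the internal JVM handle a Scala helper would
# register the UDF against.

df = sqlContext.createDataFrame([(1.0,), (2.0,)], ["x"])
df.registerTempTable("points")  # Spark 1.x-style temporary table
doubled = sqlContext.sql("SELECT x, scalaDouble(x) AS x2 FROM points")
doubled.show()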

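And a rough sketch of option 3, the MLlib-style wrapper, driving a hypothetical Scala class (com.example.Doubler is made up) through the internal gateway and converting data with PySpark's ser-de helpers:

# Sketch for option 3: wrap a hypothetical Scala helper com.example.Doubler
# whose transform method accepts and returns a JavaRDD.
from pyspark import SparkContext
from pyspark.mllib.common import _py2java, _java2py

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1.0, 2.0, 3.0])

java_rdd = _py2java(sc, rdd)                         # Python RDD -> JVM object
doubler = sc._jvm.com.example.Doubler()              # driver-side Py4J constructor
result = _java2py(sc, doubler.transform(java_rdd))   # JVM object -> Python RDD
print(result.collect())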

  1. Joshua Rosen. (2014, August 04) PySpark Internals. Retrieved from https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals