Scala: in Spark, what is the right way to have a static object on all workers?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original author (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/26369916/
In Spark, what is the right way to have a static object on all workers?
Asked by Daniel Langdon
I've been looking at the documentation for Spark and it mentions this:
Spark's API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
- Anonymous function syntax, which can be used for short pieces of code.
- Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions { def func1(s: String): String = { ... } }
myRdd.map(MyFunctions.func1)
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing
rdd.map(x => this.func1(x)).
Now my question is what happens if you have attributes on the singleton object (which are supposed to be equivalent to static fields). Here is the same example with a small alteration:
object MyClass {
  val value = 1
  def func1(s: String): String = { s + value }
}
myRdd.map(MyClass.func1)
So the function is still referenced statically, but how far does Spark go in trying to serialize all referenced variables? Will it serialize value, or will it be initialized again on the remote workers?
Additionally, the context for all of this is that I have some heavy models inside a singleton object. I would like to find the correct way to serialize them to the workers while keeping the ability to reference them from the singleton everywhere, instead of passing them around as function parameters across a pretty deep function call stack.
Any in-depth information on what/how/when Spark serializes things would be appreciated.
Accepted answer by vanza
This is less a question about Spark and more a question of how Scala generates code. Remember that a Scala object is pretty much a Java class full of static methods. Consider a simple example like this:
object foo {
  val value = 42
  def func(i: Int): Int = i + value
  def main(args: Array[String]): Unit = {
    println(Seq(1, 2, 3).map(func).sum)
  }
}
That will be translated to 3 Java classes; one of them will be the closure that is a parameter to the map method. Using javap on that class yields something like this:
public final class foo$$anonfun$main extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public foo$$anonfun$main();
}
Note that there are no fields or anything. If you look at the disassembled bytecode, all it does is call the func() method. When running in Spark, this is the instance that will get serialized; since it has no fields, there is not much to serialize.
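One way to see this behaviour directly (a sketch, not part of the original answer; the InitProbe and Demo names are made up) is to put a side effect in the singleton's initializer. The message prints once in every JVM that first touches the object, that is, once on the driver and once per executor, rather than value being serialized across:

import org.apache.spark.sql.SparkSession

object InitProbe {
  // The object body runs in the static initializer of whichever JVM first touches it,
  // so this prints on the driver and again on each executor JVM -- value is never shipped.
  println(s"Initializing InitProbe in JVM ${java.lang.management.ManagementFactory.getRuntimeMXBean().getName()}")
  val value = 1
  def func1(s: String): String = s + value
}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("init-probe").getOrCreate()
    val out = spark.sparkContext.parallelize(Seq("a", "b", "c")).map(InitProbe.func1).collect()
    println(out.mkString(", "))
    spark.stop()
  }
}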
As for your question of how to initialize static objects: you can have an idempotent initialization function that you call at the start of your closures. The first call will trigger the initialization, and subsequent calls will be no-ops. Cleanup, though, is a lot trickier, since I'm not familiar with an API that does something like "run this code on all executors".
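A minimal sketch of that idempotent-initialization pattern, applied to the "heavy model in a singleton" scenario from the question (HeavyModel, ensureInitialized and loadModel are hypothetical names, not from the answer):

import org.apache.spark.rdd.RDD

object HeavyModel {
  private var model: Option[Map[String, Double]] = None

  // Idempotent: the first call in a given executor JVM loads the model, later calls are no-ops.
  def ensureInitialized(): Map[String, Double] = synchronized {
    if (model.isEmpty) {
      model = Some(loadModel()) // the expensive work happens at most once per JVM
    }
    model.get
  }

  // Stand-in for however the heavy model is actually loaded (file, external store, etc.).
  private def loadModel(): Map[String, Double] = Map("some-key" -> 0.5)
}

object Scoring {
  def score(rdd: RDD[String]): RDD[Double] =
    rdd.map { s =>
      val m = HeavyModel.ensureInitialized() // called at the start of the closure, on the executor
      m.getOrElse(s, 0.0)
    }
}

For a model that can be built eagerly, a lazy val inside the object gives the same once-per-JVM behaviour with less ceremony.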
One approach that can be useful if you need cleanup is explained in this blog, in the "setup() and cleanup()" section.
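The blog's code isn't reproduced here, but the general setup-and-cleanup-per-partition idea usually looks something like the following sketch built on mapPartitions (openConnection is a hypothetical resource):

import org.apache.spark.rdd.RDD

object PerPartition {
  def process(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitions { iter =>
      val connection = openConnection()      // setup, once per partition
      // Materializing the partition keeps the example simple; for large partitions you
      // would instead close the resource only after the iterator is exhausted.
      val results = iter.map(s => s.toUpperCase).toList
      connection.close()                     // cleanup, once per partition
      results.iterator
    }

  // Hypothetical resource; stands in for a DB client, a model handle, and so on.
  private def openConnection(): java.io.Closeable = new java.io.Closeable {
    override def close(): Unit = ()
  }
}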
EDIT: just for clarification, here's the disassembly of the method that actually makes the call.
public int apply$mcII$sp(int);
  Code:
   0: getstatic     #29; //Field foo$.MODULE$:Lfoo$;
   3: iload_1
   4: invokevirtual #32; //Method foo$.func:(I)I
   7: ireturn
See how it just references the static field holding the singleton and calls the func() method.

