Scala: Spark multiple contexts
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, keep the link to the original, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/32827333/
Spark multiple contexts
Asked by GermainGum
In short:
EC2 cluster: 1 master, 3 slaves
Spark version: 1.3.1
I wish to use the option spark.driver.allowMultipleContexts: one local context (master only) and one cluster context (master and slaves).
I get this stack trace error (line 29 is where I call the object that initializes the second SparkContext):
fr.entry.Main.main(Main.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$$anonfun$apply.apply(SparkContext.scala:1812)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$$anonfun$apply.apply(SparkContext.scala:1808)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning.apply(SparkContext.scala:1808)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning.apply(SparkContext.scala:1795)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
at fr.entry.cluster$.<init>(Main.scala:79)
at fr.entry.cluster$.<clinit>(Main.scala)
at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:71)
at scala.App$$anonfun$main.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at fr.entry.Main$.main(Main.scala:14)
at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
More details:
I would like to run one program that does two things. First, I have a local SparkContext (on the master only); I make an RDD and do some operations on it. Second, I have a second SparkContext initialized with the master and 3 slaves, which also makes an RDD and does some operations. So in the first case I want to use the 16 cores of the master, and in the second case the 8 cores x 3 of the slaves.
Simple example:
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count())
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)
My two SparkContexts:
object local {
  val project = "test"
  val version = "1.0"
  val sc = new SparkConf()
    .setMaster("local[16]")
    .setAppName("Local")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "45g")
  val sparkContext = new SparkContext(sc)
}

object cluster {
  val project = "test"
  val version = "1.0"
  val sc = new SparkConf()
    .setMaster(masterURL) // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
    .setAppName("Cluster")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "35g")
  val sparkContext = new SparkContext(sc)
}
How can I fix this?
Answered by pzecevic
Although the configuration option spark.driver.allowMultipleContexts exists, it is misleading, because usage of multiple Spark contexts is discouraged. This option is used only for Spark internal tests and is not supposed to be used in user programs. You can get unexpected results while running more than one Spark context in a single JVM.
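To illustrate the point, here is a minimal sketch (my addition, not part of the original answer) showing that the two computations from the question can run against a single SparkContext pointed at the cluster, so no second context is needed; the master URL is a placeholder and the configuration is reduced to the essentials:

import org.apache.spark.{SparkConf, SparkContext}

object SingleContextApp {
  def main(args: Array[String]): Unit = {
    // One context for the whole application.
    val conf = new SparkConf()
      .setMaster("spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077") // placeholder master URL
      .setAppName("SingleContext")
    val sc = new SparkContext(conf)

    val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

    // Both computations from the question, on the same context.
    println(sc.makeRDD(arr).count())
    println(sc.makeRDD(arr).map(l => l.sum).sum)

    sc.stop()
  }
}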
Answered by amey91
If coordination is required between the 2 programs, then it would be better to make them part of a single Spark application, to take advantage of Spark's internal optimizations and to avoid unnecessary I/O.
Secondly, if the 2 applications do not need to coordinate in any way, you can launch 2 separate applications. Since you are using Amazon EC2/EMR, you can use YARN as your resource manager without a significant time investment, as described here.
Answered by Leonid
If you need to work with lots of Spark contexts, you can turn on the special option [MultipleContexts] (1), but it is used only for Spark internal tests and is not supposed to be used in user programs. You will get unexpected behavior while running more than one Spark context in a single JVM [SPARK-2243] (2). However, it is possible to create different contexts in separate JVMs and to manage the contexts at the SparkConf level, so that each one optimally fits its jobs.
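As a rough illustration of the separate-JVM approach (my sketch, not from the original answer), each spark-submit invocation below starts its own driver JVM, so each application gets its own SparkContext without spark.driver.allowMultipleContexts; the class names, jar path and master URL are hypothetical:

import scala.sys.process._

// Launch the two applications as separate driver JVMs, one after the other.
val jar = "target/scala-2.10/test-assembly-1.0.jar"  // hypothetical application jar

val localExit = Seq("/root/spark/bin/spark-submit",
  "--master", "local[16]",
  "--class", "fr.entry.LocalMain",                    // hypothetical main class
  jar).!                                              // blocks until the app exits

val clusterExit = Seq("/root/spark/bin/spark-submit",
  "--master", "spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077", // placeholder
  "--class", "fr.entry.ClusterMain",                  // hypothetical main class
  jar).!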
There is a middleware on top of Spark - [Mist]. It manages Spark contexts and multiple JVMs, so you could have different jobs, such as an ETL pipeline, a fast forecast job, an ad-hoc Hive query and a Spark Streaming application, running in parallel on the same cluster. It looks like this: Mist creates every new SparkContext in its own JVM.
1> github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67
2> issues.apache.org/jira/browse/SPARK-2243
Answered by yazabara
Java:
.set("spark.driver.allowMultipleContexts", "true")
+
sparkContext.cancelAllJobs();
sparkContext.stop();
It works for me.
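In the same spirit, here is a minimal Scala sketch (my illustration, not from the answer) that runs the two stages sequentially, stopping the first context before creating the second so that only one context is ever alive; the cluster master URL is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

// Stage 1: local-only work.
val localSc = new SparkContext(new SparkConf().setMaster("local[16]").setAppName("LocalStage"))
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(localSc.makeRDD(arr).count())
localSc.cancelAllJobs()  // make sure nothing is still running
localSc.stop()           // shut the first context down before creating the next one

// Stage 2: cluster work, started only after the local context has stopped.
val clusterSc = new SparkContext(new SparkConf()
  .setMaster("spark://ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com:7077") // placeholder
  .setAppName("ClusterStage"))
println(clusterSc.makeRDD(arr).map(l => l.sum).sum)
clusterSc.stop()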

