Spark Launcher waiting for job completion infinitely

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/31754328/
Asked by TomaszGuzialek
I am trying to submit a JAR containing a Spark job to a YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:
Process spark = new SparkLauncher()
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
There are two problems:
- While submitting in "yarn-cluster" mode, the application is successfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and pi is printed in the output). However, the submitting application is never notified that processing has finished - it hangs infinitely after printing "Waiting for finish...". The log of the container can be found here
- While submitting in "yarn-client" mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish...". When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (pi is not printed out). The log of the container can be found here
I tried to execute the submitting application with both Oracle Java 7 and 8.
Answered by TomaszGuzialek
I got help on the Spark mailing list. The key is to read and drain getInputStream() and getErrorStream() of the Process. Otherwise the child process may fill up the pipe buffer and cause a deadlock - see the Oracle docs regarding Process. The streams should be read in separate threads:
Process spark = new SparkLauncher()
        .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
where the InputStreamReaderRunnable class is:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            // Drain the stream line by line so the child process never blocks.
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
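As a variation on the same idea, the stream draining can also be written more compactly on Java 8 with an ExecutorService instead of hand-rolled Runnable classes. This is only a sketch, not part of the original answer; the StreamDrainer class name is illustrative and the jar path is the same one used above:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.spark.launcher.SparkLauncher;

public class StreamDrainer {

    // Reads every line of the given stream and prints it, so the child
    // process never blocks on a full stdout/stderr pipe buffer.
    static Runnable drain(InputStream is, String name) {
        return () -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println("[" + name + "] " + line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
                .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
                .setMainClass("org.apache.spark.examples.SparkPi")
                .setMaster("yarn-cluster")
                .launch();

        // One thread per stream, exactly as in the answer above.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(drain(spark.getInputStream(), "stdout"));
        pool.submit(drain(spark.getErrorStream(), "stderr"));

        int exitCode = spark.waitFor();
        System.out.println("Finished! Exit code: " + exitCode);
        pool.shutdown();
    }
}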
Answered by Abdulrahman
Since this is an old post, I would like to add an update that might help whoever reads it later. Spark 1.6.0 added some methods to the SparkLauncher class, among them:
def startApplication(listeners: SparkAppHandle.Listener*): SparkAppHandle
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher
With it you can run the application without needing additional threads for stdout and stderr handling, plus there is nice status reporting for the running application. Use this code:
import scala.collection.JavaConverters._

val env = Map(
  "HADOOP_CONF_DIR" -> hadoopConfDir,
  "YARN_CONF_DIR" -> yarnConfDir
)

val handle = new SparkLauncher(env.asJava)
  .setSparkHome(sparkHome)
  .setAppResource("Jar/location/.jar")
  .setMainClass("path.to.the.main.class")
  .setMaster("yarn-client")
  .setConf("spark.app.id", "AppID if you have one")
  .setConf("spark.driver.memory", "8g")
  .setConf("spark.akka.frameSize", "200")
  .setConf("spark.executor.memory", "2g")
  .setConf("spark.executor.instances", "32")
  .setConf("spark.executor.cores", "32")
  .setConf("spark.default.parallelism", "100")
  .setConf("spark.driver.allowMultipleContexts", "true")
  .setVerbose(true)
  .startApplication()

println(handle.getAppId)
println(handle.getState)
You can keep querying the state of the Spark application until it succeeds. For information about how the Spark launcher server works in 1.6.0, see this link: https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
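A minimal polling loop over the handle's state might look like the sketch below (written in Java, since the launcher API itself is Java). The SparkStatePoller name is illustrative, and the handle is assumed to come from startApplication() as shown above; SparkAppHandle.State.isFinal() is used to detect terminal states:

import org.apache.spark.launcher.SparkAppHandle;

public final class SparkStatePoller {

    // Blocks until the application reaches a terminal state
    // (FINISHED, FAILED, KILLED or LOST) and returns that state.
    public static SparkAppHandle.State awaitFinalState(SparkAppHandle handle) throws InterruptedException {
        while (!handle.getState().isFinal()) {
            System.out.println("App id: " + handle.getAppId() + ", state: " + handle.getState());
            Thread.sleep(1000);
        }
        return handle.getState();
    }
}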
Answered by Elkhan Dadashov
I implemented this using a CountDownLatch, and it works as expected. This is for SparkLauncher version 2.0.1, and it works in yarn-cluster mode too.
...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);
...
private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {

    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;

    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        // SPARK_STATE_MSG is assumed to be a Map<State, String> of human-readable
        // messages defined elsewhere; it is not shown in this snippet.
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        }
        // Release the latch once the application reaches a terminal state.
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {}

    @Override
    public void run() {}
}
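One detail worth noting: CountDownLatch.await(timeout, unit) returns a boolean indicating whether the latch reached zero before the timeout, so the caller can distinguish a finished application from a timed-out wait. A small follow-up sketch under that assumption, using the appHandle and latch from the snippet above (the LaunchOutcome class and awaitOrStop name are illustrative; stop() is part of the SparkAppHandle API):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.launcher.SparkAppHandle;

final class LaunchOutcome {

    // Waits for the listener to count down; on timeout asks Spark to stop the app gracefully.
    static void awaitOrStop(CountDownLatch latch, long timeoutSeconds, SparkAppHandle appHandle)
            throws InterruptedException {
        boolean finishedInTime = latch.await(timeoutSeconds, TimeUnit.SECONDS);
        if (!finishedInTime) {
            appHandle.stop();
        }
        System.out.println("App id: " + appHandle.getAppId() + ", final state: " + appHandle.getState());
    }
}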