How to implement custom job listener/tracker in Spark?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/24463055/

Asked by user3705662
I have a class like the one below, and when I run it from the command line I want to see progress status, something like:
10% completed...
30% completed...
100% completed...Job done!
I am using Spark 1.0 on YARN with the Java API.
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class MyJavaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <master> <file>");
            System.exit(1);
        }
        System.out.println("args[0]: <master>=" + args[0]);
        System.out.println("args[1]: <file>=" + args[1]);

        JavaSparkContext ctx = new JavaSparkContext(
                args[0],
                "MyJavaWordCount",
                System.getenv("SPARK_HOME"),
                System.getenv("SPARK_EXAMPLES_JAR"));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // Split each line into words: FlatMapFunction<input, output>
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        // Map each word to a (word, 1) pair: PairFunction<input, key, value>
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2 tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        System.exit(0);
    }
}
Answered by Daniel Darabos
You should implement SparkListener. Just override whatever events you are interested in (job/stage/task start/end events), then call sc.addSparkListener(myListener).
It does not give you a straight-up percentage-based progress tracker, but at least you can track that progress is being made and get its rough rate. The difficulty comes from how unpredictable the number of Spark stages can be, and from how widely the running times of stages can differ. Progress within a single stage should be more predictable.
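For illustration, a minimal sketch of such a listener in Java (not part of the original answer; it assumes Spark 2.x, where SparkListener is an abstract class that Java code can extend directly, and the ProgressListener name and percentage logic are hypothetical):

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

public class ProgressListener extends SparkListener {
    // Events are delivered on a single listener-bus thread, so plain fields suffice.
    private int totalTasks = 0;
    private int finishedTasks = 0;

    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        // Stages are discovered as the job runs, so the total grows over time
        // and the percentage below is only a rough estimate.
        totalTasks += stageSubmitted.stageInfo().numTasks();
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        finishedTasks++;
        if (totalTasks > 0) {
            System.out.println((100 * finishedTasks / totalTasks) + "% completed...");
        }
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("Job " + jobEnd.jobId() + " done!");
    }
}

With the JavaSparkContext from the question it would be registered via ctx.sc().addSparkListener(new ProgressListener());.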
Answered by Gabber
If you are using Spark with Scala, this code will help you add a Spark listener.
Create your SparkContext:
val sc = new SparkContext(sparkConf)
Now you can add your Spark listener to the SparkContext:
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("Spark ApplicationStart: " + applicationStart.appName)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("Spark ApplicationEnd: " + applicationEnd.time)
  }
})
Here is the list of interfaces for listening to events from the Spark scheduler.
Answered by Ram Ghadiyaram
First, if you want to track progress, you can consider spark.ui.showConsoleProgress; see @Yijie Shen's answer (Spark output: log-style vs progress-style) for this.
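For reference, a sketch of how that option would be set (the SparkConf setup here is illustrative, not from the original answer; spark.ui.showConsoleProgress is a standard Spark configuration key):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("MyJavaWordCount")
        // Enables Spark's built-in console progress bar, e.g.
        // [Stage 0:===========>                (4 + 2) / 10]
        .set("spark.ui.showConsoleProgress", "true");
JavaSparkContext ctx = new JavaSparkContext(conf);

Note that Spark only draws the bar when INFO-level logging is turned off, so it does not interleave with log lines.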
I think there is no need to implement a Spark listener for such a thing, unless you have very specific requirements.
Question: How to implement custom job listener/tracker in Spark?
You can use SparkListener and intercept SparkListener events.
A classic example of this implementation within the Spark framework itself is HeartbeatReceiver.
Example: HeartbeatReceiver.scala
/**
 * Lives in the driver to receive heartbeats from executors..
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.addSparkListener(this) ...
Below is the list of listener events available, out of which the application/job events should be useful for you:
SparkListenerApplicationStart
SparkListenerJobStart
SparkListenerStageSubmitted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerTaskEnd
SparkListenerStageCompleted
SparkListenerJobEnd
SparkListenerApplicationEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerBlockUpdated
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
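Besides calling sc.addSparkListener(...) in code, a listener handling any of these events can also be registered declaratively through configuration; a sketch, assuming the hypothetical com.example.ProgressListener class from the first answer is on the driver's classpath (spark.extraListeners is a standard Spark configuration key):

import org.apache.spark.SparkConf;

// Spark instantiates each listed class at startup; the class needs a
// zero-argument constructor (or one taking a SparkConf).
SparkConf conf = new SparkConf()
        .setAppName("MyJavaWordCount")
        .set("spark.extraListeners", "com.example.ProgressListener");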