Java: How to implement a custom job listener/tracker in Spark?

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must follow the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/24463055/


How to implement custom job listener/tracker in Spark?

java, apache-spark

Asked by user3705662

I have a class like the one below, and when I run it through the command line I want to see progress status, something like:

10% completed... 
30% completed... 
100% completed...Job done!

I am using Spark 1.0 on YARN with the Java API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class MyJavaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <master> <file>");
            System.exit(1);
        }
        System.out.println("args[0]: <master>="+args[0]);
        System.out.println("args[1]: <file>="+args[1]);

        JavaSparkContext ctx = new JavaSparkContext(
                args[0],
                "MyJavaWordCount",
                System.getenv("SPARK_HOME"),
                System.getenv("SPARK_EXAMPLES_JAR"));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

//      output                                            input   output         
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            //              output       input 
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

//          K       V                                                input   K       V 
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            //            K       V             input 
            public Tuple2<String, Integer> call(String s) {
                //                K       V 
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
        System.exit(0);
    }
}

Answered by Daniel Darabos

You should implement SparkListener. Just override whatever events you are interested in (job/stage/task start/end events), then call sc.addSparkListener(myListener).

It does not give you a percentage-based progress tracker out of the box, but at least you can track that progress is being made, and at roughly what rate. The difficulty comes from how unpredictable the number of Spark stages can be, and how vastly the running times of different stages can differ. Progress within a single stage should be more predictable.

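In Java, a minimal sketch of such a listener could look like the following. It assumes Spark 2.x, where org.apache.spark.scheduler.SparkListener is an abstract class with no-op default implementations (on Spark 1.x the Java-friendly base class was JavaSparkListener); the class name ProgressListener and its task-counting logic are illustrative, not part of Spark's API.

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

public class ProgressListener extends SparkListener {
    // Tasks submitted so far vs. tasks finished; both grow as the job runs.
    private final AtomicInteger totalTasks = new AtomicInteger();
    private final AtomicInteger finishedTasks = new AtomicInteger();

    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
        System.out.println("Job " + jobStart.jobId() + " started");
    }

    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        // Stages are discovered as the job runs, so the total is a moving target.
        totalTasks.addAndGet(stageSubmitted.stageInfo().numTasks());
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        int done = finishedTasks.incrementAndGet();
        int total = totalTasks.get();
        if (total > 0) {
            System.out.println((done * 100 / total) + "% completed...");
        }
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("Job " + jobEnd.jobId() + " done!");
    }
}

You would register it on the underlying SparkContext, e.g. ctx.sc().addSparkListener(new ProgressListener()). Because the task total only grows as stages are submitted, the printed percentage is a rough, self-correcting estimate rather than an exact figure.
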
Answered by Gabber

If you are using Spark with Scala, this code will help you add a Spark listener.

Create your SparkContext

val sc = new SparkContext(sparkConf)

Now you can add your Spark listener to the Spark context:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

sc.addSparkListener(new SparkListener() {
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("Spark ApplicationStart: " + applicationStart.appName);
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("Spark ApplicationEnd: " + applicationEnd.time);
  }

});

Here is the interface for listening to events from the Spark scheduler.

Answered by Ram Ghadiyaram

First, if you want to track progress, you can consider spark.ui.showConsoleProgress; please check @Yijie Shen's answer (Spark output: log-style vs progress-style) for this.

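For illustration, a minimal sketch of enabling it when building the context (spark.ui.showConsoleProgress is a real Spark configuration key; the surrounding code is an assumption based on the question's word-count example):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("MyJavaWordCount")
        // Show Spark's built-in console progress bar for stages/tasks.
        .set("spark.ui.showConsoleProgress", "true");
JavaSparkContext ctx = new JavaSparkContext(conf);

The same setting can also be passed on the command line via spark-submit --conf spark.ui.showConsoleProgress=true.
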
I think there is no need to implement a Spark listener for such a thing, unless you have very specific requirements.



Question: How to implement a custom job listener/tracker in Spark?

You can use SparkListener and intercept SparkListener events.

A classic example of this implementation within the Spark framework itself is HeartbeatReceiver.

Example: HeartbeatReceiver.scala

/**
 * Lives in the driver to receive heartbeats from executors.
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.addSparkListener(this) ...

Below is the list of listener events available; the application/job events should be the most useful for you (a sketch follows the list).

  • SparkListenerApplicationStart

  • SparkListenerJobStart

  • SparkListenerStageSubmitted

  • SparkListenerTaskStart

  • SparkListenerTaskGettingResult

  • SparkListenerTaskEnd

  • SparkListenerStageCompleted

  • SparkListenerJobEnd

  • SparkListenerApplicationEnd

  • SparkListenerEnvironmentUpdate

  • SparkListenerBlockManagerAdded

  • SparkListenerBlockManagerRemoved

  • SparkListenerBlockUpdated

  • SparkListenerUnpersistRDD

  • SparkListenerExecutorAdded

  • SparkListenerExecutorRemoved
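
Each of these events corresponds to an override on SparkListener. As a sketch (assuming Spark 2.x and the JavaSparkContext ctx from the question), you would override only the callbacks for the events you care about:

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerJobEnd;

ctx.sc().addSparkListener(new SparkListener() {
    @Override
    public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
        System.out.println("Application started: " + applicationStart.appName());
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        System.out.println("Job " + jobEnd.jobId() + " finished at " + jobEnd.time());
    }
});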
