Java: How to run concurrent jobs (actions) in Apache Spark using a single Spark context

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not this site). Original question: http://stackoverflow.com/questions/28712420/


How to run concurrent jobs (actions) in Apache Spark using a single Spark context

Tags: java, concurrency, apache-spark

Asked by Sporty


The Apache Spark documentation says that "within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads". Can someone explain how to achieve this concurrency for the following sample code?



    SparkConf conf = new SparkConf().setAppName("Simple_App");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
    JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");

    System.out.println(file1.count());
    System.out.println(file2.count());

These two jobs are independent and must run concurrently.
Thank You.


Answered by G Quintana

Try something like this:


    // Requires imports from java.util.concurrent (Callable, ExecutorService, Executors, Future)
    // and org.apache.spark.api.java (JavaRDD, JavaSparkContext).
    // "local[2]" gives the driver two cores so the two jobs can actually run in parallel.
    final JavaSparkContext sc = new JavaSparkContext("local[2]", "Simple_App");
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // Submit job 1 from its own thread
    Future<Long> future1 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
            return file1.count();
        }
    });
    // Submit job 2 from a second thread
    Future<Long> future2 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
            return file2.count();
        }
    });
    // Wait for job 1 to finish
    System.out.println("File1:" + future1.get());
    // Wait for job 2 to finish
    System.out.println("File2:" + future2.get());
    // Remember to call executorService.shutdown() and sc.stop() when the application is done.

Answered by Tagar

Using the Scala parallel collections feature:


    Range(0, 10).par.foreach { project_id =>
      spark.table("store_sales")
        .selectExpr(project_id + " as project_id", "count(*) as cnt")
        .write
        .saveAsTable(s"counts_$project_id")
    }

PS: The above launches up to 10 parallel Spark jobs, but it may be fewer depending on the number of cores available on the Spark driver, since that is what sizes the default thread pool used by Scala parallel collections. The Futures-based approach by G Quintana above is more flexible in this regard.

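For readers following along in Java (the question's language), here is a roughly equivalent sketch using a parallel IntStream. It is an illustration, not part of the original answer: spark is assumed to be an existing SparkSession, and store_sales / counts_<id> are the example table names used above.

    // Requires java.util.stream.IntStream and org.apache.spark.sql.SparkSession.
    // Each iteration runs its own Spark job; the common fork-join pool decides how many run at once.
    IntStream.range(0, 10).parallel().forEach(projectId ->
        spark.table("store_sales")
             .selectExpr(projectId + " as project_id", "count(*) as cnt")
             .write()
             .saveAsTable("counts_" + projectId)
    );

As with the Scala version, the degree of parallelism here comes from the JVM's thread pool rather than from Spark itself.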