Scala: Parallelize / avoid foreach loop in Spark

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not me). Original source: http://stackoverflow.com/questions/38069239/


Parallelize / avoid foreach loop in spark

Tags: scala, apache-spark, foreach, dataframe

Asked by johntechendso

I wrote a class that gets a DataFrame, does some calculations on it, and can export the results. The DataFrames are generated from a list of keys. I know that I am doing this in a very inefficient way right now:


var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               // writes the Results into another Dataframe that is saved to HDFS
}

I think the foreach on the Scala list is not parallel, so how can I avoid using foreach here? The calculation of the DataFrames could happen in parallel, as the results of the calculations are NOT input for the next DataFrame - how can I implement this?


Thank you so much!!


Edit:


What I tried to do:


val l = List(34, 32, 132, 352)      // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
    DataContainer.getDataFrame(i)::l        //append DataFrame to List of Dataframes
}

val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
    val x = new MyClass(data)
)

but this gives:


Invalid tree; null:
null

Edit 2: Okay, I don't get how everything works under the hood...


1) Everything works fine when I execute this in spark-shell:


spark-shell --driver-memory 10g
//...
var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.calcSomething()
}

2) Error when I start the same with:


spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
// same code as above
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@7b600fed rejected from java.util.concurrent.ThreadPoolExecutor@1431127[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at scala.concurrent.impl.ExecutionContextImpl$$anon.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) When I try to parallelize it, I get an error, too:


spark-shell --master yarn-client --num-executors 10 --driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// I can see the parallel execution from the console messages (my class prints some, and they are now printed in parallel instead of sequentially)

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$$anonfun$collect.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$$anonfun$collect.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
    org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:104)
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
   org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun.apply(QueryPlanner.scala:58)
scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

There are actually more than 10 executors, but only 4 nodes. I never configured the SparkContext myself; it is already provided on startup.


Answer by Sachin Tyagi

You can use Scala's parallel collections to achieve foreach parallelism on the driver side.


val l = List(34, 32, 132, 352).par
l.foreach { i =>
  // your code to be run in parallel for each i
}
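Applied to the loop from the question, this might look like the sketch below (DataContainer, MyClass and its methods are the ones from the question; this is an untested illustration, not a drop-in solution):

// Same driver-side loop as in the question, but over a parallel collection,
// so several Spark jobs are submitted concurrently from the driver.
val ids = List(34, 32, 132, 352).par

ids.foreach { i =>
  val data: DataFrame = DataContainer.getDataFrame(i)  // get DataFrame, as in the question
  val x = new MyClass(data)                            // initialize MyClass with the new DataFrame
  x.setSettings(...)                                   // settings elided in the question
  x.calcSomething()
  x.saveResults()                                      // writes the results to HDFS
}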

However, a word of caution: is your cluster capable of running jobs in parallel? You may submit the jobs to your Spark cluster in parallel, but they may end up getting queued on the cluster and executed sequentially.

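One way to address that concern is to cap how many of those jobs are submitted at once by giving the parallel collection a bounded thread pool. A minimal sketch, assuming Scala 2.11 (the version Spark builds of that era used; on Scala 2.12+ the pool class is java.util.concurrent.ForkJoinPool instead), with an arbitrary limit of 2 concurrent jobs:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool   // use java.util.concurrent.ForkJoinPool on Scala 2.12+

val l = List(34, 32, 132, 352).par
// At most 2 of the per-key jobs run concurrently; pick a limit your cluster can actually serve.
l.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))
l.foreach { i =>
  // your per-key Spark work here
}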

Answer by dansuzuki

You can use Scala's Futures and Spark's fair scheduling, e.g.:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object YourApp extends App { 
  val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR
  var pool = 0
  // this is to have different pools per job, you can wrap it to limit the no. of pools
  def poolId = {
    pool = pool + 1
    pool
  }
  def runner(i: Int) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId.toString) // pool name must be a String
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()
  }

  val l = List(34, 32, 132, 352)      // Scala List
  val futures = l map(i => runner(i))

  // now you need to wait all your futures to be completed
  futures foreach(f => Await.ready(f, Duration.Inf))

}

With the fair scheduler and different pools, each concurrent job will get a fair share of the Spark cluster's resources.

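For completeness, a minimal sketch of how the fair scheduler could be enabled when the context is created (the application name is illustrative; the optional allocation file is where named pools and their weights would be defined):

import org.apache.spark.{SparkConf, SparkContext}

// The fair scheduler must be enabled on the configuration before any jobs are submitted;
// without it, jobs from the different pools fall back to the default FIFO scheduling.
val conf = new SparkConf()
  .setAppName("ParallelDataFrameJobs")   // illustrative name
  .set("spark.scheduler.mode", "FAIR")
  // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // optional named pools

val sc = new SparkContext(conf)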

For more details, see the Scala documentation on Futures. You might need to add the necessary callbacks on completion, success, and/or failure.

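As a hedged sketch of such callbacks on the futures returned by runner above (it relies on the ExecutionContext.Implicits.global import already present in the answer; the log messages are illustrative):

import scala.util.{Failure, Success}

// Attach a completion callback to each future instead of, or in addition to, Await.ready.
futures.foreach { f =>
  f.onComplete {
    case Success(_) => println("job finished")
    case Failure(e) => println(s"job failed: ${e.getMessage}")
  }
}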

Answer by Med Zamrik

I did this using something like List.par.foreach{ obj => print(obj) }. I am using Zeppelin on Spark 2.3. I have a similar use case where I need to get the data day by day and process it separately. This cannot be done using a whole month's data because of some join conditions on the tables I'm using. Here is a sample of my code:


import java.time.LocalDate
import java.sql.Date

var start =  LocalDate.of(2019, 1, 1)
val end   =  LocalDate.of(2019, 2, 1)
var list : List[LocalDate] = List()

var usersDf = spark.read.load("s3://production/users/")
usersDf.createOrReplaceTempView("usersDf")

while (start.isBefore(end)){
    list = start :: list
    start = start.plusDays(1)
}

list.par.foreach{ loopDate =>
    //println(start)
    var yesterday = loopDate.plusDays(-1)
    var tomorrow = loopDate.plusDays(1)
    var lastDay = yesterday.getDayOfMonth()
    var lastMonth = yesterday.getMonthValue()
    var lastYear = yesterday.getYear()

    var day = loopDate.getDayOfMonth()
    var month = loopDate.getMonthValue()
    var year = loopDate.getYear()
    var dateDay = loopDate

    var condition: String = ""
    if (month == lastMonth) {
        condition = s"where year = $year and month = $month and day in ($day, $lastDay)"
    } else {
        condition = s"""where ((year = $year and month = $month and day = $day) or
        (year = $lastYear and month = $lastMonth and day = $lastDay)) 
        """
    }

    //Get events in local timezone
    var aggPbDf = spark.sql(s"""
            with users as (
            select * from usersDf
            where account_creation_date < '$tomorrow'
        )
        , cte as (
            select e.*, date(from_utc_timestamp(to_timestamp(concat(e.year, '-', e.month, '-', e.day, ' ', e.hour), 'yyyy-MM-dd HH'), coalesce(u.timezone_name, 'UTC'))) as local_date
            from events.user_events e
            left join users u
            on u.account_id = e.account_id
            $condition)
        select * from cte
        where local_date = '$dateDay'
    """
    )
    aggPbDf.write.mode("overwrite")
        .format("parquet")
        .save(s"s3://prod-bucket/events/local-timezone/date_day=$dateDay")
}

This will get the data for two days, process it, then write out only the desired output. Running this without par takes about 15 minutes per day, but with par it took 1 hour for the whole month. This will also depend on what your Spark cluster can support and the size of your data.
