scala - Why so many tasks in my Spark job? Getting 200 tasks by default
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me) on StackOverflow, citing the original post.
Original question: http://stackoverflow.com/questions/37758647/
Why so many tasks in my spark job? Getting 200 Tasks By Default
Asked by uh_big_mike_boi
I have a Spark job that takes a file with 8 records from HDFS, does a simple aggregation, and saves it back to HDFS. I notice there are hundreds of tasks when I do this.
I am also not sure why there are multiple jobs for this. I thought a job was created when an action happened. I can speculate as to why, but my understanding was that this code should be one job broken down into stages, not multiple jobs. Why doesn't it just break it down into stages; why does it break into multiple jobs?
As for the 200-plus tasks: since the amount of data and the number of nodes are minuscule, it doesn't make sense that there are roughly 25 tasks for each row of data when there is only one aggregation and a couple of filters. Why wouldn't it just have one task per partition per atomic operation?
Here is the relevant Scala code:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {

  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in the REPL */
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

    /* the below RDD will have the schema defined in the Record class */
    val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x => x.split(" "))   // split each file record into an array of strings on spaces
      .map(x => Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))

    /* the below dataframe groups on the first letter of the first name and counts it */
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs */
    aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
  }

  case class Record(id: Int,
                    firstName: String,
                    lastName: String,
                    quantity: Int)
}
Below is the screenshot after clicking on the application:

Below are the stages shown when viewing the specific "job" with ID 0:

Below is the first part of the screen after clicking on the stage with over 200 tasks:
This is the second part of the screen inside the stage
Below is the view after clicking on the "Executors" tab:

As requested, here are the stages for Job ID 1:
Here are the details for the stage in Job ID 1 with 200 tasks:
Answered by marios
This is a classic Spark question.
The two tasks used for reading (Stage Id 0 in the second figure) come from the defaultMinPartitions setting, which is set to 2. You can get this parameter by reading the value of sc.defaultMinPartitions in the REPL. It should also be visible in the Spark UI under the "Environment" tab.
You can take a look at the code on GitHub to see that this is exactly what is happening. If you want more partitions to be used on read, just add it as a parameter, e.g., sc.textFile("a.txt", 20).
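A quick way to confirm this from the spark-shell is to print the setting and compare partition counts with and without an explicit minimum. This is a minimal sketch, assuming the same HDFS path as the question; the exact counts depend on the file size and Hadoop split settings:

// Default minimum number of partitions used for reads (2 on a default setup).
println(sc.defaultMinPartitions)

// Read with the default, then with an explicit minimum of 20 partitions.
val rddDefault = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
val rddTwenty  = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt", 20)

// Each partition becomes one read task in the first stage.
println(rddDefault.partitions.size)  // expected: 2
println(rddTwenty.partitions.size)   // typically around 20 for a splittable text file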
Now the interesting part is the 200 partitions that appear in the second stage (Stage Id 1 in the second figure). Each time there is a shuffle, Spark needs to decide how many partitions the shuffled RDD will have. As you can imagine, the default is 200.
You can change that using:
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
If you run your code with this configuration, you will see that the 200 partitions are no longer there. How to set this parameter is something of an art. Maybe choose 2x the number of cores you have (or whatever works for your workload).
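For example, instead of hard-coding a value, the setting could be derived from the cluster's parallelism. This is a sketch, assuming the sc and sqlContext from the question; sc.defaultParallelism is used here as a rough proxy for the number of available cores:

// Use roughly 2x the available cores as the shuffle partition count.
val shufflePartitions = sc.defaultParallelism * 2
sqlContext.setConf("spark.sql.shuffle.partitions", shufflePartitions.toString)

// Any DataFrame shuffle executed after this point will use the new count.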
I think Spark 2.0 has a way to automatically infer the best number of partitions for shuffle RDDs. Looking forward to that!
Finally, the number of jobs you get has to do with how many RDD actions the resulting optimized DataFrame code produced. If you read the Spark specs, it says that each RDD action will trigger one job. When your action involves a DataFrame or Spark SQL, the Catalyst optimizer will figure out an execution plan and generate some RDD-based code to execute it. It's hard to say exactly why it uses two actions in your case. You may need to look at the optimized query plan to see exactly what it is doing.
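As a sketch (assuming the aggDF DataFrame from the question), the plan can be printed with DataFrame.explain; the extended form also shows the logical plans Catalyst produced before the physical plan:

// Physical plan only.
aggDF.explain()

// Parsed, analyzed, and optimized logical plans plus the physical plan.
aggDF.explain(true)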
Answered by Enrique Altuna
I had a similar problem. But in my scenario, the collection I was parallelizing had fewer elements than the number of tasks scheduled by Spark (causing Spark to behave oddly sometimes). Using a forced partition number, I was able to fix the issue.
It was something like this:
collection = range(10) # In the real scenario it was a complex collection
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario
Then, I saw in the Spark log:
INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
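In Scala terms, the fix described above amounts to passing an explicit partition count as the second argument to parallelize. This is a sketch; the collection and the count of 4 are placeholders standing in for the real scenario:

// Force the number of partitions instead of relying on the scheduler's default.
val collection = 1 to 10                  // stand-in for the real, more complex collection
val rdd = sc.parallelize(collection, 4)   // 4 partitions instead of the scheduler's default (512 above)
  .map(e => e + 1)                        // the real job does something more involved here
println(rdd.partitions.size)              // 4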

