Python 16 个任务的序列化结果总大小 (1048.5 MB) 大于 spark.driver.maxResultSize (1024.0 MB)

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47996396/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 18:28:27  来源:igfitidea点击:

Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

pythonapache-sparkpysparkspark-dataframe

提问by Markus

I get the following error when I add --conf spark.driver.maxResultSize=2050to my spark-submitcommand.

添加--conf spark.driver.maxResultSize=2050spark-submit命令时出现以下错误。

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$$anonfun$run.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

The reason of adding this configuration was the error:

添加这个配置的原因是错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Therefore, I increased maxResultSizeto 2.5 Gb, but the Spark job fails anyway (the error shown above). How to solve this issue?

因此,我增加到maxResultSize2.5 Gb,但 Spark 作业无论如何都失败了(上面显示的错误)。如何解决这个问题?

回答by Ryan Widmaier

It seems like the problem is the amount of data you are trying to pull back to to your driver is too large. Most likely you are using the collectmethod to retrieve all values from a DataFrame/RDD. The driveris a single process and by collecting a DataFrameyou are pulling all of that data you had distributed across the cluster back to one node. This defeats the purpose of distributing it!It only makes sense to do this after you have reduced the data down to a manageable amount.

问题似乎是您尝试拉回驱动程序的数据量太大。您很可能正在使用collect方法从DataFrame/RDD检索所有值。该驱动程序是一个过程,通过收集数据框你拉你已经在集群回分发到一个节点的数据的。 这违背了分发它的目的!只有在将数据减少到可管理的数量后,才有意义。

You have two options:

您有两个选择:

  1. If you really need to work with all that data, then you should keep it out on the executors. Use HDFSand Parquetto save the data in a distributed manner and use Spark methods to work with the data on the cluster instead of trying to collect it all back to one place.

  2. If you really need to get the data back to the driver, you should examine whether you really need ALL of the data or not. If you only need summary statistics then compute that out on the executors before calling collect. Or if you only need the top 100 results, then only collectthe top 100.

  1. 如果您真的需要处理所有这些数据,那么您应该将其保留在执行程序之外。使用HDFSParquet以分布式方式保存数据,并使用 Spark 方法处理集群上的数据,而不是试图将它们全部收集回一个地方。

  2. 如果您真的需要将数据返回给驱动程序,您应该检查您是否真的需要所有数据。如果您只需要汇总统计信息,请在调用 collect 之前在执行程序上计算出来。或者,如果您只需要前 100 个结果,则只收集前 100 个。

Update:

更新:

There is another reason you can run into this error that is less obvious. Spark will try to send data back the driver beyond just when you explicitly call collect. It will also send back accumulator results for each task if you are using accumulators, data for broadcast joins, and some small status data about each task. If you have LOTS of partitions (20k+ in my experience) you can sometimes see this error. This is a known issuewith some improvements made, and more in the works.

您可能会遇到此错误的另一个不太明显的原因。Spark 会尝试将数据发送回驱动程序,而不仅仅是在您明确调用 collect 时。如果您使用累加器,它还会发回每个任务的累加器结果、广播连接数据以及有关每个任务的一些小状态数据。如果您有很多分区(以我的经验为 20k+),您有时会看到此错误。这是一个已知问题,已进行了一些改进,并且正在进行更多改进。

The options for getting past if if this is your issue are:

如果这是您的问题,则可以通过以下选项:

  1. Increase spark.driver.maxResultSizeor set it to 0 for unlimited
  2. If broadcast joins are the culprit, you can reduce spark.sql.autoBroadcastJoinThresholdto limit the size of broadcast join data
  3. Reduce the number of partitions
  1. 增加spark.driver.maxResultSize或设置为 0 表示无限制
  2. 如果广播加入是罪魁祸首,您可以减少spark.sql.autoBroadcastJoinThreshold以限制广播加入数据的大小
  3. 减少分区数

回答by vj sreenivasan

Cause: caused by actions like RDD's collect() that send big chunk of data to the driver

原因:由 RDD 的 collect() 之类的操作引起,这些操作将大量数据发送给驱动程序

Solution: set by SparkConf: conf.set("spark.driver.maxResultSize", "4g")OR set by spark-defaults.conf: spark.driver.maxResultSize 4gOR set when calling spark-submit: --conf spark.driver.maxResultSize=4g

解决方案:由 SparkConf 设置:conf.set("spark.driver.maxResultSize", "4g")或由 spark-defaults.conf 设置:spark.driver.maxResultSize 4g或在调用 spark-submit 时设置:--conf spark.driver.maxResultSize=4g