collect() or toPandas() on a large DataFrame in pyspark/EMR
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, cite the original address, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/47536123/
collect() or toPandas() on a large DataFrame in pyspark/EMR
Asked by Rami
I have an EMR cluster of one "c3.8xlarge" machine. After reading several resources, I understood that I have to allow a decent amount of off-heap memory because I am using pyspark, so I have configured the cluster as follows:
One executor:
- spark.executor.memory 6g
- spark.executor.cores 10
- spark.yarn.executor.memoryOverhead 4096
Driver:
- spark.driver.memory 21g
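For context, a minimal sketch of how settings like these might be supplied from PySpark; this is only an illustration, since on EMR they are usually set through spark-defaults.conf or the cluster configuration JSON, and the application name below is made up:

from pyspark.sql import SparkSession

# Illustration only: on EMR these properties normally come from
# spark-defaults.conf / the EMR configuration API rather than from code.
spark = (
    SparkSession.builder
    .appName("collect-large-df")  # hypothetical app name
    .config("spark.executor.memory", "6g")
    .config("spark.executor.cores", "10")
    .config("spark.yarn.executor.memoryOverhead", "4096")
    # Note: spark.driver.memory generally has to be set before the driver JVM
    # starts (e.g. spark-submit --driver-memory 21g); setting it here only
    # takes effect if this call actually launches a new JVM.
    .config("spark.driver.memory", "21g")
    .getOrCreate()
)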
When I cache() the DataFrame, it takes about 3.6 GB of memory.
Now when I call collect() or toPandas() on the DataFrame, the process crashes.
I know that I am bringing a large amount of data into the driver, but I don't think it is that large, and I cannot figure out the reason for the crash.
When I call collect() or toPandas() I get this error:
Py4JJavaError: An error occurred while calling o181.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 6.0 failed 4 times, most recent failure: Lost task 5.3 in stage 6.0 (TID 110, ip-10-0-47-207.prod.eu-west-1.hs.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1511879540686_0005_01_000016 on host: ip-10-0-47-207.prod.eu-west-1.hs.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply$mcI$sp(Dataset.scala:2803)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:2800)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:2800)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
==== Update ====
As @user6910411 suggested, I have tried the solution mentioned here, and in that case I get the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 41, ip-10-0-33-57.prod.eu-west-1.hs.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 13.5 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Any hint about what is happening here?
Answered by zero323
TL;DR I believe you're seriously underestimating memory requirements.
Even assuming that the data is fully cached, storage info will show only a fraction of the peak memory required to bring the data back to the driver.
- First of all, Spark SQL uses compressed columnar storage for caching. Depending on the data distribution and compression algorithm, the in-memory size can be much smaller than the uncompressed Pandas output, not to mention a plain List[Row]. The latter also stores column names, further increasing memory usage.
- Data collection is indirect, with data being stored both on the JVM side and on the Python side. While JVM memory can be released once the data goes through the socket, peak memory usage should account for both.
- The plain toPandas implementation collects Rows first, then creates the Pandas DataFrame locally. This further increases (possibly doubles) memory usage. Luckily this part is already addressed on master (Spark 2.3), with a more direct approach using Arrow serialization (SPARK-13534 - Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas); a minimal sketch of enabling it follows this list. For a possible solution independent of Apache Arrow you can check Faster and Lower memory implementation toPandas on the Apache Spark Developer List.
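As a rough sketch of the Arrow path mentioned in the last point (assuming Spark 2.3+ with pyarrow installed on the driver; the property name below is the Spark 2.3 one and was renamed in later releases):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Enable Arrow-based transfer for toPandas() (Spark 2.3 property name;
# Spark 3.x uses spark.sql.execution.arrow.pyspark.enabled instead).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.range(1000000)  # placeholder DataFrame standing in for the real one
pdf = df.toPandas()        # rows are now serialized via Arrow instead of plain pickled Rows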
Since the data is actually pretty large, I would consider writing it to Parquet and reading it back directly in Python using PyArrow (Reading and Writing the Apache Parquet Format), completely skipping all the intermediate stages.
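A minimal sketch of that approach, assuming df is the cached DataFrame from the question and that the driver can read the output location (the path itself is hypothetical):

import pyarrow.parquet as pq

# 1) Write out from the Spark executors without collecting to the driver.
df.write.mode("overwrite").parquet("/tmp/my_table")  # hypothetical path

# 2) Read the Parquet directory back into pandas via PyArrow, skipping
#    collect()/toPandas() entirely. Recent PyArrow versions accept a
#    directory here; older ones can use pq.ParquetDataset instead.
table = pq.read_table("/tmp/my_table")
pdf = table.to_pandas()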
Answered by Dafni Argyro Krystallidou
As mentioned above, when calling toPandas() all records of the DataFrame are collected to the driver program, so it should only be done on a small subset of the data. (https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)
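For example, a hedged sketch of pulling only a manageable slice to the driver instead of the full DataFrame, where df is the DataFrame from the question and the limit and sample fraction are arbitrary illustrations:

# Collect only the first N rows.
small_pdf = df.limit(10000).toPandas()

# Or collect a random sample (without replacement; the fraction is illustrative).
sample_pdf = df.sample(False, 0.01, seed=42).toPandas()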