pandas collect() 或 toPandas() 在 pyspark/EMR 中的大型 DataFrame 上

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47536123/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-14 04:49:42  来源:igfitidea点击:

collect() or toPandas() on a large DataFrame in pyspark/EMR

pandasapache-sparkpysparkemramazon-emr

提问by Rami

I have an EMR cluster of one machine "c3.8xlarge", after reading several resources, I understood that I have to allow decent amount of memory off-heap because I am using pyspark, so I have configured the cluster as follow:

我有一台机器“c3.8xlarge”的 EMR 集群,在阅读了几个资源后,我明白我必须允许大量的堆外内存,因为我使用的是 pyspark,所以我配置了集群如下:

One executor:

一名执行者:

  • spark.executor.memory 6g
  • spark.executor.cores 10
  • spark.yarn.executor.memoryOverhead 4096
  • spark.executor.memory 6g
  • spark.executor.cores 10
  • spark.yarn.executor.memoryOverhead 4096

Driver:

司机:

  • spark.driver.memory 21g
  • spark.driver.memory 21g

When I cache()the DataFrame it takes about 3.6GB of memory.

当我cache()使用 DataFrame 时,它​​需要大约 3.6GB 的内存。

Now when I call collect()or toPandas()on the DataFrame, the process crashes.

现在,当我在 DataFrame 上调用collect()toPandas()时,进程崩溃了。

I know that I am bringing a large amount of data into the driver, but I think that it is not that large, and I am not able to figure out the reason of the crash.

我知道我将大量数据带入驱动程序,但我认为它没有那么大,我无法弄清楚崩溃的原因。

When I call collect()or toPandas()I get this error:

当我打电话collect()toPandas()收到此错误时:

Py4JJavaError: An error occurred while calling o181.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 6.0 failed 4 times, most recent failure: Lost task 5.3 in stage 6.0 (TID 110, ip-10-0-47-207.prod.eu-west-1.hs.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1511879540686_0005_01_000016 on host: ip-10-0-47-207.prod.eu-west-1.hs.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply$mcI$sp(Dataset.scala:2803)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:2800)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython.apply(Dataset.scala:2800)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

==== Update ====

==== 更新 ====

As @user6910411 suggested, I have tried the solution mentioned here, and in that case I get the following error:

正如@user6910411 所建议的,我已经尝试了这里提到的解决方案,在这种情况下,我收到以下错误:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 41, ip-10-0-33-57.prod.eu-west-1.hs.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 13.5 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Any hint about what is happening here?

关于这里发生了什么的任何提示?

回答by zero323

TL;DRI believe you're seriously underestimating memory requirements.

TL;DR我相信你严重低估了内存需求。

Even assuming that data is fully cached, storage info will show only a fraction of peak memory required for bringing data back to the driver.

即使假设数据已完全缓存,存储信息也只会显示将数据带回驱动程序所需的峰值内存的一小部分。

Since data is actually pretty large I would consider writing it to Parquet and reading it back directly in Python using PyArrow (Reading and Writing the Apache Parquet Format) completely skipping all the intermediate stages.

由于数据实际上非常大,我会考虑将其写入 Parquet 并使用 PyArrow(读取和写入 Apache Parquet 格式)直接在 Python 中读回,完全跳过所有中间阶段。

回答by Dafni Argyro Krystallidou

As mentioned above, when calling toPandas(), all records of the DataFrame are collected to the driver program and hence should be done on a small subset of the data. (https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)

如上所述,当调用 toPandas() 时,DataFrame 的所有记录都被收集到驱动程序中,因此应该对一小部分数据进行处理。( https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)