Python: When to cache a DataFrame?

Notice: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not this site). Original question: http://stackoverflow.com/questions/44156365/

When to cache a DataFrame?

Tags: python, apache-spark, pyspark, apache-spark-sql

Asked by Alezis

My question is, when should I do dataframe.cache() and when is it useful?

Also, in my code should I cache the dataframes in the commented lines?

Note: My dataframes are loaded from a Redshift DB.

Many thanks

Here is my code:

def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe

Answered by Jacek Laskowski

when should I do dataframe.cache() and when is it useful?

Cache what you are going to use across queries (and do it early and often, up to the available memory). It does not really matter what programming language you use (Python, Scala, Java, SQL or R) as the underlying mechanics are the same.

You can see if a DataFrame was cached in your physical plan using the explain operator (where InMemoryRelation entities reflect cached datasets with their storage level):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

After you cache (or persist) your DataFrame, the first query may get slower, but it is going to pay off for the following queries.
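
A rough PySpark illustration of that trade-off (a sketch, assuming an existing SparkSession named spark; actual timings depend on your data and cluster):

df = spark.range(10000000).cache()

df.count()   # first action: computes the rows and fills the cache (pays the extra cost)
df.count()   # later actions read the cached data (typically faster)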

You can check whether a Dataset was cached or not using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
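
A rough PySpark counterpart (assuming Spark 2.1+, where DataFrame exposes a storageLevel property) is to inspect that property, which reports the registered storage level even before the cache is materialized:

df = spark.range(5)
print(df.storageLevel)   # all flags False: nothing registered for caching yet
df.cache()
print(df.storageLevel)   # memory/disk flags set: the DataFrame is marked for caching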

Also, in my code should I cache the dataframes in the commented lines?

Yes and no. Cache what represents external datasets so you don't pay the extra price of transmitting data across the network (while accessing the external storage) every time you query them.

Don't cache what you use only once or what is easy to compute. Otherwise, cache.
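
For example, a hedged sketch of that rule (read_from_redshift is a hypothetical stand-in for however the external table is loaded, and the column names are only illustrative):

# An external dataset queried more than once: caching avoids re-fetching it over the network.
bookings = read_from_redshift("bookings").cache()

by_currency = bookings.groupBy("booking_currency").count()
by_date = bookings.groupBy("creation_date").count()

# A cheap, single-use projection: not worth caching.
ids_only = bookings.select("booking_id")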



Be careful what you cache, i.e. which Dataset is cached, as it results in different cached query plans.

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)


There's one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price of having rows cached on the very first action, but that only happens with the DataFrame API. In SQL, caching is eager, which makes a huge difference in query performance, as you don't have to call an action to trigger caching.
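
A small PySpark sketch of that difference (assuming a SparkSession named spark; the temporary view name t is made up for the example):

df = spark.range(10)
df.createOrReplaceTempView("t")

df.cache()                  # DataFrame API: lazy, nothing is materialized yet
df.count()                  # the first action pays the cost of filling the cache

spark.sql("CACHE TABLE t")  # SQL: eager by default, rows are cached as part of the statement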

Answered by Vaidas Armonas

Actually, in your case .cache() won't help at all. You are not executing any action on your dataframe (at least not in the function you provided). .cache() is a good idea if you will use the data several times over, like:

data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()

This way you will fetch the data only once (when the first action, .show(), is called), and the next uses of the data dataframe should be faster. However, use this with caution - sometimes fetching the data again is still faster. Also, I would advise against reassigning the same name to your dataframe over and over again; dataframes are immutable objects, after all.

Hope this is helpful.

Answered by Nandeesh

Caching RDDs in Spark: it is one mechanism to speed up applications that access the same RDD multiple times. An RDD that is neither cached nor checkpointed is re-evaluated each time an action is invoked on it. There are two function calls for caching an RDD: cache() and persist(level: StorageLevel). The difference between them is that cache() will cache the RDD in memory, whereas persist(level) can cache it in memory, on disk, or in off-heap memory according to the caching strategy specified by level. persist() without an argument is equivalent to cache(). Freeing up space in storage memory is performed by unpersist().
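
A short PySpark sketch of those calls (assuming an existing SparkSession named spark; for RDDs, cache() uses the default MEMORY_ONLY level):

from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(1000))

rdd.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk when memory is tight
rdd.count()                                 # an action materializes the cached data
rdd.unpersist()                             # free the cached blocks

rdd.cache()                                 # shorthand for persist() with the default level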

When to use caching: As suggested in this post, it is recommended to use caching in the following situations:

  • RDD re-use in iterative machine learning applications
  • RDD re-use in standalone Spark applications
  • When RDD computation is expensive, caching can help in reducing the cost of recovery in the case one executor fails