scala - How to checkpoint DataFrames?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33424445/

How to checkpoint DataFrames?

scala, apache-spark, dataframe, apache-spark-sql

Asked by Daniel Shields

I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDD but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrame but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.

As an example, suppose that I have a list of functions whose signature is DataFrame => DataFrame. I want to have a way to compute the following even when myfunctions has hundreds or thousands of entries:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
    }

Answered by Daniel de Paula

TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed


Explanation

Updated after further research.

As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.

So, a possible workaround is the one suggested in another answer:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString on df.rdd:

 scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []

and then calling toDebugString after a quick transformation on df (please note that I created my DataFrame from a JDBC source) returns the following:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain also shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

Then we can verify that the new DataFrame is "checkpointed":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) With a transformation:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

Also, I tried some more complex transformations and I was able to check, in practice, that the newDF object was checkpointed.

Therefore, the only way I found to reliably checkpoint a DataFrame was by checkpointing its associated RDD and creating a new DataFrame object from it.

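To make this pattern easier to reuse in a loop like the one in the question, the two steps can be wrapped in a small helper. This is only a sketch for Spark 1.6: the name runWithCheckpoints and the interval parameter are made up here, and it assumes the checkpoint directory has already been set with sc.setCheckpointDir.

import org.apache.spark.sql.{DataFrame, SQLContext}

// Sketch: every `interval` iterations, truncate the lineage by checkpointing
// the underlying RDD and rebuilding a DataFrame from it (Spark 1.6 style).
def runWithCheckpoints(dataset: DataFrame,
                       functions: Seq[DataFrame => DataFrame],
                       interval: Int)
                      (implicit sqlContext: SQLContext): DataFrame =
  functions.zipWithIndex.foldLeft(dataset) { case (df, (f, i)) =>
    val nextDF = f(df)
    if ((i + 1) % interval == 0) {
      nextDF.rdd.checkpoint()                               // mark the RDD for checkpointing
      nextDF.rdd.count()                                    // action that materializes the checkpoint
      sqlContext.createDataFrame(nextDF.rdd, nextDF.schema) // new DataFrame with a short lineage
    } else {
      nextDF
    }
  }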

I hope it helps. Cheers.

Answered by Assaf Mendelson

As of Spark 2.1, DataFrame has a checkpoint method (see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) that you can use directly; there is no need to go through the RDD.

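For example, a minimal sketch (assuming spark is a SparkSession, df is an existing DataFrame, and the checkpoint directory path is just a placeholder):

// Spark 2.1+: checkpoint the DataFrame directly, no need to drop down to the RDD
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path
val checkpointedDF = df.checkpoint() // eager by default; returns a DataFrame with truncated lineage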

Answered by mrsrinivas

Extending Assaf Mendelson's answer,

As of Spark 2.2, the Dataset#checkpoint() API is Evolving and Experimental.

Usage:

Before checkpointing, the checkpoint directory has to be set using SparkContext:

spark.sparkContext.setCheckpointDir("checkpoint/dir/location")

val ds: Dataset[Long] = spark.range(10).repartition('id % 2)

// do checkpoint now, it will preserve partition also
val cp: Dataset[Long] = ds.checkpoint()
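
There is also an overload that keeps the checkpoint lazy (a sketch; the eager flag appears in the base implementation shown below):

// Lazy variant: nothing is written until a later action materializes the Dataset
val cpLazy: Dataset[Long] = ds.checkpoint(eager = false)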

How does it work internally?

So far, the implementation of Dataset checkpoint is to convert the Dataset to an RDD and then checkpoint it.

// In Dataset.scala

// API used in the example above
def checkpoint(): Dataset[T] = checkpoint(eager = true)

// Base implementation
def checkpoint(eager: Boolean): Dataset[T] = {
  val internalRdd = queryExecution.toRdd.map(_.copy())
  internalRdd.checkpoint()

  if (eager) {
    internalRdd.count() // To materialize the Dataset immediately on the checkpoint() call
  }

  ...
}

Answered by Justin Pihony

I think right now you'll have to do

sc.setCheckpointDir("/DIR")
df.rdd.checkpoint

And then you will have to perform your action on the underlying df.rdd. Calling df.ACTION will not work currently, only df.rdd.ACTION.

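For example, a minimal sketch of that pattern (the directory path is just a placeholder):

sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path
df.rdd.checkpoint
df.rdd.count // an action on the underlying RDD, which actually writes the checkpoint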