
Disclaimer: this page is an English rendering of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/37557014/


Should we parallelize a DataFrame like we parallelize a Seq before training

scala, apache-spark, pyspark, apache-spark-sql, apache-spark-ml

Asked by Abhishek

Consider the code given here,

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors // needed for Vectors.dense
import org.apache.spark.mllib.regression.LabeledPoint // needed for LabeledPoint
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

Assuming we read "training" as a DataFrame using sqlContext.read(), should we still do something like

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

or will the fit function automatically take care of parallelizing the computation/data when passed a DataFrame?

Regards,

Answered by zero323

A DataFrame is a distributed data structure, so it is neither required nor possible to parallelize it. The SparkContext.parallelize method is intended only for distributing local data structures that reside in the driver's memory. It shouldn't be used to distribute large datasets, let alone to redistribute RDDs or higher-level data structures, as you did in your previous question:

sc.parallelize(trainingData.collect()) // anti-pattern: collects the whole dataset to the driver, then re-distributes it
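
For contrast, here is a minimal sketch of what parallelize is actually meant for (reusing the imports from the question; sc is an existing SparkContext):

// parallelize distributes a small collection that lives in driver memory:
val localPoints = Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)))
val pointsRDD = sc.parallelize(localPoints) // RDD[LabeledPoint]

// Large datasets should instead be read as distributed data from the start
// (sqlContext.read in 1.x, spark.read in 2.x), which already yields a
// DataFrame, so there is nothing left to parallelize.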

If you want to convert between RDD / DataFrame (Dataset), use the methods which are designed to do it (a short self-contained note follows the list):

  1. from DataFrame to RDD:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val df: DataFrame  = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
    val rdd: RDD[Row] = df.rdd
    
  2. from RDD to DataFrame:

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // in 1.x use sqlContext.createDataFrame
    
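One practical note (the SparkSession setup here is my assumption, not part of the original answer): rdd.toDF only compiles when the SQL implicits are in scope, so a self-contained version of the second snippet looks roughly like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._ // brings .toDF into scope (in 1.x: import sqlContext.implicits._)

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
val df: DataFrame = rdd.toDF("k", "v") // named columns instead of _1/_2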

Answered by Timomo

You should maybe check out the difference between RDD and DataFrame and how to convert between the two: Difference between DataFrame and RDD in Spark

To answer your question directly: a DataFrame is already optimized for parallel execution. You do not need to do anything, and you can pass it directly to any Spark estimator's fit() method. The parallel execution is handled in the background.

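To make that concrete, here is a minimal sketch of reading training data as a DataFrame and fitting it directly (Spark 2.x API; the libsvm sample path is the one shipped with Spark distributions and is an assumption about your layout):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("lr-fit").getOrCreate()

// spark.read already produces a distributed DataFrame with "label" and
// "features" columns; no parallelize step is involved anywhere.
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val model = lr.fit(training) // fit distributes the computation itself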