scala Spark unionAll multiple dataframes

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me), citing the original: http://stackoverflow.com/questions/37612622/

Spark unionAll multiple dataframes

scala apache-spark apache-spark-sql

Asked by echo

For a set of dataframes

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

to union all of them I do

df1.unionAll(df2).unionAll(df3)

Is there a more elegant and scalable way of doing this for any number of dataframes, for example from

Seq(df1, df2, df3) 

Answered by zero323

The simplest solution is to reduce with union (unionAll in Spark < 2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)

This is relatively concise and shouldn't move data from off-heap storage, but it extends the lineage with each union, and plan analysis requires non-linear time. This can be a problem if you try to merge a large number of DataFrames.

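Note that reduce throws on an empty sequence; as a minimal sketch, plain Scala's reduceOption returns an Option instead (substitute unionAll for union on Spark < 2.0):

// reduceOption yields None rather than throwing when dfs is empty
val merged = dfs.reduceOption(_ union _)   // Option[DataFrame]
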
You can also convert to RDDs and use SparkContext.union:

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

It keeps the lineage short and the analysis cost low, but otherwise it is less efficient than merging DataFrames directly.

Answered by TH22

For pyspark you can do the following:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

It's also worth noting that the order of the columns in the dataframes should be the same for this to work. This can silently give unexpected results if you don't have the correct column order!

If you are using pyspark 2.3 or greater, you can use unionByName so you don't have to reorder the columns.

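unionByName also exists on the Scala Dataset API (Spark 2.3 or greater); a minimal sketch, assuming hypothetical frames that share the same column names in different orders:

// hypothetical frames with the same columns listed in different orders
// (assumes spark-shell, with spark.implicits._ in scope for toDF)
val a = Seq((1, 10)).toDF("id", "x")
val b = Seq((20, 2)).toDF("x", "id")

// unionByName resolves columns by name rather than by position
val unioned = a.unionByName(b)   // result columns: id, x
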
Answered by S. Biedermann

Under the hood, Spark flattens union expressions, so it takes longer when the union is built up linearly.

The best solution would be for Spark to have a union function that supports multiple DataFrames.

But the following code might speed up the union of multiple DataFrames (or Datasets) somewhat.

import scala.reflect.ClassTag
import org.apache.spark.sql.Dataset

// Union the datasets pairwise (tree reduction) instead of linearly,
// which keeps the resulting union plan balanced.
def union[T: ClassTag](datasets: TraversableOnce[Dataset[T]]): Dataset[T] = {
  binaryReduce[Dataset[T]](datasets, _.union(_))
}

def binaryReduce[T: ClassTag](ts: TraversableOnce[T], op: (T, T) => T): T = {
  val array = ts.toArray
  if (array.isEmpty) {
    throw new IllegalArgumentException("cannot reduce an empty collection")
  }
  var size = array.length
  while (size > 1) {
    // combine elements in pairs; each pass halves the number of elements
    val newSize = (size + 1) / 2
    for (i <- 0 until newSize) {
      val index = i * 2
      val index2 = index + 1
      if (index2 >= size) {
        array(i) = array(index)   // odd element carried to the next pass
      } else {
        array(i) = op(array(index), array(index2))
      }
    }
    size = newSize
  }
  array(0)
}
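
For illustration, a hedged usage sketch with the DataFrames from the question (a DataFrame is a Dataset[Row], so the helper applies directly):

// assumes the union helper above is in scope, and df1/df2/df3 from the question
val combined = union(Seq(df1, df2, df3))
combined.show()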