scala - How to zip two (or more) DataFrames in Spark

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32882529/

How to zip two (or more) DataFrames in Spark

scala, apache-spark, dataframe, apache-spark-sql

Asked by worldterminator

I have two DataFrames, a and b. a looks like this:

Column 1 | Column 2
abc      |  123
cde      |  23 

b looks like this:

Column 1 
1      
2      

I want to zip a and b (or even more DataFrames) so that the result becomes something like:

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

How can I do it?

Answer by zero323

An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming this is the case:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
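
For the toy data above, both inputs were parallelized the same way, so the partition layout should match and the result can be inspected directly; the show() call below is only a sketch, with the expected rows taken from the question's target table:

ab.show()
// column_1 | column_2 | column_3
// abc      | 123      | 1
// cde      | 23       | 2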

If the above conditions are not met, the only option that comes to mind is adding an index and joining:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")

Answer by Sohum Sachdev

In Scala's DataFrame implementation, there is no simple way to concatenate two DataFrames into one. We can work around this limitation by adding an index to each row of the DataFrames and then doing an inner join on these indices. This is my stub code for this implementation:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonicallyIncreasingId  // use monotonically_increasing_id in newer Spark versions

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId)

val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId)

aWithId.join(bWithId, "id")
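
To end up with just the three data columns, the helper id can be dropped after the join. One caveat worth hedging: monotonicallyIncreasingId only guarantees increasing ids within each DataFrame, so the ids on the two sides line up only when both DataFrames have the same number of partitions and the same per-partition row counts (true for the small example here, but not in general). A minimal follow-up sketch:

// Drop the generated key once the rows are matched up (ids assumed aligned, see above)
val zipped = aWithId.join(bWithId, "id").drop("id")
zipped.show()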

A little light reading - Check out how Python does this!

Answer by Thomas Decaux

What about pure SQL?

SELECT 
    room_name, 
    sender_nickname, 
    message_id, 
    row_number() over (partition by room_name order by message_id) as message_index, 
    row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
from messages
order by room_name, message_id
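
Applied to the question's DataFrames, the same row_number idea can manufacture a positional join key, but only if each side has a column that defines the intended row order. A hedged sketch in Scala (the window definitions and the _rn column name are illustrative assumptions, not part of the original answer; without partitionBy, the window pulls all rows into a single partition, so this only suits small data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Number the rows of each DataFrame by an existing ordering column.
val aIdx = a.withColumn("_rn", row_number().over(Window.orderBy("column_1")))
val bIdx = b.withColumn("_rn", row_number().over(Window.orderBy("column_3")))

// Join on the generated row number and drop the helper column.
val zippedSql = aIdx.join(bIdx, "_rn").drop("_rn")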

Answer by snark

I know the OP was using Scala, but if, like me, you need to know how to do this in PySpark, then try the Python code below. Like @zero323's first solution, it relies on RDD.zip() and will therefore fail if the two DataFrames don't have the same number of partitions and the same number of rows in each partition.

from pyspark.sql import Row
from pyspark.sql.types import StructType

def zipDataFrames(left, right):
    CombinedRow = Row(*left.columns + right.columns)

    def flattenRow(row):
        left = row[0]
        right = row[1]
        combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__]
        return CombinedRow(*combinedVals)

    zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row))        
    combinedSchema = StructType(left.schema.fields + right.schema.fields)        
    return zippedRdd.toDF(combinedSchema)

joined = zipDataFrames(a, b)