scala 如何在 Spark 中 zip(按行拼接)两个(或更多)DataFrame
声明:本页面是 StackOverFlow 热门问题的中英对照翻译,遵循 CC BY-SA 4.0 协议。如果您需要使用它,必须同样遵循 CC BY-SA 许可,注明原文地址和作者信息,同时必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32882529/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me):
StackOverFlow
How to zip two (or more) DataFrame in Spark
提问 by worldterminator
I have two DataFrames, a and b.
a is like
我有两个 DataFrame:a 和 b。
a 就像这样
Column 1 | Column 2
abc | 123
cde | 23
b is like
b 就像这样
Column 1
1
2
I want to zip a and b (or even more) DataFrames, which becomes something like:
我想把 a 和 b(甚至更多)DataFrame zip 在一起,得到类似这样的结果:
Column 1 | Column 2 | Column 3
abc | 123 | 1
cde | 23 | 2
How can I do it?
我该怎么做?
回答 by zero323
An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming this is the case:
DataFrame API 不支持这样的操作。zip 两个 RDD 是可能的,但要使其生效,您必须同时匹配分区数和每个分区的元素数。假设满足这些条件:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
val a: DataFrame = sc.parallelize(Seq(
("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")
// Merge rows
val rows = a.rdd.zip(b.rdd).map{
case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}
// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)
// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
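For reference, the zipped result can be inspected like this (a quick sketch; the exact show() formatting depends on your Spark version):
// Inspect the zipped result
ab.show()
// +--------+--------+--------+
// |column_1|column_2|column_3|
// +--------+--------+--------+
// |     abc|     123|       1|
// |     cde|      23|       2|
// +--------+--------+--------+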
If the above conditions are not met, the only option that comes to mind is adding an index and joining:
如果不满足上述条件,唯一能想到的办法就是添加索引再进行连接:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
// Add index
df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
// Create schema
StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)
// Join and clean
val ab = aWithIndex
.join(bWithIndex, Seq("_index"))
.drop("_index")
回答 by Sohum Sachdev
In Scala's implementation of DataFrames, there is no simple way to concatenate two DataFrames into one. We can work around this limitation by adding an index to each row of the DataFrames. Then, we can do an inner join on these indices. This is my stub code for this implementation:
在 Scala 的 DataFrame 实现中,没有简单的方法可以把两个 DataFrame 拼接成一个。我们可以通过给每个 DataFrame 的每一行添加索引来绕开这个限制,然后按这些索引做内连接。下面是这个实现的示例代码:
import org.apache.spark.sql.functions.monotonicallyIncreasingId  // renamed monotonically_increasing_id in Spark 2.x+
val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId)
val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId)
aWithId.join(bWithId, "id")
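If the helper id column is not wanted in the output, it can be dropped after the join (a small addition to the snippet above):
// Join on the generated id and remove it from the result
val zipped = aWithId.join(bWithId, "id").drop("id")
zipped.show()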
回答 by Thomas Decaux
What about pure SQL ?
纯 SQL 呢?
SELECT
room_name,
sender_nickname,
message_id,
row_number() over (partition by room_name order by message_id) as message_index,
row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
from messages
order by room_name, message_id
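The query above targets the answer's own messages table. To connect the row_number() idea back to the original a and b, a rough sketch could look like the following (not part of the answer; it assumes window-function support, e.g. a HiveContext on Spark 1.x or any Spark 2.x session, and uses each frame's existing column as an arbitrary ordering key):
// Register the example frames as SQL-visible tables
a.registerTempTable("a")   // use createOrReplaceTempView on Spark 2.x+
b.registerTempTable("b")
// Number the rows of each table, then join on the row number
val zippedSql = sqlContext.sql("""
  SELECT ra.column_1, ra.column_2, rb.column_3
  FROM (SELECT *, row_number() OVER (ORDER BY column_1) AS rn FROM a) ra
  JOIN (SELECT *, row_number() OVER (ORDER BY column_3) AS rn FROM b) rb
  ON ra.rn = rb.rn
""")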
回答 by snark
I know the OP was using Scala, but if, like me, you need to know how to do this in pyspark, then try the Python code below. Like @zero323's first solution it relies on RDD.zip() and will therefore fail if both DataFrames don't have the same number of partitions and the same number of rows in each partition.
我知道 OP 用的是 Scala,但如果您像我一样需要知道在 pyspark 中怎么做,可以试试下面的 Python 代码。与 @zero323 的第一个解决方案一样,它依赖 RDD.zip(),因此如果两个 DataFrame 的分区数和每个分区中的行数不相同,它就会失败。
from pyspark.sql import Row
from pyspark.sql.types import StructType
def zipDataFrames(left, right):
    # Row "class" whose fields are the union of both DataFrames' columns
    CombinedRow = Row(*left.columns + right.columns)

    def flattenRow(row):
        left = row[0]
        right = row[1]
        combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__]
        return CombinedRow(*combinedVals)

    # Pair up rows positionally, then flatten each (leftRow, rightRow) pair
    zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row))
    combinedSchema = StructType(left.schema.fields + right.schema.fields)
    return zippedRdd.toDF(combinedSchema)
joined = zipDataFrames(a, b)

