scala 如何在 Spark 中转置 RDD
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/29390717/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to transpose an RDD in Spark
提问by u4715202
I have an RDD like this:
我有一个这样的RDD:
1 2 3
4 5 6
7 8 9
It is a matrix. Now I want to transpose the RDD like this:
它是一个矩阵。现在我想像这样转置 RDD:
1 4 7
2 5 8
3 6 9
How can I do this?
我怎样才能做到这一点?
采纳答案by Daniel Darabos
Say you have an N×M matrix.
假设您有一个 N×M 矩阵。
If both N and M are so small that you can hold N×M items in memory, it doesn't make much sense to use an RDD. But transposing it is easy:
如果 N 和 M 都很小,以至于您可以在内存中保存 N×M 个项目,那么使用 RDD 就没有意义了。但转置它很容易:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
If N or M is so large that you cannot hold N or M entries in memory, then you cannot have an RDD line of this size. Either the original or the transposed matrix is impossible to represent in this case.
如果 N 或 M 太大以至于您无法在内存中保存 N 或 M 个条目,那么您就不能拥有这样大小的 RDD 行。在这种情况下,原始矩阵或转置矩阵都无法表示。
N and M may be of a medium size: you can hold N or M entries in memory, but you cannot hold N×M entries. In this case you have to blow up the matrix and put it together again:
N 和 M 可能是中等大小:您可以在内存中保存 N 或 M 个条目,但不能保存 N×M 个条目。在这种情况下,您必须炸毁矩阵并将其重新组合在一起:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
case (row, rowIndex) => row.zipWithIndex.map {
case (number, columnIndex) => columnIndex -> (rowIndex, number)
}
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
回答by Martin
A first draft without using collect(), so everything runs worker side and nothing is done on driver:
没有使用 collect() 的初稿,所以一切都在工作端运行,驱动程序什么也没做:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
.map(v => (v._2, v._1)) // key by column position
.groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row
.map(_._2) // discard the key, keep only value
The problem with this solution is that the columns in the transposed matrix will end up shuffled if the operation is performed in a distributed system. Will think of an improved version
该解决方案的问题在于,如果在分布式系统中执行操作,则转置矩阵中的列最终会被打乱。会想到改进版
My idea is that in addition to attach the 'column number' to each element of the matrix, we attach also the 'row number'. So we could key by column position and regroup by key like in the example, but then we could reorder each row on the row number and then strip row/column numbers from the result. I just don't have a way to know the row number when importing a file into an RDD.
我的想法是,除了将“列号”附加到矩阵的每个元素之外,我们还要附加“行号”。因此,我们可以像示例中那样按列位置键并按键重新组合,但随后我们可以对行号上的每一行重新排序,然后从结果中去除行/列号。将文件导入 RDD 时,我只是没有办法知道行号。
You might think it's heavy to attach a column and a row number to each matrix element, but i guess that's the price to pay to have the possibility to process your input as chunks in a distributed fashion and thus handle huge matrices.
您可能认为将列号和行号附加到每个矩阵元素很繁重,但我想这是有可能以分布式方式将输入处理为块从而处理巨大矩阵的代价。
Will update the answer when i find a solution to the ordering problem.
当我找到订购问题的解决方案时将更新答案。
回答by 51zero
As of Spark 1.6 you can use the pivot operationon DataFrames, depending on the actual shape of your data, if you put it into a DF you could pivot columns to rows, the following databricks blogis very useful as it describes in detail a number of pivoting use cases with code examples
从 Spark 1.6 开始,您可以在 DataFrames 上使用数据透视操作,具体取决于数据的实际形状,如果将其放入 DF 中,您可以将列转换为行,以下数据块博客非常有用,因为它详细描述了一个数字使用代码示例透视用例

![Scala - 将 Array[String] 转换为 Array[Double]](/res/img/loading.gif)