Transposing Columns to Rows with Spark in Python
Disclaimer: this page is a translation of a popular StackOverflow question. The content is provided under the CC BY-SA 4.0 license; if you reuse or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/37864222/
Transpose column to row with Spark
Asked by Raouf
I'm trying to transpose some columns of my table to row. I'm using Python and Spark 1.5.0. Here is my initial table:
+-----+-------+-------+---------+
|  A  | col_1 | col_2 | col_... |
+-----+-------+-------+---------+
|  1  |   0.0 |   0.6 |   ...   |
|  2  |   0.6 |   0.7 |   ...   |
|  3  |   0.5 |   0.9 |   ...   |
| ... |   ... |   ... |   ...   |
+-----+-------+-------+---------+
I would like to have something like this:
+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |  col_1 |       0.0 |
|  1  |  col_2 |       0.6 |
| ... |    ... |       ... |
|  2  |  col_1 |       0.6 |
|  2  |  col_2 |       0.7 |
| ... |    ... |       ... |
|  3  |  col_1 |       0.5 |
|  3  |  col_2 |       0.9 |
| ... |    ... |       ... |
+-----+--------+-----------+
Does someone know how I can do it? Thank you for your help.
Answered by zero323
It is relatively simple to do with basic Spark SQL functions.
Python:
from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
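For reference, on the toy df above the call should produce output along these lines (a sketch, assuming a working SparkContext/SQLContext):

>>> to_long(df, ["A"]).show()
+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+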
Scala:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
Answered by javadba
The Spark local linear algebra libraries are presently very weak, and they do not include basic operations such as the above.
There is a JIRA for fixing this for Spark 2.1 - but that will not help you today.
Something to consider: performing a transpose will likely require completely shuffling the data.
For now you will need to write RDD code directly. I have written transpose in Scala - but not in Python. Here is the Scala version:
def transpose(mat: DMatrix) = {
  val nCols = mat(0).length
  val matT = mat
    .flatten
    .zipWithIndex
    .groupBy { _._2 % nCols }
    .toSeq.sortBy { _._1 }
    .map(_._2)
    .map(_.map(_._1))
    .toArray
  matT
}
So you can convert that to python for your use. I do not have bandwidth to write/test that at this particular moment: let me know if you were unable to do that conversion.
At the least, the following are readily converted to Python:

- zipWithIndex --> enumerate() (Python equivalent - credit to @zero323)
- map --> [someOperation(x) for x in ..]
- groupBy --> itertools.groupby()
Here is the implementation for flatten, which does not have a Python equivalent:
def flatten(L):
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item
So you should be able to put those together for a solution.
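As a rough sketch of that conversion (not the author's code), the pieces could be combined like this for a plain, local list-of-lists matrix, reusing the flatten generator defined above:

from itertools import groupby

def transpose(mat):
    # mat: a local, rectangular list of lists (rows of equal length)
    n_cols = len(mat[0])
    # enumerate plays the role of zipWithIndex (note: the index comes first here)
    indexed = list(enumerate(flatten(mat)))
    # a stable sort plus groupby on index % n_cols collects values by original column
    by_col = sorted(indexed, key=lambda iv: iv[0] % n_cols)
    return [[v for _, v in grp]
            for _, grp in groupby(by_col, key=lambda iv: iv[0] % n_cols)]

# transpose([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) -> [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]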
Answered by Vamsi Prabhala
One way to solve this with pyspark sql is to use the functions create_map and explode.
from pyspark.sql import functions as func

# Use `create_map` to build a map of (column name, column value) pairs
df = df.withColumn('mapCol', \
                   func.create_map(func.lit('col_1'), df.col_1,
                                   func.lit('col_2'), df.col_2,
                                   func.lit('col_3'), df.col_3
                                   )
                   )

# Use the explode function to explode the map
res = df.select('*', func.explode(df.mapCol).alias('col_id', 'col_value'))
res.show()
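If there are many value columns, the create_map arguments can also be built programmatically instead of being listed by hand. A sketch, applied to the original df (before mapCol is added) and assuming A is the only key column:

from itertools import chain
from pyspark.sql import functions as func

# build (lit(name), col(name)) pairs for every column except the key column 'A'
value_cols = [c for c in df.columns if c != 'A']
map_args = list(chain.from_iterable(
    (func.lit(c), func.col(c)) for c in value_cols))

res = df.select('A', func.explode(func.create_map(*map_args))
                         .alias('col_id', 'col_value'))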
Answered by David
Use flatMap. Something like the below should work:
from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA, 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
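On the toy df from the first answer, newDf should then contain something along these lines (a sketch; exact row order may differ):

>>> newDf.show()
+---+-----+--------+
|  A|colID|colValue|
+---+-----+--------+
|  1|col_1|     0.0|
|  1|col_2|     0.6|
|  1|col_1|     0.6|
|  1|col_2|     0.7|
+---+-----+--------+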
Answered by Tgsmith61591
I took the Scala answer that @javadba wrote and created a Python version for transposing all columns in a DataFrame. This might be a bit different from what OP was asking...
from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be transposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF
For example:
>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
+---+---+---+
>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 3| 6| 9|
+---+---+---+
Answered by Parul Singh
A very handy way to implement:
from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA, 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
Answered by Artem Zaika
To transpose a Dataframe in pySpark, I use pivot over a temporarily created column, which I drop at the end of the operation.
Say we have a table like this. What we want to do is to count the users for each listed_days_bin value.
+-----------------+-------------+
| listed_days_bin | users_count |
+-----------------+-------------+
|                1|            5|
|                0|            2|
|                0|            1|
|                1|            3|
|                1|            4|
|                2|            5|
|                2|            7|
|                2|            2|
|                1|            1|
+-----------------+-------------+
Create a new temp column 'pvt_value', aggregate over it and pivot the results:
import pyspark.sql.functions as F

# add a constant helper column, pivot on listed_days_bin, then drop the helper
agg_df = df.withColumn('pvt_value', F.lit(1))\
           .groupby('pvt_value')\
           .pivot('listed_days_bin')\
           .agg(F.sum('users_count')).drop('pvt_value')
The new Dataframe should look like:
+----+---+---+
| 0 | 1 | 2 | # Columns
+----+---+---+
| 3| 13| 14| # Users over the bin
+----+---+---+
Answered by Gonza Piotti
You could use the stack function:
for example:
df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")
where:
- 2 is the number of columns to stack (col_1 and col_2)
- 'col_1' is a string for the key
- col_1 is the column from which to take the values
If you have several columns, you could build the whole stack string by iterating over the column names and pass that to selectExpr, as sketched below.
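A minimal sketch of building that expression, assuming A is the only key column:

# build the stack(...) expression over every column except the key column 'A'
value_cols = [c for c in df.columns if c != 'A']
stack_expr = "stack({n}, {args}) as (key, value)".format(
    n=len(value_cols),
    args=", ".join("'{c}', {c}".format(c=c) for c in value_cols))

df.selectExpr("A", stack_expr)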