Transpose columns to rows with Spark in Python

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37864222/

Date: 2020-08-19 20:01:48 | Source: igfitidea

Transpose column to row with Spark

python, apache-spark, pivot, transpose

Asked by Raouf

I'm trying to transpose some columns of my table to row. I'm using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to have something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does someone know how I can do it? Thank you for your help.

Answered by zero323

It is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
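
For the sample frame above, the result should look roughly like this (the key and val column names come from the struct aliases used inside to_long):

to_long(df, ["A"]).show()

# Expected output, roughly:
# +---+-----+---+
# |  A|  key|val|
# +---+-----+---+
# |  1|col_1|0.0|
# |  1|col_2|0.6|
# |  1|col_1|0.6|
# |  1|col_2|0.7|
# +---+-----+---+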

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

Answered by javadba

The Spark local linear algebra libraries are presently very weak, and they do not include basic operations such as the above.

There is a JIRA for fixing this for Spark 2.1 - but that will not help you today.

Something to consider: performing a transpose will likely require completely shuffling the data.

For now you will need to write RDD code directly. I have written transpose in Scala - but not in Python. Here is the Scala version:

def transpose(mat: DMatrix) = {
  val nCols = mat(0).length
  val matT = mat
    .flatten
    .zipWithIndex
    .groupBy { _._2 % nCols }
    .toSeq
    .sortBy { _._1 }
    .map(_._2)
    .map(_.map(_._1))
    .toArray
  matT
}

So you can convert that to Python for your use. I do not have the bandwidth to write/test that at this particular moment; let me know if you were unable to do that conversion.

At the least - the following are readily converted to Python.

  • zipWithIndex --> enumerate() (Python equivalent - credit to @zero323)
  • map --> [someOperation(x) for x in ..]
  • groupBy --> itertools.groupby()

Here is the implementation for flatten, which does not have a direct Python equivalent:

# recursively yield items from nested iterables; non-iterable items raise TypeError and are yielded as-is
def flatten(L):
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item

So you should be able to put those together for a solution.
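
For what it's worth, a rough, untested plain-Python sketch of that combination (operating on a local list of equal-length rows, analogous to the DMatrix above) might look like this:

from itertools import groupby

def transpose_local(mat):
    # mirror the Scala version: flatten, index, then group by index % nCols
    n_cols = len(mat[0])
    flat = [x for row in mat for x in row]                 # flatten
    indexed = list(enumerate(flat))                        # zipWithIndex (index comes first here)
    by_col = sorted(indexed, key=lambda p: p[0] % n_cols)  # order by target column
    return [[v for _, v in grp]
            for _, grp in groupby(by_col, key=lambda p: p[0] % n_cols)]

# transpose_local([[1, 2, 3], [4, 5, 6]])  ->  [[1, 4], [2, 5], [3, 6]]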

Answered by Vamsi Prabhala

One way to solve this is with pyspark sql, using the functions create_map and explode.

from pyspark.sql import functions as func
#Use `create_map` to build a map of column-name -> column-value pairs
df = df.withColumn('mapCol', \
                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()
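
If there are many col_* columns, the arguments to create_map can also be built programmatically instead of being written out by hand. A sketch, assuming every column except A should be melted and the value columns share a compatible type:

from itertools import chain
from pyspark.sql import functions as func

value_cols = [c for c in df.columns if c != 'A']
# interleave lit(name), col(name) pairs for create_map
map_args = list(chain.from_iterable((func.lit(c), func.col(c)) for c in value_cols))

res = df.withColumn('mapCol', func.create_map(*map_args)) \
        .select('A', func.explode('mapCol').alias('col_id', 'col_value'))
res.show()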

Answered by David

Use flatMap. Something like the below should work:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
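
Applied to the question's sample data, this should yield something along these lines (Row(**kwargs) sorts the field names alphabetically on older PySpark releases, so the exact column order may differ):

newDf.show()

# Roughly:
# +---+------+--------+
# |  A| colID|colValue|
# +---+------+--------+
# |  1| col_1|     0.0|
# |  1| col_2|     0.6|
# |  2| col_1|     0.6|
# |  2| col_2|     0.7|
# |...|   ...|     ...|
# +---+------+--------+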

Answered by Tgsmith61591

I took the Scala answer that @javadba wrote and created a Python version for transposing all columns in a DataFrame. This might be a bit different from what OP was asking...

from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be transposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

For example:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|
+---+---+---+

Answered by Parul Singh

A very handy way to implement it:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))

Answered by Artem Zaika

To transpose a Dataframe in pySpark, I use pivot over a temporarily created column, which I drop at the end of the operation.

Say we have a table like this. What we want to do is to find the total users over each listed_days_bin value.

+------------------+-------------+
|  listed_days_bin | users_count | 
+------------------+-------------+
|1                 |            5| 
|0                 |            2|
|0                 |            1| 
|1                 |            3|  
|1                 |            4| 
|2                 |            5| 
|2                 |            7|  
|2                 |            2|  
|1                 |            1|
+------------------+-------------+

Create a new temp column - 'pvt_value', aggregate over it and pivot the results:

import pyspark.sql.functions as F

agg_df = df.withColumn('pvt_value', F.lit(1))\
           .groupby('pvt_value')\
           .pivot('listed_days_bin')\
           .agg(F.sum('users_count'))\
           .drop('pvt_value')

New Dataframe should look like:

+----+---+---+
|  0 | 1 | 2 | # Columns 
+----+---+---+
|   3| 13| 14| # Users over the bin
+----+---+---+
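
As a side note, on Spark 1.6+ the temporary pvt_value column may not be needed at all, since pivot can follow an empty groupBy(); a sketch of that variant:

import pyspark.sql.functions as F

agg_df = df.groupBy() \
           .pivot('listed_days_bin') \
           .agg(F.sum('users_count'))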

Answered by Gonza Piotti

You could use the stack function:

For example:

df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")

where:

  • 2 is the number of columns to stack (col_1 and col_2)
  • 'col_1' is a string for the key
  • col_1 is the column from which to take the values

If you have several columns, you could build the whole stack string by iterating over the column names and pass it to selectExpr.

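A sketch of building that expression dynamically (keeping the question's A column, and assuming every other column should be stacked and that they share a type):

value_cols = [c for c in df.columns if c != 'A']
# note: column names containing special characters would need backticks
stack_expr = "stack({}, {}) as (key, value)".format(
    len(value_cols),
    ", ".join("'{0}', {0}".format(c) for c in value_cols)
)
df.selectExpr("A", stack_expr).show()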