Scala Spark: Transpose DataFrame Without Aggregating

Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share this content, you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/40892459/


Spark: Transpose DataFrame Without Aggregating

scala, apache-spark

Asked by nevi_me

I have looked at a number of questions online, but they don't seem to do what I'm trying to achieve.


I'm using Apache Spark 2.0.2 with Scala.


I have a dataframe:


+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

which I want to transpose to


+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

I've tried using pivot(), but I couldn't get the right answer. I ended up looping through my val{x} columns and pivoting each one as shown below, but this is proving to be very slow.

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed("val1", "vals")

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

Then I use union() on each iteration of val{x} to append the result to my first dataframe; a sketch of the full loop appears after the example output below.

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+
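
Roughly, the loop looks like this (an illustrative sketch rather than my exact code; the column list and the hard-coded segment ids 1-4 come from the example data). Each iteration launches its own pivot job, which is why it is so slow:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Assumed reconstruction of the per-column loop described above.
val valCols = df.columns.tail   // "val1" .. "val6"

val transposed: DataFrame = valCols.map { c =>
  df.groupBy()
    .pivot("segment_id")
    .sum(c)                       // one row: the values of column c per segment
    .withColumn("vals", lit(c))   // label the row with the source column name
}.reduce(_ union _)
  .select("vals", "1", "2", "3", "4")   // segment ids taken from the example data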

Is there a more efficient way of a transpose where I do not want to aggregate data?


Thanks :)


Answered by zero323

Unfortunately there is no case when both:

  • a Spark DataFrame is justified considering the amount of data, and
  • transposition of the data is feasible.

You have to remember that DataFrame, as implemented in Spark, is a distributed collection of rows and each row is stored and processed on a single node.


You could express transposition on a DataFrame as a pivot:

import org.apache.spark.sql.functions._
import spark.implicits._   // for the $"..." column syntax

// Pack each val{x} column into a (name, value) struct and explode into rows.
val kv = explode(array(df.columns.tail.map { 
  c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))   // exactly one value per (k, segment_id) pair, so first() does no real aggregation
  .orderBy($"k")
  .withColumnRenamed("k", "vals")

but it is merely toy code with no practical application. In practice it is not better than collecting the data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Collect to the driver, transpose locally, then rebuild a DataFrame.
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)

For a DataFrame defined as:

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

both would give you the desired result:

+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+

That being said, if you need an efficient transposition on a distributed data structure, you'll have to look elsewhere. There are a number of structures, including the core CoordinateMatrix and BlockMatrix, which can distribute data across both dimensions and can be transposed.
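
A minimal sketch of that idea, assuming the df defined above (an illustration added here, not code from the original answer): load the values into a CoordinateMatrix and transpose it there. Note that CoordinateMatrix stores Double values, so the Int columns are cast.

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Turn each cell into a MatrixEntry: row index = position of val{x},
// column index = segment_id (both shifted to be 0-based).
val entries = df.rdd.flatMap { row =>
  val segment = row.getInt(0)
  (1 until row.size).map { i =>
    MatrixEntry(i - 1, segment - 1, row.getInt(i).toDouble)
  }
}

val mat = new CoordinateMatrix(entries)
val transposedMat = mat.transpose()   // swaps row and column indices, still distributed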

Answered by Santhoshm

This should be a perfect solution.


import org.apache.spark.sql.functions._

val seq = Seq((1,100,0,0,0,0,0),(2,0,50,0,0,20,0),(3,0,0,0,0,0,0),(4,0,0,0,0,0,0))
val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
df1.show()

val schema = df1.schema

// Melt the wide table: one (segment_id, column-name, value) triple per cell.
val df2 = df1.flatMap(row => {
  val metric = row.getInt(0)
  (1 until row.size).map(i => {
    (metric, schema(i).name, row.getInt(i))
  })
})

val df3 = df2.toDF("metric", "vals", "value")
df3.show()

// Pivot back on segment_id; first() is safe because each (vals, metric) pair
// has exactly one value, so nothing is really aggregated.
val df4 = df3.groupBy("vals").pivot("metric").agg(first("value"))
df4.show()