分解(转置?)Spark SQL 表中的多列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/33220916/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-01 04:07:43  来源:igfitidea点击:

Explode (transpose?) multiple columns in Spark SQL table

sqlapache-sparkapache-spark-sqlhiveql

提问by anthr

I am using Spark SQL (I mention that it is in Spark in case that affects the SQL syntax - I'm not familiar enough to be sure yet) and I have a table that I am trying to re-structure, but I'm getting stuck trying to transpose multiple columns at the same time.

我正在使用 Spark SQL(我提到它在 Spark 中,以防影响 SQL 语法 - 我还不够熟悉,无法确定)并且我有一个我正在尝试重新构建的表,但我试图同时转置多个列时陷入困境。

Basically I have data that looks like:

基本上我的数据看起来像:

userId    someString      varA     varB
   1      "example1"    [0,2,5]   [1,2,9]
   2      "example2"    [1,20,5]  [9,null,6]

and I'd like to explode both varA and varB simultaneously (the length will always be consistent) - so that the final output looks like this:

并且我想同时分解 varA 和 varB(长度将始终保持一致) - 以便最终输出如下所示:

userId    someString      varA     varB
   1      "example1"       0         1
   1      "example1"       2         2
   1      "example1"       5         9
   2      "example2"       1         9
   2      "example2"       20       null
   2      "example2"       5         6

but I can only seem to get a single explode(var) statement to work in one command, and if I try to chain them (ie create a temp table after the first explode command) then I obviously get a huge number of duplicate, unnecessary rows.

但我似乎只能在一个命令中使用一个单一的爆炸(var)语句,如果我尝试链接它们(即在第一个爆炸命令之后创建一个临时表),那么我显然会得到大量重复的,不必要的行。

Many thanks!

非常感谢!

回答by zero323

Spark >= 2.4

火花 >= 2.4

You can skip zipudfand use arrays_zipfunction:

您可以跳过zipudf并使用arrays_zip功能:

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars.varA", $"vars.varB").show

Spark < 2.4

火花 < 2.4

What you want is not possible without a custom UDF. In Scala you could do something like this:

没有自定义 UDF,您想要的东西是不可能的。在 Scala 中,您可以执行以下操作:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1",
        "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
    """{"userId": 2, "someString": "example2",
        "varA": [1, 20, 5], "varB": [9, null, 6]}"""
))

val df = spark.read.json(data)

df.printSchema
// root
//  |-- someString: string (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- varA: array (nullable = true)
//  |    |-- element: long (containsNull = true)
//  |-- varB: array (nullable = true)
//  |    |-- element: long (containsNull = true)

Now we can define zipudf:

现在我们可以定义zipudf:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// |     1|  example1|   0|   1|
// |     1|  example1|   2|   2|
// |     1|  example1|   5|   9|
// |     2|  example2|   1|   9|
// |     2|  example2|  20|null|
// |     2|  example2|   5|   6|
// +------+----------+----+----+

With raw SQL:

使用原始 SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")

sqlContext.sql(
  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")

回答by Alexandru Ivana

You could also try

你也可以试试

case class Input(
 userId: Integer,
 someString: String,
 varA: Array[Integer],
 varB: Array[Integer])

case class Result(
 userId: Integer,
 someString: String,
 varA: Integer,
 varB: Integer)

def getResult(row : Input) : Iterable[Result] = {
 val user_id = row.user_id
 val someString = row.someString
 val varA = row.varA
 val varB = row.varB
 val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))}
 seq
 }

val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9))
val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10))
val input_df = sc.parallelize(Seq(obj1, obj2)).toDS

val res = input_df.flatMap{ row => getResult(row) }
res.show
// +------+----------+----+-----+
// |userId|someString|varA|varB |
// +------+----------+----+-----+
// |     1|  string1 |   0|   1 |
// |     1|  string1 |   2|   2 |
// |     1|  string1 |   5|   9 |
// |     2|  string2 |   1|   2 |
// |     2|  string2 |   3|   3 |
// |     2|  string2 |   6|   10|
// +------+----------+----+-----+