Spark Row to JSON

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36157810/

Spark Row to JSON

Tags: json, scala, apache-spark, apache-spark-sql

Asked by navige

I would like to create JSON from a Spark v1.6 dataframe (using Scala). I know that there is the simple solution of doing df.toJSON.

However, my problem looks a bit different. Consider for instance a dataframe with the following columns:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

I would like to end up with a dataframe like this:

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

where C is a JSON containing C1, C2, C3. Unfortunately, at compile time I do not know what the dataframe looks like (except for the columns A and B, which are always "fixed").

As for the reason why I need this: I am using Protobuf for sending around the results. Unfortunately, my dataframe sometimes has more columns than expected and I would still send those via Protobuf, but I do not want to specify all columns in the definition.

How can I achieve this?

Answered by Michael Armbrust

Spark 2.1 should have native support for this use case (see #15354).

import org.apache.spark.sql.functions.{to_json, struct}

df.select(to_json(struct($"c1", $"c2", $"c3")))
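
Since the extra columns are not known at compile time, a minimal sketch of the same idea (assuming Spark 2.1+, an existing dataframe named df, and that only the columns A and B are fixed) packs every remaining column into the JSON value:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Keep the two fixed columns and fold all remaining columns into one JSON column "C".
val fixed = Seq("A", "B")
val extra = df.columns.filterNot(fixed.contains).map(col)

val result = df.select(col("A"), col("B"), to_json(struct(extra: _*)).alias("C"))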

Answered by David Griffin

Here is an approach that uses no JSON parser and adapts to your schema:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"),
    // Build a "name":value pair for every column after the first two,
    // quoting the value only when the column is a string.
    concat_ws(",", df.dtypes.slice(2, df.dtypes.length).map(dt => {
      val c = dt._1
      val t = dt._2
      concat(
        lit("\"" + c + "\":" + (if (t == "StringType") "\"" else "")),
        col(c),
        lit(if (t == "StringType") "\"" else "")
      )
    }): _*),
    lit("}")
  ) as "C"
).collect()
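
Two caveats, since this builds the JSON by string concatenation rather than through a serializer: string values are not escaped, so embedded quotes or backslashes will produce invalid JSON, and because Spark's concat returns null when any input is null, a single null column will null out the whole C value unless the columns are wrapped in coalesce first.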

Answered by zero323

First let's convert the C columns to a struct:

import org.apache.spark.sql.functions.struct

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))

This structure can be converted to JSONL using toJSON as before:

dfStruct.toJSON.collect
// Array[String] = Array(
//   {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
//   {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})

I am not aware of any built-in method that can convert a single column, but you can either convert it individually and join, or use your favorite JSON parser in a UDF.

case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}


val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))

Answered by Cyanny

I use this command to solve the to_json problem:

import org.apache.spark.sql.functions.{to_json, struct, col}

val output_df = df.select(to_json(struct(col("*"))).alias("content"))
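
Note that this serializes every column of df, including A and B, into the single content column; to keep A and B as ordinary columns, select them explicitly and pass only the remaining columns to struct, as in the accepted answer above.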