Original URL: http://stackoverflow.com/questions/36157810/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow.
Spark Row to JSON
Asked by navige
I would like to create JSON from a Spark 1.6 (Scala) dataframe. I know there is the simple solution of doing df.toJSON.
However, my problem looks a bit different. Consider for instance a dataframe with the following columns:
| A | B      | C1 | C2 | C3    |
|---|--------|----|----|-------|
| 1 | test   | ab | 22 | TRUE  |
| 2 | mytest | gh | 17 | FALSE |
I would like to end up with a dataframe like this:
| A | B      | C                                        |
|---|--------|------------------------------------------|
| 1 | test   | { "c1" : "ab", "c2" : 22, "c3" : TRUE }  |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
where C is a JSON containing C1, C2, and C3. Unfortunately, at compile time I do not know what the dataframe looks like (except for the columns A and B, which are always "fixed").
As for the reason why I need this: I am using Protobuf for sending around the results. Unfortunately, my dataframe sometimes has more columns than expected and I would still send those via Protobuf, but I do not want to specify all columns in the definition.
How can I achieve this?
Answered by David Griffin
Here is an approach with no JSON parser that adapts to your schema:
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"),
    // Build a "name":value pair for every column after the first two,
    // quoting the value only when the column is string-typed.
    concat_ws(",", df.dtypes.slice(2, df.dtypes.length).map { case (c, t) =>
      concat(
        lit("\"" + c + "\":" + (if (t == "StringType") "\"" else "")),
        col(c),
        lit(if (t == "StringType") "\"" else "")
      )
    }: _*),
    lit("}")
  ) as "C"
).collect()
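As a side note (my addition, not part of the original answer): the same idea can be wrapped into a reusable helper. A minimal sketch, where tailToJson is a hypothetical name and the first keep columns are left untouched:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

// Hypothetical helper: packs every column after the first `keep` fixed
// columns into a JSON-like string, quoting only string-typed values.
def tailToJson(df: DataFrame, keep: Int): Column = {
  val parts = df.dtypes.drop(keep).map { case (c, t) =>
    val quote = if (t == "StringType") "\"" else ""
    concat(lit("\"" + c + "\":" + quote), col(c), lit(quote))
  }
  concat(lit("{"), concat_ws(",", parts: _*), lit("}"))
}

// Usage: df.select(col("A"), col("B"), tailToJson(df, 2).as("C"))

Note that string values are not escaped here, so quotes inside the data would break the output; the to_json-based answers below are safer in that respect.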
Answered by zero323
First, let's convert the C columns to a struct:
val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
This structure can be converted to JSONL using toJSON as before:
dfStruct.toJSON.collect
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}},
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
I am not aware of any built-in method that can convert a single column, but you can either convert it individually and join, or use your favorite JSON parser in a UDF.
case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  // Serialize the three C values as a JSON string.
  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}

val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
Answered by Cyanny
I use this command to solve the to_json problem:
output_df = (df.select(to_json(struct(col("*"))).alias("content")))
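For completeness (my addition): to_json and struct live in org.apache.spark.sql.functions and are available from Spark 2.1 onwards; the Scala equivalent would look like this:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Pack the whole row into a single JSON string column.
val outputDf = df.select(to_json(struct(col("*"))).alias("content"))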

