Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/41601874/


How to convert Row to json in Spark 2 Scala

Tags: json, scala, apache-spark, json4s

Asked by Sami Badawi

Is there a simple way to convert a given Row object to JSON?

I found this about converting a whole DataFrame to JSON output: Spark Row to JSON

But I just want to convert a single Row to JSON. Here is pseudocode for what I am trying to do.

More precisely, I am reading JSON as input into a DataFrame. I am producing a new output that is mostly column-based, but with one JSON field for all the information that does not fit into the columns.

My question is: what is the easiest way to write this function, convertRowToJson()?

def convertRowToJson(row: Row): String = ??? // the function I am asking about

def transformVenueTry(row: Row): Try[Venue] = {
  Try {
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    // Pseudocode: Row has no ++ operator; I want to append a "score" field
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  }
}

Psidom's solution:

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

only works if the Row has a single level, not with nested Rows. This is the schema:

StructType(
    StructField(indicator,StringType,true),
    StructField(range,
        StructType(
            StructField(currency_code,StringType,true),
            StructField(maxrate,LongType,true),
            StructField(minrate,LongType,true)),
        true))

I also tried Artem's suggestion, but that did not compile:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  // XXX does not compile: toDF() needs an implicit Encoder, and Spark does
  // not provide one for Row; createDataFrame with an explicit schema works
  // instead (see the combined solution at the end)
  val dataFrame = rowRDD.toDF()
  dataFrame
}

Accepted answer by Sami Badawi

I need to read JSON input and produce JSON output. Most fields are handled individually, but a few JSON sub-objects just need to be preserved.

When Spark reads a DataFrame it turns each record into a Row. A Row is a JSON-like structure that can be transformed and written out to JSON.

But I need to extract some sub-JSON structures into a string to use as a new field.

This can be done like this:

import org.apache.spark.sql.functions.to_json

val dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))

location.address is the path to the sub-JSON object in the incoming JSON-based DataFrame. address_json is the name of the new column containing that object converted to a JSON string.

to_json was introduced in Spark 2.1.

If you generate the output JSON using json4s, address_json should be parsed into an AST representation; otherwise the output JSON will have the address_json part escaped.

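For illustration, here is a minimal sketch of that step with json4s; the field names and the sample string are my assumptions, not taken from the original data:

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Assumed example value of the address_json column
val addressJson = """{"street":"Main St","city":"Springfield"}"""

// parse() turns the string into a JValue AST, so it is embedded as a real
// JSON object; embedding the raw string instead would leave it escaped
val output: JValue = ("name" -> "some venue") ~ ("address" -> parse(addressJson))

println(compact(render(output)))
// {"name":"some venue","address":{"street":"Main St","city":"Springfield"}}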

Answer by Psidom

You can use getValuesMap to convert the row object to a Map and then convert it to JSON:

import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")  // assumes spark-shell, where implicits are in scope
val row = df.first()          // this is an example row object

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}

Answer by Arnon Rodman

Note that the Scala class scala.util.parsing.json.JSONObject is deprecated and does not support null values.

@deprecated("This class will be removed.", "2.11.0")

@deprecated("这个类将被删除。", "2.11.0")

"JSONFormat.defaultFormat doesn't handle null values"

https://issues.scala-lang.org/browse/SI-5092

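A null-safe alternative can be sketched with json4s instead; the serializer choice here is my assumption, not part of the original answer, and it assumes a flat schema:

import org.apache.spark.sql.Row
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

def convertRowToJsonSafe(row: Row): String = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  // getValuesMap keeps null values, and json4s renders them as JSON null
  val m = row.getValuesMap[Any](row.schema.fieldNames)
  Serialization.write(m)
}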

Answer by Artem

Essentially, you can have a DataFrame which contains just one row. Thus, you can filter your initial DataFrame down to that row and then serialize it to JSON.

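A minimal sketch of that idea, assuming the DataFrame is named df and a hypothetical id column identifies the row of interest:

import org.apache.spark.sql.functions.col

// Filter down to a single-row DataFrame, then let Spark serialize it
val json: String = df.filter(col("id") === 42).toJSON.first()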

Answer by KiranM

JSON has a schema but a Row doesn't, so you need to apply a schema to the Row and then convert it to JSON. Here is how you can do it.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assumes a sqlContext in scope
def convertRowToJson(row: Row): String = {

  val schema = StructType(
      StructField("name", StringType, true) ::
      StructField("meta", StringType, false) :: Nil)

  // applySchema expects an RDD[Row] (it is deprecated in favor of
  // createDataFrame), and toJSON yields a Dataset[String], so take the
  // first (and only) JSON record
  sqlContext.applySchema(sqlContext.sparkContext.makeRDD(row :: Nil), schema)
    .toJSON
    .first()
}

Answer by Ehud Lev

I had the same issue. I had Parquet files with a canonical schema (no arrays), and I only wanted to get JSON events. I did the following, and it seems to work just fine (Spark 2.1):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.util.parsing.json.JSONFormat.ValueFormatter
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}

def getValuesMap[T](row: Row, schema: StructType): Map[String, Any] = {
  schema.fields.map { field =>
    try {
      if (field.dataType.typeName.equals("struct")) {
        // Recurse into nested struct fields
        field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType])
      } else {
        field.name -> row.getAs[T](field.name)
      }
    } catch {
      case e: Exception => field.name -> null.asInstanceOf[T]
    }
  }.filter(_._2 != null).toMap
}

def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
  val m: Map[String, Any] = getValuesMap(row, schema)
  JSONObject(m)
}
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
val defaultFormatter : ValueFormatter = (x : Any) => x match {
  case s : String => "\"" + JSONFormat.quoteString(s) + "\""
  case jo : JSONObject => jo.toString(defaultFormatter)
  case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
  case ja : JSONArray => ja.toString(defaultFormatter)
  case other => other.toString
}

val someFile = "s3a://bucket/file"
val df: DataFrame = sqlContext.read.load(someFile)
val schema: StructType = df.schema
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema))

Answer by TheTeacher

If you are iterating through a DataFrame, you can directly convert it into a dataset of JSON strings and iterate over that:

val df_json = df.toJSON   // Dataset[String], one JSON document per row

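For example, one way to consume the result (a sketch):

df.toJSON.collect().foreach(println)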

Answer by Sami Badawi

I combined the suggestions from Artem, KiranM and Psidom. After a lot of trial and error I came up with this solution, which I tested for nested structures:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

def row2Json(row: Row, sqlContext: SQLContext): String = {
  // Wrap the single Row in an RDD, rebuild a DataFrame with the Row's own
  // schema, and let Spark serialize it to JSON
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
  dataframe.toJSON.first
}

This solution worked, but only while running on the driver, because SQLContext (and the SparkContext behind it) cannot be used inside executor code.

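If the conversion has to run distributed, one option (my assumption, not part of the original answer) is to let Spark convert the whole DataFrame at once instead of one Row at a time:

// toJSON runs on the executors and yields one JSON document per row
val jsonStrings = dataframe.toJSON   // Dataset[String]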