How to specify only particular fields using read.schema in JSON: SPARK Scala

Disclaimer: this content is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/38277317/



Tags: json, scala, apache-spark, rdd

Asked by Rockie Yang

I am trying to programmatically enforce a schema on a textFile that contains JSON. I tried jsonFile, but the issue is that to create a DataFrame from a list of JSON files, Spark has to make one pass through the data just to infer the schema. So it needs to parse all the data, which takes a long time (around 4 hours, since my data is compressed and TBs in size). Instead, I want to read it as a textFile and enforce a schema that keeps only the fields I am interested in, so that I can query the resulting DataFrame later. But I am not sure how to map the schema onto the input. Can someone give me a reference on how to map a schema onto JSON-like input?


Input:


This is the full schema:


records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>] 

But I am only interested in a few fields, like:


res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":false,"rating":0,"originalRating":1}]})


 val  records = sc.textFile("s3://testData/sample.json.gz")

  val schema = StructType(Array(StructField("requestId",StringType,true),
                          StructField("siteName",StringType,true),
                          StructField("model",StringType,true),
                          StructField("pageType",StringType,true),
                          StructField("inputs", ArrayType(
                                StructType(
                                            StructField("inputType",StringType,true), 
                                            StructField("originalRating",LongType,true), 
                                            StructField("processed",BooleanType,true), 
                                            StructField("rating",LongType,true), 
                                            StructField("methodId",StringType,true)
                                            ),true),true)))

    val rowRDD = ?? 

    val inputRDD = sqlContext.applySchema(rowRDD, schema)
    inputRDD.registerTempTable("input")

     sql("select * from input").foreach(println)

Is there any way to map this? Or do I need to use a JSON parser or something? I want to use textFile only because of the constraints.

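For reference, if the textFile route is kept, the `rowRDD = ??` placeholder above would have to be filled in by parsing each line manually, for example with json4s (which Spark bundles). This is only a rough, illustrative sketch under that assumption, matching the field order of the schema above:

```scala
import org.apache.spark.sql.Row
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Parse each text line and keep only the fields declared in `schema`.
// Nested arrays of structs must be built as Seq[Row] to match ArrayType(StructType(...)).
val rowRDD = records.map { line =>
  implicit val formats = DefaultFormats
  val json = parse(line)

  val inputs = (json \ "inputs") match {
    case JArray(items) => items.map { item =>
      Row(
        (item \ "inputType").extractOpt[String].orNull,
        (item \ "originalRating").extractOpt[Long].map(Long.box).orNull,
        (item \ "processed").extractOpt[Boolean].map(Boolean.box).orNull,
        (item \ "rating").extractOpt[Long].map(Long.box).orNull,
        (item \ "methodId").extractOpt[String].orNull
      )
    }
    case _ => Seq.empty[Row]
  }

  Row(
    (json \ "requestId").extractOpt[String].orNull,
    (json \ "siteName").extractOpt[String].orNull,
    (json \ "model").extractOpt[String].orNull,
    (json \ "pageType").extractOpt[String].orNull,
    inputs
  )
}
```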

Tried with:


val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")

But I keep getting the error:


<console>:37: error: overloaded method value apply with alternatives:
     (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
      (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
     cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("inputs",ArrayType(StructType(StructField("inputType",StringType,true), StructField("originalRating",LongType,true), StructField("processed",BooleanType,true), StructField("rating",LongType,true), StructField("score",DoubleType,true), StructField("methodId",StringType,true)),true),true)))
                                              ^

Answered by Rockie Yang

With the predefined schema below, Spark can load the data without having to go through the whole compressed file to infer the schema. The code in the question has an ambiguity: StructType is applied to bare StructFields instead of an Array (or Seq) of them, which is what triggers the overloaded-method error above.


import org.apache.spark.sql.types._

val input = StructType(
                Array(
                    StructField("inputType",StringType,true), 
                    StructField("originalRating",LongType,true), 
                    StructField("processed",BooleanType,true), 
                    StructField("rating",LongType,true), 
                    StructField("score",DoubleType,true), 
                    StructField("methodId",StringType,true)
                )
            )

 val schema = StructType(Array(
    StructField("requestId",StringType,true),
    StructField("siteName",StringType,true),
    StructField("model",StringType,true),
    StructField("inputs",
        ArrayType(input,true),
                true)
    )
)

val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")
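With the schema supplied up front, only those columns are materialised, and the nested inputs array can be queried directly. A small usage sketch (illustrative only; it reuses the records DataFrame and table name from the question, and flattening via explode is just one option):

```scala
import org.apache.spark.sql.functions.explode
import sqlContext.implicits._

// Query the selected top-level fields and the nested array via SQL ...
records.registerTempTable("input")
sqlContext.sql("SELECT requestId, siteName, inputs[0].inputType FROM input").show()

// ... or flatten the inputs array with explode and read struct fields directly.
records.select($"requestId", explode($"inputs").as("in"))
  .select($"requestId", $"in.inputType", $"in.rating", $"in.processed")
  .show()
```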

Not all of the fields need to be provided, although it is good to provide all of them if possible.


Spark tries its best to parse everything; if some row is not valid, it will add a _corrupt_record column that contains the whole raw line (this applies when the input is plain, line-delimited JSON).

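If you do want to see which lines failed to parse while still supplying your own schema, one option is to add the corrupt-record column to that schema yourself. A hedged sketch, assuming the default column name _corrupt_record and the schema and path used above:

```scala
import org.apache.spark.sql.types._

// Append the corrupt-record column to the user-supplied schema so that
// unparseable lines are kept instead of being silently turned into nulls.
val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))

val checked = sqlContext.read.schema(schemaWithCorrupt).json("s3://testData/test2.gz")

// Rows that failed to parse carry the raw line in _corrupt_record and nulls elsewhere.
checked.filter("_corrupt_record is not null").show(false)
```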