scala - how to read json with schema in spark dataframes/spark sql
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/39355149/
Asked by raj kumar
SQL/DataFrames: please help me out or provide some good suggestions on how to read this JSON:
{
"billdate":"2016-08-08',
"accountid":"xxx"
"accountdetails":{
"total":"1.1"
"category":[
{
"desc":"one",
"currentinfo":{
"value":"10"
},
"subcategory":[
{
"categoryDesc":"sub",
"value":"10",
"currentinfo":{
"value":"10"
}
}]
}]
}
}
Thanks,
Accepted answer by Ram Ghadiyaram
It seems your JSON is not valid; please check it with http://www.jsoneditoronline.org/.
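For reference, a syntactically valid version of the posted JSON (missing commas added, the stray single quote fixed) would look like this:

{
  "billdate": "2016-08-08",
  "accountid": "xxx",
  "accountdetails": {
    "total": "1.1",
    "category": [
      {
        "desc": "one",
        "currentinfo": {
          "value": "10"
        },
        "subcategory": [
          {
            "categoryDesc": "sub",
            "value": "10",
            "currentinfo": {
              "value": "10"
            }
          }
        ]
      }
    ]
  }
}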
Please see an-introduction-to-json-support-in-spark-sql.html
If you want to register it as a table, you can register it as below and print the schema.
DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
df.registerTempTable("df");
df.printSchema();
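Once registered, the temp table can be queried with Spark SQL; a minimal sketch in Scala (the query itself is just an illustration):

// query the registered temp table by name
val result = sqlContext.sql("SELECT * FROM df")
result.show()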
Below is a sample code snippet:
DataFrame app = df.select("toplevel");
app.registerTempTable("toplevel");
app.printSchema();
app.show();
DataFrame appName = app.select("toplevel.sublevel");
appName.registerTempTable("sublevel");
appName.printSchema();
appName.show();
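Applied to the poster's JSON (once its syntax is fixed), the same pattern would look roughly like this in Scala; the file path is hypothetical:

// Read the corrected JSON and drill into the nested structure
val bills = sqlContext.read.json("/path/to/bill.json")    // hypothetical path
val details = bills.select("accountdetails")              // nested struct column
details.printSchema()
val categories = bills.select("accountdetails.category")  // array of structs
categories.show()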
Example with Scala:
{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}
val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
Reading a top-level field
val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)
Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access a column inside each Row.
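As a side note, in Spark 2.x you can skip the Row unpacking by converting the column to a typed Dataset; a minimal sketch, assuming a SparkSession named spark:

import spark.implicits._
// select the column, then view it as Dataset[String] so collect() yields strings directly
val namesDS = people.select($"name").as[String]
namesDS.collect() // Array(Michael, Andy, Justin)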
Flatten and Read a JSON Array
Each person has an array of "cities". Let's flatten these arrays and read out all their elements.
val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame
val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]
allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)
The explode() method explodes, or flattens, the cities array into a new column named "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.
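Note that DataFrame.explode was deprecated in Spark 2.0 in favour of the explode() function from org.apache.spark.sql.functions; a sketch of the equivalent, assuming spark.implicits._ is in scope for the $ syntax:

import org.apache.spark.sql.functions.explode
// explode the "cities" array into one row per element, in a column named "city"
val flattenedCities = people.select(explode($"cities").as("city"))
flattenedCities.collect().map(_.getString(0))
// Array(palo alto, menlo park, santa cruz, portland)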
Read an Array of Nested JSON Objects, Unflattened
Read out the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and year:
val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]
val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]
schoolsArr.foreach(schools => {
  schools.map(row => print(row.getString(0), row.getLong(1)))
  print("\n")
})
(stanford,2010)(berkeley,2012)
(ucsb,2011)
(berkeley,2014)
Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Now, each "schools" array is of type List[Row], so we read it out with the getSeq[Row]() method. Finally, we can read the information for each individual school by calling getString() for the school name and getLong() for the school year.
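Alternatively, you can explode the array of structs and address the nested fields with dot notation instead of unpacking each Row by hand; a sketch using the functions API on the same data:

import org.apache.spark.sql.functions.{col, explode}
// one row per school, then pull the struct's fields out as columns
val schoolsFlat = people
  .select(explode(col("schools")).as("school"))
  .select(col("school.sname"), col("school.year"))
schoolsFlat.show()
// +--------+----+
// |   sname|year|
// +--------+----+
// |stanford|2010|
// |berkeley|2012|
// |    ucsb|2011|
// |berkeley|2014|
// +--------+----+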
Answer by Raghavan
You can try the following code to read the JSON file based on a schema in Spark 2.2:
import org.apache.spark.sql.types.{DataType, StructType}

// Infer the schema from a known-good sample file and serialize it as JSON
val schema_json = spark.read.json("/user/Files/ActualJson.json").schema.json
// Rebuild a StructType from the serialized schema
val newSchema = DataType.fromJson(schema_json).asInstanceOf[StructType]
// Read the JSON files using that schema
val df = spark.read.schema(newSchema).json("Json_Files/Folder Path")
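If you would rather declare the schema by hand than derive it from a sample file, a minimal sketch for the poster's JSON follows; the field names come from the question, all leaf types are assumed to be strings, and the file path is hypothetical:

import org.apache.spark.sql.types._

// Hand-built schema mirroring the question's JSON (leaf types assumed)
val currentinfo = StructType(Seq(StructField("value", StringType)))
val subcategory = StructType(Seq(
  StructField("categoryDesc", StringType),
  StructField("value", StringType),
  StructField("currentinfo", currentinfo)))
val category = StructType(Seq(
  StructField("desc", StringType),
  StructField("currentinfo", currentinfo),
  StructField("subcategory", ArrayType(subcategory))))
val accountdetails = StructType(Seq(
  StructField("total", StringType),
  StructField("category", ArrayType(category))))
val billSchema = StructType(Seq(
  StructField("billdate", StringType),
  StructField("accountid", StringType),
  StructField("accountdetails", accountdetails)))

// hypothetical path to the corrected JSON file
val billsDF = spark.read.schema(billSchema).json("/path/to/bill.json")
billsDF.printSchema()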

