Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original: http://stackoverflow.com/questions/30033875/
Spark JSON text field to RDD
Asked by galex
I've got a Cassandra table with a field of type text named snapshot containing JSON objects:
[identifier, timestamp, snapshot]
I understand that, to be able to do transformations on that field with Spark, I need to convert that field of the RDD into another RDD so I can run transformations against the JSON schema.
Is that correct? How should I proceed to do that?
Edit: For now I managed to create an RDD from a single text field:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
// Parse only the snapshot column (third tuple element) of the first row
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()
This shows me the JSON schema. Good!
How do I now tell Spark to apply that schema to all rows of the snapshots table, so that I get an RDD of the snapshot field from each row?
Answered by RussS
Almost there; you just want to pass an RDD[String] containing your JSON into the jsonRDD method.
val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3)                // third tuple element: the JSON string (RDD[String])
val jsonSchemaRDD = sqlContext.jsonRDD(jsons)  // pass in the RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect
A quick example:
val stringRDD = sc.parallelize(Seq("""
{ "isActive": false,
"balance": ",431.73",
"picture": "http://placehold.it/32x32",
"age": 35,
"eyeColor": "blue"
}""",
"""{
"isActive": true,
"balance": ",515.60",
"picture": "http://placehold.it/32x32",
"age": 34,
"eyeColor": "blue"
}""",
"""{
"isActive": false,
"balance": ",765.29",
"picture": "http://placehold.it/32x32",
"age": 26,
"eyeColor": "blue"
}""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
sqlContext.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
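Note that jsonRDD (and registerTempTable) were deprecated in Spark 1.4 and removed in later releases. On modern Spark (2.x+), the same pattern can be sketched roughly as follows; the in-memory Seq of JSON strings is a stand-in for the snapshots.map(_._3) RDD from the answer, and the local master is only there to make the sketch self-contained:

```scala
import org.apache.spark.sql.SparkSession

// A sketch of the same workflow on modern Spark, where
// spark.read.json accepts a Dataset[String] instead of an RDD[String].
val spark = SparkSession.builder()
  .appName("signal-aggregation")
  .master("local[*]") // local master, just so this sketch runs standalone
  .getOrCreate()
import spark.implicits._

// Stand-in for snapshots.map(_._3): raw JSON strings, one per row
val jsons = Seq(
  """{"age": 35, "eyeColor": "blue"}""",
  """{"age": 34, "eyeColor": "blue"}"""
).toDS()

val parsed = spark.read.json(jsons)          // schema is inferred across all rows
parsed.createOrReplaceTempView("testjson")   // replaces registerTempTable
spark.sql("SELECT age FROM testjson").show()
```

The only structural difference from the original answer is that schema inference now goes through DataFrameReader.json, which looks at every row rather than requiring you to parse one row first.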

