scala - How to add a new Struct column to a DataFrame
Original question: http://stackoverflow.com/questions/31615657/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
How to add a new Struct column to a DataFrame
Asked by Kim Ngo
I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast to the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?

Any help is appreciated!
Answered by zero323
I assume you start with some kind of flat schema like this:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
First let's create example data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
  Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
  StructField("lat", DoubleType, false) ::
  StructField("long", DoubleType, false) ::
  StructField("key", StringType, false) :: Nil)
val df = sqlContext.createDataFrame(rdd, schema)
An easy way is to use a udf and a case class:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
  withColumn("location", makeLocation(col("lat"), col("long"))).
  drop("lat").
  drop("long")
dfRes.printSchema
and we get:
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)
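As a quick sanity check (a minimal sketch against the dfRes built above), the nested fields can be read back with dot notation:

// Select the struct's fields with dot syntax
dfRes.select(col("location.lat"), col("location.long")).show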
The hard way is to transform your data and apply the schema afterwards:
val rddRes = df.
  map { case Row(lat, long, key) => Row(key, Row(lat, long)) }

val schemaRes = StructType(
  StructField("key", StringType, false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) :: Nil
  ), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
and we get the expected output:
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can easily be extended if you need a more sophisticated structure:
case class Pin(location: Location)

val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.
  withColumn("pin", makePin(col("lat"), col("long"))).
  drop("lat").
  drop("long").
  printSchema
and we get the expected output:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)
Unfortunately you have no control over the nullable field, so if it is important for your project you'll have to specify the schema yourself.
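If those flags do matter, the hard way above gives full control; a quick check (a sketch reusing rddRes and schemaRes from the earlier snippet):

// The explicit nullable flags from schemaRes survive in the result
sqlContext.createDataFrame(rddRes, schemaRes).printSchema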
Finally, you can use the struct function introduced in Spark 1.4:
import org.apache.spark.sql.functions.struct
import sqlContext.implicits._  // provides the $"..." column syntax

df.select($"key", struct($"lat", $"long").alias("location"))
Answered by user8817325
Try this:
import org.apache.spark.sql.functions._

df.registerTempTable("dt")
val dfRes = sqlContext.sql("select struct(lat, long) as location from dt")
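To tie this back to the original question (an illustrative sketch, not part of either answer): once the struct column exists, the DataFrame can be written out through the elasticsearch-hadoop connector, assuming the connector is on the classpath and the target index already maps location as geo_point. The index name myindex/mytype is a hypothetical placeholder:

import org.elasticsearch.spark.sql._

// saveToEs is provided by the elasticsearch-spark artifact; the ES index
// mapping must declare "location" as geo_point for it to be indexed as one.
dfRes.saveToEs("myindex/mytype")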

