Convert RDD to Dataframe in Spark/Scala
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original: http://stackoverflow.com/questions/33127970/
Asked by sparkDabbler
The RDD has been created in the format Array[Array[String]] and has the following values:
val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))
I want to create a DataFrame with the schema:
val schemaString = "callId oCallId callTime duration calltype swId"
Next steps:
scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)
Gives the following error:
console:45: error: overloaded method value createDataFrame with alternatives:
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],
org.apache.spark.sql.types.StructType)
val calDF = sqlContext.createDataFrame(rowRDD, schema)
Answered by Beryllium
Just paste into a spark-shell:
val a =
Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
val rdd = sc.makeRDD(a)
case class X(callId: String, oCallId: String,
callTime: String, duration: String, calltype: String, swId: String)
Then map() over the RDD to create instances of the case class, and create the DataFrame using toDF():
scala> val df = rdd.map {
case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame =
[callId: string, oCallId: string, callTime: string,
duration: string, calltype: string, swId: string]
This infers the schema from the case class.
Then you can proceed with:
scala> df.printSchema()
root
|-- callId: string (nullable = true)
|-- oCallId: string (nullable = true)
|-- callTime: string (nullable = true)
|-- duration: string (nullable = true)
|-- calltype: string (nullable = true)
|-- swId: string (nullable = true)
scala> df.show()
+----------+-------+-------------------+--------+--------+----+
| callId|oCallId| callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
+----------+-------+-------------------+--------+--------+----+
If you want to use toDF() in a normal program (not in the spark-shell), make sure (quoted from here, and sketched in the example below):

- to import sqlContext.implicits._ right after creating the SQLContext
- to define the case class outside of the method using toDF()
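Putting both points together, a minimal sketch of a standalone program for the Spark 1.x SQLContext API (the object name, app name, and local[*] master are placeholder assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class is defined at the top level, outside the method that calls toDF()
case class X(callId: String, oCallId: String,
  callTime: String, duration: String, calltype: String, swId: String)

object CallsToDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("calls").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // brings toDF() into scope

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map {
      case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5)
    }.toDF()
    df.show()
  }
}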
Answered by Eugene Zhulenev
You need to first convert your Array into Row and then define the schema. I made the assumption that most of your fields are Long:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val rdd: RDD[Array[String]] = ???

val rows: RDD[Row] = rdd map {
  case Array(callId, oCallId, callTime, duration, calltype, swId) =>
    Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, calltype, swId.toLong)
}

object schema {
  val callId = StructField("callId", LongType)
  val oCallId = StructField("oCallId", LongType)
  val callTime = StructField("callTime", StringType)
  val duration = StructField("duration", LongType)
  val calltype = StructField("calltype", StringType)
  val swId = StructField("swId", LongType)

  val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
}

sqlContext.createDataFrame(rows, schema.struct)
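Note that .toLong throws a NumberFormatException on malformed input. A minimal sketch that guards against this with scala.util.Try (the parseLong helper and its 0L fallback are assumptions for illustration, not part of the original answer; it reuses the imports above):

import scala.util.Try

// Hypothetical helper: parse a Long, falling back to 0L when the string is malformed
def parseLong(s: String): Long = Try(s.trim.toLong).getOrElse(0L)

val safeRows: RDD[Row] = rdd map {
  case Array(callId, oCallId, callTime, duration, calltype, swId) =>
    Row(parseLong(callId), parseLong(oCallId), callTime,
      parseLong(duration), calltype, parseLong(swId))
}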
Answered by Sida Zhou
Using spark 1.6.1 and scala 2.10.
I got the same error: error: overloaded method value createDataFrame with alternatives:
For me, the gotcha was the signature of createDataFrame: I was trying to use a val rdd : List[Row], but it failed
because java.util.List[org.apache.spark.sql.Row] and scala.collection.immutable.List[org.apache.spark.sql.Row] are NOT the same.
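If you do end up with a java.util.List[Row] (e.g. handed over from Java code), it can be bridged to a Scala List first. A small sketch using JavaConverters (javaRows is a placeholder):

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row

val javaRows: java.util.List[Row] = ???  // placeholder for a list coming from Java code
val scalaRows: List[Row] = javaRows.asScala.toList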
The working solution I've found is to convert val rdd : Array[Array[String]] into RDD[Row] via List[Array[String]]. I find this is the closest to what's in the documentation:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val rdd_original : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
val rdd : List[Array[String]] = rdd_original.toList
val schemaString = "callId oCallId callTime duration calltype swId"
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD to Rows.
val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
// val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works
val df = sqlContext.createDataFrame(sc.parallelize(rowRDD:List[Row]), schema)
df.show
Answered by ccheneson
I assume that your schema is, as in the Spark Guide, the following:
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
If you look at the signatures of createDataFrame, here is the overload that accepts a StructType as the 2nd argument (for Scala):
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema.
So it accepts an RDD[Row] as its 1st argument. What you have in rowRDD is an RDD[Array[String]], so there is a mismatch.
Do you need an RDD[Array[String]]?
Otherwise you can use the following to create your dataframe:
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
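From there the DataFrame can be built as in the other answers (a minimal completion, assuming rdd is an actual RDD[Array[String]] and schema is the StructType defined above):

val df = sqlContext.createDataFrame(rowRDD, schema)
df.printSchema()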