Convert RDD to Dataframe in Spark/Scala

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33127970/

Tags: scala, hadoop, apache-spark

Asked by sparkDabbler

The RDD has been created in the format Array[Array[String]] and has the following values:

val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId"

Next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

Gives the following error:

<console>:45: error: overloaded method value createDataFrame with alternatives:
     (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],   
    org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

Answered by Beryllium

Just paste into a spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and then create the DataFrame using toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.

Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a normal program (not in the spark-shell), make sure (quoted from here; both points are illustrated in the sketch after this list):

  • Import sqlContext.implicits._ right after creating the SQLContext
  • Define the case class outside of the method that uses toDF()
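
A minimal standalone sketch putting both rules together (a hypothetical program against the Spark 1.x API; the object name ToDfExample and the app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Defined at the top level, outside the method that calls toDF()
case class X(callId: String, oCallId: String,
  callTime: String, duration: String, calltype: String, swId: String)

object ToDfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ToDfExample"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // right after creating the SQLContext

    val rdd = sc.makeRDD(Array(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))

    val df = rdd
      .map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }
      .toDF()
    df.show()
    sc.stop()
  }
}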

Answered by Eugene Zhulenev

You first need to convert your Array into a Row, and then define the schema. I made the assumption that most of your fields are Long:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val rdd: RDD[Array[String]] = ???

// Each input array carries six fields; bind them all,
// converting the Long-typed ones so they match the schema below
val rows: RDD[Row] = rdd map {
  case Array(callId, oCallId, callTime, duration, calltype, swId) =>
    Row(callId.toLong, oCallId, callTime, duration.toLong, calltype, swId.toLong)
}

object schema {
  val callId = StructField("callId", LongType)
  val oCallId = StructField("oCallId", StringType)
  val callTime = StructField("callTime", StringType)
  val duration = StructField("duration", LongType)
  val calltype = StructField("calltype", StringType)
  val swId = StructField("swId", LongType)

  val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
}

sqlContext.createDataFrame(rows, schema.struct)
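
To try this against the sample data from the question, the ??? placeholder can be filled in, for example, with:

val rdd: RDD[Array[String]] = sc.parallelize(Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1")))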

Answered by Sida Zhou

Using Spark 1.6.1 and Scala 2.10

I got the same error: error: overloaded method value createDataFrame with alternatives:

For me, the gotcha was the signature of createDataFrame: I was trying to use a val rdd : List[Row], but it failed because java.util.List[org.apache.spark.sql.Row] and scala.collection.immutable.List[org.apache.spark.sql.Row] are NOT the same.

The working solution I found was to convert val rdd : Array[Array[String]] into RDD[Row] via List[Array[String]]. I find this is the closest to what's in the documentation:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val rdd_original : Array[Array[String]] = Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd : List[Array[String]] = rdd_original.toList

val schemaString = "callId oCallId callTime duration calltype swId"

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD to Rows.
val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
// val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works

val df = sqlContext.createDataFrame(sc.parallelize(rowRDD), schema)
df.show
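
Since this schema declares every column as StringType, a possible follow-up is to cast the numeric columns; a small sketch using the built-in cast (available in Spark 1.6):

import org.apache.spark.sql.functions.col

// cast the string columns that hold numbers; the others stay strings
val typed = df
  .withColumn("duration", col("duration").cast("long"))
  .withColumn("swId", col("swId").cast("long"))
typed.printSchema()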

Answered by ccheneson

I assume that your schema is, as in the Spark Guide, the following:

val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

If you look at the signatures of createDataFrame, here is the one that accepts a StructType as the 2nd argument (for Scala):

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.

So it accepts an RDD[Row] as the 1st argument. What you have in rowRDD is an RDD[Array[String]], so there is a mismatch.

Do you need an RDD[Array[String]]?

Otherwise you can use the following to create your dataframe:

import org.apache.spark.sql.Row

val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
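
With rowRDD now an RDD[Row], the original call compiles; a sketch, assuming rdd is the RDD from the question and schema is the StructType built above:

val calDF = sqlContext.createDataFrame(rowRDD, schema)
calDF.show()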