Convert RDD[String] to RDD[Row] to Dataframe Spark Scala

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not the translator): StackOverflow, original source: http://stackoverflow.com/questions/41898144/


scala, hadoop, apache-spark, dataframe, spark-dataframe

Asked by Defcon

I am reading in a file that has many spaces and need to filter out the spaces. Afterwards, we need to convert it to a DataFrame. Example input below.

2017123 |     |10|running|00000|111|-EXAMPLE

My solution to this was the following function, which strips all the whitespace out of each line.

import org.apache.spark.rdd.RDD

// Read the file and strip all tabs and Unicode space separators from each line
def truncateRDD(fileName: String): RDD[String] = {
    val example = sc.textFile(fileName)
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}

However, I am not sure how to get it into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class approach, but the issue is that we have an 800-field schema, and a case class cannot go beyond 22 fields.

I was thinking of somehow converting RDD[String] to RDD[Row] so I can use the createDataFrame function.

val DF = spark.createDataFrame(rowRDD, schema)

Any suggestions on how to do this?

Answered by puhlen

First, split/parse your strings into fields.

rdd.map(line => parse(line)), where parse is some parsing function. It could be as simple as split, but you may want something more robust. This will get you an RDD[Array[String]] or similar.

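For the pipe-delimited sample above, a minimal sketch of such a parse (the parse helper, the "|" delimiter, and the -1 split limit are illustrative assumptions, not part of the original answer):

// Split on "|"; the -1 limit keeps trailing empty fields so every row keeps the same arity
def parse(line: String): Array[String] =
  line.split("\\|", -1).map(_.trim)

val fieldsRDD = rdd.map(line => parse(line))  // RDD[Array[String]]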

You can then convert to an RDD[Row] with rdd.map(a => Row.fromSeq(a)).

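Continuing the sketch, the conversion is then a one-liner (fieldsRDD comes from the snippet above):

import org.apache.spark.sql.Row

val rowRDD = fieldsRDD.map(a => Row.fromSeq(a))  // RDD[Row]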

From there you can convert to a DataFrame using sqlContext.createDataFrame(rdd, schema), where rdd is your RDD[Row] and schema is your StructType schema.

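Putting it together, a minimal sketch of this last step, assuming an all-string schema generated in a loop (with an 800-field file you would build the StructType programmatically from the real column names and types):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema: 800 string columns named col1..col800
val schema = StructType((1 to 800).map(i => StructField(s"col$i", StringType, nullable = true)))

val df = sqlContext.createDataFrame(rowRDD, schema)  // rowRDD is the RDD[Row] built above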

Answered by Ram Ghadiyaram

In your case, a simple way:

// Split each line into its fields before building the Row (assuming "|"-delimited
// input as in the sample); Row.fromSeq on a bare String would yield one column per character
val RowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r.split("\\|", -1)))

How do you solve the product arity issue if you are using Scala 2.10?

However, I am not sure how to get it into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class approach, but the issue is that we have an 800-field schema, and a case class cannot go beyond 22 fields.

Yes, there are some limitations, such as product arity, but we can overcome them... for Scala versions below 2.11, you can do something like the example below:

Prepare a class which extends Product and overrides the following methods,

like...

  • productArity(): Int: This returns the size of the attributes. In our case, it's 33. So our implementation looks like the sketch after this list.

  • productElement(n: Int): Any: Given an index, this returns the attribute. As protection, we also have a default case, which throws an IndexOutOfBoundsException.

  • canEqual(that: Any): Boolean: This is the last of the three functions, and it serves as a boundary condition when an equality check is being done against the class.

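A minimal sketch of such a class, assuming the parsed values are carried in a Seq (the WideRecord name is hypothetical; the 33-attribute count follows the description above):

// Implement Product by hand to sidestep the 22-field case class limit of Scala 2.10
class WideRecord(val values: Seq[String]) extends Product with Serializable {
  // productArity: the size of the attributes, 33 in this example
  override def productArity: Int = 33
  // productElement: return the attribute at index n; the guarded default case
  // throws an IndexOutOfBoundsException
  override def productElement(n: Int): Any =
    if (n >= 0 && n < productArity) values(n)
    else throw new IndexOutOfBoundsException(n.toString)
  // canEqual: the boundary condition used during equality checks against the class
  override def canEqual(that: Any): Boolean = that.isInstanceOf[WideRecord]
}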