Convert RDD[String] to RDD[Row] to Dataframe Spark Scala

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/41898144/
Asked by Defcon
I am reading in a file that has many spaces and need to filter out the spaces. Afterwards we need to convert it to a dataframe. Example input below.
2017123 |     |10|running|00000|111|-EXAMPLE
My solution to this was the following function, which parses out all spaces and trims the file.
import org.apache.spark.rdd.RDD

def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    // collapse every run of tabs and Unicode space characters to nothing
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}
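For instance, applied to the sample line above, the map keeps the | delimiters while collapsing the padded and blank fields:

// illustrative only: the sample line before and after the replaceAll above
val raw     = "2017123 |     |10|running|00000|111|-EXAMPLE"
val trimmed = raw.replaceAll("""[\t\p{Zs}]+""", "")
// trimmed == "2017123||10|running|00000|111|-EXAMPLE"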
However, I am not sure how to get it into a dataframe. sc.textFile returns an RDD[String]. I tried the case class way but the issue is we have an 800 field schema, and a case class cannot go beyond 22 fields.
I was thinking of somehow converting RDD[String] to RDD[Row] so I can use the createDataFrame function.
val DF = spark.createDataFrame(rowRDD, schema)
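where the 800-field schema could itself be generated programmatically rather than written out by hand; a minimal sketch, assuming all columns are strings and using placeholder column names:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// generate an 800-column schema; the names col0, col1, ... are placeholders
val schema = StructType(
  (0 until 800).map(i => StructField(s"col$i", StringType, nullable = true))
)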
Any suggestions on how to do this?
Answered by puhlen
First split/parse your strings into the fields.
rdd.map(line => parse(line)) where parse is some parsing function. It could be as simple as split, but you may want something more robust. This will get you an RDD[Array[String]] or similar.
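For the pipe-delimited sample in the question, a minimal parse might be (assuming a split on | with the -1 limit so empty fields are kept):

// split on the | delimiter and trim each field;
// the -1 limit preserves empty fields
def parse(line: String): Array[String] = line.split("\\|", -1).map(_.trim)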
You can then convert to an RDD[Row] with rdd.map(a => Row.fromSeq(a)).
From there you can convert to a DataFrame using sqlContext.createDataFrame(rdd, schema), where rdd is your RDD[Row] and schema is your schema StructType.
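Putting the steps together, a minimal end-to-end sketch, assuming the truncateRDD and parse functions above and string-typed columns (the seven column names are invented to match the sample line):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// RDD[String] -> RDD[Array[String]] -> RDD[Row]
val rowRDD = truncateRDD("yourfilename")
  .map(line => parse(line))
  .map(a => Row.fromSeq(a))

// one StructField per column, in order; all typed as strings for simplicity
val schema = StructType(Seq(
  "date", "filler", "code", "status", "zeros", "num", "label"
).map(name => StructField(name, StringType, nullable = true)))

val df = sqlContext.createDataFrame(rowRDD, schema)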
Answered by Ram Ghadiyaram
In your case, a simple way:
// split each cleaned line into fields before building the Row;
// Row.fromSeq on a raw String would produce a Row of single characters
val RowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r.split("\\|", -1)))
How to solve the product arity issue if you are using Scala 2.10?
However, I am not sure how to get it into a dataframe. sc.textFile returns an RDD[String]. I tried the case class way but the issue is we have an 800 field schema, and a case class cannot go beyond 22 fields.
Yes, there are some limitations like product arity, but we can overcome them. For Scala versions below 2.11, you can do as in the example below:
Prepare a case class which extends Product and overrides the following methods:
- productArity(): Int: returns the number of attributes; in our case, 33.
- productElement(n: Int): Any: given an index, returns that attribute; as protection, a default case throws an IndexOutOfBoundsException.
- canEqual(that: Any): Boolean: the last of the three functions; it serves as a boundary condition when an equality check is done against the class.
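A minimal sketch of such an implementation, shown here with a hypothetical three-field Record class for brevity (the same pattern scales to 33 fields or more):

// hypothetical three-field record; a plain class extending Product
// sidesteps the 22-field case class limit of Scala 2.10
class Record(val name: String, val age: String, val grade: String)
    extends Product with Serializable {

  // number of attributes
  override def productArity: Int = 3

  // return the attribute at the given index, guarding against bad indices
  override def productElement(n: Int): Any = n match {
    case 0 => name
    case 1 => age
    case 2 => grade
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  // boundary condition for equality checks against this class
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Record]
}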
- For an example implementation, you can refer to this Student case class, which has 33 fields in it
- An example student dataset description is here
 