Scala: how to convert an RDD object to a DataFrame in Spark
Disclaimer: This page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/29383578/
How to convert rdd object to dataframe in spark
Asked by user568109
How can I convert an RDD (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]) to a DataFrame (org.apache.spark.sql.DataFrame)? I converted a DataFrame to an RDD using .rdd. After processing it I want it back as a DataFrame. How can I do this?
Accepted answer by The Archetypal Paul
SqlContext has a number of createDataFrame methods that create a DataFrame given an RDD. I imagine one of these will work for your context.
For example:
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema.
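For illustration, here is a minimal sketch of that signature in use. It assumes a SparkContext named sc and a SQLContext named sqlContext are already in scope; the column names are made up:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// An RDD[Row] with two hypothetical columns
val rowRDD: RDD[Row] = sc.parallelize(Seq(Row("alice", 1), Row("bob", 2)))
// A schema describing those two columns
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("value", IntegerType, nullable = true)
))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()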
Answered by mrsrinivas
This code works perfectly from Spark 2.x with Scala 2.11
Import necessary classes
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
Create a SparkSession object; here it is spark:
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext // Just used to create test RDDs
Let's create an RDD to turn into a DataFrame:
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
Method 1
Using SparkSession.createDataFrame(RDD obj).
val dfWithoutSchema = spark.createDataFrame(rdd)
dfWithoutSchema.show()
+------+--------------------+
| _1| _2|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
Method 2
Using SparkSession.createDataFrame(RDD obj) and specifying column names.
val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals")
dfWithSchema.show()
+------+--------------------+
| id| vals|
+------+--------------------+
| first|[2.0, 1.0, 2.1, 5.4]|
| test|[1.5, 0.5, 0.9, 3.7]|
|choose|[8.0, 2.9, 9.1, 2.5]|
+------+--------------------+
Method 3 (Actual answer to the question)
This approach requires the input rdd to be of type RDD[Row].
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
Create the schema:
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val1", DoubleType, true))
.add(StructField("val2", DoubleType, true))
Now apply both rowsRdd and schema to createDataFrame():
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
Answered by dtjones
Assuming your RDD[Row] is called rdd, you can use:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
Answered by Daniel de Paula
Note: This answer was originally posted here
I am posting this answer because I would like to share additional details about the available options that I did not find in the other answers.
To create a DataFrame from an RDD of Rows, there are two main options:
1) As already pointed out, you could use toDF(), which can be imported via import sqlContext.implicits._. However, this approach only works for the following types of RDDs:
- RDD[Int]
- RDD[Long]
- RDD[String]
- RDD[T <: scala.Product]
(source: Scaladoc of the SQLContext.implicits object)
The last signature actually means that it can work for an RDD of tuples or an RDD of case classes (because tuples and case classes are subclasses of scala.Product).
So, to use this approach for an RDD[Row], you have to map it to an RDD[T <: scala.Product]. This can be done by mapping each row to a custom case class or to a tuple, as in the following code snippets:
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
or
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
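For illustration, here is a fully concrete sketch of the case-class variant for a hypothetical two-column RDD[Row] (the class and column names are made up; it assumes import sqlContext.implicits._ is in scope):
import org.apache.spark.sql.Row
// Hypothetical schema: each Row holds a String name and a Long age
case class Person(name: String, age: Long)
val personDF = rdd.map {
  case Row(name: String, age: Long) => Person(name, age)
}.toDF("name", "age")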
The main drawback of this approach (in my opinion) is that you have to explicitly set the schema of the resulting DataFrame in the map function, column by column. Maybe this can be done programmatically if you don't know the schema in advance, but things can get a little messy there. So, alternatively, there is another option:
2) You can use createDataFrame(rowRDD: RDD[Row], schema: StructType) as in the accepted answer, which is available in the SQLContext object. Example for converting an RDD of an old DataFrame:
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
Note that there is no need to explicitly set any schema column. We reuse the old DF's schema, which is of the StructType class and can be easily extended. However, this approach sometimes is not possible, and in some cases can be less efficient than the first one.
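As an aside, here is a minimal sketch of what "easily extended" could look like, appending one extra nullable column to the reused schema (the column name is just illustrative):
import org.apache.spark.sql.types.{StringType, StructField, StructType}
// Append a hypothetical extra column to the old DataFrame's schema
val extendedSchema: StructType = StructType(oldDF.schema.fields :+ StructField("extra_col", StringType, nullable = true))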
Answered by Ajay Gupta
Suppose you have a DataFrame and you want to modify the fields' data by converting it to an RDD[Row].
val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head))
To convert back to a DataFrame from the RDD, we need to define the structure type of the RDD.
If the datatype is Long, it becomes LongType in the structure. If String, then StringType.
val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true)))
Now you can convert the RDD to a DataFrame using the createDataFrame method.
val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct)
Answered by Rashmit Rathod
Here is a simple example of converting your List into a Spark RDD and then converting that Spark RDD into a DataFrame.
Please note that I have used the Spark shell's Scala REPL to execute the following code. Here sc is an instance of SparkContext, which is implicitly available in the Spark shell. I hope it answers your question.
scala> val numList = List(1,2,3,4,5)
numList: List[Int] = List(1, 2, 3, 4, 5)
scala> val numRDD = sc.parallelize(numList)
numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:28
scala> val numDF = numRDD.toDF
numDF: org.apache.spark.sql.DataFrame = [_1: int]
scala> numDF.show
+---+
| _1|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
+---+
Answered by Aravind Krishnakumar
Method 1: (Scala)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z")
Method 2: (Scala)
case class temp(val1: String,val3 : Double)
val rdd = sc.parallelize(Seq(
Row("foo", 0.5), Row("bar", 0.0)
))
val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF()
rows.show()
Method 1: (Python)
from pyspark.sql import Row
l = [('Alice',2)]
Person = Row('name','age')
rdd = sc.parallelize(l)
person = rdd.map(lambda r:Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.show()
Method 2: (Python)
from pyspark.sql.types import *
l = [('Alice',2)]
rdd = sc.parallelize(l)
schema = StructType([StructField ("name" , StringType(), True) ,
StructField("age" , IntegerType(), True)])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.show()
Extract the values from the Row object and then apply the case class to convert the RDD to a DF:
val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" }
val temp2 = attrib2.map{case Row ( key: Int) => s"$key" }
case class RLT (id: String, attrib_1 : String, attrib_2 : String)
import hiveContext.implicits._
val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF
Answered by ozzieisaacs
On newer versions of Spark (2.0+):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val spark = SparkSession
.builder()
.getOrCreate()
import spark.implicits._
val dfSchema = Seq("col1", "col2", "col3")
rdd.toDF(dfSchema: _*)
Answered by teserecter
One needs to create a schema and attach it to the RDD.
Assuming val spark is a product of a SparkSession.builder...
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/* Let's gin up some sample data:
 * As RDDs and dataframes can have columns of differing types, let's make our
 * sample data a three wide, two tall, rectangle of mixed types.
 * A column of Strings, a column of Longs, and a column of Doubles
 */
val arrayOfArrayOfAnys = Array.ofDim[Any](2,3)
arrayOfArrayOfAnys(0)(0)="aString"
arrayOfArrayOfAnys(0)(1)=0L
arrayOfArrayOfAnys(0)(2)=3.14159
arrayOfArrayOfAnys(1)(0)="bString"
arrayOfArrayOfAnys(1)(1)=9876543210L
arrayOfArrayOfAnys(1)(2)=2.71828
/* The way to convert anything that looks rectangular,
* (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to
* throw it into sparkContext.parallelize.
* http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows
* the parallelize definition as
* def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
* so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys.
* Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it.
*/
val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys)
/* We'll be using the sqlContext.createDataFrame to add a schema our RDD.
* The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have.
* To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq)
* As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type.
*/
val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=>
Row.fromSeq(f.toSeq)
)
/* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe.
* https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as
* case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
* Will leave the two default values in place for each of the columns:
* nullability as true,
* metadata as an empty Map[String,Any]
*
*/
val schema = StructType(
StructField("colOfStrings", StringType) ::
StructField("colOfLongs" , LongType ) ::
StructField("colOfDoubles", DoubleType) ::
Nil
)
val df=spark.sqlContext.createDataFrame(rddOfRows,schema)
/*
* +------------+----------+------------+
* |colOfStrings|colOfLongs|colOfDoubles|
* +------------+----------+------------+
* | aString| 0| 3.14159|
* | bString|9876543210| 2.71828|
* +------------+----------+------------+
*/
df.show
Same steps, but with fewer val declarations:
val arrayOfArrayOfAnys=Array(
Array("aString",0L ,3.14159),
Array("bString",9876543210L,2.71828)
)
val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq))
/* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata:
* Consider constructing the schema from an Array[StructField]. This would allow looping over
* the columns, with a match statement applying the appropriate sql datatypes as the second
* StructField arguments.
*/
val sf=new Array[StructField](3)
sf(0)=StructField("colOfStrings",StringType)
sf(1)=StructField("colOfLongs" ,LongType )
sf(2)=StructField("colOfDoubles",DoubleType)
val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList))
df.show
Answered by Priyanshu Singh
I tried to explain the solution using the word count problem.

1. Read the file using sc
2. Produce the word count

Methods to create the DF:
- rdd.toDF method
- rdd.toDF("word","count")
- spark.createDataFrame(rdd,schema)

Read the file using Spark:
val rdd=sc.textFile("D://cca175/data/")

RDD to DataFrame:
val df=sc.textFile("D://cca175/data/").toDF("t1")
df.show

Method 1: Create the word-count RDD and convert it to a DataFrame
val df=rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>(x+y)).toDF("word","count")

Method 2: Create the DataFrame from the RDD
val df=spark.createDataFrame(wordRdd)
// with header
val df=spark.createDataFrame(wordRdd).toDF("word","count")
df.show

Method 3: Define the schema
import org.apache.spark.sql.types._
val schema=new StructType().add(StructField("word",StringType,true)).add(StructField("count",StringType,true))

Create the RowRDD:
import org.apache.spark.sql.Row
val rowRdd=wordRdd.map(x=>Row(x._1,x._2))

Create the DataFrame from the RDD with the schema:
val df=spark.createDataFrame(rowRdd,schema)
df.show