Scala: How to create a DataFrame from a text file in Spark

Note: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original StackOverflow post: http://stackoverflow.com/questions/36766322/


How to create a DataFrame from a text file in Spark

Tags: scala, apache-spark, dataframe, apache-spark-sql, rdd

Asked by Rahul

I have a text file on HDFS and I want to convert it to a Data Frame in Spark.

I am using the Spark Context to load the file and then try to generate individual columns from that file.

val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))

After doing this, I am trying the following operation.

myFile1.toDF()

I am getting an issue because the elements in the myFile1 RDD are now of array type.

How can I solve this issue?

Answered by Tzach Zohar

Update - as of Spark 2.0, you can simply use the built-in csv data source:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate() // create the Spark session
val df = spark.read.csv("file.txt")

You can also use various options to control the CSV parsing, e.g.:

val df = spark.read.option("header", "false").csv("file.txt")
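
Since the file in the question is semicolon-delimited, you would likely also set the separator option; a minimal sketch, assuming the Spark 2.x built-in csv option names:

val df = spark.read
  .option("header", "false")
  .option("sep", ";")   // the question's file uses ";" as the delimiter
  .csv("file.txt")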

For Spark versions < 2.0: The easiest way is to use spark-csv - include it in your dependencies and follow the README. It allows setting a custom delimiter (;), can read CSV headers (if you have them), and it can infer the schema types (at the cost of an extra scan of the data).

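A minimal sketch of the spark-csv route for Spark < 2.0, assuming the com.databricks:spark-csv package is on the classpath (option names as documented in the spark-csv README):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", ";")       // custom delimiter for the question's file
  .option("header", "false")
  .option("inferSchema", "true")  // extra pass over the data to infer column types
  .load("file.txt")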

Alternatively, if you know the schema, you can create a case class that represents it and map your RDD elements into instances of this class before transforming into a DataFrame, e.g.:

case class Record(id: Int, name: String)

val myFile1 = myFile.map(x=>x.split(";")).map {
  case Array(id, name) => Record(id.toInt, name)
} 

myFile1.toDF() // DataFrame will have columns "id" and "name"

Answered by Vikas Singh

Here are different ways to create a DataFrame from a text file.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = new SparkContext(conf)

raw text file

val file = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
val fileToDf = file.map(_.split(",")).map { case Array(a, b, c) => (a, b.toInt, c) }
  .toDF("name", "age", "city")
fileToDf.foreach(println(_))

spark session without schema

import org.apache.spark.sql.SparkSession

val sparkSess = SparkSession.builder().appName("SparkSessionZipsExample")
  .config(conf).getOrCreate()

val df = sparkSess.read.option("header", "false").csv("C:\\vikas\\spark\\Interview\\text.txt")
df.show()

spark session with schema

import org.apache.spark.sql.types._

val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", "false").schema(schema).csv("C:\\vikas\\spark\\Interview\\text.txt")
dfWithSchema.show()

using sql context

import org.apache.spark.sql.{Row, SQLContext}

val sqlCtx = new SQLContext(sc)

val fileRdd = sc.textFile("C:\\vikas\\spark\\Interview\\text.txt")
  .map(_.split(","))
  .map(x => Row(x: _*))
val sqlDf = sqlCtx.createDataFrame(fileRdd, schema)
sqlDf.show()

Answered by mgaido

If you want to use the toDF method, you have to convert your RDD of Array[String] into an RDD of a case class. For example, you have to do:

case class Test(id: String, field2: String)
val myFile = sc.textFile("file.txt")
val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Answered by Abhijit

You will not be able to convert it into a DataFrame until you use an implicit conversion.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(new SparkContext())

import sqlContext.implicits._

Only after this can you convert it to a DataFrame:

case class Test(id: String, field2: String)

val myFile = sc.textFile("file.txt")

val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Answered by Vishal

val df = spark.read.textFile("abc.txt")   // Dataset[String], one line per record

case class Abc(amount: Int, types: String, id: Int)  // columns and data types

// split each line (assuming comma-delimited fields) before mapping into the case class
val df2 = df.map(_.split(",")).map(rec => Abc(rec(0).toInt, rec(1), rec(2).toInt))
df2.printSchema


root
 |-- amount: integer (nullable = true)
 |-- types: string (nullable = true)
 |-- id: integer (nullable = true)

Answered by Ankita

I know I am quite late to answer this but I have come up with a different answer:

val rdd = sc.textFile("/home/training/mydata/file.txt")

val text = rdd.map(lines => lines.split(",")).map(arrays => (arrays(0), arrays(1))).toDF("id", "name")
text.show

Answered by Ankita

You can read a file to get an RDD and then assign a schema to it. Two common ways to create a schema are using either a case class or a Schema object [my preferred one]. Here are quick snippets of code that you may use.

Case Class approach

case class Test(id: String, name: String)
val myFile = sc.textFile("file.txt")
val df = myFile.map(x => x.split(";")).map(x => Test(x(0), x(1))).toDF()

Schema Approach

import org.apache.spark.sql.types._
val schemaString = "id name"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", "false").schema(schema).csv("file.txt")  // sparkSess is an existing SparkSession
dfWithSchema.show()

The second one is my preferred approach, since case classes were limited to a maximum of 22 fields (in Scala 2.10 and earlier), and this will be a problem if your file has more than 22 fields!
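
To illustrate why the schema approach scales past that limit, here is a minimal sketch that builds the StructType programmatically for a hypothetical 30-column file (the column names and file path are made up for illustration):

import org.apache.spark.sql.types._

// build a schema with 30 string columns named col1 .. col30
val wideSchema = StructType((1 to 30).map(i => StructField(s"col$i", StringType, nullable = true)))
val wideDf = sparkSess.read.schema(wideSchema).csv("wide_file.txt")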