scala Apache Spark 使用管道分隔的 CSV 文件

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/34246499/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:52:10  来源:igfitidea点击:

Apache Spark working with pipe delimited CSV files

scalaapache-sparkapache-spark-sql

提问by Edward

I am very new to Apache Spark and am trying to use SchemaRDD with my pipe delimited text file. I have a standalone installation of Spark 1.5.2 on my Mac using Scala 10. I have a CSV file with the following representative data and I am trying to split the following into 4 different files based on the first value (column) of the record. I would very much appreciate any help I can get with this.

我对 Apache Spark 非常陌生,并且正在尝试将 SchemaRDD 与我的管道分隔文本文件一起使用。我在 Mac 上使用 Scala 10 独立安装了 Spark 1.5.2。我有一个包含以下代表性数据的 CSV 文件,我试图根据记录的第一个值(列)将以下内容拆分为 4 个不同的文件. 我非常感谢我能得到的任何帮助。

1|1.8|20140801T081137|115810740
2|20140714T060000|335|22159892|3657|0.00|||181
2|20140714T061500|335|22159892|3657|0.00|||157
2|20140714T063000|335|22159892|3657|0.00|||156
2|20140714T064500|335|22159892|3657|0.00|||66
2|20140714T070000|335|22159892|3657|0.01|||633
2|20140714T071500|335|22159892|3657|0.01|||1087
3|34|Starz
3|35|VH1
3|36|CSPAN: Cable Satellite Public Affairs Network
3|37|Encore
3|278|CMT: Country Music Television
3|281|Telehit
4|625363|1852400|Matlock|9212|The Divorce
4|625719|1852400|Matlock|16|The Rat Pack
4|625849|1846952|Smallville|43|Calling

回答by KrisP

Note: Your csv file does not have the same number of fields in each row - this cannot be parsed as is into a DataFrame. (SchemaRDD has been renamed to DataFrame.) Here is something you can do if your csv file were well-formed:

注意:您的 csv 文件每行中的字段数不同 - 这无法按原样解析为 DataFrame。(SchemaRDD 已重命名为 DataFrame。)如果您的 csv 文件格式正确,您可以执行以下操作:

launch spark-shell or spark-submit with --packages com.databricks:spark-csv_2.10:1.3.0 in order to parse csv files easily (see here). In Scala, your code would be, assuming your csv file has a header - if yes, it is easier to refer to columns:

使用 --packages com.databricks:spark-csv_2.10:1.3.0 启动 spark-shell 或 spark-submit,以便轻松解析 csv 文件(请参阅此处)。在 Scala 中,您的代码将是,假设您的 csv 文件有一个标题 - 如果是,则更容易引用列:

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv")
// assume 1st column has name col1
val df1 = df.filter( df("col1") === 1)  // 1st DataFrame
val df2 = df.filter( df("col1") === 2)  // 2nd DataFrame  etc... 

Since your file is not well formed, you would have to parse each of the different lines differently, so for example, do the following:

由于您的文件格式不正确,您必须以不同方式解析每一行,例如,执行以下操作:

val lines = sc.textFile("/path/to/file.csv")

case class RowRecord1( col1:Int, col2:Double, col3:String, col4:Int)
def parseRowRecord1( arr:Array[String]) = RowRecord1( arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt)

case class RowRecord2( col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int)
def parseRowRecord2( arr:Array[String]) = RowRecord2( arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt)

val df1 = lines.filter(_.startsWith("1")).map( _.split('|')).map( arr => parseRowRecord1( arr )).toDF
val df2 = lines.filter(_.startsWith("2")).map( _.split('|')).map( arr => parseRowRecord2( arr )).toDF

回答by Andrew Rowlands

In PySpark, the command is:

在 PySpark 中,命令是:

df = spark.read.csv("filepath", sep="|")