Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/33174443/

How to save a spark DataFrame as csv on disk?

scala, apache-spark, apache-spark-sql

Asked by Hello lad

For example, the result of this:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

would return an Array.

How to save a spark DataFrame as a csv file on disk?

Answered by eliasah

Apache Spark does not support native CSV output on disk.

You have four available solutions though:

  1. You can convert your Dataframe into an RDD (a possible convertToReadableString is sketched after this list):

    def convertToReadableString(r : Row) = ???
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
    

    This will create a folder at filepath. Under that path, you'll find the partition files (e.g. part-000*).

    What I usually do if I want to append all the partitions into a big CSV is

    cat filePath/part* > mycsvfile.csv
    

    Some will use coalesce(1, false) to create one partition from the RDD. It's usually bad practice, since it may overwhelm the driver by pulling all the data you are collecting onto it.

    Note that df.rdd will return an RDD[Row].

  2. With Spark < 2, you can use the Databricks spark-csv library:

    • Spark 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • Spark 1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. With Spark 2.x the spark-csv package is not needed, as it's included in Spark.

    df.write.format("csv").save(filepath)
    
  4. You can convert to a local Pandas data frame and use the to_csv method (PySpark only).
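
For reference, option 1's convertToReadableString could be filled in along these lines. This is only a minimal sketch: the naive comma-join below is an assumption and does not handle quoting or escaping of special characters.

import org.apache.spark.sql.Row

// Naive sketch: turn a Row into one CSV line by joining its fields with commas.
// A real implementation would also quote/escape fields containing commas,
// quotes or newlines.
def convertToReadableString(r: Row): String =
  r.toSeq.map(f => if (f == null) "" else f.toString).mkString(",")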

Note: Solutions 1, 2 and 3 will result in CSV-format files (part-*) generated by the underlying Hadoop API that Spark calls when you invoke save. You will have one part-file per partition.

Answered by Erkan Şirin

Writing a dataframe to disk as CSV is similar to reading from CSV. If you want your result in a single file, you can use coalesce.

df.coalesce(1)
  .write
  .option("header", "true")
  .option("sep", ",")
  .mode("overwrite")
  .csv("output/path")

If your result is an array, you should use a language-specific solution rather than the Spark DataFrame API, because all results of this kind are returned to the driver machine.

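For example, if the result has already been collected to the driver as an array of rows, plain Scala I/O is enough. This is only a sketch: the helper name writeRowsAsCsv, the naive comma-join, and the assumption that the data fits in driver memory are mine, not the answer's.

import java.io.PrintWriter

import org.apache.spark.sql.Row

// Assumes `rows` came from something like df.collect() and is small enough
// to hold on the driver. No quoting or escaping of fields is done here.
def writeRowsAsCsv(rows: Array[Row], path: String): Unit = {
  val writer = new PrintWriter(path)
  try rows.foreach(row => writer.println(row.toSeq.mkString(",")))
  finally writer.close()
}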

Answered by Ajk

I had a similar problem. I needed to write a CSV file on the driver while I was connected to the cluster in client mode.

I wanted to reuse the same CSV parsing code as Apache Spark to avoid potential errors.

I checked the spark-csv code and found the code responsible for converting a dataframe into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.

Sadly, it is hardcoded with sc.textFile at the end of the relevant method.

I copy-pasted that code, removed the last lines with sc.textFile, and returned the RDD directly instead.

My code:

/*
  This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
  Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
  But in last lines of that method it's hardcoded against writing as text file -
  for our case we need RDD.
 */
import org.apache.commons.csv.QuoteMode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  // Note: `ctx` is the answer author's own context type, which exposes a
  // `sparkContext` (used below); it is not scala.concurrent.ExecutionContext.
  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit ctx: ExecutionContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      ctx.sparkContext.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      ctx.sparkContext.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }

}
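
Usage could then look roughly like this. This is a sketch only: it assumes an implicit instance of the author's ExecutionContext type is in scope, that the result is small enough to collect on the driver, and the output path is a placeholder.

// Build the raw CSV lines, then write them to a local file on the driver.
val csvLines = DataframeToRawCsvRDD(df, Map("header" -> "true"))

val out = new java.io.PrintWriter("/tmp/result.csv")
try csvLines.collect().foreach(line => out.println(line))
finally out.close()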

Answered by Jai Prakash

I had a similar issue where I had to save the contents of the dataframe to a csv file with a name that I defined. df.write("csv").save("<my-path>") was creating a directory rather than a file. So I had to come up with the following solution. Most of the code is taken from the following dataframe-to-csv, with small modifications to the logic.

import java.io.File

import org.apache.spark.sql.DataFrame

// Writes the dataframe as a single CSV file with the given name by first
// writing it to a temporary directory, then renaming the single part file
// and cleaning up the temporary directory.
def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
  val tmpParquetDir = "Posts.tmp.parquet"

  df.repartition(1).write.
    format("com.databricks.spark.csv").
    option("header", header.toString).
    option("delimiter", sep).
    save(tmpParquetDir)

  // Find the single part file Spark wrote and rename it to the requested name.
  val dir = new File(tmpParquetDir)
  val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
  val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
  (new File(tmpTsvFile)).renameTo(new File(tsvOutput))

  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
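
It can then be called like this (the output file name is just a placeholder):

// Produces a single CSV file named results.csv with a header row.
saveDfToCsv(df, "results.csv", header = true)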