Scala: write a single CSV file using spark-csv

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA terms and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31674530/

Write single CSV file using spark-csv

Tags: scala, csv, apache-spark, spark-csv

Asked by user1735076

I am using https://github.com/databricks/spark-csv and trying to write a single CSV file, but I am not able to: it creates a folder instead.

I need a Scala function that takes parameters such as the path and file name and writes that single CSV file.

Answered by zero323

It is creating a folder with multiple files, because each partition is saved individually. If you need a single output file (still in a folder) you can repartition (preferred if upstream data is large, but requires a shuffle):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

or coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

Either way, the entire data frame is collapsed into a single partition before saving:

All data will be written to mydata.csv/part-00000. Before you use this option, be sure you understand what is going on and what the cost of transferring all data to a single worker is. If you use a distributed file system with replication, data will be transferred multiple times - first fetched to a single worker and subsequently distributed over storage nodes.

Alternatively, you can leave your code as it is and use a general purpose tool like cat or HDFS getmerge to simply merge all the parts afterwards.
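For instance, a minimal sketch of invoking getmerge from Scala (an assumption on my part: the hdfs CLI is on the PATH, and the paths are placeholders):

import scala.sys.process._

// "hdfs dfs -getmerge <hdfs-dir> <local-file>" concatenates all part files into one local file.
val exitCode = Seq("hdfs", "dfs", "-getmerge", "mydata.csv", "/tmp/mydata-merged.csv").!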

Answered by Minkymorgan

If you are running Spark with HDFS, I've been solving the problem by writing csv files normally and leveraging HDFS to do the merging. I'm doing that in Spark (1.6) directly:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Can't remember where I learned this trick, but it might work for you.

Answered by etspaceman

I might be a little late to the game here, but using coalesce(1) or repartition(1) may work for small data-sets, but large data-sets would all be thrown into one partition on one node. This is likely to throw OOM errors, or at best, to process slowly.

I would highly suggest that you use the FileUtil.copyMerge() function from the Hadoop API. This will merge the outputs into a single file.

EDIT - This effectively brings the data to the driver rather than an executor node. Coalesce() would be fine if a single executor has more RAM for use than the driver.

EDIT 2: copyMerge() is being removed in Hadoop 3.0. See the following Stack Overflow article for more information on how to work with the newest version: How to do CopyMerge in Hadoop 3.0?
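Since copyMerge() is gone in Hadoop 3, here is a minimal sketch of a replacement built only on FileSystem calls that still exist there (my own illustration, not the method from the linked article; the paths are placeholders and the part files are assumed to be plain text that can simply be concatenated):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Concatenate every part file under srcDir into a single dstFile.
def copyMergeHadoop3(srcDir: String, dstFile: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(dstFile))
  try {
    // listStatus also returns _SUCCESS and other markers; keep only the part files.
    fs.listStatus(new Path(srcDir))
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = fs.open(status.getPath)
        try IOUtils.copyBytes(in, out, conf, false)
        finally in.close()
      }
  } finally out.close()
}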

Answered by JosiahYoder-deactive except..

If you are using Databricks and can fit all the data into RAM on one worker (and thus can use .coalesce(1)), you can use dbfs to find and move the resulting CSV file:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

If your file does not fit into RAM on the worker, you may want to consider chaotic3quilibrium's suggestion to use FileUtils.copyMerge(). I have not done this, and don't yet know whether it is possible or not, e.g., on S3.

This answer is built on previous answers to this question as well as my own tests of the provided code snippet. I originally posted it to Databricks and am republishing it here.

The best documentation I have found for the recursive option of dbfs's rm is on a Databricks forum.

Answered by John Zhu

A solution that works for S3, modified from Minkymorgan's answer.

Simply pass the temporary partitioned directory path (with a different name than the final path) as srcPath and the single final csv/txt file as dstPath. Also set deleteSource if you want to remove the original source directory.

import org.apache.spark.sql.SparkSession

/**
 * Merges multiple partitions of Spark text file output into a single file.
 * @param srcPath source directory of the partitioned part files
 * @param dstPath output path of the single merged file
 * @param deleteSource whether or not to delete the source directory after merging
 * @param spark the active SparkSession
 */
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean, spark: SparkSession): Unit = {
  import java.net.URI
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
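A hypothetical usage sketch (the bucket paths, df, and spark below are my own assumptions, not part of the original answer): write to a temporary directory first, then merge into the final single file. Headers are disabled because each part file would otherwise contribute its own header line to the merged output.

val tmpDir    = "s3a://my-bucket/reports/output_tmp"   // hypothetical temporary directory
val finalFile = "s3a://my-bucket/reports/output.csv"   // hypothetical final single file

df.write
  .format("com.databricks.spark.csv")
  .option("header", "false")   // avoid one header line per merged part file
  .mode("overwrite")
  .save(tmpDir)

mergeTextFiles(tmpDir, finalFile, deleteSource = true, spark)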

Answered by pprasad009

Spark's df.write() API will create multiple part files inside the given path. To force Spark to write only a single part file, use df.coalesce(1).write.csv(...) instead of df.repartition(1).write.csv(...), because coalesce is a narrow transformation whereas repartition is a wide transformation; see Spark - repartition() vs coalesce()

df.coalesce(1).write.csv(filepath,header=True) 

will create a folder at the given filepath containing one part-0001-...-c000.csv file. Then use

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

to get a user-friendly filename.
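For comparison, a rough Scala equivalent of that write call (a sketch assuming Spark 2.x's built-in CSV writer and the same placeholder filepath):

// Still produces a folder at filepath containing a single part-....csv file.
df.coalesce(1)
  .write
  .option("header", "true")
  .csv(filepath)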

Answered by Arnon Rotem-Gal-Oz

Repartition/coalesce to 1 partition before you save (you'd still get a folder, but it would have a single part file in it).

Answered by Gourav

You can use rdd.coalesce(1, true).saveAsTextFile(path)

It will store the data as a single file at path/part-00000.
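If you start from a DataFrame rather than an RDD, a rough sketch of this route (my own illustration; note that a plain mkString does not handle quoting, escaping, or headers):

// coalesce(1, shuffle = true) behaves like repartition(1): all rows end up in one partition.
df.rdd
  .map(row => row.mkString(","))
  .coalesce(1, shuffle = true)
  .saveAsTextFile("/tmp/output_dir")   // writes /tmp/output_dir/part-00000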

Answered by sri hari kali charan Tummala

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

I solved it using the approach below (rename the file in HDFS):

Step 1: Create the data frame and write it to HDFS.

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Step 2: Create the Hadoop configuration.

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Step 3: Get the HDFS folder path.

val pathFiles = new Path("/hdfsfolder/blah/")

Step 4: Get the Spark output file names from the HDFS folder.

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

Step 5: Create a Scala mutable list to collect all the file names.

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Step 6: Filter the _SUCCESS file out of the list of file names.

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

Step 7: Convert the Scala list to a string, build the desired file name in the HDFS folder, and then rename the part file.

val partFileSourcePath = new Path("/yourhdfsfolder/" + partFileName.mkString(""))
val desiredCsvTargetPath = new Path("/yourhdfsfolder/" + "op_" + ".csv")
hdfs.rename(partFileSourcePath, desiredCsvTargetPath)

Answered by Kees C. Bakker

I'm using this in Python to get a single file:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
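# Note: toPandas() collects the entire DataFrame into the driver's memory,
# so this only works when the data fits on the driver.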