Scala: merge Spark output CSV files with a single header

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/38056152/


Merge Spark output CSV files with a single header

Tags: scala, csv, hadoop, apache-spark

Asked by V. Samma

I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.

I have a Scala script that takes raw data from S3, processes it and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best if I receive a single CSV output file.

Currently, as I do not want to use repartition(1) nor coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job output files, I am running into a small problem. I need a single row of headers in the data file for training the prediction model.

If I use .option("header","true") for spark-csv, then it writes the header to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, then it does not add any headers.

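For reference, a minimal sketch of the write in question; df and outputPath are placeholders for the actual DataFrame and output path:

// with header = true, every part file written by a task gets its own header row;
// with header = false, no header is written at all
df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(outputPath)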

Now I found an option to merge the files inside the Scala script with the Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?

I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the header multiple times, not once.

Answered by Kang

You can work around it like this:

  1. Create a new DataFrame (headerDF) containing the header names.
  2. Union it with the DataFrame (dataDF) containing the data.
  3. Output the unioned DataFrame to disk with option("header", "false").
  4. Merge the partition files (part-0000*.csv) using Hadoop FileUtil.

This way, no partition has a header, except that a single partition's content is the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

  // imports assumed by this snippet
  import scala.collection.JavaConverters._
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import org.apache.spark.sql.{Row, SaveMode}

  // dataFrame is the data to save on disk;
  // cast all columns to String so the header row can be unioned with the data
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  // create a new data frame containing only the header names
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  // merge the header names with the data and write without per-file headers
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  // use Hadoop FileUtil to merge all partition csv files into a single file
  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  val fs = FileSystem.get(hadoopConf)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, hadoopConf, null)

Answered by Sam Jacob

  1. Output the header using dataframe.schema (val header = dataDF.schema.fieldNames.reduce(_ + "," + _)).
  2. Create a file containing that header on DSEFS.
  3. Append all the partition files (headerless) to the file from step 2 using the Hadoop FileSystem API (see the sketch after this list).
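
A rough sketch of this approach, assuming the target filesystem supports append() (the answer mentions DSEFS); dataDF, srcFolder and dstFile are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// 1. build the header line from the DataFrame schema
val header = dataDF.schema.fieldNames.reduce(_ + "," + _)

val conf = new Configuration()
val fs = FileSystem.get(conf)

// 2. create the target file and write the header line first
val out = fs.create(new Path(dstFile))
out.write((header + "\n").getBytes("UTF-8"))
out.close()

// 3. append every headerless part file to the target file
fs.listStatus(new Path(srcFolder))
  .map(_.getPath)
  .filter(_.getName.startsWith("part-"))
  .sortBy(_.getName)
  .foreach { part =>
    val in = fs.open(part)
    val append = fs.append(new Path(dstFile))
    IOUtils.copyBytes(in, append, conf, false)
    in.close()
    append.close()
  }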

Answered by belka

To merge files in a folder into one file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
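
For example (the paths are just placeholders):

merge("/tmp/spark-output", "/tmp/merged.csv")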

If you want to merge all files into one file, but still in the same folder (but this brings all data to the driver node):

dataFrame
      .coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out)

Another solution would be to use solution #2 then move the one file inside the folder to another path (with the name of our CSV file).

import java.io.File
import org.apache.spark.sql.DataFrame

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
    val tmpDir = "tmpDir"

    // write a single part file into a temporary folder
    df.repartition(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", header.toString)
      .option("delimiter", sep)
      .save(tmpDir)

    // rename the single part file to the requested file name (local filesystem only,
    // since java.io.File is used), then clean up the temporary folder
    val dir = new File(tmpDir)
    val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
    (new File(tmpCsvFile)).renameTo(new File(fileName))

    dir.listFiles.foreach( f => f.delete )
    dir.delete
}
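
For example, a call might look like this (dataFrame and the output file name are just placeholders):

df2csv(dataFrame, "/home/hadoop/output.csv", sep = ",", header = true)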

Answered by eugenio calabrese

Try specifying the schema of the header and reading all the files from the folder with spark-csv's DROPMALFORMED option. This should let you read all the files in the folder while keeping only the header rows (because the malformed rows are dropped). Example:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val headerSchema = List(
  StructField("example1", StringType, true),
  StructField("example2", StringType, true),
  StructField("example3", StringType, true)
)

val header_DF = sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .option("mode", "DROPMALFORMED")
  .option("inferSchema", "false")
  .schema(StructType(headerSchema))
  .format("com.databricks.spark.csv")
  .load("folder containing the files")

In header_DF you will have only the header rows; from this you can transform the dataframe the way you need.

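For instance, a minimal sketch (assuming header_DF from above) that keeps a single copy of the header row:

// deduplicate to a single header row
val singleHeaderDF = header_DF.distinct().limit(1)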

Answered by KrazyGautam

// Convert the DataFrame to CSV and save it as text files
// (the output path below is just a placeholder)
outputDataframe.write()
        .format("com.databricks.spark.csv")
        // header => true writes a header row in each part file
        .option("header", "true")
        .save("/path/to/output");

Please follow the link below, which includes an integration test, on how to write a single header:

http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/
