Scala: Merge Spark output CSV files with a single header
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/38056152/
Merge Spark output CSV files with a single header
Asked by V. Samma
I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.
I have a Scala script that takes raw data from S3, processes it, and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best if I receive a single CSV output file.
Currently, as I do not want to use repartition(1) nor coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job output files, I am running into a small problem. I need a single row of headers in the data file for training the prediction model.
If I use .option("header","true") for spark-csv, then it writes the headers to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, then it does not add any headers at all.
Now I have found an option to merge the files inside the Scala script with the Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?
I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the headers multiple times, not once, presumably because copyMerge writes that string after each input file it concatenates rather than only once.
Answered by Kang
You can work around it like this.
- 1. Create a new DataFrame (headerDF) containing the header names.
- 2. Union it with the DataFrame (dataDF) containing the data.
- 3. Write the unioned DataFrame to disk with option("header", "false").
- 4. Merge the partition files (part-0000**0.csv) using Hadoop FileUtil.
This way, no partition has a header except the single partition whose content is the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{Row, SaveMode}
import scala.collection.JavaConverters._

// dataFrame is the data to save on disk
// cast all columns to String so the data can be unioned with the header row
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
// create a new DataFrame containing only the header names
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
// prepend the header row to the data and write without the built-in header option
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
// use Hadoop FileUtil to merge all partition csv files into a single file
val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, hadoopConf, null)
Answered by Sam Jacob
- Output the header using dataframe.schema (val header = dataDF.schema.fieldNames.reduce(_ + "," + _))
- Create a file with that header on DSEFS
- Append all the (headerless) partition files to the file from step 2 using the Hadoop FileSystem API (see the sketch after this list)
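A minimal sketch of those steps with the Hadoop FileSystem API is shown below. This is an assumption of how it could look, not code from the answer: the helper name, paths, and buffer size are placeholders, and instead of relying on filesystem append it keeps a single output stream open while copying the part files.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// hypothetical helper: write the header line first, then stream every part file after it
def writeWithHeader(fs: FileSystem, header: String, partsDir: String, target: String): Unit = {
  val out = fs.create(new Path(target), true)
  try {
    out.write((header + "\n").getBytes("UTF-8"))
    // copy each headerless partition file in name order
    fs.listStatus(new Path(partsDir))
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = fs.open(status.getPath)
        try IOUtils.copyBytes(in, out, 4096, false)
        finally in.close()
      }
  } finally {
    out.close()
  }
}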
Answered by belka
To merge files in a folder into one file:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
If you want to merge all files into one file, but still in the same folder (although this pulls all the data onto a single node):
dataFrame
.coalesce(1)
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save(out)
Another solution would be to use solution #2 and then move the single file inside the folder to another path (with the name of our CSV file).
import java.io.File
import org.apache.spark.sql.DataFrame

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
  val tmpDir = "tmpDir"
  // write a single partition into a temporary folder
  df.repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", header.toString)
    .option("delimiter", sep)
    .save(tmpDir)
  // move the single part file to the target name and clean up the temporary folder
  val dir = new File(tmpDir)
  val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
  new File(tmpCsvFile).renameTo(new File(fileName))
  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
Answered by eugenio calabrese
Try to specify the schema of the header and read all the files from the folder using the DROPMALFORMED option of spark-csv. This should let you read all the files in the folder while keeping only the headers (because the malformed rows are dropped). Example:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val headerSchema = List(
  StructField("example1", StringType, true),
  StructField("example2", StringType, true),
  StructField("example3", StringType, true)
)

val header_DF = sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .option("mode", "DROPMALFORMED")
  .option("inferSchema", "false")
  .schema(StructType(headerSchema))
  .format("com.databricks.spark.csv")
  .load("folder containing the files")
In header_DF you will have only the header rows; from this you can transform the dataframe the way you need.
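One possible continuation is sketched below (an assumption, not part of the answer): collapse the repeated header rows into a single header line, which can then be written out ahead of the headerless data, for example with the FileSystem-based merging shown in the earlier answers.
// a sketch: reduce the (possibly repeated) header rows to one comma-separated header line
val headerLine = header_DF.distinct().first().toSeq.map(_.toString).mkString(",")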
Answered by KrazyGautam
// Convert JavaRDD to CSV and save as text files
outputDataframe.write()
    .format("com.databricks.spark.csv")
    // header => true writes the header into each output file
    .option("header", "true")
    .save(outputPath); // outputPath is a placeholder for the target directory
Please follow the link below for an integration test showing how to write a single header:
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/

