Scala: Append new data to partitioned parquet files
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/34935393/
Append new data to partitioned parquet files
Asked by Saman
I am writing an ETL process where I will need to read hourly log files, partition the data, and save it. I am using Spark (in Databricks). The log files are CSV so I read them and apply a schema, then perform my transformations.
My problem is, how can I save each hour's data as a parquet format but append to the existing data set? When saving, I need to partition by 4 columns present in the dataframe.
Here is my save line:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
The problem is that if the destination folder exists, the save throws an error. If the destination doesn't exist, then I am not appending my files.
I've tried using .mode("append"), but I find that Spark sometimes fails midway through, so I end up losing track of how much of my data has been written and how much I still need to write.
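For reference, here is a minimal sketch of how append mode slots into the save line above (same dataframe, filter and saveDestination as before; SaveMode.Append is the typed equivalent of the "append" string):

data
.filter(validPartnerIds($"partnerID"))
.write
.mode("append") // equivalently: .mode(org.apache.spark.sql.SaveMode.Append)
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)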
I am using parquet because the partitioning substantially speeds up my querying in the future. As well, I must write the data as some file format on disk and cannot use a database such as Druid or Cassandra.
Any suggestions for how to partition my dataframe and save the files (either sticking to parquet or another format) are greatly appreciated.
Answered by Glennie Helles Sindholt
If you need to append the files, you definitely have to use the append mode. I don't know how many partitions you expect it to generate, but I find that if you have many partitions, partitionBy will cause a number of problems (memory and IO issues alike).
If you think that your problem is caused by write operations taking too long, I recommend that you try these two things:
1) Use snappy compression by adding it to the configuration:
conf.set("spark.sql.parquet.compression.codec", "snappy")
2) Disable generation of the metadata files in the hadoopConfiguration on the SparkContext like this:
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
The metadata files will be somewhat time consuming to generate (see this blog post), but according to this they are not actually important. Personally, I always disable them and have no issues.
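Putting the two suggestions together, here is a minimal sketch of how they might be applied before the write, assuming a Spark 1.x setup with a SparkContext and SQLContext (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("hourly-etl") // illustrative name
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// 1) Compress parquet output with snappy
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// 2) Skip generation of the _metadata / _common_metadata summary files
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")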
If you generate many partitions (> 500), I'm afraid the best I can do is suggest that you look into a solution not using append mode - I simply never managed to get partitionBy to work with that many partitions.
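One possible direction, as an illustration of my own rather than something spelled out in the answer, is to avoid append mode entirely by writing each hourly batch to its own sub-directory, so every write targets a fresh path (batchId here is a hypothetical identifier for the current hour):

// Hypothetical: derive a fresh destination per hourly run, e.g. ".../batch=2016-01-21-13"
val hourlyDestination = s"$saveDestination/batch=$batchId"

data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(hourlyDestination)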
Answered by MrChrisRodriguez
If you're using unsorted partitioning, your data is going to be split across all of your partitions. That means every task will generate and write data to each of your output files.
Consider repartitioning your data according to your partition columns before writing, so that all the data for each output file lands on the same partition:
data
.filter(validPartnerIds($"partnerID"))
// the leading partition count is optional, e.g. .repartition(200, $"partnerID", ...)
.repartition($"partnerID", $"year", $"month", $"day")
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
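A note on this design choice: because the repartition columns match the partitionBy columns, all rows for a given (partnerID, year, month, day) combination land in the same task, so each output directory is typically written by a single task instead of receiving a small file from every task. The optional leading integer controls how many partitions the shuffle produces.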

