scala - Spark dataframe write method writing many small files
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/44459355/
Spark dataframe write method writing many small files
Asked by user3030878
I've got a fairly simple job converting log files to parquet. It's processing 1.1TB of data (chunked into 64MB - 128MB files - our block size is 128MB), which is approximately 12 thousand files.
The job works as follows:
val events = spark.sparkContext
  .textFile(s"$stream/$sourcetype")
  .map(_.split(" \\|\\| ").toList)
  .collect { case List(date, y, "Event") => MyEvent(date, y, "Event") }
  .toDF()

events.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
It collects the events with a common schema, converts to a DataFrame, and then writes out as parquet.
The problem I'm having is that this can create a bit of an IO explosion on the HDFS cluster, as it's trying to create so many tiny files.
Ideally I want to create only a handful of parquet files within the partition 'date'.
What would be the best way to control this? Is it by using 'coalesce()'?
How will that affect the amount of files created in a given partition? Is it dependent on how many executors I have working in Spark? (currently set at 100).
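For context (an illustrative sketch, not part of the original question): with partitionBy("date"), every DataFrame partition writes its own file into each date directory it holds rows for, so the file count is driven by the number of partitions rather than by the executor count. You can inspect that number directly:

// events is the DataFrame built above; each of its partitions can emit
// one file per date it contains under partitionBy("date")
println(events.rdd.getNumPartitions)

// reducing the partition count, e.g. with coalesce, caps the files per date
println(events.coalesce(10).rdd.getNumPartitions) // prints 10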
Answered by Raphael Roth
You have to repartition your DataFrame to match the partitioning of the DataFrameWriter.
Try this:
df
  .repartition($"date")
  .write.mode(SaveMode.Append)
  .partitionBy("date")
  .parquet(s"$path")
Answered by user3030878
In Python you can rewrite Raphael Roth's answer as:
(df
  .repartition("date")
  .write.mode("append")
  .partitionBy("date")
  .parquet("{path}".format(path=path)))
You might also consider adding more columns to .repartition to avoid problems with very large partitions:
(df
  .repartition("date", another_column, yet_another_column)
  .write.mode("append")
  .partitionBy("date")
  .parquet("{path}".format(path=path)))
Answered by eliasah
The simplest solution would be to replace your actual partitioning by:
import org.apache.spark.sql.functions.to_date

df
  .repartition(to_date($"date"))
  .write.mode(SaveMode.Append)
  .partitionBy("date")
  .parquet(s"$path")
You can also use more precise partitioning for your DataFrame, i.e. the day and maybe the hour of an hour range, and then be less precise for the writer.
That actually depends on the amount of data.
You can reduce entropy by partitioning the DataFrame and then writing with the partition-by clause.
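A minimal sketch of that idea (assuming a timestamp column named ts, which is hypothetical here): shuffle by day and hour so that no single task holds an entire day, while keeping the on-disk layout partitioned only by day:

import org.apache.spark.sql.functions.{to_date, hour}

// finer-grained shuffle key (day + hour), coarser on-disk partitioning (day only):
// each day is spread over at most as many tasks as it has distinct hours,
// so a date directory ends up with a handful of files rather than thousands
df
  .withColumn("date", to_date($"ts"))
  .repartition(to_date($"ts"), hour($"ts"))
  .write.mode(SaveMode.Append)
  .partitionBy("date")
  .parquet(s"$path")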
Answered by Jai Prakash
I came across the same issue, and using coalesce solved my problem.
df
  .coalesce(3) // number of parts/files
  .write.mode(SaveMode.Append)
  .parquet(s"$path")
For more information on using coalesce or repartition, you can refer to the following: Spark: coalesce or repartition.
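As a rough illustration of the difference (a sketch, not part of the original answer): coalesce(n) merges existing partitions without a full shuffle, while repartition(n) shuffles all data into n new, evenly sized partitions:

// coalesce: narrow dependency, no full shuffle; cheap, but output parts can be skewed
val fewerNoShuffle = df.coalesce(3)

// repartition: full shuffle; more expensive, but parts come out evenly sized
val fewerWithShuffle = df.repartition(3)

println(fewerNoShuffle.rdd.getNumPartitions)   // 3
println(fewerWithShuffle.rdd.getNumPartitions) // 3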
Answered by Narfanator
Duplicating my answer from here: https://stackoverflow.com/a/53620268/171916
This is working for me very well:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
It produces N files in each output partition (directory), and is (anecdotally) faster than using coalesce, and (again, anecdotally, on my data set) faster than only repartitioning on the output.
If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write-outs) and, once it's all settled, using hadoop FileUtil (or just the aws cli) to copy everything over:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
    in: String,
    out: String,
    sparkSession: SparkSession
) = {
  // Recursively copies `in` to `out` across filesystems (e.g. HDFS -> S3);
  // `false` means the source is kept after the copy.
  FileUtil.copy(
    FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
    new Path(in),
    FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
    new Path(out),
    false,
    sparkSession.sparkContext.hadoopConfiguration
  )
}
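For example (with hypothetical paths), once the Spark write has finished:

// made-up paths: staged output on HDFS, final destination on S3
copy("hdfs:///tmp/parquet-staging", "s3a://my-bucket/events", sparkSession)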
Answered by Jeff A.
How about trying to run a script like this as a map job, consolidating all the parquet files into one:
$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
-Dmapred.reduce.tasks=1 \
-input "/hdfs/input/dir" \
-output "/hdfs/output/dir" \
-mapper cat \
-reducer cat

