scala 在 Spark 中将多个小文件合并为几个大文件

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/31009834/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:17:25  来源:igfitidea点击:

merge multiple small files in to few larger files in Spark

scalahadoopapache-sparkhiveapache-spark-sql

提问by dheee

I using hive through Spark. I have a Insert into partitioned table query in my spark code. The input data is in 200+gb. When Spark is writing to a partitioned table, it is spitting very small files(files in kb's). so now the output partitioned table folder have 5000+ small kb files. I want to merge these in to few large MB files, may be about few 200mb files. I tired using hive merge settings, but they don't seem to work.

我通过 Spark 使用 hive。我的 spark 代码中有一个 Insert into partitioned table 查询。输入数据为 200+gb。当 Spark 写入分区表时,它会吐出非常小的文件(以 kb 为单位的文件)。所以现在输出分区表文件夹有 5000 多个小 kb 文件。我想将这些合并到几个大 MB 文件中,可能只有几个 200mb 的文件。我厌倦了使用 hive 合并设置,但它们似乎不起作用。

'val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")

 val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")

val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")

val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")

val result7F = hiveContext.sql("SET hive.merge.sparkfiles = true")

val result7G = hiveContext.sql("set hive.aux.jars.path=c:\Applications\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")

val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")'

The above hive settings work in a mapreduce hive execution and spits out files of specified size. Is there any option to do this Spark or Scala?

上面的 hive 设置在 mapreduce hive 执行中工作并输出指定大小的文件。是否有任何选项可以执行此 Spark 或 Scala?

采纳答案by zweiterlinde

You may want to try using the DataFrame.coalescemethod; it returns a DataFrame with the specified number of partitions (each of which becomes a file on insertion). So using the number of records you are inserting and the typical size of each record, you can estimate how many partitions to coalesce to if you want files of ~200MB.

您可能想尝试使用DataFrame.coalesce方法;它返回一个具有指定分区数的 DataFrame(每个分区在插入时成为一个文件)。因此,使用您插入的记录数和每条记录的典型大小,您可以估计如果您想要大约 200MB 的文件,要合并多少个分区。

回答by Jussi Kujala

I had the same issue. Solution was to add DISTRIBUTE BY clause with the partition columns. This ensures that data for one partition goes to single reducer. Example in your case:

我遇到过同样的问题。解决方案是在分区列中添加 DISTRIBUTE BY 子句。这确保了一个分区的数据进入单个减速器。您的情况示例:

INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table DISTRIBUTE BY date

回答by gszecsenyi

The dataframe repartition(1)method works in this case.

数据帧repartition(1)方法适用于这种情况。