scala - Using Spark to write a parquet file to s3 over s3a is very slow

Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/36927918/
Asked by Brutus35
I'm trying to write a parquet file out to Amazon S3 using Spark 1.6.1. The small parquet file that I'm generating is ~2GB once written, so it's not that much data. I'm trying to prove Spark out as a platform that I can use.
Basically what I'm doing is setting up a star schema with dataframes, then I'm going to write those tables out to parquet. The data comes in from csv files provided by a vendor and I'm using Spark as an ETL platform. I currently have a 3 node cluster in ec2 (r3.2xlarge), so 120GB of memory on the executors and 16 cores total.
The input files total about 22GB and I'm extracting about 2GB of that data for now. Eventually this will be many terabytes when I start loading the full dataset.
Here is my Spark/Scala pseudocode:
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def loadStage(): Unit = {
  sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
  sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet", "false")
  var sqlCtx = new SQLContext(sc)

  val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

  // Setup header table/df
  val header_rec = DataFile.map(_.split("\\|")).filter(x => x(0) == "1")
  val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
  val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
  val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
  val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
  header.registerTempTable("header")
  sqlCtx.cacheTable("header")

  // Setup fact table/df
  val fact_recs = DataFile.map(_.split("\\|")).filter(x => x(0) == "2")
  val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
  val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
  val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
  val df = sqlCtx.createDataFrame(records, factSchema)
  df.registerTempTable("fact")

  val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
  println(results.count())
  results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
The count takes about 2 minutes for 465884512 rows. The write to parquet takes 38 minutes
I understand that the coalesce does a shuffle to the driver which does the write.... but the amount of time it's taking is making me think I'm doing something seriously wrong. Without the coalesce, this still takes 15 minutes, which IMO is still too long and gives me a ton of small parquet files. I'd like to have one large file per day of data. I have code to do the partitioning by a field value as well, and it is just as slow. I've also tried to output this to csv and that takes ~1 hour.
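(For reference, a minimal sketch of such a partitioned write against the results DataFrame above; the partition column and repartition count are illustrative assumptions, not the asker's actual code.)

// Sketch only: write one folder per rel_date instead of coalescing to a single file;
// repartitioning first keeps the number of output files per partition down.
results
  .repartition(16)
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("rel_date")
  .parquet("s3a://my-bucket/a/joined_data.parquet")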
Also, I'm not really setting run time props when I'm submitting my job. My console stats for one job are:
- Alive Workers: 2
- Cores in use: 16 Total, 16 Used
- Memory in use: 117.5 GB Total, 107.5 GB Used
- Applications: 1 Running, 5 Completed
- Drivers: 0 Running, 0 Completed
- Status: ALIVE
Answered by David
Spark defaults cause a large amount of (probably) unnecessary overhead during I/O operations, especially when writing to S3. This article discusses this more thoroughly, but there are 2 settings you'll want to consider changing.
- Use the DirectParquetOutputCommitter. By default, Spark saves all of the data to a temporary folder and then moves those files afterwards. Using the DirectParquetOutputCommitter saves time by writing directly to the S3 output path. No longer available in Spark 2.0+; as stated in the jira ticket, the current solution is to:
  - Switch your code to using s3a and Hadoop 2.7.2+; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard.
  - Use the Hadoop FileOutputCommitter and set mapreduce.fileoutputcommitter.algorithm.version to 2.
- Turn off schema merging. If schema merging is on, the driver node will scan all of the files to ensure a consistent schema. This is especially costly because it is not a distributed operation. Make sure it is turned off by doing val file = sqx.read.option("mergeSchema", "false").parquet(path). Schema merging is turned off by default as of Spark 1.5.
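A hedged sketch of applying the two settings above with the Spark 1.6-era API from the question (sc, sqlCtx, and the path are reused from the question for illustration):

// Use the v2 file output committer algorithm for writes...
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
// ...and skip schema merging when reading the Parquet back.
val file = sqlCtx.read.option("mergeSchema", "false").parquet("s3a://my-bucket/a/joined_data.parquet")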
Answered by Steve Loughran
The direct output committer is gone from the Spark codebase; you would have to write your own/resurrect the deleted code in your own JAR. If you do so, turn speculation off in your work, and know that other failures can cause problems too, where "problem" means "invalid data".
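For completeness, a minimal sketch of disabling speculation (standard SparkConf usage; this by itself does not bring the deleted committer back):

import org.apache.spark.{SparkConf, SparkContext}

// Speculative execution can launch duplicate task attempts that race to write the
// same S3 object, which is exactly what a direct committer cannot tolerate.
val conf = new SparkConf().set("spark.speculation", "false")
val sc = new SparkContext(conf)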
On a brighter note, Hadoop 2.8 is going to add some S3A speedups specifically for reading optimised binary formats (ORC, Parquet) off S3; see HADOOP-11694 for details. And some people are working on using Amazon Dynamo for the consistent metadata store which should be able to do a robust O(1) commit at the end of work.
Answered by jmng
One of the immediate approaches to speed up Spark writes to S3 is to use the EMRFS S3-optimized committer.
However, if you use s3a this committer cannot be used:
When the EMRFS S3-optimized committer is not used

The committer is not used under the following circumstances:

- When writing to HDFS
- When using the S3A file system
- When using an output format other than Parquet, such as ORC or text
- When using MapReduce or Spark's RDD API
I've tested this difference on AWS EMR 5.26, and using s3:// was 15%-30% faster than s3a:// (but still slow).
The fastest way I've managed to accomplish such a copy/write was to write Parquet to a local HDFS and then use s3distcp to copy to S3; in one specific scenario (a few hundred small files) this was 5x faster than writing a DataFrame as Parquet directly to S3.
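A rough sketch of that two-step pattern, with illustrative paths (results is the DataFrame from the question; s3-dist-cp is the EMR tool and runs outside Spark):

// 1) Write Parquet to cluster-local HDFS first.
results.write.mode(SaveMode.Overwrite).parquet("hdfs:///tmp/joined_data.parquet")
// 2) Then copy the finished files to S3, e.g. as an EMR step or from a shell:
//    s3-dist-cp --src hdfs:///tmp/joined_data.parquet --dest s3://my-bucket/a/joined_data.parquet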
Answered by Sebastian Brestin
I also had this issue. In addition to what the others said, here is a complete explanation from AWS: https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/
In my experiment, just changing to FileOutputCommitter v2 (from v1) improved the write 3-4x.
self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
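The same setting can also be supplied through Spark's spark.hadoop.* passthrough, so it can live in spark-defaults.conf or a --conf flag rather than in the job itself. A sketch of the Scala equivalent (assumes you build the context from this conf):

import org.apache.spark.SparkConf

// Keys prefixed with spark.hadoop. are copied into the Hadoop configuration at startup.
val conf = new SparkConf()
  .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")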

