
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/36107581/


Change output filename prefix for DataFrame.write()

java · apache-spark · mapreduce · apache-spark-sql

Asked by Rob

Output files generated via the Spark SQL DataFrame.write() method begin with the "part" basename prefix. e.g.


DataFrame sample_07 = hiveContext.table("sample_07");
sample_07.write().parquet("sample_07_parquet");

Results in:


hdfs dfs -ls sample_07_parquet/                                                                                                                                                             
Found 4 items
-rw-r--r--   1 rob rob          0 2016-03-19 16:40 sample_07_parquet/_SUCCESS
-rw-r--r--   1 rob rob        491 2016-03-19 16:40 sample_07_parquet/_common_metadata
-rw-r--r--   1 rob rob       1025 2016-03-19 16:40 sample_07_parquet/_metadata
-rw-r--r--   1 rob rob      17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet

I would like to change the output filename prefix used when creating a file using Spark SQL DataFrame.write(). I tried setting the "mapreduce.output.basename" property on the hadoop configuration for the Spark context. e.g.


public class MyJavaSparkSQL {

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("MyJavaSparkSQL");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ctx.hadoopConfiguration().set("mapreduce.output.basename", "myprefix");
    HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc());
    DataFrame sample_07 = hiveContext.table("sample_07");
    sample_07.write().parquet("sample_07_parquet");
    ctx.stop();
  }
}

That did not change the output filename prefix for the generated files.


Is there a way to override the output filename prefix when using the DataFrame.write() method?


Answered by Tzach Zohar

You cannot change the "part" prefix while using any of the standard output formats (like Parquet). See this snippet from ParquetRelation source code:


private val recordWriter: RecordWriter[Void, InternalRow] = {
  val outputFormat = {
    new ParquetOutputFormat[InternalRow]() {
      // ...
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        // ...
        // prefix is hard-coded here:
        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
      }
    }
  }
  // ...
}
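To make the hard-coding concrete, here is a plain-Java sketch (not Spark code) of how that format string expands; the job id is simply the one from the directory listing in the question, used for illustration:

```java
public class PartFileName {
    // Mirrors the Scala format string:
    //   f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension"
    static String defaultWorkFile(int split, String jobId, String bucket, String ext) {
        return String.format("part-r-%05d-%s%s%s", split, jobId, bucket, ext);
    }

    public static void main(String[] args) {
        // Reproduces the part file name seen in the question's hdfs listing
        System.out.println(defaultWorkFile(0,
            "cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92", "", ".gz.parquet"));
        // → part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet
    }
}
```

Nothing in this path goes through the Hadoop configuration, which is why setting "mapreduce.output.basename" has no effect.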

If you really must control the part file names, you'll probably have to implement a custom FileOutputFormat and use one of Spark's save methods that accept a FileOutputFormat class (e.g. saveAsHadoopFile).


Answered by Sarath Avanavu

Assuming that the output folder contains only one csv file, we can rename it programmatically (or dynamically) using the code below. The last line gets all files of csv type from the output directory and renames them to the desired file name.


import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val outputfolder_Path = "s3://<s3_AccessKey>:<s3_Securitykey>@<external_bucket>/<path>"
val fs = FileSystem.get(new java.net.URI(outputfolder_Path), new Configuration())
fs.globStatus(new Path(outputfolder_Path + "/*.*"))
  .filter(_.getPath.toString.split("/").last.split("\\.").last == "csv")
  .foreach { l =>
    fs.rename(new Path(l.getPath.toString), new Path(outputfolder_Path + "/DesiredFilename.csv"))
  }
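The Scala snippet above runs against HDFS/S3. The same rename-after-write idea can be sketched in plain Java against a local output directory; the class, file names, and target name below are hypothetical, chosen only to mirror the logic:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;

public class RenamePartFile {
    // Renames each *.csv file in `dir` to `desiredName`, mirroring the
    // globStatus + rename logic above but on the local filesystem.
    // Matches are collected first so renaming does not disturb iteration.
    static void renameCsv(Path dir, String desiredName) throws IOException {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.csv")) {
            for (Path f : files) {
                matches.add(f);
            }
        }
        for (Path f : matches) {
            Files.move(f, dir.resolve(desiredName), StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spark_out");
        // Simulate Spark's output: one part file plus a _SUCCESS marker
        Files.createFile(dir.resolve("part-r-00000-1234.csv"));
        Files.createFile(dir.resolve("_SUCCESS"));
        renameCsv(dir, "DesiredFilename.csv");
        System.out.println(Files.exists(dir.resolve("DesiredFilename.csv")));
        // → true
    }
}
```

Note that, like the Scala version, this only makes sense when a single part file is expected; with multiple part files the renames would overwrite one another.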