scala - How to save a partitioned parquet file in Spark 2.1?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/43731679/

How to save a partitioned parquet file in Spark 2.1?

scala apache-spark apache-spark-sql parquet

Asked by Daniel Lopez

I am trying to test how to write data in HDFS 2.7 using Spark 2.1. My data is a simple sequence of dummy values, and the output should be partitioned by the attributes id and key.

 // Simple case class to cast the data
 case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)

 // Actual data to be stored
 val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1),
    SimpleTest("test", 12, 13.5.toFloat, 2),
    SimpleTest("test", 12, 13.5.toFloat, 3),
    SimpleTest("simple", 12, 13.5.toFloat, 1),
    SimpleTest("simple", 12, 13.5.toFloat, 2),
    SimpleTest("simple", 12, 13.5.toFloat, 3)
 )

 // Spark's workflow to distribute, partition and store
 // sc and sql are the SparkContext and SparkSession, respectively
 val testDataP = sc.parallelize(testData, 6)
 val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
 testDf.write.partitionBy("id", "key").parquet("/path/to/file")

I am expecting to get the following tree structure in HDFS:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet

But when I run the previous code I get the following output:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet

I do not know whether there is something wrong in the code or whether Spark is doing something else.

I'm executing spark-submit as follows:

spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass myFatJar.jar

Accepted answer by Daniel Lopez

I found a solution! According to Cloudera, it is a mapred-site.xml configuration problem (see the link below). Also, instead of writing the DataFrame as: testDf.write.partitionBy("id", "key").parquet("/path/to/file")

I did it as follows: testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"). You can substitute <namenode> and <port> with the HDFS master node's hostname and port, respectively.

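For reference, a minimal sketch of that fix, assuming a hypothetical namenode at namenode-host:8020 (substitute your own host and port):

 // Write to a fully-qualified HDFS URI instead of a bare path.
 // "namenode-host" and 8020 are placeholders for the cluster's actual namenode.
 val hdfsPath = "hdfs://namenode-host:8020/path/to/file"
 testDf.write.partitionBy("id", "key").parquet(hdfsPath)

With the scheme, host and port spelled out, Spark resolves the path against HDFS directly rather than against whatever default filesystem the local Hadoop configuration points to.
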
Special thanks to @jacek-laskowski for his valuable contribution.

References:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

Writing to HDFS in Spark/Scala

Answered by Jacek Laskowski

Interesting since...well..."it works for me".

Since you describe your dataset using the SimpleTest case class in Spark 2.1, you're an import spark.implicits._ away from having a typed Dataset.

In my case, spark is sql.

In other words, you don't have to create testDataP and testDf (using sql.createDataFrame).

import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")

In another terminal (after saving to the /tmp/testDf directory):

$ tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│   ├── key=1
│   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│   ├── key=2
│   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│   └── key=3
│       └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
    ├── key=1
    │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    ├── key=2
    │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    └── key=3
        └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet

8 directories, 7 files
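
As a quick sanity check, reading the directory back should recover id and key as ordinary columns through partition discovery; a small sketch, assuming the same spark session (with spark.implicits._ imported) and the /tmp/testDf output shown above:

// Partition discovery turns the id=.../key=... directories back into columns.
val readBack = spark.read.parquet("/tmp/testDf")
readBack.printSchema()  // schema now includes id and key next to value1 and value2
readBack.where($"id" === "test" && $"key" === 1).show()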