scala 如何定义DataFrame的分区?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/30995699/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to define partitioning of DataFrame?
提问by rake
I've started using Spark SQL and DataFrames in Spark 1.4.0. I'm wanting to define a custom partitioner on DataFrames, in Scala, but not seeing how to do this.
我已经开始在 Spark 1.4.0 中使用 Spark SQL 和 DataFrames。我想在 Scala 中的 DataFrames 上定义自定义分区器,但不知道如何执行此操作。
One of the data tables I'm working with contains a list of transactions, by account, silimar to the following example.
我正在使用的数据表之一包含一个交易列表,按帐户,类似于以下示例。
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.
至少在最初,大多数计算将发生在帐户内的交易之间。所以我希望对数据进行分区,以便一个帐户的所有事务都在同一个 Spark 分区中。
But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.
但我没有看到定义这一点的方法。DataFrame 类有一个名为“repartition(Int)”的方法,您可以在其中指定要创建的分区数。但是我没有看到任何可用于为 DataFrame 定义自定义分区器的方法,例如可以为 RDD 指定的方法。
The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition it's data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each Account, so that didn't sound like a reasonable solution.
源数据存储在 Parquet 中。我确实看到,在将 DataFrame 写入 Parquet 时,您可以指定要分区的列,所以大概我可以告诉 Parquet 按“帐户”列对其数据进行分区。但是可能有数百万个帐户,如果我正确理解 Parquet,它会为每个帐户创建一个不同的目录,因此这听起来不是一个合理的解决方案。
Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?
有没有办法让 Spark 对这个 DataFrame 进行分区,以便一个帐户的所有数据都在同一个分区中?
回答by zero323
Spark >= 2.3.0
火花 >= 2.3.0
SPARK-22614exposes range partitioning.
SPARK-22614公开范围分区。
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389exposes external format partitioning in the Data Source API v2.
SPARK-22389在数据源 API v2 中公开外部格式分区。
Spark >= 1.6.0
火花 >= 1.6.0
In Spark >= 1.6 it is possible to use partitioning by column for query and caching. See: SPARK-11410and SPARK-4849using repartitionmethod:
在 Spark >= 1.6 中,可以使用按列分区进行查询和缓存。参见:SPARK-11410和SPARK-4849使用repartition方法:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
Unlike RDDsSpark Dataset(including Dataset[Row]a.k.a DataFrame) cannot use custom partitioner as for now. You can typically address that by creating an artificial partitioning column but it won't give you the same flexibility.
与RDDsSpark Dataset(包括Dataset[Row]aka DataFrame)不同,目前不能使用自定义分区器。您通常可以通过创建人工分区列来解决这个问题,但它不会为您提供相同的灵活性。
Spark < 1.6.0:
火花 < 1.6.0:
One thing you can do is to pre-partition input data before you create a DataFrame
您可以做的一件事是在创建之前对输入数据进行预分区 DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
Since DataFramecreation from an RDDrequires only a simple map phase existing partition layout should be preserved*:
由于DataFrame从 an 创建RDD只需要一个简单的地图阶段,因此应保留现有分区布局*:
assert(df.rdd.partitions == partitioned.partitions)
The same way you can repartition existing DataFrame:
你可以用同样的方式重新分区现有的DataFrame:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
So it looks like it is not impossible. The question remains if it make sense at all. I will argue that most of the time it doesn't:
所以看起来也不是没有可能。问题仍然是它是否有意义。我会争辩说,大多数时候它不会:
Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. From the other hand number of operations which can benefit from a pre-partitioned data is relatively small and is further limited if internal API is not designed to leverage this property.
- joins in some scenarios, but it would require an internal support,
- window functions calls with matching partitioner. Same as above, limited to a single window definition. It is already partitioned internally though, so pre-partitioning may be redundant,
- simple aggregations with
GROUP BY- it is possible to reduce memory footprint of the temporary buffers**, but overall cost is much higher. More or less equivalent togroupByKey.mapValues(_.reduce)(current behavior) vsreduceByKey(pre-partitioning). Unlikely to be useful in practice. - data compression with
SqlContext.cacheTable. Since it looks like it is using run length encoding, applyingOrderedRDDFunctions.repartitionAndSortWithinPartitionscould improve compression ratio.
Performance is highly dependent on a distribution of the keys. If it is skewed it will result in a suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.
- A whole point of using a high level declarative API is to isolate yourself from a low level implementation details. As already mentioned by @dwysakowiczand @RomiKuntsmanan optimization is a job of the Catalyst Optimizer. It is a pretty sophisticated beast and I really doubt you can easily improve on that without diving much deeper into its internals.
重新分区是一个昂贵的过程。在典型的场景中,大多数数据必须被序列化、混洗和反序列化。另一方面,可以从预分区数据中受益的操作数量相对较少,如果内部 API 没有设计为利用此属性,则会进一步受到限制。
- 在某些情况下加入,但它需要内部支持,
- 使用匹配的分区程序调用窗口函数。同上,仅限于单个窗口定义。虽然它已经在内部分区,所以预分区可能是多余的,
- 简单的聚合
GROUP BY- 可以减少临时缓冲区的内存占用**,但总体成本要高得多。或多或少相当于groupByKey.mapValues(_.reduce)(当前行为)与reduceByKey(预分区)。在实践中不太可能有用。 - 数据压缩与
SqlContext.cacheTable. 由于看起来它正在使用运行长度编码,因此应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率。
性能高度依赖于密钥的分布。如果它有偏差,将导致资源利用率不理想。在最坏的情况下,根本不可能完成工作。
- 使用高级声明性 API 的一个重点是将自己与低级实现细节隔离开来。正如@dwysakowicz和@RomiKuntsman已经提到的,优化是Catalyst Optimizer 的工作。这是一个非常复杂的野兽,我真的怀疑你可以轻松地改进它,而无需深入研究它的内部结构。
Related concepts
相关概念
Partitioning with JDBC sources:
使用 JDBC 源进行分区:
JDBC data sources support predicatesargument. It can be used as follows:
JDBC 数据源支持predicates参数. 它可以按如下方式使用:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
It creates a single JDBC partition per predicate. Keep in mind that if sets created using individual predicates are not disjoint you'll see duplicates in the resulting table.
它为每个谓词创建一个 JDBC 分区。请记住,如果使用单个谓词创建的集合不是不相交的,您将在结果表中看到重复项。
partitionBymethod in DataFrameWriter:
partitionBy方法在DataFrameWriter:
Spark DataFrameWriterprovides partitionBymethod which can be used to "partition" data on write. It separates data on write using provided set of columns
SparkDataFrameWriter提供了partitionBy可用于在写入时“分区”数据的方法。它使用提供的一组列在写入时分离数据
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
This enables predicate push down on read for queries based on key:
这使得基于键的查询的谓词下推读取:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
but it is not equivalent to DataFrame.repartition. In particular aggregations like:
但它不等同于DataFrame.repartition. 特别是像这样的聚合:
val cnts = df1.groupBy($"k").sum()
will still require TungstenExchange:
仍然需要TungstenExchange:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBymethod in DataFrameWriter(Spark >= 2.0):
bucketBy方法DataFrameWriter(火花> = 2.0):
bucketByhas similar applications as partitionBybut it is available only for tables (saveAsTable). Bucketing information can used to optimize joins:
bucketBy具有类似的应用程序,partitionBy但它仅适用于表 ( saveAsTable)。分桶信息可用于优化连接:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* By partition layoutI mean only a data distribution. partitionedRDD has no longer a partitioner.
** Assuming no early projection. If aggregation covers only small subset of columns there is probably no gain whatsoever.
*我所说的分区布局仅指数据分布。partitionedRDD 不再有分区器。** 假设没有提前预测。如果聚合仅涵盖列的一小部分,则可能没有任何收益。
回答by NightWolf
In Spark < 1.6 If you create a HiveContext, not the plain old SqlContextyou can use the HiveQLDISTRIBUTE BY colX...(ensures each of N reducers gets non-overlapping ranges of x) & CLUSTER BY colX...(shortcut for Distribute By and Sort By) for example;
在 Spark < 1.6 中,如果您创建一个HiveContext,而不是普通的旧版本,SqlContext您可以使用HiveQLDISTRIBUTE BY colX...(确保 N 个 reducer 中的每一个都获得不重叠的 x 范围)& CLUSTER BY colX...(分发方式和排序方式的快捷方式);
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
Not sure how this fits in with Spark DF api. These keywords aren't supported in the normal SqlContext (note you dont need to have a hive meta store to use the HiveContext)
不确定这如何与 Spark DF api 配合。普通的 SqlContext 不支持这些关键字(注意你不需要有一个 hive 元存储来使用 HiveContext)
EDIT:Spark 1.6+ now has this in the native DataFrame API
编辑:Spark 1.6+ 现在在原生 DataFrame API 中有这个
回答by Dawid Wysakowicz
So to start with some kind of answer : ) - You can't
所以从某种答案开始:) - 你不能
I am not an expert, but as far as I understand DataFrames, they are not equal to rdd and DataFrame has no such thing as Partitioner.
我不是专家,但据我了解DataFrames,它们不等于rdd,DataFrame没有Partitioner这样的东西。
Generally DataFrame's idea is to provide another level of abstraction that handles such problems itself. The queries on DataFrame are translated into logical plan that is further translated to operations on RDDs. The partitioning you suggested will probably be applied automatically or at least should be.
通常,DataFrame 的想法是提供另一个抽象级别来处理此类问题。DataFrame 上的查询被转换为逻辑计划,该计划被进一步转换为对 RDD 的操作。您建议的分区可能会自动应用,或者至少应该是。
If you don't trust SparkSQL that it will provide some kind of optimal job, you can always transform DataFrame to RDD[Row] as suggested in of the comments.
如果您不相信 SparkSQL 会提供某种最佳工作,您始终可以按照评论中的建议将 DataFrame 转换为 RDD[Row]。
回答by Romi Kuntsman
Use the DataFrame returned by:
使用返回的 DataFrame:
yourDF.orderBy(account)
There is no explicit way to use partitionByon a DataFrame, only on a PairRDD, but when you sort a DataFrame, it will use that in it's LogicalPlan and that will help when you need to make calculations on each Account.
没有明确的方法可以partitionBy在 DataFrame 上使用,只能在 PairRDD 上使用,但是当您对 DataFrame 进行排序时,它将在它的 LogicalPlan 中使用它,这将在您需要对每个帐户进行计算时有所帮助。
I just stumbled upon the same exact issue, with a dataframe that I want to partition by account.
I assume that when you say "want to have the data partitioned so that all of the transactions for an account are in the same Spark partition", you want it for scale and performance, but your code doesn't depend on it (like using mapPartitions()etc), right?
我只是偶然发现了同样的问题,我想按帐户分区的数据框。我假设当你说“想要对数据进行分区以便一个帐户的所有事务都在同一个 Spark 分区中”时,你希望它的规模和性能,但你的代码不依赖于它(比如使用mapPartitions()等),对吗?
回答by Developer
I was able to do this using RDD. But I don't know if this is an acceptable solution for you.
Once you have the DF available as an RDD, you can apply repartitionAndSortWithinPartitionsto perform custom repartitioning of data.
我能够使用 RDD 做到这一点。但我不知道这是否是您可以接受的解决方案。一旦您将 DF 用作 RDD,您就可以申请repartitionAndSortWithinPartitions执行数据的自定义重新分区。
Here is a sample I used:
这是我使用的示例:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)

