Spark: increase number of partitions without causing a shuffle?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, link to the original, and attribute it to the original authors (not me): StackOverflow

Original question: http://stackoverflow.com/questions/27039246/
Asked by samthebest
When decreasing the number of partitions one can use coalesce, which is great because it doesn't cause a shuffle and seems to work instantly (doesn't require an additional job stage).
I would like to do the opposite sometimes, but repartition induces a shuffle. I think a few months ago I actually got this working by using CoalescedRDD with balanceSlack = 1.0 - so what would happen is it would split a partition so that the resulting partitions' locations were all on the same node (so small net IO).
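For context, a minimal sketch of the two built-in options discussed above (the partition counts are arbitrary, and the no-shuffle path of coalesce only applies when shrinking):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("partitions-demo"))
val rdd = sc.parallelize(1 to 1000, 100)

// Shrinking: coalesce merges existing partitions locally, no extra shuffle stage is added.
val fewer = rdd.coalesce(10)

// Growing: repartition is coalesce(n, shuffle = true), so it redistributes all the data.
val more = rdd.repartition(1000)

println(s"${fewer.getNumPartitions} ${more.getNumPartitions}")   // 10 1000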
This kind of functionality is automatic in Hadoop, one just tweaks the split size. It doesn't seem to work this way in Spark unless one is decreasing the number of partitions. I think the solution might be to write a custom partitioner along with a custom RDD where we define getPreferredLocations... but I thought that is such a simple and common thing to do, surely there must be a straightforward way of doing it?
Things tried:
.set("spark.default.parallelism", partitions)on my SparkConf, and when in the context of reading parquet I've tried sqlContext.sql("set spark.sql.shuffle.partitions= ..., which on 1.0.0 causes an error AND not really want I want, I want partition number to change across all types of job, not just shuffles.
.set("spark.default.parallelism", partitions)在我的SparkConf, 以及在阅读镶木地板时,我尝试过sqlContext.sql("set spark.sql.shuffle.partitions= ...,这在 1.0.0 上会导致错误并且不是我真正想要的,我希望分区号在所有类型的作业中更改,而不仅仅是洗牌。
Accepted answer by samthebest
Watch this space
https://issues.apache.org/jira/browse/SPARK-5997
This kind of really simple obvious feature will eventually be implemented - I guess just after they finish all the unnecessary features in Datasets.
Answered by szefuf
I do not exactly understand what your point is. Do you mean you now have 5 partitions, but after the next operation you want the data distributed to 10? Because having 10 but still using 5 does not make much sense… The process of sending data to new partitions has to happen sometime.
When doing coalesce, you can get rid of unused partitions, for example: if you initially had 100, but then after reduceByKey you got 10 (as there were only 10 keys), you can use coalesce.
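A minimal sketch of that shrink-after-aggregation pattern (the partition and key counts are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("coalesce-demo"))

val counts = sc.parallelize(1 to 100000, 100)   // 100 input partitions
  .map(x => (x % 10, 1L))                       // only 10 distinct keys
  .reduceByKey(_ + _)                           // result keeps 100 partitions, most of them empty
  .coalesce(10)                                 // merge them without another shuffle

println(counts.getNumPartitions)                // 10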
If you want the process to go the other way, you could just force some kind of partitioning:
[RDD].partitionBy(new HashPartitioner(100))
I'm not sure that's what you're looking for, but hope so.
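One caveat about the partitionBy line above: it is only defined on key-value RDDs (RDD[(K, V)]), and unlike coalesce it does trigger a full shuffle. A minimal sketch, reusing the SparkContext sc from the sketch above and keying the data just for illustration:

import org.apache.spark.HashPartitioner

// partitionBy needs an RDD of pairs, so key the records first; this step shuffles.
val repartitioned = sc.parallelize(1 to 1000, 5)
  .map(x => (x, x))
  .partitionBy(new HashPartitioner(100))

println(repartitioned.getNumPartitions)   // 100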
Answered by Adrien Forbu
As you know, pyspark uses a kind of "lazy" way of running. It will only do the computation when there is some action to perform (for example a "df.count()" or a "df.show()"). So what you can do is define a shuffle partition count between those actions.
You can write:
sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=100")
# you spark code here with some transformation and at least one action
df = df.withColumn("sum", sum(df.A).over(your_window_function))
df.count() # your action
df = df.filter(df.B <10)
df = df.count()
sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=10")
# you reduce the number of partition because you know you will have a lot
# less data
df = df.withColumn("max", max(df.A).over(your_other_window_function))
df.count() # your action

