scala Spark DataFrame:orderBy 之后的 groupBy 是否保持该顺序?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/39505599/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spark DataFrame: does groupBy after orderBy maintain that order?
提问by Ana Todor
I have a Spark 2.0 dataframe examplewith the following structure:
我有一个example具有以下结构的 Spark 2.0 数据框:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.
它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序。
I have created an Aggregator groupConcat:
我创建了一个聚合器groupConcat:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
It helps me concatenate columns into strings to obtain this final dataframe:
它帮助我将列连接成字符串以获得最终的数据帧:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
My question is, if I do example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly in their respective buckets?
我的问题是,如果我这样做example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"),是否可以保证每小时计数将在各自的存储桶中正确排序?
I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames ?
我读到 RDD 不一定是这种情况(请参阅Spark 按键排序,然后按分组排序以获得可迭代的排序?),但对于 DataFrames 可能有所不同?
If not, how can I work around it ?
如果没有,我该如何解决?
回答by Adair
groupBy after orderBy doesn't maintain order, as others have pointed out. What you want to do is use a Window function--partition on id and order by hours. You can collect_list over this and then take the max (largest) of the resulting lists since they go cumulatively (i.e. the first hour will only have itself in the list, the second hour will have 2 elements in the list, and so on).
正如其他人指出的那样,orderBy 之后的 groupBy 不维护顺序。你想要做的是使用一个 Window 函数——对 id 进行分区并按小时排序。您可以在此之上 collect_list ,然后获取结果列表的最大值(最大),因为它们是累积的(即第一个小时将仅在列表中包含它自己,第二个小时将在列表中包含 2 个元素,依此类推)。
Complete example code:
完整示例代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val data = Seq(( "id1", 0, 12),
("id1", 1, 55),
("id1", 23, 44),
("id2", 0, 12),
("id2", 1, 89),
("id2", 23, 34)).toDF("id", "hour", "count")
val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
data.withColumn("collected", collect_list($"count")
.over(Window.partitionBy("id")
.orderBy("hour")))
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count").show
This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
这使我们保持在 DataFrame 世界中。我还简化了 OP 使用的 UDF 代码。
Output:
输出:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+
回答by Kat
I have a case where the order is not always kept: sometimes yes, mostly no.
我有一个订单并不总是保留的情况:有时是的,大部分是不是。
My dataframe has 200 partitions running on Spark 1.6
我的数据帧有 200 个分区在 Spark 1.6 上运行
df_group_sort = data.orderBy(times).groupBy(group_key).agg(
F.sort_array(F.collect_list(times)),
F.collect_list(times)
)
to check the ordering I compare the return values of
检查排序我比较了的返回值
F.sort_array(F.collect_list(times))
and
和
F.collect_list(times)
giving e.g. (left: sort_array(collect_list()); right: collect_list())
给例如(左:sort_array(collect_list());右:collect_list())
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
The left column is always sorted, while the right column only consists of sorted blocks. For different executions of take(), the order of the blocks in the right column is different.
左列总是排序的,而右列只包含排序的块。对于 take() 的不同执行,右列中块的顺序是不同的。
回答by Shyam
If you want to work around the implementation in Java (Scala and Python should be similar):
如果您想解决 Java 中的实现(Scala 和 Python 应该类似):
example.orderBy(“hour”)
.groupBy(“id”)
.agg(functions.sort_array(
functions.collect_list(
functions.struct(dataRow.col(“hour”),
dataRow.col(“count”))),false)
.as(“hourly_count”));
回答by Ashish
order may or may not be the same, depending on number of partitions and the distribution of data. We can solve using rdd itself.
顺序可能相同也可能不同,这取决于分区的数量和数据的分布。我们可以使用 rdd 本身来解决。
For example::
例如::
I saved the below sample data in a file and loaded it in hdfs.
我将以下示例数据保存在一个文件中并将其加载到 hdfs 中。
1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800
and executed the below command:
并执行以下命令:
sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
output:
输出:
Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
That is, we grouped the data by type, thereafter sorted by price, and the concatenated the ids with "~" as separator. The above command can be broken as below:
也就是说,我们按类型对数据进行分组,然后按价格排序,并以“~”为分隔符连接 id。上面的命令可以分解如下:
val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)
val groupedData=validData.groupBy(_(1)) //group data rdds
val sortedJoinedData=groupedData.mapValues(x=>{
val list=x.toList
val sortedList=list.sortBy(_(2))
val idOnlyList=sortedList.map(_(0))
idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()
we can then take a particular group by using the command
然后我们可以使用命令来获取特定的组
sortedJoinedData.filter(_._1=="type1").collect()
output:
输出:
Array[(String, String)] = Array((type1,2~1~5))
回答by ChoppyTheLumberHyman
No, sorting within groupByKeywill not necessarily be maintained but this is notoriously difficult to reproduce in memory on one node. As was previously said, the most typical way this happens is when things need to be repartitioned for the groupByKeyto take place. I managed to reproduce this by manually doing a repartitionafter the sort. Then I passed the results into the groupByKey.
不,内部排序groupByKey不一定会保持,但众所周知,很难在一个节点的内存中重现。如前所述,发生这种情况的最典型方式是需要对事物进行重新分区groupByKey才能发生。我设法通过repartition在sort. 然后我将结果传递到groupByKey.
case class Numbered(num:Int, group:Int, otherData:Int)
// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number
val v =
(1 to 100000)
// Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
.map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
// Be sure they are stored in a small number of partitions
.repartition(2)
.sort($"num")
// Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
.repartition(200)
.groupByKey(_.group)
.mapGroups {
case (g, nums) =>
nums // all you need is .sortBy(_.num) here to fix the problem
.map(_.num)
.mkString("~")
}
.collect()
// Walk through the concatenated strings. If any number ahead
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
if (next < prev) {
println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
}
next
}
}
回答by Interfector
The short answer is Yes, the hourly counts will maintain the same order.
简短的回答是肯定的,每小时计数将保持相同的顺序。
To generalise, it's important that you sort before you group. Also the sort must be the same as the group + the column for which you actually want the sorting.
概括地说,在分组之前进行排序很重要。此外,排序必须与组 + 您实际想要排序的列相同。
An example would be like:
一个例子是这样的:
employees
.sort("company_id", "department_id", "employee_role")
.groupBy("company_id", "department_id")
.agg(Aggregators.groupConcat(":", 2) as "count_per_role")

