
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must likewise follow CC BY-SA and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/31846233/

Posted: 2020-10-22 07:26:50  Source: igfitidea

Is Spark zipWithIndex safe with parallel implementation?

scala, apache-spark

Asked by sophie

If I have a file, and I did an RDD zipWithIndex per row,


([row1, id1001, name, address], 0)
([row2, id1001, name, address], 1)
...
([row100000, id1001, name, address], 100000)

Will I get the same index order if I reload the file? Since it runs in parallel, could the rows be partitioned differently next time?


Answered by Paul

RDDs can be sorted, and so do have an order. This order is used to create the index with .zipWithIndex().


Whether you get the same order each time depends on what the previous calls in your program are doing. The docs mention that .groupBy() can destroy order or generate different orderings. Other calls may do this as well.


I suppose you could always call .sortBy() before calling .zipWithIndex() if you need to guarantee a specific ordering.

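For instance, a minimal sketch (the sample rows and local session are illustrative, not from the question) that forces a deterministic order before indexing:

```scala
import org.apache.spark.sql.SparkSession

object ZipWithIndexDemo {
  def main(args: Array[String]): Unit = {
    // Local session just for illustration
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("zipWithIndexDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    val rows = Seq("row2,id1002", "row4,id1004", "row1,id1001", "row3,id1003")

    // Without sortBy, the index depends on how rows land in partitions.
    // sortBy imposes a total order, so zipWithIndex becomes reproducible.
    val indexed = sc.parallelize(rows, numSlices = 2)
      .sortBy(identity)
      .zipWithIndex()
      .collect()

    indexed.foreach { case (row, idx) => println(s"$row -> $idx") }

    spark.stop()
  }
}
```

After the sort, "row1,id1001" always receives index 0 regardless of how the input was partitioned.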

This is explained in the .zipWithIndex() Scala API docs:


public RDD<scala.Tuple2<T,Object>> zipWithIndex()

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when this RDD contains more than one partition.

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.

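Following the docs' suggestion, a hedged sketch using sortByKey() on a keyed RDD before indexing (the ids and values here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object SortByKeyIndexDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sortByKeyIndexDemo")
      .getOrCreate()
    val sc = spark.sparkContext

    // Key each record by an id so sortByKey() can impose a fixed order
    val records = Seq(("id1003", "name3"), ("id1001", "name1"), ("id1002", "name2"))

    val indexed = sc.parallelize(records, numSlices = 2)
      .sortByKey()       // deterministic order across partitions
      .zipWithIndex()
      .collect()

    indexed.foreach(println)
    spark.stop()
  }
}
```

Because the records are sorted by key first, the index assignment is the same on every run, even if the RDD is reevaluated.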