Distributed Map in Scala Spark

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/24724786/

Distributed Map in Scala Spark

scala apache-spark

Asked by blue-sky

Does Spark support distributed Map collection types?

So if I have a HashMap[String,String] of key-value pairs, can this be converted to a distributed Map collection type? To access an element I could use "filter", but I doubt this performs as well as a Map.

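For reference, a minimal sketch of the setup described above (illustrative keys and values, assuming the spark-shell sc):

val localMap = scala.collection.immutable.HashMap("a" -> "1", "b" -> "2", "c" -> "3")
val kvRdd = sc.parallelize(localMap.toSeq)   // RDD[(String, String)]

// "Map-like" access via filter: every partition is scanned for the key.
kvRdd.filter { case (k, _) => k == "b" }.values.collect()   // => Array(2)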

Accepted answer by aaronman

Since I found some new info I thought I'd turn my comments into an answer. @maasg already covered the standard lookup function; I would like to point out that you should be careful, because if the RDD's partitioner is None, lookup just uses a filter anyway. In reference to the (K,V) store on top of Spark, it looks like this is in progress, but a usable pull request has been made here. Here is an example usage:

import org.apache.spark.rdd.IndexedRDD

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

// Perform a point update.
val indexed2 = indexed.put(1234L, 10873).cache()
// Perform a point lookup. Note that the original IndexedRDD remains
// unmodified.
indexed2.get(1234L) // => Some(10873)
indexed.get(1234L) // => Some(0)

// Efficiently join derived IndexedRDD with original.
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
indexed3.collect // => Array((1234L, 10873))

// Perform insertions and deletions.
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
indexed2.get(-100L) // => None
indexed4.get(-100L) // => Some(111)
indexed2.get(999L) // => Some(0)
indexed4.get(999L) // => None

It seems like the pull request was well received and will probably be included in future versions of Spark, so it is probably safe to use that pull request in your own code. Here is the JIRA ticket in case you were curious.

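Coming back to the caveat about lookup above: a minimal sketch (plain RDD API, illustrative sizes) of what hash-partitioning the pair RDD buys you, since with a known partitioner lookup only has to scan the partition that owns the key:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize((1 to 1000000).map(x => (x.toLong, x.toString)))

// No partitioner set: lookup falls back to filtering every partition.
pairs.lookup(42L)          // => WrappedArray(42)

// Hash-partitioning up front lets lookup go straight to the right partition.
val partitioned = pairs.partitionBy(new HashPartitioner(16)).cache()
partitioned.lookup(42L)    // => WrappedArray(42)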

Answered by maasg

The quick answer: Partially.

You can transform a Map[A,B] into an RDD[(A,B)] by first forcing the map into a sequence of (k,v) pairs, but by doing so you lose the constraint that the keys of a map must form a set, i.e. you lose the semantics of the Map structure.

From a practical perspective, you can still resolve an element into its corresponding value using kvRdd.lookup(element), but the result will be a sequence, given that you have no guarantee that there's a single value per key, as explained before.

A spark-shell example to make things clear:

val englishNumbers = Map(1 -> "one", 2 ->"two" , 3 -> "three")
val englishNumbersRdd = sc.parallelize(englishNumbers.toSeq)

englishNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one) 

val spanishNumbers = Map(1 -> "uno", 2 -> "dos", 3 -> "tres")
val spanishNumbersRdd = sc.parallelize(spanishNumbers.toList)

val bilingueNumbersRdd = englishNumbersRdd union spanishNumbersRdd

bilingueNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one, uno)
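
If the one-value-per-key semantics are actually needed after a union like this, the duplicates have to be collapsed explicitly; a sketch of two options (which value survives the reduce is arbitrary here):

// Collapse duplicate keys on the cluster; the surviving value is arbitrary.
val dedupedRdd = bilingueNumbersRdd.reduceByKey((a, _) => a)
dedupedRdd.lookup(1).size   // => 1

// Or pull everything back to the driver as an ordinary Map (small data only;
// when a key occurs more than once, one of the values silently wins).
val backToMap = bilingueNumbersRdd.collectAsMap()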