Apache Spark - reduceByKey - Java

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: StackOverflow, http://stackoverflow.com/questions/25091091/

Apache Spark - reduceByKey - Java

java, apache-spark

Asked by user641887

I am trying to understand the working of the reduceByKey in Spark using Java as the programming language.

Say I have a sentence "I am who I am". I break the sentence into words and store it as a list [I, am, who, I, am].

Now this function assigns 1 to each word:

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});
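
With Java 8, the same PairFunction can also be written as a lambda (just a sketch, equivalent to the anonymous class above):

JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));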

So the output is something like this:

(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

Now if I have 3 reducers running, each reducer will get a key and the values associated with that key:

reducer 1:
    (I,1)
    (I,1)

reducer 2:
    (am,1)
    (am,1)

reducer 3:
    (who,1)

I wanted to know:

a. What exactly happens in the function below?
b. What are the parameters to new Function2<Integer, Integer, Integer>?
c. Basically, how is the JavaPairRDD formed?

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

Answered by Sean Owen

I think your questions revolve around the reduce function here, which is a function of 2 arguments returning 1, whereas in a Reducer, you implement a function of many-to-many.

This API is simpler if less general. Here you provide an associative operation that can reduce any 2 values down to 1 (e.g. two integers sum to one). This is used to reduce all values for each key to 1. It's not necessary to provide an N-to-1 function since it can be accomplished with a 2-to-1 function. Here, you can't emit multiple values for one key.

The result is one (key, reduced value) pair from each (key, bunch of values).

The Mapper and Reducer in classic Hadoop MapReduce were actually both quite similar (just that one takes a collection of values rather than single value per key) and let you implement a lot of patterns. In a way that's good, in a way that was wasteful and complex.

You can still reproduce what Mappers and Reducers do, but the method in Spark is mapPartitions, possibly paired with groupByKey. These are the most general operations you might consider, and I'm not saying you should emulate MapReduce this way in Spark. In fact it's unlikely to be efficient. But it is possible.

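For illustration, here is a rough sketch (using the ones pair RDD from the question) of emulating a Reducer-style "all the values for one key at once" computation with groupByKey. For a plain sum, reduceByKey is still the better choice because it combines values before shuffling:

// Group all the 1s for each word, the way a Hadoop Reducer would receive them,
// then sum them per key. Unlike reduceByKey, this shuffles every single value.
JavaPairRDD<String, Iterable<Integer>> grouped = ones.groupByKey();

JavaPairRDD<String, Integer> countsViaGroup = grouped.mapValues(values -> {
    int sum = 0;
    for (Integer v : values) {
        sum += v;
    }
    return sum;
});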

Answered by napster

reduceByKey works as below:

In an RDD, if Spark finds elements having the same key, it takes their values, performs a certain operation on those values, and returns a value of the same type. For example, let us say you have an RDD with elements:

[K,V1], [K,V2], where V1 and V2 are of the same type. The arguments to new Function2() are then three:

  1. the value part of the first K,V pair, i.e. V1.
  2. the value part of the second K,V pair, i.e. V2.
  3. the return type of the overridden call method, which is again of the same type as V1 and V2 (and can be the result of the function operation provided as part of the call method).

Also note that since RDDs are distributed across nodes, each node performs its own partial reduce operation on the data it holds, and those partial results are then merged per key to produce the final reduced value.

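To illustrate why a two-argument function is enough, here is a tiny plain-Java sketch (no Spark involved; the class name is made up) showing that partial reductions, merged together, give the same result as reducing everything at once:

import java.util.Arrays;
import java.util.List;

public class PairwiseReduceDemo {
    public static void main(String[] args) {
        // Suppose some key appears three times, so its values are [1, 1, 1]
        List<Integer> values = Arrays.asList(1, 1, 1);

        // Reduce everything at once: ((1 + 1) + 1) = 3
        int allAtOnce = values.stream().reduce(0, Integer::sum);

        // Reduce in two "partitions", then merge the partial results
        int partial1 = 1 + 1;             // partition 1 holds two of the values
        int partial2 = 1;                 // partition 2 holds the remaining one
        int merged = partial1 + partial2;

        System.out.println(allAtOnce + " == " + merged);  // prints 3 == 3
    }
}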

I guess this explains your query.

Answered by Ajay Kr Choudhary

reduceByKey, as the name suggests, applies a reduce operation on the values of the JavaPairRDD that share the same key. If you refer to the documentation, it says that reduceByKey will:

Merge the values for each key using an associative and commutative reduce function.

reduceByKey needs an implementation of the Function2 interface. The syntax of Function2 is Function2<T1, T2, R>. Here, the input arguments are of type T1 and T2 and the output argument is of type R.

Let's understand this with the example that you mentioned.

Your JavaPairRDD upon which you want to apply reduceByKey is:

(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

In your JavaPairRDD, the key is the first argument (the word in this case) and the value is the second argument (the 1 assigned to each word). You want to apply reduceByKey in order to know how many times each word has occurred. Whenever we see the same word, we want to add up its values. So, to add up the values, you need two input arguments and one return value.

Therefore, the first two Integer, Integer in the syntax refer to the input and the third Integer refers to the output. Relating this to the syntax of the Function2 interface, T1 and T2 are Integer and R is also Integer.

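As a side note, with Java 8 lambdas the same Function2 implementation can be written more compactly (a sketch equivalent to the anonymous class in the question):

JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);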

To answer question c):

The JavaPairRDD which is finally formed by applying the reduceByKey operation will have as its key the original key of the JavaPairRDD on which reduceByKey is applied, and its value will be the final reduced value as computed in the implementation of the Function2 interface.

If you get confused by the parameters of these functional interfaces in general, you can use this rule: in the syntax declaration of the interface, the input arguments are followed by the output argument.

The input argument(s) appear in the parentheses of the call method, and the output argument is the one mentioned before the method name.

For example:

  1. Look at the declaration of PairFunction in the question that you asked. It is PairFunction<String, String, Integer> and the corresponding call method is Tuple2<String, Integer> call(String s). So the input here is a String and the output is a Tuple2 formed by String, Integer.
  2. Look at the declaration of the Function2 interface. It is Function2<Integer, Integer, Integer> and the corresponding call method is Integer call(Integer i1, Integer i2). So the inputs are two Integers and the output is one Integer.

I hope it helps.

Answered by Naveen Kumar

In short, consider this:

Input: {(a:1),(b:2),(c:2),(a:3),(b:2),(c:3)}

Pass it to reduceByKey.

Output: {(a:4),(b:4),(c:5)}

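For completeness, here is a self-contained sketch that reproduces this input and output with reduceByKey (the class name and the local master setting are just for illustration):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ReduceByKeyDemo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Input: {(a:1),(b:2),(c:2),(a:3),(b:2),(c:3)}
        JavaPairRDD<String, Integer> input = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("c", 2),
                new Tuple2<>("a", 3), new Tuple2<>("b", 2), new Tuple2<>("c", 3)));

        // Sum the values that share a key -> {(a:4),(b:4),(c:5)}
        JavaPairRDD<String, Integer> output = input.reduceByKey((v1, v2) -> v1 + v2);

        output.collect().forEach(System.out::println);
        sc.stop();
    }
}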