scala - How to join two datasets by key in Scala Spark

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/39822377/


how to join two datasets by key in scala spark

scala apache-spark

Asked by tobby

I have two datasets, and each dataset has two elements. Below are examples.

Data1: (name, animal)

('abc,def', 'monkey(1)')
('df,gh', 'zebra')
...

Data2: (name, fruit)

('a,efg', 'apple')
('abc,def', 'banana(1)')
...

Results expected: (name, animal, fruit)

('abc,def', 'monkey(1)', 'banana(1)')
... 

I want to join these two datasets on the first column, 'name'. I have been trying for a couple of hours, but I couldn't figure it out. Can anyone help me?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text1 = sc.textFile(args(0))   // RDD[String]
val text2 = sc.textFile(args(1))   // RDD[String]

// Does not compile: join is only available on RDDs of key/value pairs
val joined = text1.join(text2)

The above code does not work!

Accepted answer by maasg

join is defined on RDDs of pairs, that is, RDDs of type RDD[(K,V)]. The first step is to transform the input data into that type.
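For intuition, here is a minimal sketch of what join does on pair RDDs (the keys and values are made up; sc is an existing SparkContext, as in the spark-shell):

// join matches elements of the two RDDs by key (an inner join)
val left  = sc.parallelize(Seq(("k1", "a"), ("k2", "b")))
val right = sc.parallelize(Seq(("k1", "x")))
left.join(right).collect()
// Array((k1,(a,x))) -- "k2" has no match in right, so it is dropped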

We first need to transform the original data of type String into pairs of (Key, Value):

// Parse a line like ('abc,def', 'monkey(1)') into the pair ("abc,def", "monkey(1)")
val parse: String => (String, String) = s => {
  // Triple quotes keep the backslashes in the pattern from being treated as string escapes
  val regex = """^\('([^']+)',[\W]*'([^']+)'\)$""".r
  s match {
    case regex(k, v) => (k, v)
    case _ => ("", "")   // fallback for lines that don't match the pattern
  }
}
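A quick sanity check of parse on one of the sample records:

parse("('abc,def', 'monkey(1)')")
// (String, String) = (abc,def,monkey(1)) -- i.e. the pair ("abc,def", "monkey(1)")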

(Note that we can't use a simple split(",") expression, because the key itself contains commas.)
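To see why, splitting a sample record on commas cuts straight through the key:

"('abc,def', 'monkey(1)')".split(",")
// Array("('abc", "def'", " 'monkey(1)')") -- the key "abc,def" is torn in two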

Then we use that function to parse the text input data:

val s1 = Seq("('abc,def', 'monkey(1)')", "('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')", "('abc,def', 'banana(1)')")

// sparkContext is the active SparkContext (sc in the spark-shell)
val rdd1 = sparkContext.parallelize(s1)
val rdd2 = sparkContext.parallelize(s2)

val kvRdd1 = rdd1.map(parse)   // RDD[(String, String)]
val kvRdd2 = rdd2.map(parse)   // RDD[(String, String)]

Finally, we use the join method to join the two RDDs:

val joined = kvRdd1.join(kvRdd2)

// Let's check out the results

joined.collect

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))
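The question asked for the flat shape (name, animal, fruit); if that is needed, one more map unnests the tuple (a small addition to the answer above):

val flattened = joined.map { case (name, (animal, fruit)) => (name, animal, fruit) }
flattened.collect
// Array((abc,def,monkey(1),banana(1))) -- the first comma is inside the name key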

Answer by Arunakiran Nulu

You first have to create pair RDDs for your datasets, and then apply the join transformation. Also, your sample data does not look accurate.

Please consider the example below.

Dataset1

a 1
b 2
c 3

Dataset2

a 8
b 4

In Scala, your code should look like the following:

// Split each line once and build (key, value) pairs
val pairRDD1 = sc.textFile("/path_to_yourfile/first.txt").map { line =>
  val fields = line.split(" ")
  (fields(0), fields(1))
}

val pairRDD2 = sc.textFile("/path_to_yourfile/second.txt").map { line =>
  val fields = line.split(" ")
  (fields(0), fields(1))
}

val joinRDD = pairRDD1.join(pairRDD2)

joinRDD.collect

Here is the result from the Scala shell:

res10: Array[(String, (String, String))] = Array((a,(1,8)), (b,(2,4)))
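Note that key c, which exists only in Dataset1, is missing from the result: join is an inner join. If unmatched keys should be kept, pair RDDs also provide leftOuterJoin (a sketch; element order in the output is not guaranteed):

val leftJoinRDD = pairRDD1.leftOuterJoin(pairRDD2)
leftJoinRDD.collect
// Array((a,(1,Some(8))), (b,(2,Some(4))), (c,(3,None)))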