scala - Left Anti join in Spark?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/43186888/

Left Anti join in Spark?

scala, apache-spark

Asked by sergeda

I've defined two tables like this:

 val tableName = "table1"
    val tableName2 = "table2"

    val format = new SimpleDateFormat("yyyy-MM-dd")
      val data = List(
        List("mike", 26, true),
        List("susan", 26, false),
        List("john", 33, true)
      )
    val data2 = List(
        List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
        List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
        List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
      )

      val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
      val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
      val schema = StructType(Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("isBoy", BooleanType, false)
      ))
    val schema2 = StructType(Array(
        StructField("name", StringType, true),
        StructField("grade", StringType, true),
        StructField("howold", IntegerType, true),
        StructField("hobby", StringType, true),
        StructField("birthday", DateType, false)
      ))

      val df = sqlContext.createDataFrame(rdd, schema)
      val df2 = sqlContext.createDataFrame(rdd2, schema2)
      df.createOrReplaceTempView(tableName)
      df2.createOrReplaceTempView(tableName2)

I'm trying to build a query that returns the rows from table1 that don't have a matching row in table2. I've tried to do it with this query:

Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL

but this just gives me all rows from table1:

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"isBoy":false}, {"name":"mike","age":26,"isBoy":true})

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"i​​sBoy":false}, {"name" :"mike","age":26,"i​​sBoy":true})

How can I do this type of join efficiently in Spark?

I'm looking for a SQL query because I need to be able to specify which columns to compare between the two tables, not just compare row by row as in the other recommended questions (e.g. using subtract, except, etc.).

Answered by Tzach Zohar

You can use the "left anti" join type - either with DataFrame API or with SQL (DataFrame API supports everything that SQL supports, including any join condition you need):

DataFrame API:

df.as("table1").join(
  df2.as("table2"),
  $"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
  "leftanti"
)

SQL:

sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT ANTI JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
  """.stripMargin)

NOTE: there's also a shorter, more concise way of creating the sample data without specifying the schema separately, using tuples and the implicit toDF method, and then "fixing" the automatically-inferred schema where needed:

import org.apache.spark.sql.types.DateType
import spark.implicits._
val df = List(
  ("mike", 26, true),
  ("susan", 26, false),
  ("john", 33, true)
).toDF("name", "age", "isBoy")

val df2 = List(
  ("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  ("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))

Answered by James Tobin

You can do it with the built-in function except (I would have used the code you provided, but you didn't include the imports, so I couldn't just copy/paste it :( )

val a = sc.parallelize(Seq((1,"a",123),(2,"b",456))).toDF("col1","col2","col3")
val b= sc.parallelize(Seq((4,"a",432),(2,"t",431),(2,"b",456))).toDF("col1","col2","col3")

scala> a.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
|   2|   b| 456|
+----+----+----+


scala> b.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   4|   a| 432|
|   2|   t| 431|
|   2|   b| 456|
+----+----+----+

scala> a.except(b).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a| 123|
+----+----+----+
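
Note that except compares whole rows, while the question asks to compare only specific columns. A rough sketch of how the two ideas could be combined, assuming the df/df2 frames and column names from the question (an illustration, not a drop-in replacement for the anti-join):

import org.apache.spark.sql.functions.col

// Hedged sketch: run except on just the key columns, then join back to recover
// the full rows of df whose (name, age) has no (name, howold) match in df2.
val missingKeys = df.select(col("name"), col("age"))
  .except(df2.select(col("name"), col("howold").as("age")))

val result = df.join(missingKeys, Seq("name", "age"))  // inner join on the surviving keys
result.show()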

Answered by Andy Quiroz

You can use left anti:

dfRcc20.as("a").join(dfClientesDuplicados.as("b")
  ,col("a.eteerccdiid")===col("b.eteerccdiid")&&
    col("a.eteerccdinr")===col("b.eteerccdinr")
  ,"left_anti")

Answered by Arun

Left anti join with the Spark Dataset API (Java):

A left anti join returns all rows from the first dataset that do not have a match in the second dataset.

Example with code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* Read data from Employee.csv (backslashes must be escaped in Java string literals) */
Dataset<Row> employee = sparkSession.read().option("header", "true")
        .csv("C:\\Users\\Desktop\\Spark\\Employee.csv");
employee.show();

/* Read data from Employee1.csv */
Dataset<Row> employee1 = sparkSession.read().option("header", "true")
        .csv("C:\\Users\\Desktop\\Spark\\Employee1.csv");
employee1.show();

/* Apply a left anti join on the "name" column */
Dataset<Row> leftAntiJoin = employee.join(employee1, employee.col("name").equalTo(employee1.col("name")), "leftanti");

leftAntiJoin.show();

Output:

1) Employee dataset
+-------+--------+-------+
|   name| address| salary|
+-------+--------+-------+
|   Arun|  Indore|    500|
|Shubham|  Indore|   1000|
| Mukesh|Hariyana|  10000|
|  Kanha|  Bhopal| 100000|
| Nandan|Jabalpur|1000000|
|   Raju|  Rohtak|1000000|
+-------+--------+-------+

2) Employee1 dataset
+-------+--------+------+
|   name| address|salary|
+-------+--------+------+
|   Arun|  Indore|   500|
|Shubham|  Indore|  1000|
| Mukesh|Hariyana| 10000|
+-------+--------+------+

3) Applied leftanti join and final data
+------+--------+-------+
|  name| address| salary|
+------+--------+-------+
| Kanha|  Bhopal| 100000|
|Nandan|Jabalpur|1000000|
|  Raju|  Rohtak|1000000|
+------+--------+-------+

Answered by Shijin Ravindran

In SQL, you can simplify your query to the one below (not sure if it works in Spark):

Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold where table2.name IS NULL 

This will return all rows of table1 for which the join failed.
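
For completeness, a hedged sketch of running that query in Spark against the temp views registered in the question; Spark SQL accepts the LEFT JOIN plus IS NULL pattern, although the left-anti join above states the intent more directly:

// Hedged sketch: the same LEFT JOIN / IS NULL pattern issued through Spark SQL,
// using the table1/table2 temp views created in the question.
val unmatched = sqlContext.sql(
  """SELECT table1.*
    |FROM table1
    |LEFT JOIN table2
    |  ON table1.name = table2.name AND table1.age = table2.howold
    |WHERE table2.name IS NULL
  """.stripMargin)
unmatched.show()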