SQL: Including null values in an Apache Spark Join
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow.
Original URL: http://stackoverflow.com/questions/41728762/
Including null values in an Apache Spark Join
Asked by Powers
I would like to include null values in an Apache Spark join. Spark doesn't include rows with null by default.
Here is the default Spark behavior.
val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
Here is the output of joinedDf.show():
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
+-------+-------+
This is the output I would like:
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
| null| zzz|
+-------+-------+
Answered by zero323
Spark provides a special NULL-safe equality operator:
numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| null| zzz|
| | hhh|
+-------+-------+
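For context, a standard = comparison evaluates to NULL (not true) when either operand is NULL, which is why those rows are dropped from a plain equality join; <=> instead returns true for two NULLs. A minimal check of the semantics, assuming an active SparkSession named spark:

// Standard equality yields NULL for NULL operands; <=> yields true
spark.sql(
  "SELECT CAST(NULL AS STRING) = CAST(NULL AS STRING) AS standard_eq, " +
  "CAST(NULL AS STRING) <=> CAST(NULL AS STRING) AS null_safe_eq"
).show()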
Be careful not to use it with Spark 1.5 or earlier. Prior to Spark 1.6 it required a Cartesian product (SPARK-11111 - Fast null-safe join).
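If you are unsure how a given Spark version executes the null-safe join, one way to check is to inspect the physical plan and confirm it does not fall back to a Cartesian product; a quick sketch using the DataFrames from the question:

// Print the physical plan; look for a join strategy other than CartesianProduct
numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .explain()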
In Spark 2.3.0 or later you can use Column.eqNullSafe in PySpark:
numbers_df = sc.parallelize([
    ("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])

letters_df = sc.parallelize([
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers)).show()
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
| 456| 456| def|
| null| null| zzz|
| | | hhh|
| 123| 123| abc|
+-------+-------+-------+
and %<=>% in SparkR:
numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
  numbers = c("123", "456", NA, ""),
  letters = c("abc", "def", "zzz", "hhh")
))
head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc
With SQL (Spark 2.2.0+) you can use IS NOT DISTINCT FROM:
SELECT * FROM numbers JOIN letters
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers
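Note that for this query to run, the DataFrames first have to be exposed to Spark SQL as tables; a minimal sketch, assuming the numbersDf and lettersDf from the question:

// Register the DataFrames as temporary views named "numbers" and "letters"
numbersDf.createOrReplaceTempView("numbers")
lettersDf.createOrReplaceTempView("letters")

spark.sql(
  "SELECT * FROM numbers JOIN letters " +
  "ON numbers.numbers IS NOT DISTINCT FROM letters.numbers"
).show()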
This can be used with the DataFrame API as well:
numbersDf.alias("numbers")
.join(lettersDf.alias("letters"))
.where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
Answered by jasonS
// Rename the join columns so that we can disambiguate them in the join
val numbers2 = numbersDf.withColumnRenamed("numbers", "num1")
val letters2 = lettersDf.withColumnRenamed("numbers", "num2")

// Treat two nulls as equal by adding an explicit isNull check to the outer join condition
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull), "outer")

// Rename the columns back to the original names
joinedDf.select("num1", "letters").withColumnRenamed("num1", "numbers").show
Answered by timothyzhang
Based on K L's idea, you could use foldLeft to generate the join column expression:
import org.apache.spark.sql.{Column, DataFrame}

def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = {
  // Start from a null-safe equality on the first join column...
  val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
  // ...then AND in a null-safe equality for each remaining column
  val fullExpr = columns.tail.foldLeft(colExpr) {
    (colExpr, p) => colExpr && leftDF(p) <=> rightDF(p)
  }
  leftDF.join(rightDF, fullExpr, joinType)
}
Then you can call this function like so:
aDF.transform(nullSafeJoin(bDF, columns, joinType))
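For example, with the numbersDf and lettersDf from the question (the Seq("numbers") join column and "inner" join type here are illustrative):

// Null-safe inner join on the single "numbers" column
val joinedDf = numbersDf.transform(nullSafeJoin(lettersDf, Seq("numbers"), "inner"))
joinedDf.show()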
Answered by mpindado
Complementing the other answers: for PySpark < 2.3.0 you have neither Column.eqNullSafe nor IS NOT DISTINCT FROM.
You can still express the <=> operator with an SQL string and use it in the join condition, as long as you define aliases for the joined DataFrames:
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

numbers_df = spark.createDataFrame(["123", "456", None, ""], StringType()).toDF("numbers")
letters_df = spark.createDataFrame([("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]).\
    toDF("numbers", "letters")

joined_df = numbers_df.alias("numbers").join(
    letters_df.alias("letters"),
    F.expr('numbers.numbers <=> letters.numbers')).\
    select('letters.*')

joined_df.show()
+-------+-------+
|numbers|letters|
+-------+-------+
| 456| def|
| null| zzz|
| | hhh|
| 123| abc|
+-------+-------+
Answered by K L
Try the following method to include the null rows in the result of the JOIN operator:
import org.apache.spark.sql.{Column, DataFrame}

def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {
  // Build a null-safe equality condition over all of the join columns
  var columnsExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
  columns.drop(1).foreach(column => {
    columnsExpr = columnsExpr && (leftDF(column) <=> rightDF(column))
  })
  var joinedDF: DataFrame = leftDF.join(rightDF, columnsExpr, joinType)
  // Drop the duplicate join columns coming from the left DataFrame
  columns.foreach(column => {
    joinedDF = joinedDF.drop(leftDF(column))
  })
  joinedDF
}
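A usage sketch with the DataFrames from the question (the "inner" join type is an assumption):

// Null-safe inner join; the helper drops the duplicate join columns from the left side
nullSafeJoin(numbersDf, lettersDf, Seq("numbers"), "inner").show()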