scala - Why does Spark fail with "Detected cartesian product for INNER join between logical plans"?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA terms and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/44762353/

Why does Spark fail with "Detected cartesian product for INNER join between logical plans"?

scala, apache-spark, apache-spark-sql

Asked by Marsellus Wallace

I am using Spark 2.1.0.

When I execute the following code I'm getting an error from Spark. Why? How to fix it?

import org.apache.spark.sql.functions.lit
import spark.implicits._   // spark-shell brings these into scope automatically

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")
weights.join(input, "sourceId").show

Error:

scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$$anonfun$apply.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided

Answered by ktheitroadalo

You can trigger the inner join after turning on the flag:

spark.conf.set("spark.sql.crossJoin.enabled", "true")
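
The same flag can also be set when the SparkSession is built, for example in a standalone application (a minimal sketch; the app name and master below are placeholders, not part of the answer):

import org.apache.spark.sql.SparkSession

// Hypothetical session setup; only the spark.sql.crossJoin.enabled key comes from the answer above
val spark = SparkSession.builder()
  .appName("cross-join-demo")                     // placeholder application name
  .master("local[*]")                             // placeholder master for local testing
  .config("spark.sql.crossJoin.enabled", "true")  // allow cartesian products
  .getOrCreate()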

You could also use a cross join:

weights.crossJoin(input)

or pass an explicit join condition together with the "cross" join type:

weights.join(input, input("sourceId")===weights("sourceId"), "cross")
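
To double-check which join the optimizer ends up planning for any of these variants, you can print the query plan (a quick sketch, reusing weights and input from the question):

// explain(true) prints the parsed, analyzed, optimized and physical plans
weights.crossJoin(input).explain(true)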

You can find more about the issue in SPARK-6459, which is said to be fixed in 2.1.1.

As you have already used 2.1.1, the issue should have been fixed.

Hope this helps!

Answered by Jacek Laskowski

tl;dr Upgrade to Spark 2.1.1. It's an issue in Spark that was fixed.

(I really wish I could also show you the exact change that fixed it in 2.1.1.)

Answered by coderz

For me:

Dataset<Row> ds1 = sparkSession.read().load("/tmp/data");
Dataset<Row> ds2 = ds1;  // both sides share the same plan
ds1.join(ds2, ds1.col("name").equalTo(ds2.col("name"))); // got "Detected cartesian product for INNER join between logical plans"

Dataset<Row> ds1 = sparkSession.read().load("/tmp/data");
Dataset<Row> ds2 = sparkSession.read().load("/tmp/data");  // read again instead of reusing ds1
ds1.join(ds2, ds1.col("name").equalTo(ds2.col("name"))); // running properly without errors

I'm using Spark 2.1.0.
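
For the Scala API, the same contrast looks roughly like this (a sketch only; the /tmp/data path and the name column are taken from the Java snippet above):

// Reusing the same DataFrame on both sides is what triggered the cartesian-product check:
// val df = spark.read.load("/tmp/data")
// df.join(df, df("name") === df("name"))

// Reading the data twice gives Catalyst two distinct plans, and the equi-join is accepted:
val left  = spark.read.load("/tmp/data")
val right = spark.read.load("/tmp/data")
left.join(right, left("name") === right("name"))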

Answered by Navjot Bhardwaj

Got this error in SPARK version: 2.3.0.cloudera3

Solved by aliasing the dataframes.

e.g. re-assign the failing DataFrame to another DataFrame and give it an alias:

val dataFrame = inDataFrame.alias("dataFrame")

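For a self-join, the same aliasing idea can be taken one step further by aliasing both sides and qualifying the join columns (a sketch; the name column here is hypothetical, not from the answer):

import org.apache.spark.sql.functions.col

// Hypothetical self-join: give each side its own alias so the join columns are unambiguous
val left  = inDataFrame.alias("l")
val right = inDataFrame.alias("r")
left.join(right, col("l.name") === col("r.name"))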

Hope this helps.
