Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must likewise follow CC BY-SA and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/41123846/
Why does join fail with "java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]"?
Asked by Christos Hadjinikolis
I am using Spark 1.5.
I have two dataframes of the form:
scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]
scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
libriFirstTable50Plus3DF has 766,151 records while linkPersonItemLessThan500DF has 26,694,353 records. Note that I am using repartition(number) on linkPersonItemLessThan500DF since I intend to join these two later on. I am following up the above with:
val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))
for which I am getting this output:
16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
and I don't understand what the issue is. Is it as simple as increasing the waiting time? Is the join too intensive? Do I need more memory? Is the shuffling intensive? Can anyone help?
Answered by T. Gawęda
This happens because Spark tries to do a Broadcast Hash Join and one of the DataFrames is very large, so broadcasting it consumes a lot of time.
You can:
- Set a higher spark.sql.broadcastTimeout to increase the timeout: spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
- persist() both DataFrames, then Spark will use a Shuffle Join (reference from here)
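A minimal Scala sketch of both options (hedged: it assumes a Spark 2.x SparkSession named spark and reuses the DataFrame names from the question; on Spark 1.5 the equivalent call is sqlContext.setConf):

// Option 1: raise the broadcast timeout (the value is in seconds).
spark.conf.set("spark.sql.broadcastTimeout", "36000")

// Option 2: persist() both DataFrames so Spark plans a shuffle join instead.
libriFirstTable50Plus3DF.persist()
linkPersonItemLessThan500DF.persist()
val joinedDF = linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))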
PySpark
In PySpark, you can set the config when you build the spark context in the following manner:
spark = SparkSession \
  .builder \
  .appName("Your App") \
  .config("spark.sql.broadcastTimeout", "36000") \
  .getOrCreate()
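For completeness, a hedged Scala sketch of the same builder-time configuration (it assumes Spark 2.x, where SparkSession is available; on Spark 1.5 you would set the property on the SQLContext instead):

import org.apache.spark.sql.SparkSession

// Set the broadcast timeout once, when the session is created.
val spark = SparkSession
  .builder()
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()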
Answered by Jacek Laskowski
Just to add some code context to the very concise answer from @T. Gawęda.
In your Spark application, Spark SQL did choose a broadcast hash join for the join because "libriFirstTable50Plus3DF has 766,151 records", which happened to be less than the so-called broadcast threshold (defaults to 10MB).
You can control the broadcast threshold using the spark.sql.autoBroadcastJoinThreshold configuration property.
spark.sql.autoBroadcastJoinThreshold: Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.
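A hedged sketch of how that threshold might be tuned (Spark 2.x spark.conf API assumed; on Spark 1.5 use sqlContext.setConf):

// Disable automatic broadcast joins entirely; Spark will then plan a shuffle-based join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Or raise the threshold (in bytes), e.g. to roughly 50 MB, if broadcasting the
// smaller table is still desirable.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)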
You can find that particular type of join in the stack trace:
org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
The BroadcastHashJoin physical operator in Spark SQL uses a broadcast variable to distribute the smaller dataset to Spark executors (rather than shipping a copy of it with every task).
If you used explain to review the physical query plan, you'd notice the query uses the BroadcastExchangeExec physical operator. This is where you can see the underlying machinery for broadcasting the smaller table (and the timeout).
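A quick, hedged way to inspect the plan yourself (the DataFrame names come from the question):

linkPersonItemLessThan500DF
  .join(libriFirstTable50Plus3DF, Seq("family_id"))
  .explain()  // look for BroadcastHashJoin / BroadcastExchange in the printed plan

Inside the operator, the broadcast result is awaited with a timeout: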
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
doExecuteBroadcast is part of the SparkPlan contract that every physical operator in Spark SQL follows, and it allows for broadcasting when needed. BroadcastExchangeExec happens to need it.
The timeout parameter is what you are looking for.
private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}
As you can see, you can disable it completely (using a negative value), which means waiting indefinitely for the broadcast variable to be shipped to the executors, or use sqlContext.conf.broadcastTimeout, which is exactly the spark.sql.broadcastTimeout configuration property. The default value is 5 * 60 seconds, which you can see in the stacktrace:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
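For reference, a hedged sketch of reading and overriding the timeout from code (Spark 2.x RuntimeConfig API assumed; on Spark 1.5 use sqlContext.getConf / sqlContext.setConf):

// Usually prints "300", i.e. the 5 * 60 second default.
println(spark.conf.get("spark.sql.broadcastTimeout"))

// A negative value maps to Duration.Inf above, i.e. wait for the broadcast indefinitely.
spark.conf.set("spark.sql.broadcastTimeout", "-1")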
Answered by Pedro H
In my case, it was caused by a broadcast over a large dataframe:
df.join(broadcast(largeDF))
So, based on the previous answers, I fixed it by removing the broadcast:
df.join(largeDF)
Answered by lasclocker
In addition to increasing spark.sql.broadcastTimeout or persisting both DataFrames,
You may try:
1. disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1
2. increase the Spark driver memory by setting spark.driver.memory to a higher value (a sketch of both settings follows below).
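A hedged sketch of applying both settings; the 8g value is illustrative, and note that spark.driver.memory only takes effect if it is set before the driver JVM starts (e.g. via spark-submit --driver-memory or spark-defaults.conf), not from an already running session:

import org.apache.spark.SparkConf

// Supplied at startup, e.g. when building the SparkConf that launches the application.
val conf = new SparkConf()
  .set("spark.sql.autoBroadcastJoinThreshold", "-1")  // 1. turn off broadcast joins
  .set("spark.driver.memory", "8g")                   // 2. honored only at launch time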

