
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/40609771/


Queries with streaming sources must be executed with writeStream.start();

scala apache-spark-sql spark-streaming

Asked by shivali

I'm trying to read the messages from Kafka (version 10) in Spark and print them.

import spark.implicits._

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .config("spark.master", "local")
  .getOrCreate()

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

ds1.collect.foreach(println)
ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()

I am getting the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Answered by ssice

You are branching the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back.

The solution is to start both branches and await termination.

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// collect/foreach is not supported on a streaming Dataset, so the first
// branch also has to be expressed as a streaming query and started.
val query1 = ds1.writeStream
  .format("console")
  .start()

val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()

Answered by Rajeev Rathor

I fixed the issue by using the following code.

val df = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "streamTest2")
  .load()

val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()
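
Since the original question is about printing the Kafka messages, a small optional extension (my assumption, not part of this answer) is to cast the binary key and value columns to strings before writing to the console, so the console sink shows readable text instead of raw bytes:

// Minimal sketch, reusing the same df read from the Kafka source above.
// The Kafka source exposes key and value as binary columns; cast them
// to strings for human-readable console output.
val readable = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

val readableQuery = readable.writeStream
  .outputMode("append")
  .format("console")
  .start()

readableQuery.awaitTermination()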

Answered by Rajeev Rathor

I struggled a lot with this issue. I tried each of the suggested solutions from various blogs. But in my case there were a few statements between calling start() on the query and the awaitTermination() call at the very end, which caused this.

Please try it in this fashion; it works perfectly for me. Working example:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

If you write it in this way, it will cause an exception/error:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // some statement 
    // some statement 

    query.awaitTermination();

It will throw the given exception and close your streaming driver.
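
If you do need other statements, a pattern consistent with this advice (my own sketch, not the answer author's code) is to run the non-streaming work before the query is started, and keep start() and awaitTermination() together at the very end:

// Hypothetical sketch: the "// some statement" work from above runs first,
// for example inspecting the schema, which is safe before start().
df.printSchema()

df.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()  // started and awaited last, as recommended above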

Answered by SLU

Kindly remove ds1.collect.foreach(println) and ds1.printSchema(), and use outputMode and awaitAnyTermination for the background process, waiting until any of the queries on the associated spark.streams has terminated.

val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .config("spark.master", "local[*]")
    .getOrCreate()

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")
    .load()

  val consoleOutput1 = ds1.writeStream
     .outputMode("update")
     .format("console")
     .start()

  spark.streams.awaitAnyTermination()


+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+

Answered by P696

I was able to resolve this issue with the following code. In my scenario, I had multiple intermediate DataFrames, which were basically the transformations made on the inputDF.

import org.apache.spark.sql.streaming.OutputMode

val query = joinedDF
  .writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Complete())
  .start()
  .awaitTermination()

joinedDF is the result of the last transformation performed.
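
For context, here is a minimal sketch of what such a chain of intermediate DataFrames might look like. The names inputDF, parsed and lookupDF (and the lookup CSV path) are hypothetical assumptions of mine; only joinedDF appears in the answer above:

// Hypothetical sketch, assuming an existing SparkSession named spark.
// Only the final joinedDF is written out with writeStream/start/awaitTermination
// (as shown above); the intermediate DataFrames are never started on their own.
val inputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// Intermediate transformation on the streaming source.
val parsed = inputDF.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// A static lookup table (assumed to have a "key" column) joined with the stream;
// stream-static joins are supported, and the aggregation afterwards is why
// OutputMode.Complete() is used in the query above.
val lookupDF = spark.read.option("header", "true").csv("/path/to/lookup.csv")
val joinedDF = parsed.join(lookupDF, Seq("key")).groupBy("key").count()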