scala 如何创建一个可以稍后通过方法调用接收元素的源?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/30964824/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:16:44  来源:igfitidea点击:

How to create a Source that can receive elements later via a method call?

scalaakkaakka-streamakka-http

提问by ale64bit

I would like to create a Sourceand later push elements on it, like in:

我想在其上创建一个Source和稍后推送元素,例如:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

What is the recommended way to do this?

推荐的方法是什么?

Thanks!

谢谢!

回答by Ramón J Romero y Vigil

There are three ways this can be achieved:

有以下三种方法可以实现:

1. Post Materialization with SourceQueue

1. 使用 SourceQueue 后期实现

You can use Source.queuethat materializes the Flow into a SourceQueue:

您可以使用Source.queue将 Flow 实体化为一个SourceQueue

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println)
                  .run() // in order to "keep" the queue Materialized value instead of the Sink's

queue offer Weather("02139", 32.0, true)

2. Post Materialization with Actor

2. 使用 Actor 进行后期实现

There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:

还有一个类似的问题和答案在这里,要点是,你兑现了流作为ActorRef和邮件发送到REF:

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run() // in order to "keep" the ref Materialized value instead of the Sink's

ref ! Weather("02139", 32.0, true)

3. Pre Materialization with Actor

3. 使用 Actor 预实现

Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here:

类似地,您可以显式创建一个包含消息缓冲区的 Actor,使用该 Actor 创建一个 Source,然后按照此处的答案中的描述发送该 Actor 消息:

object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true) 

//stream already has 1 Weather value to process which is sitting in the 
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}

回答by PetrosP

Since Akka 2.5 Sourcehas a preMaterializemethod.

由于 Akka 2.5Source有一个preMaterialize方法。

According to the documentation, this looks like the indicated way to do what you ask:

根据文档,这看起来像是按照您的要求执行的指示方式:

There are situations in which you require a Sourcematerialized value before the Sourcegets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRefor Source.maybe.

在某些情况下,您需要一个Source物化值,然后才能Source将其连接到图形的其余部分。这在“物化价值驱动”源的情况下特别有用,例如Source.queueSource.actorRefSource.maybe

Below an example on how this would be with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:

下面是一个关于如何使用SourceQueue. 元素在物化之前和之后被推送到队列中,以及从Flow:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()


val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()

// Adding element before actual materialization
sourceMat.offer("pre materialization element")

val flow = Flow[String].map { e =>
  if(!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}

// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()

// Adding element after materialization
sourceMat.offer("post materialization element")

Output:

输出:

Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow

回答by Jesse Feinman

After playing around and looking for a good solution to this I came across this solution which is clean, simple, and works both pre and post materialization. https://stackoverflow.com/a/32553913/6791842

在四处寻找并为此寻找一个好的解决方案之后,我遇到了这个解决方案,它干净、简单,并且在实现前和实现后都有效。 https://stackoverflow.com/a/32553913/6791842

  val (ref: ActorRef, publisher: Publisher[Int]) =
    Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both).run()

  ref ! 1 //before

  val source = Source.fromPublisher(publisher)

  ref ! 2 //before
  Thread.sleep(1000)
  ref ! 3 //before

  source.runForeach(println)

  ref ! 4 //after
  Thread.sleep(1000)
  ref ! 5 //after

Output:

输出:

1
2
3
4
5