Scala: How to get started with Akka Streams?

Disclaimer: This page is based on a popular StackOverflow question and its answer, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/35120082/

Time: 2020-10-22 07:59:32  Source: igfitidea

How to get started with Akka Streams?

scala, akka-stream

Asked by kiritsuku

The Akka Streams library already comes with quite a wealth of documentation. However, the main problem for me is that it provides too much material: I feel quite overwhelmed by the number of concepts I have to learn. Many of the examples shown there feel very heavyweight, can't easily be translated to real-world use cases, and are therefore quite esoteric. I think it gives way too many details without explaining how to put all the building blocks together and how exactly they help to solve specific problems.

There are sources, sinks, flows, graph stages, partial graphs, materialization, a graph DSL and a lot more, and I just don't know where to start. The quick start guide is meant to be a starting place, but I don't understand it. It just throws in the concepts mentioned above without explaining them. Furthermore, the code examples can't be executed: there are missing parts, which makes it more or less impossible for me to follow the text.

Can anyone explain, in simple words and with easy examples, the concepts of sources, sinks, flows, graph stages, partial graphs, materialization and maybe some other things that I missed? The examples don't need to explain every single detail (which is probably not needed at the beginning anyway).

Answered by kiritsuku

This answer is based on akka-stream version 2.4.2. The API can be slightly different in other versions. The dependency can be added with sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"


Alright, let's get started. The API of Akka Streams consists of three main types. In contrast to Reactive Streams, these types are a lot more powerful and therefore more complex. All of the code examples assume that the following definitions already exist:

import scala.concurrent._
import scala.concurrent.duration._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

The import statements are needed for the type declarations. system represents the actor system of Akka and materializer represents the evaluation context of the stream. In our case we use an ActorMaterializer, which means that the streams are evaluated on top of actors. Both values are marked as implicit, which allows the Scala compiler to inject these two dependencies automatically whenever they are needed. We also import system.dispatcher, which is an execution context for Futures.
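
To run the snippets outside the REPL, the same definitions can be wrapped in a small program. This is a minimal sketch, assuming akka-stream 2.4.x on the classpath as in the sbt line above; the object name GettingStarted and the method sumOfOneToThree are made up for illustration. Unlike the REPL snippets, it waits for the stream's result and terminates the actor system afterwards:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical standalone wrapper around the shared definitions above.
object GettingStarted {
  def sumOfOneToThree(): Int = {
    implicit val system: ActorSystem = ActorSystem("TestSystem")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // runFold materializes the stream and returns the folded result as a Future.
      val sum: Future[Int] = Source(1 to 3).runFold(0)(_ + _)
      // Blocking is acceptable in a demo; avoid Await inside real stream code.
      Await.result(sum, 5.seconds)
    } finally {
      // Without this, the actor system's threads keep the JVM running.
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(sumOfOneToThree()) // prints 6
}
```

Await is only used to keep the demo short. Note that later Akka releases deprecate ActorMaterializer() in favour of a materializer derived from the ActorSystem; this sketch sticks to the 2.4 API used throughout this answer.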

A New API

Akka Streams have these key properties:

  • They implement the Reactive Streams specification, whose three main goals (backpressure, asynchronous and non-blocking boundaries, and interoperability between different implementations) fully apply to Akka Streams too.
  • They provide an abstraction for the evaluation engine of the streams, which is called Materializer.
  • Programs are formulated as reusable building blocks, which are represented by the three main types Source, Sink and Flow. The building blocks form a graph whose evaluation is based on the Materializer and needs to be explicitly triggered.

The following sections give a deeper introduction to how to use the three main types.

Source

A Source is a data creator; it serves as the input source of the stream. Each Source has a single output channel and no input channel. All data flows through the output channel to whatever is connected to the Source.

[Figure: Source]

Image taken from boldradius.com.

A Source can be created in multiple ways:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source.fromFuture(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

In the above cases we fed the Source with finite data, which means the streams will terminate eventually. Keep in mind that Reactive Streams are lazy and asynchronous by default. This means that one has to explicitly request the evaluation of the stream. In Akka Streams this is done through the run* methods. runForeach is no different from the well-known foreach function, except that the run prefix makes explicit that we are asking for an evaluation of the stream. Since finite data is boring, we continue with infinite data:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5
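
Laziness can be made visible with a counter. This is a minimal sketch under the same assumptions (akka-stream 2.4.x; LazinessDemo is a hypothetical name): the map stage only runs once the stream is materialized, and take(3) turns the infinite Source.repeat into a finite stream:

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object that makes the laziness of streams observable.
object LazinessDemo {
  /** Returns (map calls before running, map calls after running, collected elements). */
  def run(): (Int, Int, Vector[Int]) = {
    implicit val system: ActorSystem = ActorSystem("LazinessDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val mapCalls = new AtomicInteger(0)
      // Only a blueprint is built here; nothing is evaluated yet.
      val firstThree = Source.repeat(5)
        .map { n => mapCalls.incrementAndGet(); n }
        .take(3) // turns the infinite source into a finite stream
      val before = mapCalls.get()
      // runFold materializes and runs the stream, collecting the elements.
      val collected = Await.result(firstThree.runFold(Vector.empty[Int])(_ :+ _), 5.seconds)
      (before, mapCalls.get(), collected)
    } finally {
      system.terminate()
    }
  }
}
```

before is 0 because defining the stream does not evaluate it. After running, map has been called at least three times; only that lower bound is relied on here, since how far upstream stages run ahead is an internal buffering detail.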

With the take method we can create an artificial stop point that prevents the stream from being evaluated indefinitely. Since actor support is built in, we can also easily feed the stream with messages that are sent to an actor:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, overflowStrategy = OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

We can see that the Futures are executed asynchronously on different threads, which explains the order of the output. In the above example a buffer for the incoming elements is not necessary, and therefore with OverflowStrategy.fail we configure the stream to fail on a buffer overflow. This actor interface in particular lets us feed the stream from any data source. It doesn't matter whether the data is created by the same thread, by a different one, by another process, or whether it comes from a remote system over the Internet.
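
Instead of using mapMaterializedValue, the materialized ActorRef can also be handed back to the caller, together with the sink's result, via toMat and Keep.both. This is a sketch assuming akka-stream 2.4.x, where sending akka.actor.Status.Success to the ref completes the stream (buffered elements are emitted first); ActorSourceDemo is a hypothetical name, and the bufferSize/overflowStrategy overload of Source.actorRef may be deprecated in newer Akka releases:

```scala
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo: feed a stream by sending messages to its materialized ActorRef.
object ActorSourceDemo {
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("ActorSourceDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Keep.both returns the source's ActorRef and the sink's Future side by side.
      val (ref, result): (ActorRef, Future[Vector[Int]]) =
        Source.actorRef[Int](bufferSize = 16, overflowStrategy = OverflowStrategy.dropHead)
          .toMat(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))(Keep.both)
          .run()
      ref ! 1
      ref ! 2
      ref ! 3
      // Signals completion of the stream.
      ref ! Status.Success("done")
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Messages sent from the same sender to the same actor keep their order, so the collected result is Vector(1, 2, 3).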

Sink

A Sink is basically the opposite of a Source. It is the endpoint of a stream and therefore consumes data. A Sink has a single input channel and no output channel. Sinks are especially needed when we want to specify the behavior of the data collector in a reusable way and without evaluating the stream. The run* methods we have seen so far do not give us these properties, so using a Sink is preferred instead.

[Figure: Sink]

Image taken from boldradius.com.

A short example of a Sink in action:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Connecting a Source to a Sink can be done with the to method. It returns a RunnableGraph (as the REPL output above shows), which, as we will see later, is also what we get when a Flow is connected to both a Source and a Sink: a stream that can be executed by just calling its run() method.

[Figure: Runnable Flow]

Image taken from boldradius.com.
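
Because a RunnableGraph is only a blueprint, run() can be called more than once, and each call materializes a new, independent stream. This is a sketch assuming akka-stream 2.4.x (BlueprintDemo is a hypothetical name); it uses toMat with Keep.right so that run() returns the sink's Future[Int] instead of the NotUsed that to keeps:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo: one RunnableGraph blueprint, two independent runs.
object BlueprintDemo {
  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("BlueprintDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
      // Keep.right keeps the sink's Future[Int] instead of the source's NotUsed.
      val graph: RunnableGraph[Future[Int]] = Source(1 to 3).toMat(sum)(Keep.right)
      // Every run() materializes a fresh stream with its own state.
      val first = graph.run()
      val second = graph.run()
      (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```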

It is of course possible to forward all values that arrive at a sink to an actor:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed
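
A Sink value is reusable in the same way: it only describes how data is collected and does nothing until it is connected and materialized. This is a sketch assuming akka-stream 2.4.x (ReusableSinkDemo and collect are hypothetical names), using runWith, which connects a source to a sink, runs the stream and returns the sink's materialized value:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo: a single Sink definition reused by two different sources.
object ReusableSinkDemo {
  // Collects all elements it receives; defining it does not run anything.
  val collect: Sink[String, Future[Vector[String]]] =
    Sink.fold[Vector[String], String](Vector.empty)(_ :+ _)

  def run(): (Vector[String], Vector[String]) = {
    implicit val system: ActorSystem = ActorSystem("ReusableSinkDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Each runWith creates its own materialization of `collect`.
      val words = Source(List("a", "b")).runWith(collect)
      val numbers = Source(1 to 3).map(_.toString).runWith(collect)
      (Await.result(words, 5.seconds), Await.result(numbers, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```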

Flow

Data sources and sinks are great if you need a connection between Akka Streams and an existing system, but on their own they cannot do much with the data. Flows are the last missing piece of the basic Akka Streams abstraction. They act as connectors between different streams and can be used to transform their elements.

[Figure: Flow]

Image taken from boldradius.com.

If a Flow is connected to a Source, the result is a new Source. Likewise, a Flow connected to a Sink creates a new Sink. And a Flow connected to both a Source and a Sink results in a RunnableGraph. Therefore, flows sit between the input and the output channel, but as long as they are connected to neither a Source nor a Sink, they do not correspond to either of these flavors on their own.

[Figure: Full Stream]

Image taken from boldradius.com.
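
These three combinations can be checked against the types. This is a sketch assuming akka-stream 2.4.x; FlowAttachmentDemo is a hypothetical name, and invert and doubler mirror the flows of the example that follows:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo: what a Flow turns into when attached to a Source or a Sink.
object FlowAttachmentDemo {
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("FlowAttachmentDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val invert: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * -1)
      val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
      // Source + Flow = Source
      val inverted: Source[Int, NotUsed] = Source(1 to 3).via(invert)
      // Flow + Sink = Sink (Keep.right keeps the sink's Future)
      val doubleAndCollect: Sink[Int, Future[Vector[Int]]] =
        doubler.toMat(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))(Keep.right)
      // Source + Sink = RunnableGraph; runWith runs it and returns the sink's value.
      Await.result(inverted.runWith(doubleAndCollect), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```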

To get a better understanding of Flows, let's have a look at some examples:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

With the via method we can connect a Source with a Flow. We need to specify the input type of a Flow (as in Flow[Int]) because the compiler can't infer it for us. As we can already see in this simple example, the flows invert and doubler are completely independent of any data producers and consumers. They only transform the data and forward it to the output channel. This means that we can reuse a flow among multiple streams:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1 and s2 represent completely new streams; they do not share any data through their building blocks.
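
Flows also compose with each other: connecting a Flow to another Flow with via yields a Flow again, which can be reused like any other building block. This is a sketch assuming akka-stream 2.4.x (FlowReuseDemo and invertAndDouble are hypothetical names):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo: a composite Flow reused by two independent streams.
object FlowReuseDemo {
  // Flow via Flow is again a Flow.
  val invertAndDouble: Flow[Int, Int, NotUsed] =
    Flow[Int].map(_ * -1).via(Flow[Int].map(_ * 2))

  def run(): (Vector[Int], Vector[Int]) = {
    implicit val system: ActorSystem = ActorSystem("FlowReuseDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val a = Source(1 to 3).via(invertAndDouble).runFold(Vector.empty[Int])(_ :+ _)
      val b = Source(-3 to -1).via(invertAndDouble).runFold(Vector.empty[Int])(_ :+ _)
      (Await.result(a, 5.seconds), Await.result(b, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```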

Unbounded Data Streams

Before we move on, we should first revisit some of the key aspects of Reactive Streams. An unbounded number of elements can arrive at any point in time and can put a stream into different states. Besides the running state, which is the usual one, a stream may be stopped either by an error or by a signal denoting that no further data will arrive. A stream can be modeled graphically by marking events on a timeline, as is done here:

[Figure: A stream is a sequence of ongoing events ordered in time]

Image taken from The introduction to Reactive Programming you've been missing.
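
The error case from the diagram can be tried out directly. This is a sketch assuming akka-stream 2.4.x (FailureDemo is a hypothetical name): an exception thrown inside map terminates the stream and fails the materialized Future, while the recover operator turns the error into a final element and then completes the stream:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo: a stream terminated by an error, with and without recovery.
object FailureDemo {
  def run(): (Boolean, Vector[Int]) = {
    implicit val system: ActorSystem = ActorSystem("FailureDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Fails while processing the second element.
      def faulty = Source(1 to 3).map { n =>
        if (n == 2) throw new IllegalStateException("boom") else n
      }
      // Unhandled, the error terminates the stream and fails the resulting Future.
      val failed =
        Try(Await.result(faulty.runFold(Vector.empty[Int])(_ :+ _), 5.seconds)).isFailure
      // recover emits one last element for the error, then completes normally.
      val recovered = Await.result(
        faulty.recover { case _: IllegalStateException => -1 }
          .runFold(Vector.empty[Int])(_ :+ _),
        5.seconds)
      (failed, recovered)
    } finally {
      system.terminate()
    }
  }
}
```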

We have already seen runnable flows in the examples of the previous section. We get a RunnableGraph whenever a stream can actually be materialized, which means that a Sink is connected to a Source. So far we have always materialized to the value NotUsed, which can be seen in the types:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

For Source and Sink the second type parameter, and for Flow the third type parameter, denotes the materialized value. This answer does not explain the full meaning of materialization; further details can be found in the official documentation. For now, the only thing we need to know is that the materialized value is what we get when we run a stream. Since we were only interested in side effects so far, we got NotUsed as the materialized value. The exception was the materialization of a sink, which resulted in a Future. It gave us back a Future because this value can signal when the stream that is connected to the sink has ended. The previous code examples were nice for explaining the concepts, but they were also boring because we only dealt with finite streams or very simple infinite ones. To make it more interesting, the following section explains a fully asynchronous and unbounded stream.
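
Which materialized value is kept is decided when the building blocks are connected. This is a sketch assuming akka-stream 2.4.x (MaterializedValuesDemo is a hypothetical name): to keeps the left-hand (source's) value, which is why the earlier examples printed NotUsed, while toMat with Keep.both keeps both values as a pair:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo: choosing which materialized value run() returns.
object MaterializedValuesDemo {
  def run(): (NotUsed, Int) = {
    implicit val system: ActorSystem = ActorSystem("MaterializedValuesDemo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 3)
      val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
      // `to` keeps the source's materialized value.
      val keepLeft: RunnableGraph[NotUsed] = source.to(sink)
      // Keep.both returns both materialized values as a pair.
      val keepBoth: RunnableGraph[(NotUsed, Future[Int])] = source.toMat(sink)(Keep.both)
      val notUsed: NotUsed = keepLeft.run()
      val (_, sum) = keepBoth.run()
      (notUsed, Await.result(sum, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```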

ForSourceSink第二个类型参数和Flow第三个类型参数表示物化值。在整个答案中,不应解释具体化的全部含义。但是,可以在官方文档中找到有关具体化的更多详细信息。现在我们唯一需要知道的是物化值是我们运行流时得到的值。由于到目前为止我们只对副作用感兴趣,所以我们得到Unit了物化值。对此的例外是接收器的具体化,这导致Future. 它给了我们一个Future,因为这个值可以表示连接到接收器的流何时结束。到目前为止,前面的代码示例很好地解释了这个概念,但它们也很无聊,因为我们只处理有限流或非常简单的无限流。为了使它更有趣,下面将解释一个完整的异步和无界流。

ClickStream Example

As an example, we want a stream that captures click events. To make it more challenging, let's say we also want to group click events that happen in quick succession. This way we could easily discover double, triple or tenfold clicks. Furthermore, we want to filter out all single clicks. Take a deep breath and imagine how you would solve that problem in an imperative manner. I bet no one would be able to implement a solution that works correctly on the first try. In a reactive fashion, this problem is trivial to solve. In fact, the solution is so simple and straightforward to implement that we can even express it in a diagram that directly describes the behavior of the code:

[Figure: The logic of the click stream example]

Image taken from The introduction to Reactive Programming you've been missing.

The gray boxes are functions that describe how one stream is transformed into another. With the throttle function we accumulate clicks within 250 milliseconds; the map and filter functions should be self-explanatory. The colored orbs represent events and the arrows depict how they flow through our functions. In later processing steps, fewer and fewer elements flow through our stream, since we group them together and filter them out. The code for this diagram would look something like this:

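// clickStream and the throttle(250.millis) grouping stage are defined in the full example
val multiClickStream = clickStream
    .throttle(250.millis)                           // group clicks within 250 ms
    .map(clickEvents => clickEvents.length)         // count the clicks in each group
    .filter(numberOfClicks => numberOfClicks >= 2)  // drop single clicks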

The whole logic can be represented in only four lines of code! In Scala, we could write it even shorter:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

The definition of clickStream is a little more complex, but only because the example program runs on the JVM, where capturing click events is not easily possible. Another complication is that Akka does not provide a throttle function with these grouping semantics out of the box, so we had to write it ourselves. Since this function is (like the map or filter functions) reusable across different use cases, I don't count these lines towards the number of lines needed to implement the logic. In imperative languages, however, it is normal that logic can't be reused that easily and that the different logical steps all happen in one place instead of being applied sequentially, which means that we probably would have cluttered our code with the throttling logic. The full code example is available as a gist and is not discussed here any further.
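
The gist is not reproduced here. As a rough, framework-free sketch of the same pipeline, the following functions work on recorded click timestamps in milliseconds. They assume that "accumulate clicks within 250 milliseconds" means grouping clicks separated by gaps of at most 250 ms; this is my reading of the diagram, not the gist's throttle implementation, and MultiClicks is a hypothetical name:

```scala
// Hypothetical, framework-free sketch of the click-stream logic.
object MultiClicks {
  // Puts a click into the current group if it follows the previous click
  // by at most `gapMs`; otherwise it starts a new group.
  def groupBursts(timestamps: List[Long], gapMs: Long): List[List[Long]] =
    timestamps.foldLeft(List.empty[List[Long]]) {
      // Groups are built newest-first, so `current.head` is the previous click.
      case (current :: done, t) if t - current.head <= gapMs => (t :: current) :: done
      case (groups, t) => List(t) :: groups
    }.map(_.reverse).reverse

  // Mirrors clickStream.throttle(250.millis).map(_.length).filter(_ >= 2).
  def multiClicks(timestamps: List[Long], gapMs: Long = 250): List[Int] =
    groupBursts(timestamps, gapMs).map(_.length).filter(_ >= 2)
}
```

For clicks at 0, 100, 180, 1000, 2000 and 2100 ms this yields List(3, 2): a triple click, a dropped single click and a double click. In Akka Streams itself, groupedWithin(n, d) is a related built-in operator, but it groups by fixed time windows rather than by pauses between clicks.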

SimpleWebServer Example

Instead, another example shall be discussed. While the click stream is a nice example of letting Akka Streams handle a real-world problem, it cannot show parallel execution in action. The next example represents a small web server that can handle multiple requests in parallel. The web server shall be able to accept incoming connections and receive byte sequences from them that represent printable ASCII characters. These byte sequences or strings should be split at every newline character into smaller parts. After that, the server shall respond to the client with each of the split lines. Alternatively, it could do something else with the lines and give a special answer token, but we want to keep this example simple and therefore don't introduce any fancy features. Remember, the server needs to be able to handle multiple requests at the same time, which basically means that no request is allowed to block any other request from further execution. Solving all of these requirements can be hard in an imperative way; with Akka Streams, however, we shouldn't need more than a few lines to solve any of them. First, let's have an overview of the server itself:

[Figure: server]

Basically, there are only three main building blocks. The first one needs to accept incoming connections. The second one needs to handle incoming requests and the third one needs to send a response. Implementing all of these three building blocks is only a little bit more complicated than implementing the click stream:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher
  import scala.util.{Failure, Success}

  // Handles every accepted connection with the serverLogic flow (described below).
  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Done]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  // Binding to address:port yields a source of incoming connections.
  val incomingConnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  // `to` keeps the source's materialized value, the Future[Tcp.ServerBinding].
  val binding: Future[Tcp.ServerBinding] =
    incomingConnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

Besides the address and the port of the server, the function mkServer also takes an actor system and a materializer as implicit parameters. The control flow of the server is represented by binding, which takes a source of incoming connections and forwards them to a sink of incoming connections. Inside connectionHandler, which is our sink, we handle every connection with the flow serverLogic, which will be described later. binding returns a Future, which completes when the server has been started or when the start failed, which could be the case when the port is already taken by another process. The code, however, doesn't completely reflect the diagram, as we can't see a building block that handles responses. The reason for this is that the connection already provides this logic by itself. It is a bidirectional flow and not just a unidirectional one like the flows we have seen in the previous examples. As with materialization, such complex flows are not explained here. The official documentation has plenty of material covering more complex flow graphs. For now it is enough to know that Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses. The part that is still missing is the serverLogic building block. It can look like this:

[Figure: server logic]

Once again, we are able to split the logic into several simple building blocks that together form the flow of our program. First, we want to split our sequence of bytes into lines, which we have to do whenever we find a newline character. After that, the bytes of each line need to be converted to a string because working with raw bytes is cumbersome. In general, we could also receive the binary stream of a complicated protocol, which would make working with the incoming raw data extremely challenging. Once we have a readable string, we can create an answer. For simplicity, the answer can be anything in our case. In the end, we have to convert our answer back to a sequence of bytes that can be sent over the wire. The code for the entire logic may look like this:

val serverLogic: Flow[ByteString, ByteString, NotUsed] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

We already know that serverLogic is a flow that takes a ByteString and has to produce a ByteString. With delimiter we can split a ByteString into smaller parts; in our case this needs to happen whenever a newline character occurs. receiver is the flow that takes all of the split byte sequences and converts them to strings. This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string, but for our needs it is good enough. responder is the last component and is responsible for creating an answer and converting it back to a sequence of bytes. Unlike in the diagram, we didn't split this last component in two, since the logic is trivial. At the end, we connect all of the flows with the via function. At this point one may ask whether we took care of the multi-user property that was mentioned at the beginning. And indeed we did, even though it may not be obvious immediately. This diagram should make it clearer:

[Figure: server and server logic combined]

The serverLogic component is nothing but a flow that contains smaller flows. This component takes an input, which is a request, and produces an output, which is the response. Since a flow can be materialized multiple times and every materialization works independently of the others, this nesting gives us our multi-user property: every connection is handled by its own copy of the flow, and therefore a short-running request can overtake a previously started long-running request. In case you wondered, the definition of serverLogic that was shown previously can of course be written a lot shorter by inlining most of its inner definitions:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))
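
Whether connections really are independent can be checked with a flow that keeps state. This is a sketch assuming akka-stream 2.4.x; the line-numbering flow is a hypothetical stand-in for per-connection logic, and PerConnectionState is a made-up name. statefulMapConcat calls its outer function once per materialization, so each run gets its own counter, just as each conn.handleWith(serverLogic) call materializes serverLogic anew:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo: per-materialization state in a reused flow.
object PerConnectionState {
  // Numbers every line it receives; the counter lives inside the factory function.
  val numbered: Flow[String, String, NotUsed] = Flow[String].statefulMapConcat[String] { () =>
    var count = 0
    (line: String) => {
      count += 1
      List(s"$count: $line")
    }
  }

  def run(): (Vector[String], Vector[String]) = {
    implicit val system: ActorSystem = ActorSystem("PerConnectionState")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Two materializations of the same flow, like two clients of the server.
      val first = Source(List("a", "b")).via(numbered).runFold(Vector.empty[String])(_ :+ _)
      val second = Source(List("x")).via(numbered).runFold(Vector.empty[String])(_ :+ _)
      (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

The second stream starts counting at 1 again, because it does not share the first stream's counter.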

A test of the web server may look like this:

$ # Client
$ printf 'Hello World\nHow are you?\n' | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

For the above example to work, we first need to start the server, which is done by the startServer script:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?
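
The framing logic can also be checked without a socket by feeding serverLogic with ByteString chunks directly. This is a sketch assuming akka-stream 2.4.x; ServerLogicCheck is a hypothetical name, and the chunks are made-up input with one line deliberately split across two chunks, the way TCP may deliver it:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical in-memory check of the inlined serverLogic shown above.
object ServerLogicCheck {
  val serverLogic: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(msg => s"Server hereby responds to message: $msg\n")
    .map(ByteString(_))

  def run(): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("ServerLogicCheck")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // "Hello World" arrives in two pieces; Framing.delimiter reassembles it.
      val chunks = List(ByteString("Hello Wo"), ByteString("rld\nHow are you?\n"))
      val replies = Source(chunks).via(serverLogic).map(_.utf8String)
        .runFold(Vector.empty[String])(_ :+ _)
      Await.result(replies, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The replies match the netcat session above, one response per line.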

The full code example of this simple TCP server can be found here. With Akka Streams we can write not only the server but also the client. It may look like this:

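import scala.io.StdIn

// Outgoing connection to the server; its bytes are fed into `flow` below.
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)                    // print every line received from the server
  .map(_ => StdIn.readLine("> "))  // then block until the user types a reply
  .map(_+"\n")
  .map(ByteString(_))

// join closes the loop: server output feeds `flow`, and `flow` output goes back to the server.
connection.join(flow).run()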

The full code of the TCP client can be found here. The code looks quite similar, but in contrast to the server we don't have to manage incoming connections.

Complex Graphs

In the previous sections we have seen how we can construct simple programs out of flows. In reality, however, it is often not enough to rely only on built-in functions to construct more complex streams. If we want to use Akka Streams for arbitrary programs, we need to know how to build our own custom control structures and combinable flows that allow us to tackle the complexity of our applications. The good news is that Akka Streams was designed to scale with the needs of its users. To give you a short introduction to the more complex parts of Akka Streams, we add some more features to our client/server example.

One thing we can't do yet is close a connection. At this point it starts to get a little more complicated, because the stream API we have seen so far doesn't allow us to stop a stream at an arbitrary point. However, there is the GraphStage abstraction, which can be used to create arbitrary graph processing stages with any number of input or output ports. Let's first have a look at the server side, where we introduce a new component called closeConnection:

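import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    // Called when upstream pushes an element that we can grab.
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" =>
          // Say goodbye, then complete the stage (this also cancels upstream).
          push(out, "BYE")
          completeStage()
        case msg =>
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    // Called when downstream requests an element: forward the demand upstream.
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}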

This API looks a lot more cumbersome than the flow API. No wonder: we have to perform a lot of imperative steps here. In exchange, we have more control over the behavior of our streams. In the above example, we only specify one input and one output port and make them available to the system by overriding the shape value. Furthermore, we define a so-called InHandler and an OutHandler, which are responsible for receiving and emitting elements, respectively. If you looked closely at the full click stream example, you should recognize these components already. In the InHandler we grab an element, and if it is the single-character string "q", we want to close the stream. To give the client a chance to find out that the stream will be closed soon, we emit the string "BYE" and then immediately close the stage. The closeConnection component can be combined with a stream via the via method, which was introduced in the section about flows.
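
The stage can be exercised in isolation as well. This is a sketch assuming akka-stream 2.4.x; it repeats closeConnection as a named class CloseConnection (a hypothetical name) so that the block is self-contained, and feeds it made-up input:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape, Inlet, Materializer, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

object CloseConnectionCheck {
  // Same logic as closeConnection above, as a named class.
  final class CloseConnection extends GraphStage[FlowShape[String, String]] {
    val in = Inlet[String]("closeConnection.in")
    val out = Outlet[String]("closeConnection.out")
    override val shape = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          override def onPush(): Unit = grab(in) match {
            case "q" =>
              push(out, "BYE")
              completeStage()
            case msg =>
              push(out, s"Server hereby responds to message: $msg\n")
          }
        })
        setHandler(out, new OutHandler {
          override def onPull(): Unit = pull(in)
        })
      }
  }

  def run(): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("CloseConnectionCheck")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val replies = Source(List("hello", "q", "never processed"))
        .via(new CloseConnection)
        .runFold(Vector.empty[String])(_ :+ _)
      Await.result(replies, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The element after "q" never shows up in the output, because completeStage() also cancels upstream.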

Besides being able to close connections, it would also be nice to show a welcome message on a newly created connection. To do this, we once again have to go a little further:

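def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // A source that emits exactly one element: the welcome message.
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  // internalLogic is the previous serverLogic with closeConnection added.
  val logic = b.add(internalLogic)
  // Concat emits everything from in(0) first, then everything from in(1).
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.out ~> concat.in(1)

  // Expose only the logic's inlet and the concatenated outlet.
  FlowShape(logic.in, concat.out)
})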

The function serverLogic now takes the incoming connection as a parameter. Inside its body we use a DSL that allows us to describe complex stream behavior. With welcome we create a stream that can emit only one element: the welcome message. logic is what was described as serverLogic in the previous section. The only notable difference is that we added closeConnection to it. Now comes the interesting part of the DSL. The GraphDSL.create function makes a builder b available, which is used to express the stream as a graph. With the ~> function it is possible to connect input and output ports with each other. The Concat component used in the example concatenates elements and is used here to prepend the welcome message to the other elements that come out of internalLogic. In the last line, we only make the input port of the server logic and the output port of the concatenated stream available, because all the other ports shall remain an implementation detail of the serverLogic component. For an in-depth introduction to the graph DSL of Akka Streams, visit the corresponding section in the official documentation. The full code example of the complex TCP server and of a client that can communicate with it can be found here. Whenever you open a new connection from the client, you should see a welcome message, and by typing "q" on the client, you should see a message that tells you that the connection has been canceled.
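
The same GraphDSL pattern can be tried without a TCP connection. This is a sketch assuming akka-stream 2.4.x; withWelcome, the fixed greeting and the upper-casing inner flow are hypothetical stand-ins for conn.remoteAddress and internalLogic:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, Materializer}
import akka.stream.scaladsl.{Concat, Flow, GraphDSL, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object WelcomeCheck {
  // Prepends `greeting` to everything that comes out of `inner`.
  def withWelcome(greeting: String, inner: Flow[String, String, NotUsed]): Flow[String, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val logic = b.add(inner)
      val concat = b.add(Concat[String]())
      Source.single(greeting) ~> concat.in(0) // emitted first
      logic.out ~> concat.in(1)               // then all output of the inner flow
      FlowShape(logic.in, concat.out)
    })

  def run(): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("WelcomeCheck")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      val flow = withWelcome("Welcome!", Flow[String].map(_.toUpperCase))
      Await.result(Source(List("a", "b")).via(flow).runFold(Vector.empty[String])(_ :+ _), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```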

There are still some topics that weren't covered by this answer. Materialization in particular may scare some readers, but I'm sure that with the material covered here everyone should be able to take the next steps on their own. As already said, the official documentation is a good place to continue learning about Akka Streams.
