java 将 Kafka Stream Input 打印到控制台?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/39327868/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 04:16:29  来源:igfitidea点击:

Print Kafka Stream Input out to console?

javastreamapache-kafkaapache-kafka-streams

提问by Zeliax

I've been looking through a lot of the Kafka documentation for a java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am a little sketchy on that ground and don't feel too confident that it should be what I use as of yet.

我一直在查看我正在处理的 Java 应用程序的大量 Kafka 文档。我已经尝试进入 Java 8 中引入的 lambda 语法,但我对此有点粗略,并且对它应该是我目前使用的内容并不太有信心。

I've a Kafka/Zookeeper Service running without any troubles, and what I want to do is write a small example program that based on the input will write it out, but not do a wordcount as there are so many examples of already.

我有一个 Kafka/Zookeeper 服务运行没有任何问题,我想做的是编写一个小示例程序,根据输入将它写出来,但不要进行字数统计,因为已经有很多示例了。

As for sample data I will be getting a string of following structure:

至于示例数据,我将得到一串以下结构:

Example data

示例数据

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.

Question

问题

I want to be able to extract the 3 letter keywords and print them with a System.out.println. How do I get a string variable containing the input? I know how to apply regular expressions or even just searching through the string to get the keywords.

我希望能够提取 3 个字母的关键字并用System.out.println. 如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是搜索字符串来获取关键字。

Code

代码

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

I have zookeeper, kafka, producer and consumer running all hooked up to the same topic so I want to basically see the same Stringappear on all of the instances (producer, consumer and stream).

我有 zookeeper、kafka、生产者和消费者运行,它们都连接到同一个主题,所以我希望基本上看到String所有实例(生产者、消费者和流)上都出现相同的情况。

回答by Matthias J. Sax

If you use Kafka Streams, you need to apply functions/operators on your data streams. In your case, you create a KStreamobject, thus, you want to apply an operator to source.

如果您使用 Kafka Streams,则需要在数据流上应用函数/运算符。在您的情况下,您创建了一个KStream对象,因此,您希望将运算符应用于source.

Depending on what you want to do, there are operators that apply a function to each record in the stream independently (eg. map()), or other operators that apply a function to multiple record together (eg. aggregateByKey()). You should have a look into the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsland examples https://github.com/confluentinc/kafka-streams-examples

根据您要执行的操作,有些运算符可以独立地将函数应用于流中的每个记录(例如map()),或者其他运算符将函数一起应用于多个记录(例如aggregateByKey())。您应该查看文档:http: //docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl和示例https://github.com/confluentinc/kafka-流示例

Thus, you never create local variables using Kafka Streams as you show in your example above, but rather embed everything in operators/functions that get chained together.

因此,您永远不会使用 Kafka Streams 创建局部变量,如您在上面的示例中所示,而是将所有内容嵌入到链接在一起的运算符/函数中。

For example, if you want to print all input record to stdout, you could do

例如,如果要将所有输入记录打印到标准输出,则可以执行

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
    void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
 });

Thus, after you start your application via streams.start(), it will consumer the records from you input topic and for each record of your topic, a call to apply(...)is done, which prints the record on stdout.

因此,在您通过 启动应用程序后streams.start(),它将使用您输入的主题中的记录,并且对于您主题的每条记录,都会调用apply(...),从而在标准输出上打印记录。

Of course, a more native way for printing the stream to the console would be to use source.print()(which internally is basically the same as the shown foreach()operator with an already given ForeachAction.)

当然,将流打印到控制台的更本地方式是使用source.print()(在内部与显示的foreach()运算符基本相同,并且已经给定了ForeachAction.)

For your example with assigning the string to a local variable, you would need to put your code into apply(...)and do your regex-stuff etc. there to "extract the 3 letter keywords".

对于将字符串分配给局部变量的示例,您需要将代码放入apply(...)并执行正则表达式等以“提取 3 个字母的关键字”。

The best way to express this, would however be via a combination of flatMapValues()and print()(ie, source.flatMapValues(...).print()). flatMapValues()is called for each input record (in your case, I assume key will be nullso you can ignore it). Within your flatMapValuefunction, you apply your regex and for each match, you add the match to a list of values that you finally return.

然而,表达这一点的最佳方式是通过flatMapValues()print()(即,source.flatMapValues(...).print())的组合。flatMapValues()为每个输入记录调用(在您的情况下,我假设 key 将是null这样您可以忽略它)。在您的flatMapValue函数中,您应用正则表达式,并且对于每个匹配项,将匹配项添加到您最终返回的值列表中。

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();

        // apply regex to value and for each match add it to keywords

        return keywords;
    }
}

The output of flatMapValueswill be a KStreamagain, containing a record for each found keyword (ie, the output stream is a "union" over all lists your return in ValueMapper#apply()). Finally, you just print your result to console via print(). (Of course, you could also use a single foreachinstead of flatMapValue+printbut this would be less modular.)

的输出flatMapValuesKStream再次包含每个找到的关键字的记录(即,输出流是您返回的所有列表的“联合” ValueMapper#apply())。最后,您只需通过print(). (当然,您也可以使用单个foreach而不是flatMapValue+,print但这会减少模块化。)