Original URL: http://stackoverflow.com/questions/39327868/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverFlow
Print Kafka Stream Input out to console?
Asked by Zeliax
I've been looking through a lot of the Kafka documentation for a Java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am a little shaky on that ground and don't feel confident yet that it is what I should use.
I have a Kafka/Zookeeper service running without any trouble, and what I want to do is write a small example program that writes out its input, rather than doing a word count, since there are already so many examples of that.
As for sample data, I will be getting a string of the following structure:
Example data
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
Question
I want to be able to extract the 3 letter keywords and print them with a System.out.println. How do I get a string variable containing the input? I know how to apply regular expressions or even just search through the string to get the keywords.
Code
public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}
I have Zookeeper, Kafka, a producer and a consumer running, all hooked up to the same topic, so I basically want to see the same String appear on all of the instances (producer, consumer and stream).
Answered by Matthias J. Sax
If you use Kafka Streams, you need to apply functions/operators on your data streams. In your case, you create a KStream object; thus, you want to apply an operator to source.
Depending on what you want to do, there are operators that apply a function to each record in the stream independently (e.g., map()), and other operators that apply a function to multiple records together (e.g., aggregateByKey()). You should have a look at the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl and the examples: https://github.com/confluentinc/kafka-streams-examples
Thus, you never create local variables using Kafka Streams as you show in your example above, but rather embed everything in operators/functions that get chained together.
For example, if you want to print all input records to stdout, you could do
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
    @Override
    public void apply(String key, String value) {
        // Called once per record read from the input topic.
        System.out.println(key + ": " + value);
    }
});
Thus, after you start your application via streams.start(), it will consume the records from your input topic, and for each record of your topic, a call to apply(...) is made, which prints the record to stdout.
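Since the question mentions being unsure about Java 8 lambdas, here is a minimal, Kafka-free sketch of how an anonymous class and a lambda express the same per-record callback. RecordAction and forEachRecord are hypothetical stand-ins written for this illustration, not part of Kafka Streams; ForeachAction likewise declares a single apply(key, value) method, so the lambda form should carry over to source.foreach(...).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: a callback with one two-argument, void method.
interface RecordAction<K, V> {
    void apply(K key, V value);
}

final class ForeachSketch {
    // Calls the action once per value, passing a null key
    // (the answer assumes the records in this topic have no key).
    static void forEachRecord(List<String> values, RecordAction<String, String> action) {
        for (String value : values) {
            action.apply(null, value);
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("first message", "second message");

        // Anonymous-class form, as used in the answer.
        forEachRecord(records, new RecordAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                System.out.println(key + ": " + value);
            }
        });

        // Equivalent lambda form: same callback, less ceremony.
        forEachRecord(records, (key, value) -> System.out.println(key + ": " + value));
    }
}
```

Both calls print the same lines; which form to use is mostly a matter of taste and of the Java version you target.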
Of course, a more native way of printing the stream to the console would be to use source.print() (which internally is basically the same as the foreach() operator shown above, with an already given ForeachAction).
For your example of assigning the string to a local variable, you would need to put your code into apply(...) and do your regex work etc. there to "extract the 3 letter keywords".
The best way to express this, however, would be via a combination of flatMapValues() and print() (i.e., source.flatMapValues(...).print()). flatMapValues() is called for each input record (in your case, I assume the key will be null, so you can ignore it). Within your flatMapValues function, you apply your regex, and for each match, you add the match to a list of values that you finally return.
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();
        // apply regex to value and for each match add it to keywords
        return keywords;
    }
});
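The regex step left as a comment above could be filled in roughly as follows. This is only a sketch that runs without Kafka: KeywordExtractor and extractKeywords are names made up for this illustration, and the pattern assumes a keyword is exactly three uppercase ASCII letters standing alone as a word, which matches the sample string from the question but may not match your real data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class KeywordExtractor {
    // Assumption: a keyword is exactly three uppercase ASCII letters
    // bounded by word boundaries (e.g. GPS, GEO, ACC).
    private static final Pattern KEYWORD = Pattern.compile("\\b[A-Z]{3}\\b");

    // Returns every keyword found in value, in order of appearance.
    static List<String> extractKeywords(String value) {
        List<String> keywords = new ArrayList<>();
        if (value == null) {
            return keywords; // nothing to scan
        }
        Matcher matcher = KEYWORD.matcher(value);
        while (matcher.find()) {
            keywords.add(matcher.group());
        }
        return keywords;
    }

    public static void main(String[] args) {
        String sample = "This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.";
        System.out.println(extractKeywords(sample)); // prints [GPS, GEO, ACC]
    }
}
```

Inside ValueMapper#apply() you would then simply return the list produced by such a helper.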
The output of flatMapValues will again be a KStream, containing a record for each found keyword (i.e., the output stream is a "union" over all lists you return in ValueMapper#apply()). Finally, you just print your result to the console via print(). (Of course, you could also use a single foreach instead of flatMapValues + print, but this would be less modular.)