Java: reading message offsets in Apache Kafka
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same CC BY-SA license, link to the original question, and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/25052146/
Reading messages offset in Apache Kafka
Asked by Anand
I am very new to Kafka, and we are using Kafka 0.8.1.
What I need to do is consume messages from a topic. For that, I will have to write a consumer in Java which consumes a message from the topic and then saves that message to a database. After a message is saved, an acknowledgement is sent back to the Java consumer. If the acknowledgement is true, the next message should be consumed from the topic. If the acknowledgement is false (meaning that, due to some error, the message read from the topic could not be saved to the database), that message should be read again.
I think I need to use the SimpleConsumer to have control over the message offset, and I have gone through the SimpleConsumer example given at this link: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
In this example, the offset is evaluated in the run method as readOffset. Do I need to play with that? For example, I could use LatestTime() instead of EarliestTime(), and in the false case, reset the offset to the previous one using offset - 1.
Is this how I should proceed?
Answered by sandris
I think you can get along with using the high-level consumer (http://kafka.apache.org/documentation.html#highlevelconsumerapi), which should be easier to use than the SimpleConsumer. I don't think the consumer needs to reread messages from Kafka on database failure, as the consumer already has those messages and can resend them to the DB or do anything else it sees fit.
High-level consumers store the last offset read from a specific partition in Zookeeper (based on the consumer group name), so that when a consumer process dies and is later restarted (potentially on another host), it can continue processing messages where it left off. It is possible to autosave this offset to Zookeeper periodically (see the consumer properties auto.commit.enable and auto.commit.interval.ms), or to have it saved by application logic by calling ConsumerConnector.commitOffsets. See also https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example.
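As a concrete illustration, the two auto-commit properties mentioned above are set when building the consumer's configuration. The sketch below only builds the java.util.Properties object that the 0.8 high-level consumer's ConsumerConfig expects; the Zookeeper address and the group name are placeholders, not values from the question:

```java
import java.util.Properties;

public class ConsumerProps {
    // Build the Properties that the 0.8 high-level consumer's
    // ConsumerConfig is constructed from.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZK address
        props.put("group.id", "db-writer-group");         // placeholder consumer group
        // These two properties control periodic offset autosaving to Zookeeper;
        // set auto.commit.enable to "false" to commit manually instead.
        props.put("auto.commit.enable", "true");
        props.put("auto.commit.interval.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProperties();
        System.out.println(p.getProperty("auto.commit.enable"));
    }
}
```

The offset bookkeeping in Zookeeper is keyed by group.id, so two processes started with the same group name share one committed position per partition.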
I suggest you turn auto-commit off and commit your offsets yourself once you have received the DB acknowledgement. That way you can make sure unprocessed messages are reread from Kafka in case of consumer failure, and all messages committed to Kafka will eventually reach the DB at least once (but not 'exactly once').
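The commit-after-acknowledgement pattern described above can be sketched as follows. To keep the example self-contained and runnable without a broker, a Deque stands in for the Kafka stream and a small interface stands in for the database write; in a real consumer, the commit step would be a call to ConsumerConnector.commitOffsets() on the high-level API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class CommitAfterAck {
    // Stand-in for the DB write; returns the acknowledgement.
    interface DbSink { boolean save(String message); }

    // Last offset acknowledged by the DB (stand-in for the committed offset).
    static long committedOffset = -1;

    // Core loop: only advance past a message, and commit its offset,
    // after the DB has acknowledged the write.
    static void consume(Deque<String> topic, DbSink db) {
        long offset = 0;
        while (!topic.isEmpty()) {
            String msg = topic.peekFirst(); // re-deliver the same message until acked
            if (db.save(msg)) {
                topic.pollFirst();
                committedOffset = offset;   // ConsumerConnector.commitOffsets() goes here
                offset++;
            }
            // ack == false: loop retries the same message; real code would
            // bound the retries or back off instead of spinning forever.
        }
    }

    public static void main(String[] args) {
        Deque<String> topic = new ArrayDeque<>();
        topic.add("m0"); topic.add("m1"); topic.add("m2");
        // A flaky sink that fails the first attempt at each message,
        // then succeeds on the retry.
        DbSink flaky = new DbSink() {
            Set<String> seen = new HashSet<>();
            public boolean save(String m) { return !seen.add(m); }
        };
        consume(topic, flaky);
        System.out.println(committedOffset); // 2: all three messages eventually committed
    }
}
```

Because the offset is only committed after the DB ack, a consumer that crashes mid-write restarts from the last committed offset and re-delivers the unacknowledged message, which is exactly the at-least-once behaviour described above.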