What determines Kafka consumer offset?
Original question: http://stackoverflow.com/questions/32390265/
This content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow.
Asked by Asif Iqbal
I am relatively new to Kafka. I have done a bit of experimenting with it, but a few things are unclear to me regarding the consumer offset. From what I have understood so far, when a consumer starts, the offset it will start reading from is determined by the configuration setting auto.offset.reset (correct me if I am wrong).
Now say for example that there are 10 messages (offsets 0 to 9) in the topic, and a consumer happened to consume 5 of them before it went down (or before I killed the consumer). Then say I restart that consumer process. My questions are:
1. If auto.offset.reset is set to smallest, is it always going to start consuming from offset 0?
2. If auto.offset.reset is set to largest, is it going to start consuming from offset 5?
3. Is the behaviour regarding this kind of scenario always deterministic?
Please don't hesitate to comment if anything in my question is unclear. Thanks in advance.
Accepted answer by serejja
It is a bit more complex than you described.
The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset committed somewhere (the two supported offset stores are currently Kafka and ZooKeeper), and it also depends on what sort of consumer you use.
If you use the high-level Java consumer, imagine the following scenarios:
1. You have a consumer in consumer group group1 that has consumed 5 messages and died. The next time you start this consumer, it won't even use the auto.offset.reset config and will continue from the place where it died, because it will just fetch the stored offset from the offset storage (Kafka or ZK, as I mentioned).
2. You have messages in a topic (as you described) and you start a consumer in a new consumer group, group2. There is no offset stored anywhere, and this time the auto.offset.reset config will decide whether to start from the beginning of the topic (earliest) or from the end of the topic (latest).
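The two scenarios above can be sketched as a small simulation that does not touch Kafka at all. It only illustrates the decision rule and is not Kafka's actual implementation: the class OffsetStartSketch, the method startingOffset, and the in-memory map standing in for the offset storage are hypothetical names introduced here. It also assumes the committed value is the next offset to read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free model of where a consumer group starts reading.
final class OffsetStartSketch {
    // Stand-in for the offset storage (Kafka or ZooKeeper): group id -> committed offset.
    private final Map<String, Long> committed = new HashMap<>();

    // Records the next offset the group should read (assumption of this sketch).
    void commit(String groupId, long nextOffset) {
        committed.put(groupId, nextOffset);
    }

    // earliest and latest are the bounds of the log; latest is the offset
    // the next produced message would receive.
    long startingOffset(String groupId, String autoOffsetReset, long earliest, long latest) {
        Long stored = committed.get(groupId);
        if (stored != null) {
            return stored; // a committed offset wins; auto.offset.reset is not consulted
        }
        switch (autoOffsetReset) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                throw new IllegalArgumentException("unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }
}
```

With 10 messages in the topic (offsets 0 to 9) and group1 having committed offset 5, startingOffset("group1", "earliest", 0, 10) returns 5, while a fresh group2 gets 0 for earliest and 10 for latest.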
One more thing that affects which offset values earliest and latest correspond to is the log retention policy. Imagine you have a topic with retention configured to 1 hour. You produce 5 messages, and then an hour later you produce 5 more. The latest offset will still be the same as in the previous example, but the earliest one can no longer be 0, because Kafka will already have removed the first 5 messages; the earliest available offset will therefore be 5.
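The retention example can be worked through with a toy model of a single partition. This is a sketch under simplifying assumptions: RetentionSketch and its methods are hypothetical, and messages are dropped one by one purely by age, whereas a real broker deletes whole log segments on a schedule, so the real cutoff is coarser.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one partition with time-based retention.
final class RetentionSketch {
    // Timestamps of the messages still retained, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();
    private long earliest = 0; // offset of the oldest retained message
    private long latest = 0;   // offset the next produced message will get

    void produce(long timestampMs) {
        timestamps.addLast(timestampMs);
        latest++;
    }

    // Drops every message older than retentionMs and advances the earliest offset.
    void applyRetention(long nowMs, long retentionMs) {
        while (!timestamps.isEmpty() && nowMs - timestamps.peekFirst() > retentionMs) {
            timestamps.removeFirst();
            earliest++;
        }
    }

    long earliestOffset() {
        return earliest;
    }

    long latestOffset() {
        return latest;
    }
}
```

Producing 5 messages at time 0, 5 more just over an hour later, and then applying a 1 hour retention leaves earliestOffset() at 5 and latestOffset() at 10, which matches the numbers in the answer.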
None of the above applies to SimpleConsumer: every time you run it, it decides where to start from using the auto.offset.reset config.
If you use a Kafka version older than 0.9, you have to replace earliest and latest with smallest and largest.
Answer by Israel Zinc
Just an update: from Kafka 0.9 onward, Kafka uses a new Java version of the consumer, and the accepted values of the auto.offset.reset parameter have changed. From the manual:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
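The rules quoted from the manual can be read as a small decision function. The sketch below is one possible reading, not the client's real code: ResetPolicySketch and resolve are hypothetical names, and the exception types are plain Java stand-ins rather than the exceptions Kafka actually throws.

```java
import java.util.OptionalLong;

// Hypothetical reading of the manual's rules for auto.offset.reset.
final class ResetPolicySketch {
    static long resolve(OptionalLong committed, String policy, long earliest, long latest) {
        // A committed offset is usable only if it still exists on the server.
        if (committed.isPresent()
                && committed.getAsLong() >= earliest
                && committed.getAsLong() <= latest) {
            return committed.getAsLong();
        }
        // No initial offset, or the offset no longer exists: apply the policy.
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            case "none":
                throw new IllegalStateException("no usable offset for this group and policy is none");
            default:
                throw new IllegalArgumentException("invalid auto.offset.reset value: " + policy);
        }
    }
}
```

For example, a committed offset of 2 on a log whose earliest available offset is already 5 is treated as missing, so the earliest policy restarts the group at 5.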
I spent some time finding this after checking the accepted answer, so I thought posting it might be useful for the community.
Answer by Sasa Ninkovic
Furthermore, there's offsets.retention.minutes. If the time since the last commit is greater than offsets.retention.minutes, then auto.offset.reset also kicks in.
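The check described in this answer amounts to a simple time comparison. The sketch below assumes exactly the rule as stated (time since last commit greater than offsets.retention.minutes); the class CommitExpirySketch and the method commitExpired are hypothetical, and the precise expiry behaviour of a real broker may differ between versions.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical check for whether a group's committed offset has aged out.
final class CommitExpirySketch {
    // Returns true when the time since the last commit exceeds offsets.retention.minutes,
    // in which case the next start would fall back to auto.offset.reset.
    static boolean commitExpired(Instant lastCommit, Instant now, long offsetsRetentionMinutes) {
        Duration sinceCommit = Duration.between(lastCommit, now);
        return sinceCommit.compareTo(Duration.ofMinutes(offsetsRetentionMinutes)) > 0;
    }
}
```

With an assumed retention of 1440 minutes, a group whose last commit was 60 minutes ago keeps its offset, while one that last committed 1441 minutes ago would be treated like a new group.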