Java Kafka - Delayed Queue implementation using high level consumer
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, link to the original, and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/31775003/
Kafka - Delayed Queue implementation using high level consumer
Asked by Nimrod007
I want to implement a delayed consumer using the high-level consumer API.
main idea:
- produce messages by key (each msg contains a creation timestamp); this makes sure that each partition has messages ordered by produce time.
- auto.commit.enable=false (will explicitly commit after each message is processed)
- consume a message
- check the message timestamp and whether enough time has passed
- process the message (this operation will never fail)
- commit 1 offset
while (it.hasNext()) {
  val msg = it.next().message()
  // checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) {
    waitSomeTime() // Thread.sleep or something....
  }
  // certain that the msg was delayed and can now be handled
  Try { process(msg) } // the msg process will never fail the consumer
  consumer.commitOffsets // commit each msg
}
some concerns about this implementation:
- committing each offset might slow ZK down
- can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can be solved with idempotent messages)
- problem with waiting a long time without committing the offset: for example, if the delay period is 24 hours, I will get the next message from the iterator, sleep for 24 hours, then process and commit (ZK session timeout?)
- how can the ZK session be kept alive without committing new offsets? (setting a high zookeeper.session.timeout.ms can result in a dead consumer going unrecognised)
- any other problems I'm missing?
Thanks!
Accepted answer by Emil H
One way to go about this would be to use a different topic where you push all messages that are to be delayed. If all delayed messages should be processed after the same time delay, this will be fairly straightforward:
while(it.hasNext()) {
  val message = it.next().message()
  if(shouldBeDelayed(message)) {
    val delay = 24 hours
    val delayTo = getCurrentTime() + delay
    putMessageOnDelayedQueue(message, delay, delayTo)
  }
  else {
    process(message)
  }
  consumer.commitOffset()
}
All regular messages will now be processed as soon as possible, while those that need a delay get put on another topic.
The nice thing is that we know that the message at the head of the delayed topic is the one that should be processed first, since its delayTo value will be the smallest. Therefore we can set up another consumer that reads the head message, checks if the timestamp is in the past and, if so, processes the message and commits the offset. If not, it does not commit the offset and instead just sleeps until that time:
while(it.hasNext()) {
  val delayedMessage = it.peek().message()
  if(delayedMessage.delayTo < getCurrentTime()) {
    val readMessage = it.next().message
    process(readMessage.originalMessage)
    consumer.commitOffset()
  } else {
    delayProcessingUntil(delayedMessage.delayTo)
  }
}
In case there are different delay times you could partition the topic on the delay (e.g. 24 hours, 12 hours, 6 hours). If the delay time is more dynamic than that it becomes a bit more complex. You could solve it by introducing two delay topics. Read all messages off delay topic A and process all the messages whose delayTo value is in the past. Among the others you just find the one with the closest delayTo and then put them on topic B. Sleep until the closest one should be processed and do it all in reverse, i.e. process messages from topic B and put the ones that shouldn't yet be processed back on topic A.
To answer your specific questions (some have been addressed in the comments to your question)
- committing each offset might slow ZK down
You could consider switching to storing the offset in Kafka (a feature available from 0.8.2, check out the offsets.storage property in the consumer config).
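With the old high-level consumer that would look roughly like the snippet below; the property names are the ones documented for the 0.8.2 consumer config, but do verify them against the Kafka version you run.

import java.util.Properties

// Sketch: consumer properties for storing committed offsets in Kafka rather than ZooKeeper.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "delayed-consumer")   // example group id
props.put("auto.commit.enable", "false")
props.put("offsets.storage", "kafka")       // "zookeeper" is the default
props.put("dual.commit.enabled", "false")   // keep true only while migrating offsets from ZK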
- can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can be solved with idempotent messages)
I believe it can, if it is not able to communicate with the offset storage for instance. Using idempotent messages solves this problem though, as you say.
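For what it's worth, a common way to make the processing effectively idempotent is to track already-processed message ids. A minimal in-memory sketch follows; the messageId parameter is an assumption (the question only mentions a creation timestamp in each message), and a real implementation would keep the seen-set in durable storage.

import scala.collection.mutable

// Sketch: skip messages whose id has already been processed.
val processedIds = mutable.Set[String]()

def processIdempotently(messageId: String, body: Array[Byte]): Unit = {
  if (processedIds.add(messageId)) { // add returns false if the id was already present
    process(body)                    // assumed processing function from the question
  }
  // else: a duplicate delivery after a failed commitOffsets; safe to ignore
}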
- problem with waiting a long time without committing the offset: for example, if the delay period is 24 hours, I will get the next message from the iterator, sleep for 24 hours, then process and commit (ZK session timeout?)
This won't be a problem with the above outlined solution unless the processing of the message itself takes longer than the session timeout.
- how can the ZK session be kept alive without committing new offsets? (setting a high zookeeper.session.timeout.ms can result in a dead consumer going unrecognised)
Again with the above you shouldn't need to set a long session timeout.
- any other problems I'm missing?
There always are ;)
Answered by nucatus
I would suggest another route in your case.
It doesn't make sense to handle the waiting time in the main thread of the consumer. That would be an anti-pattern for how queues are used. Conceptually, you need to process the messages as fast as possible and keep the queue at a low load factor.
Instead, I would use a scheduler that schedules a job for each message you need to delay. This way you can process the queue and create asynchronous jobs that will be triggered at predefined points in time.
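As a rough illustration of that idea using the JDK's own ScheduledExecutorService (not a full-blown scheduler like Quartz, and ignoring the durability concern discussed next):

import java.util.concurrent.{Executors, TimeUnit}

// Sketch: hand each consumed message to a scheduler instead of sleeping in the consumer thread.
val scheduler = Executors.newScheduledThreadPool(4)

def scheduleProcessing(msg: Array[Byte], delayToMillis: Long): Unit = {
  val delay = math.max(0L, delayToMillis - System.currentTimeMillis())
  scheduler.schedule(new Runnable {
    override def run(): Unit = process(msg) // assumed processing function from the question
  }, delay, TimeUnit.MILLISECONDS)
}
// The consumer loop stays fast: consume, schedule, commit the offset, move on.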
The downside of using this technique is that it is sensitive to the state of the JVM that holds the scheduled jobs in memory. If that JVM fails, you lose the scheduled jobs, and you don't know whether the task was executed or not.
There are scheduler implementations, though, that can be configured to run in a cluster environment, thus keeping you safe from JVM crashes.
Take a look at this Java scheduling framework: http://www.quartz-scheduler.org/
Answered by Dhyan
Use Tibco EMS or other JMS queues. They have retry delay built in. Kafka may not be the right design choice for what you are doing.
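For what it's worth, JMS 2.0 also exposes a built-in delivery delay on the producer side (a different knob than broker-specific redelivery delays). A minimal sketch, assuming a JMS 2.0 ConnectionFactory from your provider is already configured:

import javax.jms.ConnectionFactory

// Sketch: JMS 2.0 delivery delay, i.e. the broker withholds the message until the delay elapses.
def sendDelayed(connectionFactory: ConnectionFactory, queueName: String,
                body: String, delayMillis: Long): Unit = {
  val context = connectionFactory.createContext()
  try {
    context.createProducer()
      .setDeliveryDelay(delayMillis) // do not deliver before this many milliseconds have passed
      .send(context.createQueue(queueName), body)
  } finally {
    context.close()
  }
}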
Answered by softwarevamp
A keyed list processed on a schedule, or its Redis alternative, may be the best approach.
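The answer doesn't elaborate, but one common reading of the Redis alternative is a sorted set scored by the time each message becomes due, polled on a schedule. A hedged sketch using the Jedis client; the key name and the process helper are assumptions:

import redis.clients.jedis.Jedis
import scala.jdk.CollectionConverters._

// Sketch: a Redis sorted set as the delay store, scored by the due timestamp.
val jedis = new Jedis("localhost", 6379)
val key = "delayed-messages" // assumed key name

def enqueue(payload: String, delayToMillis: Long): Unit =
  jedis.zadd(key, delayToMillis.toDouble, payload)

def drainDue(): Unit = {
  val now = System.currentTimeMillis().toDouble
  val due = jedis.zrangeByScore(key, 0.0, now).asScala // everything whose due time has passed
  due.foreach { payload =>
    process(payload) // assumed processing function
    jedis.zrem(key, payload)
  }
}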