Java Kafka consumer: commitSync vs commitAsync

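Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question and is licensed under CC BY-SA 4.0. If you use it, you must also comply with the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original URL: http://stackoverflow.com/questions/46546174/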

Time: 2020-11-03 09:16:04  Source: igfitidea

Tags: java, apache-kafka, offset, kafka-consumer-api

Question by gstackoverflow

A quote from Kafka: The Definitive Guide, chapter 4 (https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1):

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry.

This phrase is not clear to me. I suppose the consumer sends a commit request to the broker, and if the broker doesn't respond within some timeout, the commit has failed. Am I wrong?

Can you clarify the difference between commitSync and commitAsync in detail?
Also, please provide use cases for when I should prefer each commit type.

Answer by fluency03

As the API documentation says:

commitSync()

This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).

That means commitSync is a blocking method: calling it blocks your thread until the commit either succeeds or fails.

For example,

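// Assumes java.time.Duration is imported and consumer is a subscribed KafkaConsumer<String, String>.
while (true) {
    // poll(Duration) replaces poll(long), which is deprecated since Kafka 2.0
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Blocks until the commit succeeds or a non-retriable error is thrown.
        // Without arguments it commits the offsets returned by the last poll() for all assigned partitions.
        consumer.commitSync();
    }
}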

In each iteration of the for loop, your code moves on to the next iteration only after consumer.commitSync() returns successfully or is interrupted by a thrown exception.
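
To make the retry behaviour quoted in the question concrete, here is a minimal sketch of what "block and retry until success or a non-retriable failure" means. It does not use the Kafka client: CommitTarget, RetriableCommitError, FatalCommitError and commitWithRetry are hypothetical stand-ins (the real client has its own exception types), and maxAttempts is an assumed cap standing in for the timeout the real consumer applies.

```java
// Hypothetical stand-ins for the client's retriable / non-retriable commit errors.
class RetriableCommitError extends RuntimeException {
    RetriableCommitError(String message) { super(message); }
}

class FatalCommitError extends RuntimeException {
    FatalCommitError(String message) { super(message); }
}

// Hypothetical commit endpoint: returns normally on success, otherwise throws.
interface CommitTarget {
    void commit(long offset);
}

final class SyncCommitSketch {
    private SyncCommitSketch() {}

    // Blocks the calling thread until the commit succeeds, retrying retriable
    // errors. A FatalCommitError from the target is not caught and reaches the
    // caller. maxAttempts (an assumption) plays the role of the client's timeout.
    // Returns the number of attempts that were needed.
    static int commitWithRetry(CommitTarget target, long offset, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                target.commit(offset);
                return attempt;            // success: the caller is unblocked
            } catch (RetriableCommitError e) {
                if (attempt >= maxAttempts) {
                    throw new FatalCommitError("gave up after " + attempt + " attempts");
                }
                // transient failure: loop and send the same commit again
            }
        }
    }
}
```

A transient error is retried transparently, so the caller only sees the final outcome, while a non-retriable error ends the loop at once. In recent clients the no-argument commitSync() is bounded by default.api.timeout.ms and throws a TimeoutException when that expires.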

commitAsync()

This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.

That means commitAsync is a non-blocking method: calling it does not block your thread. Instead, your code continues with the following statements, whether the commit eventually succeeds or fails.

For example, here is the previous example rewritten to use commitAsync:

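// Assumes java.time.Duration is imported and consumer is a subscribed KafkaConsumer<String, String>.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Returns immediately; the outcome is passed to the OffsetCommitCallback later.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Commit failed for " + offsets + ": " + exception);
            }
        });
    }
}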

In each iteration of the for loop, your code moves on to the next iteration regardless of what eventually happens to consumer.commitAsync(), and the result of the commit is handled by the callback you defined.
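
The non-blocking behaviour can be sketched the same way, again without the Kafka client. sendCommit, brokerAccepts and the background ExecutorService are hypothetical: the method hands the commit to another thread, returns at once, and reports the outcome to a callback without ever retrying. In the real client the OffsetCommitCallback is invoked by the consumer itself from a thread calling poll(), not on a separate executor as here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.LongPredicate;

final class AsyncCommitSketch {
    private AsyncCommitSketch() {}

    // Hands the commit to a background thread and returns immediately.
    // brokerAccepts is a hypothetical stand-in for the broker's answer.
    // The callback receives (offset, error); error is null on success.
    // A failed commit is only reported, never retried.
    static CompletableFuture<Void> sendCommit(ExecutorService io, long offset,
                                              LongPredicate brokerAccepts,
                                              BiConsumer<Long, Exception> callback) {
        return CompletableFuture.runAsync(() -> {
            if (brokerAccepts.test(offset)) {
                callback.accept(offset, null);
            } else {
                callback.accept(offset, new Exception("commit of offset " + offset + " failed"));
            }
        }, io);
    }
}
```

The caller gets control back while the commit is still in flight, which is where the lower latency comes from; the price is that a failure is only observed later, in the callback.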



Trade-offs: latency vs. data consistency

  • If you have to ensure data consistency, choose commitSync(), because before doing anything further you will know whether the offset commit succeeded or failed. But because it is synchronous and blocking, you spend more time waiting for the commit to finish, which increases latency.
  • If you can tolerate some data inconsistency and want low latency, choose commitAsync(), because it does not wait for the commit to finish. It just sends the commit request and handles Kafka's response (success or failure) later, while your code keeps executing.

All of this is generally speaking; the actual behaviour depends on your code and on where you call the method.

Answer by Charls Joseph

Both commitSync and commitAsync use Kafka's offset management, and both have drawbacks. If message processing succeeds but the offset commit fails (processing and committing are not atomic) and a partition rebalance happens at the same time, the processed messages are processed again (duplicate processing) by another consumer. If you can tolerate duplicate processing, you can use commitAsync: it doesn't block, it gives low latency, and a later commit of a higher offset supersedes an earlier one that failed, so you should be fine. Otherwise, use custom offset management that makes processing and offset updates atomic (with external offset storage).
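
The duplicate-processing scenario can be simulated without a broker. In this hypothetical sketch, kafkaManaged processes a batch and then loses its commit before a rebalance, so the next owner resumes from the old committed offset; externallyManaged stores each result together with the next offset (the "transaction" is only simulated by updating both in the same step), so the consumer that takes over resumes exactly where the previous one stopped. With the real client, resuming from an external store would typically be done with consumer.seek() inside a ConsumerRebalanceListener's onPartitionsAssigned().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition simulation; offsets 0..batchSize-1 hold messages.
final class DuplicateProcessingSketch {
    private DuplicateProcessingSketch() {}

    // Kafka-managed offsets: processing and committing are two separate steps.
    static List<Long> kafkaManaged(int batchSize, boolean commitFails) {
        List<Long> processed = new ArrayList<>();
        long committed = 0;                        // next offset to read after a rebalance
        // Consumer A processes the whole batch ...
        for (long off = 0; off < batchSize; off++) {
            processed.add(off);
        }
        // ... then commits; a failed commit leaves the old offset in place.
        if (!commitFails) {
            committed = batchSize;
        }
        // Rebalance: consumer B gets the partition and resumes at the committed offset.
        for (long off = committed; off < batchSize; off++) {
            processed.add(off);
        }
        return processed;
    }

    // External offset storage: each result and the next offset are saved together.
    // The "transaction" is simulated by updating both in one uninterrupted step.
    static List<Long> externallyManaged(int batchSize, int crashAfter) {
        List<Long> storedResults = new ArrayList<>();
        long storedNextOffset = 0;
        // Consumer A saves result + offset per record, then crashes after crashAfter records.
        for (long off = 0; off < crashAfter; off++) {
            storedResults.add(off);
            storedNextOffset = off + 1;
        }
        // Consumer B seeks to the stored offset, so nothing is processed twice.
        for (long off = storedNextOffset; off < batchSize; off++) {
            storedResults.add(off);
            storedNextOffset = off + 1;
        }
        return storedResults;
    }
}
```

With a batch of three messages and a failed commit, kafkaManaged returns offsets 0, 1, 2 twice, whereas externallyManaged returns each offset once even though the first consumer stopped halfway.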

Answer by Moha

commitAsync does not retry because retrying would make a mess. Imagine you try to commit offset 20 asynchronously and the commit fails. Then the next poll loop commits offset 40 asynchronously, and that succeeds. If the failed commit of offset 20 were now retried and succeeded, the committed offset would move back to 20, although it should be 40.
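
If you do want to retry asynchronous commits, Kafka: The Definitive Guide (the book quoted in the question) suggests guarding retries with a monotonically increasing sequence number: retry a failed commit only if no newer commit has been sent since. A minimal sketch, assuming a simulated broker where the last write wins; AsyncRetryGuard and its methods are hypothetical names:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard for retrying failed asynchronous commits without going backwards.
final class AsyncRetryGuard {
    // Incremented for every commit that is sent; read again when a failure is reported.
    private final AtomicLong sequence = new AtomicLong();
    // Simulated broker state: the last successful write wins, whatever its offset.
    private long committedOffset = -1;

    // Call before sending a commit; pass the returned number to that commit's callback.
    long nextSequence() {
        return sequence.incrementAndGet();
    }

    // Simulated successful commit on the broker.
    void brokerStore(long offset) {
        committedOffset = offset;
    }

    // Called from the callback of a failed commit that was sent with sequence seq.
    // Retrying is safe only if no newer commit has been sent in the meantime.
    boolean retryIfStillLatest(long seq, long offset) {
        if (seq == sequence.get()) {
            brokerStore(offset);   // the (simulated) retry succeeds
            return true;
        }
        return false;              // a newer commit was sent: drop the stale retry
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

Replaying the answer's scenario: the commit of offset 20 gets sequence 1 and fails, the commit of offset 40 gets sequence 2 and succeeds, so when the failure for sequence 1 is reported, retryIfStillLatest(1, 20) returns false and the committed offset stays at 40.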
