Java KafkaProducer: difference between `callback` and returned `Future`?

Disclaimer: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must also follow the CC BY-SA license, cite the original URL and authors, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/43601747/

Date: 2020-11-03 07:35:35. Source: igfitidea

Tags: java, apache-kafka, kafka-producer-api

Question by Thilo

The KafkaProducer `send` method both returns a `Future` and accepts a `Callback`.

Is there any fundamental difference between using one mechanism over the other to execute an action upon completion of the sending?

Accepted answer by streetturtle

The asynchronous approach

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata rm, Exception ex) {
        // called once the send has been acknowledged or has failed
    }
});

gives you better throughput compared to the synchronous approach

RecordMetadata rm = producer.send(record).get();

since in the first case you don't wait for acknowledgements.

Also, with the asynchronous approach ordering is not guaranteed, whereas with the synchronous approach it is, because each message is sent only after the previous acknowledgement has been received.

Another difference is that with the synchronous call you can stop sending messages as soon as an exception occurs, whereas with the asynchronous approach some messages will already have been sent before you discover that something is wrong and can act on it.
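
To make this concrete without a running broker, here is a minimal sketch. `StubProducer` is a hypothetical stand-in, not the Kafka API: its `send` hands the work to a background "I/O thread" and returns a `Future`, and one chosen record is made to fail. The synchronous loop stops right after the failing send, while the asynchronous loop has already sent everything before it looks at any result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in, NOT the Kafka API: send() runs on a background
// "I/O thread" and returns a Future; the record at failingIndex fails.
class StubProducer implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final int failingIndex;
    final List<Integer> submitted = new ArrayList<>(); // touched only by the caller thread

    StubProducer(int failingIndex) {
        this.failingIndex = failingIndex;
    }

    Future<Integer> send(int index) {
        submitted.add(index);
        return ioThread.submit(() -> {
            if (index == failingIndex) {
                throw new IllegalStateException("broker rejected record " + index);
            }
            return index; // pretend this is the offset
        });
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}

class SyncVsAsync {
    // Synchronous: wait for each acknowledgement and stop at the first failure.
    static int sendSync(int total, int failingIndex) {
        try (StubProducer producer = new StubProducer(failingIndex)) {
            for (int i = 0; i < total; i++) {
                try {
                    producer.send(i).get();
                } catch (ExecutionException e) {
                    break; // failure seen immediately, nothing after record i is sent
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return producer.submitted.size();
        }
    }

    // Asynchronous: send everything first, look at the results afterwards.
    static int sendAsync(int total, int failingIndex) {
        try (StubProducer producer = new StubProducer(failingIndex)) {
            List<Future<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < total; i++) {
                pending.add(producer.send(i)); // no waiting between sends
            }
            int sentBeforeChecking = producer.submitted.size();
            for (Future<Integer> result : pending) {
                try {
                    result.get();
                } catch (ExecutionException e) {
                    // the failure is only discovered here, after all sends went out
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return sentBeforeChecking;
        }
    }
}
```

With five records and record 2 failing, `sendSync(5, 2)` has sent three records when it stops, while `sendAsync(5, 2)` has already sent all five.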

Also note that in the asynchronous approach the number of unacknowledged ("in flight") requests per broker connection is controlled by the max.in.flight.requests.per.connection parameter.
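
The ordering and in-flight points above map onto ordinary producer settings. This sketch only builds a `java.util.Properties` object using real Kafka producer config keys; the broker address `localhost:9092` is an assumption, and the values are one possible choice rather than a recommendation.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Settings for a KafkaProducer that favours ordering over throughput.
    // "localhost:9092" is an assumed broker address.
    static Properties orderedProducerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        // At most one unacknowledged request per broker connection, so a retried
        // batch cannot be overtaken by a later one (at the cost of throughput).
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

With the kafka-clients library on the classpath, these properties would be passed to `new KafkaProducer<String, String>(props)`. The Kafka documentation warns that with more than one request in flight and idempotence disabled, retries can reorder messages; enabling `enable.idempotence` is, as far as I know, the other common way to keep per-partition ordering without dropping to a single in-flight request.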

Apart from the synchronous and asynchronous approaches, you can use the Fire and Forget approach, which looks almost the same as the synchronous call but without waiting for or processing the returned metadata: just send the message and hope that it will reach the broker (knowing that it most likely will, and that the producer will retry in case of recoverable errors). There is, however, a chance that some messages will be lost:

producer.send(record); // the returned Future<RecordMetadata> is ignored
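
The risk of fire and forget is not that the failure goes unrecorded, but that nothing in the application ever looks at it. In this minimal sketch, `send` is a hypothetical stand-in (not the Kafka API) that returns an already-failed `CompletableFuture`, as if the record had been rejected after all retries. Dropping the result means the failure is never noticed; attaching a callback to the very same result does notice it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class FireAndForgetSketch {
    // Hypothetical send(), NOT the Kafka API: the returned future fails,
    // as if the record was still rejected after all retries.
    static CompletableFuture<Long> send(String value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        result.completeExceptionally(new RuntimeException("could not deliver " + value));
        return result;
    }

    // Fire and forget: the result is dropped, so the application never reacts.
    static int failuresNoticedFireAndForget() {
        AtomicInteger noticed = new AtomicInteger();
        send("hello");
        return noticed.get();
    }

    // Same send, but a callback is attached and sees the failure.
    static int failuresNoticedWithCallback() {
        AtomicInteger noticed = new AtomicInteger();
        send("hello").whenComplete((offset, error) -> {
            if (error != null) {
                noticed.incrementAndGet();
            }
        });
        // The future was already complete, so whenComplete ran synchronously above.
        return noticed.get();
    }
}
```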

To summarize:

  • Fire and Forget - fastest one, but some messages could be lost;
  • Synchronous - slowest, use it if you cannot afford to lose messages;
  • Asynchronous - something in between.

Answer by Imus

Looking at the documentation you linked to, it looks like the main difference between the Future and the Callback lies in who initiates the "request is finished, what now?" question.

Let's say we have a customer C and a baker B, and C is asking B to make him a nice cookie. There are two possible ways the baker can return the delicious cookie to the customer.

Future

The baker accepts the request and tells the customer: Ok, when I'm finished I'll place your cookie here on the counter. (This agreement is the Future.)

In this scenario, the customer is responsible for checking the counter (Future) to see if the baker has finished his cookie or not.

Blocking: the customer stays near the counter and watches it until the cookie is put there (Future.get()) or the baker puts an apology there instead (Error: Out of cookie dough).

Non-blocking: the customer does some other work and once in a while checks whether the cookie is waiting for him on the counter (Future.isDone()). If the cookie is ready, the customer takes it (Future.get()).
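
The two Future styles of the analogy can be sketched with plain `java.util.concurrent`, no Kafka involved. The "baker" is a hypothetical worker thread and the `Future` is the counter: the blocking customer calls `get()` straight away, the non-blocking one polls `isDone()` between other work and only calls `get()` once the cookie is there.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BakerFutureSketch {
    // Blocking: wait at the counter until the cookie (or an apology) appears.
    static String blockingCustomer() {
        ExecutorService baker = Executors.newSingleThreadExecutor();
        try {
            Future<String> counter = baker.submit(() -> "cookie");
            return counter.get(); // blocks the calling thread
        } catch (ExecutionException e) {
            return "apology: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            baker.shutdown();
        }
    }

    // Non-blocking: do other work and check the counter now and then.
    static String pollingCustomer() {
        ExecutorService baker = Executors.newSingleThreadExecutor();
        try {
            Future<String> counter = baker.submit(() -> {
                Thread.sleep(50); // baking takes a while
                return "cookie";
            });
            while (!counter.isDone()) {
                Thread.sleep(5); // do some other work, then look at the counter again
            }
            return counter.get(); // already done, so this returns immediately
        } catch (ExecutionException e) {
            return "apology: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            baker.shutdown();
        }
    }
}
```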

Callback

In this scenario the customer, after ordering his cookie, tells the baker: When my cookie is ready please give it to my pet robot dog here, he'll know what to do with it (This robot is the Callback).

Now, when the cookie is ready, the baker gives it to the dog and tells him to run back to its owner. The baker can then continue baking the next cookie for another customer.

The dog runs back to the customer and starts wagging its artificial tail to make the customer aware that his cookie is ready.

Notice how the customer didn't have any idea when the cookie would be given to him, nor was he actively polling the baker to see if it was ready.

That's the main difference between the two scenarios: who is responsible for initiating the "your cookie is ready, what do you want to do with it?" question. With the Future, the customer is responsible for checking when it's ready, either by actively waiting or by polling every now and then. With the callback, the baker calls back the provided function.

I hope this answer gives you better insight into what a Future and a Callback actually are. Once you have the general idea, you can try to find out on which thread each specific thing is handled, when a thread is blocked, and in what order everything completes. Writing some simple programs that print statements like "main client thread: cookie received" could be a fun way to experiment with this.
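
Here is one such experiment, sketched with plain `java.util.concurrent` rather than Kafka. The thread named `baker` is a hypothetical stand-in for the thread doing the work. With the Future, the calling thread fetches and handles the cookie itself; with the callback, the handler is registered first and then runs on whichever thread completes the order, here the baker's.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WhoRunsWhat {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService baker = Executors.newSingleThreadExecutor(r -> new Thread(r, "baker"));
        try {
            // Future: the customer's own thread collects and handles the cookie.
            Future<String> counter = baker.submit(() -> "cookie");
            String cookie = counter.get();
            log.add(Thread.currentThread().getName() + " thread: " + cookie + " received via Future");

            // Callback: registered before completion, so it runs on the completing thread.
            CompletableFuture<String> order = new CompletableFuture<>();
            CountDownLatch delivered = new CountDownLatch(1);
            order.thenAccept(c -> {
                log.add(Thread.currentThread().getName() + " thread: " + c + " received via callback");
                delivered.countDown();
            });
            baker.execute(() -> order.complete("cookie")); // the baker "calls back"
            delivered.await();
        } catch (ExecutionException e) {
            log.add("failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            baker.shutdown();
        }
        log.forEach(System.out::println);
        return log;
    }
}
```

The first printed line comes from the calling thread, the second from the thread named `baker`, which mirrors how a Kafka `Callback` runs on the producer's I/O thread rather than on the thread that called `send`.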

Answer by Shiraaz.M

My observations based on the Kafka Producer documentation:

  • Future gives you access to synchronous processing
  • Future might not guarantee acknowledgement. My understanding is that a Callback will execute after acknowledgement
  • Callback gives you access to fully non-blocking asynchronous processing.
    • There are also guarantees on the ordering of execution for callbacks on the same partition

Callbacks for records being sent to the same partition are guaranteed to execute in order.
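
A rough model of why this ordering holds: if a single "I/O thread" completes the sends for a partition one after another, the callbacks fire in send order. This is a hypothetical sketch built on a single-thread executor, not Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class OrderedCallbacks {
    // Model, NOT Kafka's code: one "I/O thread" completes the sends for a
    // partition sequentially, so the callbacks run in the order of the sends.
    static List<Integer> run(int count) {
        List<Integer> fired = Collections.synchronizedList(new ArrayList<>());
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int offset = i;
            Consumer<Integer> callback = fired::add; // what the application registered
            ioThread.execute(() -> callback.accept(offset)); // "ack received" for record i
        }
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(fired);
    }
}
```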

My other opinion is that the Future return object and the Callback 'pattern' represent two different programming styles, and I think this is the fundamental difference:

  • The Future represents Java's Concurrency Model Style.
  • The Callback represents Java's Lambda Programming Style (because Callback actually satisfies the requirements of a Functional Interface)

You can probably end up coding similar behaviors with both the Future and Callback styles, but in some use cases one style may be more advantageous than the other.
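
Because Kafka's `Callback` has a single abstract method, `onCompletion`, it can be passed as a lambda, e.g. `producer.send(record, (metadata, exception) -> { ... })`. To keep the example runnable without kafka-clients, this sketch uses a hypothetical `SendCallback` interface with the same shape and shows the anonymous-class and lambda forms side by side.

```java
// Hypothetical interface shaped like Kafka's Callback, NOT the real type.
@FunctionalInterface
interface SendCallback {
    void onCompletion(String metadata, Exception exception);
}

class CallbackStyles {
    // Stands in for the producer invoking the callback after a successful send.
    static void deliver(SendCallback callback) {
        callback.onCompletion("topic-0@42", null);
    }

    // Pre-Java-8 style: anonymous inner class.
    static String anonymousClass() {
        StringBuilder out = new StringBuilder();
        deliver(new SendCallback() {
            @Override
            public void onCompletion(String metadata, Exception exception) {
                out.append(exception == null ? "ok " + metadata : "failed");
            }
        });
        return out.toString();
    }

    // Lambda style, possible because the interface has one abstract method.
    static String lambda() {
        StringBuilder out = new StringBuilder();
        deliver((metadata, exception) -> out.append(exception == null ? "ok " + metadata : "failed"));
        return out.toString();
    }
}
```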

Answer by tchambers

The main difference is whether you want to block the calling thread waiting for the acknowledgment.

The following, using the Future.get() method, would block the current thread until the send is completed before performing some action.

producer.send(record).get(); // blocks until the broker responds
// Do some action

When using a Callback to perform some action, the code will execute in the I/O thread so it's non-blocking for the calling thread.

 producer.send(record,
               new Callback() {
                   @Override
                   public void onCompletion(RecordMetadata metadata, Exception exception) {
                       // Do some action
                   }
               });

Note, though, that the docs say it is 'generally' executed in the producer's I/O thread:

Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
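
The advice in that quote, keeping the callback body short and handing slow work to your own Executor, can be sketched with plain `java.util.concurrent`. Both thread pools here are hypothetical stand-ins: `io` plays the producer's I/O thread and `worker` is the application's own Executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class OffloadFromCallback {
    // Returns the name of the thread that ended up doing the slow work.
    static String run() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        CompletableFuture<String> processedOn = new CompletableFuture<>();
        try {
            // The callback body stays tiny: it only hands the work off.
            Runnable callback = () -> workers.execute(() -> {
                // blocking or CPU-heavy processing would go here
                processedOn.complete(Thread.currentThread().getName());
            });
            ioThread.execute(callback); // the "I/O thread" fires the callback
            return processedOn.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException | TimeoutException e) {
            return "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            ioThread.shutdown();
            workers.shutdown();
        }
    }
}
```

The I/O thread only spends the time needed to enqueue the task, so it stays free to complete other sends while the worker pool does the expensive part.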

Answer by sun007

send() is the method that starts publishing a message to the Kafka cluster. It is an asynchronous call: send() accumulates the message in a buffer and returns immediately. Combined with linger.ms, this lets the producer batch messages for better performance. We can handle exceptions and control the flow either synchronously, by calling get() on the returned Future, or asynchronously, with a callback.
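
The "append to a buffer and return immediately" behaviour can be illustrated with a toy model. `TinyAccumulator` is hypothetical and much simpler than Kafka's real accumulator: it counts records rather than bytes (Kafka's `batch.size` is in bytes), and `flush()` stands in for the background sender shipping a partial batch once `linger.ms` expires.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the producer's record buffer, NOT Kafka's implementation.
class TinyAccumulator {
    private final int batchSize; // records per batch (Kafka's batch.size is in bytes)
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> sentBatches = new ArrayList<>();

    TinyAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Like send(): only appends to the buffer and returns immediately,
    // shipping a batch once it is full.
    void send(String value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Stands in for the background sender draining a partial batch,
    // e.g. once linger.ms has expired.
    void flush() {
        if (!buffer.isEmpty()) {
            sentBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With a batch size of 2, five calls to `send` ship two full batches, and the fifth record waits in the buffer until `flush()` runs.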

Each method has its own pros and cons, and the choice depends on the use case.

Asynchronous send (Fire & Forget): we call the send method as below to publish a message without waiting for any success or error response.

producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));

In this scenario we don't wait for the first message to complete before sending the next ones. On an exception the producer retries according to the retry configuration, but if the message still fails after the retries, the application never finds out. We may lose some messages this way, but if a small amount of message loss is acceptable, this provides high throughput and low latency.

Synchronous send: the simple way to send a message synchronously is to use the get() method:

RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();

Producer.send returns a Future of RecordMetadata, and when we call .get() on it, it waits for the reply from Kafka. In case of an error we can catch the exception; in case of success we get the RecordMetadata, which contains the offset, partition and timestamp that can be logged. This approach is slow, but it gives high reliability and a delivery guarantee.
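
A minimal sketch of that pattern without a broker. `send` and `FakeRecordMetadata` are hypothetical stand-ins (the real `RecordMetadata` exposes `partition()`, `offset()` and `timestamp()`); the point is that `get()` wraps the real failure in an `ExecutionException`, so the cause is what should be logged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for Kafka's RecordMetadata.
final class FakeRecordMetadata {
    final int partition;
    final long offset;
    final long timestamp;

    FakeRecordMetadata(int partition, long offset, long timestamp) {
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

class SyncSendSketch {
    // Hypothetical send(), NOT the Kafka API: empty values fail,
    // everything else succeeds with made-up metadata.
    static CompletableFuture<FakeRecordMetadata> send(String value) {
        CompletableFuture<FakeRecordMetadata> result = new CompletableFuture<>();
        if (value.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("empty value"));
        } else {
            result.complete(new FakeRecordMetadata(0, 42L, 1_700_000_000_000L));
        }
        return result;
    }

    // Synchronous send: block on get(), then log the metadata or the error.
    static String sendAndLog(String value) {
        try {
            FakeRecordMetadata m = send(value).get();
            return "partition=" + m.partition + " offset=" + m.offset + " timestamp=" + m.timestamp;
        } catch (ExecutionException e) {
            // get() wraps the failure; the cause is what the send actually reported
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```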

Asynchronous send with callback: we can also call send() with a callback function, which is invoked with the response once the send completes. This is a good fit if you want to send messages asynchronously, without waiting for each one to complete, while still handling errors or updating the delivery status.
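
A minimal sketch of tracking delivery status from callbacks. `send` here is a hypothetical stand-in, not the Kafka API: it returns immediately and later calls a `BiConsumer<Long, Exception>` on a background thread, mimicking the `(metadata, exception)` shape of Kafka's callback. Empty values are made to fail so both branches run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class CallbackStatusSketch {
    // Hypothetical async send, NOT the Kafka API: returns at once and later
    // calls the callback on a background thread with (offset, error).
    static void send(ExecutorService ioThread, String value, long offset,
                     BiConsumer<Long, Exception> callback) {
        ioThread.execute(() -> {
            if (value.isEmpty()) {
                callback.accept(null, new IllegalArgumentException("empty value"));
            } else {
                callback.accept(offset, null);
            }
        });
    }

    // Returns {delivered, failed} once every callback has fired.
    static int[] sendAll(String... values) {
        AtomicInteger delivered = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        CountDownLatch pending = new CountDownLatch(values.length);
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < values.length; i++) {
                send(ioThread, values[i], i, (offset, error) -> {
                    if (error == null) {
                        delivered.incrementAndGet();
                    } else {
                        failed.incrementAndGet(); // e.g. log it or mark the record for a retry
                    }
                    pending.countDown();
                });
            }
            // The caller was free to do other work here; we wait only to report totals.
            pending.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ioThread.shutdown();
        }
        return new int[] {delivered.get(), failed.get()};
    }
}
```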

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
        // inspect ex (null on success) and recordMetadata here
    }
});

Note: please don't confuse acks and retries with the asynchronous send call. Acks and retries apply to every send call, whether synchronous or asynchronous; the only difference is how you handle the returned responses and the failure scenarios. For example, with an asynchronous send the acks and retries rules still apply, but they are handled on a separate thread without blocking other threads from sending records in parallel. The only challenge is that we will not be immediately aware of a failure, or of the time when a message completed successfully.

注意:请不要将 ack & retries 与异步发送调用混淆。Ack 和重试将应用于每个发送调用,无论是同步调用还是异步调用,唯一重要的是您如何处理返回消息和失败场景。例如,如果您发送异步发送仍然 ack 和重试规则得到应用,但将在独立线程上,而不会阻止其他线程发送并行记录。在失败的情况下我们不会意识到的唯一挑战和消息成功完成的时间。