java 如果消息是由生产者生产的,如何从 Kafka 经纪人那里获得确认?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/34085577/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 22:27:54  来源:igfitidea点击:

how to get acknowledgement from Kafka broker if message is produced by producer?

javaapache-kafkakafka-consumer-apikafka-producer-api

提问by usman

I would like to get some response from the broker when I produce a message. I have tried CallBack mechanism (by implementing CallBack) used in KafkaProducer.sendbut it did not work and does not call onCompletionmethod.

When I shutdown Kafka server and try to produce message then it does call callback method.

当我生成消息时,我想从经纪人那里得到一些回应。我已经尝试过使用的回调机制(通过实现回调),KafkaProducer.send但它没有工作,也没有调用onCompletion方法。

当我关闭 Kafka 服务器并尝试生成消息时,它会调用回调方法。

Is there any other way to get acknowledgment?

有没有其他方式获得认可?

@Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        System.out.println("Called Callback method");
        if (metadata != null) {
            System.out.println("message(" + key + ", " + message
                    + ") sent to partition(" + metadata.partition() + "), "
                    + "offset(" + metadata.offset() + ") in " + elapsedTime
                    + " ms");
        } else {
            exception.printStackTrace();
        }

    }

props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`

i am using kafka-client api 0.9.1 with broker version 0.8.2.

我正在使用 kafka-client api 0.9.1 和代理版本 0.8.2。

回答by Morgan Kenyon

So I'm not 100% sure on which versions work with which in Kafka. Currently I use 0.8.2, I know 0.9 introduced some breaking changes but I couldn't tell you for sure what does/doesn't work now.

所以我不能 100% 确定哪些版本可以在 Kafka 中使用。目前我使用 0.8.2,我知道 0.9 引入了一些重大更改,但我无法确定现在什么有效/无效。

One very strong recommendation, I would use the Kafka-Client version that corresponds to your broker version. If you're using broker 0.8.2, I would use kakfa-client 0.8.2 as well.

一个非常强烈的建议是,我将使用与您的代理版本相对应的 Kafka-Client 版本。如果您使用的是代理 0.8.2,我也会使用 kakfa-client 0.8.2。

You never presented any code of how you're using this, so I'm just somewhat guessing in the dark. But I've implemented the Callback feature in Kafka 0.8.2 by using this methodin the producer. Below is the method signature.

你从来没有提供任何关于你如何使用它的代码,所以我只是在黑暗中猜测。但是我已经通过在生产者中使用这种方法在Kafka 0.8.2 中实现了回调功能。下面是方法签名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

Where will I call that method, I actually pass in the class with the overriden method.

我将在哪里调用该方法,我实际上是通过覆盖方法传入该类。

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
}); 

I'm assuming there's a way to also do it as you've done it. But you'd have to provide the code showing how you're actually sending records to Kafka. Other than that I'm just guessing.

我假设有一种方法也可以像您一样做到这一点。但是您必须提供显示您实际如何将记录发送到 Kafka 的代码。除此之外我只是猜测。

回答by Hugo Carmona

There is a simple way to get information from the broker after publish message by KafkaProducer using kafka 0.9 version. You can call get() method that will return a RecordMetadata object which you can get info such as offset, topicPartition, below a code snippet as an example:

KafkaProducer 使用 kafka 0.9 版本发布消息后,有一种简单的方法可以从代理获取信息。您可以调用 get() 方法,该方法将返回一个 RecordMetadata 对象,您可以在代码片段下方获取诸如偏移量、topicPartition 等信息作为示例:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
                        topic, key.getBytes("UTF-8"), message
                                .getBytes("UTF-8"))).get();
System.out.println("Message produced, offset: " + m.offset());
System.out.println("Message produced, partition : " + m.partition());
System.out.println("Message produced, topic: " + m.topic());

回答by user3881282

Close the producer -- once you are done publishing all your messages -- like this:

关闭生产者——一旦你完成了所有的消息的发布——就像这样:

producer.close();

This ensures that the following method from the Callback is always called:

这确保始终调用回调中的以下方法:

onCompletion(RecordMetadata metadata, Exception exception)

NB: I have tested this by adding that line to this sample Producer classand it works.

注意:我已经通过将该行添加到这个示例 Producer 类来测试它并且它工作正常。