Java 如何在 spring kafka 中确认当前偏移量以进行手动提交

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47427948/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 02:35:53  来源:igfitidea点击:

How to acknowledge current offset in spring kafka for manual commit

javaapache-kafkaspring-kafka

提问by user2550140

I am using Spring Kafka first time and I am not able to use Acknowledgement.acknowledge() method for manual commit in my consumer code. please let me know if anything missing in my consumer configuration or listener code. or else is there other way to handle acknowledge offset based on condition. Here i'm looking solution like if the offset is not committed/ acknowledge manually, it should pick same message/offset by consumer.

我第一次使用 Spring Kafka,我无法在我的消费者代码中使用 Acknowledgement.acknowledge() 方法进行手动提交。如果我的消费者配置或侦听器代码中缺少任何内容,请告诉我。或者还有其他方法可以根据条件处理确认偏移量。在这里,我正在寻找解决方案,如果偏移量没有手动提交/确认,它应该由消费者选择相同的消息/偏移量。

Configuration

配置

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

Listener

听众

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}

回答by Artem Bilan

That doesn't work that way in Apache Kafka.

这在 Apache Kafka 中是行不通的。

For the currently running consumer we may never worry about committing offsets. We need them persisted only for new consumers in the same consumer group. The current one track its offset in the memory. I guess somewhere on Broker.

对于当前运行的消费者,我们可能永远不会担心提交偏移量。我们只需要为同一消费者组中的新消费者持久化它们。当前一个跟踪它在内存中的偏移量。我猜在 Broker 的某个地方。

If you need to refetch the same message in the same consumer maybe the next poll round, you should consider to use seek()functionality: https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

如果您需要在同一个消费者中重新获取相同的消息,也许在下一轮轮询中,您应该考虑使用以下seek()功能:https: //docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html /_reference.html#seek

回答by contactsunny

Set the enable-auto-commit property to false:

将 enable-auto-commit 属性设置为 false:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Set the ack-mode to MANUAL_IMMEDIATE:

将确认模式设置为 MANUAL_IMMEDIATE:

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Then, in your consumer/listener code, you can commit the offset manually, like this:

然后,在您的使用者/侦听器代码中,您可以手动提交偏移量,如下所示:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

Update: I created a small POC for this. Check it out here, might help you.

更新:我为此创建了一个小型 POC。看看这里,可能对你有帮助。

回答by Zubair A.

You can do following:
1. store the current record offset to file or DB.
2. Implement your kafka listener class with ConsumerAware.
3. Call registerSeekCallback as given below:

您可以执行以下操作:
1. 将当前记录偏移量存储到文件或数据库中。
2. 使用 ConsumerAware 实现您的 kafka 侦听器类。
3. 调用 registerSeekCallback 如下:

(registerSeekCallback(ConsumerSeekCallback callback) 
      {
      callback.seek(topic, partition, offset)
}

So when the consumer goes down or new consumer is assigned , it start reading fomr the offset stored in your DB.

因此,当消费者出现故障或分配了新消费者时,它会开始读取存储在数据库中的偏移量。

回答by Amit Nargund

You need to do the following

您需要执行以下操作

1) Set enable-auto-commit property to false

1) 将 enable-auto-commit 属性设置为 false

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

2) Set the ACK Mode to MANUL_IMMEDIATE

2)设置ACK模式为MANUL_IMMEDIATE

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

3) For processed records you need to call acknowledgment.acknowledge();

3)对于处理过的记录,你需要调用acknowledge.acknowledge();

4) for failed records call acknowledgment.nack(10); Note: the nack method takes a long parameter which is the sleep time and it should be less than max.poll.interval.ms

4)对于失败的记录调用acknowledgement.nack(10);注意:nack 方法需要一个很长的参数,即睡眠时间,它应该小于 max.poll.interval.ms

Below is a sample code

下面是一个示例代码

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}