java Spring Kafka - 如何使用组 ID 将偏移量重置为最新?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47777395/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 09:48:15  来源:igfitidea点击:

Spring Kafka - How to reset offset to latest with a group id?

javaspringapache-kafkaspring-integrationspring-kafka

提问by Bachrc

I am currently using Spring Integration Kafka to make real-time statistics. Though, the group name makes Kafka search all the previous values the listener didn't read.

我目前正在使用 Spring Integration Kafka 进行实时统计。但是,组名使 Kafka 搜索侦听器未读取的所有先前值。

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

I would like to begin to the latest offset, and not be bothered by old values. Is there a possibility to reset the offset of the group ?

我想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?

回答by Bachrc

Because I didn't saw any example of this, I'm gonna explain how I did here.

因为我没有看到任何这样的例子,我将在这里解释我是怎么做的。

The class of your @KafkaListenermust implement a ConsumerSeekAwareclass, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)

您的类@KafkaListener必须实现一个ConsumerSeekAware类,该类将允许侦听器在分配分区时控制偏移量搜索。(来源:https: //docs.spring.io/spring-kafka/reference/htmlsingle/#seek

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan) for guiding me to the answer.

在这里,在重新平衡时,我们使用给定的回调来寻找所有给定主题的最后一个偏移量。感谢 Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan) 指导我找到答案。

回答by A.Chinese

You can set a ConsumerRebalanceListenerfor the kafka consumer while you subscribing to some topics,in which you can get the lastest offset of each partition by KafkaConsumer.endOffsets()method, and set this to consumer by KafkaConsumer.seek()method ,like this:

您可以ConsumerRebalanceListener在订阅某些主题时为kafka消费者设置a ,其中您可以通过KafkaConsumer.endOffsets()方法获取每个分区的最新偏移量,并将其设置为消费者KafkaConsumer.seek(),如下所示:

kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            //get and set the lastest offset for each partiton
            kafkaConsumer.endOffsets(partitions) 
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        }
    }
);

回答by link

you can use partitionOffsets annotation to start with exact offset,for example:

您可以使用 partitionOffsets 注释以精确偏移量开始,例如:

@KafkaListener(id = "bar", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })public void listen(ConsumerRecord<?, ?> record) {
     }

回答by Ahmad Abdelghany

For a new consumer group that doesn't have an initial offset in kafka, you just set your AUTO_OFFSET_RESET_CONFIG:

对于在 kafka 中没有初始偏移量的新消费者组,您只需设置AUTO_OFFSET_RESET_CONFIG

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "new-consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

For an existing consumer group, assuming you can't change your group id to appear as new, you have to seek to desired offset during initialization.

对于现有的消费者组,假设您无法更改组 ID 以显示为新的,则必须在初始化期间寻求所需的偏移量。

See docs

查看文档