Kafka consumer for multiple topics (Java)
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me) at StackOverflow.
Original question: http://stackoverflow.com/questions/39568947/
Kafka consumer for multiple topics
提问by Apollo
I have a list of topics (currently 10) whose size may grow in the future. I know we can spawn multiple threads (one per topic) to consume from each topic, but in my case, if the number of topics increases, so does the number of consuming threads, which I want to avoid: the topics will not receive data very frequently, so the threads would mostly sit idle.
Is there any way to have a single consumer consume from all the topics? If so, how can we achieve it? Also, how will the offsets be maintained by Kafka? Please suggest answers.
Accepted answer by Subrata Saha
We can subscribe to multiple topics using the following API: consumer.subscribe(Arrays.asList(topic1, topic2), rebalanceListener), where the second argument is a ConsumerRebalanceListener (an overload without it also exists).
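For context, a minimal single-consumer setup subscribing to several topics might look like the sketch below. The broker address, group id, and topic names are placeholders, and the kafka-clients library is assumed as a dependency; this is an illustrative sketch, not a production implementation:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MultiTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "my-group");                // placeholder consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // A single consumer instance subscribed to several topics at once.
            consumer.subscribe(Arrays.asList("topic1", "topic2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic() + " " + record.offset() + ": " + record.value());
                }
            }
        }
    }
}
```

With auto-commit enabled (the default), this is all that is needed; the explicit commit shown below gives finer control over when offsets are committed.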
The consumer carries the topic information, and we can commit offsets using consumer.commitAsync() or consumer.commitSync() by creating an OffsetAndMetadata object, as follows.
// Poll for records; Duration requires java.time.Duration.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    // Commit the offset of the next record to be read for this partition.
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
Answer by Bhawna Arora
There is no need for multiple threads; you can have one consumer consuming from multiple topics. Consumer offsets are maintained by the Kafka cluster: older versions stored them in ZooKeeper, while modern Kafka stores them in the internal __consumer_offsets topic. Whenever a consumer commits the offset of a message it has consumed, that position is recorded so each message is processed only once. So even if the consumer fails, on restart it will resume consuming from the offset after the last committed one.
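Since the question mentions a topic list that may grow, it is worth noting that KafkaConsumer also supports subscribing by regular expression in recent kafka-clients versions, so newly created topics matching the pattern are picked up automatically on the next metadata refresh. The pattern below is a placeholder, and a configured consumer instance is assumed:

```java
import java.util.regex.Pattern;

// assumes a KafkaConsumer<String, String> named consumer, configured as usual
consumer.subscribe(Pattern.compile("orders-.*")); // placeholder topic-name pattern
```

This avoids having to restart or re-subscribe the consumer each time a new topic is added.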