Java: How do I use multiple consumers in Kafka?
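Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must also follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow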
Original URL: http://stackoverflow.com/questions/30899163/
How do I use multiple consumers in Kafka?
Question by Jeff Gong
I am a student new to Kafka, and I've run into some fundamental issues understanding multiple consumers that articles, documentation, etc. have not helped much with so far.
One thing I have tried is writing my own high-level Kafka producer and consumer and running them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. I managed to do this successfully, but when I introduce a second consumer to consume from the same topic the messages were just published to, it receives no messages.
It was my understanding that for each topic, you could have consumers from separate consumer groups, and each of these consumer groups would get a full copy of the messages produced to that topic. Is this correct? If not, what would be the proper way for me to set up multiple consumers? This is the consumer class that I have written so far:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        // Released clients expect an assignor class name here rather than "roundrobin"
        properties.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(properties);
        // subscribe() takes a collection of topic names
        consumer.subscribe(Collections.singletonList(topic));
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            // Wait up to 100 ms for records instead of spinning with poll(0)
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }
    }
}
Furthermore, I noticed that I was originally testing the above consumer against a topic 'test' with only a single partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance that increased my consumption latency significantly, on the order of seconds. I thought this was a rebalancing issue caused by having only a single partition, but when I created a new topic 'multiplepartitions' with, say, 6 partitions, similar issues arose: adding more consumers to the same consumer group caused latency issues. I have looked around and people are telling me I should be using a multi-threaded consumer. Can anyone shed light on that?
Accepted answer by Chris Gerken
I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there's no previously committed offset, the auto.offset.reset property decides what the starting offset should be. If you set it to "largest" (the default), you start reading at the end of the log, so you only see messages produced after the consumer starts. If you set it to "smallest", you get the first available message.
So add:
properties.put("auto.offset.reset", "smallest");
and try again.
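A minimal sketch of this fix, assuming the newer Java `KafkaConsumer` (where the equivalent value is "earliest") and a broker at `localhost:9092`. The helper builds one set of consumer properties per group id; the class name `BroadcastConsumerConfig` and the group names are hypothetical. Two consumers built from `forGroup("group-a")` and `forGroup("group-b")` belong to different groups, so each should receive every message in the topic, starting from the first available one because neither group has a committed offset yet.

```java
import java.util.Properties;

/** Hypothetical helper: builds consumer properties for one consumer group. */
class BroadcastConsumerConfig {
    private BroadcastConsumerConfig() {}

    static Properties forGroup(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // A distinct group.id per consumer means each group gets its own full copy of the topic.
        props.put("group.id", groupId);
        // When this group has no committed offset, start from the first available message.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties first = forGroup("group-a");  // hypothetical group names
        Properties second = forGroup("group-b");
        System.out.println(first.getProperty("group.id") + " " + second.getProperty("group.id"));
    }
}
```

Each `Properties` object would then be passed to its own `new KafkaConsumer<Integer, String>(props)`. Keep in mind that `auto.offset.reset` only applies when the group has no committed offset for a partition; a group that has already committed offsets resumes from them.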
* edit *
"smallest" and "largest" were deprecated a while back. You should use "earliest" or "latest" now. If you have any questions, check the docs.
Answer by Alper Akture
The documentation says: "if you provide more threads than there are partitions on the topic, some threads will never see a message". Can you add partitions to your topic? I have my consumer group's thread count equal to the number of partitions in my topic, and each thread is getting messages.
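To see why surplus threads sit idle, here is a simplified model, not Kafka's actual assignor code: the partitions of a topic are dealt out round-robin to the consumers (or streams/threads) of one group, so each partition has exactly one owner. The class and method names are hypothetical. With 3 partitions and 4 consumers, the fourth consumer is assigned nothing.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of spreading a topic's partitions over the consumers of one group. */
class PartitionAssignmentModel {
    /** Returns, for each consumer index, the list of partition ids it owns. */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        // Deal partitions out in turn; every partition gets exactly one owner.
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 4)); // [[0], [1], [2], []]  (the fourth consumer is idle)
        System.out.println(assign(3, 3)); // [[0], [1], [2]]      (one partition per thread)
    }
}
```

Kafka's real assignors (range, round-robin, sticky) differ in which partitions each member gets, but all of them give each partition to at most one member of a group, which is why threads beyond the partition count receive no messages.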
Here's my topic config:
buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs:
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0
And my consumer:
package com.cie.dispatcher.services;

import com.cie.dispatcher.model.WinNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * This will create three threads, assign them to a "group" and listen for notifications on a topic.
 * Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by
 * the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the
 * lifecycle manager in dropwizard.
 * <p/>
 * Created by aakture on 6/15/15.
 */
public class KafkaTopicListener implements Managed {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private final int threadCount;
    private final WinNotificationWorkflow winNotificationWorkflow;
    private final ObjectMapper objectMapper;

    @Inject
    public KafkaTopicListener(String a_zookeeper,
                              String a_groupId, String a_topic,
                              int threadCount,
                              WinNotificationWorkflow winNotificationWorkflow,
                              ObjectMapper objectMapper) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
        this.threadCount = threadCount;
        this.winNotificationWorkflow = winNotificationWorkflow;
        this.objectMapper = objectMapper;
    }

    /**
     * Creates the config for a connection
     *
     * @param zookeeper the host:port for zookeeper, "localhost:2181" for example.
     * @param groupId   the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads.
     * @return the config props
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void stop() {
        if (consumer != null) consumer.shutdown();
        // Only wait for termination if start() actually created the executor
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                LOG.info("Interrupted during shutdown, exiting uncleanly");
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
        LOG.info("{} shutdown successfully", this.getClass().getName());
    }

    /**
     * Starts the listener
     */
    @Override
    public void start() {
        Map<String, Integer> topicCountMap = new HashMap<>();
        // One stream per thread; threads beyond the partition count would receive no messages
        topicCountMap.put(topic, threadCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(threadCount);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ListenerThread(stream, threadNumber));
            threadNumber++;
        }
    }

    private class ListenerThread implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        public ListenerThread(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        @Override
        public void run() {
            try {
                String message = null;
                LOG.info("started listener thread: {}", m_threadNumber);
                ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                // hasNext() blocks until the next message arrives
                while (it.hasNext()) {
                    try {
                        message = new String(it.next().message(), StandardCharsets.UTF_8);
                        LOG.info("receive message by {} : {}", m_threadNumber, message);
                        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
                        winNotificationWorkflow.process(winNotification);
                    } catch (Exception ex) {
                        LOG.error("error processing queue for message: " + message, ex);
                    }
                }
                LOG.info("Shutting down listener thread: {}", m_threadNumber);
            } catch (Exception ex) {
                LOG.error("error:", ex);
            }
        }
    }
}
Answer by user1119541
If you want multiple consumers to consume the same messages (like a broadcast), you can spawn them in different consumer groups and also set auto.offset.reset to smallest in the consumer config. If you want multiple consumers to finish consuming in parallel (dividing the work among them), you should create at least as many partitions as consumers. Within a consumer group, one partition can be consumed by at most one consumer process, but one consumer can consume more than one partition.
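The two patterns can be contrasted with a small in-memory model (hypothetical names; it models the delivery rules described in this answer, not the Kafka client itself). Every group receives every partition, and inside a group each partition goes to exactly one member. So a separate group with one consumer receives all messages (broadcast), while a group with several consumers splits them. The model assumes each group starts from the beginning of every partition, as with auto.offset.reset set to the earliest offset.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Model: how many messages each consumer in each group would receive. */
class GroupDeliveryModel {
    /**
     * @param messagesPerPartition number of messages in each partition of the topic
     * @param groupSizes           consumer group id -> number of consumers in that group
     * @return group id -> message count received by each consumer of that group
     */
    static Map<String, int[]> deliver(int[] messagesPerPartition, Map<String, Integer> groupSizes) {
        Map<String, int[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : groupSizes.entrySet()) {
            int consumers = group.getValue();
            if (consumers <= 0) {
                throw new IllegalArgumentException("group " + group.getKey() + " needs at least one consumer");
            }
            int[] received = new int[consumers];
            // Every group sees every partition; within the group, partition p has a single owner.
            for (int p = 0; p < messagesPerPartition.length; p++) {
                received[p % consumers] += messagesPerPartition[p];
            }
            result.put(group.getKey(), received);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] topic = {10, 10, 10, 10, 10, 10}; // 6 partitions, 60 messages in total
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("audit", 1);   // broadcast: its own group with a single consumer
        groups.put("workers", 3); // work sharing: three consumers in one group
        deliver(topic, groups).forEach((id, counts) ->
                System.out.println(id + " -> " + Arrays.toString(counts)));
        // prints "audit -> [60]" then "workers -> [20, 20, 20]"
    }
}
```

Adding a fourth, fifth or seventh consumer to "workers" would not raise throughput beyond six partitions' worth: with more consumers than partitions, the extra members own no partition and receive nothing.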