Java: how to use a multi-threaded consumer in Kafka 0.9.0?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/36274024/
How to use multi-thread consumer in kafka 0.9.0?
Asked by Acceml
The Kafka documentation suggests the following approach:
One Consumer Per Thread: A simple option is to give each thread its own consumer instance.
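A minimal sketch of that pattern, assuming the plain org.apache.kafka.clients.consumer.KafkaConsumer API (broker address, group id and topic below are placeholders): each thread constructs, uses and closes its own consumer, so no instance is ever shared between threads.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: one KafkaConsumer per thread, created inside run()
public class PerThreadConsumer implements Runnable {
    private final String topic;

    public PerThreadConsumer(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "test");                    // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The consumer lives entirely inside this thread, so no synchronization is needed.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}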
My code:
public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if (!closed.get()) throw e;
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    public static void main(String[] args) {
        CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                .withBootstrapServers("172.31.1.159:9092")
                .withGroupId("test")
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
        executorService.shutdown();
    }
}
but it doesn't work and throws an exception:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Furthermore, I read the source code of Flink (an open source platform for distributed stream and batch data processing). Flink's use of a multi-threaded consumer is similar to mine.
long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));

pollLoop: while (running) {
    ConsumerRecords<byte[], byte[]> records;
    //noinspection SynchronizeOnNonFinalField
    synchronized (flinkKafkaConsumer.consumer) {
        try {
            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            if (running) {
                throw we;
            }
            // leave loop
            continue;
        }
    }
What's wrong?
Answer by Lan
The Kafka consumer is not thread-safe. As you pointed out in your question, the documentation states that
A simple option is to give each thread its own consumer instance
But in your code, the same consumer instance is wrapped by different KafkaConsumerRunner instances, so multiple threads end up accessing the same consumer instance. The Kafka documentation clearly states:
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
That's exactly the exception you received.
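Applied to the code in the question, a minimal fix along those lines is to build a separate consumer for each runner. This is a sketch only: CloudKafkaConsumer and KafkaConsumerBuilder are the asker's own wrapper classes, and it assumes the builder can simply be invoked once per runner.

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    // One consumer per runner, so each thread owns its own instance
    executorService.execute(new KafkaConsumerRunner(newConsumer(), "log"));
    executorService.execute(new KafkaConsumerRunner(newConsumer(), "log.info"));
    executorService.shutdown();
}

private static CloudKafkaConsumer newConsumer() {
    return KafkaConsumerBuilder.builder()
            .withBootstrapServers("172.31.1.159:9092")
            .withGroupId("test")
            .build();
}

With a dedicated consumer per thread, the synchronized blocks around poll (and subscribe) are no longer needed.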
Answer by Steve
It is throwing the exception on your call to subscribe: this.consumer.subscribe(topicName);
Move that block into a synchronized block like this:
@Override
public void run() {
    try {
        synchronized (consumer) {
            this.consumer.subscribe(topicName);
        }
        ConsumerRecords<String, String> records;
        while (!closed.get()) {
            synchronized (consumer) {
                records = consumer.poll(100);
            }
            for (ConsumerRecord<String, String> tmp : records) {
                System.out.println(tmp.value());
            }
        }
    } catch (WakeupException e) {
        // Ignore exception if closing
        System.out.println(e);
        //if (!closed.get()) throw e;
    }
}
Answer by demonodojo
Maybe this is not your case, but if you are merging the processing of data from several topics, then you can read from multiple topics with the same consumer. If not, it is preferable to create separate jobs, each consuming one topic.
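A minimal sketch of the single-consumer option, assuming the plain KafkaConsumer API with an already configured consumer and placeholder topic names (imports as in the earlier sketch, plus java.util.Arrays): one consumer on one thread subscribed to both topics.

// Sketch only: one consumer, one thread, several topics
void consumeSeveralTopics(KafkaConsumer<String, String> consumer) {
    // subscribe() accepts a list of topics
    consumer.subscribe(Arrays.asList("log", "log.info"));
    while (!Thread.currentThread().isInterrupted()) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // record.topic() tells you which topic each message came from
            System.out.println(record.topic() + ": " + record.value());
        }
    }
}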