原文地址: http://stackoverflow.com/questions/40880832/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
InstanceAlreadyExistsException coming from kafka consumer
Asked by user1950349
I am working with Kafka and trying to set up a consumer group by following this article. The only difference is that I have created my own abstract class and handler to make the design simpler.
Below is my abstract class:
public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();

    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}
Below is my KafkaConsumerA, which extends the above abstract class:
public class KafkaConsumerA extends Consumer {
    private KafkaConsumer<byte[], DataHolder> consumer;

    public KafkaConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        // exception comes from below line from two of the threads and the remaining one thread works fine.
        consumer = new KafkaConsumer<>(consumerProps);
        List<String> topics = getTopicsBasisOnConsumerName(consumerName);
        try {
            consumer.subscribe(topics);
            // Setup the schema config
            Map<String, Object> config = new HashMap<>();
            config.put("urls", "https://abc.qa.host.com");
            GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
            while (true) {
                ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
                for (ConsumerRecord<byte[], DataHolder> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(
                        (Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
                }
            }
        } catch (WakeupException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
And below is my Handler class:
// looks like something is wrong in this class
public final class ConsumerHandler {
    private final ExecutorService executorServiceProcess;
    private final Consumer consumer;
    private final List<Consumer> consumers = new ArrayList<>();

    public ConsumerHandler(Consumer consumer, int poolSize) {
        this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
        this.consumer = consumer;
        for (int i = 0; i < poolSize; i++) {
            consumers.add(consumer);
            executorServiceProcess.submit(consumer);
        }
    }

    public void shutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (Consumer consumer : consumers) {
                    consumer.shutdown();
                }
                executorServiceProcess.shutdown();
                try {
                    executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
And here is how I start all my consumers in the consumer group from the main class:
public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here
    handlerA.shutdown();
    // shutdown KafkaConsumerB here
}
So with this, my plan is to set up a consumer group with three consumers in KafkaConsumerA, all three subscribed to the same topics.
Error:
Whenever I run this, it looks like only one consumer in the consumer group works and the other two don't. I see this exception on the console from those two:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]
What am I doing wrong here? The getConsumerProps() method returns a Properties object that has client.id and group.id in it, with the same values for all three consumers in that consumer group.
Below are my design details:
- My KafkaConsumerA will have three consumers in a consumer group, and each consumer will work on topicA.
- My KafkaConsumerB (similar to KafkaConsumerA) will have two consumers in a different consumer group, and each of those consumers will work on topicB.
These two consumers, KafkaConsumerA and KafkaConsumerB, will run on the same box in different consumer groups, independent of each other.
Answered by Nick Vanderhoven
Kafka is trying to register MBeans for application monitoring and is using the client.id to do so. As you said, you have the properties injected in your abstract class, and you inject the same client.id and group.id for every consumer in group A. However, you have different clients, so you should give each of them its own client.id but keep the same group.id. This will register the different clients/consumers in the same consumer group and make them work together, without clashing on the MBeans registration.
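To make the clash concrete, here is a minimal sketch that uses only the JDK (no Kafka dependency). It assumes the registration behaves roughly like the stack trace in the question suggests: one MBean per client, named after the client.id. The class ClientIdSketch, the helper names perConsumerProps and registerAll, the "-0", "-1", "-2" suffix scheme, and the "sketch.*" JMX domains are all hypothetical and chosen for illustration; they are not part of the Kafka API.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class ClientIdSketch {

    // Minimal standard MBean standing in for the app-info bean (hypothetical).
    public interface AppInfoMBean {
        String getClientId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String clientId;
        public AppInfo(String clientId) { this.clientId = clientId; }
        @Override public String getClientId() { return clientId; }
    }

    // Copy the shared properties once per consumer: group.id is kept as-is,
    // client.id is made unique by appending the consumer's index (assumed scheme).
    static List<Properties> perConsumerProps(Properties base, int count) {
        List<Properties> result = new ArrayList<>();
        String baseId = base.getProperty("client.id", "consumer");
        for (int i = 0; i < count; i++) {
            Properties copy = new Properties();
            copy.putAll(base);
            copy.setProperty("client.id", baseId + "-" + i);
            result.add(copy);
        }
        return result;
    }

    // Register one MBean per entry, named after its client.id, and return
    // how many registrations failed with InstanceAlreadyExistsException.
    static int registerAll(List<Properties> all, String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        int clashes = 0;
        for (Properties p : all) {
            String id = p.getProperty("client.id");
            try {
                ObjectName name = new ObjectName(domain + ":type=app-info,id=" + id);
                server.registerMBean(new AppInfo(id), name);
            } catch (InstanceAlreadyExistsException e) {
                clashes++; // same object name registered twice
            } catch (JMException e) {
                throw new IllegalStateException(e);
            }
        }
        return clashes;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "groupA");
        base.setProperty("client.id", "consumerA");

        // Same properties for all three consumers, as in the question.
        List<Properties> shared = Arrays.asList(base, base, base);
        System.out.println("shared client.id clashes: " + registerAll(shared, "sketch.shared"));

        // Distinct client.id per consumer, same group.id.
        List<Properties> distinct = perConsumerProps(base, 3);
        System.out.println("distinct client.id clashes: " + registerAll(distinct, "sketch.distinct"));
    }
}
```

With a shared client.id, the second and third registrations collide with the first, which matches the two failing consumers in the question. With distinct ids there are no collisions. In the real code, each properties object from a helper like perConsumerProps would be passed to its own consumer instance; how that is wired into ConsumerHandler is left out here.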