Note: this page reproduces a popular StackOverflow question under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/40880832/

InstanceAlreadyExistsException coming from kafka consumer

Tags: java, multithreading, apache-kafka, kafka-consumer-api

Asked by user1950349

I am working with Kafka and trying to set up a consumer group by following this article. The only difference is that I have created my own abstract class and handler to make the design simpler.

Below is my abstract class:

public abstract class Consumer implements Runnable {
  private final Properties consumerProps;
  private final String consumerName;

  public Consumer(String consumerName, Properties consumerProps) {
    this.consumerName = consumerName;
    this.consumerProps = consumerProps;
  }

  protected abstract void shutdown();

  protected abstract void run(String consumerName, Properties consumerProps);

  @Override
  public final void run() {
    run(consumerName, consumerProps);
  }
}

Below is my KafkaConsumerA, which extends the above abstract class:

public class KafkaConsumerA extends Consumer {
  private KafkaConsumer<byte[], DataHolder> consumer;

  public KafkaConsumerA(String consumerName, Properties consumerProps) {
    super(consumerName, consumerProps);
  }

  @Override
  public void shutdown() {
    consumer.wakeup();
  }

  @Override
  protected void run(String consumerName, Properties consumerProps) {
    // exception comes from below line from two of the threads and the remaining one thread works fine.
    consumer = new KafkaConsumer<>(consumerProps);
    List<String> topics = getTopicsBasisOnConsumerName(consumerName);
    try {
      consumer.subscribe(topics);
      // Setup the schema config
      Map<String, Object> config = new HashMap<>();
      config.put("urls", "https://abc.qa.host.com");

      GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
      while (true) {
        ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
        for (ConsumerRecord<byte[], DataHolder> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out
              .println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
        }
      }
    } catch (WakeupException ex) {
      ex.printStackTrace();
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}

And below is my Handler class:

// looks like something is wrong in this class
public final class ConsumerHandler {
  private final ExecutorService executorServiceProcess;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      consumers.add(consumer);
      executorServiceProcess.submit(consumer);
    }
  }

  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceProcess.shutdown();
        try {
          executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

And here is how I start all my consumers in the consumer group from the main class:

  public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here

    handlerA.shutdown();
    // shutdown KafkaConsumerB here
  }

So with this, my plan is to set up a consumer group with three consumers in KafkaConsumerA, all three subscribed to the same topics.

Error:

Whenever I run this, it looks like only one consumer in the consumer group works and the other two don't. I see this exception on the console from those two:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]

What am I doing wrong here? The getConsumerProps() method returns a Properties object that contains client.id and group.id, with the same values for all three consumers in that consumer group.

Below are my design details:

  • My KafkaConsumerA will have three consumers in a consumer group, and each consumer will work on topicA.
  • My KafkaConsumerB (similar to KafkaConsumerA) will have two consumers in a different consumer group, and each of those consumers will work on topicB.

These two consumers, KafkaConsumerA and KafkaConsumerB, will run on the same box in different consumer groups, independent of each other.

Answered by Nick Vanderhoven

Kafka is trying to register MBeans for application monitoring and uses the client.id to do so. As you said, the properties are injected into your abstract class, so every consumer in group A gets the same client.id and group.id. However, these are different clients, so you should give each one its own client.id while keeping the same group.id. This registers the different clients/consumers in the same consumer group and lets them work together without clashing on the MBean registration.
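
One way to follow this advice is to derive a separate Properties object for each consumer thread from the shared base properties. The sketch below is only an illustration: the class name ClientIds, the method withUniqueClientId, and the "-<index>" suffix scheme are hypothetical and not part of Kafka or of the question's code. It uses only java.util.Properties, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds per-consumer properties
// that share group.id but carry a distinct client.id.
final class ClientIds {
  private ClientIds() {}

  /**
   * Returns a copy of base in which client.id has "-index" appended.
   * All other entries, including group.id, are copied unchanged.
   * If base has no client.id, the prefix "consumer" is assumed.
   */
  static Properties withUniqueClientId(Properties base, int index) {
    Properties copy = new Properties();
    // putAll copies the entries themselves, so the base object is never mutated
    copy.putAll(base);
    String baseId = base.getProperty("client.id", "consumer");
    copy.setProperty("client.id", baseId + "-" + index);
    return copy;
  }
}
```

In the question's ConsumerHandler loop, this would mean building one consumer per iteration, for example new KafkaConsumerA("KafkaConsumerA", ClientIds.withUniqueClientId(getConsumerProps(), i)), instead of submitting the same instance three times. Creating a separate instance per thread also seems advisable because KafkaConsumer is documented as not safe for multi-threaded access, and the shared consumer field in the question is overwritten by each thread.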