Java Apache Kafka exception: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V

Note: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL, and attribute it to the original authors (not me). Original Stack Overflow question: http://stackoverflow.com/questions/41410151/

Time: 2020-11-03 05:50:58  Source: igfitidea

java apache-kafka kafka-consumer-api nosuchmethoderror

Asked by vaquar khan

I am using Apache Kafka version kafka_2.10-0.10.1.0

During a PoC I created a simple producer and consumer. When I try to consume a message, I get the following error:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V
    at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59)

Code:

package com.spnotes.kafka.simple;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;


public class Consumer {
    private static Scanner in;

    public static void main(String[] argv)throws Exception{
        if (argv.length != 2) {
            System.err.printf("Usage: %s <topicName> <groupId>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);
        String topicName = argv[0];
        String groupId = argv[1];

        ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
        consumerRunnable.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerRunnable.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerRunnable.join();
    }

    private static class ConsumerThread extends Thread{
        private String topicName;
        private String groupId;
        private KafkaConsumer<String,String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId){
            this.topicName = topicName;
            this.groupId = groupId;
        }
        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Key deserializer matches the KafkaConsumer<String, String> type parameters
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

            //Figure out where to start processing messages from
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));
            //Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.value());
                }
            }catch(WakeupException ex){
                System.out.println("Exception caught " + ex.getMessage());
            }finally{
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }
        public KafkaConsumer<String,String> getKafkaConsumer(){
           return this.kafkaConsumer;
        }
    }
}

Running it with this command:

java -cp .:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/*:/home/osboxes/Kafka/kafka_2.10-0.10.1.1/libs/KafkaAPIClient-1.0-SNAPSHOT.jar  com.spnotes.kafka.simple.Consumer test group1

Error:

Exception in thread "Thread-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V
    at com.spnotes.kafka.simple.Consumer$ConsumerThread.run(Consumer.java:59)

Answered by vaquar khan

I was using the wrong API version (thanks to amethystic for pointing this out). After switching to the newer client version, it started working.

The POM had the old client version:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>

I changed the POM to the newer client version:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
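
Because the run command in the question puts every jar under the Kafka libs directory on the classpath, the client version used at run time can differ from the one declared in the POM. Here is a minimal sketch, using only the JDK, of how one might print where a class was actually loaded from. The names `WhichJar` and `locationOf` are hypothetical and chosen only for this illustration:

```java
import java.net.URL;
import java.security.CodeSource;

class WhichJar {
    /**
     * Returns the location (jar or directory) a class was loaded from,
     * or null when the class has no code source (typically JDK classes).
     */
    static URL locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? null : source.getLocation();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage: pass a fully qualified class name as the first argument.
        // Without arguments the sketch inspects itself so it runs without Kafka.
        Class<?> type = args.length > 0 ? Class.forName(args[0]) : WhichJar.class;
        System.out.println(type.getName() + " loaded from " + locationOf(type));
    }
}
```

Run it with the same classpath as the consumer and pass `org.apache.kafka.clients.consumer.KafkaConsumer` as the argument to see which jar supplies that class.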

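The old and new versions of the Kafka client API are not backward compatible, so code compiled against one version can fail with NoSuchMethodError when it runs against the jars of another.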
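
The `(Ljava/util/List;)V` part of the error is a JVM method descriptor: the calling code was compiled against a `subscribe` method that takes a `java.util.List` and returns `void`, and no method with exactly that descriptor exists in the class found at run time. The following is a minimal sketch of how one could check this with reflection. `SignatureCheck`, `decode` and `hasMethod` are hypothetical names, and the defaults use a JDK class so that the sketch runs without Kafka on the classpath:

```java
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;

class SignatureCheck {
    /** Decodes a JVM method descriptor such as "(Ljava/util/List;)V". */
    static MethodType decode(String descriptor) {
        return MethodType.fromMethodDescriptorString(
                descriptor, SignatureCheck.class.getClassLoader());
    }

    /**
     * Returns true if the class has a public method (declared or inherited)
     * whose name, parameter types and return type match the descriptor exactly.
     */
    static boolean hasMethod(Class<?> type, String name, String descriptor) {
        MethodType wanted = decode(descriptor);
        for (Method m : type.getMethods()) {
            MethodType actual = MethodType.methodType(m.getReturnType(), m.getParameterTypes());
            if (m.getName().equals(name) && actual.equals(wanted)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Assumed defaults for illustration only: a JDK class and method.
        String className = args.length > 0 ? args[0] : "java.util.ArrayList";
        String method = args.length > 1 ? args[1] : "addAll";
        String descriptor = args.length > 2 ? args[2] : "(Ljava/util/Collection;)Z";
        System.out.println(decode(descriptor));
        System.out.println(hasMethod(Class.forName(className), method, descriptor));
    }
}
```

To apply it to this question, run it with the consumer's classpath and the arguments `org.apache.kafka.clients.consumer.KafkaConsumer subscribe "(Ljava/util/List;)V"`. If the jar on that classpath only offers `subscribe` with a different parameter type (for example `java.util.Collection`, which is an assumption here and not something the original post states), the check prints `false`, which is consistent with the `NoSuchMethodError`. The JVM links calls by exact descriptor, so a method that accepts a wider type does not satisfy a call site compiled against the narrower one.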
