Java: Simple Kafka consumer not receiving messages
Disclaimer: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): Stack Overflow.
Original URL: http://stackoverflow.com/questions/44760866/
Simple Kafka Consumer not receiving messages
Asked by mohiitg
I am a newbie to Kafka and am running the simple Kafka consumer/producer example as given on KafkaConsumer and KafkaProducer. When I run the consumer from the terminal, it receives messages, but I am not able to listen using the Java code.
I have also searched for similar issues on Stack Overflow (links: Link1, Link2) and tried those solutions, but nothing seems to work for me.
Kafka version: kafka_2.10-0.10.2.1, and the corresponding Maven dependency is used in the pom.
Java Code for producer and consumer:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("group.id", "test");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic3", "topic2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
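As a quick sanity check on the producer side, here is a minimal sketch that blocks on each send so any broker error surfaces immediately instead of being lost when the producer exits. The class name BlockingSendCheck is made up for illustration; it assumes the same topic3 and localhost:9094 as the code above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BlockingSendCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Blocking on the returned Future turns the asynchronous send into a synchronous one,
        // so a missing topic or an unreachable broker fails here instead of silently.
        RecordMetadata meta = producer.send(new ProducerRecord<>("topic3", "key", "value")).get();
        System.out.println("Written to " + meta.topic() + "-" + meta.partition() + " at offset " + meta.offset());
        producer.close();
    }
}

If this prints an offset, the broker is receiving the records and the problem is more likely on the consumer side.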
Starting kafka:
bin/kafka-server-start.sh config/server.properties
(I have already set the port and broker id in the properties file.)
Answered by Sanjay
First, check which consumer groups are available:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Then check which group your topic belongs to, using the command below:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe
Once you find your topic and the associated group name (just replace group.id with your group if it does not belong to the default group), try the properties below and let me know if it works:
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group"); // default topic name
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName)); // replace you topic name
//print the topic name
java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
System.out.println("list of topic size :" + listTopics.size());
for(String topic : listTopics.keySet()){
System.out.println("topic name :"+topic);
}
Answered by codehacker
Run the consumer before running the producer so that the consumer registers with the group coordinator first. Later, when you run the producer, the consumer consumes the messages. (The first time you run the consumer, it registers with the group coordinator.) To find out up to which offset the consumer has consumed messages, use:
kafka-consumer-offset-checker.bat --group group-1 --topic testing-1 --zookeeper localhost:2181
This shows which offset of the topic the consumer last consumed.
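A related sketch, not from the answer above but relying only on standard kafka-clients calls: when the producer has already run, a brand-new consumer group starts from the latest offset by default and therefore misses the earlier messages. Two ways to read from the beginning, assuming the props and consumer objects and the topic3 topic from the question:

// Option 1: before creating the consumer, let a group with no committed offsets
// start from the earliest available offset (the default "latest" skips older messages).
props.put("auto.offset.reset", "earliest");

// Option 2: after subscribing, rewind explicitly once partitions are assigned.
consumer.subscribe(Arrays.asList("topic3"));
consumer.poll(100);                              // first poll triggers the group assignment
consumer.seekToBeginning(consumer.assignment()); // jump back to the earliest offsets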
Answered by Aravind A
Clear the 'tmp' folder on the drive from which you are running Kafka, then open a new 'cmd' command window. Restart the server freshly and run the following command in the command window to start the consumer without any errors:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic H1 --from-beginning
Answered by DenisKolodin
Try setting the enable.partition.eof parameter to false:
props.put("enable.partition.eof", "false");
That worked for me.
Answered by itsKrrish
Try this one; this code worked for me.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
myConsumer.subscribe(Arrays.asList(topicName)); // replace topicName with your topic name

try {
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(String.format("Topic: %s, Partition: %d, Offset: %d, key: %s, value: %s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value()));
        }
    }
} catch (Exception e) {
    System.out.println(e.getMessage());
} finally {
    myConsumer.close();
}