KafkaConsumer 0.10 Java API 错误消息:没有当前的分区分配

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/41008610/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 05:37:12  来源:igfitidea点击:

KafkaConsumer 0.10 Java API error message: No current assignment for partition

javakafka-consumer-api

提问by colossal

I am using KafkaConsumer 0.10 Java api. I want to consume from a specific partition and specific offset. I looked up and found that there is a seek method but its throwing an exception. Anyone had a similar use case or solution ?

我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量消费。我查了一下,发现有一个seek方法,但它抛出了一个异常。任何人都有类似的用例或解决方案?

Code:

代码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

Exception

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

回答by Matthias J. Sax

Before you can seek()you first need to subscribe()to a topic orassign()partition of a topic to the consumer. Also keep in mind, that subscribe()and assign()are lazy -- thus, you also need to do a "dummy call" to poll()before you can use seek().

在您可以之前,seek()您首先需要将subscribe()一个主题一个主题的assign()分区提供给消费者。还请记住,that subscribe()andassign()是懒惰的——因此,poll()在使用seek().

Note: as of Kafka 2.0, the new poll(Duration timeout)is async and it's not guaranteed that you have a complete assignment when pollreturns. Thus, you might need to check your assignment before using seek()and also pollagain to refresh the assignment. (Cf. KIP-266for details)

注意:从 Kafka 2.0 开始,newpoll(Duration timeout)是异步的,并且不能保证你在poll返回时有一个完整的分配。因此,您可能需要在使用前检查您的作业,seek()poll再次刷新作业。(详见KIP-266

If you use subscribe(), you use group management: thus, you can start multiple consumers using the same group.idand all partitions of the topic will be assigned evenly over all consumers within the group automatically (each partition will get assigned to a single consumer in the group).

如果使用subscribe(),则使用组管理:因此,您可以使用相同的方式启动多个消费者group.id,并且主题的所有分区将自动平均分配给组内的所有消费者(每个分区将分配给组中的一个消费者) .

If you want to read specific partitions, you need to use manual assignment via assign(). This allows you to do any assignment you want.

如果要读取特定分区,则需要通过assign(). 这允许您执行任何您想要的任务。

Btw: KafkaConsumerhas a very long an detailed class JavaDoc including examples. It's worth to read it.

顺便说一句:KafkaConsumer有一个很长的详细类 JavaDoc,包括示例。值得一读。

回答by Adam111p

If you do not want to use poll()and retrieve map records, and change the offset itself. Kafka version 0.11 Try this:

如果您不想使用poll()并检索地图记录,请更改偏移量本身。Kafka 0.11 版试试这个:

...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);    
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);    

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings
consumer.seekToBeginning(partitions); //or other seek

Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.

对协调员事件进行投票。这确保了协调器是已知的,并且消费者已经加入组(如果它使用组管理)。如果启用它们,这也会处理定期偏移提交。