KafkaConsumer 0.10 Java API error message: No current assignment for partition
Notice: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, include the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/41008610/
Asked by colossal
I am using the KafkaConsumer 0.10 Java API. I want to consume from a specific partition at a specific offset. I looked it up and found that there is a seek method, but it throws an exception. Has anyone had a similar use case or a solution?
Code:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
Exception:
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
Answered by Matthias J. Sax
Before you can seek(), you first need to subscribe() to a topic or assign() partitions of a topic to the consumer. Also keep in mind that subscribe() and assign() are lazy -- thus, you also need to do a "dummy call" to poll() before you can use seek().
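For example, a minimal sketch of that sequence (assuming the consumerProps from the question already contains bootstrap.servers, a group.id, and deserializer settings, and that this is the only consumer in the group, so it receives partition 1 of "mytopic"):

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("mytopic"));
consumer.poll(0);                                    // "dummy call": triggers the actual partition assignment
consumer.seek(new TopicPartition("mytopic", 1), 4);  // the partition is now assigned, so seek() succeeds
ConsumerRecords<String, byte[]> records = consumer.poll(1000);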
Note: as of Kafka 2.0, the new poll(Duration timeout) is async, and it is not guaranteed that you have a complete assignment when poll returns. Thus, you might need to check your assignment before using seek(), and also poll again to refresh the assignment. (Cf. KIP-266 for details.)
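A small sketch of that check (assuming Kafka 2.0+ and a consumer that has already subscribed to "mytopic"; any records returned while waiting are simply dropped here):

TopicPartition target = new TopicPartition("mytopic", 1);
while (!consumer.assignment().contains(target)) {
    consumer.poll(Duration.ofMillis(100));  // poll(Duration) may return before the assignment is complete
}
consumer.seek(target, 4);  // safe now: the partition is part of the current assignment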
If you use subscribe(), you use group management: you can start multiple consumers using the same group.id, and all partitions of the topic will be assigned evenly over all consumers within the group automatically (each partition will be assigned to a single consumer in the group).
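As an illustration (the broker address, group name, and deserializer settings below are placeholders), every instance started with this configuration joins the same group and is handed a share of the topic's partitions on poll():

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("group.id", "my-group");                  // identical on every consumer instance
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("mytopic"));       // partitions are balanced across all group members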
If you want to read specific partitions, you need to use manual assignment via assign(). This allows you to do any assignment you want.
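A minimal sketch of the manual-assignment path for the question's exact use case (topic, partition, offset, and consumerProps taken from the question; the "dummy" poll() from above is kept to match the answer's advice):

TopicPartition tp = new TopicPartition("mytopic", 1);
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(tp));  // manual assignment: no group rebalancing involved
consumer.poll(0);                    // "dummy" poll, as described above
consumer.seek(tp, 4);                // read partition 1 starting at offset 4
ConsumerRecords<String, byte[]> records = consumer.poll(1000);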
Btw: KafkaConsumer has a very long and detailed class JavaDoc, including examples. It's worth reading.
Answered by Adam111p
If you do not want to call poll() and retrieve a map of records just to change the offset, try this (Kafka version 0.11):
...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
// Build the TopicPartition list for the topic we want to seek on.
List<TopicPartition> partitions = consumer.partitionsFor("Test_topic1").stream()
        .map(part -> new TopicPartition(part.topic(), part.partition()))
        .collect(Collectors.toList());
// Grab the consumer's private ConsumerCoordinator field via reflection.
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer);
// Poll the coordinator directly so the consumer joins the group and receives its
// assignment without fetching any records.
coordinator.poll(new Date().getTime(), 1000); // watch out for your local date and time settings
consumer.seekToBeginning(partitions); // or any other seek
Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.