java Kafka 0.9 How to re-consume messages when manually committing offsets with a KafkaConsumer
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original author (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/34901781/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me):
StackOverFlow
Kafka 0.9 How to re-consume message when manually committing offset with a KafkaConsumer
Asked by Michael Freeman
I am writing a consumer that manually commits the offset once a series of records has been committed to Mongo.
In the case of a Mongo error, or any other error, an attempt is made to persist the record to an error-processing collection
for replay at a later date.
If Mongo is down, I want the consumer to stop processing for a period of time before trying to read the records from the uncommitted offset in Kafka.
The sample below works, but I would like to know what the best practice for this scenario is.
while (true) {
    boolean commit = false;
    try {
        ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
        kafkaMessageProcessor.processRecords(records);
        commit = true;
    }
    catch (Exception e) {
        logger.error("Unable to consume closing consumer and restarting", e);
        try {
            consumer.close();
        }
        catch (Exception consumerCloseError) {
            logger.error("Unable to close consumer", consumerCloseError);
        }
        logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
        Thread.sleep(recoveryInterval);
        consumer = createConsumer(properties);
    }
    if (commit) {
        consumer.commitSync();
    }
}

private KafkaConsumer<K, V> createConsumer(Properties properties) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
    consumer.subscribe(topics);
    return consumer;
}
If I don't recreate the consumer I get the following error.
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
Accepted answer by Nautilus
If you don't commit the offset and the enable.auto.commit property is false, then when the call to Mongo fails you just wait as long as you think is necessary and retry poll().
The problem you are seeing is that the new consumer uses poll() as a heartbeat mechanism, so if you wait longer than the request timeout, the coordinator for the topic will kick the consumer out of the group because it will think it is dead, and it will rebalance the group. So wait for Mongo, but you may want to poll() once in a while.
EDIT: As a workaround, you can set the request.timeout.ms property to a higher value.
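For illustration, a minimal, hedged sketch of that workaround. The broker address, group id, and timeout values below are assumptions for the example, not recommendations:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");       // assumed broker address
props.put("group.id", "test.consumer");
props.put("enable.auto.commit", "false");                // commit manually after the Mongo write succeeds
props.put("session.timeout.ms", "30000");                // illustrative value
props.put("request.timeout.ms", "300000");               // raised, as the answer suggests, to tolerate longer gaps between poll() calls
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);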
Hope it helps!
Answered by simonchen
Here is my code, using client version 0.10.0.
It seems to fit your requirement.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcesser implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    private final KafkaConsumer<String, String> consumer;

    private final String topic;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    public MessageProcesser(String groupId, String topic, String kafkaServer) {
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServer);
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singleton(topic));

            while (true) {
                if (closed.get()) {
                    consumer.close();
                }

                ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
                for (ConsumerRecord<String, String> record : records) {

                    String value = record.value();
                    if (null == value) {
                        continue;
                    }

                    boolean processResult = false;
                    try {
                        Future<Object> f = pool.submit(new ProcessCommand(value));
                        processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                    if (!processResult) {
                        //here if process fail, seek to current offset
                        consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                    } else {
                        this.commitAsyncOffset(record);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            if (!closed.get()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // ignore
                }
            }
        }
    }

    public void shutdown() {
        closed.set(true);
    }

    public void commitAsyncOffset(ConsumerRecord<String, String> record) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));

        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if (e != null) {
                    logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace()));
                }
            }
        });
    }
}
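A hypothetical way to start and stop the processor above might look like the following; the group id, topic name, and broker address are made up for the example:

public static void main(String[] args) throws InterruptedException {
    MessageProcesser processer = new MessageProcesser("test-group", "test-topic", "localhost:9092");
    Thread worker = new Thread(processer);
    worker.start();

    // on JVM shutdown, flag the polling loop so it closes the consumer
    Runtime.getRuntime().addShutdownHook(new Thread(() -> processer.shutdown()));
    worker.join();
}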
Answered by Jakub
As I understand it, the (new) client is the one that keeps the consumed offsets. The commit sends the offsets to the server, but it has no effect on the next poll from that client, since the client says to the server "give me the next messages after THAT offset". Why is the offset sent to the server then? For the next rebalance. So the only situation in which the server uses the committed offsets is when some client dies or disconnects - then the partitions are rebalanced, and with this rebalance the clients get their offsets from the server.
So if you don't commit the offset and then call poll(), you cannot expect that the message will be read again. For that, there would have to be a way to roll back the offset in the client. I didn't try it, but I think calling KafkaConsumer.seek to the offset of the failed message should do the trick.
BTW, in this way you can even commit the last successfully processed message and seek to the first failed one, so that you don't need to repeat the whole record list when a failure occurred for some message in the middle of it.
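A hedged sketch of that idea, committing up to the last successfully processed record and seeking back to the first failed one. It reuses the imports from the answer above; process() is a hypothetical stand-in for the Mongo write:

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    try {
        process(record);   // hypothetical processing step, e.g. the Mongo write
    } catch (Exception e) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        // commit everything up to, but not including, the failed record ...
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset())));
        // ... then rewind so the next poll() returns the failed record again
        consumer.seek(tp, record.offset());
        break;
    }
}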