How to get Kafka consumer lag in a Java program
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must keep the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/42201616/
Asked by jamee
I wrote a Java program to consume messages from Kafka. I want to monitor the consumer lag; how can I get it in Java?
BTW, I use:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.1</version>
</dependency>
Thanks in advance.
Accepted answer by Florian Garcia
I personally query JMX information directly from my consumers. I only consume in Java, so the JMX bean kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max is available.
If jolokia is on your classpath you can retrieve the value with a GET on /jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max and gather all the results in one place.
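If you'd rather stay inside the JVM, here is a minimal sketch (the wrapper class is mine; only the MBean pattern and attribute name come from above) that queries the platform MBeanServer for the same bean:

import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConsumerLagJmx {
    public static void printMaxLag() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Match the fetch-manager metrics bean of every consumer client in this JVM.
        Set<ObjectName> names = server.queryNames(
                new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*"), null);
        for (ObjectName name : names) {
            Object lag = server.getAttribute(name, "records-lag-max");
            System.out.println(name.getKeyProperty("client-id") + " records-lag-max = " + lag);
        }
    }
}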
There is also Burrow, which is very easy to configure, but it's a bit outdated (it doesn't work with 0.10, if I remember correctly).
Answered by Alexey Vlasov
In case you don't want to include kafka (and Scala) dependencies in your project, you can use the class below. It uses only the kafka-clients dependency.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
public class KafkaConsumerMonitor {

    public static class PartitionOffsets {
        private final long endOffset;
        private final long currentOffset;
        private final int partition;
        private final String topic;

        public PartitionOffsets(long endOffset, long currentOffset, int partition, String topic) {
            this.endOffset = endOffset;
            this.currentOffset = currentOffset;
            this.partition = partition;
            this.topic = topic;
        }

        public long getEndOffset() {
            return endOffset;
        }

        public long getCurrentOffset() {
            return currentOffset;
        }

        public int getPartition() {
            return partition;
        }

        public String getTopic() {
            return topic;
        }
    }

    private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();

    public Map<TopicPartition, PartitionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
        Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

        KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);
        // The merge function should never be called: each TopicPartition key is unique.
        BinaryOperator<PartitionOffsets> mergeFunction = (a, b) -> {
            throw new IllegalStateException();
        };

        Map<TopicPartition, PartitionOffsets> result = logEndOffset.entrySet()
                .stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey(),
                        entry -> {
                            OffsetAndMetadata committed = consumer.committed(entry.getKey());
                            // committed is null when the group has no committed offset for this partition yet
                            long currentOffset = committed == null ? 0 : committed.offset();
                            return new PartitionOffsets(entry.getValue(), currentOffset,
                                    entry.getKey().partition(), topic);
                        }, mergeFunction));
        consumer.close();
        return result;
    }

    public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
        Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
        KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host);
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = partitionInfoList.stream()
                .map(pi -> new TopicPartition(topic, pi.partition()))
                .collect(Collectors.toList());

        consumer.assign(topicPartitions);
        consumer.seekToEnd(topicPartitions);
        topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition)));

        consumer.close();
        return endOffsets;
    }

    private static KafkaConsumer<?, ?> createNewConsumer(String groupId, String host) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(properties);
    }
}
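A hypothetical usage sketch (broker address, topic, and group id are placeholders):

KafkaConsumerMonitor monitor = new KafkaConsumerMonitor();
Map<TopicPartition, KafkaConsumerMonitor.PartitionOffsets> offsets =
        monitor.getConsumerGroupOffsets("localhost:9092", "my-topic", "my-group");
// lag per partition = log end offset - committed offset
offsets.forEach((tp, po) ->
        System.out.println(tp + " lag = " + (po.getEndOffset() - po.getCurrentOffset())));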
Answered by T Anna
I am using Spring for my API. Using the code below, you can get the metrics via Java. The code works.
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public void testlag() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            Map<String, Map<MetricName, ? extends Metric>> metrics = messageListenerContainer.metrics();
            metrics.forEach((clientid, metricMap) -> {
                System.out.println("------------------------For client id : " + clientid);
                metricMap.forEach((metricName, metricValue) -> {
                    // if (metricName.name().contains("lag"))
                    System.out.println("------------Metric name: " + metricName.name()
                            + "-----------Metric value: " + metricValue.metricValue());
                });
            });
        }
    }
}
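If you only care about the lag figures, a small hedged helper (the method name is mine) that filters the same metrics() map for records-lag-max, the consumer fetch-manager metric quoted in the JMX answer above, could look like this:

private static void printMaxLag(Map<String, Map<MetricName, ? extends Metric>> metrics) {
    metrics.forEach((clientId, metricMap) -> metricMap.forEach((name, metric) -> {
        // keep only the maximum-lag metric for each consumer client
        if (name.name().equals("records-lag-max")) {
            System.out.println(clientId + " records-lag-max = " + metric.metricValue());
        }
    }));
}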
Answered by amethystic
Try using AdminClient#listGroupOffsets(groupID) to retrieve the offsets of all topic partitions associated with the consumer group. For example:
AdminClient client = AdminClient.createSimplePlaintext("localhost:9092");
Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
        client.listGroupOffsets("groupID"));
Long offset = (Long) offsets.get(new TopicPartition("topic", 0));
...
EDIT:
The snippet above shows how to get the committed offset for a given partition. The code below shows how to retrieve the LEO (log end offset) for a partition.
public long getLogEndOffset(TopicPartition tp) {
    // try-with-resources closes the throwaway consumer when we are done
    try (KafkaConsumer<String, String> consumer = createNewConsumer()) {
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToEnd(Collections.singletonList(tp));
        return consumer.position(tp);
    }
}

private KafkaConsumer<String, String> createNewConsumer() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(properties);
}
Invoking getLogEndOffset returns the LEO for the given partition; subtract the committed offset from it and the result is the lag.
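For example, gluing the two snippets together (topic and partition are placeholders; offsets is the map returned by listGroupOffsets above):

TopicPartition tp = new TopicPartition("topic", 0);
long committed = (Long) offsets.get(tp);      // committed offset from listGroupOffsets
long lag = getLogEndOffset(tp) - committed;   // LEO minus committed offset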
Answered by snowfox
For your reference, I got this done with the code below. Basically, you have to calculate the lag of each topic-partition manually, as the delta between the current committed offset and the end offset.
private static Map<TopicPartition, Long> lagOf(String brokers, String groupId) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
    try (AdminClient client = AdminClient.create(props)) {
        ListConsumerGroupOffsetsResult currentOffsets = client.listConsumerGroupOffsets(groupId);
        try {
            // get current offsets of consuming topic-partitions
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = currentOffsets.partitionsToOffsetAndMetadata()
                    .get(3, TimeUnit.SECONDS);
            final Map<TopicPartition, Long> result = new HashMap<>();
            doWithKafkaConsumer(groupId, brokers, (c) -> {
                // get latest offsets of consuming topic-partitions
                // lag = latest_offset - current_offset
                Map<TopicPartition, Long> endOffsets = c.endOffsets(consumedOffsets.keySet());
                result.putAll(endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())));
            });
            return result;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("", e);
            return Collections.emptyMap();
        }
    }
}

public static void doWithKafkaConsumer(String groupId, String brokers,
        Consumer<KafkaConsumer<String, String>> consumerRunner) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumerRunner.accept(consumer);
    }
}
Note that one consumer group could be consuming multiple topics simultaneously, so if you need the lag for each topic, you'll have to group and aggregate the results by topic:
Map<TopicPartition, Long> lags = lagOf(brokers, group);
Map<String, Long> topicLag = new HashMap<>();
lags.forEach((tp, lag) -> {
    topicLag.compute(tp.topic(), (k, v) -> v == null ? lag : v + lag);
});