Java: How to get the latest offset of a Kafka topic?
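Disclaimer: this page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. If you use it, you must also follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): Stack Overflow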
Original URL: http://stackoverflow.com/questions/38428196/
How can I get the LATEST offset of a Kafka topic?
Asked by Neptune
I'm writing a Kafka consumer in Java. I want to keep the messages real-time, so if too many messages are waiting to be consumed (for example, 1000 or more), I should abandon the unconsumed messages and start consuming from the latest offset.
To do this, I compare the last committed offset with the latest offset of the topic (which has only 1 partition). If the difference between these two offsets is larger than a certain amount, I set the latest offset of the topic as the next offset, so that the redundant messages are abandoned.
Now my problem is how to get the latest offset of a topic. Some people say I can use the old consumer, but it's too complicated. Does the new consumer have this function?
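The decision described in the question can be kept separate from the Kafka calls themselves. Below is a minimal stdlib-only sketch; the class `BacklogPolicy`, the method `resumeOffset` and the threshold value are hypothetical. It assumes the committed offset follows Kafka's convention of pointing at the next message to read, and that the "latest offset" is the partition's end offset (the offset the next produced message will receive). In a real consumer the inputs would come from `KafkaConsumer#committed` and `KafkaConsumer#endOffsets`.

```java
/**
 * Hypothetical helper (not part of the Kafka client API) for the strategy in
 * the question: skip the backlog when the consumer has fallen too far behind.
 */
final class BacklogPolicy {

    private BacklogPolicy() {}

    /**
     * @param committedOffset offset of the next message the group would read
     *                        (the committed offset, e.g. from KafkaConsumer#committed)
     * @param endOffset       latest offset of the partition, i.e. the offset the next
     *                        produced message will receive (e.g. from KafkaConsumer#endOffsets)
     * @param maxLag          largest backlog that is still worth consuming, e.g. 1000
     * @return the offset the consumer should seek to before polling again
     */
    public static long resumeOffset(long committedOffset, long endOffset, long maxLag) {
        long lag = endOffset - committedOffset; // number of messages not consumed yet
        return lag > maxLag ? endOffset : committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(resumeOffset(500, 1200, 1000)); // lag 700: prints 500
        System.out.println(resumeOffset(500, 2500, 1000)); // lag 2000: prints 2500
    }
}
```

With the new consumer, the returned value would then be handed to `consumer.seek(partition, offset)`; for the skip case, `consumer.seekToEnd(...)` is a simpler alternative.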
Accepted answer by lynn
The new consumer is also complicated.
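//assign the topic (topic is a String; partition 0 is assumed, since the question's topic has a single partition)
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(topicPartition));
//seek to end of the topic
consumer.seekToEnd(Collections.singletonList(topicPartition));
//the position is the latest offset (the offset the next produced message will receive)
long latestOffset = consumer.position(topicPartition);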
Answer by hiaclibe
For Kafka version 0.10.1.1:
// Get the diff between the current position and the latest offset
// (record is the ConsumerRecord currently being processed inside the poll loop)
Set<TopicPartition> partitions = new HashSet<TopicPartition>();
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition());
partitions.add(actualTopicPartition);
// endOffsets() returns, per partition, the offset the next produced message will receive
long actualEndOffset = consumer.endOffsets(partitions).get(actualTopicPartition);
// position() is the offset of the next record this consumer will fetch
long actualPosition = consumer.position(actualTopicPartition);
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)",
        actualEndOffset - actualPosition, actualEndOffset, actualPosition));
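The snippet above measures the gap for the single partition a record came from. If a consumer is assigned several partitions, the same idea extends to a total lag. A minimal stdlib-only sketch follows; the class `LagCalculator` is hypothetical, and partitions are keyed by plain partition numbers instead of `TopicPartition` so that it runs without the Kafka client. In a real consumer the two maps would be filled from `consumer.endOffsets(...)` and `consumer.position(...)`. The usage example needs Java 9+ for `Map.of`.

```java
import java.util.Map;

/** Hypothetical helper: sums (end offset - position) over the partitions a consumer reads. */
final class LagCalculator {

    private LagCalculator() {}

    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : positions.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end != null) {
                // clamp at 0 in case the position was read after a newer end offset
                total += Math.max(0L, end - e.getValue());
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 120L, 1, 80L);
        Map<Integer, Long> pos = Map.of(0, 100L, 1, 80L);
        System.out.println(totalLag(end, pos)); // prints 20
    }
}
```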
Answer by rai.skumar
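KafkaConsumer<String, String> consumer = ...
consumer.subscribe(Collections.singletonList(topic));
TopicPartition topicPartition = new TopicPartition(topic, partition);
// poll(0) is used here only to trigger the partition assignment;
// seekToEnd() throws IllegalStateException if the partition is not assigned to this consumer
consumer.poll(0);
consumer.seekToEnd(Collections.singletonList(topicPartition));
// position() now returns the end offset (the offset the next message will receive),
// so subtracting 1 gives the offset of the last message in the partition
long currentOffset = consumer.position(topicPartition) - 1;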
The above snippet returns the offset of the last message currently in the given topic partition (the end offset minus 1). Note that this is not the consumer group's committed offset.
Answer by Steven
You can also use the Kafka server command-line tools:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name
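If a Java program needs the tool's result (for example, to drive the skip-the-backlog decision from a script), its text output can be parsed. A minimal sketch, assuming each output line has the form `topic:partition:offset` (Kafka topic names cannot contain `:`, so splitting on it is safe under that assumption); the class `OffsetShellParser` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for output lines assumed to look like "topic-name:0:1500". */
final class OffsetShellParser {

    private OffsetShellParser() {}

    /** Returns partition number -> offset, in the order the lines appear. */
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String line : output.split("\\R")) { // \R matches \n, \r\n, ...
            String[] parts = line.trim().split(":");
            if (parts.length != 3) {
                continue; // skip blank lines or anything not in the assumed format
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    public static void main(String[] args) {
        String sample = "topic-name:0:1500\ntopic-name:1:1320\n";
        System.out.println(parse(sample)); // prints {0=1500, 1=1320}
    }
}
```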
Answer by Jignesh Patel
I developed the code below (in Scala) to fetch the offset status of a topic:
import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

class GetOffsetRange(consumer: KafkaConsumer[String, String]) {

  // Collects every partition of the topic and assigns them to this consumer.
  // seekToBeginning/seekToEnd require assigned partitions; assign() avoids
  // relying on poll(0) to finish a group rebalance after subscribe().
  private def assignAll(topic: String): util.List[TopicPartition] = {
    val partitions = new util.ArrayList[TopicPartition]()
    for (info <- consumer.partitionsFor(topic).asScala) {
      println(info.topic() + "," + info.partition())
      partitions.add(new TopicPartition(info.topic(), info.partition()))
    }
    consumer.assign(partitions)
    partitions
  }

  // Offset of the first available message of each partition
  def getStartOffsetRange(topic: String): util.HashMap[TopicPartition, Long] = {
    val partitions = assignAll(topic)
    consumer.seekToBeginning(partitions)
    val partitionMap = new util.HashMap[TopicPartition, Long]()
    for (partition <- partitions.asScala) {
      partitionMap.put(partition, consumer.position(partition))
    }
    partitionMap
  }

  // Offset of the last message of each partition (end offset minus 1)
  def getEndOffsetRange(topic: String): util.HashMap[TopicPartition, Long] = {
    val partitions = assignAll(topic)
    consumer.seekToEnd(partitions)
    val partitionMap = new util.HashMap[TopicPartition, Long]()
    for (partition <- partitions.asScala) {
      partitionMap.put(partition, consumer.position(partition) - 1)
    }
    partitionMap
  }
}
Answer by Jyoti Ranjan
Since Kafka 0.10.1, the consumer has a method called endOffsets:
public java.util.Map<TopicPartition, java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)
Please let me know if you need the full code.
Please refer to the Apache Kafka 1.0.1 Javadoc.