Python: How to get the latest offset for a partition of a Kafka topic?
Disclaimer: This page is a translation of a popular StackOverFlow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but if you do, you must follow the same CC BY-SA license, cite the original URL and authors, and attribute it to the original authors (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/35432326/
How to get latest offset for a partition for a kafka topic?
Asked by Saket
I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

# brokers and topic are defined elsewhere
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
But the output I get is
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
I have an alternate approach using assign, but the result is the same:
# Same imports as above; brokers and topic are defined elsewhere
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())
It seems from some of the documentation that I might get this behaviour if a fetch has not been issued. But I cannot find a way to force that. What am I doing wrong?
Or is there a different/simpler way to get the latest offsets for a topic?
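The documentation hint in the question points the right way: kafka-python fills in highwater() from fetch responses, so it stays None until the consumer has actually fetched from that partition. A minimal sketch, assuming a KafkaConsumer that already has the partitions assigned, is to keep calling poll() until every partition reports a highwater mark or a deadline passes. The helper name wait_for_highwater and its timeout parameters are hypothetical; note that poll() returns records, so they are consumed here (and may be committed if auto-commit is enabled).

```python
import time


def wait_for_highwater(consumer, partitions, timeout_s=10.0, poll_ms=500):
    """Poll until highwater() is known for every partition, or time out.

    `consumer` is assumed to be a kafka-python KafkaConsumer with
    `partitions` already assigned. Returns {partition: highwater or None}.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        # highwater() stays None until a fetch response for that partition arrives
        marks = {tp: consumer.highwater(tp) for tp in partitions}
        if all(hw is not None for hw in marks.values()) or time.monotonic() >= deadline:
            return marks
        # poll() sends fetch requests; the returned records are discarded here
        consumer.poll(timeout_ms=poll_ms)
```

With the question's code this would be called as wait_for_highwater(con, ps) right after con.assign(ps).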
Accepted answer by Saket
Finally, after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it here so that others may refer to it.
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# Legacy low-level client (removed in kafka-python 2.0); brokers and topic defined elsewhere
client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
# OffsetRequestPayload(topic, partition, time, max_offsets): time=-1 asks for the latest offset
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
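The -1 passed as the third field of OffsetRequestPayload is the time selector of the old offset request: -1 asks for the latest offset, -2 for the earliest, and other values are read as a timestamp in milliseconds. The same -1/-2 convention appears in GetOffsetShell's --time option below. A small sketch of a helper that builds that value; the name offset_request_time and the accepted inputs are hypothetical, not part of kafka-python.

```python
from datetime import datetime, timezone

LATEST = -1    # "latest offset" sentinel in the old offset request
EARLIEST = -2  # "earliest offset" sentinel


def offset_request_time(when):
    """Map 'latest', 'earliest' or an aware datetime to the request's time field."""
    if when == "latest":
        return LATEST
    if when == "earliest":
        return EARLIEST
    if isinstance(when, datetime):
        if when.tzinfo is None:
            raise ValueError("use a timezone-aware datetime")
        # Kafka timestamps are milliseconds since the Unix epoch
        return int(when.timestamp() * 1000)
    raise ValueError("unsupported value: %r" % (when,))
```

In the answer above this would read OffsetRequestPayload(topic, p, offset_request_time("latest"), 1).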
Answer by avr
If you want to use the Kafka shell scripts in kafka/bin, you can get the latest and smallest (earliest) offsets with kafka-run-class.sh.
To get the latest offset, the command looks like this:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname
To get the smallest offset, the command looks like this:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname
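GetOffsetShell prints one topic:partition:offset line per partition. A minimal sketch, assuming that output format, that runs the tool from Python and parses the result into a dict; the helper names, the script path and the broker address are placeholders. Only the parsing function works without a running cluster.

```python
import subprocess


def parse_get_offset_shell(output):
    """Parse 'topic:partition:offset' lines into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        # Topic names cannot contain ':', so splitting from the right is safe
        parts = line.strip().rsplit(":", 2)
        if len(parts) != 3:
            continue  # skip blank lines and log noise
        # If several comma-separated offsets are printed, keep the first
        first = parts[2].split(",")[0]
        if parts[1].isdigit() and first.isdigit():
            offsets[int(parts[1])] = int(first)
    return offsets


def get_offsets(topic, time_flag, broker="localhost:9092",
                script="bin/kafka-run-class.sh"):
    """Run GetOffsetShell (time_flag -1 = latest, -2 = earliest) and parse it."""
    out = subprocess.run(
        [script, "kafka.tools.GetOffsetShell", "--broker-list", broker,
         "--time", str(time_flag), "--topic", topic],
        check=True, capture_output=True, text=True,
    ).stdout
    return parse_get_offset_shell(out)
```

Subtracting get_offsets(topic, -2) from get_offsets(topic, -1) partition by partition gives the range of offsets currently retained in each partition.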
You can find more information on Get Offsets Shell at the following link:
Hope this helps!
Answer by Itamar Lavender
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP,
    enable_auto_commit=False
)

for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    # Offset committed by GROUP; None if the group never committed on this partition
    committed = consumer.committed(tp)
    # Move to the end of the partition and read the resulting position
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    lag = last_offset - committed if committed is not None else None
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, lag))

consumer.close(autocommit=False)
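The lag in this answer is the end offset minus the committed offset, and consumer.committed() returns None for a partition the group has never committed. A pure-Python sketch of the same calculation over whole dicts; the function names and the choice to report None for uncommitted partitions are assumptions. In kafka-python the inputs could come from consumer.end_offsets(tps) and {tp: consumer.committed(tp) for tp in tps}.

```python
def consumer_lag(end_offsets, committed):
    """Compute per-partition lag as end offset minus committed offset.

    end_offsets: {partition: offset of the next message to be written}
    committed:   {partition: committed offset or None}
    Partitions without a commit get lag None instead of raising TypeError.
    """
    lag = {}
    for tp, end in end_offsets.items():
        done = committed.get(tp)
        lag[tp] = None if done is None else end - done
    return lag


def total_lag(lag):
    """Sum the known per-partition lags, ignoring partitions with None."""
    return sum(v for v in lag.values() if v is not None)
```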
Answer by olujedai
Another way to achieve this is to poll the consumer to obtain the last consumed offset and then use the seek_to_end method to move to the most recent available offset of each partition.
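from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
# Poll so the consumer joins the group and gets partitions assigned
consumer.poll(timeout_ms=1000)
# seek_to_end() returns None; it only moves the position of each assigned partition
consumer.seek_to_end()
for tp in consumer.assignment():
    print("%s: latest offset %s" % (tp, consumer.position(tp)))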
This method particularly comes in handy when using consumer groups.
Answer by a.costa
With kafka-python>=1.3.4 you can use:
kafka.KafkaConsumer.end_offsets(partitions)
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
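from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

# brokers and topic are defined elsewhere
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
# Returns {TopicPartition: offset of the next message to be written}
print(con.end_offsets(ps))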
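Because end_offsets() reports the offset of the upcoming message, the newest stored message sits at end - 1, and end - beginning is the span of offsets currently retained (an upper bound on the message count, since compaction or transaction markers can leave gaps). A sketch that combines end_offsets() with beginning_offsets(), assuming a kafka-python KafkaConsumer or any object with the same three methods; the helper describe_topic and the PartitionInfo tuple are hypothetical.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:  # lets the sketch run without kafka-python installed
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

# first: earliest retained offset; last: newest message offset (None if empty);
# size: end - beginning, an upper bound on the number of retained messages
PartitionInfo = namedtuple("PartitionInfo", ["first", "last", "size"])


def describe_topic(consumer, topic):
    """Return {partition number: PartitionInfo} for every partition of `topic`."""
    # partitions_for_topic() may return None for an unknown topic
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    begin = consumer.beginning_offsets(tps)
    end = consumer.end_offsets(tps)  # offset of the *next* message
    info = {}
    for tp in tps:
        size = end[tp] - begin[tp]
        last = end[tp] - 1 if size > 0 else None
        info[tp.partition] = PartitionInfo(begin[tp], last, size)
    return info
```

With the answer's variables this is describe_topic(con, topic).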