bash 检查Python中是否存在Kafka主题

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/30943129/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-18 13:14:09  来源:igfitidea点击:

Check whether a Kafka topic exists in Python

pythonbashapache-kafka

提问by Thomas Schreiter

I want to create a Kafkatopic if it does not already exist. I know how to create a topic via the bash, but I don't know how to check whether it exists.

如果Kafka主题尚不存在,我想创建它。我知道如何通过 bash 创建主题,但我不知道如何检查它是否存在。

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])

回答by Yonatan Kiron

Another nice way is with python kafka module:

另一个不错的方法是使用 python kafka 模块:

kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions

if topic_name in server_topics:
   your code....

kafka_client.topic_partitions returns list of topics.

kafka_client.topic_partitions 返回主题列表。

回答by chrsblck

You can use the --list (List all available topics)option for kafka-topics.shand see if self.topicexists in the topicsarray, as shown below.

您可以使用该--list (List all available topics)选项kafka-topics.sh查看数组中是否self.topic存在topics,如下所示。

Depending on the number of topics you have this approach might be a bit heavy. If this is the case, you might be able to get away with using --describe (List details for the given topics)which will likelyreturn empty if the topic doesn't exist. I haven't thoroughly tested this, so I can't say for sure how solid this solution (--describe) is, but it might be worth it for you to investigate a bit further.

根据您拥有的主题数量,这种方法可能有点繁重。如果是这种情况,您可能可以避免使用--describe (List details for the given topics),如果主题不存在,则可能会返回空值。我还没有对此进行彻底的测试,所以我不能肯定这个解决方案 ( --describe)有多可靠,但你可能值得进一步调查。

wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

OUTPUT:

输出:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

There is also a Broker Configurationavailable so you don't have to take any of these steps:

还有一个Broker Configuration可用,因此您无需执行以下任何步骤:

auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.

auto.create.topics.enable | 真实| 在服务器上启用自动创建主题。如果将其设置为 true,则尝试为不存在的主题生成数据或获取元数据将使用默认复制因子和分区数自动创建它。

I would take this approach if possible.

如果可能,我会采用这种方法。

Note that you should set topic configs (server.properties) on your broker for num.partitionsand default.replication.factorto match your settings in your code snippet.

请注意,您应该server.properties在您的代理上设置主题配置 ( )num.partitionsdefault.replication.factor匹配您在代码片段中的设置。