Java: How to check whether the Kafka server is running?

Disclaimer: this page is adapted from a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must follow the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/37920923/

How to check whether Kafka Server is running?

Tags: java, apache-kafka, kafka-producer-api

Asked by Khan

I want to check whether the Kafka server is running before starting my producer and consumer jobs. This is on Windows, and here is my Kafka server code in Eclipse...

Properties kafka = new Properties();
kafka.setProperty("broker.id", "1");
kafka.setProperty("port", "9092");
kafka.setProperty("log.dirs", "D://workspace//");
kafka.setProperty("zookeeper.connect", "localhost:2181");    
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(kafka);        
KafkaServer server = new KafkaServer(config, new CurrentTime(), option);
server.startup();

In this case, if (server != null) is not enough because it is always true. So is there any way to know that my Kafka server is running and ready for a producer? I need to check this because otherwise some of the first data packets are lost.

Answered by Paul Carey

All Kafka brokers must be assigned a broker.id. On startup, a broker creates an ephemeral node in ZooKeeper with a path of /brokers/ids/$id. Because the node is ephemeral, it is removed as soon as the broker disconnects, e.g. by shutting down.

You can view the list of the ephemeral broker nodes like so:

echo dump | nc localhost 2181 | grep brokers

The ZooKeeper client interface exposes a number of commands; dump lists all the sessions and ephemeral nodes for the cluster.

Note, the above assumes:

  • You're running ZooKeeper on the default port (2181) on localhost, and that localhost is the leader for the cluster
  • Your zookeeper.connect Kafka config doesn't specify a chroot env for your Kafka cluster, i.e. it's just host:port and not host:port/path
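
For completeness, the same check can also be done programmatically. The following is just an illustrative Java sketch (not from the original answer) using the plain ZooKeeper client; it assumes the defaults listed above, i.e. ZooKeeper reachable at localhost:2181 and no chroot, so the broker registry lives at /brokers/ids:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerCheck {
    // Returns true if at least one broker has registered an ephemeral node in ZooKeeper.
    public static boolean anyBrokerRegistered() throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        try {
            if (!connected.await(10, TimeUnit.SECONDS)) {
                return false; // could not even reach ZooKeeper
            }
            List<String> brokerIds = zk.getChildren("/brokers/ids", false);
            return !brokerIds.isEmpty();
        } catch (KeeperException.NoNodeException e) {
            return false; // /brokers/ids does not exist yet, so no broker has ever registered
        } finally {
            zk.close();
        }
    }
}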

Answered by dbustosp

Paul's answer is very good and it is actually how Kafka & Zk work together from a broker point of view.

I would say that another easy option to check whether a Kafka server is running is to create a simple KafkaConsumer pointing to the cluster and try some action, for example listTopics(). If the Kafka server is not running, you will get a TimeoutException, which you can handle in a try-catch block.

  import java.util.Properties
  import scala.collection.mutable
  import org.apache.kafka.clients.consumer.KafkaConsumer

  def validateKafkaConnection(kafkaParams: mutable.Map[String, Object]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    // listTopics() throws a TimeoutException if no broker can be reached
    simpleConsumer.listTopics()
    simpleConsumer.close()
  }
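
Since the question is tagged java, here is a rough Java equivalent of the same idea. It is only an illustrative sketch (not part of the original answer); the group.id value and the helper name canConnect are made up for the example:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;

public class KafkaConnectionCheck {
    // Returns true if listTopics() succeeds, i.e. at least one broker answered in time.
    public static boolean canConnect(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "connection-check"); // example value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Blocks up to default.api.timeout.ms, then throws TimeoutException if no broker is reachable.
            consumer.listTopics();
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }
}

Calling canConnect("localhost:9092") before starting the producer then gives a simple go/no-go check.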

Answered by Prashanth Reddy Mamidi

A good option is to use AdminClient, as below, before starting to produce or consume messages:

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;

try (AdminClient client = AdminClient.create(properties)) {
    // listTopics() only succeeds if a broker answers within the timeout
    client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
} catch (InterruptedException | ExecutionException ex) {
    // get() also throws InterruptedException, so both are handled here
    LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS);
    return;
}

Answered by Mohammad Faisal

I used the AdminClient API.

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000); // close idle connections quickly
properties.put("request.timeout.ms", 5000);       // fail fast when brokers do not respond
try (AdminClient client = KafkaAdminClient.create(properties))
{
    ListTopicsResult topics = client.listTopics();
    Set<String> names = topics.names().get();
    if (names.isEmpty())
    {
        // case: if no topic found.
    }
    return true;
}
catch (InterruptedException | ExecutionException e)
{
    // Kafka is not available
}

Answered by usman

You can use the code below to check whether any broker is available, i.e. whether the server is running.

import org.I0Itec.zkclient.ZkClient;
import kafka.utils.ZkUtils;

public static boolean isBrokerRunning() {
    boolean flag = false;
    // "endpoint" and "logger" come from the surrounding class in the author's code
    ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 10000); //, kafka.utils.ZKStringSerializer$.MODULE$);
    if (zkClient != null) {
        int brokersCount = zkClient.countChildren(ZkUtils.BrokerIdsPath());
        if (brokersCount > 0) {
            logger.info("Following broker(s) {} is/are available on ZooKeeper.", zkClient.getChildren(ZkUtils.BrokerIdsPath()));
            flag = true;
        } else {
            logger.error("ERROR: No broker is available on ZooKeeper.");
        }
        zkClient.close();
    }
    return flag;
}

Answered by Muhammad Faizan Khan

I found an OnError event in Confluent Kafka (the C# client):

consumer.OnError += Consumer_OnError;

private void Consumer_OnError(object sender, Error e)
{
    Debug.Log("connection error: " + e.Reason);
    ConsumerConnectionError(e);
}

And its documentation comment in the code:

    //
    // Summary:
    //     Raised on critical errors, e.g. connection failures or all brokers down. Note
    //     that the client will try to automatically recover from errors - these errors
    //     should be seen as informational rather than catastrophic
    //
    // Remarks:
    //     Executes on the same thread as every other Consumer event handler (except OnLog
    //     which may be called from an arbitrary thread).
    public event EventHandler<Error> OnError;

Answered by Raffy Arnaez

On Linux, run "ps aux | grep kafka" and check whether the Kafka properties file shows up in the results, e.g. /path/to/kafka/server.properties.

Answered by Abzelhan

First, you need to create an AdminClient bean:

@Bean
public AdminClient adminClient() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        StringUtils.arrayToCommaDelimitedString(new Object[]{"your bootstrap server address"}));
    return AdminClient.create(configs);
}

Then, you can use this script:

然后,您可以使用此脚本:

while (true) {
    Map<String, ConsumerGroupDescription> groupDescriptionMap =
            adminClient.describeConsumerGroups(Collections.singletonList(groupId))
                    .all()
                    .get(10, TimeUnit.SECONDS);

    ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);

    log.debug("Kafka consumer group ({}) state: {}",
            groupId,
            consumerGroupDescription.state());

    if (consumerGroupDescription.state().equals(ConsumerGroupState.STABLE)) {
        boolean isReady = true;
        for (MemberDescription member : consumerGroupDescription.members()) {
            if (member.assignment() == null || member.assignment().topicPartitions().isEmpty()) {
                isReady = false;
            }
        }

        if (isReady) {
            break;
        }
    }

    log.debug("Kafka consumer group ({}) is not ready. Waiting...", groupId);
    TimeUnit.SECONDS.sleep(1);
}

This script checks the state of the consumer group every second until the state is STABLE. Because all consumers are then assigned to topic partitions, you can conclude that the server is running and ready.
