java 如何将消息发送到受 SSL 保护的 kafka 主题

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/43027146/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 07:01:39  来源:igfitidea点击:

how to send message to kafka topic protected with SSL

javasslapache-kafkakafka-producer-api

提问by Bravo

Can anyone please suggest which properties we need to set to send message to SSL protected kafka topic using java KafkaProducer ,am new to kafka , not able to send one message to kafka protected with SSL

任何人都可以建议我们需要设置哪些属性才能使用 java KafkaProducer 向受 SSL 保护的 kafka 主题发送消息,我是 kafka 的新手,无法向受 SSL 保护的 kafka 发送一条消息

回答by Maximilien Belinga

I assume you already know how to configure Kafka for SSL. You need to add configuration settings for SSL encryption and for SSL authentication. Basically, this is a basic producer structure for that.

我假设您已经知道如何为 SSL 配置 Kafka。您需要为 SSL 加密和 SSL 身份验证添加配置设置。基本上,这是一个基本的生产者结构。

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  "test1234");

// configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
Random rnd = new Random();
for (long i = 0; i < 100 ; i++) {
   ProducerRecord<String, String> data = new ProducerRecord<String, String>(
           "test-topic", "key-" + i, "message-"+i );
   producer.send(data, callback);
}

producer.close();