Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/35934578/

Time: 2020-08-11 17:16:12  Source: igfitidea

Kafka Java Producer with Kerberos

Tags: java, apache-kafka, kerberos, jaas, hortonworks-data-platform

Asked by Kalpesh

I am getting an error while sending a message to a Kafka topic in a Kerberized environment. Our cluster runs on HDP 2.3.

I followed this guide: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

However, I have to run kinit explicitly before I can send a message to the Kafka topic. I tried to perform the kinit step from a Java class, but that does not work either. Please find the code below:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authentication failed, try something else " + authStatus);
        } else {
            System.out.println("Authentication successful " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

pom.xml: all dependencies were downloaded from the Hortonworks repository.

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

Error, case 1: when I specify my own user's JAAS file (MyUser_Kafka_jass.conf)

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration.run(Configuration.java:258)
        at javax.security.auth.login.Configuration.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/[email protected]";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/[email protected]";
};

Case 2: when I specify Kafka's own JAAS file

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

This works fine if I run kinit before starting the app; otherwise it throws the error above. I can't do this in my production environment. If there is any way to do this from within our app itself, please help me out. Please let me know if you need any more details.

Thanks :)

Accepted answer by Kalpesh

I don't know what mistake I made the first time, but I repeated the steps below and it works fine.

First, give full access to the topic:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

Create the JAAS file, kafka-jaas.conf:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="[email protected]"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/ctadmin.keytab"
 client=true;
};

Java program:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        // The topic name is read from the first command-line argument.
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Kerberos (SASL) authentication over an unencrypted connection.
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
        // Close the producer so its connections are released before the JVM exits.
        producer.close();
    }
}

Run the application (KafkaProducer reads the topic name from args[0], so append the topic name to the command):

java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer
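Since the question asks whether the application can log in by itself, one option worth trying is to describe the KafkaClient login entry in code instead of in a file. The following is a minimal sketch, not the answer author's method. The class name InMemoryJaasConfig, the helper keytabConfiguration, and the principal ctadmin@EXAMPLE.COM are hypothetical (the real principal is redacted on this page). The stack traces in the question show the producer calling Configuration.getConfiguration(), which suggests an installed Configuration could be picked up, but this was not verified against the HDP 0.9 client.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

public class InMemoryJaasConfig {

    /** Builds a Configuration exposing a single "KafkaClient" entry that logs in from a keytab. */
    public static Configuration keytabConfiguration(String principal, String keyTabPath) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");      // log in from the keytab, no kinit needed
        options.put("keyTab", keyTabPath);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");    // never fall back to asking for a password
        options.put("serviceName", "kafka");   // same value as in the JAAS file above

        final AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);

        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                // Only the "KafkaClient" section is defined; other names are unknown.
                return "KafkaClient".equals(name) ? new AppConfigurationEntry[] {entry} : null;
            }
        };
    }

    public static void main(String[] args) {
        // Hypothetical principal; replace with the real one from your keytab.
        Configuration config = keytabConfiguration(
                "ctadmin@EXAMPLE.COM", "/etc/security/keytabs/ctadmin.keytab");

        // Install it JVM-wide before any Kerberos-aware client is constructed.
        Configuration.setConfiguration(config);

        for (AppConfigurationEntry e : config.getAppConfigurationEntry("KafkaClient")) {
            System.out.println(e.getLoginModuleName() + " " + e.getControlFlag() + " " + e.getOptions());
        }
    }
}
```

The sketch only builds and installs the entry; the actual Kerberos login still happens inside the client and needs a reachable KDC and a readable keytab.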

Answered by victorgp

The error is caused by a semicolon in your JAAS file, as this piece of the output shows:

Line 6: expected [controlFlag]

This line must not end with a semicolon:

principal="ctadmin/[email protected]";

A semicolon may appear only after the last option of a login module entry, which in your file is the client=true; line.
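The semicolon rule can be checked without a broker or a KDC, because the JDK's own JAAS file parser rejects the file before any login is attempted. The sketch below is an illustration under stated assumptions: the class name JaasSyntaxCheck and the helper parseError are hypothetical, and the principal ctadmin@EXAMPLE.COM is a placeholder for the redacted one. It writes each sample to a temporary file and parses it with the JDK's JavaLoginConfig configuration type.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.URIParameter;
import javax.security.auth.login.Configuration;

public class JaasSyntaxCheck {

    // Same mistake as in the question: a semicolon after "principal" ends the entry too early.
    public static final String WITH_STRAY_SEMICOLON =
            "KafkaClient {\n"
          + "  com.sun.security.auth.module.Krb5LoginModule required\n"
          + "  useTicketCache=true\n"
          + "  principal=\"ctadmin@EXAMPLE.COM\";\n"   // hypothetical principal
          + "  useKeyTab=true\n"
          + "  keyTab=\"/etc/security/keytabs/ctadmin.keytab\";\n"
          + "};\n";

    // Corrected version: the only semicolon follows the last option.
    public static final String CORRECTED =
            "KafkaClient {\n"
          + "  com.sun.security.auth.module.Krb5LoginModule required\n"
          + "  useTicketCache=true\n"
          + "  principal=\"ctadmin@EXAMPLE.COM\"\n"
          + "  useKeyTab=true\n"
          + "  keyTab=\"/etc/security/keytabs/ctadmin.keytab\";\n"
          + "};\n";

    /**
     * Returns null if the JAAS text parses, otherwise the innermost error message.
     * A failure to create the temporary file is also reported as a message.
     */
    public static String parseError(String jaasText) {
        Path file = null;
        try {
            file = Files.createTempFile("jaas-check", ".conf");
            Files.write(file, jaasText.getBytes(StandardCharsets.UTF_8));
            // Parsing happens eagerly when the configuration instance is created.
            Configuration.getInstance("JavaLoginConfig", new URIParameter(file.toUri()));
            return null;
        } catch (Exception e) {
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return String.valueOf(root.getMessage());
        } finally {
            if (file != null) {
                file.toFile().delete();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("with stray semicolon: " + parseError(WITH_STRAY_SEMICOLON));
        System.out.println("corrected:            " + parseError(CORRECTED));
    }
}
```

After the early semicolon the parser treats the next option name as a new login module class and then expects a control flag such as required, which is why the reported problem mentions controlFlag rather than the semicolon itself.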