Original source: http://stackoverflow.com/questions/27036923/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me) on Stack Overflow.
How to create a Topic in Kafka through Java
Asked by Jaya Ananthram
I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through Java. Everything works if I create the topic from the command prompt and then push messages through the Java API, but I want to create the topic through the Java API as well. After a long search I found the code below:
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
I tried the code above, and it reports that the topic is created, but I am not able to push messages to the topic. Is anything wrong with my code? Or is there another way to achieve this?
Accepted answer by Jaya Ananthram
Edit: Newer versions of Kafka do not require a ZooKeeper connection to create topics. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+.
Original answer
I fixed it after a lot of research.
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
With the code above, ZkClient does create the topic node in ZooKeeper, but Kafka does not recognize the topic. ZkClient's default serializer uses Java object serialization, while Kafka expects plain strings, so the ZkClient object has to be created with Kafka's string serializer, as follows.
First, add the following import:
import kafka.utils.ZKStringSerializer$;
Then create the ZkClient object as follows:
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
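To see why the serializer argument matters, here is a minimal sketch that compares the two encodings using only the JDK. It assumes, as an illustration, that a ZkClient created without an explicit serializer falls back to Java object serialization, while Kafka's string serializer writes plain UTF-8 text. The class name `SerializerComparison` and the sample payload are hypothetical and do not come from Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it does not talk to ZooKeeper or Kafka.
final class SerializerComparison {

    // Encodes the value with Java object serialization
    // (assumed here to mirror ZkClient's default serializer).
    static byte[] javaSerialized(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Encodes the value as plain UTF-8 text, as a string serializer would.
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Renders the first n bytes as upper-case hex for comparison.
    static String hexPrefix(byte[] bytes, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(n, bytes.length); i++) {
            sb.append(String.format("%02X", bytes[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String payload = "{\"version\":1}"; // illustrative payload only
        // Java serialization starts with the stream magic and version.
        System.out.println(hexPrefix(javaSerialized(payload), 4)); // ACED0005
        // Plain text starts with the characters themselves: {"ve
        System.out.println(hexPrefix(utf8(payload), 4));           // 7B227665
    }
}
```

A reader that expects plain text will not understand the first form, which is consistent with the topic being written to ZooKeeper but not usable by Kafka.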
Edit 1 (in response to @ajkret's comment):
The code above won't work for Kafka > 0.9 because the API has changed. Use the code below for Kafka > 0.9:
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // For multiple ZooKeeper hosts, use a comma-separated list,
            // e.g. "192.168.20.1:2181,192.168.20.2:2181"
            String zookeeperHosts = "192.168.20.1:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs
            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
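The example above asks for 2 partitions and a replication factor of 3. Kafka rejects a replication factor larger than the number of available brokers, so on a single-broker test setup those values would fail. The following is a small sketch of a pre-check that could run before calling createTopic. The class `TopicSettingsCheck`, its method, and the idea of passing the broker count in by hand are all hypothetical; this is not part of the Kafka API.

```java
// Hypothetical helper; the caller is assumed to know how many brokers exist.
final class TopicSettingsCheck {

    // Returns null when the settings look usable, otherwise a short reason.
    static String problemWith(int partitions, int replicationFactor, int availableBrokers) {
        if (partitions < 1) {
            return "partitions must be at least 1";
        }
        if (replicationFactor < 1) {
            return "replication factor must be at least 1";
        }
        if (replicationFactor > availableBrokers) {
            return "replication factor " + replicationFactor
                    + " exceeds " + availableBrokers + " available broker(s)";
        }
        return null;
    }

    public static void main(String[] args) {
        // Same values as the answer above, checked against one broker.
        System.out.println(problemWith(2, 3, 1));
        // prints: replication factor 3 exceeds 1 available broker(s)

        // With three brokers the same settings pass the check.
        System.out.println(problemWith(2, 3, 3)); // prints: null
    }
}
```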
Answer by Richard G
A pointer for anyone looking at this with a newer version of Kafka (at the time of writing, I was using Kafka v0.10.0.0).
You have to change
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
to the following:
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Enforced$.MODULE$);
It is also a good idea to close the connection once you are finished:
zkClient.close();
Answer by Saurabh Mishra
For those trying to achieve this in Kafka v0.10.2.1 and running into the serialization error 'java.io.StreamCorruptedException: invalid stream header: 3139322E', the following is working sample code with the necessary imports.
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;

public static void createTopic(String topicName, int numPartitions, int numReplication) {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {
        // Placeholder address; replace it with your ZooKeeper host(s).
        // For multiple hosts, use a comma-separated list,
        // e.g. "192.168.20.1:2181,192.168.20.2:2181"
        String zookeeperHosts = "199.98.916.902:2181";
        int sessionTimeOutInMs = 15 * 1000; // 15 secs
        int connectionTimeOutInMs = 10 * 1000; // 10 secs
        zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
        // Ref: https://gist.github.com/jjkoshy/3842975
        // Delegate to Kafka's string serializer instead of ZkClient's default.
        zkClient.setZkSerializer(new ZkSerializer() {
            @Override
            public byte[] serialize(Object o) throws ZkMarshallingError {
                return ZKStringSerializer.serialize(o);
            }
            @Override
            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                return ZKStringSerializer.deserialize(bytes);
            }
        });
        zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
        Properties topicConfiguration = new Properties();
        // Use the method arguments rather than hard-coded values.
        AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplication, topicConfiguration,
                RackAwareMode.Enforced$.MODULE$);
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
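The hex value 3139322E in the error quoted in this answer is the ASCII text "192.", which suggests that a plain string starting with an address such as 192.168.x.x was handed to a Java-serialization reader. The sketch below reproduces the same message with the JDK alone, without ZooKeeper or Kafka. The class name `StreamHeaderDemo` and the sample address are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only uses JDK classes.
final class StreamHeaderDemo {

    // Tries to read plain text bytes as a Java-serialized object and
    // returns the resulting error (or "no error" if reading succeeds).
    static String readAsJavaObject(String plainText) {
        byte[] bytes = plainText.getBytes(StandardCharsets.UTF_8);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return "no error";
        } catch (IOException | ClassNotFoundException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The first four bytes '1', '9', '2', '.' are reported as the bad header.
        System.out.println(readAsJavaObject("192.168.20.1"));
        // prints: StreamCorruptedException: invalid stream header: 3139322E
    }
}
```

This is why the working code swaps in a string-based serializer: the data stored in ZooKeeper is plain text, not a serialized Java object.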
Answer by Mahesh Mogal
The AdminUtils API is being deprecated. There is a new API, AdminZkClient, which we can use to manage topics on the Kafka server.
import java.util.Properties;
import kafka.admin.RackAwareMode;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.utils.Time;

String zookeeperHost = "127.0.0.1:2181";
Boolean isSecure = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost, isSecure, sessionTimeoutMs,
        connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();
adminZkClient.createTopic(topicName1, partitions, replication,
        topicConfig, RackAwareMode.Disabled$.MODULE$);
See this link for details: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/
Answer by Neeleshkumar S
The process is much simpler in API 0.11.0+. With that version, it can be done as follows:
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)
List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);
adminClient.createTopics(newTopics);
adminClient.close();
The contents of the kafka.properties file are as follows:
bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
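Of the keys listed above, only bootstrap.servers is an admin client setting; the others (group.id, the auto-commit options, and the deserializers) are consumer settings that topic creation does not need. As a rough sketch, the following JDK-only helper keeps just that one key from a properties file's text. The class `AdminPropertiesSketch` and its method are hypothetical names used for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; it only parses text and never contacts a broker.
final class AdminPropertiesSketch {

    // Parses properties text and keeps only the bootstrap.servers entry.
    static Properties adminOnly(String propertiesText) {
        Properties all = new Properties();
        try {
            all.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String servers = all.getProperty("bootstrap.servers");
        if (servers == null) {
            throw new IllegalArgumentException("bootstrap.servers is missing");
        }
        Properties admin = new Properties();
        admin.setProperty("bootstrap.servers", servers);
        return admin;
    }

    public static void main(String[] args) {
        String fileText = "bootstrap.servers=localhost:9092\n"
                + "group.id=test\n"
                + "enable.auto.commit=true\n";
        System.out.println(adminOnly(fileText));
        // prints: {bootstrap.servers=localhost:9092}
    }
}
```

The resulting Properties object could then be passed to AdminClient.create in place of the full file contents.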
Note that createTopics is asynchronous, so the AdminClient instance should be closed before the program exits. close() waits for pending operations to complete, and without it the newly created topic may not appear.