Connecting to Zookeeper in an Apache Kafka Multi Node cluster
Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/38583608/
Asked by amateur
I followed the instructions below to set up a multi-node Kafka cluster. Now, how do I connect to ZooKeeper? Is it okay to connect to just one ZooKeeper node from the producer/consumer side in Java, or is there a way to connect to all of the ZooKeeper nodes?
Setting up a multi-node Apache ZooKeeper cluster
On every node of the cluster, add the following lines to the file kafka/config/zookeeper.properties:
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
On every node of the cluster, create a file called myid in the folder referenced by the dataDir property (by default the folder is /tmp/zookeeper). The myid file should contain only the id of the znode ('1' for zNode01, '2' for zNode02, etc.).
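The myid file is usually created with a shell one-liner, but since this page's code is Java, here is a minimal sketch of the same step (the class and method names are illustrative, not part of the original post):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class WriteMyId {
    // Writes the ZooKeeper myid file; the id must match this host's
    // server.N entry in zookeeper.properties (1 for zNode01, etc.)
    static Path writeMyId(Path dataDir, int id) throws IOException {
        Files.createDirectories(dataDir);
        Path myid = dataDir.resolve("myid");
        // The file must contain only the id, nothing else
        Files.write(myid, Integer.toString(id).getBytes(StandardCharsets.UTF_8));
        return myid;
    }

    public static void main(String[] args) throws IOException {
        // On zNode01 with the default dataDir this would be /tmp/zookeeper
        Path myid = writeMyId(Paths.get("/tmp/zookeeper"), 1);
        System.out.println("wrote " + myid);
    }
}
```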
Setting up a multi-broker Apache Kafka cluster
On every node of the cluster, modify the property zookeeper.connect in the file kafka/config/server.properties:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
On every node of the cluster, modify the property host.name in the file kafka/config/server.properties: host.name=zNode0x
On every node of the cluster, modify the property broker.id in the file kafka/config/server.properties (every broker in the cluster must have a unique id).
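As a quick sanity check for the edits above, server.properties is in java.util.Properties format and can be parsed directly. A small sketch (the fragment contents and class name are illustrative):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class CheckServerProps {
    // Parses a server.properties fragment so the per-node settings
    // (zookeeper.connect, host.name, broker.id) can be verified
    static Properties parse(String contents) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(contents));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // What kafka/config/server.properties on zNode02 might contain
        String fragment =
                "zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181\n"
              + "host.name=zNode02\n"
              + "broker.id=2\n";
        Properties props = parse(fragment);
        System.out.println("broker.id=" + props.getProperty("broker.id"));
    }
}
```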
Answered by Shettyh
You can pass all the nodes to the producer or consumer. Kafka is intelligent enough to connect to the node that holds the data you need, based on the replication factor or the partition.
Here is the consumer code:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
// List several brokers so the client can still bootstrap if one is down
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
}
You can find more info here
Note: the problem with this approach is that it will open multiple connections to find out which node holds the data. For more robust and scalable systems, you can maintain a map of partition number to node name; this also helps with load balancing.
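The partition-to-node map the answer suggests can be sketched as follows. In a real client you would populate it from consumer.partitionsFor(topic), reading the leader broker for each PartitionInfo; the class name and host values here are illustrative, not from the original post:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionLeaderMap {
    // partition number -> host of the broker that leads that partition
    private final Map<Integer, String> leaderByPartition = new HashMap<>();

    // Called with values observed from the cluster metadata
    void record(int partition, String nodeHost) {
        leaderByPartition.put(partition, nodeHost);
    }

    // Look up which broker leads a partition so requests can be routed
    // directly instead of probing every node
    String leaderOf(int partition) {
        return leaderByPartition.get(partition);
    }

    public static void main(String[] args) {
        PartitionLeaderMap map = new PartitionLeaderMap();
        // Values as they might come back for a two-partition topic
        map.record(0, "acbd.com");
        map.record(1, "defg.com");
        System.out.println("partition 1 leader: " + map.leaderOf(1));
    }
}
```

In practice the map must be refreshed when leadership changes (e.g. after a broker failure), which the Kafka client library already does internally with its own metadata cache.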
Here is the producer sample:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
More info here
Answered by Kamal Chandraprakash
There is no need to pass Zookeeper connection properties to the Kafka clients (producer and consumer).
From Kafka v0.9 onwards, the Kafka producer and consumer do not communicate with Zookeeper.
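In other words, the minimal client configuration for 0.9+ clients contains only broker addresses and no zookeeper.connect entry. A small sketch (the class name and host names are illustrative, reusing the placeholders from this page):

```java
import java.util.Properties;

public class MinimalClientConfig {
    // Builds a minimal 0.9+ consumer config: brokers only, no Zookeeper
    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("zNode01:9092,zNode02:9092,zNode03:9092");
        // Note: no zookeeper.connect anywhere in the client config
        System.out.println("zookeeper.connect present? " + props.containsKey("zookeeper.connect"));
    }
}
```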