java Kafka - 生产者致谢
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/44992566/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Kafka - Producer Acknowledgement
提问by Punter Vicky
I saw in a video tutorial that Kafka Broker supports 3 types of acknowledgement when producer posts a message.
我在视频教程中看到,Kafka Broker 在生产者发布消息时支持 3 种类型的确认。
0 - Fire and Forget 1 - Leader Ack 2 - Ack of all the brokers
0 - Fire and Forget 1 - Leader Ack 2 - Ack 所有的broker
I am using Kafka's Java API to post message. Is this something that has to be set for each broker using server.properties specific to each broker or is it something that has to be set by producer? If it has to be set by the producer , please explain how it can be set using Java API.
我正在使用 Kafka 的 Java API 来发布消息。这是必须使用特定于每个代理的 server.properties 为每个代理设置的内容还是必须由生产者设置的内容?如果必须由生产者设置,请说明如何使用 Java API 进行设置。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
try{
for(int i=0;i<150;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
}
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
采纳答案by vahid
It's a producer property and is set similar to other properties you have in your code:
它是一个生产者属性,设置类似于您在代码中拥有的其他属性:
properties.put("acks","all");
The list of all configurable producer properties can be found here.
可以在此处找到所有可配置生产者属性的列表。
You might want to also look at the broker (or topic) propertymin.insync.replicas
that is related to this producer config.
您可能还想查看与此生产者配置相关的代理(或主题)属性min.insync.replicas
。