Sending a message with the Kafka Java producer: Producer connection to localhost:9092 unsuccessful

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/17808988/

Time: 2020-11-01 15:04:49  Source: igfitidea

java apache-kafka

Asked by Andy Ai

I want to send a message to a Kafka server. The broker list does not include localhost, but when the producer invokes the send method, it throws an exception: Producer connection to localhost:9092 unsuccessful

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

// Create a producer.
Properties props = new Properties();

// Broker list; note that localhost is not included.
props.put("metadata.broker.list", "192.168.1.203:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

// Send one message.
String topic = "data_collect_events";
String message = "_Message_1";
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message);
producer.send(keyedMessage);

Exception:

ERROR [main] 2013-07-23 19:27:10,580 kafka.utils.Logging$class: Producer connection to localhost:9092 unsuccessful
java.net.ConnectException: Connection refused: connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:364)
    at sun.nio.ch.Net.connect(Net.java:356)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)

Answered by Andy Ai

In Kafka 0.8, the broker list is used only for retrieving metadata. The producer then connects to the broker using the host information returned in that metadata. The hostname in the metadata depends on the OS setting of the broker host (see https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F).

You can set host.name in server.properties, like this:

host.name=192.168.1.203
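
To check from the producer's machine whether a name handed back in the metadata would point at the local machine, something like the following could help. This is only a minimal sketch: `AdvertisedHostCheck` and `resolvesToLoopback` are hypothetical names, not part of Kafka, and the code uses only `java.net.InetAddress` from the JDK. It reports whether a host name or IP literal resolves to a loopback address, which is the case that leads to connection attempts against localhost:9092.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical diagnostic helper; not part of the Kafka client.
public class AdvertisedHostCheck {

    // Returns true if the host resolves to a loopback address (127.0.0.0/8 or ::1).
    public static boolean resolvesToLoopback(String host) throws UnknownHostException {
        return InetAddress.getByName(host).isLoopbackAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Pass the host name the broker advertises as the first argument;
        // the default below is the broker address from the question.
        String host = args.length > 0 ? args[0] : "192.168.1.203";
        System.out.println(host + " resolves to loopback: " + resolvesToLoopback(host));
    }
}
```

If the broker's advertised name resolves to loopback on the client, setting host.name as shown above is the fix this answer suggests.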

Answered by om-nom-nom

Most likely you're using Kafka 0.7 with config keys for Kafka 0.8, so

props.put("metadata.broker.list", "192.168.1.203:9092");

is simply ignored, and the producer defaults to localhost:9092 when no broker is specified.
You have to use broker.list rather than metadata.broker.list.
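
The difference between the two key names can be sketched as below. This is an illustration under assumptions, not the library's own API: `ProducerProps` and `forVersion` are hypothetical names, and the `brokerId:host:port` entry format for 0.7 (with broker id 0 as a placeholder) is my assumption, so check it against the documentation for the version you actually run.

```java
import java.util.Properties;

// Hypothetical helper that picks the broker-list key by client version.
public class ProducerProps {

    // Builds producer properties for a client version string such as "0.7" or "0.8".
    public static Properties forVersion(String version, String host, int port) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        if (version.startsWith("0.7")) {
            // Assumption: 0.7 expects brokerId:host:port entries; 0 is a placeholder id.
            props.put("broker.list", "0:" + host + ":" + port);
        } else {
            // Key used in the question's 0.8-style configuration.
            props.put("metadata.broker.list", host + ":" + port);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forVersion("0.7", "192.168.1.203", 9092));
        System.out.println(forVersion("0.8", "192.168.1.203", 9092));
    }
}
```

Keeping the key choice in one place makes it harder to pair one client version's jar with the other version's configuration keys.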

Answered by learner

I had a similar issue, in which a Kafka producer located outside EC2 was not able to resolve the internal EC2 IP as expected by Zookeeper.

I edited the /etc/hosts file to add an entry of public-ip & internal-ec2-ip so that the producer could talk to the Kafka broker.
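
After a change like this, it can be useful to confirm that the broker address is actually reachable from the producer's machine. The following is a minimal sketch using only JDK sockets; `BrokerProbe` and `canConnect` are hypothetical names, and a successful TCP connection only shows that something is listening on that port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity probe; not part of the Kafka client.
public class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    // Unresolvable hosts, refused connections, and timeouts all return false.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker address and port from the question.
        String host = args.length > 0 ? args[0] : "192.168.1.203";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```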