Kafka Producer 的 java 客户端示例,发送方法不接受 KeyedMessage
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/29208164/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
java client example for Kafka Producer, send method not accepting KeyedMessage
提问by Shades88
I am running kafka 2.9.1-0.8.2.1. I included jars provided in libs/ directory within main kafka directory. Now I am trying to run a java producer example as per what is given here https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example. Now producer.send
method seems to be accepting this kind of argument Seq<KeyedMessage<String, String>>
. In the example, object of KeyedMessage is not converted into anything. When I try to do the same I get incompatible types compiler error.
我正在运行 kafka 2.9.1-0.8.2.1。我将 libs/ 目录中提供的 jars 包含在主 kafka 目录中。现在,我正在尝试按照此处提供的内容运行 java 生产者示例https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example。现在producer.send
方法似乎正在接受这种说法Seq<KeyedMessage<String, String>>
。在这个例子中,KeyedMessage 的对象没有被转换成任何东西。当我尝试做同样的事情时,我得到了不兼容的类型编译器错误。
Here's the code
这是代码
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;
public class KakfaProducer {
public static void main(String [] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class","kafka.serializer.StringEncoder");
//prop.put("partitioner.class", "example.producer.SimplePartitioner");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String,String> producer = new <String,String>Producer(producerConfig);
String topic = "test";
KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
producer.send(message);
producer.close();
}
}
And that commented code is giving me class def not found exception. I tried to look a lot on net, but it's not helping.
并且注释的代码给了我 class def not found 异常。我试图在网上查看很多,但没有帮助。
There are two kinds of jars in that libs/ directory. One is kafka-client and other one is just kafka and version number. Am I including wrong jar? Which one do I need to work with?
该 libs/ 目录中有两种 jar。一个是 kafka-client,另一个只是 kafka 和版本号。我包括错误的罐子吗?我需要与哪一位合作?
采纳答案by suranjan
For the first problem, instead of importing scala API, import Java one. So, instead of using:
对于第一个问题,不是导入 scala API,而是导入 Java 一个。所以,而不是使用:
import kafka.producer.Producer;
please use:
请用:
import kafka.javaapi.producer.Producer;
SimplePartitioner code can be found below. Add it to the corresponding directory:
SimplePartitioner 代码可以在下面找到。添加到对应目录:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
}
return partition;
}
}
回答by yzmyyff
There are 2 ways to run producer in Java.
有两种方法可以在 Java 中运行生产者。
1) Using core Kafka. Its your method. 2) Using Kafka-client.
1)使用核心Kafka。它是你的方法。2)使用Kafka客户端。
The difference between these are mentioned in Kafka 0.8.2 Documentation.
Kafka 0.8.2 文档中提到了它们之间的区别。
These new clients are meant to supplant the existing Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server.
这些新客户端旨在取代现有的 Scala 客户端,但为了兼容性,它们将共存一段时间。这些客户端在一个单独的 jar 中可用,依赖最少,而旧的 Scala 客户端仍然与服务器打包在一起。
It means the new client is smaller and may replace the original method.
这意味着新客户端更小,可能会取代原来的方法。
In Section 3.4. New Producer provider some different configs.
在第 3.4 节中。新的生产者提供者一些不同的配置。
We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality.
我们正在努力替代我们现有的生产商。该代码现在可以在主干中使用,可以认为是测试版质量。