java 学习卡夫卡 0.8.2

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/28776183/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 14:10:39  来源:igfitidea点击:

Learning Kafka 0.8.2

javaapache-kafka

提问by Bruno

Having a heck of a time finding some simple getting started samples with the new release. Such things as KafkaProducer differ from the Producer examples and much of the older code on the web doesn't seem to compile the same.

花了很长时间在新版本中找到一些简单的入门示例。KafkaProducer 之类的东西与 Producer 示例不同,并且网络上的许多旧代码似乎编译不一样。

Any guidance? The Apache Kafka site has zero examples of producers in Java.

任何指导?Apache Kafka 站点有零个 Java 生产者示例。

Please advise.

请指教。

回答by Minh-Triet Lê

In the example below, I create a producer using String as key and byte[] as message content.

在下面的示例中,我使用 String 作为键和 byte[] 作为消息内容创建了一个生产者。

Create a new producer using the essential parameters :

使用基本参数创建一个新的生产者:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:123,server2:456");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);

Synchronously send a message :

同步发送消息:

producer.send(new ProducerRecord<>(topic, msgKey, msgContent)).get();

Asynchronously send a message :

异步发送消息:

producer.send(new ProducerRecord<>(topic, msgKey, msgContent));

Your maven dependencies is good for consumer and producer. If you need only the producer, you can use :

您的 maven 依赖项对消费者和生产者都有好处。如果您只需要生产者,您可以使用:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

Be warned that the new Consumer APIis available but not usable for now. In the code source, the new API will return null or throw exception.

请注意,新的消费者 API可用,但暂时无法使用。在代码源中,新的 API 会返回 null 或抛出异常。

回答by halafi

回答by Kyr

It is always a good idea to check how the original authors are testing their code so you can get a feeling of what they are trying to achieve or the desired use (if and when there are tests provided :)

检查原始作者如何测试他们的代码总是一个好主意,这样您就可以了解他们正在尝试实现的目标或所需的用途(如果以及何时提供测试:)

In this case, just check this code: https://github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Producer.java

在这种情况下,只需检查此代码:https: //github.com/apache/kafka/blob/0.8.2/examples/src/main/java/kafka/examples/Producer.java

:)

:)

回答by Bruno

I had to regress because of the lack of good examples.

由于缺乏好的例子,我不得不退步。

Here is part of my pom.xml

这是我的 pom.xml 的一部分

    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.0</version>
            <scope>compile</scope>
    </dependency>

Here is my code -pending some testing.

这是我的代码 - 等待一些测试。

    // KafkaProducer.java - A first pass to verify that we can bring in the appropriate
    //                   libraries using Maven

    // Supports unit tests
    package com.bruno;

    import org.junit.Test;
    import static org.junit.Assert.*;



    import java.util.Arrays;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;



    public class MyKafkaProducer
    {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();

            props.put("metadata.broker.list", "192.168.1.203:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);

            Producer p = new Producer<String, String>(config);

            //sending...
            String topic = "test";
            String message = "Hello Kafka";
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message);
            p.send(keyedMessage);

        }

    }