Java 使用 Spring Embedded Kafka 测试 @KafkaListener

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/50123621/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-10 23:27:18  来源:igfitidea点击:

Testing a @KafkaListener using Spring Embedded Kafka

javaspring-bootapache-kafkaspring-kafkaspring-boot-test

提问by riccardo.cardin

I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.

我正在尝试为我正在使用 Spring Boot 2.x 开发的 Kafka 侦听器编写单元测试。作为单元测试,我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例。所以,我决定使用 Spring Embedded Kafka。

The definition of my listener is very basic.

我的听众的定义是非常基本的。

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

Also the test, that verifies the latchcounter to be equal to zero after receiving a message, is very easy.

此外,latch在收到消息后验证计数器是否为零的测试也非常简单。

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbeddedto test a method marked with the annotation @KafkaListener?

不幸的是,测试失败了,我不明白为什么。是否可以使用 的实例KafkaEmbedded来测试标有注解的方法@KafkaListener

All the code is shared in my GitHub repository kafka-listener.

所有代码都在我的 GitHub 存储库kafka-listener 中共享。

Thanks to all.

谢谢大家。

采纳答案by Gary Russell

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

您可能在消费者被分配主题/分区之前发送消息。设置属性...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it defaults to latest.

...它默认为latest.

This is like using --from-beginningwith the console consumer.

这就像--from-beginning与控制台使用者一起使用。

EDIT

编辑

Oh; you're not using boot's properties.

哦; 你没有使用引导的属性。

Add

添加

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

编辑2

BTW, you should probably also do a get(10L, TimeUnit.SECONDS)on the result of the template.send()(a Future<>) to assert that the send was successful.

顺便说一句,您可能还应该get(10L, TimeUnit.SECONDS)template.send()(a Future<>)的结果执行 a以断言发送成功。

EDIT3

编辑3

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

要覆盖仅用于测试的偏移量重置,您可以执行与对代理地址所做的相同的操作:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

and

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

但是,请记住,此属性仅适用于组第一次消费时。要在每次应用程序启动时始终从最后开始,您必须在启动期间寻找到最后。

Also, I would recommend setting enable.auto.committo falseso that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.

此外,我建议设置enable.auto.commit为,false以便容器负责提交偏移量,而不是仅仅依靠消费者客户端按时间表执行。