Java 使用 Spring Embedded Kafka 测试 @KafkaListener
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/50123621/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Testing a @KafkaListener using Spring Embedded Kafka
提问by riccardo.cardin
I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.
我正在尝试为我正在使用 Spring Boot 2.x 开发的 Kafka 侦听器编写单元测试。作为单元测试,我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例。所以,我决定使用 Spring Embedded Kafka。
The definition of my listener is very basic.
我的听众的定义是非常基本的。
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
Also the test, that verifies the latch
counter to be equal to zero after receiving a message, is very easy.
此外,latch
在收到消息后验证计数器是否为零的测试也非常简单。
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> buildKafkaTemplate() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
return new KafkaTemplate<>(pf);
}
@Test
public void listenerShouldConsumeMessages() throws InterruptedException {
// Given
producer.sendDefault(1, "Hello world");
// Then
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
}
}
Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded
to test a method marked with the annotation @KafkaListener
?
不幸的是,测试失败了,我不明白为什么。是否可以使用 的实例KafkaEmbedded
来测试标有注解的方法@KafkaListener
?
All the code is shared in my GitHub repository kafka-listener.
所有代码都在我的 GitHub 存储库kafka-listener 中共享。
Thanks to all.
谢谢大家。
采纳答案by Gary Russell
You are probably sending the message before the consumer has been assigned the topic/partition. Set property...
您可能在消费者被分配主题/分区之前发送消息。设置属性...
spring:
kafka:
consumer:
auto-offset-reset: earliest
...it defaults to latest
.
...它默认为latest
.
This is like using --from-beginning
with the console consumer.
这就像--from-beginning
与控制台使用者一起使用。
EDIT
编辑
Oh; you're not using boot's properties.
哦; 你没有使用引导的属性。
Add
添加
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
编辑2
BTW, you should probably also do a get(10L, TimeUnit.SECONDS)
on the result of the template.send()
(a Future<>
) to assert that the send was successful.
顺便说一句,您可能还应该get(10L, TimeUnit.SECONDS)
对template.send()
(a Future<>
)的结果执行 a以断言发送成功。
EDIT3
编辑3
To override the offset reset just for the test, you can do the same as what you did for the broker addresses:
要覆盖仅用于测试的偏移量重置,您可以执行与对代理地址所做的相同的操作:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
and
和
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.
但是,请记住,此属性仅适用于组第一次消费时。要在每次应用程序启动时始终从最后开始,您必须在启动期间寻找到最后。
Also, I would recommend setting enable.auto.commit
to false
so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.
此外,我建议设置enable.auto.commit
为,false
以便容器负责提交偏移量,而不是仅仅依靠消费者客户端按时间表执行。