java 我需要在我的单元测试中模拟一个 RabbitMQ

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/37113377/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 02:13:19  来源:igfitidea点击:

I need to mock a RabbitMQ in my unit Test

javajunitmockingrabbitmqamqp

提问by Miguel

I am using a RabbitMQ in my project.

我在我的项目中使用了 RabbitMQ。

I have in my consumer the code of the client part of rabbitMQ and the connection need a tls1.1 to connect with the real MQ.

我在我的消费者中有rabbitMQ 客户端部分的代码,并且连接需要一个tls1.1 才能与真正的MQ 连接。

I want to test this code in my JUnit test and to mock the message delivery to my consumer.

我想在我的 JUnit 测试中测试这段代码,并模拟向我的消费者发送的消息。

I see in google several examples with different tools how camel rabbit or activeMQ but this tools works with amqp 1.0 and rabbitMQ only works in amqp 0.9 .

我在谷歌中看到了几个使用不同工具的例子,骆驼兔或 activeMQ 是如何使用 amqp 1.0 的,而 rabbitMQ 仅适用于 amqp 0.9 。

Someone had this problem?

有人遇到过这个问题吗?

Thanks!

谢谢!

UPDATE

更新

This is the code to testing to receive a json from the queue.

这是测试从队列接收 json 的代码。

package com.foo.foo.queue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class BrokerThreadHLConsumer extends Thread {

private static BrokerThreadHLConsumer instance;

private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);

private Channel channel;
private String queueName;
private PolicyFinder PolicyFinder;
private Connection connection;
private QueueingConsumer consumer;

private boolean loop;

private BrokerThreadHLConsumer() throws IOException {
    ConnectionFactory factory = new ConnectionFactory();
    char[] keyPassphrase = "clientrabbit".toCharArray();
    KeyStore keyStoreCacerts;
    ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
    String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
    String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
    try {
        /* Public key cacerts to connect to message queue*/
        keyStoreCacerts = KeyStore.getInstance("PKCS12");
        URL resourcePublicKey = this.getClass().getClassLoader().getResource("certs/client.keycert.p12");
        File filePublicKey = new File(resourcePublicKey.toURI());
        keyStoreCacerts.load(new FileInputStream(filePublicKey), keyPassphrase);
        KeyManagerFactory keyManager;

        keyManager = KeyManagerFactory.getInstance("SunX509");
        keyManager.init(keyStoreCacerts, keyPassphrase);

        char[] trustPassphrase = "changeit".toCharArray();
        KeyStore tks;

        tks = KeyStore.getInstance("JCEKS");

        URL resourceCacerts = this.getClass().getClassLoader().getResource("certs/cacerts");
        File fileCacerts = new File(resourceCacerts.toURI());

        tks.load(new FileInputStream(fileCacerts), trustPassphrase);

        TrustManagerFactory tmf;
        tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        SSLContext c = SSLContext.getInstance("TLSv1.1");
        c.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);

        factory.setUri(rabbitHost);
        factory.useSslProtocol(c);
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, "fanout");
        queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchangeName, "");

    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
    } catch (CertificateException e) {
        e.printStackTrace();
    } catch (KeyStoreException e) {
        e.printStackTrace();
    } catch (UnrecoverableKeyException e) {
        e.printStackTrace();
    } catch (KeyManagementException e1) {
        e1.printStackTrace();
    } catch (Exception e) {
        log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost);
        log.error(e.getStackTrace());
        e.printStackTrace();
    }
}

public static BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
    if (instance == null)
        instance = new BrokerThreadHLConsumer();
    return instance;
}

public void run() {
    if (PolicyFinder != null) {
        try {
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            log.info("Consumer broker started and waiting for messages");
            loop = true;
            while (loop) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    JSONObject obj = new JSONObject(message);
                    log.info("Message received from broker " + obj);
                    if (StringUtils.isNotEmpty(message) && !PolicyFinder.managePolicySet(obj)) {
                        log.error("PolicySet error: error upgrading the policySet");
                    }
                } catch (Exception e) {
                    log.error("Receiving message error");
                    log.error(e);
                }
            }
        } catch (IOException e) {
            log.error("Consumer couldn't start");
            log.error(e.getStackTrace());
        }
    } else {
        log.error("Consumer couldn't start cause of PolicyFinder is null");
    }
}

public void close() {
    loop = false;
    try {
        consumer.getChannel().basicCancel(consumer.getConsumerTag());
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        channel.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void setLuxPolicyFinder(PolicyFinder PolicyFinder) {
    this.PolicyFinder = PolicyFinder;
}
}

回答by Lo?c Le Doyen

As I understand it, there are two things trying to be tested in the question:

据我了解,有两件事试图在问题中进行测试:

  • TLS configuration to connect to RabbitMQ
  • basicPublish / basicConsume (what's called delivery) behavior regarding interactions with the rest of the application
  • 连接 RabbitMQ 的 TLS 配置
  • basicPublish / basicConsume(所谓的交付)关于与应用程序其余部分交互的行为

For the first one, as TLS itself is being tested, only connecting to a real instance of RabbitMQ with correct truststore configured will prove that configuration is working

对于第一个,因为正在测试 TLS 本身,只有连接到配置了正确信任库的真实 RabbitMQ 实例才能证明配置有效

For the second one however, for tests demonstrating features of the app (with tools like Cucumber for readability), you may try a library i'm working on: rabbitmq-mock(and that's why I'm digging up an old post)

然而,对于第二个,对于演示应用程序功能的测试(使用 Cucumber 等工具以提高可读性),您可以尝试我正在开发的库:rabbitmq-mock(这就是我挖掘旧帖子的原因)

Just include it as dependency:

只需将其包含为依赖项:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>

And replace new ConnectionFactory()by new MockConnectionFactory()in your unit test.

并在您的单元测试中替换new ConnectionFactory()new MockConnectionFactory()

Samples are available in the project: https://github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java

项目中提供了示例:https: //github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java

回答by NoobEditor

So here is how i did it, some stuffs might be here and there in process of hiding the necessary class implementation details, but you would get a hint! :)

所以这就是我是怎么做的,在隐藏必要的类实现细节的过程中,一些东西可能会在这里和那里,但你会得到一个提示! :)

  • assumption for unit test:
    • RMQ is working fine and data send to it would be pushed in queue
    • Only thing to be tested is if the data generated is correct or not
    • and whether the call to RMQs send()is happening or not!
  • 单元测试的假设:
    • RMQ 工作正常,发送到它的数据将被推送到队列中
    • 唯一需要测试的是生成的数据是否正确
    • 以及是否正在调用 RMQ send()


 public class SomeClassTest {
        private Config config;
        private RmqConfig rmqConfig;
        private static final ObjectMapper mapper = new ObjectMapper();
        private JasperServerClient jasperServerClient;
    //    @Mock
        @InjectMocks
        private RabbitMQProducer rabbitMQProducer;
        private Connection phoenixConnection;
        private String targetNotificationMessage;
        SomeClass someClassObject;

        @Before
        public void setUp() {

            // Mock basic stuffs
            config = mock(Config.class);
            Connection = mock(Connection.class);
            rabbitMQProducer = mock(RabbitMQProducer.class); // Imp


            jasperServerClient = mock(JasperServerClient.class);

            rmqConfig = RmqConfig.builder()
                    .host("localhost")
                    .port(5672)
                    .userName("guest")
                    .password("guest")
                    .queueName("somequeue_name")
                    .prefetch(1)
                    .build();
            final String randomMessage = "This is a waste message";
            Message mockMsg = Message.forSending(randomMessage.getBytes(), null, rmqConfig.getQueueName(), rmqConfig.getQueueName(), "text/plain", "UTF-8", true); // prepare a mock message


            // Prepare service configs
            ConnectionConfig connectionConfig = RmqConfigUtil.getConfig(rmqConfig);
            ProducerConfig producerConfig = new ProducerConfigBuilder()
                    .exchange(rmqConfig.getQueueName())
                    .contentType("text/pain")
                    .contentEncoding("UTF-8")
                    .connection(connectionConfig).build();
            rabbitMQProducer.open(croducerConfig.asMap());

            // build the major stuff where the code resides
            someClassObject =  SomeClass.builder()
                    .phoenixConnection(phoenixConnection)
                    .userExchangeName(rmqConfig.getQueueName())
                    .userRabbitMQProducer(rabbitMQProducer)
                    .ftpConfig(config.getFtpConfig())
                    .jasperServerClient(jasperServerClient)
                    .objectMapper(new ObjectMapper())
                    .build();

            MockitoAnnotations.initMocks(this);
        }


        @Test
        public void testNotificationPub() throws Exception {

            // Prepare expected Values
            targetNotificationMessage = <<some message>>

            // Reflection -  my target functions were private
            Class cls = Class.forName("com.some.path.to.class");
            Object[] objForGetMessage = {<<stuffs>>, <<stuffs>>};

            Method getNotificationMessage = cls.getDeclaredMethod("private_fn_1", <<some class>>.class, <<some class>>.class);
            Method pubNotification = cls.getDeclaredMethod("private_fn_2", <<some class>>.class, RabbitMQProducer.class, String.class);

            getNotificationMessage.setAccessible(true);
            pubNotification.setAccessible(true);

            // Test Case #1
            final <<some class>> notificationMessage = (<<some class>>)getNotificationMessage.invoke(someClassObject, objForGetMessage);
            assertEquals(notificationMessage.getMessage(), targetNotificationMessage);

            // Test Case #2 -  this does RMQ call
            Object[] objPubMessage = {notificationMessage, rabbitMQProducer, rmqConfig.getQueueName()};
            final Object publishNotification = pubNotification.invoke(someClassObject, objPubMessage);
            assertEquals(publishNotificationResp, publishNotification); //viola


            //Important, since RabbitMQProducer is mocked, we need to checkup if function call is made to "send" function which send data to RMQ
            verify(rabbitMQProducer,times(1)).send(any());

        }


        @Test
        public void testMockCreation(){
            assertNotNull(rmqConfig);
            assertNotNull(config);
        }

回答by The Ancient

I know, it is an old question, still as there is no answer so far. What helped me a lot at the same question, is the following blog post: https://tamasgyorfi.net/2016/04/21/writing-integration-tests-for-rabbitmq-based-components/. It uses Apache QPID (not ActiveMQ as suggested in the OP) and it has support for AMQP 0.9.1.

我知道,这是一个老问题,到目前为止还没有答案。在同一问题上对我帮助很大的是以下博客文章:https: //tamasgyorfi.net/2016/04/21/writing-integration-tests-for-rabbitmq-based-components/。它使用 Apache QPID(不是 OP 中建议的 ActiveMQ),并且支持 AMQP 0.9.1。