java 持续监听 AWS SQS 消息的模式

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/47911875/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 09:51:45  来源:igfitidea点击:

Pattern to continuously listen to AWS SQS messages

javaamazon-web-servicessdkamazon-sqs

提问by Francisco Hanna

I have a simple class named QueueServicewith some methods that wrap the methods from the AWS SQS SDK for Java. For example:

我有一个QueueService用一些方法命名的简单类,这些方法包装了来自适用于 Java 的 AWS SQS 开发工具包的方法。例如:

public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
        List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();

        ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
        for(Message message : messages) {
            Hashtable<String, String> resultItem = new Hashtable<String, String>();
            resultItem.put("MessageId", message.getMessageId());
            resultItem.put("ReceiptHandle", message.getReceiptHandle());
            resultItem.put("Body", message.getBody());
            resultList.add(resultItem);
        }
        return resultList;
    }

I have another another class named Appthat has a mainand creates an instace of the QueueService.

我还有另一个名为的类App,它有一个main并创建了QueueService.

I looking for a "pattern" to make the mainin Appto listen for new messages in the queue. Right now I have a while(true)loop where I call the receiveMessagesmethod:

我在寻找一个“模式”,使mainApp监听队列中的新邮件。现在我有一个while(true)循环来调用该receiveMessages方法:

while(true) {
            messages = queueService.receiveMessages(queueURL); 
            for(Hashtable<String, String> message: messages) {
                String receiptHandle = message.get("ReceiptHandle");
                String messageBody = message.get("MessageBody");
                System.out.println(messageBody);
                queueService.deleteMessage(queueURL, receiptHandle);
            }
        }

Is this the correct way? Should I use the async message receive method in SQS SDK?

这是正确的方法吗?我应该在 SQS SDK 中使用异步消息接收方法吗?

回答by Amitkumar Gupta

To my knowledge, there is no way in Amazon SQS to support an active listener model where Amazon SQS would "push" messages to your listener, or would invoke your message listener when there are messages.

据我所知,Amazon SQS 无法支持主动侦听器模型,其中 Amazon SQS 会将消息“推送”到您的侦听器,或者在有消息时调用您的消息侦听器。

So, you would always have to poll for messages. There are two polling mechanisms supported for polling - Short Polling and Long Polling. Each has its own pros and cons, but Long Polling is the one you would typically end up using in most cases, although the default one is Short Polling. Long Polling mechanism is definitely more efficient in terms of network traffic, is more cost efficient (because Amazon charges you by the number of requests made), and is also the preferred mechanism when you want your messages to be processed in a time sensitive manner (~= process as soon as possible).

因此,您将始终必须轮询消息。轮询支持两种轮询机制 - 短轮询和长轮询。每个都有自己的优点和缺点,但在大多数情况下,长轮询是您通常最终使用的一种,尽管默认是短轮询。Long Polling 机制在网络流量方面肯定更高效,更具成本效益(因为亚马逊按请求的数量向您收费),并且当您希望以对时间敏感的方式处理您的消息时,它也是首选机制( ~= 尽快处理)。

There are more intricacies around Long Polling and Short Polling that are worth knowing, and its somewhat difficult to paraphrase all of that here, but if you like, you can read a lot more details about this through the following blog. It has a few code examples as well that should be helpful.

Long Polling 和 Short Polling 有更多值得了解的复杂之处,在这里解释所有这些有点困难,但如果您愿意,可以通过以下博客阅读更多详细信息。它还有一些代码示例,应该会有所帮助。

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

In terms of a while(true) loop, I would say it depends. If you are using Long Polling, and you can set the wait time to be (max) 20 seconds, that way you do not poll SQS more often than 20 seconds if there are no messages. If there are messages, you can decide whether to poll frequently (to process messages as soon as they arrive) or whether to always process them in time intervals (say every n seconds).

就 while(true) 循环而言,我会说这取决于。如果您使用的是长轮询,并且您可以将等待时间设置为(最大)20 秒,这样如果没有消息,您轮询 SQS 的频率不会超过 20 秒。如果有消息,您可以决定是频繁轮询(在消息到达时立即处理)还是始终按时间间隔(例如每 n 秒)处理它们。

Another point to note would be that you could read upto 10 messages in a single receiveMessages request, so that would also reduce the number of calls you make to SQS, thereby reducing costs. And as the above blog explains in details, you may request to read 10 messages, but it may not return you 10 even if there are that many messages in the queue.

另一点要注意的是,您可以在单个 receiveMessages 请求中读取多达 10 条消息,这样也可以减少您对 SQS 的调用次数,从而降低成本。而且正如上面的博客详细解释的那样,您可能会请求读取 10 条消息,但即使队列中有那么多消息,它也可能不会返回 10 条。

In general though, I would say you need to build appropriate hooks and exception handling to turn off the polling if you wish to at runtime, in case you are using a while(true) kind of a structure.

但总的来说,如果您希望在运行时关闭轮询,我会说您需要构建适当的钩子和异常处理,以防您使用 while(true) 类型的结构。

Another aspect to consider is whether you would like to poll SQS in your main application thread or you would like to spawn another thread. So another option could be to create a ScheduledThreadPoolExecutor with a single thread in the main to schedule a thread to poll the SQS periodically (every few seconds), and you may not need a while(true) structure.

另一个需要考虑的方面是您是想在主应用程序线程中轮询 SQS,还是想生成另一个线程。因此,另一种选择可能是在主线程中创建一个 ScheduledThreadPoolExecutor 以调度一个线程来定期(每隔几秒)轮询 SQS,并且您可能不需要 while(true) 结构。

回答by guest

There are a few things that you're missing:

你缺少一些东西:

  • Use the receiveMessages(ReceiveMessageRequest)and set a wait time to enable long polling.
  • Wrap your AWS calls in try/catch blocks. In particular, pay attention to OverLimitException, which can be thrown from receiveMessages()if you would have too many in-flight messages.
  • Wrap the entire body of the whileloop in its own try/catch block, logging any exceptions that are caught (there shouldn't be -- this is here to ensure that your application doesn't crash because AWS changed their API or you neglected to handle an expected exception).
  • 使用receiveMessages(ReceiveMessageRequest)并设置等待时间以启用长轮询。
  • 将您的 AWS 调用包装在 try/catch 块中。尤其要注意OverLimitException,receiveMessages()如果您有过多的动态消息,则可能会抛出。
  • 将整个while循环体包裹在它自己的 try/catch 块中,记录捕获的任何异常(不应该有——这是为了确保您的应用程序不会因为 AWS 更改了他们的 API 或您忽略了而崩溃)处理预期的异常)。

See docfor more information about long polling and possible exceptions.

有关长轮询和可能的异常的更多信息,请参阅文档

As for using the async client: do you have any particular reason to use it? If not, then don't: a single receiver thread is much easier to manage.

至于使用异步客户端:你有什么特别的理由使用它吗?如果没有,那就不要:单个接收器线程更容易管理。

回答by jpdave

If you want to use SQS and then lambda to process the request you can follow the steps given in the linkor you always use lambda instead of SQS and invoke lambda for every request.

如果您想使用 SQS 然后使用 lambda 来处理请求,您可以按照链接中给出的步骤进行操作,或者您始终使用 lambda 而不是 SQS 并为每个请求调用 lambda。