java 亚马逊 sqs 的异步消费者
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32222809/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Async consumer for Amazon sqs
提问by user5118993
I am new to working with queues. I am able to successfully post messages and receive them synchronously However, I am now trying to async now.
我是处理队列的新手。我能够成功发布消息并同步接收它们但是,我现在正在尝试异步。
The reference links provided by sqs suggests using jmsclient wrapper. And the link also mentions to use it if you already have a code that is integrated to a jms client.
sqs 提供的参考链接建议使用 jmsclient 包装器。如果您已经有一个集成到 jms 客户端的代码,该链接还提到使用它。
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/jmsclient.html#samples
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/jmsclient.html#samples
But I am starting afresh I referred this example to send and recv messages synchronously.
但是我重新开始我引用了这个例子来同步发送和接收消息。
Can I use the same code but implement it with a message listener? Any code examples will be appreciated.
我可以使用相同的代码但使用消息侦听器实现它吗?任何代码示例将不胜感激。
回答by rbarni
There's a code sample in the section about Using JMS with Amazon SQSof the Amazon SQS Developer Guide that shows how to asynchronously receive messages using JMS.
Amazon SQS 开发人员指南的关于将 JMS 与 Amazon SQS 一起使用的部分中有一个代码示例,它展示了如何使用 JMS 异步接收消息。
First you implement the MessageListener interface:
首先实现 MessageListener 接口:
class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
// Cast the received message as TextMessage and print the text to screen.
if (message != null) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
And then you set it as the MessageListener for a MessageConsumer:
然后将其设置为 MessageConsumer 的 MessageListener:
// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());
// Start receiving incoming messages.
connection.start();
// Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
Thread.sleep(1000);
回答by doer_uvc
You can use sqslistener
annotation from SpringCloud framework.
If you are developing application with Spring
and AWS
and you are not using Spring Cloud
, it is good time for you to switch.
您可以使用sqslistener
SpringCloud 框架中的注解。如果您正在使用Spring
和开发应用程序AWS
并且您没有使用Spring Cloud
,那么现在是您切换的好时机。
Here is a sample code to asynchronously receive message from SQS using sqslistener
annotation. A good thing is you have to almost zero configuration for using this :
这是使用sqslistener
注释从 SQS 异步接收消息的示例代码。一件好事是你必须几乎零配置才能使用它:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import com.example.my.RecoverableException;
@Component
@Slf4j
public class CustomMessageQueue {
@SqsListener(value = "${build_request_queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(String message) {
try {
// write message processing here
} catch (RecoverableException e) {
// handle errors here for which message from queue should not be deleted
// throwing an exception will make receive method fail and hence message does not get deleted
throw e;
} catch (Exception e) {
// suppress exceptions for which message should be deleted.
}
}
}
The great thing about sqslistener
annotation is its deletionPolicy
. So you can decide when a message from SQS gets deleted.
关于伟大的事情sqslistener
注释是它deletionPolicy
。因此,您可以决定何时删除来自 SQS 的消息。
回答by Strabek
Although this is old question I will add another solution I use in one of my Python apps. It might give you an idea how to do it in any language.
虽然这是个老问题,但我将添加另一个解决方案,我在我的一个 Python 应用程序中使用。它可能会让您了解如何使用任何语言进行操作。
I have kind of proxy function (listener) which gets triggered by SQS. In this function I have for loop that loops through messages and invokes another Lambda function by executing invoke method of lambda client. Each message 'knows' function name which should be invoked.
我有一种由 SQS 触发的代理功能(侦听器)。在这个函数中,我有 for 循环,它循环遍历消息并通过执行 lambda client 的invoke 方法来调用另一个 Lambda 函数。每条消息“知道”应该调用的函数名称。
I use this proxy function for all my other functions within the app and triggered by SQS.
我将此代理功能用于应用程序中的所有其他功能并由 SQS 触发。
This way I can utilise Lambda destinations functionality added recently.
这样我就可以利用最近添加的 Lambda 目标功能。
回答by sendon1982
I use aws sdk to connect to SQS
我使用 aws sdk 连接到 SQS
AWSCredentials credential = new BasicAWSCredentials(accessKey, secretKey);
AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(credential);
AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(Regions.US_EAST_1).withCredentials(
awsCredentialsProvider).build();
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(10)
.withMaxNumberOfMessages(10);;
ReceiveMessageResult queueResult = sqs.receiveMessage(messageRequest);
List<Message> messages = offerchangenotificationQue.getMessages();
for (Message message : messages) {
log.info("Receive message from queue {}", message.getBody());
}
回答by Oleg Dulin
SQS stands for "Simple Queue Service." It is meant to be simple, literally. As a result, it does not support some of the niceties of JMS, in particular asynchronous listeners.
SQS 代表“简单队列服务”。从字面上看,这意味着简单。因此,它不支持 JMS 的某些优点,尤其是异步侦听器。
I have written a blog post on this topic: http://thedulinreport.com/2015/05/09/guaranteeing-delivery-of-messages-with-aws-sqs/
我写了一篇关于这个主题的博客文章:http: //thedulinreport.com/2015/05/09/guaranteeing-delivery-of-messages-with-aws-sqs/
Basically, what you need to do is write a poller in an infinite loop, but you want to be smart about it -- you don't want to keep polling too much because you are charged per request.
基本上,您需要做的是在无限循环中编写一个轮询器,但您希望对此保持清醒——您不想继续进行过多轮询,因为您需要按请求付费。