Question:
I have tried several approaches to retrieve all messages from an SQS queue using the AWS SDK for Java, to no avail. I have read about the distributed nature of Amazon SQS and that messages are stored on different servers. What I do not understand is why this architecture is not hidden from the end user. What tricks do I have to apply in Java code to retrieve all messages and be 100% sure that none was missed?
I tried this with “Long Polling”:
    // Needs java.util.List, java.util.Map.Entry and the com.amazonaws.services.sqs.model classes.
    // `sqs` is assumed to be an already-configured AmazonSQS client.
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
    for (Message message : messages) {
        System.out.println("  Message");
        System.out.println("    MessageId:     " + message.getMessageId());
        System.out.println("    ReceiptHandle: " + message.getReceiptHandle());
        System.out.println("    MD5OfBody:     " + message.getMD5OfBody());
        System.out.println("    Body:          " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println("  Attribute");
            System.out.println("    Name:  " + entry.getKey());
            System.out.println("    Value: " + entry.getValue());
        }
    }
    System.out.println();
And this with Request Batching / Client-Side Buffering:
    // Create the basic Amazon SQS async client
    AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();

    // Create the buffered client
    AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

    CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue");
    CreateQueueResult res = bufferedSqs.createQueue(createRequest);

    SendMessageRequest request = new SendMessageRequest();
    String body = "test message_" + System.currentTimeMillis();
    request.setMessageBody(body);
    request.setQueueUrl(res.getQueueUrl());

    SendMessageResult sendResult = bufferedSqs.sendMessage(request);

    // The original passed an undefined `queueUrl` here; the URL of the queue created above is used instead.
    ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
        .withMaxNumberOfMessages(10)
        .withQueueUrl(res.getQueueUrl());
    ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

    List<Message> messages = rx.getMessages();
    for (Message message : messages) {
        System.out.println("  Message");
        System.out.println("    MessageId:     " + message.getMessageId());
        System.out.println("    ReceiptHandle: " + message.getReceiptHandle());
        System.out.println("    MD5OfBody:     " + message.getMD5OfBody());
        System.out.println("    Body:          " + message.getBody());
        for (Entry<String, String> entry : message.getAttributes().entrySet()) {
            System.out.println("  Attribute");
            System.out.println("    Name:  " + entry.getKey());
            System.out.println("    Value: " + entry.getValue());
        }
    }
But I am still unable to retrieve all messages.
Any ideas?
The AWS forum has been silent on my post.
Answer:
When receiving messages from an SQS queue, you need to repeatedly call sqs:ReceiveMessage.
On each call to sqs:ReceiveMessage, you will get 0 or more messages from the queue which you’ll need to iterate through. For each message, you’ll also need to call sqs:DeleteMessage to remove the message from the queue when you’re done processing each message.
Add a loop around your “Long Polling” sample above to receive all messages.
    // `sqs` is assumed to be an already-configured AmazonSQS client.
    for (;;) {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
        List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (Message message : messages) {
            System.out.println("  Message");
            System.out.println("    MessageId:     " + message.getMessageId());
            System.out.println("    ReceiptHandle: " + message.getReceiptHandle());
            System.out.println("    MD5OfBody:     " + message.getMD5OfBody());
            System.out.println("    Body:          " + message.getBody());
            for (Entry<String, String> entry : message.getAttributes().entrySet()) {
                System.out.println("  Attribute");
                System.out.println("    Name:  " + entry.getKey());
                System.out.println("    Value: " + entry.getValue());
            }
            // Remove the message once it has been processed (sqs:DeleteMessage, as described above).
            sqs.deleteMessage(myQueueUrl, message.getReceiptHandle());
        }
        System.out.println();
    }
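The loop above runs forever. If the goal is to drain the queue and then stop, one possible approach is to stop after a few consecutive empty responses. The sketch below illustrates only that control flow. So that it runs without AWS credentials, `Msg`, `MessageSource` and `InMemorySource` are hypothetical stand-ins and not part of the AWS SDK. With the real client, `receive` would roughly correspond to `receiveMessage` on a request built with `withMaxNumberOfMessages(10)` and `withWaitTimeSeconds(20)`, and `delete` to `deleteMessage(queueUrl, receiptHandle)`. The threshold of three empty responses is an assumption, not an SQS rule. An empty response is a stronger signal with long polling than with short polling, which samples only a subset of servers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical message holder: just the two fields this sketch needs. */
final class Msg {
    final String receiptHandle;
    final String body;

    Msg(String receiptHandle, String body) {
        this.receiptHandle = receiptHandle;
        this.body = body;
    }
}

/** Hypothetical stand-in for the SQS client; not part of the AWS SDK. */
interface MessageSource {
    /** Returns between 0 and maxMessages messages, like sqs:ReceiveMessage. */
    List<Msg> receive(int maxMessages);

    /** Permanently removes a received message, like sqs:DeleteMessage. */
    void delete(String receiptHandle);
}

/** In-memory fake that hands out at most perCall messages per receive call. */
final class InMemorySource implements MessageSource {
    private final Deque<Msg> visible = new ArrayDeque<>();
    private final Map<String, Msg> inFlight = new HashMap<>();
    private final int perCall;

    InMemorySource(List<String> bodies, int perCall) {
        int i = 0;
        for (String b : bodies) {
            visible.add(new Msg("handle-" + i++, b));
        }
        this.perCall = perCall;
    }

    @Override
    public List<Msg> receive(int maxMessages) {
        List<Msg> batch = new ArrayList<>();
        int limit = Math.min(maxMessages, perCall);
        while (batch.size() < limit && !visible.isEmpty()) {
            Msg m = visible.poll();
            inFlight.put(m.receiptHandle, m); // received but not yet deleted
            batch.add(m);
        }
        return batch;
    }

    @Override
    public void delete(String receiptHandle) {
        inFlight.remove(receiptHandle);
    }
}

final class SqsDrainSketch {
    /** Receives until maxEmptyReceives consecutive calls return nothing. */
    static List<String> drain(MessageSource source, int maxEmptyReceives) {
        List<String> bodies = new ArrayList<>();
        int emptyInARow = 0;
        while (emptyInARow < maxEmptyReceives) {
            List<Msg> batch = source.receive(10);
            if (batch.isEmpty()) {
                emptyInARow++;
                continue;
            }
            emptyInARow = 0;
            for (Msg m : batch) {
                bodies.add(m.body);             // "process" the message
                source.delete(m.receiptHandle); // then remove it from the queue
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        MessageSource source = new InMemorySource(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(drain(source, 3)); // prints [a, b, c, d, e]
    }
}
```

Even with such a loop, messages that another consumer has received but not yet deleted are invisible to this one, so "the queue looks empty" is not a guarantee that nothing is left.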
Also note that you may receive the same message more than once. So either make your processing safe to repeat for the same message, or detect repeated messages and skip them.
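One way to detect a repeated message is to remember the IDs already handled. The sketch below assumes the ID passed in is what `message.getMessageId()` returns. `DedupSketch` and `processOnce` are hypothetical names, and the in-memory set only helps within a single process. Several consumers, or a consumer that restarts, would need a shared store instead.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper that skips messages whose ID was already processed. */
final class DedupSketch {
    private final Set<String> seenIds = new HashSet<>();

    /**
     * Runs handler on body unless messageId was processed before.
     * Returns true if the handler ran, false if the message was a repeat.
     */
    boolean processOnce(String messageId, String body, Consumer<String> handler) {
        if (seenIds.contains(messageId)) {
            return false; // duplicate delivery: skip it
        }
        handler.accept(body);
        // Record the ID only after the handler succeeded, so a failed
        // attempt can be retried when the message is delivered again.
        seenIds.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        Consumer<String> handler = body -> System.out.println("processing " + body);
        dedup.processOnce("id-1", "hello", handler); // handler runs
        dedup.processOnce("id-1", "hello", handler); // skipped as a repeat
        dedup.processOnce("id-2", "world", handler); // handler runs
    }
}
```

A repeated message should still be deleted from the queue after it is skipped, otherwise it will keep being delivered.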