Question:
I’ve been working on a small Spring Boot application that receives messages from Amazon SQS. However I foresee that processing these messages may fail, so that’s why I thought adding a dead letter queue would be a good idea.
There is a problem though: when the processing fails (which I force by throwing an Exception for some of the messages) it is not reattempted later on and it’s not moved to the dead letter queue. I am struggling to find the issue, since there doesn’t seem to much info on it.
However if I look at Amazon’s documentation, they seem to be able to do it, but without using the Spring Boot annotations. Is there any way I can make the code below work transactional without writing too much of the JMS code myself?
This is the current configuration that I am using.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
@Configuration public class AWSConfiguration { @Value("${aws.sqs.endpoint}") private String endpoint; @Value("${aws.iam.key}") private String iamKey; @Value("${aws.iam.secret}") private String iamSecret; @Value("${aws.sqs.queue}") private String queue; @Bean public JmsTemplate createJMSTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(getSQSConnectionFactory()); jmsTemplate.setDefaultDestinationName(queue); jmsTemplate.setDeliveryPersistent(true); jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT); return jmsTemplate; } @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(getSQSConnectionFactory()); factory.setConcurrency("1-1"); return factory; } @Bean public JmsTransactionManager jmsTransactionManager() { return new JmsTransactionManager(getSQSConnectionFactory()); } @Bean public ConnectionFactory getSQSConnectionFactory() { return SQSConnectionFactory.builder() .withAWSCredentialsProvider(awsCredentialsProvider) .withEndpoint(endpoint) .withNumberOfMessagesToPrefetch(10).build(); } private final AWSCredentialsProvider awsCredentialsProvider = new AWSCredentialsProvider() { @Override public AWSCredentials getCredentials() { return new BasicAWSCredentials(iamKey, iamSecret); } @Override public void refresh() { } }; } |
And finally the receiving end:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Service public class QueueReceiver { private static final String EXPERIMENTAL_QUEUE = "${aws.sqs.queue}"; @JmsListener(destination = EXPERIMENTAL_QUEUE) public void receiveSegment(String jsonSegment) throws IOException { Segment segment = Segment.fromJSON(jsonSegment); if(segment.shouldFail()) { throw new IOException("This segment is expected to fail"); } System.out.println(segment.getText()); } } |
Answer:
Spring Cloud AWS
You can greatly simplify your configuration by leveraging Spring Cloud AWS.
MessageHandler
1 2 3 4 5 6 7 8 9 10 11 12 |
@Service public class MessageHandler { @SqsListener(value = "test-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER) public void queueListener(String msg, Acknowledgment acknowledgment){ System.out.println("message: " + msg); if(/*successful*/){ acknowledgment.acknowledge(); } } } |
The example shown above is all you need to receive messages. This assumes you’ve created an sqs queue with an associated dead letter queue. If you’re messages aren’t acknowledged, then they will be retried again until they reach the maximum # of receives. Then it will be forwarded to the dead letter queue.