Handling Failed Messages in AWS Simple Queue Service (SQS)

AWS Simple Queue Service (SQS) is a managed message queuing service that provides a scalable, distributed messaging layer for serverless and microservice applications. It gives applications message-oriented middleware without having to implement it from scratch. With SQS, applications can send, store, and retrieve messages between components without heavy programming overhead or fear of losing messages in flight.
I will not explain much about SQS itself, but about a nice feature that comes with it. We can use SQS with Lambda to process messages: if we configure the queue as an event source for a Lambda function, Lambda polls the queue, retrieves messages in batches, and invokes the function synchronously with each batch as the event. If the batch is processed successfully, Lambda deletes that batch of messages from the queue and takes another batch for processing. This continues until all the messages are processed successfully or the retries are exhausted.
These batches can contain 1 to 10,000 messages for Standard queues and 1 to 10 for FIFO (First In, First Out) queues. The problem shows up when, say, you have a batch size of 50 and the 49th message fails to process. The first 48 messages were processed successfully, and the whole batch was hidden from the queue so that other Lambda invocations would not pick it up. But because the 49th message failed, all 50 messages show up again for processing, even though 48 of them were already done. We had this issue in one of our applications, which uploads documents to another system. Because of this issue, it uploaded the same document multiple times, and that system ended up with many duplicated documents.
When we searched for a solution to this problem, we found a great concept called Partial Batch Processing, which allows us to tell Lambda that only the failed messages need to be reprocessed or placed in a DLQ (dead-letter queue). The other, successful messages can then be deleted from the queue.
Let’s look at how to implement this in your SQS queue processing Lambda. First, I am creating a small Serverless service with two functions. One will push some messages to the queue, and the other will consume them from the queue.
The serverless.yml file will look like this.
service: mediumsqs
frameworkVersion: '2'

provider:
  name: aws
  runtime: nodejs12.x
  lambdaHashingVersion: 20201221
  region: us-east-1
  iamRoleStatements:
    - Effect: 'Allow'
      Resource: ['*']
      Action: ['sqs:*']
  environment:
    QUEUE_URL: !Ref MediumSQSArticleQueue

functions:
  messageProducer:
    handler: handler.messageProducer
    events:
      - http:
          path: mediumsqs/produce
          method: get
  messageConsumer:
    handler: handler.messageConsumer
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MediumSQSArticleQueue
              - Arn
          batchSize: 50
          maximumBatchingWindow: 10
          functionResponseType: 'ReportBatchItemFailures' # Add this response type to serverless.yml

resources:
  Resources:
    MediumSQSArticleQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MediumSQSArticleQueue"
        VisibilityTimeout: 200
If you look at this serverless.yml file, under the SQS event we have an attribute called functionResponseType, and its value needs to be ReportBatchItemFailures for this implementation to work.
I will also write the two functions, as shown below.
messageProducer Function
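// Setup assumed by both handlers: aws-sdk v2 client and the queue URL from serverless.yml
const AWS = require('aws-sdk');

const sqs = new AWS.SQS();
const sqsQueueUrl = process.env.QUEUE_URL;

module.exports.messageProducer = async (event) => {
  // Push 10 messages that the consumer will process successfully
  for (let i = 0; i < 10; i++) {
    await sqs.sendMessage({
      MessageBody: 'success',
      QueueUrl: sqsQueueUrl
    }).promise();
  }
  // Push 10 messages that the consumer will report as failed
  for (let i = 0; i < 10; i++) {
    await sqs.sendMessage({
      MessageBody: 'failed',
      QueueUrl: sqsQueueUrl
    }).promise();
  }
  return {
    statusCode: 200
  };
};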
In the messageProducer function, we have written two for loops. One pushes 10 messages with a body of “success” to the SQS queue, and the other pushes 10 messages with a body of “failed”. The function is exposed over an HTTP endpoint, so we can trigger it with a GET request.
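Sending messages one at a time costs one API call per message. As a possible alternative, the sketch below groups message bodies into SendMessageBatch requests, which accept at most 10 entries per call. The helper names chunkEntries and sendInBatches are made up for this illustration, and the SQS client is passed in as a parameter (assumed to be an aws-sdk v2 AWS.SQS instance) so the grouping logic can be tried without AWS.

```javascript
// Hypothetical helper: split message bodies into groups of at most `size`
// entries, each entry shaped like a SendMessageBatch request entry.
function chunkEntries(bodies, size = 10) {
  const chunks = [];
  for (let i = 0; i < bodies.length; i += size) {
    const entries = bodies.slice(i, i + size).map((body, j) => ({
      Id: `msg-${i + j}`, // Id only has to be unique within one batch request
      MessageBody: body,
    }));
    chunks.push(entries);
  }
  return chunks;
}

// Hypothetical helper: `sqs` is assumed to be an aws-sdk v2 AWS.SQS client.
async function sendInBatches(sqs, queueUrl, bodies) {
  for (const entries of chunkEntries(bodies)) {
    await sqs.sendMessageBatch({ QueueUrl: queueUrl, Entries: entries }).promise();
  }
}

// Same 20 messages as the producer above: 10 "success", then 10 "failed".
const bodies = [...Array(10).fill('success'), ...Array(10).fill('failed')];
console.log(chunkEntries(bodies).length); // 2
```

Note that a batch send can succeed for some entries and fail for others, so a fuller version would also inspect the Failed list in each sendMessageBatch response.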
messageConsumer Function
module.exports.messageConsumer = async (event) => {
  const sqsRecords = event.Records;
  const batchItemFailures = [];
  for (const message of sqsRecords) {
    if (message.body === 'failed') {
      // Report this message so that only it is retried
      batchItemFailures.push({ itemIdentifier: message.messageId });
    }
    if (message.body === 'success') {
      console.log('It is success');
    }
  }
  return { batchItemFailures };
};
The messageConsumer function receives the messages and checks each message’s body by looping through every record in the event. If the body is “failed”, it pushes the message ID of that message into the batchItemFailures array under the itemIdentifier key. Once all the records have been checked, it returns the collected failures.
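The returned response looks like this: { "batchItemFailures": [ { "itemIdentifier": "<messageId>" } ] }, with one itemIdentifier entry per failed message.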
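To see the response concretely, the consumer logic can be run locally against a hand-built event. This is a minimal sketch: the event below contains only the two fields the handler reads (messageId and body), while a real SQS event carries more, and the message IDs are made up.

```javascript
// Same logic as messageConsumer above, kept local so the snippet runs on its own.
const consumer = async (event) => {
  const batchItemFailures = [];
  for (const message of event.Records) {
    if (message.body === 'failed') {
      batchItemFailures.push({ itemIdentifier: message.messageId });
    }
  }
  return { batchItemFailures };
};

// Hand-built event with made-up message IDs.
const sampleEvent = {
  Records: [
    { messageId: 'id-1', body: 'success' },
    { messageId: 'id-2', body: 'failed' },
    { messageId: 'id-3', body: 'failed' },
  ],
};

consumer(sampleEvent).then((response) => {
  console.log(JSON.stringify(response));
  // {"batchItemFailures":[{"itemIdentifier":"id-2"},{"itemIdentifier":"id-3"}]}
});
```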
Once you have done everything, use sls deploy to deploy the service, get the producer function URL, and open it in the browser. Then go to SQS in the AWS Console and select MediumSQSArticleQueue; you can see the ten failed messages in flight, waiting to be retried by the consumer function.
Here I have failed the messages intentionally, but in a real-world use case you can wrap the processing in a try-catch block and report a message as failed in the catch.
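For a real workload, that pattern might look like the sketch below. processMessage is a hypothetical stand-in for the actual work (for example, uploading a document); here it simply throws for a body of "failed" so the flow can be exercised locally.

```javascript
// Hypothetical stand-in for real work such as uploading a document.
async function processMessage(message) {
  if (message.body === 'failed') {
    throw new Error(`could not process ${message.messageId}`);
  }
}

const consumerWithTryCatch = async (event) => {
  const batchItemFailures = [];
  for (const message of event.Records) {
    try {
      await processMessage(message);
    } catch (err) {
      // Report only this message as failed; the others are not retried.
      console.error(err.message);
      batchItemFailures.push({ itemIdentifier: message.messageId });
    }
  }
  return { batchItemFailures };
};
```

Catching the error per message is what keeps one bad message from failing the whole batch. If the handler let the exception escape instead, the entire batch would be treated as failed.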
Finally, a few things to remember about the response of the consumer function:
- If the function’s response is null or empty, or the value of batchItemFailures is null or an empty list, Lambda will consider all the messages successful.
- If the response JSON is invalid, or the message ID given for itemIdentifier is empty, null, or invalid, the whole batch will be treated as failed. So be cautious when creating the response JSON.
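These rules can be summed up in a small model. This is not AWS code, only a sketch of the behaviour described above as I understand it; the function name messagesToRetry is made up, it takes the already-parsed response plus the message IDs of the batch, and treating a non-array batchItemFailures as invalid is my own assumption.

```javascript
// Sketch only: models which message IDs would return to the queue
// for a given (already parsed) handler response.
function messagesToRetry(response, batchIds) {
  // Null/empty response or null/missing batchItemFailures: whole batch succeeded.
  if (response == null || response.batchItemFailures == null) return [];
  const failures = response.batchItemFailures;
  // Assumption: anything that is not a list counts as an invalid response.
  if (!Array.isArray(failures)) return [...batchIds];
  if (failures.length === 0) return [];

  const known = new Set(batchIds);
  const ids = failures.map((f) => (f ? f.itemIdentifier : undefined));
  // Any empty, null, or unknown itemIdentifier fails the whole batch.
  const allValid = ids.every((id) => typeof id === 'string' && id !== '' && known.has(id));
  return allValid ? batchIds.filter((id) => ids.includes(id)) : [...batchIds];
}

const batch = ['m1', 'm2', 'm3'];
console.log(messagesToRetry({ batchItemFailures: [] }, batch)); // []
console.log(messagesToRetry({ batchItemFailures: [{ itemIdentifier: 'm2' }] }, batch)); // [ 'm2' ]
console.log(messagesToRetry({ batchItemFailures: [{ itemIdentifier: '' }] }, batch)); // [ 'm1', 'm2', 'm3' ]
```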
That is all you need to know!
You can get the complete test project from the URL below.