Handling Failed Messages in AWS Simple Queue Service (SQS)

AWS Simple Queue Service (SQS) is a managed message queuing service that provides a scalable, distributed messaging backbone for Serverless and Microservice applications. It lets applications use message-oriented middleware without implementing it from scratch. With SQS, application components can send, store, and retrieve messages without heavy programming overhead or the fear of losing messages in flight.

I will not explain much about the SQS service itself, but rather a nice feature that comes with it. We can use SQS together with Lambda for processing messages. If we bind an SQS event to a Lambda function, the Lambda service polls the queue and invokes the function synchronously with batches of messages. If a batch is processed successfully, that batch of messages is deleted from the SQS queue and the next batch is taken for processing. This continues until all the messages are successfully processed, or the retry count is exhausted.
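For reference, the event object that SQS hands to the Lambda function has roughly this shape (trimmed here to the fields this article uses; the angle-bracket values are placeholders, and the full event carries more attributes):

{
  "Records": [
    {
      "messageId": "<messageId>",
      "body": "<message body>",
      "eventSource": "aws:sqs"
    }
  ]
}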

These batches can contain 1 to 10,000 messages for Standard queues and 1 to 10 for FIFO (First In, First Out) queues. The problem: say you have a batch size of 50, and the 49th message fails to process. Up to the 49th message, everything was processed successfully and hidden from the queue so other Lambda invocations would not pick it up. But because the 49th message failed, all 50 messages show up again for reprocessing, even though 48 of them were already done. We had this issue in one of our applications, which uploads documents to another system. Because of this issue, it uploaded the same document multiple times, leaving many duplicated documents in that system.




When we searched for a solution to this problem, we found a great concept called Partial Batch Processing, which allows us to inform SQS that only the failed messages need to be reprocessed or placed in a DLQ, so the other, successful messages can be deleted from the queue.

Let’s look at how to implement this in your SQS queue-processing Lambda. First, I am creating a small service with two functions: one will push some messages to the queue, and the other will consume them from the queue.

The serverless.yml file will look like this:


service: mediumsqs
frameworkVersion: '2'

provider:
  name: aws
  runtime: nodejs12.x
  lambdaHashingVersion: 20201221
  region: us-east-1
  iamRoleStatements:
    - Effect: 'Allow'
      Resource: ['*']
      Action: ['sqs:*']
  environment: 
    QUEUE_URL: !Ref MediumSQSArticleQueue
  
functions:
  messageProducer:
    handler: handler.messageProducer
    events:
      - http:
          path: mediumsqs/produce
          method: get
  
  messageConsumer:
    handler: handler.messageConsumer
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MediumSQSArticleQueue
              - Arn
          batchSize: 50
          maximumBatchingWindow: 10
          functionResponseType: 'ReportBatchItemFailures' #Add this Response type to serverless.yml

resources:
  Resources:
    MediumSQSArticleQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MediumSQSArticleQueue"
        VisibilityTimeout: 200

Take a close look at this serverless.yml file. Under the SQS event, there is an attribute called functionResponseType, and its value needs to be ReportBatchItemFailures for this implementation to work.
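If you are not using the Serverless Framework, the same behaviour can be enabled on the event source mapping directly, for example via the AWS CLI (the UUID below is a placeholder for your own mapping’s ID, which aws lambda list-event-source-mappings reports):

aws lambda update-event-source-mapping \
  --uuid <event-source-mapping-uuid> \
  --function-response-types ReportBatchItemFailures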

I will write the two functions as below.


messageProducer Function

const AWS = require('aws-sdk');

const sqs = new AWS.SQS();
const sqsQueueUrl = process.env.QUEUE_URL; // queue URL injected through serverless.yml

module.exports.messageProducer = async (event) => {

  // Push 10 messages that the consumer will treat as successful
  for (var i = 0; i < 10; i++) {
    await sqs.sendMessage({
      MessageBody: 'success',
      QueueUrl: sqsQueueUrl
    }).promise();
  }

  // Push 10 messages that the consumer will treat as failed
  for (var i = 0; i < 10; i++) {
    await sqs.sendMessage({
      MessageBody: 'failed',
      QueueUrl: sqsQueueUrl
    }).promise();
  }

  return {
    statusCode: 200
  };
};

In the messageProducer function, we have written two for loops: one pushes 10 messages with a body of “success” to the SQS queue, and the other pushes 10 messages with a body of “failed”. This is exposed as an HTTP endpoint, so we can trigger it with a simple GET request.
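For example, once deployed, something like this would hit the endpoint (the API Gateway URL here is a placeholder; use the one sls deploy prints for your stage):

curl https://<api-id>.execute-api.us-east-1.amazonaws.com/dev/mediumsqs/produce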


messageConsumer Function

module.exports.messageConsumer = async (event) => {

  var sqsRecords = event.Records;
  var batchItemFailures = [];

  if (sqsRecords.length > 0) {
    for (const message of sqsRecords) {
      // Report the messageId of every failed message back to SQS
      if (message.body === 'failed')
        batchItemFailures.push({ itemIdentifier: message.messageId });

      if (message.body === 'success')
        console.log('It is success');
    }
  }

  // Anything listed here is retried; everything else is deleted from the queue
  return { batchItemFailures };
};

The messageConsumer function receives the messages and checks each message’s body by looping through every record in the event. If the body is “failed”, it pushes the messageId of that particular message into the batchItemFailures array under the itemIdentifier key. After all the records are processed, it returns the collected failures.

The returned response will look like this:


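(The itemIdentifier value below is a placeholder for a failed message’s messageId.)

{
  "batchItemFailures": [
    { "itemIdentifier": "<messageId-of-a-failed-message>" }
  ]
}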


If you have done everything, use sls deploy to deploy the service, get the producer function’s URL, and open it in the browser. Then go to SQS in the AWS Console and select MediumSQSArticleQueue; you will see the ten failed messages in flight, waiting to be retried by the consumer function.




Here I have failed the messages intentionally, but in a real-world use case you would catch processing failures inside a try-catch block.
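As a rough sketch of that pattern (processDocument is a hypothetical stand-in for whatever your real processing logic is):

module.exports.messageConsumer = async (event) => {

  var batchItemFailures = [];

  for (const message of event.Records) {
    try {
      // processDocument is a hypothetical placeholder for your real work
      await processDocument(message.body);
    } catch (error) {
      // Only the message that actually failed is reported back to SQS
      batchItemFailures.push({ itemIdentifier: message.messageId });
    }
  }

  return { batchItemFailures };
};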

Finally, a few things to remember about the response of the consumer function (a quick sketch follows the list):

  • If the function’s response is null, or the batchItemFailures list is empty, SQS will consider all the messages in the batch successful.
  • If the response JSON is invalid, or an itemIdentifier value is null, empty, or doesn’t match the messageId of a message in the batch, the whole batch is treated as failed. So be cautious when constructing the response JSON.
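As a quick illustration of those two rules (a sketch, not exhaustive):

// Both of these tell SQS the whole batch succeeded,
// so every message in the batch is deleted from the queue
return { batchItemFailures: [] };
return null;

// This retries only the listed message and deletes the rest
return { batchItemFailures: [{ itemIdentifier: message.messageId }] };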

That is all you need to know!




You can get the complete test project from the URL below:

https://github.com/sandunisuru/sqsmediumarticle