How do I fail a specific SQS message in a batch from a Lambda?
As per AWS documentation, SQS event source mapping now supports handling of partial failures out of the box. Gist of the linked article is as follows:
- Include
ReportBatchItemFailures
in yourEventSourceMapping
configuration - The response syntax in case of failures has to be modified to have:
{
"batchItemFailures": [
{ "itemIdentifier": "id2" },
{ "itemIdentifier": "id4" }
]
}
Where id2 and id4 the failed messageIds in a batch.
- Quoting the documentation as is:
Lambda treats a batch as a complete success if your function returns any of the following
- An empty
batchItemFailure
list- A null
batchItemFailure
list- An empty
EventResponse
- A null
EventResponse
Lambda treats a batch as a complete failure if your function returns any of the following:
- An invalid JSON response
- An empty string
itemIdentifier
- A null
itemIdentifier
- An
itemIdentifier
with a bad key name- An
itemIdentifier
value with a message ID that doesn't exist
SAM support is not yet available for the feature as per the documentation. But one of the AWS labs example points to its usage in SAM and it worked for me when tested
Yes you have to manually re-add the failed messages back to the queue.
What I suggest doing is setting up a fail count, so that if all messages failed you can simply return a failed status for all messages, otherwise if the fail count is < 10 then you can individually send back the failed messages to the queue.
You've to programmatically delete each message from after processing it successfully.
So you can have a flag set to true if anyone of the messages failed and depending upon it you can raise error after processing all the messages in a batch so successful messages will be deleted and other messages will be reprocessed based on retry policies.
So as per the below logic only failed and unprocessed messages will get retried.
import boto3
sqs = boto3.client("sqs")
def handler(event, context):
for message in event['records']:
queue_url = "form queue url recommended to set it as env variable"
message_body = message["body"]
print("do some processing :)")
message_receipt_handle = message["receiptHandle"]
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message_receipt_handle
)
there is also another way to save successfully processed message id into a variable and perform batch delete operation based on message id
response = client.delete_message_batch(
QueueUrl='string',
Entries=[
{
'Id': 'string',
'ReceiptHandle': 'string'
},
]
)