Skip to content

Lambda Consumers

Kenton Vizdos edited this page Mar 12, 2025 · 1 revision

Running Consumers in AWS Lambda

Caution

READ CRITICAL SQS CONFIGURATION. There are specific configuration changes required to make this work!

TypeQueue makes it easy to run consumers within AWS Lambda, allowing you to process SQS messages with a serverless approach. Using the LambdaConsumer, you can handle messages in a type-safe way while ensuring that only failed messages are retried.

Critical SQS Configuration

When setting up your Lambda function with SQS as an event source (using Terraform or another IaC tool), it is critically important to configure the event source mapping with the following parameter:

function_response_types = ["ReportBatchItemFailures"]

This setting ensures that your Lambda function reports only the failed messages back to SQS for retry, rather than reprocessing the entire batch. A full Terraform example is available in the repository, but the key snippet is:

resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn = aws_sqs_queue.typequeue_lambda_demo.arn
  function_name    = aws_lambda_function.typequeue_lambda.arn
  enabled          = true
  batch_size       = 10

  function_response_types = ["ReportBatchItemFailures"] # SUPER IMPORTANT
}

Lambda Consumer Code Sample

Below is an example of a Lambda function written in Go that leverages the TypeQueue LambdaConsumer to process SQS messages:

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/kvizdos/typequeue/pkg/typequeue"
	"github.com/kvizdos/typequeue/pkg/typequeue_lambda"
	"github.com/sirupsen/logrus"
)

type TestEvent struct {
	typequeue.SQSAble
	Content string `json:"content"`
}

func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)
	consumer := typequeue_lambda.LambdaConsumer[*TestEvent]{
		Logger:    logger,
		SQSEvents: sqsEvent,
	}
	consumer.Consume(context.Background(), typequeue.ConsumerSQSOptions{}, func(record *TestEvent) error {
		logger.Info("Consuming Message", "content", record.Content)
		if record.Content == "FAIL ME" {
			return fmt.Errorf("Test Failure")
		}
		return nil
	})
	return consumer.GetBatchItemFailures(), nil // make sure you have this bit, it'll actually cause the retry!
}

func main() {
	lambda.Start(handler)
}

It's nearly identical to the base consumer, and fully interoperable. They have the same base interface, so code can be swapped incredibly easily if you want to start in "monolithic" mode, and then later to "lambda" mode.

How It Works

  • LambdaConsumer Setup: The LambdaConsumer is instantiated with your logger and the incoming SQS event (events.SQSEvent). This consumer processes messages in a type-safe manner.

  • Message Processing: The processing function logs the message content and returns an error if a message should be retried (for example, if record.Content equals "FAIL ME"). This error will mark the message for retry. In your own code, it is recommended to pull the processing function out of inline so it can be individually tested.

  • Reporting Failures: After processing, the function calls consumer.GetBatchItemFailures() to report only the failed messages back to SQS. This works in tandem with the Terraform configuration that sets function_response_types to "ReportBatchItemFailures".

References

This guide shows you how to configure your Lambda environment to run TypeQueue consumers effectively. Make sure your Terraform configuration includes the critical function_response_types setting to properly handle message failures.

Check out the Lambda example in cmd/lambda_consumer_demo! It has full terraform available.

Happy Coding!