AWS EventBridge Pipes: Point-to-Point Event Integration Without the Glue Code

Written by Bits Lovers

Before EventBridge Pipes launched in December 2022, connecting an SQS queue to a Step Functions state machine meant writing a Lambda function that polled the queue, parsed the payload, and invoked the state machine. The Lambda existed purely as glue — no business logic, just format transformation and API calls. EventBridge Pipes replaces that pattern with a managed service: you specify the source, optionally define a filter and an enrichment step, and point it at a target. AWS handles the polling, batching, error handling, and retry logic.

That’s the core pitch. The implementation has enough nuance to get wrong, and the pricing model has a trap worth knowing about before you commit.

What Pipes Actually Does

A pipe is a directed connection between one source and one target, with optional filtering and enrichment in the middle. That’s the entire model. One source, one target. For fan-out — one event going to many consumers — you use an EventBridge event bus with rules attached. Pipes are for point-to-point.

The four stages in order:

Source — the buffered queue or stream the pipe polls. Pipes works only with pull-based sources: SQS queues, Kinesis data streams, DynamoDB Streams, MSK topics, self-managed Kafka clusters, and Amazon MQ brokers (ActiveMQ and RabbitMQ). What ties them together is that they buffer records until something comes to fetch them — Pipes plays that role. Push-based sources that emit events directly don’t fit here; use EventBridge event buses with rules for those.

Filter — an optional event pattern that determines which records from the source actually proceed through the pipe. If a record doesn’t match the filter pattern, it gets discarded — and you don’t pay for it. This is the most important cost control mechanism in the service.

Enrichment — an optional call to an external endpoint to augment the event before it reaches the target. Supported enrichment types: Lambda functions, Step Functions Express Workflows, API Gateway endpoints, and EventBridge API Destinations (for external APIs). Enrichment responses are capped at 6 MB. The enriched payload replaces the original event before it hits the target.

Target — where the processed event goes. The target list is wider than the source list: Lambda, Step Functions (both Standard and Express), EventBridge event buses, API Gateway, API Destinations, Kinesis, SQS, SNS, CloudWatch Logs, Kinesis Data Firehose, ECS tasks, Batch jobs, and SageMaker pipelines.

Pricing: Only Pay for What Passes the Filter

EventBridge Pipes charges $0.40 per million requests. The critical detail: this counts requests after the filter step, not total source records. If your SQS queue receives 10 million messages per day but your filter passes only 2%, you pay for 200,000 requests, not 10 million.

A “request” is measured in 64 KB increments: a payload up to 64 KB is one request, 128 KB is two, and a 130 KB payload spans three 64 KB chunks, so it counts as three. For most application-level events (JSON payloads under 64 KB), one event equals one request.
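The chunking math is easy to script. A minimal sketch (the 64 KB chunk size comes from the pricing model above; the function name is my own):

```python
import math

CHUNK_KB = 64  # each 64 KB chunk of payload is billed as one request

def billed_requests(payload_kb: float) -> int:
    """Billed requests for a single event of the given payload size."""
    return max(1, math.ceil(payload_kb / CHUNK_KB))

# A 10 KB JSON event is one request; a 130 KB payload spans three chunks.
```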

Compare this to the alternative: a Lambda function polling an SQS queue via Event Source Mapping. Lambda charges $0.20 per million requests plus duration ($0.0000166667 per GB-second). At 128 MB memory with a 1-second average duration, a Lambda processing 10 million events costs about $2 in requests plus $21 in duration — $23 total. A pipe processing those same 10 million events costs $4. If your Lambda is just routing — no real business logic — Pipes is meaningfully cheaper and removes a function to maintain. At much shorter durations the gap narrows, so check your own average.

The math changes when you need enrichment via Lambda. A pipe that filters, then calls Lambda for enrichment, then sends to a target costs $0.40/million (pipe) plus Lambda duration. At that point you’re paying for both, and whether it’s cheaper than ESM depends on your specific workload. Run the numbers for your actual volume.
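Running those numbers is worth automating. A rough comparison sketch using the list prices quoted above (helper names are mine; a real bill adds free tiers and per-GB transfer charges this ignores):

```python
PIPE_PER_MILLION = 0.40          # Pipes: USD per million post-filter requests
LAMBDA_PER_MILLION = 0.20        # Lambda: USD per million invocations
LAMBDA_GB_SECOND = 0.0000166667  # Lambda: USD per GB-second of duration

def pipe_cost(million_events: float) -> float:
    """Pipe cost for events that pass the filter (64 KB or smaller each)."""
    return million_events * PIPE_PER_MILLION

def esm_lambda_cost(million_events: float, memory_gb: float,
                    avg_seconds: float) -> float:
    """Lambda Event Source Mapping cost: request charge plus duration charge."""
    requests = million_events * LAMBDA_PER_MILLION
    duration = million_events * 1_000_000 * memory_gb * avg_seconds * LAMBDA_GB_SECOND
    return requests + duration
```

For example, `esm_lambda_cost(10, 0.125, 1.0)` comes out near $22.83, while `pipe_cost(10)` is $4.00.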

Setting Up a Pipe: SQS to Step Functions

The most common production use case: an SQS queue receives order events; a Step Functions workflow processes each order. Without Pipes, you write a Lambda that polls the queue and starts executions. With Pipes:

# Create the pipe via CLI
aws pipes create-pipe \
  --name "order-processor" \
  --role-arn arn:aws:iam::123456789012:role/EventBridgePipesRole \
  --source arn:aws:sqs:us-east-1:123456789012:order-events \
  --source-parameters '{
    "SqsQueueParameters": {
      "BatchSize": 10,
      "MaximumBatchingWindowInSeconds": 5
    }
  }' \
  --filter-criteria '{
    "Filters": [{
      "Pattern": "{\"body\":{\"status\":[\"PENDING\"]}}"
    }]
  }' \
  --target arn:aws:states:us-east-1:123456789012:stateMachine:OrderWorkflow \
  --target-parameters '{
    "StepFunctionStateMachineParameters": {
      "InvocationType": "FIRE_AND_FORGET"
    }
  }'

This pipe polls the SQS queue in batches of up to 10 messages, passes only messages where body.status == "PENDING", and starts a Step Functions execution for each one. No Lambda required.

The IAM role for the pipe needs permissions to read from SQS (sqs:ReceiveMessage, sqs:DeleteMessage, sqs:GetQueueAttributes) and to start executions on the state machine (states:StartExecution). If you add enrichment via Lambda, the role also needs lambda:InvokeFunction.
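Expressed as a policy document, that role might look like the following sketch (the ARNs reuse the example above; the trust policy allowing pipes.amazonaws.com to assume the role is not shown):

```python
import json

# Minimal inline policy for the pipe's execution role (illustrative ARNs)
pipe_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # read, delete, and inspect messages on the source queue
            "Effect": "Allow",
            "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage",
                       "sqs:GetQueueAttributes"],
            "Resource": "arn:aws:sqs:us-east-1:123456789012:order-events",
        },
        {   # start executions on the target state machine
            "Effect": "Allow",
            "Action": ["states:StartExecution"],
            "Resource": "arn:aws:states:us-east-1:123456789012:stateMachine:OrderWorkflow",
        },
    ],
}
print(json.dumps(pipe_policy, indent=2))
```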

Filtering in Depth

Pipe filter patterns use the same syntax as EventBridge event bus rules. You match on message content, not metadata. For SQS, the filter operates on the message body (parsed as JSON). For DynamoDB Streams, you filter on the stream record structure — including eventName (INSERT, MODIFY, REMOVE), the new image, and the old image.

Useful DynamoDB Streams filter: only process INSERT events.

{
  "eventName": ["INSERT"]
}

Only process MODIFY events where a specific field changed:

{
  "eventName": ["MODIFY"],
  "dynamodb": {
    "NewImage": {
      "status": {
        "S": ["ACTIVE"]
      }
    }
  }
}

This is powerful for change data capture patterns. A DynamoDB table with user records can fan status changes to a pipe that only fires when status transitions to a specific value — no polling code, no Lambda evaluating every record.
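To see why the pattern above matches, here is a deliberately simplified sketch of the matching semantics. It handles only the exact-value lists used in these examples, not EventBridge's prefix, numeric, or exists operators:

```python
def matches(pattern: dict, record: dict) -> bool:
    """Simplified EventBridge-style match: nested dicts recurse,
    and a list means 'the field must equal one of these values'."""
    for key, expected in pattern.items():
        if key not in record:
            return False
        if isinstance(expected, dict):
            if not isinstance(record[key], dict) or not matches(expected, record[key]):
                return False
        elif record[key] not in expected:
            return False
    return True

pattern = {"eventName": ["MODIFY"],
           "dynamodb": {"NewImage": {"status": {"S": ["ACTIVE"]}}}}
record = {"eventName": "MODIFY",
          "dynamodb": {"NewImage": {"status": {"S": "ACTIVE"}}}}
```

Here `matches(pattern, record)` is true; flip the status to any other value and the record is discarded before billing.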

Enrichment: When and Why

Enrichment exists for cases where the source event doesn’t contain all the information the target needs. An SQS message might carry an order ID; the Step Functions workflow might need the full order object with customer details and line items.

Without enrichment, you’d write a Lambda that fetches the full order before sending it to Step Functions. With enrichment, you configure a Lambda or API Gateway endpoint in the pipe’s enrichment stage. The pipe calls it with the filtered record and replaces the payload with the response before sending it to the target.

# Enrichment Lambda: receives filtered Pipe record, returns enriched payload
import boto3
import json

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Orders')

def handler(event, context):
    enriched = []
    for record in event:
        order_id = json.loads(record['body'])['orderId']
        response = table.get_item(Key={'orderId': order_id})
        enriched.append(response.get('Item', {}))
    return enriched

The enrichment function receives a list of records (the filtered batch) and should return a list of enriched payloads in the same order. AWS replaces each record with the corresponding enriched response. Enrich carefully — the response must stay under 6 MB total. If you’re enriching a batch of 100 records and each enriched record is 100 KB, you’re at 10 MB and the pipe will fail.
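A cheap guard in the enrichment function catches this before the pipe does. A sketch (the 6 MB figure is the cap stated above; the helper name is my own):

```python
import json

MAX_RESPONSE_BYTES = 6 * 1024 * 1024  # enrichment responses are capped at 6 MB

def checked(enriched: list) -> list:
    """Fail loudly before returning a batch the pipe would reject anyway."""
    size = len(json.dumps(enriched).encode("utf-8"))
    if size > MAX_RESPONSE_BYTES:
        raise ValueError(
            f"enriched batch is {size} bytes (over the 6 MB cap); "
            "lower BatchSize or drop fields during enrichment")
    return enriched
```

Calling `return checked(enriched)` at the end of the handler surfaces a ValueError in the function's own logs instead of an opaque pipe failure.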

Ordering Guarantees and Concurrency Limits

Standard SQS queues don’t guarantee order, and Pipes makes no additional ordering guarantees for them. You can increase BatchSize up to 10,000 and MaximumBatchingWindowInSeconds up to 300 to batch records efficiently.

For ordered sources — SQS FIFO queues, Kinesis data streams, DynamoDB Streams — the pipe maintains the ordering guarantees of the source. This comes with a concurrency ceiling:

  • SQS FIFO: concurrency is bounded by the number of message group IDs. If you have 50 message group IDs, the pipe can process 50 batches concurrently at most.
  • Kinesis and DynamoDB Streams: concurrency is bounded by the number of shards. A 10-shard Kinesis stream allows up to 10 concurrent pipe executions.

You cannot override this. The concurrency limit is an inherent property of ordered streaming sources, not a pipe configuration. Plan shard counts and FIFO group IDs according to your required throughput.
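The planning arithmetic is simple enough to sketch (function and parameter names are mine; plug in your own measured batch latency):

```python
import math

def min_partitions(records_per_sec: float, batch_size: float,
                   batch_latency_sec: float) -> int:
    """Minimum shards (Kinesis/DynamoDB Streams) or message group IDs
    (SQS FIFO) needed to sustain a throughput target, given that each
    partition processes one batch at a time."""
    per_partition = batch_size / batch_latency_sec  # records/sec per partition
    return max(1, math.ceil(records_per_sec / per_partition))

# e.g. 5,000 records/sec with 100-record batches that take 0.5 s each
# needs at least 25 shards or message group IDs
```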

Error Handling and Auto-Disable

When the enrichment step or target returns errors, the pipe backs off its polling rate. Persistent 4xx errors — bad requests that will never succeed — trigger automatic pipe disabling. AWS sets the pipe state to STOPPED and writes an explanatory message to the StateReason field.

Check a stopped pipe’s reason:

aws pipes describe-pipe --name "order-processor" \
  --query '{State:CurrentState,Reason:StateReason}'

For recoverable failures (5xx errors from a target that’s temporarily unavailable), the pipe uses exponential backoff and retries. For SQS sources, failed messages stay in the queue and eventually route to the dead-letter queue you’ve configured. For Kinesis and DynamoDB Streams, the pipe blocks on the failing record until it succeeds, expires from the stream, or you configure a failure destination.

Configure a failure destination to route problematic records somewhere you can inspect them:

aws pipes update-pipe \
  --name "order-processor" \
  --source-parameters '{
    "KinesisStreamParameters": {
      "OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
      "DestinationConfig": {
        "OnFailure": {
          "Destination": "arn:aws:sqs:us-east-1:123456789012:failed-events"
        }
      }
    }
  }'

The SQS dead-letter queue pattern applies here: even if the pipe handles retries, you want visibility into what’s failing and why.

When to Use Pipes vs the Alternatives

Use Pipes over Lambda Event Source Mapping when:

  • Your target is not Lambda (Step Functions, API Gateway, CloudWatch Logs, Batch, etc.)
  • The transformation logic is simple enough to express in an input transformer
  • You want to eliminate a Lambda function that exists purely to route records between services
  • You need enrichment from a single external source before routing

Use Lambda Event Source Mapping when:

  • Your target is Lambda and you need complex processing logic
  • You need to batch-process records with sophisticated error handling (individual record failures, partial batch responses)
  • You need per-record Lambda concurrency control features from ESM

Use EventBridge event bus with Rules when:

  • One event needs to go to multiple targets (fan-out)
  • Events come from AWS services that publish to EventBridge directly (CloudTrail, Config, EC2 state change, etc.)
  • You need content-based routing to many different targets

The SQS Lambda event source mapping guide and EventBridge Step Functions patterns both describe cases where Pipes would now be a valid alternative with less infrastructure to maintain.

Input Transformation

Before the event reaches the target, you can reshape the payload with an input transformer: a template string (the InputTemplate field in the target parameters) containing JSONPath placeholders. This is useful when the target expects a specific format — a Step Functions state machine, for example, expects input matching its schema. A template like this:

{
  "orderId": "<$.body.orderId>",
  "customerId": "<$.body.customerId>",
  "items": "<$.body.lineItems>",
  "receivedAt": "<$.attributes.SentTimestamp>"
}

The transformer uses JSONPath expressions to extract and remap fields from the source record. If you need conditional logic or more complex transformation, move that into the enrichment Lambda rather than trying to express it in the input template syntax.
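Conceptually the transformer behaves like this Python sketch, which supports only dotted JSONPath lookups, a fraction of what the real syntax handles:

```python
from functools import reduce

def apply_template(template: dict, record: dict) -> dict:
    """Sketch of input transformation: each '<$.a.b>' placeholder is
    replaced with the value at that path in the source record."""
    def lookup(path: str):
        keys = path.removeprefix("$.").split(".")  # "$.body.orderId" -> ["body", "orderId"]
        return reduce(lambda obj, k: obj[k], keys, record)
    return {out: lookup(expr.strip("<>")) for out, expr in template.items()}

template = {"orderId": "<$.body.orderId>",
            "receivedAt": "<$.attributes.SentTimestamp>"}
record = {"body": {"orderId": "o-123"},
          "attributes": {"SentTimestamp": "1700000000"}}
```

Here `apply_template(template, record)` yields `{"orderId": "o-123", "receivedAt": "1700000000"}`.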

Monitoring and Observability

Pipes publish metrics to CloudWatch automatically:

  • ExecutionThrottled — the pipe hit a concurrency limit
  • ExecutionFailed — an execution failed to complete
  • ExecutionStarted — number of pipe executions initiated

Set an alarm on ExecutionFailed to catch problems before they pile up:

aws cloudwatch put-metric-alarm \
  --alarm-name "pipe-order-processor-failures" \
  --namespace AWS/EventBridgePipes \
  --metric-name ExecutionFailed \
  --dimensions Name=PipeName,Value=order-processor \
  --statistic Sum --period 60 \
  --evaluation-periods 2 --threshold 5 \
  --comparison-operator GreaterThanThreshold \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:alerts

Management API calls on pipes — CreatePipe, UpdatePipe, StartPipe, StopPipe — appear in CloudTrail under the pipes.amazonaws.com event source, which is useful for auditing who changed a pipe and when. CloudTrail does not record the individual events flowing through a pipe; for per-record visibility, rely on the CloudWatch metrics and a failure destination.

Practical Starting Point

The setup that pays off fastest: an SQS queue receiving application events, a filter that drops events you don’t care about (passing 20-30% of volume), and a Step Functions Express Workflow or Lambda as the target. You eliminate one Lambda function from your architecture, reduce cost because you only pay for filtered events, and get built-in retry and error handling.

Start with one pipe replacing an existing routing Lambda you already know you want to simplify. That first pipe will teach you the filtering syntax, IAM requirements, and error behavior faster than reading documentation. Once you understand how the pipe behaves under load and failure conditions, the more complex patterns — enrichment, failure destinations, batching windows — become straightforward additions.

Bits Lovers

Professional writer and blogger. Focus on Cloud Computing.