AWS Kinesis: Real-Time Data Streaming with Data Streams, Firehose, and Flink
Kinesis is four distinct services that AWS bundles under one name, which creates genuine confusion. Kinesis Data Streams is a durable ordered log — producers write records, multiple consumers read them independently, data stays for 24 hours to 365 days. Kinesis Data Firehose (now called Amazon Data Firehose) is a managed delivery pipeline — you put data in one end and it comes out the other end in S3, Redshift, or OpenSearch, buffered and optionally transformed. Kinesis Data Analytics is now Amazon Managed Service for Apache Flink — stateful stream processing with windowing and joins. Kinesis Video Streams handles video ingestion for machine learning; most teams don’t touch it.
This guide covers the three data-focused services with concrete examples: when Streams is the right choice and when SQS is simpler, how Firehose delivery works with Lambda transformation, and the difference between enhanced fan-out and standard consumer polling.
Kinesis Data Streams: The Ordered Log
A Kinesis stream is divided into shards. Each shard handles 1 MB/s write throughput and 2 MB/s read throughput. A stream with 10 shards handles 10 MB/s writes and 20 MB/s reads. The shard is the scaling unit.
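As a rough sizing sketch, the per-shard limits translate into a shard count as follows. The function name and the sample workload are illustrative, and the 1,000 records/s per-shard write limit is a Kinesis quota that applies alongside the 1 MB/s figure above.

```python
import math

# Per-shard limits for a provisioned-mode stream.
WRITE_MB_PER_SHARD = 1.0         # MB/s ingested
WRITE_RECORDS_PER_SHARD = 1000   # records/s ingested
READ_MB_PER_SHARD = 2.0          # MB/s shared by standard consumers


def shards_needed(write_mb_s, write_records_s, read_mb_s):
    """Return the smallest shard count that satisfies all three limits."""
    return max(
        1,
        math.ceil(write_mb_s / WRITE_MB_PER_SHARD),
        math.ceil(write_records_s / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb_s / READ_MB_PER_SHARD),
    )


# Hypothetical workload: 3.5 MB/s in, 6,000 records/s, 7 MB/s read in total.
print(shards_needed(3.5, 6000, 7.0))
```

In this hypothetical workload the record rate, not the byte rate, sets the shard count, which is typical of streams carrying many small records.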
# Create a stream with 4 shards
aws kinesis create-stream \
--stream-name application-events \
--shard-count 4
# Set retention period (default 24h, max 365 days)
aws kinesis increase-stream-retention-period \
--stream-name application-events \
--retention-period-hours 168 # 7 days
# Describe the stream
aws kinesis describe-stream-summary \
--stream-name application-events \
--query 'StreamDescriptionSummary.{Status:StreamStatus,Shards:OpenShardCount,Retention:RetentionPeriodHours}'
Records within a shard are strictly ordered by sequence number. Records across shards have no ordering guarantee. The partition key determines which shard a record lands on — Kinesis hashes the partition key to assign it to a shard. Use a high-cardinality partition key (user ID, device ID, session ID) to distribute records evenly across shards. A low-cardinality key (like “type” with only 3 values) concentrates records into a few shards while others sit idle.
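The effect of key cardinality can be illustrated with a small simulation. Kinesis maps a partition key to a 128-bit hash key with MD5; the sketch below assumes the shards split the hash space evenly, which will not hold after manual splits or merges, and the helper names are made up for this example.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit hash key


def shard_for_key(partition_key, shard_count):
    """Map a partition key to a shard index, assuming evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


def distribution(keys, shard_count):
    """Count how many records land on each shard index."""
    return Counter(shard_for_key(k, shard_count) for k in keys)


# 10,000 records keyed by user ID versus by a 3-value "type" field.
types = ["click", "pageview", "purchase"]
by_user = distribution((f"user-{i}" for i in range(10_000)), 4)
by_type = distribution((types[i % 3] for i in range(10_000)), 4)
print("user IDs:", sorted(by_user.items()))
print("types:   ", sorted(by_type.items()))
```

With user IDs the counts come out close to even across the four shards; with three distinct type values at most three shards can ever receive data, whatever the shard count.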
Writing Records
# PutRecord — single record
aws kinesis put-record \
--stream-name application-events \
--partition-key "user-12345" \
--data "$(echo '{"userId":"12345","event":"purchase","amount":99.99}' | base64)"
# PutRecords — batch up to 500 records, 5 MB total
aws kinesis put-records \
--stream-name application-events \
--records '[
{
"Data": "'"$(echo '{"event":"pageview","userId":"12345"}' | base64)"'",
"PartitionKey": "user-12345"
},
{
"Data": "'"$(echo '{"event":"click","userId":"67890"}' | base64)"'",
"PartitionKey": "user-67890"
}
]'
PutRecords returns a success/failure status per record. Check FailedRecordCount — if non-zero, retry only the failed records with exponential backoff. Don’t retry the entire batch; the successful records don’t need to be sent again and sending them again creates duplicates.
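A minimal sketch of that retry loop follows. It assumes `client` behaves like `boto3.client("kinesis")`; the function name, the attempt count, and the backoff constants are illustrative choices rather than recommended values.

```python
import random
import time


def put_records_with_retry(client, stream_name, records, max_attempts=5, sleep=time.sleep):
    """Send records with PutRecords, resending only the entries that failed.

    Returns the records that still failed after max_attempts
    (an empty list when everything was accepted).
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Results are positional: entry i in the response matches record i in the request.
        # Failed entries carry an ErrorCode instead of a SequenceNumber.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            # Exponential backoff with jitter: up to 0.1 s, 0.2 s, 0.4 s, ...
            sleep(random.uniform(0, 0.1 * 2 ** attempt))
    return pending
```

Passing the client and the `sleep` function in as arguments keeps the helper easy to exercise with a stub. Whatever it returns after the final attempt should be logged or sent to a fallback store rather than silently dropped.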
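For high-throughput producers (millions of records/minute), use the Kinesis Producer Library (KPL). KPL handles batching, retries, and record aggregation automatically. Aggregation packs multiple small records into a single Kinesis record, so a batch of small records counts once against the 1,000 records/s per-shard write limit and once for per-record charges. For very small records this can raise effective record throughput by orders of magnitude, up to the 1 MB/s shard ceiling. Consumers have to de-aggregate the records; the Kinesis Client Library (KCL) does this automatically.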
Lambda Trigger from Kinesis
# Create Lambda event source mapping
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/application-events \
--function-name process-events \
--starting-position LATEST \
--batch-size 100 \
--maximum-batching-window-in-seconds 5 \
--bisect-batch-on-function-error \
--destination-config '{
"OnFailure": {
"Destination": "arn:aws:sqs:us-east-1:123456789012:kinesis-failures-dlq"
}
}' \
--parallelization-factor 3
A parallelization factor of 3 lets Lambda run up to 3 concurrent invocations per shard. With 4 shards, that is up to 12 concurrent invocations processing the stream, which is significantly higher throughput than 1 invocation per shard. Lambda still processes records that share a partition key in order, because each key is always routed to the same concurrent batch, but records with different partition keys in the same shard can be processed out of order, so you lose strict shard-wide ordering.
import base64
import json

def handler(event, context):
    for record in event['Records']:
        # Kinesis data is base64-encoded
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        data = json.loads(payload)
        # eventID has the form "<shardId>:<sequenceNumber>"; eventSourceARN only names the stream
        shard_id = record['eventID'].split(':')[0]
        sequence_number = record['kinesis']['sequenceNumber']
        partition_key = record['kinesis']['partitionKey']
        timestamp = record['kinesis']['approximateArrivalTimestamp']
        print(f"Processing: shard={shard_id}, seq={sequence_number}, user={data.get('userId')}")
        # Process the record
        process_event(data)

def process_event(data):
    # update_purchase_metrics and update_analytics are application-specific helpers defined elsewhere
    event_type = data.get('event')
    if event_type == 'purchase':
        update_purchase_metrics(data)
    elif event_type == 'pageview':
        update_analytics(data)
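To exercise a handler like the one above without deploying it, a synthetic event can be built locally. The sketch below is an assumption-laden stand-in: it includes only a subset of the fields Lambda sends, real sequence numbers are long numeric strings, and `make_kinesis_event` and `decode_records` are hypothetical helper names.

```python
import base64
import json


def make_kinesis_event(payloads, shard_id="shardId-000000000000"):
    """Build a minimal Lambda event shaped like a Kinesis batch (subset of fields)."""
    records = []
    for i, payload in enumerate(payloads):
        data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
        records.append({
            "eventID": f"{shard_id}:{i}",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/application-events",
            "kinesis": {
                "partitionKey": f"user-{payload.get('userId')}",
                "sequenceNumber": str(i),
                "approximateArrivalTimestamp": 1700000000.0 + i,
                "data": data,
            },
        })
    return {"Records": records}


def decode_records(event):
    """Decode each record's payload the same way the handler does."""
    return [
        json.loads(base64.b64decode(r["kinesis"]["data"]).decode("utf-8"))
        for r in event["Records"]
    ]


event = make_kinesis_event([
    {"userId": "12345", "event": "purchase", "amount": 99.99},
    {"userId": "67890", "event": "pageview"},
])
print(decode_records(event))
```

The same `event` dictionary can be passed to `handler(event, None)` once the application-specific helpers are defined or stubbed.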
Enhanced Fan-Out for Multiple Consumers
Standard Kinesis consumers share the 2 MB/s read throughput per shard. With 5 consumers reading the same stream, each gets 400 KB/s. Enhanced fan-out gives each registered consumer a dedicated 2 MB/s pipe per shard — no sharing.
# Register a consumer with enhanced fan-out
CONSUMER_ARN=$(aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/application-events \
--consumer-name analytics-consumer \
--query 'Consumer.ConsumerARN' \
--output text)
# Subscribe to the shard using enhanced fan-out (push-based, not polling)
# This is typically handled by KCL or Lambda automatically
aws kinesis subscribe-to-shard \
--consumer-arn $CONSUMER_ARN \
--shard-id shardId-000000000000 \
--starting-position '{"Type": "LATEST"}'
Enhanced fan-out costs $0.015 per consumer per shard-hour plus $0.013 per GB read. For 3 consumers on a 4-shard stream running 24/7: 3 × 4 × $0.015 × 720 = $129.60/month in enhanced fan-out fees, before data volume. Standard polling adds no per-consumer fee in provisioned mode: GetRecords reads are covered by the shard-hour charge. Only use enhanced fan-out when you have 2+ consumers that each need maximum read throughput.
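The arithmetic above generalizes into a small calculator. The prices are the ones quoted in this section and may differ by region or change over time; the function names and the 720-hour month are assumptions for illustration.

```python
HOURS_PER_MONTH = 720  # 30-day month, matching the calculation above


def enhanced_fanout_monthly_cost(consumers, shards, gb_read_per_consumer=0.0,
                                 consumer_shard_hour=0.015, per_gb=0.013):
    """Monthly enhanced fan-out cost: consumer-shard-hours plus data retrieval."""
    fixed = consumers * shards * consumer_shard_hour * HOURS_PER_MONTH
    data = consumers * gb_read_per_consumer * per_gb
    return round(fixed + data, 2)


def standard_read_share_kb_s(consumers, shard_read_kb_s=2000):
    """Per-consumer share of a shard's read throughput with standard polling."""
    return shard_read_kb_s / consumers


print(enhanced_fanout_monthly_cost(3, 4))                            # fixed fees only
print(enhanced_fanout_monthly_cost(3, 4, gb_read_per_consumer=500))  # plus 500 GB read per consumer
print(standard_read_share_kb_s(5))                                   # KB/s each for 5 polling consumers
```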
Resharding: Scaling Shards
When a stream is under-provisioned (hitting the 1 MB/s write limit per shard), split shards. When over-provisioned, merge adjacent shards.
# Check shard utilization to decide when to reshard
aws cloudwatch get-metric-statistics \
--namespace AWS/Kinesis \
--metric-name WriteProvisionedThroughputExceeded \
--dimensions Name=StreamName,Value=application-events \
--statistics Sum \
--period 300 \
--start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%SZ)
# Split a hot shard into two
SHARD_ID="shardId-000000000001"
STARTING_HASH=$(aws kinesis describe-stream \
--stream-name application-events \
--query "StreamDescription.Shards[?ShardId=='$SHARD_ID'].HashKeyRange.StartingHashKey" \
--output text)
# Look up the ending hash key the same way, then split the shard at the midpoint of its range
ENDING_HASH=$(aws kinesis describe-stream \
--stream-name application-events \
--query "StreamDescription.Shards[?ShardId=='$SHARD_ID'].HashKeyRange.EndingHashKey" \
--output text)
NEW_HASH=$(python3 -c "print((int('$STARTING_HASH') + int('$ENDING_HASH')) // 2)")
aws kinesis split-shard \
--stream-name application-events \
--shard-to-split $SHARD_ID \
--new-starting-hash-key $NEW_HASH
# Alternatively, use UpdateShardCount to target an exact number of shards
aws kinesis update-shard-count \
--stream-name application-events \
--target-shard-count 8 \
--scaling-type UNIFORM_SCALING
UNIFORM_SCALING is the practical choice — it redistributes hash key ranges uniformly. Manual split/merge lets you target specific shards (useful when one partition key is disproportionately hot), but requires knowing the exact hash key boundaries.
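To make the hash key boundaries concrete, the sketch below divides the 128-bit hash key space into equal ranges and computes a split point for one of them. It illustrates the idea of evenly divided ranges; the exact boundaries Kinesis chooses are not confirmed here, and the function names are hypothetical.

```python
MAX_HASH_KEY = 2 ** 128 - 1  # largest hash key in a Kinesis stream


def uniform_hash_ranges(shard_count):
    """Split the full hash key space into shard_count contiguous, near-equal ranges."""
    total = MAX_HASH_KEY + 1
    bounds = [total * i // shard_count for i in range(shard_count + 1)]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(shard_count)]


def split_point(starting_hash_key, ending_hash_key):
    """Midpoint usable as --new-starting-hash-key when splitting one shard evenly."""
    return (starting_hash_key + ending_hash_key) // 2


ranges = uniform_hash_ranges(4)
for start, end in ranges:
    print(start, end)
print("split point for the second range:", split_point(*ranges[1]))
```

For a hot partition key, the split point does not have to be the midpoint: any value inside the shard's range works, so the split can be placed to isolate the hash key of the hot partition key.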
Kinesis Data Firehose (Amazon Data Firehose)
Firehose handles delivery, not streaming. Data in → buffer → deliver to destination. No shards, no consumers to manage. You don’t call GetRecords; Firehose pushes data to the destination automatically.
# Create Firehose delivery stream to S3
aws firehose create-delivery-stream \
--delivery-stream-name app-events-to-s3 \
--extended-s3-destination-configuration '{
"RoleARN": "arn:aws:iam::123456789012:role/FirehoseS3Role",
"BucketARN": "arn:aws:s3:::my-events-bucket",
"Prefix": "events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
"ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
"BufferingHints": {
"SizeInMBs": 64,
"IntervalInSeconds": 300
},
"CompressionFormat": "GZIP",
"CloudWatchLoggingOptions": {
"Enabled": true,
"LogGroupName": "/aws/kinesisfirehose/app-events-to-s3",
"LogStreamName": "S3Delivery"
}
}'
# Put records directly to Firehose
aws firehose put-record \
--delivery-stream-name app-events-to-s3 \
--record '{"Data": "'"$(echo '{"event":"purchase","userId":"12345"}' | base64)"'"}'
# Or put records from a Kinesis Data Stream (Firehose reads from the stream)
aws firehose create-delivery-stream \
--delivery-stream-name stream-to-s3 \
--delivery-stream-type KinesisStreamAsSource \
--kinesis-stream-source-configuration '{
"KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/application-events",
"RoleARN": "arn:aws:iam::123456789012:role/FirehoseKinesisRole"
}' \
--extended-s3-destination-configuration '{
"RoleARN": "arn:aws:iam::123456789012:role/FirehoseS3Role",
"BucketARN": "arn:aws:s3:::my-events-bucket",
"BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 300},
"CompressionFormat": "GZIP"
}'
The Prefix with !{timestamp:yyyy} creates Hive-style partitioned S3 paths (year=/month=/day=) automatically. Athena can query the data in place once the partitions are registered, for example with MSCK REPAIR TABLE or partition projection, so you get queryable streaming data in S3 with no ETL step.
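As an illustration of the resulting object keys, this sketch renders the same prefix for a given arrival time. It assumes the timestamp expressions are evaluated in UTC, which is the default behaviour as far as this example is concerned; `render_prefix` is a hypothetical helper, not part of any AWS SDK.

```python
from datetime import datetime, timezone


def render_prefix(arrival):
    """Render events/year=.../month=.../day=.../ for a record's arrival time."""
    utc = arrival.astimezone(timezone.utc)
    return f"events/year={utc:%Y}/month={utc:%m}/day={utc:%d}/"


print(render_prefix(datetime(2024, 3, 7, 23, 30, tzinfo=timezone.utc)))
```

A helper like this is handy when a downstream job has to list or backfill a specific day's partition.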
BufferingHints controls when Firehose flushes to S3: whichever limit (size or interval) hits first triggers a delivery. 64 MB / 300 seconds means you deliver at least every 5 minutes, or sooner if 64 MB accumulates. Smaller buffers mean more S3 objects and higher API costs; larger buffers mean longer latency to delivery.
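The "whichever limit hits first" rule can be sketched as a small function. It assumes a steady, positive ingest rate, which real traffic rarely has, and the function name is illustrative.

```python
def seconds_until_flush(ingest_mb_per_s, size_mb=64, interval_s=300):
    """Return (seconds, trigger) for whichever buffering limit is reached first."""
    seconds_to_fill = size_mb / ingest_mb_per_s
    if seconds_to_fill < interval_s:
        return seconds_to_fill, "size"
    return float(interval_s), "interval"


print(seconds_until_flush(1.0))   # busy stream: the size limit triggers delivery
print(seconds_until_flush(0.1))   # quiet stream: the interval triggers delivery
```

At 1 MB/s the 64 MB buffer fills in about a minute, so the 300-second interval never comes into play; at 0.1 MB/s the interval fires first and each object holds roughly 30 MB before compression.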
Lambda Transformation in Firehose
Add a Lambda function to transform records before delivery:
import base64
import json
from datetime import datetime, timezone

def handler(event, context):
    output = []
    for record in event['records']:
        # Decode the record
        payload = json.loads(base64.b64decode(record['data']).decode('utf-8'))
        # Filter out test events
        if payload.get('environment') == 'test':
            output.append({
                'recordId': record['recordId'],
                'result': 'Dropped',  # Drop this record
                'data': record['data']
            })
            continue
        # Add processing timestamp (UTC, ISO 8601)
        payload['processed_at'] = datetime.now(timezone.utc).isoformat()
        # Re-encode (must add newline for S3 line-delimited format)
        encoded = base64.b64encode(
            (json.dumps(payload) + '\n').encode('utf-8')
        ).decode('utf-8')
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded
        })
    return {'records': output}
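Each record must return Ok (transformed), Dropped (exclude from delivery), or ProcessingFailed (send to error prefix). Firehose invokes your Lambda with buffered batches of up to 3 MB by default. If the invocation itself fails (a timeout or throttling, for example), Firehose retries it, three times by default; records your function marks ProcessingFailed are not retried and are written to the error output prefix. The Lambda invocation adds latency to delivery but lets you filter, enrich, and reformat records before they hit S3.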
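The three result values can be seen together in a variant of the transformation that also handles malformed input. This is a sketch under the assumption that any record that is not a JSON object should be marked ProcessingFailed; `transform_record` is a hypothetical helper name.

```python
import base64
import json


def transform_record(record):
    """Return the Firehose result entry for one incoming record."""
    try:
        payload = json.loads(base64.b64decode(record["data"]).decode("utf-8"))
    except ValueError:
        # Covers invalid base64, invalid UTF-8, and invalid JSON.
        payload = None
    if not isinstance(payload, dict):
        # Hand the original data back so it lands under the error prefix.
        return {"recordId": record["recordId"], "result": "ProcessingFailed", "data": record["data"]}
    if payload.get("environment") == "test":
        return {"recordId": record["recordId"], "result": "Dropped", "data": record["data"]}
    encoded = base64.b64encode((json.dumps(payload) + "\n").encode("utf-8")).decode("utf-8")
    return {"recordId": record["recordId"], "result": "Ok", "data": encoded}


def handler(event, context):
    # Every incoming recordId must appear exactly once in the response.
    return {"records": [transform_record(r) for r in event["records"]]}
```

Catching the error per record matters: an uncaught exception fails the whole invocation, which sends every record in the batch through the retry path instead of only the malformed one.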
Kinesis vs SQS vs MSK
The right streaming service depends on your access pattern:
Kinesis Data Streams: Multiple independent consumers reading the same data; ordered processing within a partition; replay from any point in the retention window; sub-second latency. Best for: event sourcing, real-time analytics, audit logging where multiple systems need the same data.
SQS: Single consumer (or competing consumers) processing work items; messages deleted after consumption; at-least-once delivery; built-in dead-letter queue. Best for: task queues, job distribution, decoupling microservices. SQS is simpler and cheaper for single-consumer patterns.
MSK (Managed Kafka): Kafka protocol compatibility; consumer groups with offset management; topics with configurable partition counts; longer retention; ecosystem compatibility (Kafka Connect, Kafka Streams). Best for: teams already using Kafka, workloads needing Kafka-specific features, or when you need more control over consumer group management. The AWS MSK guide covers when MSK is the right choice over Kinesis.
Firehose: Delivery pipeline only — no custom consumer logic, no replay, no fan-out. Best for: shipping data to S3, Redshift, or OpenSearch with minimal code. If you just need to get events into S3, Firehose with 10 lines of config is faster than building a Kinesis consumer that writes to S3.
Pricing
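Kinesis Data Streams: $0.015 per shard-hour (≈$10.80/month per shard) + $0.014 per million PUT payload units. A payload unit is a 25 KB chunk of a record, so any record of 25 KB or less counts as one unit. A 4-shard stream costs $43.20/month in shard-hours regardless of volume. At 100 million records/month of 25 KB or less: $43.20 + $1.40 = $44.60/month.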
Firehose: $0.029 per GB ingested for the first 500 TB/month. No shard costs, no upfront. For 1 TB/month (1,024 GB): about $29.70. Simpler pricing for delivery-focused workloads.
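The two pricing models can be compared with a short calculator. The rates are the ones quoted in this section and should be checked against current regional pricing; the function names and the 720-hour month are assumptions, and Firehose's higher-volume tiers are ignored.

```python
HOURS_PER_MONTH = 720  # 30-day month


def data_streams_monthly_cost(shards, put_units_millions,
                              shard_hour=0.015, per_million_units=0.014):
    """Provisioned-mode Data Streams cost: shard-hours plus PUT payload units."""
    shard_cost = shards * shard_hour * HOURS_PER_MONTH
    put_cost = put_units_millions * per_million_units
    return round(shard_cost + put_cost, 2)


def firehose_monthly_cost(gb_ingested, per_gb=0.029):
    """Firehose ingestion cost in the first pricing tier."""
    return round(gb_ingested * per_gb, 2)


print(data_streams_monthly_cost(4, 100))   # 4 shards, 100 million payload units
print(firehose_monthly_cost(1024))         # 1 TB ingested
```

The shape of the two functions shows the trade-off: Data Streams has a fixed floor set by the shard count, while Firehose scales purely with the volume ingested.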
For applications processing event streams at high volume, combining Kinesis Data Streams (for fan-out to multiple consumers) with Firehose (for S3 delivery) is a common pattern. Kinesis is the canonical choice for real-time stream processing in AWS; the AWS EventBridge guide covers the alternative event-routing model for lower-throughput, event-driven architectures where the Kinesis shard model would be over-engineered.