AWS MSK: Managed Apache Kafka for Streaming Workloads

Written by Bits Lovers

Amazon MSK (Managed Streaming for Apache Kafka) runs Apache Kafka on AWS without you managing ZooKeeper, broker upgrades, or disk provisioning. You get standard Kafka — same protocol, same client libraries, same consumer group semantics — but AWS handles the infrastructure. The operational gap between MSK and self-managed Kafka on EC2 is significant: broker failure recovery, storage scaling, and multi-AZ replication are automatic.

The decision between MSK and Kinesis Data Streams comes down to protocol and ecosystem. If you’re already running Kafka clients, using Kafka Connect for data integration, or need Kafka Streams for processing, MSK gives you that without migration pain. If you’re starting fresh with no existing Kafka investment, Kinesis is simpler and often cheaper at moderate scale. Both handle the same streaming patterns; neither is universally superior.

MSK Provisioned vs MSK Serverless

Two deployment modes. MSK Provisioned gives you dedicated broker instances with predictable capacity. MSK Serverless automatically scales and charges per throughput with no broker management.

MSK Serverless is the right starting point for variable workloads — you don't provision brokers or storage, and capacity scales automatically. The limits are 200 MBps write throughput and 400 MBps read throughput per cluster. MSK Serverless doesn't support every Kafka feature (no custom broker configurations, for example), but it covers the majority of producer/consumer use cases.

# Create MSK Serverless cluster
aws kafka create-cluster-v2 \
  --cluster-name my-serverless-kafka \
  --serverless '{
    "VpcConfigs": [{
      "SubnetIds": ["subnet-private-1a", "subnet-private-1b", "subnet-private-1c"],
      "SecurityGroupIds": ["sg-kafka-clients"]
    }],
    "ClientAuthentication": {
      "Sasl": {"Iam": {"Enabled": true}}
    }
  }'

# Create MSK Provisioned cluster
aws kafka create-cluster \
  --cluster-name my-kafka-cluster \
  --kafka-version "3.6.0" \
  --number-of-broker-nodes 3 \
  --broker-node-group-info '{
    "InstanceType": "kafka.m5.large",
    "ClientSubnets": ["subnet-private-1a", "subnet-private-1b", "subnet-private-1c"],
    "SecurityGroups": ["sg-kafka-brokers"],
    "StorageInfo": {
      "EbsStorageInfo": {
        "VolumeSize": 100
      }
    }
  }' \
  --encryption-info '{
    "EncryptionInTransit": {
      "ClientBroker": "TLS",
      "InCluster": true
    }
  }' \
  --client-authentication '{
    "Sasl": {
      "Iam": {"Enabled": true},
      "Scram": {"Enabled": false}
    },
    "Tls": {"Enabled": false}
  }' \
  --enhanced-monitoring PER_TOPIC_PER_BROKER \
  --open-monitoring '{
    "Prometheus": {
      "JmxExporter": {"EnabledInBroker": true},
      "NodeExporter": {"EnabledInBroker": true}
    }
  }'

Three brokers across three subnets (one per AZ) is the minimum for production. This gives you Multi-AZ replication and tolerates the loss of one broker. Five brokers allow two simultaneous broker failures.

kafka.m5.large handles roughly 30-50 MB/s throughput per broker. At 3 brokers with replication factor 3, effective write throughput is 30-50 MB/s cluster-wide (data is written to all replicas). For higher throughput, scale to kafka.m5.4xlarge (250+ MB/s per broker) or add brokers.
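
That sizing rule can be sketched as a quick calculation (the per-broker figures are the rough estimates above, not guarantees):

```python
import math

def brokers_needed(target_mb_s, per_broker_mb_s, replication_factor=3, min_brokers=3):
    """Rough broker count for a target producer throughput: with
    replication, every byte written to a leader is also written to
    (replication_factor - 1) followers, so total broker work is
    target * replication_factor."""
    total_work = target_mb_s * replication_factor
    return max(min_brokers, math.ceil(total_work / per_broker_mb_s))

# 100 MB/s of producer traffic, RF=3, ~40 MB/s per kafka.m5.large broker:
print(brokers_needed(100, 40))  # 8
```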

# Get bootstrap brokers after cluster is active (takes 15-20 minutes)
CLUSTER_ARN=$(aws kafka list-clusters \
  --cluster-name-filter my-kafka-cluster \
  --query 'ClusterInfoList[0].ClusterArn' \
  --output text)

aws kafka get-bootstrap-brokers \
  --cluster-arn $CLUSTER_ARN \
  --query '{TLS:BootstrapBrokerStringTls,IAM:BootstrapBrokerStringSaslIam}'
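
The same lookup from Python, sketched with boto3 (the client is injected so it can be stubbed; cluster name is the illustrative one above):

```python
def iam_bootstrap_brokers(kafka_client, cluster_name):
    """Resolve a cluster's ARN by name, then return its SASL/IAM
    bootstrap broker string (port 9098)."""
    clusters = kafka_client.list_clusters(ClusterNameFilter=cluster_name)
    arn = clusters['ClusterInfoList'][0]['ClusterArn']
    brokers = kafka_client.get_bootstrap_brokers(ClusterArn=arn)
    return brokers['BootstrapBrokerStringSaslIam']

# Usage (requires boto3 and AWS credentials):
# import boto3
# kafka = boto3.client('kafka', region_name='us-east-1')
# print(iam_bootstrap_brokers(kafka, 'my-kafka-cluster'))
```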

IAM Authentication

MSK IAM auth is the recommended approach for AWS workloads — no passwords, no certificates to rotate, just IAM policies.

# Grant EC2 instances or Lambda functions access to MSK
aws iam put-role-policy \
  --role-name MyAppRole \
  --policy-name MSKAccess \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/my-kafka-cluster/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:topic/my-kafka-cluster/*/*"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:group/my-kafka-cluster/*/*"
    }]
  }'

Topic Management

# Create topics using the Kafka CLI (from a client in the same VPC)
BOOTSTRAP="b-1.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098"

# Using kafka-topics.sh with IAM auth
kafka-topics.sh \
  --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --create \
  --topic application-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete
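
The client.properties passed via --command-config wires up IAM auth; with the aws-msk-iam-auth jar on the client's classpath, it looks like this (the standard configuration from that library's documentation):

```properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```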

# Note: topic creation is not part of the MSK control-plane API — there
# is no "aws kafka create-topic" command. Manage topics with the Kafka
# Admin API instead: kafka-topics.sh as above, or an AdminClient from
# your client library.

Partition count is the key scaling decision. Partitions are the unit of parallelism — a consumer group with 12 consumers processing a 12-partition topic has maximum parallelism (1 partition per consumer). More partitions = more parallelism but more file handles, more memory overhead, and longer leader election during failover.

Replication factor 3 means each partition has 3 copies across different brokers. With 3 brokers and RF=3, every broker holds every partition. With 6 brokers and RF=3, each broker holds half the partitions. Higher broker counts improve throughput; replication factor protects against broker failure.
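
To make the parallelism arithmetic concrete, here is a simplified sketch of range-style partition assignment (the real assignor also handles multiple topics and rebalances):

```python
def range_assign(num_partitions, consumers):
    """Split partitions across a consumer group, range-assignor style:
    contiguous chunks, with the first (num_partitions % n) consumers
    taking one extra partition."""
    n = len(consumers)
    base, extra = divmod(num_partitions, n)
    assignment, start = {}, 0
    for i, consumer in enumerate(sorted(consumers)):
        count = base + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment

# 12 partitions, 12 consumers → one partition each (max parallelism)
# 12 partitions,  5 consumers → chunk sizes 3, 3, 2, 2, 2
# 12 partitions, 13 consumers → one consumer sits idle
```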

Producer and Consumer in Python

# producer.py
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json

# kafka-python expects a token provider object with a token() method
class MSKTokenProvider:
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token

producer = KafkaProducer(
    bootstrap_servers=[
        'b-1.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098',
        'b-2.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098',
        'b-3.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098',
    ],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=MSKTokenProvider(),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',          # Wait for all in-sync replicas
    retries=5,
    batch_size=16384,
    linger_ms=10,        # Wait up to 10ms to batch records
    compression_type='gzip'
)

# Send a message
future = producer.send(
    'application-events',
    key='user-12345',          # Determines partition (same key → same partition)
    value={'event': 'purchase', 'userId': '12345', 'amount': 99.99}
)

# Block until sent (optional — async by default)
metadata = future.get(timeout=10)
print(f"Sent to partition {metadata.partition}, offset {metadata.offset}")

producer.flush()
producer.close()

# consumer.py
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json

# Same token-provider pattern as the producer
class MSKTokenProvider:
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token

consumer = KafkaConsumer(
    'application-events',
    bootstrap_servers=[
        'b-1.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098',
        'b-2.my-kafka.abc123.c1.kafka.us-east-1.amazonaws.com:9098',
    ],
    group_id='my-consumer-group',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=MSKTokenProvider(),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=500
)

for message in consumer:
    data = message.value
    partition = message.partition
    offset = message.offset
    
    print(f"Partition {partition}, offset {offset}: {data.get('event')} for user {data.get('userId')}")
    
    # Process the message
    process_event(data)

acks='all' ensures the producer waits for all in-sync replicas to confirm the write before returning success. This prevents data loss if the leader broker fails immediately after the write. For non-critical data where throughput matters more than durability, acks=1 (leader only) is faster.

Consumer group IDs are important: all consumers sharing the same group_id split the partitions between them (horizontal scaling). Consumers with different group_id values each get their own copy of all data (fan-out). This is the Kafka equivalent of Kinesis enhanced fan-out — multiple applications consuming the same topic independently.

Lambda Trigger from MSK

# Create Lambda event source mapping from MSK topic
aws lambda create-event-source-mapping \
  --event-source-arn $CLUSTER_ARN \
  --function-name process-kafka-events \
  --starting-position TRIM_HORIZON \
  --batch-size 100 \
  --topics '["application-events"]' \
  --source-access-configurations '[
    {"Type": "SASL_SCRAM_512_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-credentials"}
  ]'

# For MSK clusters using IAM auth, omit --source-access-configurations;
# Lambda authenticates with its execution role, which needs the
# kafka-cluster:* permissions shown earlier on the cluster, topic, and group.

Lambda with MSK works similarly to Lambda with Kinesis — Lambda polls the topic and invokes your function with batches of records. Each batch is from a single partition, maintaining order. Lambda manages consumer group offsets automatically.
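
A minimal handler sketch for the MSK event shape — each key under 'records' is a topic-partition, its list is one ordered batch from that partition, and record values arrive base64-encoded:

```python
import base64
import json

def handler(event, context):
    """Decode and count records from an MSK-triggered invocation."""
    processed = 0
    for topic_partition, records in event['records'].items():
        for record in records:
            payload = json.loads(base64.b64decode(record['value']))
            # ... business logic on payload goes here ...
            processed += 1
    return {'processed': processed}
```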

MSK Connect: Managed Kafka Connect

MSK Connect runs Kafka Connect workers without managing the infrastructure. Use it to stream data between Kafka and external systems (S3, RDS, Elasticsearch, Snowflake).

# Create S3 sink connector (upload connector plugin first)
aws kafkaconnect create-connector \
  --connector-name events-to-s3 \
  --kafka-cluster '{
    "apacheKafkaCluster": {
      "bootstrapServers": "b-1.my-kafka...:9092",
      "vpc": {
        "subnets": ["subnet-private-1a", "subnet-private-1b"],
        "securityGroups": ["sg-kafka-clients"]
      }
    }
  }' \
  --kafka-cluster-client-authentication '{"authenticationType": "IAM"}' \
  --kafka-cluster-encryption-in-transit '{"encryptionType": "TLS"}' \
  --connector-configuration '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "application-events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-events-bucket",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "300000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.compatibility": "NONE"
  }' \
  --capacity '{"autoScaling": {"maxWorkerCount": 10, "minWorkerCount": 1, "scaleInPolicy": {"cpuUtilizationPercentage": 20}, "scaleOutPolicy": {"cpuUtilizationPercentage": 80}}}' \
  --plugins '[{"customPlugin": {"customPluginArn": "arn:aws:kafkaconnect:us-east-1:123456789012:custom-plugin/s3-connector/abc123", "revision": 1}}]' \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKConnectRole

MSK Connect auto-scales workers based on CPU utilization, so you don’t manage connector worker counts manually.

Pricing

MSK Provisioned: broker instance hours + storage. kafka.m5.large costs $0.213/hour per broker. Three brokers: $0.213 × 3 × 720 = $460/month in compute plus storage at $0.10/GB-month. A 3-broker cluster with 300 GB storage (100 GB per broker): $460 + $30 = $490/month baseline.

MSK Serverless: $0.75/hour per cluster + $0.10/GB written + $0.05/GB read. A cluster with 100 GB/day written, 200 GB/day read: $0.75 × 720 + 3,000 × $0.10 + 6,000 × $0.05 = $540 + $300 + $300 = $1,140/month. MSK Serverless costs more at high throughput, but at low throughput you pay only the $540/month cluster charge plus minimal data fees — there is no idle broker capacity to fund.
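
The two cost models side by side, parameterized as a sketch (rates as quoted above; verify against current AWS pricing before deciding):

```python
def provisioned_monthly(brokers=3, broker_hourly=0.213,
                        storage_gb=300, storage_gb_month=0.10):
    """Broker instance-hours (720 hours/month) plus EBS storage."""
    return brokers * broker_hourly * 720 + storage_gb * storage_gb_month

def serverless_monthly(gb_written_per_day, gb_read_per_day,
                       cluster_hourly=0.75, write_gb=0.10, read_gb=0.05):
    """Per-cluster hourly charge plus per-GB write and read fees."""
    return (cluster_hourly * 720
            + gb_written_per_day * 30 * write_gb
            + gb_read_per_day * 30 * read_gb)

print(round(provisioned_monthly()))         # 490
print(round(serverless_monthly(100, 200)))  # 1140
```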

For the decision between MSK and Kinesis in detail, the Kinesis guide covers the functional overlap and where each service has clear advantages. MSK is the right choice when Kafka protocol compatibility matters — existing clients, Kafka Connect ecosystem, Kafka Streams, or consumer group semantics that go beyond what Kinesis offers.

Storage scaling on provisioned MSK: you can expand EBS volume size online without downtime. You cannot shrink volumes. Plan your initial size conservatively and expand as needed rather than over-provisioning.
