Build a production-ready serverless media processing pipeline with AWS Lambda, S3, DynamoDB, and SQS. Part 1 covers the infrastructure setup and the image processing worker.
In this comprehensive guide, we’ll build a production-ready serverless media processing pipeline that automatically watermarks images uploaded to S3. This is Part 1 of a 4-part series covering the complete infrastructure setup and core processing engine.
What we’ll build:
Architecture Preview:
```
User Upload → S3 → SQS → Lambda Worker → Processed S3
                             ↓
                     DynamoDB (Job Status)
```
Region: ap-south-1 (Mumbai)
Estimated Setup Time: 2-3 hours
Monthly Cost: ~$0.20 for 1000 operations
Before starting, ensure you have:
Required AWS Permissions:
We’ll create two S3 buckets: one for original uploads and another for processed images.
Navigate to S3:
Create the uploads bucket:
- Bucket name: `amodhbh-media-uploads`
- Tags: Key `Project`, Value `serverless-media`; Key `Purpose`, Value `uploads`; Key `Environment`, Value `production`

Verify creation:

- `amodhbh-media-uploads` appears in your bucket list

Create the processed bucket:

- Bucket name: `amodhbh-media-processed`
- Tags: Key `Project`, Value `serverless-media`; Key `Purpose`, Value `processed`; Key `Environment`, Value `production`

Verify creation:

- Both `amodhbh-media-uploads` and `amodhbh-media-processed` appear in your bucket list
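If you prefer scripting to the console, here is a minimal boto3 sketch that creates both buckets with the same tags (bucket names and region are the ones used in this guide; everything else stays at defaults):

```python
import boto3

s3 = boto3.client("s3", region_name="ap-south-1")

buckets = {
    "amodhbh-media-uploads": "uploads",
    "amodhbh-media-processed": "processed",
}

for name, purpose in buckets.items():
    # Outside us-east-1, S3 requires an explicit LocationConstraint
    s3.create_bucket(
        Bucket=name,
        CreateBucketConfiguration={"LocationConstraint": "ap-south-1"},
    )
    s3.put_bucket_tagging(
        Bucket=name,
        Tagging={"TagSet": [
            {"Key": "Project", "Value": "serverless-media"},
            {"Key": "Purpose", "Value": purpose},
            {"Key": "Environment", "Value": "production"},
        ]},
    )
```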
We’ll create a DynamoDB table to track job status throughout the processing pipeline.

Navigate to DynamoDB:
Create table:
- Table name: `media-processing-jobs`
- Partition key: `jobId` (Type: String)

Table settings:
Encryption:
Tags (Optional):
- Key `Project`, Value `serverless-media`
- Key `Environment`, Value `production`

Create table:

- Verify the table appears in your table list with the `jobId` partition key
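The same table can be created with boto3; a sketch assuming on-demand (PAY_PER_REQUEST) billing, which may differ from the capacity mode you picked above:

```python
import boto3

dynamodb = boto3.client("dynamodb", region_name="ap-south-1")

dynamodb.create_table(
    TableName="media-processing-jobs",
    AttributeDefinitions=[{"AttributeName": "jobId", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "jobId", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # assumption: on-demand capacity
    Tags=[
        {"Key": "Project", "Value": "serverless-media"},
        {"Key": "Environment", "Value": "production"},
    ],
)

# Wait until the table is ACTIVE before using it
dynamodb.get_waiter("table_exists").wait(TableName="media-processing-jobs")
```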
We’ll create two SQS queues: a main processing queue and a dead letter queue for failed messages. We create the DLQ first, so we can reference it when creating the main queue.
Navigate to SQS:
Create queue:
- Queue name: `media-processing-dlq`

Configuration:
Access policy:
Encryption:
Tags (Optional):
- Key `Project`, Value `serverless-media`
- Key `Environment`, Value `production`

Create queue:

- Note the DLQ ARN, for example: `arn:aws:sqs:ap-south-1:123456789012:media-processing-dlq`

Create queue:
- Queue name: `media-processing-queue`

Configuration:
Dead-letter queue:
- Select `media-processing-dlq` from the dropdown

Access policy:
Encryption:
Tags (Optional):
- Key `Project`, Value `serverless-media`
- Key `Environment`, Value `production`

Create queue:
You should see both queues in the SQS console:

- `media-processing-queue`
- `media-processing-dlq`

Click on `media-processing-queue` and verify:

- The dead-letter queue is set to `media-processing-dlq`
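For reference, a minimal boto3 sketch of the same two-queue setup (the visibility timeout and maxReceiveCount below are assumptions; use the values you configured above):

```python
import json
import boto3

sqs = boto3.client("sqs", region_name="ap-south-1")

# Dead letter queue first, so we can reference its ARN
dlq_url = sqs.create_queue(QueueName="media-processing-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Main queue with a redrive policy pointing at the DLQ
sqs.create_queue(
    QueueName="media-processing-queue",
    Attributes={
        "VisibilityTimeout": "180",  # assumption: at least 6x the Lambda timeout, per AWS guidance
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": "3",  # assumption: retry 3 times before moving to the DLQ
        }),
    },
)
```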
We’ll attach AWS managed policies first, then add a custom inline policy.

Search and attach these managed policies:
- `AWSLambdaBasicExecutionRole` (for CloudWatch Logs)

Click Next

- Role name: `media-worker-lambda-role`

Now we’ll add permissions for S3, SQS, and DynamoDB.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SQSReceiveAndDelete",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:ap-south-1:*:media-processing-queue"
    },
    {
      "Sid": "S3ReadUploads",
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::amodhbh-media-uploads/*"
    },
    {
      "Sid": "S3WriteProcessed",
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::amodhbh-media-processed/*"
    },
    {
      "Sid": "DynamoDBUpdateJobs",
      "Effect": "Allow",
      "Action": ["dynamodb:GetItem", "dynamodb:UpdateItem"],
      "Resource": "arn:aws:dynamodb:ap-south-1:*:table/media-processing-jobs"
    }
  ]
}
```
- Policy name: `WorkerLambdaCustomPolicy`
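If you’d rather create the role with a script, here is a boto3 sketch that mirrors the console steps above (it assumes the JSON policy document is saved locally as `worker-policy.json`, a filename chosen here for illustration):

```python
import json
import boto3

iam = boto3.client("iam")
ROLE = "media-worker-lambda-role"

# Trust policy so Lambda can assume the role
iam.create_role(
    RoleName=ROLE,
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

# Managed policy for CloudWatch Logs
iam.attach_role_policy(
    RoleName=ROLE,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

# Inline policy from the JSON document above
with open("worker-policy.json") as f:
    iam.put_role_policy(
        RoleName=ROLE,
        PolicyName="WorkerLambdaCustomPolicy",
        PolicyDocument=f.read(),
    )
```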
The Worker Lambda needs the Pillow library for image processing. We’ll use a Lambda Layer approach.

Option A: Use AWS-Provided Layer (Recommended - Easiest)
AWS provides pre-built layers for common Python packages. However, if not available, proceed to Option B.
Option B: Create Custom Layer (Most Reliable)
You’ll need to create this on a Linux environment or use AWS CloudShell.
Open AWS CloudShell:
Create the layer directory structure:
```bash
mkdir -p pillow-layer/python
cd pillow-layer
```

Install Pillow:

```bash
pip3 install Pillow -t python/
```

Create the zip file:

```bash
zip -r pillow-layer.zip python/
```
Download to your local machine:
- `pillow-layer/pillow-layer.zip`

Navigate to Lambda:
Create layer:
- Name: `pillow-layer`
- Upload the `pillow-layer.zip` file you downloaded

Note the Layer ARN (you’ll need this when creating the Lambda function), for example:

`arn:aws:lambda:ap-south-1:123456789012:layer:pillow-layer:1`
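Alternatively, you can publish the layer straight from CloudShell with boto3 instead of downloading and re-uploading the zip. A sketch (the compatible-runtimes value is an assumption; match it to your function’s runtime):

```python
import boto3

lambda_client = boto3.client("lambda", region_name="ap-south-1")

with open("pillow-layer.zip", "rb") as f:
    response = lambda_client.publish_layer_version(
        LayerName="pillow-layer",
        Description="Pillow for image processing",
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.12"],  # assumption: your function's runtime
    )

print(response["LayerVersionArn"])
```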
Navigate to Lambda:

Basic information:

- Function name: `worker-lambda`

Permissions:

- Select `media-worker-lambda-role` from the dropdown

Advanced settings:
Click Create function
In the Configuration tab:
Add the Pillow Layer:
- Select `pillow-layer` and the latest version

Click the Configuration tab
Click Environment variables → Edit
Click Add environment variable for each:
| Key | Value |
|---|---|
| UPLOAD_BUCKET | amodhbh-media-uploads |
| PROCESSED_BUCKET | amodhbh-media-processed |
| DYNAMODB_TABLE | media-processing-jobs |
Click Save
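The same configuration can also be applied with boto3 if you’re scripting the setup (memory, timeout, and the layer ARN below are placeholders/assumptions; use your own values):

```python
import boto3

lambda_client = boto3.client("lambda", region_name="ap-south-1")

lambda_client.update_function_configuration(
    FunctionName="worker-lambda",
    Timeout=60,       # assumption: headroom for large images
    MemorySize=512,   # assumption: Pillow benefits from extra memory/CPU
    Environment={"Variables": {
        "UPLOAD_BUCKET": "amodhbh-media-uploads",
        "PROCESSED_BUCKET": "amodhbh-media-processed",
        "DYNAMODB_TABLE": "media-processing-jobs",
    }},
    Layers=["arn:aws:lambda:ap-south-1:123456789012:layer:pillow-layer:1"],  # your layer ARN
)
```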
Replace the contents of `lambda_function.py` with:

```python
import json
import boto3
import os
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
from datetime import datetime
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])

UPLOAD_BUCKET = os.environ['UPLOAD_BUCKET']
PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']


def lambda_handler(event, context):
    """
    Main Lambda handler. Processes SQS messages containing S3 upload events.
    """
    logger.info(f"Received event: {json.dumps(event)}")

    for record in event['Records']:
        try:
            # Parse the SQS message body
            message_body = json.loads(record['body'])
            job_id = message_body['jobId']
            s3_key = message_body['s3Key']

            logger.info(f"Processing job {job_id} for file {s3_key}")

            # Update job status to PROCESSING
            update_job_status(job_id, 'PROCESSING')

            # Download the image from S3
            image_data = download_image(UPLOAD_BUCKET, s3_key)

            # Process the image (add watermark)
            processed_image_data = add_watermark(image_data)

            # Generate output key
            output_key = f"processed/{s3_key}"

            # Upload processed image to S3
            upload_image(PROCESSED_BUCKET, output_key, processed_image_data)

            # Update job status to COMPLETED
            update_job_status(
                job_id,
                'COMPLETED',
                processed_url=f"s3://{PROCESSED_BUCKET}/{output_key}"
            )

            logger.info(f"Successfully processed job {job_id}")

        except Exception as e:
            logger.error(f"Error processing record: {str(e)}")

            # Update job status to FAILED
            if 'job_id' in locals():
                update_job_status(job_id, 'FAILED', error=str(e))

            # Re-raise the exception so SQS knows the message failed
            raise

    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }


def download_image(bucket, key):
    """
    Download an image from S3 and return as bytes.
    """
    logger.info(f"Downloading s3://{bucket}/{key}")
    response = s3_client.get_object(Bucket=bucket, Key=key)
    image_data = response['Body'].read()
    return image_data


def add_watermark(image_data):
    """
    Add a watermark to the image with improved error handling and performance.
    """
    logger.info("Adding watermark to image")

    try:
        # Open the image
        image = Image.open(BytesIO(image_data))

        # Convert to RGBA if not already (for transparency support)
        if image.mode != 'RGBA':
            image = image.convert('RGBA')

        # Create a transparent overlay
        overlay = Image.new('RGBA', image.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(overlay)

        # Calculate watermark position (bottom-right corner)
        watermark_text = "© Amodhbh Media"

        # Try to use a better font, fall back to default if not available
        try:
            font = ImageFont.truetype("/usr/share/fonts/dejavu/DejaVuSans-Bold.ttf", 36)
        except OSError:
            font = ImageFont.load_default()

        # Get text size using textbbox
        bbox = draw.textbbox((0, 0), watermark_text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]

        # Position text in bottom-right with 20px margin
        x = image.width - text_width - 20
        y = image.height - text_height - 20

        # Draw semi-transparent background rectangle
        padding = 10
        draw.rectangle(
            [x - padding, y - padding, x + text_width + padding, y + text_height + padding],
            fill=(0, 0, 0, 128)
        )

        # Draw the watermark text
        draw.text((x, y), watermark_text, fill=(255, 255, 255, 255), font=font)

        # Composite the overlay onto the original image
        watermarked = Image.alpha_composite(image, overlay)

        # Convert back to RGB (removes alpha channel)
        watermarked = watermarked.convert('RGB')

        # Save to bytes with optimized settings
        output = BytesIO()
        watermarked.save(output, format='JPEG', quality=95, optimize=True)
        output.seek(0)

        return output.getvalue()

    except Exception as e:
        logger.error(f"Error adding watermark: {str(e)}")
        raise


def upload_image(bucket, key, image_data):
    """
    Upload processed image to S3 with metadata.
    """
    logger.info(f"Uploading to s3://{bucket}/{key}")
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=image_data,
        ContentType='image/jpeg',
        Metadata={
            'processed-by': 'serverless-media-pipeline',
            'processing-timestamp': datetime.utcnow().isoformat()
        }
    )


def update_job_status(job_id, status, processed_url=None, error=None):
    """
    Update job status in DynamoDB with improved error handling.
    """
    logger.info(f"Updating job {job_id} to status {status}")

    try:
        update_expression = "SET #status = :status, updatedAt = :timestamp"
        expression_values = {
            ':status': status,
            ':timestamp': datetime.utcnow().isoformat()
        }
        expression_names = {
            '#status': 'status'
        }

        if processed_url:
            update_expression += ", processedUrl = :url"
            expression_values[':url'] = processed_url

        if error:
            update_expression += ", errorMessage = :error"
            expression_values[':error'] = error

        table.update_item(
            Key={'jobId': job_id},
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_values,
            ExpressionAttributeNames=expression_names
        )

        logger.info(f"Successfully updated job {job_id} to {status}")

    except Exception as e:
        logger.error(f"Error updating job status: {str(e)}")
        raise
```
Add the SQS trigger:

- Select `media-processing-queue` as the source
- Verify that the function’s Triggers list shows `media-processing-queue` listed
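If you’re scripting, the trigger is an event source mapping; a minimal boto3 sketch (the batch size of 1 is an assumption):

```python
import boto3

lambda_client = boto3.client("lambda", region_name="ap-south-1")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:ap-south-1:123456789012:media-processing-queue",  # your queue ARN
    FunctionName="worker-lambda",
    BatchSize=1,  # assumption: process one message per invocation
    Enabled=True,
)
```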
We’ll simulate an SQS message manually to test the worker Lambda. Create a test event named `TestMediaProcessing` with this payload:

```json
{
  "Records": [
    {
      "messageId": "test-message-id",
      "receiptHandle": "test-receipt-handle",
      "body": "{\"jobId\": \"test-job-123\", \"s3Key\": \"test-image.jpg\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1234567890000",
        "SenderId": "test-sender",
        "ApproximateFirstReceiveTimestamp": "1234567890000"
      },
      "messageAttributes": {},
      "md5OfBody": "test-md5",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:ap-south-1:123456789012:media-processing-queue",
      "awsRegion": "ap-south-1"
    }
  ]
}
```
Note: This test will fail because `test-image.jpg` doesn’t exist in your bucket. We’ll do a proper end-to-end test in Part 2.
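If you want the console test to pass now rather than waiting for Part 2, you can seed the inputs it expects. A hedged sketch (assumes Pillow is installed locally; it uploads a placeholder image and creates a matching job record):

```python
import io
import boto3
from PIL import Image

s3 = boto3.client("s3", region_name="ap-south-1")
dynamodb = boto3.resource("dynamodb", region_name="ap-south-1")

# Upload a small placeholder JPEG as test-image.jpg
buf = io.BytesIO()
Image.new("RGB", (640, 480), (200, 120, 40)).save(buf, format="JPEG")
s3.put_object(
    Bucket="amodhbh-media-uploads",
    Key="test-image.jpg",
    Body=buf.getvalue(),
    ContentType="image/jpeg",
)

# Create the job record the worker will update
dynamodb.Table("media-processing-jobs").put_item(
    Item={"jobId": "test-job-123", "status": "PENDING"}
)
```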
Before proceeding to Part 2, verify:
- `amodhbh-media-uploads` exists in ap-south-1
- `amodhbh-media-processed` exists in ap-south-1
- `media-processing-jobs` table exists with partition key `jobId` (String)
- `media-processing-queue` exists
- `media-processing-dlq` exists
- `media-worker-lambda-role` exists with the `AWSLambdaBasicExecutionRole` managed policy
- `pillow-layer` created
- `worker-lambda` exists

Enable S3 Server-Side Encryption:
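A hedged sketch of enabling default encryption on both buckets with boto3 (SSE-S3/AES256 is an assumption; use SSE-KMS if you need customer-managed keys):

```python
import boto3

s3 = boto3.client("s3", region_name="ap-south-1")

for bucket in ("amodhbh-media-uploads", "amodhbh-media-processed"):
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={"Rules": [
            {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
        ]},
    )
```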
IAM Least Privilege:
VPC Configuration:
CloudWatch Alarms:
X-Ray Tracing:
Custom Metrics:
Lambda Configuration:
S3 Optimization:
DynamoDB Optimization:
Total idle cost: $0.00/month
Example: 1000 image processing jobs per month
Estimated monthly cost for 1000 operations: ~$0.20
S3 Lifecycle Policies:
DynamoDB Optimization:
Lambda Optimization:
- Layer issues: confirm the zip contains the `python/PIL/...` directory structure
- If `amodhbh-media-uploads` is taken, try: `amodhbh-media-uploads-<random-number>`

Congratulations! You’ve successfully set up the core infrastructure and worker Lambda for your serverless media processing pipeline.
What’s Next:
Quick Reference:
| Resource Type | Name | Purpose |
|---|---|---|
| S3 Bucket | amodhbh-media-uploads | Stores original uploaded images |
| S3 Bucket | amodhbh-media-processed | Stores watermarked images |
| DynamoDB Table | media-processing-jobs | Tracks job status (PENDING/PROCESSING/COMPLETED/FAILED) |
| SQS Queue | media-processing-queue | Decouples upload events from processing |
| SQS Queue | media-processing-dlq | Captures failed messages for investigation |
| IAM Role | media-worker-lambda-role | Worker Lambda execution role |
| Lambda Layer | pillow-layer | Provides Pillow library for image processing |
| Lambda Function | worker-lambda | Downloads, watermarks, and uploads images |
| SQS Trigger | media-processing-queue | Triggers worker Lambda when messages arrive |
In this comprehensive guide, we’ve built the foundation of a production-ready serverless media processing pipeline:
✅ Infrastructure Setup:
✅ Worker Lambda:
✅ Security & Monitoring:
Key Benefits:
Ready for Part 2? We’ll add API Gateway, pre-signed URLs, and complete the end-to-end pipeline!
This is Part 1 of a 4-part series on building a production-ready serverless media processing pipeline. Stay tuned for Part 2, where we’ll add API Gateway and complete the user-facing components!