Complete your serverless media processing pipeline with API Gateway, Lambda functions, and end-to-end testing. Part 2 covers the user-facing API and event-driven processing.
In this comprehensive guide, we’ll complete our serverless media processing pipeline by adding the user-facing API and event-driven processing components. This is Part 2 of our 4-part series, building on the infrastructure foundation from Part 1.
What we’ll build:
Architecture Flow:
User → API Gateway → Lambda → S3 → Event → Dispatcher → SQS → Worker → Processed S3
Region: ap-south-1 (Mumbai)
Estimated Setup Time: 1-2 hours
Prerequisites: Part 1 completed
Before starting, ensure you have completed Part 1 and have:
- S3 buckets (amodhbh-media-uploads, amodhbh-media-processed)
- DynamoDB table (media-processing-jobs)
- SQS queues (media-processing-queue, media-processing-dlq)

Required AWS Permissions:
Navigate to IAM:
Configure trust policy:
Add permissions:
AWSLambdaBasicExecutionRole

Name and create:

media-api-lambda-role

Then create a custom policy with the following JSON:

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DynamoDBReadWrite",
"Effect": "Allow",
"Action": ["dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:UpdateItem"],
"Resource": "arn:aws:dynamodb:ap-south-1:*:table/media-processing-jobs"
},
{
"Sid": "S3PresignedURL",
"Effect": "Allow",
"Action": ["s3:PutObject"],
"Resource": "arn:aws:s3:::amodhbh-media-uploads/*"
},
{
"Sid": "S3GetProcessedObjects",
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::amodhbh-media-processed/*"
}
]
}
Name the policy APILambdaCustomPolicy and attach it to media-api-lambda-role.

This function generates a pre-signed S3 URL and creates a job record.
Navigate to Lambda:
Basic information:
Function name: request-upload-lambda

Permissions:

Use existing role: media-api-lambda-role

Click Create function
Configuration tab → General configuration → Edit
Add environment variables:
| Key | Value |
|---|---|
| UPLOAD_BUCKET | amodhbh-media-uploads |
| DYNAMODB_TABLE | media-processing-jobs |
Replace the code in lambda_function.py with:

import json
import boto3
import uuid
from datetime import datetime
import logging
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
import os
UPLOAD_BUCKET = os.environ['UPLOAD_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
table = dynamodb.Table(DYNAMODB_TABLE)
def lambda_handler(event, context):
"""
Generate a pre-signed S3 upload URL and create a job record in DynamoDB.
"""
logger.info(f"Received event: {json.dumps(event)}")
try:
# Parse request body
if event.get('body'):
body = json.loads(event['body'])
else:
body = {}
# Get filename from request (optional)
filename = body.get('filename', f"{uuid.uuid4()}.jpg")
# Generate unique job ID
job_id = str(uuid.uuid4())
# Generate S3 key (path in bucket)
s3_key = f"uploads/{job_id}/{filename}"
# Create job record in DynamoDB
create_job_record(job_id, s3_key)
# Generate pre-signed URL (valid for 5 minutes)
upload_url = generate_presigned_url(UPLOAD_BUCKET, s3_key)
# Return response
response = {
'jobId': job_id,
'uploadUrl': upload_url,
'expiresIn': 300 # 5 minutes in seconds
}
logger.info(f"Generated upload URL for job {job_id}")
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'POST, OPTIONS'
},
'body': json.dumps(response)
}
except Exception as e:
logger.error(f"Error: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': 'Internal server error',
'message': str(e)
})
}
def create_job_record(job_id, s3_key):
"""
Create a new job record in DynamoDB with PENDING status.
"""
logger.info(f"Creating job record for {job_id}")
try:
table.put_item(
Item={
'jobId': job_id,
'status': 'PENDING',
's3Key': s3_key,
'createdAt': datetime.utcnow().isoformat(),
'updatedAt': datetime.utcnow().isoformat()
}
)
logger.info(f"Successfully created job record for {job_id}")
except Exception as e:
logger.error(f"Error creating job record: {str(e)}")
raise
def generate_presigned_url(bucket, key):
"""
Generate a pre-signed URL for uploading to S3.
"""
logger.info(f"Generating pre-signed URL for s3://{bucket}/{key}")
try:
url = s3_client.generate_presigned_url(
'put_object',
Params={
'Bucket': bucket,
'Key': key,
'ContentType': 'image/jpeg'
},
ExpiresIn=300 # 5 minutes
)
return url
except Exception as e:
logger.error(f"Error generating pre-signed URL: {str(e)}")
raise
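
Before wiring up API Gateway, you can sanity-check this function directly. Below is a minimal sketch (not part of the original walkthrough) that invokes the deployed function with a payload shaped like an API Gateway HTTP API event, assuming you have AWS credentials for ap-south-1 configured locally. Note that a real job record will be created in DynamoDB.

```python
import json
import boto3

# Invoke request-upload-lambda directly; only the 'body' field is read by the handler.
lambda_client = boto3.client('lambda', region_name='ap-south-1')

test_event = {'body': json.dumps({'filename': 'smoke-test.jpg'})}

response = lambda_client.invoke(
    FunctionName='request-upload-lambda',
    Payload=json.dumps(test_event)
)

result = json.loads(response['Payload'].read())
print(result['statusCode'])          # expect 200
print(json.loads(result['body']))    # expect jobId, uploadUrl, expiresIn
```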
This function retrieves job status from DynamoDB.
Lambda console → Functions → Create function
Basic information:
Function name: get-job-status-lambda

Permissions:

Use existing role: media-api-lambda-role

Click Create function
Configuration tab → General configuration → Edit
Add environment variables:
| Key | Value |
|---|---|
| DYNAMODB_TABLE | media-processing-jobs |
| PROCESSED_BUCKET | amodhbh-media-processed |
Replace the code in lambda_function.py with:

import json
import boto3
import os
from boto3.dynamodb.conditions import Key
import logging
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
s3_client = boto3.client('s3')
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']
table = dynamodb.Table(DYNAMODB_TABLE)
def lambda_handler(event, context):
"""
Retrieve job status from DynamoDB by job ID.
"""
logger.info(f"Received event: {json.dumps(event)}")
try:
# Extract jobId from path parameters
path_parameters = event.get('pathParameters', {})
job_id = path_parameters.get('jobId')
if not job_id:
logger.warning("Missing jobId parameter")
return {
'statusCode': 400,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': 'Missing jobId parameter'
})
}
logger.info(f"Retrieving status for job {job_id}")
# Get job from DynamoDB
response = table.get_item(Key={'jobId': job_id})
if 'Item' not in response:
logger.warning(f"Job {job_id} not found")
return {
'statusCode': 404,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': 'Job not found'
})
}
job = response['Item']
# If job is completed, generate a pre-signed URL for the processed image
if job['status'] == 'COMPLETED' and 'processedUrl' in job:
download_url = generate_download_url(job['processedUrl'])
if download_url:
job['downloadUrl'] = download_url
logger.info(f"Retrieved status for job {job_id}: {job['status']}")
# Return job status
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET, OPTIONS'
},
'body': json.dumps({
'jobId': job['jobId'],
'status': job['status'],
'createdAt': job.get('createdAt'),
'updatedAt': job.get('updatedAt'),
'downloadUrl': job.get('downloadUrl'),
'errorMessage': job.get('errorMessage')
})
}
except Exception as e:
logger.error(f"Error: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'error': 'Internal server error',
'message': str(e)
})
}
def generate_download_url(s3_url):
"""
Generate a pre-signed URL for downloading the processed image.
"""
try:
if s3_url.startswith('s3://'):
# Parse the S3 URL
s3_path = s3_url.replace('s3://', '')
parts = s3_path.split('/', 1)
if len(parts) == 2:
bucket, key = parts
# Generate pre-signed URL for download (valid 1 hour)
download_url = s3_client.generate_presigned_url(
'get_object',
Params={
'Bucket': bucket,
'Key': key
},
ExpiresIn=3600 # 1 hour
)
return download_url
except Exception as e:
logger.error(f"Error generating download URL: {str(e)}")
return None
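
As with the first function, you can test this one before creating any routes. A minimal sketch is below; the jobId value is a placeholder, so substitute one returned by request-upload-lambda. API Gateway passes the {jobId} path segment through pathParameters, which is exactly what the handler reads.

```python
import json
import boto3

lambda_client = boto3.client('lambda', region_name='ap-south-1')

# Mimic the pathParameters that API Gateway would supply for GET /jobs/{jobId}
test_event = {'pathParameters': {'jobId': 'REPLACE_WITH_A_REAL_JOB_ID'}}

response = lambda_client.invoke(
    FunctionName='get-job-status-lambda',
    Payload=json.dumps(test_event)
)

result = json.loads(response['Payload'].read())
print(result['statusCode'])   # 200 if the job exists, 404 otherwise
print(result['body'])
```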
Navigate to API Gateway:
Create API:
API name: media-processing-api

Configure routes (skip for now):
Configure stages:
$default (auto-deploy)

Review and create:
Note your API endpoint URL:
https://abc123xyz.execute-api.ap-south-1.amazonaws.com

In your API, click Routes in the left sidebar
Click Create
Method: POST
Path: /uploads
Click Create
Attach integration:
For the POST /uploads route, select request-upload-lambda as the integration target

Click Create (to create another route)
Method: GET
Path: /jobs/{jobId}
Click Create
Attach integration:
For the GET /jobs/{jobId} route, select get-job-status-lambda as the integration target

Our Lambda functions already return CORS headers, so no additional CORS configuration is needed in API Gateway for HTTP APIs.
Using curl (Linux/Mac/Git Bash on Windows):
curl -X POST https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/uploads \
-H "Content-Type: application/json" \
-d '{"filename": "test-image.jpg"}'
Expected response:
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"uploadUrl": "https://amodhbh-media-uploads.s3.ap-south-1.amazonaws.com/uploads/...",
"expiresIn": 300
}
Using Postman:
Method: POST

URL: https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/uploads

Header: Content-Type: application/json

Body (raw JSON):

{
"filename": "my-photo.jpg"
}
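
If you prefer Python to curl or Postman, here is a small sketch using the requests library (not part of the original walkthrough); replace YOUR_API_ID with your actual API ID.

```python
import requests

API_BASE = "https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com"

# Ask the API for a pre-signed upload URL and a job ID
resp = requests.post(f"{API_BASE}/uploads", json={"filename": "my-photo.jpg"})
resp.raise_for_status()

data = resp.json()
print(data["jobId"], data["uploadUrl"], data["expiresIn"])
```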
After getting the uploadUrl from the previous request, upload an actual image:
Using curl:
curl -X PUT "PRESIGNED_URL_FROM_PREVIOUS_RESPONSE" \
-H "Content-Type: image/jpeg" \
--data-binary @path/to/your/image.jpg
Using Postman:
Method: PUT

URL: the uploadUrl from the previous response

Header: Content-Type: image/jpeg

Body: binary (select your image file)

Next, check the job status. Using curl:
curl https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/jobs/YOUR_JOB_ID
Expected response (when status is PENDING):
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "PENDING",
"createdAt": "2025-10-17T12:00:00.000000",
"updatedAt": "2025-10-17T12:00:00.000000"
}
Expected response (when status is COMPLETED):
{
"jobId": "550e8400-e29b-41d4-a716-446655440000",
"status": "COMPLETED",
"createdAt": "2025-10-17T12:00:00.000000",
"updatedAt": "2025-10-17T12:00:15.000000",
"downloadUrl": "https://amodhbh-media-processed.s3.ap-south-1.amazonaws.com/..."
}
Navigate to IAM:
Configure trust policy:
Add permissions:
AWSLambdaBasicExecutionRole

Name and create:

dispatcher-lambda-role

Then create a custom policy with the following JSON:

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSSendMessage",
"Effect": "Allow",
"Action": ["sqs:SendMessage", "sqs:GetQueueUrl"],
"Resource": "arn:aws:sqs:ap-south-1:*:media-processing-queue"
}
]
}
Name the policy DispatcherLambdaCustomPolicy and attach it to dispatcher-lambda-role.

Navigate to Lambda:
Basic information:
Function name: dispatcher-lambda

Permissions:

Use existing role: dispatcher-lambda-role

Click Create function
Configuration tab → General configuration → Edit
Add environment variables:
| Key | Value |
|---|---|
| SQS_QUEUE_URL | (We’ll get this in the next step) |
Navigate to SQS:
Click on media-processing-queue
Copy the Queue URL from the Details section
https://sqs.ap-south-1.amazonaws.com/123456789012/media-processing-queue
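
If you prefer to look the URL up programmatically rather than copying it from the console, here is a boto3 sketch (assuming the queue name from Part 1).

```python
import boto3

sqs = boto3.client('sqs', region_name='ap-south-1')

# Resolve the queue URL by name (requires the sqs:GetQueueUrl permission)
queue_url = sqs.get_queue_url(QueueName='media-processing-queue')['QueueUrl']
print(queue_url)
```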
Go back to Lambda:

Open the dispatcher-lambda function and update the SQS_QUEUE_URL environment variable with the copied URL.

Replace the code in lambda_function.py with:

import json
import boto3
import os
from urllib.parse import unquote_plus
import logging
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sqs_client = boto3.client('sqs')
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
def lambda_handler(event, context):
"""
Triggered by S3 upload events. Extracts S3 object details and
sends a message to SQS for processing by the worker Lambda.
"""
logger.info(f"Received event: {json.dumps(event)}")
try:
# Process each S3 event record
for record in event['Records']:
# Extract S3 event information
event_name = record['eventName']
bucket_name = record['s3']['bucket']['name']
object_key = unquote_plus(record['s3']['object']['key'])
logger.info(f"Processing S3 event: {event_name}")
logger.info(f"Bucket: {bucket_name}, Key: {object_key}")
# Only process object creation events
if not event_name.startswith('ObjectCreated'):
logger.info(f"Ignoring event type: {event_name}")
continue
# Extract job ID from the S3 key
# Expected format: uploads/{job_id}/{filename}
job_id = extract_job_id(object_key)
if not job_id:
logger.warning(f"Could not extract job ID from key: {object_key}")
continue
# Create message for SQS
message = {
'jobId': job_id,
's3Key': object_key,
'bucket': bucket_name
}
# Send message to SQS queue
send_to_sqs(message)
logger.info(f"Successfully queued job {job_id} for processing")
return {
'statusCode': 200,
'body': json.dumps('Successfully dispatched events to SQS')
}
except Exception as e:
logger.error(f"Error processing S3 event: {str(e)}")
# Re-raise to signal failure
raise
def extract_job_id(s3_key):
"""
Extract job ID from S3 key path.
Expected format: uploads/{job_id}/{filename}
"""
parts = s3_key.split('/')
if len(parts) >= 2 and parts[0] == 'uploads':
return parts[1]
return None
def send_to_sqs(message):
"""
Send a message to the SQS queue.
"""
logger.info(f"Sending message to SQS: {json.dumps(message)}")
try:
response = sqs_client.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps(message),
MessageAttributes={
'JobId': {
'StringValue': message['jobId'],
'DataType': 'String'
}
}
)
logger.info(f"Message sent with ID: {response['MessageId']}")
return response
except Exception as e:
logger.error(f"Error sending message to SQS: {str(e)}")
raise
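
You can exercise the dispatcher before touching S3 by sending it a hand-built event. The sketch below (an addition, not from the original walkthrough) builds a minimal S3 ObjectCreated event containing only the fields the handler reads; the job ID in the key is a placeholder.

```python
import json
import boto3

lambda_client = boto3.client('lambda', region_name='ap-south-1')

# Minimal S3 ObjectCreated event; the key follows the uploads/{job_id}/{filename} convention
fake_s3_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "amodhbh-media-uploads"},
                "object": {"key": "uploads/REPLACE_WITH_A_JOB_ID/test.jpg"}
            }
        }
    ]
}

response = lambda_client.invoke(
    FunctionName='dispatcher-lambda',
    Payload=json.dumps(fake_s3_event)
)
print(json.loads(response['Payload'].read()))  # expect statusCode 200
```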
This is the crucial step that triggers the entire processing pipeline.
First, we need to allow S3 to invoke our Lambda function.
In the dispatcher-lambda function:
Configure permissions:
Statement ID: AllowS3Invocation

Principal: s3.amazonaws.com

Source ARN: arn:aws:s3:::amodhbh-media-uploads

Action: lambda:InvokeFunction
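
The same resource-based policy can be added from code instead of the console. A boto3 sketch, using the statement ID and ARNs above:

```python
import boto3

lambda_client = boto3.client('lambda', region_name='ap-south-1')

# Allow the uploads bucket to invoke dispatcher-lambda
lambda_client.add_permission(
    FunctionName='dispatcher-lambda',
    StatementId='AllowS3Invocation',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::amodhbh-media-uploads'
)
```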
Navigate to S3:

Open the uploads bucket: amodhbh-media-uploads

Create event notification:
Configure the event:
Event name: trigger-dispatcher-on-upload

Prefix: uploads/ (only trigger for files in the uploads/ folder)

Event types: All object create events (s3:ObjectCreated:*)

Destination: choose dispatcher-lambda from the dropdown

Save the notification. To recap: the trigger-dispatcher-on-upload event fires for objects created under the uploads/ prefix and invokes dispatcher-lambda.
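
If you would rather script the notification, here is a boto3 sketch. Be aware that this call replaces the bucket's entire notification configuration, and the Lambda function ARN below uses a placeholder account ID that you must fill in.

```python
import boto3

s3 = boto3.client('s3', region_name='ap-south-1')

# WARNING: this overwrites any existing notification configuration on the bucket.
s3.put_bucket_notification_configuration(
    Bucket='amodhbh-media-uploads',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [
            {
                'Id': 'trigger-dispatcher-on-upload',
                'LambdaFunctionArn': 'arn:aws:lambda:ap-south-1:YOUR_ACCOUNT_ID:function:dispatcher-lambda',
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [{'Name': 'prefix', 'Value': 'uploads/'}]
                    }
                }
            }
        ]
    }
)
```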
Now we’ll test the complete pipeline from start to finish!

Step 1: Request Upload URL
curl -X POST https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/uploads \
-H "Content-Type: application/json" \
-d '{"filename": "beach-sunset.jpg"}'
Expected response:
{
"jobId": "abc-123-def-456",
"uploadUrl": "https://amodhbh-media-uploads.s3.ap-south-1.amazonaws.com/...",
"expiresIn": 300
}
Save the jobId and uploadUrl from the response!
Step 2: Upload Image to S3
curl -X PUT "PASTE_UPLOAD_URL_HERE" \
-H "Content-Type: image/jpeg" \
--data-binary @/path/to/your/beach-sunset.jpg
Expected response: Empty response with 200 OK status
Step 3: Immediately Check Job Status (Should be PENDING or PROCESSING)
curl https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/jobs/YOUR_JOB_ID
Expected response:
{
"jobId": "abc-123-def-456",
"status": "PENDING",
"createdAt": "2025-10-17T12:00:00.000000",
"updatedAt": "2025-10-17T12:00:00.000000"
}
Step 4: Wait 10-30 seconds, then check status again
curl https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com/jobs/YOUR_JOB_ID
Expected response:
{
"jobId": "abc-123-def-456",
"status": "COMPLETED",
"createdAt": "2025-10-17T12:00:00.000000",
"updatedAt": "2025-10-17T12:00:15.000000",
"downloadUrl": "https://amodhbh-media-processed.s3.ap-south-1.amazonaws.com/..."
}
Step 5: Download the Processed Image
Copy the downloadUrl from the response and paste it in your browser, or use curl:
curl -o processed-image.jpg "PASTE_DOWNLOAD_URL_HERE"
Verify: Open processed-image.jpg and confirm it has the watermark “© Amodhbh Media” in the bottom-right corner!
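
The same end-to-end flow can be scripted. Here is a sketch using the requests library; the API ID and image path are placeholders, and the 5-second polling interval and roughly 2-minute timeout are arbitrary choices.

```python
import time
import requests

API_BASE = "https://YOUR_API_ID.execute-api.ap-south-1.amazonaws.com"
IMAGE_PATH = "beach-sunset.jpg"

# Step 1: request an upload URL and job ID
resp = requests.post(f"{API_BASE}/uploads", json={"filename": "beach-sunset.jpg"})
resp.raise_for_status()
job = resp.json()

# Step 2: upload the image to the pre-signed URL (Content-Type must match)
with open(IMAGE_PATH, "rb") as f:
    put = requests.put(job["uploadUrl"], data=f,
                       headers={"Content-Type": "image/jpeg"})
put.raise_for_status()

# Steps 3-4: poll the job status until it leaves PENDING/PROCESSING
for _ in range(24):
    status = requests.get(f"{API_BASE}/jobs/{job['jobId']}").json()
    print(status["status"])
    if status["status"] not in ("PENDING", "PROCESSING"):
        break
    time.sleep(5)

# Step 5: download the processed image if a download URL was returned
if status.get("downloadUrl"):
    img = requests.get(status["downloadUrl"])
    with open("processed-image.jpg", "wb") as out:
        out.write(img.content)
    print("Saved processed-image.jpg")
```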
For dispatcher-lambda:
dispatcher-lambda → Monitor tab

For worker-lambda:
worker-lambda → Monitor tab

SQS queue: media-processing-queue

DynamoDB table: media-processing-jobs (find your jobId and verify status is COMPLETED)

Uploads bucket:
amodhbh-media-uploads → uploads/{job_id}/ (contains your original upload)

Processed bucket:
amodhbh-media-processed → processed/uploads/{job_id}/ (contains the processed image)

If you upload 1000 images simultaneously:
Security:
Monitoring:
Performance:
Processing 10,000 images per month:
S3 Lifecycle Policies:
DynamoDB Optimization:
Lambda Optimization:
Current Limits:
For Higher Load:
Cause: Lambda function errors. Solution:
Cause: URL expired or incorrect headers. Solution:
Cause: Incorrect job ID or DynamoDB issue. Solution:
Check the CloudWatch logs for request-upload-lambda for errors.

Cause: S3 event notification or dispatcher Lambda issue. Solution:
Check dispatcher Lambda logs:
dispatcher-lambda

Check S3 event notification:
Confirm the prefix filter is set correctly (uploads/).

Check SQS queue:
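
A quick way to see whether messages are reaching (or piling up in) the queue, sketched with boto3 using the queue names from Part 1:

```python
import boto3

sqs = boto3.client('sqs', region_name='ap-south-1')

# Print the approximate depth of the main queue and the dead-letter queue
for name in ('media-processing-queue', 'media-processing-dlq'):
    url = sqs.get_queue_url(QueueName=name)['QueueUrl']
    attrs = sqs.get_queue_attributes(
        QueueUrl=url,
        AttributeNames=['ApproximateNumberOfMessages',
                        'ApproximateNumberOfMessagesNotVisible']
    )['Attributes']
    print(name, attrs)
```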
Cause: Worker Lambda or SQS trigger issue. Solution:
Cause: Pillow library or image processing issue. Solution:
Cause: Worker Lambda failure. Solution:
Before considering the pipeline complete:
- media-api-lambda-role exists with correct permissions
- dispatcher-lambda-role exists with SQS permissions
- request-upload-lambda exists and is deployed
- get-job-status-lambda exists and is deployed
- dispatcher-lambda exists and is deployed
- media-processing-api exists
- POST /uploads is configured and working
- GET /jobs/{jobId} is configured and working
- trigger-dispatcher-on-upload exists on the uploads bucket
- Event notification is filtered to the uploads/ prefix
- Event notification targets dispatcher-lambda

Congratulations! You’ve successfully built a complete serverless media processing pipeline.
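
If you want to double-check the main resources from the checklist programmatically, here is a rough sketch; it only confirms that each resource exists (a call raises an error if one is missing), using the names from this series.

```python
import boto3

region = 'ap-south-1'

# Lambda functions created in this part
lam = boto3.client('lambda', region_name=region)
for fn in ('request-upload-lambda', 'get-job-status-lambda', 'dispatcher-lambda'):
    lam.get_function(FunctionName=fn)
    print(f"OK  Lambda: {fn}")

# DynamoDB table from Part 1
boto3.client('dynamodb', region_name=region).describe_table(TableName='media-processing-jobs')
print("OK  DynamoDB: media-processing-jobs")

# SQS queues from Part 1
sqs = boto3.client('sqs', region_name=region)
for q in ('media-processing-queue', 'media-processing-dlq'):
    sqs.get_queue_url(QueueName=q)
    print(f"OK  SQS: {q}")

# S3 buckets from Part 1
s3 = boto3.client('s3', region_name=region)
for b in ('amodhbh-media-uploads', 'amodhbh-media-processed'):
    s3.head_bucket(Bucket=b)
    print(f"OK  S3: {b}")
```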
What’s Next:
Optional Enhancements:
Production Considerations:
User Application
↓ (1) POST /uploads
API Gateway → request-upload-lambda
↓ (2) Returns uploadUrl + jobId
↓ (creates DynamoDB record: PENDING)
↓
User uploads image to S3
↓ (3) S3 Event
dispatcher-lambda
↓ (4) Sends message
SQS Queue (media-processing-queue)
↓ (5) Triggers
worker-lambda
↓ (6) Downloads, processes, uploads
↓ (updates DynamoDB: COMPLETED)
↓
User Application
↓ (7) GET /jobs/{jobId}
API Gateway → get-job-status-lambda
↓ (8) Returns status + downloadUrl
| Resource Type | Name | Purpose |
|---|---|---|
| IAM Role | media-api-lambda-role | Execution role for API Lambda functions |
| Lambda | request-upload-lambda | Generates upload URLs and creates job records |
| Lambda | get-job-status-lambda | Retrieves job status from DynamoDB |
| Lambda | dispatcher-lambda | Forwards S3 events to SQS queue |
| API Gateway | media-processing-api | HTTP API for client applications |
| API Route | POST /uploads | Request upload URL endpoint |
| API Route | GET /jobs/{jobId} | Check job status endpoint |
| S3 Event | trigger-dispatcher-on-upload | Triggers dispatcher on file upload |
POST https://{api-id}.execute-api.ap-south-1.amazonaws.com/uploads
GET https://{api-id}.execute-api.ap-south-1.amazonaws.com/jobs/{jobId}
In this comprehensive guide, we’ve completed the serverless media processing pipeline:
✅ API Gateway Setup:
✅ Lambda Functions:
✅ End-to-End Testing:
✅ Production Readiness:
Key Benefits:
Ready for Part 3? We’ll cover cleanup procedures and resource management to ensure you can safely delete all resources when done testing!
This is Part 2 of the series on building a production-ready serverless media processing pipeline. Part 3 covers cleanup procedures and resource management.