Transactional Data Lake with Amazon Data Firehose & Iceberg

To Nha Notes | July 10, 2025, 3:30 p.m.

Overview

This AWS sample demonstrates how to build an end-to-end transactional data lake: AWS DMS captures change data capture (CDC) events from an Aurora MySQL database into Amazon Kinesis Data Streams, and Amazon Data Firehose delivers them to Amazon S3 in Apache Iceberg table format, ready to query with Athena.

Architecture Components:

  • Source: Aurora MySQL Database
  • CDC Capture: AWS Database Migration Service (DMS)
  • Streaming: Amazon Kinesis Data Streams
  • Processing: Amazon Data Firehose with Lambda transformation (sketched after this list)
  • Storage: Amazon S3 with Apache Iceberg table format
  • Query: Amazon Athena
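
The Lambda transformation is the glue between the DMS change envelope and the Firehose Iceberg destination. Below is a minimal sketch of that transform, assuming the default DMS-to-Kinesis JSON envelope (a data payload plus a metadata block) and Firehose's documented otfMetadata routing fields for Iceberg tables; the repository's actual function may differ in its details.

import base64
import json

# Map DMS operations onto the verbs the Firehose Iceberg destination accepts.
OPERATION_MAP = {"load": "insert", "insert": "insert",
                 "update": "update", "delete": "delete"}

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Drop DMS control records (e.g. table DDL events); keep data records.
        if payload.get("metadata", {}).get("record-type") != "data":
            output.append({"recordId": record["recordId"],
                           "result": "Dropped", "data": record["data"]})
            continue
        operation = OPERATION_MAP.get(payload["metadata"]["operation"], "insert")
        row = base64.b64encode(json.dumps(payload["data"]).encode("utf-8"))
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": row.decode("utf-8"),
            # otfMetadata tells Firehose which Iceberg table to write to and
            # whether the row is an insert, update, or delete.
            "metadata": {"otfMetadata": {
                "destinationDatabaseName": "cdc_iceberg_demo_db",
                "destinationTableName": "retail_trans_iceberg",
                "operation": operation,
            }},
        })
    return {"records": output}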

Prerequisites

  • AWS CLI configured with appropriate permissions
  • Python 3.x with CDK installed
  • Basic understanding of AWS services (RDS, DMS, Kinesis, S3, Athena)

Quick Start Guide

1. Initial Setup

# Clone the repository
git clone https://github.com/aws-samples/transactional-datalake-using-amazon-datafirehose-iceberg.git
cd transactional-datalake-using-amazon-datafirehose-iceberg

# Create and activate virtual environment
python3 -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate.bat

# Install dependencies
pip install -r requirements.txt

# Set AWS environment variables
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
export CDK_DEFAULT_REGION=$(aws configure get region)

2. Configuration

Create cdk.context.json with your specific configuration:

{
  "db_cluster_name": "dms-source-db",
  "dms_data_source": {
    "database_name": "testdb",
    "table_name": "retail_trans"
  },
  "kinesis_stream_name": "cdc_retail_trans_stream",
  "data_firehose_configuration": {
    "stream_name": "PUT-ICE-0d1f6",
    "destination_iceberg_table_configuration": {
      "database_name": "cdc_iceberg_demo_db",
      "table_name": "retail_trans_iceberg",
      "unique_keys": ["trans_id"]
    },
    "s3_bucket_name": "trans-datalake-iceberg-{your-region}",
    "output_prefix": "cdc_iceberg_demo_db/retail_trans_iceberg"
  }
}
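
A CDK app picks this file up automatically at synth time; stacks read individual keys through the context API. A minimal sketch of the lookup pattern (key names match the JSON above):

import aws_cdk as cdk

app = cdk.App()

# Values are resolved from cdk.context.json in the project root.
stream_name = app.node.try_get_context("kinesis_stream_name")
firehose_config = app.node.try_get_context("data_firehose_configuration")
iceberg_table = firehose_config["destination_iceberg_table_configuration"]["table_name"]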

3. Deployment Order

Deploy stacks in the following sequence:

# 1. Create VPC and Aurora MySQL
cdk deploy TransactionalDataLakeVpc AuroraMysqlAsDMSDataSource

# 2. (Optional) Create bastion host for database access
cdk deploy AuroraMysqlBastionHost

# 3. Create DMS components
cdk deploy DMSTargetKinesisDataStream DMSRequiredIAMRolesStack DMSTaskAuroraMysqlToKinesis

# 4. Create S3 bucket for Iceberg
cdk deploy DataFirehoseToIcebergS3Path

# 5. Create Lambda function for data transformation
cdk deploy FirehoseDataTransformLambdaStack

# 6. Create IAM roles and permissions
cdk deploy FirehoseToIcebergRoleStack GrantLFPermissionsOnFirehoseRole

# 7. Deploy Data Firehose
cdk deploy FirehoseToIcebergStack
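
Later stacks consume outputs exported by earlier ones, so confirm each stack reaches CREATE_COMPLETE before moving on. A quick boto3 check, using the stack names deployed above:

import boto3

cf = boto3.client("cloudformation")
for name in ["TransactionalDataLakeVpc", "AuroraMysqlAsDMSDataSource",
             "DMSTaskAuroraMysqlToKinesis", "FirehoseToIcebergStack"]:
    status = cf.describe_stacks(StackName=name)["Stacks"][0]["StackStatus"]
    print(f"{name}: {status}")  # expect CREATE_COMPLETE or UPDATE_COMPLETE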

4. Database Setup

Connect to Aurora MySQL and set up the sample database:

-- Verify that binary logging is enabled (should return ON)
SHOW GLOBAL VARIABLES LIKE 'log_bin';

-- Set binary log retention for DMS
CALL mysql.rds_set_configuration('binlog retention hours', 24);

-- Create sample database and table
CREATE DATABASE IF NOT EXISTS testdb;
USE testdb;

CREATE TABLE IF NOT EXISTS retail_trans (
  trans_id BIGINT(20) AUTO_INCREMENT,
  customer_id VARCHAR(12) NOT NULL,
  event VARCHAR(10) DEFAULT NULL,
  sku VARCHAR(10) NOT NULL,
  amount INT DEFAULT 0,
  device VARCHAR(10) DEFAULT NULL,
  trans_datetime DATETIME DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY(trans_id),
  KEY(trans_datetime)
) ENGINE=InnoDB AUTO_INCREMENT=0;
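
Before starting DMS, a quick connectivity smoke test from the bastion host can save a debugging cycle. A sketch using PyMySQL (assumed to be installed; the endpoint and credentials are placeholders):

import pymysql

# Placeholders: substitute your Aurora writer endpoint and credentials.
conn = pymysql.connect(host="your-db-cluster-endpoint", user="admin",
                       password="your-password", database="testdb")
try:
    with conn.cursor() as cur:
        cur.execute("SHOW GLOBAL VARIABLES LIKE 'log_bin'")
        print(cur.fetchone())  # expect ('log_bin', 'ON')
        cur.execute(
            "INSERT INTO retail_trans (customer_id, event, sku, amount, device) "
            "VALUES (%s, %s, %s, %s, %s)",
            ("000000000001", "visit", "SKU0000001", 1, "pc"))
    conn.commit()
finally:
    conn.close()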

5. Create Iceberg Table in Athena

-- Create database
CREATE DATABASE IF NOT EXISTS cdc_iceberg_demo_db;

-- Create Iceberg table
CREATE TABLE cdc_iceberg_demo_db.retail_trans_iceberg (
  trans_id bigint,
  customer_id string,
  event string,
  sku string,
  amount int,
  device string,
  trans_datetime timestamp
) PARTITIONED BY (`event`)
LOCATION 's3://your-bucket-name/cdc_iceberg_demo_db/retail_trans_iceberg'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='parquet',
  'write_compression'='snappy'
);
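
The DDL can also be submitted programmatically. A boto3 sketch (the results output location is a placeholder bucket path):

import boto3

athena = boto3.client("athena")

# Any of the statements above can be submitted this way; Athena runs DDL
# asynchronously, so poll get_query_execution if you need to wait on it.
resp = athena.start_query_execution(
    QueryString="CREATE DATABASE IF NOT EXISTS cdc_iceberg_demo_db",
    ResultConfiguration={"OutputLocation": "s3://your-bucket-name/athena-results/"},
)
print(resp["QueryExecutionId"])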

6. Start Data Pipeline

# Start DMS replication task
DMS_REPLICATION_TASK_ID=$(aws cloudformation describe-stacks --stack-name DMSTaskAuroraMysqlToKinesis | jq -r '.Stacks[0].Outputs[] | select(.OutputKey == "DMSReplicationTaskId") | .OutputValue | ascii_downcase')

DMS_REPLICATION_TASK_ARN=$(aws dms describe-replication-tasks | jq -r ".ReplicationTasks[] | select(.ReplicationTaskIdentifier == \"${DMS_REPLICATION_TASK_ID}\") | .ReplicationTaskArn")

aws dms start-replication-task \
  --replication-task-arn ${DMS_REPLICATION_TASK_ARN} \
  --start-replication-task-type start-replication
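
The same lookup-and-start flow in boto3, if you prefer it to the CLI/jq pipeline (the task identifier placeholder comes from the stack output above):

import boto3

dms = boto3.client("dms")

# Look the task up by identifier, then kick off full load + ongoing CDC.
tasks = dms.describe_replication_tasks(
    Filters=[{"Name": "replication-task-id",
              "Values": ["<dms-replication-task-id>"]}]
)["ReplicationTasks"]
dms.start_replication_task(
    ReplicationTaskArn=tasks[0]["ReplicationTaskArn"],
    StartReplicationTaskType="start-replication",
)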

7. Testing the Pipeline

Generate test data and perform DML operations:

# Connect to bastion host and generate test data
python3 gen_fake_mysql_data.py \
  --database testdb \
  --table retail_trans \
  --user admin \
  --password your-password \
  --host your-db-cluster-endpoint \
  --max-count 10

-- Test DML operations (run these in a MySQL session)
UPDATE retail_trans SET amount = 50 WHERE trans_id = 1;
DELETE FROM retail_trans WHERE trans_id = 2;
INSERT INTO retail_trans (customer_id, event, sku, amount, device) VALUES ('12345', 'purchase', 'ABC123', 99, 'mobile');
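
To confirm CDC events are actually reaching the stream before they hit Firehose, you can tail a shard with boto3 (stream name from the configuration above; this reads only the first shard):

import boto3

kinesis = boto3.client("kinesis")
stream = "cdc_retail_trans_stream"

shard_id = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]
for record in kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]:
    print(record["Data"].decode("utf-8"))  # DMS change envelope as JSON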

8. Query Data in Athena

-- Check data count
SELECT COUNT(*) FROM cdc_iceberg_demo_db.retail_trans_iceberg;

-- Query by partition
SELECT * FROM cdc_iceberg_demo_db.retail_trans_iceberg WHERE event = 'purchase';

-- Check recent changes
SELECT * FROM cdc_iceberg_demo_db.retail_trans_iceberg ORDER BY trans_datetime DESC LIMIT 10;

Key Features

  • Real-time CDC: Captures all database changes (INSERT, UPDATE, DELETE)
  • Transactional Consistency: Iceberg format ensures ACID properties
  • Scalable Architecture: Serverless components auto-scale with demand
  • Cost Efficient: Pay-per-use model with S3 storage
  • Analytics Ready: Direct querying with Athena, compatible with Spark

Common Issues & Solutions

1. Lake Formation Permissions

If you encounter permission errors, ensure the CDK execution role is a data lake administrator:

# Check current data lake administrators in Lake Formation console
# Add CDK execution role if missing
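
The admin list can also be updated programmatically. A sketch with boto3 (the role ARN is a placeholder; note that put_data_lake_settings replaces the whole settings object, so read-modify-write):

import boto3

lf = boto3.client("lakeformation")

settings = lf.get_data_lake_settings()["DataLakeSettings"]
admins = settings.get("DataLakeAdmins", [])
# Placeholder ARN: substitute your actual CDK execution role.
cdk_role = {"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/cdk-exec-role"}
if cdk_role not in admins:
    settings["DataLakeAdmins"] = admins + [cdk_role]
    lf.put_data_lake_settings(DataLakeSettings=settings)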

2. DMS VPC Role Error

Create required DMS IAM roles if missing:

# The following roles are required:
# - dms-vpc-role
# - dms-cloudwatch-logs-role  
# - dms-access-for-endpoint (for Redshift targets)
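
If dms-vpc-role is missing, it can be created with the standard trust policy and AWS managed policy, sketched here with boto3:

import json

import boto3

iam = boto3.client("iam")

# Trust policy allowing the DMS service to assume the role.
trust = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow",
                   "Principal": {"Service": "dms.amazonaws.com"},
                   "Action": "sts:AssumeRole"}],
}
iam.create_role(RoleName="dms-vpc-role",
                AssumeRolePolicyDocument=json.dumps(trust))
iam.attach_role_policy(
    RoleName="dms-vpc-role",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole",
)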

3. Binary Logging Issues

Ensure Aurora MySQL has binary logging enabled and proper retention:

SHOW GLOBAL VARIABLES LIKE 'log_bin';
CALL mysql.rds_set_configuration('binlog retention hours', 24);

Cleanup

# Stop DMS replication task
aws dms stop-replication-task --replication-task-arn ${DMS_REPLICATION_TASK_ARN}

# Destroy all stacks
cdk destroy --force --all
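
CloudFormation cannot delete a non-empty S3 bucket, and Iceberg leaves data and metadata files behind. If the stack isn't configured to auto-delete objects, empty the bucket before destroying; a boto3 sketch (bucket name per your configuration):

import boto3

bucket = boto3.resource("s3").Bucket("trans-datalake-iceberg-your-region")
bucket.object_versions.delete()  # all versions and delete markers
bucket.objects.all().delete()    # any remaining unversioned objects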

Useful CDK Commands

  • cdk ls - List all stacks
  • cdk synth - Synthesize CloudFormation template
  • cdk deploy - Deploy stack(s)
  • cdk diff - Compare deployed stack with current state
  • cdk destroy - Delete stack(s)

Related Resources

  • AWS sample repository: https://github.com/aws-samples/transactional-datalake-using-amazon-datafirehose-iceberg

This reference guide is based on the AWS sample repository. Always check the official documentation for the latest updates and best practices.