To Nha Notes | July 10, 2025, 3:30 p.m.
This AWS sample demonstrates how to build an end-to-end transactional data lake by streaming Change Data Capture (CDC) from Aurora MySQL to Amazon S3 in Apache Iceberg format, using AWS Database Migration Service (DMS), Amazon Kinesis Data Streams, and Amazon Data Firehose.
Architecture Components (as deployed by the CDK stacks below):
- Aurora MySQL cluster (CDC source, binary logging enabled)
- AWS DMS replication task (MySQL to Kinesis)
- Amazon Kinesis Data Streams (CDC record buffer)
- AWS Lambda (Firehose record transformation)
- Amazon Data Firehose (delivery to Iceberg)
- Amazon S3 + Apache Iceberg table (destination, cataloged in AWS Glue, governed by Lake Formation, queried with Athena)
# Clone the repository
git clone https://github.com/aws-samples/transactional-datalake-using-amazon-datafirehose-iceberg.git
cd transactional-datalake-using-amazon-datafirehose-iceberg

# Create and activate virtual environment
python3 -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate.bat

# Install dependencies
pip install -r requirements.txt

# Set AWS environment variables
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
export CDK_DEFAULT_REGION=$(aws configure get region)
Create cdk.context.json with your specific configuration:
{
  "db_cluster_name": "dms-source-db",
  "dms_data_source": {
    "database_name": "testdb",
    "table_name": "retail_trans"
  },
  "kinesis_stream_name": "cdc_retail_trans_stream",
  "data_firehose_configuration": {
    "stream_name": "PUT-ICE-0d1f6",
    "destination_iceberg_table_configuration": {
      "database_name": "cdc_iceberg_demo_db",
      "table_name": "retail_trans_iceberg",
      "unique_keys": ["trans_id"]
    },
    "s3_bucket_name": "trans-datalake-iceberg-{your-region}",
    "output_prefix": "cdc_iceberg_demo_db/retail_trans_iceberg"
  }
}
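Before deploying, it can save a failed `cdk deploy` to sanity-check the context file. A minimal sketch, assuming the key names shown in the sample configuration above (the repo may validate differently or not at all):

```python
import json

# Top-level keys the sample's CDK stacks read from cdk.context.json
# (names taken from the example configuration above).
REQUIRED_TOP_LEVEL = ["db_cluster_name", "dms_data_source",
                      "kinesis_stream_name", "data_firehose_configuration"]

def validate_context(path="cdk.context.json"):
    with open(path) as f:
        ctx = json.load(f)
    missing = [k for k in REQUIRED_TOP_LEVEL if k not in ctx]
    if missing:
        raise ValueError(f"cdk.context.json is missing keys: {missing}")
    firehose = ctx["data_firehose_configuration"]
    # The {your-region} placeholder must be replaced before deploying.
    if "{your-region}" in firehose.get("s3_bucket_name", ""):
        raise ValueError("Replace {your-region} in s3_bucket_name")
    return ctx
```

The S3 bucket name check matters in practice: bucket names are global, so the placeholder must become a real, unique name.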
Deploy stacks in the following sequence:
# 1. Create VPC and Aurora MySQL
cdk deploy TransactionalDataLakeVpc AuroraMysqlAsDMSDataSource

# 2. (Optional) Create bastion host for database access
cdk deploy AuroraMysqlBastionHost

# 3. Create DMS components
cdk deploy DMSTargetKinesisDataStream DMSRequiredIAMRolesStack DMSTaskAuroraMysqlToKinesis

# 4. Create S3 bucket for Iceberg
cdk deploy DataFirehoseToIcebergS3Path

# 5. Create Lambda function for data transformation
cdk deploy FirehoseDataTransformLambdaStack

# 6. Create IAM roles and permissions
cdk deploy FirehoseToIcebergRoleStack GrantLFPermissionsOnFirehoseRole

# 7. Deploy Data Firehose
cdk deploy FirehoseToIcebergStack
Connect to Aurora MySQL and set up the sample database:
-- Verify that binary logging is enabled
SHOW GLOBAL VARIABLES LIKE 'log_bin';
-- Set binary log retention for DMS
CALL mysql.rds_set_configuration('binlog retention hours', 24);
-- Create sample database and table
CREATE DATABASE IF NOT EXISTS testdb;
USE testdb;
CREATE TABLE IF NOT EXISTS retail_trans (
trans_id BIGINT(20) AUTO_INCREMENT,
customer_id VARCHAR(12) NOT NULL,
event VARCHAR(10) DEFAULT NULL,
sku VARCHAR(10) NOT NULL,
amount INT DEFAULT 0,
device VARCHAR(10) DEFAULT NULL,
trans_datetime DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(trans_id),
KEY(trans_datetime)
) ENGINE=InnoDB AUTO_INCREMENT=0;
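The repo's `gen_fake_mysql_data.py` (invoked later) populates this table. A minimal sketch of the row shape using only the standard library; the column names match the DDL above, but the value ranges and event names are illustrative assumptions, not the repo's exact logic:

```python
import random
import string
from datetime import datetime

def fake_retail_trans_row():
    """Build one fake row matching the retail_trans schema above.
    trans_id is omitted: MySQL assigns it via AUTO_INCREMENT."""
    return {
        "customer_id": "".join(random.choices(string.digits, k=12)),
        "event": random.choice(["visit", "view", "cart", "like", "purchase"]),
        "sku": "".join(random.choices(string.ascii_uppercase + string.digits, k=10)),
        "amount": random.randint(1, 100),
        "device": random.choice(["pc", "mobile", "tablet"]),
        "trans_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

def insert_sql(row, table="testdb.retail_trans"):
    """Render the row as an INSERT statement (for illustration only;
    use parameterized queries with a real MySQL driver)."""
    cols = ", ".join(row)
    vals = ", ".join(f"'{v}'" for v in row.values())
    return f"INSERT INTO {table} ({cols}) VALUES ({vals});"
```

Each INSERT produced this way becomes one CDC event once the DMS task is running.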
In Athena, create the target Glue database and Iceberg table:
-- Create database
CREATE DATABASE IF NOT EXISTS cdc_iceberg_demo_db;
-- Create Iceberg table
CREATE TABLE cdc_iceberg_demo_db.retail_trans_iceberg (
  trans_id int,
  customer_id string,
  event string,
  sku string,
  amount int,
  device string,
  trans_datetime timestamp
)
PARTITIONED BY (`event`)
LOCATION 's3://your-bucket-name/cdc_iceberg_demo_db/retail_trans_iceberg'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='parquet',
  'write_compression'='snappy'
);
# Start DMS replication task
DMS_REPLICATION_TASK_ID=$(aws cloudformation describe-stacks --stack-name DMSTaskAuroraMysqlToKinesis | jq -r '.Stacks[0].Outputs[] | select(.OutputKey == "DMSReplicationTaskId") | .OutputValue | ascii_downcase')
DMS_REPLICATION_TASK_ARN=$(aws dms describe-replication-tasks | jq -r ".ReplicationTasks[] | select(.ReplicationTaskIdentifier == \"${DMS_REPLICATION_TASK_ID}\") | .ReplicationTaskArn")
aws dms start-replication-task \
--replication-task-arn ${DMS_REPLICATION_TASK_ARN} \
--start-replication-task-type start-replication
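The same lookup-and-start flow can be scripted with boto3. A sketch under the assumption (as in the shell commands above) that the task identifier comes from the CloudFormation output, lower-cased; the boto3 import is deferred so the lookup helper works without AWS credentials:

```python
def find_task_arn(describe_response, task_id):
    """Pick the replication task ARN matching task_id from a
    DMS describe-replication-tasks response dict."""
    for task in describe_response.get("ReplicationTasks", []):
        if task.get("ReplicationTaskIdentifier") == task_id:
            return task["ReplicationTaskArn"]
    return None

def start_cdc_task(task_id):
    """Resolve the task ARN and start full-load-plus-CDC replication."""
    import boto3  # deferred: only needed when actually calling AWS
    dms = boto3.client("dms")
    arn = find_task_arn(dms.describe_replication_tasks(), task_id)
    if arn is None:
        raise RuntimeError(f"No replication task named {task_id!r}")
    dms.start_replication_task(
        ReplicationTaskArn=arn,
        StartReplicationTaskType="start-replication",
    )
    return arn
```

`start-replication` performs the initial full load and then switches to ongoing CDC; use `resume-processing` to continue a previously stopped task.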
Generate test data and perform DML operations:
# Connect to bastion host and generate test data
python3 gen_fake_mysql_data.py \
--database testdb \
--table retail_trans \
--user admin \
--password your-password \
--host your-db-cluster-endpoint \
--max-count 10
-- Test DML operations (run in MySQL)
UPDATE retail_trans SET amount = 50 WHERE trans_id = 1;
DELETE FROM retail_trans WHERE trans_id = 2;
INSERT INTO retail_trans (customer_id, event, sku, amount, device) VALUES ("12345", "purchase", "ABC123", 99, "mobile");
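Each DML statement above reaches Kinesis as a JSON envelope with a `data` object (the row) and a `metadata` object whose `operation` field is `load`, `insert`, `update`, or `delete`. The `FirehoseDataTransformLambdaStack` Lambda reshapes these before Firehose upserts into Iceberg on `trans_id`; a minimal sketch of that parsing step (the repo's actual transform logic may differ):

```python
import json

def transform_cdc_record(raw: bytes) -> dict:
    """Flatten one DMS CDC envelope from Kinesis into the row plus an
    operation tag, so Firehose can route inserts/updates/deletes
    against the Iceberg table's unique key (trans_id)."""
    envelope = json.loads(raw)
    row = envelope["data"]
    op = envelope["metadata"]["operation"]
    # Rows from the initial full load arrive as "load"; treat them
    # as inserts for the destination table.
    return {"operation": "insert" if op == "load" else op, "row": row}
```

For example, the `UPDATE ... WHERE trans_id = 1` above yields an envelope with `"operation": "update"` and the post-image of the row in `data`.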
Query the Iceberg table in Athena to verify the pipeline:
-- Check row count
SELECT COUNT(*) FROM cdc_iceberg_demo_db.retail_trans_iceberg;
-- Query by partition
SELECT * FROM cdc_iceberg_demo_db.retail_trans_iceberg WHERE event = 'purchase';
-- Check recent changes
SELECT * FROM cdc_iceberg_demo_db.retail_trans_iceberg ORDER BY trans_datetime DESC LIMIT 10;
If you encounter permission errors, ensure the CDK execution role is a data lake administrator:
# Check current data lake administrators in the Lake Formation console
# Add the CDK execution role if it is missing
Create required DMS IAM roles if missing:
# The following roles are required:
# - dms-vpc-role
# - dms-cloudwatch-logs-role
# - dms-access-for-endpoint (for Redshift targets)
Ensure Aurora MySQL has binary logging enabled and proper retention:
SHOW GLOBAL VARIABLES LIKE 'log_bin';
CALL mysql.rds_set_configuration('binlog retention hours', 24);
# Stop DMS replication task
aws dms stop-replication-task --replication-task-arn ${DMS_REPLICATION_TASK_ARN}
# Destroy all stacks
cdk destroy --force --all
This reference guide is based on the AWS sample repository. Always check the official documentation for the latest updates and best practices.