Database Migration Patterns: Zero-Downtime Schema Changes in Production
Executive Summary
Zero-downtime database migrations are critical for maintaining service availability in production environments. This comprehensive guide explores proven patterns and strategies for implementing schema changes without service interruption, including blue-green deployments, expand-contract patterns, and automated migration tools.
Key Migration Strategies
Blue-Green Deployments: Parallel database environments enabling instant switchover with minimal risk and rapid rollback capabilities.
Expand-Contract Pattern: Gradual schema evolution maintaining backward compatibility throughout the migration process.
Version Compatibility: Multi-version support strategies ensuring seamless transitions between application and database versions.
Automated Tooling: Comparison of enterprise migration tools including Flyway, Liquibase, and custom solutions for different use cases.
Zero-Downtime Migration Fundamentals
Migration Architecture Overview
# migration-architecture.yaml
# Kubernetes ConfigMap that carries the zero-downtime migration architecture
# diagram as plain text under the `architecture` key.
apiVersion: v1
kind: ConfigMap
metadata:
name: database-migration-architecture
namespace: database-ops
data:
# The literal block scalar below is the diagram itself: application versions
# on top, the migration controller in the middle, and the database layer with
# its change-data-capture sources at the bottom.
architecture: |
┌─────────────────────────────────────────────────────────────────┐
│ Zero-Downtime Migration Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Application Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ App v1 │ │ App v2 │ │ App v3 │ │ │
│ │ │ (Previous) │ │ (Current) │ │ (Next) │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │
│ │ └─────────────────┴─────────────────┘ │ │
│ └───────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┴─────────────────────────────┐ │
│ │ Migration Controller │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Version │ │ Schema │ │ Rollback │ │ │
│ │ │ Manager │ │ Migrator │ │ Handler │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └───────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┴─────────────────────────────┐ │
│ │ Database Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Primary │ │ Migration │ │ Rollback │ │ │
│ │ │ Database │ │ Staging │ │ Snapshot │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Change Data Capture (CDC) │ │ │
│ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │
│ │ │ │ Binlog │ │ WAL │ │ OpLog │ │ │ │
│ │ │ │ (MySQL) │ │(PostgreSQL)│ │ (MongoDB) │ │ │ │
│ │ │ └───────────┘ └───────────┘ └───────────┘ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core Migration Principles
// migration-controller.go
package migration
import (
"context"
"database/sql"
"fmt"
"log"
"sync"
"time"
)
// MigrationController orchestrates zero-downtime migrations.
// It bundles the database handles and collaborators used by the migration
// phases together with the mutex-guarded state of the migration in flight.
type MigrationController struct {
primaryDB *sql.DB
stagingDB *sql.DB
migrationEngine MigrationEngine // supplies and executes expansion migrations and data migration tasks
versionManager VersionManager // reports the version a migration starts from
healthChecker HealthChecker
rollbackHandler RollbackHandler // creates the rollback point taken before the phases run
mu sync.RWMutex // held by ExecuteMigration while it inspects and replaces state
state MigrationState
}
// MigrationState tracks the current migration status.
// ExecuteMigration replaces the whole value when a migration starts.
type MigrationState struct {
CurrentVersion string // version reported by the version manager when the migration started
TargetVersion string // version requested by the caller of ExecuteMigration
Phase MigrationPhase // phase currently being executed
StartTime time.Time // wall-clock time at which the migration was started
LastCheckpoint string
RollbackEnabled bool // set to true by ExecuteMigration; a failed phase is only rolled back while this is true
Metrics MigrationMetrics
}
// MigrationPhase represents the current phase of migration.
// Values are assigned with iota in declaration order, starting at PhaseIdle (0).
type MigrationPhase int
const (
PhaseIdle MigrationPhase = iota // no migration running; ExecuteMigration only starts from this phase
PhasePreCheck
PhaseSchemaExpand
PhaseDataMigration
PhaseValidation
PhaseSchemaContract
PhaseCleanup
PhaseComplete // set by ExecuteMigration after every phase succeeded
PhaseRollback // set by ExecuteMigration while a failed migration is rolled back
)
// ExecuteMigration performs a zero-downtime migration to targetVersion.
//
// Only one migration may run at a time: the call is rejected unless the
// controller is in PhaseIdle. A rollback point is created first; if that
// fails, nothing has been changed yet, so the controller is returned to
// PhaseIdle and the call can be retried. The phases then run in order
// (pre-check, schema expand, data migration, validation, schema contract,
// cleanup) and a checkpoint is recorded after each one. A checkpoint failure
// is logged but does not stop the migration.
//
// When a phase fails its error is returned. If rollback is enabled and the
// phase declares a rollback function, the controller first rolls back to the
// rollback point; if the rollback fails too, the returned error reports both
// failures and wraps the rollback error.
//
// After a run the phase is left at PhaseComplete, or at the failing phase /
// PhaseRollback on error; this method does not reset it to PhaseIdle.
func (mc *MigrationController) ExecuteMigration(ctx context.Context, targetVersion string) error {
	mc.mu.Lock()
	// Read the phase while the lock is still held so the error message cannot
	// race with a concurrent phase update.
	if phase := mc.state.Phase; phase != PhaseIdle {
		mc.mu.Unlock()
		return fmt.Errorf("migration already in progress: %v", phase)
	}
	mc.state = MigrationState{
		CurrentVersion:  mc.versionManager.GetCurrentVersion(),
		TargetVersion:   targetVersion,
		Phase:           PhasePreCheck,
		StartTime:       time.Now(),
		RollbackEnabled: true,
	}
	mc.mu.Unlock()

	// Create rollback point
	rollbackPoint, err := mc.rollbackHandler.CreateRollbackPoint()
	if err != nil {
		// No phase has run yet; release the controller so that it does not
		// stay stuck in PhasePreCheck and reject every later migration.
		mc.updatePhase(PhaseIdle)
		return fmt.Errorf("failed to create rollback point: %w", err)
	}

	// Execute migration phases
	phases := []struct {
		phase    MigrationPhase
		execute  func(context.Context) error
		rollback func(context.Context) error
	}{
		{
			phase:    PhasePreCheck,
			execute:  mc.executePreChecks,
			rollback: mc.rollbackPreChecks,
		},
		{
			phase:    PhaseSchemaExpand,
			execute:  mc.executeSchemaExpansion,
			rollback: mc.rollbackSchemaExpansion,
		},
		{
			phase:    PhaseDataMigration,
			execute:  mc.executeDataMigration,
			rollback: mc.rollbackDataMigration,
		},
		{
			phase:    PhaseValidation,
			execute:  mc.executeValidation,
			rollback: mc.rollbackValidation,
		},
		{
			phase:    PhaseSchemaContract,
			execute:  mc.executeSchemaContraction,
			rollback: mc.rollbackSchemaContraction,
		},
		{
			phase:    PhaseCleanup,
			execute:  mc.executeCleanup,
			rollback: nil, // Cleanup is optional
		},
	}

	for _, p := range phases {
		mc.updatePhase(p.phase)
		if err := p.execute(ctx); err != nil {
			log.Printf("Migration failed at phase %v: %v", p.phase, err)

			// The flag lives in shared state, so read it under the lock.
			mc.mu.RLock()
			rollbackEnabled := mc.state.RollbackEnabled
			mc.mu.RUnlock()

			// A nil rollback function marks a phase whose failure does not
			// trigger a rollback (cleanup).
			if rollbackEnabled && p.rollback != nil {
				mc.updatePhase(PhaseRollback)
				if rollbackErr := mc.executeRollback(ctx, rollbackPoint); rollbackErr != nil {
					return fmt.Errorf("migration failed and rollback failed: %v, rollback error: %w", err, rollbackErr)
				}
			}
			return err
		}

		// Create checkpoint after each successful phase
		if err := mc.createCheckpoint(p.phase); err != nil {
			log.Printf("Warning: failed to create checkpoint for phase %v: %v", p.phase, err)
		}
	}

	mc.updatePhase(PhaseComplete)
	return nil
}
// executePreChecks runs the pre-flight checks in a fixed order and stops at
// the first one that fails, wrapping its error with the name of the check.
// It returns nil when every check passes.
func (mc *MigrationController) executePreChecks(ctx context.Context) error {
	type preCheck struct {
		label string
		run   func() error
	}

	preChecks := [...]preCheck{
		{label: "database_health", run: mc.checkDatabaseHealth},
		{label: "version_compatibility", run: mc.checkVersionCompatibility},
		{label: "disk_space", run: mc.checkDiskSpace},
		{label: "replication_lag", run: mc.checkReplicationLag},
		{label: "active_connections", run: mc.checkActiveConnections},
		{label: "long_running_queries", run: mc.checkLongRunningQueries},
	}

	for i := range preChecks {
		current := &preChecks[i]
		if failure := current.run(); failure != nil {
			return fmt.Errorf("pre-check '%s' failed: %w", current.label, failure)
		}
	}
	return nil
}
// executeSchemaExpansion applies, in order, every expansion migration the
// engine reports between the current and the target version. Each migration
// is executed through the retry helper (3 attempts, 5 seconds apart as passed
// to executeWithRetry) and verified before the next one starts; the first
// failure aborts the phase.
func (mc *MigrationController) executeSchemaExpansion(ctx context.Context) error {
	const (
		attempts = 3
		backoff  = time.Second * 5
	)

	from, to := mc.state.CurrentVersion, mc.state.TargetVersion
	for _, step := range mc.migrationEngine.GetExpansionMigrations(from, to) {
		log.Printf("Executing expansion migration: %s", step.ID)

		apply := func() error {
			return mc.migrationEngine.ExecuteMigration(step)
		}
		if runErr := mc.executeWithRetry(ctx, apply, attempts, backoff); runErr != nil {
			return fmt.Errorf("expansion migration %s failed: %w", step.ID, runErr)
		}

		if verifyErr := mc.verifyMigration(step); verifyErr != nil {
			return fmt.Errorf("verification failed for migration %s: %w", step.ID, verifyErr)
		}
	}
	return nil
}
// executeDataMigration runs the data migration tasks between the current and
// the target version, at most four at a time.
//
// The first task failure cancels the context handed to the remaining tasks,
// so they stop at their next cancellation check instead of running to
// completion for a migration that is going to fail anyway. Tasks that have
// not been started yet are not started at all. The returned error is that
// first failure, formatted as "task <id> failed: ...", or the context error if
// ctx is cancelled while waiting for a free slot. It returns nil when every
// task succeeds or there are no tasks.
func (mc *MigrationController) executeDataMigration(ctx context.Context) error {
	// Get data migration tasks
	tasks := mc.migrationEngine.GetDataMigrationTasks(
		mc.state.CurrentVersion,
		mc.state.TargetVersion,
	)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	const maxConcurrent = 4 // Max 4 concurrent migrations
	sem := make(chan struct{}, maxConcurrent)

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	// fail records the first error only and cancels the sibling tasks.
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			cancel()
		})
	}

dispatch:
	for _, task := range tasks {
		// Take the slot before spawning so that at most maxConcurrent
		// goroutines exist, and stop dispatching once the run is cancelled.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			fail(ctx.Err())
			break dispatch
		}

		wg.Add(1)
		go func(t DataMigrationTask) {
			defer wg.Done()
			defer func() { <-sem }() // Release semaphore
			if err := mc.executeDataMigrationTask(ctx, t); err != nil {
				fail(fmt.Errorf("task %s failed: %w", t.ID, err))
			}
		}(task)
	}

	wg.Wait()
	return firstErr
}
// executeDataMigrationTask runs one data migration task to completion.
// Rows are processed in batches of 1000, advancing the offset by the number
// of rows each batch reports, until a batch processes nothing. Cancellation
// of ctx is checked before every batch, progress is logged whenever the
// tracker asks for it, and the loop pauses for 100ms after a batch while the
// controller reports that it should throttle.
func (mc *MigrationController) executeDataMigrationTask(ctx context.Context, task DataMigrationTask) error {
	const (
		rowsPerBatch  = 1000
		throttlePause = 100 * time.Millisecond
	)

	log.Printf("Starting data migration task: %s", task.ID)
	tracker := NewProgressTracker(task.EstimatedRows)

	for offset := 0; ; {
		if cancelled := ctx.Err(); cancelled != nil {
			return cancelled
		}

		done, err := mc.processBatch(task, offset, rowsPerBatch)
		if err != nil {
			return err
		}
		if done == 0 {
			// Nothing left to process.
			log.Printf("Completed data migration task: %s", task.ID)
			return nil
		}

		offset += done
		tracker.Update(done)
		if tracker.ShouldLog() {
			log.Printf("Task %s progress: %s", task.ID, tracker.String())
		}
		if mc.shouldThrottle() {
			time.Sleep(throttlePause)
		}
	}
}
// Blue-Green Deployment Implementation
// BlueGreenController is the receiver of ExecuteBlueGreenMigration and holds
// the two database handles plus the collaborators for that workflow.
type BlueGreenController struct {
blueDB *sql.DB // presumably the environment serving traffic before the switch — confirm
greenDB *sql.DB // presumably the environment that receives the schema change — confirm
loadBalancer LoadBalancer
cdcEngine CDCEngine
validator DataValidator
}
// ExecuteBlueGreenMigration performs a blue-green deployment in seven logged
// phases: set up green, start CDC replication, apply the schema changes, wait
// for replication to sync, validate data consistency, switch traffic, and
// validate after the switch.
//
// A failure in phases 1-6 is returned wrapped with a phase-specific message.
// If post-switch validation fails, the traffic switch is rolled back and the
// validation error is returned; if that rollback fails as well, the error
// reports both and wraps the rollback error. Replication is stopped when the
// function returns, whatever the outcome after phase 2.
func (bgc *BlueGreenController) ExecuteBlueGreenMigration(ctx context.Context, migration Migration) error {
	// runPhase logs the phase banner, runs the phase, and wraps its failure.
	runPhase := func(banner, failure string, action func() error) error {
		log.Println(banner)
		if err := action(); err != nil {
			return fmt.Errorf("%s: %w", failure, err)
		}
		return nil
	}

	if err := runPhase(
		"Phase 1: Setting up green environment",
		"failed to setup green environment",
		func() error { return bgc.setupGreenEnvironment() },
	); err != nil {
		return err
	}

	// Phase 2 stays inline because later phases need the replication handle.
	log.Println("Phase 2: Starting CDC replication")
	replication, err := bgc.startCDCReplication()
	if err != nil {
		return fmt.Errorf("failed to start CDC: %w", err)
	}
	defer replication.Stop()

	middlePhases := []struct {
		banner  string
		failure string
		action  func() error
	}{
		{
			banner:  "Phase 3: Applying schema changes",
			failure: "failed to apply schema changes",
			action:  func() error { return bgc.applySchemaChanges(migration) },
		},
		{
			banner:  "Phase 4: Waiting for replication sync",
			failure: "replication sync failed",
			action:  func() error { return bgc.waitForReplicationSync(ctx, replication) },
		},
		{
			banner:  "Phase 5: Validating data consistency",
			failure: "data validation failed",
			action:  func() error { return bgc.validateDataConsistency() },
		},
		{
			banner:  "Phase 6: Switching traffic",
			failure: "traffic switch failed",
			action:  func() error { return bgc.switchTraffic() },
		},
	}
	for _, phase := range middlePhases {
		if err := runPhase(phase.banner, phase.failure, phase.action); err != nil {
			return err
		}
	}

	log.Println("Phase 7: Post-switch validation")
	validationErr := bgc.postSwitchValidation(ctx)
	if validationErr == nil {
		return nil
	}

	// Validation failed after the switch: send traffic back.
	log.Println("Post-switch validation failed, rolling back")
	if rollbackErr := bgc.rollbackTrafficSwitch(); rollbackErr != nil {
		return fmt.Errorf("validation failed and rollback failed: %v, rollback error: %w", validationErr, rollbackErr)
	}
	return validationErr
}
// CDC Engine for real-time replication
// CDCEngine is the contract a change-data-capture backend implements.
type CDCEngine interface {
// StartReplication begins replicating from source to target and returns a
// handle for the running replication.
StartReplication(source, target *sql.DB) (ReplicationHandle, error)
// GetReplicationLag reports the replication lag as a duration.
GetReplicationLag() (time.Duration, error)
// ValidateConsistency returns a non-nil error when the consistency check fails.
ValidateConsistency() error
}
// PostgreSQL CDC Implementation
// PostgreSQLCDC starts replication through a PostgreSQL logical replication slot.
type PostgreSQLCDC struct {
config CDCConfig
decoder LogicalDecoder // handed to every replication handle created by StartReplication
}
// StartReplication creates a logical replication slot (pgoutput plugin) on
// source, opens a replication connection, and starts the handle's replication
// worker in a new goroutine. The returned handle carries the slot name, the
// connection, the target database and the decoder.
//
// The slot name is derived from the current time with one-second resolution,
// so two calls within the same second ask for the same slot name.
//
// If the replication connection cannot be opened, the freshly created slot is
// dropped again before the error is returned: an unused slot keeps the source
// from recycling WAL. A failure to drop the slot is logged and does not
// replace the original error.
func (p *PostgreSQLCDC) StartReplication(source, target *sql.DB) (ReplicationHandle, error) {
	// Create replication slot
	slotName := fmt.Sprintf("migration_%s", time.Now().Format("20060102150405"))
	_, err := source.Exec(`
		SELECT pg_create_logical_replication_slot($1, 'pgoutput')
	`, slotName)
	if err != nil {
		return nil, fmt.Errorf("failed to create replication slot: %w", err)
	}

	// Start replication connection
	replConn, err := p.createReplicationConnection()
	if err != nil {
		// Do not leave the slot behind when nothing is going to consume it.
		if _, dropErr := source.Exec(`SELECT pg_drop_replication_slot($1)`, slotName); dropErr != nil {
			log.Printf("failed to drop replication slot %s after connection error: %v", slotName, dropErr)
		}
		return nil, err
	}

	// Create replication handle
	handle := &postgresReplicationHandle{
		slotName:  slotName,
		conn:      replConn,
		target:    target,
		decoder:   p.decoder,
		stopChan:  make(chan struct{}),
		errorChan: make(chan error, 1),
	}

	// Start replication worker
	go handle.startReplication()

	return handle, nil
}
Expand-Contract Pattern Implementation
Database-Agnostic Expand-Contract
// expand-contract.go
package migration
import (
"context"
"database/sql"
"fmt"
"time"
)
// ExpandContractMigration implements the expand-contract pattern.
type ExpandContractMigration struct {
db *sql.DB // executes the generated DDL, trigger and backfill statements
compatibilityMgr CompatibilityManager
featureFlags FeatureFlagService // flags are enabled once a schema step has been applied
}
// AddColumnWithCompatibility adds column to table in four steps: add it as a
// nullable column (skipped by the database if it already exists), install a
// compatibility trigger when the column defines a default value, backfill
// existing rows when a backfill strategy is configured, and finally enable the
// "use_<table>_<column>" feature flag. The first failing step aborts the
// sequence and its error is returned wrapped.
func (ecm *ExpandContractMigration) AddColumnWithCompatibility(
	ctx context.Context,
	table string,
	column ColumnDefinition,
) error {
	// Phase 1: nullable column.
	log.Printf("Phase 1: Adding column %s.%s as nullable", table, column.Name)
	ddl := fmt.Sprintf(`
ALTER TABLE %s
ADD COLUMN IF NOT EXISTS %s %s NULL
`, table, column.Name, column.Type)
	_, ddlErr := ecm.db.ExecContext(ctx, ddl)
	if ddlErr != nil {
		return fmt.Errorf("failed to add column: %w", ddlErr)
	}

	// Phase 2: compatibility trigger, only for columns with a default value.
	if hasDefault := column.DefaultValue != ""; hasDefault {
		_, triggerErr := ecm.db.ExecContext(ctx, ecm.createCompatibilityTrigger(table, column))
		if triggerErr != nil {
			return fmt.Errorf("failed to create compatibility trigger: %w", triggerErr)
		}
	}

	// Phase 3: backfill, only when a strategy is configured.
	if column.BackfillStrategy != nil {
		backfillErr := ecm.backfillData(ctx, table, column)
		if backfillErr != nil {
			return fmt.Errorf("failed to backfill data: %w", backfillErr)
		}
	}

	// Phase 4: let the application start using the column.
	flag := fmt.Sprintf("use_%s_%s", table, column.Name)
	flagErr := ecm.featureFlags.Enable(flag)
	if flagErr != nil {
		return fmt.Errorf("failed to enable feature flag: %w", flagErr)
	}
	return nil
}
// createCompatibilityTrigger returns the SQL that installs the backward
// compatibility trigger "compat_<table>_<column>" on table.
//
// The generated PL/pgSQL function fills the new column with the column's
// default value when an INSERT or UPDATE leaves it NULL, and runs the column's
// sync logic on UPDATEs that change the new column.
//
// The script can be executed repeatedly: the function is created with
// CREATE OR REPLACE and any existing trigger of the same name is dropped
// before it is created again, so a re-run of the migration does not fail with
// "trigger already exists".
//
// table and the column fields are interpolated into the SQL as-is; they must
// come from trusted migration definitions.
func (ecm *ExpandContractMigration) createCompatibilityTrigger(
	table string,
	column ColumnDefinition,
) string {
	triggerName := fmt.Sprintf("compat_%s_%s", table, column.Name)

	// Indexed verbs: 1=trigger name, 2=table, 3=column, 4=default, 5=sync logic.
	return fmt.Sprintf(`
		CREATE OR REPLACE FUNCTION %[1]s_func()
		RETURNS TRIGGER AS $$
		BEGIN
			-- Ensure new column has value
			IF NEW.%[3]s IS NULL THEN
				NEW.%[3]s = %[4]s;
			END IF;
			-- Sync with old column if exists
			IF TG_OP = 'UPDATE' AND OLD.%[3]s IS DISTINCT FROM NEW.%[3]s THEN
				-- Update related old columns
				%[5]s
			END IF;
			RETURN NEW;
		END;
		$$ LANGUAGE plpgsql;
		DROP TRIGGER IF EXISTS %[1]s ON %[2]s;
		CREATE TRIGGER %[1]s
		BEFORE INSERT OR UPDATE ON %[2]s
		FOR EACH ROW
		EXECUTE FUNCTION %[1]s_func();
	`, triggerName, table, column.Name, column.DefaultValue, column.SyncLogic)
}
// backfillData dispatches to the backfill routine selected by the column's
// backfill strategy type (batch, online or lazy) and returns its result.
// An unrecognised strategy type yields an error. The caller must make sure
// column.BackfillStrategy is non-nil.
func (ecm *ExpandContractMigration) backfillData(
	ctx context.Context,
	table string,
	column ColumnDefinition,
) error {
	strategy := column.BackfillStrategy.Type

	if strategy == BackfillBatch {
		return ecm.batchBackfill(ctx, table, column)
	}
	if strategy == BackfillOnline {
		return ecm.onlineBackfill(ctx, table, column)
	}
	if strategy == BackfillLazy {
		return ecm.lazyBackfill(ctx, table, column)
	}
	return fmt.Errorf("unknown backfill strategy: %v", strategy)
}
// batchBackfill fills column for every row of table where it is still NULL,
// one batch at a time in ascending id order.
//
// The batch size is the strategy's BatchSize, or 1000 when that is not
// positive. Each batch is a single UPDATE ... RETURNING id statement; the
// highest id it returns becomes the lower bound of the next batch, so a row
// whose value expression evaluates to NULL is visited once and cannot stall
// the loop. Between batches the function pauses for 100ms to limit load and
// returns ctx.Err() if ctx is cancelled during the pause.
//
// The table must have a numeric "id" column. table, the column name and the
// value expression are interpolated into the SQL as-is and must come from
// trusted migration definitions.
func (ecm *ExpandContractMigration) batchBackfill(
	ctx context.Context,
	table string,
	column ColumnDefinition,
) error {
	batchSize := 1000
	if column.BackfillStrategy.BatchSize > 0 {
		batchSize = column.BackfillStrategy.BatchSize
	}

	// PostgreSQL's UPDATE accepts neither ORDER BY nor LIMIT, so the batch is
	// picked by a subquery. The statement is identical for every batch and is
	// therefore built once. Verbs: 1=table, 2=column, 3=value expression.
	query := fmt.Sprintf(`
		UPDATE %[1]s
		SET %[2]s = %[3]s
		WHERE id IN (
			SELECT id FROM %[1]s
			WHERE id > $1
			AND %[2]s IS NULL
			ORDER BY id
			LIMIT $2
		)
		RETURNING id
	`, table, column.Name, column.BackfillStrategy.ValueExpression)

	var lastID int64
	totalRows := 0
	for {
		updatedCount, maxID, err := ecm.runBackfillBatch(ctx, query, lastID, batchSize)
		if err != nil {
			return err
		}
		if updatedCount == 0 {
			return nil // No more rows to update
		}
		lastID = maxID

		totalRows += updatedCount
		log.Printf("Backfilled %d rows (total: %d)", updatedCount, totalRows)

		// Throttle to avoid overload, but stop waiting when cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}

// runBackfillBatch executes one backfill statement for ids greater than
// afterID and reports how many rows it updated and the highest id among them.
func (ecm *ExpandContractMigration) runBackfillBatch(
	ctx context.Context,
	query string,
	afterID int64,
	batchSize int,
) (updated int, maxID int64, err error) {
	rows, err := ecm.db.QueryContext(ctx, query, afterID, batchSize)
	if err != nil {
		return 0, 0, err
	}
	defer rows.Close()

	maxID = afterID
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return 0, 0, err
		}
		// RETURNING does not promise any row order, so track the maximum.
		if id > maxID {
			maxID = id
		}
		updated++
	}
	// Next also returns false on a mid-stream failure; surface it instead of
	// treating a broken batch as "nothing left to do".
	if err := rows.Err(); err != nil {
		return 0, 0, err
	}
	return updated, maxID, nil
}
// RenameTableZeroDowntime installs the compatibility layer that keeps the old
// table name usable after a rename: a view called oldName that selects
// everything from newName, plus INSTEAD OF insert, update and delete triggers
// on that view. Afterwards the "use_table_<newName>" feature flag is enabled.
//
// The view reads from newName, so the data must already be reachable under
// that name when this function runs; this function does not issue the rename
// itself.
//
// The view and its triggers are created in one transaction. On databases with
// transactional DDL (PostgreSQL) a failure half-way therefore leaves no view
// without its write triggers behind. The feature flag is only enabled after
// the transaction has been committed.
func (ecm *ExpandContractMigration) RenameTableZeroDowntime(
	ctx context.Context,
	oldName, newName string,
) error {
	tx, err := ecm.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Undo everything on any early return. After a successful Commit this is
	// a no-op that only reports sql.ErrTxDone, which is deliberately ignored.
	defer tx.Rollback()

	// Step 1: Create updatable view with old name
	viewSQL := fmt.Sprintf(`
		CREATE OR REPLACE VIEW %s AS
		SELECT * FROM %s
	`, oldName, newName)
	if _, err := tx.ExecContext(ctx, viewSQL); err != nil {
		return fmt.Errorf("failed to create compatibility view: %w", err)
	}

	// Step 2: Create INSTEAD OF triggers for the view
	triggers := []string{
		ecm.createInsertTrigger(oldName, newName),
		ecm.createUpdateTrigger(oldName, newName),
		ecm.createDeleteTrigger(oldName, newName),
	}
	for _, trigger := range triggers {
		if _, err := tx.ExecContext(ctx, trigger); err != nil {
			return fmt.Errorf("failed to create trigger: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit compatibility layer: %w", err)
	}

	// Step 3: Update feature flag
	if err := ecm.featureFlags.Enable(fmt.Sprintf("use_table_%s", newName)); err != nil {
		return fmt.Errorf("failed to enable feature flag: %w", err)
	}
	return nil
}
// Complex schema transformation
// SchemaTransformation is the input of ExecuteTransformation, which turns it
// into a multi-phase transformation plan.
type SchemaTransformation struct {
Type TransformationType
Source SchemaElement
Target SchemaElement
Mapping DataMapping
Validation ValidationRules
}
// ExecuteTransformation builds a plan for transformation and runs its phases
// in order. Each phase is preceded by a SAVEPOINT named "phase_<index>"; when
// the phase's Execute fails, the savepoint is rolled back and the error is
// returned (or both errors, if the rollback fails too). A phase whose
// Validate fails aborts the run without rolling back to the savepoint.
//
// NOTE(review): the SAVEPOINT and ROLLBACK TO SAVEPOINT statements are sent
// through ecm.db, and no transaction is opened in this function. If ecm.db is
// a pooled *sql.DB, a savepoint needs an open transaction on one dedicated
// connection — confirm how callers and the phase implementations arrange that.
func (ecm *ExpandContractMigration) ExecuteTransformation(
ctx context.Context,
transformation SchemaTransformation,
) error {
// Create transformation plan
plan := ecm.createTransformationPlan(transformation)
// Execute plan phases
for i, phase := range plan.Phases {
log.Printf("Executing transformation phase %d/%d: %s",
i+1, len(plan.Phases), phase.Description)
// Create savepoint for phase
// Savepoint names use the zero-based index; messages use the one-based phase number.
savepoint := fmt.Sprintf("phase_%d", i)
if _, err := ecm.db.ExecContext(ctx, "SAVEPOINT "+savepoint); err != nil {
return err
}
// Execute phase
if err := phase.Execute(ctx, ecm.db); err != nil {
// Rollback to savepoint
if _, rbErr := ecm.db.ExecContext(ctx, "ROLLBACK TO SAVEPOINT "+savepoint); rbErr != nil {
return fmt.Errorf("phase failed and rollback failed: %v, rollback error: %w", err, rbErr)
}
return fmt.Errorf("transformation phase %d failed: %w", i+1, err)
}
// Validate phase results
if err := phase.Validate(ctx, ecm.db); err != nil {
return fmt.Errorf("validation failed for phase %d: %w", i+1, err)
}
}
return nil
}
PostgreSQL-Specific Expand-Contract
-- postgresql-expand-contract.sql
-- Advanced expand-contract patterns for PostgreSQL
-- Pattern 1: Adding NOT NULL column with zero downtime
-- Phase 1: Add nullable column
ALTER TABLE orders ADD COLUMN status varchar(50);
-- Give rows written from now on a value. The default is set in a separate
-- statement so that it only applies to future inserts; existing rows stay
-- NULL and are filled by the batched backfill below.
ALTER TABLE orders ALTER COLUMN status SET DEFAULT 'pending';
-- Phase 2: Backfill in batches
-- Run this block outside an explicit transaction block: COMMIT inside DO is
-- only allowed then (PostgreSQL 11 or later).
DO $$
DECLARE
    batch_size INTEGER := 10000;
    updated INTEGER;
BEGIN
    LOOP
        UPDATE orders
        SET status = 'pending'
        WHERE status IS NULL
        AND id IN (
            SELECT id FROM orders
            WHERE status IS NULL
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        );
        GET DIAGNOSTICS updated = ROW_COUNT;
        -- Prevent long-running transaction
        COMMIT;
        IF updated = 0 THEN
            -- An empty batch can also mean that every remaining row was
            -- skipped because another session holds its lock; only stop once
            -- no NULL rows are left.
            EXIT WHEN NOT EXISTS (SELECT 1 FROM orders WHERE status IS NULL);
        END IF;
        -- Brief pause to reduce load
        PERFORM pg_sleep(0.1);
    END LOOP;
END $$;
-- Phase 3: Add check constraint without scanning the table
-- NOT VALID only skips the check of rows that already exist. Every INSERT or
-- UPDATE after this point is checked immediately, which is why the constraint
-- is added only after the default and the backfill are in place: added
-- earlier, it would reject writes from application versions that do not set
-- status, and any update of a row that has not been backfilled yet.
ALTER TABLE orders
ADD CONSTRAINT orders_status_not_null
CHECK (status IS NOT NULL) NOT VALID;
-- Phase 4: Validate constraint
ALTER TABLE orders VALIDATE CONSTRAINT orders_status_not_null;
-- Phase 5: Convert to NOT NULL
-- With the validated CHECK constraint in place, PostgreSQL 12 and later can
-- skip the full-table scan for SET NOT NULL.
ALTER TABLE orders ALTER COLUMN status SET NOT NULL;
ALTER TABLE orders DROP CONSTRAINT orders_status_not_null;
-- Pattern 2: Splitting tables with zero downtime
-- Original table: users (id, email, profile_data, settings_data)
-- Target: users (id, email), user_profiles (user_id, ...), user_settings (user_id, ...)
-- Phase 1: Create new tables
-- Both tables are keyed by user_id, which is at the same time the primary key
-- and a foreign key to users(id): at most one profile row and one settings
-- row per user.
-- NOTE(review): neither foreign key declares an ON DELETE action, while the
-- Phase 2 sync trigger removes the child rows from an AFTER DELETE trigger on
-- users. Confirm that deleting a user passes the foreign-key check in that
-- order, or declare ON DELETE CASCADE here.
CREATE TABLE user_profiles (
user_id INTEGER PRIMARY KEY REFERENCES users(id),
bio TEXT,
avatar_url VARCHAR(500),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- The settings columns carry the same defaults that the sync trigger and the
-- backfill apply when a key is missing from settings_data.
CREATE TABLE user_settings (
user_id INTEGER PRIMARY KEY REFERENCES users(id),
theme VARCHAR(50) DEFAULT 'light',
notifications_enabled BOOLEAN DEFAULT true,
language VARCHAR(10) DEFAULT 'en',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Phase 2: Create triggers to keep data in sync
-- sync_user_split_tables mirrors every write on users into user_profiles and
-- user_settings:
--   INSERT: upserts both child rows from the JSON columns of the new row.
--   UPDATE: rewrites a child row only when its source JSON column changed.
--   DELETE: removes both child rows of the deleted user.
CREATE OR REPLACE FUNCTION sync_user_split_tables()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
-- Extract and insert profile data
INSERT INTO user_profiles (user_id, bio, avatar_url)
VALUES (
NEW.id,
(NEW.profile_data->>'bio')::TEXT,
(NEW.profile_data->>'avatar_url')::VARCHAR(500)
) ON CONFLICT (user_id) DO UPDATE SET
bio = EXCLUDED.bio,
avatar_url = EXCLUDED.avatar_url,
updated_at = NOW();
-- Extract and insert settings data
-- Keys missing from settings_data fall back to 'light', true and 'en'.
INSERT INTO user_settings (user_id, theme, notifications_enabled, language)
VALUES (
NEW.id,
COALESCE((NEW.settings_data->>'theme')::VARCHAR(50), 'light'),
COALESCE((NEW.settings_data->>'notifications_enabled')::BOOLEAN, true),
COALESCE((NEW.settings_data->>'language')::VARCHAR(10), 'en')
) ON CONFLICT (user_id) DO UPDATE SET
theme = EXCLUDED.theme,
notifications_enabled = EXCLUDED.notifications_enabled,
language = EXCLUDED.language,
updated_at = NOW();
ELSIF TG_OP = 'UPDATE' THEN
-- Update profile if data changed
-- This is a plain UPDATE: a user without a profile row yet is left for the
-- Phase 3 backfill to pick up.
IF OLD.profile_data IS DISTINCT FROM NEW.profile_data THEN
UPDATE user_profiles SET
bio = (NEW.profile_data->>'bio')::TEXT,
avatar_url = (NEW.profile_data->>'avatar_url')::VARCHAR(500),
updated_at = NOW()
WHERE user_id = NEW.id;
END IF;
-- Update settings if data changed
-- Keys missing from the new settings_data keep the value already stored.
IF OLD.settings_data IS DISTINCT FROM NEW.settings_data THEN
UPDATE user_settings SET
theme = COALESCE((NEW.settings_data->>'theme')::VARCHAR(50), theme),
notifications_enabled = COALESCE((NEW.settings_data->>'notifications_enabled')::BOOLEAN, notifications_enabled),
language = COALESCE((NEW.settings_data->>'language')::VARCHAR(10), language),
updated_at = NOW()
WHERE user_id = NEW.id;
END IF;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM user_profiles WHERE user_id = OLD.id;
DELETE FROM user_settings WHERE user_id = OLD.id;
END IF;
-- The trigger below is an AFTER row trigger, whose return value is ignored;
-- for DELETE, NEW is NULL.
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER sync_user_tables_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW
EXECUTE FUNCTION sync_user_split_tables();
-- Phase 3: Backfill existing data
-- Copies the profile and settings of every existing user into the new tables.
-- ON CONFLICT DO NOTHING keeps rows that the Phase 2 trigger has already
-- written, so the backfill never overwrites fresher data.
-- Each INSERT ... SELECT below processes the whole users table in a single
-- statement.
INSERT INTO user_profiles (user_id, bio, avatar_url)
SELECT
id,
(profile_data->>'bio')::TEXT,
(profile_data->>'avatar_url')::VARCHAR(500)
FROM users
ON CONFLICT (user_id) DO NOTHING;
-- Missing settings keys fall back to the same defaults the trigger uses.
INSERT INTO user_settings (user_id, theme, notifications_enabled, language)
SELECT
id,
COALESCE((settings_data->>'theme')::VARCHAR(50), 'light'),
COALESCE((settings_data->>'notifications_enabled')::BOOLEAN, true),
COALESCE((settings_data->>'language')::VARCHAR(10), 'en')
FROM users
ON CONFLICT (user_id) DO NOTHING;
-- Phase 4: Create views for backward compatibility
-- users_legacy rebuilds the original row shape: profile_data and
-- settings_data are reassembled as JSON objects from the split tables.
-- The LEFT JOINs keep users that have no profile or settings row; for them
-- the JSON objects contain null values.
CREATE OR REPLACE VIEW users_legacy AS
SELECT
u.id,
u.email,
jsonb_build_object(
'bio', p.bio,
'avatar_url', p.avatar_url
) as profile_data,
jsonb_build_object(
'theme', s.theme,
'notifications_enabled', s.notifications_enabled,
'language', s.language
) as settings_data,
u.created_at,
u.updated_at
FROM users u
LEFT JOIN user_profiles p ON u.id = p.user_id
LEFT JOIN user_settings s ON u.id = s.user_id;
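-- Phase 5: After application migration, remove the sync trigger and its
-- function first (the function reads profile_data and settings_data and would
-- fail on the next write once they are gone), then drop the old columns and
-- the compatibility view.
-- DROP TRIGGER sync_user_tables_trigger ON users;
-- DROP FUNCTION sync_user_split_tables();
-- ALTER TABLE users DROP COLUMN profile_data;
-- ALTER TABLE users DROP COLUMN settings_data;
-- DROP VIEW users_legacy;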
Version Compatibility Strategies
Multi-Version Support Implementation
// version-compatibility.go
package migration
import (
"context"
"fmt"
"sync"
)
// VersionCompatibilityManager manages multi-version database support.
type VersionCompatibilityManager struct {
versions map[string]VersionHandler // handlers keyed by client version; read under mu by ExecuteQuery
activeVersion string
router QueryRouter
mu sync.RWMutex // guards versions
}
// VersionHandler handles version-specific logic.
type VersionHandler struct {
Version string
SchemaAdapter SchemaAdapter
QueryRewriter QueryRewriter // rewrites an incoming query before it is executed
DataTransformer DataTransformer // shapes the query result for the client version
}
// QueryRouter routes queries to appropriate version handlers.
type QueryRouter struct {
rules []RoutingRule
fallback VersionHandler // presumably used when no rule matches — confirm against the routing code
}
// ExecuteQuery runs query on behalf of a client speaking clientVersion.
// The handler registered for that version is used, or a compatible handler
// when the version is unknown. The query is rewritten by the handler, executed,
// and its result transformed for the client version before it is returned.
// Rewrite and transformation failures are returned wrapped; an execution
// failure is returned unchanged.
func (vcm *VersionCompatibilityManager) ExecuteQuery(
	ctx context.Context,
	query Query,
	clientVersion string,
) (Result, error) {
	// Hold the read lock only for the map lookup.
	vcm.mu.RLock()
	handler, registered := vcm.versions[clientVersion]
	vcm.mu.RUnlock()

	if !registered {
		// Unknown version: fall back to the compatibility layer.
		handler = vcm.getCompatibleHandler(clientVersion)
	}

	rewritten, rewriteErr := handler.QueryRewriter.Rewrite(query)
	if rewriteErr != nil {
		return nil, fmt.Errorf("query rewrite failed: %w", rewriteErr)
	}

	raw, execErr := vcm.executeVersionedQuery(ctx, rewritten, handler)
	if execErr != nil {
		return nil, execErr
	}

	shaped, transformErr := handler.DataTransformer.Transform(raw, clientVersion)
	if transformErr != nil {
		return nil, fmt.Errorf("result transformation failed: %w", transformErr)
	}
	return shaped, nil
}
// SchemaAdapter provides version-specific schema adaptations.
type SchemaAdapter interface {
// AdaptSchema returns the migrations that take the schema from fromVersion
// to toVersion.
AdaptSchema(fromVersion, toVersion string) ([]Migration, error)
// ValidateCompatibility returns a non-nil error when the check for version fails.
ValidateCompatibility(version string) error
}
// PostgreSQL Schema Adapter
// PostgreSQLSchemaAdapter produces PostgreSQL migrations between schema versions.
type PostgreSQLSchemaAdapter struct {
db *sql.DB
}
// AdaptSchema returns the migrations needed to move the schema from
// fromVersion to toVersion. Two transitions are known: "v1" to "v2" adds the
// users.feature_flags column with a GIN index, and "v2" to "v3" replaces
// events.created_at with a TIMESTAMPTZ(6) column. Any other pair yields an
// empty, non-nil slice. The error is always nil.
func (psa *PostgreSQLSchemaAdapter) AdaptSchema(fromVersion, toVersion string) ([]Migration, error) {
	const (
		featureFlagsUp = `
ALTER TABLE users
ADD COLUMN IF NOT EXISTS feature_flags JSONB DEFAULT '{}';
CREATE INDEX IF NOT EXISTS idx_users_feature_flags
ON users USING gin(feature_flags);
`
		featureFlagsDown = `
ALTER TABLE users DROP COLUMN IF EXISTS feature_flags;
`
		timestampPrecisionUp = `
-- Add new column with microsecond precision
ALTER TABLE events
ADD COLUMN created_at_precise TIMESTAMPTZ(6);
-- Copy data with precision
UPDATE events
SET created_at_precise = created_at;
-- Swap columns
ALTER TABLE events
DROP COLUMN created_at;
ALTER TABLE events
RENAME COLUMN created_at_precise TO created_at;
`
		timestampPrecisionDown = `
-- Revert to second precision
ALTER TABLE events
ADD COLUMN created_at_standard TIMESTAMPTZ(0);
UPDATE events
SET created_at_standard = created_at;
ALTER TABLE events
DROP COLUMN created_at;
ALTER TABLE events
RENAME COLUMN created_at_standard TO created_at;
`
	)

	plan := []Migration{}
	switch {
	case fromVersion == "v1" && toVersion == "v2":
		// v1 to v2 adds a new column.
		plan = append(plan, Migration{
			ID:   "add_feature_flags",
			Up:   featureFlagsUp,
			Down: featureFlagsDown,
		})
	case fromVersion == "v2" && toVersion == "v3":
		// v2 to v3 changes a data type.
		plan = append(plan, Migration{
			ID:   "update_timestamp_precision",
			Up:   timestampPrecisionUp,
			Down: timestampPrecisionDown,
		})
	}
	return plan, nil
}
// Query rewriter for version compatibility
// QueryRewriter turns a query into the form that is actually executed.
type QueryRewriter interface {
// Rewrite returns the rewritten query, or an error if it cannot be rewritten.
Rewrite(query Query) (Query, error)
}
// SmartQueryRewriter rewrites queries by applying its rules in slice order.
type SmartQueryRewriter struct {
rules []RewriteRule
}
// RewriteRule is one conditional rewrite step of a SmartQueryRewriter.
type RewriteRule struct {
Pattern string // presumably what to look for in the query — confirm against applyRewriteRule
Replacement string // presumably what the pattern is replaced with — confirm against applyRewriteRule
Condition func(Query) bool // decides whether the rule applies to a query
}
// Rewrite applies every matching rule to query, in rule order, each rule
// working on the output of the previous one, and returns the final query.
//
// A rule's Condition is always evaluated against the original query, not
// against the partially rewritten one. A rule without a Condition (nil) is
// treated as unconditional instead of panicking on the nil function call.
// The returned error is always nil.
func (sqr *SmartQueryRewriter) Rewrite(query Query) (Query, error) {
	rewritten := query
	for _, rule := range sqr.rules {
		if rule.Condition != nil && !rule.Condition(query) {
			continue
		}
		rewritten = applyRewriteRule(rewritten, rule)
	}
	return rewritten, nil
}
// API Version Adapter for database changes
// APIVersionAdapter reshapes API responses for clients of a given version.
type APIVersionAdapter struct {
	// versionMappings holds the single field mapping applied for each client
	// version; versions without an entry get the response unchanged.
	versionMappings map[string]FieldMapping
}

// FieldMapping renames one response field and optionally converts its value.
type FieldMapping struct {
	OldField  string                        // key looked up in the response
	NewField  string                        // key the value is stored under in the result
	Transform func(interface{}) interface{} // optional value conversion; nil keeps the value
}

// TransformResponse returns data adapted for clientVersion.
//
// When a mapping is registered for the version and data is a
// map[string]interface{}, the result is a new map in which the mapping's
// OldField is replaced by NewField (with Transform applied to the value when
// it is set); all other keys are copied unchanged and the input map is not
// modified. If the input already contains NewField as well, the mapped value
// wins, so the result does not depend on map iteration order.
//
// Data of any other type, and data for a version without a mapping, is
// returned as-is. The error is always nil.
func (ava *APIVersionAdapter) TransformResponse(
	data interface{},
	clientVersion string,
) (interface{}, error) {
	mapping, exists := ava.versionMappings[clientVersion]
	if !exists {
		return data, nil // No transformation needed
	}

	fields, isMap := data.(map[string]interface{})
	if !isMap {
		return data, nil
	}

	transformed := make(map[string]interface{}, len(fields))
	for key, value := range fields {
		if key != mapping.OldField {
			transformed[key] = value
		}
	}

	// Apply the mapped field last so that it deterministically overrides a
	// pre-existing NewField entry.
	if value, present := fields[mapping.OldField]; present {
		if mapping.Transform != nil {
			value = mapping.Transform(value)
		}
		transformed[mapping.NewField] = value
	}
	return transformed, nil
}
// Compatibility testing framework
// CompatibilityTester checks every ordered pair of the configured versions.
type CompatibilityTester struct {
versions []string // versions combined pairwise, each with every version including itself
testSuite TestSuite
}
// TestCrossVersionCompatibility tests every ordered pair of the configured
// versions (a version is also tested against itself), logs each failing pair,
// and hands the full result matrix to the report generator.
//
// The report is generated whether or not pairs failed. The method then
// returns an error stating how many pairs failed, or nil when all of them
// passed, so that callers can act on the outcome instead of having to read
// the log.
func (ct *CompatibilityTester) TestCrossVersionCompatibility() error {
	results := make(map[string]map[string]TestResult, len(ct.versions))
	total, failed := 0, 0

	// Test all version combinations
	for _, fromVersion := range ct.versions {
		row := make(map[string]TestResult, len(ct.versions))
		results[fromVersion] = row
		for _, toVersion := range ct.versions {
			result := ct.testVersionPair(fromVersion, toVersion)
			row[toVersion] = result
			total++
			if !result.Success {
				failed++
				log.Printf("Compatibility test failed: %s -> %s: %v",
					fromVersion, toVersion, result.Error)
			}
		}
	}

	// Generate compatibility matrix
	ct.generateCompatibilityReport(results)

	if failed > 0 {
		return fmt.Errorf("%d of %d version pairs failed compatibility tests", failed, total)
	}
	return nil
}
Rollback Procedures
Comprehensive Rollback Strategy
// rollback-handler.go
package migration
import (
"context"
"encoding/json"
"fmt"
"time"
)
// RollbackHandler manages migration rollbacks.
type RollbackHandler struct {
db *sql.DB
snapshotStore SnapshotStore // creates the data snapshot referenced by a rollback point
auditLog AuditLogger // records the start and completion of every rollback
validationEngine ValidationEngine
}
// RollbackPoint represents a point in time for rollback.
type RollbackPoint struct {
ID string // generated identifier of the rollback point
Timestamp time.Time // time at which the rollback point was created
Version string // version that was current when the point was created
SchemaHash string // hash of the schema captured at creation time
DataSnapshot string // ID of the data snapshot returned by the snapshot store
Metadata map[string]interface{} // free-form details; CreateRollbackPoint sets "trigger" and "user"
}
// CreateRollbackPoint captures the current state as a new rollback point.
// It hashes the schema, creates a data snapshot, assembles the rollback point
// (generated ID, current time and version, "migration" trigger and current
// user as metadata) and stores it. The stored point is returned; on any
// failure the result is nil together with the error.
func (rh *RollbackHandler) CreateRollbackPoint() (*RollbackPoint, error) {
	hash, hashErr := rh.calculateSchemaHash()
	if hashErr != nil {
		return nil, fmt.Errorf("failed to calculate schema hash: %w", hashErr)
	}

	snapshot, snapshotErr := rh.snapshotStore.CreateSnapshot()
	if snapshotErr != nil {
		return nil, fmt.Errorf("failed to create snapshot: %w", snapshotErr)
	}

	point := &RollbackPoint{
		ID:           generateID(),
		Timestamp:    time.Now(),
		Version:      rh.getCurrentVersion(),
		SchemaHash:   hash,
		DataSnapshot: snapshot,
	}
	point.Metadata = map[string]interface{}{
		"trigger": "migration",
		"user":    getCurrentUser(),
	}

	if storeErr := rh.storeRollbackPoint(point); storeErr != nil {
		return nil, storeErr
	}
	return point, nil
}
// ExecuteRollback restores the database to rollbackPoint.
// After validating that the rollback is possible it opens an audit entry,
// switches to read-only mode, and then restores the schema, restores the data
// and validates the restoration, in that order. The first failing step is
// returned wrapped with a step-specific message. Read-only mode is switched
// off and the audit entry is completed when the function returns, on success
// and on failure alike.
func (rh *RollbackHandler) ExecuteRollback(
	ctx context.Context,
	rollbackPoint *RollbackPoint,
) error {
	if err := rh.validateRollback(rollbackPoint); err != nil {
		return fmt.Errorf("rollback validation failed: %w", err)
	}

	auditID := rh.auditLog.StartRollback(rollbackPoint)
	defer rh.auditLog.CompleteRollback(auditID)

	// Phase 1: stop writes for the duration of the restore.
	if err := rh.enableReadOnlyMode(); err != nil {
		return fmt.Errorf("failed to enable read-only mode: %w", err)
	}
	defer rh.disableReadOnlyMode()

	// Phases 2-4 share one shape: run the step, wrap its failure.
	restoreSteps := []struct {
		failure string
		run     func() error
	}{
		{"schema restoration failed", func() error { return rh.restoreSchema(ctx, rollbackPoint) }},
		{"data restoration failed", func() error { return rh.restoreData(ctx, rollbackPoint) }},
		{"restoration validation failed", func() error { return rh.validateRestoration(ctx, rollbackPoint) }},
	}
	for _, step := range restoreSteps {
		if err := step.run(); err != nil {
			return fmt.Errorf("%s: %w", step.failure, err)
		}
	}
	return nil
}
// restoreSchema brings the schema back to the state recorded in
// rollbackPoint. It loads the current schema and the schema at the rollback
// point, derives the reverse migrations between the two, and executes them in
// the order they were generated. Schema lookup errors are returned unchanged;
// a failing reverse migration is returned wrapped with its ID.
func (rh *RollbackHandler) restoreSchema(
	ctx context.Context,
	rollbackPoint *RollbackPoint,
) error {
	current, err := rh.getCurrentSchema()
	if err != nil {
		return err
	}

	target, err := rh.getSchemaAtPoint(rollbackPoint)
	if err != nil {
		return err
	}

	for _, reverse := range rh.generateReverseMigrations(current, target) {
		log.Printf("Executing reverse migration: %s", reverse.ID)
		runErr := rh.executeMigration(ctx, reverse)
		if runErr != nil {
			return fmt.Errorf("reverse migration %s failed: %w", reverse.ID, runErr)
		}
	}
	return nil
}
// Time-based rollback with point-in-time recovery
// PointInTimeRecovery restores a base backup and replays WAL on top of it.
type PointInTimeRecovery struct {
walArchive WALArchive
baseBackup BaseBackup // used to find the nearest backup taken before the target time
recoveryMgr RecoveryManager
}
// RecoverToTimestamp recovers the database to its state at targetTime.
// It looks up the nearest base backup taken before targetTime, restores it,
// and then applies WAL from the backup's timestamp up to targetTime. Each
// failing step is returned wrapped with a step-specific message.
func (pitr *PointInTimeRecovery) RecoverToTimestamp(
	ctx context.Context,
	targetTime time.Time,
) error {
	base, lookupErr := pitr.baseBackup.FindNearestBefore(targetTime)
	if lookupErr != nil {
		return fmt.Errorf("no suitable backup found: %w", lookupErr)
	}

	restoreErr := pitr.restoreBaseBackup(ctx, base)
	if restoreErr != nil {
		return fmt.Errorf("base backup restoration failed: %w", restoreErr)
	}

	replayErr := pitr.applyWALToTime(ctx, base.Timestamp, targetTime)
	if replayErr != nil {
		return fmt.Errorf("WAL replay failed: %w", replayErr)
	}
	return nil
}
// RollbackDecisionEngine decides automatically whether a rollback should be
// triggered. See ShouldRollback for the checks it performs.
type RollbackDecisionEngine struct {
// metrics supplies the current metrics snapshot (GetCurrentMetrics).
metrics MetricsCollector
// thresholds holds the limits the snapshot is compared against.
thresholds RollbackThresholds
// aiPredictor is consulted last, via DetectAnomaly on the same snapshot.
aiPredictor AnomalyDetector
}
// RollbackThresholds holds the limits used by
// RollbackDecisionEngine.ShouldRollback.
type RollbackThresholds struct {
// ErrorRateThreshold is compared against the current error rate.
// ShouldRollback multiplies both by 100 when printing them as percentages,
// so the value is presumably a fraction (0.05 = 5%) - TODO confirm.
ErrorRateThreshold float64
// LatencyThreshold is the P99 latency above which a rollback is requested.
LatencyThreshold time.Duration
// DataIntegrityFailures is the number of integrity errors tolerated; a
// rollback is requested only when the observed count is strictly greater.
DataIntegrityFailures int
// CustomerImpact is not read by the code visible in this section.
CustomerImpact float64
}
// ShouldRollback reports whether a rollback should be triggered and, if so,
// a human-readable reason.
//
// Checks run in a fixed order and the first one that trips wins: error rate,
// P99 latency, data-integrity failures, then anomaly detection. Later checks
// are not evaluated once an earlier one has fired. When nothing trips it
// returns (false, "").
func (rde *RollbackDecisionEngine) ShouldRollback() (bool, string) {
	snapshot := rde.metrics.GetCurrentMetrics()
	limits := &rde.thresholds

	switch {
	case snapshot.ErrorRate > limits.ErrorRateThreshold:
		return true, fmt.Sprintf("Error rate %.2f%% exceeds threshold %.2f%%",
			snapshot.ErrorRate*100, limits.ErrorRateThreshold*100)
	case snapshot.P99Latency > limits.LatencyThreshold:
		return true, fmt.Sprintf("P99 latency %v exceeds threshold %v",
			snapshot.P99Latency, limits.LatencyThreshold)
	}

	// The integrity check only runs when the cheap metric checks passed.
	if failures := rde.checkDataIntegrity(); failures > limits.DataIntegrityFailures {
		return true, fmt.Sprintf("Data integrity failures: %d", failures)
	}

	anomaly := rde.aiPredictor.DetectAnomaly(snapshot)
	if anomaly == nil {
		return false, ""
	}
	return true, fmt.Sprintf("Anomaly detected: %s", anomaly.Description)
}
// validateRestoration verifies the database after a rollback.
//
// It runs four named checks in order - schema integrity (against the
// rollback point's schema hash), data consistency, foreign key constraints
// and index integrity - and returns the first failure wrapped with the
// check's name. ctx is accepted for signature symmetry with the other
// rollback phases; the checks themselves do not receive it.
func (rh *RollbackHandler) validateRestoration(
	ctx context.Context,
	rollbackPoint *RollbackPoint,
) error {
	schemaCheck := func() error {
		return rh.validateSchemaIntegrity(rollbackPoint.SchemaHash)
	}
	dataCheck := func() error {
		return rh.validateDataConsistency(rollbackPoint)
	}
	foreignKeyCheck := func() error {
		return rh.validateForeignKeys()
	}
	indexCheck := func() error {
		return rh.validateIndexes()
	}

	checks := []ValidationCheck{
		{Name: "Schema Integrity", Check: schemaCheck},
		{Name: "Data Consistency", Check: dataCheck},
		{Name: "Foreign Key Constraints", Check: foreignKeyCheck},
		{Name: "Index Integrity", Check: indexCheck},
	}

	for i := range checks {
		check := checks[i]
		err := check.Check()
		if err == nil {
			continue
		}
		return fmt.Errorf("validation '%s' failed: %w", check.Name, err)
	}
	return nil
}
Migration Tool Comparison
Flyway Implementation
// flyway-enterprise-config.java
package com.enterprise.migration;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.flywaydb.core.api.callback.Callback;
import org.flywaydb.core.api.callback.Context;
import org.flywaydb.core.api.callback.Event;
/**
 * Builds the enterprise Flyway configuration and drives a snapshot-protected
 * migration run.
 */
public class FlywayEnterpriseConfig {
    /**
     * Creates a Flyway instance configured for the enterprise setup: three
     * schemas, a custom history table, audit/performance/rollback callbacks,
     * environment placeholders, multiple migration locations and custom
     * resolvers.
     *
     * @return the configured Flyway instance
     * @throws IllegalStateException if the ENVIRONMENT or AWS_REGION
     *         environment variable is not set
     */
    public static Flyway configureEnterpriseFlyway() {
        FluentConfiguration config = Flyway.configure()
            .dataSource(getDataSource())
            .schemas("public", "audit", "archive")
            .table("schema_version")
            .baselineOnMigrate(true)
            .baselineVersion("1.0")
            .installedBy("migration-service")
            .mixed(true) // Allow mixing transactional and non-transactional
            .group(true) // Group multiple migrations in single transaction
            .outOfOrder(false) // Enforce order
            .validateOnMigrate(true)
            .cleanDisabled(true) // Prevent accidental clean in production
            .callbacks(
                new MigrationAuditCallback(),
                new PerformanceMonitorCallback(),
                new RollbackPrepareCallback()
            )
            // Map.of rejects null values, so a missing variable is reported
            // by name instead of surfacing as a bare NullPointerException.
            // Fully qualified because the file does not import java.util.Map.
            .placeholders(java.util.Map.of(
                "environment", requireEnv("ENVIRONMENT"),
                "region", requireEnv("AWS_REGION")
            ))
            .locations(
                "classpath:db/migration",
                "filesystem:/opt/migrations/custom",
                "s3:my-bucket/migrations"
            )
            .resolvers(
                new ConditionalMigrationResolver(),
                new EncryptedMigrationResolver()
            )
            .target("latest")
            .cherryPick("2.1", "2.3", "3.0"); // Selective migrations
        return new Flyway(config);
    }

    /**
     * Reads a mandatory environment variable.
     *
     * @param name the variable name
     * @return the variable's value, never null
     * @throws IllegalStateException if the variable is not set
     */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null) {
            throw new IllegalStateException(
                "Required environment variable " + name + " is not set");
        }
        return value;
    }

    /**
     * Callback for audit logging. It handles every event, inside the
     * migration transaction, and reacts to three of them: before migrate
     * (log and create a rollback point), after migrate (log and validate)
     * and after a migrate error (log and optionally roll back).
     */
    static class MigrationAuditCallback implements Callback {
        @Override
        public boolean supports(Event event, Context context) {
            return true;
        }

        @Override
        public boolean canHandleInTransaction(Event event, Context context) {
            return true;
        }

        @Override
        public void handle(Event event, Context context) {
            if (event == Event.BEFORE_MIGRATE) {
                logMigrationStart(context);
                createRollbackPoint(context);
            } else if (event == Event.AFTER_MIGRATE) {
                logMigrationComplete(context);
                validateMigration(context);
            } else if (event == Event.AFTER_MIGRATE_ERROR) {
                logMigrationFailure(context);
                if (shouldAutoRollback(context)) {
                    executeRollback(context);
                }
            }
        }
    }

    /**
     * Runs the pending migrations in four phases: validate, snapshot,
     * migrate, and post-migration validation.
     *
     * If the migrate phase fails, the database is rolled back to the
     * snapshot and the original failure is rethrown. A failure of the
     * rollback itself is attached to it as a suppressed exception.
     *
     * @throws RuntimeException if validation fails or the migration fails
     */
    public static void executeZeroDowntimeMigration() {
        Flyway flyway = configureEnterpriseFlyway();
        // Phase 1: Validate pending migrations
        MigrationValidationResult validation = flyway.validateWithResult();
        if (!validation.validationSuccessful) {
            throw new RuntimeException("Validation failed: " + validation.errorDetails);
        }
        // Phase 2: Create pre-migration snapshot
        createDatabaseSnapshot();
        // Phase 3: Execute migrations with monitoring
        try {
            flyway.migrate();
        } catch (Exception e) {
            // Automatic rollback on failure. Keep the migration error as the
            // primary exception even when the rollback fails as well.
            try {
                rollbackToSnapshot();
            } catch (RuntimeException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
        // Phase 4: Post-migration validation
        performPostMigrationValidation();
    }
}
Liquibase Advanced Configuration
<!-- liquibase-enterprise.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd
http://www.liquibase.org/xml/ns/dbchangelog-ext
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd">
<!-- Properties for environment-specific configuration -->
<property name="table.prefix" value="${env.TABLE_PREFIX}" global="true"/>
<property name="index.tablespace" value="${env.INDEX_TABLESPACE}" global="true"/>
<!-- Include modular changesets -->
<include file="db/changelog/releases/v1.0/master.xml"/>
<include file="db/changelog/releases/v2.0/master.xml"/>
<!-- Conditional changeset based on database type.
     Runs only on PostgreSQL and only while the uuid-ossp extension is not
     installed yet (the check expects a count of 0). If either precondition
     fails, the changeset is marked as ran without executing. -->
<changeSet id="2024-01-001" author="migration-team">
<preConditions onFail="MARK_RAN">
<dbms type="postgresql"/>
<sqlCheck expectedResult="0">
SELECT COUNT(*) FROM pg_extension WHERE extname = 'uuid-ossp'
</sqlCheck>
</preConditions>
<!-- IF NOT EXISTS keeps the statement safe to re-run. -->
<sql>CREATE EXTENSION IF NOT EXISTS "uuid-ossp";</sql>
</changeSet>
<!-- Zero-downtime column addition with backfill.
     Adds users.user_preferences as a nullable column, backfills default
     preferences in batches of 1000 rows, then makes the column NOT NULL.
     The changeset runs outside a Liquibase transaction
     (runInTransaction="false"). -->
<changeSet id="2024-01-002" author="migration-team" runInTransaction="false">
    <comment>Add user_preferences column with zero downtime</comment>
    <!-- Step 1: Add nullable column -->
    <addColumn tableName="users">
        <column name="user_preferences" type="jsonb">
            <constraints nullable="true"/>
        </column>
    </addColumn>
    <!-- Step 2: Create backfill function.
         PostgreSQL's UPDATE has no LIMIT clause, so each batch is selected
         through a ctid subquery. ctid is used because it needs no knowledge
         of the table's key columns.
         NOTE(review): a function call is a single statement, so all batches
         commit together; if locks must be released between batches, convert
         this to a procedure that COMMITs per batch (PostgreSQL 11+). -->
    <createProcedure>
        CREATE OR REPLACE FUNCTION backfill_user_preferences()
        RETURNS void AS $$
        DECLARE
            batch_size INTEGER := 1000;
            total_updated INTEGER := 0;
            batch_updated INTEGER;
        BEGIN
            LOOP
                UPDATE users
                SET user_preferences = jsonb_build_object(
                    'theme', 'light',
                    'language', 'en',
                    'notifications', true
                )
                WHERE user_preferences IS NULL
                  AND ctid = ANY (ARRAY(
                      SELECT ctid
                      FROM users
                      WHERE user_preferences IS NULL
                      LIMIT batch_size
                  ));
                GET DIAGNOSTICS batch_updated = ROW_COUNT;
                total_updated := total_updated + batch_updated;
                IF batch_updated = 0 THEN
                    EXIT;
                END IF;
                -- Brief pause between batches
                PERFORM pg_sleep(0.1);
                RAISE NOTICE 'Backfilled % rows', total_updated;
            END LOOP;
        END;
        $$ LANGUAGE plpgsql;
    </createProcedure>
    <!-- Step 3: Execute backfill -->
    <sql>SELECT backfill_user_preferences();</sql>
    <!-- Step 4: Add NOT NULL constraint -->
    <addNotNullConstraint
        tableName="users"
        columnName="user_preferences"
        constraintName="users_preferences_not_null"/>
    <rollback>
        <dropColumn tableName="users" columnName="user_preferences"/>
        <dropProcedure procedureName="backfill_user_preferences"/>
    </rollback>
</changeSet>
<!-- Complex migration with multiple steps.
     Applies only in the "production" context. Introduces the enum type
     order_status_v2, adds orders.status_v2 with that type, and fills it from
     the existing orders.status column. The old column is left in place. -->
<changeSet id="2024-01-003" author="migration-team" context="production">
<validCheckSum>8:b3f2a4c5d6e7f8a9b0c1d2e3f4a5b6c7</validCheckSum>
<!-- Halt the whole update if the orders table or its status column is missing. -->
<preConditions onFail="HALT">
<tableExists tableName="orders"/>
<columnExists tableName="orders" columnName="status"/>
</preConditions>
<!-- Create new enum type; the DO block is sent as one statement
     (splitStatements="false") and skips creation if the type exists. -->
<sql splitStatements="false">
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'order_status_v2') THEN
CREATE TYPE order_status_v2 AS ENUM (
'pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded'
);
END IF;
END $$;
</sql>
<!-- Add new column with new type -->
<addColumn tableName="orders">
<column name="status_v2" type="order_status_v2"/>
</addColumn>
<!-- Migrate data with mapping: three legacy values are renamed, every other
     value is cast as-is and must therefore already be a valid enum label. -->
<sql>
UPDATE orders SET status_v2 =
CASE status
WHEN 'new' THEN 'pending'::order_status_v2
WHEN 'in_progress' THEN 'processing'::order_status_v2
WHEN 'completed' THEN 'delivered'::order_status_v2
ELSE status::order_status_v2
END;
</sql>
<!-- Rollback drops the column first, then the type it depends on. -->
<rollback>
<dropColumn tableName="orders" columnName="status_v2"/>
<sql>DROP TYPE IF EXISTS order_status_v2;</sql>
</rollback>
</changeSet>
</databaseChangeLog>
Custom Migration Framework
// custom-migration-framework.go
package migration
import (
	"context"
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"fmt"
	"io/fs"
	"log"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"
)
// CustomMigrationEngine provides advanced migration capabilities. It bundles
// the database handle, configuration and collaborators that Execute and
// executeMigration use.
type CustomMigrationEngine struct {
// db opens the per-migration transaction (BeginTx in executeMigration).
db *sql.DB
// config supplies LockTimeout and StatementTimeout for each transaction.
config MigrationConfig
// parser is not referenced by the code visible in this section.
parser MigrationParser
// executor is not referenced by the code visible in this section.
executor MigrationExecutor
// validator is run inside the transaction, before commit.
validator MigrationValidator
// hooks provides the PreMigration and PostMigration callbacks.
hooks MigrationHooks
}
// MigrationConfig holds configuration for migrations. Only LockTimeout and
// StatementTimeout are read by the code visible in this section; the
// remaining fields are documented from their names and should be confirmed
// against their users.
type MigrationConfig struct {
// MigrationsPath is presumably where migration files are discovered - TODO confirm.
MigrationsPath string
// MigrationsTable is presumably the history table name - TODO confirm.
MigrationsTable string
// SchemaName is presumably the target schema - TODO confirm.
SchemaName string
// LockTimeout is applied as the transaction-local lock_timeout.
LockTimeout time.Duration
// StatementTimeout is applied as the transaction-local statement_timeout.
StatementTimeout time.Duration
// DryRun is not consulted by the code visible here.
DryRun bool
// ValidateChecksums is not consulted by the code visible here.
ValidateChecksums bool
// AllowOutOfOrder is not consulted by the code visible here.
AllowOutOfOrder bool
// BaselineVersion is not consulted by the code visible here.
BaselineVersion string
}
// Migration represents a single migration.
type Migration struct {
// Version identifies the migration; it is the sort key in
// discoverMigrations and the key of the completed set in ExecuteParallel.
Version string
// Description is logged when the migration starts.
Description string
// Type selects which executor runs the migration (see MigrationType).
Type MigrationType
// Checksum is not read by the code visible in this section.
Checksum string
// Script is not read by the code visible in this section.
Script string
// UpSQL presumably holds the forward statements - TODO confirm.
UpSQL []string
// DownSQL presumably holds the rollback statements - TODO confirm.
DownSQL []string
// Metadata is not read by the code visible in this section.
Metadata map[string]string
}
// MigrationType indicates the type of migration. executeMigration dispatches
// on it and rejects any value not listed below.
type MigrationType int
// Supported migration types, numbered from zero via iota.
const (
// MigrationTypeSQL is handled by executeSQLMigration.
MigrationTypeSQL MigrationType = iota
// MigrationTypeGo is handled by executeGoMigration.
MigrationTypeGo
// MigrationTypeProcedural is handled by executeProceduralMigration.
MigrationTypeProcedural
)
// Execute runs all pending migrations.
//
// It takes the migration lock (released on return), loads the migration
// history, discovers the available migrations and works out which of them
// are still pending. Pending migrations are executed one after another; the
// first failure stops the run and is returned wrapped with the migration's
// version. With nothing pending it logs that fact and returns nil.
func (cme *CustomMigrationEngine) Execute(ctx context.Context) error {
	lock, err := cme.acquireLock(ctx)
	if err != nil {
		return fmt.Errorf("failed to acquire migration lock: %w", err)
	}
	defer lock.Release()

	applied, err := cme.getMigrationHistory()
	if err != nil {
		return err
	}

	available, err := cme.discoverMigrations()
	if err != nil {
		return err
	}

	todo := cme.determinePendingMigrations(available, applied)
	switch n := len(todo); n {
	case 0:
		log.Println("No pending migrations")
		return nil
	default:
		log.Printf("Found %d pending migrations", n)
	}

	for _, next := range todo {
		runErr := cme.executeMigration(ctx, next)
		if runErr != nil {
			return fmt.Errorf("migration %s failed: %w", next.Version, runErr)
		}
	}
	return nil
}
// executeMigration executes a single migration with its full lifecycle:
// pre-migration hook, serializable transaction with lock and statement
// timeouts, type-specific execution, history record, validation, commit and
// post-migration hook.
//
// Any failure before the commit rolls the transaction back and is returned.
// A failing post-migration hook is only logged, because the migration has
// already been committed at that point.
func (cme *CustomMigrationEngine) executeMigration(ctx context.Context, migration Migration) error {
	log.Printf("Executing migration %s: %s", migration.Version, migration.Description)

	// Pre-execution hooks run before the transaction is opened.
	if err := cme.hooks.PreMigration(ctx, migration); err != nil {
		return fmt.Errorf("pre-migration hook failed: %w", err)
	}

	// Start transaction
	tx, err := cme.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelSerializable,
	})
	if err != nil {
		return err
	}
	// Covers every early return; after a successful Commit this call has
	// nothing left to undo.
	defer tx.Rollback()

	// Set timeouts (transaction-local, so they do not leak into the pool).
	if err := setLocalTimeoutMillis(ctx, tx, "lock_timeout", cme.config.LockTimeout); err != nil {
		return err
	}
	if err := setLocalTimeoutMillis(ctx, tx, "statement_timeout", cme.config.StatementTimeout); err != nil {
		return err
	}

	// Execute migration based on type
	switch migration.Type {
	case MigrationTypeSQL:
		err = cme.executeSQLMigration(ctx, tx, migration)
	case MigrationTypeGo:
		err = cme.executeGoMigration(ctx, tx, migration)
	case MigrationTypeProcedural:
		err = cme.executeProceduralMigration(ctx, tx, migration)
	default:
		err = fmt.Errorf("unknown migration type: %v", migration.Type)
	}
	if err != nil {
		return err
	}

	// Record migration
	if err := cme.recordMigration(ctx, tx, migration); err != nil {
		return fmt.Errorf("failed to record migration: %w", err)
	}

	// Validate migration while it can still be rolled back.
	if err := cme.validator.Validate(ctx, tx, migration); err != nil {
		return fmt.Errorf("migration validation failed: %w", err)
	}

	// Commit transaction
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit migration: %w", err)
	}

	// Post-execution hooks: best effort, the migration is already durable.
	if err := cme.hooks.PostMigration(ctx, migration); err != nil {
		log.Printf("Warning: post-migration hook failed: %v", err)
	}

	log.Printf("Successfully completed migration %s", migration.Version)
	return nil
}

// setLocalTimeoutMillis sets a PostgreSQL timeout setting for the current
// transaction as an integer number of milliseconds.
//
// time.Duration's String form (for example "1m0s") is not a value PostgreSQL
// accepts, so the duration is never formatted as text. Positive durations
// shorter than one millisecond are rounded up to 1, because 0 would disable
// the timeout altogether.
func setLocalTimeoutMillis(ctx context.Context, tx *sql.Tx, setting string, d time.Duration) error {
	ms := d.Milliseconds()
	if d > 0 && ms == 0 {
		ms = 1
	}
	if _, err := tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL %s = %d", setting, ms)); err != nil {
		return fmt.Errorf("setting %s: %w", setting, err)
	}
	return nil
}
// discoverMigrations collects migrations from every supported source -
// filesystem, embedded resources and remote sources (S3, Git, etc.) - merges
// them, orders them by version and validates the resulting sequence.
//
// Ordering is version-aware: dot-separated numeric parts are compared as
// numbers, so "2.1" sorts before "10.0". A plain string comparison would put
// "10.0" first and run migrations out of order. The sort is stable, so
// migrations with equal versions keep their discovery order.
func (cme *CustomMigrationEngine) discoverMigrations() ([]Migration, error) {
	var migrations []Migration

	// Discover from filesystem
	fsMigrations, err := cme.discoverFileSystemMigrations()
	if err != nil {
		return nil, err
	}
	migrations = append(migrations, fsMigrations...)

	// Discover from embedded resources
	embeddedMigrations, err := cme.discoverEmbeddedMigrations()
	if err != nil {
		return nil, err
	}
	migrations = append(migrations, embeddedMigrations...)

	// Discover from remote sources (S3, Git, etc.)
	remoteMigrations, err := cme.discoverRemoteMigrations()
	if err != nil {
		return nil, err
	}
	migrations = append(migrations, remoteMigrations...)

	// Sort migrations by version
	sort.SliceStable(migrations, func(i, j int) bool {
		return compareMigrationVersions(migrations[i].Version, migrations[j].Version) < 0
	})

	// Validate migration sequence
	if err := cme.validateMigrationSequence(migrations); err != nil {
		return nil, err
	}
	return migrations, nil
}

// compareMigrationVersions orders two version strings part by part, splitting
// on ".". It returns a negative number, zero or a positive number when a
// sorts before, equal to or after b. When all shared parts are equal, the
// version with fewer parts sorts first; remaining ties are broken by the raw
// string so the order is deterministic.
func compareMigrationVersions(a, b string) int {
	aParts, bParts := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(aParts) && i < len(bParts); i++ {
		if c := compareVersionPart(aParts[i], bParts[i]); c != 0 {
			return c
		}
	}
	if d := len(aParts) - len(bParts); d != 0 {
		return d
	}
	return strings.Compare(a, b)
}

// compareVersionPart compares one dot-separated part. Two numeric parts are
// compared by value (leading zeros ignored, any length supported). A numeric
// part sorts before a non-numeric one, which keeps the ordering consistent.
// Two non-numeric parts are compared as strings.
func compareVersionPart(x, y string) int {
	xNum, yNum := isDecimal(x), isDecimal(y)
	switch {
	case xNum && yNum:
		x, y = strings.TrimLeft(x, "0"), strings.TrimLeft(y, "0")
		if len(x) != len(y) {
			return len(x) - len(y)
		}
		return strings.Compare(x, y)
	case xNum:
		return -1
	case yNum:
		return 1
	default:
		return strings.Compare(x, y)
	}
}

// isDecimal reports whether s is non-empty and consists only of ASCII digits.
func isDecimal(s string) bool {
	if s == "" {
		return false
	}
	for i := 0; i < len(s); i++ {
		if s[i] < '0' || s[i] > '9' {
			return false
		}
	}
	return true
}
// ParallelMigrationExecutor runs independent migrations in parallel on top
// of a CustomMigrationEngine.
type ParallelMigrationExecutor struct {
// engine executes each individual migration (executeMigration).
engine *CustomMigrationEngine
// maxWorkers sizes the semaphore that bounds concurrently running migrations.
maxWorkers int
}
// ExecuteParallel runs migrations batch by batch in dependency order.
//
// The dependency graph is sorted topologically into batches. Migrations in
// one batch run concurrently, bounded by maxWorkers (at least one worker is
// always used); the next batch starts only after the current one finished.
//
// Execution stops at the first batch that contains a failure, or when ctx is
// cancelled between batches. Later batches are not started in either case,
// since they may depend on a migration that never completed. The first error
// recorded in the failing batch is returned.
func (pme *ParallelMigrationExecutor) ExecuteParallel(ctx context.Context, migrations []Migration) error {
	// Build dependency graph
	graph := pme.buildDependencyGraph(migrations)
	// Topological sort
	executionOrder := graph.TopologicalSort()

	// A zero-capacity semaphore would block every worker forever.
	workers := pme.maxWorkers
	if workers < 1 {
		workers = 1
	}
	sem := make(chan struct{}, workers)

	completed := make(map[string]bool)
	var mu sync.Mutex

	for _, batch := range executionOrder {
		if err := ctx.Err(); err != nil {
			return err
		}

		var (
			wg       sync.WaitGroup
			errMu    sync.Mutex
			firstErr error
		)
		for _, migration := range batch {
			wg.Add(1)
			go func(m Migration) {
				defer wg.Done()
				sem <- struct{}{}
				defer func() { <-sem }()

				// Wait for dependencies
				pme.waitForDependencies(m, completed, &mu)

				// Execute migration
				if err := pme.engine.executeMigration(ctx, m); err != nil {
					errMu.Lock()
					if firstErr == nil {
						firstErr = fmt.Errorf("migration %s failed: %w", m.Version, err)
					}
					errMu.Unlock()
					return
				}

				// Mark as completed
				mu.Lock()
				completed[m.Version] = true
				mu.Unlock()
			}(migration)
		}
		wg.Wait()

		// Do not build on top of a failed batch.
		if firstErr != nil {
			return firstErr
		}
	}
	return nil
}
// MigrationValidator applies a list of validation rules to a migration; see
// its Validate method.
type MigrationValidator struct {
// rules are evaluated in slice order.
rules []ValidationRule
}
// ValidationRule is a single named check run against a migration.
type ValidationRule struct {
// Name identifies the rule in error messages.
Name string
// Validate performs the check using the given transaction and returns a
// non-nil error when the rule is violated.
Validate func(context.Context, *sql.Tx, Migration) error
}
// Validate runs every configured rule, in order, against migration using tx.
// It stops at the first rule that fails and returns that rule's error wrapped
// with the rule name; it returns nil when all rules pass.
func (mv *MigrationValidator) Validate(ctx context.Context, tx *sql.Tx, migration Migration) error {
	for i := range mv.rules {
		rule := mv.rules[i]

		ruleErr := rule.Validate(ctx, tx, migration)
		if ruleErr == nil {
			continue
		}
		return fmt.Errorf("validation rule '%s' failed: %w", rule.Name, ruleErr)
	}
	return nil
}
// DefaultValidationRules returns the built-in post-migration checks:
//
//   - table_count: the public schema must contain at least one table.
//   - foreign_key_integrity: no foreign key constraint may be unvalidated.
//   - index_validity: no index may be marked invalid.
//
// Each rule runs one COUNT query inside the migration transaction.
func DefaultValidationRules() []ValidationRule {
	// countRows runs a query that yields a single integer and returns it.
	countRows := func(ctx context.Context, tx *sql.Tx, query string) (int, error) {
		var n int
		if err := tx.QueryRowContext(ctx, query).Scan(&n); err != nil {
			return 0, err
		}
		return n, nil
	}

	tableCount := ValidationRule{
		Name: "table_count",
		Validate: func(ctx context.Context, tx *sql.Tx, _ Migration) error {
			tables, err := countRows(ctx, tx, `
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_schema = 'public'
`)
			switch {
			case err != nil:
				return err
			case tables == 0:
				return fmt.Errorf("no tables found after migration")
			}
			return nil
		},
	}

	foreignKeys := ValidationRule{
		Name: "foreign_key_integrity",
		Validate: func(ctx context.Context, tx *sql.Tx, _ Migration) error {
			unvalidated, err := countRows(ctx, tx, `
SELECT COUNT(*)
FROM pg_constraint
WHERE contype = 'f'
AND NOT convalidated
`)
			switch {
			case err != nil:
				return err
			case unvalidated > 0:
				return fmt.Errorf("found %d invalid foreign key constraints", unvalidated)
			}
			return nil
		},
	}

	indexes := ValidationRule{
		Name: "index_validity",
		Validate: func(ctx context.Context, tx *sql.Tx, _ Migration) error {
			broken, err := countRows(ctx, tx, `
SELECT COUNT(*)
FROM pg_index
WHERE NOT indisvalid
`)
			switch {
			case err != nil:
				return err
			case broken > 0:
				return fmt.Errorf("found %d invalid indexes", broken)
			}
			return nil
		},
	}

	return []ValidationRule{tableCount, foreignKeys, indexes}
}
Production Deployment Strategies
Canary Deployment for Database Changes
// canary-deployment.go
package deployment
import (
"context"
"fmt"
"sync"
"time"
)
// CanaryDeploymentController manages canary deployments for database
// changes. ExecuteCanaryDeployment is its entry point.
type CanaryDeploymentController struct {
// primaryDB is not referenced directly by the code visible in this section.
primaryDB *sql.DB
// canaryDB is not referenced directly by the code visible in this section.
canaryDB *sql.DB
// router sets the share of traffic sent to the canary (SetCanaryTraffic).
router TrafficRouter
// monitor supplies the metrics evaluated on every tick (GetCurrentMetrics).
monitor HealthMonitor
// rollback is not referenced directly by the code visible in this section.
rollback RollbackController
}
// DeploymentStrategy defines how to roll out changes during a canary
// deployment.
type DeploymentStrategy struct {
// InitialCanaryPercent is the traffic share given to the canary at start.
InitialCanaryPercent float64
// IncrementPercent is added to the canary share after each successful
// evaluation; the share is capped at 100.
IncrementPercent float64
// IncrementInterval is the period between metric evaluations.
IncrementInterval time.Duration
// SuccessThreshold must be met before traffic is increased or the
// deployment is finalized.
SuccessThreshold MetricThreshold
// RollbackThreshold triggers a rollback when the metrics breach it.
RollbackThreshold MetricThreshold
// MaxDuration bounds the whole deployment; exceeding it is an error.
MaxDuration time.Duration
}
// ExecuteCanaryDeployment performs a canary deployment of migration.
//
// The migration is applied to the canary, which then receives
// strategy.InitialCanaryPercent of the traffic. Every
// strategy.IncrementInterval the current metrics are evaluated:
//
//   - if they breach the rollback threshold, the deployment is rolled back
//     and the rollback's result is returned;
//   - if they meet the success threshold, traffic is increased by
//     strategy.IncrementPercent (capped at 100), or the deployment is
//     finalized once the canary already serves 100%;
//   - otherwise nothing changes until the next tick.
//
// It returns an error if strategy.IncrementInterval is not positive (checked
// before anything is changed), if ctx is cancelled, or if
// strategy.MaxDuration elapses first.
func (cdc *CanaryDeploymentController) ExecuteCanaryDeployment(
	ctx context.Context,
	migration Migration,
	strategy DeploymentStrategy,
) error {
	// time.NewTicker panics on a non-positive interval; reject it up front,
	// before the canary has been touched.
	if strategy.IncrementInterval <= 0 {
		return fmt.Errorf("invalid deployment strategy: IncrementInterval must be positive, got %v",
			strategy.IncrementInterval)
	}

	// Apply migration to canary
	if err := cdc.applyToCanary(ctx, migration); err != nil {
		return fmt.Errorf("failed to apply migration to canary: %w", err)
	}

	// Start with initial canary traffic
	if err := cdc.router.SetCanaryTraffic(strategy.InitialCanaryPercent); err != nil {
		return fmt.Errorf("failed to set initial canary traffic: %w", err)
	}

	// Monitor and gradually increase traffic
	ticker := time.NewTicker(strategy.IncrementInterval)
	defer ticker.Stop()

	// A stoppable timer releases its resources as soon as we return.
	deadline := time.NewTimer(strategy.MaxDuration)
	defer deadline.Stop()

	currentPercent := strategy.InitialCanaryPercent
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return fmt.Errorf("deployment timeout exceeded")
		case <-ticker.C:
			// Check metrics
			metrics := cdc.monitor.GetCurrentMetrics()

			// Check for rollback conditions
			if cdc.shouldRollback(metrics, strategy.RollbackThreshold) {
				log.Printf("Rollback triggered: %v", metrics)
				return cdc.executeRollback(ctx)
			}

			// Check for success conditions
			if cdc.meetsSuccessThreshold(metrics, strategy.SuccessThreshold) {
				if currentPercent >= 100 {
					// Deployment complete
					return cdc.finalizeDeployment(ctx)
				}

				// Increase canary traffic
				currentPercent = min(currentPercent+strategy.IncrementPercent, 100)
				if err := cdc.router.SetCanaryTraffic(currentPercent); err != nil {
					return fmt.Errorf("failed to update canary traffic: %w", err)
				}
				log.Printf("Increased canary traffic to %.1f%%", currentPercent)
			}
		}
	}
}
// TrafficRouter manages traffic distribution between the primary and the
// canary database; see RouteQuery.
type TrafficRouter struct {
// config is not referenced directly by the code visible in this section.
config RouterConfig
// connections is not referenced directly by the code visible in this
// section; presumably a cache of database connections - TODO confirm.
connections sync.Map
}
// RouteQuery picks the database that should serve query.
//
// Routing is sticky per user/session: the same user ID and session ID hash to
// the same value, so they keep landing on the same side while the canary
// percentage is unchanged. A canary percentage of 0 or less (or NaN) always
// routes to the primary and 100 or more always routes to the canary. These
// bounds are handled explicitly so that 0% never sends the hash-0 user to the
// canary and out-of-range values never reach the float-to-uint32 conversion,
// whose result would be implementation-dependent.
func (tr *TrafficRouter) RouteQuery(query Query) (*sql.DB, error) {
	canaryPercent := tr.getCanaryPercent()
	switch {
	case !(canaryPercent > 0): // also true for NaN
		return tr.getPrimaryConnection()
	case canaryPercent >= 100:
		return tr.getCanaryConnection()
	}

	// Get user/session hash for consistent routing
	hash := tr.hashUserSession(query.UserID, query.SessionID)

	// The canary group is the lowest canaryPercent share of the hash space.
	threshold := uint32(float64(^uint32(0)) * canaryPercent / 100)
	if hash <= threshold {
		return tr.getCanaryConnection()
	}
	return tr.getPrimaryConnection()
}
// FeatureFlagController provides feature flag integration for gradual
// rollout; see IsEnabled.
type FeatureFlagController struct {
// flags maps a feature name to its *FeatureFlag (IsEnabled asserts that type).
flags sync.Map
}
// IsEnabled reports whether feature is enabled for the given context.
//
// Unknown features are disabled. For an enabled flag, a percentage of 100 or
// more enables it for everyone, and a percentage between 0 and 100 enables
// it for the contexts whose hash falls within that share of the hash space.
// In every other case the flag's rules are consulted in order and the first
// matching rule decides; with no matching rule the feature is disabled.
func (ffc *FeatureFlagController) IsEnabled(feature string, context map[string]interface{}) bool {
	stored, found := ffc.flags.Load(feature)
	if !found {
		return false
	}
	ff := stored.(*FeatureFlag)

	if ff.Enabled {
		switch {
		case ff.Percentage >= 100:
			// Globally enabled.
			return true
		case ff.Percentage > 0:
			// Percentage rollout decides on its own; rules are not consulted.
			bucket := ffc.hashContext(context)
			limit := uint32(float64(^uint32(0)) * ff.Percentage / 100)
			return bucket <= limit
		}
	}

	// Specific rules: first match wins.
	for _, rule := range ff.Rules {
		if !rule.Matches(context) {
			continue
		}
		return rule.Enabled
	}
	return false
}
Monitoring and Alerting
# monitoring-stack.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: migration-monitoring
namespace: database-ops
data:
prometheus-rules.yml: |
groups:
- name: database_migration
interval: 30s
rules:
- alert: MigrationDurationExceeded
# Fires when a migration that is still running started more than an hour ago
# and has stayed in that state for 5 minutes.
# PromQL cannot compare a series with a string literal, so the "running"
# state is matched as a label value instead.
# NOTE(review): assumes migration_status is exported as a 0/1 gauge carrying
# `status` and `migration_id` labels; adjust the selector to the exporter's
# actual schema.
expr: |
(time() - migration_start_timestamp) > 3600
and on (migration_id) migration_status{status="running"} == 1
for: 5m
labels:
severity: warning
team: database
annotations:
summary: "Migration running longer than expected"
description: "Migration {{ $labels.migration_id }} has been running for more than 1 hour"
- alert: MigrationErrorRateHigh
# Fires when the per-second rate of migration_errors_total, averaged over the
# last 5 minutes, stays above 0.1 for 2 minutes.
expr: |
rate(migration_errors_total[5m]) > 0.1
for: 2m
labels:
severity: critical
team: database
annotations:
summary: "High migration error rate"
description: "Migration error rate is {{ $value }} errors per second"
- alert: DatabaseReplicationLagHigh
# Fires when either the MySQL or the PostgreSQL replication lag metric stays
# above 30 seconds for 5 minutes.
expr: |
mysql_slave_lag_seconds > 30
or pg_replication_lag_seconds > 30
for: 5m
labels:
severity: warning
team: database
annotations:
summary: "Database replication lag is high"
description: "Replication lag is {{ $value }} seconds on {{ $labels.instance }}"
- alert: MigrationRollbackTriggered
# Fires as soon as migration_rollbacks_total increased within the last
# minute; there is no `for:` clause, so no waiting period applies.
expr: |
increase(migration_rollbacks_total[1m]) > 0
labels:
severity: critical
team: database
annotations:
summary: "Migration rollback triggered"
description: "Migration {{ $labels.migration_id }} has been rolled back"
grafana-dashboard.json: |
{
"dashboard": {
"title": "Database Migration Monitoring",
"panels": [
{
"title": "Migration Progress",
"targets": [
{
"expr": "migration_progress_percent",
"legendFormat": "{{ migration_id }}"
}
]
},
{
  "title": "Schema Change Impact",
  "targets": [
    {
      "expr": "rate(database_queries_total[5m])",
      "legendFormat": "Query Rate"
    },
    {
      "expr": "histogram_quantile(0.99, sum by (le) (rate(database_query_duration_seconds_bucket[5m])))",
      "legendFormat": "P99 Latency"
    }
  ]
},
{
"title": "Data Migration Throughput",
"targets": [
{
"expr": "rate(migration_rows_processed_total[1m])",
"legendFormat": "Rows/sec"
}
]
},
{
"title": "Rollback Readiness",
"targets": [
{
"expr": "migration_rollback_point_age_seconds",
"legendFormat": "Rollback Point Age"
}
]
}
]
}
}
Best Practices and Lessons Learned
Migration Checklist
// migration-checklist.go
package checklist
// MigrationChecklist groups checklist items by migration phase. Validate
// selects one of the three lists by phase name.
type MigrationChecklist struct {
// PreMigration holds the items for phase "pre".
PreMigration []ChecklistItem
// DuringMigration holds the items for phase "during".
DuringMigration []ChecklistItem
// PostMigration holds the items for phase "post".
PostMigration []ChecklistItem
}
// EnterpriseChecklist is the predefined checklist for enterprise migrations.
// It is organised by phase and, within each phase, by category:
//
//   - pre-migration: Planning, Technical Preparation, Performance Testing
//   - during migration: Execution, Monitoring
//   - post-migration: Validation, Cleanup
var EnterpriseChecklist = MigrationChecklist{
// Items to complete before the migration starts.
PreMigration: []ChecklistItem{
{
Category: "Planning",
Items: []string{
"Review and approve migration plan",
"Identify dependencies and downstream impacts",
"Schedule maintenance window (if needed)",
"Notify stakeholders",
"Prepare rollback plan",
},
},
{
Category: "Technical Preparation",
Items: []string{
"Create full database backup",
"Verify backup integrity",
"Test migration in staging environment",
"Validate rollback procedure",
"Check disk space (3x data size recommended)",
"Review and optimize migration scripts",
"Set up monitoring and alerting",
},
},
{
Category: "Performance Testing",
Items: []string{
"Benchmark current performance",
"Load test migration scripts",
"Identify potential bottlenecks",
"Plan for traffic management",
},
},
},
// Items to follow while the migration is running.
DuringMigration: []ChecklistItem{
{
Category: "Execution",
Items: []string{
"Enable read-only mode (if applicable)",
"Create migration tracking record",
"Execute migration phases",
"Monitor progress and metrics",
"Validate each phase completion",
"Update status communications",
},
},
{
Category: "Monitoring",
Items: []string{
"Watch error rates",
"Monitor query performance",
"Check replication lag",
"Track resource utilization",
"Monitor application logs",
},
},
},
// Items to complete after the migration has finished.
PostMigration: []ChecklistItem{
{
Category: "Validation",
Items: []string{
"Verify data integrity",
"Run application smoke tests",
"Check performance metrics",
"Validate all constraints",
"Confirm index optimization",
"Test critical queries",
},
},
{
Category: "Cleanup",
Items: []string{
"Remove temporary objects",
"Clean up old columns/tables",
"Update documentation",
"Archive migration logs",
"Document lessons learned",
"Update runbooks",
},
},
},
}
// Validate checks that every item of the given phase has been completed.
//
// phase must be "pre", "during" or "post"; any other value is an error. All
// incomplete items of the phase are collected as "category: item" and
// reported together in a single error. It returns nil when nothing is
// outstanding.
func (mc *MigrationChecklist) Validate(phase string) error {
	byPhase := map[string][]ChecklistItem{
		"pre":    mc.PreMigration,
		"during": mc.DuringMigration,
		"post":   mc.PostMigration,
	}
	groups, known := byPhase[phase]
	if !known {
		return fmt.Errorf("unknown phase: %s", phase)
	}

	var outstanding []string
	for _, group := range groups {
		for _, entry := range group.Items {
			if isCompleted(entry) {
				continue
			}
			outstanding = append(outstanding, fmt.Sprintf("%s: %s", group.Category, entry))
		}
	}

	if len(outstanding) == 0 {
		return nil
	}
	return fmt.Errorf("incomplete checklist items: %v", outstanding)
}
Conclusion
Zero-downtime database migrations require careful planning, robust tooling, and well-tested procedures. Key strategies for success include:
- Expand-Contract Pattern: Maintain compatibility throughout the migration process
- Blue-Green Deployments: Enable instant switchover with minimal risk
- Comprehensive Testing: Validate migrations thoroughly in staging environments
- Automated Rollback: Implement reliable rollback procedures for quick recovery
- Continuous Monitoring: Track metrics and respond to issues proactively
By combining these patterns with appropriate tooling (Flyway, Liquibase, or custom solutions), organizations can achieve reliable, zero-downtime database migrations that maintain service availability while evolving their data infrastructure.