Enterprise full disk encryption with LUKS (Linux Unified Key Setup) requires sophisticated remote unlock capabilities, automated key management, and robust security frameworks for production environments. This comprehensive guide covers advanced LUKS implementations, enterprise-grade remote unlock systems, high availability configurations, and automated security management for critical infrastructure deployments.

Enterprise LUKS Architecture Overview

Full Disk Encryption Framework Design

Enterprise LUKS deployments demand comprehensive security architectures that balance protection, accessibility, and operational requirements across distributed infrastructure environments.

Enterprise Encryption Strategy Matrix

┌─────────────────────────────────────────────────────────────────┐
│                Enterprise Encryption Architecture               │
├─────────────────┬─────────────────┬─────────────────┬───────────┤
│  Data at Rest   │  Key Management │  Remote Access  │ Compliance│
├─────────────────┼─────────────────┼─────────────────┼───────────┤
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ ┌───────┐ │
│ │ LUKS/dm-    │ │ │ HSM/TPM     │ │ │ Dropbear    │ │ │ FIPS  │ │
│ │ crypt       │ │ │ Integration │ │ │ SSH Bridge  │ │ │ 140-2 │ │
│ │ + XTS-AES   │ │ │ + Vault     │ │ │ + Network   │ │ │ SOC 2 │ │
│ └─────────────┘ │ └─────────────┘ │ └─────────────┘ │ └───────┘ │
│                 │                 │                 │           │
│ • Multi-layer   │ • Automated     │ • Zero-touch    │ • Audit   │
│ • Performance   │ • Rotation      │ • HA unlock     │ • Reports │
│ • Recovery      │ • Escrow        │ • Monitoring    │ • Logging │
└─────────────────┴─────────────────┴─────────────────┴───────────┘

LUKS Security Deployment Models

ModelUse CaseSecurity LevelComplexityRecovery Time
Standard RemoteBasic serversMediumLow5-15 minutes
HA Multi-KeyCritical systemsHighMedium2-5 minutes
HSM-IntegratedFinancial/healthcareVery HighHigh1-3 minutes
Zero-TouchCloud/containerHighVery High30 seconds

Advanced LUKS Configuration Framework

Enterprise LUKS Setup and Management System

#!/usr/bin/env python3
"""
Enterprise LUKS Management and Automation Framework
"""

import subprocess
import json
import logging
import secrets
import hashlib
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import cryptography.fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64

@dataclass
class LUKSDevice:
    device_path: str
    mount_point: str
    mapper_name: str
    key_slot: int
    cipher: str = "aes-xts-plain64"
    key_size: int = 512
    hash_algorithm: str = "sha256"
    iteration_time: int = 2000
    backup_header_path: Optional[str] = None
    auto_unlock: bool = False
    remote_unlock: bool = False

@dataclass
class RemoteUnlockConfig:
    dropbear_port: int = 2222
    authorized_keys_path: str = "/etc/dropbear-initramfs/authorized_keys"
    network_config: Dict = None
    timeout_seconds: int = 300
    max_attempts: int = 3
    monitoring_enabled: bool = True

class LUKSManager:
    def __init__(self, config_file: str = "/etc/luks/enterprise.conf"):
        self.config_file = Path(config_file)
        self.devices: Dict[str, LUKSDevice] = {}
        self.remote_config: Optional[RemoteUnlockConfig] = None
        self.logger = self._setup_logging()
        
        # Key management
        self.key_store_path = Path("/etc/luks/keystore")
        self.backup_path = Path("/etc/luks/backups")
        
        self._ensure_directories()
        self._load_configuration()
    
    def _setup_logging(self) -> logging.Logger:
        """Setup comprehensive logging"""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        
        # File handler
        file_handler = logging.FileHandler('/var/log/luks-enterprise.log')
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(file_formatter)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter('%(levelname)s: %(message)s')
        console_handler.setFormatter(console_formatter)
        
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        return logger
    
    def _ensure_directories(self) -> None:
        """Create necessary directories with proper permissions"""
        for directory in [self.config_file.parent, self.key_store_path, self.backup_path]:
            directory.mkdir(mode=0o700, parents=True, exist_ok=True)
    
    def _load_configuration(self) -> None:
        """Load LUKS configuration from file"""
        if self.config_file.exists():
            try:
                with open(self.config_file, 'r') as f:
                    config = json.load(f)
                
                # Load devices
                for device_name, device_data in config.get('devices', {}).items():
                    self.devices[device_name] = LUKSDevice(**device_data)
                
                # Load remote unlock configuration
                if 'remote_unlock' in config:
                    self.remote_config = RemoteUnlockConfig(**config['remote_unlock'])
                
                self.logger.info(f"Loaded configuration for {len(self.devices)} LUKS devices")
                
            except Exception as e:
                self.logger.error(f"Failed to load configuration: {e}")
    
    def save_configuration(self) -> None:
        """Save current configuration to file"""
        config = {
            'devices': {name: asdict(device) for name, device in self.devices.items()},
            'remote_unlock': asdict(self.remote_config) if self.remote_config else None
        }
        
        with open(self.config_file, 'w') as f:
            json.dump(config, f, indent=2)
        
        # Secure the configuration file
        self.config_file.chmod(0o600)
        self.logger.info("Configuration saved")
    
    def generate_secure_key(self, length: int = 32) -> bytes:
        """Generate cryptographically secure key"""
        return secrets.token_bytes(length)
    
    def derive_key_from_password(self, password: str, salt: bytes) -> bytes:
        """Derive encryption key from password using PBKDF2"""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return kdf.derive(password.encode())
    
    def encrypt_key(self, key: bytes, password: str) -> Tuple[bytes, bytes]:
        """Encrypt key with password"""
        salt = secrets.token_bytes(16)
        derived_key = self.derive_key_from_password(password, salt)
        
        f = cryptography.fernet.Fernet(base64.urlsafe_b64encode(derived_key))
        encrypted_key = f.encrypt(key)
        
        return encrypted_key, salt
    
    def decrypt_key(self, encrypted_key: bytes, password: str, salt: bytes) -> bytes:
        """Decrypt key with password"""
        derived_key = self.derive_key_from_password(password, salt)
        
        f = cryptography.fernet.Fernet(base64.urlsafe_b64encode(derived_key))
        return f.decrypt(encrypted_key)
    
    def create_luks_device(self, device_name: str, device_path: str, 
                          mount_point: str, password: str,
                          cipher: str = "aes-xts-plain64") -> bool:
        """Create and format LUKS device"""
        try:
            self.logger.info(f"Creating LUKS device {device_name} on {device_path}")
            
            # Generate secure key
            key = self.generate_secure_key()
            key_file = self.key_store_path / f"{device_name}.key"
            
            # Encrypt and store key
            encrypted_key, salt = self.encrypt_key(key, password)
            key_data = {
                'encrypted_key': base64.b64encode(encrypted_key).decode(),
                'salt': base64.b64encode(salt).decode(),
                'created': time.time()
            }
            
            with open(key_file, 'w') as f:
                json.dump(key_data, f)
            key_file.chmod(0o600)
            
            # Create temporary key file for LUKS formatting
            temp_key_file = f"/tmp/luks_key_{device_name}"
            with open(temp_key_file, 'wb') as f:
                f.write(key)
            Path(temp_key_file).chmod(0o600)
            
            try:
                # Format LUKS device
                cmd = [
                    'cryptsetup', 'luksFormat',
                    '--type', 'luks2',
                    '--cipher', cipher,
                    '--key-size', '512',
                    '--hash', 'sha256',
                    '--iter-time', '2000',
                    '--use-random',
                    '--key-file', temp_key_file,
                    device_path
                ]
                
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode != 0:
                    raise subprocess.CalledProcessError(result.returncode, cmd, result.stderr)
                
                # Backup LUKS header
                backup_file = self.backup_path / f"{device_name}_header.img"
                subprocess.run([
                    'cryptsetup', 'luksHeaderBackup', device_path,
                    '--header-backup-file', str(backup_file)
                ], check=True)
                
                # Create device configuration
                mapper_name = f"luks_{device_name}"
                luks_device = LUKSDevice(
                    device_path=device_path,
                    mount_point=mount_point,
                    mapper_name=mapper_name,
                    key_slot=0,
                    cipher=cipher,
                    backup_header_path=str(backup_file)
                )
                
                self.devices[device_name] = luks_device
                self.save_configuration()
                
                self.logger.info(f"LUKS device {device_name} created successfully")
                return True
                
            finally:
                # Clean up temporary key file
                Path(temp_key_file).unlink(missing_ok=True)
                
        except Exception as e:
            self.logger.error(f"Failed to create LUKS device {device_name}: {e}")
            return False
    
    def unlock_device(self, device_name: str, password: str) -> bool:
        """Unlock LUKS device"""
        if device_name not in self.devices:
            self.logger.error(f"Device {device_name} not found")
            return False
        
        device = self.devices[device_name]
        
        try:
            # Load and decrypt key
            key_file = self.key_store_path / f"{device_name}.key"
            if not key_file.exists():
                self.logger.error(f"Key file for {device_name} not found")
                return False
            
            with open(key_file, 'r') as f:
                key_data = json.load(f)
            
            encrypted_key = base64.b64decode(key_data['encrypted_key'])
            salt = base64.b64decode(key_data['salt'])
            
            key = self.decrypt_key(encrypted_key, password, salt)
            
            # Create temporary key file
            temp_key_file = f"/tmp/luks_unlock_{device_name}"
            with open(temp_key_file, 'wb') as f:
                f.write(key)
            Path(temp_key_file).chmod(0o600)
            
            try:
                # Unlock device
                cmd = [
                    'cryptsetup', 'luksOpen',
                    device.device_path,
                    device.mapper_name,
                    '--key-file', temp_key_file
                ]
                
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode != 0:
                    raise subprocess.CalledProcessError(result.returncode, cmd, result.stderr)
                
                self.logger.info(f"LUKS device {device_name} unlocked successfully")
                return True
                
            finally:
                # Clean up temporary key file
                Path(temp_key_file).unlink(missing_ok=True)
                
        except Exception as e:
            self.logger.error(f"Failed to unlock LUKS device {device_name}: {e}")
            return False
    
    def lock_device(self, device_name: str) -> bool:
        """Lock LUKS device"""
        if device_name not in self.devices:
            self.logger.error(f"Device {device_name} not found")
            return False
        
        device = self.devices[device_name]
        
        try:
            # Check if device is mounted and unmount if necessary
            result = subprocess.run(['findmnt', '-n', device.mount_point], 
                                  capture_output=True, text=True)
            if result.returncode == 0:
                subprocess.run(['umount', device.mount_point], check=True)
            
            # Close LUKS device
            subprocess.run(['cryptsetup', 'luksClose', device.mapper_name], check=True)
            
            self.logger.info(f"LUKS device {device_name} locked successfully")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to lock LUKS device {device_name}: {e}")
            return False
    
    def add_key_slot(self, device_name: str, existing_password: str, 
                     new_password: str, slot: Optional[int] = None) -> bool:
        """Add new key slot to LUKS device"""
        if device_name not in self.devices:
            return False
        
        device = self.devices[device_name]
        
        try:
            # Load existing key
            key_file = self.key_store_path / f"{device_name}.key"
            with open(key_file, 'r') as f:
                key_data = json.load(f)
            
            encrypted_key = base64.b64decode(key_data['encrypted_key'])
            salt = base64.b64decode(key_data['salt'])
            existing_key = self.decrypt_key(encrypted_key, existing_password, salt)
            
            # Generate new key
            new_key = self.generate_secure_key()
            
            # Create temporary key files
            existing_key_file = f"/tmp/luks_existing_{device_name}"
            new_key_file = f"/tmp/luks_new_{device_name}"
            
            with open(existing_key_file, 'wb') as f:
                f.write(existing_key)
            with open(new_key_file, 'wb') as f:
                f.write(new_key)
            
            Path(existing_key_file).chmod(0o600)
            Path(new_key_file).chmod(0o600)
            
            try:
                # Add key slot
                cmd = ['cryptsetup', 'luksAddKey', device.device_path, 
                      new_key_file, '--key-file', existing_key_file]
                
                if slot is not None:
                    cmd.extend(['--key-slot', str(slot)])
                
                subprocess.run(cmd, check=True)
                
                # Store new key
                new_encrypted_key, new_salt = self.encrypt_key(new_key, new_password)
                new_key_data = {
                    'encrypted_key': base64.b64encode(new_encrypted_key).decode(),
                    'salt': base64.b64encode(new_salt).decode(),
                    'created': time.time(),
                    'slot': slot
                }
                
                new_key_file_path = self.key_store_path / f"{device_name}_slot_{slot or 'auto'}.key"
                with open(new_key_file_path, 'w') as f:
                    json.dump(new_key_data, f)
                new_key_file_path.chmod(0o600)
                
                self.logger.info(f"Added key slot for device {device_name}")
                return True
                
            finally:
                # Clean up temporary files
                Path(existing_key_file).unlink(missing_ok=True)
                Path(new_key_file).unlink(missing_ok=True)
                
        except Exception as e:
            self.logger.error(f"Failed to add key slot for device {device_name}: {e}")
            return False
    
    def rotate_keys(self, device_name: str, old_password: str, new_password: str) -> bool:
        """Rotate encryption keys for device"""
        try:
            # Add new key in next available slot
            if not self.add_key_slot(device_name, old_password, new_password):
                return False
            
            # Remove old key (slot 0)
            device = self.devices[device_name]
            
            # Create new key file
            key_file = self.key_store_path / f"{device_name}.key"
            with open(key_file, 'r') as f:
                key_data = json.load(f)
            
            encrypted_key = base64.b64decode(key_data['encrypted_key'])
            salt = base64.b64decode(key_data['salt'])
            new_key = self.decrypt_key(encrypted_key, new_password, salt)
            
            new_key_file = f"/tmp/luks_rotate_{device_name}"
            with open(new_key_file, 'wb') as f:
                f.write(new_key)
            Path(new_key_file).chmod(0o600)
            
            try:
                # Remove old key slot
                subprocess.run([
                    'cryptsetup', 'luksRemoveKey', device.device_path,
                    '--key-file', new_key_file, '--key-slot', '0'
                ], check=True)
                
                self.logger.info(f"Key rotation completed for device {device_name}")
                return True
                
            finally:
                Path(new_key_file).unlink(missing_ok=True)
                
        except Exception as e:
            self.logger.error(f"Failed to rotate keys for device {device_name}: {e}")
            return False
    
    def backup_luks_header(self, device_name: str) -> Optional[str]:
        """Backup LUKS header"""
        if device_name not in self.devices:
            return None
        
        device = self.devices[device_name]
        backup_file = self.backup_path / f"{device_name}_header_{int(time.time())}.img"
        
        try:
            subprocess.run([
                'cryptsetup', 'luksHeaderBackup', device.device_path,
                '--header-backup-file', str(backup_file)
            ], check=True)
            
            self.logger.info(f"LUKS header backed up to {backup_file}")
            return str(backup_file)
            
        except Exception as e:
            self.logger.error(f"Failed to backup LUKS header for {device_name}: {e}")
            return None
    
    def get_device_status(self, device_name: str) -> Dict:
        """Get comprehensive device status"""
        if device_name not in self.devices:
            return {"error": "Device not found"}
        
        device = self.devices[device_name]
        status = {
            "device_name": device_name,
            "device_path": device.device_path,
            "mapper_name": device.mapper_name,
            "cipher": device.cipher,
            "unlocked": False,
            "mounted": False,
            "key_slots": []
        }
        
        try:
            # Check if device is unlocked
            result = subprocess.run(['cryptsetup', 'status', device.mapper_name], 
                                  capture_output=True, text=True)
            status["unlocked"] = result.returncode == 0
            
            # Check if mounted
            if status["unlocked"]:
                result = subprocess.run(['findmnt', '-n', device.mount_point], 
                                      capture_output=True, text=True)
                status["mounted"] = result.returncode == 0
            
            # Get key slot information
            result = subprocess.run(['cryptsetup', 'luksDump', device.device_path], 
                                  capture_output=True, text=True)
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                    if 'Key Slot' in line and 'ENABLED' in line:
                        slot_num = line.split(':')[0].split()[-1]
                        status["key_slots"].append(int(slot_num))
            
        except Exception as e:
            status["error"] = str(e)
        
        return status

# Example enterprise usage
def setup_enterprise_luks():
    """Example enterprise LUKS setup"""
    manager = LUKSManager()
    
    # Configure remote unlock
    remote_config = RemoteUnlockConfig(
        dropbear_port=2222,
        network_config={
            "ipv4": "192.168.1.100/24",
            "gateway": "192.168.1.1",
            "ipv6": "2001:db8::100/64",
            "ipv6_gateway": "2001:db8::1"
        },
        timeout_seconds=300,
        monitoring_enabled=True
    )
    
    manager.remote_config = remote_config
    manager.save_configuration()
    
    return manager

if __name__ == "__main__":
    # Demonstration
    manager = setup_enterprise_luks()
    
    # Example device creation (commented out for safety)
    # manager.create_luks_device("data", "/dev/sdb1", "/mnt/secure", "secure_password")
    
    print("Enterprise LUKS Manager initialized")
    for device_name in manager.devices:
        status = manager.get_device_status(device_name)
        print(f"Device {device_name}: {status}")

Enterprise Remote Unlock Infrastructure

Advanced Dropbear SSH Configuration

Production Remote Unlock System

#!/bin/bash
# Enterprise LUKS Remote Unlock Infrastructure Deployment

set -euo pipefail

# Configuration
ENTERPRISE_CONFIG="/etc/luks/enterprise-remote.conf"
SSH_KEYS_DIR="/etc/luks/ssh-keys"
DROPBEAR_CONFIG="/etc/dropbear-initramfs/config"
INITRAMFS_CONFIG="/etc/initramfs-tools/initramfs.conf"
GRUB_CONFIG="/etc/default/grub"
LOG_DIR="/var/log/luks-remote"

# Network configuration
DROPBEAR_PORT="${DROPBEAR_PORT:-2222}"
BACKUP_PORT="${BACKUP_PORT:-2223}"
MANAGEMENT_NETWORK="${MANAGEMENT_NETWORK:-192.168.100.0/24}"
IPV6_ENABLED="${IPV6_ENABLED:-true}"

# Security settings
MAX_AUTH_TRIES="${MAX_AUTH_TRIES:-3}"
LOGIN_TIMEOUT="${LOGIN_TIMEOUT:-300}"
IDLE_TIMEOUT="${IDLE_TIMEOUT:-600}"
KEY_ROTATION_DAYS="${KEY_ROTATION_DAYS:-90}"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

# Logging
log() { echo -e "${BLUE}[$(date '+%Y-%m-%d %H:%M:%S')]${NC} $*" | tee -a "$LOG_DIR/deployment.log"; }
warn() { echo -e "${YELLOW}[WARN]${NC} $*" | tee -a "$LOG_DIR/deployment.log"; }
error() { echo -e "${RED}[ERROR]${NC} $*" | tee -a "$LOG_DIR/deployment.log"; exit 1; }
success() { echo -e "${GREEN}[SUCCESS]${NC} $*" | tee -a "$LOG_DIR/deployment.log"; }

# Setup environment
setup_environment() {
    log "Setting up enterprise remote unlock environment..."
    
    mkdir -p "$LOG_DIR" "$SSH_KEYS_DIR" "$(dirname "$ENTERPRISE_CONFIG")"
    chmod 700 "$SSH_KEYS_DIR"
    chmod 755 "$LOG_DIR"
    
    # Install required packages
    if command -v apt >/dev/null 2>&1; then
        apt update
        apt install -y dropbear-initramfs cryptsetup busybox openssh-client
    elif command -v yum >/dev/null 2>&1; then
        yum install -y cryptsetup openssh-clients
        warn "dropbear-initramfs not available on RHEL-based systems"
    fi
    
    success "Environment setup completed"
}

# Generate enterprise SSH key infrastructure
setup_ssh_infrastructure() {
    log "Setting up SSH key infrastructure..."
    
    # Create SSH key for remote unlock
    local unlock_key="$SSH_KEYS_DIR/luks_unlock_rsa"
    if [[ ! -f "$unlock_key" ]]; then
        ssh-keygen -t rsa -b 4096 -f "$unlock_key" -N "" -C "luks-unlock-$(hostname)"
        chmod 600 "$unlock_key"
        chmod 644 "$unlock_key.pub"
        
        log "Generated LUKS unlock SSH key"
    fi
    
    # Create backup key
    local backup_key="$SSH_KEYS_DIR/luks_backup_rsa"
    if [[ ! -f "$backup_key" ]]; then
        ssh-keygen -t rsa -b 4096 -f "$backup_key" -N "" -C "luks-backup-$(hostname)"
        chmod 600 "$backup_key"
        chmod 644 "$backup_key.pub"
        
        log "Generated LUKS backup SSH key"
    fi
    
    # Create emergency access key
    local emergency_key="$SSH_KEYS_DIR/luks_emergency_rsa"
    if [[ ! -f "$emergency_key" ]]; then
        ssh-keygen -t rsa -b 4096 -f "$emergency_key" -N "" -C "luks-emergency-$(hostname)"
        chmod 600 "$emergency_key"
        chmod 644 "$emergency_key.pub"
        
        log "Generated LUKS emergency SSH key"
    fi
    
    success "SSH infrastructure setup completed"
}

# Configure advanced dropbear settings
configure_dropbear() {
    log "Configuring enterprise dropbear settings..."
    
    # Backup original configuration
    if [[ -f "$DROPBEAR_CONFIG" ]]; then
        cp "$DROPBEAR_CONFIG" "$DROPBEAR_CONFIG.backup.$(date +%Y%m%d_%H%M%S)"
    fi
    
    # Create advanced dropbear configuration
    cat > "$DROPBEAR_CONFIG" << EOF
# Enterprise Dropbear Configuration
# Generated: $(date)

# Primary SSH port
DROPBEAR_OPTIONS="-I $IDLE_TIMEOUT -j -k -p $DROPBEAR_PORT -s -c cryptroot-unlock"

# Backup SSH port (different configuration)
DROPBEAR_OPTIONS2="-I $IDLE_TIMEOUT -j -k -p $BACKUP_PORT -s"

# Security settings
DROPBEAR_RECEIVE_WINDOW=65536
DROPBEAR_TRANSMIT_WINDOW=65536

# Logging and monitoring
DROPBEAR_EXTRA_ARGS="-F -E"
EOF

    # Configure authorized keys with restrictions
    local auth_keys="/etc/dropbear-initramfs/authorized_keys"
    
    cat > "$auth_keys" << EOF
# Enterprise LUKS Remote Unlock Authorized Keys
# Generated: $(date)

# Primary unlock key (restricted to cryptroot-unlock command)
command="cryptroot-unlock",no-port-forwarding,no-agent-forwarding,no-x11-forwarding $(cat "$SSH_KEYS_DIR/luks_unlock_rsa.pub")

# Backup key (restricted shell access)
command="/bin/sh",no-port-forwarding,no-agent-forwarding,no-x11-forwarding $(cat "$SSH_KEYS_DIR/luks_backup_rsa.pub")

# Emergency key (full access with source restrictions)
from="$MANAGEMENT_NETWORK",no-port-forwarding,no-agent-forwarding,no-x11-forwarding $(cat "$SSH_KEYS_DIR/luks_emergency_rsa.pub")
EOF

    chmod 600 "$auth_keys"
    
    success "Dropbear configuration completed"
}

# Configure advanced networking
configure_networking() {
    log "Configuring enterprise networking for remote unlock..."
    
    # IPv4 configuration
    local ipv4_config=""
    if [[ -n "${STATIC_IPV4:-}" ]]; then
        ipv4_config="IP=${STATIC_IPV4}::${IPV4_GATEWAY:-}:${IPV4_NETMASK:-255.255.255.0}:$(hostname)"
    fi
    
    # IPv6 configuration
    local ipv6_config=""
    if [[ "$IPV6_ENABLED" == "true" && -n "${STATIC_IPV6:-}" ]]; then
        ipv6_config="ipv6=addr=${STATIC_IPV6},gw=${IPV6_GATEWAY:-},iface=${NETWORK_INTERFACE:-eth0}"
    fi
    
    # Update initramfs configuration
    cat >> "$INITRAMFS_CONFIG" << EOF

# Enterprise LUKS Remote Unlock Network Configuration
# Generated: $(date)

# IPv4 Configuration
$ipv4_config

# Network interface
DEVICE=$NETWORK_INTERFACE

# DNS servers
NFSROOT=auto

# Boot timeout
BOOT=local
EOF

    # Configure GRUB for IPv6 if enabled
    if [[ "$IPV6_ENABLED" == "true" && -n "${STATIC_IPV6:-}" ]]; then
        update_grub_ipv6_config
    fi
    
    success "Network configuration completed"
}

# Update GRUB configuration for IPv6
update_grub_ipv6_config() {
    log "Updating GRUB configuration for IPv6..."
    
    # Backup GRUB configuration
    cp "$GRUB_CONFIG" "$GRUB_CONFIG.backup.$(date +%Y%m%d_%H%M%S)"
    
    # Add IPv6 configuration to GRUB command line
    local ipv6_params="ipv6=addr=${STATIC_IPV6},gw=${IPV6_GATEWAY:-},iface=${NETWORK_INTERFACE:-eth0},forwarding=0,accept_ra=0"
    
    if grep -q "GRUB_CMDLINE_LINUX=" "$GRUB_CONFIG"; then
        sed -i "s/GRUB_CMDLINE_LINUX=\"/GRUB_CMDLINE_LINUX=\"$ipv6_params /" "$GRUB_CONFIG"
    else
        echo "GRUB_CMDLINE_LINUX=\"$ipv6_params\"" >> "$GRUB_CONFIG"
    fi
    
    # Install IPv6 initramfs scripts
    install_ipv6_scripts
    
    success "GRUB IPv6 configuration updated"
}

# Install IPv6 initramfs scripts
install_ipv6_scripts() {
    log "Installing IPv6 initramfs scripts..."
    
    # Create IPv6 hook script
    cat > "/etc/initramfs-tools/hooks/ipv6" << 'EOF'
#!/bin/sh
# IPv6 Support Hook for initramfs

PREREQ=""

prereqs() {
    echo "$PREREQ"
}

case $1 in
prereqs)
    prereqs
    exit 0
    ;;
esac

. /usr/share/initramfs-tools/hook-functions

# Copy IPv6 modules
copy_modules_dir kernel/net/ipv6

# Copy IPv6 utilities
copy_exec /sbin/ip
copy_exec /bin/ping6

exit 0
EOF

    # Create IPv6 init script
    cat > "/etc/initramfs-tools/scripts/init-premount/ipv6" << 'EOF'
#!/bin/sh

PREREQ=""

prereqs() {
    echo "$PREREQ"
}

case $1 in
prereqs)
    prereqs
    exit 0
    ;;
esac

# Parse IPv6 parameters from kernel command line
for x in $(cat /proc/cmdline); do
    case $x in
    ipv6=*)
        IPV6_CONFIG="${x#ipv6=}"
        ;;
    esac
done

if [ -n "$IPV6_CONFIG" ]; then
    # Load IPv6 module
    modprobe ipv6
    
    # Parse configuration
    OLDIFS="$IFS"
    IFS=","
    for param in $IPV6_CONFIG; do
        case $param in
        addr=*)
            IPV6_ADDR="${param#addr=}"
            ;;
        gw=*)
            IPV6_GW="${param#gw=}"
            ;;
        iface=*)
            IPV6_IFACE="${param#iface=}"
            ;;
        esac
    done
    IFS="$OLDIFS"
    
    # Configure IPv6
    if [ -n "$IPV6_ADDR" ] && [ -n "$IPV6_IFACE" ]; then
        ip link set "$IPV6_IFACE" up
        ip -6 addr add "$IPV6_ADDR" dev "$IPV6_IFACE"
        
        if [ -n "$IPV6_GW" ]; then
            ip -6 route add default via "$IPV6_GW" dev "$IPV6_IFACE"
        fi
        
        echo "IPv6 configured: $IPV6_ADDR on $IPV6_IFACE"
    fi
fi
EOF

    chmod +x "/etc/initramfs-tools/hooks/ipv6"
    chmod +x "/etc/initramfs-tools/scripts/init-premount/ipv6"
    
    success "IPv6 scripts installed"
}

# Create advanced monitoring system
setup_monitoring() {
    log "Setting up remote unlock monitoring..."
    
    # Create monitoring script
    cat > "/usr/local/bin/luks-remote-monitor.sh" << 'EOF'
#!/bin/bash
# Enterprise LUKS Remote Unlock Monitoring

METRICS_FILE="/var/log/luks-remote/metrics.log"
ALERT_SCRIPT="/usr/local/bin/luks-alert.sh"

log_metric() {
    echo "$(date '+%Y-%m-%d %H:%M:%S'),$*" >> "$METRICS_FILE"
}

check_dropbear_status() {
    # Check if dropbear is configured
    if [[ -f "/etc/dropbear-initramfs/config" ]]; then
        echo "DROPBEAR_CONFIG,OK"
        log_metric "dropbear_config,ok"
    else
        echo "DROPBEAR_CONFIG,ERROR"
        log_metric "dropbear_config,error"
        return 1
    fi
    
    # Check authorized keys
    if [[ -f "/etc/dropbear-initramfs/authorized_keys" ]]; then
        local key_count=$(grep -c "ssh-rsa\|ssh-ed25519" "/etc/dropbear-initramfs/authorized_keys" 2>/dev/null || echo "0")
        echo "SSH_KEYS,$key_count"
        log_metric "ssh_keys,$key_count"
        
        if [[ $key_count -eq 0 ]]; then
            return 1
        fi
    else
        echo "SSH_KEYS,ERROR"
        log_metric "ssh_keys,error"
        return 1
    fi
    
    return 0
}

check_luks_devices() {
    local device_count=0
    local unlocked_count=0
    
    # Count LUKS devices
    for device in $(blkid | grep crypto_LUKS | cut -d: -f1); do
        ((device_count++))
        
        # Check if device is unlocked
        local mapper_name=$(basename "$device")
        if cryptsetup status "luks_$mapper_name" >/dev/null 2>&1; then
            ((unlocked_count++))
        fi
    done
    
    echo "LUKS_DEVICES,$device_count"
    echo "LUKS_UNLOCKED,$unlocked_count"
    log_metric "luks_devices,$device_count"
    log_metric "luks_unlocked,$unlocked_count"
    
    return 0
}

check_network_config() {
    # Check initramfs network configuration
    if grep -q "IP=" "/etc/initramfs-tools/initramfs.conf" 2>/dev/null; then
        echo "NETWORK_CONFIG,OK"
        log_metric "network_config,ok"
    else
        echo "NETWORK_CONFIG,DHCP"
        log_metric "network_config,dhcp"
    fi
    
    # Check IPv6 configuration
    if [[ -f "/etc/initramfs-tools/scripts/init-premount/ipv6" ]]; then
        echo "IPV6_CONFIG,OK"
        log_metric "ipv6_config,ok"
    else
        echo "IPV6_CONFIG,NONE"
        log_metric "ipv6_config,none"
    fi
    
    return 0
}

check_key_rotation() {
    local key_dir="/etc/luks/ssh-keys"
    local needs_rotation=false
    
    if [[ -d "$key_dir" ]]; then
        for key_file in "$key_dir"/*.pub; do
            if [[ -f "$key_file" ]]; then
                local age_days=$(( ($(date +%s) - $(stat -c %Y "$key_file")) / 86400 ))
                
                if [[ $age_days -gt ${KEY_ROTATION_DAYS:-90} ]]; then
                    needs_rotation=true
                    break
                fi
            fi
        done
    fi
    
    if [[ "$needs_rotation" == "true" ]]; then
        echo "KEY_ROTATION,NEEDED"
        log_metric "key_rotation,needed"
        return 1
    else
        echo "KEY_ROTATION,OK"
        log_metric "key_rotation,ok"
        return 0
    fi
}

# Main monitoring execution
failed_checks=0

if ! check_dropbear_status; then
    ((failed_checks++))
fi

check_luks_devices
check_network_config

if ! check_key_rotation; then
    ((failed_checks++))
fi

# Generate alert if multiple failures
if [[ $failed_checks -gt 1 ]]; then
    if [[ -x "$ALERT_SCRIPT" ]]; then
        "$ALERT_SCRIPT" "CRITICAL" "Multiple LUKS remote unlock failures detected"
    fi
    logger -p crit "LUKS Remote Unlock: Multiple system failures detected"
fi

exit $failed_checks
EOF

    chmod +x "/usr/local/bin/luks-remote-monitor.sh"
    
    # Create alert script
    cat > "/usr/local/bin/luks-alert.sh" << 'EOF'
#!/bin/bash
# LUKS Remote Unlock Alert Handler

SEVERITY="$1"
MESSAGE="$2"

# Log to syslog
logger -p daemon."$(echo "$SEVERITY" | tr '[:upper:]' '[:lower:]')" "LUKS Remote Unlock [$SEVERITY]: $MESSAGE"

# Send email if configured
if [[ -n "${ALERT_EMAIL:-}" ]] && command -v mail >/dev/null 2>&1; then
    echo "$MESSAGE" | mail -s "LUKS Alert [$SEVERITY]" "$ALERT_EMAIL"
fi

# Send to monitoring system (customize as needed)
if [[ -n "${MONITORING_WEBHOOK:-}" ]]; then
    curl -X POST -H "Content-Type: application/json" \
        -d "{\"severity\":\"$SEVERITY\",\"message\":\"$MESSAGE\",\"timestamp\":\"$(date -Iseconds)\"}" \
        "$MONITORING_WEBHOOK" 2>/dev/null || true
fi
EOF

    chmod +x "/usr/local/bin/luks-alert.sh"
    
    # Create systemd timer
    cat > "/etc/systemd/system/luks-remote-monitor.service" << EOF
[Unit]
Description=LUKS Remote Unlock Monitoring
After=network.target

[Service]
Type=oneshot
ExecStart=/usr/local/bin/luks-remote-monitor.sh
User=root
StandardOutput=journal
StandardError=journal
EOF

    cat > "/etc/systemd/system/luks-remote-monitor.timer" << EOF
[Unit]
Description=Run LUKS remote unlock monitoring every 10 minutes
Requires=luks-remote-monitor.service

[Timer]
OnCalendar=*:*/10:00
Persistent=true

[Install]
WantedBy=timers.target
EOF

    systemctl daemon-reload
    systemctl enable luks-remote-monitor.timer
    systemctl start luks-remote-monitor.timer
    
    success "Monitoring system setup completed"
}

# Create emergency recovery procedures
create_recovery_procedures() {
    log "Creating emergency recovery procedures..."
    
    local recovery_dir="/etc/luks/recovery"
    mkdir -p "$recovery_dir"
    chmod 700 "$recovery_dir"
    
    # Create recovery documentation
    cat > "$recovery_dir/EMERGENCY_PROCEDURES.md" << 'EOF'
# LUKS Remote Unlock Emergency Recovery Procedures

## Connection Issues

### Cannot Connect to Dropbear SSH
1. Verify network connectivity to the server
2. Check if server is powered on and booting
3. Try backup SSH port (default 2223)
4. Use emergency SSH key if primary key fails
5. Check firewall rules on network equipment

### Permission Denied Errors
1. Verify SSH key is correct and matches authorized_keys
2. Try using RSA algorithm flag: `ssh -o PubkeyAcceptedAlgorithms=+ssh-rsa`
3. Use backup or emergency SSH key
4. Check key file permissions (should be 600)

### Network Configuration Issues
1. Try DHCP if static IP fails
2. Verify IPv6 configuration if using IPv6
3. Check network interface name in configuration
4. Verify gateway and DNS settings

## LUKS Issues

### Cannot Unlock Device
1. Verify correct passphrase/key
2. Check LUKS header integrity: `cryptsetup luksDump /dev/sdX`
3. Try different key slot if available
4. Restore from header backup if corrupted

### Device Not Found
1. Check device path and partition layout
2. Verify initramfs includes correct drivers
3. Check for hardware failures
4. Review boot logs for device recognition

## Recovery Commands

### Emergency Unlock
```bash
# Connect to dropbear
ssh -o PubkeyAcceptedAlgorithms=+ssh-rsa -p 2222 root@server-ip

# Manual unlock (if cryptroot-unlock fails)
cryptsetup luksOpen /dev/sdX1 luks_root
exit

Header Recovery

# Restore header from backup
cryptsetup luksHeaderRestore /dev/sdX --header-backup-file /path/to/header.img

Network Recovery

# Manual IP configuration in initramfs
ip addr add 192.168.1.100/24 dev eth0
ip route add default via 192.168.1.1

Contact Information

  • Primary Admin: [ADMIN_EMAIL]

  • Backup Admin: [BACKUP_EMAIL]

  • Emergency Contact: [EMERGENCY_PHONE] EOF

    Create quick reference card

    cat > “$recovery_dir/QUICK_REFERENCE.txt” « EOF LUKS Remote Unlock Quick Reference =================================

SSH Connection: ssh -o PubkeyAcceptedAlgorithms=+ssh-rsa -p 2222 root@[SERVER_IP]

Backup Port: ssh -o PubkeyAcceptedAlgorithms=+ssh-rsa -p 2223 root@[SERVER_IP]

Unlock Command: cryptroot-unlock

Manual Unlock: cryptsetup luksOpen /dev/[DEVICE] [MAPPER_NAME]

Emergency Keys Location: /etc/luks/ssh-keys/

Configuration Files:

  • /etc/dropbear-initramfs/config
  • /etc/dropbear-initramfs/authorized_keys
  • /etc/initramfs-tools/initramfs.conf
  • /etc/default/grub

Recovery Scripts: /etc/luks/recovery/ EOF

success "Emergency recovery procedures created"

}

Test remote unlock functionality

test_remote_unlock() { log “Testing remote unlock functionality…”

# Check dropbear configuration
if [[ ! -f "$DROPBEAR_CONFIG" ]]; then
    error "Dropbear configuration not found"
fi

# Check authorized keys
if [[ ! -f "/etc/dropbear-initramfs/authorized_keys" ]]; then
    error "Dropbear authorized_keys file not found"
fi

# Validate SSH keys
local key_count=0
for key_file in "$SSH_KEYS_DIR"/*.pub; do
    if [[ -f "$key_file" ]]; then
        if ssh-keygen -l -f "$key_file" >/dev/null 2>&1; then
            ((key_count++))
            success "SSH key validation passed: $(basename "$key_file")"
        else
            warn "SSH key validation failed: $(basename "$key_file")"
        fi
    fi
done

if [[ $key_count -eq 0 ]]; then
    error "No valid SSH keys found"
fi

# Check network configuration
if grep -q "IP=" "$INITRAMFS_CONFIG" 2>/dev/null; then
    success "Static IP configuration found"
else
    warn "No static IP configuration (will use DHCP)"
fi

# Check IPv6 scripts
if [[ -f "/etc/initramfs-tools/scripts/init-premount/ipv6" ]]; then
    success "IPv6 configuration scripts installed"
else
    warn "IPv6 scripts not installed"
fi

success "Remote unlock functionality tests completed"

}

Update initramfs and GRUB

update_boot_configuration() { log “Updating boot configuration…”

# Update initramfs
update-initramfs -u

# Update GRUB
update-grub

success "Boot configuration updated"

}

Generate deployment report

generate_deployment_report() { local report_file="$LOG_DIR/deployment_report_$(date +%Y%m%d_%H%M%S).txt"

log "Generating deployment report..."

{
    echo "ENTERPRISE LUKS REMOTE UNLOCK DEPLOYMENT REPORT"
    echo "=============================================="
    echo "Generated: $(date)"
    echo "Hostname: $(hostname)"
    echo ""
    
    echo "Configuration Summary:"
    echo "====================="
    echo "Dropbear Port: $DROPBEAR_PORT"
    echo "Backup Port: $BACKUP_PORT"
    echo "IPv6 Enabled: $IPV6_ENABLED"
    echo "Management Network: $MANAGEMENT_NETWORK"
    echo "Key Rotation Days: $KEY_ROTATION_DAYS"
    echo ""
    
    echo "SSH Keys Generated:"
    echo "=================="
    for key_file in "$SSH_KEYS_DIR"/*.pub; do
        if [[ -f "$key_file" ]]; then
            echo "$(basename "$key_file"): $(ssh-keygen -l -f "$key_file" 2>/dev/null || echo 'Invalid')"
        fi
    done
    echo ""
    
    echo "Network Configuration:"
    echo "====================="
    if [[ -n "${STATIC_IPV4:-}" ]]; then
        echo "IPv4: $STATIC_IPV4"
    else
        echo "IPv4: DHCP"
    fi
    
    if [[ "$IPV6_ENABLED" == "true" && -n "${STATIC_IPV6:-}" ]]; then
        echo "IPv6: $STATIC_IPV6"
    else
        echo "IPv6: Not configured"
    fi
    echo ""
    
    echo "Files Created:"
    echo "============="
    echo "- $DROPBEAR_CONFIG"
    echo "- /etc/dropbear-initramfs/authorized_keys"
    echo "- SSH keys in $SSH_KEYS_DIR"
    echo "- Monitoring scripts in /usr/local/bin/"
    echo "- Recovery procedures in /etc/luks/recovery/"
    echo ""
    
    echo "Next Steps:"
    echo "==========="
    echo "1. Reboot the system to test remote unlock"
    echo "2. Verify SSH connectivity during boot"
    echo "3. Test LUKS unlock procedure"
    echo "4. Configure monitoring alerts"
    echo "5. Train operations staff on procedures"
    
} > "$report_file"

cat "$report_file"
success "Deployment report generated: $report_file"

}

Main deployment function

main() { case “${1:-deploy}” in “deploy”) log “Starting enterprise LUKS remote unlock deployment…” setup_environment setup_ssh_infrastructure configure_dropbear configure_networking setup_monitoring create_recovery_procedures test_remote_unlock update_boot_configuration generate_deployment_report success “Enterprise LUKS remote unlock deployment completed!” ;; “test”) test_remote_unlock ;; “monitor”) /usr/local/bin/luks-remote-monitor.sh ;; “recover”) cat “/etc/luks/recovery/QUICK_REFERENCE.txt” ;; *) echo “Usage: $0 {deploy|test|monitor|recover}” echo "" echo “Commands:” echo " deploy - Complete enterprise deployment" echo " test - Test configuration" echo " monitor - Run monitoring check" echo " recover - Show recovery procedures" exit 1 ;; esac }

Check for root privileges

if [[ $EUID -ne 0 ]]; then error “This script must be run as root” fi

main “$@”


# [High Availability and Automated Management](#high-availability-automated-management)

## Enterprise Key Management and Rotation

### Advanced Key Lifecycle Management System

```python
#!/usr/bin/env python3
"""
Enterprise LUKS Key Management and Rotation Framework
"""

import subprocess
import json
import time
import threading
import schedule
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests

@dataclass
class KeyRotationPolicy:
    rotation_interval_days: int = 90
    warning_days_before: int = 7
    max_key_age_days: int = 365
    require_dual_approval: bool = True
    backup_old_keys: bool = True
    notify_on_rotation: bool = True

@dataclass
class KeyMetadata:
    created_date: float
    last_used: float
    slot_number: int
    purpose: str
    creator: str
    approval_required: bool = False
    scheduled_rotation: Optional[float] = None

class EnterpriseKeyManager:
    def __init__(self, config_file: str = "/etc/luks/key-management.json"):
        self.config_file = Path(config_file)
        self.devices: Dict[str, Dict] = {}
        self.rotation_policies: Dict[str, KeyRotationPolicy] = {}
        self.key_metadata: Dict[str, Dict[int, KeyMetadata]] = {}
        
        self.logger = self._setup_logging()
        self.notification_handlers = []
        
        self._load_configuration()
        self._setup_scheduler()
    
    def _setup_logging(self) -> logging.Logger:
        """Setup comprehensive logging"""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        
        # File handler with rotation
        file_handler = logging.handlers.RotatingFileHandler(
            '/var/log/luks-key-management.log', 
            maxBytes=10*1024*1024, 
            backupCount=5
        )
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(file_formatter)
        
        # Syslog handler for important events
        syslog_handler = logging.handlers.SysLogHandler()
        syslog_formatter = logging.Formatter(
            'LUKS-KeyMgmt[%(process)d]: %(levelname)s - %(message)s'
        )
        syslog_handler.setFormatter(syslog_formatter)
        
        logger.addHandler(file_handler)
        logger.addHandler(syslog_handler)
        
        return logger
    
    def _load_configuration(self) -> None:
        """Load key management configuration"""
        if self.config_file.exists():
            try:
                with open(self.config_file, 'r') as f:
                    config = json.load(f)
                
                self.devices = config.get('devices', {})
                
                # Load rotation policies
                for device, policy_data in config.get('rotation_policies', {}).items():
                    self.rotation_policies[device] = KeyRotationPolicy(**policy_data)
                
                # Load key metadata
                for device, metadata in config.get('key_metadata', {}).items():
                    self.key_metadata[device] = {}
                    for slot_str, meta_data in metadata.items():
                        slot = int(slot_str)
                        self.key_metadata[device][slot] = KeyMetadata(**meta_data)
                
                self.logger.info(f"Loaded configuration for {len(self.devices)} devices")
                
            except Exception as e:
                self.logger.error(f"Failed to load configuration: {e}")
    
    def save_configuration(self) -> None:
        """Save current configuration"""
        config = {
            'devices': self.devices,
            'rotation_policies': {
                device: asdict(policy) 
                for device, policy in self.rotation_policies.items()
            },
            'key_metadata': {
                device: {
                    str(slot): asdict(metadata)
                    for slot, metadata in device_metadata.items()
                }
                for device, device_metadata in self.key_metadata.items()
            }
        }
        
        with open(self.config_file, 'w') as f:
            json.dump(config, f, indent=2)
        
        self.config_file.chmod(0o600)
        self.logger.info("Configuration saved")
    
    def add_device(self, device_name: str, device_path: str, 
                   policy: Optional[KeyRotationPolicy] = None) -> None:
        """Add device to key management"""
        self.devices[device_name] = {
            'device_path': device_path,
            'added_date': time.time()
        }
        
        if policy:
            self.rotation_policies[device_name] = policy
        else:
            self.rotation_policies[device_name] = KeyRotationPolicy()
        
        self.key_metadata[device_name] = {}
        
        # Scan existing key slots
        self._scan_existing_keys(device_name)
        
        self.save_configuration()
        self.logger.info(f"Added device {device_name} to key management")
    
    def _scan_existing_keys(self, device_name: str) -> None:
        """Scan and catalog existing LUKS key slots"""
        if device_name not in self.devices:
            return
        
        device_path = self.devices[device_name]['device_path']
        
        try:
            # Get LUKS dump information
            result = subprocess.run(['cryptsetup', 'luksDump', device_path], 
                                  capture_output=True, text=True)
            
            if result.returncode == 0:
                lines = result.stdout.split('\n')
                current_slot = None
                
                for line in lines:
                    if 'Key Slot' in line and 'ENABLED' in line:
                        slot_match = line.split(':')[0].strip()
                        if slot_match.startswith('Key Slot '):
                            slot_num = int(slot_match.split()[-1])
                            
                            # Create metadata for existing key if not present
                            if slot_num not in self.key_metadata[device_name]:
                                self.key_metadata[device_name][slot_num] = KeyMetadata(
                                    created_date=time.time(),
                                    last_used=time.time(),
                                    slot_number=slot_num,
                                    purpose="existing",
                                    creator="system-scan"
                                )
                
                self.logger.info(f"Scanned {len(self.key_metadata[device_name])} key slots for {device_name}")
                
        except Exception as e:
            self.logger.error(f"Failed to scan keys for {device_name}: {e}")
    
    def check_key_rotation_needed(self, device_name: str) -> List[int]:
        """Check which key slots need rotation"""
        if device_name not in self.rotation_policies:
            return []
        
        policy = self.rotation_policies[device_name]
        current_time = time.time()
        slots_needing_rotation = []
        
        for slot, metadata in self.key_metadata.get(device_name, {}).items():
            key_age_days = (current_time - metadata.created_date) / 86400
            
            if key_age_days >= policy.rotation_interval_days:
                slots_needing_rotation.append(slot)
                self.logger.warning(f"Key slot {slot} for {device_name} needs rotation (age: {key_age_days:.1f} days)")
        
        return slots_needing_rotation
    
    def schedule_key_rotation(self, device_name: str, slot: int, 
                             scheduled_time: Optional[float] = None) -> bool:
        """Schedule key rotation for specific slot"""
        if scheduled_time is None:
            scheduled_time = time.time() + 3600  # Default: 1 hour from now
        
        if device_name in self.key_metadata and slot in self.key_metadata[device_name]:
            self.key_metadata[device_name][slot].scheduled_rotation = scheduled_time
            self.save_configuration()
            
            self.logger.info(f"Scheduled key rotation for {device_name} slot {slot} at {time.ctime(scheduled_time)}")
            return True
        
        return False
    
    def rotate_key(self, device_name: str, slot: int, old_password: str, 
                   new_password: str, creator: str = "automated") -> bool:
        """Rotate key in specific slot"""
        if device_name not in self.devices:
            self.logger.error(f"Device {device_name} not found")
            return False
        
        device_path = self.devices[device_name]['device_path']
        
        try:
            # Backup old key if policy requires
            policy = self.rotation_policies.get(device_name, KeyRotationPolicy())
            if policy.backup_old_keys:
                self._backup_key_slot(device_name, slot)
            
            # Remove old key
            old_key_file = f"/tmp/luks_old_{device_name}_{slot}"
            with open(old_key_file, 'w') as f:
                f.write(old_password)
            Path(old_key_file).chmod(0o600)
            
            try:
                subprocess.run(['cryptsetup', 'luksRemoveKey', device_path, 
                              '--key-file', old_key_file, '--key-slot', str(slot)], 
                             check=True)
                
                # Add new key
                new_key_file = f"/tmp/luks_new_{device_name}_{slot}"
                with open(new_key_file, 'w') as f:
                    f.write(new_password)
                Path(new_key_file).chmod(0o600)
                
                # Use another key slot for authentication
                auth_slots = [s for s in self.key_metadata.get(device_name, {}).keys() if s != slot]
                if not auth_slots:
                    raise ValueError("No other key slots available for authentication")
                
                # For simplicity, this assumes you have the master password
                # In practice, you'd need a secure way to authenticate
                subprocess.run(['cryptsetup', 'luksAddKey', device_path, 
                              new_key_file, '--key-slot', str(slot)], 
                             input=old_password, text=True, check=True)
                
                # Update metadata
                self.key_metadata[device_name][slot] = KeyMetadata(
                    created_date=time.time(),
                    last_used=time.time(),
                    slot_number=slot,
                    purpose="rotated",
                    creator=creator,
                    scheduled_rotation=None
                )
                
                self.save_configuration()
                
                # Send notification
                if policy.notify_on_rotation:
                    self._notify_key_rotation(device_name, slot, creator)
                
                self.logger.info(f"Successfully rotated key for {device_name} slot {slot}")
                return True
                
            finally:
                # Clean up temporary files
                Path(old_key_file).unlink(missing_ok=True)
                Path(new_key_file).unlink(missing_ok=True)
                
        except Exception as e:
            self.logger.error(f"Failed to rotate key for {device_name} slot {slot}: {e}")
            return False
    
    def _backup_key_slot(self, device_name: str, slot: int) -> bool:
        """Backup key slot information"""
        try:
            backup_dir = Path(f"/etc/luks/backups/{device_name}")
            backup_dir.mkdir(parents=True, exist_ok=True)
            backup_dir.chmod(0o700)
            
            # Backup key metadata
            if device_name in self.key_metadata and slot in self.key_metadata[device_name]:
                metadata = self.key_metadata[device_name][slot]
                backup_file = backup_dir / f"slot_{slot}_metadata_{int(time.time())}.json"
                
                with open(backup_file, 'w') as f:
                    json.dump(asdict(metadata), f, indent=2)
                backup_file.chmod(0o600)
            
            self.logger.info(f"Backed up key slot {slot} for {device_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to backup key slot {slot} for {device_name}: {e}")
            return False
    
    def _notify_key_rotation(self, device_name: str, slot: int, creator: str) -> None:
        """Send notification about key rotation"""
        message = f"LUKS key rotation completed for device {device_name}, slot {slot} by {creator}"
        
        # Log to syslog
        self.logger.warning(message)
        
        # Send email notifications
        for handler in self.notification_handlers:
            try:
                handler(device_name, slot, creator, message)
            except Exception as e:
                self.logger.error(f"Notification handler failed: {e}")
    
    def add_notification_handler(self, handler_func) -> None:
        """Add notification handler function"""
        self.notification_handlers.append(handler_func)
    
    def _setup_scheduler(self) -> None:
        """Setup automated key rotation scheduler"""
        def check_all_devices():
            for device_name in self.devices:
                slots_needing_rotation = self.check_key_rotation_needed(device_name)
                
                for slot in slots_needing_rotation:
                    metadata = self.key_metadata[device_name][slot]
                    policy = self.rotation_policies[device_name]
                    
                    # Check if rotation is already scheduled
                    if metadata.scheduled_rotation is None:
                        # Schedule rotation for next maintenance window (example: 2 AM)
                        next_rotation = time.time() + 86400  # Tomorrow
                        self.schedule_key_rotation(device_name, slot, next_rotation)
                        
                        # Send warning notification
                        warning_message = f"Key rotation scheduled for {device_name} slot {slot}"
                        self.logger.warning(warning_message)
        
        def process_scheduled_rotations():
            current_time = time.time()
            
            for device_name, device_metadata in self.key_metadata.items():
                for slot, metadata in device_metadata.items():
                    if (metadata.scheduled_rotation and 
                        metadata.scheduled_rotation <= current_time):
                        
                        policy = self.rotation_policies.get(device_name, KeyRotationPolicy())
                        
                        if policy.require_dual_approval:
                            self.logger.warning(f"Key rotation for {device_name} slot {slot} requires manual approval")
                            # In practice, you'd implement an approval workflow
                        else:
                            # Auto-rotate with generated password
                            new_password = self._generate_secure_password()
                            if self.rotate_key(device_name, slot, "current_password", new_password, "automated"):
                                self.logger.info(f"Automated key rotation completed for {device_name} slot {slot}")
        
        # Schedule daily checks
        schedule.every().day.at("01:00").do(check_all_devices)
        schedule.every().hour.do(process_scheduled_rotations)
        
        # Start scheduler thread
        def run_scheduler():
            while True:
                schedule.run_pending()
                time.sleep(60)
        
        scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
        scheduler_thread.start()
        
        self.logger.info("Key rotation scheduler started")
    
    def _generate_secure_password(self, length: int = 32) -> str:
        """Generate secure password for key rotation"""
        import secrets
        import string
        
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        return ''.join(secrets.choice(alphabet) for _ in range(length))
    
    def generate_key_management_report(self) -> str:
        """Generate comprehensive key management report"""
        report = []
        report.append("ENTERPRISE LUKS KEY MANAGEMENT REPORT")
        report.append("=" * 50)
        report.append(f"Generated: {time.ctime()}")
        report.append("")
        
        for device_name, device_info in self.devices.items():
            report.append(f"Device: {device_name}")
            report.append(f"Path: {device_info['device_path']}")
            
            policy = self.rotation_policies.get(device_name, KeyRotationPolicy())
            report.append(f"Rotation Interval: {policy.rotation_interval_days} days")
            
            device_metadata = self.key_metadata.get(device_name, {})
            report.append(f"Key Slots: {len(device_metadata)}")
            
            for slot, metadata in device_metadata.items():
                age_days = (time.time() - metadata.created_date) / 86400
                status = "OK"
                
                if age_days >= policy.rotation_interval_days:
                    status = "NEEDS ROTATION"
                elif age_days >= (policy.rotation_interval_days - policy.warning_days_before):
                    status = "WARNING"
                
                report.append(f"  Slot {slot}: {metadata.purpose} by {metadata.creator} "
                             f"({age_days:.1f} days) - {status}")
                
                if metadata.scheduled_rotation:
                    report.append(f"    Scheduled rotation: {time.ctime(metadata.scheduled_rotation)}")
            
            report.append("")
        
        return "\n".join(report)

# Email notification handler
def email_notification_handler(smtp_server: str, smtp_port: int, 
                             username: str, password: str, 
                             recipients: List[str]):
    """Create email notification handler"""
    def handler(device_name: str, slot: int, creator: str, message: str):
        msg = MIMEMultipart()
        msg['From'] = username
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = f"LUKS Key Rotation - {device_name}"
        
        body = f"""
LUKS Key Rotation Notification

Device: {device_name}
Key Slot: {slot}
Rotated By: {creator}
Timestamp: {time.ctime()}

Message: {message}

This is an automated notification from the Enterprise LUKS Key Management System.
"""
        
        msg.attach(MIMEText(body, 'plain'))
        
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(username, password)
            server.send_message(msg)
    
    return handler

# Webhook notification handler
def webhook_notification_handler(webhook_url: str, auth_token: Optional[str] = None):
    """Create webhook notification handler"""
    def handler(device_name: str, slot: int, creator: str, message: str):
        payload = {
            'device_name': device_name,
            'key_slot': slot,
            'creator': creator,
            'message': message,
            'timestamp': time.time()
        }
        
        headers = {'Content-Type': 'application/json'}
        if auth_token:
            headers['Authorization'] = f'Bearer {auth_token}'
        
        requests.post(webhook_url, json=payload, headers=headers, timeout=10)
    
    return handler

# Example usage
def setup_enterprise_key_management():
    """Example enterprise key management setup"""
    key_manager = EnterpriseKeyManager()
    
    # Add devices
    key_manager.add_device(
        "root_disk", 
        "/dev/sda1", 
        KeyRotationPolicy(
            rotation_interval_days=60,
            warning_days_before=7,
            require_dual_approval=True
        )
    )
    
    key_manager.add_device(
        "data_disk", 
        "/dev/sdb1", 
        KeyRotationPolicy(
            rotation_interval_days=90,
            warning_days_before=14,
            require_dual_approval=False
        )
    )
    
    # Add notification handlers
    email_handler = email_notification_handler(
        "smtp.company.com", 587,
        "luks-alerts@company.com", "password",
        ["admin@company.com", "security@company.com"]
    )
    key_manager.add_notification_handler(email_handler)
    
    webhook_handler = webhook_notification_handler(
        "https://monitoring.company.com/webhooks/luks-alerts",
        "auth_token_here"
    )
    key_manager.add_notification_handler(webhook_handler)
    
    return key_manager

if __name__ == "__main__":
    # Demonstration
    key_manager = setup_enterprise_key_management()
    
    print("Enterprise Key Management System initialized")
    print(key_manager.generate_key_management_report())

This comprehensive enterprise LUKS guide provides production-ready frameworks for full disk encryption, remote unlock automation, and advanced key management. The included tools support high availability deployments, automated security operations, and enterprise-grade compliance requirements essential for protecting critical infrastructure in modern data center environments.