Enterprise network infrastructure requires sophisticated automation frameworks, declarative configuration management, and comprehensive orchestration systems to ensure consistent, reliable, and scalable network deployments across thousands of systems. This guide covers advanced Netplan implementations, enterprise network automation architectures, production configuration management, and comprehensive infrastructure as code strategies for modern data centers.

Enterprise Network Automation Architecture

Declarative Network Configuration Strategy

Enterprise network management demands declarative, version-controlled, and automated configuration systems that provide consistency, auditability, and rapid deployment capabilities across diverse infrastructure environments.

Enterprise Netplan Architecture Framework

┌─────────────────────────────────────────────────────────────────┐
│             Enterprise Network Automation System                │
├─────────────────┬─────────────────┬─────────────────┬───────────┤
│  Config Layer   │  Validation     │  Deployment     │ Monitoring│
├─────────────────┼─────────────────┼─────────────────┼───────────┤
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ ┌───────┐ │
│ │ Git/IaC     │ │ │ Schema Valid│ │ │ Ansible     │ │ │ Metrics│ │
│ │ Templates   │ │ │ Lint/Test   │ │ │ Terraform   │ │ │ Alerts │ │
│ │ Inventory   │ │ │ Simulation  │ │ │ Kubernetes  │ │ │ Logs   │ │
│ │ Secrets     │ │ │ Compliance  │ │ │ CI/CD       │ │ │ SIEM   │ │
│ └─────────────┘ │ └─────────────┘ │ └─────────────┘ │ └───────┘ │
│                 │                 │                 │           │
│ • Version ctrl  │ • Pre-deploy    │ • Zero-touch    │ • Real    │
│ • Declarative   │ • Policy check  │ • Idempotent    │ • Time    │
│ • Multi-env     │ • Security scan │ • Rollback      │ • Audit   │
└─────────────────┴─────────────────┴─────────────────┴───────────┘

Network Configuration Maturity Model

LevelConfigurationDeploymentValidationScale
ManualText filesSSH/consoleManual test10s
ScriptedShell scriptsAutomation toolsBasic checks100s
ManagedConfig managementOrchestrationAutomated tests1000s
EnterpriseGitOps/IaCFull CI/CDPolicy as code10000s+

Advanced Netplan Configuration Framework

Enterprise Network Configuration System

#!/usr/bin/env python3
"""
Enterprise Netplan Network Configuration Management Framework
"""

import os
import sys
import json
import yaml
import logging
import asyncio
import ipaddress
import subprocess
from typing import Dict, List, Optional, Tuple, Any, Union, Set
from dataclasses import dataclass, asdict, field
from pathlib import Path
from enum import Enum
from datetime import datetime
import jinja2
import jsonschema
from cryptography.fernet import Fernet
import consul
import etcd3
from prometheus_client import Counter, Gauge, Histogram
import aiofiles
import aiohttp
from netaddr import IPNetwork, IPAddress
import dns.resolver
import paramiko
from ansible_runner import run as ansible_run

class NetworkType(Enum):
    PHYSICAL = "physical"
    VLAN = "vlan"
    BOND = "bond"
    BRIDGE = "bridge"
    VXLAN = "vxlan"
    WIREGUARD = "wireguard"
    OPENVSWITCH = "openvswitch"

class ConfigState(Enum):
    PENDING = "pending"
    VALIDATING = "validating"
    DEPLOYING = "deploying"
    ACTIVE = "active"
    FAILED = "failed"
    ROLLBACK = "rollback"

class ValidationLevel(Enum):
    SYNTAX = "syntax"
    SEMANTIC = "semantic"
    POLICY = "policy"
    SECURITY = "security"
    PERFORMANCE = "performance"

@dataclass
class NetworkInterface:
    """Network interface configuration"""
    name: str
    type: NetworkType
    addresses: List[str] = field(default_factory=list)
    gateway4: Optional[str] = None
    gateway6: Optional[str] = None
    nameservers: Dict[str, List[str]] = field(default_factory=dict)
    mtu: int = 1500
    dhcp4: bool = False
    dhcp6: bool = False
    dhcp4_overrides: Dict[str, Any] = field(default_factory=dict)
    dhcp6_overrides: Dict[str, Any] = field(default_factory=dict)
    routes: List[Dict[str, Any]] = field(default_factory=list)
    routing_policy: List[Dict[str, Any]] = field(default_factory=list)
    link_local: List[str] = field(default_factory=list)
    critical: bool = True
    optional: bool = False
    activation_mode: str = "manual"
    parameters: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class NetplanConfig:
    """Complete Netplan configuration"""
    version: int = 2
    renderer: str = "networkd"
    ethernets: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    wifis: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    bonds: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    bridges: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    vlans: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    tunnels: Dict[str, Dict[str, Any]] = field(default_factory=dict)

@dataclass
class NetworkPolicy:
    """Network security and compliance policy"""
    name: str
    description: str
    rules: List[Dict[str, Any]]
    enforcement: str = "strict"
    exceptions: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

class EnterpriseNetplanManager:
    """Enterprise Netplan configuration management system"""
    
    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.logger = self._setup_logging()
        self.template_env = self._setup_templates()
        self.consul_client = consul.Consul(
            host=self.config.get('consul_host', 'localhost'),
            port=self.config.get('consul_port', 8500)
        )
        self.etcd_client = etcd3.client(
            host=self.config.get('etcd_host', 'localhost'),
            port=self.config.get('etcd_port', 2379)
        )
        
        # Metrics
        self.config_deployments = Counter('netplan_deployments_total',
                                        'Total Netplan deployments',
                                        ['status', 'environment'])
        self.validation_errors = Counter('netplan_validation_errors_total',
                                       'Total validation errors',
                                       ['type', 'severity'])
        self.config_drift = Gauge('netplan_config_drift',
                                'Configuration drift detected',
                                ['hostname', 'interface'])
        
        # Schema for validation
        self.netplan_schema = self._load_netplan_schema()
        self.policies = self._load_policies()
    
    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from file"""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    
    def _setup_logging(self) -> logging.Logger:
        """Setup enterprise logging"""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # File handler with rotation
        from logging.handlers import RotatingFileHandler
        file_handler = RotatingFileHandler(
            '/var/log/netplan-manager/netplan-manager.log',
            maxBytes=10*1024*1024,  # 10MB
            backupCount=10
        )
        file_handler.setLevel(logging.DEBUG)
        
        # Syslog handler for SIEM
        syslog_handler = logging.handlers.SysLogHandler(
            address=(self.config.get('syslog_host', 'localhost'), 514)
        )
        syslog_handler.setLevel(logging.WARNING)
        
        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        for handler in [console_handler, file_handler, syslog_handler]:
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        
        return logger
    
    def _setup_templates(self) -> jinja2.Environment:
        """Setup Jinja2 template environment"""
        template_dir = self.config.get('template_dir', '/etc/netplan-manager/templates')
        
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dir),
            autoescape=True,
            trim_blocks=True,
            lstrip_blocks=True
        )
        
        # Add custom filters
        env.filters['ipaddr'] = self._ipaddr_filter
        env.filters['ipsubnet'] = self._ipsubnet_filter
        env.filters['hwaddr'] = self._hwaddr_filter
        
        return env
    
    def _ipaddr_filter(self, value: str, query: str = '') -> str:
        """Jinja2 filter for IP address manipulation"""
        try:
            if query == 'address':
                return str(ipaddress.ip_interface(value).ip)
            elif query == 'network':
                return str(ipaddress.ip_interface(value).network)
            elif query == 'netmask':
                return str(ipaddress.ip_interface(value).netmask)
            elif query == 'prefix':
                return str(ipaddress.ip_interface(value).network.prefixlen)
            else:
                return str(ipaddress.ip_interface(value))
        except:
            return value
    
    def _ipsubnet_filter(self, value: str, prefix: int) -> str:
        """Jinja2 filter for subnet calculation"""
        try:
            network = ipaddress.ip_network(f"{value}/{prefix}", strict=False)
            return str(network)
        except:
            return value
    
    def _hwaddr_filter(self, value: str, format: str = 'linux') -> str:
        """Jinja2 filter for MAC address formatting"""
        # Remove any existing separators
        mac = value.replace(':', '').replace('-', '').replace('.', '')
        
        if format == 'linux':
            # aa:bb:cc:dd:ee:ff
            return ':'.join(mac[i:i+2] for i in range(0, 12, 2)).lower()
        elif format == 'windows':
            # AA-BB-CC-DD-EE-FF
            return '-'.join(mac[i:i+2] for i in range(0, 12, 2)).upper()
        elif format == 'cisco':
            # aabb.ccdd.eeff
            return '.'.join(mac[i:i+4] for i in range(0, 12, 4)).lower()
        else:
            return value
    
    def _load_netplan_schema(self) -> Dict[str, Any]:
        """Load Netplan JSON schema for validation"""
        schema_path = self.config.get('netplan_schema', '/etc/netplan-manager/netplan-schema.json')
        
        if os.path.exists(schema_path):
            with open(schema_path, 'r') as f:
                return json.load(f)
        else:
            # Simplified schema if file not found
            return {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "type": "object",
                "properties": {
                    "network": {
                        "type": "object",
                        "properties": {
                            "version": {"type": "integer", "enum": [2]},
                            "renderer": {"type": "string", "enum": ["networkd", "NetworkManager"]},
                            "ethernets": {"type": "object"},
                            "bonds": {"type": "object"},
                            "bridges": {"type": "object"},
                            "vlans": {"type": "object"}
                        },
                        "required": ["version"]
                    }
                },
                "required": ["network"]
            }
    
    def _load_policies(self) -> List[NetworkPolicy]:
        """Load network policies"""
        policies = []
        policy_dir = self.config.get('policy_dir', '/etc/netplan-manager/policies')
        
        if os.path.exists(policy_dir):
            for policy_file in Path(policy_dir).glob('*.yaml'):
                with open(policy_file, 'r') as f:
                    policy_data = yaml.safe_load(f)
                    policies.append(NetworkPolicy(**policy_data))
        
        return policies
    
    async def generate_config(self,
                            hostname: str,
                            environment: str = 'production',
                            variables: Dict[str, Any] = None) -> NetplanConfig:
        """Generate Netplan configuration for host"""
        self.logger.info(f"Generating configuration for {hostname} in {environment}")
        
        # Get host data from inventory
        host_data = await self._get_host_data(hostname)
        
        # Merge with provided variables
        if variables:
            host_data.update(variables)
        
        # Add environment-specific data
        host_data['environment'] = environment
        host_data['hostname'] = hostname
        
        # Load and render template
        template_name = host_data.get('network_template', 'default.yaml.j2')
        template = self.template_env.get_template(template_name)
        
        # Render configuration
        rendered = template.render(**host_data)
        
        # Parse YAML
        config_dict = yaml.safe_load(rendered)
        
        # Create NetplanConfig object
        config = self._dict_to_netplan_config(config_dict.get('network', {}))
        
        # Validate configuration
        validation_results = await self.validate_config(config, hostname)
        
        if not all(r['passed'] for r in validation_results):
            raise ValueError(f"Configuration validation failed: {validation_results}")
        
        return config
    
    async def _get_host_data(self, hostname: str) -> Dict[str, Any]:
        """Get host data from inventory systems"""
        host_data = {}
        
        # Try Consul first
        try:
            _, data = self.consul_client.kv.get(f"hosts/{hostname}/network")
            if data:
                host_data.update(json.loads(data['Value'].decode()))
        except Exception as e:
            self.logger.warning(f"Failed to get data from Consul: {e}")
        
        # Try etcd
        try:
            value = self.etcd_client.get(f"/hosts/{hostname}/network")
            if value:
                host_data.update(json.loads(value))
        except Exception as e:
            self.logger.warning(f"Failed to get data from etcd: {e}")
        
        # Try external API
        if self.config.get('inventory_api'):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        f"{self.config['inventory_api']}/hosts/{hostname}",
                        headers={'Authorization': f"Bearer {self.config.get('api_token')}"}
                    ) as response:
                        if response.status == 200:
                            api_data = await response.json()
                            host_data.update(api_data.get('network', {}))
            except Exception as e:
                self.logger.warning(f"Failed to get data from API: {e}")
        
        # Apply defaults
        defaults = self.config.get('host_defaults', {})
        for key, value in defaults.items():
            if key not in host_data:
                host_data[key] = value
        
        return host_data
    
    def _dict_to_netplan_config(self, config_dict: Dict[str, Any]) -> NetplanConfig:
        """Convert dictionary to NetplanConfig object"""
        return NetplanConfig(
            version=config_dict.get('version', 2),
            renderer=config_dict.get('renderer', 'networkd'),
            ethernets=config_dict.get('ethernets', {}),
            wifis=config_dict.get('wifis', {}),
            bonds=config_dict.get('bonds', {}),
            bridges=config_dict.get('bridges', {}),
            vlans=config_dict.get('vlans', {}),
            tunnels=config_dict.get('tunnels', {})
        )
    
    async def validate_config(self,
                            config: NetplanConfig,
                            hostname: str,
                            level: ValidationLevel = ValidationLevel.POLICY) -> List[Dict[str, Any]]:
        """Validate Netplan configuration"""
        self.logger.info(f"Validating configuration for {hostname} at level {level.value}")
        
        results = []
        
        # Syntax validation
        if level.value in ['syntax', 'semantic', 'policy', 'security', 'performance']:
            syntax_result = await self._validate_syntax(config)
            results.append(syntax_result)
            if not syntax_result['passed']:
                return results
        
        # Semantic validation
        if level.value in ['semantic', 'policy', 'security', 'performance']:
            semantic_result = await self._validate_semantic(config, hostname)
            results.append(semantic_result)
            if not semantic_result['passed']:
                return results
        
        # Policy validation
        if level.value in ['policy', 'security', 'performance']:
            policy_result = await self._validate_policy(config, hostname)
            results.append(policy_result)
        
        # Security validation
        if level.value in ['security', 'performance']:
            security_result = await self._validate_security(config)
            results.append(security_result)
        
        # Performance validation
        if level.value == 'performance':
            perf_result = await self._validate_performance(config)
            results.append(perf_result)
        
        return results
    
    async def _validate_syntax(self, config: NetplanConfig) -> Dict[str, Any]:
        """Validate configuration syntax"""
        result = {
            'validation': 'syntax',
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        try:
            # Convert to dict for schema validation
            config_dict = {
                'network': {
                    'version': config.version,
                    'renderer': config.renderer
                }
            }
            
            # Add non-empty sections
            for section in ['ethernets', 'bonds', 'bridges', 'vlans']:
                section_data = getattr(config, section)
                if section_data:
                    config_dict['network'][section] = section_data
            
            # Validate against schema
            jsonschema.validate(config_dict, self.netplan_schema)
            
            # Additional syntax checks
            for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                interfaces = getattr(config, iface_type)
                for name, iface_config in interfaces.items():
                    # Check interface naming
                    if not self._valid_interface_name(name):
                        result['errors'].append(
                            f"Invalid interface name: {name}"
                        )
                        result['passed'] = False
                    
                    # Check IP addresses
                    for addr in iface_config.get('addresses', []):
                        try:
                            ipaddress.ip_interface(addr)
                        except ValueError:
                            result['errors'].append(
                                f"Invalid IP address on {name}: {addr}"
                            )
                            result['passed'] = False
                    
                    # Check MTU
                    mtu = iface_config.get('mtu', 1500)
                    if not 68 <= mtu <= 9000:
                        result['warnings'].append(
                            f"Unusual MTU on {name}: {mtu}"
                        )
        
        except jsonschema.ValidationError as e:
            result['passed'] = False
            result['errors'].append(f"Schema validation failed: {e.message}")
        except Exception as e:
            result['passed'] = False
            result['errors'].append(f"Syntax validation error: {str(e)}")
        
        if not result['passed']:
            self.validation_errors.labels(
                type='syntax',
                severity='error'
            ).inc()
        
        return result
    
    def _valid_interface_name(self, name: str) -> bool:
        """Check if interface name is valid"""
        import re
        
        # Linux interface naming rules
        if len(name) > 15:
            return False
        
        # Must not be empty or contain certain characters
        if not name or not re.match(r'^[a-zA-Z0-9._-]+$', name):
            return False
        
        # Reserved names
        reserved = ['all', 'default', 'lo']
        if name in reserved:
            return False
        
        return True
    
    async def _validate_semantic(self, config: NetplanConfig, hostname: str) -> Dict[str, Any]:
        """Validate configuration semantics"""
        result = {
            'validation': 'semantic',
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        # Check for duplicate IP addresses
        all_addresses = []
        for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                for addr in iface_config.get('addresses', []):
                    if addr in all_addresses:
                        result['errors'].append(
                            f"Duplicate IP address: {addr}"
                        )
                        result['passed'] = False
                    all_addresses.append(addr)
        
        # Check gateway configuration
        gateway_interfaces = []
        for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                if iface_config.get('gateway4') or iface_config.get('gateway6'):
                    gateway_interfaces.append(name)
        
        if len(gateway_interfaces) > 1:
            result['warnings'].append(
                f"Multiple interfaces with gateways: {gateway_interfaces}"
            )
        
        # Check bond configuration
        for bond_name, bond_config in config.bonds.items():
            interfaces = bond_config.get('interfaces', [])
            
            if len(interfaces) < 2:
                result['errors'].append(
                    f"Bond {bond_name} has less than 2 interfaces"
                )
                result['passed'] = False
            
            # Check bond mode
            params = bond_config.get('parameters', {})
            mode = params.get('mode', 'balance-rr')
            valid_modes = [
                'balance-rr', 'active-backup', 'balance-xor',
                'broadcast', '802.3ad', 'balance-tlb', 'balance-alb'
            ]
            
            if mode not in valid_modes:
                result['errors'].append(
                    f"Invalid bond mode for {bond_name}: {mode}"
                )
                result['passed'] = False
        
        # Check VLAN configuration
        for vlan_name, vlan_config in config.vlans.items():
            vlan_id = vlan_config.get('id')
            if not vlan_id or not 1 <= vlan_id <= 4094:
                result['errors'].append(
                    f"Invalid VLAN ID for {vlan_name}: {vlan_id}"
                )
                result['passed'] = False
            
            link = vlan_config.get('link')
            if not link:
                result['errors'].append(
                    f"No link interface specified for VLAN {vlan_name}"
                )
                result['passed'] = False
        
        if not result['passed']:
            self.validation_errors.labels(
                type='semantic',
                severity='error'
            ).inc()
        
        return result
    
    async def _validate_policy(self, config: NetplanConfig, hostname: str) -> Dict[str, Any]:
        """Validate configuration against policies"""
        result = {
            'validation': 'policy',
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        # Get host metadata for policy evaluation
        host_data = await self._get_host_data(hostname)
        
        for policy in self.policies:
            # Check if policy applies to this host
            if not self._policy_applies(policy, host_data):
                continue
            
            self.logger.debug(f"Evaluating policy: {policy.name}")
            
            for rule in policy.rules:
                rule_result = self._evaluate_policy_rule(rule, config, host_data)
                
                if not rule_result['passed']:
                    if policy.enforcement == 'strict':
                        result['errors'].extend(rule_result['violations'])
                        result['passed'] = False
                    else:
                        result['warnings'].extend(rule_result['violations'])
        
        if not result['passed']:
            self.validation_errors.labels(
                type='policy',
                severity='error'
            ).inc()
        
        return result
    
    def _policy_applies(self, policy: NetworkPolicy, host_data: Dict[str, Any]) -> bool:
        """Check if policy applies to host"""
        # Check exceptions
        hostname = host_data.get('hostname', '')
        if hostname in policy.exceptions:
            return False
        
        # Check policy metadata conditions
        conditions = policy.metadata.get('conditions', {})
        
        for key, value in conditions.items():
            host_value = host_data.get(key)
            
            if isinstance(value, list):
                if host_value not in value:
                    return False
            else:
                if host_value != value:
                    return False
        
        return True
    
    def _evaluate_policy_rule(self,
                            rule: Dict[str, Any],
                            config: NetplanConfig,
                            host_data: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a single policy rule"""
        result = {
            'passed': True,
            'violations': []
        }
        
        rule_type = rule.get('type')
        
        if rule_type == 'required_interface':
            # Check for required interfaces
            required = rule.get('interfaces', [])
            all_interfaces = set()
            
            for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                interfaces = getattr(config, iface_type)
                all_interfaces.update(interfaces.keys())
            
            for req_iface in required:
                if req_iface not in all_interfaces:
                    result['passed'] = False
                    result['violations'].append(
                        f"Required interface missing: {req_iface}"
                    )
        
        elif rule_type == 'mtu_range':
            # Check MTU ranges
            min_mtu = rule.get('min', 1500)
            max_mtu = rule.get('max', 9000)
            
            for iface_type in ['ethernets', 'bonds', 'bridges']:
                interfaces = getattr(config, iface_type)
                for name, iface_config in interfaces.items():
                    mtu = iface_config.get('mtu', 1500)
                    if not min_mtu <= mtu <= max_mtu:
                        result['passed'] = False
                        result['violations'].append(
                            f"Interface {name} MTU {mtu} outside range "
                            f"[{min_mtu}, {max_mtu}]"
                        )
        
        elif rule_type == 'ip_range':
            # Check IP address ranges
            allowed_ranges = [IPNetwork(r) for r in rule.get('ranges', [])]
            
            for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                interfaces = getattr(config, iface_type)
                for name, iface_config in interfaces.items():
                    for addr in iface_config.get('addresses', []):
                        ip = IPAddress(addr.split('/')[0])
                        
                        if not any(ip in range for range in allowed_ranges):
                            result['passed'] = False
                            result['violations'].append(
                                f"Interface {name} IP {addr} not in allowed ranges"
                            )
        
        elif rule_type == 'dns_servers':
            # Check DNS server configuration
            required_dns = rule.get('servers', [])
            
            for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                interfaces = getattr(config, iface_type)
                for name, iface_config in interfaces.items():
                    nameservers = iface_config.get('nameservers', {})
                    addresses = nameservers.get('addresses', [])
                    
                    if addresses and not all(dns in required_dns for dns in addresses):
                        result['passed'] = False
                        result['violations'].append(
                            f"Interface {name} has unauthorized DNS servers"
                        )
        
        return result
    
    async def _validate_security(self, config: NetplanConfig) -> Dict[str, Any]:
        """Validate security aspects of configuration"""
        result = {
            'validation': 'security',
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        # Check for interfaces with DHCP enabled
        dhcp_interfaces = []
        for iface_type in ['ethernets', 'bonds', 'bridges']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                if iface_config.get('dhcp4') or iface_config.get('dhcp6'):
                    dhcp_interfaces.append(name)
        
        if dhcp_interfaces:
            result['warnings'].append(
                f"Interfaces with DHCP enabled: {dhcp_interfaces}"
            )
        
        # Check for missing critical interfaces
        for iface_type in ['ethernets', 'bonds', 'bridges']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                if not iface_config.get('critical', True):
                    result['warnings'].append(
                        f"Interface {name} marked as non-critical"
                    )
        
        # Check for public IP addresses
        public_interfaces = []
        for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                for addr in iface_config.get('addresses', []):
                    ip = ipaddress.ip_interface(addr)
                    if ip.is_global:
                        public_interfaces.append(f"{name}: {addr}")
        
        if public_interfaces:
            result['warnings'].append(
                f"Interfaces with public IPs: {public_interfaces}"
            )
        
        # Check routing policy
        for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                routing_policy = iface_config.get('routing-policy', [])
                if routing_policy:
                    result['warnings'].append(
                        f"Interface {name} has custom routing policy"
                    )
        
        return result
    
    async def _validate_performance(self, config: NetplanConfig) -> Dict[str, Any]:
        """Validate performance aspects of configuration"""
        result = {
            'validation': 'performance',
            'passed': True,
            'errors': [],
            'warnings': []
        }
        
        # Check MTU consistency
        mtu_map = {}
        for iface_type in ['ethernets', 'bonds', 'bridges']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                mtu = iface_config.get('mtu', 1500)
                mtu_map[name] = mtu
        
        # Check VLAN MTU
        for vlan_name, vlan_config in config.vlans.items():
            vlan_mtu = vlan_config.get('mtu', 1500)
            link = vlan_config.get('link')
            
            if link and link in mtu_map:
                parent_mtu = mtu_map[link]
                if vlan_mtu > parent_mtu:
                    result['errors'].append(
                        f"VLAN {vlan_name} MTU ({vlan_mtu}) exceeds "
                        f"parent {link} MTU ({parent_mtu})"
                    )
                    result['passed'] = False
        
        # Check bond configuration for performance
        for bond_name, bond_config in config.bonds.items():
            params = bond_config.get('parameters', {})
            mode = params.get('mode', 'balance-rr')
            
            # Check lacp-rate for 802.3ad
            if mode == '802.3ad':
                lacp_rate = params.get('lacp-rate', 'slow')
                if lacp_rate == 'slow':
                    result['warnings'].append(
                        f"Bond {bond_name} using slow LACP rate"
                    )
            
            # Check mii-monitor-interval
            mii_interval = params.get('mii-monitor-interval', 100)
            if mii_interval > 100:
                result['warnings'].append(
                    f"Bond {bond_name} has high MII monitor interval: {mii_interval}ms"
                )
        
        # Check for jumbo frames consistency
        jumbo_interfaces = []
        standard_interfaces = []
        
        for iface_type in ['ethernets', 'bonds', 'bridges']:
            interfaces = getattr(config, iface_type)
            for name, iface_config in interfaces.items():
                mtu = iface_config.get('mtu', 1500)
                if mtu > 1500:
                    jumbo_interfaces.append(f"{name}({mtu})")
                else:
                    standard_interfaces.append(f"{name}({mtu})")
        
        if jumbo_interfaces and standard_interfaces:
            result['warnings'].append(
                f"Mixed MTU sizes - Jumbo: {jumbo_interfaces}, "
                f"Standard: {standard_interfaces}"
            )
        
        if not result['passed']:
            self.validation_errors.labels(
                type='performance',
                severity='error'
            ).inc()
        
        return result
    
    async def deploy_config(self,
                          config: NetplanConfig,
                          hostname: str,
                          method: str = 'ansible',
                          dry_run: bool = False) -> Dict[str, Any]:
        """Deploy configuration to host"""
        self.logger.info(f"Deploying configuration to {hostname} using {method}")
        
        deployment_result = {
            'hostname': hostname,
            'method': method,
            'timestamp': datetime.utcnow().isoformat(),
            'status': ConfigState.PENDING.value,
            'details': {}
        }
        
        try:
            # Generate configuration file
            config_content = self._generate_netplan_yaml(config)
            
            # Store configuration for audit
            await self._store_config_version(hostname, config_content)
            
            if dry_run:
                deployment_result['status'] = ConfigState.ACTIVE.value
                deployment_result['details']['dry_run'] = True
                deployment_result['details']['config'] = config_content
                return deployment_result
            
            # Deploy based on method
            if method == 'ansible':
                result = await self._deploy_with_ansible(hostname, config_content)
            elif method == 'ssh':
                result = await self._deploy_with_ssh(hostname, config_content)
            elif method == 'api':
                result = await self._deploy_with_api(hostname, config_content)
            else:
                raise ValueError(f"Unknown deployment method: {method}")
            
            deployment_result['details'] = result
            
            if result['success']:
                deployment_result['status'] = ConfigState.ACTIVE.value
                self.config_deployments.labels(
                    status='success',
                    environment=self.config.get('environment', 'production')
                ).inc()
                
                # Schedule post-deployment validation
                asyncio.create_task(
                    self._post_deployment_validation(hostname, config)
                )
            else:
                deployment_result['status'] = ConfigState.FAILED.value
                self.config_deployments.labels(
                    status='failure',
                    environment=self.config.get('environment', 'production')
                ).inc()
                
                # Attempt rollback if configured
                if self.config.get('auto_rollback', True):
                    rollback_result = await self._rollback_config(hostname)
                    deployment_result['rollback'] = rollback_result
            
        except Exception as e:
            self.logger.error(f"Deployment failed: {e}")
            deployment_result['status'] = ConfigState.FAILED.value
            deployment_result['error'] = str(e)
            
            self.config_deployments.labels(
                status='error',
                environment=self.config.get('environment', 'production')
            ).inc()
        
        return deployment_result
    
    def _generate_netplan_yaml(self, config: NetplanConfig) -> str:
        """Generate Netplan YAML from configuration object"""
        config_dict = {
            'network': {
                'version': config.version,
                'renderer': config.renderer
            }
        }
        
        # Add non-empty sections
        for section in ['ethernets', 'wifis', 'bonds', 'bridges', 'vlans', 'tunnels']:
            section_data = getattr(config, section)
            if section_data:
                config_dict['network'][section] = section_data
        
        # Convert to YAML with proper formatting
        yaml_content = yaml.dump(
            config_dict,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False
        )
        
        return yaml_content
    
    async def _store_config_version(self, hostname: str, config_content: str):
        """Store configuration version for audit and rollback"""
        timestamp = datetime.utcnow().isoformat()
        
        # Store in Consul
        try:
            self.consul_client.kv.put(
                f"netplan/configs/{hostname}/current",
                config_content.encode()
            )
            self.consul_client.kv.put(
                f"netplan/configs/{hostname}/history/{timestamp}",
                config_content.encode()
            )
        except Exception as e:
            self.logger.error(f"Failed to store config in Consul: {e}")
        
        # Store in etcd
        try:
            self.etcd_client.put(
                f"/netplan/configs/{hostname}/current",
                config_content
            )
            self.etcd_client.put(
                f"/netplan/configs/{hostname}/history/{timestamp}",
                config_content
            )
        except Exception as e:
            self.logger.error(f"Failed to store config in etcd: {e}")
    
    async def _deploy_with_ansible(self, hostname: str, config_content: str) -> Dict[str, Any]:
        """Deploy using Ansible"""
        result = {
            'success': False,
            'output': '',
            'errors': ''
        }
        
        # Create temporary playbook
        playbook = [{
            'name': 'Deploy Netplan Configuration',
            'hosts': hostname,
            'gather_facts': False,
            'tasks': [
                {
                    'name': 'Backup current configuration',
                    'copy': {
                        'src': '/etc/netplan/',
                        'dest': '/etc/netplan.backup/',
                        'remote_src': True,
                        'backup': True
                    }
                },
                {
                    'name': 'Deploy new configuration',
                    'copy': {
                        'content': config_content,
                        'dest': '/etc/netplan/50-cloud-init.yaml',
                        'owner': 'root',
                        'group': 'root',
                        'mode': '0600',
                        'backup': True
                    }
                },
                {
                    'name': 'Validate configuration',
                    'command': 'netplan generate',
                    'register': 'generate_result',
                    'failed_when': 'generate_result.rc != 0'
                },
                {
                    'name': 'Apply configuration',
                    'command': 'netplan apply',
                    'register': 'apply_result',
                    'when': 'generate_result.rc == 0'
                }
            ]
        }]
        
        # Write playbook to temporary file
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
            yaml.dump(playbook, f)
            playbook_path = f.name
        
        try:
            # Run ansible-playbook
            ansible_result = ansible_run(
                playbook=playbook_path,
                inventory=self.config.get('ansible_inventory'),
                extravars={
                    'ansible_user': self.config.get('ansible_user', 'root'),
                    'ansible_ssh_private_key_file': self.config.get('ansible_key_file')
                }
            )
            
            result['success'] = ansible_result.status == 'successful'
            result['output'] = ansible_result.stdout.read()
            
            if not result['success']:
                result['errors'] = ansible_result.stderr.read()
        
        finally:
            # Cleanup
            os.unlink(playbook_path)
        
        return result
    
    async def _deploy_with_ssh(self, hostname: str, config_content: str) -> Dict[str, Any]:
        """Deploy using direct SSH"""
        result = {
            'success': False,
            'output': '',
            'errors': ''
        }
        
        try:
            # Setup SSH connection
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            ssh.connect(
                hostname,
                username=self.config.get('ssh_user', 'root'),
                key_filename=self.config.get('ssh_key_file'),
                timeout=30
            )
            
            # Backup current configuration
            stdin, stdout, stderr = ssh.exec_command(
                'cp -r /etc/netplan/ /etc/netplan.backup/'
            )
            stdout.read()
            
            # Write new configuration
            sftp = ssh.open_sftp()
            with sftp.file('/etc/netplan/50-cloud-init.yaml', 'w') as f:
                f.write(config_content)
            sftp.close()
            
            # Validate configuration
            stdin, stdout, stderr = ssh.exec_command('netplan generate')
            generate_output = stdout.read().decode()
            generate_error = stderr.read().decode()
            
            if stderr.channel.recv_exit_status() != 0:
                result['errors'] = f"Validation failed: {generate_error}"
                
                # Restore backup
                ssh.exec_command('rm -rf /etc/netplan/ && mv /etc/netplan.backup/ /etc/netplan/')
                return result
            
            # Apply configuration
            stdin, stdout, stderr = ssh.exec_command('netplan apply')
            apply_output = stdout.read().decode()
            apply_error = stderr.read().decode()
            
            if stderr.channel.recv_exit_status() == 0:
                result['success'] = True
                result['output'] = f"{generate_output}\n{apply_output}"
            else:
                result['errors'] = f"Apply failed: {apply_error}"
                
                # Restore backup
                ssh.exec_command('rm -rf /etc/netplan/ && mv /etc/netplan.backup/ /etc/netplan/')
                ssh.exec_command('netplan apply')
            
            ssh.close()
            
        except Exception as e:
            result['errors'] = f"SSH deployment failed: {str(e)}"
        
        return result
    
    async def _post_deployment_validation(self, hostname: str, config: NetplanConfig):
        """Validate configuration after deployment"""
        await asyncio.sleep(30)  # Wait for network to stabilize
        
        try:
            # Check connectivity
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            ssh.connect(
                hostname,
                username=self.config.get('ssh_user', 'root'),
                key_filename=self.config.get('ssh_key_file'),
                timeout=10
            )
            
            # Check interface status
            stdin, stdout, stderr = ssh.exec_command('ip -j link show')
            link_data = json.loads(stdout.read().decode())
            
            # Check each configured interface
            expected_interfaces = set()
            for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                interfaces = getattr(config, iface_type)
                expected_interfaces.update(interfaces.keys())
            
            actual_interfaces = {link['ifname'] for link in link_data}
            
            missing_interfaces = expected_interfaces - actual_interfaces
            if missing_interfaces:
                self.logger.error(
                    f"Missing interfaces on {hostname}: {missing_interfaces}"
                )
                self.config_drift.labels(
                    hostname=hostname,
                    interface='multiple'
                ).set(len(missing_interfaces))
            
            # Check IP addresses
            stdin, stdout, stderr = ssh.exec_command('ip -j addr show')
            addr_data = json.loads(stdout.read().decode())
            
            # Validate each interface configuration
            for iface in addr_data:
                iface_name = iface['ifname']
                
                # Find configuration for this interface
                iface_config = None
                for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
                    interfaces = getattr(config, iface_type)
                    if iface_name in interfaces:
                        iface_config = interfaces[iface_name]
                        break
                
                if iface_config:
                    # Check addresses
                    expected_addrs = set(iface_config.get('addresses', []))
                    actual_addrs = set()
                    
                    for addr_info in iface.get('addr_info', []):
                        addr = f"{addr_info['local']}/{addr_info['prefixlen']}"
                        actual_addrs.add(addr)
                    
                    if expected_addrs != actual_addrs:
                        self.logger.warning(
                            f"Address mismatch on {hostname}:{iface_name} - "
                            f"Expected: {expected_addrs}, Actual: {actual_addrs}"
                        )
                        self.config_drift.labels(
                            hostname=hostname,
                            interface=iface_name
                        ).set(1)
            
            ssh.close()
            
        except Exception as e:
            self.logger.error(f"Post-deployment validation failed: {e}")


class NetplanConfigGenerator:
    """Generate Netplan configurations from templates"""
    
    def __init__(self, template_dir: str):
        self.template_dir = Path(template_dir)
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.template_dir),
            autoescape=True,
            trim_blocks=True,
            lstrip_blocks=True
        )
    
    def generate_datacenter_config(self,
                                 hostname: str,
                                 role: str,
                                 datacenter: str,
                                 rack: str) -> str:
        """Generate datacenter server configuration"""
        template = self.env.get_template('datacenter.yaml.j2')
        
        # Calculate network parameters based on location
        pod = int(rack[1:3])  # Extract pod from rack ID
        rack_num = int(rack[3:5])  # Extract rack number
        
        # Management network: 10.{datacenter}.{pod}.{rack}/24
        mgmt_network = f"10.{self._dc_to_octet(datacenter)}.{pod}.0/24"
        mgmt_ip = f"10.{self._dc_to_octet(datacenter)}.{pod}.{rack_num}"
        
        # Storage network: 172.16.{pod}.0/24
        storage_network = f"172.16.{pod}.0/24"
        storage_ip = f"172.16.{pod}.{rack_num}"
        
        # vMotion network: 172.17.{pod}.0/24
        vmotion_network = f"172.17.{pod}.0/24"
        vmotion_ip = f"172.17.{pod}.{rack_num}"
        
        return template.render(
            hostname=hostname,
            role=role,
            datacenter=datacenter,
            rack=rack,
            mgmt_ip=mgmt_ip,
            mgmt_network=mgmt_network,
            storage_ip=storage_ip,
            storage_network=storage_network,
            vmotion_ip=vmotion_ip,
            vmotion_network=vmotion_network,
            dns_servers=['8.8.8.8', '8.8.4.4'],
            ntp_servers=['0.pool.ntp.org', '1.pool.ntp.org'],
            domain='example.com'
        )
    
    def _dc_to_octet(self, datacenter: str) -> int:
        """Convert datacenter code to IP octet"""
        dc_map = {
            'dc1': 1,
            'dc2': 2,
            'dc3': 3,
            'aws-us-east-1': 10,
            'aws-us-west-2': 11,
            'azure-eastus': 20,
            'azure-westus': 21
        }
        return dc_map.get(datacenter.lower(), 99)


async def main():
    """Main execution function"""
    import argparse
    
    parser = argparse.ArgumentParser(description='Enterprise Netplan Manager')
    parser.add_argument('--config', default='/etc/netplan-manager/config.yaml',
                       help='Configuration file path')
    parser.add_argument('--action', required=True,
                       choices=['generate', 'validate', 'deploy', 'rollback', 'status'],
                       help='Action to perform')
    parser.add_argument('--hostname', required=True, help='Target hostname')
    parser.add_argument('--environment', default='production',
                       help='Deployment environment')
    parser.add_argument('--template', help='Template name')
    parser.add_argument('--dry-run', action='store_true',
                       help='Perform dry run')
    parser.add_argument('--method', default='ansible',
                       choices=['ansible', 'ssh', 'api'],
                       help='Deployment method')
    
    args = parser.parse_args()
    
    # Initialize manager
    manager = EnterpriseNetplanManager(args.config)
    
    try:
        if args.action == 'generate':
            # Generate configuration
            config = await manager.generate_config(
                args.hostname,
                args.environment
            )
            
            # Output YAML
            yaml_content = manager._generate_netplan_yaml(config)
            print(yaml_content)
        
        elif args.action == 'validate':
            # Generate and validate configuration
            config = await manager.generate_config(
                args.hostname,
                args.environment
            )
            
            results = await manager.validate_config(
                config,
                args.hostname,
                ValidationLevel.POLICY
            )
            
            # Display results
            for result in results:
                print(f"\n{result['validation'].upper()} Validation:")
                print(f"  Status: {'PASSED' if result['passed'] else 'FAILED'}")
                
                if result['errors']:
                    print("  Errors:")
                    for error in result['errors']:
                        print(f"    - {error}")
                
                if result['warnings']:
                    print("  Warnings:")
                    for warning in result['warnings']:
                        print(f"    - {warning}")
        
        elif args.action == 'deploy':
            # Generate, validate, and deploy
            config = await manager.generate_config(
                args.hostname,
                args.environment
            )
            
            deployment = await manager.deploy_config(
                config,
                args.hostname,
                method=args.method,
                dry_run=args.dry_run
            )
            
            print(json.dumps(deployment, indent=2))
        
        elif args.action == 'rollback':
            # Rollback to previous configuration
            result = await manager._rollback_config(args.hostname)
            print(json.dumps(result, indent=2))
        
        elif args.action == 'status':
            # Get current status
            # This would query the system for current configuration
            print(f"Status check for {args.hostname}")
    
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

Enterprise Netplan Templates

Production Template Examples

Datacenter Server Template

# datacenter.yaml.j2 - Enterprise datacenter server network configuration
network:
  version: 2
  renderer: networkd
  
  ethernets:
    # Management interface
    eno1:
      addresses:
        - {{ mgmt_ip }}/24
      gateway4: {{ mgmt_network | ipaddr('1') | ipaddr('address') }}
      nameservers:
        addresses: {{ dns_servers }}
        search:
          - {{ domain }}
      routes:
        - to: 10.0.0.0/8
          via: {{ mgmt_network | ipaddr('1') | ipaddr('address') }}
          metric: 100
    
    # Storage network interfaces (bonded)
    eno2:
      mtu: 9000
    eno3:
      mtu: 9000
    
    # vMotion/Migration interfaces (bonded)
    eno4:
      mtu: 9000
    eno5:
      mtu: 9000
    
    # VM traffic interfaces (bonded)
    eno6:
      mtu: 1500
    eno7:
      mtu: 1500
  
  bonds:
    # Storage bond (LACP)
    bond-storage:
      interfaces:
        - eno2
        - eno3
      addresses:
        - {{ storage_ip }}/24
      mtu: 9000
      parameters:
        mode: 802.3ad
        lacp-rate: fast
        mii-monitor-interval: 50
        transmit-hash-policy: layer3+4
    
    # vMotion bond (Active/Backup)
    bond-vmotion:
      interfaces:
        - eno4
        - eno5
      addresses:
        - {{ vmotion_ip }}/24
      mtu: 9000
      parameters:
        mode: active-backup
        mii-monitor-interval: 50
        primary: eno4
    
    # VM traffic bond (LACP)
    bond-vm:
      interfaces:
        - eno6
        - eno7
      mtu: 1500
      parameters:
        mode: 802.3ad
        lacp-rate: fast
        mii-monitor-interval: 50
        transmit-hash-policy: layer2+3
  
  vlans:
    # Production VLANs
    {% for vlan_id in production_vlans %}
    vlan{{ vlan_id }}:
      id: {{ vlan_id }}
      link: bond-vm
      addresses:
        - {{ vlan_networks[vlan_id] | ipsubnet(24, rack_num) }}
    {% endfor %}

Kubernetes Node Template

# kubernetes.yaml.j2 - Kubernetes node network configuration
network:
  version: 2
  renderer: networkd
  
  ethernets:
    # Primary interface
    {{ primary_interface }}:
      dhcp4: false
      dhcp6: false
      {% if not bonding_enabled %}
      addresses:
        - {{ node_ip }}/{{ subnet_prefix }}
      gateway4: {{ gateway }}
      nameservers:
        addresses: {{ dns_servers }}
      {% endif %}
    
    {% if bonding_enabled %}
    # Secondary interface for bonding
    {{ secondary_interface }}:
      dhcp4: false
      dhcp6: false
    {% endif %}
  
  {% if bonding_enabled %}
  bonds:
    bond0:
      interfaces:
        - {{ primary_interface }}
        - {{ secondary_interface }}
      addresses:
        - {{ node_ip }}/{{ subnet_prefix }}
      gateway4: {{ gateway }}
      nameservers:
        addresses: {{ dns_servers }}
      parameters:
        mode: {{ bond_mode | default('802.3ad') }}
        lacp-rate: fast
        mii-monitor-interval: 100
  {% endif %}
  
  bridges:
    # CNI bridge for pod networking
    cni0:
      addresses:
        - {{ pod_network | default('10.244.0.1/24') }}
      forward-delay: 0
      stp: false
      parameters:
        forward-delay: 0
    
    {% if calico_enabled %}
    # Calico IPIP tunnel bridge
    tunl0:
      addresses:
        - {{ calico_tunnel_ip }}/32
      parameters:
        mode: ipip
    {% endif %}

Network Automation Scripts

Bulk Configuration Deployment

#!/bin/bash
# deploy-netplan-bulk.sh - Deploy Netplan configurations to multiple hosts

set -euo pipefail

# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
INVENTORY_FILE="${1:-/etc/netplan-manager/inventory.yaml}"
ENVIRONMENT="${2:-production}"
LOG_DIR="/var/log/netplan-deployments"
PARALLEL_JOBS=10

# Create log directory
mkdir -p "$LOG_DIR"
LOG_FILE="$LOG_DIR/bulk-deploy-$(date +%Y%m%d-%H%M%S).log"

log() {
    echo "[$(date +'%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}

error() {
    echo "[$(date +'%Y-%m-%d %H:%M:%S')] ERROR: $*" | tee -a "$LOG_FILE" >&2
}

# Load inventory
load_inventory() {
    log "Loading inventory from $INVENTORY_FILE"
    
    # Parse YAML inventory
    python3 -c "
import yaml
import sys

with open('$INVENTORY_FILE', 'r') as f:
    inventory = yaml.safe_load(f)

for group, hosts in inventory.get('hosts', {}).items():
    if '$ENVIRONMENT' in hosts.get('environments', []):
        for host in hosts.get('members', []):
            print(f\"{host} {group}\")
"
}

# Deploy to single host
deploy_host() {
    local hostname="$1"
    local group="$2"
    local log_file="$LOG_DIR/deploy-${hostname}-$(date +%Y%m%d-%H%M%S).log"
    
    log "Deploying to $hostname (group: $group)"
    
    # Generate configuration
    if ! netplan-manager --action generate \
                       --hostname "$hostname" \
                       --environment "$ENVIRONMENT" \
                       > "/tmp/netplan-${hostname}.yaml" 2>"$log_file"; then
        error "Failed to generate config for $hostname"
        return 1
    fi
    
    # Validate configuration
    if ! netplan-manager --action validate \
                       --hostname "$hostname" \
                       --environment "$ENVIRONMENT" \
                       >> "$log_file" 2>&1; then
        error "Validation failed for $hostname"
        return 1
    fi
    
    # Deploy configuration
    if ! netplan-manager --action deploy \
                       --hostname "$hostname" \
                       --environment "$ENVIRONMENT" \
                       --method ansible \
                       >> "$log_file" 2>&1; then
        error "Deployment failed for $hostname"
        return 1
    fi
    
    log "Successfully deployed to $hostname"
    return 0
}

# Export function for parallel execution
export -f deploy_host log error
export LOG_DIR ENVIRONMENT

# Main deployment
main() {
    log "Starting bulk Netplan deployment for environment: $ENVIRONMENT"
    
    # Get host list
    hosts=$(load_inventory)
    
    if [[ -z "$hosts" ]]; then
        error "No hosts found in inventory for environment $ENVIRONMENT"
        exit 1
    fi
    
    # Count hosts
    host_count=$(echo "$hosts" | wc -l)
    log "Found $host_count hosts to deploy"
    
    # Deploy in parallel
    echo "$hosts" | parallel -j "$PARALLEL_JOBS" --colsep ' ' deploy_host {1} {2}
    
    # Check results
    success_count=$(grep -c "Successfully deployed" "$LOG_FILE" || true)
    failed_count=$((host_count - success_count))
    
    log "Deployment complete: $success_count succeeded, $failed_count failed"
    
    if [[ $failed_count -gt 0 ]]; then
        error "Some deployments failed. Check logs in $LOG_DIR"
        exit 1
    fi
}

# Run main function
main

Network Configuration Validator

#!/usr/bin/env python3
"""
Network Configuration Compliance Validator
"""

import asyncio
import argparse
import yaml
from typing import List, Dict, Any
from pathlib import Path

class NetworkComplianceValidator:
    """Validate network configurations against compliance policies"""
    
    def __init__(self, policy_file: str):
        self.policies = self._load_policies(policy_file)
        self.violations = []
    
    def _load_policies(self, policy_file: str) -> Dict[str, Any]:
        """Load compliance policies"""
        with open(policy_file, 'r') as f:
            return yaml.safe_load(f)
    
    async def validate_host(self, hostname: str, config_file: str) -> bool:
        """Validate host configuration"""
        with open(config_file, 'r') as f:
            config = yaml.safe_load(f)
        
        network_config = config.get('network', {})
        
        # Run all validators
        validators = [
            self._validate_ip_ranges,
            self._validate_vlan_usage,
            self._validate_mtu_settings,
            self._validate_security_settings,
            self._validate_redundancy
        ]
        
        for validator in validators:
            await validator(hostname, network_config)
        
        return len(self.violations) == 0
    
    async def _validate_ip_ranges(self, hostname: str, config: Dict[str, Any]):
        """Validate IP address assignments"""
        allowed_ranges = self.policies.get('ip_ranges', {})
        
        for iface_type in ['ethernets', 'bonds', 'bridges', 'vlans']:
            interfaces = config.get(iface_type, {})
            
            for iface_name, iface_config in interfaces.items():
                for address in iface_config.get('addresses', []):
                    if not self._ip_in_allowed_ranges(address, allowed_ranges):
                        self.violations.append({
                            'host': hostname,
                            'interface': iface_name,
                            'type': 'ip_range',
                            'message': f'IP {address} not in allowed ranges'
                        })
    
    async def _validate_redundancy(self, hostname: str, config: Dict[str, Any]):
        """Validate network redundancy requirements"""
        redundancy_required = self.policies.get('redundancy_required', [])
        
        bonds = config.get('bonds', {})
        
        for req_type in redundancy_required:
            if req_type == 'management' and not any('mgmt' in b for b in bonds):
                self.violations.append({
                    'host': hostname,
                    'type': 'redundancy',
                    'message': 'No redundant management interface found'
                })

# Example compliance policy file:
"""
# compliance-policy.yaml
ip_ranges:
  management:
    - 10.0.0.0/8
  production:
    - 172.16.0.0/12
  dmz:
    - 192.168.0.0/16

vlan_ranges:
  production: [100, 199]
  development: [200, 299]
  management: [10, 19]

mtu_requirements:
  storage: 9000
  vmotion: 9000
  standard: 1500

security_requirements:
  - no_dhcp_on_production
  - no_public_ips_on_internal
  - firewall_zones_required

redundancy_required:
  - management
  - storage
  - production
"""

Production Operations

Monitoring and Alerting

Netplan Configuration Drift Detection

#!/usr/bin/env python3
"""
Netplan Configuration Drift Detector
"""

import asyncio
import difflib
import hashlib
from datetime import datetime
from prometheus_client import Gauge, Counter

class DriftDetector:
    """Detect configuration drift across fleet"""
    
    def __init__(self, manager):
        self.manager = manager
        self.drift_gauge = Gauge(
            'netplan_config_drift_score',
            'Configuration drift score',
            ['hostname']
        )
        self.drift_events = Counter(
            'netplan_drift_events_total',
            'Total drift events detected',
            ['hostname', 'severity']
        )
    
    async def check_drift(self, hostname: str) -> Dict[str, Any]:
        """Check for configuration drift"""
        # Get expected configuration
        expected = await self.manager.generate_config(hostname)
        expected_yaml = self.manager._generate_netplan_yaml(expected)
        
        # Get actual configuration
        actual_yaml = await self._get_actual_config(hostname)
        
        # Calculate drift
        drift_score = self._calculate_drift_score(expected_yaml, actual_yaml)
        
        self.drift_gauge.labels(hostname=hostname).set(drift_score)
        
        if drift_score > 0:
            severity = 'high' if drift_score > 50 else 'low'
            self.drift_events.labels(
                hostname=hostname,
                severity=severity
            ).inc()
            
            return {
                'hostname': hostname,
                'drift_detected': True,
                'drift_score': drift_score,
                'expected_hash': hashlib.sha256(expected_yaml.encode()).hexdigest(),
                'actual_hash': hashlib.sha256(actual_yaml.encode()).hexdigest(),
                'diff': list(difflib.unified_diff(
                    expected_yaml.splitlines(),
                    actual_yaml.splitlines(),
                    fromfile='expected',
                    tofile='actual'
                ))
            }
        
        return {
            'hostname': hostname,
            'drift_detected': False,
            'drift_score': 0
        }

Troubleshooting Guide

Common Issues and Solutions

1. Network Connectivity Lost After Apply

# Emergency recovery procedure
# Boot with recovery/single-user mode

# Mount root filesystem
mount -o remount,rw /

# Restore backup configuration
cp /etc/netplan.backup/*.yaml /etc/netplan/

# Remove problematic configuration
rm /etc/netplan/50-cloud-init.yaml

# Apply working configuration
netplan --debug generate
netplan --debug apply

# If still failing, use networkd directly
systemctl stop systemd-networkd
systemctl start systemd-networkd

2. Validation Errors

# Debug validation issues
netplan --debug generate

# Check systemd-networkd status
systemctl status systemd-networkd
journalctl -u systemd-networkd -f

# Validate YAML syntax
python3 -c "import yaml; yaml.safe_load(open('/etc/netplan/50-cloud-init.yaml'))"

# Test configuration without applying
netplan try --timeout 120

3. Bond Interface Issues

# Check bond status
cat /proc/net/bonding/bond0

# Monitor LACP
tcpdump -i eno1 -n "ether proto 0x8809"

# Force bond interface up
ip link set bond0 up
ip link set eno1 master bond0
ip link set eno2 master bond0

# Check switch LACP configuration
show lacp neighbor  # On switch

4. VLAN Connectivity Problems

# Check VLAN interface
ip -d link show vlan100

# Verify VLAN tagging
tcpdump -i bond0 -n -e vlan

# Test specific VLAN
ip link add link bond0 name test100 type vlan id 100
ip addr add 192.168.100.1/24 dev test100
ip link set test100 up
ping 192.168.100.254

Best Practices

1. Configuration Management

  • Always version control configurations
  • Use templates for consistency
  • Implement pre-deployment validation
  • Maintain configuration backups
  • Document all customizations

2. Security Considerations

  • Restrict configuration file permissions (600)
  • Use encrypted communication for deployment
  • Implement network segmentation
  • Regular security audits
  • Monitor for unauthorized changes

3. Performance Optimization

  • Use appropriate MTU sizes
  • Configure bond modes correctly
  • Implement traffic shaping where needed
  • Monitor interface statistics
  • Regular performance testing

4. Operational Excellence

  • Automated deployment pipelines
  • Comprehensive monitoring
  • Drift detection and remediation
  • Regular disaster recovery drills
  • Clear rollback procedures

Conclusion

Enterprise Netplan network automation provides the foundation for scalable, reliable, and secure network infrastructure management. By implementing comprehensive automation frameworks, validation systems, and monitoring solutions, organizations can maintain consistent network configurations across thousands of systems while ensuring compliance, security, and operational efficiency.

The combination of declarative configuration, infrastructure as code principles, and sophisticated orchestration enables network teams to manage complex environments with confidence, reducing manual errors and accelerating deployment times while maintaining the highest standards of reliability and security.