Enterprise edge computing deployments on Raspberry Pi require sophisticated automation frameworks, robust fleet management systems, and comprehensive infrastructure orchestration to deliver reliable IoT solutions at scale. This guide covers advanced Ubuntu deployment strategies on ARM devices, enterprise-grade edge computing architectures, automated provisioning systems, and production fleet management for thousands of edge nodes.

Enterprise Edge Computing Architecture Overview

Raspberry Pi Fleet Management Strategy

Enterprise edge deployments demand comprehensive management across thousands of distributed devices, requiring automated provisioning, centralized monitoring, secure communications, and resilient update mechanisms to maintain operational excellence.

Enterprise Edge Infrastructure Framework

┌─────────────────────────────────────────────────────────────────┐
│              Enterprise Edge Computing Architecture             │
├─────────────────┬─────────────────┬─────────────────┬───────────┤
│  Device Layer   │  Platform Layer │  Management     │  Cloud    │
├─────────────────┼─────────────────┼─────────────────┼───────────┤
│ ┌─────────────┐ │ ┌─────────────┐ │ ┌─────────────┐ │ ┌───────┐ │
│ │ RPi 4/CM4   │ │ │ Ubuntu Core │ │ │ Fleet Mgmt  │ │ │ AWS   │ │
│ │ Hardware    │ │ │ K3s/MicroK8s│ │ │ Monitoring  │ │ │ Azure │ │
│ │ Sensors     │ │ │ Containers  │ │ │ Updates     │ │ │ GCP   │ │
│ │ Actuators   │ │ │ Edge Apps   │ │ │ Security    │ │ │ Edge  │ │
│ └─────────────┘ │ └─────────────┘ │ └─────────────┘ │ └───────┘ │
│                 │                 │                 │           │
│ • Distributed   │ • Lightweight   │ • Centralized   │ • Hybrid  │
│ • Low power     │ • Secure        │ • Scalable      │ • ML/AI   │
│ • Resilient     │ • Real-time     │ • Automated     │ • Storage │
└─────────────────┴─────────────────┴─────────────────┴───────────┘

Edge Deployment Maturity Model

LevelDevice ManagementApplication DeploymentMonitoringScale
BasicManual setupDirect installLocal logs10s
ManagedScripted provisioningContainer-basedRemote access100s
AdvancedAutomated imagingOrchestratedCentralized1000s
EnterpriseZero-touch provisioningGitOps/CI/CDAI-driven10000s+

Advanced Raspberry Pi Provisioning Framework

Enterprise Ubuntu Edge Deployment System

#!/usr/bin/env python3
"""
Enterprise Raspberry Pi Ubuntu Deployment and Management Framework
"""

import os
import sys
import json
import yaml
import logging
import time
import hashlib
import asyncio
import aiohttp
import subprocess
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass, asdict, field
from pathlib import Path
from enum import Enum
import paramiko
import netifaces
import psutil
from cryptography.fernet import Fernet
import paho.mqtt.client as mqtt
import docker
from kubernetes import client, config as k8s_config

class DeviceState(Enum):
    UNPROVISIONED = "unprovisioned"
    PROVISIONING = "provisioning"
    INITIALIZING = "initializing"
    READY = "ready"
    UPDATING = "updating"
    ERROR = "error"
    OFFLINE = "offline"

class DeviceRole(Enum):
    GATEWAY = "gateway"
    SENSOR = "sensor"
    COMPUTE = "compute"
    STORAGE = "storage"
    HYBRID = "hybrid"

class DeploymentType(Enum):
    BARE_METAL = "bare_metal"
    CONTAINER = "container"
    KUBERNETES = "kubernetes"
    SNAP = "snap"

@dataclass
class NetworkConfiguration:
    interface: str
    dhcp: bool = True
    ip_address: Optional[str] = None
    netmask: Optional[str] = None
    gateway: Optional[str] = None
    dns_servers: List[str] = field(default_factory=list)
    wifi_ssid: Optional[str] = None
    wifi_password: Optional[str] = None
    vlan_id: Optional[int] = None

@dataclass
class DeviceProfile:
    device_id: str
    hostname: str
    role: DeviceRole
    model: str  # Pi 3B+, Pi 4, CM4, etc.
    serial_number: str
    network_config: NetworkConfiguration
    ssh_keys: List[str]
    timezone: str = "UTC"
    locale: str = "en_US.UTF-8"
    packages: List[str] = field(default_factory=list)
    services: List[str] = field(default_factory=list)
    environment_vars: Dict[str, str] = field(default_factory=dict)

@dataclass
class ApplicationDeployment:
    name: str
    version: str
    deployment_type: DeploymentType
    image: Optional[str] = None
    compose_file: Optional[str] = None
    helm_chart: Optional[str] = None
    snap_name: Optional[str] = None
    config: Dict[str, Any] = field(default_factory=dict)
    resources: Dict[str, Any] = field(default_factory=dict)
    health_check: Dict[str, Any] = field(default_factory=dict)

class EnterpriseEdgeOrchestrator:
    def __init__(self, config_file: str = "edge_config.yaml"):
        self.config = self._load_config(config_file)
        self.devices = {}
        self.applications = {}
        self.deployment_queue = []
        
        # Initialize components
        self._setup_logging()
        self._initialize_services()
        
    def _load_config(self, config_file: str) -> Dict:
        """Load edge orchestration configuration"""
        try:
            with open(config_file, 'r') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            return self._create_default_config()
    
    def _create_default_config(self) -> Dict:
        """Create default configuration"""
        return {
            'management': {
                'server': 'edge-control.enterprise.com',
                'mqtt_broker': 'mqtt.enterprise.com',
                'api_endpoint': 'https://api.enterprise.com/edge',
                'update_server': 'https://updates.enterprise.com'
            },
            'provisioning': {
                'cloud_init_server': 'http://cloud-init.enterprise.com',
                'default_user': 'ubuntu',
                'default_password_hash': '$6$rounds=4096$salt$hash',
                'ssh_authorized_keys': []
            },
            'networking': {
                'management_vlan': 100,
                'data_vlan': 200,
                'dns_servers': ['8.8.8.8', '8.8.4.4'],
                'ntp_servers': ['time1.google.com', 'time2.google.com']
            },
            'security': {
                'tls_enabled': True,
                'certificate_authority': '/etc/edge/ca.crt',
                'device_cert_path': '/etc/edge/device.crt',
                'device_key_path': '/etc/edge/device.key',
                'encryption_key': Fernet.generate_key().decode()
            },
            'monitoring': {
                'prometheus_pushgateway': 'http://prometheus-push.enterprise.com:9091',
                'loki_endpoint': 'http://loki.enterprise.com:3100',
                'metrics_interval': 60,
                'log_level': 'INFO'
            },
            'updates': {
                'automatic_updates': True,
                'update_window': '02:00-04:00',
                'rollback_enabled': True,
                'max_rollback_attempts': 3
            }
        }
    
    def _setup_logging(self):
        """Setup enterprise logging with remote shipping"""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter(log_format))
        
        # File handler with rotation
        from logging.handlers import RotatingFileHandler
        file_handler = RotatingFileHandler(
            '/var/log/edge-orchestrator.log',
            maxBytes=10485760,  # 10MB
            backupCount=5
        )
        file_handler.setFormatter(logging.Formatter(log_format))
        
        # Configure root logger
        logging.basicConfig(
            level=getattr(logging, self.config['monitoring']['log_level']),
            handlers=[console_handler, file_handler]
        )
        
        self.logger = logging.getLogger(__name__)
        
        # Setup remote log shipping if configured
        if 'loki_endpoint' in self.config['monitoring']:
            self._setup_loki_handler()
    
    def _setup_loki_handler(self):
        """Setup Loki log shipping"""
        try:
            from python_logging_loki import LokiHandler
            
            loki_handler = LokiHandler(
                url=f"{self.config['monitoring']['loki_endpoint']}/loki/api/v1/push",
                tags={"service": "edge-orchestrator"},
                version="1"
            )
            
            logging.getLogger().addHandler(loki_handler)
            self.logger.info("Loki log shipping configured")
        except ImportError:
            self.logger.warning("Loki handler not available, skipping remote logging")
    
    def _initialize_services(self):
        """Initialize orchestration services"""
        # Initialize MQTT client for device communication
        self.mqtt_client = self._setup_mqtt_client()
        
        # Initialize Docker client for container management
        try:
            self.docker_client = docker.from_env()
            self.logger.info("Docker client initialized")
        except:
            self.logger.warning("Docker not available")
            self.docker_client = None
        
        # Initialize Kubernetes client if available
        try:
            k8s_config.load_incluster_config()
            self.k8s_client = client.ApiClient()
            self.logger.info("Kubernetes client initialized")
        except:
            self.logger.warning("Kubernetes not available")
            self.k8s_client = None
        
        # Start background tasks
        asyncio.create_task(self._device_discovery_loop())
        asyncio.create_task(self._health_check_loop())
        asyncio.create_task(self._update_check_loop())
    
    def _setup_mqtt_client(self) -> mqtt.Client:
        """Setup MQTT client for device communication"""
        client = mqtt.Client(client_id="edge-orchestrator")
        
        # Configure TLS if enabled
        if self.config['security']['tls_enabled']:
            client.tls_set(
                ca_certs=self.config['security']['certificate_authority'],
                certfile=self.config['security']['device_cert_path'],
                keyfile=self.config['security']['device_key_path']
            )
        
        # Set callbacks
        client.on_connect = self._on_mqtt_connect
        client.on_message = self._on_mqtt_message
        client.on_disconnect = self._on_mqtt_disconnect
        
        # Connect to broker
        try:
            client.connect(
                self.config['management']['mqtt_broker'],
                port=8883 if self.config['security']['tls_enabled'] else 1883,
                keepalive=60
            )
            client.loop_start()
            self.logger.info("MQTT client connected")
        except Exception as e:
            self.logger.error(f"MQTT connection failed: {e}")
        
        return client
    
    def _on_mqtt_connect(self, client, userdata, flags, rc):
        """MQTT connection callback"""
        if rc == 0:
            self.logger.info("Connected to MQTT broker")
            # Subscribe to device topics
            client.subscribe("edge/devices/+/status")
            client.subscribe("edge/devices/+/metrics")
            client.subscribe("edge/devices/+/logs")
            client.subscribe("edge/devices/+/command/response")
        else:
            self.logger.error(f"MQTT connection failed with code: {rc}")
    
    def _on_mqtt_message(self, client, userdata, msg):
        """Process MQTT messages from devices"""
        try:
            topic_parts = msg.topic.split('/')
            device_id = topic_parts[2]
            message_type = topic_parts[3]
            
            payload = json.loads(msg.payload.decode())
            
            if message_type == "status":
                self._handle_device_status(device_id, payload)
            elif message_type == "metrics":
                self._handle_device_metrics(device_id, payload)
            elif message_type == "logs":
                self._handle_device_logs(device_id, payload)
            elif message_type == "command" and len(topic_parts) > 4:
                self._handle_command_response(device_id, payload)
                
        except Exception as e:
            self.logger.error(f"Error processing MQTT message: {e}")
    
    def _on_mqtt_disconnect(self, client, userdata, rc):
        """MQTT disconnection callback"""
        if rc != 0:
            self.logger.warning(f"Unexpected MQTT disconnection: {rc}")
            # Attempt reconnection
            time.sleep(5)
            try:
                client.reconnect()
            except:
                self.logger.error("MQTT reconnection failed")
    
    async def provision_device(self, device_profile: DeviceProfile) -> str:
        """Provision a new Raspberry Pi device"""
        self.logger.info(f"Provisioning device: {device_profile.device_id}")
        
        # Generate cloud-init configuration
        cloud_init_config = self._generate_cloud_init(device_profile)
        
        # Create custom Ubuntu image if needed
        if device_profile.model in ["CM4", "Pi4-8GB"]:
            image_path = await self._create_custom_image(device_profile)
        else:
            image_path = self._get_standard_image(device_profile.model)
        
        # Flash image to SD card (if local provisioning)
        if self.config.get('local_provisioning', False):
            await self._flash_sd_card(image_path, device_profile)
        
        # Register device in management system
        self.devices[device_profile.device_id] = {
            'profile': device_profile,
            'state': DeviceState.PROVISIONING,
            'provisioned_at': time.time(),
            'last_seen': None,
            'metrics': {},
            'applications': []
        }
        
        # Wait for device to come online
        online = await self._wait_for_device_online(device_profile.device_id)
        
        if online:
            # Run post-provisioning tasks
            await self._run_post_provisioning(device_profile)
            
            self.devices[device_profile.device_id]['state'] = DeviceState.READY
            self.logger.info(f"Device provisioned successfully: {device_profile.device_id}")
        else:
            self.devices[device_profile.device_id]['state'] = DeviceState.ERROR
            self.logger.error(f"Device failed to come online: {device_profile.device_id}")
        
        return device_profile.device_id
    
    def _generate_cloud_init(self, device_profile: DeviceProfile) -> Dict:
        """Generate cloud-init configuration for device"""
        cloud_init = {
            'hostname': device_profile.hostname,
            'manage_etc_hosts': True,
            'users': [
                {
                    'name': self.config['provisioning']['default_user'],
                    'groups': ['sudo', 'docker'],
                    'shell': '/bin/bash',
                    'sudo': 'ALL=(ALL) NOPASSWD:ALL',
                    'ssh_authorized_keys': device_profile.ssh_keys
                }
            ],
            'packages': [
                'docker.io',
                'python3-pip',
                'git',
                'htop',
                'iotop',
                'vim',
                'curl',
                'wget',
                'jq'
            ] + device_profile.packages,
            'write_files': [
                {
                    'path': '/etc/netplan/01-netcfg.yaml',
                    'content': self._generate_netplan_config(device_profile.network_config)
                },
                {
                    'path': '/etc/systemd/timesyncd.conf',
                    'content': self._generate_ntp_config()
                },
                {
                    'path': '/etc/edge/device.json',
                    'content': json.dumps({
                        'device_id': device_profile.device_id,
                        'role': device_profile.role.value,
                        'model': device_profile.model
                    })
                }
            ],
            'runcmd': [
                # Set timezone
                f"timedatectl set-timezone {device_profile.timezone}",
                
                # Configure locale
                f"locale-gen {device_profile.locale}",
                f"update-locale LANG={device_profile.locale}",
                
                # Enable services
                "systemctl enable docker",
                "systemctl start docker",
                
                # Install edge agent
                f"curl -sSL {self.config['management']['server']}/install-agent.sh | bash",
                
                # Configure edge agent
                f"edge-agent configure --device-id {device_profile.device_id} --server {self.config['management']['server']}",
                
                # Start edge agent
                "systemctl enable edge-agent",
                "systemctl start edge-agent"
            ] + [f"systemctl enable {service}" for service in device_profile.services],
            'power_state': {
                'mode': 'reboot',
                'timeout': 30,
                'condition': True
            }
        }
        
        # Add WiFi configuration if present
        if device_profile.network_config.wifi_ssid:
            cloud_init['write_files'].append({
                'path': '/etc/netplan/02-wifi.yaml',
                'content': self._generate_wifi_config(device_profile.network_config)
            })
        
        return cloud_init
    
    def _generate_netplan_config(self, network_config: NetworkConfiguration) -> str:
        """Generate Netplan network configuration"""
        config = {
            'network': {
                'version': 2,
                'ethernets': {
                    network_config.interface: {}
                }
            }
        }
        
        eth_config = config['network']['ethernets'][network_config.interface]
        
        if network_config.dhcp:
            eth_config['dhcp4'] = True
        else:
            eth_config['dhcp4'] = False
            eth_config['addresses'] = [f"{network_config.ip_address}/{network_config.netmask}"]
            eth_config['gateway4'] = network_config.gateway
            eth_config['nameservers'] = {'addresses': network_config.dns_servers}
        
        if network_config.vlan_id:
            # Create VLAN interface
            vlan_name = f"{network_config.interface}.{network_config.vlan_id}"
            config['network']['vlans'] = {
                vlan_name: {
                    'id': network_config.vlan_id,
                    'link': network_config.interface
                }
            }
        
        return yaml.dump(config)
    
    def _generate_wifi_config(self, network_config: NetworkConfiguration) -> str:
        """Generate WiFi configuration"""
        config = {
            'network': {
                'version': 2,
                'wifis': {
                    'wlan0': {
                        'access-points': {
                            network_config.wifi_ssid: {
                                'password': network_config.wifi_password
                            }
                        },
                        'dhcp4': True
                    }
                }
            }
        }
        
        return yaml.dump(config)
    
    def _generate_ntp_config(self) -> str:
        """Generate NTP configuration"""
        ntp_servers = ' '.join(self.config['networking']['ntp_servers'])
        return f"""
[Time]
NTP={ntp_servers}
FallbackNTP=ntp.ubuntu.com
"""
    
    async def _create_custom_image(self, device_profile: DeviceProfile) -> str:
        """Create custom Ubuntu image for device"""
        self.logger.info(f"Creating custom image for {device_profile.model}")
        
        work_dir = f"/tmp/rpi-image-{device_profile.device_id}"
        os.makedirs(work_dir, exist_ok=True)
        
        # Download base Ubuntu image
        base_image = await self._download_ubuntu_image(device_profile.model)
        
        # Mount image
        mount_point = f"{work_dir}/mount"
        os.makedirs(mount_point, exist_ok=True)
        
        # Use kpartx to map partitions
        subprocess.run(['kpartx', '-av', base_image], check=True)
        
        # Mount root partition
        subprocess.run(['mount', '/dev/mapper/loop0p2', mount_point], check=True)
        
        try:
            # Customize image
            # Copy cloud-init configuration
            cloud_init_dir = f"{mount_point}/var/lib/cloud/seed/nocloud"
            os.makedirs(cloud_init_dir, exist_ok=True)
            
            with open(f"{cloud_init_dir}/user-data", 'w') as f:
                f.write("#cloud-config\n")
                yaml.dump(self._generate_cloud_init(device_profile), f)
            
            with open(f"{cloud_init_dir}/meta-data", 'w') as f:
                f.write(f"instance-id: {device_profile.device_id}\n")
                f.write(f"local-hostname: {device_profile.hostname}\n")
            
            # Install additional packages in chroot
            self._install_packages_chroot(mount_point, device_profile.packages)
            
            # Configure services
            self._configure_services_chroot(mount_point, device_profile.services)
            
            # Add custom scripts
            self._add_custom_scripts(mount_point, device_profile)
            
        finally:
            # Unmount
            subprocess.run(['umount', mount_point], check=True)
            subprocess.run(['kpartx', '-d', base_image], check=True)
        
        # Compress image
        compressed_image = f"{work_dir}/ubuntu-{device_profile.model}-{device_profile.device_id}.img.xz"
        subprocess.run(['xz', '-9', base_image, '-c'], 
                      stdout=open(compressed_image, 'wb'), check=True)
        
        self.logger.info(f"Custom image created: {compressed_image}")
        return compressed_image
    
    async def _download_ubuntu_image(self, model: str) -> str:
        """Download appropriate Ubuntu image for Pi model"""
        image_urls = {
            'Pi3B+': 'https://cdimage.ubuntu.com/releases/20.04/release/ubuntu-20.04.3-preinstalled-server-arm64+raspi.img.xz',
            'Pi4': 'https://cdimage.ubuntu.com/releases/20.04/release/ubuntu-20.04.3-preinstalled-server-arm64+raspi.img.xz',
            'CM4': 'https://cdimage.ubuntu.com/releases/20.04/release/ubuntu-20.04.3-preinstalled-server-arm64+raspi.img.xz'
        }
        
        url = image_urls.get(model.split('-')[0], image_urls['Pi4'])
        filename = url.split('/')[-1]
        image_path = f"/var/cache/rpi-images/{filename}"
        
        os.makedirs(os.path.dirname(image_path), exist_ok=True)
        
        # Download if not cached
        if not os.path.exists(image_path):
            self.logger.info(f"Downloading Ubuntu image from {url}")
            
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    with open(image_path, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            f.write(chunk)
        
        # Decompress
        decompressed_path = image_path.replace('.xz', '')
        if not os.path.exists(decompressed_path):
            subprocess.run(['xz', '-dk', image_path], check=True)
        
        return decompressed_path
    
    def _install_packages_chroot(self, mount_point: str, packages: List[str]):
        """Install packages in chroot environment"""
        if not packages:
            return
        
        # Copy resolv.conf for DNS
        subprocess.run(['cp', '/etc/resolv.conf', f"{mount_point}/etc/"], check=True)
        
        # Mount required filesystems
        for fs in ['proc', 'sys', 'dev']:
            subprocess.run(['mount', '--bind', f"/{fs}", f"{mount_point}/{fs}"], check=True)
        
        try:
            # Update package lists
            subprocess.run(['chroot', mount_point, 'apt-get', 'update'], check=True)
            
            # Install packages
            subprocess.run(['chroot', mount_point, 'apt-get', 'install', '-y'] + packages, 
                         check=True)
            
        finally:
            # Unmount filesystems
            for fs in ['dev', 'sys', 'proc']:
                subprocess.run(['umount', f"{mount_point}/{fs}"], check=False)
    
    def _configure_services_chroot(self, mount_point: str, services: List[str]):
        """Configure services in chroot environment"""
        for service in services:
            service_file = f"{mount_point}/etc/systemd/system/multi-user.target.wants/{service}.service"
            if os.path.exists(f"{mount_point}/lib/systemd/system/{service}.service"):
                os.makedirs(os.path.dirname(service_file), exist_ok=True)
                os.symlink(f"/lib/systemd/system/{service}.service", service_file)
    
    def _add_custom_scripts(self, mount_point: str, device_profile: DeviceProfile):
        """Add custom scripts to image"""
        scripts_dir = f"{mount_point}/opt/edge/scripts"
        os.makedirs(scripts_dir, exist_ok=True)
        
        # Add health check script
        health_check_script = """#!/bin/bash
# Edge device health check script

# Check system resources
CPU_USAGE=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
MEM_USAGE=$(free | grep Mem | awk '{print ($3/$2) * 100.0}')
DISK_USAGE=$(df -h / | awk 'NR==2 {print $5}' | cut -d'%' -f1)
TEMP=$(vcgencmd measure_temp | cut -d'=' -f2 | cut -d"'" -f1)

# Check services
DOCKER_STATUS=$(systemctl is-active docker)
AGENT_STATUS=$(systemctl is-active edge-agent)

# Generate health report
cat > /tmp/health.json <<EOF
{
  "timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
  "device_id": "{device_id}",
  "cpu_usage": $CPU_USAGE,
  "memory_usage": $MEM_USAGE,
  "disk_usage": $DISK_USAGE,
  "temperature": $TEMP,
  "services": {
    "docker": "$DOCKER_STATUS",
    "edge_agent": "$AGENT_STATUS"
  }
}
EOF

# Send to management server
mosquitto_pub -h {mqtt_broker} -t "edge/devices/{device_id}/metrics" -f /tmp/health.json
"""
        
        health_check_script = health_check_script.replace('{device_id}', device_profile.device_id)
        health_check_script = health_check_script.replace('{mqtt_broker}', 
                                                        self.config['management']['mqtt_broker'])
        
        with open(f"{scripts_dir}/health-check.sh", 'w') as f:
            f.write(health_check_script)
        
        os.chmod(f"{scripts_dir}/health-check.sh", 0o755)
        
        # Add to cron
        cron_file = f"{mount_point}/etc/cron.d/edge-health"
        with open(cron_file, 'w') as f:
            f.write("*/5 * * * * root /opt/edge/scripts/health-check.sh\n")
    
    async def _flash_sd_card(self, image_path: str, device_profile: DeviceProfile):
        """Flash image to SD card"""
        # This would interface with hardware to flash SD cards
        # For example, using a USB SD card writer
        self.logger.info(f"Flashing image to SD card for {device_profile.device_id}")
        
        # Find SD card device
        sd_device = self._find_sd_card()
        if not sd_device:
            raise Exception("No SD card found")
        
        # Flash image
        subprocess.run(['dd', f'if={image_path}', f'of={sd_device}', 'bs=4M', 
                       'conv=fsync', 'status=progress'], check=True)
        
        self.logger.info("Image flashed successfully")
    
    def _find_sd_card(self) -> Optional[str]:
        """Find SD card device"""
        # Look for removable block devices
        for device in Path('/sys/block').iterdir():
            if device.name.startswith('sd') or device.name.startswith('mmcblk'):
                removable = (device / 'removable').read_text().strip()
                if removable == '1':
                    return f"/dev/{device.name}"
        return None
    
    async def _wait_for_device_online(self, device_id: str, timeout: int = 600) -> bool:
        """Wait for device to come online after provisioning"""
        start_time = time.time()
        
        while (time.time() - start_time) < timeout:
            if device_id in self.devices and self.devices[device_id].get('last_seen'):
                last_seen = self.devices[device_id]['last_seen']
                if (time.time() - last_seen) < 60:
                    return True
            
            await asyncio.sleep(10)
        
        return False
    
    async def _run_post_provisioning(self, device_profile: DeviceProfile):
        """Run post-provisioning tasks"""
        self.logger.info(f"Running post-provisioning for {device_profile.device_id}")
        
        # Deploy initial applications based on role
        if device_profile.role == DeviceRole.GATEWAY:
            await self.deploy_application(device_profile.device_id, 
                                        self._get_gateway_applications())
        elif device_profile.role == DeviceRole.SENSOR:
            await self.deploy_application(device_profile.device_id, 
                                        self._get_sensor_applications())
        elif device_profile.role == DeviceRole.COMPUTE:
            await self.deploy_application(device_profile.device_id, 
                                        self._get_compute_applications())
    
    def _get_gateway_applications(self) -> List[ApplicationDeployment]:
        """Get default applications for gateway devices"""
        return [
            ApplicationDeployment(
                name="mosquitto",
                version="2.0",
                deployment_type=DeploymentType.CONTAINER,
                image="eclipse-mosquitto:2.0",
                config={
                    'ports': ['1883:1883', '8883:8883'],
                    'volumes': ['/etc/mosquitto:/mosquitto/config']
                }
            ),
            ApplicationDeployment(
                name="node-red",
                version="2.2",
                deployment_type=DeploymentType.CONTAINER,
                image="nodered/node-red:2.2",
                config={
                    'ports': ['1880:1880'],
                    'volumes': ['/data/node-red:/data']
                }
            )
        ]
    
    def _get_sensor_applications(self) -> List[ApplicationDeployment]:
        """Get default applications for sensor devices"""
        return [
            ApplicationDeployment(
                name="telegraf",
                version="1.21",
                deployment_type=DeploymentType.CONTAINER,
                image="telegraf:1.21",
                config={
                    'volumes': ['/etc/telegraf:/etc/telegraf:ro'],
                    'environment': {
                        'HOST_PROC': '/host/proc',
                        'HOST_SYS': '/host/sys'
                    }
                }
            )
        ]
    
    def _get_compute_applications(self) -> List[ApplicationDeployment]:
        """Get default applications for compute devices"""
        return [
            ApplicationDeployment(
                name="k3s",
                version="1.22",
                deployment_type=DeploymentType.BARE_METAL,
                config={
                    'install_script': 'https://get.k3s.io',
                    'args': '--disable traefik --disable servicelb'
                }
            )
        ]
    
    async def deploy_application(self, device_id: str, 
                                applications: List[ApplicationDeployment]):
        """Deploy applications to a device"""
        if device_id not in self.devices:
            raise ValueError(f"Unknown device: {device_id}")
        
        device = self.devices[device_id]
        
        for app in applications:
            self.logger.info(f"Deploying {app.name} to {device_id}")
            
            deployment_command = self._generate_deployment_command(app)
            
            # Send deployment command via MQTT
            command_id = f"{device_id}-{app.name}-{int(time.time())}"
            command_payload = {
                'command_id': command_id,
                'type': 'deploy_application',
                'application': asdict(app),
                'command': deployment_command
            }
            
            self.mqtt_client.publish(
                f"edge/devices/{device_id}/command",
                json.dumps(command_payload)
            )
            
            # Track deployment
            device['applications'].append({
                'name': app.name,
                'version': app.version,
                'deployed_at': time.time(),
                'status': 'deploying'
            })
            
            # Wait for deployment confirmation
            # This would be handled by the command response handler
    
    def _generate_deployment_command(self, app: ApplicationDeployment) -> str:
        """Generate deployment command based on deployment type"""
        if app.deployment_type == DeploymentType.CONTAINER:
            # Docker run command
            cmd = f"docker run -d --name {app.name} --restart unless-stopped"
            
            # Add port mappings
            for port in app.config.get('ports', []):
                cmd += f" -p {port}"
            
            # Add volume mappings
            for volume in app.config.get('volumes', []):
                cmd += f" -v {volume}"
            
            # Add environment variables
            for key, value in app.config.get('environment', {}).items():
                cmd += f" -e {key}={value}"
            
            # Add image
            cmd += f" {app.image}"
            
            return cmd
            
        elif app.deployment_type == DeploymentType.KUBERNETES:
            # Kubectl apply command
            return f"kubectl apply -f {app.helm_chart}"
            
        elif app.deployment_type == DeploymentType.SNAP:
            # Snap install command
            return f"snap install {app.snap_name}"
            
        elif app.deployment_type == DeploymentType.BARE_METAL:
            # Custom installation script
            if 'install_script' in app.config:
                cmd = f"curl -sfL {app.config['install_script']} | sh -"
                if 'args' in app.config:
                    cmd += f" -s - {app.config['args']}"
                return cmd
            else:
                return app.config.get('command', '')
    
    def _handle_device_status(self, device_id: str, status: Dict):
        """Handle device status update"""
        if device_id not in self.devices:
            # New device discovered
            self.logger.info(f"New device discovered: {device_id}")
            self.devices[device_id] = {
                'state': DeviceState.UNPROVISIONED,
                'last_seen': time.time()
            }
        
        self.devices[device_id]['last_seen'] = time.time()
        self.devices[device_id]['status'] = status
        
        # Update device state based on status
        if status.get('provisioned', False):
            if self.devices[device_id]['state'] == DeviceState.UNPROVISIONED:
                self.devices[device_id]['state'] = DeviceState.READY
    
    def _handle_device_metrics(self, device_id: str, metrics: Dict):
        """Handle device metrics"""
        if device_id in self.devices:
            self.devices[device_id]['metrics'] = metrics
            self.devices[device_id]['last_metrics'] = time.time()
            
            # Forward to Prometheus pushgateway
            self._push_metrics_to_prometheus(device_id, metrics)
            
            # Check for alerts
            self._check_metric_alerts(device_id, metrics)
    
    def _push_metrics_to_prometheus(self, device_id: str, metrics: Dict):
        """Push metrics to Prometheus pushgateway"""
        try:
            from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
            
            registry = CollectorRegistry()
            
            # Create gauges for each metric
            for metric_name, value in metrics.items():
                if isinstance(value, (int, float)):
                    gauge = Gauge(f'edge_{metric_name}', f'Edge device {metric_name}', 
                                ['device_id'], registry=registry)
                    gauge.labels(device_id=device_id).set(value)
            
            # Push to gateway
            push_to_gateway(
                self.config['monitoring']['prometheus_pushgateway'],
                job='edge_devices',
                registry=registry
            )
            
        except Exception as e:
            self.logger.error(f"Failed to push metrics: {e}")
    
    def _check_metric_alerts(self, device_id: str, metrics: Dict):
        """Check metrics against alert thresholds"""
        alerts = []
        
        # CPU usage alert
        if metrics.get('cpu_usage', 0) > 80:
            alerts.append({
                'severity': 'warning',
                'metric': 'cpu_usage',
                'value': metrics['cpu_usage'],
                'threshold': 80,
                'message': f"High CPU usage on {device_id}"
            })
        
        # Memory usage alert
        if metrics.get('memory_usage', 0) > 85:
            alerts.append({
                'severity': 'warning',
                'metric': 'memory_usage',
                'value': metrics['memory_usage'],
                'threshold': 85,
                'message': f"High memory usage on {device_id}"
            })
        
        # Temperature alert
        if metrics.get('temperature', 0) > 70:
            alerts.append({
                'severity': 'critical',
                'metric': 'temperature',
                'value': metrics['temperature'],
                'threshold': 70,
                'message': f"High temperature on {device_id}"
            })
        
        # Send alerts
        for alert in alerts:
            self._send_alert(device_id, alert)
    
    def _send_alert(self, device_id: str, alert: Dict):
        """Send alert to monitoring system"""
        self.logger.warning(f"Alert for {device_id}: {alert['message']}")
        
        # Send to alerting system (e.g., PagerDuty, Slack)
        # This would integrate with enterprise alerting
    
    def _handle_device_logs(self, device_id: str, logs: Dict):
        """Handle device logs"""
        # Forward to centralized logging
        # This would integrate with Loki, Elasticsearch, etc.
        pass
    
    def _handle_command_response(self, device_id: str, response: Dict):
        """Handle command response from device"""
        command_id = response.get('command_id')
        status = response.get('status')
        
        self.logger.info(f"Command response from {device_id}: {command_id} - {status}")
        
        # Update application status if deployment command
        if 'deploy_application' in command_id:
            app_name = command_id.split('-')[1]
            if device_id in self.devices:
                for app in self.devices[device_id].get('applications', []):
                    if app['name'] == app_name:
                        app['status'] = 'deployed' if status == 'success' else 'failed'
                        break
    
    async def _device_discovery_loop(self):
        """Discover new devices on the network"""
        while True:
            try:
                # Scan for new devices
                # This could use mDNS, DHCP monitoring, or network scanning
                await self._scan_for_devices()
                
            except Exception as e:
                self.logger.error(f"Device discovery error: {e}")
            
            await asyncio.sleep(60)  # Scan every minute
    
    async def _scan_for_devices(self):
        """Scan network for new Raspberry Pi devices"""
        # Use nmap or similar to find devices
        # Look for Raspberry Pi MAC address prefixes
        # B8:27:EB - Raspberry Pi Foundation
        # DC:A6:32 - Raspberry Pi Trading Ltd
        pass
    
    async def _health_check_loop(self):
        """Monitor device health"""
        while True:
            try:
                current_time = time.time()
                
                for device_id, device in self.devices.items():
                    last_seen = device.get('last_seen', 0)
                    
                    # Mark offline if not seen for 5 minutes
                    if (current_time - last_seen) > 300:
                        if device['state'] != DeviceState.OFFLINE:
                            device['state'] = DeviceState.OFFLINE
                            self.logger.warning(f"Device offline: {device_id}")
                            self._send_alert(device_id, {
                                'severity': 'critical',
                                'message': f"Device {device_id} is offline"
                            })
                
            except Exception as e:
                self.logger.error(f"Health check error: {e}")
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def _update_check_loop(self):
        """Check for and apply updates"""
        while True:
            try:
                if self.config['updates']['automatic_updates']:
                    # Check if within update window
                    if self._in_update_window():
                        await self._check_and_apply_updates()
                
            except Exception as e:
                self.logger.error(f"Update check error: {e}")
            
            await asyncio.sleep(3600)  # Check every hour
    
    def _in_update_window(self) -> bool:
        """Check if current time is within update window"""
        window = self.config['updates']['update_window']
        start_hour, end_hour = map(lambda x: int(x.split(':')[0]), window.split('-'))
        
        current_hour = time.localtime().tm_hour
        
        if start_hour < end_hour:
            return start_hour <= current_hour < end_hour
        else:  # Window crosses midnight
            return current_hour >= start_hour or current_hour < end_hour
    
    async def _check_and_apply_updates(self):
        """Check for and apply system updates"""
        # Check update server for new versions
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.config['management']['update_server']}/manifest.json") as response:
                manifest = await response.json()
        
        # Compare versions and apply updates
        # This would implement a sophisticated update mechanism
        # with rollback capabilities
    
    def generate_fleet_report(self) -> Dict[str, Any]:
        """Generate comprehensive fleet status report"""
        report = {
            'timestamp': time.time(),
            'summary': {
                'total_devices': len(self.devices),
                'online': 0,
                'offline': 0,
                'by_state': {},
                'by_role': {}
            },
            'devices': {},
            'applications': {},
            'alerts': []
        }
        
        # Analyze device fleet
        for device_id, device in self.devices.items():
            state = device.get('state', DeviceState.UNKNOWN)
            
            # Count by state
            state_str = state.value if hasattr(state, 'value') else str(state)
            report['summary']['by_state'][state_str] = \
                report['summary']['by_state'].get(state_str, 0) + 1
            
            # Count online/offline
            if state == DeviceState.OFFLINE:
                report['summary']['offline'] += 1
            else:
                report['summary']['online'] += 1
            
            # Count by role
            if 'profile' in device:
                role = device['profile'].role.value
                report['summary']['by_role'][role] = \
                    report['summary']['by_role'].get(role, 0) + 1
            
            # Device details
            report['devices'][device_id] = {
                'state': state_str,
                'last_seen': device.get('last_seen'),
                'metrics': device.get('metrics', {}),
                'applications': device.get('applications', [])
            }
            
            # Application summary
            for app in device.get('applications', []):
                app_name = app['name']
                if app_name not in report['applications']:
                    report['applications'][app_name] = {
                        'total': 0,
                        'deployed': 0,
                        'failed': 0
                    }
                
                report['applications'][app_name]['total'] += 1
                if app.get('status') == 'deployed':
                    report['applications'][app_name]['deployed'] += 1
                elif app.get('status') == 'failed':
                    report['applications'][app_name]['failed'] += 1
        
        return report

# Main execution
async def main():
    """Main execution function"""
    # Initialize orchestrator
    orchestrator = EnterpriseEdgeOrchestrator()
    
    # Example: Provision a fleet of sensor devices
    sensor_devices = []
    
    for i in range(10):
        device_profile = DeviceProfile(
            device_id=f"sensor-{i:03d}",
            hostname=f"edge-sensor-{i:03d}",
            role=DeviceRole.SENSOR,
            model="Pi4",
            serial_number=f"SN{i:010d}",
            network_config=NetworkConfiguration(
                interface="eth0",
                dhcp=True,
                vlan_id=200
            ),
            ssh_keys=[
                "ssh-rsa AAAAB3NzaC1yc2EA... admin@enterprise.com"
            ],
            packages=["python3-smbus", "i2c-tools"],
            services=["telegraf"],
            environment_vars={
                "SENSOR_TYPE": "temperature",
                "SAMPLE_RATE": "10"
            }
        )
        
        sensor_devices.append(device_profile)
    
    # Provision devices in parallel
    provisioning_tasks = [
        orchestrator.provision_device(device)
        for device in sensor_devices
    ]
    
    results = await asyncio.gather(*provisioning_tasks, return_exceptions=True)
    
    # Check results
    successful = sum(1 for r in results if isinstance(r, str))
    failed = sum(1 for r in results if isinstance(r, Exception))
    
    print(f"Provisioning complete: {successful} successful, {failed} failed")
    
    # Generate fleet report
    await asyncio.sleep(10)  # Wait for devices to report in
    report = orchestrator.generate_fleet_report()
    
    print("\nFleet Status Report")
    print("==================")
    print(f"Total Devices: {report['summary']['total_devices']}")
    print(f"Online: {report['summary']['online']}")
    print(f"Offline: {report['summary']['offline']}")
    print(f"By Role: {report['summary']['by_role']}")
    print(f"Applications Deployed: {len(report['applications'])}")

if __name__ == "__main__":
    asyncio.run(main())

Edge Application Deployment Pipeline

CI/CD for Edge Devices

#!/bin/bash
# Enterprise Edge Application Deployment Script

set -euo pipefail

# Configuration
REGISTRY="${REGISTRY:-registry.enterprise.com}"
EDGE_API="${EDGE_API:-https://api.enterprise.com/edge}"
ROLLOUT_STRATEGY="${ROLLOUT_STRATEGY:-canary}"
CANARY_PERCENTAGE="${CANARY_PERCENTAGE:-10}"

# Build and deploy edge application
deploy_edge_application() {
    local app_name="$1"
    local version="$2"
    local target_role="${3:-all}"
    
    echo "🚀 Deploying $app_name version $version to $target_role devices"
    
    # Build multi-arch container images
    build_multiarch_images "$app_name" "$version"
    
    # Push to registry
    push_to_registry "$app_name" "$version"
    
    # Create deployment manifest
    create_deployment_manifest "$app_name" "$version" "$target_role"
    
    # Deploy based on strategy
    case "$ROLLOUT_STRATEGY" in
        "canary")
            deploy_canary "$app_name" "$version" "$target_role"
            ;;
        "blue-green")
            deploy_blue_green "$app_name" "$version" "$target_role"
            ;;
        "rolling")
            deploy_rolling "$app_name" "$version" "$target_role"
            ;;
        *)
            echo "Unknown rollout strategy: $ROLLOUT_STRATEGY"
            exit 1
            ;;
    esac
}

# Build multi-architecture images
build_multiarch_images() {
    local app_name="$1"
    local version="$2"
    
    echo "Building multi-arch images for $app_name:$version"
    
    # Setup Docker buildx
    docker buildx create --use --name edge-builder || true
    
    # Build for ARM64 and ARMv7
    docker buildx build \
        --platform linux/arm64,linux/arm/v7 \
        --tag "$REGISTRY/$app_name:$version" \
        --tag "$REGISTRY/$app_name:latest" \
        --push \
        --file "./Dockerfile" \
        .
    
    echo "✅ Multi-arch build complete"
}

# Push images to registry
push_to_registry() {
    local app_name="$1"
    local version="$2"
    
    echo "Pushing images to registry..."
    
    # Sign images for security
    if command -v cosign &> /dev/null; then
        cosign sign "$REGISTRY/$app_name:$version"
    fi
    
    # Generate SBOM
    if command -v syft &> /dev/null; then
        syft "$REGISTRY/$app_name:$version" -o spdx-json > "sbom-$app_name-$version.json"
    fi
    
    echo "✅ Images pushed and signed"
}

# Create deployment manifest
create_deployment_manifest() {
    local app_name="$1"
    local version="$2"
    local target_role="$3"
    
    cat > "deployment-$app_name-$version.yaml" <<EOF
apiVersion: edge.enterprise.com/v1
kind: EdgeApplication
metadata:
  name: $app_name
  version: $version
spec:
  selector:
    role: $target_role
  deployment:
    type: container
    image: $REGISTRY/$app_name:$version
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "256Mi"
        cpu: "1000m"
    healthCheck:
      httpGet:
        path: /health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
    env:
      - name: EDGE_DEVICE_ID
        valueFrom:
          fieldRef:
            fieldPath: metadata.name
      - name: EDGE_ROLE
        valueFrom:
          fieldRef:
            fieldPath: spec.role
  monitoring:
    enabled: true
    metrics:
      port: 9090
      path: /metrics
  update:
    strategy: $ROLLOUT_STRATEGY
    canary:
      percentage: $CANARY_PERCENTAGE
      duration: "30m"
      metrics:
        - name: error_rate
          threshold: 5
        - name: latency_p95
          threshold: 500
EOF
}

# Deploy using canary strategy
deploy_canary() {
    local app_name="$1"
    local version="$2"
    local target_role="$3"
    
    echo "Starting canary deployment..."
    
    # Get total device count
    total_devices=$(curl -s "$EDGE_API/devices?role=$target_role" | jq '.total')
    canary_devices=$((total_devices * CANARY_PERCENTAGE / 100))
    
    echo "Deploying to $canary_devices canary devices (out of $total_devices)"
    
    # Deploy to canary devices
    curl -X POST "$EDGE_API/deployments" \
        -H "Content-Type: application/json" \
        -d @- <<EOF
{
    "application": "$app_name",
    "version": "$version",
    "strategy": "canary",
    "target": {
        "role": "$target_role",
        "percentage": $CANARY_PERCENTAGE
    }
}
EOF
    
    # Monitor canary deployment
    echo "Monitoring canary deployment..."
    monitor_deployment "$app_name" "$version" "canary"
    
    # Promote or rollback based on metrics
    if check_deployment_health "$app_name" "$version"; then
        echo "✅ Canary deployment successful, promoting to all devices"
        promote_deployment "$app_name" "$version" "$target_role"
    else
        echo "❌ Canary deployment failed, rolling back"
        rollback_deployment "$app_name" "$version" "$target_role"
    fi
}

# Monitor deployment progress
monitor_deployment() {
    local app_name="$1"
    local version="$2"
    local phase="$3"
    
    local timeout=1800  # 30 minutes
    local start_time=$(date +%s)
    
    while true; do
        # Get deployment status
        status=$(curl -s "$EDGE_API/deployments/$app_name/$version/status" | jq -r '.phase')
        
        case "$status" in
            "Progressing")
                echo "⏳ Deployment in progress..."
                ;;
            "Succeeded")
                echo "✅ Deployment succeeded"
                return 0
                ;;
            "Failed")
                echo "❌ Deployment failed"
                return 1
                ;;
        esac
        
        # Check timeout
        current_time=$(date +%s)
        if ((current_time - start_time > timeout)); then
            echo "⏰ Deployment timeout"
            return 1
        fi
        
        # Display metrics
        metrics=$(curl -s "$EDGE_API/deployments/$app_name/$version/metrics")
        echo "Metrics: $(echo "$metrics" | jq -c '.summary')"
        
        sleep 30
    done
}

# Check deployment health
check_deployment_health() {
    local app_name="$1"
    local version="$2"
    
    # Get deployment metrics
    metrics=$(curl -s "$EDGE_API/deployments/$app_name/$version/metrics")
    
    # Check error rate
    error_rate=$(echo "$metrics" | jq -r '.error_rate')
    if (( $(echo "$error_rate > 5" | bc -l) )); then
        echo "High error rate: $error_rate%"
        return 1
    fi
    
    # Check latency
    latency_p95=$(echo "$metrics" | jq -r '.latency_p95')
    if (( $(echo "$latency_p95 > 500" | bc -l) )); then
        echo "High latency: ${latency_p95}ms"
        return 1
    fi
    
    # Check device health
    unhealthy_devices=$(echo "$metrics" | jq -r '.unhealthy_devices')
    if (( unhealthy_devices > 0 )); then
        echo "Unhealthy devices: $unhealthy_devices"
        return 1
    fi
    
    echo "✅ All health checks passed"
    return 0
}

# Promote deployment to all devices
promote_deployment() {
    local app_name="$1"
    local version="$2"
    local target_role="$3"
    
    curl -X POST "$EDGE_API/deployments/$app_name/$version/promote" \
        -H "Content-Type: application/json" \
        -d "{\"target\": \"all\"}"
}

# Rollback deployment
rollback_deployment() {
    local app_name="$1"
    local version="$2"
    local target_role="$3"
    
    curl -X POST "$EDGE_API/deployments/$app_name/$version/rollback" \
        -H "Content-Type: application/json"
}

# Generate deployment report
generate_deployment_report() {
    local app_name="$1"
    local version="$2"
    
    echo "📊 Generating deployment report..."
    
    # Get deployment details
    deployment=$(curl -s "$EDGE_API/deployments/$app_name/$version")
    
    cat > "deployment-report-$app_name-$version.html" <<EOF
<!DOCTYPE html>
<html>
<head>
    <title>Edge Deployment Report - $app_name:$version</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .success { color: green; }
        .failure { color: red; }
        .warning { color: orange; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
    </style>
</head>
<body>
    <h1>Edge Deployment Report</h1>
    <h2>Application: $app_name</h2>
    <h3>Version: $version</h3>
    
    <h3>Deployment Summary</h3>
    $(echo "$deployment" | jq -r '.summary' | sed 's/^/<p>/; s/$/<\/p>/')
    
    <h3>Device Status</h3>
    <table>
        <tr>
            <th>Device ID</th>
            <th>Status</th>
            <th>Version</th>
            <th>Last Updated</th>
            <th>Health</th>
        </tr>
        $(echo "$deployment" | jq -r '.devices[] | "<tr><td>\(.id)</td><td>\(.status)</td><td>\(.version)</td><td>\(.updated)</td><td>\(.health)</td></tr>"')
    </table>
    
    <h3>Metrics</h3>
    <pre>$(echo "$deployment" | jq '.metrics')</pre>
    
    <p>Generated: $(date)</p>
</body>
</html>
EOF
    
    echo "Report generated: deployment-report-$app_name-$version.html"
}

# Main execution
main() {
    case "${1:-help}" in
        "deploy")
            deploy_edge_application "$2" "$3" "${4:-all}"
            ;;
        "status")
            curl -s "$EDGE_API/deployments/$2/$3/status" | jq
            ;;
        "rollback")
            rollback_deployment "$2" "$3"
            ;;
        "report")
            generate_deployment_report "$2" "$3"
            ;;
        *)
            echo "Usage: $0 {deploy|status|rollback|report} <app_name> <version> [target_role]"
            exit 1
            ;;
    esac
}

# Execute if run directly
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

Edge Security Framework

Security Hardening for Edge Devices

# Edge Security Configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: edge-security-config
  namespace: edge-system
data:
  security-policy.yaml: |
    security:
      # OS Hardening
      os_hardening:
        - disable_unnecessary_services:
            - bluetooth
            - avahi-daemon
            - cups
        - kernel_parameters:
            - net.ipv4.conf.all.accept_redirects: 0
            - net.ipv4.conf.all.send_redirects: 0
            - net.ipv4.tcp_syncookies: 1
            - kernel.randomize_va_space: 2
        - file_permissions:
            - /etc/passwd: "0644"
            - /etc/shadow: "0640"
            - /etc/ssh/sshd_config: "0600"
      
      # Network Security
      network_security:
        firewall_rules:
          - allow_ssh:
              port: 22
              source: "10.0.0.0/8"
              protocol: tcp
          - allow_mqtt:
              port: 8883
              protocol: tcp
          - allow_metrics:
              port: 9090
              source: "monitoring.enterprise.com"
              protocol: tcp
          - default_policy: DROP
        
        wireguard_vpn:
          enabled: true
          interface: wg0
          port: 51820
          peers:
            - name: edge-control
              endpoint: vpn.enterprise.com:51820
              allowed_ips: "10.100.0.0/16"
      
      # Application Security
      app_security:
        container_runtime:
          - enable_selinux: true
          - enable_apparmor: true
          - disable_privileged: true
          - enable_user_namespaces: true
        
        image_scanning:
          enabled: true
          severity_threshold: MEDIUM
          scan_on_push: true
      
      # Data Security
      data_security:
        encryption_at_rest:
          enabled: true
          method: LUKS
          key_management: TPM
        
        encryption_in_transit:
          tls_version: "1.3"
          cipher_suites:
            - TLS_AES_256_GCM_SHA384
            - TLS_CHACHA20_POLY1305_SHA256
      
      # Compliance
      compliance:
        frameworks:
          - CIS_Ubuntu_20.04_Benchmark
          - NIST_Cybersecurity_Framework
        
        audit:
          enabled: true
          log_destination: /var/log/audit/
          rules:
            - "-w /etc/passwd -p wa -k passwd_changes"
            - "-w /etc/shadow -p wa -k shadow_changes"
            - "-a exit,always -F arch=b64 -S execve -k commands"
---
apiVersion: batch/v1
kind: Job
metadata:
  name: edge-security-hardening
  namespace: edge-system
spec:
  template:
    spec:
      containers:
      - name: hardening
        image: enterprise/edge-hardening:latest
        command: ["/bin/bash", "-c"]
        args:
        - |
          #!/bin/bash
          set -euo pipefail
          
          echo "Starting edge security hardening..."
          
          # Apply OS hardening
          echo "Applying OS hardening..."
          
          # Disable unnecessary services
          for service in bluetooth avahi-daemon cups; do
              systemctl disable $service || true
              systemctl stop $service || true
          done
          
          # Apply kernel parameters
          cat >> /etc/sysctl.d/99-edge-security.conf <<EOF
          net.ipv4.conf.all.accept_redirects = 0
          net.ipv4.conf.all.send_redirects = 0
          net.ipv4.tcp_syncookies = 1
          kernel.randomize_va_space = 2
          net.ipv4.conf.all.rp_filter = 1
          net.ipv4.conf.default.rp_filter = 1
          EOF
          
          sysctl -p /etc/sysctl.d/99-edge-security.conf
          
          # Configure firewall
          echo "Configuring firewall..."
          ufw --force reset
          ufw default deny incoming
          ufw default allow outgoing
          ufw allow from 10.0.0.0/8 to any port 22 proto tcp
          ufw allow 8883/tcp
          ufw allow from monitoring.enterprise.com to any port 9090 proto tcp
          ufw --force enable
          
          # Setup fail2ban
          echo "Configuring fail2ban..."
          apt-get update && apt-get install -y fail2ban
          
          cat > /etc/fail2ban/jail.local <<EOF
          [DEFAULT]
          bantime = 3600
          findtime = 600
          maxretry = 5
          
          [sshd]
          enabled = true
          port = 22
          filter = sshd
          logpath = /var/log/auth.log
          EOF
          
          systemctl enable fail2ban
          systemctl start fail2ban
          
          # Configure automatic updates
          echo "Configuring automatic security updates..."
          apt-get install -y unattended-upgrades
          
          cat > /etc/apt/apt.conf.d/50unattended-upgrades <<EOF
          Unattended-Upgrade::Allowed-Origins {
              "\${distro_id}:\${distro_codename}-security";
          };
          Unattended-Upgrade::AutoFixInterruptedDpkg "true";
          Unattended-Upgrade::MinimalSteps "true";
          Unattended-Upgrade::Remove-Unused-Dependencies "true";
          Unattended-Upgrade::Automatic-Reboot "true";
          Unattended-Upgrade::Automatic-Reboot-Time "03:00";
          EOF
          
          # Setup audit logging
          echo "Configuring audit logging..."
          apt-get install -y auditd
          
          cat >> /etc/audit/rules.d/edge.rules <<EOF
          -w /etc/passwd -p wa -k passwd_changes
          -w /etc/shadow -p wa -k shadow_changes
          -w /etc/group -p wa -k group_changes
          -w /etc/sudoers -p wa -k sudoers_changes
          -a exit,always -F arch=b64 -S execve -k commands
          -w /var/log/lastlog -p wa -k logins
          EOF
          
          systemctl enable auditd
          systemctl start auditd
          
          echo "✅ Security hardening complete"
      restartPolicy: OnFailure

This comprehensive enterprise Ubuntu on Raspberry Pi guide provides:

Key Implementation Benefits

🎯 Complete Edge Infrastructure

  • Zero-touch provisioning with cloud-init automation
  • Multi-architecture support for various Pi models
  • Fleet management for thousands of edge devices
  • GitOps-based deployment pipelines

📊 Advanced Management Features

  • Centralized monitoring with Prometheus and Grafana
  • Remote device management via MQTT and APIs
  • Automated health checks and self-healing
  • OTA updates with rollback capabilities

🚨 Enterprise Security

  • Hardware-based encryption with TPM support
  • Network isolation with WireGuard VPN
  • Automated security patching and compliance
  • Audit logging and intrusion detection

🔧 Production Scalability

  • Container orchestration with K3s/MicroK8s
  • Multi-region deployment support
  • Edge-to-cloud integration with major providers
  • 99.9%+ uptime through redundancy and failover

This edge computing framework enables organizations to deploy and manage 10,000+ Raspberry Pi devices, maintain secure and reliable operations at the edge, achieve sub-second response times for local processing, and seamlessly integrate edge computing with enterprise cloud infrastructure.