Enterprise ISO Management & Automation Guide 2025: Mass Deployment, Virtual Media & Network Boot
Enterprise ISO Management & Automation Guide 2025: Mass Deployment, Virtual Media & Network Boot
Managing ISO files in enterprise environments requires sophisticated automation, security, and scalability that goes far beyond basic mounting commands. This comprehensive guide transforms simple ISO operations into enterprise-grade deployment systems with automated provisioning, virtual media management, and network boot infrastructure.
Table of Contents
- ISO Management Architecture
- Enterprise ISO Repository
- Automated ISO Deployment
- Virtual Media Management
- Network Boot Infrastructure
- Mass Installation Automation
- Security and Compliance
- Performance Optimization
- Integration with Orchestration
- Advanced Troubleshooting
- Monitoring and Analytics
- Best Practices and Guidelines
ISO Management Architecture
Enterprise ISO Management Framework
Build a comprehensive ISO management system:
#!/usr/bin/env python3
"""
Enterprise ISO Management Framework
Comprehensive ISO lifecycle management system
"""
import os
import hashlib
import json
import sqlite3
import asyncio
import aiofiles
import aiohttp
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
import yaml
from pathlib import Path
import subprocess
class ISORepository:
"""Enterprise ISO repository with lifecycle management"""
def __init__(self, config_file: str):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.base_path = Path(self.config['repository']['base_path'])
self.db_path = self.base_path / 'iso_repository.db'
self.setup_repository()
def setup_repository(self):
"""Initialize repository structure and database"""
# Create directory structure
self.base_path.mkdir(parents=True, exist_ok=True)
# Initialize database
self.init_database()
# Create category directories
categories = ['operating_systems', 'applications', 'utilities', 'custom']
for category in categories:
(self.base_path / category).mkdir(exist_ok=True)
def init_database(self):
"""Initialize SQLite database for ISO metadata"""
conn = sqlite3.connect(self.db_path)
conn.execute('''
CREATE TABLE IF NOT EXISTS iso_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
category TEXT NOT NULL,
version TEXT,
architecture TEXT,
size INTEGER,
checksum_sha256 TEXT,
checksum_md5 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
access_count INTEGER DEFAULT 0,
last_accessed TIMESTAMP,
metadata JSON,
tags TEXT,
status TEXT DEFAULT 'active'
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS mount_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
iso_id INTEGER,
mount_point TEXT,
user_id TEXT,
hostname TEXT,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ended_at TIMESTAMP,
status TEXT DEFAULT 'active',
purpose TEXT,
FOREIGN KEY (iso_id) REFERENCES iso_files (id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS deployment_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
iso_id INTEGER,
target_system TEXT,
deployment_type TEXT,
status TEXT,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
automation_config JSON,
FOREIGN KEY (iso_id) REFERENCES iso_files (id)
)
''')
conn.commit()
conn.close()
async def add_iso(self, file_path: str, category: str, metadata: Dict = None) -> str:
"""Add ISO file to repository"""
file_path = Path(file_path)
# Calculate checksums
sha256_hash, md5_hash = await self._calculate_checksums(file_path)
# Extract metadata
iso_metadata = await self._extract_iso_metadata(file_path)
if metadata:
iso_metadata.update(metadata)
# Generate unique filename
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
new_filename = f"{timestamp}_{file_path.name}"
# Move file to repository
target_path = self.base_path / category / new_filename
await self._copy_file(file_path, target_path)
# Store in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO iso_files (filename, category, version, architecture,
size, checksum_sha256, checksum_md5, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
new_filename,
category,
iso_metadata.get('version'),
iso_metadata.get('architecture'),
file_path.stat().st_size,
sha256_hash,
md5_hash,
json.dumps(iso_metadata)
))
iso_id = cursor.lastrowid
conn.commit()
conn.close()
self.logger.info(f"Added ISO {new_filename} to repository (ID: {iso_id})")
return iso_id
async def _calculate_checksums(self, file_path: Path) -> Tuple[str, str]:
"""Calculate SHA256 and MD5 checksums"""
sha256_hash = hashlib.sha256()
md5_hash = hashlib.md5()
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(8192):
sha256_hash.update(chunk)
md5_hash.update(chunk)
return sha256_hash.hexdigest(), md5_hash.hexdigest()
async def _extract_iso_metadata(self, file_path: Path) -> Dict:
"""Extract metadata from ISO file"""
metadata = {}
try:
# Use isoinfo to extract metadata
result = subprocess.run(
['isoinfo', '-d', '-i', str(file_path)],
capture_output=True,
text=True
)
if result.returncode == 0:
# Parse isoinfo output
for line in result.stdout.splitlines():
if ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower().replace(' ', '_')
metadata[key] = value.strip()
# Try to extract boot information
boot_info = subprocess.run(
['isoinfo', '-l', '-i', str(file_path)],
capture_output=True,
text=True
)
if boot_info.returncode == 0:
# Look for boot files
if 'ISOLINUX' in boot_info.stdout:
metadata['bootable'] = True
metadata['boot_type'] = 'isolinux'
elif 'GRUB' in boot_info.stdout:
metadata['bootable'] = True
metadata['boot_type'] = 'grub'
except Exception as e:
self.logger.warning(f"Failed to extract metadata from {file_path}: {e}")
return metadata
async def mount_iso(self, iso_id: str, mount_point: str, user_id: str,
hostname: str, purpose: str = None) -> Dict:
"""Mount ISO file with tracking"""
# Get ISO information
iso_info = await self.get_iso_info(iso_id)
if not iso_info:
raise ValueError(f"ISO {iso_id} not found")
iso_path = self.base_path / iso_info['category'] / iso_info['filename']
# Verify checksum
if not await self._verify_checksum(iso_path, iso_info['checksum_sha256']):
raise ValueError("ISO checksum verification failed")
# Create mount point
mount_path = Path(mount_point)
mount_path.mkdir(parents=True, exist_ok=True)
# Mount the ISO
try:
subprocess.run([
'sudo', 'mount', '-t', 'iso9660', '-o', 'loop,ro',
str(iso_path), str(mount_path)
], check=True)
# Record mount session
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO mount_sessions (iso_id, mount_point, user_id, hostname, purpose)
VALUES (?, ?, ?, ?, ?)
''', (iso_id, str(mount_path), user_id, hostname, purpose))
session_id = cursor.lastrowid
# Update access count
cursor.execute('''
UPDATE iso_files
SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP
WHERE id = ?
''', (iso_id,))
conn.commit()
conn.close()
return {
'session_id': session_id,
'mount_point': str(mount_path),
'iso_path': str(iso_path),
'status': 'mounted'
}
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to mount ISO: {e}")
class ISOAutomationEngine:
"""Automation engine for ISO deployment"""
def __init__(self, repository: ISORepository):
self.repository = repository
self.automation_templates = self._load_templates()
def _load_templates(self) -> Dict:
"""Load automation templates"""
return {
'ubuntu_server': {
'preseed_template': 'ubuntu-server-preseed.cfg',
'kickstart_template': None,
'cloud_init_template': 'ubuntu-cloud-init.yaml',
'supported_versions': ['20.04', '22.04', '24.04']
},
'centos_rhel': {
'preseed_template': None,
'kickstart_template': 'centos-kickstart.cfg',
'cloud_init_template': 'centos-cloud-init.yaml',
'supported_versions': ['7', '8', '9']
},
'windows_server': {
'unattend_template': 'windows-unattend.xml',
'supported_versions': ['2019', '2022']
}
}
async def create_custom_iso(self, base_iso_id: str, customizations: Dict) -> str:
"""Create customized ISO with automation"""
base_iso = await self.repository.get_iso_info(base_iso_id)
# Create temporary workspace
workspace = Path('/tmp/iso_customization') / f"custom_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
workspace.mkdir(parents=True, exist_ok=True)
try:
# Extract base ISO
extract_path = workspace / 'extracted'
await self._extract_iso(base_iso['full_path'], extract_path)
# Apply customizations
await self._apply_customizations(extract_path, customizations)
# Create new ISO
custom_iso_path = workspace / 'custom.iso'
await self._create_iso(extract_path, custom_iso_path)
# Add to repository
custom_iso_id = await self.repository.add_iso(
str(custom_iso_path),
'custom',
{
'base_iso_id': base_iso_id,
'customizations': customizations,
'created_by': 'automation_engine'
}
)
return custom_iso_id
finally:
# Cleanup workspace
subprocess.run(['sudo', 'rm', '-rf', str(workspace)])
async def _apply_customizations(self, extract_path: Path, customizations: Dict):
"""Apply customizations to extracted ISO"""
# Apply preseed/kickstart configuration
if 'preseed' in customizations:
await self._apply_preseed(extract_path, customizations['preseed'])
if 'kickstart' in customizations:
await self._apply_kickstart(extract_path, customizations['kickstart'])
# Add additional packages
if 'additional_packages' in customizations:
await self._add_packages(extract_path, customizations['additional_packages'])
# Apply scripts
if 'scripts' in customizations:
await self._add_scripts(extract_path, customizations['scripts'])
# Modify isolinux/grub configuration
if 'boot_config' in customizations:
await self._modify_boot_config(extract_path, customizations['boot_config'])
# Advanced ISO mounting with enterprise features
class EnterpriseISOManager:
"""Enterprise-grade ISO management with advanced features"""
def __init__(self, config: Dict):
self.config = config
self.active_mounts = {}
self.mount_policies = config.get('mount_policies', {})
async def secure_mount(self, iso_path: str, mount_point: str,
user_context: Dict) -> Dict:
"""Secure ISO mounting with user context and policies"""
# Validate user permissions
if not await self._check_mount_permissions(user_context):
raise PermissionError("User not authorized to mount ISOs")
# Check mount policies
policy_result = await self._check_mount_policies(iso_path, user_context)
if not policy_result['allowed']:
raise PolicyError(f"Mount denied by policy: {policy_result['reason']}")
# Generate secure mount point
secure_mount_point = await self._generate_secure_mount_point(
mount_point, user_context
)
# Mount with security options
mount_options = self._build_mount_options(user_context)
try:
# Execute mount command
cmd = [
'sudo', 'mount',
'-t', 'iso9660',
'-o', mount_options,
iso_path,
secure_mount_point
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Track mount session
session_info = {
'iso_path': iso_path,
'mount_point': secure_mount_point,
'user': user_context['username'],
'mounted_at': datetime.now(),
'options': mount_options,
'process_id': os.getpid()
}
self.active_mounts[secure_mount_point] = session_info
# Schedule automatic cleanup
asyncio.create_task(self._schedule_cleanup(secure_mount_point))
return {
'mount_point': secure_mount_point,
'session_id': secure_mount_point,
'status': 'mounted',
'expires_at': datetime.now() + timedelta(hours=2)
}
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Mount failed: {e.stderr}")
def _build_mount_options(self, user_context: Dict) -> str:
"""Build mount options based on user context"""
options = ['loop', 'ro', 'noexec', 'nosuid', 'nodev']
# Add user-specific options
if user_context.get('uid'):
options.append(f"uid={user_context['uid']}")
if user_context.get('gid'):
options.append(f"gid={user_context['gid']}")
# Security options
if self.config.get('security', {}).get('strict_mode'):
options.extend(['noatime', 'nodiratime'])
return ','.join(options)
async def _schedule_cleanup(self, mount_point: str):
"""Schedule automatic cleanup of mount"""
# Wait for timeout
await asyncio.sleep(self.config.get('mount_timeout', 7200)) # 2 hours
# Check if still mounted
if mount_point in self.active_mounts:
await self.unmount(mount_point)
Enterprise ISO Repository
Centralized ISO Repository with Version Control
Implement a centralized repository system:
#!/usr/bin/env python3
"""
Enterprise ISO Repository with Version Control
Centralized management with versioning and distribution
"""
import asyncio
import aiofiles
import asyncssh
from typing import Dict, List, Optional
import json
import yaml
from pathlib import Path
import hashlib
import sqlite3
from datetime import datetime
import logging
class VersionedISORepository:
"""ISO repository with version control capabilities"""
def __init__(self, config_file: str):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.repo_path = Path(self.config['repository_path'])
self.metadata_db = self.repo_path / 'metadata.db'
self.sync_nodes = self.config.get('sync_nodes', [])
self.setup_repository()
def setup_repository(self):
"""Initialize repository structure"""
# Create directory structure
for subdir in ['os', 'applications', 'utilities', 'custom', 'archive']:
(self.repo_path / subdir).mkdir(parents=True, exist_ok=True)
# Initialize metadata database
self.init_metadata_db()
def init_metadata_db(self):
"""Initialize metadata database"""
conn = sqlite3.connect(self.metadata_db)
conn.execute('''
CREATE TABLE IF NOT EXISTS iso_versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT NOT NULL,
category TEXT NOT NULL,
filename TEXT NOT NULL,
size INTEGER,
checksum_sha256 TEXT,
checksum_md5 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by TEXT,
description TEXT,
changelog TEXT,
tags TEXT,
parent_version_id INTEGER,
status TEXT DEFAULT 'active',
FOREIGN KEY (parent_version_id) REFERENCES iso_versions (id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS distribution_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
iso_version_id INTEGER,
target_node TEXT,
sync_started TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sync_completed TIMESTAMP,
status TEXT,
error_message TEXT,
FOREIGN KEY (iso_version_id) REFERENCES iso_versions (id)
)
''')
conn.commit()
conn.close()
async def add_version(self, iso_path: str, name: str, version: str,
category: str, metadata: Dict = None) -> int:
"""Add new version to repository"""
iso_path = Path(iso_path)
# Calculate checksums
sha256_hash = await self._calculate_checksum(iso_path, 'sha256')
md5_hash = await self._calculate_checksum(iso_path, 'md5')
# Generate filename with version
filename = f"{name}_{version}_{datetime.now().strftime('%Y%m%d')}.iso"
target_path = self.repo_path / category / filename
# Copy file to repository
await self._copy_file(iso_path, target_path)
# Store metadata
conn = sqlite3.connect(self.metadata_db)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO iso_versions (name, version, category, filename, size,
checksum_sha256, checksum_md5, created_by,
description, changelog, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
name, version, category, filename, iso_path.stat().st_size,
sha256_hash, md5_hash, metadata.get('created_by'),
metadata.get('description'), metadata.get('changelog'),
json.dumps(metadata.get('tags', []))
))
version_id = cursor.lastrowid
conn.commit()
conn.close()
# Trigger synchronization
await self._sync_to_nodes(version_id)
return version_id
async def get_versions(self, name: str = None, category: str = None) -> List[Dict]:
"""Get available versions"""
conn = sqlite3.connect(self.metadata_db)
cursor = conn.cursor()
query = "SELECT * FROM iso_versions WHERE status = 'active'"
params = []
if name:
query += " AND name = ?"
params.append(name)
if category:
query += " AND category = ?"
params.append(category)
query += " ORDER BY created_at DESC"
cursor.execute(query, params)
columns = [desc[0] for desc in cursor.description]
versions = []
for row in cursor.fetchall():
version_dict = dict(zip(columns, row))
version_dict['tags'] = json.loads(version_dict['tags'] or '[]')
versions.append(version_dict)
conn.close()
return versions
async def _sync_to_nodes(self, version_id: int):
"""Synchronize ISO version to remote nodes"""
version_info = await self.get_version_info(version_id)
tasks = []
for node in self.sync_nodes:
task = self._sync_to_node(version_id, version_info, node)
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
async def _sync_to_node(self, version_id: int, version_info: Dict, node: Dict):
"""Sync to single node"""
try:
# Log sync start
await self._log_sync_start(version_id, node['hostname'])
# Upload file via SSH
async with asyncssh.connect(
node['hostname'],
username=node['username'],
known_hosts=None
) as conn:
# Create remote directory
await conn.run(f"mkdir -p {node['remote_path']}/{version_info['category']}")
# Upload file
local_path = self.repo_path / version_info['category'] / version_info['filename']
remote_path = f"{node['remote_path']}/{version_info['category']}/{version_info['filename']}"
await asyncssh.scp(str(local_path), (conn, remote_path))
# Verify checksum
result = await conn.run(f"sha256sum {remote_path}")
remote_checksum = result.stdout.split()[0]
if remote_checksum != version_info['checksum_sha256']:
raise ValueError("Checksum mismatch after sync")
# Log sync completion
await self._log_sync_completion(version_id, node['hostname'], 'success')
except Exception as e:
await self._log_sync_completion(version_id, node['hostname'], 'failed', str(e))
raise
class ISODistributionManager:
"""Manage ISO distribution across network"""
def __init__(self, repository: VersionedISORepository):
self.repository = repository
self.distribution_cache = {}
async def distribute_iso(self, iso_id: str, targets: List[str],
method: str = 'rsync') -> Dict:
"""Distribute ISO to multiple targets"""
results = {
'iso_id': iso_id,
'total_targets': len(targets),
'successful': 0,
'failed': 0,
'details': []
}
# Get ISO information
iso_info = await self.repository.get_version_info(iso_id)
# Distribute to each target
tasks = []
for target in targets:
if method == 'rsync':
task = self._distribute_rsync(iso_info, target)
elif method == 'torrent':
task = self._distribute_torrent(iso_info, target)
else:
task = self._distribute_http(iso_info, target)
tasks.append(task)
# Execute distributions
distribution_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for i, result in enumerate(distribution_results):
if isinstance(result, Exception):
results['failed'] += 1
results['details'].append({
'target': targets[i],
'status': 'failed',
'error': str(result)
})
else:
results['successful'] += 1
results['details'].append({
'target': targets[i],
'status': 'success',
'details': result
})
return results
async def _distribute_rsync(self, iso_info: Dict, target: str) -> Dict:
"""Distribute using rsync"""
local_path = self.repository.repo_path / iso_info['category'] / iso_info['filename']
# Build rsync command
cmd = [
'rsync', '-avz', '--progress',
str(local_path),
f"{target}:/var/iso_cache/"
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return {
'method': 'rsync',
'bytes_transferred': iso_info['size'],
'duration': 'calculated_from_output'
}
else:
raise RuntimeError(f"rsync failed: {stderr.decode()}")
class WebISORepository:
"""Web-based ISO repository interface"""
def __init__(self, repository: VersionedISORepository):
self.repository = repository
self.app = self._create_web_app()
def _create_web_app(self):
"""Create web application for repository access"""
from flask import Flask, request, jsonify, send_file
app = Flask(__name__)
@app.route('/api/isos', methods=['GET'])
async def list_isos():
"""List available ISOs"""
category = request.args.get('category')
name = request.args.get('name')
versions = await self.repository.get_versions(name, category)
return jsonify(versions)
@app.route('/api/isos/<int:iso_id>', methods=['GET'])
async def get_iso_info(iso_id):
"""Get ISO information"""
info = await self.repository.get_version_info(iso_id)
return jsonify(info)
@app.route('/api/isos/<int:iso_id>/download', methods=['GET'])
async def download_iso(iso_id):
"""Download ISO file"""
info = await self.repository.get_version_info(iso_id)
iso_path = self.repository.repo_path / info['category'] / info['filename']
return send_file(
str(iso_path),
as_attachment=True,
download_name=info['filename']
)
@app.route('/api/isos', methods=['POST'])
async def upload_iso():
"""Upload new ISO"""
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
metadata = request.form.get('metadata', '{}')
metadata = json.loads(metadata)
# Save temporary file
temp_path = f"/tmp/{file.filename}"
file.save(temp_path)
try:
# Add to repository
version_id = await self.repository.add_version(
temp_path,
metadata['name'],
metadata['version'],
metadata['category'],
metadata
)
return jsonify({
'version_id': version_id,
'status': 'uploaded'
})
finally:
os.unlink(temp_path)
return app
def run(self, host='0.0.0.0', port=5000):
"""Run web server"""
self.app.run(host=host, port=port, debug=False)
Automated ISO Deployment
Mass Deployment Automation
Implement automated deployment for multiple systems:
#!/usr/bin/env python3
"""
Automated ISO Deployment System
Mass deployment with orchestration and monitoring
"""
import asyncio
import json
import yaml
from typing import Dict, List, Optional
import logging
from datetime import datetime
import concurrent.futures
import subprocess
import paramiko
import requests
class DeploymentOrchestrator:
"""Orchestrate ISO deployments across multiple systems"""
def __init__(self, config_file: str):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.deployment_queue = asyncio.Queue()
self.active_deployments = {}
self.deployment_templates = self._load_templates()
def _load_templates(self) -> Dict:
"""Load deployment templates"""
return {
'ubuntu_server': {
'boot_method': 'pxe',
'automation': 'preseed',
'template': 'ubuntu-server-template.cfg',
'post_install': ['configure_ssh', 'install_monitoring']
},
'centos_server': {
'boot_method': 'pxe',
'automation': 'kickstart',
'template': 'centos-server-template.cfg',
'post_install': ['configure_ssh', 'install_monitoring']
},
'windows_server': {
'boot_method': 'virtual_media',
'automation': 'unattend',
'template': 'windows-server-template.xml',
'post_install': ['configure_winrm', 'install_monitoring']
}
}
async def deploy_batch(self, deployments: List[Dict]) -> Dict:
"""Deploy ISO to multiple systems"""
batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
batch_result = {
'batch_id': batch_id,
'total_deployments': len(deployments),
'successful': 0,
'failed': 0,
'in_progress': 0,
'deployments': []
}
# Queue all deployments
for deployment in deployments:
deployment['batch_id'] = batch_id
await self.deployment_queue.put(deployment)
# Process deployments
tasks = []
for _ in range(min(len(deployments), self.config['max_concurrent_deployments'])):
task = asyncio.create_task(self._deployment_worker())
tasks.append(task)
# Wait for all deployments to complete
await asyncio.gather(*tasks)
# Collect results
for deployment_id, result in self.active_deployments.items():
if result['batch_id'] == batch_id:
batch_result['deployments'].append(result)
if result['status'] == 'completed':
batch_result['successful'] += 1
elif result['status'] == 'failed':
batch_result['failed'] += 1
else:
batch_result['in_progress'] += 1
return batch_result
async def _deployment_worker(self):
"""Worker to process deployment queue"""
while True:
try:
deployment = await asyncio.wait_for(
self.deployment_queue.get(),
timeout=10
)
await self._execute_deployment(deployment)
self.deployment_queue.task_done()
except asyncio.TimeoutError:
break
except Exception as e:
self.logger.error(f"Deployment worker error: {e}")
async def _execute_deployment(self, deployment: Dict):
"""Execute single deployment"""
deployment_id = deployment['id']
# Initialize deployment tracking
self.active_deployments[deployment_id] = {
'id': deployment_id,
'batch_id': deployment.get('batch_id'),
'target': deployment['target'],
'status': 'initializing',
'started_at': datetime.now(),
'steps': []
}
try:
# Get deployment template
template = self.deployment_templates.get(deployment['template'])
if not template:
raise ValueError(f"Unknown template: {deployment['template']}")
# Execute deployment steps
if template['boot_method'] == 'pxe':
await self._deploy_pxe(deployment_id, deployment, template)
elif template['boot_method'] == 'virtual_media':
await self._deploy_virtual_media(deployment_id, deployment, template)
else:
raise ValueError(f"Unknown boot method: {template['boot_method']}")
# Mark as completed
self.active_deployments[deployment_id]['status'] = 'completed'
self.active_deployments[deployment_id]['completed_at'] = datetime.now()
except Exception as e:
self.active_deployments[deployment_id]['status'] = 'failed'
self.active_deployments[deployment_id]['error'] = str(e)
self.active_deployments[deployment_id]['failed_at'] = datetime.now()
async def _deploy_pxe(self, deployment_id: str, deployment: Dict, template: Dict):
"""Deploy using PXE boot"""
# Update status
self.active_deployments[deployment_id]['status'] = 'configuring_pxe'
# Configure PXE for target
await self._configure_pxe_target(deployment['target'], deployment, template)
# Power on target system
await self._power_on_system(deployment['target'])
# Monitor installation
await self._monitor_installation(deployment_id, deployment['target'])
# Execute post-install steps
await self._execute_post_install(deployment_id, deployment, template)
async def _configure_pxe_target(self, target: str, deployment: Dict, template: Dict):
"""Configure PXE boot for target system"""
# Get target MAC address
mac_address = await self._get_target_mac(target)
# Generate automation config
automation_config = await self._generate_automation_config(deployment, template)
# Create PXE configuration
pxe_config = f"""
DEFAULT install
LABEL install
KERNEL {deployment['kernel_path']}
APPEND initrd={deployment['initrd_path']} {deployment['kernel_args']}
IPAPPEND 2
"""
# Write PXE config file
pxe_config_path = f"/var/lib/tftpboot/pxelinux.cfg/01-{mac_address.replace(':', '-')}"
with open(pxe_config_path, 'w') as f:
f.write(pxe_config)
# Copy automation config to web server
web_path = f"/var/www/html/automation/{target}.cfg"
with open(web_path, 'w') as f:
f.write(automation_config)
class VirtualMediaManager:
"""Manage virtual media for remote deployments"""
def __init__(self, config: Dict):
self.config = config
self.active_sessions = {}
async def mount_virtual_media(self, target: str, iso_id: str,
credentials: Dict) -> str:
"""Mount ISO as virtual media"""
session_id = f"vm_{target}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Get target management interface
mgmt_info = await self._get_management_info(target)
# Mount virtual media based on interface type
if mgmt_info['type'] == 'idrac':
await self._mount_idrac_virtual_media(target, iso_id, credentials)
elif mgmt_info['type'] == 'ilo':
await self._mount_ilo_virtual_media(target, iso_id, credentials)
elif mgmt_info['type'] == 'ipmi':
await self._mount_ipmi_virtual_media(target, iso_id, credentials)
else:
raise ValueError(f"Unsupported management interface: {mgmt_info['type']}")
# Track session
self.active_sessions[session_id] = {
'target': target,
'iso_id': iso_id,
'mounted_at': datetime.now(),
'mgmt_type': mgmt_info['type']
}
return session_id
async def _mount_idrac_virtual_media(self, target: str, iso_id: str, credentials: Dict):
"""Mount virtual media via iDRAC"""
# Get ISO HTTP URL
iso_url = f"http://{self.config['iso_server']}/isos/{iso_id}/download"
# Connect to iDRAC
session = requests.Session()
session.auth = (credentials['username'], credentials['password'])
# Get iDRAC session
response = session.post(f"https://{target}/redfish/v1/SessionService/Sessions")
if response.status_code != 201:
raise RuntimeError(f"Failed to create iDRAC session: {response.status_code}")
# Mount virtual media
vm_data = {
"Image": iso_url,
"UserName": credentials.get('iso_username'),
"Password": credentials.get('iso_password'),
"WriteProtected": True
}
response = session.post(
f"https://{target}/redfish/v1/Managers/iDRAC.Embedded.1/VirtualMedia/CD/Actions/VirtualMedia.InsertMedia",
json=vm_data
)
if response.status_code not in [200, 204]:
raise RuntimeError(f"Failed to mount virtual media: {response.status_code}")
async def unmount_virtual_media(self, session_id: str):
"""Unmount virtual media"""
if session_id not in self.active_sessions:
raise ValueError(f"Session {session_id} not found")
session = self.active_sessions[session_id]
# Unmount based on management type
if session['mgmt_type'] == 'idrac':
await self._unmount_idrac_virtual_media(session['target'])
elif session['mgmt_type'] == 'ilo':
await self._unmount_ilo_virtual_media(session['target'])
elif session['mgmt_type'] == 'ipmi':
await self._unmount_ipmi_virtual_media(session['target'])
# Remove from tracking
del self.active_sessions[session_id]
class InstallationMonitor:
"""Monitor installation progress"""
def __init__(self):
self.monitoring_sessions = {}
async def monitor_installation(self, deployment_id: str, target: str,
method: str = 'serial') -> Dict:
"""Monitor installation progress"""
session = {
'deployment_id': deployment_id,
'target': target,
'method': method,
'started_at': datetime.now(),
'progress': 0,
'status': 'monitoring',
'logs': []
}
self.monitoring_sessions[deployment_id] = session
try:
if method == 'serial':
await self._monitor_serial_console(deployment_id, target)
elif method == 'network':
await self._monitor_network_progress(deployment_id, target)
elif method == 'agent':
await self._monitor_agent_progress(deployment_id, target)
session['status'] = 'completed'
session['completed_at'] = datetime.now()
except Exception as e:
session['status'] = 'failed'
session['error'] = str(e)
session['failed_at'] = datetime.now()
return session
async def _monitor_serial_console(self, deployment_id: str, target: str):
"""Monitor via serial console"""
# Connect to serial console (implementation depends on setup)
# This is a placeholder for actual serial console monitoring
progress_indicators = [
"Partitioning disk",
"Installing base system",
"Configuring packages",
"Installing bootloader",
"Installation complete"
]
current_step = 0
while current_step < len(progress_indicators):
# Read console output
# Check for progress indicators
# Update progress
await asyncio.sleep(30) # Check every 30 seconds
async def _monitor_network_progress(self, deployment_id: str, target: str):
"""Monitor via network callbacks"""
# Wait for network-based progress updates
timeout = 3600 # 1 hour timeout
start_time = datetime.now()
while (datetime.now() - start_time).seconds < timeout:
# Check for progress updates from installation
# This could be HTTP callbacks or file system checks
await asyncio.sleep(60) # Check every minute
Virtual Media Management
Enterprise Virtual Media System
Implement comprehensive virtual media management:
#!/bin/bash
# Enterprise Virtual Media Management Script
set -euo pipefail
# Configuration
CONFIG_FILE="/etc/virtualmedia/config.yaml"
LOG_FILE="/var/log/virtualmedia.log"
ACTIVE_SESSIONS_FILE="/var/lib/virtualmedia/active_sessions.json"
# Load configuration
if [ -f "$CONFIG_FILE" ]; then
# Parse YAML config (simplified)
ISO_SERVER=$(grep 'iso_server:' "$CONFIG_FILE" | awk '{print $2}')
DEFAULT_USERNAME=$(grep 'default_username:' "$CONFIG_FILE" | awk '{print $2}')
SESSION_TIMEOUT=$(grep 'session_timeout:' "$CONFIG_FILE" | awk '{print $2}')
fi
# Function to log messages
log() {
echo "[$(date +'%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# Function to mount virtual media on iDRAC
mount_idrac_virtual_media() {
local target_ip=$1
local iso_url=$2
local username=$3
local password=$4
log "Mounting virtual media on iDRAC: $target_ip"
# Create session
local session_response=$(curl -s -k -X POST \
-H "Content-Type: application/json" \
-d "{\"UserName\": \"$username\", \"Password\": \"$password\"}" \
"https://$target_ip/redfish/v1/SessionService/Sessions")
if [ $? -ne 0 ]; then
log "ERROR: Failed to create iDRAC session"
return 1
fi
# Extract session token
local session_token=$(echo "$session_response" | jq -r '.Id // empty')
local auth_token=$(echo "$session_response" | jq -r '.Token // empty')
if [ -z "$session_token" ] || [ -z "$auth_token" ]; then
log "ERROR: Failed to extract session credentials"
return 1
fi
# Mount virtual media
local mount_response=$(curl -s -k -X POST \
-H "Content-Type: application/json" \
-H "X-Auth-Token: $auth_token" \
-d "{\"Image\": \"$iso_url\", \"WriteProtected\": true}" \
"https://$target_ip/redfish/v1/Managers/iDRAC.Embedded.1/VirtualMedia/CD/Actions/VirtualMedia.InsertMedia")
if [ $? -eq 0 ]; then
log "Virtual media mounted successfully"
# Store session info
local session_id="vm_${target_ip}_$(date +%s)"
echo "{\"session_id\": \"$session_id\", \"target\": \"$target_ip\", \"auth_token\": \"$auth_token\", \"mounted_at\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}" >> "$ACTIVE_SESSIONS_FILE"
echo "$session_id"
return 0
else
log "ERROR: Failed to mount virtual media"
return 1
fi
}
# Function to mount virtual media on HP iLO
mount_ilo_virtual_media() {
local target_ip=$1
local iso_url=$2
local username=$3
local password=$4
log "Mounting virtual media on iLO: $target_ip"
# Create session
local session_response=$(curl -s -k -X POST \
-H "Content-Type: application/json" \
-d "{\"UserName\": \"$username\", \"Password\": \"$password\"}" \
"https://$target_ip/redfish/v1/SessionService/Sessions")
if [ $? -ne 0 ]; then
log "ERROR: Failed to create iLO session"
return 1
fi
# Extract session token
local auth_token=$(echo "$session_response" | grep -i 'x-auth-token' | awk '{print $2}')
# Mount virtual media
local mount_response=$(curl -s -k -X POST \
-H "Content-Type: application/json" \
-H "X-Auth-Token: $auth_token" \
-d "{\"Image\": \"$iso_url\"}" \
"https://$target_ip/redfish/v1/Managers/1/VirtualMedia/2/Actions/VirtualMedia.InsertMedia")
if [ $? -eq 0 ]; then
log "Virtual media mounted successfully on iLO"
# Store session info
local session_id="vm_${target_ip}_$(date +%s)"
echo "{\"session_id\": \"$session_id\", \"target\": \"$target_ip\", \"auth_token\": \"$auth_token\", \"mounted_at\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}" >> "$ACTIVE_SESSIONS_FILE"
echo "$session_id"
return 0
else
log "ERROR: Failed to mount virtual media on iLO"
return 1
fi
}
# Function to unmount virtual media
unmount_virtual_media() {
local session_id=$1
log "Unmounting virtual media session: $session_id"
# Find session info
local session_info=$(grep "$session_id" "$ACTIVE_SESSIONS_FILE" 2>/dev/null || echo "")
if [ -z "$session_info" ]; then
log "ERROR: Session not found: $session_id"
return 1
fi
# Parse session info
local target_ip=$(echo "$session_info" | jq -r '.target')
local auth_token=$(echo "$session_info" | jq -r '.auth_token')
# Unmount (try both iDRAC and iLO endpoints)
curl -s -k -X POST \
-H "Content-Type: application/json" \
-H "X-Auth-Token: $auth_token" \
"https://$target_ip/redfish/v1/Managers/iDRAC.Embedded.1/VirtualMedia/CD/Actions/VirtualMedia.EjectMedia" &>/dev/null
curl -s -k -X POST \
-H "Content-Type: application/json" \
-H "X-Auth-Token: $auth_token" \
"https://$target_ip/redfish/v1/Managers/1/VirtualMedia/2/Actions/VirtualMedia.EjectMedia" &>/dev/null
# Remove from active sessions
grep -v "$session_id" "$ACTIVE_SESSIONS_FILE" > "$ACTIVE_SESSIONS_FILE.tmp" 2>/dev/null || true
mv "$ACTIVE_SESSIONS_FILE.tmp" "$ACTIVE_SESSIONS_FILE" 2>/dev/null || true
log "Virtual media unmounted: $session_id"
}
# Function to cleanup expired sessions
cleanup_expired_sessions() {
log "Cleaning up expired sessions"
local current_time=$(date +%s)
local timeout_seconds=${SESSION_TIMEOUT:-7200} # 2 hours default
if [ ! -f "$ACTIVE_SESSIONS_FILE" ]; then
return 0
fi
# Create temporary file for active sessions
local temp_file=$(mktemp)
# Check each session
while IFS= read -r session_line; do
if [ -z "$session_line" ]; then
continue
fi
local mounted_at=$(echo "$session_line" | jq -r '.mounted_at')
local mounted_timestamp=$(date -d "$mounted_at" +%s 2>/dev/null || echo "0")
if [ $((current_time - mounted_timestamp)) -gt $timeout_seconds ]; then
# Session expired
local session_id=$(echo "$session_line" | jq -r '.session_id')
log "Cleaning up expired session: $session_id"
unmount_virtual_media "$session_id"
else
# Session still valid
echo "$session_line" >> "$temp_file"
fi
done < "$ACTIVE_SESSIONS_FILE"
# Replace active sessions file
mv "$temp_file" "$ACTIVE_SESSIONS_FILE"
}
# Function to list active sessions
list_active_sessions() {
if [ ! -f "$ACTIVE_SESSIONS_FILE" ]; then
echo "No active sessions"
return 0
fi
echo "Active Virtual Media Sessions:"
echo "=============================="
while IFS= read -r session_line; do
if [ -z "$session_line" ]; then
continue
fi
local session_id=$(echo "$session_line" | jq -r '.session_id')
local target=$(echo "$session_line" | jq -r '.target')
local mounted_at=$(echo "$session_line" | jq -r '.mounted_at')
echo "Session: $session_id"
echo "Target: $target"
echo "Mounted: $mounted_at"
echo "---"
done < "$ACTIVE_SESSIONS_FILE"
}
# Function to mount with automatic management detection
auto_mount_virtual_media() {
local target_ip=$1
local iso_url=$2
local username=$3
local password=$4
log "Auto-detecting management interface for $target_ip"
# Try to detect management interface type
local mgmt_type=""
# Check for iDRAC
if curl -s -k --max-time 5 "https://$target_ip/redfish/v1/Managers/iDRAC.Embedded.1" &>/dev/null; then
mgmt_type="idrac"
# Check for iLO
elif curl -s -k --max-time 5 "https://$target_ip/redfish/v1/Managers/1" | grep -i "ilo" &>/dev/null; then
mgmt_type="ilo"
else
log "ERROR: Unable to detect management interface type"
return 1
fi
log "Detected management interface: $mgmt_type"
# Mount based on detected type
case "$mgmt_type" in
"idrac")
mount_idrac_virtual_media "$target_ip" "$iso_url" "$username" "$password"
;;
"ilo")
mount_ilo_virtual_media "$target_ip" "$iso_url" "$username" "$password"
;;
*)
log "ERROR: Unsupported management interface: $mgmt_type"
return 1
;;
esac
}
# Function to batch mount virtual media
batch_mount() {
local targets_file=$1
local iso_url=$2
local username=$3
local password=$4
log "Starting batch virtual media mount"
if [ ! -f "$targets_file" ]; then
log "ERROR: Targets file not found: $targets_file"
return 1
fi
local success_count=0
local failure_count=0
while IFS= read -r target_ip; do
if [ -z "$target_ip" ] || [[ "$target_ip" =~ ^# ]]; then
continue
fi
log "Processing target: $target_ip"
if auto_mount_virtual_media "$target_ip" "$iso_url" "$username" "$password"; then
((success_count++))
else
((failure_count++))
fi
# Small delay between operations
sleep 2
done < "$targets_file"
log "Batch mount completed: $success_count successful, $failure_count failed"
}
# Main execution
case "${1:-}" in
mount)
if [ $# -lt 4 ]; then
echo "Usage: $0 mount <target_ip> <iso_url> <username> [password]"
exit 1
fi
target_ip=$2
iso_url=$3
username=$4
password=${5:-}
if [ -z "$password" ]; then
read -s -p "Enter password: " password
echo
fi
session_id=$(auto_mount_virtual_media "$target_ip" "$iso_url" "$username" "$password")
if [ $? -eq 0 ]; then
echo "Virtual media mounted. Session ID: $session_id"
else
echo "Failed to mount virtual media"
exit 1
fi
;;
unmount)
if [ $# -lt 2 ]; then
echo "Usage: $0 unmount <session_id>"
exit 1
fi
unmount_virtual_media "$2"
;;
batch-mount)
if [ $# -lt 4 ]; then
echo "Usage: $0 batch-mount <targets_file> <iso_url> <username> [password]"
exit 1
fi
targets_file=$2
iso_url=$3
username=$4
password=${5:-}
if [ -z "$password" ]; then
read -s -p "Enter password: " password
echo
fi
batch_mount "$targets_file" "$iso_url" "$username" "$password"
;;
list)
list_active_sessions
;;
cleanup)
cleanup_expired_sessions
;;
monitor)
# Continuous monitoring mode
while true; do
cleanup_expired_sessions
sleep 300 # Check every 5 minutes
done
;;
*)
echo "Usage: $0 {mount|unmount|batch-mount|list|cleanup|monitor}"
echo ""
echo "Commands:"
echo " mount <target_ip> <iso_url> <username> [password]"
echo " unmount <session_id>"
echo " batch-mount <targets_file> <iso_url> <username> [password]"
echo " list"
echo " cleanup"
echo " monitor"
exit 1
;;
esac
Network Boot Infrastructure
PXE Boot Management System
Implement comprehensive PXE boot infrastructure:
#!/usr/bin/env python3
"""
Enterprise PXE Boot Management System
Complete network boot infrastructure with automation
"""
import asyncio
import struct
import socket
from typing import Dict, List, Optional, Tuple
import json
import yaml
from pathlib import Path
import logging
from datetime import datetime
import ipaddress
import sqlite3
class PXEBootManager:
"""Manage PXE boot infrastructure"""
def __init__(self, config_file: str):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.tftp_root = Path(self.config['tftp_root'])
self.http_root = Path(self.config['http_root'])
self.db_path = Path(self.config['db_path'])
self.setup_infrastructure()
def setup_infrastructure(self):
"""Setup PXE infrastructure"""
# Create directory structure
self.tftp_root.mkdir(parents=True, exist_ok=True)
(self.tftp_root / 'pxelinux.cfg').mkdir(exist_ok=True)
self.http_root.mkdir(parents=True, exist_ok=True)
(self.http_root / 'images').mkdir(exist_ok=True)
(self.http_root / 'configs').mkdir(exist_ok=True)
# Initialize database
self.init_database()
def init_database(self):
"""Initialize boot configuration database"""
conn = sqlite3.connect(self.db_path)
conn.execute('''
CREATE TABLE IF NOT EXISTS boot_configs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac_address TEXT UNIQUE NOT NULL,
hostname TEXT,
ip_address TEXT,
boot_image TEXT,
kernel_args TEXT,
automation_config TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
boot_count INTEGER DEFAULT 0,
last_boot TIMESTAMP,
status TEXT DEFAULT 'active'
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS boot_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mac_address TEXT,
ip_address TEXT,
boot_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
boot_image TEXT,
status TEXT,
error_message TEXT
)
''')
conn.commit()
conn.close()
async def add_boot_config(self, mac_address: str, hostname: str,
boot_config: Dict) -> int:
"""Add boot configuration for MAC address"""
mac_address = mac_address.lower().replace('-', ':')
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO boot_configs
(mac_address, hostname, ip_address, boot_image, kernel_args, automation_config)
VALUES (?, ?, ?, ?, ?, ?)
''', (
mac_address,
hostname,
boot_config.get('ip_address'),
boot_config['boot_image'],
boot_config.get('kernel_args', ''),
json.dumps(boot_config.get('automation_config', {}))
))
config_id = cursor.lastrowid
conn.commit()
conn.close()
# Generate PXE configuration file
await self._generate_pxe_config(mac_address, boot_config)
return config_id
async def _generate_pxe_config(self, mac_address: str, boot_config: Dict):
"""Generate PXE configuration file"""
mac_hex = mac_address.replace(':', '-')
config_file = self.tftp_root / 'pxelinux.cfg' / f"01-{mac_hex}"
# Build PXE configuration
pxe_config = f"""
DEFAULT install
LABEL install
KERNEL {boot_config['kernel_path']}
APPEND initrd={boot_config['initrd_path']} {boot_config.get('kernel_args', '')}
IPAPPEND 2
LABEL local
LOCALBOOT 0
PROMPT 0
TIMEOUT 10
"""
# Write configuration file
with open(config_file, 'w') as f:
f.write(pxe_config)
self.logger.info(f"Generated PXE config for {mac_address}")
async def create_automation_config(self, hostname: str, template: str,
variables: Dict) -> str:
"""Create automation configuration (preseed/kickstart)"""
template_path = Path(self.config['template_dir']) / f"{template}.j2"
if not template_path.exists():
raise ValueError(f"Template not found: {template}")
# Load template
with open(template_path, 'r') as f:
template_content = f.read()
# Simple template substitution (for production, use Jinja2)
config_content = template_content
for key, value in variables.items():
config_content = config_content.replace(f"{{{{ {key} }}}}", str(value))
# Write configuration file
config_file = self.http_root / 'configs' / f"{hostname}.cfg"
with open(config_file, 'w') as f:
f.write(config_content)
return f"http://{self.config['http_server']}/configs/{hostname}.cfg"
class DHCPPXEIntegration:
"""Integration with DHCP for PXE boot"""
def __init__(self, config: Dict):
self.config = config
self.reservations = {}
async def setup_dhcp_reservation(self, mac_address: str, hostname: str,
ip_address: str):
"""Setup DHCP reservation for PXE boot"""
reservation = {
'mac_address': mac_address,
'hostname': hostname,
'ip_address': ip_address,
'next_server': self.config['pxe_server'],
'filename': 'pxelinux.0'
}
# Generate DHCP configuration
dhcp_config = f"""
host {hostname} {{
hardware ethernet {mac_address};
fixed-address {ip_address};
option host-name "{hostname}";
next-server {self.config['pxe_server']};
filename "pxelinux.0";
}}
"""
# Write to DHCP configuration file
dhcp_file = Path(self.config['dhcp_config_dir']) / f"{hostname}.conf"
with open(dhcp_file, 'w') as f:
f.write(dhcp_config)
# Reload DHCP server
await self._reload_dhcp_server()
self.reservations[mac_address] = reservation
async def _reload_dhcp_server(self):
"""Reload DHCP server configuration"""
import subprocess
try:
subprocess.run(['systemctl', 'reload', 'dhcpd'], check=True)
self.logger.info("DHCP server configuration reloaded")
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to reload DHCP server: {e}")
class NetworkBootOrchestrator:
"""Orchestrate network boot deployments"""
def __init__(self, pxe_manager: PXEBootManager, dhcp_integration: DHCPPXEIntegration):
self.pxe_manager = pxe_manager
self.dhcp_integration = dhcp_integration
self.active_deployments = {}
async def deploy_batch(self, systems: List[Dict]) -> Dict:
"""Deploy multiple systems via network boot"""
batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
results = {
'batch_id': batch_id,
'total_systems': len(systems),
'successful': 0,
'failed': 0,
'systems': []
}
# Process each system
for system in systems:
try:
# Setup DHCP reservation
await self.dhcp_integration.setup_dhcp_reservation(
system['mac_address'],
system['hostname'],
system['ip_address']
)
# Create automation config
automation_config_url = await self.pxe_manager.create_automation_config(
system['hostname'],
system['template'],
system['variables']
)
# Setup PXE boot
boot_config = {
'boot_image': system['boot_image'],
'kernel_path': system['kernel_path'],
'initrd_path': system['initrd_path'],
'kernel_args': f"auto=true url={automation_config_url} {system.get('extra_args', '')}",
'ip_address': system['ip_address'],
'automation_config': system['variables']
}
config_id = await self.pxe_manager.add_boot_config(
system['mac_address'],
system['hostname'],
boot_config
)
# Power on system
await self._power_on_system(system)
results['successful'] += 1
results['systems'].append({
'hostname': system['hostname'],
'mac_address': system['mac_address'],
'status': 'deployed',
'config_id': config_id
})
except Exception as e:
results['failed'] += 1
results['systems'].append({
'hostname': system['hostname'],
'mac_address': system['mac_address'],
'status': 'failed',
'error': str(e)
})
return results
async def _power_on_system(self, system: Dict):
"""Power on system for network boot"""
# Implementation depends on power management interface
# This could be IPMI, iDRAC, iLO, etc.
pass
# PXE Boot configuration templates
PXE_TEMPLATES = {
'ubuntu_server': """
DEFAULT install
LABEL install
KERNEL images/ubuntu/vmlinuz
APPEND initrd=images/ubuntu/initrd.gz auto=true url={{ automation_config_url }} netcfg/choose_interface=auto
IPAPPEND 2
LABEL local
LOCALBOOT 0
PROMPT 0
TIMEOUT 10
""",
'centos_server': """
DEFAULT install
LABEL install
KERNEL images/centos/vmlinuz
APPEND initrd=images/centos/initrd.img ks={{ automation_config_url }} ksdevice=bootif
IPAPPEND 2
LABEL local
LOCALBOOT 0
PROMPT 0
TIMEOUT 10
""",
'windows_pe': """
DEFAULT install
LABEL install
KERNEL images/windows/pxeboot.com
APPEND bootmgr.exe
LABEL local
LOCALBOOT 0
PROMPT 0
TIMEOUT 10
"""
}
# Preseed template example
UBUNTU_PRESEED_TEMPLATE = """
# Ubuntu Server Preseed Configuration
d-i debian-installer/locale string en_US
d-i console-setup/ask_detect boolean false
d-i keyboard-configuration/xkb-keymap select us
# Network configuration
d-i netcfg/choose_interface select auto
d-i netcfg/get_hostname string {{ hostname }}
d-i netcfg/get_domain string {{ domain }}
# Mirror settings
d-i mirror/country string manual
d-i mirror/http/hostname string {{ mirror_hostname }}
d-i mirror/http/directory string {{ mirror_directory }}
d-i mirror/http/proxy string
# Account setup
d-i passwd/user-fullname string {{ user_fullname }}
d-i passwd/username string {{ username }}
d-i passwd/user-password-crypted password {{ password_hash }}
d-i user-setup/allow-password-weak boolean true
d-i user-setup/encrypt-home boolean false
# Clock and time zone setup
d-i clock-setup/utc boolean true
d-i time/zone string {{ timezone }}
d-i clock-setup/ntp boolean true
# Partitioning
d-i partman-auto/disk string {{ disk_device }}
d-i partman-auto/method string lvm
d-i partman-lvm/device_remove_lvm boolean true
d-i partman-md/device_remove_md boolean true
d-i partman-lvm/confirm boolean true
d-i partman-lvm/confirm_nooverwrite boolean true
d-i partman-auto-lvm/guided_size string max
d-i partman-auto/choose_recipe select atomic
d-i partman-partitioning/confirm_write_new_label boolean true
d-i partman/choose_partition select finish
d-i partman/confirm boolean true
d-i partman/confirm_nooverwrite boolean true
# Base system installation
d-i base-installer/install-recommends boolean false
d-i base-installer/kernel/image string linux-generic
# Package selection
tasksel tasksel/first multiselect standard, server
d-i pkgsel/include string {{ additional_packages }}
d-i pkgsel/upgrade select full-upgrade
d-i pkgsel/update-policy select none
# Boot loader installation
d-i grub-installer/only_debian boolean true
d-i grub-installer/with_other_os boolean true
d-i grub-installer/bootdev string {{ disk_device }}
# Finishing up the installation
d-i finish-install/reboot_in_progress note
# Post-installation commands
d-i preseed/late_command string \\
in-target mkdir -p /root/.ssh; \\
in-target chmod 700 /root/.ssh; \\
echo "{{ ssh_public_key }}" > /target/root/.ssh/authorized_keys; \\
in-target chmod 600 /root/.ssh/authorized_keys; \\
in-target systemctl enable ssh
"""
# Kickstart template example
CENTOS_KICKSTART_TEMPLATE = """
# CentOS/RHEL Kickstart Configuration
install
text
reboot --eject
# System language
lang en_US.UTF-8
# Keyboard layouts
keyboard --vckeymap=us --xlayouts='us'
# Network information
network --bootproto=dhcp --device={{ network_device }} --onboot=on --ipv6=auto --activate
network --hostname={{ hostname }}
# Root password
rootpw --iscrypted {{ root_password_hash }}
# System services
services --enabled="chronyd"
# System timezone
timezone {{ timezone }} --isUtc
# System bootloader configuration
bootloader --location=mbr --boot-drive={{ disk_device }}
# Clear the Master Boot Record
zerombr
# Partition clearing information
clearpart --all --initlabel --drives={{ disk_device }}
# Disk partitioning information
part /boot --fstype="xfs" --ondisk={{ disk_device }} --size=1024
part pv.01 --fstype="lvmpv" --ondisk={{ disk_device }} --grow
volgroup vg_root --pesize=4096 pv.01
logvol / --fstype="xfs" --size=20480 --name=lv_root --vgname=vg_root
logvol /var --fstype="xfs" --size=10240 --name=lv_var --vgname=vg_root
logvol /tmp --fstype="xfs" --size=2048 --name=lv_tmp --vgname=vg_root
logvol swap --fstype="swap" --size=4096 --name=lv_swap --vgname=vg_root
# Package installation
%packages
@^minimal
@core
chrony
kexec-tools
{{ additional_packages }}
%end
# Post-installation script
%post
# Configure SSH
mkdir -p /root/.ssh
chmod 700 /root/.ssh
echo "{{ ssh_public_key }}" > /root/.ssh/authorized_keys
chmod 600 /root/.ssh/authorized_keys
# Enable SSH
systemctl enable sshd
# Configure firewall
firewall-cmd --permanent --add-service=ssh
firewall-cmd --reload
# Set up monitoring
{{ post_install_commands }}
%end
"""
Security and Compliance
Enterprise Security Framework
Implement comprehensive security for ISO management:
#!/usr/bin/env python3
"""
Enterprise ISO Security and Compliance Framework
Comprehensive security implementation for ISO management
"""
import hashlib
import hmac
import secrets
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
class ISOSecurityManager:
"""Manage security for ISO operations"""
def __init__(self, config: Dict):
self.config = config
self.encryption_key = self._derive_encryption_key()
self.fernet = Fernet(self.encryption_key)
self.setup_audit_logging()
def _derive_encryption_key(self) -> bytes:
"""Derive encryption key from master password"""
password = self.config['master_password'].encode()
salt = self.config['salt'].encode()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password))
return key
def setup_audit_logging(self):
"""Setup comprehensive audit logging"""
self.audit_logger = logging.getLogger('iso_security_audit')
self.audit_logger.setLevel(logging.INFO)
# Create file handler with rotation
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
'/var/log/iso_security_audit.log',
maxBytes=10485760, # 10MB
backupCount=10
)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.audit_logger.addHandler(handler)
def verify_iso_integrity(self, iso_path: str, expected_checksum: str) -> bool:
"""Verify ISO file integrity"""
try:
# Calculate actual checksum
actual_checksum = self._calculate_checksum(iso_path)
# Compare checksums
if actual_checksum == expected_checksum:
self.audit_logger.info(f"ISO integrity verified: {iso_path}")
return True
else:
self.audit_logger.error(f"ISO integrity check failed: {iso_path}")
return False
except Exception as e:
self.audit_logger.error(f"Error verifying ISO integrity: {e}")
return False
def _calculate_checksum(self, file_path: str) -> str:
"""Calculate SHA256 checksum of file"""
sha256_hash = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
def encrypt_sensitive_data(self, data: str) -> str:
"""Encrypt sensitive data"""
encrypted = self.fernet.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted).decode()
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive data"""
encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted = self.fernet.decrypt(encrypted_bytes)
return decrypted.decode()
def generate_access_token(self, user_id: str, permissions: List[str]) -> str:
"""Generate secure access token"""
payload = {
'user_id': user_id,
'permissions': permissions,
'issued_at': datetime.utcnow().isoformat(),
'expires_at': (datetime.utcnow() + timedelta(hours=8)).isoformat(),
'nonce': secrets.token_hex(16)
}
# Sign payload
token_data = json.dumps(payload, sort_keys=True).encode()
signature = hmac.new(
self.encryption_key,
token_data,
hashlib.sha256
).hexdigest()
# Combine payload and signature
token = {
'payload': base64.urlsafe_b64encode(token_data).decode(),
'signature': signature
}
return base64.urlsafe_b64encode(json.dumps(token).encode()).decode()
def validate_access_token(self, token: str) -> Optional[Dict]:
"""Validate access token"""
try:
# Decode token
token_data = json.loads(base64.urlsafe_b64decode(token.encode()).decode())
# Verify signature
payload_bytes = base64.urlsafe_b64decode(token_data['payload'].encode())
expected_signature = hmac.new(
self.encryption_key,
payload_bytes,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(expected_signature, token_data['signature']):
return None
# Parse payload
payload = json.loads(payload_bytes.decode())
# Check expiration
expires_at = datetime.fromisoformat(payload['expires_at'])
if datetime.utcnow() > expires_at:
return None
return payload
except Exception as e:
self.audit_logger.error(f"Token validation error: {e}")
return None
def audit_iso_access(self, user_id: str, iso_id: str, action: str,
details: Dict = None):
"""Audit ISO access operations"""
audit_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'iso_id': iso_id,
'action': action,
'details': details or {},
'source_ip': details.get('source_ip') if details else None
}
self.audit_logger.info(f"AUDIT: {json.dumps(audit_entry)}")
class ComplianceManager:
"""Manage compliance requirements"""
def __init__(self, config: Dict):
self.config = config
self.compliance_rules = self._load_compliance_rules()
def _load_compliance_rules(self) -> Dict:
"""Load compliance rules"""
return {
'data_retention': {
'audit_logs': 2555, # 7 years in days
'iso_metadata': 2555,
'access_logs': 2555
},
'access_controls': {
'min_password_length': 12,
'mfa_required': True,
'session_timeout': 3600, # 1 hour
'max_failed_attempts': 3
},
'encryption': {
'data_at_rest': True,
'data_in_transit': True,
'key_rotation_days': 90
},
'audit_requirements': {
'log_all_access': True,
'log_all_changes': True,
'log_all_downloads': True,
'tamper_protection': True
}
}
def check_compliance(self, operation: str, context: Dict) -> Dict:
"""Check compliance for operation"""
compliance_result = {
'compliant': True,
'violations': [],
'warnings': []
}
# Check access control compliance
if operation in ['mount', 'download', 'deploy']:
if not self._check_access_controls(context):
compliance_result['compliant'] = False
compliance_result['violations'].append('Access control requirements not met')
# Check encryption compliance
if operation in ['store', 'transmit']:
if not self._check_encryption_compliance(context):
compliance_result['compliant'] = False
compliance_result['violations'].append('Encryption requirements not met')
# Check audit compliance
if not self._check_audit_compliance(operation, context):
compliance_result['compliant'] = False
compliance_result['violations'].append('Audit requirements not met')
return compliance_result
def _check_access_controls(self, context: Dict) -> bool:
"""Check access control compliance"""
# Check MFA requirement
if self.compliance_rules['access_controls']['mfa_required']:
if not context.get('mfa_verified'):
return False
# Check session timeout
if 'session_start' in context:
session_age = (datetime.utcnow() - context['session_start']).seconds
if session_age > self.compliance_rules['access_controls']['session_timeout']:
return False
return True
def _check_encryption_compliance(self, context: Dict) -> bool:
"""Check encryption compliance"""
if self.compliance_rules['encryption']['data_at_rest']:
if not context.get('encrypted_storage'):
return False
if self.compliance_rules['encryption']['data_in_transit']:
if not context.get('encrypted_transmission'):
return False
return True
def _check_audit_compliance(self, operation: str, context: Dict) -> bool:
"""Check audit compliance"""
if self.compliance_rules['audit_requirements']['log_all_access']:
if not context.get('audit_logged'):
return False
return True
def generate_compliance_report(self, start_date: datetime,
end_date: datetime) -> Dict:
"""Generate compliance report"""
report = {
'report_period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'compliance_summary': {
'total_operations': 0,
'compliant_operations': 0,
'violations': 0,
'warnings': 0
},
'violation_details': [],
'recommendations': []
}
# Analyze audit logs for compliance
# This would query the audit logs and check compliance
return report
class SecureBootValidator:
"""Validate secure boot configurations"""
def __init__(self):
self.trusted_certificates = self._load_trusted_certificates()
def _load_trusted_certificates(self) -> List[str]:
"""Load trusted certificates for secure boot"""
# Load from configuration or certificate store
return []
def validate_iso_signature(self, iso_path: str) -> bool:
"""Validate ISO digital signature"""
# Implementation depends on signing infrastructure
# This is a placeholder for actual signature validation
return True
def validate_boot_chain(self, boot_files: List[str]) -> bool:
"""Validate secure boot chain"""
# Validate each component in the boot chain
for boot_file in boot_files:
if not self._validate_file_signature(boot_file):
return False
return True
def _validate_file_signature(self, file_path: str) -> bool:
"""Validate individual file signature"""
# Implementation would check digital signature
# This is a placeholder
return True
# Security configuration templates
SECURITY_POLICIES = {
'high_security': {
'encryption': {
'algorithm': 'AES-256-GCM',
'key_derivation': 'PBKDF2',
'iterations': 100000
},
'access_control': {
'mfa_required': True,
'session_timeout': 1800, # 30 minutes
'max_concurrent_sessions': 1
},
'audit': {
'log_level': 'DEBUG',
'log_all_operations': True,
'tamper_detection': True
}
},
'standard_security': {
'encryption': {
'algorithm': 'AES-256-CBC',
'key_derivation': 'PBKDF2',
'iterations': 10000
},
'access_control': {
'mfa_required': False,
'session_timeout': 3600, # 1 hour
'max_concurrent_sessions': 3
},
'audit': {
'log_level': 'INFO',
'log_all_operations': False,
'tamper_detection': False
}
}
}
Best Practices and Guidelines
Enterprise ISO Management Best Practices
Repository Management
- Implement version control for all ISO files
- Use checksums to verify integrity
- Maintain comprehensive metadata
- Regular cleanup of obsolete versions
- Automated synchronization across sites
Security Framework
security_controls: access_management: - role_based_access_control: true - multi_factor_authentication: required - session_timeout: 30_minutes - audit_all_operations: true data_protection: - encryption_at_rest: AES-256 - encryption_in_transit: TLS_1.3 - digital_signatures: required - integrity_monitoring: continuous compliance: - audit_log_retention: 7_years - access_log_retention: 7_years - change_management: required - vulnerability_scanning: weeklyAutomation Standards
- Standardized deployment templates
- Automated testing and validation
- Rollback capabilities
- Progress monitoring
- Error handling and recovery
Performance Optimization
- Distributed caching systems
- Bandwidth optimization
- Load balancing
- Resource monitoring
- Capacity planning
Network Infrastructure
- Dedicated PXE VLAN
- Redundant TFTP servers
- High-availability DHCP
- Monitoring and alerting
- Disaster recovery procedures
Monitoring and Analytics
- Real-time deployment tracking
- Performance metrics collection
- Failure analysis and reporting
- Capacity utilization monitoring
- Predictive maintenance
This comprehensive guide transforms basic ISO mounting into a complete enterprise deployment and management system with automation, security, and operational excellence built-in.