Enterprise VNC Remote Console Management Guide 2025: Secure iDRAC & KVM Access at Scale
Enterprise VNC Remote Console Management Guide 2025: Secure iDRAC & KVM Access at Scale
Managing remote console access for thousands of servers requires sophisticated security, automation, and monitoring capabilities that go far beyond basic VNC viewer connections. This comprehensive guide transforms simple VNC usage into an enterprise-grade remote console management system with encryption, compliance, and intelligent session management.
Table of Contents
- VNC Architecture and Security Overview
- Enterprise VNC Deployment Strategy
- Secure Connection Management
- Multi-Server Console Automation
- Session Recording and Compliance
- Performance Optimization
- Integration with Management Systems
- Advanced Security Hardening
- Monitoring and Analytics
- Troubleshooting Framework
- Disaster Recovery Procedures
- Best Practices and Guidelines
VNC Architecture and Security Overview
Understanding VNC in Enterprise Environments
VNC (Virtual Network Computing) provides critical remote console access, but enterprise deployments require careful architecture:
#!/usr/bin/env python3
"""
Enterprise VNC Architecture Assessment
Maps and validates VNC infrastructure
"""
import asyncio
import socket
import ssl
import struct
from typing import Dict, List, Optional, Tuple
import json
import yaml
import logging
from datetime import datetime
import nmap
import concurrent.futures
class VNCInfrastructureMapper:
    """Map and assess VNC infrastructure.

    The mapper reads a YAML configuration file, scans the subnets listed
    under its ``subnets`` key with nmap, probes every candidate port with
    the first steps of an RFB handshake, and summarises the findings.
    """

    # RFB security-type codes mapped to the names used in reports.
    SECURITY_NAMES = {
        1: 'None',
        2: 'VNC Authentication',
        5: 'RA2',
        6: 'RA2ne',
        16: 'Tight',
        17: 'Ultra',
        18: 'TLS',
        19: 'VeNCrypt',
    }

    # Security types this assessment counts as encrypted transport.
    ENCRYPTED_SECURITY_TYPES = frozenset({18, 19})

    # Seconds allowed for each network step of a single probe.
    PROBE_TIMEOUT = 5.0

    def __init__(self, config_file: str):
        """Load the YAML configuration found at *config_file*."""
        with open(config_file, 'r') as f:
            self.config = yaml.safe_load(f)
        self.discovered_vnc = []
        self.security_issues = []
        self.logger = logging.getLogger(__name__)

    async def discover_vnc_services(self) -> List[Dict]:
        """Discover all VNC services in infrastructure.

        Returns:
            One dict per open candidate port, holding the nmap service
            fields plus the keys produced by ``_inspect_vnc_service``.
        """
        discoveries = []
        nm = nmap.PortScanner()
        loop = asyncio.get_running_loop()

        for subnet in self.config['subnets']:
            self.logger.info(f"Scanning subnet: {subnet}")

            # nm.scan() blocks until nmap finishes, so run it in the default
            # executor instead of stalling the event loop.
            await loop.run_in_executor(
                None,
                lambda target=subnet: nm.scan(
                    hosts=target,
                    ports='5900-5999,5800-5899',
                    arguments='-sV',
                ),
            )

            for host in nm.all_hosts():
                # Hosts that answered but expose no TCP results have no
                # 'tcp' key; skip them rather than raising KeyError.
                if 'tcp' not in nm[host]:
                    continue

                for port, service in nm[host]['tcp'].items():
                    if service['state'] != 'open':
                        continue

                    looks_like_vnc = (
                        'vnc' in service.get('name', '').lower()
                        or port in range(5900, 6000)
                    )
                    if not looks_like_vnc:
                        continue

                    discovery = {
                        'ip': host,
                        'port': port,
                        'service': service.get('name', 'vnc'),
                        'version': service.get('version', 'unknown'),
                        'product': service.get('product', 'unknown'),
                        'discovered_at': datetime.utcnow()
                    }

                    # Perform deeper inspection
                    discovery.update(await self._inspect_vnc_service(host, port))
                    discoveries.append(discovery)

        return discoveries

    async def _inspect_vnc_service(self, host: str, port: int) -> Dict:
        """Probe one endpoint with the opening steps of an RFB handshake.

        The probe reads the 12-byte server version banner, answers with
        ``RFB 003.008`` and reads the advertised security-type list. Any
        failure (refused connection, timeout, short read) is logged at
        debug level and the defaults are returned, so ``authentication``
        may still be ``None`` in the result.
        """
        info = {
            'security_types': [],
            'desktop_name': None,
            'rfb_version': None,
            'encryption': False,
            'authentication': None
        }
        timeout = self.PROBE_TIMEOUT
        writer = None

        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout
            )

            # readexactly() guards against short reads that read(n) allows.
            banner = await asyncio.wait_for(reader.readexactly(12), timeout)
            info['rfb_version'] = banner.decode('ascii').strip()

            # Send client version
            writer.write(b'RFB 003.008\n')
            await writer.drain()

            # One byte giving the number of security types, then that many
            # one-byte type codes.
            count = (await asyncio.wait_for(reader.readexactly(1), timeout))[0]
            if count > 0:
                raw_types = await asyncio.wait_for(
                    reader.readexactly(count), timeout
                )
                security_types = list(raw_types)
                info['security_types'] = security_types
                info['authentication'] = [
                    self.SECURITY_NAMES.get(st, f'Unknown({st})')
                    for st in security_types
                ]
                if any(st in self.ENCRYPTED_SECURITY_TYPES for st in security_types):
                    info['encryption'] = True
        except Exception as e:
            self.logger.debug(f"Failed to inspect {host}:{port}: {e}")
        finally:
            # Always release the socket, including after a failed handshake.
            if writer is not None:
                writer.close()
                try:
                    await writer.wait_closed()
                except Exception as e:
                    self.logger.debug(f"Error closing probe to {host}:{port}: {e}")

        return info

    def assess_security(self, discoveries: List[Dict]) -> Dict:
        """Assess security of discovered VNC services.

        Args:
            discoveries: Dicts with at least ``ip`` and ``port``; the
                optional ``encryption`` and ``authentication`` keys drive
                the findings. ``authentication`` may be ``None`` when the
                probe failed.

        Returns:
            A dict with ``total_services``, ``security_summary`` counters,
            a ``vulnerabilities`` list and a ``recommendations`` list.
        """
        summary = {
            'encrypted': 0,
            'unencrypted': 0,
            'no_auth': 0,
            'weak_auth': 0
        }
        vulnerabilities = []
        recommendations = []

        for service in discoveries:
            endpoint = f"{service['ip']}:{service['port']}"

            # Check encryption
            if service.get('encryption'):
                summary['encrypted'] += 1
            else:
                summary['unencrypted'] += 1
                vulnerabilities.append({
                    'severity': 'HIGH',
                    'service': endpoint,
                    'issue': 'Unencrypted VNC connection',
                    'impact': 'Credentials and session data transmitted in clear text'
                })

            # A failed probe leaves 'authentication' set to None; treat that
            # as "nothing known" instead of crashing on `in None`.
            auth_methods = service.get('authentication') or []
            if 'None' in auth_methods:
                summary['no_auth'] += 1
                vulnerabilities.append({
                    'severity': 'CRITICAL',
                    'service': endpoint,
                    'issue': 'No authentication required',
                    'impact': 'Anyone can access the console'
                })
            elif 'VNC Authentication' in auth_methods:
                summary['weak_auth'] += 1
                vulnerabilities.append({
                    'severity': 'MEDIUM',
                    'service': endpoint,
                    'issue': 'Weak VNC authentication',
                    'impact': 'DES-based authentication is cryptographically weak'
                })

        # Generate recommendations
        if summary['unencrypted'] > 0:
            recommendations.append(
                "Implement VNC over SSH tunneling or TLS encryption for all connections"
            )
        if summary['no_auth'] > 0:
            recommendations.append(
                "Enable authentication on all VNC services immediately"
            )

        return {
            'total_services': len(discoveries),
            'security_summary': summary,
            'vulnerabilities': vulnerabilities,
            'recommendations': recommendations
        }
# Security configuration templates, keyed by profile name. Every profile
# defines the same eight settings; idle_timeout values are in seconds.
VNC_SECURITY_CONFIGS = {
    # Strictest profile: SSH tunnel, MFA, no clipboard or file transfer.
    'high_security': {
        'encryption': 'required',
        'tunnel': 'ssh',
        'authentication': 'multi_factor',
        'session_recording': True,
        'idle_timeout': 900,  # 15 minutes
        'access_control': 'ip_whitelist',
        'clipboard': 'disabled',
        'file_transfer': 'disabled',
    },
    # Default profile: TLS with password auth, data channels audited.
    'standard_security': {
        'encryption': 'required',
        'tunnel': 'tls',
        'authentication': 'password',
        'session_recording': True,
        'idle_timeout': 1800,  # 30 minutes
        'access_control': 'network_based',
        'clipboard': 'audit_only',
        'file_transfer': 'audit_only',
    },
    # Relaxed profile for development environments.
    'development': {
        'encryption': 'preferred',
        'tunnel': 'optional',
        'authentication': 'password',
        'session_recording': False,
        'idle_timeout': 3600,  # 60 minutes
        'access_control': 'subnet_based',
        'clipboard': 'enabled',
        'file_transfer': 'enabled',
    },
}
Enterprise VNC Deployment Strategy
Centralized VNC Gateway Architecture
Implement a secure, scalable VNC gateway:
#!/usr/bin/env python3
"""
Enterprise VNC Gateway System
Centralized, secure access to all VNC endpoints
"""
import asyncio
import ssl
import websockets
import jwt
import redis
from typing import Dict, List, Optional, Tuple
import json
import logging
from datetime import datetime, timedelta
import uuid
from cryptography.fernet import Fernet
class VNCGateway:
    """Enterprise VNC gateway with security and routing.

    Accepts WebSocket clients over TLS, authenticates them with a JWT,
    authorises the requested target, connects to the backend VNC server and
    proxies traffic, optionally recording the session.

    Several helpers used here (``_get_target_server``, ``_verify_user``,
    ``_get_user_permissions``, ``_get_group_servers``,
    ``_check_time_restrictions``, ``_verify_mfa``, ``_create_ssh_tunnel``,
    ``_vnc_handshake``, ``_proxy_vnc_traffic``, ``_log_session_end``) are
    not defined in this listing and must be supplied elsewhere.
    """

    def __init__(self, config: Dict):
        """Store *config*, connect to Redis and create a Fernet cipher.

        Args:
            config: Gateway settings. This class reads ``redis.host``,
                ``redis.port``, ``listen_address``, ``listen_port``,
                ``ssl.cert_file``, ``ssl.key_file``, ``jwt_secret`` and
                ``session_recording.enabled``.
        """
        self.config = config
        # Every handler logs through self.logger, so it must exist before
        # the first connection arrives.
        self.logger = logging.getLogger(__name__)
        self.connections = {}
        self.redis_client = redis.Redis(
            host=config['redis']['host'],
            port=config['redis']['port'],
            decode_responses=True
        )
        # NOTE(review): the key is generated per instance, so anything
        # encrypted with it cannot be decrypted after a restart -- confirm
        # that this is intended.
        self.encryption_key = Fernet.generate_key()
        self.fernet = Fernet(self.encryption_key)

    async def start(self):
        """Start VNC gateway server and serve until cancelled."""
        ssl_context = self._create_ssl_context()

        # Start WebSocket server for browser clients
        async with websockets.serve(
            self.handle_client,
            self.config['listen_address'],
            self.config['listen_port'],
            ssl=ssl_context
        ):
            self.logger.info(f"VNC Gateway started on {self.config['listen_address']}:{self.config['listen_port']}")
            await asyncio.Future()  # Run forever

    def _create_ssl_context(self) -> ssl.SSLContext:
        """Create the server-side TLS context (TLS 1.2+, AEAD ciphers)."""
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(
            self.config['ssl']['cert_file'],
            self.config['ssl']['key_file']
        )
        # Force TLS 1.2 or higher
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        # Strong cipher suites only
        context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        return context

    async def handle_client(self, websocket, path=None):
        """Handle one client WebSocket connection from login to teardown.

        Args:
            websocket: The accepted client connection.
            path: Request path. Optional so the handler works both with
                ``websockets`` versions that pass it and with those that
                call the handler with the connection only.
        """
        client_id = str(uuid.uuid4())
        client_info = {
            'id': client_id,
            'websocket': websocket,
            'authenticated': False,
            'user': None,
            'target_server': None,
            'connected_at': datetime.utcnow()
        }
        # Bound before the try block so the finally clause can test them
        # after an early return or an exception.
        session_recorder = None
        vnc_connection = None

        try:
            # Authenticate client
            auth_token = await self._authenticate_client(websocket)
            if not auth_token:
                await websocket.close(1008, "Authentication failed")
                return
            client_info['authenticated'] = True
            client_info['user'] = auth_token['user']

            # Get target server
            target = await self._get_target_server(websocket, auth_token)
            if not target:
                await websocket.close(1008, "Invalid target server")
                return
            client_info['target_server'] = target

            # Check authorization
            if not await self._authorize_access(auth_token['user'], target):
                await websocket.close(1008, "Access denied")
                return

            # Establish VNC connection
            vnc_connection = await self._connect_to_vnc(target)
            if not vnc_connection:
                await websocket.close(1008, "Failed to connect to VNC server")
                return

            # Start session recording if required
            if self.config['session_recording']['enabled']:
                session_recorder = SessionRecorder(client_info, target)
                await session_recorder.start()

            # Proxy VNC traffic
            await self._proxy_vnc_traffic(
                websocket,
                vnc_connection,
                client_info,
                session_recorder
            )
        except Exception as e:
            self.logger.error(f"Client handler error: {e}")
        finally:
            self.connections.pop(client_id, None)

            if session_recorder is not None:
                try:
                    await session_recorder.stop()
                except Exception as e:
                    # A recorder failure must not suppress the audit record.
                    self.logger.error(f"Failed to stop session recording: {e}")

            if vnc_connection is not None:
                # Release the backend socket; close() is safe to repeat.
                vnc_connection[1].close()

            # Audit log
            await self._log_session_end(client_info)

    async def _authenticate_client(self, websocket) -> Optional[Dict]:
        """Authenticate client using JWT.

        Returns:
            The decoded token payload, or ``None`` when the client sends
            no valid, unexpired token for an active user within 30 seconds.
        """
        try:
            # Request authentication
            await websocket.send(json.dumps({'type': 'auth_required'}))

            # Wait for auth token
            auth_msg = await asyncio.wait_for(websocket.recv(), timeout=30)
            auth_data = json.loads(auth_msg)
            if auth_data.get('type') != 'auth':
                return None

            # Verify JWT token
            payload = jwt.decode(
                auth_data.get('token'),
                self.config['jwt_secret'],
                algorithms=['HS256']
            )

            # 'exp' is seconds since the epoch. Compare epoch values so a
            # local-time datetime is never compared with a UTC one. A
            # missing 'exp' raises KeyError and is rejected below.
            if float(payload['exp']) <= datetime.now().timestamp():
                return None

            # Verify user exists and is active
            if not await self._verify_user(payload['user']):
                return None

            return payload
        except Exception as e:
            self.logger.error(f"Authentication error: {e}")
            return None

    async def _authorize_access(self, user: str, target: Dict) -> bool:
        """Check if user is authorized to access target.

        Access needs the target in the user's own server list or in one of
        the user's groups, an allowed time window, and MFA when required.
        """
        user_perms = await self._get_user_permissions(user)

        # Check server access list, then fall back to group membership.
        if target['id'] not in user_perms.get('allowed_servers', []):
            for group in user_perms.get('groups', []):
                if target['id'] in await self._get_group_servers(group):
                    break
            else:
                # No group grants access either.
                return False

        # Check time-based access
        if not self._check_time_restrictions(user_perms):
            return False

        # Check MFA if required
        if user_perms.get('require_mfa') and not await self._verify_mfa(user):
            return False

        return True

    async def _connect_to_vnc(self, target: Dict) -> Optional[Tuple]:
        """Establish connection to VNC server.

        Returns:
            ``(reader, writer)`` after a successful handshake, otherwise
            ``None``. The socket is closed on every failure path.
        """
        vnc_writer = None
        try:
            if target.get('tunnel') == 'ssh':
                # Create SSH tunnel first
                vnc_reader, vnc_writer = await self._create_ssh_tunnel(target)
            else:
                # Direct connection
                vnc_reader, vnc_writer = await asyncio.open_connection(
                    target['host'],
                    target['port']
                )

            # Perform VNC handshake
            if not await self._vnc_handshake(vnc_reader, vnc_writer, target):
                vnc_writer.close()
                return None

            return (vnc_reader, vnc_writer)
        except Exception as e:
            if vnc_writer is not None:
                # The handshake raised after the socket was opened.
                vnc_writer.close()
            self.logger.error(f"VNC connection failed: {e}")
            return None
class SessionRecorder:
    """Record VNC sessions for compliance and auditing.

    File layout written by this class:

    * 4-byte big-endian length, then a JSON metadata header.
    * Per packet: 2-byte big-endian length, a JSON packet header
      (``timestamp``, ``direction``, ``size``), then the raw payload.
    * On stop: four zero bytes, a 4-byte big-endian length, and a JSON
      trailer with ``end_time`` and ``duration``.
    """

    def __init__(self, client_info: Dict, target: Dict,
                 recording_dir: str = "/var/vnc_recordings"):
        """Prepare a recorder; no file is opened until ``start()``.

        Args:
            client_info: Must contain ``user``.
            target: Must contain ``id``.
            recording_dir: Directory that receives the recording file.
        """
        self.client_info = client_info
        self.target = target
        self.recording_dir = recording_dir
        self.session_id = str(uuid.uuid4())
        self.start_time = datetime.utcnow()
        self.recording_file = None
        self.metadata = {
            'session_id': self.session_id,
            'user': client_info['user'],
            'target': target['id'],
            'start_time': self.start_time.isoformat()
        }

    async def start(self):
        """Start session recording: open the file and write the header."""
        import os  # local import: this script has no file-level `import os`

        # Create the directory if it is missing instead of failing on open.
        os.makedirs(self.recording_dir, mode=0o700, exist_ok=True)

        filename = f"vnc_session_{self.session_id}_{self.start_time.strftime('%Y%m%d_%H%M%S')}.rec"
        self.recording_file = open(os.path.join(self.recording_dir, filename), 'wb')

        # Write metadata header
        header = json.dumps(self.metadata).encode('utf-8')
        self.recording_file.write(len(header).to_bytes(4, 'big'))
        self.recording_file.write(header)

    async def record_traffic(self, direction: str, data: bytes):
        """Append one traffic packet; a no-op when no recording is open.

        Args:
            direction: ``'client_to_server'`` or ``'server_to_client'``.
            data: Raw bytes as relayed.
        """
        if not self.recording_file:
            return

        elapsed = (datetime.utcnow() - self.start_time).total_seconds()
        packet_header = json.dumps({
            'timestamp': elapsed,
            'direction': direction,
            'size': len(data)
        }).encode('utf-8')

        self.recording_file.write(len(packet_header).to_bytes(2, 'big'))
        self.recording_file.write(packet_header)
        self.recording_file.write(data)

    async def stop(self):
        """Stop session recording: write the trailer and close the file.

        Safe to call more than once, and safe to call before ``start()``.
        """
        recording = self.recording_file
        if not recording:
            return

        # Drop the handle first so later record_traffic()/stop() calls see
        # "no recording" instead of writing to a closed file.
        self.recording_file = None

        try:
            # Sample the clock once so end_time and duration agree.
            ended_at = datetime.utcnow()
            end_data = json.dumps({
                'end_time': ended_at.isoformat(),
                'duration': (ended_at - self.start_time).total_seconds()
            }).encode('utf-8')

            recording.write(b'\x00\x00\x00\x00')  # End marker
            recording.write(len(end_data).to_bytes(4, 'big'))
            recording.write(end_data)
        finally:
            recording.close()
class VNCConnectionManager:
    """Manage VNC connections at scale.

    Connections are pooled under the key ``"<user>:<target>"``.
    ``_validate_connection`` and ``_create_connection`` are not defined in
    this listing and must be provided elsewhere.
    """

    def __init__(self, config: Dict):
        """Store *config*; ``connection_limits.per_user`` caps each user."""
        self.config = config
        self.connection_pool = {}
        self.connection_limits = config['connection_limits']

    async def get_connection(self, user: str, target: str) -> Optional[Dict]:
        """Get or create VNC connection.

        A still-valid pooled connection is returned without touching the
        limit, because reusing it opens nothing new. The per-user limit is
        enforced only when a new connection would be created.

        Raises:
            Exception: The user already holds ``per_user`` connections and
                has none for *target*.
        """
        conn_key = f"{user}:{target}"

        # Reuse first: a user at the limit may still use what they hold.
        pooled = self.connection_pool.get(conn_key)
        if pooled is not None:
            if await self._validate_connection(pooled):
                return pooled
            # Remove the stale entry so it no longer counts toward the limit.
            self.connection_pool.pop(conn_key, None)

        # Check connection limits
        user_connections = self._count_user_connections(user)
        if user_connections >= self.connection_limits['per_user']:
            raise Exception(f"User connection limit exceeded ({user_connections}/{self.connection_limits['per_user']})")

        # Create new connection
        conn = await self._create_connection(user, target)
        if conn:
            self.connection_pool[conn_key] = conn
        return conn

    def _count_user_connections(self, user: str) -> int:
        """Count pooled connections whose key starts with ``"<user>:"``."""
        prefix = f"{user}:"
        return sum(1 for key in self.connection_pool if key.startswith(prefix))
Secure Connection Management
SSH Tunnel VNC Wrapper
Implement secure VNC over SSH:
#!/bin/bash
# Secure VNC Connection Manager
# Establishes VNC connections through SSH tunnels
# Abort on command failure (-e), on use of an unset variable (-u), and when
# any stage of a pipeline fails (pipefail).
set -euo pipefail
# Configuration
# CONFIG_FILE is sourced below; LOG_FILE is appended to by log(); SOCKET_DIR
# holds the SSH control sockets named "<host>_<local_port>.sock".
CONFIG_FILE="/etc/vnc/secure_vnc.conf"
LOG_FILE="/var/log/secure_vnc.log"
SOCKET_DIR="/var/run/vnc_tunnels"
# Load configuration
# Later functions read SSH_KEY, ENABLE_RECORDING and PERSISTENT_TUNNELS, none
# of which are set in this script -- presumably this file defines them.
source "$CONFIG_FILE"
# Logging function
# Usage: log <message>
# Prints "[YYYY-MM-DD HH:MM:SS] <message>" and appends it to $LOG_FILE.
log() {
    local stamp
    stamp="$(date +'%Y-%m-%d %H:%M:%S')"
    printf '[%s] %s\n' "$stamp" "$1" | tee -a "$LOG_FILE"
}
# Function to create SSH tunnel
# Usage: create_ssh_tunnel <target_host> <target_port> <local_port> <ssh_user> <ssh_key>
# Starts a backgrounded SSH master forwarding localhost:<local_port> to
# localhost:<target_port> on the target, controlled through the socket
# "$SOCKET_DIR/<target_host>_<local_port>.sock".
# Returns 0 when the master answers a control check, 1 otherwise.
create_ssh_tunnel() {
    local target_host=$1
    local target_port=$2
    local local_port=$3
    local ssh_user=$4
    local ssh_key=$5
    local control_socket="$SOCKET_DIR/${target_host}_${local_port}.sock"
    local destination="$ssh_user@$target_host"

    log "Creating SSH tunnel to $target_host:$target_port"

    # Create control socket directory
    mkdir -p "$SOCKET_DIR"

    # -f background, -N no remote command, -M master mode for the socket.
    # Host keys are checked strictly against the dedicated known_hosts file,
    # and ssh exits if the port forward cannot be set up.
    ssh -f -N -M \
        -S "$control_socket" \
        -i "$ssh_key" \
        -o "StrictHostKeyChecking=yes" \
        -o "UserKnownHostsFile=/etc/vnc/known_hosts" \
        -o "ServerAliveInterval=30" \
        -o "ServerAliveCountMax=3" \
        -o "ExitOnForwardFailure=yes" \
        -L "$local_port:localhost:$target_port" \
        "$destination"

    # Verify tunnel is established
    if ! ssh -S "$control_socket" -O check "$destination" 2>/dev/null; then
        log "ERROR: Failed to establish SSH tunnel"
        return 1
    fi

    log "SSH tunnel established successfully"
    return 0
}
# Function to close SSH tunnel
# Usage: close_ssh_tunnel <target_host> <local_port> <ssh_user>
# Asks the SSH master behind the control socket to exit, then removes the
# socket file. Does nothing when the socket does not exist.
close_ssh_tunnel() {
    local target_host=$1
    local local_port=$2
    local ssh_user=$3
    local socket_file="$SOCKET_DIR/${target_host}_${local_port}.sock"

    # Nothing to close without a control socket.
    [ -S "$socket_file" ] || return 0

    log "Closing SSH tunnel to $target_host"
    # Best effort: the master may already be gone.
    ssh -S "$socket_file" -O exit "$ssh_user@$target_host" 2>/dev/null || true
    rm -f "$socket_file"
}
# Function to launch VNC viewer with security options
# Usage: launch_vnc_viewer <local_port> <scale> <password_file> <recording_enabled>
# Runs ssvncviewer against localhost:<local_port> and blocks until it exits.
# When <recording_enabled> is "true" the session is recorded under
# /var/vnc_recordings. Sets the globals VNC_CMD, VNC_OPTS and, when
# recording, SESSION_ID and RECORDING_FILE.
launch_vnc_viewer() {
    local local_port=$1
    local scale=$2
    local password_file=$3
    local recording_enabled=$4

    # Build VNC viewer command
    VNC_CMD="ssvncviewer"

    # Add security options
    VNC_OPTS=()
    VNC_OPTS+=("-scale" "$scale")
    VNC_OPTS+=("-passwd" "$password_file")
    VNC_OPTS+=("-encodings" "tight zrle hextile")   # Prefer compressed encodings
    VNC_OPTS+=("-compresslevel" "9")                # Maximum compression
    VNC_OPTS+=("-quality" "8")                      # Good quality
    VNC_OPTS+=("-noraiseonbeep")                    # Don't raise window on beep

    # Add recording if enabled
    if [ "$recording_enabled" = "true" ]; then
        SESSION_ID=$(uuidgen)
        RECORDING_FILE="/var/vnc_recordings/session_${SESSION_ID}_$(date +%Y%m%d_%H%M%S).vncrec"
        VNC_OPTS+=("-record" "$RECORDING_FILE")
        log "Recording session to: $RECORDING_FILE"
    fi

    # Launch viewer
    log "Launching VNC viewer on localhost:$local_port"
    $VNC_CMD "${VNC_OPTS[@]}" "localhost:$local_port"
}
# Function to connect to iDRAC
# Usage: connect_idrac <idrac_host> [idrac_user] <vnc_password_file> [scale]
#   idrac_user defaults to "root", scale defaults to 0.85.
# Opens (or reuses) an SSH tunnel to port 5901 on the iDRAC, runs the VNC
# viewer through it, and closes the tunnel afterwards unless
# PERSISTENT_TUNNELS is "true". Returns 1 when the tunnel cannot be created,
# otherwise the viewer's exit status.
connect_idrac() {
    local idrac_host=$1
    local idrac_user=${2:-root}
    # Fail with a readable message instead of a bare "unbound variable".
    local vnc_password_file=${3?connect_idrac: VNC password file argument is required}
    local scale=${4:-0.85}
    # Kept local so these names do not leak into the rest of the script.
    local local_port socket_file
    local viewer_status=0

    log "Connecting to iDRAC: $idrac_host"

    # Generate local port based on hash of hostname (5900-5999).
    # NOTE: different hosts can hash to the same port.
    local_port=$((5900 + $(echo -n "$idrac_host" | cksum | cut -d' ' -f1) % 100))
    socket_file="$SOCKET_DIR/${idrac_host}_${local_port}.sock"

    # Reuse a tunnel only when its master still answers. A socket file left
    # behind by a dead master is removed and the tunnel is rebuilt.
    if [ -S "$socket_file" ] && \
       ssh -S "$socket_file" -O check "$idrac_user@$idrac_host" 2>/dev/null; then
        log "Reusing existing tunnel on port $local_port"
    else
        rm -f "$socket_file"
        # Create SSH tunnel to iDRAC
        if ! create_ssh_tunnel "$idrac_host" 5901 "$local_port" "$idrac_user" "$SSH_KEY"; then
            log "ERROR: Failed to create tunnel to $idrac_host"
            return 1
        fi
    fi

    # Capture the viewer status so `set -e` cannot abort the script before
    # the tunnel is closed.
    launch_vnc_viewer "$local_port" "$scale" "$vnc_password_file" "$ENABLE_RECORDING" || viewer_status=$?

    # Optionally close tunnel after viewer exits
    if [ "$PERSISTENT_TUNNELS" != "true" ]; then
        close_ssh_tunnel "$idrac_host" "$local_port" "$idrac_user"
    fi

    return "$viewer_status"
}
# Function to connect with enhanced security
# Usage: secure_connect <target>
# Validates the target, checks that $USER may access it, then connects with
# an audit record before and after. Exits the script with status 1 when
# validation or authorization fails; otherwise returns connect_idrac's status.
# validate_target, check_authorization, get_connection_params and audit_log
# are not defined in this listing and must be provided elsewhere.
secure_connect() {
    local target=$1
    local params
    local connect_status=0

    # Validate target
    if ! validate_target "$target"; then
        log "ERROR: Invalid target: $target"
        exit 1
    fi

    # Check user authorization
    if ! check_authorization "$USER" "$target"; then
        log "ERROR: User $USER not authorized for $target"
        exit 1
    fi

    # Get connection parameters
    params=$(get_connection_params "$target")

    # Audit log
    audit_log "VNC_CONNECT" "$USER" "$target" "START"

    # $params is deliberately unquoted: it carries the whitespace-separated
    # positional arguments for connect_idrac. The status is captured so that
    # `set -e` cannot skip the END audit record when the connection fails.
    connect_idrac $params || connect_status=$?

    # Audit log
    audit_log "VNC_CONNECT" "$USER" "$target" "END"

    return "$connect_status"
}
# Main execution
# Dispatch on the first argument; anything else prints usage and exits 1.
case "${1:-}" in
    connect)
        shift
        secure_connect "$@"
        ;;
    list-tunnels)
        log "Active SSH tunnels:"
        ls -la "$SOCKET_DIR"/*.sock 2>/dev/null || echo "No active tunnels"
        ;;
    close-all)
        log "Closing all SSH tunnels"
        for sock in "$SOCKET_DIR"/*.sock; do
            # An unmatched glob stays literal; skip anything not a socket.
            [ -S "$sock" ] || continue
            # Socket files are named "<host>_<local_port>.sock". Strip the
            # suffix and the trailing "_<port>" to recover the host; cutting
            # at the LAST underscore keeps hosts that contain "_" intact.
            sock_name=$(basename "$sock" .sock)
            host=${sock_name%_*}
            # Best effort: the master may already have exited.
            ssh -S "$sock" -O exit "root@$host" 2>/dev/null || true
            rm -f "$sock"
        done
        ;;
    *)
        echo "Usage: $0 {connect|list-tunnels|close-all} [options]"
        echo ""
        echo "Commands:"
        echo " connect <host> - Connect to iDRAC VNC"
        echo " list-tunnels - List active SSH tunnels"
        echo " close-all - Close all SSH tunnels"
        exit 1
        ;;
esac
Advanced VNC Client Wrapper
Create an intelligent VNC client wrapper:
#!/usr/bin/env python3
"""
Enterprise VNC Client Wrapper
Provides secure, monitored VNC access with advanced features
"""
import os
import sys
import subprocess
import tempfile
import json
import time
import threading
import queue
from typing import Dict, List, Optional
import tkinter as tk
from tkinter import messagebox
import pyotp
import requests
import psutil
import logging
from datetime import datetime
class SecureVNCClient:
    """Secure VNC client with enterprise features.

    Authenticates the user against a remote auth server, optionally asks
    for an MFA token, tunnels the VNC connection over SSH and launches a
    local viewer while a background thread reports host metrics.

    Several helpers used here (``_get_target_info``, ``_check_authorization``,
    ``_create_session``, ``_create_password_file``, ``_check_idle_timeout``,
    ``_terminate_session``, ``_send_metrics``, ``_close_ssh_tunnel``,
    ``_end_session``) are not defined in this listing and must be supplied
    elsewhere.
    """

    def __init__(self, config_file: str):
        """Load the JSON configuration found at *config_file*."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.logger = self._setup_logging()
        self.session_id = None
        self.start_time = None
        # Set by _authenticate_user(); initialised so reading it before a
        # login does not raise AttributeError.
        self.auth_token = None
        self.metrics_queue = queue.Queue()
        # Signals the monitor thread to stop once the viewer has exited.
        self._stop_monitoring = threading.Event()

    def _setup_logging(self) -> logging.Logger:
        """Return the shared 'SecureVNCClient' logger, configuring it once."""
        logger = logging.getLogger('SecureVNCClient')
        logger.setLevel(logging.INFO)

        # The logger is process-wide; attaching handlers for every new
        # instance would duplicate each log line.
        if logger.handlers:
            return logger

        # Console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        # File handler
        fh = logging.FileHandler('/var/log/vnc_client.log')
        fh.setLevel(logging.DEBUG)
        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def connect(self, target: str, user_credentials: Dict) -> bool:
        """Establish secure VNC connection.

        Blocks until the viewer exits.

        Args:
            target: Identifier of the console to open.
            user_credentials: Needs ``username`` and ``password``;
                ``domain`` is optional.

        Returns:
            ``True`` when the viewer ran, ``False`` when any step failed.
        """
        try:
            # Authenticate user
            if not self._authenticate_user(user_credentials):
                self.logger.error("Authentication failed")
                return False

            # Get target details
            target_info = self._get_target_info(target)
            if not target_info:
                self.logger.error(f"Unknown target: {target}")
                return False

            # Check authorization
            if not self._check_authorization(user_credentials['username'], target):
                self.logger.error("User not authorized for target")
                return False

            # MFA is required unless the target explicitly opts out.
            if target_info.get('require_mfa', True):
                if not self._verify_mfa(user_credentials['username']):
                    self.logger.error("MFA verification failed")
                    return False

            # Create session
            self.session_id = self._create_session(
                user_credentials['username'],
                target
            )

            # Setup connection
            connection_params = self._setup_connection(target_info)

            # Start monitoring
            monitor_thread = threading.Thread(
                target=self._monitor_session,
                args=(connection_params,)
            )
            monitor_thread.daemon = True
            monitor_thread.start()

            # Launch VNC viewer
            self._launch_viewer(connection_params)
            return True
        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            return False

    def _authenticate_user(self, credentials: Dict) -> bool:
        """Authenticate user against enterprise directory.

        On HTTP 200 the returned ``token`` is stored in ``self.auth_token``.
        """
        auth_endpoint = self.config['auth_server']['endpoint']
        try:
            response = requests.post(
                f"{auth_endpoint}/authenticate",
                json={
                    'username': credentials['username'],
                    'password': credentials['password'],
                    'domain': credentials.get('domain', 'default')
                },
                timeout=10,
                verify=self.config['auth_server']['verify_ssl']
            )
            if response.status_code == 200:
                self.auth_token = response.json()['token']
                return True
            self.logger.warning(
                f"Authentication rejected with HTTP {response.status_code}"
            )
        except Exception as e:
            self.logger.error(f"Authentication error: {e}")
        return False

    def _verify_mfa(self, username: str) -> bool:
        """Prompt for an MFA token and verify it with the auth server."""
        mfa_token = MFADialog().get_token()
        if not mfa_token:
            return False

        # Verify token
        try:
            response = requests.post(
                f"{self.config['auth_server']['endpoint']}/verify_mfa",
                json={
                    'username': username,
                    'token': mfa_token
                },
                headers={'Authorization': f'Bearer {self.auth_token}'},
                timeout=10,
                # Use the same TLS verification setting as the login call.
                verify=self.config['auth_server']['verify_ssl']
            )
            return response.status_code == 200
        except Exception as e:
            self.logger.error(f"MFA verification error: {e}")
            return False

    def _setup_connection(self, target_info: Dict) -> Dict:
        """Build viewer parameters; tunnels over SSH unless the target opts out.

        Returns:
            Dict with ``host``, ``port``, ``password_file`` and
            ``local_port``. With a tunnel, ``host`` is ``'localhost'`` and
            ``port`` is the local tunnel port.
        """
        params = {
            'host': target_info['host'],
            'port': target_info['port'],
            'password_file': None,
            'local_port': None
        }

        # Create password file
        if target_info.get('password'):
            params['password_file'] = self._create_password_file(
                target_info['password']
            )

        # Setup SSH tunnel if required
        if target_info.get('tunnel_required', True):
            params['local_port'] = self._create_ssh_tunnel(target_info)
            params['host'] = 'localhost'
            params['port'] = params['local_port']

        return params

    def _find_available_port(self, low: int = 15900, high: int = 15999) -> int:
        """Return a port in ``[low, high]`` that can be bound on loopback.

        Candidates are tried in random order so concurrent clients spread
        over the range.

        Raises:
            RuntimeError: No port in the range can be bound.
        """
        import random
        import socket

        candidates = list(range(low, high + 1))
        random.shuffle(candidates)
        for port in candidates:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                try:
                    probe.bind(('127.0.0.1', port))
                except OSError:
                    continue  # in use; try the next candidate
                return port
        raise RuntimeError(f"No free local port in range {low}-{high}")

    def _create_ssh_tunnel(self, target_info: Dict) -> int:
        """Create SSH tunnel for VNC connection and return its local port.

        Raises:
            Exception: ``ssh`` exited with a non-zero status.
        """
        # Pick a port that is actually free. Another process could still
        # grab it before ssh binds; ExitOnForwardFailure then makes ssh
        # fail loudly instead of running without the forward.
        local_port = self._find_available_port()

        # Build SSH command
        ssh_cmd = [
            'ssh',
            '-f', '-N',
            '-o', 'StrictHostKeyChecking=yes',
            '-o', 'UserKnownHostsFile=/etc/vnc/known_hosts',
            '-o', 'ServerAliveInterval=30',
            '-o', 'ExitOnForwardFailure=yes',
            '-i', self.config['ssh_key'],
            '-L', f"{local_port}:localhost:{target_info['port']}",
            f"{target_info['ssh_user']}@{target_info['host']}"
        ]

        # Execute SSH tunnel
        result = subprocess.run(ssh_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"SSH tunnel failed: {result.stderr}")

        self.logger.info(f"SSH tunnel established on port {local_port}")
        return local_port

    def _launch_viewer(self, params: Dict):
        """Launch VNC viewer with security settings and wait for it to exit.

        Session resources are cleaned up even when the viewer cannot be
        started.
        """
        # Build viewer command
        viewer_cmd = [
            self.config['vnc_viewer']['binary'],
            '-scale', str(self.config['vnc_viewer']['scale']),
            '-encodings', 'tight zrle hextile',
            '-compresslevel', '9',
            '-quality', '8'
        ]

        # Add password file if exists
        if params['password_file']:
            viewer_cmd.extend(['-passwd', params['password_file']])

        # Add target
        viewer_cmd.append(f"{params['host']}:{params['port']}")

        # Launch viewer
        self.logger.info(f"Launching VNC viewer: {' '.join(viewer_cmd)}")
        self.start_time = datetime.now()

        try:
            # Run viewer
            subprocess.run(viewer_cmd)
        finally:
            # Runs on failure too, so the password file and the tunnel never
            # outlive the session, and the monitor thread is told to stop.
            self._stop_monitoring.set()
            self._cleanup_session(params)

    def _monitor_session(self, params: Dict):
        """Report host metrics every ~10 seconds until the session ends.

        The loop stops when the viewer exits, the idle timeout is reached,
        or an error occurs.
        """
        while not self._stop_monitoring.is_set():
            try:
                # Collect metrics
                net = psutil.net_io_counters()
                metrics = {
                    'timestamp': datetime.now(),
                    'cpu_usage': psutil.cpu_percent(interval=1),
                    'memory_usage': psutil.virtual_memory().percent,
                    'network_bytes_sent': net.bytes_sent,
                    'network_bytes_recv': net.bytes_recv
                }

                # Check for idle timeout
                if self._check_idle_timeout():
                    self.logger.warning("Session idle timeout reached")
                    self._terminate_session()
                    break

                # Send metrics
                self._send_metrics(metrics)

                # Wait on the stop event so the thread wakes immediately
                # when the session ends.
                self._stop_monitoring.wait(10)
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                break

    def _cleanup_session(self, params: Dict):
        """Cleanup session resources: password file, tunnel, session."""
        # Remove password file
        if params.get('password_file') and os.path.exists(params['password_file']):
            os.unlink(params['password_file'])

        # Close SSH tunnel
        if params.get('local_port'):
            self._close_ssh_tunnel(params['local_port'])

        # End session
        if self.session_id:
            self._end_session()
class MFADialog:
    """MFA token input dialog.

    ``token`` holds the submitted text, or ``None`` when nothing was
    submitted.
    """

    def __init__(self):
        self.token = None

    def get_token(self) -> Optional[str]:
        """Show MFA dialog and get token.

        Blocks in the Tk main loop until the window is destroyed.
        """
        window = tk.Tk()
        window.title("Multi-Factor Authentication")
        window.geometry("300x150")

        # Create UI elements
        prompt = tk.Label(window, text="Enter MFA Token:")
        prompt.pack(pady=10)

        entry = tk.Entry(window, width=20, font=("Arial", 14))
        entry.pack(pady=10)
        entry.focus()

        def _on_submit(event=None):
            # Used for both the button (no argument) and the Return key
            # binding (passes an event).
            self.token = entry.get()
            window.destroy()

        button = tk.Button(window, text="Submit", command=_on_submit)
        button.pack(pady=10)

        # Bind Enter key
        window.bind('<Return>', _on_submit)

        # Center window
        window.update_idletasks()
        left = (window.winfo_screenwidth() // 2) - (window.winfo_width() // 2)
        top = (window.winfo_screenheight() // 2) - (window.winfo_height() // 2)
        window.geometry(f"+{left}+{top}")

        window.mainloop()
        return self.token
Multi-Server Console Automation
Parallel Console Manager
Manage multiple VNC sessions efficiently:
#!/usr/bin/env python3
"""
Multi-Server VNC Console Manager
Manage multiple simultaneous VNC sessions
"""
import asyncio
import concurrent.futures
from typing import Dict, List, Optional, Set
import json
import yaml
import logging
from datetime import datetime
import psutil
import resource
class MultiConsoleManager:
    """Manage multiple VNC console sessions.

    ``_get_target_config`` is not defined in this listing and must be
    provided elsewhere.
    """

    def __init__(self, config_file: str):
        """Load the YAML configuration and size the worker pool."""
        with open(config_file, 'r') as f:
            self.config = yaml.safe_load(f)
        # _open_single_console logs through self.logger on failure.
        self.logger = logging.getLogger(__name__)
        self.active_sessions = {}
        self.session_limits = self._calculate_session_limits()
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=self.session_limits['max_concurrent']
        )

    def _calculate_session_limits(self) -> Dict:
        """Calculate session limits based on system resources.

        ``max_concurrent`` is the smallest of two sessions per CPU, one
        session per 500 MB of RAM and the configured ``limits.max_sessions``,
        but never below 1.
        """
        # cpu_count() may return None when the count cannot be determined.
        cpu_count = psutil.cpu_count() or 1
        memory_gb = psutil.virtual_memory().total / (1024**3)

        resource_cap = min(
            cpu_count * 2,  # 2 sessions per CPU
            int(memory_gb / 0.5),  # 500MB per session
            self.config['limits']['max_sessions']
        )

        return {
            # ThreadPoolExecutor rejects max_workers <= 0, so clamp to 1 on
            # very small hosts or when max_sessions is 0.
            'max_concurrent': max(1, resource_cap),
            'max_per_user': self.config['limits']['max_per_user'],
            'max_bandwidth_mbps': self.config['limits']['max_bandwidth_mbps']
        }

    async def open_consoles(self, targets: List[str], user: str) -> Dict:
        """Open multiple console sessions concurrently.

        Only as many targets as the user has free session slots are
        attempted; the rest are listed under ``skipped``.

        Returns:
            Dict with ``requested``, ``opened``, ``failed``, ``sessions``
            and ``skipped``; ``error`` is added when the user has no free
            slot at all.
        """
        results = {
            'requested': len(targets),
            'opened': 0,
            'failed': 0,
            'sessions': [],
            'skipped': []
        }

        # Check user limits
        available_slots = (
            self.session_limits['max_per_user'] - self._count_user_sessions(user)
        )
        if available_slots <= 0:
            results['error'] = f"User session limit reached ({self.session_limits['max_per_user']})"
            results['skipped'] = list(targets)
            return results

        # Limit targets to available slots and report the rest, so the
        # caller can tell which targets were never attempted.
        targets_to_open = targets[:available_slots]
        results['skipped'] = list(targets[available_slots:])

        # Open sessions in parallel; exceptions come back as results.
        session_results = await asyncio.gather(
            *(self._open_single_console(target, user) for target in targets_to_open),
            return_exceptions=True
        )

        # Process results
        for target, outcome in zip(targets_to_open, session_results):
            if isinstance(outcome, Exception):
                results['failed'] += 1
                results['sessions'].append({
                    'target': target,
                    'status': 'failed',
                    'error': str(outcome)
                })
            else:
                results['opened'] += 1
                results['sessions'].append(outcome)

        return results

    async def _open_single_console(self, target: str, user: str) -> Dict:
        """Open single console session and register it as active.

        Raises:
            Exception: Whatever the configuration lookup or session start
                raised, after logging it.
        """
        session_id = f"{user}_{target}_{datetime.now().timestamp()}"
        try:
            # Get target configuration
            target_config = await self._get_target_config(target)

            # Create session
            session = VNCSession(
                session_id=session_id,
                user=user,
                target=target,
                config=target_config
            )

            # Start session
            await session.start()

            # Track session
            self.active_sessions[session_id] = session

            return {
                'target': target,
                'status': 'connected',
                'session_id': session_id,
                'connection_info': session.get_connection_info()
            }
        except Exception as e:
            self.logger.error(f"Failed to open console for {target}: {e}")
            raise

    def _count_user_sessions(self, user: str) -> int:
        """Count active sessions for user."""
        return sum(
            1
            for session in self.active_sessions.values()
            if session.user == user and session.is_active()
        )
class VNCSession:
"""Individual VNC session management"""
def __init__(self, session_id: str, user: str, target: str, config: Dict):
self.session_id = session_id
self.user = user
self.target = target
self.config = config
self.process = None
self.tunnel_process = None
self.start_time = None
self.metrics = {
'bytes_sent': 0,
'bytes_received': 0,
'keystrokes': 0,
'mouse_events': 0
}
async def start(self):
"""Start VNC session"""
self.start_time = datetime.now()
# Setup tunnel if needed
if self.config.get('requires_tunnel'):
self.tunnel_process = await self._setup_tunnel()
# Launch VNC viewer
self.process = await self._launch_viewer()
# Start monitoring
asyncio.create_task(self._monitor())
async def _setup_tunnel(self) -> subprocess.Process:
"""Setup SSH tunnel for VNC"""
local_port = self._find_free_port()
tunnel_cmd = [
'ssh', '-N',
'-o', 'StrictHostKeyChecking=yes',
'-o', 'ServerAliveInterval=30',
'-L', f"{local_port}:localhost:{self.config['vnc_port']}",
f"{self.config['ssh_user']}@{self.config['host']}"
]
process = await asyncio.create_subprocess_exec(
*tunnel_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Update config with local port
self.config['local_port'] = local_port
return process
async def _launch_viewer(self) -> subprocess.Process:
"""Launch VNC viewer process"""
viewer_cmd = self._build_viewer_command()
process = await asyncio.create_subprocess_exec(
*viewer_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
return process
def _build_viewer_command(self) -> List[str]:
"""Build VNC viewer command"""
cmd = ['vncviewer']
# Add options
if self.config.get('password_file'):
cmd.extend(['-passwd', self.config['password_file']])
if self.config.get('scale'):
cmd.extend(['-scale', str(self.config['scale'])])
# Add quality settings
cmd.extend([
'-encodings', 'tight zrle hextile',
'-compresslevel', '9',
'-quality', '8'
])
# Add target
if self.config.get('local_port'):
cmd.append(f"localhost:{self.config['local_port']}")
else:
cmd.append(f"{self.config['host']}:{self.config['vnc_port']}")
return cmd
async def _monitor(self):
    """Poll the session every 5 seconds until it ends.

    While the viewer is running, collect metrics and enforce limits; a
    breached limit ends the session through ``terminate("Limit exceeded")``.
    When the viewer is found to have exited, ``cleanup()`` is called. Any
    exception is logged and stops monitoring.
    """
    # Function-scope import: this script's import block is not visible here.
    import logging

    logger = getattr(self, 'logger', None) or logging.getLogger(__name__)
    try:
        while self.is_active():
            # Collect metrics
            await self._collect_metrics()
            # Check limits
            if await self._check_limits():
                await self.terminate("Limit exceeded")
                return
            await asyncio.sleep(5)
        # The loop condition is what detects a viewer that has exited, so the
        # cleanup has to happen after the loop; a check inside the loop body
        # can never observe an exited process.
        if self.process is not None:
            await self.cleanup()
    except Exception as e:
        logger.error(f"Monitoring error: {e}")
def is_active(self) -> bool:
    """Return True while a viewer process exists and has not exited."""
    viewer = self.process
    if viewer is None:
        return False
    return viewer.returncode is None
class ConsoleOrchestrator:
    """Decide whether a console access request is approved, queued or refused.

    The helpers ``_validate_request``, ``_check_maintenance_window``,
    ``_get_next_window``, ``_check_session_availability``, ``_queue_request``,
    ``_allocate_session`` and ``_calculate_expiry`` are called here but are
    not defined in this excerpt.
    """

    def __init__(self):
        self.policies = self._load_policies()
        self.scheduler = AsyncIOScheduler()

    def _load_policies(self) -> Dict:
        """Return the built-in access policies.

        The mapping has three sections: ``maintenance_windows`` and
        ``session_limits`` keyed by environment name, and ``priority_groups``
        mapping a group name to a numeric priority.
        """
        production_window = {
            'days': ['sunday'],
            'hours': [(2, 6)],  # 2 AM - 6 AM
            'max_concurrent': 5,
        }
        development_window = {
            'days': ['any'],
            'hours': [(0, 24)],
            'max_concurrent': 20,
        }
        priorities = {
            'oncall': 100,
            'sre': 80,
            'developers': 50,
            'readonly': 10,
        }
        limits = {
            'max_duration_minutes': {'production': 120, 'development': 480},
            'idle_timeout_minutes': {'production': 15, 'development': 60},
        }
        return {
            'maintenance_windows': {
                'production': production_window,
                'development': development_window,
            },
            'priority_groups': priorities,
            'session_limits': limits,
        }

    async def request_console_access(self, request: Dict) -> Dict:
        """Run a request through validation, window and capacity checks.

        Returns:
            A dict whose ``approved`` flag tells the outcome. Refusals carry
            a ``reason``; a request outside the maintenance window also
            carries ``next_window``; a request refused for capacity is queued
            and carries ``queued`` and ``queue_position``. Approvals carry
            ``session_info`` and ``expires_at``.
        """
        verdict = self._validate_request(request)
        if not verdict['valid']:
            return {'approved': False, 'reason': verdict['reason']}

        inside_window = self._check_maintenance_window(request)
        if not inside_window:
            upcoming = self._get_next_window(request['environment'])
            return {
                'approved': False,
                'reason': 'Outside maintenance window',
                'next_window': upcoming,
            }

        has_capacity = await self._check_session_availability(request)
        if not has_capacity:
            # No free slot: park the request instead of dropping it
            position = await self._queue_request(request)
            return {
                'approved': False,
                'reason': 'Session limit reached',
                'queued': True,
                'queue_position': position,
            }

        allocation = await self._allocate_session(request)
        expiry = self._calculate_expiry(request)
        return {
            'approved': True,
            'session_info': allocation,
            'expires_at': expiry,
        }
Session Recording and Compliance
Comprehensive Session Recording
Implement session recording for compliance:
#!/usr/bin/env python3
"""
VNC Session Recording and Compliance System
Records and manages VNC sessions for audit and compliance
"""
import os
import struct
import zlib
import json
import hashlib
from typing import Dict, List, Optional, Tuple
import asyncio
from datetime import datetime
import cv2
import numpy as np
class VNCRecorder:
    """Record VNC sessions with compression and indexing.

    A recording is a directory holding ``session.dat`` (a sequence of framed,
    zlib-compressed frames), ``index.json`` (one JSON document per line:
    metadata first, then events, then the closing frame index) and
    ``summary.json``. ``_extract_thumbnail`` is called by
    ``_generate_summary`` but is not defined in this excerpt.
    """

    def __init__(self, session_id: str, metadata: Dict):
        """Prepare a recorder; no files are touched until ``start_recording``.

        Args:
            session_id: Used as the recording directory name.
            metadata: Must contain ``user`` and ``target``; ``client_ip`` is
                optional.
        """
        self.session_id = session_id
        self.metadata = metadata
        self.recording_path = f"/var/vnc_recordings/{session_id}"
        self.index_file = None
        self.data_file = None
        self.frame_index = []
        # Naive UTC, matching the naive timestamps the retention code parses.
        self.start_time = datetime.utcnow()
        self.end_time = None

    async def start_recording(self):
        """Create the recording directory, open both files, write metadata."""
        os.makedirs(self.recording_path, exist_ok=True)
        self.data_file = open(f"{self.recording_path}/session.dat", 'wb')
        self.index_file = open(f"{self.recording_path}/index.json", 'w')
        metadata = {
            'session_id': self.session_id,
            'start_time': self.start_time.isoformat(),
            'user': self.metadata['user'],
            'target': self.metadata['target'],
            'client_ip': self.metadata.get('client_ip'),
            'recording_version': '2.0'
        }
        json.dump(metadata, self.index_file)
        self.index_file.write('\n')

    async def record_frame(self, frame_data: bytes, frame_type: str = 'full'):
        """Append one frame to ``session.dat`` and note it in the frame index.

        Each frame is stored as a big-endian header (compressed size,
        original size, millisecond timestamp, type code, 16-character
        checksum prefix) followed by the zlib-compressed payload.

        Args:
            frame_data: Raw frame bytes.
            frame_type: ``'full'`` is stored as type code 1, anything else
                as type code 2.
        """
        timestamp = (datetime.utcnow() - self.start_time).total_seconds()
        compressed = zlib.compress(frame_data, level=6)
        # First 16 hex characters of the SHA-256 of the uncompressed frame
        checksum = hashlib.sha256(frame_data).hexdigest()[:16]
        header = struct.pack(
            '>IIQH16s',
            len(compressed),                   # Compressed size
            len(frame_data),                   # Original size
            int(timestamp * 1000),             # Timestamp in ms
            1 if frame_type == 'full' else 2,  # Frame type
            checksum.encode('ascii')           # Checksum
        )
        offset = self.data_file.tell()
        self.data_file.write(header)
        self.data_file.write(compressed)
        self.frame_index.append({
            'offset': offset,
            'timestamp': timestamp,
            'type': frame_type,
            'size': len(compressed)
        })

    async def record_event(self, event_type: str, event_data: Dict):
        """Append a user interaction event to the index file and flush it."""
        event = {
            'timestamp': (datetime.utcnow() - self.start_time).total_seconds(),
            'type': event_type,
            'data': event_data
        }
        self.index_file.write(json.dumps({'event': event}) + '\n')
        self.index_file.flush()

    async def stop_recording(self):
        """Close both files, write the frame index and generate the summary.

        Calling this again after the recording is closed, or before it was
        started, does nothing.
        """
        # Sample the clock once so end_time and duration agree.
        end_time = datetime.utcnow()
        if self.data_file:
            self.data_file.close()
            self.data_file = None
        if self.index_file:
            self.index_file.write(json.dumps({
                'frame_index': self.frame_index,
                'end_time': end_time.isoformat(),
                'duration': (end_time - self.start_time).total_seconds()
            }))
            self.index_file.close()
            self.index_file = None
            self.end_time = end_time
            await self._generate_summary()

    async def _generate_summary(self):
        """Write ``summary.json`` with size, duration and sample thumbnails.

        Thumbnails are taken at the 0%, 25%, 50%, 75% and 100% positions of
        the frame list; positions that map to the same frame are sampled once.
        """
        finished = self.end_time or datetime.utcnow()
        frame_count = len(self.frame_index)
        summary = {
            'session_id': self.session_id,
            'duration': (finished - self.start_time).total_seconds(),
            'frame_count': frame_count,
            'file_size': os.path.getsize(f"{self.recording_path}/session.dat"),
            'thumbnails': []
        }
        relative_positions = (0, 0.25, 0.5, 0.75, 1.0)
        # Scale by the last valid index, not the length: scaling by the
        # length puts the 1.0 position one past the end, so the final frame
        # would never be sampled.
        sample_indices = sorted({
            int((frame_count - 1) * position) for position in relative_positions
        }) if frame_count else []
        for frame_idx in sample_indices:
            thumbnail = await self._extract_thumbnail(frame_idx)
            if thumbnail:
                summary['thumbnails'].append({
                    'time': self.frame_index[frame_idx]['timestamp'],
                    'image': thumbnail
                })
        with open(f"{self.recording_path}/summary.json", 'w') as f:
            json.dump(summary, f, indent=2)
class SessionPlayer:
    """Play back recorded VNC sessions.

    Reads ``index.json`` (one JSON document per line) and ``session.dat``
    from ``recording_path``. ``_read_frame`` and ``_decode_frame`` are called
    here but are not defined in this excerpt.
    """

    def __init__(self, recording_path: str):
        self.recording_path = recording_path
        self.metadata = None
        self.frame_index = None
        # Initialised here so the attribute exists before load_recording().
        self.events = []
        self.data_file = None

    async def load_recording(self):
        """Load metadata, events and the frame index, then open the data file.

        A recording without a closing frame-index line loads with an empty
        frame list instead of leaving ``frame_index`` as None.
        """
        with open(f"{self.recording_path}/index.json", 'r') as f:
            lines = f.readlines()
        # First line is the session metadata
        self.metadata = json.loads(lines[0])
        self.events = []
        self.frame_index = []
        for line in lines[1:]:
            if not line.strip():
                continue
            data = json.loads(line)
            if 'event' in data:
                self.events.append(data['event'])
            elif 'frame_index' in data:
                self.frame_index = data['frame_index']
        # Reloading must not leak the previously opened handle.
        if self.data_file is not None:
            self.data_file.close()
        self.data_file = open(f"{self.recording_path}/session.dat", 'rb')

    def close(self):
        """Close the data file if it is open."""
        if self.data_file is not None:
            self.data_file.close()
            self.data_file = None

    async def play(self, speed: float = 1.0, start_time: float = 0):
        """Play the recording in an OpenCV window; pressing ``q`` stops it.

        Args:
            speed: Playback rate multiplier; must be positive.
            start_time: Offset in seconds of the first frame to show. An
                offset past the last frame plays nothing.

        Raises:
            ValueError: if ``speed`` is not positive.
        """
        if speed <= 0:
            raise ValueError("speed must be positive")
        frames = self.frame_index or []
        # Default to len(frames) so a start_time beyond the end plays nothing
        # rather than silently restarting from frame 0.
        start_frame = next(
            (i for i, frame in enumerate(frames) if frame['timestamp'] >= start_time),
            len(frames)
        )
        cv2.namedWindow('VNC Recording Playback', cv2.WINDOW_NORMAL)
        try:
            for i in range(start_frame, len(frames)):
                frame_info = frames[i]
                frame_data = await self._read_frame(frame_info['offset'])
                if frame_data:
                    img = self._decode_frame(frame_data)
                    cv2.imshow('VNC Recording Playback', img)
                if i < len(frames) - 1:
                    next_time = frames[i + 1]['timestamp']
                    delay = int((next_time - frame_info['timestamp']) * 1000 / speed)
                else:
                    delay = 33  # ~30 FPS
                # waitKey(0) means "wait for a key press forever", so frames
                # with (near) identical timestamps must not yield a 0 delay.
                if cv2.waitKey(max(1, delay)) & 0xFF == ord('q'):
                    break
        finally:
            cv2.destroyAllWindows()

    async def export_video(self, output_file: str, fps: int = 30):
        """Export the recording as an ``mp4v`` video at a constant frame rate.

        Each recorded frame is held on screen from its own timestamp until
        the next recorded frame's timestamp. Frames arriving faster than
        ``fps`` are dropped in favour of the most recent one.

        Raises:
            ValueError: if the recording has no frames.
        """
        frames = self.frame_index or []
        if not frames:
            raise ValueError("recording contains no frames to export")
        # Determine video dimensions from first frame
        first_frame = await self._read_frame(frames[0]['offset'])
        img = self._decode_frame(first_frame)
        height, width = img.shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_file, fourcc, fps, (width, height))
        try:
            frame_interval = 1.0 / fps
            written = 0   # output frames emitted so far
            held = None   # image currently on screen
            for frame_info in frames:
                # Keep showing the previous image until this frame's capture
                # time; writing the new frame for that interval would show
                # it before it was captured.
                while held is not None and written * frame_interval < frame_info['timestamp']:
                    out.write(held)
                    written += 1
                frame_data = await self._read_frame(frame_info['offset'])
                if frame_data:
                    held = self._decode_frame(frame_data)
            if held is not None:
                out.write(held)
        finally:
            out.release()
class ComplianceManager:
    """Manage VNC session compliance and retention.

    ``config`` must provide ``retention_policies`` (with ``compliance``,
    ``production`` and ``default`` entries, each holding ``retention_days``
    and optionally ``archive_after_days``) and ``recordings_directory``.
    ``_load_session_metadata``, ``_delete_recording``, ``_archive_recording``,
    ``_get_sessions_in_range`` and ``_check_compliance_violations`` are
    called here but are not defined in this excerpt.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.retention_policies = config['retention_policies']

    async def enforce_retention(self):
        """Delete or archive every recording directory per its policy.

        Deletion takes precedence over archiving; directories whose metadata
        cannot be loaded are left untouched.
        """
        recordings_dir = self.config['recordings_directory']
        for session_id in os.listdir(recordings_dir):
            session_path = os.path.join(recordings_dir, session_id)
            if not os.path.isdir(session_path):
                continue
            metadata = await self._load_session_metadata(session_path)
            if not metadata:
                continue
            if await self._should_delete(metadata):
                await self._delete_recording(session_path)
            elif await self._should_archive(metadata):
                await self._archive_recording(session_path)

    @staticmethod
    def _age_in_days(metadata: Dict) -> int:
        """Return the whole days elapsed since ``metadata['start_time']``.

        Accepts both naive ISO timestamps (taken as UTC) and timezone-aware
        ones; subtracting an aware value from a naive ``utcnow()`` would
        raise TypeError.
        """
        from datetime import timezone

        started = datetime.fromisoformat(metadata['start_time'])
        if started.tzinfo is not None:
            started = started.astimezone(timezone.utc).replace(tzinfo=None)
        return (datetime.utcnow() - started).days

    async def _should_delete(self, metadata: Dict) -> bool:
        """Return True when the recording is older than ``retention_days``."""
        policy = self._get_retention_policy(metadata)
        return self._age_in_days(metadata) > policy['retention_days']

    async def _should_archive(self, metadata: Dict) -> bool:
        """Return True when the recording is older than ``archive_after_days``.

        Policies without a truthy ``archive_after_days`` never archive.
        """
        policy = self._get_retention_policy(metadata)
        if not policy.get('archive_after_days'):
            return False
        return self._age_in_days(metadata) > policy['archive_after_days']

    def _get_retention_policy(self, metadata: Dict) -> Dict:
        """Pick the policy: compliance flag first, then a 'prod' target name."""
        if metadata.get('compliance_flag'):
            return self.retention_policies['compliance']
        # ``target`` may be present but None; treat that like a missing one.
        target = metadata.get('target') or ''
        if 'prod' in target.lower():
            return self.retention_policies['production']
        return self.retention_policies['default']

    async def generate_compliance_report(self, start_date: datetime,
                                         end_date: datetime) -> Dict:
        """Aggregate session counts, violations and storage for a date range.

        Returns:
            A dict with ``period``, ``total_sessions``, ``sessions_by_user``,
            ``sessions_by_target``, ``compliance_violations`` and
            ``storage_usage``.
        """
        report = {
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'total_sessions': 0,
            'sessions_by_user': {},
            'sessions_by_target': {},
            'compliance_violations': [],
            'storage_usage': 0
        }
        sessions = await self._get_sessions_in_range(start_date, end_date)
        for session in sessions:
            report['total_sessions'] += 1
            user = session['metadata']['user']
            report['sessions_by_user'][user] = \
                report['sessions_by_user'].get(user, 0) + 1
            target = session['metadata']['target']
            report['sessions_by_target'][target] = \
                report['sessions_by_target'].get(target, 0) + 1
            violations = await self._check_compliance_violations(session)
            report['compliance_violations'].extend(violations)
            report['storage_usage'] += session.get('file_size', 0)
        return report
Performance Optimization
VNC Performance Tuning
Optimize VNC performance for enterprise use:
#!/usr/bin/env python3
"""
VNC Performance Optimization System
Dynamically optimize VNC connections for best performance
"""
import asyncio
import psutil
import statistics
from typing import Dict, List, Optional, Tuple
import json
import logging
from datetime import datetime, timedelta
class VNCPerformanceOptimizer:
    """Optimize VNC performance based on network and system conditions.

    ``_get_connection_metrics`` and ``_update_connection_params`` are called
    here but are not defined in this excerpt.
    """

    def __init__(self):
        # The monitoring and analysis methods log through self.logger.
        self.logger = logging.getLogger(__name__)
        self.metrics_history = []
        self.optimization_profiles = self._load_profiles()
        self.current_profile = 'balanced'
        # Previous network counter sample and when it was taken.
        self._last_net_io = None
        self._last_sample_at = None

    def _load_profiles(self) -> Dict:
        """Return the four built-in tuning profiles keyed by name."""
        return {
            'high_quality': {
                'encoding': 'tight',
                'compression': 6,
                'quality': 9,
                'color_depth': 24,
                'resolution_scale': 1.0,
                'frame_rate_limit': 60
            },
            'balanced': {
                'encoding': 'tight',
                'compression': 8,
                'quality': 7,
                'color_depth': 16,
                'resolution_scale': 1.0,
                'frame_rate_limit': 30
            },
            'low_bandwidth': {
                'encoding': 'zrle',
                'compression': 9,
                'quality': 5,
                'color_depth': 8,
                'resolution_scale': 0.75,
                'frame_rate_limit': 15
            },
            'minimal': {
                'encoding': 'hextile',
                'compression': 9,
                'quality': 3,
                'color_depth': 8,
                'resolution_scale': 0.5,
                'frame_rate_limit': 10
            }
        }

    async def monitor_performance(self, connection_id: str):
        """Sample metrics forever and re-tune the connection when needed.

        Keeps five minutes of history and starts analysing once ten samples
        exist. Errors are logged and followed by a 10 second pause.
        """
        while True:
            try:
                metrics = await self._collect_metrics(connection_id)
                self.metrics_history.append(metrics)
                # Keep only recent history (5 minutes)
                cutoff = datetime.now() - timedelta(minutes=5)
                self.metrics_history = [
                    m for m in self.metrics_history
                    if m['timestamp'] > cutoff
                ]
                if len(self.metrics_history) >= 10:
                    optimization = await self._analyze_and_optimize()
                    if optimization:
                        await self._apply_optimization(connection_id, optimization)
                await asyncio.sleep(5)
            except Exception as e:
                self.logger.error(f"Performance monitoring error: {e}")
                await asyncio.sleep(10)

    async def _collect_metrics(self, connection_id: str) -> Dict:
        """Take one sample of host-wide and per-connection metrics.

        Bandwidth is derived from the change in the host's network counters
        divided by the measured time since the previous sample; the first
        sample reports 0.
        """
        import time

        now = time.monotonic()
        net_io = psutil.net_io_counters()
        previous = getattr(self, '_last_net_io', None)
        previous_at = getattr(self, '_last_sample_at', None)
        bandwidth_mbps = 0
        if previous is not None and previous_at is not None:
            # Use the real elapsed time: a cycle takes the sleep plus the
            # CPU sampling second plus any awaits, not a fixed 5 seconds.
            elapsed = now - previous_at
            if elapsed > 0:
                transferred = (net_io.bytes_sent - previous.bytes_sent) + \
                              (net_io.bytes_recv - previous.bytes_recv)
                bandwidth_mbps = transferred * 8 / 1e6 / elapsed
        self._last_net_io = net_io
        self._last_sample_at = now

        conn_metrics = await self._get_connection_metrics(connection_id)
        # cpu_percent(interval=1) sleeps for a second; run it in a worker
        # thread so the event loop keeps serving other coroutines.
        loop = asyncio.get_running_loop()
        cpu_usage = await loop.run_in_executor(None, psutil.cpu_percent, 1)
        return {
            'timestamp': datetime.now(),
            'bandwidth_mbps': bandwidth_mbps,
            'latency_ms': conn_metrics.get('latency', 0),
            'packet_loss': conn_metrics.get('packet_loss', 0),
            'cpu_usage': cpu_usage,
            'memory_usage': psutil.virtual_memory().percent,
            'frame_rate': conn_metrics.get('frame_rate', 0),
            'compression_ratio': conn_metrics.get('compression_ratio', 1.0)
        }

    async def _analyze_and_optimize(self) -> Optional[Dict]:
        """Choose a profile from averaged history.

        Returns:
            The settings of the newly selected profile, or None when the
            profile does not change or there is no history to average.
        """
        history = self.metrics_history
        if not history:
            return None
        avg_bandwidth = statistics.mean(m['bandwidth_mbps'] for m in history)
        avg_latency = statistics.mean(m['latency_ms'] for m in history)
        avg_packet_loss = statistics.mean(m['packet_loss'] for m in history)
        avg_cpu = statistics.mean(m['cpu_usage'] for m in history)

        if avg_packet_loss > 5 or avg_latency > 200:
            new_profile = 'minimal'
        elif avg_bandwidth < 1 or avg_latency > 100:
            new_profile = 'low_bandwidth'
        elif avg_bandwidth > 10 and avg_latency < 50 and avg_cpu < 50:
            new_profile = 'high_quality'
        else:
            new_profile = 'balanced'

        if new_profile != self.current_profile:
            self.logger.info(f"Switching profile: {self.current_profile} -> {new_profile}")
            self.current_profile = new_profile
            return self.optimization_profiles[new_profile]
        return None

    async def _apply_optimization(self, connection_id: str, optimization: Dict):
        """Translate profile settings into viewer options and apply them.

        Depth is only passed below 24 bits and scale only below 1.0.
        """
        params = [
            f"-encodings {optimization['encoding']}",
            f"-compresslevel {optimization['compression']}",
            f"-quality {optimization['quality']}",
        ]
        if optimization['color_depth'] < 24:
            params.append(f"-depth {optimization['color_depth']}")
        if optimization['resolution_scale'] < 1.0:
            params.append(f"-scale {optimization['resolution_scale']}")
        await self._update_connection_params(connection_id, params)
class NetworkOptimizer:
    """Apply traffic shaping and DSCP marking for VNC traffic.

    ``_execute_command`` is called with each command string but is not
    defined in this excerpt.
    """

    def __init__(self):
        self.qos_rules = self._initialize_qos()

    def _initialize_qos(self) -> Dict:
        """Return the two QoS rule sets: general VNC and console traffic."""
        vnc_rule = {
            'dscp': 26,  # AF31 - Assured Forwarding
            'bandwidth_guarantee_mbps': 2,
            'bandwidth_limit_mbps': 10,
            'packet_priority': 'medium-high',
        }
        console_rule = {
            'dscp': 34,  # AF41 - Higher priority for console
            'bandwidth_guarantee_mbps': 5,
            'bandwidth_limit_mbps': 20,
            'packet_priority': 'high',
        }
        return {'vnc_priority': vnc_rule, 'console_priority': console_rule}

    async def configure_network_qos(self, interface: str):
        """Run the tc/iptables commands that shape and mark VNC traffic.

        Builds an HTB hierarchy on ``interface`` with one class per rule set,
        a u32 filter steering traffic into the VNC class and a DSCP marking
        rule, then executes the commands in order.
        """
        vnc = self.qos_rules['vnc_priority']
        console = self.qos_rules['console_priority']

        root_qdisc = f"tc qdisc add dev {interface} root handle 1: htb default 30"
        main_class = f"tc class add dev {interface} parent 1: classid 1:1 htb rate 1000mbit"
        vnc_class = (
            f"tc class add dev {interface} parent 1:1 classid 1:10 htb "
            f"rate {vnc['bandwidth_guarantee_mbps']}mbit "
            f"ceil {vnc['bandwidth_limit_mbps']}mbit prio 2"
        )
        # NOTE(review): no filter in this list directs traffic to 1:20, and
        # no class 1:30 is created for the qdisc default - confirm intent.
        console_class = (
            f"tc class add dev {interface} parent 1:1 classid 1:20 htb "
            f"rate {console['bandwidth_guarantee_mbps']}mbit "
            f"ceil {console['bandwidth_limit_mbps']}mbit prio 1"
        )
        # NOTE(review): "5900 0xfff0" is a masked match, so it presumably
        # covers ports 5888-5903 rather than the 5900:5999 range used by the
        # iptables rule below - confirm.
        port_filter = (
            f"tc filter add dev {interface} parent 1: protocol ip prio 1 u32 "
            f"match ip dport 5900 0xfff0 flowid 1:10"
        )
        dscp_mark = (
            f"iptables -t mangle -A POSTROUTING -o {interface} -p tcp "
            f"--dport 5900:5999 -j DSCP --set-dscp {vnc['dscp']}"
        )

        for command in (root_qdisc, main_class, vnc_class,
                        console_class, port_filter, dscp_mark):
            await self._execute_command(command)
class CachingProxy:
    """Content-addressed frame cache with least-recently-used eviction.

    Frames are keyed by the MD5 hex digest of their bytes. A repeated frame
    is answered with its digest instead of its data.
    """

    def __init__(self, cache_size_mb: int = 100):
        self.cache_size = cache_size_mb * 1024 * 1024  # capacity in bytes
        self.cache = {}
        self.cache_stats = {
            'hits': 0,
            'misses': 0,
            'bytes_saved': 0
        }

    async def handle_frame(self, frame_data: bytes) -> Tuple[bytes, bool]:
        """Look a frame up in the cache, inserting it on a miss.

        Returns:
            ``(digest_bytes, True)`` when the frame was already cached,
            otherwise ``(frame_data, False)``. Frames larger than the whole
            cache are passed through without being stored.
        """
        # Function-scope import: this script's import block has no hashlib.
        import hashlib

        frame_hash = hashlib.md5(frame_data).hexdigest()
        entry = self.cache.get(frame_hash)
        if entry is not None:
            self.cache_stats['hits'] += 1
            self.cache_stats['bytes_saved'] += len(frame_data)
            entry['last_used'] = datetime.now()
            entry['hit_count'] += 1
            # Re-insert so dict order breaks ties between equal timestamps
            # in favour of the entry touched most recently.
            self.cache[frame_hash] = self.cache.pop(frame_hash)
            return (frame_hash.encode(), True)  # Return hash instead of data

        self.cache_stats['misses'] += 1
        frame_size = len(frame_data)
        if frame_size <= self.cache_size:
            if self._get_cache_size() + frame_size > self.cache_size:
                await self._evict_lru(frame_size)
            self.cache[frame_hash] = {
                'data': frame_data,
                'size': frame_size,
                'last_used': datetime.now(),
                'hit_count': 0
            }
        return (frame_data, False)

    def _get_cache_size(self) -> int:
        """Return the total size in bytes of all cached frames."""
        return sum(entry['size'] for entry in self.cache.values())

    async def _evict_lru(self, required: int = 0):
        """Evict least recently used entries.

        Frees space until the cache is at most 90% full and, when
        ``required`` is given, until that many bytes fit as well.

        Args:
            required: Size in bytes of the frame about to be inserted.
        """
        sorted_entries = sorted(
            self.cache.items(),
            key=lambda item: item[1]['last_used']
        )
        # 10% headroom alone does not guarantee the incoming frame fits.
        target_size = min(self.cache_size * 0.9, self.cache_size - required)
        current_size = self._get_cache_size()
        for frame_hash, entry in sorted_entries:
            if current_size <= target_size:
                break
            del self.cache[frame_hash]
            current_size -= entry['size']
Integration with Management Systems
Comprehensive Integration Framework
Integrate VNC management with enterprise systems:
#!/usr/bin/env python3
"""
VNC Enterprise Integration Framework
Integrate VNC with ITSM, DCIM, and monitoring platforms
"""
import asyncio
import aiohttp
from typing import Dict, List, Optional
import json
import logging
from datetime import datetime
class EnterpriseVNCIntegration:
    """Master integration class for VNC management.

    Each integration (``servicenow``, ``dcim``, ``monitoring``, ``ldap``) is
    created only when the config has a truthy section of that name, so every
    use of ``self.integrations`` has to cope with a missing entry.
    ``_log_session`` is called here but is not defined in this excerpt.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.integrations = {}
        self.logger = logging.getLogger(__name__)
        self._setup_integrations()

    def _setup_integrations(self):
        """Instantiate the integrations that have a config section."""
        # ServiceNow Integration
        if self.config.get('servicenow'):
            self.integrations['servicenow'] = ServiceNowIntegration(
                self.config['servicenow']
            )
        # DCIM Integration
        if self.config.get('dcim'):
            self.integrations['dcim'] = DCIMIntegration(
                self.config['dcim']
            )
        # Monitoring Integration
        if self.config.get('monitoring'):
            self.integrations['monitoring'] = MonitoringIntegration(
                self.config['monitoring']
            )
        # LDAP/AD Integration
        if self.config.get('ldap'):
            self.integrations['ldap'] = LDAPIntegration(
                self.config['ldap']
            )

    async def on_session_start(self, session_info: Dict):
        """Notify the configured integrations that a session has started.

        Args:
            session_info: Must contain ``user`` and ``target``. When
                ``create_ticket`` is truthy and ServiceNow is configured, a
                ticket is created and its number stored back under
                ``ticket_number``.
        """
        servicenow = self.integrations.get('servicenow')
        if session_info.get('create_ticket'):
            if servicenow is not None:
                ticket = await servicenow.create_access_ticket(session_info)
                session_info['ticket_number'] = ticket['number']
            else:
                self.logger.warning(
                    "Ticket requested for %s but ServiceNow is not configured",
                    session_info.get('target')
                )

        dcim = self.integrations.get('dcim')
        if dcim is not None:
            await dcim.update_console_status(
                session_info['target'],
                'in_use',
                session_info['user']
            )

        monitoring = self.integrations.get('monitoring')
        if monitoring is not None:
            await monitoring.send_event({
                'event_type': 'vnc_session_start',
                'user': session_info['user'],
                'target': session_info['target'],
                'source_ip': session_info.get('client_ip')
            })

    async def on_session_end(self, session_info: Dict):
        """Resolve the ticket, free the console in DCIM and log the session."""
        servicenow = self.integrations.get('servicenow')
        if session_info.get('ticket_number') and servicenow is not None:
            # Callers do not always record a duration; do not fail on it.
            duration = session_info.get('duration', 'unknown')
            await servicenow.update_ticket(
                session_info['ticket_number'],
                {
                    'state': 'resolved',
                    'close_notes': f"VNC session ended. Duration: {duration}"
                }
            )

        dcim = self.integrations.get('dcim')
        if dcim is not None:
            await dcim.update_console_status(
                session_info['target'],
                'available'
            )

        await self._log_session(session_info)
class ServiceNowIntegration:
    """ServiceNow ITSM integration over the Table API with basic auth."""

    def __init__(self, config: Dict):
        """Store the config and derive the API base URL and credentials.

        Args:
            config: Must contain ``instance``, ``username`` and ``password``.
        """
        self.config = config
        self.base_url = f"https://{config['instance']}.service-now.com/api/now"
        self.auth = aiohttp.BasicAuth(config['username'], config['password'])

    async def create_access_ticket(self, session_info: Dict) -> Dict:
        """Create an incident that tracks a console access session.

        Args:
            session_info: Must contain ``user``, ``target`` and
                ``start_time``; ``purpose`` and ``priority`` are optional.

        Returns:
            The ``result`` object of the API response.

        Raises:
            aiohttp.ClientResponseError: if the API answers with an HTTP
                error status.
        """
        description = (
            "\n"
            "VNC Console Access Request\n"
            f"User: {session_info['user']}\n"
            f"Target: {session_info['target']}\n"
            f"Purpose: {session_info.get('purpose', 'Console access')}\n"
            f"Start Time: {session_info['start_time']}\n"
            "This ticket tracks VNC console access for audit purposes.\n"
        )
        ticket_data = {
            'short_description': f"VNC Access: {session_info['target']}",
            'description': description,
            'category': 'Access Request',
            'subcategory': 'Console Access',
            'priority': session_info.get('priority', 4),
            'assignment_group': 'Data Center Operations',
            'caller_id': session_info['user']
        }
        async with aiohttp.ClientSession(auth=self.auth) as session:
            async with session.post(
                f"{self.base_url}/table/incident",
                json=ticket_data,
                headers={'Content-Type': 'application/json'}
            ) as response:
                # Surface 401/403/5xx as an HTTP error instead of a
                # KeyError on the missing 'result' key below.
                response.raise_for_status()
                result = await response.json()
                return result['result']

    async def check_maintenance_window(self, target: str) -> bool:
        """Return True when an active maintenance change exists for target.

        Raises:
            aiohttp.ClientResponseError: if the API answers with an HTTP
                error status.
        """
        # NOTE(review): target is interpolated into the encoded query as-is;
        # a value containing '^' would add query terms - confirm callers
        # pass trusted CI names.
        query = f"cmdb_ci={target}^active=true^type=Maintenance"
        async with aiohttp.ClientSession(auth=self.auth) as session:
            async with session.get(
                f"{self.base_url}/table/change_request",
                params={'sysparm_query': query}
            ) as response:
                response.raise_for_status()
                result = await response.json()
                return len(result['result']) > 0
class DCIMIntegration:
    """Data Center Infrastructure Management integration."""

    def __init__(self, config: Dict):
        """Store the config.

        Args:
            config: Must contain ``api_key`` and ``base_url``.
        """
        self.config = config
        self.api_key = config['api_key']
        self.base_url = config['base_url']
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _summarize_device(device: Dict) -> Dict:
        """Flatten one device record from the API into a summary dict.

        ``rack``, ``primary_ip`` and ``oob_ip`` may be null or absent in the
        record; each of them then maps to None.
        """
        rack = device.get('rack')
        primary_ip = device.get('primary_ip')
        oob_ip = device.get('oob_ip')
        return {
            'id': device['id'],
            'name': device['name'],
            'serial': device['serial'],
            'model': device['device_type']['model'],
            'location': device['site']['name'],
            'rack': rack['name'] if rack else None,
            'position': device['position'],
            'status': device['status']['value'],
            'primary_ip': primary_ip['address'] if primary_ip else None,
            'oob_ip': oob_ip['address'] if oob_ip else None
        }

    async def get_server_info(self, hostname: str) -> Optional[Dict]:
        """Look a device up by name.

        Returns:
            The summary of the first matching device, or None when no
            device matches.

        Raises:
            aiohttp.ClientResponseError: if the API answers with an HTTP
                error status.
        """
        headers = {'X-API-Key': self.api_key}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}/api/dcim/devices/",
                params={'name': hostname},
                headers=headers
            ) as response:
                response.raise_for_status()
                result = await response.json()
                if result['count'] > 0:
                    return self._summarize_device(result['results'][0])
                return None

    async def update_console_status(self, hostname: str, status: str, user: str = None):
        """Write console status custom fields onto the device, if it exists.

        A rejected update is logged as a warning and not raised.
        """
        device = await self.get_server_info(hostname)
        if not device:
            return
        custom_fields = {
            'console_status': status,
            'console_user': user,
            'console_updated': datetime.utcnow().isoformat()
        }
        headers = {'X-API-Key': self.api_key}
        async with aiohttp.ClientSession() as session:
            # Enter the response context so the connection is released.
            async with session.patch(
                f"{self.base_url}/api/dcim/devices/{device['id']}/",
                json={'custom_fields': custom_fields},
                headers=headers
            ) as response:
                if response.status >= 400:
                    self.logger.warning(
                        "DCIM console status update for %s failed (HTTP %s)",
                        hostname, response.status
                    )
class MonitoringIntegration:
    """Integration with monitoring systems (Prometheus, Elasticsearch)."""

    def __init__(self, config: Dict):
        """Store the config.

        Args:
            config: ``prometheus_pushgateway`` and ``elasticsearch_url`` are
                both optional; a missing one disables that output.
        """
        self.config = config
        self.prometheus_gateway = config.get('prometheus_pushgateway')
        self.elasticsearch_url = config.get('elasticsearch_url')
        self.logger = logging.getLogger(__name__)

    async def send_metrics(self, metrics: Dict):
        """Push session gauges to the Prometheus Pushgateway, if configured.

        Args:
            metrics: Must contain ``active_sessions``; ``environment`` is
                optional. When ``session_duration`` is present, ``user`` and
                ``target`` are required as its labels.
        """
        if not self.prometheus_gateway:
            return
        from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
        registry = CollectorRegistry()
        active_sessions = Gauge(
            'vnc_active_sessions',
            'Number of active VNC sessions',
            ['environment'],
            registry=registry
        )
        session_duration = Gauge(
            'vnc_session_duration_seconds',
            'VNC session duration',
            ['user', 'target'],
            registry=registry
        )
        active_sessions.labels(
            environment=metrics.get('environment', 'default')
        ).set(metrics['active_sessions'])
        if 'session_duration' in metrics:
            session_duration.labels(
                user=metrics['user'],
                target=metrics['target']
            ).set(metrics['session_duration'])
        # push_to_gateway does a blocking HTTP request; run it in a worker
        # thread so the event loop is not stalled.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: push_to_gateway(
                self.prometheus_gateway,
                job='vnc_monitoring',
                registry=registry
            )
        )

    @staticmethod
    def _build_event_document(event: Dict) -> Dict:
        """Return a copy of ``event`` with the indexing metadata added."""
        document = dict(event)
        document['@timestamp'] = datetime.utcnow().isoformat()
        document['@version'] = '1'
        document['type'] = 'vnc_event'
        return document

    async def send_event(self, event: Dict):
        """Index an event in Elasticsearch, if configured.

        The caller's dict is not modified. A rejected document is logged as
        a warning and not raised.
        """
        if not self.elasticsearch_url:
            return
        document = self._build_event_document(event)
        async with aiohttp.ClientSession() as session:
            # Enter the response context so the connection is released.
            async with session.post(
                f"{self.elasticsearch_url}/vnc-events/_doc",
                json=document,
                headers={'Content-Type': 'application/json'}
            ) as response:
                if response.status >= 400:
                    self.logger.warning(
                        "Elasticsearch rejected VNC event (HTTP %s)",
                        response.status
                    )
class LDAPIntegration:
    """LDAP/Active Directory integration.

    ``_check_target_permission`` is called here but is not defined in this
    excerpt.
    """

    def __init__(self, config: Dict):
        """Create the server handle.

        Args:
            config: Must contain ``server``; ``port`` defaults to 389 and
                ``use_ssl`` to False. Other methods read ``user_dn_prefix``,
                ``base_dn``, ``bind_dn``, ``bind_password`` and
                ``vnc_access_groups``.
        """
        self.config = config
        import ldap3
        self.server = ldap3.Server(
            config['server'],
            port=config.get('port', 389),
            use_ssl=config.get('use_ssl', False)
        )

    async def authenticate_user(self, username: str, password: str) -> bool:
        """Authenticate a user by binding with their own credentials.

        Returns:
            True when the bind succeeds; False when it is rejected, when the
            directory cannot be reached, or when either credential is empty.
        """
        # A simple bind with an empty password is an unauthenticated bind,
        # which many directories accept; it must never count as a login.
        if not username or not password:
            return False

        import ldap3
        from ldap3.core.exceptions import LDAPException
        from ldap3.utils.dn import escape_rdn

        # Escape the name so DN metacharacters cannot alter the bind DN.
        user_dn = (
            f"{self.config['user_dn_prefix']}{escape_rdn(username)},"
            f"{self.config['base_dn']}"
        )
        try:
            conn = ldap3.Connection(
                self.server,
                user=user_dn,
                password=password,
                auto_bind=True
            )
            conn.unbind()
            return True
        except LDAPException:
            return False

    async def get_user_groups(self, username: str) -> List[str]:
        """Return the names of the groups the user is a member of.

        Binds with the service account, searches by ``sAMAccountName`` and
        reduces each ``memberOf`` DN to the value of its first component.
        """
        import ldap3
        from ldap3.utils.conv import escape_filter_chars

        conn = ldap3.Connection(
            self.server,
            user=self.config['bind_dn'],
            password=self.config['bind_password'],
            auto_bind=True
        )
        try:
            # Escape the name so filter metacharacters cannot widen the search.
            search_filter = (
                f"(&(objectClass=user)(sAMAccountName={escape_filter_chars(username)}))"
            )
            conn.search(
                search_base=self.config['base_dn'],
                search_filter=search_filter,
                attributes=['memberOf']
            )
            groups = []
            if conn.entries:
                user = conn.entries[0]
                for group_dn in user.memberOf:
                    # Extract group name from DN
                    group_name = group_dn.split(',')[0].split('=')[1]
                    groups.append(group_name)
            return groups
        finally:
            # Release the service-account connection on errors as well.
            conn.unbind()

    async def check_vnc_permission(self, username: str, target: str) -> bool:
        """Return True when one of the user's VNC groups may access target."""
        groups = await self.get_user_groups(username)
        vnc_groups = self.config.get('vnc_access_groups', [])
        for group in groups:
            if group in vnc_groups:
                if await self._check_target_permission(group, target):
                    return True
        return False
# Orchestration script: source text of the ``vnc-connect`` command line tool.
# The leading backslash keeps the shebang on the first line of the text.
# The script imports everything it uses (``os`` and ``datetime`` included),
# checks its arguments before indexing ``sys.argv`` and tolerates a missing
# SSH_CLIENT variable. ``launch_vnc_session`` and the ``check_access`` /
# ``get_available_targets`` methods it calls are not defined in this excerpt.
ORCHESTRATION_SCRIPT = """\
#!/usr/bin/env python3
# VNC Session Orchestration
import asyncio
import json
import os
import sys
from datetime import datetime
from vnc_integration import EnterpriseVNCIntegration

USAGE = "Usage: vnc-connect {connect|list} [options]"

async def main():
    # Load configuration
    with open('/etc/vnc/integration.json', 'r') as f:
        config = json.load(f)
    # Initialize integration
    integration = EnterpriseVNCIntegration(config)
    # Parse command
    command = sys.argv[1] if len(sys.argv) > 1 else 'help'
    if command == 'connect':
        if len(sys.argv) < 4:
            print(USAGE)
            sys.exit(1)
        # Get parameters
        user = sys.argv[2]
        target = sys.argv[3]
        # Check permissions
        if not await integration.check_access(user, target):
            print("Access denied")
            sys.exit(1)
        # SSH_CLIENT is "<ip> <port> <port>" and is absent outside SSH
        ssh_client = os.environ.get('SSH_CLIENT', '').split()
        # Create session
        session_info = {
            'user': user,
            'target': target,
            'start_time': datetime.utcnow().isoformat(),
            'client_ip': ssh_client[0] if ssh_client else None
        }
        # Handle session lifecycle
        await integration.on_session_start(session_info)
        try:
            # Launch VNC
            await launch_vnc_session(session_info)
        finally:
            # Cleanup
            await integration.on_session_end(session_info)
    elif command == 'list':
        if len(sys.argv) < 3:
            print(USAGE)
            sys.exit(1)
        # List available targets
        targets = await integration.get_available_targets(sys.argv[2])
        for target in targets:
            print(f"{target['name']:<30} {target['status']:<10} {target['location']}")
    else:
        print(USAGE)

if __name__ == '__main__':
    asyncio.run(main())
"""
Advanced Security Hardening
Comprehensive Security Implementation
Implement advanced security for VNC:
#!/bin/bash
# VNC Security Hardening Script
#
# Usage: <script> {harden|audit|monitor} [options]
# Several helper functions this script calls (configure_session_limits,
# check_password_strength, check_encryption_status, check_certificate_expiry,
# check_security_violations) are not defined in this excerpt.

# Abort on any failing command (-e), on use of an unset variable (-u) and on
# a failure anywhere in a pipeline (pipefail).
set -euo pipefail
# Configuration
SECURITY_CONFIG="/etc/vnc/security.conf"
AUDIT_LOG="/var/log/vnc_security_audit.log"
# Load security configuration
# The functions below read ALLOWED_NETWORKS (as an array) and SECURITY_EMAIL,
# neither of which is set in this script - presumably this file defines them.
source "$SECURITY_CONFIG"
# Write a timestamped "SECURITY:" line for message $1 to stdout and append
# it to the audit log.
security_log() {
    local stamp
    stamp="$(date +'%Y-%m-%d %H:%M:%S')"
    printf '[%s] SECURITY: %s\n' "$stamp" "$1" | tee -a "$AUDIT_LOG"
}
# Apply every hardening step to the server named by $1, in this order:
#   1. encryption  2. firewall  3. audit logging  4. session limits
#   5. intrusion detection
harden_vnc_server() {
    local server=$1
    local step
    security_log "Hardening VNC server: $server"
    for step in \
        configure_encryption \
        configure_firewall \
        enable_audit_logging \
        configure_session_limits \
        setup_ids
    do
        "$step" "$server"
    done
}
# Write the TLS settings file for server $1 and make sure its certificate
# exists, generating one when the .crt file is missing.
configure_encryption() {
    local server=$1
    local tls_conf="/etc/vnc/${server}_tls.conf"
    local cert_file="/etc/vnc/certs/${server}.crt"

    # Force TLS encryption (the here-document expands ${server})
    cat > "$tls_conf" << EOF
# TLS Configuration for VNC
SecurityTypes=TLS,X509Plain
X509Cert=/etc/vnc/certs/${server}.crt
X509Key=/etc/vnc/certs/${server}.key
GnuTLSPriority=SECURE256:+SECURE128:-VERS-ALL:+VERS-TLS1.2:+VERS-TLS1.3
EOF

    # Generate certificates if needed
    if [ ! -f "$cert_file" ]; then
        generate_tls_certificate "$server"
    fi

    security_log "TLS encryption configured for $server"
}
# Generate a 4096-bit RSA key and a self-signed certificate, valid for 365
# days, for server $1 under /etc/vnc/certs.
generate_tls_certificate() {
    local server=$1
    local cert_dir="/etc/vnc/certs"
    local key_file="${cert_dir}/${server}.key"
    local crt_file="${cert_dir}/${server}.crt"

    mkdir -p "$cert_dir"

    # Generate private key. The restrictive umask (scoped to a subshell)
    # makes the key private from the moment it is created, instead of
    # readable under the default umask until the chmod below runs.
    ( umask 077; openssl genrsa -out "$key_file" 4096 )

    # Generate certificate
    openssl req -new -x509 -days 365 -key "$key_file" \
        -out "$crt_file" \
        -subj "/C=US/ST=State/L=City/O=Organization/CN=${server}"

    # Set permissions
    chmod 600 "$key_file"
    chmod 644 "$crt_file"

    security_log "Generated TLS certificate for $server"
}
# Restrict TCP ports 5900-5999 to the networks in ALLOWED_NETWORKS, with
# per-source rate limiting and logging of blocked attempts.
#
# All rules live in a dedicated chain that is rebuilt on every call, so
# running the function again (once per hardened server) does not pile up
# duplicate rules in INPUT.
configure_firewall() {
    local server=$1
    local chain="VNC-FILTER"
    local network

    # Create the chain, or empty it when it already exists
    iptables -N "$chain" 2>/dev/null || iptables -F "$chain"

    # Send VNC traffic through the chain; add the jump only once
    iptables -C INPUT -p tcp --dport 5900:5999 -j "$chain" 2>/dev/null \
        || iptables -I INPUT -p tcp --dport 5900:5999 -j "$chain"

    # Rate limiting: drop a source with 10 new connections inside 60 seconds,
    # then record the current attempt
    iptables -A "$chain" -m state --state NEW \
        -m recent --update --seconds 60 --hitcount 10 --name VNC --rsource -j DROP
    iptables -A "$chain" -m state --state NEW \
        -m recent --set --name VNC --rsource

    # Allow from management network only
    for network in "${ALLOWED_NETWORKS[@]}"; do
        iptables -A "$chain" -s "$network" -j ACCEPT
    done

    # Log blocked attempts. LOG has to come before DROP: DROP ends rule
    # traversal, so a LOG rule placed after it is never reached.
    iptables -A "$chain" -j LOG --log-prefix "VNC-BLOCKED: "

    # Default deny all
    iptables -A "$chain" -j DROP

    security_log "Firewall rules configured for VNC"
}
# Install auditd watch rules for the VNC binaries and configuration, load
# them, and set up daily rotation (90 files kept) for /var/log/vnc_*.log.
enable_audit_logging() {
    local server=$1

    # Configure auditd rules. The file is overwritten, not appended to, so
    # hardening several servers does not duplicate every rule.
    # NOTE(review): a0 is the first syscall argument, which for connect() is
    # presumably the socket descriptor and not a port, so the last rule is
    # unlikely to match VNC connections - confirm against the audit docs.
    cat > /etc/audit/rules.d/vnc.rules << 'EOF'
# VNC Access Auditing
-w /usr/bin/vncserver -p x -k vnc_server
-w /usr/bin/vncviewer -p x -k vnc_client
-w /etc/vnc/ -p wa -k vnc_config
-a always,exit -F arch=b64 -S connect -F a0=5900:5999 -k vnc_connection
EOF

    # Reload audit rules
    auditctl -R /etc/audit/rules.d/vnc.rules

    # Setup log rotation
    cat > /etc/logrotate.d/vnc-audit << 'EOF'
/var/log/vnc_*.log {
    daily
    rotate 90
    compress
    delaycompress
    notifempty
    create 0600 root root
    postrotate
        /usr/bin/killall -HUP rsyslogd
    endscript
}
EOF

    security_log "Audit logging enabled for VNC"
}
# Configure file integrity monitoring (AIDE) for the VNC binaries and
# configuration, and a fail2ban jail that bans a host for an hour after
# 3 failed attempts within 10 minutes.
setup_ids() {
    local server=$1

    # Create AIDE configuration for VNC. The file is overwritten, not
    # appended to, so repeated runs do not duplicate the entries.
    cat > /etc/aide/aide.conf.d/vnc << 'EOF'
# VNC Integrity Monitoring
/usr/bin/vnc* f+p+u+g+s+m+c+md5+sha256
/etc/vnc/ f+p+u+g+s+m+c+md5+sha256
EOF

    # Initialize AIDE database
    aide --init
    mv /var/lib/aide/aide.db.new /var/lib/aide/aide.db

    # Setup fail2ban for VNC
    cat > /etc/fail2ban/jail.d/vnc.conf << 'EOF'
[vnc]
enabled = true
port = 5900:5999
filter = vnc-auth
logpath = /var/log/vnc_access.log
maxretry = 3
bantime = 3600
findtime = 600
EOF

    # Continuation lines of a multi-line option value must be indented, or
    # the configuration parser does not treat them as part of failregex.
    cat > /etc/fail2ban/filter.d/vnc-auth.conf << 'EOF'
[Definition]
failregex = .*Authentication failed from <HOST>.*
            .*Invalid password from <HOST>.*
            .*Connection refused from <HOST>.*
ignoreregex =
EOF

    # Restart fail2ban
    systemctl restart fail2ban

    security_log "Intrusion detection configured for VNC"
}
# Build a security audit report (password strength, encryption status, open
# VNC ports, recent authentication failures, certificate status), log where
# it was written and mail it when SECURITY_EMAIL is set.
security_audit() {
    local report

    security_log "Starting VNC security audit"

    # Unpredictable name: this runs as root, and a fixed name in /tmp could
    # be pre-created by another user as a symlink.
    report="$(mktemp /tmp/vnc_audit.XXXXXX)"

    {
        echo "=== VNC Security Audit Report ==="
        echo "Date: $(date)"
        echo ""

        # Check for weak passwords
        echo "1. Password Strength Check:"
        check_password_strength

        # Check encryption status
        echo -e "\n2. Encryption Status:"
        check_encryption_status

        # Check open ports. grep exits non-zero when nothing matches, which
        # under "set -eo pipefail" would abort the whole audit; an empty
        # section is a valid result.
        echo -e "\n3. Open VNC Ports:"
        netstat -tlnp | grep ':59' || true

        # Check failed login attempts (the 20 most recent entries)
        echo -e "\n4. Failed Login Attempts (Last 24h):"
        grep "Authentication failed" /var/log/vnc_access.log | tail -20 || true

        # Check certificate expiration
        echo -e "\n5. Certificate Status:"
        check_certificate_expiry
    } > "$report"

    security_log "Security audit completed: $report"

    # Email report if configured. The default expansion keeps "set -u" from
    # aborting when the configuration does not define SECURITY_EMAIL.
    if [ -n "${SECURITY_EMAIL:-}" ]; then
        mail -s "VNC Security Audit Report" "$SECURITY_EMAIL" < "$report"
    fi
}
# Main execution: dispatch on the first command line argument.
case "${1:-}" in
    harden)
        shift
        # Without this check a missing server name only surfaces as an
        # "unbound variable" error from inside harden_vnc_server.
        if [ "$#" -lt 1 ]; then
            echo "Usage: $0 harden <server>" >&2
            exit 1
        fi
        harden_vnc_server "$@"
        ;;
    audit)
        security_audit
        ;;
    monitor)
        # Continuous monitoring mode
        while true; do
            check_security_violations
            sleep 300  # Check every 5 minutes
        done
        ;;
    *)
        echo "Usage: $0 {harden|audit|monitor} [options]"
        exit 1
        ;;
esac
Troubleshooting Framework
Advanced Troubleshooting Tools
Comprehensive troubleshooting for VNC issues:
#!/usr/bin/env python3
"""
VNC Troubleshooting Framework
Advanced diagnostics and problem resolution
"""
import asyncio
import json
import logging
import socket
import struct
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import psutil
class VNCTroubleshooter:
    """Comprehensive VNC troubleshooting system.

    Runs a fixed sequence of diagnostics against one VNC endpoint and
    aggregates the outcome. Every diagnostic is a coroutine method taking
    ``(host, port)`` and returning a dict with the keys ``test``, ``passed``,
    ``details``, ``issues`` and ``recommendations``.

    This class implements the network, port, handshake and performance
    diagnostics. The other entries of ``_DIAGNOSTICS`` and the
    ``_measure_bandwidth`` probe are not defined here; they are looked up at
    run time so a subclass or mixin can provide them, and are skipped when
    absent instead of aborting the whole diagnosis.
    """

    # Diagnostic method names in execution order. Kept as names (not bound
    # methods) so that hooks this class does not define can be skipped.
    _DIAGNOSTICS = (
        '_test_network_connectivity',
        '_test_port_accessibility',
        '_test_vnc_handshake',
        '_test_authentication',
        '_test_encryption',
        '_test_performance',
        '_test_firewall',
        '_test_resource_availability',
    )

    def __init__(self):
        self.diagnostics = []
        self.logger = logging.getLogger(__name__)

    async def diagnose_connection(self, host: str, port: int = 5901) -> Dict:
        """Run comprehensive connection diagnostics.

        Args:
            host: Hostname or IP address of the VNC server.
            port: TCP port of the VNC service.

        Returns:
            A dict with ``host``, ``port``, ``timestamp`` (naive UTC
            ``datetime``), ``tests`` (one result dict per executed
            diagnostic), ``issues`` and ``recommendations`` (collected from
            failed diagnostics only), ``skipped`` (names of diagnostics that
            are not implemented) and ``summary``.
        """
        diagnosis = {
            'host': host,
            'port': port,
            'timestamp': datetime.utcnow(),
            'tests': [],
            'issues': [],
            'recommendations': [],
            'skipped': [],
        }

        for method_name in self._DIAGNOSTICS:
            test = getattr(self, method_name, None)
            if test is None:
                self.logger.warning(
                    "Diagnostic %s is not implemented; skipping", method_name
                )
                diagnosis['skipped'].append(method_name[len('_test_'):])
                continue

            result = await test(host, port)
            diagnosis['tests'].append(result)
            if not result['passed']:
                diagnosis['issues'].extend(result.get('issues', []))
                diagnosis['recommendations'].extend(
                    result.get('recommendations', [])
                )

        # Generate summary
        diagnosis['summary'] = self._generate_diagnosis_summary(diagnosis)
        return diagnosis

    def _generate_diagnosis_summary(self, diagnosis: Dict) -> Dict:
        """Condense a diagnosis into pass/fail counts.

        Returns:
            A dict with ``total_tests``, ``passed``, ``failed``, ``skipped``,
            ``issue_count`` and ``healthy`` (True only when at least one
            diagnostic ran and all of them passed).
        """
        tests = diagnosis['tests']
        passed = sum(1 for entry in tests if entry.get('passed'))
        return {
            'total_tests': len(tests),
            'passed': passed,
            'failed': len(tests) - passed,
            'skipped': len(diagnosis.get('skipped', [])),
            'issue_count': len(diagnosis['issues']),
            'healthy': bool(tests) and passed == len(tests),
        }

    @staticmethod
    def _new_result(test_name: str, passed: bool = False) -> Dict:
        """Return an empty result record for the diagnostic *test_name*."""
        return {
            'test': test_name,
            'passed': passed,
            'details': {},
            'issues': [],
            'recommendations': [],
        }

    @staticmethod
    def _parse_ping_output(output: str) -> Dict:
        """Extract packet loss and average latency from ``ping`` output.

        Returns:
            A dict that may contain ``packet_loss`` (percent, float) and
            ``latency_avg`` (float, in the unit ping printed). A key is
            omitted when its line is missing or cannot be parsed.
        """
        details = {}
        for line in output.splitlines():
            if 'packet loss' in line:
                # "... 4 received, 0% packet loss, time 3004ms" -> "0"
                tokens = line.split('%')[0].split()
                try:
                    details['packet_loss'] = float(tokens[-1])
                except (IndexError, ValueError):
                    # Unparseable line: leave the key out so the caller can
                    # report that the loss figure is unknown.
                    continue
            if 'min/avg/max' in line:
                # "rtt min/avg/max/mdev = 0.035/0.043/0.052/0.007 ms"
                fields = line.partition('=')[2].split('/')
                try:
                    details['latency_avg'] = float(fields[1])
                except (IndexError, ValueError):
                    continue
        return details

    async def _test_network_connectivity(self, host: str, port: int) -> Dict:
        """Test basic network connectivity with four ICMP echo requests.

        ``port`` is unused; it is accepted so that all diagnostics share one
        signature. The test passes when ping succeeds and reports less than
        5% packet loss.
        """
        result = self._new_result('network_connectivity')
        try:
            # ping takes several seconds; run it in a worker thread so the
            # event loop is not blocked while it waits.
            loop = asyncio.get_running_loop()
            ping_result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(
                    ['ping', '-c', '4', '-W', '2', host],
                    capture_output=True,
                    text=True,
                ),
            )

            if ping_result.returncode != 0:
                result['issues'].append("Host unreachable")
                result['recommendations'].append(
                    "Verify hostname/IP and network connectivity"
                )
                return result

            result['details'].update(
                self._parse_ping_output(ping_result.stdout)
            )
            packet_loss = result['details'].get('packet_loss')
            if packet_loss is None:
                # Without a loss figure the link cannot be vouched for.
                result['issues'].append(
                    "Could not determine packet loss from ping output"
                )
                result['recommendations'].append(
                    "Check network connectivity and routing"
                )
            elif packet_loss < 5:
                result['passed'] = True
            else:
                result['issues'].append(f"High packet loss: {packet_loss}%")
                result['recommendations'].append(
                    "Check network connectivity and routing"
                )
        except Exception as e:
            # A diagnostic must never raise: report the failure instead.
            result['issues'].append(f"Network test failed: {str(e)}")
        return result

    @staticmethod
    def _timed_tcp_connect(host: str, port: int, timeout: float = 5.0) -> float:
        """Open and close one IPv4 TCP connection; return the connect time.

        The socket is closed on every path, including failed connects.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            started = time.monotonic()
            sock.connect((host, port))
            return time.monotonic() - started

    async def _test_port_accessibility(self, host: str, port: int) -> Dict:
        """Test if the VNC port accepts a TCP connection within 5 seconds.

        On success ``details['connect_time']`` holds the connect duration in
        seconds.
        """
        result = self._new_result('port_accessibility')
        try:
            # The blocking connect runs in a worker thread.
            loop = asyncio.get_running_loop()
            connect_time = await loop.run_in_executor(
                None, self._timed_tcp_connect, host, port
            )
            result['details']['connect_time'] = connect_time
            result['passed'] = True
        except socket.timeout:
            result['issues'].append("Connection timeout")
            result['recommendations'].append(
                "Check firewall rules and ensure VNC service is running"
            )
        except ConnectionRefusedError:
            result['issues'].append("Connection refused")
            result['recommendations'].append(
                "Verify VNC server is running on the target port"
            )
        except Exception as e:
            result['issues'].append(f"Connection failed: {str(e)}")
        return result

    @staticmethod
    async def _read_exact(reader, count: int, timeout: float = 5.0) -> bytes:
        """Read exactly *count* bytes from *reader* or fail within *timeout*.

        Raises:
            asyncio.TimeoutError: The bytes did not arrive in time.
            asyncio.IncompleteReadError: The peer closed the stream early.
        """
        return await asyncio.wait_for(
            reader.readexactly(count), timeout=timeout
        )

    async def _test_vnc_handshake(self, host: str, port: int) -> Dict:
        """Test the VNC (RFB) protocol handshake up to the security types.

        Reads the 12-byte server version, answers with ``RFB 003.008`` and
        reads the list of security types. The test passes when the server
        offers at least one security type. Every network step is bounded by
        a 5 second timeout.
        """
        result = self._new_result('vnc_handshake')
        writer = None
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=5
            )

            # Read RFB version
            try:
                rfb_version = await self._read_exact(reader, 12)
            except asyncio.IncompleteReadError as short:
                # Peer sent fewer than 12 bytes and closed: judge what came.
                rfb_version = short.partial
            result['details']['server_version'] = rfb_version.decode(
                'ascii', errors='replace'
            ).strip()

            # Check version
            if not rfb_version.startswith(b'RFB'):
                result['issues'].append("Invalid RFB protocol response")
                return result

            # Send client version. The reply is always 3.8, whatever version
            # the server announced.
            writer.write(b'RFB 003.008\n')
            await writer.drain()

            # Read security types: a count byte, then one byte per type.
            num_types = (await self._read_exact(reader, 1))[0]
            if num_types > 0:
                security_types = await self._read_exact(reader, num_types)
                result['details']['security_types'] = list(security_types)
                result['passed'] = True
            else:
                # A zero count is followed by a length-prefixed reason text.
                (reason_length,) = struct.unpack(
                    '>I', await self._read_exact(reader, 4)
                )
                reason = await self._read_exact(reader, reason_length)
                reason_text = reason.decode('utf-8', errors='replace')
                result['issues'].append(
                    f"Server rejected connection: {reason_text}"
                )
        except asyncio.TimeoutError:
            result['issues'].append("VNC handshake timeout")
            result['recommendations'].append(
                "Check if VNC server is properly configured"
            )
        except Exception as e:
            result['issues'].append(f"Handshake failed: {str(e)}")
        finally:
            if writer is not None:
                writer.close()
                try:
                    await writer.wait_closed()
                except OSError:
                    # The verdict is already recorded; an error while tearing
                    # the connection down must not overwrite it.
                    pass
        return result

    async def _test_performance(self, host: str, port: int) -> Dict:
        """Test VNC performance characteristics.

        Fails when the measured bandwidth is below 1 Mbps or when the test
        itself raises. High local CPU or memory usage is recorded in
        ``issues`` and ``recommendations`` as an advisory only: it does not
        flip ``passed``, so ``diagnose_connection`` does not copy it into the
        aggregate lists. ``port`` is unused.
        """
        result = self._new_result('performance', passed=True)
        try:
            # Measure bandwidth to host, if a probe is available.
            measure_bandwidth = getattr(self, '_measure_bandwidth', None)
            if measure_bandwidth is not None:
                bandwidth = await measure_bandwidth(host)
                result['details']['bandwidth_mbps'] = bandwidth
                if bandwidth < 1:
                    result['passed'] = False
                    result['issues'].append(
                        f"Low bandwidth: {bandwidth:.2f} Mbps"
                    )
                    result['recommendations'].append(
                        "Consider using compression or reducing color depth"
                    )

            # Check system resources. cpu_percent(interval=1) sleeps for one
            # second, so it runs in a worker thread.
            loop = asyncio.get_running_loop()
            cpu_usage = await loop.run_in_executor(
                None, lambda: psutil.cpu_percent(interval=1)
            )
            memory_usage = psutil.virtual_memory().percent
            result['details']['cpu_usage'] = cpu_usage
            result['details']['memory_usage'] = memory_usage

            if cpu_usage > 80:
                result['issues'].append(f"High CPU usage: {cpu_usage}%")
                result['recommendations'].append(
                    "Close unnecessary applications"
                )
            if memory_usage > 90:
                result['issues'].append(f"High memory usage: {memory_usage}%")
                result['recommendations'].append(
                    "Free up memory before connecting"
                )
        except Exception as e:
            # A test that could not run must not be reported as passed.
            result['passed'] = False
            result['issues'].append(f"Performance test failed: {str(e)}")
        return result
class VNCDebugger:
    """Interactive VNC debugging tool.

    ``debug_session`` opens a connection and then runs a prompt loop that
    maps typed commands to handler methods. The connection factory
    (``_create_debug_connection``) and the handlers named in
    ``_ASYNC_COMMANDS`` are not defined in this class; they are resolved at
    run time so a subclass or mixin can supply them.
    """

    # Command word -> name of the coroutine method that handles it. Names are
    # resolved when the command is typed, so a handler that is not available
    # is reported for that command instead of ending the session.
    _ASYNC_COMMANDS = {
        'status': '_show_status',
        'handshake': '_debug_handshake',
        'auth': '_debug_authentication',
        'capture': '_capture_traffic',
        'analyze': '_analyze_protocol',
    }

    def __init__(self):
        self.connection = None
        self.debug_log = []

    async def debug_session(self, host: str, port: int):
        """Start interactive debug session.

        Reads commands from standard input until ``quit`` is entered or the
        input stream ends. Errors are printed rather than raised, and the
        connection is closed on every exit path.

        Args:
            host: Hostname or IP address of the VNC server.
            port: TCP port of the VNC service.
        """
        print(f"VNC Debugger - Connecting to {host}:{port}")
        print("=" * 50)
        try:
            # Establish raw connection
            self.connection = await self._create_debug_connection(host, port)

            # Interactive debug loop
            while True:
                try:
                    # NOTE(review): input() is a blocking call inside a
                    # coroutine; nothing else on the event loop runs while
                    # the prompt is waiting.
                    command = input("\nDebug> ").strip().lower()
                except EOFError:
                    # End of input (Ctrl-D, closed pipe) is a normal exit.
                    break

                if not command:
                    continue
                if command == 'quit':
                    break
                if command == 'help':
                    self._show_help()
                    continue

                handler_name = self._ASYNC_COMMANDS.get(command)
                if handler_name is None:
                    print(f"Unknown command: {command}")
                    continue

                handler = getattr(self, handler_name, None)
                if handler is None:
                    print(f"Command not available: {command}")
                    continue
                await handler()
        except Exception as e:
            print(f"Debug session error: {e}")
        finally:
            if self.connection:
                self.connection.close()
                # Drop the closed handle so a later session starts clean.
                self.connection = None

    def _show_help(self):
        """Show debug commands"""
        print("""
Available commands:
status - Show connection status
handshake - Debug VNC handshake
auth - Debug authentication
capture - Capture protocol traffic
analyze - Analyze protocol messages
quit - Exit debugger
""")
# Troubleshooting guide generator
class TroubleshootingGuide:
    """Build Markdown troubleshooting guides from reported symptoms.

    A small built-in knowledge base maps each known issue to its typical
    symptoms, likely causes and suggested fixes.
    """

    def __init__(self):
        self.knowledge_base = self._load_knowledge_base()

    def _load_knowledge_base(self) -> Dict:
        """Return the issue catalogue keyed by issue identifier.

        Every entry holds three lists of strings: ``symptoms``, ``causes``
        and ``solutions``.
        """
        connection_timeout = {
            'symptoms': ['Connection timeout', 'Unable to connect'],
            'causes': [
                'Firewall blocking VNC ports',
                'VNC server not running',
                'Incorrect port number',
                'Network connectivity issues',
            ],
            'solutions': [
                'Check firewall rules: iptables -L -n | grep 59',
                'Verify VNC server status: systemctl status vncserver',
                'Test port connectivity: telnet <host> <port>',
                'Check network route: traceroute <host>',
            ],
        }
        authentication_failed = {
            'symptoms': ['Authentication failed', 'Invalid password'],
            'causes': [
                'Incorrect password',
                'Wrong authentication method',
                'Account locked',
                'Password expired',
            ],
            'solutions': [
                'Verify password is correct',
                'Check VNC authentication configuration',
                'Verify account status in LDAP/AD',
                'Reset VNC password if needed',
            ],
        }
        poor_performance = {
            'symptoms': ['Slow response', 'Laggy display', 'Frequent disconnects'],
            'causes': [
                'Network bandwidth limitations',
                'High latency connection',
                'Inefficient encoding',
                'System resource constraints',
            ],
            'solutions': [
                'Use compression: -compresslevel 9',
                'Reduce color depth: -depth 8',
                'Change encoding: -encodings "tight zrle"',
                'Check system resources: top, iotop',
            ],
        }
        return {
            'connection_timeout': connection_timeout,
            'authentication_failed': authentication_failed,
            'poor_performance': poor_performance,
        }

    def generate_guide(self, symptoms: List[str]) -> str:
        """Render a Markdown guide for the issues matching *symptoms*.

        An issue matches when any of its known symptom phrases occurs,
        case-insensitively, inside any reported symptom. Matching issues are
        listed in knowledge-base order, each at most once; when nothing
        matches, a generic checklist is emitted instead.

        Args:
            symptoms: Free-text symptom descriptions.

        Returns:
            The guide as a single Markdown string.
        """
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M')
        chunks = ["# VNC Troubleshooting Guide\n\n", f"Generated: {stamp}\n\n"]

        # Match symptoms to issues
        hits = [
            (name, entry)
            for name, entry in self.knowledge_base.items()
            if any(
                any(known.lower() in reported.lower() for known in entry['symptoms'])
                for reported in symptoms
            )
        ]

        if hits:
            for name, entry in hits:
                heading = name.replace('_', ' ').title()
                chunks.append(f"## Issue: {heading}\n\n")
                chunks.append("### Possible Causes:\n")
                chunks.extend(f"- {cause}\n" for cause in entry['causes'])
                chunks.append("\n### Recommended Solutions:\n")
                chunks.extend(
                    f"{number}. {fix}\n"
                    for number, fix in enumerate(entry['solutions'], 1)
                )
                chunks.append("\n")
        else:
            chunks.extend([
                "No specific issues matched. Please check:\n",
                "- Basic network connectivity\n",
                "- VNC service status\n",
                "- Firewall configuration\n",
            ])

        return "".join(chunks)
Best Practices and Guidelines
Enterprise VNC Best Practices
Security Architecture
- Always use SSH tunneling or VPN for VNC connections
- Implement multi-factor authentication
- Use certificate-based authentication where possible
- Regular security audits and penetration testing
- Network isolation for management traffic
Access Control
- Integrate with enterprise identity management
- Role-based access control (RBAC)
- Time-based access restrictions
- Automated access revocation
- Comprehensive audit logging
Performance Optimization
- Use appropriate compression levels
- Implement caching for static content
- Quality of Service (QoS) for VNC traffic
- Monitor and optimize bandwidth usage
- Regular performance tuning
Compliance and Auditing
compliance_requirements:
  session_recording:
    enabled: true
    retention_days: 90
    encryption: AES-256
  access_logging:
    detailed_logs: true
    centralized_storage: true
    tamper_protection: true
  data_protection:
    in_transit: TLS 1.2+
    at_rest: encrypted
    key_management: HSM
  audit_trails:
    who: username, source_ip
    what: actions_performed
    when: timestamps_utc
    where: target_systems
Operational Excellence
- Automated health checks
- Proactive monitoring
- Capacity planning
- Disaster recovery procedures
- Regular training for operators
Scalability Considerations
- Connection pooling
- Load balancing across gateways
- Horizontal scaling capability
- Resource limits per user/group
- Automated scaling based on demand
This comprehensive guide transforms basic VNC viewer usage into a complete enterprise remote console management system with security, compliance, and operational excellence at its core.