Linux filesystem and partition management represents a critical skillset for systems administrators managing enterprise storage infrastructure. This comprehensive guide covers traditional and modern storage technologies, advanced resizing techniques, enterprise-grade management practices, and automated solutions for production environments.

Storage Architecture Fundamentals

Storage Stack Overview

Modern Linux storage systems operate through multiple abstraction layers, each providing specific functionality and management capabilities.

┌─────────────────┐
│   Applications  │
├─────────────────┤
│   Filesystems   │ (ext4, XFS, Btrfs, ZFS)
├─────────────────┤
│ Volume Managers │ (LVM, ZFS, Btrfs)
├─────────────────┤
│   Partitions    │ (GPT, MBR)
├─────────────────┤
│ Block Devices   │ (SATA, NVMe, SCSI)
├─────────────────┤
│   Hardware      │ (HDD, SSD, NVMe)
└─────────────────┘

Storage Technology Comparison

| Technology | Use Case | Pros | Cons | Enterprise Suitability |
|---|---|---|---|---|
| Traditional Partitions | Simple setups | Direct, fast | Limited flexibility | Basic environments |
| LVM | Dynamic storage | Flexible, snapshots | Complexity overhead | Standard enterprise |
| ZFS | Data integrity | Built-in RAID, compression | Memory intensive | High-end enterprise |
| Btrfs | Modern features | Copy-on-write, snapshots | Still maturing | Selective use cases |

Advanced Partition Management

Comprehensive Disk Analysis and Preparation

Enterprise Disk Discovery Script

#!/bin/bash
# Comprehensive disk and partition analysis tool
#
# Sub-commands (handled by the dispatcher at the end of this script):
#   analyze                          - print a storage report (default)
#   safety-check <device>            - warn about mounts/LVM/RAID/swap on a device
#   backup-partition-table <device>  - save the partition table of a device

# Strict mode: abort on unhandled command failures, on use of unset
# variables, and when any stage of a pipeline fails.
set -euo pipefail

# Color output
# These hold backslash escape sequences as literal text; they only become
# terminal colour codes when printed through `echo -e`.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Logging
# File that log_message appends to (via tee -a).
LOG_FILE="/var/log/disk-analysis.log"

# Print a timestamped message on stdout and append the same line to LOG_FILE.
# Globals:   LOG_FILE (read)
# Arguments: $1 - message text
# Outputs:   "<YYYY-MM-DD HH:MM:SS> - <message>" on stdout and in LOG_FILE
log_message() {
    printf '%s - %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$1" \
        | tee -a "$LOG_FILE"
}

# Print a three-line section banner: a rule, the title, and a second rule,
# each wrapped in the BLUE colour code.
# Globals:   BLUE, NC (read)
# Arguments: $1 - banner title
print_header() {
    local rule='========================================'
    local banner_line

    for banner_line in "$rule" "$1" "$rule"; do
        printf '%b\n' "${BLUE}${banner_line}${NC}"
    done
}

# Print a message prefixed with "WARNING: " in the YELLOW colour on stdout.
# Globals:   YELLOW, NC (read)
# Arguments: $1 - warning text
print_warning() {
    local text="WARNING: $1"
    printf '%b\n' "${YELLOW}${text}${NC}"
}

# Print a message prefixed with "ERROR: " in the RED colour on stdout.
# Globals:   RED, NC (read)
# Arguments: $1 - error text
print_error() {
    local text="ERROR: $1"
    printf '%b\n' "${RED}${text}${NC}"
}

# Print a message prefixed with "SUCCESS: " in the GREEN colour on stdout.
# Globals:   GREEN, NC (read)
# Arguments: $1 - success text
print_success() {
    local text="SUCCESS: $1"
    printf '%b\n' "${GREEN}${text}${NC}"
}

# Print a read-only report of the storage environment: system facts, block
# devices, per-disk details, filesystems, mounts, LVM, ZFS, software RAID
# and unallocated space.
#
# The script runs under `set -euo pipefail`, so every probe here is guarded:
# a tool that is missing or fails (for example when run without root) yields
# a warning instead of aborting the remainder of the report.
#
# Globals:   BLUE, NC (read)
# Arguments: none
# Outputs:   the report on stdout
analyze_storage_environment() {
    local disk table stat_file
    local -a disks=()

    # Resolve the candidate disks once. Patterns that match nothing stay
    # literal and are filtered out by the block-device test.
    for disk in /dev/sd[a-z] /dev/nvme[0-9]n[0-9] /dev/vd[a-z]; do
        if [[ -b "$disk" ]]; then
            disks+=("$disk")
        fi
    done

    print_header "COMPREHENSIVE STORAGE ANALYSIS"

    # System information
    echo -e "\n${BLUE}System Information:${NC}"
    echo "Hostname: $(hostname)"
    echo "Kernel: $(uname -r)"
    echo "Architecture: $(uname -m)"
    echo "Uptime: $(uptime -p)"

    # Block device overview
    print_header "BLOCK DEVICE OVERVIEW"
    lsblk -o NAME,SIZE,TYPE,FSTYPE,MOUNTPOINT,UUID,LABEL \
        || print_warning "lsblk failed; block device overview unavailable"

    # Detailed disk information
    print_header "DETAILED DISK INFORMATION"
    # The ${arr[@]+...} form keeps an empty array safe under `set -u`.
    for disk in ${disks[@]+"${disks[@]}"}; do
        echo -e "\n${BLUE}Disk: $disk${NC}"

        # Basic disk info
        if command -v smartctl >/dev/null 2>&1; then
            smartctl -i "$disk" 2>/dev/null \
                | grep -E "(Model|Serial|Capacity|Rotation|Form Factor)" || true
        fi

        # Partition table: prefer parted, fall back to fdisk. The output is
        # captured first so that truncating it cannot fail the pipeline.
        if table=$(parted "$disk" print 2>/dev/null) \
            || table=$(fdisk -l "$disk" 2>/dev/null); then
            head -n 10 <<<"$table"
        else
            print_warning "Could not read partition table of $disk"
        fi

        # I/O statistics
        stat_file="/sys/block/${disk##*/}/stat"
        if [[ -f "$stat_file" ]]; then
            echo "I/O Statistics: $(cat "$stat_file")"
        fi
    done

    # Filesystem information
    print_header "FILESYSTEM INFORMATION"
    df -hT || print_warning "df reported an error; filesystem list may be incomplete"

    # Mount information
    print_header "MOUNT INFORMATION"
    if command -v column >/dev/null 2>&1; then
        mount | column -t || print_warning "Could not list mounts"
    else
        mount || print_warning "Could not list mounts"
    fi

    # LVM information (if available)
    if command -v pvs >/dev/null 2>&1; then
        print_header "LVM INFORMATION"
        echo -e "\n${BLUE}Physical Volumes:${NC}"
        pvs 2>/dev/null || echo "No LVM physical volumes found"

        echo -e "\n${BLUE}Volume Groups:${NC}"
        vgs 2>/dev/null || echo "No LVM volume groups found"

        echo -e "\n${BLUE}Logical Volumes:${NC}"
        lvs 2>/dev/null || echo "No LVM logical volumes found"
    fi

    # ZFS information (if available)
    if command -v zpool >/dev/null 2>&1; then
        print_header "ZFS INFORMATION"
        echo -e "\n${BLUE}ZFS Pools:${NC}"
        zpool list 2>/dev/null || echo "No ZFS pools found"

        echo -e "\n${BLUE}ZFS Filesystems:${NC}"
        zfs list 2>/dev/null || echo "No ZFS filesystems found"
    fi

    # RAID information
    if [[ -f /proc/mdstat ]]; then
        print_header "SOFTWARE RAID INFORMATION"
        cat /proc/mdstat
    fi

    # Free space analysis
    print_header "FREE SPACE ANALYSIS"
    for disk in ${disks[@]+"${disks[@]}"}; do
        echo -e "\n${BLUE}Free space on $disk:${NC}"
        parted "$disk" print free 2>/dev/null | grep -i free \
            || echo "No free space information available"
    done
}

# Safety checks before partition operations.
#
# Verifies that the device is a block device, then warns (without failing)
# when it appears in the mount table, in LVM physical volumes, in
# /proc/mdstat, or in the active swap list. Finally takes a partition-table
# backup via backup_partition_table.
#
# Device names are matched as fixed strings, and each listing is captured
# before it is searched, so a regex metacharacter in the path or an early
# exit of grep under `pipefail` cannot hide a match.
#
# Arguments: $1 - block device path (e.g. /dev/sdb)
#            $2 - operation name; optional, defaults to "resize". Accepted
#                 for interface compatibility; not consulted by any check.
# Returns:   1 if $1 is not a block device; otherwise the status of the
#            final print_success call (a failing backup aborts the script
#            under `set -e`)
perform_safety_checks() {
    local device="${1:-}"
    # shellcheck disable=SC2034  # kept for callers that pass an operation
    local operation="${2:-resize}"
    local dev_name matches

    print_header "SAFETY CHECKS FOR $device"

    # Check if device exists
    if [[ ! -b "$device" ]]; then
        print_error "Device $device does not exist"
        return 1
    fi
    dev_name=${device##*/}

    # Check if device is mounted
    matches=$(mount | grep -F -- "$device" || true)
    if [[ -n "$matches" ]]; then
        print_warning "Device $device or its partitions are currently mounted"
        printf '%s\n' "$matches"
    fi

    # Check if device is part of LVM
    if command -v pvs >/dev/null 2>&1; then
        matches=$(pvs 2>/dev/null | grep -F -- "$device" || true)
        if [[ -n "$matches" ]]; then
            print_warning "Device $device is part of LVM configuration"
            printf '%s\n' "$matches"
        fi
    fi

    # Check if device is part of RAID
    if [[ -f /proc/mdstat ]]; then
        matches=$(grep -F -- "$dev_name" /proc/mdstat || true)
        if [[ -n "$matches" ]]; then
            print_warning "Device $device is part of software RAID"
            printf '%s\n' "$matches"
        fi
    fi

    # Check for swap partitions
    matches=$(swapon --show 2>/dev/null | grep -F -- "$device" || true)
    if [[ -n "$matches" ]]; then
        print_warning "Device $device contains active swap partition"
        printf '%s\n' "$matches"
    fi

    # Backup partition table
    backup_partition_table "$device"

    print_success "Safety checks completed for $device"
}

# Backup partition table.
#
# Writes two files into the backup directory, both named after the device
# and a timestamp:
#   <name>_<timestamp>.sfdisk - `sfdisk -d` dump of the partition table
#   <name>_<timestamp>.mbr    - the first 512-byte sector, copied with dd
#
# A step that fails is reported, its partial output file is removed, and the
# function returns non-zero, so an empty file is never left behind looking
# like a valid backup.
#
# Globals:   PARTITION_BACKUP_DIR (read, optional) - overrides the default
#            backup directory /var/backups/partition-tables
# Arguments: $1 - block device path
# Returns:   0 on success, 1 if any step fails
backup_partition_table() {
    local device="$1"
    local backup_dir="${PARTITION_BACKUP_DIR:-/var/backups/partition-tables}"
    local timestamp base sfdisk_file mbr_file

    # Assigned separately from `local` so a failing date is not masked.
    timestamp=$(date +%Y%m%d_%H%M%S) || return 1
    base="${device##*/}_${timestamp}"
    sfdisk_file="$backup_dir/${base}.sfdisk"
    mbr_file="$backup_dir/${base}.mbr"

    if ! mkdir -p -- "$backup_dir"; then
        print_error "Cannot create backup directory $backup_dir"
        return 1
    fi

    # Backup with sfdisk (for all partition types)
    if ! sfdisk -d "$device" > "$sfdisk_file" 2>/dev/null; then
        rm -f -- "$sfdisk_file"
        print_error "Could not dump the partition table of $device with sfdisk"
        return 1
    fi

    # Backup with dd (first sector only)
    if ! dd if="$device" of="$mbr_file" bs=512 count=1 2>/dev/null; then
        rm -f -- "$mbr_file"
        print_error "Could not copy the first sector of $device with dd"
        return 1
    fi

    log_message "Partition table backup created for $device in $backup_dir"
}

# Print the command summary for this script on stdout.
show_usage() {
    echo "Usage: $0 {analyze|safety-check|backup-partition-table}"
    echo ""
    echo "Commands:"
    echo "  analyze                    - Comprehensive storage analysis"
    echo "  safety-check <device>      - Perform safety checks on device"
    echo "  backup-partition-table <device> - Backup partition table"
}

# Execute analysis: dispatch the requested sub-command.
#
# Arguments: $1 - sub-command (default: analyze)
#            $2 - device, required by safety-check and backup-partition-table
#            $3 - operation name for safety-check (default: resize)
# Returns:   the status of the sub-command; 1 for a missing device argument
#            or an unknown sub-command; 0 for help
main() {
    local subcommand="${1:-analyze}"

    case "$subcommand" in
        analyze)
            analyze_storage_environment
            ;;
        safety-check)
            if [[ -z "${2:-}" ]]; then
                print_error "Usage: $0 safety-check <device>"
                return 1
            fi
            perform_safety_checks "$2" "${3:-resize}"
            ;;
        backup-partition-table)
            if [[ -z "${2:-}" ]]; then
                print_error "Usage: $0 backup-partition-table <device>"
                return 1
            fi
            backup_partition_table "$2"
            ;;
        help|-h|--help)
            show_usage
            ;;
        *)
            # A mistyped sub-command must not look like success to callers.
            show_usage >&2
            return 1
            ;;
    esac
}

main "$@"

Advanced Partition Resizing Framework

Intelligent Partition Resizing Tool

#!/usr/bin/env python3
"""
Enterprise Partition and Filesystem Management Tool
"""

import subprocess
import json
import logging
import time
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum

class FilesystemType(Enum):
    """Filesystem identifiers known to the partition tool.

    Each value is the lowercase type string that is looked up with
    ``FilesystemType(<string>)``: the TYPE tag parsed from blkid output in
    ``PartitionManager.get_filesystem_info`` and the ``--filesystem``
    command-line choice.
    """
    EXT4 = "ext4"
    EXT3 = "ext3"
    EXT2 = "ext2"
    XFS = "xfs"
    BTRFS = "btrfs"
    ZFS = "zfs"
    NTFS = "ntfs"
    # The member is named FAT32 but its value is "vfat".
    FAT32 = "vfat"

class OperationType(Enum):
    """Kind of change requested for a partition.

    In the visible code only EXPAND and SHRINK are inspected (by
    ``PartitionManager.validate_resize_operation``, which rejects SHRINK);
    CREATE and DELETE are declared but not referenced.
    """
    EXPAND = "expand"
    SHRINK = "shrink"
    CREATE = "create"
    DELETE = "delete"

@dataclass
class PartitionInfo:
    """Record describing one partition of a disk.

    Not constructed or read anywhere in the visible part of this file;
    ``PartitionManager.get_device_info`` returns plain dicts instead.
    """
    device: str
    partition_number: int
    # NOTE(review): start/end are presumably sector numbers and size_bytes a
    # byte count, going by the field names - confirm against the producer.
    start_sector: int
    end_sector: int
    size_bytes: int
    # The Optional fields have no default values, so they must still be
    # passed explicitly (None is allowed).
    filesystem_type: Optional[FilesystemType]
    mount_point: Optional[str]
    label: Optional[str]
    uuid: Optional[str]

@dataclass
class ResizeOperation:
    """Request to resize one partition and the filesystem on it."""
    # Whole-disk device path, e.g. /dev/sda (see the --device CLI option).
    device: str
    partition_number: int
    # Passed verbatim as the last argument of
    # `parted <device> resizepart <partition_number> ...`.
    # NOTE(review): parted documents that argument as the partition's new END
    # position rather than a size - confirm the intended meaning.
    new_size: str
    operation_type: OperationType
    # Selects which filesystem grow command PartitionManager.resize_partition
    # runs (resize2fs, xfs_growfs or btrfs filesystem resize).
    filesystem_type: FilesystemType
    # Not read by the visible PartitionManager methods, which look the mount
    # point up with findmnt; the CLI always passes None. No default value.
    mount_point: Optional[str]

class PartitionManager:
    """Grow a partition and the filesystem on it, taking backups first.

    External tools (parted, blkid, findmnt, df, sfdisk, dd, umount, mount,
    resize2fs, xfs_growfs, btrfs) are run through ``execute_command``. When
    ``dry_run`` is true, commands that would change the system are only
    logged; read-only queries still run so that validation sees real data.
    """

    def __init__(self, dry_run: bool = False):
        """Create the manager and make sure the backup directory exists.

        Args:
            dry_run: Log state-changing commands instead of running them.

        Raises:
            OSError: If /var/backups/partition-operations cannot be created.
        """
        self.dry_run = dry_run
        self.logger = logging.getLogger(__name__)
        self.backup_dir = Path("/var/backups/partition-operations")
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _partition_path(device: str, partition_number: int) -> str:
        """Return the device node of a partition.

        Disks whose name ends in a digit (nvme0n1, mmcblk0, loop0) separate
        the partition number with 'p' (nvme0n1p1); others append it directly
        (sda1).
        """
        separator = 'p' if device[-1:].isdigit() else ''
        return f"{device}{separator}{partition_number}"

    def execute_command(self, command: List[str], timeout: int = 300,
                        read_only: bool = False) -> Tuple[bool, str, str]:
        """Execute a system command with proper error handling.

        Args:
            command: Program and arguments; run without a shell.
            timeout: Seconds to wait before giving up.
            read_only: Set for pure queries. Such commands run even in
                dry-run mode, because skipping them would hand placeholder
                text to the parsers and make every validation fail.

        Returns:
            ``(success, stdout, stderr)``. ``success`` is true when the exit
            status is 0. Timeouts and launch failures are reported as
            ``(False, "", message)`` rather than raised.
        """
        if self.dry_run and not read_only:
            self.logger.info(f"DRY RUN: Would execute: {' '.join(command)}")
            return True, "Dry run - command not executed", ""

        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return False, "", f"Command timeout after {timeout} seconds"
        except Exception as e:
            return False, "", str(e)

        return result.returncode == 0, result.stdout, result.stderr

    def get_device_info(self, device: str) -> Dict:
        """Get partition-table information for a disk from ``parted``.

        Args:
            device: Whole-disk device path.

        Returns:
            A dict with keys ``device``, ``partition_table_type``,
            ``disk_size`` (the size string printed by parted, or 0 when not
            found), ``partitions`` and ``free_space``. Entries of the two
            lists are dicts with ``number``, ``start``, ``end``, ``size`` and
            ``filesystem``; free-space entries have ``number`` and
            ``filesystem`` set to None.

        Raises:
            RuntimeError: If parted fails.
        """
        info = {
            'device': device,
            'partitions': [],
            'partition_table_type': None,
            'disk_size': 0,
            'free_space': []
        }

        # -s (script mode) keeps parted from waiting on an interactive prompt.
        success, output, error = self.execute_command(
            ['parted', '-s', device, 'print', 'free'], read_only=True
        )

        if not success:
            raise RuntimeError(f"Failed to get device info for {device}: {error}")

        disk_prefix = f"Disk {device}:"
        in_partition_section = False

        for raw_line in output.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            if line.startswith('Partition Table:'):
                info['partition_table_type'] = line.split(':', 1)[1].strip()
            elif line.startswith(disk_prefix):
                info['disk_size'] = line[len(disk_prefix):].strip()
            elif line.startswith('Number'):
                in_partition_section = True
            elif in_partition_section:
                fields = line.split()

                # Free-space rows carry no partition number: the columns are
                # start, end, size, "Free", "Space".
                if 'Free Space' in line:
                    if len(fields) >= 3:
                        info['free_space'].append({
                            'number': None,
                            'start': fields[0],
                            'end': fields[1],
                            'size': fields[2],
                            'filesystem': None
                        })
                    continue

                if len(fields) < 4:
                    continue
                try:
                    number = int(fields[0])
                except ValueError:
                    continue

                # NOTE(review): the fifth column is taken as the filesystem;
                # msdos tables print a "Type" column there - confirm.
                info['partitions'].append({
                    'number': number,
                    'start': fields[1],
                    'end': fields[2],
                    'size': fields[3],
                    'filesystem': fields[4] if len(fields) > 4 else None
                })

        return info

    def get_filesystem_info(self, partition: str) -> Dict:
        """Get filesystem details for a partition device.

        Args:
            partition: Partition device path, e.g. /dev/sda1.

        Returns:
            A dict with ``filesystem_type`` (a FilesystemType, the raw type
            string when the type is not in the enum, or None), ``uuid``,
            ``label``, ``mount_point`` and, when mounted, the human-readable
            ``total_space``, ``used_space`` and ``available_space`` columns
            of ``df -h``. Values that cannot be determined stay None.
        """
        info = {
            'filesystem_type': None,
            'mount_point': None,
            'uuid': None,
            'label': None,
            'used_space': None,
            'available_space': None,
            'total_space': None
        }

        # "-o export" prints one KEY=value pair per line, so UUID is not
        # confused with PARTUUID, nor TYPE with SEC_TYPE, and labels that
        # contain spaces survive.
        success, output, error = self.execute_command(
            ['blkid', '-o', 'export', partition], read_only=True
        )
        if success:
            tags = {}
            for line in output.splitlines():
                key, separator, value = line.partition('=')
                if separator:
                    tags[key.strip()] = value.strip()

            fs_type = tags.get('TYPE')
            if fs_type:
                try:
                    info['filesystem_type'] = FilesystemType(fs_type)
                except ValueError:
                    info['filesystem_type'] = fs_type

            info['uuid'] = tags.get('UUID')
            info['label'] = tags.get('LABEL')

        # Get mount information
        success, output, error = self.execute_command(
            ['findmnt', '-n', '-o', 'TARGET', partition], read_only=True
        )
        if success and output.strip():
            # A device mounted in several places yields several lines.
            info['mount_point'] = output.strip().splitlines()[0].strip()

            # Get space usage if mounted
            success, df_output, error = self.execute_command(
                ['df', '-h', partition], read_only=True
            )
            if success:
                lines = df_output.split('\n')
                if len(lines) > 1:
                    parts = lines[1].split()
                    if len(parts) >= 4:
                        info['total_space'] = parts[1]
                        info['used_space'] = parts[2]
                        info['available_space'] = parts[3]

        return info

    def create_backup(self, device: str, operation: str) -> str:
        """Create a backup of the disk's partition metadata.

        Writes three files that share one prefix inside ``backup_dir``:
        ``.sfdisk`` (``sfdisk -d`` dump), ``.dd`` (first 10 MiB of the disk)
        and ``.json`` (timestamp, device, operation and the
        ``get_device_info`` result). In dry-run mode nothing is written.

        Args:
            device: Whole-disk device path.
            operation: Short label that becomes part of the file names.

        Returns:
            The common path prefix of the backup files.

        Raises:
            RuntimeError: If the partition table cannot be dumped or read,
                or the dd copy fails.
            OSError: If a backup file cannot be written.
        """
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        backup_prefix = f"{self.backup_dir}/{Path(device).name}_{operation}_{timestamp}"

        if self.dry_run:
            self.logger.info(f"DRY RUN: Would create backup: {backup_prefix}")
            return backup_prefix

        # Backup partition table with sfdisk. The dump arrives on stdout and
        # has to be saved explicitly.
        success, dump, error = self.execute_command(
            ['sfdisk', '-d', device], timeout=60, read_only=True
        )
        if not success:
            raise RuntimeError(f"Failed to dump partition table of {device}: {error}")
        with open(f"{backup_prefix}.sfdisk", 'w') as f:
            f.write(dump)

        # Backup first few MB of the disk
        dd_backup = f"{backup_prefix}.dd"
        success, _, error = self.execute_command([
            'dd', f'if={device}', f'of={dd_backup}',
            'bs=1M', 'count=10'
        ], timeout=120)
        if not success:
            raise RuntimeError(f"Failed to back up the start of {device}: {error}")

        # Create metadata file
        metadata = {
            'timestamp': timestamp,
            'device': device,
            'operation': operation,
            'device_info': self.get_device_info(device)
        }

        metadata_file = f"{backup_prefix}.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)

        self.logger.info(f"Backup created: {backup_prefix}")
        return backup_prefix

    def resize_partition(self, resize_op: "ResizeOperation") -> bool:
        """Resize a partition with parted, then grow its filesystem.

        Steps: back up the partition metadata, unmount the partition if it
        is mounted, run ``parted resizepart``, grow the filesystem
        (resize2fs for ext2/3/4 while unmounted; xfs_growfs or
        ``btrfs filesystem resize max`` after remounting), and finally
        restore the original mount on every exit path.

        Args:
            resize_op: The requested operation.

        Returns:
            True on success, False when any step fails. Failures are logged
            rather than raised.
        """
        device = resize_op.device
        partition_num = resize_op.partition_number
        new_size = resize_op.new_size
        fs_type = resize_op.filesystem_type
        partition_device = self._partition_path(device, partition_num)

        ext_family = (FilesystemType.EXT2, FilesystemType.EXT3, FilesystemType.EXT4)
        # Filesystems that are grown through their mount point:
        # type -> (name used in log messages, command prefix).
        mounted_growers = {
            FilesystemType.XFS: ("XFS", ['xfs_growfs']),
            FilesystemType.BTRFS: ("Btrfs", ['btrfs', 'filesystem', 'resize', 'max']),
        }

        self.logger.info(f"Resizing {partition_device} to {new_size}")

        # Never touch the partition table without a backup.
        try:
            self.create_backup(device, f"resize_p{partition_num}")
        except Exception as e:
            self.logger.error(f"Backup failed, aborting resize of {partition_device}: {e}")
            return False

        fs_info = self.get_filesystem_info(partition_device)
        mount_point = fs_info.get('mount_point')
        was_mounted = bool(mount_point)

        # Reject this before anything is modified, not after the partition
        # table has already been rewritten.
        if fs_type in mounted_growers and not mount_point:
            self.logger.error(f"{mounted_growers[fs_type][0]} filesystem must be mounted to resize")
            return False

        if was_mounted:
            self.logger.info(f"Unmounting {partition_device} from {mount_point}")
            success, output, error = self.execute_command(['umount', partition_device])
            if not success:
                self.logger.error(f"Failed to unmount {partition_device}: {error}")
                return False

        # Tracks the current state so the filesystem is mounted exactly once
        # when this method returns.
        mounted = False

        try:
            # Resize partition
            success, output, error = self.execute_command([
                'parted', device, 'resizepart', str(partition_num), new_size
            ])
            if not success:
                self.logger.error(f"Failed to resize partition: {error}")
                return False

            # Resize filesystem
            if fs_type in ext_family:
                # NOTE(review): resize2fs may demand a prior `e2fsck -f` on an
                # unmounted filesystem - confirm whether a check step is needed.
                success, output, error = self.execute_command(['resize2fs', partition_device])
                if not success:
                    self.logger.error(f"Failed to resize ext filesystem: {error}")
                    return False

            elif fs_type in mounted_growers:
                fs_name, grow_command = mounted_growers[fs_type]

                success, output, error = self.execute_command(
                    ['mount', partition_device, mount_point]
                )
                if not success:
                    self.logger.error(f"Failed to remount {partition_device}: {error}")
                    return False
                mounted = True

                success, output, error = self.execute_command(grow_command + [mount_point])
                if not success:
                    self.logger.error(f"Failed to resize {fs_name} filesystem: {error}")
                    return False

            self.logger.info(f"Successfully resized {partition_device}")
            return True

        except Exception as e:
            self.logger.error(f"Error during resize operation: {e}")
            return False

        finally:
            # Restore the original mount whether the resize worked or not.
            if was_mounted and not mounted:
                success, output, error = self.execute_command(
                    ['mount', partition_device, mount_point]
                )
                if not success:
                    self.logger.warning(f"Failed to remount {partition_device}: {error}")

    def validate_resize_operation(self, resize_op: "ResizeOperation") -> Tuple[bool, str]:
        """Validate a resize operation before execution.

        Checks that the device exists, the partition is listed by parted,
        free space exists on the disk (for EXPAND), the requested filesystem
        type is one this tool can grow, the filesystem detected on the
        partition does not contradict the requested type, and that the
        operation is not a shrink.

        Returns:
            ``(True, "Validation passed")`` or ``(False, reason)``.
        """
        device = resize_op.device
        partition_num = resize_op.partition_number

        # Check if device exists
        if not Path(device).exists():
            return False, f"Device {device} does not exist"

        # Get device information
        try:
            device_info = self.get_device_info(device)
        except Exception as e:
            return False, f"Failed to get device information: {e}"

        # Check if partition exists
        partition_exists = any(
            p['number'] == partition_num for p in device_info['partitions']
        )

        if not partition_exists:
            return False, f"Partition {partition_num} does not exist on {device}"

        # Check for free space (for expansion)
        if resize_op.operation_type == OperationType.EXPAND:
            if not device_info['free_space']:
                return False, "No free space available for expansion"

        # Check filesystem support
        if resize_op.filesystem_type not in [
            FilesystemType.EXT2, FilesystemType.EXT3, FilesystemType.EXT4,
            FilesystemType.XFS, FilesystemType.BTRFS
        ]:
            return False, f"Filesystem type {resize_op.filesystem_type} not supported for resize"

        # Catch a wrong filesystem argument before the partition table is
        # changed. An undetectable type (None) is not treated as a mismatch.
        partition_device = self._partition_path(device, partition_num)
        detected = self.get_filesystem_info(partition_device).get('filesystem_type')
        if detected is not None and detected != resize_op.filesystem_type:
            detected_name = detected.value if isinstance(detected, FilesystemType) else detected
            return False, (
                f"Filesystem on {partition_device} is {detected_name}, "
                f"not {resize_op.filesystem_type.value}"
            )

        # Additional checks for shrinking
        if resize_op.operation_type == OperationType.SHRINK:
            return False, "Shrinking operations not implemented for safety"

        return True, "Validation passed"

    def perform_batch_operations(self, operations: List["ResizeOperation"]) -> Dict[str, bool]:
        """Validate and perform several resize operations in order.

        A failure is logged and recorded; later operations still run.

        Returns:
            A dict mapping ``"<device>p<partition_number>"`` to True when
            that operation succeeded and False otherwise.
        """
        results = {}

        for operation in operations:
            operation_id = f"{operation.device}p{operation.partition_number}"

            # Validate operation
            is_valid, validation_msg = self.validate_resize_operation(operation)

            if not is_valid:
                self.logger.error(f"Validation failed for {operation_id}: {validation_msg}")
                results[operation_id] = False
                continue

            # Perform operation
            try:
                success = self.resize_partition(operation)
                results[operation_id] = success

                if success:
                    self.logger.info(f"Successfully completed operation for {operation_id}")
                else:
                    self.logger.error(f"Failed operation for {operation_id}")

            except Exception as e:
                self.logger.error(f"Exception during operation for {operation_id}: {e}")
                results[operation_id] = False

        return results

# Command-line interface
#
# Builds one EXPAND ResizeOperation from the arguments, validates it and runs
# it. The process exits with status 0 on success and 1 when validation or the
# resize fails.
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description='Enterprise Partition Management Tool')
    parser.add_argument('--device', required=True, help='Target device (e.g., /dev/sda)')
    parser.add_argument('--partition', type=int, required=True, help='Partition number')
    # argparse %-formats help strings, so a literal percent sign must be
    # written as '%%'; a bare '%' breaks help formatting.
    parser.add_argument('--size', required=True, help='New size (e.g., 100GB, 50%%)')
    parser.add_argument('--filesystem', choices=['ext4', 'ext3', 'ext2', 'xfs', 'btrfs'],
                       default='ext4', help='Filesystem type')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--verbose', action='store_true', help='Verbose logging')

    args = parser.parse_args()

    # Configure logging
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Create partition manager
    manager = PartitionManager(dry_run=args.dry_run)

    # Create resize operation
    resize_operation = ResizeOperation(
        device=args.device,
        partition_number=args.partition,
        new_size=args.size,
        operation_type=OperationType.EXPAND,
        filesystem_type=FilesystemType(args.filesystem),
        mount_point=None
    )

    # Validate and execute
    is_valid, validation_msg = manager.validate_resize_operation(resize_operation)

    if not is_valid:
        print(f"Validation failed: {validation_msg}")
        # sys.exit is used because the bare exit() helper is installed by the
        # site module and is not guaranteed to exist.
        sys.exit(1)

    if args.dry_run:
        print("Dry run mode - no changes will be made")

    success = manager.resize_partition(resize_operation)

    if success:
        print("Resize operation completed successfully")
        sys.exit(0)
    else:
        print("Resize operation failed")
        sys.exit(1)

Modern Storage Technologies

ZFS Management

Enterprise ZFS Configuration

#!/bin/bash
# Enterprise ZFS management and optimization

# Create a ZFS pool with the tuning used in this guide, then create the
# databases/backups/vms/general datasets and set a quota and reservation on
# the databases dataset.
#
# The devices are handed to `zpool create` exactly as given. To get a
# redundant layout, include the vdev keyword in the list
# (e.g. `mirror /dev/sdb /dev/sdc`)... note that such keywords are rejected
# by the block-device validation below, so only plain device paths pass.
#
# Every step is checked: the function stops at the first failure instead of
# continuing and reporting success.
#
# Arguments: $1  - pool name
#            $2+ - block devices for the pool
# Returns:   0 on success, 1 on invalid arguments or a failed step
setup_enterprise_zfs() {
    local pool_name="${1:-}"
    local devices=("${@:2}")
    local device spec dataset recordsize

    if [[ -z "$pool_name" ]]; then
        echo "ERROR: Pool name is required" >&2
        return 1
    fi
    if (( ${#devices[@]} == 0 )); then
        echo "ERROR: At least one device is required" >&2
        return 1
    fi

    echo "Setting up enterprise ZFS pool: $pool_name"
    echo "Devices: ${devices[*]}"

    # Validate devices
    for device in "${devices[@]}"; do
        if [[ ! -b "$device" ]]; then
            echo "ERROR: Device $device is not a valid block device" >&2
            return 1
        fi
    done

    # Create ZFS pool with enterprise settings
    if ! zpool create -o ashift=12 \
                      -o autoexpand=on \
                      -O compression=lz4 \
                      -O atime=off \
                      -O recordsize=128K \
                      -O primarycache=all \
                      -O secondarycache=all \
                      -O logbias=throughput \
                      -O redundant_metadata=most \
                      "$pool_name" "${devices[@]}"; then
        echo "ERROR: Failed to create ZFS pool $pool_name" >&2
        return 1
    fi

    # Configure performance optimizations
    for spec in sync=standard xattr=sa dnodesize=auto; do
        if ! zfs set "$spec" "$pool_name"; then
            echo "ERROR: Failed to set $spec on $pool_name" >&2
            return 1
        fi
    done

    # Create enterprise datasets ("name:recordsize")
    for spec in databases:16K backups:1M vms:64K general:128K; do
        dataset=${spec%%:*}
        recordsize=${spec##*:}
        if ! zfs create -o "recordsize=$recordsize" "$pool_name/$dataset"; then
            echo "ERROR: Failed to create dataset $pool_name/$dataset" >&2
            return 1
        fi
    done

    # Set up quotas and reservations
    if ! zfs set quota=500G "$pool_name/databases" \
        || ! zfs set reservation=100G "$pool_name/databases"; then
        echo "ERROR: Failed to set quota/reservation on $pool_name/databases" >&2
        return 1
    fi

    echo "ZFS pool $pool_name created successfully"
}

# ZFS monitoring and maintenance.
#
# Prints status, utilisation, one iostat sample, the dataset list, the lines
# of `zpool status` that mention errors or a degraded/faulted/offline state,
# and selected ARC counters for one pool.
#
# Arguments: $1 - pool name
# Returns:   0 after printing the report; 1 if the pool name is missing or
#            `zpool list` cannot find the pool
zfs_health_check() {
    local pool_name="${1:-}"
    local arcstats="/proc/spl/kstat/zfs/arcstats"

    if [[ -z "$pool_name" ]]; then
        echo "ERROR: Pool name is required" >&2
        return 1
    fi
    if ! zpool list -H -o name "$pool_name" >/dev/null 2>&1; then
        echo "ERROR: ZFS pool $pool_name not found" >&2
        return 1
    fi

    echo "=== ZFS Health Check for $pool_name ==="

    # Pool status
    echo "Pool Status:"
    zpool status "$pool_name"

    # Pool utilization
    echo -e "\nPool Utilization:"
    zpool list "$pool_name"

    # I/O statistics
    echo -e "\nI/O Statistics:"
    zpool iostat "$pool_name" 1 1

    # Dataset information
    echo -e "\nDataset Information:"
    zfs list -r "$pool_name"

    # Check for errors
    echo -e "\nError Summary:"
    zpool status "$pool_name" | grep -E "(errors|DEGRADED|FAULTED|OFFLINE)" || true

    # ARC statistics
    echo -e "\nARC Statistics:"
    if [[ -r "$arcstats" ]]; then
        grep -E "^(hits|misses|size|c_max)" "$arcstats" || true
    else
        echo "ARC statistics not available ($arcstats not readable)"
    fi

    return 0
}

# Automated ZFS snapshot management.
#
# Creates a snapshot named <dataset>@<YYYYMMDD_HHMMSS> and then destroys
# snapshots of the same dataset whose date is older than the retention
# period.
#
# Only snapshots whose name after '@' has exactly the YYYYMMDD_HHMMSS form
# created here are candidates for removal. Snapshots named any other way
# (manual or third-party ones) are left alone: treating a name without a
# date as "older than the cutoff" would destroy them.
#
# Arguments: $1 - dataset name
#            $2 - retention in days (default: 30)
# Returns:   0 on success; 1 on invalid arguments, or if the snapshot or the
#            cutoff date cannot be created
zfs_snapshot_management() {
    local dataset="${1:-}"
    local retention_days="${2:-30}"
    local snapshot_name cutoff_date snapshot stamp

    if [[ -z "$dataset" ]]; then
        echo "ERROR: Dataset name is required" >&2
        return 1
    fi
    if [[ ! "$retention_days" =~ ^[0-9]+$ ]]; then
        echo "ERROR: Retention must be a whole number of days: $retention_days" >&2
        return 1
    fi

    echo "Managing snapshots for $dataset"

    # Create daily snapshot
    snapshot_name="${dataset}@$(date +%Y%m%d_%H%M%S)"
    if ! zfs snapshot "$snapshot_name"; then
        echo "ERROR: Failed to create snapshot $snapshot_name" >&2
        return 1
    fi
    echo "Created snapshot: $snapshot_name"

    # Remove old snapshots
    if ! cutoff_date=$(date -d "$retention_days days ago" +%Y%m%d); then
        echo "ERROR: Could not compute the retention cutoff date" >&2
        return 1
    fi

    while IFS= read -r snapshot; do
        # Literal prefix match: the dataset name is not used as a regex.
        [[ "$snapshot" == "${dataset}@"* ]] || continue
        stamp=${snapshot#"${dataset}@"}
        [[ "$stamp" =~ ^([0-9]{8})_[0-9]{6}$ ]] || continue

        # YYYYMMDD strings order the same way as the dates they encode.
        if [[ "${BASH_REMATCH[1]}" < "$cutoff_date" ]]; then
            echo "Removing old snapshot: $snapshot"
            zfs destroy "$snapshot" \
                || echo "WARNING: Failed to remove snapshot $snapshot" >&2
        fi
    done < <(zfs list -H -t snapshot -o name)

    return 0
}

# Print the command summary and examples for this script on stdout.
show_usage() {
    echo "Usage: $0 {setup|health-check|snapshot}"
    echo "Examples:"
    echo "  $0 setup tank /dev/sdb /dev/sdc /dev/sdd"
    echo "  $0 health-check tank"
    echo "  $0 snapshot tank/databases 30"
}

# Dispatch the requested sub-command.
#
# Arguments: $1  - sub-command (default: help)
#            $2+ - arguments of the sub-command
# Returns:   the status of the sub-command; 1 for a missing required
#            argument or an unknown sub-command; 0 for help
main() {
    local subcommand="${1:-help}"

    case "$subcommand" in
        setup)
            shift
            setup_enterprise_zfs "$@"
            ;;
        health-check)
            if [[ -z "${2:-}" ]]; then
                echo "ERROR: Usage: $0 health-check <pool>" >&2
                return 1
            fi
            zfs_health_check "$2"
            ;;
        snapshot)
            if [[ -z "${2:-}" ]]; then
                echo "ERROR: Usage: $0 snapshot <dataset> [retention_days]" >&2
                return 1
            fi
            zfs_snapshot_management "$2" "${3:-}"
            ;;
        help|-h|--help)
            show_usage
            ;;
        *)
            # A mistyped sub-command must not look like success to callers.
            show_usage >&2
            return 1
            ;;
    esac
}

main "$@"

Btrfs Management

Advanced Btrfs Operations

#!/bin/bash
# Advanced Btrfs filesystem management

# Create a RAID1 Btrfs filesystem across the given devices, mount it, and
# create the root/home/var/snapshots subvolumes with root as the default.
#
# WARNING: `mkfs.btrfs -f` overwrites whatever is on the devices.
#
# Arguments are validated before anything is written, and the function stops
# at the first failing step.
#
# Arguments: $1  - mount point (created if missing)
#            $2+ - block devices; at least two, because both data and
#                  metadata use the raid1 profile
# Returns:   0 on success, 1 on invalid arguments or a failed step
create_btrfs_enterprise_setup() {
    local mount_point="${1:-}"
    local devices=("${@:2}")
    local device subvolume

    if [[ -z "$mount_point" ]]; then
        echo "ERROR: Mount point is required" >&2
        return 1
    fi
    if (( ${#devices[@]} < 2 )); then
        echo "ERROR: RAID1 requires at least two devices" >&2
        return 1
    fi
    for device in "${devices[@]}"; do
        if [[ ! -b "$device" ]]; then
            echo "ERROR: Device $device is not a valid block device" >&2
            return 1
        fi
    done

    echo "Creating enterprise Btrfs filesystem"
    echo "Mount point: $mount_point"
    echo "Devices: ${devices[*]}"

    # Create Btrfs filesystem with RAID1
    if ! mkfs.btrfs -f -d raid1 -m raid1 "${devices[@]}"; then
        echo "ERROR: mkfs.btrfs failed" >&2
        return 1
    fi

    # Create mount point
    if ! mkdir -p -- "$mount_point"; then
        echo "ERROR: Cannot create mount point $mount_point" >&2
        return 1
    fi

    # Mount with optimized options
    if ! mount -o compress=zstd,space_cache=v2,autodefrag "${devices[0]}" "$mount_point"; then
        echo "ERROR: Failed to mount ${devices[0]} on $mount_point" >&2
        return 1
    fi

    # Create subvolumes for different use cases
    for subvolume in root home var snapshots; do
        if ! btrfs subvolume create "$mount_point/$subvolume"; then
            echo "ERROR: Failed to create subvolume $mount_point/$subvolume" >&2
            return 1
        fi
    done

    # Set default subvolume
    if ! btrfs subvolume set-default "$mount_point/root"; then
        echo "ERROR: Failed to set the default subvolume" >&2
        return 1
    fi

    echo "Btrfs enterprise setup completed"
}

# Btrfs maintenance and optimization.
#
# Runs a balance (data chunks at most 75% used), starts a scrub, runs a
# recursive defragment with zstd compression, and prints filesystem usage.
# Each step runs even if an earlier one failed; failures are counted and
# reported, so "Maintenance completed" is printed only when every step
# succeeded.
#
# Arguments: $1 - mount point of the Btrfs filesystem
# Returns:   0 if all steps succeeded, 1 otherwise
btrfs_maintenance() {
    local mount_point="${1:-}"
    local failures=0

    if [[ -z "$mount_point" ]]; then
        echo "ERROR: Mount point is required" >&2
        return 1
    fi
    if [[ ! -d "$mount_point" ]]; then
        echo "ERROR: $mount_point is not a directory" >&2
        return 1
    fi

    echo "Performing Btrfs maintenance on $mount_point"

    # Balance filesystem
    echo "Starting balance operation..."
    if ! btrfs balance start -dusage=75 "$mount_point"; then
        echo "ERROR: Balance failed on $mount_point" >&2
        failures=$((failures + 1))
    fi

    # Scrub for data integrity
    echo "Starting scrub operation..."
    if ! btrfs scrub start "$mount_point"; then
        echo "ERROR: Could not start scrub on $mount_point" >&2
        failures=$((failures + 1))
    fi

    # Defragment if needed
    echo "Defragmenting filesystem..."
    if ! btrfs filesystem defragment -r -v -czstd "$mount_point"; then
        echo "ERROR: Defragment failed on $mount_point" >&2
        failures=$((failures + 1))
    fi

    # Show filesystem usage
    echo "Filesystem usage:"
    if ! btrfs filesystem usage "$mount_point"; then
        echo "ERROR: Could not read filesystem usage of $mount_point" >&2
        failures=$((failures + 1))
    fi

    if (( failures > 0 )); then
        echo "ERROR: Maintenance finished with $failures failed step(s)" >&2
        return 1
    fi

    echo "Maintenance completed"
}

# Automated snapshot management for Btrfs.
#
# Creates a read-only snapshot <snapshot_dir>/snapshot_<YYYYMMDD_HHMMSS> of
# the subvolume and deletes expired snapshots from the same directory.
#
# Expiry is decided from the timestamp embedded in the snapshot name, and
# only direct children of the snapshot directory named
# snapshot_<YYYYMMDD_HHMMSS> are considered. This avoids walking the
# contents of every snapshot and does not depend on directory modification
# times, which need not correspond to when a snapshot was taken.
#
# Arguments: $1 - subvolume to snapshot
#            $2 - directory that holds the snapshots (created if missing)
#            $3 - retention in days (default: 7)
# Returns:   0 on success; 1 on invalid arguments, or if the directory, the
#            snapshot or the cutoff date cannot be created
btrfs_snapshot_manager() {
    local subvolume="${1:-}"
    local snapshot_dir="${2:-}"
    local retention_days="${3:-7}"
    local timestamp snapshot_name cutoff_date old_snapshot base stamp

    if [[ -z "$subvolume" || -z "$snapshot_dir" ]]; then
        echo "ERROR: Subvolume and snapshot directory are required" >&2
        return 1
    fi
    if [[ ! "$retention_days" =~ ^[0-9]+$ ]]; then
        echo "ERROR: Retention must be a whole number of days: $retention_days" >&2
        return 1
    fi

    echo "Managing Btrfs snapshots for $subvolume"

    # Create snapshot directory
    if ! mkdir -p -- "$snapshot_dir"; then
        echo "ERROR: Cannot create snapshot directory $snapshot_dir" >&2
        return 1
    fi

    # Create read-only snapshot
    timestamp=$(date +%Y%m%d_%H%M%S) || return 1
    snapshot_name="$snapshot_dir/snapshot_$timestamp"

    if ! btrfs subvolume snapshot -r "$subvolume" "$snapshot_name"; then
        echo "ERROR: Failed to create snapshot $snapshot_name" >&2
        return 1
    fi
    echo "Created snapshot: $snapshot_name"

    # Clean up old snapshots
    if ! cutoff_date=$(date -d "$retention_days days ago" +%Y%m%d); then
        echo "ERROR: Could not compute the retention cutoff date" >&2
        return 1
    fi

    for old_snapshot in "$snapshot_dir"/snapshot_*; do
        [[ -d "$old_snapshot" ]] || continue
        base=${old_snapshot##*/}
        stamp=${base#snapshot_}
        [[ "$stamp" =~ ^([0-9]{8})_[0-9]{6}$ ]] || continue

        # YYYYMMDD strings order the same way as the dates they encode.
        if [[ "${BASH_REMATCH[1]}" < "$cutoff_date" ]]; then
            echo "Removing old snapshot: $old_snapshot"
            btrfs subvolume delete "$old_snapshot" \
                || echo "WARNING: Failed to remove snapshot $old_snapshot" >&2
        fi
    done

    return 0
}

# Btrfs RAID management.
#
# Operations:
#   add     - add a device, then convert data and metadata to raid1; the
#             conversion is skipped when the device could not be added
#   remove  - remove a device
#   replace - replace $3 with $4
#   status  - show the filesystem's devices and their error counters
#
# Arguments: $1 - mount point
#            $2 - operation: add | remove | replace | status
#            $3 - device (add, remove, replace)
#            $4 - new device (replace only)
# Returns:   the status of the last btrfs command that ran; 1 for missing
#            arguments or an unknown operation
manage_btrfs_raid() {
    local mount_point="${1:-}"
    local operation="${2:-}"
    local device="${3:-}"
    local usage="Usage: manage_btrfs_raid <mount_point> {add|remove|replace|status} <device> [new_device]"

    # Every operation needs a mount point; all but status need a device.
    if [[ -z "$mount_point" ]]; then
        echo "$usage" >&2
        return 1
    fi
    case "$operation" in
        add|remove|replace)
            if [[ -z "$device" ]]; then
                echo "$usage" >&2
                return 1
            fi
            ;;
    esac

    case "$operation" in
        "add")
            echo "Adding device $device to Btrfs filesystem"
            if ! btrfs device add "$device" "$mount_point"; then
                echo "ERROR: Failed to add $device to $mount_point" >&2
                return 1
            fi
            btrfs balance start -dconvert=raid1 -mconvert=raid1 "$mount_point"
            ;;
        "remove")
            echo "Removing device $device from Btrfs filesystem"
            btrfs device remove "$device" "$mount_point"
            ;;
        "replace")
            local old_device="$device"
            local new_device="${4:-}"
            if [[ -z "$new_device" ]]; then
                echo "$usage" >&2
                return 1
            fi
            echo "Replacing device $old_device with $new_device"
            btrfs replace start "$old_device" "$new_device" "$mount_point"
            ;;
        "status")
            echo "Btrfs device status:"
            btrfs filesystem show "$mount_point"
            btrfs device stats "$mount_point"
            ;;
        *)
            echo "$usage" >&2
            return 1
            ;;
    esac
}

# Execute based on command: dispatch the requested sub-command.
#
# Arguments: $1  - sub-command: setup | maintenance | snapshot | raid
#                  (default: help)
#            $2+ - arguments of the sub-command
# Returns:   the status of the sub-command; 1 for a missing required
#            argument or an unknown sub-command; 0 for help
main() {
    local subcommand="${1:-help}"

    case "$subcommand" in
        setup)
            shift
            create_btrfs_enterprise_setup "$@"
            ;;
        maintenance)
            if [[ -z "${2:-}" ]]; then
                echo "ERROR: Usage: $0 maintenance <mount_point>" >&2
                return 1
            fi
            btrfs_maintenance "$2"
            ;;
        snapshot)
            # Without these, the snapshot would be attempted with empty paths.
            if [[ -z "${2:-}" || -z "${3:-}" ]]; then
                echo "ERROR: Usage: $0 snapshot <subvolume> <snapshot_dir> [retention_days]" >&2
                return 1
            fi
            btrfs_snapshot_manager "$2" "$3" "${4:-}"
            ;;
        raid)
            shift
            manage_btrfs_raid "$@"
            ;;
        help|-h|--help)
            echo "Usage: $0 {setup|maintenance|snapshot|raid}"
            ;;
        *)
            # A mistyped sub-command must not look like success to callers.
            echo "Usage: $0 {setup|maintenance|snapshot|raid}" >&2
            return 1
            ;;
    esac
}

main "$@"

LVM Advanced Management

Enterprise LVM Configuration

Comprehensive LVM Management System

#!/usr/bin/env python3
"""
Enterprise LVM Management and Automation System
"""

import subprocess
import json
import logging
from dataclasses import dataclass
from typing import List, Dict, Optional
from pathlib import Path

@dataclass
class LVMComponent:
    """Fields shared by the LVM record types below (base dataclass).

    Subclasses append their own fields after these four, so positional
    construction starts with name, size, status, attributes.
    """
    name: str
    # NOTE(review): presumably the size string as printed by the LVM tools
    # (the visible code stores the pv_size column here) - confirm the unit.
    size: str
    status: str
    attributes: str

@dataclass 
class PhysicalVolume(LVMComponent):
    """An LVM physical volume record.

    Extends LVMComponent; the generated constructor takes the base fields
    first, followed by the fields declared here.
    """
    device: str                  # device backing the physical volume
    volume_group: Optional[str]  # owning volume group; Optional, so None is allowed
    pe_size: str                 # physical extent size, kept as text
    pe_total: int                # number of physical extents
    pe_free: int                 # number of unallocated physical extents

@dataclass
class VolumeGroup(LVMComponent):
    """An LVM volume group record.

    Extends LVMComponent; the generated constructor takes the base fields
    first, followed by the fields declared here.
    """
    pv_count: int                # number of physical volumes in the group
    lv_count: int                # number of logical volumes in the group
    pe_size: str                 # extent size, kept as text
    pe_total: int                # number of extents in the group
    pe_free: int                 # number of free extents in the group
    physical_volumes: List[str]  # names of the member physical volumes

@dataclass
class LogicalVolume(LVMComponent):
    """An LVM logical volume record.

    Extends LVMComponent; the generated constructor takes the base fields
    first, followed by the fields declared here.
    """
    volume_group: str           # volume group the logical volume belongs to
    mount_point: Optional[str]  # where the volume is mounted; Optional, so None is allowed
    filesystem: Optional[str]   # filesystem type; Optional, so None is allowed
    lv_path: str                # path of the logical volume

class EnterpriseLVMManager:
    """Query and manage LVM through its command-line tools.

    The ``get_*`` query methods raise ``RuntimeError`` when the underlying
    listing command fails. The mutating methods (``create_enterprise_setup``,
    ``extend_logical_volume``, ``create_snapshot``) log the failure and return
    ``False`` instead of raising. ``generate_report`` returns
    ``{'error': <message>}`` on failure.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def execute_command(self, command: List[str]) -> tuple[bool, str, str]:
        """Run ``command`` (no shell) and capture its output.

        Args:
            command: Program and arguments as a list.

        Returns:
            ``(success, stdout, stderr)``. ``success`` is True only for exit
            status 0. A timeout (300 s) or any failure to launch the command
            is reported as ``(False, "", <message>)`` rather than raised.
        """
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=300
            )
        except subprocess.TimeoutExpired:
            return False, "", "Command timeout"
        except Exception as e:
            return False, "", str(e)

        return result.returncode == 0, result.stdout, result.stderr

    @staticmethod
    def _parse_rows(output: str, min_fields: int) -> List[List[str]]:
        """Split '|'-separated report output into rows of stripped fields.

        Blank lines and rows with fewer than ``min_fields`` fields are dropped.
        """
        rows = []
        for line in output.splitlines():
            if not line.strip():
                continue
            fields = [field.strip() for field in line.split('|')]
            if len(fields) >= min_fields:
                rows.append(fields)
        return rows

    def get_physical_volumes(self) -> List["PhysicalVolume"]:
        """Get all physical volumes.

        Raises:
            RuntimeError: if ``pvs`` fails.
        """
        # The extent size is a volume-group property; LVM's report field for it
        # is vg_extent_size (there is no "pe_size" field).
        success, output, error = self.execute_command([
            'pvs', '--noheadings', '--separator=|',
            '-o', 'pv_name,vg_name,pv_size,pv_attr,vg_extent_size,pv_pe_count,pv_pe_alloc_count'
        ])

        if not success:
            raise RuntimeError(f"Failed to get PV info: {error}")

        pvs = []
        for parts in self._parse_rows(output, 7):
            pe_total = int(parts[5])
            pe_allocated = int(parts[6])
            pvs.append(PhysicalVolume(
                name=parts[0],
                device=parts[0],
                # pvs prints an empty VG name for a PV that is in no group
                volume_group=parts[1] if parts[1] else None,
                size=parts[2],
                status="active",
                attributes=parts[3],
                pe_size=parts[4],
                pe_total=pe_total,
                pe_free=pe_total - pe_allocated
            ))

        return pvs

    def get_volume_groups(self) -> List["VolumeGroup"]:
        """Get all volume groups, each with the names of its physical volumes.

        Raises:
            RuntimeError: if ``vgs`` fails. A failing per-group ``pvs`` lookup
            is tolerated and yields an empty ``physical_volumes`` list.
        """
        success, output, error = self.execute_command([
            'vgs', '--noheadings', '--separator=|',
            '-o', 'vg_name,vg_size,vg_attr,pv_count,lv_count,vg_extent_size,vg_extent_count,vg_free_count'
        ])

        if not success:
            raise RuntimeError(f"Failed to get VG info: {error}")

        vgs = []
        for parts in self._parse_rows(output, 8):
            # Get PVs for this VG
            pv_success, pv_output, _ = self.execute_command([
                'pvs', '--noheadings', '-o', 'pv_name',
                '--select', f'vg_name={parts[0]}'
            ])

            physical_volumes = []
            if pv_success:
                physical_volumes = [pv.strip() for pv in pv_output.split('\n') if pv.strip()]

            vgs.append(VolumeGroup(
                name=parts[0],
                size=parts[1],
                status="active",
                attributes=parts[2],
                pv_count=int(parts[3]),
                lv_count=int(parts[4]),
                pe_size=parts[5],
                pe_total=int(parts[6]),
                pe_free=int(parts[7]),
                physical_volumes=physical_volumes
            ))

        return vgs

    def get_logical_volumes(self) -> List["LogicalVolume"]:
        """Get all logical volumes with mount point and filesystem if mounted.

        Raises:
            RuntimeError: if ``lvs`` fails.
        """
        success, output, error = self.execute_command([
            'lvs', '--noheadings', '--separator=|',
            '-o', 'lv_name,vg_name,lv_size,lv_attr,lv_path'
        ])

        if not success:
            raise RuntimeError(f"Failed to get LV info: {error}")

        lvs = []
        for parts in self._parse_rows(output, 5):
            mount_point = None
            filesystem = None

            # findmnt fails or prints nothing when the LV is not mounted;
            # both cases leave mount_point/filesystem as None.
            mount_success, mount_output, _ = self.execute_command([
                'findmnt', '-n', '-o', 'TARGET,FSTYPE', parts[4]
            ])

            if mount_success and mount_output.strip():
                mount_parts = mount_output.strip().split()
                if len(mount_parts) >= 2:
                    mount_point = mount_parts[0]
                    filesystem = mount_parts[1]

            lvs.append(LogicalVolume(
                name=parts[0],
                volume_group=parts[1],
                size=parts[2],
                status="active",
                attributes=parts[3],
                lv_path=parts[4],
                mount_point=mount_point,
                filesystem=filesystem
            ))

        return lvs

    def create_enterprise_setup(self, devices: List[str], vg_name: str) -> bool:
        """Create PVs on ``devices``, a VG named ``vg_name`` and a fixed LV layout.

        Returns:
            True once the volume group exists. A logical volume that fails to
            be created is only logged as a warning and does not change the
            result. False if a PV or the VG could not be created.
        """
        self.logger.info(f"Creating enterprise LVM setup with devices: {devices}")

        try:
            # Create physical volumes
            for device in devices:
                success, output, error = self.execute_command(['pvcreate', device])
                if not success:
                    raise RuntimeError(f"Failed to create PV on {device}: {error}")
                self.logger.info(f"Created PV on {device}")

            # Create volume group
            success, output, error = self.execute_command(['vgcreate', vg_name] + devices)
            if not success:
                raise RuntimeError(f"Failed to create VG {vg_name}: {error}")
            self.logger.info(f"Created VG {vg_name}")

            # Create logical volumes for different use cases
            lv_configs = [
                ('system', '20G'),
                ('var', '10G'),
                ('tmp', '5G'),
                ('home', '30G'),
                ('data', '50%FREE')
            ]

            for lv_name, lv_size in lv_configs:
                # Absolute sizes ("20G") use -L; anything else ("50%FREE") is
                # passed as an extent specification with -l.
                size_flag = '-L' if lv_size.endswith('G') else '-l'
                success, output, error = self.execute_command([
                    'lvcreate', size_flag, lv_size, '-n', lv_name, vg_name
                ])

                if not success:
                    self.logger.warning(f"Failed to create LV {lv_name}: {error}")
                else:
                    self.logger.info(f"Created LV {lv_name} ({lv_size})")

            return True

        except Exception as e:
            self.logger.error(f"Failed to create enterprise setup: {e}")
            return False

    def _grow_filesystem(self, lv_path: str) -> None:
        """Grow the filesystem on ``lv_path`` to fill the (already extended) LV.

        ext2/3/4 is grown with ``resize2fs``, XFS with ``xfs_growfs`` on its
        mount point. An undetectable or unsupported filesystem is logged as a
        warning and skipped.

        Raises:
            RuntimeError: if the resize command fails, or if an XFS filesystem
            is not mounted and therefore cannot be grown.
        """
        fs_success, fs_output, _ = self.execute_command([
            'blkid', '-s', 'TYPE', '-o', 'value', lv_path
        ])
        fs_type = fs_output.strip() if fs_success else ''

        if not fs_type:
            self.logger.warning(
                f"No filesystem detected on {lv_path}; filesystem not resized"
            )
            return

        if fs_type in ('ext2', 'ext3', 'ext4'):
            success, _, error = self.execute_command(['resize2fs', lv_path])
        elif fs_type == 'xfs':
            # xfs_growfs is given the mount point, so look it up first
            mount_success, mount_output, _ = self.execute_command([
                'findmnt', '-n', '-o', 'TARGET', lv_path
            ])
            # findmnt prints one line per mount; any one of them will do
            targets = [t.strip() for t in mount_output.splitlines() if t.strip()]
            if not mount_success or not targets:
                raise RuntimeError(
                    f"LV was extended but the XFS filesystem on {lv_path} "
                    f"is not mounted, so it could not be grown"
                )
            success, _, error = self.execute_command(['xfs_growfs', targets[0]])
        else:
            self.logger.warning(f"Filesystem {fs_type} resize not supported")
            return

        if not success:
            raise RuntimeError(f"Failed to extend filesystem: {error}")

    def extend_logical_volume(self, lv_path: str, size: str, filesystem: bool = True) -> bool:
        """Extend a logical volume by ``size`` and, optionally, its filesystem.

        Args:
            lv_path: Path of the logical volume.
            size: Amount to add, e.g. ``"5G"`` (a leading ``+`` is accepted).
            filesystem: When True, also grow the filesystem on the volume.

        Returns:
            True on success. False if ``lvextend`` failed, or if the LV was
            extended but the filesystem could not be grown.
        """
        self.logger.info(f"Extending {lv_path} by {size}")

        try:
            # Tolerate a caller-supplied "+" so we never pass "++5G" to lvextend
            increment = size.lstrip('+')
            success, output, error = self.execute_command([
                'lvextend', '-L', f'+{increment}', lv_path
            ])

            if not success:
                raise RuntimeError(f"Failed to extend LV: {error}")

            if filesystem:
                self._grow_filesystem(lv_path)

            self.logger.info(f"Successfully extended {lv_path}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to extend logical volume: {e}")
            return False

    def create_snapshot(self, lv_path: str, snapshot_name: str, size: str = '1G') -> bool:
        """Create an LVM snapshot of ``lv_path``.

        Args:
            lv_path: Origin volume; must look like ``/dev/<vg>/<lv>``.
            snapshot_name: Name for the snapshot volume.
            size: Space reserved for the snapshot (``lvcreate -L``).

        Returns:
            True on success, False if the path is malformed or ``lvcreate``
            fails.
        """
        self.logger.info(f"Creating snapshot {snapshot_name} of {lv_path}")

        try:
            # Only /dev/<vg>/<lv>-shaped paths are accepted
            parts = lv_path.split('/')
            if len(parts) < 4 or parts[1] != 'dev':
                raise ValueError(f"Invalid LV path format: {lv_path}")

            success, output, error = self.execute_command([
                'lvcreate', '-L', size, '-s', '-n', snapshot_name, lv_path
            ])

            if not success:
                raise RuntimeError(f"Failed to create snapshot: {error}")

            self.logger.info(f"Successfully created snapshot {snapshot_name}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to create snapshot: {e}")
            return False

    def _vg_capacity_bytes(self) -> tuple[int, int]:
        """Return ``(total, free)`` bytes summed over all volume groups.

        Raises:
            RuntimeError: if ``vgs`` fails.
        """
        # --units b --nosuffix makes vgs print plain byte counts
        success, output, error = self.execute_command([
            'vgs', '--noheadings', '--separator=|', '--units', 'b', '--nosuffix',
            '-o', 'vg_size,vg_free'
        ])

        if not success:
            raise RuntimeError(f"Failed to get VG capacity: {error}")

        total = 0
        free = 0
        for parts in self._parse_rows(output, 2):
            total += int(parts[0])
            free += int(parts[1])
        return total, free

    def generate_report(self) -> Dict:
        """Generate comprehensive LVM report.

        Returns:
            A dict with ``timestamp`` (local time, ISO 8601 with UTC offset),
            ``physical_volumes``, ``volume_groups``, ``logical_volumes`` and a
            ``summary`` holding object counts plus ``total_size`` and
            ``free_space`` as ``"<bytes>B"`` summed over all volume groups.
            On any failure, ``{'error': <message>}`` is returned instead.
        """
        from datetime import datetime

        try:
            # Get all components
            pvs = self.get_physical_volumes()
            vgs = self.get_volume_groups()
            lvs = self.get_logical_volumes()
            total_bytes, free_bytes = self._vg_capacity_bytes()

            return {
                'timestamp': datetime.now().astimezone().isoformat(timespec='seconds'),
                'physical_volumes': [
                    {
                        'device': pv.device,
                        'volume_group': pv.volume_group,
                        'size': pv.size,
                        'pe_size': pv.pe_size,
                        'pe_total': pv.pe_total,
                        'pe_free': pv.pe_free
                    } for pv in pvs
                ],
                'volume_groups': [
                    {
                        'name': vg.name,
                        'size': vg.size,
                        'pv_count': vg.pv_count,
                        'lv_count': vg.lv_count,
                        'pe_free': vg.pe_free,
                        'physical_volumes': vg.physical_volumes
                    } for vg in vgs
                ],
                'logical_volumes': [
                    {
                        'name': lv.name,
                        'volume_group': lv.volume_group,
                        'size': lv.size,
                        'lv_path': lv.lv_path,
                        'mount_point': lv.mount_point,
                        'filesystem': lv.filesystem
                    } for lv in lvs
                ],
                'summary': {
                    'total_pvs': len(pvs),
                    'total_vgs': len(vgs),
                    'total_lvs': len(lvs),
                    'total_size': f'{total_bytes}B',
                    'free_space': f'{free_bytes}B'
                }
            }

        except Exception as e:
            self.logger.error(f"Failed to generate report: {e}")
            return {'error': str(e)}

# Command-line interface
if __name__ == "__main__":
    # Exit status: 0 on success, 1 when the requested operation fails or a
    # required option is missing (argparse itself exits with 2 on bad syntax).
    import argparse
    import sys

    parser = argparse.ArgumentParser(description='Enterprise LVM Management')
    parser.add_argument('command', choices=['report', 'setup', 'extend', 'snapshot'])
    parser.add_argument('--devices', nargs='+', help='Block devices for setup')
    parser.add_argument('--vg-name', help='Volume group name')
    parser.add_argument('--lv-path', help='Logical volume path')
    parser.add_argument('--size', help='Size for operations')
    parser.add_argument('--snapshot-name', help='Snapshot name')
    parser.add_argument('--output', help='Output file for report')

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    manager = EnterpriseLVMManager()

    if args.command == 'report':
        report = manager.generate_report()
        if args.output:
            with open(args.output, 'w') as f:
                json.dump(report, f, indent=2)
        else:
            print(json.dumps(report, indent=2))
        # generate_report signals failure with an {'error': ...} payload;
        # still emit it, but do not report success to the caller.
        sys.exit(1 if 'error' in report else 0)

    elif args.command == 'setup':
        if not args.devices or not args.vg_name:
            print("Setup requires --devices and --vg-name", file=sys.stderr)
            sys.exit(1)
        success = manager.create_enterprise_setup(args.devices, args.vg_name)
        sys.exit(0 if success else 1)

    elif args.command == 'extend':
        if not args.lv_path or not args.size:
            print("Extend requires --lv-path and --size", file=sys.stderr)
            sys.exit(1)
        success = manager.extend_logical_volume(args.lv_path, args.size)
        sys.exit(0 if success else 1)

    elif args.command == 'snapshot':
        if not args.lv_path or not args.snapshot_name:
            print("Snapshot requires --lv-path and --snapshot-name", file=sys.stderr)
            sys.exit(1)
        # Snapshot size falls back to 1G when --size is not given
        success = manager.create_snapshot(args.lv_path, args.snapshot_name, args.size or '1G')
        sys.exit(0 if success else 1)

This comprehensive guide provides enterprise-grade Linux filesystem and partition management capabilities, covering traditional tools, modern storage technologies, advanced automation frameworks, and production-ready management solutions for diverse infrastructure requirements.