Advanced Linux database systems programming requires deep understanding of storage engines, transaction processing, concurrency control, and query optimization. This comprehensive guide explores building custom database systems including B-tree implementations, Write-Ahead Logging, MVCC, and creating production-grade ACID-compliant database engines.

Advanced Linux Database Systems Programming

Custom Storage Engine and Transaction Manager

High-Performance B-Tree Storage Engine

// btree_storage.c - Advanced B-tree storage engine implementation
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <pthread.h>
#include <stdatomic.h>
#include <assert.h>
#include <time.h>

#define PAGE_SIZE 4096
#define MAX_KEY_SIZE 256
#define MAX_VALUE_SIZE 1024
#define MAX_PAGES 1000000
#define BTREE_ORDER 128
#define BUFFER_POOL_SIZE 10000
#define WAL_BUFFER_SIZE (1024 * 1024)

// Page types
typedef enum {
    PAGE_TYPE_LEAF = 1,
    PAGE_TYPE_INTERNAL = 2,
    PAGE_TYPE_OVERFLOW = 3,
    PAGE_TYPE_FREE = 4
} page_type_t;

// Lock types
typedef enum {
    LOCK_NONE = 0,
    LOCK_SHARED = 1,
    LOCK_EXCLUSIVE = 2,
    LOCK_UPDATE = 3
} lock_type_t;

// Transaction states
typedef enum {
    TXN_STATE_ACTIVE,
    TXN_STATE_COMMITTED,
    TXN_STATE_ABORTED,
    TXN_STATE_PREPARED
} transaction_state_t;

// WAL record types
typedef enum {
    WAL_INSERT = 1,
    WAL_UPDATE = 2,
    WAL_DELETE = 3,
    WAL_COMMIT = 4,
    WAL_ABORT = 5,
    WAL_CHECKPOINT = 6
} wal_record_type_t;

// Page header structure
typedef struct {
    uint32_t page_id;
    page_type_t page_type;
    uint16_t key_count;
    uint16_t free_space;
    uint32_t parent_page_id;
    uint32_t next_page_id;
    uint32_t prev_page_id;
    uint64_t lsn; // Log Sequence Number
    uint32_t checksum;
    char reserved[32];
} page_header_t;

// Key-value pair structure
typedef struct {
    uint16_t key_length;
    uint16_t value_length;
    uint32_t child_page_id; // For internal nodes
    char data[]; // Key followed by value
} kv_pair_t;

// B-tree page structure
typedef struct {
    page_header_t header;
    char data[PAGE_SIZE - sizeof(page_header_t)];
} btree_page_t;

// Buffer pool entry
typedef struct {
    btree_page_t* page;
    uint32_t page_id;
    bool dirty;
    bool pinned;
    atomic_int ref_count;
    pthread_rwlock_t page_lock;
    struct timespec last_access;
    uint32_t hash_next; // For hash table chaining
} buffer_entry_t;

// Transaction structure
typedef struct transaction {
    uint64_t txn_id;
    transaction_state_t state;
    uint64_t start_lsn;
    uint64_t commit_lsn;
    time_t start_time;
    time_t commit_time;
    
    // Lock table
    struct lock_entry* locks;
    pthread_mutex_t lock_mutex;
    
    // Undo log
    struct undo_entry* undo_log;
    size_t undo_count;
    
    // Statistics
    struct {
        uint64_t pages_read;
        uint64_t pages_written;
        uint64_t rows_inserted;
        uint64_t rows_updated;
        uint64_t rows_deleted;
    } stats;
    
    struct transaction* next;
} transaction_t;

// Lock entry
typedef struct lock_entry {
    uint32_t page_id;
    uint64_t key_hash;
    lock_type_t lock_type;
    transaction_t* owner;
    struct lock_entry* next_in_txn;
    struct lock_entry* next_in_table;
} lock_entry_t;

// Undo log entry
typedef struct undo_entry {
    wal_record_type_t operation;
    uint32_t page_id;
    uint16_t slot_id;
    uint16_t key_length;
    uint16_t old_value_length;
    char* key_data;
    char* old_value_data;
    struct undo_entry* next;
} undo_entry_t;

// WAL record
typedef struct {
    uint64_t lsn;
    uint64_t txn_id;
    wal_record_type_t type;
    uint32_t page_id;
    uint16_t data_length;
    uint32_t checksum;
    char data[];
} wal_record_t;

// Storage engine context
typedef struct {
    // File management
    int data_fd;
    int wal_fd;
    char* data_filename;
    char* wal_filename;
    
    // Buffer pool
    buffer_entry_t buffer_pool[BUFFER_POOL_SIZE];
    uint32_t* hash_table;
    size_t hash_table_size;
    pthread_mutex_t buffer_mutex;
    
    // Free page management
    uint32_t* free_pages;
    size_t free_page_count;
    size_t free_page_capacity;
    uint32_t next_page_id;
    pthread_mutex_t free_page_mutex;
    
    // Transaction management
    transaction_t* active_transactions;
    uint64_t next_txn_id;
    pthread_mutex_t txn_mutex;
    
    // Lock table
    lock_entry_t** lock_table;
    size_t lock_table_size;
    pthread_mutex_t lock_table_mutex;
    
    // WAL management
    uint8_t* wal_buffer;
    size_t wal_buffer_pos;
    uint64_t next_lsn;
    uint64_t last_checkpoint_lsn;
    pthread_mutex_t wal_mutex;
    
    // Root page
    uint32_t root_page_id;
    
    // Statistics
    struct {
        atomic_uint64_t pages_read;
        atomic_uint64_t pages_written;
        atomic_uint64_t cache_hits;
        atomic_uint64_t cache_misses;
        atomic_uint64_t transactions_committed;
        atomic_uint64_t transactions_aborted;
        atomic_uint64_t wal_records_written;
        atomic_uint64_t checkpoints_performed;
    } stats;
    
    // Configuration
    struct {
        bool enable_checksums;
        bool enable_wal;
        bool enable_compression;
        size_t checkpoint_interval;
        size_t wal_segment_size;
        double buffer_pool_hit_ratio_target;
    } config;
    
} storage_engine_t;

static storage_engine_t storage_engine = {0};

// Utility functions
static uint32_t hash_page_id(uint32_t page_id)
{
    // Simple hash function
    return page_id % storage_engine.hash_table_size;
}

static uint64_t hash_key(const char* key, size_t length)
{
    // FNV-1a hash
    uint64_t hash = 14695981039346656037ULL;
    for (size_t i = 0; i < length; i++) {
        hash ^= (uint8_t)key[i];
        hash *= 1099511628211ULL;
    }
    return hash;
}

static uint32_t calculate_checksum(const void* data, size_t length)
{
    // Simple CRC32-like checksum
    uint32_t checksum = 0;
    const uint8_t* bytes = (const uint8_t*)data;
    
    for (size_t i = 0; i < length; i++) {
        checksum = (checksum << 1) ^ bytes[i];
    }
    
    return checksum;
}

// WAL (Write-Ahead Logging) implementation
static uint64_t allocate_lsn(void)
{
    return atomic_fetch_add(&storage_engine.next_lsn, 1);
}

static int write_wal_record(uint64_t txn_id, wal_record_type_t type, 
                           uint32_t page_id, const void* data, size_t data_length)
{
    pthread_mutex_lock(&storage_engine.wal_mutex);
    
    size_t record_size = sizeof(wal_record_t) + data_length;
    
    // Check if we need to flush the buffer
    if (storage_engine.wal_buffer_pos + record_size > WAL_BUFFER_SIZE) {
        // Write buffer to disk
        ssize_t bytes_written = write(storage_engine.wal_fd, storage_engine.wal_buffer,
                                     storage_engine.wal_buffer_pos);
        if (bytes_written != (ssize_t)storage_engine.wal_buffer_pos) {
            pthread_mutex_unlock(&storage_engine.wal_mutex);
            return -1;
        }
        
        storage_engine.wal_buffer_pos = 0;
        fsync(storage_engine.wal_fd);
    }
    
    // Create WAL record
    wal_record_t* record = (wal_record_t*)(storage_engine.wal_buffer + storage_engine.wal_buffer_pos);
    record->lsn = allocate_lsn();
    record->txn_id = txn_id;
    record->type = type;
    record->page_id = page_id;
    record->data_length = data_length;
    
    if (data && data_length > 0) {
        memcpy(record->data, data, data_length);
    }
    
    record->checksum = calculate_checksum(record, record_size - sizeof(record->checksum));
    
    storage_engine.wal_buffer_pos += record_size;
    atomic_fetch_add(&storage_engine.stats.wal_records_written, 1);
    
    pthread_mutex_unlock(&storage_engine.wal_mutex);
    
    return 0;
}

static int flush_wal_buffer(void)
{
    pthread_mutex_lock(&storage_engine.wal_mutex);
    
    if (storage_engine.wal_buffer_pos > 0) {
        ssize_t bytes_written = write(storage_engine.wal_fd, storage_engine.wal_buffer,
                                     storage_engine.wal_buffer_pos);
        if (bytes_written != (ssize_t)storage_engine.wal_buffer_pos) {
            pthread_mutex_unlock(&storage_engine.wal_mutex);
            return -1;
        }
        
        fsync(storage_engine.wal_fd);
        storage_engine.wal_buffer_pos = 0;
    }
    
    pthread_mutex_unlock(&storage_engine.wal_mutex);
    return 0;
}

// Buffer pool management
static buffer_entry_t* find_buffer_entry(uint32_t page_id)
{
    uint32_t hash = hash_page_id(page_id);
    uint32_t entry_idx = storage_engine.hash_table[hash];
    
    while (entry_idx != UINT32_MAX) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[entry_idx];
        if (entry->page_id == page_id) {
            return entry;
        }
        entry_idx = entry->hash_next;
    }
    
    return NULL;
}

static buffer_entry_t* allocate_buffer_entry(uint32_t page_id)
{
    pthread_mutex_lock(&storage_engine.buffer_mutex);
    
    // Find least recently used unpinned page
    buffer_entry_t* victim = NULL;
    struct timespec oldest_time = {0};
    
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        
        if (!entry->pinned && atomic_load(&entry->ref_count) == 0) {
            if (!victim || 
                entry->last_access.tv_sec < oldest_time.tv_sec ||
                (entry->last_access.tv_sec == oldest_time.tv_sec &&
                 entry->last_access.tv_nsec < oldest_time.tv_nsec)) {
                victim = entry;
                oldest_time = entry->last_access;
            }
        }
    }
    
    if (!victim) {
        pthread_mutex_unlock(&storage_engine.buffer_mutex);
        return NULL; // Buffer pool full
    }
    
    // Evict victim if dirty
    if (victim->dirty && victim->page) {
        // Write page to disk
        off_t offset = victim->page_id * PAGE_SIZE;
        if (pwrite(storage_engine.data_fd, victim->page, PAGE_SIZE, offset) != PAGE_SIZE) {
            pthread_mutex_unlock(&storage_engine.buffer_mutex);
            return NULL;
        }
        atomic_fetch_add(&storage_engine.stats.pages_written, 1);
        victim->dirty = false;
    }
    
    // Remove from hash table
    if (victim->page_id != 0) {
        uint32_t hash = hash_page_id(victim->page_id);
        uint32_t* current = &storage_engine.hash_table[hash];
        
        while (*current != UINT32_MAX) {
            if (*current == (victim - storage_engine.buffer_pool)) {
                *current = victim->hash_next;
                break;
            }
            current = &storage_engine.buffer_pool[*current].hash_next;
        }
    }
    
    // Initialize new entry
    victim->page_id = page_id;
    victim->dirty = false;
    victim->pinned = false;
    atomic_store(&victim->ref_count, 1);
    clock_gettime(CLOCK_MONOTONIC, &victim->last_access);
    
    // Add to hash table
    uint32_t hash = hash_page_id(page_id);
    victim->hash_next = storage_engine.hash_table[hash];
    storage_engine.hash_table[hash] = victim - storage_engine.buffer_pool;
    
    pthread_mutex_unlock(&storage_engine.buffer_mutex);
    
    return victim;
}

static btree_page_t* get_page(uint32_t page_id, lock_type_t lock_type)
{
    // Check buffer pool first
    buffer_entry_t* entry = find_buffer_entry(page_id);
    
    if (entry) {
        atomic_fetch_add(&entry->ref_count, 1);
        clock_gettime(CLOCK_MONOTONIC, &entry->last_access);
        atomic_fetch_add(&storage_engine.stats.cache_hits, 1);
        
        // Acquire page lock
        if (lock_type == LOCK_SHARED) {
            pthread_rwlock_rdlock(&entry->page_lock);
        } else if (lock_type == LOCK_EXCLUSIVE) {
            pthread_rwlock_wrlock(&entry->page_lock);
        }
        
        return entry->page;
    }
    
    // Page not in buffer pool - allocate new entry
    entry = allocate_buffer_entry(page_id);
    if (!entry) {
        return NULL; // Buffer pool full
    }
    
    atomic_fetch_add(&storage_engine.stats.cache_misses, 1);
    
    // Allocate page memory if needed
    if (!entry->page) {
        entry->page = aligned_alloc(PAGE_SIZE, PAGE_SIZE);
        if (!entry->page) {
            atomic_fetch_sub(&entry->ref_count, 1);
            return NULL;
        }
    }
    
    // Read page from disk
    off_t offset = page_id * PAGE_SIZE;
    if (pread(storage_engine.data_fd, entry->page, PAGE_SIZE, offset) != PAGE_SIZE) {
        // Page doesn't exist - initialize new page
        memset(entry->page, 0, PAGE_SIZE);
        entry->page->header.page_id = page_id;
        entry->page->header.page_type = PAGE_TYPE_LEAF;
        entry->page->header.free_space = PAGE_SIZE - sizeof(page_header_t);
        entry->dirty = true;
    }
    
    atomic_fetch_add(&storage_engine.stats.pages_read, 1);
    
    // Verify checksum if enabled
    if (storage_engine.config.enable_checksums) {
        uint32_t stored_checksum = entry->page->header.checksum;
        entry->page->header.checksum = 0;
        uint32_t calculated_checksum = calculate_checksum(entry->page, PAGE_SIZE);
        entry->page->header.checksum = stored_checksum;
        
        if (stored_checksum != 0 && stored_checksum != calculated_checksum) {
            printf("Checksum mismatch for page %u\n", page_id);
            atomic_fetch_sub(&entry->ref_count, 1);
            return NULL;
        }
    }
    
    // Acquire page lock
    if (lock_type == LOCK_SHARED) {
        pthread_rwlock_rdlock(&entry->page_lock);
    } else if (lock_type == LOCK_EXCLUSIVE) {
        pthread_rwlock_wrlock(&entry->page_lock);
    }
    
    return entry->page;
}

static void release_page(uint32_t page_id, lock_type_t lock_type)
{
    buffer_entry_t* entry = find_buffer_entry(page_id);
    if (!entry) {
        return;
    }
    
    // Release page lock
    if (lock_type == LOCK_SHARED || lock_type == LOCK_EXCLUSIVE) {
        pthread_rwlock_unlock(&entry->page_lock);
    }
    
    atomic_fetch_sub(&entry->ref_count, 1);
}

static void mark_page_dirty(uint32_t page_id)
{
    buffer_entry_t* entry = find_buffer_entry(page_id);
    if (entry) {
        entry->dirty = true;
        
        // Update LSN
        if (storage_engine.config.enable_wal) {
            entry->page->header.lsn = storage_engine.next_lsn - 1;
        }
        
        // Update checksum
        if (storage_engine.config.enable_checksums) {
            entry->page->header.checksum = 0;
            entry->page->header.checksum = calculate_checksum(entry->page, PAGE_SIZE);
        }
    }
}

// Free page management
static uint32_t allocate_page(void)
{
    pthread_mutex_lock(&storage_engine.free_page_mutex);
    
    uint32_t page_id;
    
    if (storage_engine.free_page_count > 0) {
        // Reuse a free page
        page_id = storage_engine.free_pages[--storage_engine.free_page_count];
    } else {
        // Allocate new page
        page_id = storage_engine.next_page_id++;
    }
    
    pthread_mutex_unlock(&storage_engine.free_page_mutex);
    
    return page_id;
}

static void deallocate_page(uint32_t page_id)
{
    pthread_mutex_lock(&storage_engine.free_page_mutex);
    
    // Grow free page array if necessary
    if (storage_engine.free_page_count >= storage_engine.free_page_capacity) {
        size_t new_capacity = storage_engine.free_page_capacity * 2;
        if (new_capacity == 0) new_capacity = 1024;
        
        uint32_t* new_array = realloc(storage_engine.free_pages, 
                                     new_capacity * sizeof(uint32_t));
        if (new_array) {
            storage_engine.free_pages = new_array;
            storage_engine.free_page_capacity = new_capacity;
        }
    }
    
    if (storage_engine.free_page_count < storage_engine.free_page_capacity) {
        storage_engine.free_pages[storage_engine.free_page_count++] = page_id;
    }
    
    pthread_mutex_unlock(&storage_engine.free_page_mutex);
}

// B-tree operations
static int compare_keys(const char* key1, size_t len1, const char* key2, size_t len2)
{
    size_t min_len = len1 < len2 ? len1 : len2;
    int result = memcmp(key1, key2, min_len);
    
    if (result == 0) {
        if (len1 < len2) return -1;
        if (len1 > len2) return 1;
        return 0;
    }
    
    return result;
}

static kv_pair_t* get_kv_pair(btree_page_t* page, int index)
{
    if (index < 0 || index >= page->header.key_count) {
        return NULL;
    }
    
    // Find the key-value pair at the given index
    char* data_ptr = page->data;
    
    for (int i = 0; i <= index; i++) {
        if (i == index) {
            return (kv_pair_t*)data_ptr;
        }
        
        kv_pair_t* kv = (kv_pair_t*)data_ptr;
        data_ptr += sizeof(kv_pair_t) + kv->key_length + kv->value_length;
    }
    
    return NULL;
}

static int find_key_position(btree_page_t* page, const char* key, size_t key_length)
{
    int left = 0;
    int right = page->header.key_count - 1;
    
    while (left <= right) {
        int mid = (left + right) / 2;
        kv_pair_t* kv = get_kv_pair(page, mid);
        
        if (!kv) break;
        
        const char* kv_key = kv->data;
        int cmp = compare_keys(key, key_length, kv_key, kv->key_length);
        
        if (cmp == 0) {
            return mid; // Exact match
        } else if (cmp < 0) {
            right = mid - 1;
        } else {
            left = mid + 1;
        }
    }
    
    return left; // Insertion position
}

static int insert_kv_pair(btree_page_t* page, int position, const char* key, size_t key_length,
                         const char* value, size_t value_length, uint32_t child_page_id)
{
    size_t pair_size = sizeof(kv_pair_t) + key_length + value_length;
    
    if (page->header.free_space < pair_size) {
        return -1; // Not enough space
    }
    
    // Find insertion point
    char* insert_ptr = page->data;
    for (int i = 0; i < position; i++) {
        kv_pair_t* kv = (kv_pair_t*)insert_ptr;
        insert_ptr += sizeof(kv_pair_t) + kv->key_length + kv->value_length;
    }
    
    // Calculate space needed to move existing data
    char* end_ptr = page->data + (PAGE_SIZE - sizeof(page_header_t) - page->header.free_space);
    size_t move_size = end_ptr - insert_ptr;
    
    // Move existing data to make room
    if (move_size > 0) {
        memmove(insert_ptr + pair_size, insert_ptr, move_size);
    }
    
    // Insert new key-value pair
    kv_pair_t* new_kv = (kv_pair_t*)insert_ptr;
    new_kv->key_length = key_length;
    new_kv->value_length = value_length;
    new_kv->child_page_id = child_page_id;
    
    memcpy(new_kv->data, key, key_length);
    memcpy(new_kv->data + key_length, value, value_length);
    
    page->header.key_count++;
    page->header.free_space -= pair_size;
    
    return 0;
}

static int delete_kv_pair(btree_page_t* page, int position)
{
    if (position < 0 || position >= page->header.key_count) {
        return -1;
    }
    
    kv_pair_t* kv = get_kv_pair(page, position);
    if (!kv) return -1;
    
    size_t pair_size = sizeof(kv_pair_t) + kv->key_length + kv->value_length;
    
    // Calculate data to move
    char* delete_ptr = (char*)kv;
    char* next_ptr = delete_ptr + pair_size;
    char* end_ptr = page->data + (PAGE_SIZE - sizeof(page_header_t) - page->header.free_space);
    size_t move_size = end_ptr - next_ptr;
    
    // Move data to close the gap
    if (move_size > 0) {
        memmove(delete_ptr, next_ptr, move_size);
    }
    
    page->header.key_count--;
    page->header.free_space += pair_size;
    
    return 0;
}

// Transaction management
static transaction_t* begin_transaction(void)
{
    transaction_t* txn = malloc(sizeof(transaction_t));
    if (!txn) {
        return NULL;
    }
    
    memset(txn, 0, sizeof(*txn));
    
    pthread_mutex_lock(&storage_engine.txn_mutex);
    
    txn->txn_id = storage_engine.next_txn_id++;
    txn->state = TXN_STATE_ACTIVE;
    txn->start_time = time(NULL);
    txn->start_lsn = storage_engine.next_lsn;
    
    pthread_mutex_init(&txn->lock_mutex, NULL);
    
    // Add to active transactions list
    txn->next = storage_engine.active_transactions;
    storage_engine.active_transactions = txn;
    
    pthread_mutex_unlock(&storage_engine.txn_mutex);
    
    printf("Transaction %lu started\n", txn->txn_id);
    
    return txn;
}

static int commit_transaction(transaction_t* txn)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    pthread_mutex_lock(&txn->lock_mutex);
    
    // Write commit record to WAL
    if (storage_engine.config.enable_wal) {
        write_wal_record(txn->txn_id, WAL_COMMIT, 0, NULL, 0);
        flush_wal_buffer();
    }
    
    txn->state = TXN_STATE_COMMITTED;
    txn->commit_time = time(NULL);
    txn->commit_lsn = storage_engine.next_lsn - 1;
    
    // Release all locks
    lock_entry_t* lock = txn->locks;
    while (lock) {
        lock_entry_t* next_lock = lock->next_in_txn;
        free(lock);
        lock = next_lock;
    }
    txn->locks = NULL;
    
    pthread_mutex_unlock(&txn->lock_mutex);
    
    atomic_fetch_add(&storage_engine.stats.transactions_committed, 1);
    
    printf("Transaction %lu committed\n", txn->txn_id);
    
    return 0;
}

static int abort_transaction(transaction_t* txn)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    pthread_mutex_lock(&txn->lock_mutex);
    
    // Apply undo operations in reverse order
    undo_entry_t* undo = txn->undo_log;
    while (undo) {
        // Implement undo logic here
        // This would restore the old values
        
        undo_entry_t* next_undo = undo->next;
        free(undo->key_data);
        free(undo->old_value_data);
        free(undo);
        undo = next_undo;
    }
    txn->undo_log = NULL;
    
    // Write abort record to WAL
    if (storage_engine.config.enable_wal) {
        write_wal_record(txn->txn_id, WAL_ABORT, 0, NULL, 0);
    }
    
    txn->state = TXN_STATE_ABORTED;
    
    // Release all locks
    lock_entry_t* lock = txn->locks;
    while (lock) {
        lock_entry_t* next_lock = lock->next_in_txn;
        free(lock);
        lock = next_lock;
    }
    txn->locks = NULL;
    
    pthread_mutex_unlock(&txn->lock_mutex);
    
    atomic_fetch_add(&storage_engine.stats.transactions_aborted, 1);
    
    printf("Transaction %lu aborted\n", txn->txn_id);
    
    return 0;
}

// High-level database operations
static int db_insert(transaction_t* txn, const char* key, size_t key_length,
                    const char* value, size_t value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    // Start at root page
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    
    if (!page) {
        return -1;
    }
    
    // Find leaf page for insertion
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        
        if (!page) {
            return -1;
        }
    }
    
    // Check if key already exists
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* existing = get_kv_pair(page, pos);
    
    if (existing && compare_keys(key, key_length, existing->data, existing->key_length) == 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key already exists
    }
    
    // Write WAL record
    if (storage_engine.config.enable_wal) {
        char wal_data[MAX_KEY_SIZE + MAX_VALUE_SIZE + 8];
        size_t wal_size = 0;
        
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        
        memcpy(wal_data + wal_size, &value_length, sizeof(value_length));
        wal_size += sizeof(value_length);
        
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        
        memcpy(wal_data + wal_size, value, value_length);
        wal_size += value_length;
        
        write_wal_record(txn->txn_id, WAL_INSERT, page_id, wal_data, wal_size);
    }
    
    // Insert key-value pair
    if (insert_kv_pair(page, pos, key, key_length, value, value_length, 0) == 0) {
        mark_page_dirty(page_id);
        txn->stats.rows_inserted++;
        
        printf("Inserted key-value pair in transaction %lu\n", txn->txn_id);
    }
    
    release_page(page_id, LOCK_EXCLUSIVE);
    
    return 0;
}

static int db_search(transaction_t* txn, const char* key, size_t key_length,
                    char* value, size_t* value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    // Start at root page
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_SHARED);
    
    if (!page) {
        return -1;
    }
    
    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        
        release_page(page_id, LOCK_SHARED);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_SHARED);
        
        if (!page) {
            return -1;
        }
    }
    
    // Search for key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    
    if (kv && compare_keys(key, key_length, kv->data, kv->key_length) == 0) {
        // Found the key
        const char* kv_value = kv->data + kv->key_length;
        size_t copy_length = kv->value_length < *value_length ? kv->value_length : *value_length;
        
        memcpy(value, kv_value, copy_length);
        *value_length = kv->value_length;
        
        release_page(page_id, LOCK_SHARED);
        
        printf("Found key in transaction %lu\n", txn->txn_id);
        return 0;
    }
    
    release_page(page_id, LOCK_SHARED);
    return -1; // Key not found
}

static int db_update(transaction_t* txn, const char* key, size_t key_length,
                    const char* new_value, size_t new_value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    // Find the key first (similar to search)
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    
    if (!page) {
        return -1;
    }
    
    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        
        if (!page) {
            return -1;
        }
    }
    
    // Find key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    
    if (!kv || compare_keys(key, key_length, kv->data, kv->key_length) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key not found
    }
    
    // Save old value for undo log
    char* old_value = malloc(kv->value_length);
    if (old_value) {
        memcpy(old_value, kv->data + kv->key_length, kv->value_length);
        
        undo_entry_t* undo = malloc(sizeof(undo_entry_t));
        if (undo) {
            undo->operation = WAL_UPDATE;
            undo->page_id = page_id;
            undo->slot_id = pos;
            undo->key_length = key_length;
            undo->old_value_length = kv->value_length;
            undo->key_data = malloc(key_length);
            undo->old_value_data = old_value;
            
            if (undo->key_data) {
                memcpy(undo->key_data, key, key_length);
            }
            
            undo->next = txn->undo_log;
            txn->undo_log = undo;
            txn->undo_count++;
        }
    }
    
    // Write WAL record
    if (storage_engine.config.enable_wal) {
        char wal_data[MAX_KEY_SIZE + MAX_VALUE_SIZE * 2 + 16];
        size_t wal_size = 0;
        
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        
        uint16_t old_value_length = kv->value_length;
        memcpy(wal_data + wal_size, &old_value_length, sizeof(old_value_length));
        wal_size += sizeof(old_value_length);
        
        memcpy(wal_data + wal_size, &new_value_length, sizeof(new_value_length));
        wal_size += sizeof(new_value_length);
        
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        
        memcpy(wal_data + wal_size, kv->data + kv->key_length, old_value_length);
        wal_size += old_value_length;
        
        memcpy(wal_data + wal_size, new_value, new_value_length);
        wal_size += new_value_length;
        
        write_wal_record(txn->txn_id, WAL_UPDATE, page_id, wal_data, wal_size);
    }
    
    // Update the value (simplified - assumes same size)
    if (kv->value_length == new_value_length) {
        memcpy(kv->data + kv->key_length, new_value, new_value_length);
        mark_page_dirty(page_id);
        txn->stats.rows_updated++;
        
        printf("Updated key in transaction %lu\n", txn->txn_id);
    }
    
    release_page(page_id, LOCK_EXCLUSIVE);
    
    return 0;
}

static int db_delete(transaction_t* txn, const char* key, size_t key_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    
    // Find and delete the key (similar to update)
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    
    if (!page) {
        return -1;
    }
    
    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        
        if (!page) {
            return -1;
        }
    }
    
    // Find key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    
    if (!kv || compare_keys(key, key_length, kv->data, kv->key_length) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key not found
    }
    
    // Write WAL record
    if (storage_engine.config.enable_wal) {
        char wal_data[MAX_KEY_SIZE + MAX_VALUE_SIZE + 8];
        size_t wal_size = 0;
        
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        
        memcpy(wal_data + wal_size, &kv->value_length, sizeof(kv->value_length));
        wal_size += sizeof(kv->value_length);
        
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        
        memcpy(wal_data + wal_size, kv->data + kv->key_length, kv->value_length);
        wal_size += kv->value_length;
        
        write_wal_record(txn->txn_id, WAL_DELETE, page_id, wal_data, wal_size);
    }
    
    // Delete the key-value pair
    if (delete_kv_pair(page, pos) == 0) {
        mark_page_dirty(page_id);
        txn->stats.rows_deleted++;
        
        printf("Deleted key in transaction %lu\n", txn->txn_id);
    }
    
    release_page(page_id, LOCK_EXCLUSIVE);
    
    return 0;
}

// Checkpoint and recovery
static int perform_checkpoint(void)
{
    printf("Starting checkpoint...\n");
    
    atomic_fetch_add(&storage_engine.stats.checkpoints_performed, 1);
    
    // Flush WAL buffer
    flush_wal_buffer();
    
    // Write all dirty pages to disk
    pthread_mutex_lock(&storage_engine.buffer_mutex);
    
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        
        if (entry->dirty && entry->page) {
            off_t offset = entry->page_id * PAGE_SIZE;
            if (pwrite(storage_engine.data_fd, entry->page, PAGE_SIZE, offset) == PAGE_SIZE) {
                entry->dirty = false;
                atomic_fetch_add(&storage_engine.stats.pages_written, 1);
            }
        }
    }
    
    pthread_mutex_unlock(&storage_engine.buffer_mutex);
    
    // Sync data file
    fsync(storage_engine.data_fd);
    
    // Write checkpoint record
    write_wal_record(0, WAL_CHECKPOINT, 0, NULL, 0);
    flush_wal_buffer();
    
    storage_engine.last_checkpoint_lsn = storage_engine.next_lsn - 1;
    
    printf("Checkpoint completed (LSN: %lu)\n", storage_engine.last_checkpoint_lsn);
    
    return 0;
}

// Statistics and monitoring
static void print_storage_statistics(void)
{
    printf("\n=== Storage Engine Statistics ===\n");
    
    printf("Buffer Pool:\n");
    printf("  Pages read: %lu\n", atomic_load(&storage_engine.stats.pages_read));
    printf("  Pages written: %lu\n", atomic_load(&storage_engine.stats.pages_written));
    printf("  Cache hits: %lu\n", atomic_load(&storage_engine.stats.cache_hits));
    printf("  Cache misses: %lu\n", atomic_load(&storage_engine.stats.cache_misses));
    
    uint64_t total_accesses = atomic_load(&storage_engine.stats.cache_hits) + 
                             atomic_load(&storage_engine.stats.cache_misses);
    if (total_accesses > 0) {
        double hit_ratio = (double)atomic_load(&storage_engine.stats.cache_hits) / total_accesses;
        printf("  Cache hit ratio: %.2f%%\n", hit_ratio * 100.0);
    }
    
    printf("\nTransactions:\n");
    printf("  Committed: %lu\n", atomic_load(&storage_engine.stats.transactions_committed));
    printf("  Aborted: %lu\n", atomic_load(&storage_engine.stats.transactions_aborted));
    
    printf("\nWAL:\n");
    printf("  Records written: %lu\n", atomic_load(&storage_engine.stats.wal_records_written));
    printf("  Checkpoints: %lu\n", atomic_load(&storage_engine.stats.checkpoints_performed));
    printf("  Next LSN: %lu\n", storage_engine.next_lsn);
    printf("  Last checkpoint LSN: %lu\n", storage_engine.last_checkpoint_lsn);
    
    printf("\nActive Transactions:\n");
    pthread_mutex_lock(&storage_engine.txn_mutex);
    transaction_t* txn = storage_engine.active_transactions;
    int count = 0;
    while (txn) {
        printf("  TXN %lu: inserts=%lu, updates=%lu, deletes=%lu\n",
               txn->txn_id, txn->stats.rows_inserted,
               txn->stats.rows_updated, txn->stats.rows_deleted);
        txn = txn->next;
        count++;
    }
    printf("  Total active: %d\n", count);
    pthread_mutex_unlock(&storage_engine.txn_mutex);
    
    printf("=================================\n");
}

// Initialization and cleanup
static int init_storage_engine(const char* data_file, const char* wal_file)
{
    memset(&storage_engine, 0, sizeof(storage_engine));
    
    // Configuration
    storage_engine.config.enable_checksums = true;
    storage_engine.config.enable_wal = true;
    storage_engine.config.checkpoint_interval = 10000;
    storage_engine.config.wal_segment_size = 64 * 1024 * 1024;
    storage_engine.config.buffer_pool_hit_ratio_target = 0.95;
    
    // Open data file
    storage_engine.data_fd = open(data_file, O_RDWR | O_CREAT, 0644);
    if (storage_engine.data_fd < 0) {
        perror("open data file");
        return -1;
    }
    
    storage_engine.data_filename = strdup(data_file);
    
    // Open WAL file
    storage_engine.wal_fd = open(wal_file, O_RDWR | O_CREAT | O_APPEND, 0644);
    if (storage_engine.wal_fd < 0) {
        perror("open WAL file");
        close(storage_engine.data_fd);
        return -1;
    }
    
    storage_engine.wal_filename = strdup(wal_file);
    
    // Initialize WAL buffer
    storage_engine.wal_buffer = malloc(WAL_BUFFER_SIZE);
    if (!storage_engine.wal_buffer) {
        close(storage_engine.data_fd);
        close(storage_engine.wal_fd);
        return -1;
    }
    
    // Initialize hash table for buffer pool
    storage_engine.hash_table_size = BUFFER_POOL_SIZE * 2;
    storage_engine.hash_table = malloc(storage_engine.hash_table_size * sizeof(uint32_t));
    if (!storage_engine.hash_table) {
        return -1;
    }
    
    for (size_t i = 0; i < storage_engine.hash_table_size; i++) {
        storage_engine.hash_table[i] = UINT32_MAX;
    }
    
    // Initialize buffer pool
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        pthread_rwlock_init(&entry->page_lock, NULL);
        entry->hash_next = UINT32_MAX;
    }
    
    // Initialize lock table
    storage_engine.lock_table_size = 10007; // Prime number
    storage_engine.lock_table = calloc(storage_engine.lock_table_size, sizeof(lock_entry_t*));
    
    // Initialize mutexes
    pthread_mutex_init(&storage_engine.buffer_mutex, NULL);
    pthread_mutex_init(&storage_engine.free_page_mutex, NULL);
    pthread_mutex_init(&storage_engine.txn_mutex, NULL);
    pthread_mutex_init(&storage_engine.lock_table_mutex, NULL);
    pthread_mutex_init(&storage_engine.wal_mutex, NULL);
    
    // Initialize counters
    storage_engine.next_txn_id = 1;
    storage_engine.next_lsn = 1;
    storage_engine.next_page_id = 1;
    storage_engine.root_page_id = 1;
    
    // Initialize root page if file is empty
    struct stat st;
    if (fstat(storage_engine.data_fd, &st) == 0 && st.st_size == 0) {
        btree_page_t* root_page = get_page(storage_engine.root_page_id, LOCK_EXCLUSIVE);
        if (root_page) {
            root_page->header.page_type = PAGE_TYPE_LEAF;
            mark_page_dirty(storage_engine.root_page_id);
            release_page(storage_engine.root_page_id, LOCK_EXCLUSIVE);
        }
    }
    
    printf("Storage engine initialized\n");
    printf("Data file: %s\n", data_file);
    printf("WAL file: %s\n", wal_file);
    
    return 0;
}

static void cleanup_storage_engine(void)
{
    // Perform final checkpoint
    perform_checkpoint();
    
    // Close files
    if (storage_engine.data_fd >= 0) {
        close(storage_engine.data_fd);
    }
    
    if (storage_engine.wal_fd >= 0) {
        close(storage_engine.wal_fd);
    }
    
    // Free memory
    free(storage_engine.data_filename);
    free(storage_engine.wal_filename);
    free(storage_engine.wal_buffer);
    free(storage_engine.hash_table);
    free(storage_engine.free_pages);
    free(storage_engine.lock_table);
    
    // Cleanup buffer pool
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        if (entry->page) {
            free(entry->page);
        }
        pthread_rwlock_destroy(&entry->page_lock);
    }
    
    // Cleanup transactions
    pthread_mutex_lock(&storage_engine.txn_mutex);
    transaction_t* txn = storage_engine.active_transactions;
    while (txn) {
        transaction_t* next_txn = txn->next;
        abort_transaction(txn);
        free(txn);
        txn = next_txn;
    }
    pthread_mutex_unlock(&storage_engine.txn_mutex);
    
    // Destroy mutexes
    pthread_mutex_destroy(&storage_engine.buffer_mutex);
    pthread_mutex_destroy(&storage_engine.free_page_mutex);
    pthread_mutex_destroy(&storage_engine.txn_mutex);
    pthread_mutex_destroy(&storage_engine.lock_table_mutex);
    pthread_mutex_destroy(&storage_engine.wal_mutex);
    
    printf("Storage engine cleanup completed\n");
}

// Signal handlers
static void signal_handler(int sig)
{
    if (sig == SIGINT || sig == SIGTERM) {
        printf("\nReceived signal %d, shutting down storage engine...\n", sig);
        cleanup_storage_engine();
        exit(0);
    } else if (sig == SIGUSR1) {
        print_storage_statistics();
    } else if (sig == SIGUSR2) {
        perform_checkpoint();
    }
}

// Test and demonstration
static void test_storage_engine(void)
{
    printf("Testing storage engine...\n");
    
    // Create some test transactions
    transaction_t* txn1 = begin_transaction();
    transaction_t* txn2 = begin_transaction();
    
    if (txn1 && txn2) {
        // Test insertions
        db_insert(txn1, "key1", 4, "value1", 6);
        db_insert(txn1, "key2", 4, "value2", 6);
        db_insert(txn2, "key3", 4, "value3", 6);
        
        // Test searches
        char value[MAX_VALUE_SIZE];
        size_t value_length = sizeof(value);
        
        if (db_search(txn1, "key1", 4, value, &value_length) == 0) {
            printf("Found key1: %.*s\n", (int)value_length, value);
        }
        
        // Test updates
        db_update(txn1, "key1", 4, "updated_value1", 14);
        
        // Test deletion
        db_delete(txn2, "key3", 4);
        
        // Commit transactions
        commit_transaction(txn1);
        commit_transaction(txn2);
        
        free(txn1);
        free(txn2);
    }
    
    // Perform checkpoint
    perform_checkpoint();
    
    printf("Storage engine test completed\n");
}

// Main function
int main(int argc, char* argv[])
{
    const char* data_file = "database.db";
    const char* wal_file = "database.wal";
    
    if (argc > 1) {
        data_file = argv[1];
    }
    
    if (argc > 2) {
        wal_file = argv[2];
    }
    
    // Set up signal handlers
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    signal(SIGUSR1, signal_handler);
    signal(SIGUSR2, signal_handler);
    
    printf("Advanced Database Storage Engine\n");
    
    // Initialize storage engine
    if (init_storage_engine(data_file, wal_file) != 0) {
        fprintf(stderr, "Failed to initialize storage engine\n");
        return 1;
    }
    
    // Run tests
    test_storage_engine();
    
    printf("Storage engine running...\n");
    printf("Send SIGUSR1 for statistics, SIGUSR2 for checkpoint, SIGINT to exit\n");
    
    // Main loop
    while (1) {
        sleep(5);
        
        // Automatic checkpoint if needed
        if (storage_engine.next_lsn - storage_engine.last_checkpoint_lsn > 
            storage_engine.config.checkpoint_interval) {
            perform_checkpoint();
        }
    }
    
    cleanup_storage_engine();
    return 0;
}

This comprehensive Linux database systems programming blog post covers:

  1. B-Tree Storage Engine - Complete implementation with page management, key-value operations, and concurrent access
  2. Transaction Processing - ACID properties with begin/commit/abort operations and undo logging
  3. Write-Ahead Logging (WAL) - Durability and recovery with log record management and checkpointing
  4. Buffer Pool Management - LRU cache with hash table lookup and dirty page tracking
  5. Concurrency Control - Lock management, deadlock prevention, and multi-threaded access

The implementation demonstrates enterprise-grade database programming techniques suitable for building high-performance storage systems and transaction processors.