Advanced Linux Database Systems Programming: Building High-Performance Storage Engines and Transaction Processing
Advanced Linux database systems programming requires a deep understanding of storage engines, transaction processing, concurrency control, and query optimization. This comprehensive guide explores building custom database systems, including B-tree implementations, Write-Ahead Logging (WAL), MVCC, and production-grade ACID-compliant database engines.
Advanced Linux Database Systems Programming
Custom Storage Engine and Transaction Manager
High-Performance B-Tree Storage Engine
// btree_storage.c - Advanced B-tree storage engine implementation
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
// Size of one page in bytes; page N is stored at byte offset N * PAGE_SIZE of the data file.
// NOTE(review): some system headers (e.g. <sys/user.h>, musl <limits.h>) also define PAGE_SIZE.
#define PAGE_SIZE 4096
// Key length the fixed-size WAL scratch buffers were originally sized for (not enforced here).
#define MAX_KEY_SIZE 256
// Value length the fixed-size WAL scratch buffers were originally sized for (not enforced here).
#define MAX_VALUE_SIZE 1024
// NOTE(review): not referenced in this part of the file; allocate_page() does not enforce it.
#define MAX_PAGES 1000000
// NOTE(review): not referenced in this part of the file; pages hold as many records as fit.
#define BTREE_ORDER 128
// Number of page frames in the in-memory buffer pool.
#define BUFFER_POOL_SIZE 10000
// Capacity in bytes of the in-memory WAL buffer; it is written out when a record would not fit.
#define WAL_BUFFER_SIZE (1024 * 1024)
// Page types
/* Kind of page; stored in page_header_t.page_type and therefore persisted on disk. */
typedef enum {
    PAGE_TYPE_LEAF     = 1, /* records carry key + value; lookups end here */
    PAGE_TYPE_INTERNAL = 2, /* records carry key + child_page_id; lookups descend */
    PAGE_TYPE_OVERFLOW = 3, /* not referenced in this part of the file */
    PAGE_TYPE_FREE     = 4  /* not referenced in this part of the file */
} page_type_t;
// Lock types
/* Page-lock mode requested from get_page() and handed back to release_page(). */
typedef enum {
    LOCK_NONE      = 0, /* no page lock */
    LOCK_SHARED    = 1, /* pthread read lock */
    LOCK_EXCLUSIVE = 2, /* pthread write lock */
    LOCK_UPDATE    = 3  /* get_page()/release_page() take no lock for this mode */
} lock_type_t;
// Transaction states
/* Lifecycle state of a transaction_t. */
typedef enum {
    TXN_STATE_ACTIVE    = 0, /* set by begin_transaction(); required by all db_* operations */
    TXN_STATE_COMMITTED = 1, /* set by commit_transaction() */
    TXN_STATE_ABORTED   = 2, /* set by abort_transaction() */
    TXN_STATE_PREPARED  = 3  /* not used in this part of the file */
} transaction_state_t;
// WAL record types
/* Kind of WAL record; stored in wal_record_t.type and therefore persisted in the WAL file. */
typedef enum {
    WAL_INSERT     = 1, /* written by db_insert() */
    WAL_UPDATE     = 2, /* written by db_update(); also tags undo entries */
    WAL_DELETE     = 3, /* written by db_delete() */
    WAL_COMMIT     = 4, /* written by commit_transaction() */
    WAL_ABORT      = 5, /* written by abort_transaction() */
    WAL_CHECKPOINT = 6  /* written by perform_checkpoint() */
} wal_record_type_t;
// Page header structure
// Fixed header at the start of every page; written to disk as part of the page image,
// so the field order and types define the on-disk format.
typedef struct {
uint32_t page_id; // id of this page (set when get_page() initialises a fresh page)
page_type_t page_type; // page_type_t value (leaf / internal / ...)
uint16_t key_count; // number of kv_pair_t records packed in btree_page_t.data
uint16_t free_space; // unused bytes at the end of btree_page_t.data
uint32_t parent_page_id; // NOTE(review): not referenced in this part of the file
uint32_t next_page_id; // on internal pages: child used when the search key sorts after every key
uint32_t prev_page_id; // NOTE(review): not referenced in this part of the file
uint64_t lsn; // Log Sequence Number: set by mark_page_dirty() to the latest allocated LSN when WAL is on
uint32_t checksum; // checksum of the whole page computed with this field zeroed; 0 = not verified
char reserved[32];
} page_header_t;
// Key-value pair structure
// One record in a page's data area: this fixed header, then key_length key bytes, then
// value_length value bytes. Records are packed back to back without padding (see get_kv_pair),
// so a kv_pair_t pointer into page data is generally not aligned for child_page_id.
// NOTE(review): relies on the target tolerating unaligned loads/stores — confirm.
typedef struct {
uint16_t key_length;
uint16_t value_length;
uint32_t child_page_id; // For internal nodes: child page to descend into; db_insert stores 0 in leaves
char data[]; // Key followed by value
} kv_pair_t;
// B-tree page structure
// Image of one page, both in the buffer pool and on disk: header, then the packed record area.
// data[] is sized so that header + data span PAGE_SIZE bytes, assuming the struct gets no
// trailing padding. NOTE(review): consider _Static_assert(sizeof(btree_page_t) == PAGE_SIZE).
typedef struct {
page_header_t header;
char data[PAGE_SIZE - sizeof(page_header_t)]; // key_count records, then free_space unused bytes
} btree_page_t;
// Buffer pool entry
// One frame of the buffer pool: a cached page image plus its bookkeeping.
typedef struct {
btree_page_t* page; // page image allocated by get_page() with aligned_alloc; NULL until first use
uint32_t page_id; // page held by this frame; 0 marks a frame that never held a page
bool dirty; // modified since last write-back; written before eviction and at checkpoints
bool pinned; // excludes the frame from eviction; NOTE(review): never set true in this section
atomic_int ref_count; // outstanding get_page() references; eviction requires 0
pthread_rwlock_t page_lock; // taken shared/exclusive by get_page(), released by release_page()
struct timespec last_access; // CLOCK_MONOTONIC time of last access, used to pick the LRU victim
uint32_t hash_next; // For hash table chaining: next frame index in the bucket, UINT32_MAX ends it
} buffer_entry_t;
// Transaction structure
// In-memory state of one transaction; linked into storage_engine.active_transactions.
typedef struct transaction {
uint64_t txn_id; // assigned from storage_engine.next_txn_id by begin_transaction()
transaction_state_t state;
uint64_t start_lsn; // storage_engine.next_lsn when the transaction began
uint64_t commit_lsn; // storage_engine.next_lsn - 1 read at commit time
time_t start_time;
time_t commit_time;
// Lock table
struct lock_entry* locks; // locks owned by this transaction, chained via next_in_txn; freed on commit/abort
pthread_mutex_t lock_mutex; // held by commit_transaction()/abort_transaction() while they run
// Undo log
struct undo_entry* undo_log; // newest entry first (db_update prepends)
size_t undo_count;
// Statistics
struct {
uint64_t pages_read; // NOTE(review): not updated in this part of the file
uint64_t pages_written; // NOTE(review): not updated in this part of the file
uint64_t rows_inserted;
uint64_t rows_updated;
uint64_t rows_deleted;
} stats;
struct transaction* next; // next transaction in storage_engine.active_transactions
} transaction_t;
// Lock entry
// One lock held by a transaction.
// NOTE(review): nothing in this part of the file creates lock entries or fills storage_engine.lock_table.
typedef struct lock_entry {
uint32_t page_id;
uint64_t key_hash; // presumably hash_key() of the locked key — confirm
lock_type_t lock_type;
transaction_t* owner;
struct lock_entry* next_in_txn; // next lock owned by the same transaction (walked on commit/abort)
struct lock_entry* next_in_table; // presumably the next entry in the same lock_table bucket — confirm
} lock_entry_t;
// Undo log entry
// Before-image recorded for rollback; created by db_update().
// NOTE(review): abort_transaction() frees these entries without applying them.
typedef struct undo_entry {
wal_record_type_t operation; // WAL_UPDATE for entries created by db_update()
uint32_t page_id; // leaf page that held the record
uint16_t slot_id; // record index within that page at the time of the update
uint16_t key_length;
uint16_t old_value_length;
char* key_data; // heap copy of the key (may be NULL if its allocation failed)
char* old_value_data; // heap copy of the value before the update
struct undo_entry* next; // next (older) entry
} undo_entry_t;
// WAL record
// Header of one WAL record, followed by data_length payload bytes in data[]. A record occupies
// sizeof(wal_record_t) + data_length bytes; records are packed back to back in the WAL buffer
// and written to the WAL file in that form.
typedef struct {
uint64_t lsn; // obtained from allocate_lsn()
uint64_t txn_id; // owning transaction; 0 for checkpoint records
wal_record_type_t type;
uint32_t page_id; // affected page; 0 for commit, abort and checkpoint records
uint16_t data_length; // payload size in bytes
uint32_t checksum; // computed by write_wal_record()
char data[];
} wal_record_t;
// Storage engine context
/*
 * Global state of the storage engine: open files, buffer pool, free-page list,
 * transaction list, lock table, WAL buffer, statistics and configuration.
 *
 * Objects that are passed to atomic_* functions (next_lsn and every statistics
 * counter) are declared `_Atomic uint64_t`. C11 <stdatomic.h> provides no
 * `atomic_uint64_t` typedef, and the generic atomic operations require an
 * atomic-qualified object.
 */
typedef struct {
    // File management
    int data_fd;                 // data file; page N lives at offset N * PAGE_SIZE
    int wal_fd;                  // WAL file, opened with O_APPEND
    char* data_filename;         // heap copy (strdup) of the data file path
    char* wal_filename;          // heap copy (strdup) of the WAL file path

    // Buffer pool
    buffer_entry_t buffer_pool[BUFFER_POOL_SIZE];
    uint32_t* hash_table;        // bucket -> buffer_pool index; UINT32_MAX = empty bucket
    size_t hash_table_size;      // number of buckets
    pthread_mutex_t buffer_mutex;

    // Free page management
    uint32_t* free_pages;        // stack of page ids available for reuse
    size_t free_page_count;
    size_t free_page_capacity;
    uint32_t next_page_id;       // next never-used page id handed out by allocate_page()
    pthread_mutex_t free_page_mutex;

    // Transaction management
    transaction_t* active_transactions;
    uint64_t next_txn_id;
    pthread_mutex_t txn_mutex;

    // Lock table
    lock_entry_t** lock_table;
    size_t lock_table_size;
    pthread_mutex_t lock_table_mutex;

    // WAL management
    uint8_t* wal_buffer;         // WAL_BUFFER_SIZE bytes of records not yet written
    size_t wal_buffer_pos;       // bytes currently used in wal_buffer
    _Atomic uint64_t next_lsn;   // next LSN returned by allocate_lsn()
    uint64_t last_checkpoint_lsn;
    pthread_mutex_t wal_mutex;

    // Root page
    uint32_t root_page_id;

    // Statistics
    struct {
        _Atomic uint64_t pages_read;
        _Atomic uint64_t pages_written;
        _Atomic uint64_t cache_hits;
        _Atomic uint64_t cache_misses;
        _Atomic uint64_t transactions_committed;
        _Atomic uint64_t transactions_aborted;
        _Atomic uint64_t wal_records_written;
        _Atomic uint64_t checkpoints_performed;
    } stats;

    // Configuration
    struct {
        bool enable_checksums;
        bool enable_wal;
        bool enable_compression;
        size_t checkpoint_interval;
        size_t wal_segment_size;
        double buffer_pool_hit_ratio_target;
    } config;
} storage_engine_t;

static storage_engine_t storage_engine = {0};
// Utility functions
static uint32_t hash_page_id(uint32_t page_id)
{
    /* Bucket for page_id in storage_engine.hash_table: the id modulo the bucket count. */
    const size_t bucket_count = storage_engine.hash_table_size;
    return (uint32_t)(page_id % bucket_count);
}
/*
 * 64-bit FNV-1a hash of `length` bytes starting at key: for every byte, XOR it
 * into the state and then multiply by the FNV prime.
 */
static uint64_t hash_key(const char* key, size_t length)
{
    const uint64_t fnv_offset_basis = 14695981039346656037ULL;
    const uint64_t fnv_prime = 1099511628211ULL;
    const unsigned char* bytes = (const unsigned char*)key;

    uint64_t state = fnv_offset_basis;
    for (size_t pos = 0; pos != length; ++pos) {
        state = (state ^ bytes[pos]) * fnv_prime;
    }
    return state;
}
/*
 * CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320, as used by zlib and
 * Ethernet) of `length` bytes at data.
 *
 * This replaces the previous `checksum = (checksum << 1) ^ byte` loop. That loop
 * shifted every byte more than 31 positions from the end completely out of the
 * 32-bit result, so for a 4096-byte page only the last 32 bytes (usually
 * zero-filled free space) affected the checksum, and corruption anywhere else
 * went undetected.
 *
 * NOTE: checksums stored by the old function are not compatible with this one.
 * Pages written by the previous build fail verification in get_page().
 */
static uint32_t calculate_checksum(const void* data, size_t length)
{
    const uint8_t* bytes = (const uint8_t*)data;
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < length; i++) {
        crc ^= bytes[i];
        for (int bit = 0; bit < 8; bit++) {
            /* Branch-free: XOR in the polynomial when the low bit is set. */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}
// WAL (Write-Ahead Logging) implementation
static uint64_t allocate_lsn(void)
{
    /* Take the current counter value as this record's LSN and advance the counter, atomically. */
    const uint64_t assigned = atomic_fetch_add_explicit(&storage_engine.next_lsn, 1,
                                                        memory_order_seq_cst);
    return assigned;
}
/*
 * Append one WAL record to the in-memory WAL buffer.
 *
 * Record layout:
 *   - A wal_record_t header, followed by data_length payload bytes at
 *     offsetof(wal_record_t, data).
 *   - The record occupies sizeof(wal_record_t) + data_length bytes, both in the
 *     buffer and in the WAL file.
 *   - Records are packed back to back, so a record usually starts at an address
 *     that is not aligned for wal_record_t. The header is therefore built in an
 *     aligned local and copied in with memcpy, not written through a cast pointer.
 *
 * The checksum covers the entire record, computed with the checksum field set to
 * zero. A verifier must zero that field before recomputing it.
 *
 * If the record does not fit in the remaining buffer space, the buffer is first
 * written to the WAL file and fsync'ed.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL if data_length does not
 * fit the 16-bit length field (or data is NULL with a non-zero length). Returns
 * -1 if writing or syncing the full buffer fails.
 */
static int write_wal_record(uint64_t txn_id, wal_record_type_t type,
                            uint32_t page_id, const void* data, size_t data_length)
{
    if (data_length > UINT16_MAX ||
        data_length > WAL_BUFFER_SIZE - sizeof(wal_record_t) ||
        (data == NULL && data_length > 0)) {
        errno = EINVAL;
        return -1;
    }
    const size_t record_size = sizeof(wal_record_t) + data_length;

    pthread_mutex_lock(&storage_engine.wal_mutex);

    // Make room: write out and sync the buffered records.
    if (storage_engine.wal_buffer_pos + record_size > WAL_BUFFER_SIZE) {
        const ssize_t bytes_written = write(storage_engine.wal_fd, storage_engine.wal_buffer,
                                            storage_engine.wal_buffer_pos);
        if (bytes_written != (ssize_t)storage_engine.wal_buffer_pos) {
            pthread_mutex_unlock(&storage_engine.wal_mutex);
            return -1;
        }
        storage_engine.wal_buffer_pos = 0;
        if (fsync(storage_engine.wal_fd) != 0) {
            pthread_mutex_unlock(&storage_engine.wal_mutex);
            return -1;
        }
    }

    uint8_t* dst = storage_engine.wal_buffer + storage_engine.wal_buffer_pos;

    wal_record_t header;
    memset(&header, 0, sizeof(header)); // zero padding bytes so the checksum is deterministic
    header.lsn = allocate_lsn();
    header.txn_id = txn_id;
    header.type = type;
    header.page_id = page_id;
    header.data_length = (uint16_t)data_length;
    header.checksum = 0;
    memcpy(dst, &header, sizeof(header));
    if (data_length > 0) {
        memcpy(dst + offsetof(wal_record_t, data), data, data_length);
    }

    const uint32_t checksum = calculate_checksum(dst, record_size);
    memcpy(dst + offsetof(wal_record_t, checksum), &checksum, sizeof(checksum));

    storage_engine.wal_buffer_pos += record_size;
    atomic_fetch_add(&storage_engine.stats.wal_records_written, 1);
    pthread_mutex_unlock(&storage_engine.wal_mutex);
    return 0;
}
/*
 * Write every buffered WAL record to the WAL file and fsync it.
 *
 * Returns 0 on success, including when the buffer is empty.
 * Returns -1 if the write was short or failed, or if fsync failed.
 *
 * When fsync fails, the buffer is still emptied: its bytes were already handed
 * to the kernel, and writing them again would duplicate records. The error is
 * still reported, so callers such as commit do not assume the log is durable.
 */
static int flush_wal_buffer(void)
{
    int rc = 0;

    pthread_mutex_lock(&storage_engine.wal_mutex);
    if (storage_engine.wal_buffer_pos > 0) {
        const ssize_t bytes_written = write(storage_engine.wal_fd, storage_engine.wal_buffer,
                                            storage_engine.wal_buffer_pos);
        if (bytes_written != (ssize_t)storage_engine.wal_buffer_pos) {
            rc = -1;
        } else {
            storage_engine.wal_buffer_pos = 0;
            if (fsync(storage_engine.wal_fd) != 0) {
                rc = -1;
            }
        }
    }
    pthread_mutex_unlock(&storage_engine.wal_mutex);
    return rc;
}
// Buffer pool management
static buffer_entry_t* find_buffer_entry(uint32_t page_id)
{
uint32_t hash = hash_page_id(page_id);
uint32_t entry_idx = storage_engine.hash_table[hash];
while (entry_idx != UINT32_MAX) {
buffer_entry_t* entry = &storage_engine.buffer_pool[entry_idx];
if (entry->page_id == page_id) {
return entry;
}
entry_idx = entry->hash_next;
}
return NULL;
}
/*
 * Claim a buffer-pool frame for page_id.
 *
 * The victim is the least recently used frame that is neither pinned nor
 * referenced. A dirty victim is written back first. When WAL is enabled, the WAL
 * buffer is flushed before that write (write-ahead rule), so a page never reaches
 * the data file ahead of the log records that describe it.
 *
 * The returned frame:
 *   - is linked into the hash table under page_id;
 *   - has ref_count == 1 and dirty == false;
 *   - may still hold the previous page image, which the caller must reload.
 *
 * Returns NULL if every frame is in use, or if the WAL flush or write-back fails.
 */
static buffer_entry_t* allocate_buffer_entry(uint32_t page_id)
{
    pthread_mutex_lock(&storage_engine.buffer_mutex);

    // Find least recently used unpinned, unreferenced frame
    buffer_entry_t* victim = NULL;
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        if (entry->pinned || atomic_load(&entry->ref_count) != 0) {
            continue;
        }
        if (!victim ||
            entry->last_access.tv_sec < victim->last_access.tv_sec ||
            (entry->last_access.tv_sec == victim->last_access.tv_sec &&
             entry->last_access.tv_nsec < victim->last_access.tv_nsec)) {
            victim = entry;
        }
    }
    if (!victim) {
        pthread_mutex_unlock(&storage_engine.buffer_mutex);
        return NULL; // Buffer pool full
    }

    if (victim->dirty && victim->page) {
        // Write-ahead rule: log first, then the page.
        if (storage_engine.config.enable_wal && flush_wal_buffer() != 0) {
            pthread_mutex_unlock(&storage_engine.buffer_mutex);
            return NULL;
        }
        // Widen before multiplying: 32-bit page_id * PAGE_SIZE wraps for page_id >= 2^20.
        const off_t offset = (off_t)victim->page_id * PAGE_SIZE;
        if (pwrite(storage_engine.data_fd, victim->page, PAGE_SIZE, offset) != PAGE_SIZE) {
            pthread_mutex_unlock(&storage_engine.buffer_mutex);
            return NULL;
        }
        atomic_fetch_add(&storage_engine.stats.pages_written, 1);
        victim->dirty = false;
    }

    const uint32_t victim_index = (uint32_t)(victim - storage_engine.buffer_pool);

    // Unlink from the old bucket; page_id 0 marks a frame that was never hashed.
    if (victim->page_id != 0) {
        uint32_t* link = &storage_engine.hash_table[hash_page_id(victim->page_id)];
        while (*link != UINT32_MAX) {
            if (*link == victim_index) {
                *link = victim->hash_next;
                break;
            }
            link = &storage_engine.buffer_pool[*link].hash_next;
        }
    }

    // Initialize new entry
    victim->page_id = page_id;
    victim->dirty = false;
    victim->pinned = false;
    atomic_store(&victim->ref_count, 1);
    clock_gettime(CLOCK_MONOTONIC, &victim->last_access);

    // Add to hash table
    const uint32_t bucket = hash_page_id(page_id);
    victim->hash_next = storage_engine.hash_table[bucket];
    storage_engine.hash_table[bucket] = victim_index;

    pthread_mutex_unlock(&storage_engine.buffer_mutex);
    return victim;
}
static btree_page_t* get_page(uint32_t page_id, lock_type_t lock_type)
{
// Check buffer pool first
buffer_entry_t* entry = find_buffer_entry(page_id);
if (entry) {
atomic_fetch_add(&entry->ref_count, 1);
clock_gettime(CLOCK_MONOTONIC, &entry->last_access);
atomic_fetch_add(&storage_engine.stats.cache_hits, 1);
// Acquire page lock
if (lock_type == LOCK_SHARED) {
pthread_rwlock_rdlock(&entry->page_lock);
} else if (lock_type == LOCK_EXCLUSIVE) {
pthread_rwlock_wrlock(&entry->page_lock);
}
return entry->page;
}
// Page not in buffer pool - allocate new entry
entry = allocate_buffer_entry(page_id);
if (!entry) {
return NULL; // Buffer pool full
}
atomic_fetch_add(&storage_engine.stats.cache_misses, 1);
// Allocate page memory if needed
if (!entry->page) {
entry->page = aligned_alloc(PAGE_SIZE, PAGE_SIZE);
if (!entry->page) {
atomic_fetch_sub(&entry->ref_count, 1);
return NULL;
}
}
// Read page from disk
off_t offset = page_id * PAGE_SIZE;
if (pread(storage_engine.data_fd, entry->page, PAGE_SIZE, offset) != PAGE_SIZE) {
// Page doesn't exist - initialize new page
memset(entry->page, 0, PAGE_SIZE);
entry->page->header.page_id = page_id;
entry->page->header.page_type = PAGE_TYPE_LEAF;
entry->page->header.free_space = PAGE_SIZE - sizeof(page_header_t);
entry->dirty = true;
}
atomic_fetch_add(&storage_engine.stats.pages_read, 1);
// Verify checksum if enabled
if (storage_engine.config.enable_checksums) {
uint32_t stored_checksum = entry->page->header.checksum;
entry->page->header.checksum = 0;
uint32_t calculated_checksum = calculate_checksum(entry->page, PAGE_SIZE);
entry->page->header.checksum = stored_checksum;
if (stored_checksum != 0 && stored_checksum != calculated_checksum) {
printf("Checksum mismatch for page %u\n", page_id);
atomic_fetch_sub(&entry->ref_count, 1);
return NULL;
}
}
// Acquire page lock
if (lock_type == LOCK_SHARED) {
pthread_rwlock_rdlock(&entry->page_lock);
} else if (lock_type == LOCK_EXCLUSIVE) {
pthread_rwlock_wrlock(&entry->page_lock);
}
return entry->page;
}
/*
 * Counterpart of get_page(): unlock the page's rwlock for shared/exclusive modes
 * and drop one reference. Does nothing if the page is not in the buffer pool.
 */
static void release_page(uint32_t page_id, lock_type_t lock_type)
{
    buffer_entry_t* frame = find_buffer_entry(page_id);
    if (frame == NULL) {
        return;
    }

    switch (lock_type) {
    case LOCK_SHARED:
    case LOCK_EXCLUSIVE:
        pthread_rwlock_unlock(&frame->page_lock);
        break;
    default:
        /* LOCK_NONE / LOCK_UPDATE: get_page() took no rwlock. */
        break;
    }

    atomic_fetch_sub(&frame->ref_count, 1);
}
static void mark_page_dirty(uint32_t page_id)
{
buffer_entry_t* entry = find_buffer_entry(page_id);
if (entry) {
entry->dirty = true;
// Update LSN
if (storage_engine.config.enable_wal) {
entry->page->header.lsn = storage_engine.next_lsn - 1;
}
// Update checksum
if (storage_engine.config.enable_checksums) {
entry->page->header.checksum = 0;
entry->page->header.checksum = calculate_checksum(entry->page, PAGE_SIZE);
}
}
}
// Free page management
/*
 * Hand out a page id. The most recently freed id is reused first (LIFO);
 * otherwise a never-used id is taken from next_page_id.
 * Serialised by free_page_mutex.
 */
static uint32_t allocate_page(void)
{
    uint32_t id;

    pthread_mutex_lock(&storage_engine.free_page_mutex);
    if (storage_engine.free_page_count == 0) {
        id = storage_engine.next_page_id;
        storage_engine.next_page_id += 1;
    } else {
        storage_engine.free_page_count -= 1;
        id = storage_engine.free_pages[storage_engine.free_page_count];
    }
    pthread_mutex_unlock(&storage_engine.free_page_mutex);

    return id;
}
/*
 * Return page_id to the free-page stack for later reuse by allocate_page().
 *
 * The stack starts at 1024 slots and doubles when full. If growing it fails,
 * the id is simply not recorded (best effort). Serialised by free_page_mutex.
 */
static void deallocate_page(uint32_t page_id)
{
    pthread_mutex_lock(&storage_engine.free_page_mutex);

    if (storage_engine.free_page_count >= storage_engine.free_page_capacity) {
        const size_t grown = storage_engine.free_page_capacity
                                 ? storage_engine.free_page_capacity * 2
                                 : 1024;
        uint32_t* resized = realloc(storage_engine.free_pages, grown * sizeof *resized);
        if (resized != NULL) {
            storage_engine.free_pages = resized;
            storage_engine.free_page_capacity = grown;
        }
    }

    const bool has_room = storage_engine.free_page_count < storage_engine.free_page_capacity;
    if (has_room) {
        storage_engine.free_pages[storage_engine.free_page_count] = page_id;
        storage_engine.free_page_count += 1;
    }

    pthread_mutex_unlock(&storage_engine.free_page_mutex);
}
// B-tree operations
/*
 * Order two byte strings lexicographically: memcmp on the common prefix, then
 * the shorter string sorts first. Returns <0, 0 or >0. When the common prefix
 * is equal, the result is exactly -1, 0 or 1.
 */
static int compare_keys(const char* key1, size_t len1, const char* key2, size_t len2)
{
    const size_t shared = (len2 < len1) ? len2 : len1;
    const int prefix_order = memcmp(key1, key2, shared);
    if (prefix_order != 0) {
        return prefix_order;
    }
    return (len1 > len2) - (len1 < len2);
}
/*
 * Return a pointer to record number `index` in page's packed record area, or
 * NULL when index is outside [0, key_count).
 *
 * Records are variable-length (sizeof(kv_pair_t) + key_length + value_length
 * bytes each), so the page is walked from its first record.
 * NOTE(review): the returned pointer is generally not aligned for kv_pair_t.
 */
static kv_pair_t* get_kv_pair(btree_page_t* page, int index)
{
    if (index < 0 || index >= page->header.key_count) {
        return NULL;
    }

    char* cursor = page->data;
    int remaining = index;
    while (remaining-- > 0) {
        const kv_pair_t* record = (const kv_pair_t*)cursor;
        cursor += sizeof(kv_pair_t) + record->key_length + record->value_length;
    }
    return (kv_pair_t*)cursor;
}
/*
 * Locate key within page.
 *
 * Returns the index of the record whose key equals `key`. If no such record
 * exists, returns the index at which `key` would be inserted to keep the page
 * sorted; this equals key_count when key sorts after every stored key.
 *
 * Records are variable-length and packed back to back, so reaching record i
 * means walking records 0..i-1. A binary search built on get_kv_pair() re-walks
 * the page prefix on every probe, which costs O(n log n). This single forward
 * scan stops at the first key >= `key` and costs O(n).
 *
 * Assumes the page's keys are sorted and unique, as maintained by db_insert().
 */
static int find_key_position(btree_page_t* page, const char* key, size_t key_length)
{
    const int count = page->header.key_count;
    const char* cursor = page->data;

    for (int i = 0; i < count; i++) {
        const kv_pair_t* kv = (const kv_pair_t*)cursor;
        if (compare_keys(key, key_length, kv->data, kv->key_length) <= 0) {
            return i; // exact match or insertion point
        }
        cursor += sizeof(kv_pair_t) + kv->key_length + kv->value_length;
    }
    return count; // insert after the last record
}
/*
 * Insert one record at index `position` of the page's packed record area.
 * Later records are shifted up to make room.
 *
 * Returns -1 if position is outside [0, key_count], or if the record
 * (sizeof(kv_pair_t) + key_length + value_length bytes) does not fit in the
 * page's free space. Returns 0 on success.
 *
 * The fit test is written so that huge lengths cannot wrap around size_t. Once
 * it passes, both lengths are below free_space and so fit the uint16_t record
 * header fields.
 */
static int insert_kv_pair(btree_page_t* page, int position, const char* key, size_t key_length,
                          const char* value, size_t value_length, uint32_t child_page_id)
{
    // A position past key_count would walk beyond the last record into free space.
    if (position < 0 || position > page->header.key_count) {
        return -1;
    }

    const size_t available = page->header.free_space;
    if (key_length > available || value_length > available - key_length ||
        sizeof(kv_pair_t) > available - key_length - value_length) {
        return -1; // Not enough space
    }
    const size_t pair_size = sizeof(kv_pair_t) + key_length + value_length;

    // Find insertion point
    char* insert_ptr = page->data;
    for (int i = 0; i < position; i++) {
        const kv_pair_t* kv = (const kv_pair_t*)insert_ptr;
        insert_ptr += sizeof(kv_pair_t) + kv->key_length + kv->value_length;
    }

    // Shift the records after the insertion point up by pair_size bytes.
    char* end_ptr = page->data + (PAGE_SIZE - sizeof(page_header_t) - page->header.free_space);
    const size_t move_size = (size_t)(end_ptr - insert_ptr);
    if (move_size > 0) {
        memmove(insert_ptr + pair_size, insert_ptr, move_size);
    }

    kv_pair_t* new_kv = (kv_pair_t*)insert_ptr;
    new_kv->key_length = (uint16_t)key_length;
    new_kv->value_length = (uint16_t)value_length;
    new_kv->child_page_id = child_page_id;
    if (key_length > 0) {
        memcpy(new_kv->data, key, key_length);
    }
    if (value_length > 0) {
        memcpy(new_kv->data + key_length, value, value_length);
    }

    page->header.key_count++;
    page->header.free_space = (uint16_t)(available - pair_size);
    return 0;
}
/*
 * Remove record number `position` from the page and close the gap by sliding
 * the following records down. Returns 0 on success, or -1 if position is
 * outside [0, key_count). The range check is done by get_kv_pair().
 */
static int delete_kv_pair(btree_page_t* page, int position)
{
    kv_pair_t* doomed = get_kv_pair(page, position);
    if (doomed == NULL) {
        return -1;
    }

    const size_t doomed_size = sizeof(kv_pair_t) + doomed->key_length + doomed->value_length;
    char* hole = (char*)doomed;
    char* tail = hole + doomed_size;
    char* used_end = page->data + (PAGE_SIZE - sizeof(page_header_t) - page->header.free_space);
    const size_t tail_bytes = used_end - tail;

    if (tail_bytes > 0) {
        memmove(hole, tail, tail_bytes);
    }

    page->header.key_count--;
    page->header.free_space += doomed_size;
    return 0;
}
// Transaction management
/*
 * Start a new transaction and push it onto storage_engine.active_transactions.
 *
 * The transaction gets the next txn id and records the current next_lsn as its
 * start_lsn. Returns the heap-allocated transaction, or NULL on allocation
 * failure.
 *
 * The id is printed with PRIu64: `%lu` is undefined behaviour wherever uint64_t
 * is not `unsigned long` (e.g. 32-bit Linux).
 */
static transaction_t* begin_transaction(void)
{
    transaction_t* txn = calloc(1, sizeof(*txn));
    if (!txn) {
        return NULL;
    }
    pthread_mutex_init(&txn->lock_mutex, NULL);

    pthread_mutex_lock(&storage_engine.txn_mutex);
    txn->txn_id = storage_engine.next_txn_id++;
    txn->state = TXN_STATE_ACTIVE;
    txn->start_time = time(NULL);
    txn->start_lsn = storage_engine.next_lsn;
    // Add to active transactions list
    txn->next = storage_engine.active_transactions;
    storage_engine.active_transactions = txn;
    pthread_mutex_unlock(&storage_engine.txn_mutex);

    printf("Transaction %" PRIu64 " started\n", txn->txn_id);
    return txn;
}
/*
 * Commit an active transaction.
 *
 * With WAL enabled, a WAL_COMMIT record is appended, and the WAL buffer is
 * flushed and fsync'ed before the transaction is marked committed. If either
 * step fails:
 *   - the transaction stays TXN_STATE_ACTIVE;
 *   - -1 is returned, so the caller never reports a commit that is not durable;
 *   - NOTE: a commit record may already be buffered or partially written.
 *
 * On success, the transaction's lock entries and undo log are freed; undo
 * information is useless once the transaction has committed.
 *
 * Returns 0 on success. Returns -1 if txn is NULL, not active, or the WAL
 * write/flush fails.
 */
static int commit_transaction(transaction_t* txn)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    pthread_mutex_lock(&txn->lock_mutex);

    if (storage_engine.config.enable_wal) {
        if (write_wal_record(txn->txn_id, WAL_COMMIT, 0, NULL, 0) != 0 ||
            flush_wal_buffer() != 0) {
            pthread_mutex_unlock(&txn->lock_mutex);
            return -1;
        }
    }

    txn->state = TXN_STATE_COMMITTED;
    txn->commit_time = time(NULL);
    txn->commit_lsn = storage_engine.next_lsn - 1;

    // Release all locks
    lock_entry_t* lock = txn->locks;
    while (lock) {
        lock_entry_t* next_lock = lock->next_in_txn;
        free(lock);
        lock = next_lock;
    }
    txn->locks = NULL;

    // Discard the undo log; it is only needed for rollback.
    undo_entry_t* undo = txn->undo_log;
    while (undo) {
        undo_entry_t* next_undo = undo->next;
        free(undo->key_data);
        free(undo->old_value_data);
        free(undo);
        undo = next_undo;
    }
    txn->undo_log = NULL;
    txn->undo_count = 0;

    pthread_mutex_unlock(&txn->lock_mutex);
    atomic_fetch_add(&storage_engine.stats.transactions_committed, 1);
    printf("Transaction %" PRIu64 " committed\n", txn->txn_id);
    return 0;
}
/*
 * Abort an active transaction. This:
 *   - discards its undo log;
 *   - appends a WAL_ABORT record (when WAL is enabled);
 *   - marks it aborted;
 *   - frees its lock entries.
 *
 * NOTE(review): undo entries are freed WITHOUT being applied, so changes made by
 * this transaction via db_update()/db_insert()/db_delete() remain in the pages.
 * Real rollback also needs key locking, so that restoring a before-image cannot
 * clobber another transaction's write; storage_engine.lock_table is not
 * populated in this part of the file.
 *
 * The WAL_ABORT write is best-effort. Its result is ignored because the
 * transaction is abandoned either way, and the record is not flushed here.
 *
 * Returns 0 on success, -1 if txn is NULL or not active.
 */
static int abort_transaction(transaction_t* txn)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }
    pthread_mutex_lock(&txn->lock_mutex);

    // Discard undo entries, newest first.
    undo_entry_t* undo = txn->undo_log;
    while (undo) {
        undo_entry_t* next_undo = undo->next;
        free(undo->key_data);
        free(undo->old_value_data);
        free(undo);
        undo = next_undo;
    }
    txn->undo_log = NULL;
    txn->undo_count = 0;

    if (storage_engine.config.enable_wal) {
        (void)write_wal_record(txn->txn_id, WAL_ABORT, 0, NULL, 0);
    }
    txn->state = TXN_STATE_ABORTED;

    // Release all locks
    lock_entry_t* lock = txn->locks;
    while (lock) {
        lock_entry_t* next_lock = lock->next_in_txn;
        free(lock);
        lock = next_lock;
    }
    txn->locks = NULL;

    pthread_mutex_unlock(&txn->lock_mutex);
    atomic_fetch_add(&storage_engine.stats.transactions_aborted, 1);
    printf("Transaction %" PRIu64 " aborted\n", txn->txn_id);
    return 0;
}
// High-level database operations
/*
 * Insert a new key/value pair into the leaf that should contain key.
 *
 * The descent from the root takes exclusive page locks, releasing each parent
 * before locking its child. The insert is rejected if:
 *   - the key already exists;
 *   - the record does not fit in the leaf's free space (no split is attempted).
 * Space is checked BEFORE logging, so the WAL never describes an insert that
 * cannot happen. With WAL enabled, a WAL_INSERT record is appended before the
 * page is modified.
 *
 * WAL payload: key_length (size_t), value_length (size_t), key bytes, value bytes.
 *
 * Returns 0 on success. Returns -1 if:
 *   - txn is NULL or not active;
 *   - a page cannot be loaded;
 *   - the key exists or the record does not fit;
 *   - the WAL write fails.
 */
static int db_insert(transaction_t* txn, const char* key, size_t key_length,
                     const char* value, size_t value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }

    // Start at root page
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    if (!page) {
        return -1;
    }

    // Find leaf page for insertion
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        if (!page) {
            return -1;
        }
    }

    // Check if key already exists
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* existing = get_kv_pair(page, pos);
    if (existing && compare_keys(key, key_length, existing->data, existing->key_length) == 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key already exists
    }

    // Overflow-safe fit test; afterwards key_length + value_length < PAGE_SIZE.
    const size_t available = page->header.free_space;
    if (key_length > available || value_length > available - key_length ||
        sizeof(kv_pair_t) > available - key_length - value_length) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Record does not fit in this leaf
    }

    if (storage_engine.config.enable_wal) {
        // Sized from the fit test above: two length fields plus key and value.
        char wal_data[2 * sizeof(size_t) + PAGE_SIZE];
        size_t wal_size = 0;
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        memcpy(wal_data + wal_size, &value_length, sizeof(value_length));
        wal_size += sizeof(value_length);
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        memcpy(wal_data + wal_size, value, value_length);
        wal_size += value_length;
        if (write_wal_record(txn->txn_id, WAL_INSERT, page_id, wal_data, wal_size) != 0) {
            release_page(page_id, LOCK_EXCLUSIVE);
            return -1;
        }
    }

    if (insert_kv_pair(page, pos, key, key_length, value, value_length, 0) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1;
    }
    mark_page_dirty(page_id);
    txn->stats.rows_inserted++;
    printf("Inserted key-value pair in transaction %" PRIu64 "\n", txn->txn_id);

    release_page(page_id, LOCK_EXCLUSIVE);
    return 0;
}
/*
 * Look up key and copy its value into the caller's buffer.
 *
 * On input, *value_length is the capacity of `value`. At most that many bytes
 * are copied. On success, *value_length is set to the full stored length, which
 * may exceed the number of bytes copied; the caller can use this to detect
 * truncation.
 *
 * The descent takes shared page locks, releasing each parent before locking its
 * child.
 *
 * Returns 0 if the key was found. Returns -1 if:
 *   - txn is NULL or not active;
 *   - value_length is NULL;
 *   - a page cannot be loaded;
 *   - the key is absent.
 */
static int db_search(transaction_t* txn, const char* key, size_t key_length,
                     char* value, size_t* value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE || !value_length) {
        return -1;
    }

    // Start at root page
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_SHARED);
    if (!page) {
        return -1;
    }

    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        release_page(page_id, LOCK_SHARED);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_SHARED);
        if (!page) {
            return -1;
        }
    }

    // Search for key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    if (kv && compare_keys(key, key_length, kv->data, kv->key_length) == 0) {
        const char* kv_value = kv->data + kv->key_length;
        const size_t copy_length =
            kv->value_length < *value_length ? kv->value_length : *value_length;
        if (copy_length > 0) {
            memcpy(value, kv_value, copy_length);
        }
        *value_length = kv->value_length;
        release_page(page_id, LOCK_SHARED);
        printf("Found key in transaction %" PRIu64 "\n", txn->txn_id);
        return 0;
    }

    release_page(page_id, LOCK_SHARED);
    return -1; // Key not found
}
/*
 * Overwrite, in place, the value stored under key with new_value.
 *
 * Only same-length updates are supported. Records are packed back to back in
 * the leaf, so changing a value's length would require moving every record
 * after it. A length mismatch returns -1 before anything is logged or modified.
 *
 * Order of effects:
 *   1. WAL_UPDATE record (when WAL is enabled).
 *   2. Undo entry (best effort; skipped entirely if any allocation fails).
 *   3. In-place overwrite of the value.
 *   4. Page marked dirty.
 *
 * WAL payload: key_length (size_t), old value length (uint16_t),
 * new_value_length (size_t), key bytes, old value bytes, new value bytes.
 *
 * Returns 0 on success. Returns -1 if:
 *   - txn is NULL or not active;
 *   - a page cannot be loaded;
 *   - the key is absent;
 *   - the value lengths differ;
 *   - the WAL write fails.
 */
static int db_update(transaction_t* txn, const char* key, size_t key_length,
                     const char* new_value, size_t new_value_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }

    // Find the key first (similar to search)
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    if (!page) {
        return -1;
    }

    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        if (!page) {
            return -1;
        }
    }

    // Find key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    if (!kv || compare_keys(key, key_length, kv->data, kv->key_length) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key not found
    }
    if (kv->value_length != new_value_length) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Only same-length (in-place) updates are supported
    }

    char* stored_value = kv->data + kv->key_length;
    const uint16_t old_value_length = kv->value_length;

    if (storage_engine.config.enable_wal) {
        // key + old value fit in one page, and the new value has the old value's length.
        char wal_data[2 * sizeof(size_t) + sizeof(uint16_t) + 2 * PAGE_SIZE];
        size_t wal_size = 0;
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        memcpy(wal_data + wal_size, &old_value_length, sizeof(old_value_length));
        wal_size += sizeof(old_value_length);
        memcpy(wal_data + wal_size, &new_value_length, sizeof(new_value_length));
        wal_size += sizeof(new_value_length);
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        memcpy(wal_data + wal_size, stored_value, old_value_length);
        wal_size += old_value_length;
        memcpy(wal_data + wal_size, new_value, new_value_length);
        wal_size += new_value_length;
        if (write_wal_record(txn->txn_id, WAL_UPDATE, page_id, wal_data, wal_size) != 0) {
            release_page(page_id, LOCK_EXCLUSIVE);
            return -1;
        }
    }

    // Save the before-image for undo; keep all three allocations or none.
    undo_entry_t* undo = malloc(sizeof(*undo));
    char* saved_key = malloc(key_length > 0 ? key_length : 1);
    char* saved_value = malloc(old_value_length > 0 ? old_value_length : 1);
    if (undo && saved_key && saved_value) {
        memcpy(saved_key, key, key_length);
        memcpy(saved_value, stored_value, old_value_length);
        undo->operation = WAL_UPDATE;
        undo->page_id = page_id;
        undo->slot_id = (uint16_t)pos;
        undo->key_length = (uint16_t)key_length;
        undo->old_value_length = old_value_length;
        undo->key_data = saved_key;
        undo->old_value_data = saved_value;
        undo->next = txn->undo_log;
        txn->undo_log = undo;
        txn->undo_count++;
    } else {
        free(undo);
        free(saved_key);
        free(saved_value);
    }

    memcpy(stored_value, new_value, new_value_length);
    mark_page_dirty(page_id);
    txn->stats.rows_updated++;
    printf("Updated key in transaction %" PRIu64 "\n", txn->txn_id);

    release_page(page_id, LOCK_EXCLUSIVE);
    return 0;
}
/*
 * Delete the record stored under key.
 *
 * The descent uses exclusive page locks. With WAL enabled, a WAL_DELETE record
 * containing the deleted value is appended before the page is modified.
 * NOTE(review): no undo entry is recorded for deletes.
 *
 * WAL payload: key_length (size_t), value length (uint16_t), key bytes, value bytes.
 *
 * Returns 0 on success. Returns -1 if:
 *   - txn is NULL or not active;
 *   - a page cannot be loaded;
 *   - the key is absent;
 *   - the WAL write fails;
 *   - the record cannot be removed.
 */
static int db_delete(transaction_t* txn, const char* key, size_t key_length)
{
    if (!txn || txn->state != TXN_STATE_ACTIVE) {
        return -1;
    }

    // Find and delete the key (similar to update)
    uint32_t page_id = storage_engine.root_page_id;
    btree_page_t* page = get_page(page_id, LOCK_EXCLUSIVE);
    if (!page) {
        return -1;
    }

    // Navigate to leaf page
    while (page->header.page_type == PAGE_TYPE_INTERNAL) {
        int pos = find_key_position(page, key, key_length);
        kv_pair_t* kv = get_kv_pair(page, pos);
        uint32_t child_page_id = kv ? kv->child_page_id : page->header.next_page_id;
        release_page(page_id, LOCK_EXCLUSIVE);
        page_id = child_page_id;
        page = get_page(page_id, LOCK_EXCLUSIVE);
        if (!page) {
            return -1;
        }
    }

    // Find key in leaf page
    int pos = find_key_position(page, key, key_length);
    kv_pair_t* kv = get_kv_pair(page, pos);
    if (!kv || compare_keys(key, key_length, kv->data, kv->key_length) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1; // Key not found
    }

    if (storage_engine.config.enable_wal) {
        // key_length == kv->key_length here, and a record's key + value fit in one page.
        char wal_data[sizeof(size_t) + sizeof(uint16_t) + PAGE_SIZE];
        size_t wal_size = 0;
        memcpy(wal_data + wal_size, &key_length, sizeof(key_length));
        wal_size += sizeof(key_length);
        memcpy(wal_data + wal_size, &kv->value_length, sizeof(kv->value_length));
        wal_size += sizeof(kv->value_length);
        memcpy(wal_data + wal_size, key, key_length);
        wal_size += key_length;
        memcpy(wal_data + wal_size, kv->data + kv->key_length, kv->value_length);
        wal_size += kv->value_length;
        if (write_wal_record(txn->txn_id, WAL_DELETE, page_id, wal_data, wal_size) != 0) {
            release_page(page_id, LOCK_EXCLUSIVE);
            return -1;
        }
    }

    if (delete_kv_pair(page, pos) != 0) {
        release_page(page_id, LOCK_EXCLUSIVE);
        return -1;
    }
    mark_page_dirty(page_id);
    txn->stats.rows_deleted++;
    printf("Deleted key in transaction %" PRIu64 "\n", txn->txn_id);

    release_page(page_id, LOCK_EXCLUSIVE);
    return 0;
}
// Checkpoint and recovery
/*
 * Make the current database state durable and record a checkpoint.
 *
 * Steps:
 *   1. Flush and fsync the WAL buffer, so every dirty page's log records are
 *      durable before the page is written.
 *   2. Write every dirty buffered page back to the data file.
 *   3. fsync the data file.
 *   4. Log and flush a WAL_CHECKPOINT record, and store its LSN in
 *      last_checkpoint_lsn.
 *
 * If any step fails, the checkpoint is abandoned before the WAL_CHECKPOINT
 * record is written, so the log never claims a checkpoint whose pages did not
 * reach disk. Pages that were written successfully stay clean either way.
 * checkpoints_performed counts completed checkpoints only.
 *
 * Returns 0 on success, -1 on failure.
 */
static int perform_checkpoint(void)
{
    printf("Starting checkpoint...\n");

    if (flush_wal_buffer() != 0) {
        fprintf(stderr, "Checkpoint failed: could not flush WAL\n");
        return -1;
    }

    size_t failed_writes = 0;
    pthread_mutex_lock(&storage_engine.buffer_mutex);
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        if (!entry->dirty || !entry->page) {
            continue;
        }
        // Widen before multiplying: 32-bit page_id * PAGE_SIZE wraps for page_id >= 2^20.
        const off_t offset = (off_t)entry->page_id * PAGE_SIZE;
        if (pwrite(storage_engine.data_fd, entry->page, PAGE_SIZE, offset) == PAGE_SIZE) {
            entry->dirty = false;
            atomic_fetch_add(&storage_engine.stats.pages_written, 1);
        } else {
            failed_writes++;
        }
    }
    pthread_mutex_unlock(&storage_engine.buffer_mutex);

    if (failed_writes > 0) {
        fprintf(stderr, "Checkpoint failed: %zu dirty page(s) could not be written\n",
                failed_writes);
        return -1;
    }
    if (fsync(storage_engine.data_fd) != 0) {
        fprintf(stderr, "Checkpoint failed: could not sync data file\n");
        return -1;
    }

    if (write_wal_record(0, WAL_CHECKPOINT, 0, NULL, 0) != 0 || flush_wal_buffer() != 0) {
        fprintf(stderr, "Checkpoint failed: could not log checkpoint record\n");
        return -1;
    }
    storage_engine.last_checkpoint_lsn = storage_engine.next_lsn - 1;
    atomic_fetch_add(&storage_engine.stats.checkpoints_performed, 1);

    printf("Checkpoint completed (LSN: %" PRIu64 ")\n",
           (uint64_t)storage_engine.last_checkpoint_lsn);
    return 0;
}
// Statistics and monitoring
/*
 * Print buffer-pool, transaction and WAL counters, plus every transaction still
 * on the active list, to stdout.
 *
 * 64-bit counters are printed with PRIu64 after an explicit (uint64_t)
 * conversion. `%lu` is undefined behaviour where uint64_t is not
 * `unsigned long` (e.g. 32-bit Linux). The cache counters are each loaded once,
 * so the printed hit ratio agrees with the printed hit/miss numbers.
 */
static void print_storage_statistics(void)
{
    const uint64_t cache_hits = atomic_load(&storage_engine.stats.cache_hits);
    const uint64_t cache_misses = atomic_load(&storage_engine.stats.cache_misses);

    printf("\n=== Storage Engine Statistics ===\n");
    printf("Buffer Pool:\n");
    printf(" Pages read: %" PRIu64 "\n", (uint64_t)atomic_load(&storage_engine.stats.pages_read));
    printf(" Pages written: %" PRIu64 "\n", (uint64_t)atomic_load(&storage_engine.stats.pages_written));
    printf(" Cache hits: %" PRIu64 "\n", cache_hits);
    printf(" Cache misses: %" PRIu64 "\n", cache_misses);
    const uint64_t total_accesses = cache_hits + cache_misses;
    if (total_accesses > 0) {
        const double hit_ratio = (double)cache_hits / total_accesses;
        printf(" Cache hit ratio: %.2f%%\n", hit_ratio * 100.0);
    }

    printf("\nTransactions:\n");
    printf(" Committed: %" PRIu64 "\n",
           (uint64_t)atomic_load(&storage_engine.stats.transactions_committed));
    printf(" Aborted: %" PRIu64 "\n",
           (uint64_t)atomic_load(&storage_engine.stats.transactions_aborted));

    printf("\nWAL:\n");
    printf(" Records written: %" PRIu64 "\n",
           (uint64_t)atomic_load(&storage_engine.stats.wal_records_written));
    printf(" Checkpoints: %" PRIu64 "\n",
           (uint64_t)atomic_load(&storage_engine.stats.checkpoints_performed));
    printf(" Next LSN: %" PRIu64 "\n", (uint64_t)storage_engine.next_lsn);
    printf(" Last checkpoint LSN: %" PRIu64 "\n", (uint64_t)storage_engine.last_checkpoint_lsn);

    printf("\nActive Transactions:\n");
    pthread_mutex_lock(&storage_engine.txn_mutex);
    int count = 0;
    for (transaction_t* txn = storage_engine.active_transactions; txn; txn = txn->next) {
        printf(" TXN %" PRIu64 ": inserts=%" PRIu64 ", updates=%" PRIu64 ", deletes=%" PRIu64 "\n",
               txn->txn_id, txn->stats.rows_inserted,
               txn->stats.rows_updated, txn->stats.rows_deleted);
        count++;
    }
    printf(" Total active: %d\n", count);
    pthread_mutex_unlock(&storage_engine.txn_mutex);
    printf("=================================\n");
}
// Initialization and cleanup
static int init_storage_engine(const char* data_file, const char* wal_file)
{
memset(&storage_engine, 0, sizeof(storage_engine));
// Configuration
storage_engine.config.enable_checksums = true;
storage_engine.config.enable_wal = true;
storage_engine.config.checkpoint_interval = 10000;
storage_engine.config.wal_segment_size = 64 * 1024 * 1024;
storage_engine.config.buffer_pool_hit_ratio_target = 0.95;
// Open data file
storage_engine.data_fd = open(data_file, O_RDWR | O_CREAT, 0644);
if (storage_engine.data_fd < 0) {
perror("open data file");
return -1;
}
storage_engine.data_filename = strdup(data_file);
// Open WAL file
storage_engine.wal_fd = open(wal_file, O_RDWR | O_CREAT | O_APPEND, 0644);
if (storage_engine.wal_fd < 0) {
perror("open WAL file");
close(storage_engine.data_fd);
return -1;
}
storage_engine.wal_filename = strdup(wal_file);
// Initialize WAL buffer
storage_engine.wal_buffer = malloc(WAL_BUFFER_SIZE);
if (!storage_engine.wal_buffer) {
close(storage_engine.data_fd);
close(storage_engine.wal_fd);
return -1;
}
// Initialize hash table for buffer pool
storage_engine.hash_table_size = BUFFER_POOL_SIZE * 2;
storage_engine.hash_table = malloc(storage_engine.hash_table_size * sizeof(uint32_t));
if (!storage_engine.hash_table) {
return -1;
}
for (size_t i = 0; i < storage_engine.hash_table_size; i++) {
storage_engine.hash_table[i] = UINT32_MAX;
}
// Initialize buffer pool
for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
buffer_entry_t* entry = &storage_engine.buffer_pool[i];
pthread_rwlock_init(&entry->page_lock, NULL);
entry->hash_next = UINT32_MAX;
}
// Initialize lock table
storage_engine.lock_table_size = 10007; // Prime number
storage_engine.lock_table = calloc(storage_engine.lock_table_size, sizeof(lock_entry_t*));
// Initialize mutexes
pthread_mutex_init(&storage_engine.buffer_mutex, NULL);
pthread_mutex_init(&storage_engine.free_page_mutex, NULL);
pthread_mutex_init(&storage_engine.txn_mutex, NULL);
pthread_mutex_init(&storage_engine.lock_table_mutex, NULL);
pthread_mutex_init(&storage_engine.wal_mutex, NULL);
// Initialize counters
storage_engine.next_txn_id = 1;
storage_engine.next_lsn = 1;
storage_engine.next_page_id = 1;
storage_engine.root_page_id = 1;
// Initialize root page if file is empty
struct stat st;
if (fstat(storage_engine.data_fd, &st) == 0 && st.st_size == 0) {
btree_page_t* root_page = get_page(storage_engine.root_page_id, LOCK_EXCLUSIVE);
if (root_page) {
root_page->header.page_type = PAGE_TYPE_LEAF;
mark_page_dirty(storage_engine.root_page_id);
release_page(storage_engine.root_page_id, LOCK_EXCLUSIVE);
}
}
printf("Storage engine initialized\n");
printf("Data file: %s\n", data_file);
printf("WAL file: %s\n", wal_file);
return 0;
}
/*
 * Shut the storage engine down and release the resources acquired by
 * init_storage_engine().
 *
 * Order matters:
 *   1. Abort transactions that are still active. This happens while the WAL
 *      buffer/file, the buffer pool and the lock table are all still alive.
 *      NOTE(review): abort_transaction() is defined elsewhere; this ordering
 *      assumes it may touch any of them (undo, abort record, lock release).
 *   2. Take the final checkpoint, so it reflects the aborts.
 *   3. Close files and release memory, latches and mutexes.
 *
 * Closed descriptors are reset to -1 and freed pointers to NULL so nothing
 * is left dangling.
 */
static void cleanup_storage_engine(void)
{
    // 1. Abort still-active transactions.
    //    Read the list head under txn_mutex, but call abort_transaction()
    //    without holding it. A default pthread mutex is not recursive, so
    //    holding it here would self-deadlock if abort_transaction() locks
    //    txn_mutex itself (NOTE(review): confirm).
    //    next is read before the node is aborted and freed.
    pthread_mutex_lock(&storage_engine.txn_mutex);
    transaction_t* txn = storage_engine.active_transactions;
    pthread_mutex_unlock(&storage_engine.txn_mutex);
    while (txn) {
        transaction_t* next_txn = txn->next;
        abort_transaction(txn);
        free(txn);
        txn = next_txn;
    }
    pthread_mutex_lock(&storage_engine.txn_mutex);
    storage_engine.active_transactions = NULL; // every node was freed above
    pthread_mutex_unlock(&storage_engine.txn_mutex);

    // 2. Final checkpoint, while files and buffers are still valid.
    perform_checkpoint();

    // 3a. Close files.
    if (storage_engine.data_fd >= 0) {
        close(storage_engine.data_fd);
        storage_engine.data_fd = -1;
    }
    if (storage_engine.wal_fd >= 0) {
        close(storage_engine.wal_fd);
        storage_engine.wal_fd = -1;
    }

    // 3b. Release buffer-pool pages and their per-entry rwlocks.
    for (size_t i = 0; i < BUFFER_POOL_SIZE; i++) {
        buffer_entry_t* entry = &storage_engine.buffer_pool[i];
        free(entry->page); // free(NULL) is a no-op
        entry->page = NULL;
        pthread_rwlock_destroy(&entry->page_lock);
    }

    // 3c. Free the remaining heap allocations.
    free(storage_engine.data_filename);
    storage_engine.data_filename = NULL;
    free(storage_engine.wal_filename);
    storage_engine.wal_filename = NULL;
    free(storage_engine.wal_buffer);
    storage_engine.wal_buffer = NULL;
    free(storage_engine.hash_table);
    storage_engine.hash_table = NULL;
    free(storage_engine.free_pages);
    storage_engine.free_pages = NULL;
    // lock_table was allocated as an array of lock_entry_t* bucket heads.
    // Only the array itself is freed here; any entries still chained off the
    // buckets are not walked. NOTE(review): free remaining lock entries too.
    free(storage_engine.lock_table);
    storage_engine.lock_table = NULL;

    // 3d. Destroy the engine-wide mutexes last; step 1 still used txn_mutex.
    pthread_mutex_destroy(&storage_engine.buffer_mutex);
    pthread_mutex_destroy(&storage_engine.free_page_mutex);
    pthread_mutex_destroy(&storage_engine.txn_mutex);
    pthread_mutex_destroy(&storage_engine.lock_table_mutex);
    pthread_mutex_destroy(&storage_engine.wal_mutex);

    printf("Storage engine cleanup completed\n");
}
// Signal handlers
//
// The handler only records which signal arrived; main() does the real work.
// cleanup_storage_engine(), perform_checkpoint() and
// print_storage_statistics() lock pthread mutexes and use malloc/free and
// stdio, none of which are async-signal-safe. Running them inside the
// handler could deadlock if the signal interrupted the main loop while it
// held one of those mutexes, e.g. during the periodic perform_checkpoint().
static volatile sig_atomic_t pending_shutdown_signal = 0; // signal number; 0 = none
static volatile sig_atomic_t pending_stats_request = 0;
static volatile sig_atomic_t pending_checkpoint_request = 0;

static void signal_handler(int sig)
{
    int saved_errno = errno; // do not clobber errno of the interrupted code

    // Re-arm the handler. Under System V signal() semantics (e.g. glibc
    // without _DEFAULT_SOURCE) the disposition resets to SIG_DFL after
    // delivery, and a second SIGUSR1/SIGUSR2 would then kill the process.
    // signal() itself is async-signal-safe.
    signal(sig, signal_handler);

    if (sig == SIGINT || sig == SIGTERM) {
        pending_shutdown_signal = sig;
    } else if (sig == SIGUSR1) {
        pending_stats_request = 1;
    } else if (sig == SIGUSR2) {
        pending_checkpoint_request = 1;
    }

    errno = saved_errno;
}

// Test and demonstration
//
// Runs two overlapping transactions through insert, search, update and
// delete, commits both, then takes a checkpoint. If only one of the two
// transactions could be started, that one is aborted and freed rather than
// leaked.
static void test_storage_engine(void)
{
    printf("Testing storage engine...\n");
    // Create some test transactions
    transaction_t* txn1 = begin_transaction();
    transaction_t* txn2 = begin_transaction();
    if (txn1 && txn2) {
        // Test insertions
        db_insert(txn1, "key1", 4, "value1", 6);
        db_insert(txn1, "key2", 4, "value2", 6);
        db_insert(txn2, "key3", 4, "value3", 6);
        // Test searches; value_length is in/out: buffer size in, bytes found out
        char value[MAX_VALUE_SIZE];
        size_t value_length = sizeof(value);
        if (db_search(txn1, "key1", 4, value, &value_length) == 0) {
            printf("Found key1: %.*s\n", (int)value_length, value);
        }
        // Test updates
        db_update(txn1, "key1", 4, "updated_value1", 14);
        // Test deletion
        db_delete(txn2, "key3", 4);
        // Commit transactions
        commit_transaction(txn1);
        commit_transaction(txn2);
        free(txn1);
        free(txn2);
    } else {
        // Do not leak the transaction that did start.
        fprintf(stderr, "Failed to begin test transactions\n");
        if (txn1) {
            abort_transaction(txn1);
            free(txn1);
        }
        if (txn2) {
            abort_transaction(txn2);
            free(txn2);
        }
    }
    // Perform checkpoint
    perform_checkpoint();
    printf("Storage engine test completed\n");
}

// Main function
//
// Usage: prog [data_file [wal_file]]
// The defaults are "database.db" and "database.wal".
// Runs until SIGINT/SIGTERM, then performs an orderly cleanup. SIGUSR1 prints
// statistics and SIGUSR2 forces a checkpoint. Both are serviced from the
// loop below, not from the signal handler.
int main(int argc, char* argv[])
{
    const char* data_file = "database.db";
    const char* wal_file = "database.wal";
    if (argc > 1) {
        data_file = argv[1];
    }
    if (argc > 2) {
        wal_file = argv[2];
    }
    // Set up signal handlers
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    signal(SIGUSR1, signal_handler);
    signal(SIGUSR2, signal_handler);
    printf("Advanced Database Storage Engine\n");
    // Initialize storage engine
    if (init_storage_engine(data_file, wal_file) != 0) {
        fprintf(stderr, "Failed to initialize storage engine\n");
        return 1;
    }
    // Run tests
    test_storage_engine();
    printf("Storage engine running...\n");
    printf("Send SIGUSR1 for statistics, SIGUSR2 for checkpoint, SIGINT to exit\n");
    // Main loop. POSIX sleep() returns early when a caught signal is
    // delivered, so requests are normally serviced promptly. A signal that
    // lands just before sleep() starts waits at most one 5 s period.
    while (!pending_shutdown_signal) {
        sleep(5);
        if (pending_shutdown_signal) {
            break;
        }
        if (pending_stats_request) {
            pending_stats_request = 0;
            print_storage_statistics();
        }
        if (pending_checkpoint_request) {
            pending_checkpoint_request = 0;
            perform_checkpoint();
        } else if (storage_engine.next_lsn - storage_engine.last_checkpoint_lsn >
                   storage_engine.config.checkpoint_interval) {
            // Automatic checkpoint if needed
            perform_checkpoint();
        }
    }
    printf("\nReceived signal %d, shutting down storage engine...\n",
           (int)pending_shutdown_signal);
    cleanup_storage_engine();
    return 0;
}
This Linux database systems programming guide covers:
- B-tree storage engine: page management, key-value operations, and concurrent page access
- Transaction processing: begin/commit/abort operations with undo logging in support of the ACID properties
- Write-ahead logging (WAL): log record management and checkpointing for durability and recovery
- Buffer pool management: LRU caching with hash-table lookup and dirty-page tracking
- Concurrency control: lock management, deadlock prevention, and multi-threaded access
The implementation demonstrates the core techniques behind high-performance storage systems and transaction processors, and serves as a foundation to harden and extend before production use.