Advanced Concurrency and Parallel Programming: Mastering Multi-Threading and Synchronization
Modern computing demands sophisticated concurrency and parallel programming techniques to harness multi-core processors and distributed systems. This comprehensive guide explores advanced threading models, synchronization primitives, lock-free algorithms, and parallel processing frameworks for building high-performance concurrent applications.
Advanced Threading Models and Thread Pools
High-Performance Thread Pool Implementation
// thread_pool.c - Advanced thread pool implementation
#define _GNU_SOURCE  // For pthread_setaffinity_np() and the CPU_SET macros
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <sched.h>
// Task structure
typedef struct task {
void (*function)(void *arg);
void *argument;
struct task *next;
int priority;
struct timeval submit_time;
} task_t;
// Task queue with priority support
typedef struct {
task_t **queues; // Array of priority queues
int num_priorities;
pthread_mutex_t mutex;
pthread_cond_t condition;
atomic_int size;
atomic_int total_tasks;
} task_queue_t;
// Worker thread statistics
typedef struct {
atomic_long tasks_executed;
atomic_long total_execution_time_ns;
atomic_long idle_time_ns;
struct timespec last_task_end;
int cpu_affinity;
} worker_stats_t;
// Thread pool structure
typedef struct {
pthread_t *threads;
worker_stats_t *worker_stats;
int num_threads;
task_queue_t task_queue;
atomic_bool shutdown;
atomic_bool immediate_shutdown;
// Work stealing support
task_queue_t *local_queues;
atomic_int *queue_locks;
// Performance monitoring
atomic_long total_tasks_submitted;
atomic_long total_tasks_completed;
struct timespec start_time;
// Dynamic resizing
pthread_mutex_t resize_mutex;
int min_threads;
int max_threads;
atomic_int active_threads;
    // Load balancing
    atomic_int round_robin_index;
    // Worker ID assignment (kept separate from round_robin_index, which
    // is also incremented on every task submission)
    atomic_int next_worker_id;
} thread_pool_t;
// Forward declaration (defined below; used in the create error path)
void thread_pool_destroy(thread_pool_t *pool);
// Initialize task queue
int task_queue_init(task_queue_t *queue, int num_priorities) {
queue->queues = calloc(num_priorities, sizeof(task_t*));
if (!queue->queues) {
return -1;
}
queue->num_priorities = num_priorities;
atomic_init(&queue->size, 0);
atomic_init(&queue->total_tasks, 0);
if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
free(queue->queues);
return -1;
}
if (pthread_cond_init(&queue->condition, NULL) != 0) {
pthread_mutex_destroy(&queue->mutex);
free(queue->queues);
return -1;
}
return 0;
}
// Add task to priority queue
int task_queue_push(task_queue_t *queue, task_t *task) {
if (task->priority < 0 || task->priority >= queue->num_priorities) {
return -1;
}
pthread_mutex_lock(&queue->mutex);
    // Insert at the head of the priority list.  Note that this makes each
    // priority level LIFO; keeping a tail pointer would give FIFO order.
    task->next = queue->queues[task->priority];
queue->queues[task->priority] = task;
atomic_fetch_add(&queue->size, 1);
atomic_fetch_add(&queue->total_tasks, 1);
pthread_cond_signal(&queue->condition);
pthread_mutex_unlock(&queue->mutex);
return 0;
}
// Pop task from highest priority queue (blocking variant; the worker loop
// below uses the non-blocking try_pop so it can steal work and observe
// shutdown promptly)
task_t* task_queue_pop(task_queue_t *queue) {
pthread_mutex_lock(&queue->mutex);
while (atomic_load(&queue->size) == 0) {
pthread_cond_wait(&queue->condition, &queue->mutex);
}
task_t *task = NULL;
// Find highest priority non-empty queue
for (int i = queue->num_priorities - 1; i >= 0; i--) {
if (queue->queues[i]) {
task = queue->queues[i];
queue->queues[i] = task->next;
break;
}
}
if (task) {
atomic_fetch_sub(&queue->size, 1);
}
pthread_mutex_unlock(&queue->mutex);
return task;
}
// Try to pop task without blocking
task_t* task_queue_try_pop(task_queue_t *queue) {
if (pthread_mutex_trylock(&queue->mutex) != 0) {
return NULL;
}
task_t *task = NULL;
if (atomic_load(&queue->size) > 0) {
// Find highest priority non-empty queue
for (int i = queue->num_priorities - 1; i >= 0; i--) {
if (queue->queues[i]) {
task = queue->queues[i];
queue->queues[i] = task->next;
atomic_fetch_sub(&queue->size, 1);
break;
}
}
}
pthread_mutex_unlock(&queue->mutex);
return task;
}
// Work stealing implementation
task_t* steal_task(thread_pool_t *pool, int worker_id) {
int num_workers = atomic_load(&pool->active_threads);
// Try to steal from other workers' local queues
for (int i = 1; i < num_workers; i++) {
int target = (worker_id + i) % num_workers;
// Try to acquire lock on target queue
int expected = 0;
if (atomic_compare_exchange_weak(&pool->queue_locks[target], &expected, 1)) {
task_t *stolen_task = task_queue_try_pop(&pool->local_queues[target]);
atomic_store(&pool->queue_locks[target], 0);
if (stolen_task) {
return stolen_task;
}
}
}
return NULL;
}
// Worker thread function
void* worker_thread(void *arg) {
thread_pool_t *pool = (thread_pool_t*)arg;
    int worker_id = atomic_fetch_add(&pool->next_worker_id, 1) % pool->max_threads;
// Set CPU affinity if specified
if (pool->worker_stats[worker_id].cpu_affinity >= 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(pool->worker_stats[worker_id].cpu_affinity, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
struct timespec idle_start, idle_end, task_start, task_end;
    while (!atomic_load(&pool->shutdown)) {
        // Exit if the pool has been shrunk below this worker's ID
        if (worker_id >= atomic_load(&pool->active_threads)) {
            break;
        }
        task_t *task = NULL;
        clock_gettime(CLOCK_MONOTONIC, &idle_start);
// Try local queue first (work stealing)
if (pool->local_queues) {
task = task_queue_try_pop(&pool->local_queues[worker_id]);
}
        // Try global queue (non-blocking, so the stealing step below stays
        // reachable and a shutdown request is never missed in a blocked wait)
        if (!task) {
            task = task_queue_try_pop(&pool->task_queue);
        }
        // Try work stealing
        if (!task && pool->local_queues) {
            task = steal_task(pool, worker_id);
        }
        if (!task) {
            if (atomic_load(&pool->immediate_shutdown)) {
                break;
            }
            sched_yield();  // Avoid burning a full core while idle
            continue;
        }
clock_gettime(CLOCK_MONOTONIC, &idle_end);
// Update idle time statistics
long idle_ns = (idle_end.tv_sec - idle_start.tv_sec) * 1000000000L +
(idle_end.tv_nsec - idle_start.tv_nsec);
atomic_fetch_add(&pool->worker_stats[worker_id].idle_time_ns, idle_ns);
// Execute task
clock_gettime(CLOCK_MONOTONIC, &task_start);
task->function(task->argument);
clock_gettime(CLOCK_MONOTONIC, &task_end);
// Update execution statistics
long exec_ns = (task_end.tv_sec - task_start.tv_sec) * 1000000000L +
(task_end.tv_nsec - task_start.tv_nsec);
atomic_fetch_add(&pool->worker_stats[worker_id].tasks_executed, 1);
atomic_fetch_add(&pool->worker_stats[worker_id].total_execution_time_ns, exec_ns);
atomic_fetch_add(&pool->total_tasks_completed, 1);
pool->worker_stats[worker_id].last_task_end = task_end;
free(task);
}
return NULL;
}
// Create thread pool
thread_pool_t* thread_pool_create(int num_threads, int min_threads, int max_threads,
bool enable_work_stealing, int num_priorities) {
thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));
if (!pool) {
return NULL;
}
pool->num_threads = num_threads;
pool->min_threads = min_threads;
pool->max_threads = max_threads;
atomic_init(&pool->active_threads, num_threads);
atomic_init(&pool->shutdown, false);
atomic_init(&pool->immediate_shutdown, false);
atomic_init(&pool->total_tasks_submitted, 0);
atomic_init(&pool->total_tasks_completed, 0);
    atomic_init(&pool->round_robin_index, 0);
    atomic_init(&pool->next_worker_id, 0);
clock_gettime(CLOCK_MONOTONIC, &pool->start_time);
// Initialize main task queue
if (task_queue_init(&pool->task_queue, num_priorities) != 0) {
free(pool);
return NULL;
}
// Initialize work-stealing queues
if (enable_work_stealing) {
pool->local_queues = calloc(max_threads, sizeof(task_queue_t));
pool->queue_locks = calloc(max_threads, sizeof(atomic_int));
if (!pool->local_queues || !pool->queue_locks) {
free(pool->local_queues);
free(pool->queue_locks);
free(pool);
return NULL;
}
for (int i = 0; i < max_threads; i++) {
task_queue_init(&pool->local_queues[i], num_priorities);
atomic_init(&pool->queue_locks[i], 0);
}
}
// Allocate threads and statistics
pool->threads = calloc(max_threads, sizeof(pthread_t));
pool->worker_stats = calloc(max_threads, sizeof(worker_stats_t));
if (!pool->threads || !pool->worker_stats) {
free(pool->threads);
free(pool->worker_stats);
free(pool);
return NULL;
}
// Initialize worker statistics
for (int i = 0; i < max_threads; i++) {
atomic_init(&pool->worker_stats[i].tasks_executed, 0);
atomic_init(&pool->worker_stats[i].total_execution_time_ns, 0);
atomic_init(&pool->worker_stats[i].idle_time_ns, 0);
pool->worker_stats[i].cpu_affinity = -1; // No affinity by default
}
pthread_mutex_init(&pool->resize_mutex, NULL);
// Create worker threads
for (int i = 0; i < num_threads; i++) {
if (pthread_create(&pool->threads[i], NULL, worker_thread, pool) != 0) {
thread_pool_destroy(pool);
return NULL;
}
}
return pool;
}
// Submit task to thread pool
int thread_pool_submit(thread_pool_t *pool, void (*function)(void*),
void *argument, int priority) {
if (atomic_load(&pool->shutdown)) {
return -1;
}
task_t *task = malloc(sizeof(task_t));
if (!task) {
return -1;
}
task->function = function;
task->argument = argument;
task->priority = priority;
task->next = NULL;
gettimeofday(&task->submit_time, NULL);
// Load balancing: distribute tasks among local queues
if (pool->local_queues) {
int target_queue = atomic_fetch_add(&pool->round_robin_index, 1) %
atomic_load(&pool->active_threads);
if (task_queue_push(&pool->local_queues[target_queue], task) == 0) {
atomic_fetch_add(&pool->total_tasks_submitted, 1);
return 0;
}
}
// Fallback to global queue
if (task_queue_push(&pool->task_queue, task) == 0) {
atomic_fetch_add(&pool->total_tasks_submitted, 1);
return 0;
}
free(task);
return -1;
}
// Set CPU affinity for worker thread
int thread_pool_set_affinity(thread_pool_t *pool, int worker_id, int cpu_id) {
if (worker_id < 0 || worker_id >= pool->max_threads) {
return -1;
}
pool->worker_stats[worker_id].cpu_affinity = cpu_id;
// Apply immediately if thread is running
if (worker_id < pool->num_threads) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);
return pthread_setaffinity_np(pool->threads[worker_id], sizeof(cpu_set_t), &cpuset);
}
return 0;
}
// Get thread pool statistics
void thread_pool_stats(thread_pool_t *pool) {
struct timespec current_time;
    clock_gettime(CLOCK_MONOTONIC, &current_time);
long uptime_ns = (current_time.tv_sec - pool->start_time.tv_sec) * 1000000000L +
(current_time.tv_nsec - pool->start_time.tv_nsec);
printf("=== Thread Pool Statistics ===\n");
printf("Uptime: %.3f seconds\n", uptime_ns / 1e9);
printf("Active threads: %d\n", atomic_load(&pool->active_threads));
printf("Tasks submitted: %ld\n", atomic_load(&pool->total_tasks_submitted));
printf("Tasks completed: %ld\n", atomic_load(&pool->total_tasks_completed));
printf("Tasks pending: %d\n", atomic_load(&pool->task_queue.size));
long total_tasks_executed = 0;
long total_execution_time = 0;
long total_idle_time = 0;
printf("\nPer-worker statistics:\n");
for (int i = 0; i < pool->num_threads; i++) {
long tasks = atomic_load(&pool->worker_stats[i].tasks_executed);
long exec_time = atomic_load(&pool->worker_stats[i].total_execution_time_ns);
long idle_time = atomic_load(&pool->worker_stats[i].idle_time_ns);
total_tasks_executed += tasks;
total_execution_time += exec_time;
total_idle_time += idle_time;
printf(" Worker %d: %ld tasks, %.3f ms avg exec, %.1f%% idle\n",
i, tasks,
tasks > 0 ? (exec_time / 1e6) / tasks : 0,
uptime_ns > 0 ? (idle_time * 100.0) / uptime_ns : 0);
}
printf("\nOverall performance:\n");
printf(" Total tasks executed: %ld\n", total_tasks_executed);
printf(" Average execution time: %.3f ms\n",
total_tasks_executed > 0 ? (total_execution_time / 1e6) / total_tasks_executed : 0);
printf(" Throughput: %.1f tasks/second\n",
uptime_ns > 0 ? (total_tasks_executed * 1e9) / uptime_ns : 0);
}
// Dynamic thread pool resizing
int thread_pool_resize(thread_pool_t *pool, int new_size) {
    if (new_size < pool->min_threads || new_size > pool->max_threads) {
        return -1;
    }
    pthread_mutex_lock(&pool->resize_mutex);
    int current_size = atomic_load(&pool->active_threads);
    if (new_size > current_size) {
        // Add threads.  Raise active_threads first so the new workers do
        // not see worker_id >= active_threads and exit immediately.  (This
        // assumes workers claim IDs in creation order, which holds for the
        // single-threaded resize calls in this demo.)
        atomic_store(&pool->next_worker_id, current_size);
        atomic_store(&pool->active_threads, new_size);
        for (int i = current_size; i < new_size; i++) {
            if (pthread_create(&pool->threads[i], NULL, worker_thread, pool) != 0) {
                atomic_store(&pool->active_threads, i);
                pthread_mutex_unlock(&pool->resize_mutex);
                return -1;
            }
        }
    } else if (new_size < current_size) {
        // Remove threads: workers with worker_id >= active_threads exit
        // at the top of their loop, so the joins below complete
        atomic_store(&pool->active_threads, new_size);
        for (int i = new_size; i < current_size; i++) {
            pthread_join(pool->threads[i], NULL);
        }
        atomic_store(&pool->next_worker_id, new_size);
    }
    pool->num_threads = new_size;
    pthread_mutex_unlock(&pool->resize_mutex);
    return 0;
}
// Destroy thread pool
void thread_pool_destroy(thread_pool_t *pool) {
if (!pool) return;
// Signal shutdown
atomic_store(&pool->shutdown, true);
// Wake up all threads
pthread_cond_broadcast(&pool->task_queue.condition);
// Wait for threads to finish
for (int i = 0; i < pool->num_threads; i++) {
pthread_join(pool->threads[i], NULL);
}
// Cleanup
pthread_mutex_destroy(&pool->task_queue.mutex);
pthread_cond_destroy(&pool->task_queue.condition);
pthread_mutex_destroy(&pool->resize_mutex);
// Free local queues
if (pool->local_queues) {
for (int i = 0; i < pool->max_threads; i++) {
pthread_mutex_destroy(&pool->local_queues[i].mutex);
pthread_cond_destroy(&pool->local_queues[i].condition);
free(pool->local_queues[i].queues);
}
free(pool->local_queues);
free(pool->queue_locks);
}
free(pool->task_queue.queues);
free(pool->threads);
free(pool->worker_stats);
free(pool);
}
// Example task functions
void cpu_intensive_task(void *arg) {
int iterations = *(int*)arg;
volatile double result = 0.0;
for (int i = 0; i < iterations; i++) {
result += i * 3.14159;
}
printf("CPU task completed: %d iterations, result: %f\n", iterations, result);
}
void io_simulation_task(void *arg) {
int delay_ms = *(int*)arg;
printf("IO simulation starting (%d ms delay)\n", delay_ms);
usleep(delay_ms * 1000);
printf("IO simulation completed\n");
}
// Thread pool demo
int thread_pool_demo(void) {
printf("=== Thread Pool Demo ===\n");
// Create thread pool with work stealing
thread_pool_t *pool = thread_pool_create(4, 2, 8, true, 3);
if (!pool) {
printf("Failed to create thread pool\n");
return -1;
}
// Set CPU affinity for workers
for (int i = 0; i < 4; i++) {
thread_pool_set_affinity(pool, i, i % 4);
}
printf("Created thread pool with 4 workers\n");
// Submit various tasks
int cpu_work[] = {1000000, 2000000, 500000, 1500000, 3000000};
int io_work[] = {100, 200, 50, 150, 300};
// Submit CPU-intensive tasks with different priorities
for (int i = 0; i < 5; i++) {
thread_pool_submit(pool, cpu_intensive_task, &cpu_work[i], 2); // High priority
}
// Submit I/O simulation tasks
for (int i = 0; i < 5; i++) {
thread_pool_submit(pool, io_simulation_task, &io_work[i], 1); // Medium priority
}
// Wait for some tasks to complete
sleep(2);
// Show statistics
thread_pool_stats(pool);
// Resize thread pool
printf("\nResizing thread pool to 6 workers...\n");
thread_pool_resize(pool, 6);
// Submit more tasks
for (int i = 0; i < 3; i++) {
thread_pool_submit(pool, cpu_intensive_task, &cpu_work[i], 1);
}
sleep(2);
// Final statistics
printf("\nFinal statistics:\n");
thread_pool_stats(pool);
// Cleanup
thread_pool_destroy(pool);
return 0;
}
int main(void) {
return thread_pool_demo();
}
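Assuming GCC or Clang on Linux (the affinity calls are glibc extensions guarded by _GNU_SOURCE above), the pool should build and run with something like:

gcc -O2 -pthread thread_pool.c -o thread_pool && ./thread_pool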
Lock-Free Data Structures and Algorithms
Advanced Lock-Free Implementations
// lockfree_advanced.c - Advanced lock-free data structures
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <assert.h>
// Hazard pointer system for memory management
#define MAX_THREADS 64
#define MAX_HAZARD_POINTERS 8
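// Hazard pointers let lock-free readers protect a node from being freed
// while they may still dereference it (the classic Maged Michael scheme).
// The protocol, in outline:
//   1. Read a shared pointer, publish it in one of your hazard slots,
//      then re-read the source and verify it is unchanged before use.
//   2. Before freeing a node, scan every thread's hazard slots; if the
//      node is protected, defer the free (this simplified version leaks
//      such nodes instead of keeping a retire list).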
typedef struct hazard_pointer {
_Atomic(void*) pointer;
atomic_bool active;
} hazard_pointer_t;
typedef struct hazard_pointer_record {
hazard_pointer_t hazards[MAX_HAZARD_POINTERS];
atomic_bool active;
pthread_t thread_id;
} hazard_pointer_record_t;
static hazard_pointer_record_t hazard_pointer_table[MAX_THREADS];
// Thread-local hazard pointer record
static __thread hazard_pointer_record_t* local_hazard_record = NULL;
// Get hazard pointer record for current thread
hazard_pointer_record_t* get_hazard_pointer_record(void) {
if (local_hazard_record) {
return local_hazard_record;
}
// Find existing record or create new one
for (int i = 0; i < MAX_THREADS; i++) {
if (!atomic_load(&hazard_pointer_table[i].active)) {
bool expected = false;
if (atomic_compare_exchange_strong(&hazard_pointer_table[i].active,
&expected, true)) {
hazard_pointer_table[i].thread_id = pthread_self();
local_hazard_record = &hazard_pointer_table[i];
// Initialize hazard pointers
for (int j = 0; j < MAX_HAZARD_POINTERS; j++) {
atomic_store(&hazard_pointer_table[i].hazards[j].pointer, NULL);
atomic_store(&hazard_pointer_table[i].hazards[j].active, false);
}
return local_hazard_record;
}
}
}
return NULL; // No available slots
}
// Set hazard pointer
void set_hazard_pointer(int index, void *pointer) {
hazard_pointer_record_t *record = get_hazard_pointer_record();
if (record && index < MAX_HAZARD_POINTERS) {
atomic_store(&record->hazards[index].pointer, pointer);
atomic_store(&record->hazards[index].active, true);
}
}
// Clear hazard pointer
void clear_hazard_pointer(int index) {
hazard_pointer_record_t *record = get_hazard_pointer_record();
if (record && index < MAX_HAZARD_POINTERS) {
atomic_store(&record->hazards[index].active, false);
atomic_store(&record->hazards[index].pointer, NULL);
}
}
// Check if pointer is protected by any hazard pointer
bool is_hazard_pointer(void *pointer) {
for (int i = 0; i < MAX_THREADS; i++) {
if (atomic_load(&hazard_pointer_table[i].active)) {
for (int j = 0; j < MAX_HAZARD_POINTERS; j++) {
if (atomic_load(&hazard_pointer_table[i].hazards[j].active) &&
atomic_load(&hazard_pointer_table[i].hazards[j].pointer) == pointer) {
return true;
}
}
}
}
return false;
}
// Lock-free queue with hazard pointers
typedef struct queue_node {
_Atomic(void*) data;
_Atomic(struct queue_node*) next;
} queue_node_t;
typedef struct {
_Atomic(queue_node_t*) head;
_Atomic(queue_node_t*) tail;
atomic_size_t size;
} lockfree_queue_t;
// Initialize lock-free queue
lockfree_queue_t* lockfree_queue_create(void) {
lockfree_queue_t *queue = malloc(sizeof(lockfree_queue_t));
if (!queue) return NULL;
queue_node_t *dummy = malloc(sizeof(queue_node_t));
if (!dummy) {
free(queue);
return NULL;
}
atomic_store(&dummy->data, NULL);
atomic_store(&dummy->next, NULL);
atomic_store(&queue->head, dummy);
atomic_store(&queue->tail, dummy);
atomic_store(&queue->size, 0);
return queue;
}
// Enqueue operation
bool lockfree_queue_enqueue(lockfree_queue_t *queue, void *data) {
queue_node_t *new_node = malloc(sizeof(queue_node_t));
if (!new_node) return false;
atomic_store(&new_node->data, data);
atomic_store(&new_node->next, NULL);
while (true) {
queue_node_t *tail = atomic_load(&queue->tail);
set_hazard_pointer(0, tail);
// Verify tail is still valid
if (tail != atomic_load(&queue->tail)) {
continue;
}
queue_node_t *next = atomic_load(&tail->next);
if (tail == atomic_load(&queue->tail)) {
if (next == NULL) {
// Try to link new node at end of list
if (atomic_compare_exchange_weak(&tail->next, &next, new_node)) {
// Try to swing tail to new node
atomic_compare_exchange_weak(&queue->tail, &tail, new_node);
atomic_fetch_add(&queue->size, 1);
clear_hazard_pointer(0);
return true;
}
} else {
// Try to swing tail to next node
atomic_compare_exchange_weak(&queue->tail, &tail, next);
}
}
}
}
// Dequeue operation
bool lockfree_queue_dequeue(lockfree_queue_t *queue, void **data) {
while (true) {
queue_node_t *head = atomic_load(&queue->head);
set_hazard_pointer(0, head);
// Verify head is still valid
if (head != atomic_load(&queue->head)) {
continue;
}
queue_node_t *tail = atomic_load(&queue->tail);
queue_node_t *next = atomic_load(&head->next);
set_hazard_pointer(1, next);
if (head == atomic_load(&queue->head)) {
if (head == tail) {
if (next == NULL) {
// Queue is empty
clear_hazard_pointer(0);
clear_hazard_pointer(1);
return false;
}
// Try to swing tail to next node
atomic_compare_exchange_weak(&queue->tail, &tail, next);
} else {
if (next == NULL) {
continue;
}
// Read data before CAS
*data = atomic_load(&next->data);
// Try to swing head to next node
if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
atomic_fetch_sub(&queue->size, 1);
                // Free the old dummy node only if no thread holds a hazard
                // pointer to it.  A production implementation would push
                // protected nodes onto a retire list and scan it later;
                // this simplified version leaks them instead.
                if (!is_hazard_pointer(head)) {
                    free(head);
                }
clear_hazard_pointer(0);
clear_hazard_pointer(1);
return true;
}
}
}
}
}
// Lock-free hash table
#define HASH_TABLE_SIZE 1024
#define HASH_LOAD_FACTOR 0.75
typedef struct hash_node {
atomic_uintptr_t key;
_Atomic(void*) value;
_Atomic(struct hash_node*) next;
atomic_bool deleted;
} hash_node_t;
typedef struct {
_Atomic(hash_node_t*) buckets[HASH_TABLE_SIZE];
atomic_size_t size;
atomic_size_t capacity;
} lockfree_hashtable_t;
// Hash function (a MurmurHash3-style finalizer that mixes the key's bits)
static size_t hash_function(uintptr_t key) {
key ^= key >> 16;
key *= 0x85ebca6b;
key ^= key >> 13;
key *= 0xc2b2ae35;
key ^= key >> 16;
return key % HASH_TABLE_SIZE;
}
// Create lock-free hash table
lockfree_hashtable_t* lockfree_hashtable_create(void) {
lockfree_hashtable_t *table = malloc(sizeof(lockfree_hashtable_t));
if (!table) return NULL;
for (int i = 0; i < HASH_TABLE_SIZE; i++) {
atomic_store(&table->buckets[i], NULL);
}
atomic_store(&table->size, 0);
atomic_store(&table->capacity, HASH_TABLE_SIZE);
return table;
}
// Insert key-value pair
bool lockfree_hashtable_insert(lockfree_hashtable_t *table, uintptr_t key, void *value) {
size_t bucket = hash_function(key);
hash_node_t *new_node = malloc(sizeof(hash_node_t));
if (!new_node) return false;
atomic_store(&new_node->key, key);
atomic_store(&new_node->value, value);
atomic_store(&new_node->deleted, false);
while (true) {
hash_node_t *head = atomic_load(&table->buckets[bucket]);
atomic_store(&new_node->next, head);
if (atomic_compare_exchange_weak(&table->buckets[bucket], &head, new_node)) {
atomic_fetch_add(&table->size, 1);
return true;
}
}
}
// Lookup value by key
bool lockfree_hashtable_lookup(lockfree_hashtable_t *table, uintptr_t key, void **value) {
    size_t bucket = hash_function(key);
    while (true) {
        hash_node_t *current = atomic_load(&table->buckets[bucket]);
        set_hazard_pointer(0, current);
        // Restart if the head changed before we protected it
        if (current != atomic_load(&table->buckets[bucket])) {
            continue;
        }
        while (current) {
            if (!atomic_load(&current->deleted) &&
                atomic_load(&current->key) == key) {
                *value = atomic_load(&current->value);
                clear_hazard_pointer(0);
                clear_hazard_pointer(1);
                return true;
            }
            // Hand-over-hand: protect the next node, then re-check the
            // link before following it
            hash_node_t *next = atomic_load(&current->next);
            set_hazard_pointer(1, next);
            if (next != atomic_load(&current->next)) {
                break;  // Link changed under us; restart from the head
            }
            current = next;
            set_hazard_pointer(0, current);
        }
        if (!current) {
            clear_hazard_pointer(0);
            clear_hazard_pointer(1);
            return false;
        }
    }
}
// Lock-free skip list
#define MAX_LEVEL 16
typedef struct skip_node {
atomic_long key;
_Atomic(void*) value;
atomic_int level;
_Atomic(struct skip_node*) forward[MAX_LEVEL];
atomic_bool deleted;
} skip_node_t;
typedef struct {
skip_node_t *header;
atomic_int max_level;
atomic_size_t size;
} lockfree_skiplist_t;
// Random level generation (geometric distribution with p = 1/2).  Note
// that rand() is not thread-safe; a real implementation would use
// rand_r() or a thread-local PRNG.
static int random_level(void) {
int level = 1;
while ((rand() & 0x1) && level < MAX_LEVEL) {
level++;
}
return level;
}
// Create skip list
lockfree_skiplist_t* lockfree_skiplist_create(void) {
lockfree_skiplist_t *list = malloc(sizeof(lockfree_skiplist_t));
if (!list) return NULL;
list->header = malloc(sizeof(skip_node_t));
if (!list->header) {
free(list);
return NULL;
}
atomic_store(&list->header->key, LONG_MIN);
atomic_store(&list->header->value, NULL);
atomic_store(&list->header->level, MAX_LEVEL);
atomic_store(&list->header->deleted, false);
for (int i = 0; i < MAX_LEVEL; i++) {
atomic_store(&list->header->forward[i], NULL);
}
atomic_store(&list->max_level, 1);
atomic_store(&list->size, 0);
return list;
}
// Insert into skip list
bool lockfree_skiplist_insert(lockfree_skiplist_t *list, long key, void *value) {
skip_node_t *update[MAX_LEVEL];
skip_node_t *current = list->header;
// Find position to insert
for (int i = atomic_load(&list->max_level) - 1; i >= 0; i--) {
while (true) {
            skip_node_t *next = atomic_load(&current->forward[i]);
if (!next || atomic_load(&next->key) >= key) {
break;
}
current = next;
}
update[i] = current;
}
    current = atomic_load(&current->forward[0]);
// Check if key already exists
    if (current && atomic_load(&current->key) == key &&
        !atomic_load(&current->deleted)) {
return false;
}
// Create new node
int level = random_level();
skip_node_t *new_node = malloc(sizeof(skip_node_t));
if (!new_node) return false;
atomic_store(&new_node->key, key);
atomic_store(&new_node->value, value);
atomic_store(&new_node->level, level);
atomic_store(&new_node->deleted, false);
// Update max level if necessary
if (level > atomic_load(&list->max_level)) {
for (int i = atomic_load(&list->max_level); i < level; i++) {
update[i] = list->header;
}
atomic_store(&list->max_level, level);
}
    // Link the node bottom-up.  The level-0 CAS is the linearization
    // point: if it fails, the node is not yet visible and can safely be
    // freed and retried.  Higher levels are best-effort index links; if
    // one of those CAS operations races, the level is simply skipped
    // (freeing an already-visible node here would leave dangling pointers).
    skip_node_t *next = atomic_load(&update[0]->forward[0]);
    atomic_store(&new_node->forward[0], next);
    if (!atomic_compare_exchange_strong(&update[0]->forward[0], &next, new_node)) {
        free(new_node);
        return lockfree_skiplist_insert(list, key, value);
    }
    for (int i = 1; i < level; i++) {
        skip_node_t *succ = atomic_load(&update[i]->forward[i]);
        atomic_store(&new_node->forward[i], succ);
        atomic_compare_exchange_strong(&update[i]->forward[i], &succ, new_node);
    }
atomic_fetch_add(&list->size, 1);
return true;
}
// Search in skip list
bool lockfree_skiplist_search(lockfree_skiplist_t *list, long key, void **value) {
skip_node_t *current = list->header;
for (int i = atomic_load(&list->max_level) - 1; i >= 0; i--) {
while (true) {
            skip_node_t *next = atomic_load(&current->forward[i]);
if (!next || atomic_load(&next->key) > key) {
break;
}
if (atomic_load(&next->key) == key && !atomic_load(&next->deleted)) {
*value = atomic_load(&next->value);
return true;
}
current = next;
}
}
return false;
}
// Performance testing
typedef struct {
void *data_structure;
int thread_id;
int operations;
int operation_type; // 0=insert, 1=lookup, 2=mixed
struct timespec start_time;
struct timespec end_time;
int successful_operations;
} test_thread_data_t;
void* queue_test_thread(void *arg) {
test_thread_data_t *data = (test_thread_data_t*)arg;
lockfree_queue_t *queue = (lockfree_queue_t*)data->data_structure;
clock_gettime(CLOCK_MONOTONIC, &data->start_time);
for (int i = 0; i < data->operations; i++) {
if (data->operation_type == 0) {
// Enqueue
int *value = malloc(sizeof(int));
*value = data->thread_id * 1000000 + i;
if (lockfree_queue_enqueue(queue, value)) {
data->successful_operations++;
}
} else if (data->operation_type == 1) {
// Dequeue
void *value;
if (lockfree_queue_dequeue(queue, &value)) {
data->successful_operations++;
free(value);
}
} else {
// Mixed operations
if (i % 2 == 0) {
int *value = malloc(sizeof(int));
*value = data->thread_id * 1000000 + i;
if (lockfree_queue_enqueue(queue, value)) {
data->successful_operations++;
}
} else {
void *value;
if (lockfree_queue_dequeue(queue, &value)) {
data->successful_operations++;
free(value);
}
}
}
}
clock_gettime(CLOCK_MONOTONIC, &data->end_time);
return NULL;
}
int benchmark_lockfree_structures(void) {
printf("=== Lock-Free Data Structure Benchmark ===\n");
const int num_threads = 8;
const int operations_per_thread = 100000;
// Test lock-free queue
printf("\nTesting lock-free queue:\n");
lockfree_queue_t *queue = lockfree_queue_create();
pthread_t threads[num_threads];
test_thread_data_t thread_data[num_threads];
// Mixed producers and consumers
for (int i = 0; i < num_threads; i++) {
thread_data[i].data_structure = queue;
thread_data[i].thread_id = i;
thread_data[i].operations = operations_per_thread;
thread_data[i].operation_type = (i < num_threads/2) ? 0 : 1; // Half producers, half consumers
thread_data[i].successful_operations = 0;
pthread_create(&threads[i], NULL, queue_test_thread, &thread_data[i]);
}
// Wait for completion
for (int i = 0; i < num_threads; i++) {
pthread_join(threads[i], NULL);
}
// Calculate results
long total_operations = 0;
double total_time = 0;
for (int i = 0; i < num_threads; i++) {
double thread_time = (thread_data[i].end_time.tv_sec - thread_data[i].start_time.tv_sec) +
(thread_data[i].end_time.tv_nsec - thread_data[i].start_time.tv_nsec) / 1e9;
total_operations += thread_data[i].successful_operations;
total_time += thread_time;
printf(" Thread %d: %d operations in %.3f seconds (%.0f ops/sec)\n",
i, thread_data[i].successful_operations, thread_time,
thread_data[i].successful_operations / thread_time);
}
double avg_time = total_time / num_threads;
printf(" Total successful operations: %ld\n", total_operations);
printf(" Average throughput: %.0f operations/second\n", total_operations / avg_time);
printf(" Queue final size: %zu\n", atomic_load(&queue->size));
return 0;
}
int main(void) {
srand(time(NULL));
return benchmark_lockfree_structures();
}
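The benchmark driver above only exercises the queue. As a quick check of the map structures, a minimal usage sketch is shown below; the helper name lockfree_maps_demo is ours, not part of the listing's API, and it could be called from main before the benchmark:

// Minimal usage sketch for the hash table and skip list (hypothetical helper)
void lockfree_maps_demo(void) {
    lockfree_hashtable_t *table = lockfree_hashtable_create();
    lockfree_skiplist_t *list = lockfree_skiplist_create();
    if (!table || !list) return;
    int payload = 42;
    void *found = NULL;
    // Hash table: insert key 7, then look it up
    lockfree_hashtable_insert(table, (uintptr_t)7, &payload);
    if (lockfree_hashtable_lookup(table, (uintptr_t)7, &found)) {
        printf("hash table: key 7 -> %d\n", *(int*)found);
    }
    // Skip list: insert key 100, then search for it
    lockfree_skiplist_insert(list, 100, &payload);
    if (lockfree_skiplist_search(list, 100, &found)) {
        printf("skip list: key 100 -> %d\n", *(int*)found);
    }
}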
Parallel Processing Frameworks
OpenMP and SIMD Vectorization
// parallel_frameworks.c - OpenMP and parallel processing examples
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <math.h>
#include <time.h>
#include <omp.h>
#include <immintrin.h>
// Matrix operations with OpenMP
typedef struct {
double *data;
int rows;
int cols;
} matrix_t;
// Create matrix
matrix_t* matrix_create(int rows, int cols) {
matrix_t *matrix = malloc(sizeof(matrix_t));
if (!matrix) return NULL;
matrix->data = aligned_alloc(32, rows * cols * sizeof(double));
if (!matrix->data) {
free(matrix);
return NULL;
}
matrix->rows = rows;
matrix->cols = cols;
return matrix;
}
// Initialize matrix with random values (rand() is not thread-safe, so
// calling it from a parallel region races on its internal state; that is
// tolerable for benchmark data, but real code should use rand_r() with
// per-thread seeds)
void matrix_random_init(matrix_t *matrix) {
#pragma omp parallel for
for (int i = 0; i < matrix->rows * matrix->cols; i++) {
matrix->data[i] = ((double)rand() / RAND_MAX) * 2.0 - 1.0;
}
}
// Matrix multiplication with OpenMP
matrix_t* matrix_multiply_openmp(const matrix_t *a, const matrix_t *b) {
if (a->cols != b->rows) {
return NULL;
}
matrix_t *result = matrix_create(a->rows, b->cols);
if (!result) return NULL;
#pragma omp parallel for collapse(2) schedule(dynamic)
for (int i = 0; i < a->rows; i++) {
for (int j = 0; j < b->cols; j++) {
double sum = 0.0;
#pragma omp simd reduction(+:sum)
for (int k = 0; k < a->cols; k++) {
sum += a->data[i * a->cols + k] * b->data[k * b->cols + j];
}
result->data[i * result->cols + j] = sum;
}
}
return result;
}
// Optimized matrix multiplication with blocking and AVX2/FMA vectorization.
// Assumes 32-byte-aligned rows, i.e. column counts divisible by 4 (true for
// the benchmark sizes below); compile with -mavx2 -mfma.
matrix_t* matrix_multiply_optimized(const matrix_t *a, const matrix_t *b) {
if (a->cols != b->rows) {
return NULL;
}
matrix_t *result = matrix_create(a->rows, b->cols);
if (!result) return NULL;
// Initialize result to zero
memset(result->data, 0, result->rows * result->cols * sizeof(double));
const int block_size = 64;
#pragma omp parallel for collapse(2) schedule(dynamic)
for (int ii = 0; ii < a->rows; ii += block_size) {
for (int jj = 0; jj < b->cols; jj += block_size) {
for (int kk = 0; kk < a->cols; kk += block_size) {
int i_max = (ii + block_size < a->rows) ? ii + block_size : a->rows;
int j_max = (jj + block_size < b->cols) ? jj + block_size : b->cols;
int k_max = (kk + block_size < a->cols) ? kk + block_size : a->cols;
for (int i = ii; i < i_max; i++) {
for (int j = jj; j < j_max; j += 4) {
__m256d sum = _mm256_setzero_pd();
for (int k = kk; k < k_max; k++) {
__m256d a_vec = _mm256_broadcast_sd(&a->data[i * a->cols + k]);
__m256d b_vec = _mm256_load_pd(&b->data[k * b->cols + j]);
sum = _mm256_fmadd_pd(a_vec, b_vec, sum);
}
__m256d old_result = _mm256_load_pd(&result->data[i * result->cols + j]);
__m256d new_result = _mm256_add_pd(old_result, sum);
_mm256_store_pd(&result->data[i * result->cols + j], new_result);
}
}
}
}
}
return result;
}
// Parallel algorithms demonstration
void parallel_algorithms_demo(void) {
printf("=== Parallel Algorithms Demonstration ===\n");
const int array_size = 10000000;
double *array = malloc(array_size * sizeof(double));
// Initialize array
#pragma omp parallel for
for (int i = 0; i < array_size; i++) {
array[i] = sin(i * 0.001) + cos(i * 0.002);
}
printf("Array size: %d elements\n", array_size);
printf("Number of threads: %d\n", omp_get_max_threads());
// Parallel reduction - sum
double start_time = omp_get_wtime();
double sum = 0.0;
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < array_size; i++) {
sum += array[i];
}
double end_time = omp_get_wtime();
printf("Parallel sum: %f (time: %.3f ms)\n",
sum, (end_time - start_time) * 1000);
// Parallel scan (prefix sum)
double *prefix_sum = malloc(array_size * sizeof(double));
start_time = omp_get_wtime();
    // Three-phase parallel scan: per-thread local scans, a serial pass
    // over the per-thread totals, then a parallel fix-up with the offsets
const int num_threads = omp_get_max_threads();
double *thread_sums = calloc(num_threads, sizeof(double));
// Phase 1: Local scan within each thread
#pragma omp parallel
{
int tid = omp_get_thread_num();
int chunk_size = array_size / num_threads;
int start = tid * chunk_size;
int end = (tid == num_threads - 1) ? array_size : start + chunk_size;
if (start < end) {
prefix_sum[start] = array[start];
for (int i = start + 1; i < end; i++) {
prefix_sum[i] = prefix_sum[i-1] + array[i];
}
thread_sums[tid] = prefix_sum[end-1];
}
}
// Phase 2: Compute thread offsets
for (int i = 1; i < num_threads; i++) {
thread_sums[i] += thread_sums[i-1];
}
// Phase 3: Add offsets to local results
#pragma omp parallel
{
int tid = omp_get_thread_num();
if (tid > 0) {
int chunk_size = array_size / num_threads;
int start = tid * chunk_size;
int end = (tid == num_threads - 1) ? array_size : start + chunk_size;
for (int i = start; i < end; i++) {
prefix_sum[i] += thread_sums[tid-1];
}
}
}
end_time = omp_get_wtime();
printf("Parallel prefix sum completed (time: %.3f ms)\n",
(end_time - start_time) * 1000);
// Parallel sort (merge sort)
start_time = omp_get_wtime();
// Create copy for sorting
double *sort_array = malloc(array_size * sizeof(double));
memcpy(sort_array, array, array_size * sizeof(double));
    // Parallel merge sort implementation (nested functions are a GCC
    // extension; portable C would define this at file scope)
    void parallel_merge_sort(double *arr, double *temp, int left, int right, int depth) {
if (left >= right) return;
        int mid = left + (right - left) / 2;  // Overflow-safe midpoint
if (depth > 0 && right - left > 1000) {
#pragma omp task
parallel_merge_sort(arr, temp, left, mid, depth - 1);
#pragma omp task
parallel_merge_sort(arr, temp, mid + 1, right, depth - 1);
#pragma omp taskwait
} else {
parallel_merge_sort(arr, temp, left, mid, 0);
parallel_merge_sort(arr, temp, mid + 1, right, 0);
}
// Merge
int i = left, j = mid + 1, k = left;
while (i <= mid && j <= right) {
if (arr[i] <= arr[j]) {
temp[k++] = arr[i++];
} else {
temp[k++] = arr[j++];
}
}
while (i <= mid) temp[k++] = arr[i++];
while (j <= right) temp[k++] = arr[j++];
for (int idx = left; idx <= right; idx++) {
arr[idx] = temp[idx];
}
}
double *temp_array = malloc(array_size * sizeof(double));
#pragma omp parallel
{
#pragma omp single
{
            int max_depth = (int)log2(omp_get_max_threads());
parallel_merge_sort(sort_array, temp_array, 0, array_size - 1, max_depth);
}
}
end_time = omp_get_wtime();
printf("Parallel merge sort completed (time: %.3f ms)\n",
(end_time - start_time) * 1000);
// Verify sort
bool sorted = true;
for (int i = 1; i < array_size && sorted; i++) {
if (sort_array[i] < sort_array[i-1]) {
sorted = false;
}
}
printf("Sort verification: %s\n", sorted ? "PASSED" : "FAILED");
// Cleanup
free(array);
free(prefix_sum);
free(sort_array);
free(temp_array);
free(thread_sums);
}
// Matrix benchmark
void matrix_benchmark(void) {
printf("\n=== Matrix Multiplication Benchmark ===\n");
const int sizes[] = {256, 512, 1024};
const int num_sizes = sizeof(sizes) / sizeof(sizes[0]);
for (int s = 0; s < num_sizes; s++) {
int size = sizes[s];
printf("\nMatrix size: %dx%d\n", size, size);
matrix_t *a = matrix_create(size, size);
matrix_t *b = matrix_create(size, size);
matrix_random_init(a);
matrix_random_init(b);
// Standard OpenMP multiplication
double start_time = omp_get_wtime();
matrix_t *result1 = matrix_multiply_openmp(a, b);
double openmp_time = omp_get_wtime() - start_time;
// Optimized multiplication
start_time = omp_get_wtime();
matrix_t *result2 = matrix_multiply_optimized(a, b);
double optimized_time = omp_get_wtime() - start_time;
// Calculate GFLOPS
double operations = 2.0 * size * size * size;
double openmp_gflops = operations / (openmp_time * 1e9);
double optimized_gflops = operations / (optimized_time * 1e9);
printf(" OpenMP: %.3f seconds (%.2f GFLOPS)\n", openmp_time, openmp_gflops);
printf(" Optimized: %.3f seconds (%.2f GFLOPS)\n", optimized_time, optimized_gflops);
printf(" Speedup: %.2fx\n", openmp_time / optimized_time);
// Cleanup
free(a->data); free(a);
free(b->data); free(b);
free(result1->data); free(result1);
free(result2->data); free(result2);
}
}
// OpenMP features demonstration
void openmp_features_demo(void) {
printf("\n=== OpenMP Features Demonstration ===\n");
// Task parallelism
printf("Task parallelism (Fibonacci):\n");
long fibonacci(int n) {
if (n < 2) return n;
if (n < 20) {
return fibonacci(n-1) + fibonacci(n-2);
}
long x, y;
#pragma omp task shared(x)
x = fibonacci(n-1);
#pragma omp task shared(y)
y = fibonacci(n-2);
#pragma omp taskwait
return x + y;
}
double start_time = omp_get_wtime();
long result;
#pragma omp parallel
{
#pragma omp single
{
result = fibonacci(40);
}
}
double end_time = omp_get_wtime();
printf(" Fibonacci(40) = %ld (time: %.3f seconds)\n",
result, end_time - start_time);
// Worksharing constructs
printf("\nWorksharing constructs:\n");
const int n = 1000;
int *array = malloc(n * sizeof(int));
// Parallel sections
#pragma omp parallel sections
{
#pragma omp section
{
printf(" Section 1: Initializing first half\n");
for (int i = 0; i < n/2; i++) {
array[i] = i * i;
}
}
#pragma omp section
{
printf(" Section 2: Initializing second half\n");
for (int i = n/2; i < n; i++) {
array[i] = i * i;
}
}
}
// Data environment
printf("\nData environment:\n");
int shared_var = 0;
int private_var = 10;
#pragma omp parallel firstprivate(private_var) shared(shared_var) num_threads(4)
{
int tid = omp_get_thread_num();
private_var += tid;
#pragma omp atomic
shared_var += private_var;
#pragma omp critical
{
printf(" Thread %d: private_var = %d\n", tid, private_var);
}
}
printf(" Final shared_var = %d\n", shared_var);
free(array);
}
int main(void) {
srand(time(NULL));
printf("OpenMP version: %d\n", _OPENMP);
printf("Max threads: %d\n\n", omp_get_max_threads());
parallel_algorithms_demo();
matrix_benchmark();
openmp_features_demo();
return 0;
}
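Assuming GCC on an x86-64 machine with AVX2 and FMA support, this file should build with something like:

gcc -O2 -fopenmp -mavx2 -mfma parallel_frameworks.c -o parallel_frameworks -lm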
Best Practices
- Thread Safety: Design data structures and algorithms to be thread-safe from the ground up
- Memory Management: Use hazard pointers or RCU for safe memory reclamation in lock-free code
- Load Balancing: Implement work-stealing and dynamic load balancing for optimal performance
- NUMA Awareness: Consider NUMA topology when designing parallel algorithms, keeping each thread's working set on the memory node it runs on (see the sketch after this list)
- Profiling: Use tools like Intel VTune or perf to identify concurrency bottlenecks
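To illustrate the NUMA point above, the sketch below allocates a buffer on the memory node of the CPU the calling thread runs on. It assumes Linux with libnuma installed (link with -lnuma); the helper name pin_local_buffer is ours, not a library function:

// numa_sketch.c - keep a worker's buffer on its local NUMA node
// (assumes Linux + libnuma; build with: gcc -O2 numa_sketch.c -o numa_sketch -lnuma)
#define _GNU_SOURCE  // For sched_getcpu()
#include <stdio.h>
#include <stdlib.h>
#include <sched.h>
#include <numa.h>
// Hypothetical helper: allocate size bytes on the NUMA node of the CPU
// this thread is currently running on, falling back to malloc when the
// machine has no NUMA support.
void* pin_local_buffer(size_t size) {
    if (numa_available() < 0) {
        return malloc(size);
    }
    int cpu = sched_getcpu();          // CPU this thread runs on
    int node = numa_node_of_cpu(cpu);  // Memory node of that CPU
    return numa_alloc_onnode(size, node);
}
int main(void) {
    size_t size = 1 << 20;
    void *buf = pin_local_buffer(size);
    printf("allocated %zu bytes near CPU %d\n", size, sched_getcpu());
    if (buf) {
        if (numa_available() >= 0) numa_free(buf, size);
        else free(buf);
    }
    return 0;
}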
Conclusion
Advanced concurrency and parallel programming requires deep understanding of hardware architecture, memory models, and synchronization techniques. From sophisticated thread pools and lock-free algorithms to parallel processing frameworks, these techniques enable building high-performance concurrent applications.
The future of parallel programming lies in heterogeneous computing, combining CPUs, GPUs, and specialized accelerators. By mastering these advanced concurrency techniques, developers can build applications that fully utilize modern computing resources and scale effectively across diverse hardware platforms.