Mastering POSIX Threads

POSIX threads (pthreads) form the backbone of multi-threaded programming on Linux. While creating threads is straightforward, building efficient, scalable, and correct multi-threaded applications requires a deep understanding of synchronization primitives, memory models, and performance characteristics. This guide explores advanced pthread patterns and optimization techniques used in production systems.

Thread Lifecycle and Management

Advanced Thread Creation

#define _GNU_SOURCE  // Required for CPU_SET and the *_np pthread extensions
#include <pthread.h>
#include <sched.h>
#include <stdio.h>   // snprintf
#include <sys/resource.h>

typedef struct {
    int thread_id;
    int cpu_affinity;
    size_t stack_size;
    void* (*work_function)(void*);
    void* work_data;
} thread_config_t;

int create_configured_thread(pthread_t* thread, thread_config_t* config) {
    pthread_attr_t attr;
    
    // Initialize attributes
    pthread_attr_init(&attr);
    
    // Set stack size
    if (config->stack_size > 0) {
        pthread_attr_setstacksize(&attr, config->stack_size);
    }
    
    // Set detach state (joinable is the default; made explicit here)
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    
    // Create thread
    int ret = pthread_create(thread, &attr, 
                            config->work_function, 
                            config->work_data);
    pthread_attr_destroy(&attr);
    
    if (ret != 0) {
        return ret;  // Creation failed; *thread is not valid
    }
    
    if (config->cpu_affinity >= 0) {
        // Pin the thread to a single CPU
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(config->cpu_affinity, &cpuset);
        
        pthread_setaffinity_np(*thread, sizeof(cpu_set_t), &cpuset);
    }
    
    // Set thread name for debugging (Linux limits it to 16 bytes, NUL included)
    char thread_name[16];
    snprintf(thread_name, sizeof(thread_name), "worker-%d", 
             config->thread_id);
    pthread_setname_np(*thread, thread_name);
    
    return 0;
}

// Thread-local storage for per-thread data
__thread int thread_local_id = -1;
__thread char thread_local_buffer[1024];

void* worker_thread(void* arg) {
    thread_config_t* config = (thread_config_t*)arg;
    thread_local_id = config->thread_id;
    
    // Set a real-time priority (SCHED_FIFO requires CAP_SYS_NICE or root,
    // so check the return value rather than assuming success)
    struct sched_param param = {
        .sched_priority = 10  // Valid range is 1-99 for real-time policies
    };
    if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &param) != 0) {
        // Unprivileged: continue with the default scheduling policy
    }
    
    // Thread work...
    
    return NULL;
}
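
A minimal usage sketch tying the pieces together; the main function, error handling, and join logic here are illustrative, not part of the original API:

int main(void) {
    thread_config_t config = {
        .thread_id = 0,
        .cpu_affinity = 0,          // Pin to CPU 0; use -1 to skip pinning
        .stack_size = 256 * 1024,   // 256 KiB stack
        .work_function = worker_thread,
    };
    config.work_data = &config;     // worker_thread expects its own config

    pthread_t tid;
    if (create_configured_thread(&tid, &config) != 0) {
        return 1;
    }
    pthread_join(tid, NULL);
    return 0;
}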

Thread Cancellation and Cleanup

#include <fcntl.h>    // open, O_RDONLY
#include <stdlib.h>   // malloc, free
#include <unistd.h>   // close, read, sleep

// Cleanup handlers for resource management
typedef struct {
    int fd;
    void* buffer;
    pthread_mutex_t* mutex;
} cleanup_data_t;

void cleanup_handler(void* arg) {
    cleanup_data_t* data = (cleanup_data_t*)arg;
    
    if (data->fd >= 0) {
        close(data->fd);
    }
    
    if (data->buffer) {
        free(data->buffer);
    }
    
    if (data->mutex) {
        pthread_mutex_unlock(data->mutex);
    }
}

void* cancellable_thread(void* arg) {
    cleanup_data_t cleanup = {
        .fd = -1,
        .buffer = NULL,
        .mutex = NULL
    };
    
    // Push cleanup handler
    pthread_cleanup_push(cleanup_handler, &cleanup);
    
    // Set cancellation state
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    
    // Allocate resources
    cleanup.buffer = malloc(4096);
    cleanup.fd = open("/tmp/data.txt", O_RDONLY);
    
    // Cancellation point
    pthread_testcancel();
    
    // Long-running operation with cancellation points
    // (the fd check also skips the loop entirely if open() failed)
    while (cleanup.fd >= 0) {
        char buf[256];
        ssize_t n = read(cleanup.fd, buf, sizeof(buf));  // Cancellation point
        
        if (n <= 0) break;
        
        // Process data...
        pthread_testcancel();  // Explicit cancellation point
    }
    
    // Pop the cleanup handler; the non-zero argument runs it on the way out
    pthread_cleanup_pop(1);
    
    return NULL;
}
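
Driving the cancellation from another thread looks like this; the function name and one-second delay are illustrative:

// Illustrative only: cancel the worker after one second
void cancel_example(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, cancellable_thread, NULL);

    sleep(1);
    pthread_cancel(tid);  // Deferred: takes effect at the next cancellation point

    void* status;
    pthread_join(tid, &status);
    if (status == PTHREAD_CANCELED) {
        // The cleanup handler already released the fd and buffer
    }
}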

Advanced Synchronization Primitives

Read-Write Locks with Priority

typedef struct {
    pthread_rwlock_t lock;
    pthread_mutex_t priority_mutex;
    int waiting_writers;
    int active_readers;
} priority_rwlock_t;

void priority_rwlock_init(priority_rwlock_t* rwl) {
    pthread_rwlock_init(&rwl->lock, NULL);
    pthread_mutex_init(&rwl->priority_mutex, NULL);
    rwl->waiting_writers = 0;
    rwl->active_readers = 0;
}

void priority_read_lock(priority_rwlock_t* rwl) {
    pthread_mutex_lock(&rwl->priority_mutex);
    
    // Wait while writers are queued (writer priority). The usleep()
    // polling keeps the example compact; production code would use a
    // condition variable instead of busy-waiting.
    while (rwl->waiting_writers > 0) {
        pthread_mutex_unlock(&rwl->priority_mutex);
        usleep(1000);  // Yield to writers
        pthread_mutex_lock(&rwl->priority_mutex);
    }
    
    rwl->active_readers++;
    pthread_mutex_unlock(&rwl->priority_mutex);
    
    pthread_rwlock_rdlock(&rwl->lock);
}

void priority_write_lock(priority_rwlock_t* rwl) {
    pthread_mutex_lock(&rwl->priority_mutex);
    rwl->waiting_writers++;
    pthread_mutex_unlock(&rwl->priority_mutex);
    
    pthread_rwlock_wrlock(&rwl->lock);
    
    pthread_mutex_lock(&rwl->priority_mutex);
    rwl->waiting_writers--;
    pthread_mutex_unlock(&rwl->priority_mutex);
}
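
For completeness, a minimal sketch of the matching unlock operations (not in the excerpt above); the read unlock also maintains the active_readers count the struct declares:

void priority_read_unlock(priority_rwlock_t* rwl) {
    pthread_rwlock_unlock(&rwl->lock);

    pthread_mutex_lock(&rwl->priority_mutex);
    rwl->active_readers--;
    pthread_mutex_unlock(&rwl->priority_mutex);
}

void priority_write_unlock(priority_rwlock_t* rwl) {
    pthread_rwlock_unlock(&rwl->lock);
}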

Condition Variables with Timeouts

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int value;
    int waiters;
} timed_event_t;

int wait_for_event_timeout(timed_event_t* event, int expected_value, 
                          int timeout_ms) {
    struct timespec ts;
    // pthread_cond_timedwait() measures the deadline against
    // CLOCK_REALTIME by default, so base the timeout on that clock
    clock_gettime(CLOCK_REALTIME, &ts);
    
    // Add timeout
    ts.tv_sec += timeout_ms / 1000;
    ts.tv_nsec += (timeout_ms % 1000) * 1000000;
    if (ts.tv_nsec >= 1000000000) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000;
    }
    
    pthread_mutex_lock(&event->mutex);
    event->waiters++;
    
    int ret = 0;
    while (event->value != expected_value && ret == 0) {
        ret = pthread_cond_timedwait(&event->cond, &event->mutex, &ts);
    }
    
    event->waiters--;
    int result = (event->value == expected_value) ? 0 : -1;
    pthread_mutex_unlock(&event->mutex);
    
    return result;
}

// Broadcast with predicate
void signal_event(timed_event_t* event, int new_value) {
    pthread_mutex_lock(&event->mutex);
    event->value = new_value;
    
    if (event->waiters > 0) {
        pthread_cond_broadcast(&event->cond);
    }
    
    pthread_mutex_unlock(&event->mutex);
}
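
A short usage sketch; the init helper is an assumption filled in for illustration, not part of the original excerpt:

// Illustrative init helper
void timed_event_init(timed_event_t* event) {
    pthread_mutex_init(&event->mutex, NULL);
    pthread_cond_init(&event->cond, NULL);
    event->value = 0;
    event->waiters = 0;
}

// Consumer: wait up to 500 ms for the event to reach state 1
//   if (wait_for_event_timeout(&event, 1, 500) == 0) { /* event fired */ }
// Producer:
//   signal_event(&event, 1);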

Lock-Free Programming

Compare-and-Swap Operations

#include <stdatomic.h>
#include <stdint.h>   // int64_t
#include <stdlib.h>   // malloc, free

typedef struct node {
    void* data;
    struct node* next;
} node_t;

typedef struct {
    _Atomic(node_t*) head;
    _Atomic(size_t) size;
} lockfree_stack_t;

void lockfree_push(lockfree_stack_t* stack, void* data) {
    node_t* new_node = malloc(sizeof(node_t));
    if (!new_node) return;  // Allocation failure; real code should report it
    new_node->data = data;
    
    node_t* head;
    do {
        head = atomic_load(&stack->head);
        new_node->next = head;
    } while (!atomic_compare_exchange_weak(&stack->head, &head, new_node));
    
    atomic_fetch_add(&stack->size, 1);
}

// WARNING: this textbook pop suffers from the classic ABA and
// use-after-free problem. Between loading head and the CAS, another
// thread may pop and free head, so reading head->next touches freed
// memory. Production code needs safe memory reclamation (hazard
// pointers, epochs/RCU) or must never return nodes to the allocator.
void* lockfree_pop(lockfree_stack_t* stack) {
    node_t* head;
    node_t* next;
    
    do {
        head = atomic_load(&stack->head);
        if (head == NULL) {
            return NULL;
        }
        next = head->next;  // Unsafe if another thread freed head (see above)
    } while (!atomic_compare_exchange_weak(&stack->head, &head, next));
    
    void* data = head->data;
    free(head);
    
    atomic_fetch_sub(&stack->size, 1);
    return data;
}
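
Usage is straightforward; a static initialization sketch (variable names are illustrative):

// Shared by any number of producer and consumer threads
lockfree_stack_t stack = { .head = NULL, .size = 0 };

// Producers: lockfree_push(&stack, item);
// Consumers: void* item = lockfree_pop(&stack);  // NULL when empty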

// Lock-free counter with backoff
typedef struct {
    _Atomic(int64_t) value;
    char padding[64 - sizeof(_Atomic(int64_t))];  // Prevent false sharing
} aligned_counter_t;

void increment_with_backoff(aligned_counter_t* counter) {
    int backoff = 1;
    
    while (1) {
        int64_t current = atomic_load_explicit(&counter->value, 
                                             memory_order_relaxed);
        
        if (atomic_compare_exchange_weak_explicit(&counter->value, 
                                                 &current, 
                                                 current + 1,
                                                 memory_order_release,
                                                 memory_order_relaxed)) {
            break;
        }
        
        // Exponential backoff to ease cache-line contention
        for (int i = 0; i < backoff; i++) {
            __builtin_ia32_pause();  // x86 PAUSE; other ISAs need their own hint
        }
        
        backoff = (backoff < 1024) ? backoff * 2 : backoff;
    }
}

Thread Pool Implementation

Work-Stealing Thread Pool

typedef struct work_item {
    void (*function)(void*);
    void* arg;
    struct work_item* next;
} work_item_t;

typedef struct {
    pthread_mutex_t mutex;
    work_item_t* head;
    work_item_t* tail;
    _Atomic(int) size;
} work_queue_t;

typedef struct {
    int num_threads;
    pthread_t* threads;
    work_queue_t* queues;  // Per-thread queues
    _Atomic(int) running;
    _Atomic(int) active_threads;
} thread_pool_t;

// Each worker receives its pool and slot index at creation time,
// avoiding any need for a pthread_getspecific() key lookup
typedef struct {
    thread_pool_t* pool;
    int thread_id;
} worker_arg_t;

void* worker_thread_steal(void* arg) {
    worker_arg_t* warg = (worker_arg_t*)arg;
    thread_pool_t* pool = warg->pool;
    int thread_id = warg->thread_id;
    work_queue_t* my_queue = &pool->queues[thread_id];
    
    while (atomic_load(&pool->running)) {
        work_item_t* item = NULL;
        
        // Try to get work from own queue
        pthread_mutex_lock(&my_queue->mutex);
        if (my_queue->head) {
            item = my_queue->head;
            my_queue->head = item->next;
            if (!my_queue->head) {
                my_queue->tail = NULL;
            }
            atomic_fetch_sub(&my_queue->size, 1);
        }
        pthread_mutex_unlock(&my_queue->mutex);
        
        // If no work, try to steal from others
        if (!item) {
            for (int i = 0; i < pool->num_threads && !item; i++) {
                if (i == thread_id) continue;
                
                work_queue_t* victim = &pool->queues[i];
                
                pthread_mutex_lock(&victim->mutex);
                if (atomic_load(&victim->size) > 1) {  // Leave some work
                    item = victim->head;
                    victim->head = item->next;
                    if (!victim->head) {
                        victim->tail = NULL;
                    }
                    atomic_fetch_sub(&victim->size, 1);
                }
                pthread_mutex_unlock(&victim->mutex);
            }
        }
        
        if (item) {
            atomic_fetch_add(&pool->active_threads, 1);
            item->function(item->arg);
            atomic_fetch_sub(&pool->active_threads, 1);
            free(item);
        } else {
            // No work available, sleep briefly
            usleep(1000);
        }
    }
    
    return NULL;
}

void thread_pool_submit(thread_pool_t* pool, 
                       void (*function)(void*), 
                       void* arg) {
    work_item_t* item = malloc(sizeof(work_item_t));
    item->function = function;
    item->arg = arg;
    item->next = NULL;
    
    // Simple round-robin distribution
    static _Atomic(int) next_queue = 0;
    int queue_id = atomic_fetch_add(&next_queue, 1) % pool->num_threads;
    work_queue_t* queue = &pool->queues[queue_id];
    
    pthread_mutex_lock(&queue->mutex);
    if (queue->tail) {
        queue->tail->next = item;
    } else {
        queue->head = item;
    }
    queue->tail = item;
    atomic_fetch_add(&queue->size, 1);
    pthread_mutex_unlock(&queue->mutex);
}
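
The excerpt above omits pool construction; here is a minimal sketch under the same types (this helper and its ownership of the worker_arg_t array are assumptions, not from the original):

thread_pool_t* thread_pool_create(int num_threads) {
    thread_pool_t* pool = calloc(1, sizeof(thread_pool_t));
    pool->num_threads = num_threads;
    pool->threads = calloc(num_threads, sizeof(pthread_t));
    pool->queues = calloc(num_threads, sizeof(work_queue_t));
    atomic_store(&pool->running, 1);

    // One worker_arg_t per worker; leaked here for brevity, free on shutdown
    worker_arg_t* args = calloc(num_threads, sizeof(worker_arg_t));

    for (int i = 0; i < num_threads; i++) {
        pthread_mutex_init(&pool->queues[i].mutex, NULL);
        args[i].pool = pool;
        args[i].thread_id = i;
        pthread_create(&pool->threads[i], NULL, worker_thread_steal, &args[i]);
    }

    return pool;
}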

Memory Ordering and Barriers

Memory Fence Examples

// Producer-consumer with memory barriers
typedef struct {
    _Atomic(int) sequence;
    void* data;
    char padding[64 - sizeof(int) - sizeof(void*)];
} seqlock_t;

// Seqlock writes assume a single writer (or an external writer mutex)
void seqlock_write(seqlock_t* lock, void* new_data) {
    int seq = atomic_load_explicit(&lock->sequence, memory_order_relaxed);
    
    // Increment sequence (make it odd: a writer is active)
    atomic_store_explicit(&lock->sequence, seq + 1, memory_order_relaxed);
    
    // Release fence: acts as a store-store barrier, so the data update
    // cannot become visible before the odd sequence does
    atomic_thread_fence(memory_order_release);
    
    // Update data
    lock->data = new_data;
    
    // Increment sequence again (make it even); the release store orders
    // the data update before the even sequence becomes visible
    atomic_store_explicit(&lock->sequence, seq + 2, memory_order_release);
}

void* seqlock_read(seqlock_t* lock) {
    void* data;
    int seq1, seq2;
    
    do {
        // Read sequence; if odd, a writer is mid-update, so retry
        seq1 = atomic_load_explicit(&lock->sequence, memory_order_acquire);
        if (seq1 & 1) {
            seq2 = seq1 + 1;  // Force the while() condition to retry
            continue;
        }
        
        // Read data (a formally racy plain read; the retry loop makes
        // torn values harmless because they are always discarded)
        data = lock->data;
        
        // Acquire fence: the data read cannot be reordered after the
        // second sequence load
        atomic_thread_fence(memory_order_acquire);
        
        // Check sequence again
        seq2 = atomic_load_explicit(&lock->sequence, memory_order_relaxed);
        
    } while (seq1 != seq2);  // Retry if a writer intervened
    
    return data;
}

Performance Optimization

Cache-Line Aware Programming

#define CACHE_LINE_SIZE 64
#define MAX_THREADS 64  // Capacity of the per-thread counter array below

// Aligned data structures to prevent false sharing
typedef struct {
    _Atomic(int64_t) counter;
    char padding[CACHE_LINE_SIZE - sizeof(_Atomic(int64_t))];
} __attribute__((aligned(CACHE_LINE_SIZE))) cache_aligned_counter_t;

typedef struct {
    // Read-mostly data together
    struct {
        void* config;
        int flags;
        char padding[CACHE_LINE_SIZE - sizeof(void*) - sizeof(int)];
    } __attribute__((aligned(CACHE_LINE_SIZE))) read_only;
    
    // Frequently written data on separate cache lines
    cache_aligned_counter_t counters[MAX_THREADS];
    
} cache_optimized_stats_t;

// NUMA-aware memory allocation. numa_alloc_onnode() comes from libnuma
// (#include <numa.h>, link with -lnuma, and check numa_available() at
// startup); USE_LIBNUMA is a project-local toggle, not a system macro.
void* numa_aware_alloc(size_t size, int numa_node) {
    void* ptr = NULL;
    
    #ifdef USE_LIBNUMA
    // Allocate on a specific NUMA node
    ptr = numa_alloc_onnode(size, numa_node);
    #else
    (void)numa_node;
    ptr = aligned_alloc(CACHE_LINE_SIZE, size);
    #endif
    
    return ptr;
}

Thread-Local Storage Optimization

// Fast thread-local allocation pools. The pool recycles blocks of one
// fixed size, so fast_alloc() and fast_free() must always be paired
// with the same size.
typedef struct {
    void* free_list;
    size_t allocated;
    size_t freed;
} tls_alloc_pool_t;

__thread tls_alloc_pool_t local_pool = {0};

void* fast_alloc(size_t size) {
    // Try the thread-local pool first; blocks must be at least
    // sizeof(void*) so a freed block can hold the free-list link
    if (local_pool.free_list) {
        void* ptr = local_pool.free_list;
        local_pool.free_list = *(void**)ptr;
        local_pool.allocated++;
        return ptr;
    }
    
    // Fall back to malloc
    return malloc(size);
}

void fast_free(void* ptr) {
    // Return to thread-local pool
    *(void**)ptr = local_pool.free_list;
    local_pool.free_list = ptr;
    local_pool.freed++;
}
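
Usage under the fixed-size constraint; the block size and function are illustrative:

// Illustrative: every allocation through this pool is 64 bytes
enum { BLOCK_SIZE = 64 };

void pool_example(void) {
    void* a = fast_alloc(BLOCK_SIZE);
    fast_free(a);                      // Block returns to this thread's pool
    void* b = fast_alloc(BLOCK_SIZE);  // Reuses the block just freed
    fast_free(b);
}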

Debugging Multi-threaded Applications

Thread Sanitizer Integration

// Annotations for thread sanitizer
#ifdef __has_feature
  #if __has_feature(thread_sanitizer)
    #define TSAN_ENABLED
  #endif
#endif

#ifdef TSAN_ENABLED
  void __tsan_acquire(void *addr);
  void __tsan_release(void *addr);
  
  #define ANNOTATE_HAPPENS_BEFORE(addr) __tsan_release(addr)
  #define ANNOTATE_HAPPENS_AFTER(addr) __tsan_acquire(addr)
#else
  #define ANNOTATE_HAPPENS_BEFORE(addr)
  #define ANNOTATE_HAPPENS_AFTER(addr)
#endif

// Custom synchronization with annotations
typedef struct {
    _Atomic(int) flag;
    void* data;
} custom_sync_t;

void custom_sync_publish(custom_sync_t* sync, void* data) {
    sync->data = data;
    ANNOTATE_HAPPENS_BEFORE(&sync->flag);
    atomic_store(&sync->flag, 1);
}

void* custom_sync_consume(custom_sync_t* sync) {
    while (atomic_load(&sync->flag) == 0) {
        sched_yield();  // pthread_yield() is deprecated; needs <sched.h>
    }
    ANNOTATE_HAPPENS_AFTER(&sync->flag);
    return sync->data;
}
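
To actually check these paths, build with the sanitizer enabled, e.g. gcc -fsanitize=thread -g -O1 app.c -lpthread (Clang takes the same flag; the filename here is a placeholder). TSan already understands C11 atomics; the annotations above mainly help it see through hand-rolled synchronization.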

Performance Profiling

typedef struct {
    struct timespec start;
    struct timespec end;
    const char* name;
} profile_section_t;

__thread profile_section_t prof_stack[100];
__thread int prof_depth = 0;

void prof_enter(const char* name) {
    prof_stack[prof_depth].name = name;
    clock_gettime(CLOCK_MONOTONIC, &prof_stack[prof_depth].start);
    prof_depth++;
}

void prof_exit(void) {
    prof_depth--;
    clock_gettime(CLOCK_MONOTONIC, &prof_stack[prof_depth].end);
    
    double elapsed = (prof_stack[prof_depth].end.tv_sec - 
                     prof_stack[prof_depth].start.tv_sec) +
                    (prof_stack[prof_depth].end.tv_nsec - 
                     prof_stack[prof_depth].start.tv_nsec) / 1e9;
    
    // pthread_t is opaque; the cast is for printing only
    printf("Thread %lu: %s took %.6f seconds\n",
           (unsigned long)pthread_self(),
           prof_stack[prof_depth].name, elapsed);
}

// The cleanup callback must be declared before the macro references it
void prof_exit_cleanup(int* unused) {
    (void)unused;
    prof_exit();
}

#define PROFILE(name) \
    prof_enter(name); \
    __attribute__((cleanup(prof_exit_cleanup))) int _prof_guard = 0

Real-World Patterns

Async Task System

#include <glib.h>  // GHashTable; build with pkg-config --cflags --libs glib-2.0

typedef struct {
    void (*callback)(void*, int);
    void* user_data;
} completion_handler_t;

typedef struct {
    void* (*task)(void*);
    void* arg;
    completion_handler_t completion;
} async_task_t;

typedef struct {
    thread_pool_t* pool;
    pthread_mutex_t completion_mutex;
    pthread_cond_t completion_cond;
    GHashTable* pending_tasks;  // task_id -> result
} async_executor_t;

// Wrapper to carry completion context; defined at file scope so that
// both async_execute() and async_task_wrapper() can see it
typedef struct {
    async_executor_t* executor;
    async_task_t task;
    int id;
} task_wrapper_t;

void async_task_wrapper(void* arg);  // Defined below

int async_execute(async_executor_t* executor, 
                 async_task_t* task,
                 int* task_id) {
    static _Atomic(int) next_id = 1;
    *task_id = atomic_fetch_add(&next_id, 1);
    
    task_wrapper_t* wrapper = malloc(sizeof(task_wrapper_t));
    wrapper->executor = executor;
    wrapper->task = *task;
    wrapper->id = *task_id;
    
    thread_pool_submit(executor->pool, async_task_wrapper, wrapper);
    
    return 0;
}

void async_task_wrapper(void* arg) {
    task_wrapper_t* wrapper = (task_wrapper_t*)arg;
    
    // Execute task
    void* result = wrapper->task.task(wrapper->task.arg);
    
    // Store result
    pthread_mutex_lock(&wrapper->executor->completion_mutex);
    g_hash_table_insert(wrapper->executor->pending_tasks,
                       GINT_TO_POINTER(wrapper->id),
                       result);
    pthread_cond_broadcast(&wrapper->executor->completion_cond);
    pthread_mutex_unlock(&wrapper->executor->completion_mutex);
    
    // Call completion handler
    if (wrapper->task.completion.callback) {
        wrapper->task.completion.callback(
            wrapper->task.completion.user_data,
            wrapper->id
        );
    }
    
    free(wrapper);
}

Conclusion

POSIX threads provide a powerful foundation for concurrent programming in Linux, but realizing their full potential requires understanding advanced patterns, synchronization primitives, and performance characteristics. From lock-free data structures to NUMA-aware optimizations, from work-stealing thread pools to custom synchronization primitives, the techniques covered here form the building blocks of high-performance multi-threaded applications.

The key to successful pthread programming lies in choosing the right synchronization primitive for each use case, understanding memory ordering requirements, and carefully considering cache effects and false sharing. By mastering these concepts and patterns, you can build concurrent applications that fully utilize modern multi-core processors while maintaining correctness and avoiding the pitfalls of parallel programming.