Advanced Process and Thread Management Techniques in Linux
Advanced process and thread management forms the backbone of high-performance enterprise systems. This comprehensive guide explores sophisticated techniques for controlling, orchestrating, and optimizing processes and threads in Linux environments, covering everything from low-level scheduling mechanisms to enterprise-scale process orchestration patterns.
Advanced Process Creation and Control
Modern Linux systems provide sophisticated mechanisms for process creation, control, and monitoring that go far beyond basic fork() and exec() operations.
Process Creation with Fine-Grained Control
#define _GNU_SOURCE
#include <sched.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/mount.h>
#include <sys/prctl.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
typedef struct {
pid_t pid;
int clone_flags;
cpu_set_t cpu_affinity;
int priority;
int nice_value;
// Resource limits
struct rlimit memory_limit;
struct rlimit cpu_limit;
struct rlimit file_limit;
// Namespace information
char *mount_namespace;
char *net_namespace;
char *pid_namespace;
char *user_namespace;
// Control structures
int control_pipe[2];
int status_pipe[2];
// Security context
uid_t uid;
gid_t gid;
char *security_label;
// Monitoring
struct timespec start_time;
struct rusage resource_usage;
} advanced_process_t;
// Stack for clone() system call
#define STACK_SIZE (1024 * 1024)
// Child process entry point with comprehensive setup
int child_process_main(void *arg) {
advanced_process_t *proc = (advanced_process_t*)arg;
// Set process title
prctl(PR_SET_NAME, "managed_process", 0, 0, 0);
// Set up death signal to parent
prctl(PR_SET_PDEATHSIG, SIGTERM);
// Apply CPU affinity
if (sched_setaffinity(0, sizeof(cpu_set_t), &proc->cpu_affinity) < 0) {
perror("Failed to set CPU affinity");
}
// Set scheduling priority
struct sched_param param;
param.sched_priority = proc->priority;
if (sched_setscheduler(0, SCHED_FIFO, ¶m) < 0) {
// Fallback to nice value
if (setpriority(PRIO_PROCESS, 0, proc->nice_value) < 0) {
perror("Failed to set process priority");
}
}
// Apply resource limits
setrlimit(RLIMIT_AS, &proc->memory_limit);
setrlimit(RLIMIT_CPU, &proc->cpu_limit);
setrlimit(RLIMIT_NOFILE, &proc->file_limit);
// Set user/group ID
if (proc->gid != 0 && setgid(proc->gid) < 0) {
perror("Failed to set GID");
return -1;
}
if (proc->uid != 0 && setuid(proc->uid) < 0) {
perror("Failed to set UID");
return -1;
}
// Signal readiness to parent
char ready_signal = 1;
write(proc->status_pipe[1], &ready_signal, 1);
close(proc->status_pipe[1]);
// Wait for start signal from parent
char start_signal;
read(proc->control_pipe[0], &start_signal, 1);
close(proc->control_pipe[0]);
// Execute main program logic here
// This would be replaced with actual application code
return 0;
}
// Create advanced process with full control
advanced_process_t* create_advanced_process(int clone_flags,
const cpu_set_t *cpu_affinity,
int priority,
const struct rlimit *limits) {
advanced_process_t *proc = calloc(1, sizeof(advanced_process_t));
if (!proc) return NULL;
proc->clone_flags = clone_flags;
if (cpu_affinity) {
proc->cpu_affinity = *cpu_affinity;
} else {
CPU_ZERO(&proc->cpu_affinity);
CPU_SET(0, &proc->cpu_affinity); // Default to CPU 0
}
proc->priority = priority;
proc->nice_value = 0;
// Set default resource limits
if (limits) {
proc->memory_limit = limits[0];
proc->cpu_limit = limits[1];
proc->file_limit = limits[2];
} else {
// Set reasonable defaults
proc->memory_limit.rlim_cur = 1024 * 1024 * 1024; // 1GB
proc->memory_limit.rlim_max = 2048 * 1024 * 1024; // 2GB
proc->cpu_limit.rlim_cur = 300; // 5 minutes
proc->cpu_limit.rlim_max = 600; // 10 minutes
proc->file_limit.rlim_cur = 1024;
proc->file_limit.rlim_max = 2048;
}
// Create communication pipes
if (pipe(proc->control_pipe) < 0 || pipe(proc->status_pipe) < 0) {
free(proc);
return NULL;
}
// Allocate stack for child
void *child_stack = malloc(STACK_SIZE);
if (!child_stack) {
close(proc->control_pipe[0]);
close(proc->control_pipe[1]);
close(proc->status_pipe[0]);
close(proc->status_pipe[1]);
free(proc);
return NULL;
}
// Create process using clone()
proc->pid = clone(child_process_main,
(char*)child_stack + STACK_SIZE,
proc->clone_flags | SIGCHLD,
proc);
if (proc->pid < 0) {
perror("Failed to create process");
free(child_stack);
close(proc->control_pipe[0]);
close(proc->control_pipe[1]);
close(proc->status_pipe[0]);
close(proc->status_pipe[1]);
free(proc);
return NULL;
}
// Close child ends of pipes
close(proc->control_pipe[0]);
close(proc->status_pipe[1]);
// Wait for child readiness
char ready_signal;
read(proc->status_pipe[0], &ready_signal, 1);
close(proc->status_pipe[0]);
clock_gettime(CLOCK_MONOTONIC, &proc->start_time);
return proc;
}
// Start the created process
void start_advanced_process(advanced_process_t *proc) {
char start_signal = 1;
write(proc->control_pipe[1], &start_signal, 1);
close(proc->control_pipe[1]);
}
// Monitor process resource usage
void update_process_stats(advanced_process_t *proc) {
if (getrusage(RUSAGE_CHILDREN, &proc->resource_usage) < 0) {
perror("Failed to get resource usage");
}
}
Container and Namespace Management
Linux namespaces provide powerful isolation mechanisms that are essential for modern container and security architectures.
Comprehensive Namespace Management
#include <sys/mount.h>
#include <sys/stat.h>
typedef struct {
char *name;
int type; // CLONE_NEWPID, CLONE_NEWNET, etc.
int fd; // File descriptor for the namespace
bool active;
} namespace_t;
typedef struct {
namespace_t namespaces[8]; // Support for all namespace types
int namespace_count;
char *container_root;
bool isolated;
// Network configuration for NEWNET
char *veth_pair[2];
char *bridge_name;
char *ip_address;
// Mount configuration for NEWNS
char **bind_mounts;
char **mount_targets;
int mount_count;
} namespace_manager_t;
// Initialize namespace manager
namespace_manager_t* create_namespace_manager(const char *container_name) {
namespace_manager_t *manager = calloc(1, sizeof(namespace_manager_t));
if (!manager) return NULL;
manager->container_root = malloc(256);
snprintf(manager->container_root, 256, "/var/lib/containers/%s", container_name);
// Create container directory
mkdir(manager->container_root, 0755);
return manager;
}
// Add namespace to management
int add_namespace(namespace_manager_t *manager, const char *name, int type) {
if (manager->namespace_count >= 8) return -1;
namespace_t *ns = &manager->namespaces[manager->namespace_count];
ns->name = strdup(name);
ns->type = type;
ns->fd = -1;
ns->active = false;
manager->namespace_count++;
return 0;
}
// Create isolated process with custom namespaces
pid_t create_namespaced_process(namespace_manager_t *manager,
int (*child_func)(void*),
void *child_arg) {
int clone_flags = SIGCHLD;
// Build clone flags based on configured namespaces
for (int i = 0; i < manager->namespace_count; i++) {
clone_flags |= manager->namespaces[i].type;
}
void *child_stack = malloc(STACK_SIZE);
if (!child_stack) return -1;
pid_t child_pid = clone(child_func,
(char*)child_stack + STACK_SIZE,
clone_flags,
child_arg);
if (child_pid > 0) {
// Save namespace file descriptors for later access
save_namespace_fds(manager, child_pid);
}
return child_pid;
}
// Save namespace file descriptors for management
void save_namespace_fds(namespace_manager_t *manager, pid_t pid) {
char ns_path[256];
for (int i = 0; i < manager->namespace_count; i++) {
const char *ns_name = get_namespace_name(manager->namespaces[i].type);
snprintf(ns_path, sizeof(ns_path), "/proc/%d/ns/%s", pid, ns_name);
manager->namespaces[i].fd = open(ns_path, O_RDONLY);
if (manager->namespaces[i].fd >= 0) {
manager->namespaces[i].active = true;
}
}
}
// Get namespace name from type
const char* get_namespace_name(int type) {
switch (type) {
case CLONE_NEWPID: return "pid";
case CLONE_NEWNET: return "net";
case CLONE_NEWNS: return "mnt";
case CLONE_NEWUTS: return "uts";
case CLONE_NEWIPC: return "ipc";
case CLONE_NEWUSER: return "user";
case CLONE_NEWCGROUP: return "cgroup";
default: return "unknown";
}
}
// Enter existing namespace
int enter_namespace(namespace_manager_t *manager, const char *ns_name) {
for (int i = 0; i < manager->namespace_count; i++) {
if (strcmp(manager->namespaces[i].name, ns_name) == 0 &&
manager->namespaces[i].active) {
if (setns(manager->namespaces[i].fd, manager->namespaces[i].type) < 0) {
perror("Failed to enter namespace");
return -1;
}
return 0;
}
}
return -1;
}
// Set up network namespace with veth pair
int setup_network_namespace(namespace_manager_t *manager) {
// This is a simplified version - real implementation would use netlink
char cmd[512];
// Create veth pair
snprintf(cmd, sizeof(cmd),
"ip link add %s type veth peer name %s",
manager->veth_pair[0], manager->veth_pair[1]);
system(cmd);
// Move one end to namespace
snprintf(cmd, sizeof(cmd),
"ip link set %s netns %d",
manager->veth_pair[1], getpid());
system(cmd);
// Configure IP address
if (manager->ip_address) {
snprintf(cmd, sizeof(cmd),
"ip addr add %s dev %s",
manager->ip_address, manager->veth_pair[1]);
system(cmd);
snprintf(cmd, sizeof(cmd),
"ip link set %s up",
manager->veth_pair[1]);
system(cmd);
}
return 0;
}
// Set up mount namespace with bind mounts
int setup_mount_namespace(namespace_manager_t *manager) {
// Create new mount namespace
if (unshare(CLONE_NEWNS) < 0) {
perror("Failed to create mount namespace");
return -1;
}
// Make all mounts private to prevent propagation
if (mount(NULL, "/", NULL, MS_REC | MS_PRIVATE, NULL) < 0) {
perror("Failed to make mounts private");
return -1;
}
// Set up bind mounts
for (int i = 0; i < manager->mount_count; i++) {
if (mount(manager->bind_mounts[i], manager->mount_targets[i],
NULL, MS_BIND, NULL) < 0) {
fprintf(stderr, "Failed to bind mount %s to %s\n",
manager->bind_mounts[i], manager->mount_targets[i]);
}
}
return 0;
}
Advanced Thread Management
Sophisticated thread management requires understanding of thread affinity, scheduling policies, and coordination mechanisms.
High-Performance Thread Pool Implementation
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
typedef struct task {
void (*function)(void *arg);
void *argument;
int priority;
struct timespec submit_time;
struct task *next;
} task_t;
typedef struct {
pthread_t thread_id;
int cpu_affinity;
bool active;
atomic_size_t tasks_completed;
atomic_size_t tasks_failed;
struct timespec total_execution_time;
// Thread-local statistics
double average_task_time;
size_t cache_misses;
size_t context_switches;
} worker_thread_t;
typedef struct {
worker_thread_t *workers;
int num_workers;
int max_workers;
int min_workers;
// Task queue with priority support
task_t *task_queue_head;
task_t *task_queue_tail;
task_t *high_priority_queue;
atomic_size_t queue_size;
size_t max_queue_size;
// Synchronization
pthread_mutex_t queue_mutex;
pthread_cond_t work_available;
pthread_cond_t queue_not_full;
// Thread pool control
atomic_bool shutdown;
atomic_bool immediate_shutdown;
// Adaptive scaling
atomic_size_t idle_workers;
atomic_size_t busy_workers;
time_t last_scale_time;
int scale_interval;
// Load balancing
int load_balance_strategy; // 0: round-robin, 1: least-loaded, 2: work-stealing
atomic_size_t next_worker;
// Performance monitoring
atomic_size_t total_tasks_submitted;
atomic_size_t total_tasks_completed;
atomic_size_t tasks_rejected;
double average_queue_wait_time;
} thread_pool_t;
// Create thread pool with advanced configuration
thread_pool_t* create_thread_pool(int initial_workers, int max_workers,
int max_queue_size, int load_balance_strategy) {
thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));
if (!pool) return NULL;
pool->num_workers = initial_workers;
pool->max_workers = max_workers;
pool->min_workers = initial_workers;
pool->max_queue_size = max_queue_size;
pool->load_balance_strategy = load_balance_strategy;
pool->scale_interval = 30; // 30 seconds
pool->workers = calloc(max_workers, sizeof(worker_thread_t));
if (!pool->workers) {
free(pool);
return NULL;
}
// Initialize synchronization primitives
pthread_mutex_init(&pool->queue_mutex, NULL);
pthread_cond_init(&pool->work_available, NULL);
pthread_cond_init(&pool->queue_not_full, NULL);
atomic_store(&pool->shutdown, false);
atomic_store(&pool->immediate_shutdown, false);
atomic_store(&pool->queue_size, 0);
atomic_store(&pool->idle_workers, 0);
atomic_store(&pool->busy_workers, 0);
atomic_store(&pool->next_worker, 0);
// Create initial worker threads
for (int i = 0; i < initial_workers; i++) {
create_worker_thread(pool, i);
}
return pool;
}
// Worker thread function with advanced features
void* worker_thread_function(void *arg) {
thread_pool_t *pool = (thread_pool_t*)arg;
int worker_id = atomic_fetch_add(&pool->next_worker, 1) % pool->max_workers;
worker_thread_t *worker = &pool->workers[worker_id];
worker->thread_id = pthread_self();
worker->active = true;
// Set CPU affinity if specified
if (worker->cpu_affinity >= 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(worker->cpu_affinity, &cpuset);
pthread_setaffinity_np(worker->thread_id, sizeof(cpu_set_t), &cpuset);
}
// Set thread name for debugging
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "worker_%d", worker_id);
pthread_setname_np(worker->thread_id, thread_name);
while (!atomic_load(&pool->shutdown)) {
task_t *task = NULL;
pthread_mutex_lock(&pool->queue_mutex);
// Wait for work or shutdown signal
while (pool->task_queue_head == NULL &&
pool->high_priority_queue == NULL &&
!atomic_load(&pool->shutdown)) {
atomic_fetch_add(&pool->idle_workers, 1);
pthread_cond_wait(&pool->work_available, &pool->queue_mutex);
atomic_fetch_sub(&pool->idle_workers, 1);
}
if (atomic_load(&pool->shutdown)) {
pthread_mutex_unlock(&pool->queue_mutex);
break;
}
// Dequeue task (high priority first)
if (pool->high_priority_queue) {
task = pool->high_priority_queue;
pool->high_priority_queue = task->next;
} else if (pool->task_queue_head) {
task = pool->task_queue_head;
pool->task_queue_head = task->next;
if (pool->task_queue_head == NULL) {
pool->task_queue_tail = NULL;
}
}
if (task) {
atomic_fetch_sub(&pool->queue_size, 1);
pthread_cond_signal(&pool->queue_not_full);
}
pthread_mutex_unlock(&pool->queue_mutex);
if (task) {
atomic_fetch_add(&pool->busy_workers, 1);
struct timespec start_time, end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
// Execute task
if (task->function) {
task->function(task->argument);
atomic_fetch_add(&worker->tasks_completed, 1);
atomic_fetch_add(&pool->total_tasks_completed, 1);
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
// Update statistics
double execution_time = (end_time.tv_sec - start_time.tv_sec) +
(end_time.tv_nsec - start_time.tv_nsec) / 1e9;
worker->average_task_time = (worker->average_task_time * 0.9) +
(execution_time * 0.1);
free(task);
atomic_fetch_sub(&pool->busy_workers, 1);
}
}
worker->active = false;
return NULL;
}
// Submit task with priority
int submit_task(thread_pool_t *pool, void (*function)(void*),
void *argument, int priority) {
if (atomic_load(&pool->shutdown)) {
return -1; // Pool is shutting down
}
task_t *task = malloc(sizeof(task_t));
if (!task) return -1;
task->function = function;
task->argument = argument;
task->priority = priority;
task->next = NULL;
clock_gettime(CLOCK_MONOTONIC, &task->submit_time);
pthread_mutex_lock(&pool->queue_mutex);
// Check queue size limit
while (atomic_load(&pool->queue_size) >= pool->max_queue_size &&
!atomic_load(&pool->shutdown)) {
pthread_cond_wait(&pool->queue_not_full, &pool->queue_mutex);
}
if (atomic_load(&pool->shutdown)) {
pthread_mutex_unlock(&pool->queue_mutex);
free(task);
return -1;
}
// Add to appropriate queue based on priority
if (priority > 0) {
// High priority queue (LIFO for high priority)
task->next = pool->high_priority_queue;
pool->high_priority_queue = task;
} else {
// Normal priority queue (FIFO)
if (pool->task_queue_tail) {
pool->task_queue_tail->next = task;
} else {
pool->task_queue_head = task;
}
pool->task_queue_tail = task;
}
atomic_fetch_add(&pool->queue_size, 1);
atomic_fetch_add(&pool->total_tasks_submitted, 1);
pthread_cond_signal(&pool->work_available);
pthread_mutex_unlock(&pool->queue_mutex);
// Trigger adaptive scaling check
check_adaptive_scaling(pool);
return 0;
}
// Adaptive thread pool scaling
void check_adaptive_scaling(thread_pool_t *pool) {
time_t current_time = time(NULL);
if (current_time - pool->last_scale_time < pool->scale_interval) {
return; // Too soon to scale
}
pool->last_scale_time = current_time;
size_t queue_size = atomic_load(&pool->queue_size);
size_t idle_workers = atomic_load(&pool->idle_workers);
size_t busy_workers = atomic_load(&pool->busy_workers);
// Scale up if queue is building up and we have capacity
if (queue_size > pool->num_workers * 2 &&
pool->num_workers < pool->max_workers &&
idle_workers < 2) {
printf("Scaling up thread pool: %d -> %d workers\n",
pool->num_workers, pool->num_workers + 1);
create_worker_thread(pool, pool->num_workers);
pool->num_workers++;
}
// Scale down if too many idle workers
else if (idle_workers > pool->num_workers / 2 &&
pool->num_workers > pool->min_workers &&
queue_size < pool->num_workers / 4) {
printf("Scaling down thread pool: %d -> %d workers\n",
pool->num_workers, pool->num_workers - 1);
// Signal a worker to exit (simplified)
pool->num_workers--;
}
}
// Create individual worker thread
void create_worker_thread(thread_pool_t *pool, int worker_index) {
worker_thread_t *worker = &pool->workers[worker_index];
// Set CPU affinity for NUMA optimization
worker->cpu_affinity = worker_index % sysconf(_SC_NPROCESSORS_ONLN);
pthread_create(&worker->thread_id, NULL, worker_thread_function, pool);
}
Inter-Process Communication (IPC) Mechanisms
Advanced IPC mechanisms are essential for coordinating complex multi-process applications.
High-Performance Shared Memory IPC
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/msg.h>
#include <mqueue.h>
// Shared memory ring buffer for high-throughput IPC
typedef struct {
atomic_size_t head;
atomic_size_t tail;
size_t size;
size_t mask; // size - 1, for power-of-2 sizes
char padding1[64]; // Cache line padding
// Producer/consumer control
atomic_bool producer_waiting;
atomic_bool consumer_waiting;
char padding2[64];
// Statistics
atomic_size_t messages_sent;
atomic_size_t messages_received;
atomic_size_t buffer_overruns;
// Data buffer follows
char data[];
} shm_ring_buffer_t;
typedef struct {
key_t shm_key;
int shm_id;
shm_ring_buffer_t *buffer;
size_t buffer_size;
size_t message_size;
// Synchronization
int sem_id;
// Process identification
pid_t producer_pid;
pid_t consumer_pid;
bool is_producer;
} shm_ipc_context_t;
// Create shared memory IPC context
shm_ipc_context_t* create_shm_ipc(key_t key, size_t buffer_size,
size_t message_size, bool is_producer) {
shm_ipc_context_t *ctx = malloc(sizeof(shm_ipc_context_t));
if (!ctx) return NULL;
ctx->shm_key = key;
ctx->buffer_size = buffer_size;
ctx->message_size = message_size;
ctx->is_producer = is_producer;
// Ensure buffer size is power of 2
size_t actual_size = 1;
while (actual_size < buffer_size) {
actual_size <<= 1;
}
size_t total_size = sizeof(shm_ring_buffer_t) +
(actual_size * message_size);
// Create or attach to shared memory
ctx->shm_id = shmget(key, total_size,
is_producer ? (IPC_CREAT | 0666) : 0);
if (ctx->shm_id < 0) {
perror("Failed to create/attach shared memory");
free(ctx);
return NULL;
}
ctx->buffer = (shm_ring_buffer_t*)shmat(ctx->shm_id, NULL, 0);
if (ctx->buffer == (void*)-1) {
perror("Failed to attach shared memory");
free(ctx);
return NULL;
}
// Initialize buffer if producer
if (is_producer) {
atomic_store(&ctx->buffer->head, 0);
atomic_store(&ctx->buffer->tail, 0);
ctx->buffer->size = actual_size;
ctx->buffer->mask = actual_size - 1;
atomic_store(&ctx->buffer->producer_waiting, false);
atomic_store(&ctx->buffer->consumer_waiting, false);
atomic_store(&ctx->buffer->messages_sent, 0);
atomic_store(&ctx->buffer->messages_received, 0);
atomic_store(&ctx->buffer->buffer_overruns, 0);
ctx->buffer->producer_pid = getpid();
} else {
ctx->buffer->consumer_pid = getpid();
}
// Create semaphore for synchronization
ctx->sem_id = semget(key + 1, 2,
is_producer ? (IPC_CREAT | 0666) : 0);
if (ctx->sem_id < 0) {
perror("Failed to create/access semaphore");
shmdt(ctx->buffer);
free(ctx);
return NULL;
}
// Initialize semaphores if producer
if (is_producer) {
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
} sem_union;
sem_union.val = 0;
semctl(ctx->sem_id, 0, SETVAL, sem_union); // Producer semaphore
semctl(ctx->sem_id, 1, SETVAL, sem_union); // Consumer semaphore
}
return ctx;
}
// Send message through shared memory ring buffer
int shm_send_message(shm_ipc_context_t *ctx, const void *message) {
if (!ctx->is_producer) return -1;
shm_ring_buffer_t *buffer = ctx->buffer;
size_t head = atomic_load(&buffer->head);
size_t tail = atomic_load(&buffer->tail);
// Check if buffer is full
if (((head + 1) & buffer->mask) == tail) {
atomic_fetch_add(&buffer->buffer_overruns, 1);
// Optionally wait for space
atomic_store(&buffer->producer_waiting, true);
struct sembuf sem_op = {1, -1, 0}; // Wait on consumer semaphore
if (semop(ctx->sem_id, &sem_op, 1) < 0) {
return -1;
}
atomic_store(&buffer->producer_waiting, false);
// Recheck after waiting
head = atomic_load(&buffer->head);
tail = atomic_load(&buffer->tail);
if (((head + 1) & buffer->mask) == tail) {
return -1; // Still full
}
}
// Copy message to buffer
char *slot = buffer->data + (head * ctx->message_size);
memcpy(slot, message, ctx->message_size);
// Memory barrier to ensure write completes before updating head
atomic_thread_fence(memory_order_release);
// Update head pointer
atomic_store(&buffer->head, (head + 1) & buffer->mask);
atomic_fetch_add(&buffer->messages_sent, 1);
// Signal consumer if waiting
if (atomic_load(&buffer->consumer_waiting)) {
struct sembuf sem_op = {0, 1, 0}; // Signal producer semaphore
semop(ctx->sem_id, &sem_op, 1);
}
return 0;
}
// Receive message from shared memory ring buffer
int shm_receive_message(shm_ipc_context_t *ctx, void *message) {
if (ctx->is_producer) return -1;
shm_ring_buffer_t *buffer = ctx->buffer;
size_t head = atomic_load(&buffer->head);
size_t tail = atomic_load(&buffer->tail);
// Check if buffer is empty
if (head == tail) {
// Optionally wait for data
atomic_store(&buffer->consumer_waiting, true);
struct sembuf sem_op = {0, -1, 0}; // Wait on producer semaphore
if (semop(ctx->sem_id, &sem_op, 1) < 0) {
return -1;
}
atomic_store(&buffer->consumer_waiting, false);
// Recheck after waiting
head = atomic_load(&buffer->head);
tail = atomic_load(&buffer->tail);
if (head == tail) {
return -1; // Still empty
}
}
// Memory barrier to ensure read happens after head check
atomic_thread_fence(memory_order_acquire);
// Copy message from buffer
char *slot = buffer->data + (tail * ctx->message_size);
memcpy(message, slot, ctx->message_size);
// Update tail pointer
atomic_store(&buffer->tail, (tail + 1) & buffer->mask);
atomic_fetch_add(&buffer->messages_received, 1);
// Signal producer if waiting
if (atomic_load(&buffer->producer_waiting)) {
struct sembuf sem_op = {1, 1, 0}; // Signal consumer semaphore
semop(ctx->sem_id, &sem_op, 1);
}
return 0;
}
// Message queue with priority support
typedef struct {
mqd_t mq_descriptor;
char queue_name[64];
struct mq_attr attributes;
bool is_sender;
// Statistics
atomic_size_t messages_sent;
atomic_size_t messages_received;
atomic_size_t send_failures;
atomic_size_t receive_failures;
} priority_mq_context_t;
priority_mq_context_t* create_priority_mq(const char *name,
int max_messages,
int message_size,
bool is_sender) {
priority_mq_context_t *ctx = malloc(sizeof(priority_mq_context_t));
if (!ctx) return NULL;
snprintf(ctx->queue_name, sizeof(ctx->queue_name), "/%s", name);
ctx->is_sender = is_sender;
// Set queue attributes
ctx->attributes.mq_flags = 0;
ctx->attributes.mq_maxmsg = max_messages;
ctx->attributes.mq_msgsize = message_size;
ctx->attributes.mq_curmsgs = 0;
// Open message queue
int flags = is_sender ? (O_WRONLY | O_CREAT) : O_RDONLY;
ctx->mq_descriptor = mq_open(ctx->queue_name, flags, 0644, &ctx->attributes);
if (ctx->mq_descriptor == (mqd_t)-1) {
perror("Failed to open message queue");
free(ctx);
return NULL;
}
atomic_store(&ctx->messages_sent, 0);
atomic_store(&ctx->messages_received, 0);
atomic_store(&ctx->send_failures, 0);
atomic_store(&ctx->receive_failures, 0);
return ctx;
}
// Send priority message
int send_priority_message(priority_mq_context_t *ctx,
const void *message,
size_t message_size,
unsigned int priority) {
if (!ctx->is_sender) return -1;
if (mq_send(ctx->mq_descriptor, (const char*)message,
message_size, priority) < 0) {
atomic_fetch_add(&ctx->send_failures, 1);
return -1;
}
atomic_fetch_add(&ctx->messages_sent, 1);
return 0;
}
// Receive priority message
ssize_t receive_priority_message(priority_mq_context_t *ctx,
void *message,
size_t buffer_size,
unsigned int *priority) {
if (ctx->is_sender) return -1;
ssize_t bytes_received = mq_receive(ctx->mq_descriptor, (char*)message,
buffer_size, priority);
if (bytes_received < 0) {
atomic_fetch_add(&ctx->receive_failures, 1);
return -1;
}
atomic_fetch_add(&ctx->messages_received, 1);
return bytes_received;
}
Resource Control and Monitoring
Advanced resource control using cgroups and comprehensive monitoring provides the foundation for enterprise-grade process management.
Cgroups Integration and Resource Limits
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
typedef struct {
char *cgroup_path;
char *cgroup_name;
pid_t managed_pids[1024];
int num_pids;
// Resource limits
uint64_t memory_limit;
uint64_t cpu_shares;
uint64_t cpu_quota;
uint64_t cpu_period;
uint64_t blkio_weight;
// Current usage
uint64_t memory_usage;
uint64_t cpu_usage;
uint64_t blkio_usage;
// Monitoring
bool monitoring_enabled;
pthread_t monitor_thread;
int monitor_interval;
} cgroup_manager_t;
// Create and configure cgroup
cgroup_manager_t* create_cgroup(const char *cgroup_name) {
cgroup_manager_t *manager = calloc(1, sizeof(cgroup_manager_t));
if (!manager) return NULL;
manager->cgroup_name = strdup(cgroup_name);
manager->cgroup_path = malloc(256);
// Create cgroup directory structure
snprintf(manager->cgroup_path, 256, "/sys/fs/cgroup/%s", cgroup_name);
// Create memory cgroup
char path[512];
snprintf(path, sizeof(path), "/sys/fs/cgroup/memory/%s", cgroup_name);
mkdir(path, 0755);
// Create CPU cgroup
snprintf(path, sizeof(path), "/sys/fs/cgroup/cpu/%s", cgroup_name);
mkdir(path, 0755);
// Create blkio cgroup
snprintf(path, sizeof(path), "/sys/fs/cgroup/blkio/%s", cgroup_name);
mkdir(path, 0755);
// Set default values
manager->memory_limit = 1024 * 1024 * 1024; // 1GB
manager->cpu_shares = 1024; // Default shares
manager->cpu_quota = 100000; // 100ms
manager->cpu_period = 100000; // 100ms period
manager->blkio_weight = 500; // Default weight
return manager;
}
// Apply resource limits to cgroup
int apply_cgroup_limits(cgroup_manager_t *manager) {
char path[512];
char value[64];
int fd;
// Set memory limit
snprintf(path, sizeof(path),
"/sys/fs/cgroup/memory/%s/memory.limit_in_bytes",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
snprintf(value, sizeof(value), "%lu", manager->memory_limit);
write(fd, value, strlen(value));
close(fd);
}
// Set CPU shares
snprintf(path, sizeof(path),
"/sys/fs/cgroup/cpu/%s/cpu.shares",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
snprintf(value, sizeof(value), "%lu", manager->cpu_shares);
write(fd, value, strlen(value));
close(fd);
}
// Set CPU quota
snprintf(path, sizeof(path),
"/sys/fs/cgroup/cpu/%s/cpu.cfs_quota_us",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
snprintf(value, sizeof(value), "%lu", manager->cpu_quota);
write(fd, value, strlen(value));
close(fd);
}
// Set CPU period
snprintf(path, sizeof(path),
"/sys/fs/cgroup/cpu/%s/cpu.cfs_period_us",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
snprintf(value, sizeof(value), "%lu", manager->cpu_period);
write(fd, value, strlen(value));
close(fd);
}
// Set blkio weight
snprintf(path, sizeof(path),
"/sys/fs/cgroup/blkio/%s/blkio.weight",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
snprintf(value, sizeof(value), "%lu", manager->blkio_weight);
write(fd, value, strlen(value));
close(fd);
}
return 0;
}
// Add process to cgroup
int add_process_to_cgroup(cgroup_manager_t *manager, pid_t pid) {
if (manager->num_pids >= 1024) return -1;
char path[512];
char pid_str[32];
int fd;
snprintf(pid_str, sizeof(pid_str), "%d", pid);
// Add to memory cgroup
snprintf(path, sizeof(path),
"/sys/fs/cgroup/memory/%s/cgroup.procs",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
write(fd, pid_str, strlen(pid_str));
close(fd);
}
// Add to CPU cgroup
snprintf(path, sizeof(path),
"/sys/fs/cgroup/cpu/%s/cgroup.procs",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
write(fd, pid_str, strlen(pid_str));
close(fd);
}
// Add to blkio cgroup
snprintf(path, sizeof(path),
"/sys/fs/cgroup/blkio/%s/cgroup.procs",
manager->cgroup_name);
fd = open(path, O_WRONLY);
if (fd >= 0) {
write(fd, pid_str, strlen(pid_str));
close(fd);
}
manager->managed_pids[manager->num_pids++] = pid;
return 0;
}
// Monitor cgroup resource usage
void* cgroup_monitor_thread(void *arg) {
cgroup_manager_t *manager = (cgroup_manager_t*)arg;
char path[512];
char buffer[1024];
FILE *fp;
while (manager->monitoring_enabled) {
// Read memory usage
snprintf(path, sizeof(path),
"/sys/fs/cgroup/memory/%s/memory.usage_in_bytes",
manager->cgroup_name);
fp = fopen(path, "r");
if (fp) {
if (fgets(buffer, sizeof(buffer), fp)) {
manager->memory_usage = strtoull(buffer, NULL, 10);
}
fclose(fp);
}
// Read CPU usage
snprintf(path, sizeof(path),
"/sys/fs/cgroup/cpu/%s/cpuacct.usage",
manager->cgroup_name);
fp = fopen(path, "r");
if (fp) {
if (fgets(buffer, sizeof(buffer), fp)) {
manager->cpu_usage = strtoull(buffer, NULL, 10);
}
fclose(fp);
}
// Check for memory pressure
if (manager->memory_usage > manager->memory_limit * 0.9) {
printf("Warning: Memory usage approaching limit for cgroup %s\n",
manager->cgroup_name);
}
sleep(manager->monitor_interval);
}
return NULL;
}
// Start monitoring
void start_cgroup_monitoring(cgroup_manager_t *manager, int interval_seconds) {
manager->monitor_interval = interval_seconds;
manager->monitoring_enabled = true;
pthread_create(&manager->monitor_thread, NULL,
cgroup_monitor_thread, manager);
}
// Generate resource usage report
void generate_cgroup_report(cgroup_manager_t *manager) {
printf("=== Cgroup Resource Report ===\n");
printf("Cgroup: %s\n", manager->cgroup_name);
printf("Managed processes: %d\n", manager->num_pids);
printf("\n--- Memory ---\n");
printf("Limit: %lu bytes (%.2f MB)\n",
manager->memory_limit, manager->memory_limit / 1048576.0);
printf("Usage: %lu bytes (%.2f MB)\n",
manager->memory_usage, manager->memory_usage / 1048576.0);
printf("Utilization: %.1f%%\n",
100.0 * manager->memory_usage / manager->memory_limit);
printf("\n--- CPU ---\n");
printf("Shares: %lu\n", manager->cpu_shares);
printf("Quota: %lu us\n", manager->cpu_quota);
printf("Period: %lu us\n", manager->cpu_period);
printf("Usage: %lu ns\n", manager->cpu_usage);
printf("\n--- Block I/O ---\n");
printf("Weight: %lu\n", manager->blkio_weight);
printf("=============================\n");
}
Conclusion
Advanced process and thread management techniques provide the foundation for building robust, scalable enterprise systems. The comprehensive strategies presented in this guide demonstrate how to leverage Linux’s sophisticated process control mechanisms, from fine-grained resource management through cgroups to high-performance inter-process communication and adaptive thread pool management.
Key principles for successful implementation include understanding the underlying kernel mechanisms, implementing robust error handling and monitoring, optimizing for specific workload characteristics, and maintaining comprehensive observability. By combining these techniques with proper resource isolation, security controls, and performance monitoring, developers can create systems that efficiently manage complex multi-process architectures while maintaining predictable performance characteristics under varying load conditions.
The patterns and implementations shown here form the basis for building sophisticated process orchestration systems, container runtime environments, and high-performance computing clusters that can scale to meet enterprise demands while providing the reliability and observability required for production deployment.