Inter-process communication (IPC) is fundamental to building scalable enterprise systems. This guide explores advanced IPC mechanisms, from high-performance shared memory implementations to priority-based message queues, enabling efficient communication between cooperating processes on the same host.

Advanced Shared Memory Programming

Section 1: High-Performance Shared Memory Architecture

Shared memory provides the fastest form of IPC by allowing multiple processes to access the same memory region directly, so data does not have to be copied through the kernel on every exchange.

Production-Grade Shared Memory Implementation

// shared_memory.c - Advanced shared memory management system
#include <sys/mman.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>

#define SHM_NAME_MAX 256
#define SHM_MAGIC 0xDEADBEEF
#define MAX_READERS 64
#define CACHE_LINE_SIZE 64

// Header stored at offset 0 of every shared memory region.
//
// shm_create() fills it in when it creates a region; shm_create() and
// shm_attach() check `magic` before trusting an existing one. The payload
// starts immediately after the header, at `data`.
struct shm_header {
    uint32_t magic;         // SHM_MAGIC once the header has been initialised
    uint32_t version;       // Layout version; shm_create() writes 1
    size_t total_size;      // Bytes in the whole region (header + payload, page-rounded)
    size_t data_size;       // Payload bytes requested by the creator
    pid_t creator_pid;      // getpid() of the creating process
    time_t creation_time;   // time(NULL) at creation

    // Reader-writer synchronization
    pthread_rwlock_t rwlock;            // Guards `data`; initialised process-shared
    pthread_rwlockattr_t rwlock_attr;   // Attribute object used to initialise rwlock

    // Statistics (updated with __atomic builtins)
    uint64_t read_count;    // Completed shm_read() calls
    uint64_t write_count;   // Completed shm_write() calls
    uint64_t reader_count;  // shm_read() calls currently copying data
    pid_t active_readers[MAX_READERS];  // PIDs of in-flight readers; 0 = free slot

    // Payload. A C99 flexible array member replaces the GNU zero-length
    // array `data[0]`; sizeof and the offset of `data` are unchanged.
    char data[];
} __attribute__((aligned(CACHE_LINE_SIZE)));

// Per-process handle for one mapped shared memory region.
// Allocated by shm_create()/shm_attach() and released by shm_destroy().
struct shm_control {
    int fd;                     // Descriptor returned by shm_open()
    void *addr;                 // Base address returned by mmap()
    size_t size;                // Length passed to mmap(); reused for munmap()
    char name[SHM_NAME_MAX];    // Copy of the object name, used by shm_unlink()
    struct shm_header *header;  // Same address as addr, typed as the region header
    bool is_creator;            // True when this handle created the object; shm_destroy() then unlinks it
    bool is_mapped;             // True once mmap() has succeeded
};

// Create shared memory region
struct shm_control *shm_create(const char *name, size_t data_size)
{
    struct shm_control *shm;
    size_t total_size;
    int fd;
    
    if (!name || data_size == 0) {
        errno = EINVAL;
        return NULL;
    }
    
    shm = calloc(1, sizeof(*shm));
    if (!shm) {
        return NULL;
    }
    
    // Calculate total size including header
    total_size = sizeof(struct shm_header) + data_size;
    total_size = (total_size + 4095) & ~4095;  // Page-align
    
    strncpy(shm->name, name, sizeof(shm->name) - 1);
    shm->size = total_size;
    
    // Create shared memory object
    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0666);
    if (fd == -1) {
        if (errno == EEXIST) {
            // Try to open existing
            fd = shm_open(name, O_RDWR, 0666);
            if (fd == -1) {
                free(shm);
                return NULL;
            }
            shm->is_creator = false;
        } else {
            free(shm);
            return NULL;
        }
    } else {
        shm->is_creator = true;
        
        // Set size for new shared memory
        if (ftruncate(fd, total_size) == -1) {
            close(fd);
            shm_unlink(name);
            free(shm);
            return NULL;
        }
    }
    
    shm->fd = fd;
    
    // Map shared memory
    shm->addr = mmap(NULL, total_size, PROT_READ | PROT_WRITE, 
                     MAP_SHARED, fd, 0);
    if (shm->addr == MAP_FAILED) {
        close(fd);
        if (shm->is_creator) {
            shm_unlink(name);
        }
        free(shm);
        return NULL;
    }
    
    shm->header = (struct shm_header *)shm->addr;
    shm->is_mapped = true;
    
    // Initialize header if creator
    if (shm->is_creator) {
        memset(shm->header, 0, sizeof(*shm->header));
        shm->header->magic = SHM_MAGIC;
        shm->header->version = 1;
        shm->header->total_size = total_size;
        shm->header->data_size = data_size;
        shm->header->creator_pid = getpid();
        shm->header->creation_time = time(NULL);
        
        // Initialize rwlock with process-shared attribute
        pthread_rwlockattr_init(&shm->header->rwlock_attr);
        pthread_rwlockattr_setpshared(&shm->header->rwlock_attr, 
                                     PTHREAD_PROCESS_SHARED);
        pthread_rwlock_init(&shm->header->rwlock, &shm->header->rwlock_attr);
        
        // Ensure data is written to memory
        msync(shm->addr, sizeof(*shm->header), MS_SYNC);
    } else {
        // Validate existing shared memory
        if (shm->header->magic != SHM_MAGIC) {
            munmap(shm->addr, total_size);
            close(fd);
            free(shm);
            errno = EINVAL;
            return NULL;
        }
    }
    
    return shm;
}

// Attach to existing shared memory
struct shm_control *shm_attach(const char *name)
{
    struct shm_control *shm;
    struct stat sb;
    int fd;
    
    if (!name) {
        errno = EINVAL;
        return NULL;
    }
    
    shm = calloc(1, sizeof(*shm));
    if (!shm) {
        return NULL;
    }
    
    strncpy(shm->name, name, sizeof(shm->name) - 1);
    
    // Open existing shared memory
    fd = shm_open(name, O_RDWR, 0);
    if (fd == -1) {
        free(shm);
        return NULL;
    }
    
    // Get size
    if (fstat(fd, &sb) == -1) {
        close(fd);
        free(shm);
        return NULL;
    }
    
    shm->fd = fd;
    shm->size = sb.st_size;
    shm->is_creator = false;
    
    // Map shared memory
    shm->addr = mmap(NULL, shm->size, PROT_READ | PROT_WRITE,
                     MAP_SHARED, fd, 0);
    if (shm->addr == MAP_FAILED) {
        close(fd);
        free(shm);
        return NULL;
    }
    
    shm->header = (struct shm_header *)shm->addr;
    shm->is_mapped = true;
    
    // Validate shared memory
    if (shm->header->magic != SHM_MAGIC) {
        munmap(shm->addr, shm->size);
        close(fd);
        free(shm);
        errno = EINVAL;
        return NULL;
    }
    
    return shm;
}

// Read data from shared memory with reader-writer synchronization
ssize_t shm_read(struct shm_control *shm, void *buf, size_t count, off_t offset)
{
    ssize_t bytes_read;
    pid_t pid = getpid();
    int reader_slot = -1;
    
    if (!shm || !buf || !shm->is_mapped) {
        errno = EINVAL;
        return -1;
    }
    
    if (offset + count > shm->header->data_size) {
        errno = EINVAL;
        return -1;
    }
    
    // Acquire read lock
    if (pthread_rwlock_rdlock(&shm->header->rwlock) != 0) {
        return -1;
    }
    
    // Find available reader slot
    for (int i = 0; i < MAX_READERS; i++) {
        if (__atomic_compare_exchange_n(&shm->header->active_readers[i], 
                                       &(pid_t){0}, pid, false,
                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
            reader_slot = i;
            break;
        }
    }
    
    // Update statistics
    __atomic_fetch_add(&shm->header->read_count, 1, __ATOMIC_RELAXED);
    __atomic_fetch_add(&shm->header->reader_count, 1, __ATOMIC_ACQUIRE);
    
    // Copy data
    memcpy(buf, shm->header->data + offset, count);
    bytes_read = count;
    
    // Release reader slot
    if (reader_slot >= 0) {
        __atomic_store_n(&shm->header->active_readers[reader_slot], 0, 
                         __ATOMIC_RELEASE);
    }
    
    __atomic_fetch_sub(&shm->header->reader_count, 1, __ATOMIC_RELEASE);
    
    // Release read lock
    pthread_rwlock_unlock(&shm->header->rwlock);
    
    return bytes_read;
}

// Write data to shared memory with synchronization
ssize_t shm_write(struct shm_control *shm, const void *buf, size_t count, off_t offset)
{
    ssize_t bytes_written;
    
    if (!shm || !buf || !shm->is_mapped) {
        errno = EINVAL;
        return -1;
    }
    
    if (offset + count > shm->header->data_size) {
        errno = EINVAL;
        return -1;
    }
    
    // Acquire write lock
    if (pthread_rwlock_wrlock(&shm->header->rwlock) != 0) {
        return -1;
    }
    
    // Update statistics
    __atomic_fetch_add(&shm->header->write_count, 1, __ATOMIC_RELAXED);
    
    // Copy data
    memcpy(shm->header->data + offset, buf, count);
    bytes_written = count;
    
    // Ensure data is written to memory
    msync(shm->header->data + offset, count, MS_ASYNC);
    
    // Release write lock
    pthread_rwlock_unlock(&shm->header->rwlock);
    
    return bytes_written;
}

// Return the address of the payload area for zero-copy access, or NULL when
// the handle is NULL or not mapped. No lock is taken here.
void *shm_get_data_ptr(struct shm_control *shm)
{
    const bool usable = (shm != NULL) && shm->is_mapped;

    if (usable) {
        return shm->header->data;
    }
    return NULL;
}

// Lock shared memory for exclusive access
int shm_lock_exclusive(struct shm_control *shm)
{
    if (!shm || !shm->is_mapped) {
        return -1;
    }
    
    return pthread_rwlock_wrlock(&shm->header->rwlock);
}

// Lock shared memory for shared access
int shm_lock_shared(struct shm_control *shm)
{
    if (!shm || !shm->is_mapped) {
        return -1;
    }
    
    return pthread_rwlock_rdlock(&shm->header->rwlock);
}

// Unlock shared memory
int shm_unlock(struct shm_control *shm)
{
    if (!shm || !shm->is_mapped) {
        return -1;
    }
    
    return pthread_rwlock_unlock(&shm->header->rwlock);
}

// Get shared memory statistics
void shm_get_stats(struct shm_control *shm, struct shm_stats *stats)
{
    if (!shm || !stats || !shm->is_mapped) {
        return;
    }
    
    stats->total_size = shm->header->total_size;
    stats->data_size = shm->header->data_size;
    stats->read_count = __atomic_load_n(&shm->header->read_count, __ATOMIC_RELAXED);
    stats->write_count = __atomic_load_n(&shm->header->write_count, __ATOMIC_RELAXED);
    stats->reader_count = __atomic_load_n(&shm->header->reader_count, __ATOMIC_ACQUIRE);
    stats->creator_pid = shm->header->creator_pid;
    stats->creation_time = shm->header->creation_time;
}

// Release a handle: unmap the region, close the descriptor, unlink the named
// object if this handle created it, then free the handle. NULL is ignored.
void shm_destroy(struct shm_control *shm)
{
    if (shm == NULL) {
        return;
    }

    if (shm->is_mapped) {
        munmap(shm->addr, shm->size);
    }

    if (!(shm->fd < 0)) {
        close(shm->fd);
    }

    // Only the creating handle removes the name.
    if (shm->is_creator) {
        shm_unlink(shm->name);
    }

    free(shm);
}

Section 2: Lock-Free Ring Buffer for Shared Memory

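Lock-free data structures in shared memory enable high-performance communication without blocking synchronization. The ring buffer below advances its head and tail indices with plain atomic loads and stores rather than compare-and-swap, so it is safe only with a single producer and a single consumer.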

High-Performance Lock-Free Ring Buffer

// lockfree_ringbuffer.c - Lock-free ring buffer for shared memory IPC
#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>

#define RING_BUFFER_MAGIC 0xFEEDFACE

// Ring buffer control block; the byte storage follows it at `data`.
//
// `head` and `tail` are free-running byte counters: they only ever increase
// and are reduced to a buffer position with `& mask`, which is why the
// capacity must be a power of two.
struct lockfree_ringbuffer {
    uint32_t magic;   // RING_BUFFER_MAGIC once ringbuf_init() has run
    size_t capacity;  // Size of `data` in bytes
    size_t mask;      // capacity - 1 (for power-of-2 sizes)

    // Producer index. The padding keeps head and tail at least
    // CACHE_LINE_SIZE bytes apart; the anonymous structs themselves carry no
    // explicit alignment.
    struct {
        atomic_size_t head;
        char padding1[CACHE_LINE_SIZE - sizeof(atomic_size_t)];
    };

    // Consumer index
    struct {
        atomic_size_t tail;
        char padding2[CACHE_LINE_SIZE - sizeof(atomic_size_t)];
    };

    // Statistics. <stdatomic.h> defines no `atomic_uint64_t` typedef, so the
    // counters use the _Atomic qualifier on uint64_t directly.
    _Atomic uint64_t enqueue_count;
    _Atomic uint64_t dequeue_count;
    _Atomic uint64_t enqueue_failures;
    _Atomic uint64_t dequeue_failures;

    // Data buffer follows. A C99 flexible array member replaces the GNU
    // zero-length array `data[0]`.
    char data[];
} __attribute__((aligned(CACHE_LINE_SIZE)));

// Header written in front of every payload stored in the ring buffer.
// Packed, so it occupies exactly 24 bytes with no padding.
struct message_header {
    uint32_t magic;      // MESSAGE_MAGIC; checked on dequeue/peek to detect corruption
    uint32_t size;       // Payload length in bytes (header not included)
    uint64_t sequence;   // Value of enqueue_count when the message was queued
    uint64_t timestamp;  // Result of get_time_ns() when the message was queued
} __attribute__((packed));

#define MESSAGE_MAGIC 0xCAFEBABE

// Prepare a ring buffer control block for use.
// `capacity` is the size of the data area in bytes and must be a non-zero
// power of two. Returns 0 on success, -1 on a NULL pointer or bad capacity.
int ringbuf_init(struct lockfree_ringbuffer *rb, size_t capacity)
{
    const bool power_of_two = (capacity != 0) && ((capacity & (capacity - 1)) == 0);

    if (rb == NULL || !power_of_two) {
        return -1;  // Capacity must be power of 2
    }

    rb->magic = RING_BUFFER_MAGIC;
    rb->capacity = capacity;
    rb->mask = capacity - 1;

    // Both indices start at zero: the buffer is empty.
    atomic_init(&rb->head, 0);
    atomic_init(&rb->tail, 0);

    // Reset statistics
    atomic_init(&rb->enqueue_count, 0);
    atomic_init(&rb->dequeue_count, 0);
    atomic_init(&rb->enqueue_failures, 0);
    atomic_init(&rb->dequeue_failures, 0);

    return 0;
}

// Number of bytes needed to hold the control block plus `capacity` data bytes.
size_t ringbuf_size(size_t capacity)
{
    const size_t control_bytes = sizeof(struct lockfree_ringbuffer);

    return control_bytes + capacity;
}

// Enqueue data (producer)
int ringbuf_enqueue(struct lockfree_ringbuffer *rb, const void *data, size_t size)
{
    if (!rb || !data || size == 0 || rb->magic != RING_BUFFER_MAGIC) {
        return -1;
    }
    
    struct message_header header = {
        .magic = MESSAGE_MAGIC,
        .size = size,
        .sequence = atomic_load(&rb->enqueue_count),
        .timestamp = get_time_ns()
    };
    
    size_t total_size = sizeof(header) + size;
    size_t current_head = atomic_load_explicit(&rb->head, memory_order_relaxed);
    size_t current_tail = atomic_load_explicit(&rb->tail, memory_order_acquire);
    
    // Check if there's enough space
    size_t available = rb->capacity - (current_head - current_tail);
    if (available < total_size) {
        atomic_fetch_add(&rb->enqueue_failures, 1);
        return -1;  // Buffer full
    }
    
    // Write header
    size_t pos = current_head & rb->mask;
    if (pos + sizeof(header) <= rb->capacity) {
        memcpy(&rb->data[pos], &header, sizeof(header));
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(&rb->data[pos], &header, first_part);
        memcpy(&rb->data[0], (char *)&header + first_part, sizeof(header) - first_part);
    }
    
    // Write data
    pos = (current_head + sizeof(header)) & rb->mask;
    if (pos + size <= rb->capacity) {
        memcpy(&rb->data[pos], data, size);
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(&rb->data[pos], data, first_part);
        memcpy(&rb->data[0], (char *)data + first_part, size - first_part);
    }
    
    // Update head pointer
    atomic_store_explicit(&rb->head, current_head + total_size, memory_order_release);
    atomic_fetch_add(&rb->enqueue_count, 1);
    
    return 0;
}

// Dequeue data (consumer)
int ringbuf_dequeue(struct lockfree_ringbuffer *rb, void *data, size_t *size)
{
    if (!rb || !data || !size || rb->magic != RING_BUFFER_MAGIC) {
        return -1;
    }
    
    size_t current_tail = atomic_load_explicit(&rb->tail, memory_order_relaxed);
    size_t current_head = atomic_load_explicit(&rb->head, memory_order_acquire);
    
    // Check if data is available
    if (current_tail == current_head) {
        atomic_fetch_add(&rb->dequeue_failures, 1);
        return -1;  // Buffer empty
    }
    
    // Read message header
    struct message_header header;
    size_t pos = current_tail & rb->mask;
    
    if (pos + sizeof(header) <= rb->capacity) {
        memcpy(&header, &rb->data[pos], sizeof(header));
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(&header, &rb->data[pos], first_part);
        memcpy((char *)&header + first_part, &rb->data[0], sizeof(header) - first_part);
    }
    
    // Validate header
    if (header.magic != MESSAGE_MAGIC) {
        atomic_fetch_add(&rb->dequeue_failures, 1);
        return -1;  // Corrupted data
    }
    
    // Check if caller's buffer is large enough
    if (*size < header.size) {
        *size = header.size;
        return -1;  // Buffer too small
    }
    
    // Read data
    pos = (current_tail + sizeof(header)) & rb->mask;
    if (pos + header.size <= rb->capacity) {
        memcpy(data, &rb->data[pos], header.size);
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(data, &rb->data[pos], first_part);
        memcpy((char *)data + first_part, &rb->data[0], header.size - first_part);
    }
    
    *size = header.size;
    
    // Update tail pointer
    size_t total_size = sizeof(header) + header.size;
    atomic_store_explicit(&rb->tail, current_tail + total_size, memory_order_release);
    atomic_fetch_add(&rb->dequeue_count, 1);
    
    return 0;
}

// Peek at next message without removing it
int ringbuf_peek(struct lockfree_ringbuffer *rb, void *data, size_t *size)
{
    if (!rb || !data || !size || rb->magic != RING_BUFFER_MAGIC) {
        return -1;
    }
    
    size_t current_tail = atomic_load_explicit(&rb->tail, memory_order_relaxed);
    size_t current_head = atomic_load_explicit(&rb->head, memory_order_acquire);
    
    // Check if data is available
    if (current_tail == current_head) {
        return -1;  // Buffer empty
    }
    
    // Read message header
    struct message_header header;
    size_t pos = current_tail & rb->mask;
    
    if (pos + sizeof(header) <= rb->capacity) {
        memcpy(&header, &rb->data[pos], sizeof(header));
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(&header, &rb->data[pos], first_part);
        memcpy((char *)&header + first_part, &rb->data[0], sizeof(header) - first_part);
    }
    
    // Validate header
    if (header.magic != MESSAGE_MAGIC) {
        return -1;  // Corrupted data
    }
    
    // Check if caller's buffer is large enough
    if (*size < header.size) {
        *size = header.size;
        return -1;  // Buffer too small
    }
    
    // Read data
    pos = (current_tail + sizeof(header)) & rb->mask;
    if (pos + header.size <= rb->capacity) {
        memcpy(data, &rb->data[pos], header.size);
    } else {
        // Wrap around
        size_t first_part = rb->capacity - pos;
        memcpy(data, &rb->data[pos], first_part);
        memcpy((char *)data + first_part, &rb->data[0], header.size - first_part);
    }
    
    *size = header.size;
    return 0;
}

// Fill `stats` with the buffer's capacity, current usage and counters.
// Does nothing when either pointer is NULL or the buffer is not initialised.
//
// head and tail cannot be read as one atomic pair, so the usage figures are a
// best-effort snapshot while the producer and consumer are running.
void ringbuf_stats(struct lockfree_ringbuffer *rb, struct ringbuf_stats *stats)
{
    if (!rb || !stats || rb->magic != RING_BUFFER_MAGIC) {
        return;
    }

    // Read tail before head. Both only ever advance and head never trails
    // tail, so a head read later than tail gives a non-negative difference.
    // The previous order (head first) could see tail overtake the stale head
    // and wrap used_bytes to a huge value.
    size_t tail = atomic_load(&rb->tail);
    size_t head = atomic_load(&rb->head);
    size_t used = head - tail;

    // A stale tail can overstate usage; clamp so free_bytes cannot wrap.
    if (used > rb->capacity) {
        used = rb->capacity;
    }

    stats->capacity = rb->capacity;
    stats->used_bytes = used;
    stats->free_bytes = rb->capacity - used;
    stats->enqueue_count = atomic_load(&rb->enqueue_count);
    stats->dequeue_count = atomic_load(&rb->dequeue_count);
    stats->enqueue_failures = atomic_load(&rb->enqueue_failures);
    stats->dequeue_failures = atomic_load(&rb->dequeue_failures);
}

// Report whether no bytes are queued (head equals tail).
// A NULL or uninitialised buffer is reported as empty.
bool ringbuf_is_empty(struct lockfree_ringbuffer *rb)
{
    if (rb != NULL && rb->magic == RING_BUFFER_MAGIC) {
        const size_t write_idx = atomic_load_explicit(&rb->head, memory_order_acquire);
        const size_t read_idx = atomic_load_explicit(&rb->tail, memory_order_relaxed);

        return write_idx == read_idx;
    }

    return true;
}

// Report whether the queued bytes have reached the buffer's capacity.
// A NULL or uninitialised buffer is reported as full.
bool ringbuf_is_full(struct lockfree_ringbuffer *rb)
{
    if (rb != NULL && rb->magic == RING_BUFFER_MAGIC) {
        const size_t write_idx = atomic_load_explicit(&rb->head, memory_order_relaxed);
        const size_t read_idx = atomic_load_explicit(&rb->tail, memory_order_acquire);
        const size_t in_use = write_idx - read_idx;

        return !(in_use < rb->capacity);
    }

    return true;
}

Advanced Message Queue Systems

Section 3: POSIX Message Queues with Priority Handling

POSIX message queues provide reliable, priority-based message delivery between processes with built-in blocking and non-blocking semantics.

Enterprise Message Queue Implementation

// message_queue.c - Advanced POSIX message queue wrapper
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>

#define MQ_NAME_MAX 256
#define MQ_DEFAULT_MAXMSG 100
#define MQ_DEFAULT_MSGSIZE 8192

// Caller-supplied settings for mq_create().
struct mq_config {
    char name[MQ_NAME_MAX];  // Queue name passed to mq_open(); must not be empty
    long maxmsg;             // Queue depth; <= 0 selects MQ_DEFAULT_MAXMSG
    long msgsize;            // Maximum message size in bytes; <= 0 selects MQ_DEFAULT_MSGSIZE
    int flags;  // O_CREAT, O_EXCL, O_RDONLY, O_WRONLY, O_RDWR, O_NONBLOCK
    mode_t mode;             // Permission bits for a newly created queue; 0 selects 0666
};

// Per-process handle for one open message queue.
// Allocated by mq_create() and released by mq_destroy().
struct mq_control {
    mqd_t mqd;                // Descriptor returned by mq_open()
    struct mq_config config;  // Copy of the caller's config with defaults applied
    struct mq_attr attr;      // Attributes as last reported by the queue
    bool is_creator;          // True when this handle created the queue; mq_destroy() then unlinks it
    
    // Statistics (local to this handle, updated by the send/receive wrappers)
    uint64_t sent_count;
    uint64_t received_count;
    uint64_t send_errors;
    uint64_t receive_errors;
    
    // Timeouts: relative defaults used by the timed wrappers when the caller
    // passes a NULL timeout; mq_create() sets both to one second
    struct timespec send_timeout;
    struct timespec receive_timeout;
};

// Create a POSIX message queue, or open it if it already exists.
//
// The queue is first opened with O_CREAT | O_EXCL. If it already exists and
// the caller did not request O_EXCL, it is opened without O_CREAT instead.
// Zero or negative maxmsg/msgsize and a zero mode are replaced by defaults.
//
// Returns a calloc()'d handle to release with mq_destroy(), or NULL with
// errno describing the first failure (cleanup does not overwrite it).
struct mq_control *mq_create(const struct mq_config *config)
{
    struct mq_control *mq;
    struct mq_attr attr = {0};  // zero every field, including any the platform adds
    int saved_errno;

    if (!config || !config->name[0]) {
        errno = EINVAL;
        return NULL;
    }

    // mq_open() and mq_unlink() read the name as a C string, so it must be
    // terminated inside the fixed-size array.
    if (memchr(config->name, '\0', sizeof(config->name)) == NULL) {
        errno = ENAMETOOLONG;
        return NULL;
    }

    mq = calloc(1, sizeof(*mq));
    if (!mq) {
        return NULL;
    }
    mq->mqd = (mqd_t)-1;

    // Copy configuration
    mq->config = *config;

    // Set default values if not specified
    if (mq->config.maxmsg <= 0) {
        mq->config.maxmsg = MQ_DEFAULT_MAXMSG;
    }
    if (mq->config.msgsize <= 0) {
        mq->config.msgsize = MQ_DEFAULT_MSGSIZE;
    }
    if (mq->config.mode == 0) {
        mq->config.mode = 0666;
    }

    // Set up attributes
    attr.mq_flags = 0;
    attr.mq_maxmsg = mq->config.maxmsg;
    attr.mq_msgsize = mq->config.msgsize;
    attr.mq_curmsgs = 0;

    // Try to create the message queue
    mq->mqd = mq_open(mq->config.name, mq->config.flags | O_CREAT | O_EXCL,
                      mq->config.mode, &attr);

    if (mq->mqd != (mqd_t)-1) {
        mq->is_creator = true;
    } else if (errno == EEXIST && !(mq->config.flags & O_EXCL)) {
        // Queue exists, try to open it
        mq->mqd = mq_open(mq->config.name, mq->config.flags & ~O_CREAT,
                          mq->config.mode, NULL);
        if (mq->mqd == (mqd_t)-1) {
            goto fail;
        }
        mq->is_creator = false;
    } else {
        goto fail;
    }

    // Get actual attributes
    if (mq_getattr(mq->mqd, &mq->attr) == -1) {
        goto fail;
    }

    // Set default timeouts (1 second)
    mq->send_timeout.tv_sec = 1;
    mq->send_timeout.tv_nsec = 0;
    mq->receive_timeout.tv_sec = 1;
    mq->receive_timeout.tv_nsec = 0;

    return mq;

fail:
    // Keep the errno of the first failure; the cleanup calls may change it.
    saved_errno = errno;
    if (mq->mqd != (mqd_t)-1) {
        mq_close(mq->mqd);
    }
    if (mq->is_creator) {
        mq_unlink(mq->config.name);
    }
    free(mq);
    errno = saved_errno;
    return NULL;
}

// Send one message at the given priority using mq_send().
// Returns -1 with errno = EINVAL for a NULL handle/message or a message
// longer than the queue's mq_msgsize; otherwise returns mq_send()'s result
// and bumps sent_count or send_errors accordingly.
int mq_send_msg(struct mq_control *mq, const void *msg, size_t len, 
                unsigned int priority)
{
    if (mq == NULL || msg == NULL || len > mq->attr.mq_msgsize) {
        errno = EINVAL;
        return -1;
    }

    const int status = mq_send(mq->mqd, msg, len, priority);
    uint64_t *counter = (status == 0) ? &mq->sent_count : &mq->send_errors;

    (*counter)++;

    return status;
}

// Send one message, waiting at most a relative timeout for queue space.
//
// Parameters:
//   timeout - relative wait; NULL selects mq->send_timeout. Its tv_nsec must
//             be in [0, 1000000000).
//
// Returns mq_timedsend()'s result and bumps sent_count or send_errors
// accordingly. Returns -1 with errno = EINVAL for a NULL handle/message, an
// over-long message or an out-of-range tv_nsec, and -1 with clock_gettime()'s
// errno if the current time cannot be read; these early failures are not
// counted in send_errors.
int mq_send_msg_timed(struct mq_control *mq, const void *msg, size_t len,
                      unsigned int priority, const struct timespec *timeout)
{
    int ret;
    struct timespec abs_timeout;
    const struct timespec *rel;

    if (!mq || !msg || len > mq->attr.mq_msgsize) {
        errno = EINVAL;
        return -1;
    }

    // Use the handle's default when the caller gives no timeout
    rel = timeout ? timeout : &mq->send_timeout;

    // The single carry below is only correct for a normalised tv_nsec;
    // anything else would produce an invalid absolute deadline.
    if (rel->tv_nsec < 0 || rel->tv_nsec >= 1000000000L) {
        errno = EINVAL;
        return -1;
    }

    // Calculate absolute timeout (mq_timedsend() expects CLOCK_REALTIME)
    if (clock_gettime(CLOCK_REALTIME, &abs_timeout) == -1) {
        return -1;
    }
    abs_timeout.tv_sec += rel->tv_sec;
    abs_timeout.tv_nsec += rel->tv_nsec;
    if (abs_timeout.tv_nsec >= 1000000000L) {
        abs_timeout.tv_sec++;
        abs_timeout.tv_nsec -= 1000000000L;
    }

    ret = mq_timedsend(mq->mqd, msg, len, priority, &abs_timeout);
    if (ret == 0) {
        mq->sent_count++;
    } else {
        mq->send_errors++;
    }

    return ret;
}

// Receive one message using mq_receive().
// `len` must be at least the queue's mq_msgsize. Returns -1 with
// errno = EINVAL for a NULL handle/buffer or a too-small buffer; otherwise
// returns mq_receive()'s result and bumps received_count or receive_errors.
ssize_t mq_receive_msg(struct mq_control *mq, void *msg, size_t len,
                       unsigned int *priority)
{
    if (mq == NULL || msg == NULL || len < mq->attr.mq_msgsize) {
        errno = EINVAL;
        return -1;
    }

    const ssize_t received = mq_receive(mq->mqd, msg, len, priority);

    if (received < 0) {
        mq->receive_errors++;
    } else {
        mq->received_count++;
    }

    return received;
}

// Receive one message, waiting at most a relative timeout for one to arrive.
//
// Parameters:
//   len     - size of `msg`; must be at least the queue's mq_msgsize.
//   timeout - relative wait; NULL selects mq->receive_timeout. Its tv_nsec
//             must be in [0, 1000000000).
//
// Returns mq_timedreceive()'s result and bumps received_count or
// receive_errors accordingly. Returns -1 with errno = EINVAL for a NULL
// handle/buffer, a too-small buffer or an out-of-range tv_nsec, and -1 with
// clock_gettime()'s errno if the current time cannot be read; these early
// failures are not counted in receive_errors.
ssize_t mq_receive_msg_timed(struct mq_control *mq, void *msg, size_t len,
                             unsigned int *priority, const struct timespec *timeout)
{
    ssize_t ret;
    struct timespec abs_timeout;
    const struct timespec *rel;

    if (!mq || !msg || len < mq->attr.mq_msgsize) {
        errno = EINVAL;
        return -1;
    }

    // Use the handle's default when the caller gives no timeout
    rel = timeout ? timeout : &mq->receive_timeout;

    // The single carry below is only correct for a normalised tv_nsec;
    // anything else would produce an invalid absolute deadline.
    if (rel->tv_nsec < 0 || rel->tv_nsec >= 1000000000L) {
        errno = EINVAL;
        return -1;
    }

    // Calculate absolute timeout (mq_timedreceive() expects CLOCK_REALTIME)
    if (clock_gettime(CLOCK_REALTIME, &abs_timeout) == -1) {
        return -1;
    }
    abs_timeout.tv_sec += rel->tv_sec;
    abs_timeout.tv_nsec += rel->tv_nsec;
    if (abs_timeout.tv_nsec >= 1000000000L) {
        abs_timeout.tv_sec++;
        abs_timeout.tv_nsec -= 1000000000L;
    }

    ret = mq_timedreceive(mq->mqd, msg, len, priority, &abs_timeout);
    if (ret >= 0) {
        mq->received_count++;
    } else {
        mq->receive_errors++;
    }

    return ret;
}

// Switch the queue descriptor between blocking and non-blocking mode.
//
// Returns 0 on success, or -1 with errno set (EINVAL for a NULL handle,
// otherwise whatever mq_setattr() reported). On success the handle's cached
// attributes reflect the new flag.
int mq_set_nonblock(struct mq_control *mq, bool nonblock)
{
    struct mq_attr attr;

    if (!mq) {
        errno = EINVAL;
        return -1;
    }

    attr = mq->attr;
    if (nonblock) {
        attr.mq_flags |= O_NONBLOCK;
    } else {
        attr.mq_flags &= ~O_NONBLOCK;
    }

    // The third argument of mq_setattr() receives the PREVIOUS attributes.
    // Pointing it at mq->attr, as before, left the cached flags one change
    // behind; pass NULL and update the cache explicitly instead.
    if (mq_setattr(mq->mqd, &attr, NULL) == -1) {
        return -1;
    }
    mq->attr.mq_flags = attr.mq_flags;

    return 0;
}

// Refresh the handle's cached attributes with mq_getattr() and copy them,
// together with the handle's local counters, into `stats`.
// Does nothing when either pointer is NULL.
void mq_get_stats(struct mq_control *mq, struct mq_stats *stats)
{
    const struct mq_attr *cached;

    if (mq == NULL || stats == NULL) {
        return;
    }

    // Update current attributes (result ignored; the previous values are
    // reported if the call fails)
    mq_getattr(mq->mqd, &mq->attr);
    cached = &mq->attr;

    // Queue limits and current depth
    stats->maxmsg = cached->mq_maxmsg;
    stats->msgsize = cached->mq_msgsize;
    stats->curmsgs = cached->mq_curmsgs;

    // Per-handle counters
    stats->sent_count = mq->sent_count;
    stats->received_count = mq->received_count;
    stats->send_errors = mq->send_errors;
    stats->receive_errors = mq->receive_errors;
}

// Release a handle: close the descriptor, unlink the queue name if this
// handle created it, then free the handle. NULL is ignored.
void mq_destroy(struct mq_control *mq)
{
    if (mq == NULL) {
        return;
    }

    const bool is_open = (mq->mqd != (mqd_t)-1);

    if (is_open) {
        mq_close(mq->mqd);
    }

    // Only the creating handle removes the name.
    if (mq->is_creator) {
        mq_unlink(mq->config.name);
    }

    free(mq);
}

This comprehensive IPC guide demonstrates advanced techniques for building high-performance inter-process communication systems. The implementations cover shared memory with reader-writer synchronization, lock-free ring buffers, and sophisticated message queue architectures. These patterns enable efficient, scalable communication between enterprise system components while maintaining data integrity and optimal performance characteristics.