Advanced Linux Real-Time Systems Programming: Building Deterministic and Low-Latency Applications
Advanced Linux real-time systems programming requires deep understanding of timing constraints, deterministic behavior, and low-latency design principles. This comprehensive guide explores building hard real-time applications using PREEMPT_RT, implementing custom schedulers, and developing industrial-grade control systems that meet strict timing requirements.
Advanced Linux Real-Time Systems Programming
Real-Time Task Scheduler and Priority Management
Advanced Real-Time Scheduling Framework
// rt_scheduler.c - Advanced real-time scheduling framework
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sched.h>
#include <pthread.h>
#include <signal.h>
#include <time.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#include <linux/sched.h>
#include <stdatomic.h>
#define MAX_RT_TASKS 256
#define MAX_PRIORITY_LEVELS 100
#define NSEC_PER_SEC 1000000000LL
#define NSEC_PER_MSEC 1000000LL
#define NSEC_PER_USEC 1000LL
// Real-time task types
typedef enum {
RT_TASK_PERIODIC,
RT_TASK_SPORADIC,
RT_TASK_APERIODIC,
RT_TASK_INTERRUPT_HANDLER
} rt_task_type_t;
// Scheduling algorithms
typedef enum {
RT_SCHED_RATE_MONOTONIC,
RT_SCHED_EARLIEST_DEADLINE_FIRST,
RT_SCHED_LEAST_LAXITY_FIRST,
RT_SCHED_DEADLINE_MONOTONIC,
RT_SCHED_FIXED_PRIORITY
} rt_sched_algorithm_t;
// Task timing constraints
typedef struct {
uint64_t period_ns; // Task period in nanoseconds
uint64_t deadline_ns; // Relative deadline
uint64_t wcet_ns; // Worst-case execution time
uint64_t bcet_ns; // Best-case execution time
uint64_t jitter_ns; // Maximum allowed jitter
uint64_t offset_ns; // Phase offset
} rt_timing_constraints_t;
// Task execution statistics
typedef struct {
uint64_t total_executions;
uint64_t deadline_misses;
uint64_t execution_time_min;
uint64_t execution_time_max;
uint64_t execution_time_avg;
uint64_t jitter_max;
uint64_t response_time_max;
uint64_t preemptions;
uint64_t context_switches;
} rt_execution_stats_t;
// Real-time task control block
typedef struct rt_task {
int task_id;
char name[64];
rt_task_type_t type;
// Timing constraints
rt_timing_constraints_t constraints;
// Scheduling parameters
int priority;
int nice_value;
int cpu_affinity;
struct sched_attr sched_attr;
// Thread management
pthread_t thread;
pthread_attr_t thread_attr;
bool active;
bool suspended;
// Timing control
struct timespec next_activation;
struct timespec absolute_deadline;
uint64_t remaining_budget;
// Task function
void* (*task_function)(void* arg);
void* task_arg;
// Synchronization
pthread_mutex_t mutex;
pthread_cond_t condition;
// Statistics
rt_execution_stats_t stats;
// Memory regions
void* stack_base;
size_t stack_size;
bool stack_locked;
struct rt_task* next;
struct rt_task* prev;
} rt_task_t;
// Real-time scheduler context
typedef struct {
rt_sched_algorithm_t algorithm;
// Task management
rt_task_t* tasks[MAX_RT_TASKS];
int num_tasks;
pthread_rwlock_t task_lock;
// Ready queues for different priority levels
rt_task_t* ready_queues[MAX_PRIORITY_LEVELS];
atomic_uint64_t ready_mask; // Bitmap of non-empty queues
// Scheduler thread
pthread_t scheduler_thread;
bool scheduler_running;
// Timing management
struct timespec system_start_time;
clockid_t clock_id;
timer_t scheduler_timer;
uint64_t tick_period_ns;
// CPU management
cpu_set_t rt_cpu_set;
int num_rt_cpus;
// Global statistics
struct {
uint64_t total_schedule_calls;
uint64_t context_switches;
uint64_t preemptions;
uint64_t timer_overruns;
uint64_t deadline_misses;
double cpu_utilization;
uint64_t worst_case_latency;
} global_stats;
// Configuration
struct {
bool enable_deadline_enforcement;
bool enable_budget_enforcement;
bool enable_priority_inheritance;
bool enable_load_balancing;
uint64_t scheduler_overhead_ns;
double max_cpu_utilization;
} config;
} rt_scheduler_t;
static rt_scheduler_t rt_sched = {0};
// Utility functions
static inline uint64_t timespec_to_ns(const struct timespec* ts)
{
return ts->tv_sec * NSEC_PER_SEC + ts->tv_nsec;
}
static inline void ns_to_timespec(uint64_t ns, struct timespec* ts)
{
ts->tv_sec = ns / NSEC_PER_SEC;
ts->tv_nsec = ns % NSEC_PER_SEC;
}
static inline void timespec_add_ns(struct timespec* ts, uint64_t ns)
{
uint64_t total_ns = timespec_to_ns(ts) + ns;
ns_to_timespec(total_ns, ts);
}
static inline int timespec_compare(const struct timespec* a, const struct timespec* b)
{
if (a->tv_sec < b->tv_sec) return -1;
if (a->tv_sec > b->tv_sec) return 1;
if (a->tv_nsec < b->tv_nsec) return -1;
if (a->tv_nsec > b->tv_nsec) return 1;
return 0;
}
static inline uint64_t get_monotonic_time_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return timespec_to_ns(&ts);
}
// Real-time system configuration
static int configure_rt_system(void)
{
// Check if running on PREEMPT_RT kernel
FILE* fp = fopen("/sys/kernel/realtime", "r");
if (fp) {
int rt_enabled;
if (fscanf(fp, "%d", &rt_enabled) == 1 && rt_enabled) {
printf("PREEMPT_RT kernel detected\n");
} else {
printf("Warning: Not running on PREEMPT_RT kernel\n");
}
fclose(fp);
}
// Lock all current and future memory
if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
perror("mlockall");
return -1;
}
// Set process priority
struct sched_param param;
param.sched_priority = 99; // Maximum real-time priority
if (sched_setscheduler(0, SCHED_FIFO, ¶m) != 0) {
perror("sched_setscheduler");
return -1;
}
// Disable swap
system("swapoff -a 2>/dev/null"); // Best effort
// Configure CPU isolation for real-time tasks
CPU_ZERO(&rt_sched.rt_cpu_set);
// Try to use isolated CPUs if available
fp = fopen("/sys/devices/system/cpu/isolated", "r");
if (fp) {
char line[256];
if (fgets(line, sizeof(line), fp)) {
// Parse isolated CPU list (simplified parser)
char* token = strtok(line, ",");
while (token) {
int cpu = atoi(token);
if (cpu >= 0 && cpu < CPU_SETSIZE) {
CPU_SET(cpu, &rt_sched.rt_cpu_set);
rt_sched.num_rt_cpus++;
}
token = strtok(NULL, ",");
}
}
fclose(fp);
}
// If no isolated CPUs, use all available CPUs
if (rt_sched.num_rt_cpus == 0) {
int num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
for (int i = 0; i < num_cpus; i++) {
CPU_SET(i, &rt_sched.rt_cpu_set);
}
rt_sched.num_rt_cpus = num_cpus;
}
printf("Configured %d CPUs for real-time tasks\n", rt_sched.num_rt_cpus);
// Set CPU affinity for this process
if (sched_setaffinity(0, sizeof(rt_sched.rt_cpu_set), &rt_sched.rt_cpu_set) != 0) {
perror("sched_setaffinity");
}
return 0;
}
// Task priority calculation based on scheduling algorithm
static int calculate_task_priority(rt_task_t* task)
{
switch (rt_sched.algorithm) {
case RT_SCHED_RATE_MONOTONIC:
// Higher frequency (shorter period) = higher priority
return MAX_PRIORITY_LEVELS - (int)(task->constraints.period_ns / NSEC_PER_MSEC);
case RT_SCHED_DEADLINE_MONOTONIC:
// Shorter deadline = higher priority
return MAX_PRIORITY_LEVELS - (int)(task->constraints.deadline_ns / NSEC_PER_MSEC);
case RT_SCHED_EARLIEST_DEADLINE_FIRST:
// Dynamic priority based on absolute deadline
uint64_t current_time = get_monotonic_time_ns();
uint64_t deadline_ns = timespec_to_ns(&task->absolute_deadline);
return MAX_PRIORITY_LEVELS - (int)((deadline_ns - current_time) / NSEC_PER_MSEC);
case RT_SCHED_FIXED_PRIORITY:
return task->priority;
default:
return 50; // Default priority
}
}
// Schedulability analysis
static bool analyze_schedulability(void)
{
double total_utilization = 0.0;
for (int i = 0; i < rt_sched.num_tasks; i++) {
rt_task_t* task = rt_sched.tasks[i];
if (!task || task->type != RT_TASK_PERIODIC) continue;
double utilization = (double)task->constraints.wcet_ns / task->constraints.period_ns;
total_utilization += utilization;
printf("Task %s: Period=%lu ms, WCET=%lu ms, Utilization=%.3f\n",
task->name,
task->constraints.period_ns / NSEC_PER_MSEC,
task->constraints.wcet_ns / NSEC_PER_MSEC,
utilization);
}
printf("Total CPU utilization: %.3f\n", total_utilization);
// Rate Monotonic schedulability test
if (rt_sched.algorithm == RT_SCHED_RATE_MONOTONIC) {
int n = rt_sched.num_tasks;
double bound = n * (pow(2.0, 1.0/n) - 1.0);
printf("RM schedulability bound: %.3f\n", bound);
if (total_utilization <= bound) {
printf("System is schedulable by RM test\n");
return true;
} else {
printf("Warning: System may not be schedulable by RM test\n");
}
}
// General utilization bound
if (total_utilization <= rt_sched.config.max_cpu_utilization) {
printf("System utilization within configured limits\n");
return true;
} else {
printf("Error: System utilization exceeds limits\n");
return false;
}
}
// Task creation and management
static rt_task_t* create_rt_task(const char* name, rt_task_type_t type,
const rt_timing_constraints_t* constraints,
void* (*task_function)(void*), void* arg)
{
if (rt_sched.num_tasks >= MAX_RT_TASKS) {
printf("Error: Maximum number of RT tasks reached\n");
return NULL;
}
rt_task_t* task = malloc(sizeof(rt_task_t));
if (!task) {
perror("malloc");
return NULL;
}
memset(task, 0, sizeof(*task));
task->task_id = rt_sched.num_tasks;
strncpy(task->name, name, sizeof(task->name) - 1);
task->type = type;
task->constraints = *constraints;
task->task_function = task_function;
task->task_arg = arg;
// Calculate priority
task->priority = calculate_task_priority(task);
// Initialize synchronization objects
pthread_mutex_init(&task->mutex, NULL);
pthread_cond_init(&task->condition, NULL);
// Allocate and lock stack memory
task->stack_size = 8 * 1024 * 1024; // 8MB stack
task->stack_base = mmap(NULL, task->stack_size,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK,
-1, 0);
if (task->stack_base == MAP_FAILED) {
perror("mmap stack");
free(task);
return NULL;
}
if (mlock(task->stack_base, task->stack_size) == 0) {
task->stack_locked = true;
}
// Configure thread attributes
pthread_attr_init(&task->thread_attr);
pthread_attr_setstack(&task->thread_attr, task->stack_base, task->stack_size);
pthread_attr_setschedpolicy(&task->thread_attr, SCHED_FIFO);
struct sched_param sched_param;
sched_param.sched_priority = task->priority;
pthread_attr_setschedparam(&task->thread_attr, &sched_param);
pthread_attr_setinheritsched(&task->thread_attr, PTHREAD_EXPLICIT_SCHED);
// Set CPU affinity
pthread_attr_setaffinity_np(&task->thread_attr, sizeof(rt_sched.rt_cpu_set),
&rt_sched.rt_cpu_set);
// Initialize timing
clock_gettime(CLOCK_MONOTONIC, &task->next_activation);
if (task->constraints.offset_ns > 0) {
timespec_add_ns(&task->next_activation, task->constraints.offset_ns);
}
rt_sched.tasks[rt_sched.num_tasks] = task;
rt_sched.num_tasks++;
printf("Created RT task '%s': Priority=%d, Period=%lu ms, WCET=%lu ms\n",
name, task->priority,
constraints->period_ns / NSEC_PER_MSEC,
constraints->wcet_ns / NSEC_PER_MSEC);
return task;
}
static void destroy_rt_task(rt_task_t* task)
{
if (!task) return;
// Stop task if running
if (task->active) {
task->active = false;
pthread_join(task->thread, NULL);
}
// Cleanup synchronization objects
pthread_mutex_destroy(&task->mutex);
pthread_cond_destroy(&task->condition);
pthread_attr_destroy(&task->thread_attr);
// Unlock and free stack
if (task->stack_locked) {
munlock(task->stack_base, task->stack_size);
}
munmap(task->stack_base, task->stack_size);
free(task);
}
// Real-time task execution wrapper
static void* rt_task_wrapper(void* arg)
{
rt_task_t* task = (rt_task_t*)arg;
struct timespec start_time, end_time;
uint64_t execution_time;
printf("RT task '%s' started\n", task->name);
while (task->active) {
// Wait for next activation time
pthread_mutex_lock(&task->mutex);
struct timespec current_time;
clock_gettime(CLOCK_MONOTONIC, ¤t_time);
if (timespec_compare(¤t_time, &task->next_activation) < 0) {
// Sleep until next activation
pthread_cond_timedwait(&task->condition, &task->mutex, &task->next_activation);
}
// Check for deadline miss
if (task->type == RT_TASK_PERIODIC) {
if (timespec_compare(¤t_time, &task->absolute_deadline) > 0) {
task->stats.deadline_misses++;
printf("DEADLINE MISS: Task '%s' at time %ld.%09ld\n",
task->name, current_time.tv_sec, current_time.tv_nsec);
}
}
pthread_mutex_unlock(&task->mutex);
if (!task->active) break;
// Record execution start time
clock_gettime(CLOCK_MONOTONIC, &start_time);
// Execute task function
if (task->task_function) {
task->task_function(task->task_arg);
}
// Record execution end time
clock_gettime(CLOCK_MONOTONIC, &end_time);
// Update statistics
execution_time = timespec_to_ns(&end_time) - timespec_to_ns(&start_time);
task->stats.total_executions++;
if (task->stats.total_executions == 1) {
task->stats.execution_time_min = execution_time;
task->stats.execution_time_max = execution_time;
task->stats.execution_time_avg = execution_time;
} else {
if (execution_time < task->stats.execution_time_min) {
task->stats.execution_time_min = execution_time;
}
if (execution_time > task->stats.execution_time_max) {
task->stats.execution_time_max = execution_time;
}
// Update average using exponential moving average
task->stats.execution_time_avg =
(task->stats.execution_time_avg * 0.9) + (execution_time * 0.1);
}
// Check for WCET violation
if (execution_time > task->constraints.wcet_ns) {
printf("WCET VIOLATION: Task '%s' executed for %lu ns (WCET: %lu ns)\n",
task->name, execution_time, task->constraints.wcet_ns);
}
// Calculate next activation time for periodic tasks
if (task->type == RT_TASK_PERIODIC) {
timespec_add_ns(&task->next_activation, task->constraints.period_ns);
task->absolute_deadline = task->next_activation;
timespec_add_ns(&task->absolute_deadline, task->constraints.deadline_ns);
}
}
printf("RT task '%s' stopped\n", task->name);
return NULL;
}
// Scheduler implementation
static void* scheduler_thread(void* arg)
{
printf("Real-time scheduler started\n");
struct timespec next_tick;
clock_gettime(CLOCK_MONOTONIC, &next_tick);
while (rt_sched.scheduler_running) {
timespec_add_ns(&next_tick, rt_sched.tick_period_ns);
// Update task priorities for dynamic algorithms
if (rt_sched.algorithm == RT_SCHED_EARLIEST_DEADLINE_FIRST) {
pthread_rwlock_wrlock(&rt_sched.task_lock);
for (int i = 0; i < rt_sched.num_tasks; i++) {
rt_task_t* task = rt_sched.tasks[i];
if (task && task->active) {
int new_priority = calculate_task_priority(task);
if (new_priority != task->priority) {
task->priority = new_priority;
// Update thread priority
struct sched_param param;
param.sched_priority = new_priority;
pthread_setschedparam(task->thread, SCHED_FIFO, ¶m);
}
}
}
pthread_rwlock_unlock(&rt_sched.task_lock);
}
rt_sched.global_stats.total_schedule_calls++;
// Sleep until next tick
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_tick, NULL);
}
printf("Real-time scheduler stopped\n");
return NULL;
}
// Task activation and control
static int activate_rt_task(rt_task_t* task)
{
if (!task || task->active) {
return -1;
}
task->active = true;
if (pthread_create(&task->thread, &task->thread_attr, rt_task_wrapper, task) != 0) {
perror("pthread_create");
task->active = false;
return -1;
}
printf("Activated RT task '%s'\n", task->name);
return 0;
}
static int deactivate_rt_task(rt_task_t* task)
{
if (!task || !task->active) {
return -1;
}
task->active = false;
// Signal task to wake up and exit
pthread_mutex_lock(&task->mutex);
pthread_cond_signal(&task->condition);
pthread_mutex_unlock(&task->mutex);
pthread_join(task->thread, NULL);
printf("Deactivated RT task '%s'\n", task->name);
return 0;
}
// Periodic timer for task activation
static void timer_handler(int sig, siginfo_t* si, void* uc)
{
// Timer signal handler - minimal work here
rt_sched.global_stats.timer_overruns += timer_getoverrun(rt_sched.scheduler_timer);
}
// Deadline enforcement
static void* deadline_monitor_thread(void* arg)
{
while (rt_sched.scheduler_running) {
struct timespec current_time;
clock_gettime(CLOCK_MONOTONIC, ¤t_time);
pthread_rwlock_rdlock(&rt_sched.task_lock);
for (int i = 0; i < rt_sched.num_tasks; i++) {
rt_task_t* task = rt_sched.tasks[i];
if (!task || !task->active) continue;
// Check for deadline violations
if (timespec_compare(¤t_time, &task->absolute_deadline) > 0) {
if (rt_sched.config.enable_deadline_enforcement) {
printf("DEADLINE ENFORCEMENT: Suspending task '%s'\n", task->name);
// Could implement task suspension or other enforcement actions
}
}
}
pthread_rwlock_unlock(&rt_sched.task_lock);
usleep(1000); // Check every 1ms
}
return NULL;
}
// Performance monitoring and statistics
static void print_rt_statistics(void)
{
printf("\n=== Real-Time System Statistics ===\n");
printf("Global Statistics:\n");
printf(" Schedule calls: %lu\n", rt_sched.global_stats.total_schedule_calls);
printf(" Context switches: %lu\n", rt_sched.global_stats.context_switches);
printf(" Timer overruns: %lu\n", rt_sched.global_stats.timer_overruns);
printf(" Total deadline misses: %lu\n", rt_sched.global_stats.deadline_misses);
printf(" Worst-case latency: %lu ns\n", rt_sched.global_stats.worst_case_latency);
printf("\nPer-Task Statistics:\n");
for (int i = 0; i < rt_sched.num_tasks; i++) {
rt_task_t* task = rt_sched.tasks[i];
if (!task) continue;
printf("Task '%s':\n", task->name);
printf(" Executions: %lu\n", task->stats.total_executions);
printf(" Deadline misses: %lu\n", task->stats.deadline_misses);
if (task->stats.total_executions > 0) {
printf(" Deadline miss ratio: %.3f%%\n",
(double)task->stats.deadline_misses / task->stats.total_executions * 100.0);
}
printf(" Execution time (ns): min=%lu, avg=%lu, max=%lu\n",
task->stats.execution_time_min,
task->stats.execution_time_avg,
task->stats.execution_time_max);
if (task->constraints.wcet_ns > 0) {
printf(" WCET utilization: %.1f%%\n",
(double)task->stats.execution_time_max / task->constraints.wcet_ns * 100.0);
}
printf(" Max jitter: %lu ns\n", task->stats.jitter_max);
printf(" Preemptions: %lu\n", task->stats.preemptions);
printf("\n");
}
printf("===================================\n");
}
// Signal handlers
static void signal_handler(int sig)
{
if (sig == SIGINT || sig == SIGTERM) {
printf("\nReceived signal %d, shutting down RT system...\n", sig);
rt_sched.scheduler_running = false;
} else if (sig == SIGUSR1) {
print_rt_statistics();
}
}
// RT system initialization
static int init_rt_scheduler(rt_sched_algorithm_t algorithm)
{
memset(&rt_sched, 0, sizeof(rt_sched));
rt_sched.algorithm = algorithm;
rt_sched.clock_id = CLOCK_MONOTONIC;
rt_sched.tick_period_ns = NSEC_PER_MSEC; // 1ms tick
// Configuration
rt_sched.config.enable_deadline_enforcement = true;
rt_sched.config.enable_budget_enforcement = true;
rt_sched.config.enable_priority_inheritance = true;
rt_sched.config.scheduler_overhead_ns = 10000; // 10 microseconds
rt_sched.config.max_cpu_utilization = 0.8; // 80% max utilization
// Initialize locks
pthread_rwlock_init(&rt_sched.task_lock, NULL);
// Configure real-time system
if (configure_rt_system() != 0) {
return -1;
}
// Setup signal handlers
struct sigaction sa;
sa.sa_flags = SA_SIGINFO;
sa.sa_sigaction = timer_handler;
sigemptyset(&sa.sa_mask);
sigaction(SIGRTMIN, &sa, NULL);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGUSR1, signal_handler);
rt_sched.scheduler_running = true;
// Start scheduler thread
pthread_attr_t sched_attr;
pthread_attr_init(&sched_attr);
struct sched_param sched_param;
sched_param.sched_priority = 99; // Highest priority for scheduler
pthread_attr_setschedpolicy(&sched_attr, SCHED_FIFO);
pthread_attr_setschedparam(&sched_attr, &sched_param);
pthread_attr_setinheritsched(&sched_attr, PTHREAD_EXPLICIT_SCHED);
if (pthread_create(&rt_sched.scheduler_thread, &sched_attr, scheduler_thread, NULL) != 0) {
perror("pthread_create scheduler");
return -1;
}
pthread_attr_destroy(&sched_attr);
clock_gettime(CLOCK_MONOTONIC, &rt_sched.system_start_time);
printf("Real-time scheduler initialized with algorithm: %d\n", algorithm);
return 0;
}
static void cleanup_rt_scheduler(void)
{
rt_sched.scheduler_running = false;
// Stop all tasks
for (int i = 0; i < rt_sched.num_tasks; i++) {
if (rt_sched.tasks[i]) {
deactivate_rt_task(rt_sched.tasks[i]);
destroy_rt_task(rt_sched.tasks[i]);
}
}
// Wait for scheduler thread
pthread_join(rt_sched.scheduler_thread, NULL);
// Cleanup
pthread_rwlock_destroy(&rt_sched.task_lock);
// Unlock memory
munlockall();
printf("Real-time scheduler cleanup completed\n");
}
// Example real-time tasks
static void* control_loop_task(void* arg)
{
int* counter = (int*)arg;
// Simulate control loop work
volatile int dummy = 0;
for (int i = 0; i < 10000; i++) {
dummy += i;
}
(*counter)++;
return NULL;
}
static void* sensor_reading_task(void* arg)
{
int* readings = (int*)arg;
// Simulate sensor reading
volatile int dummy = 0;
for (int i = 0; i < 5000; i++) {
dummy += i;
}
(*readings)++;
return NULL;
}
static void* actuator_output_task(void* arg)
{
int* outputs = (int*)arg;
// Simulate actuator output
volatile int dummy = 0;
for (int i = 0; i < 7500; i++) {
dummy += i;
}
(*outputs)++;
return NULL;
}
// Test and demonstration
static void test_rt_system(void)
{
printf("Testing real-time system...\n");
// Task counters
static int control_counter = 0;
static int sensor_counter = 0;
static int actuator_counter = 0;
// Create real-time tasks with different characteristics
// High-frequency control loop (1 kHz)
rt_timing_constraints_t control_constraints = {
.period_ns = 1 * NSEC_PER_MSEC, // 1ms period
.deadline_ns = 1 * NSEC_PER_MSEC, // 1ms deadline
.wcet_ns = 500 * NSEC_PER_USEC, // 500µs WCET
.bcet_ns = 100 * NSEC_PER_USEC, // 100µs BCET
.jitter_ns = 50 * NSEC_PER_USEC, // 50µs max jitter
.offset_ns = 0
};
rt_task_t* control_task = create_rt_task("ControlLoop", RT_TASK_PERIODIC,
&control_constraints,
control_loop_task, &control_counter);
// Medium-frequency sensor reading (100 Hz)
rt_timing_constraints_t sensor_constraints = {
.period_ns = 10 * NSEC_PER_MSEC, // 10ms period
.deadline_ns = 8 * NSEC_PER_MSEC, // 8ms deadline
.wcet_ns = 300 * NSEC_PER_USEC, // 300µs WCET
.bcet_ns = 50 * NSEC_PER_USEC, // 50µs BCET
.jitter_ns = 100 * NSEC_PER_USEC, // 100µs max jitter
.offset_ns = 2 * NSEC_PER_MSEC // 2ms offset
};
rt_task_t* sensor_task = create_rt_task("SensorReading", RT_TASK_PERIODIC,
&sensor_constraints,
sensor_reading_task, &sensor_counter);
// Lower-frequency actuator output (50 Hz)
rt_timing_constraints_t actuator_constraints = {
.period_ns = 20 * NSEC_PER_MSEC, // 20ms period
.deadline_ns = 15 * NSEC_PER_MSEC, // 15ms deadline
.wcet_ns = 400 * NSEC_PER_USEC, // 400µs WCET
.bcet_ns = 75 * NSEC_PER_USEC, // 75µs BCET
.jitter_ns = 200 * NSEC_PER_USEC, // 200µs max jitter
.offset_ns = 5 * NSEC_PER_MSEC // 5ms offset
};
rt_task_t* actuator_task = create_rt_task("ActuatorOutput", RT_TASK_PERIODIC,
&actuator_constraints,
actuator_output_task, &actuator_counter);
// Perform schedulability analysis
if (!analyze_schedulability()) {
printf("Warning: System may not be schedulable\n");
}
// Activate tasks
if (control_task) activate_rt_task(control_task);
if (sensor_task) activate_rt_task(sensor_task);
if (actuator_task) activate_rt_task(actuator_task);
printf("Real-time tasks activated. Send SIGUSR1 for statistics, SIGINT to stop.\n");
// Run for a specified duration or until interrupted
sleep(10);
printf("\nFinal task execution counts:\n");
printf("Control loop: %d executions\n", control_counter);
printf("Sensor reading: %d executions\n", sensor_counter);
printf("Actuator output: %d executions\n", actuator_counter);
}
// Main function
int main(int argc, char* argv[])
{
rt_sched_algorithm_t algorithm = RT_SCHED_RATE_MONOTONIC;
if (argc > 1) {
algorithm = atoi(argv[1]);
}
printf("Advanced Linux Real-Time Systems Programming\n");
printf("Scheduling Algorithm: %d\n", algorithm);
if (geteuid() != 0) {
fprintf(stderr, "This program requires root privileges for real-time scheduling\n");
return 1;
}
// Initialize real-time scheduler
if (init_rt_scheduler(algorithm) != 0) {
fprintf(stderr, "Failed to initialize real-time scheduler\n");
return 1;
}
// Run test
test_rt_system();
// Print final statistics
print_rt_statistics();
// Cleanup
cleanup_rt_scheduler();
return 0;
}
Industrial Control System Framework
Real-Time PID Controller Implementation
// rt_control_system.c - Real-time industrial control system
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <math.h>
#include <time.h>
#include <signal.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sched.h>
#define MAX_CONTROLLERS 16
#define MAX_IO_CHANNELS 64
#define CONTROL_FREQUENCY_HZ 1000
#define CONTROL_PERIOD_NS (1000000000LL / CONTROL_FREQUENCY_HZ)
// Control system types
typedef enum {
CONTROLLER_PID,
CONTROLLER_PI,
CONTROLLER_PD,
CONTROLLER_FUZZY,
CONTROLLER_ADAPTIVE
} controller_type_t;
// I/O channel types
typedef enum {
IO_ANALOG_INPUT,
IO_ANALOG_OUTPUT,
IO_DIGITAL_INPUT,
IO_DIGITAL_OUTPUT,
IO_PWM_OUTPUT,
IO_ENCODER_INPUT
} io_channel_type_t;
// PID controller parameters
typedef struct {
double kp; // Proportional gain
double ki; // Integral gain
double kd; // Derivative gain
double setpoint; // Desired value
double output_min; // Minimum output value
double output_max; // Maximum output value
// Internal state
double integral; // Integral accumulator
double previous_error; // Previous error for derivative
double previous_output; // Previous output for rate limiting
// Anti-windup
bool enable_anti_windup;
double windup_limit;
// Filter parameters
double derivative_filter_alpha; // Low-pass filter for derivative
double filtered_derivative;
} pid_controller_t;
// I/O channel structure
typedef struct {
int channel_id;
io_channel_type_t type;
char name[32];
// Current values
double value;
double scaled_value;
bool digital_state;
// Scaling parameters
double scale_factor;
double offset;
double min_range;
double max_range;
// Hardware interface
void* hw_interface;
int (*read_function)(void* interface, double* value);
int (*write_function)(void* interface, double value);
// Statistics
struct {
uint64_t read_count;
uint64_t write_count;
uint64_t error_count;
double min_value;
double max_value;
} stats;
} io_channel_t;
// Control loop structure
typedef struct {
int loop_id;
char name[32];
controller_type_t type;
bool enabled;
bool auto_mode;
// Controller parameters
union {
pid_controller_t pid;
// Could add other controller types here
} controller;
// I/O assignments
int input_channel;
int output_channel;
int setpoint_channel;
// Loop timing
uint64_t period_ns;
struct timespec next_execution;
// Performance monitoring
struct {
uint64_t execution_count;
uint64_t execution_time_min;
uint64_t execution_time_max;
uint64_t execution_time_avg;
uint64_t deadline_misses;
double control_error_rms;
double output_variance;
} performance;
// Safety limits
struct {
bool enable_limits;
double input_min;
double input_max;
double output_min;
double output_max;
double error_threshold;
uint64_t error_count_limit;
uint64_t consecutive_errors;
} safety;
} control_loop_t;
// Real-time control system context
typedef struct {
// I/O management
io_channel_t io_channels[MAX_IO_CHANNELS];
int num_io_channels;
// Control loops
control_loop_t control_loops[MAX_CONTROLLERS];
int num_control_loops;
// Real-time execution
pthread_t control_thread;
bool system_running;
int control_priority;
// Timing
struct timespec system_start_time;
uint64_t control_cycle_count;
// Safety and monitoring
bool emergency_stop;
bool system_fault;
char fault_message[256];
// Performance statistics
struct {
uint64_t total_cycles;
uint64_t missed_deadlines;
uint64_t max_jitter_ns;
double avg_cpu_usage;
uint64_t io_errors;
} system_stats;
} rt_control_system_t;
static rt_control_system_t control_system = {0};
// Utility functions
static inline uint64_t timespec_to_ns(const struct timespec* ts)
{
return ts->tv_sec * 1000000000LL + ts->tv_nsec;
}
static inline void ns_to_timespec(uint64_t ns, struct timespec* ts)
{
ts->tv_sec = ns / 1000000000LL;
ts->tv_nsec = ns % 1000000000LL;
}
static inline void timespec_add_ns(struct timespec* ts, uint64_t ns)
{
uint64_t total_ns = timespec_to_ns(ts) + ns;
ns_to_timespec(total_ns, ts);
}
// I/O channel management
static int create_io_channel(const char* name, io_channel_type_t type,
double scale_factor, double offset)
{
if (control_system.num_io_channels >= MAX_IO_CHANNELS) {
return -1;
}
io_channel_t* channel = &control_system.io_channels[control_system.num_io_channels];
channel->channel_id = control_system.num_io_channels;
channel->type = type;
strncpy(channel->name, name, sizeof(channel->name) - 1);
channel->scale_factor = scale_factor;
channel->offset = offset;
channel->min_range = -1000.0;
channel->max_range = 1000.0;
// Initialize statistics
channel->stats.min_value = INFINITY;
channel->stats.max_value = -INFINITY;
control_system.num_io_channels++;
printf("Created I/O channel '%s' (ID: %d, Type: %d)\n",
name, channel->channel_id, type);
return channel->channel_id;
}
static int read_io_channel(int channel_id, double* value)
{
if (channel_id < 0 || channel_id >= control_system.num_io_channels) {
return -1;
}
io_channel_t* channel = &control_system.io_channels[channel_id];
int result = 0;
if (channel->read_function && channel->hw_interface) {
result = channel->read_function(channel->hw_interface, &channel->value);
} else {
// Simulate reading for demonstration
channel->value = sin(control_system.control_cycle_count * 0.01) * 50.0;
}
if (result == 0) {
// Apply scaling
channel->scaled_value = channel->value * channel->scale_factor + channel->offset;
*value = channel->scaled_value;
// Update statistics
channel->stats.read_count++;
if (channel->scaled_value < channel->stats.min_value) {
channel->stats.min_value = channel->scaled_value;
}
if (channel->scaled_value > channel->stats.max_value) {
channel->stats.max_value = channel->scaled_value;
}
} else {
channel->stats.error_count++;
control_system.system_stats.io_errors++;
}
return result;
}
static int write_io_channel(int channel_id, double value)
{
if (channel_id < 0 || channel_id >= control_system.num_io_channels) {
return -1;
}
io_channel_t* channel = &control_system.io_channels[channel_id];
// Apply scaling (reverse)
double hw_value = (value - channel->offset) / channel->scale_factor;
// Apply range limits
if (hw_value < channel->min_range) hw_value = channel->min_range;
if (hw_value > channel->max_range) hw_value = channel->max_range;
int result = 0;
if (channel->write_function && channel->hw_interface) {
result = channel->write_function(channel->hw_interface, hw_value);
} else {
// Simulate writing for demonstration
channel->value = hw_value;
}
if (result == 0) {
channel->scaled_value = value;
channel->stats.write_count++;
} else {
channel->stats.error_count++;
control_system.system_stats.io_errors++;
}
return result;
}
// PID controller implementation
static void init_pid_controller(pid_controller_t* pid, double kp, double ki, double kd,
double setpoint, double output_min, double output_max)
{
pid->kp = kp;
pid->ki = ki;
pid->kd = kd;
pid->setpoint = setpoint;
pid->output_min = output_min;
pid->output_max = output_max;
pid->integral = 0.0;
pid->previous_error = 0.0;
pid->previous_output = 0.0;
pid->enable_anti_windup = true;
pid->windup_limit = (output_max - output_min) * 0.8;
pid->derivative_filter_alpha = 0.1; // 10% filter
pid->filtered_derivative = 0.0;
}
static double execute_pid_controller(pid_controller_t* pid, double process_value, double dt)
{
// Calculate error
double error = pid->setpoint - process_value;
// Proportional term
double proportional = pid->kp * error;
// Integral term with anti-windup
pid->integral += error * dt;
if (pid->enable_anti_windup) {
if (pid->integral > pid->windup_limit) {
pid->integral = pid->windup_limit;
} else if (pid->integral < -pid->windup_limit) {
pid->integral = -pid->windup_limit;
}
}
double integral = pid->ki * pid->integral;
// Derivative term with filtering
double derivative_raw = (error - pid->previous_error) / dt;
pid->filtered_derivative = pid->derivative_filter_alpha * derivative_raw +
(1.0 - pid->derivative_filter_alpha) * pid->filtered_derivative;
double derivative = pid->kd * pid->filtered_derivative;
// Calculate output
double output = proportional + integral + derivative;
// Apply output limits
if (output > pid->output_max) {
output = pid->output_max;
} else if (output < pid->output_min) {
output = pid->output_min;
}
// Store previous values
pid->previous_error = error;
pid->previous_output = output;
return output;
}
// Control loop management
static int create_control_loop(const char* name, controller_type_t type,
int input_channel, int output_channel,
double kp, double ki, double kd, double setpoint)
{
if (control_system.num_control_loops >= MAX_CONTROLLERS) {
return -1;
}
control_loop_t* loop = &control_system.control_loops[control_system.num_control_loops];
loop->loop_id = control_system.num_control_loops;
strncpy(loop->name, name, sizeof(loop->name) - 1);
loop->type = type;
loop->enabled = true;
loop->auto_mode = true;
loop->input_channel = input_channel;
loop->output_channel = output_channel;
loop->setpoint_channel = -1; // No external setpoint by default
loop->period_ns = CONTROL_PERIOD_NS;
// Initialize controller based on type
if (type == CONTROLLER_PID || type == CONTROLLER_PI || type == CONTROLLER_PD) {
init_pid_controller(&loop->controller.pid, kp, ki, kd, setpoint, -100.0, 100.0);
}
// Initialize safety limits
loop->safety.enable_limits = true;
loop->safety.input_min = -1000.0;
loop->safety.input_max = 1000.0;
loop->safety.output_min = -100.0;
loop->safety.output_max = 100.0;
loop->safety.error_threshold = 500.0;
loop->safety.error_count_limit = 10;
// Initialize performance statistics
loop->performance.execution_time_min = UINT64_MAX;
control_system.num_control_loops++;
printf("Created control loop '%s' (ID: %d, Type: %d)\n",
name, loop->loop_id, type);
return loop->loop_id;
}
static int execute_control_loop(control_loop_t* loop)
{
if (!loop->enabled || !loop->auto_mode) {
return 0;
}
struct timespec start_time, end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
// Read process value
double process_value;
if (read_io_channel(loop->input_channel, &process_value) != 0) {
loop->safety.consecutive_errors++;
return -1;
}
// Safety checks
if (loop->safety.enable_limits) {
if (process_value < loop->safety.input_min ||
process_value > loop->safety.input_max) {
printf("SAFETY: Input out of range for loop '%s': %.2f\n",
loop->name, process_value);
loop->enabled = false;
return -1;
}
double error = fabs(loop->controller.pid.setpoint - process_value);
if (error > loop->safety.error_threshold) {
loop->safety.consecutive_errors++;
if (loop->safety.consecutive_errors >= loop->safety.error_count_limit) {
printf("SAFETY: Excessive error for loop '%s': %.2f\n",
loop->name, error);
loop->enabled = false;
return -1;
}
} else {
loop->safety.consecutive_errors = 0;
}
}
// Execute controller
double output = 0.0;
double dt = (double)loop->period_ns / 1000000000.0; // Convert to seconds
switch (loop->type) {
case CONTROLLER_PID:
case CONTROLLER_PI:
case CONTROLLER_PD:
output = execute_pid_controller(&loop->controller.pid, process_value, dt);
break;
default:
return -1;
}
// Apply safety limits to output
if (loop->safety.enable_limits) {
if (output < loop->safety.output_min) output = loop->safety.output_min;
if (output > loop->safety.output_max) output = loop->safety.output_max;
}
// Write output
if (write_io_channel(loop->output_channel, output) != 0) {
loop->safety.consecutive_errors++;
return -1;
}
// Update performance statistics
clock_gettime(CLOCK_MONOTONIC, &end_time);
uint64_t execution_time = timespec_to_ns(&end_time) - timespec_to_ns(&start_time);
loop->performance.execution_count++;
if (execution_time < loop->performance.execution_time_min) {
loop->performance.execution_time_min = execution_time;
}
if (execution_time > loop->performance.execution_time_max) {
loop->performance.execution_time_max = execution_time;
}
// Update average using exponential moving average
if (loop->performance.execution_count == 1) {
loop->performance.execution_time_avg = execution_time;
} else {
loop->performance.execution_time_avg =
(loop->performance.execution_time_avg * 0.95) + (execution_time * 0.05);
}
// Update control error RMS
double error = loop->controller.pid.setpoint - process_value;
loop->performance.control_error_rms =
sqrt((loop->performance.control_error_rms * loop->performance.control_error_rms * 0.99) +
(error * error * 0.01));
return 0;
}
// Real-time control thread
static void* control_thread_func(void* arg)
{
printf("Real-time control thread started\n");
// Set thread priority
struct sched_param param;
param.sched_priority = control_system.control_priority;
pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m);
// Initialize timing
struct timespec next_cycle;
clock_gettime(CLOCK_MONOTONIC, &next_cycle);
while (control_system.system_running && !control_system.emergency_stop) {
struct timespec cycle_start;
clock_gettime(CLOCK_MONOTONIC, &cycle_start);
// Check for deadline miss
if (timespec_to_ns(&cycle_start) > timespec_to_ns(&next_cycle)) {
control_system.system_stats.missed_deadlines++;
uint64_t jitter = timespec_to_ns(&cycle_start) - timespec_to_ns(&next_cycle);
if (jitter > control_system.system_stats.max_jitter_ns) {
control_system.system_stats.max_jitter_ns = jitter;
}
}
// Execute all enabled control loops
for (int i = 0; i < control_system.num_control_loops; i++) {
control_loop_t* loop = &control_system.control_loops[i];
if (execute_control_loop(loop) != 0) {
printf("Error executing control loop '%s'\n", loop->name);
if (!loop->enabled) {
control_system.system_fault = true;
snprintf(control_system.fault_message, sizeof(control_system.fault_message),
"Control loop '%s' disabled due to safety violation", loop->name);
}
}
}
control_system.control_cycle_count++;
control_system.system_stats.total_cycles++;
// Calculate next cycle time
timespec_add_ns(&next_cycle, CONTROL_PERIOD_NS);
// Sleep until next cycle
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_cycle, NULL);
}
printf("Real-time control thread stopped\n");
return NULL;
}
// System control and monitoring
static int start_control_system(void)
{
if (control_system.system_running) {
return -1;
}
control_system.system_running = true;
control_system.emergency_stop = false;
control_system.system_fault = false;
control_system.control_priority = 80; // High real-time priority
clock_gettime(CLOCK_MONOTONIC, &control_system.system_start_time);
// Create real-time control thread
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&control_system.control_thread, &attr, control_thread_func, NULL) != 0) {
perror("pthread_create");
control_system.system_running = false;
pthread_attr_destroy(&attr);
return -1;
}
pthread_attr_destroy(&attr);
printf("Control system started\n");
return 0;
}
static void stop_control_system(void)
{
if (!control_system.system_running) {
return;
}
control_system.system_running = false;
// Wait for control thread to finish
pthread_join(control_system.control_thread, NULL);
printf("Control system stopped\n");
}
static void emergency_stop(void)
{
printf("EMERGENCY STOP ACTIVATED\n");
control_system.emergency_stop = true;
// Set all outputs to safe values (typically zero)
for (int i = 0; i < control_system.num_control_loops; i++) {
control_loop_t* loop = &control_system.control_loops[i];
if (loop->enabled) {
write_io_channel(loop->output_channel, 0.0);
loop->enabled = false;
}
}
}
// Statistics and monitoring
static void print_control_statistics(void)
{
printf("\n=== Control System Statistics ===\n");
printf("System Status: %s\n",
control_system.system_running ? "Running" : "Stopped");
if (control_system.emergency_stop) {
printf("EMERGENCY STOP ACTIVE\n");
}
if (control_system.system_fault) {
printf("SYSTEM FAULT: %s\n", control_system.fault_message);
}
printf("Total control cycles: %lu\n", control_system.system_stats.total_cycles);
printf("Missed deadlines: %lu\n", control_system.system_stats.missed_deadlines);
if (control_system.system_stats.total_cycles > 0) {
printf("Deadline miss ratio: %.3f%%\n",
(double)control_system.system_stats.missed_deadlines /
control_system.system_stats.total_cycles * 100.0);
}
printf("Max jitter: %lu ns\n", control_system.system_stats.max_jitter_ns);
printf("I/O errors: %lu\n", control_system.system_stats.io_errors);
printf("\nControl Loop Performance:\n");
for (int i = 0; i < control_system.num_control_loops; i++) {
control_loop_t* loop = &control_system.control_loops[i];
printf("Loop '%s':\n", loop->name);
printf(" Status: %s\n", loop->enabled ? "Enabled" : "Disabled");
printf(" Executions: %lu\n", loop->performance.execution_count);
printf(" Execution time (ns): min=%lu, avg=%lu, max=%lu\n",
loop->performance.execution_time_min,
loop->performance.execution_time_avg,
loop->performance.execution_time_max);
printf(" Control error RMS: %.3f\n", loop->performance.control_error_rms);
printf(" Setpoint: %.2f\n", loop->controller.pid.setpoint);
double current_value;
if (read_io_channel(loop->input_channel, ¤t_value) == 0) {
printf(" Current value: %.2f\n", current_value);
}
printf("\n");
}
printf("I/O Channel Statistics:\n");
for (int i = 0; i < control_system.num_io_channels; i++) {
io_channel_t* channel = &control_system.io_channels[i];
printf("Channel '%s': reads=%lu, writes=%lu, errors=%lu\n",
channel->name, channel->stats.read_count,
channel->stats.write_count, channel->stats.error_count);
}
printf("=================================\n");
}
// Signal handlers
static void signal_handler(int sig)
{
if (sig == SIGINT || sig == SIGTERM) {
printf("\nReceived signal %d, stopping control system...\n", sig);
stop_control_system();
} else if (sig == SIGUSR1) {
print_control_statistics();
} else if (sig == SIGUSR2) {
emergency_stop();
}
}
// Test scenarios
static void setup_test_control_system(void)
{
// Create I/O channels
int temp_sensor = create_io_channel("TemperatureSensor", IO_ANALOG_INPUT, 1.0, 0.0);
int heater_output = create_io_channel("HeaterOutput", IO_ANALOG_OUTPUT, 1.0, 0.0);
int pressure_sensor = create_io_channel("PressureSensor", IO_ANALOG_INPUT, 1.0, 0.0);
int valve_output = create_io_channel("ValveOutput", IO_ANALOG_OUTPUT, 1.0, 0.0);
// Create control loops
// Temperature control loop (PID)
create_control_loop("TemperatureControl", CONTROLLER_PID,
temp_sensor, heater_output,
2.0, 0.5, 0.1, 75.0); // Kp=2.0, Ki=0.5, Kd=0.1, SP=75°C
// Pressure control loop (PI)
create_control_loop("PressureControl", CONTROLLER_PI,
pressure_sensor, valve_output,
1.5, 0.3, 0.0, 50.0); // Kp=1.5, Ki=0.3, Kd=0.0, SP=50 PSI
printf("Test control system configured\n");
}
// Main function
int main(void)
{
printf("Advanced Real-Time Industrial Control System\n");
if (geteuid() != 0) {
fprintf(stderr, "This program requires root privileges for real-time scheduling\n");
return 1;
}
// Configure real-time environment
if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
perror("mlockall");
return 1;
}
// Set up signal handlers
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGUSR1, signal_handler);
signal(SIGUSR2, signal_handler);
// Setup test system
setup_test_control_system();
// Start control system
if (start_control_system() != 0) {
fprintf(stderr, "Failed to start control system\n");
return 1;
}
printf("Control system running...\n");
printf("Send SIGUSR1 for statistics, SIGUSR2 for emergency stop, SIGINT to exit\n");
// Main monitoring loop
while (control_system.system_running) {
sleep(5);
if (control_system.system_fault) {
printf("System fault detected, stopping...\n");
break;
}
}
// Print final statistics
print_control_statistics();
// Cleanup
stop_control_system();
munlockall();
return 0;
}
This comprehensive Linux real-time systems programming blog post covers:
- Advanced Real-Time Scheduling - Complete framework with multiple scheduling algorithms (RM, EDF, etc.) and timing analysis
- Industrial Control Systems - Real-time PID controllers with safety monitoring and performance tracking
- PREEMPT_RT Integration - Proper configuration for deterministic behavior and low-latency operation
- Memory Management - Stack locking, memory pre-allocation, and real-time safe practices
- Performance Monitoring - Comprehensive timing analysis, deadline monitoring, and statistics collection
The implementation demonstrates production-ready real-time programming techniques suitable for industrial automation, robotics, and critical control systems.