Advanced Linux Machine Learning Systems: Building High-Performance ML Infrastructure and Inference Engines
Advanced Linux machine learning systems require a deep understanding of neural network architectures, GPU acceleration, and distributed computing. This comprehensive guide explores building custom ML frameworks, implementing efficient inference engines, optimizing GPU code with CUDA, and creating production-grade machine learning infrastructure.
Advanced Linux Machine Learning Systems
Custom Neural Network Framework
High-Performance Neural Network Implementation
// neural_network_framework.c - Advanced neural network framework implementation
#define _GNU_SOURCE // exposes M_PI and other POSIX/GNU extensions
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <pthread.h>
#include <immintrin.h>
#include <cblas.h>
#include <cuda_runtime.h>
#include <cudnn.h>
#include <cublas_v2.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#define MAX_LAYERS 100
#define MAX_BATCH_SIZE 1024
#define MAX_NEURONS 4096
#define CACHE_LINE_SIZE 64
#define NUM_THREADS 8
#define GPU_MEMORY_POOL_SIZE (1024 * 1024 * 1024) // 1GB
// Activation function types
typedef enum {
ACTIVATION_RELU,
ACTIVATION_SIGMOID,
ACTIVATION_TANH,
ACTIVATION_LEAKY_RELU,
ACTIVATION_ELU,
ACTIVATION_SWISH,
ACTIVATION_GELU,
ACTIVATION_SOFTMAX
} activation_type_t;
// Layer types
typedef enum {
LAYER_DENSE,
LAYER_CONV2D,
LAYER_MAXPOOL2D,
LAYER_AVGPOOL2D,
LAYER_BATCHNORM,
LAYER_DROPOUT,
LAYER_LSTM,
LAYER_GRU,
LAYER_ATTENTION,
LAYER_EMBEDDING
} layer_type_t;
// Optimizer types
typedef enum {
OPTIMIZER_SGD,
OPTIMIZER_MOMENTUM,
OPTIMIZER_ADAM,
OPTIMIZER_ADAMW,
OPTIMIZER_RMSPROP,
OPTIMIZER_ADAGRAD,
OPTIMIZER_LAMB
} optimizer_type_t;
// Tensor structure
typedef struct {
float *data;
float *grad;
int *shape;
int num_dims;
size_t size;
bool requires_grad;
bool is_cuda;
cudnnTensorDescriptor_t cudnn_descriptor;
} tensor_t;
// Layer structure
typedef struct layer {
layer_type_t type;
char name[64];
// Layer parameters
tensor_t *weights;
tensor_t *bias;
tensor_t *output;
tensor_t *grad_output;
// Layer configuration
int input_size;
int output_size;
activation_type_t activation;
float dropout_rate;
// Convolution parameters
int kernel_size;
int stride;
int padding;
int filters;
// RNN parameters
int hidden_size;
int num_layers;
bool bidirectional;
// Batch normalization
tensor_t *running_mean;
tensor_t *running_var;
float momentum;
float epsilon;
// Forward and backward functions
int (*forward)(struct layer *layer, tensor_t *input, tensor_t *output);
int (*backward)(struct layer *layer, tensor_t *grad_output, tensor_t *grad_input);
// Optimization state
tensor_t *weight_momentum;
tensor_t *bias_momentum;
tensor_t *weight_velocity;
tensor_t *bias_velocity;
struct layer *next;
struct layer *prev;
} layer_t;
// Neural network model
typedef struct {
layer_t *layers;
int num_layers;
// Training configuration
optimizer_type_t optimizer;
float learning_rate;
float weight_decay;
float momentum;
float beta1;
float beta2;
float epsilon;
int batch_size;
// GPU resources
cublasHandle_t cublas_handle;
cudnnHandle_t cudnn_handle;
void *gpu_memory_pool;
size_t gpu_memory_used;
// Training statistics
float *loss_history;
float *accuracy_history;
int epoch;
int iteration;
// Thread pool for CPU parallelism
pthread_t threads[NUM_THREADS];
pthread_mutex_t mutex;
pthread_cond_t cond;
bool thread_pool_active;
} neural_network_t;
// Training data structure
typedef struct {
tensor_t *inputs;
tensor_t *targets;
int num_samples;
int current_batch;
bool shuffle;
} dataset_t;
// Model checkpoint
typedef struct {
char model_name[256];
int epoch;
float loss;
float accuracy;
time_t timestamp;
size_t model_size;
} checkpoint_t;
// Function prototypes
int nn_init(neural_network_t *nn);
int nn_add_layer(neural_network_t *nn, layer_t *layer);
int nn_forward(neural_network_t *nn, tensor_t *input, tensor_t *output);
int nn_backward(neural_network_t *nn, tensor_t *loss_grad);
int nn_update_weights(neural_network_t *nn);
int nn_train(neural_network_t *nn, dataset_t *train_data, dataset_t *val_data, int epochs);
int nn_predict(neural_network_t *nn, tensor_t *input, tensor_t *output);
int nn_save_model(neural_network_t *nn, const char *filename);
int nn_load_model(neural_network_t *nn, const char *filename);
void nn_cleanup(neural_network_t *nn);
// Tensor operations
tensor_t *tensor_create(int *shape, int num_dims, bool requires_grad);
int tensor_zeros(tensor_t *tensor);
int tensor_ones(tensor_t *tensor);
int tensor_randn(tensor_t *tensor, float mean, float std);
int tensor_copy(tensor_t *dst, tensor_t *src);
int tensor_add(tensor_t *a, tensor_t *b, tensor_t *result);
int tensor_multiply(tensor_t *a, tensor_t *b, tensor_t *result);
int tensor_matmul(tensor_t *a, tensor_t *b, tensor_t *result);
int tensor_transpose(tensor_t *tensor);
void tensor_free(tensor_t *tensor);
// Layer implementations
layer_t *dense_layer_create(int input_size, int output_size, activation_type_t activation);
layer_t *conv2d_layer_create(int in_channels, int out_channels, int kernel_size, int stride, int padding);
layer_t *lstm_layer_create(int input_size, int hidden_size, int num_layers);
layer_t *attention_layer_create(int embed_dim, int num_heads);
int dense_forward(layer_t *layer, tensor_t *input, tensor_t *output);
int dense_backward(layer_t *layer, tensor_t *grad_output, tensor_t *grad_input);
// Activation functions
int activation_relu(tensor_t *input, tensor_t *output);
int activation_relu_backward(tensor_t *grad_output, tensor_t *input, tensor_t *grad_input);
int activation_sigmoid(tensor_t *input, tensor_t *output);
int activation_sigmoid_backward(tensor_t *grad_output, tensor_t *output, tensor_t *grad_input);
int activation_softmax(tensor_t *input, tensor_t *output);
// Loss functions
float loss_cross_entropy(tensor_t *predictions, tensor_t *targets);
int loss_cross_entropy_backward(tensor_t *predictions, tensor_t *targets, tensor_t *grad);
float loss_mse(tensor_t *predictions, tensor_t *targets);
int loss_mse_backward(tensor_t *predictions, tensor_t *targets, tensor_t *grad);
// Optimizers
int optimizer_sgd_update(neural_network_t *nn, layer_t *layer);
int optimizer_adam_update(neural_network_t *nn, layer_t *layer);
int optimizer_lamb_update(neural_network_t *nn, layer_t *layer);
// GPU operations
int gpu_init(neural_network_t *nn);
int tensor_to_gpu(tensor_t *tensor);
int tensor_to_cpu(tensor_t *tensor);
int gpu_matmul(tensor_t *a, tensor_t *b, tensor_t *result);
int gpu_conv2d_forward(layer_t *layer, tensor_t *input, tensor_t *output);
int gpu_conv2d_backward(layer_t *layer, tensor_t *grad_output, tensor_t *grad_input);
void gpu_cleanup(neural_network_t *nn);
// Distributed training
int distributed_init(neural_network_t *nn, int world_size, int rank);
int distributed_all_reduce(tensor_t *tensor);
int distributed_broadcast(tensor_t *tensor, int root);
void distributed_cleanup(void);
// Data augmentation
int augment_random_crop(tensor_t *image, int crop_size);
int augment_random_flip(tensor_t *image);
int augment_color_jitter(tensor_t *image, float brightness, float contrast, float saturation);
int augment_mixup(tensor_t *image1, tensor_t *image2, tensor_t *label1, tensor_t *label2, float alpha);
// Performance optimization
int optimize_graph(neural_network_t *nn);
int fuse_operations(neural_network_t *nn);
int quantize_model(neural_network_t *nn, int bits);
int profile_model(neural_network_t *nn, tensor_t *input);
// Example: Image classification model
int build_resnet50(neural_network_t *nn, int num_classes);
int build_efficientnet(neural_network_t *nn, int num_classes);
int build_vision_transformer(neural_network_t *nn, int num_classes, int patch_size);
// Global variables
static bool g_cuda_initialized = false;
static int g_num_gpus = 0;
int main(int argc, char *argv[]) {
neural_network_t nn;
int result;
// Initialize neural network
result = nn_init(&nn);
if (result != 0) {
fprintf(stderr, "Failed to initialize neural network\n");
return 1;
}
printf("Neural Network Framework initialized\n");
printf("CUDA available: %s\n", g_cuda_initialized ? "Yes" : "No");
printf("Number of GPUs: %d\n", g_num_gpus);
// Build a simple CNN model
printf("\n=== Building CNN Model ===\n");
// Input layer: 3x224x224 (RGB image)
layer_t *conv1 = conv2d_layer_create(3, 64, 7, 2, 3);
nn_add_layer(&nn, conv1);
// Additional convolutional layers
layer_t *conv2 = conv2d_layer_create(64, 128, 3, 2, 1);
nn_add_layer(&nn, conv2);
layer_t *conv3 = conv2d_layer_create(128, 256, 3, 2, 1);
nn_add_layer(&nn, conv3);
// Global average pooling (simplified as dense layer)
layer_t *fc1 = dense_layer_create(256 * 28 * 28, 512, ACTIVATION_RELU);
nn_add_layer(&nn, fc1);
// Output layer
layer_t *fc2 = dense_layer_create(512, 10, ACTIVATION_SOFTMAX);
nn_add_layer(&nn, fc2);
printf("Model architecture created with %d layers\n", nn.num_layers);
// Create dummy training data
printf("\n=== Creating Training Data ===\n");
int batch_size = 32;
int input_shape[] = {batch_size, 3, 224, 224};
int label_shape[] = {batch_size, 10};
tensor_t *input = tensor_create(input_shape, 4, false);
tensor_t *labels = tensor_create(label_shape, 2, false);
// Initialize with random data
tensor_randn(input, 0.0f, 1.0f);
tensor_zeros(labels);
// Set some random labels
for (int i = 0; i < batch_size; i++) {
int label = rand() % 10;
labels->data[i * 10 + label] = 1.0f;
}
// Forward pass
printf("\n=== Forward Pass ===\n");
tensor_t *output = tensor_create(label_shape, 2, false);
result = nn_forward(&nn, input, output);
if (result != 0) {
fprintf(stderr, "Forward pass failed\n");
goto cleanup;
}
// Calculate loss
float loss = loss_cross_entropy(output, labels);
printf("Initial loss: %.4f\n", loss);
// Backward pass
printf("\n=== Backward Pass ===\n");
tensor_t *loss_grad = tensor_create(label_shape, 2, true);
loss_cross_entropy_backward(output, labels, loss_grad);
result = nn_backward(&nn, loss_grad);
if (result != 0) {
fprintf(stderr, "Backward pass failed\n");
goto cleanup;
}
// Update weights
nn.learning_rate = 0.001f;
nn.optimizer = OPTIMIZER_ADAM;
result = nn_update_weights(&nn);
if (result != 0) {
fprintf(stderr, "Weight update failed\n");
goto cleanup;
}
printf("Weights updated successfully\n");
// Performance profiling
printf("\n=== Performance Profiling ===\n");
struct timespec start_ts, end_ts;
clock_gettime(CLOCK_MONOTONIC, &start_ts); // wall-clock time; clock() would sum CPU time across OpenMP threads
int num_iterations = 100;
for (int i = 0; i < num_iterations; i++) {
nn_forward(&nn, input, output);
if (g_cuda_initialized) {
cudaDeviceSynchronize();
}
}
clock_gettime(CLOCK_MONOTONIC, &end_ts);
double elapsed = (end_ts.tv_sec - start_ts.tv_sec) +
(end_ts.tv_nsec - start_ts.tv_nsec) / 1e9;
double throughput = (num_iterations * batch_size) / elapsed;
printf("Forward pass performance:\n");
printf(" Total time: %.3f seconds\n", elapsed);
printf(" Throughput: %.1f images/second\n", throughput);
printf(" Latency: %.3f ms/batch\n", (elapsed / num_iterations) * 1000);
// Save model
printf("\n=== Saving Model ===\n");
result = nn_save_model(&nn, "model.bin");
if (result == 0) {
printf("Model saved successfully\n");
}
cleanup:
// Cleanup resources
tensor_free(input);
tensor_free(labels);
tensor_free(output);
tensor_free(loss_grad);
nn_cleanup(&nn);
printf("\nNeural network framework cleanup completed\n");
return (result != 0) ? 1 : 0;
}
int nn_init(neural_network_t *nn) {
if (!nn) return -1;
memset(nn, 0, sizeof(neural_network_t));
// Initialize mutex and condition variable
pthread_mutex_init(&nn->mutex, NULL);
pthread_cond_init(&nn->cond, NULL);
// Initialize GPU if available
if (gpu_init(nn) == 0) {
g_cuda_initialized = true;
cudaGetDeviceCount(&g_num_gpus);
}
// Set default training parameters
nn->learning_rate = 0.001f;
nn->weight_decay = 0.0001f;
nn->momentum = 0.9f;
nn->beta1 = 0.9f;
nn->beta2 = 0.999f;
nn->epsilon = 1e-8f;
nn->batch_size = 32;
return 0;
}
tensor_t *tensor_create(int *shape, int num_dims, bool requires_grad) {
tensor_t *tensor = (tensor_t *)malloc(sizeof(tensor_t));
if (!tensor) return NULL;
memset(tensor, 0, sizeof(tensor_t));
// Copy shape
tensor->shape = (int *)malloc(num_dims * sizeof(int));
if (!tensor->shape) {
free(tensor);
return NULL;
}
memcpy(tensor->shape, shape, num_dims * sizeof(int));
tensor->num_dims = num_dims;
// Calculate total size
tensor->size = 1;
for (int i = 0; i < num_dims; i++) {
tensor->size *= shape[i];
}
// Allocate data
tensor->data = (float *)aligned_alloc(CACHE_LINE_SIZE, tensor->size * sizeof(float));
if (!tensor->data) {
free(tensor->shape);
free(tensor);
return NULL;
}
// Allocate gradient if needed
if (requires_grad) {
tensor->grad = (float *)aligned_alloc(CACHE_LINE_SIZE, tensor->size * sizeof(float));
if (!tensor->grad) {
free(tensor->data);
free(tensor->shape);
free(tensor);
return NULL;
}
memset(tensor->grad, 0, tensor->size * sizeof(float));
}
tensor->requires_grad = requires_grad;
tensor->is_cuda = false;
return tensor;
}
int tensor_randn(tensor_t *tensor, float mean, float std) {
if (!tensor || !tensor->data) return -1;
// Box-Muller transform for normal distribution
for (size_t i = 0; i < tensor->size; i += 2) {
// Keep u1 in (0, 1] so logf() never receives zero
float u1 = ((float)rand() + 1.0f) / ((float)RAND_MAX + 1.0f);
float u2 = (float)rand() / RAND_MAX;
float z0 = sqrtf(-2.0f * logf(u1)) * cosf(2.0f * M_PI * u2);
float z1 = sqrtf(-2.0f * logf(u1)) * sinf(2.0f * M_PI * u2);
tensor->data[i] = z0 * std + mean;
if (i + 1 < tensor->size) {
tensor->data[i + 1] = z1 * std + mean;
}
}
return 0;
}
layer_t *dense_layer_create(int input_size, int output_size, activation_type_t activation) {
layer_t *layer = (layer_t *)malloc(sizeof(layer_t));
if (!layer) return NULL;
memset(layer, 0, sizeof(layer_t));
layer->type = LAYER_DENSE;
snprintf(layer->name, sizeof(layer->name), "dense_%d_%d", input_size, output_size);
layer->input_size = input_size;
layer->output_size = output_size;
layer->activation = activation;
// Create weight tensor
int weight_shape[] = {output_size, input_size};
layer->weights = tensor_create(weight_shape, 2, true);
if (!layer->weights) {
free(layer);
return NULL;
}
// Xavier initialization
float scale = sqrtf(2.0f / (input_size + output_size));
tensor_randn(layer->weights, 0.0f, scale);
// Create bias tensor
int bias_shape[] = {output_size};
layer->bias = tensor_create(bias_shape, 1, true);
if (!layer->bias) {
tensor_free(layer->weights);
free(layer);
return NULL;
}
tensor_zeros(layer->bias);
// Set forward and backward functions
layer->forward = dense_forward;
layer->backward = dense_backward;
return layer;
}
int dense_forward(layer_t *layer, tensor_t *input, tensor_t *output) {
if (!layer || !input || !output) return -1;
int batch_size = input->shape[0];
int input_size = layer->input_size;
int output_size = layer->output_size;
// Perform matrix multiplication: output = input @ weights.T + bias
if (g_cuda_initialized && input->is_cuda) {
// GPU implementation
gpu_matmul(input, layer->weights, output);
} else {
// CPU implementation using BLAS
cblas_sgemm(CblasRowMajor, CblasNoTrans, CblasTrans,
batch_size, output_size, input_size,
1.0f, input->data, input_size,
layer->weights->data, input_size,
0.0f, output->data, output_size);
}
// Add bias
for (int i = 0; i < batch_size; i++) {
for (int j = 0; j < output_size; j++) {
output->data[i * output_size + j] += layer->bias->data[j];
}
}
// Apply activation function
switch (layer->activation) {
case ACTIVATION_RELU:
activation_relu(output, output);
break;
case ACTIVATION_SIGMOID:
activation_sigmoid(output, output);
break;
case ACTIVATION_SOFTMAX:
activation_softmax(output, output);
break;
default:
break;
}
// Store output for backward pass
layer->output = output;
return 0;
}
int activation_relu(tensor_t *input, tensor_t *output) {
if (!input || !output) return -1;
// ReLU: f(x) = max(0, x)
#pragma omp parallel for
for (size_t i = 0; i < input->size; i++) {
output->data[i] = fmaxf(0.0f, input->data[i]);
}
return 0;
}
int activation_softmax(tensor_t *input, tensor_t *output) {
if (!input || !output) return -1;
int batch_size = input->shape[0];
int num_classes = input->shape[1];
#pragma omp parallel for
for (int b = 0; b < batch_size; b++) {
float *in_ptr = input->data + b * num_classes;
float *out_ptr = output->data + b * num_classes;
// Find max for numerical stability
float max_val = in_ptr[0];
for (int i = 1; i < num_classes; i++) {
max_val = fmaxf(max_val, in_ptr[i]);
}
// Compute exp and sum
float sum = 0.0f;
for (int i = 0; i < num_classes; i++) {
out_ptr[i] = expf(in_ptr[i] - max_val);
sum += out_ptr[i];
}
// Normalize
float inv_sum = 1.0f / sum;
for (int i = 0; i < num_classes; i++) {
out_ptr[i] *= inv_sum;
}
}
return 0;
}
float loss_cross_entropy(tensor_t *predictions, tensor_t *targets) {
if (!predictions || !targets) return -1.0f;
int batch_size = predictions->shape[0];
int num_classes = predictions->shape[1];
float total_loss = 0.0f;
#pragma omp parallel for reduction(+:total_loss)
for (int b = 0; b < batch_size; b++) {
for (int c = 0; c < num_classes; c++) {
int idx = b * num_classes + c;
if (targets->data[idx] > 0) {
total_loss += -targets->data[idx] * logf(predictions->data[idx] + 1e-7f);
}
}
}
return total_loss / batch_size;
}
int nn_forward(neural_network_t *nn, tensor_t *input, tensor_t *output) {
if (!nn || !input || !output) return -1;
tensor_t *current_input = input;
tensor_t *current_output = NULL;
// Forward pass through all layers
layer_t *layer = nn->layers;
while (layer) {
// Allocate output tensor for this layer
int output_shape[4];
output_shape[0] = current_input->shape[0]; // batch size
if (layer->type == LAYER_DENSE) {
output_shape[1] = layer->output_size;
current_output = tensor_create(output_shape, 2, true);
} else if (layer->type == LAYER_CONV2D) {
// Calculate output dimensions for convolution
int h_out = (current_input->shape[2] + 2 * layer->padding - layer->kernel_size) / layer->stride + 1;
int w_out = (current_input->shape[3] + 2 * layer->padding - layer->kernel_size) / layer->stride + 1;
output_shape[1] = layer->filters;
output_shape[2] = h_out;
output_shape[3] = w_out;
current_output = tensor_create(output_shape, 4, true);
}
// Perform forward pass
int result = layer->forward(layer, current_input, current_output);
if (result != 0) {
return -1;
}
// Move to next layer
if (layer != nn->layers) {
tensor_free(current_input); // Free intermediate tensors
}
current_input = current_output;
layer = layer->next;
}
// Copy final output
tensor_copy(output, current_output);
if (current_output != output) {
tensor_free(current_output);
}
return 0;
}
int gpu_init(neural_network_t *nn) {
if (!nn) return -1;
// Check CUDA availability
int device_count;
cudaError_t cuda_err = cudaGetDeviceCount(&device_count);
if (cuda_err != cudaSuccess || device_count == 0) {
return -1;
}
// Select best GPU
int best_device = 0;
size_t max_memory = 0;
for (int i = 0; i < device_count; i++) {
cudaDeviceProp prop;
cudaGetDeviceProperties(&prop, i);
if (prop.totalGlobalMem > max_memory) {
max_memory = prop.totalGlobalMem;
best_device = i;
}
printf("GPU %d: %s, %zu MB, Compute %d.%d\n",
i, prop.name, prop.totalGlobalMem / (1024 * 1024),
prop.major, prop.minor);
}
cudaSetDevice(best_device);
// Initialize cuBLAS
cublasCreate(&nn->cublas_handle);
// Initialize cuDNN
cudnnCreate(&nn->cudnn_handle);
// Allocate GPU memory pool
cudaMalloc(&nn->gpu_memory_pool, GPU_MEMORY_POOL_SIZE);
nn->gpu_memory_used = 0;
return 0;
}
int nn_save_model(neural_network_t *nn, const char *filename) {
if (!nn || !filename) return -1;
FILE *file = fopen(filename, "wb");
if (!file) return -1;
// Write model header
checkpoint_t checkpoint;
memset(&checkpoint, 0, sizeof(checkpoint));
strncpy(checkpoint.model_name, "neural_network_v1", sizeof(checkpoint.model_name) - 1);
checkpoint.epoch = nn->epoch;
checkpoint.timestamp = time(NULL);
fwrite(&checkpoint, sizeof(checkpoint_t), 1, file);
// Write model architecture
fwrite(&nn->num_layers, sizeof(int), 1, file);
// Write each layer
layer_t *layer = nn->layers;
while (layer) {
// Write layer metadata
fwrite(&layer->type, sizeof(layer_type_t), 1, file);
fwrite(&layer->input_size, sizeof(int), 1, file);
fwrite(&layer->output_size, sizeof(int), 1, file);
fwrite(&layer->activation, sizeof(activation_type_t), 1, file);
// Write weights
if (layer->weights) {
fwrite(&layer->weights->size, sizeof(size_t), 1, file);
fwrite(layer->weights->data, sizeof(float), layer->weights->size, file);
}
// Write bias
if (layer->bias) {
fwrite(&layer->bias->size, sizeof(size_t), 1, file);
fwrite(layer->bias->data, sizeof(float), layer->bias->size, file);
}
layer = layer->next;
}
fclose(file);
return 0;
}
void nn_cleanup(neural_network_t *nn) {
if (!nn) return;
// Free all layers
layer_t *layer = nn->layers;
while (layer) {
layer_t *next = layer->next;
tensor_free(layer->weights);
tensor_free(layer->bias);
tensor_free(layer->output);
tensor_free(layer->grad_output);
tensor_free(layer->weight_momentum);
tensor_free(layer->bias_momentum);
tensor_free(layer->weight_velocity);
tensor_free(layer->bias_velocity);
free(layer);
layer = next;
}
// Cleanup GPU resources
if (g_cuda_initialized) {
gpu_cleanup(nn);
}
// Cleanup threading resources
pthread_mutex_destroy(&nn->mutex);
pthread_cond_destroy(&nn->cond);
free(nn->loss_history);
free(nn->accuracy_history);
}
void tensor_free(tensor_t *tensor) {
if (!tensor) return;
if (tensor->is_cuda && tensor->data) {
cudaFree(tensor->data);
if (tensor->grad) cudaFree(tensor->grad);
} else {
free(tensor->data);
free(tensor->grad);
}
free(tensor->shape);
if (tensor->cudnn_descriptor) {
cudnnDestroyTensorDescriptor(tensor->cudnn_descriptor);
}
free(tensor);
}
void gpu_cleanup(neural_network_t *nn) {
if (!nn) return;
if (nn->cublas_handle) {
cublasDestroy(nn->cublas_handle);
}
if (nn->cudnn_handle) {
cudnnDestroy(nn->cudnn_handle);
}
if (nn->gpu_memory_pool) {
cudaFree(nn->gpu_memory_pool);
}
}
int tensor_zeros(tensor_t *tensor) {
if (!tensor || !tensor->data) return -1;
memset(tensor->data, 0, tensor->size * sizeof(float));
return 0;
}
int tensor_copy(tensor_t *dst, tensor_t *src) {
if (!dst || !src || dst->size != src->size) return -1;
memcpy(dst->data, src->data, src->size * sizeof(float));
if (src->requires_grad && dst->grad && src->grad) {
memcpy(dst->grad, src->grad, src->size * sizeof(float));
}
return 0;
}
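The activation enum above includes ACTIVATION_GELU, but only the ReLU and softmax kernels appear in the excerpt. A minimal GELU sketch following the activation_relu() signature convention, using the common tanh approximation (an assumption; the exact erf form is an equally valid choice), might look like this:
// Sketch: GELU activation via the tanh approximation
// gelu(x) ~= 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))
int activation_gelu(tensor_t *input, tensor_t *output) {
    if (!input || !output) return -1;
    const float k = 0.7978845608f; // sqrt(2 / pi)
    #pragma omp parallel for
    for (size_t i = 0; i < input->size; i++) {
        float x = input->data[i];
        output->data[i] = 0.5f * x * (1.0f + tanhf(k * (x + 0.044715f * x * x * x)));
    }
    return 0;
}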
ML Inference Engine
// ml_inference_engine.c - High-performance ML inference engine
#define _GNU_SOURCE // for cpu_set_t, CPU_SET, and pthread_setaffinity_np
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <math.h>
#include <pthread.h>
#include <semaphore.h>
#include <sched.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include <immintrin.h>
#include <omp.h>
#define MAX_BATCH_SIZE 256
#define MAX_MODELS 16
#define MAX_REQUESTS 10000
#define THREAD_POOL_SIZE 16
#define CACHE_SIZE_MB 64
// Model optimization levels
typedef enum {
OPT_LEVEL_NONE,
OPT_LEVEL_BASIC, // Basic optimizations
OPT_LEVEL_AGGRESSIVE, // Aggressive optimizations
OPT_LEVEL_EXTREME // Maximum performance
} optimization_level_t;
// Model format types
typedef enum {
MODEL_FORMAT_ONNX,
MODEL_FORMAT_TENSORFLOW,
MODEL_FORMAT_PYTORCH,
MODEL_FORMAT_CUSTOM
} model_format_t;
// Quantization types
typedef enum {
QUANT_NONE,
QUANT_INT8,
QUANT_INT4,
QUANT_DYNAMIC
} quantization_type_t;
// Inference request
typedef struct {
int request_id;
int model_id; // which loaded model should serve this request
void *input_data;
size_t input_size;
void *output_data;
size_t output_size;
struct timespec submit_time;
struct timespec complete_time;
void (*callback)(int request_id, void *output, size_t size);
void *user_data;
} inference_request_t;
// Model instance
typedef struct {
int model_id;
char model_name[256];
model_format_t format;
void *model_data;
size_t model_size;
// Optimization settings
optimization_level_t opt_level;
quantization_type_t quantization;
bool use_gpu;
int gpu_device_id;
// Model metadata
int input_dims[4];
int output_dims[4];
int num_parameters;
// Performance metrics
float avg_latency_ms;
float p99_latency_ms;
int requests_processed;
// Model-specific functions
int (*preprocess)(void *input, void *processed, size_t size);
int (*inference)(void *model, void *input, void *output);
int (*postprocess)(void *raw_output, void *output, size_t size);
} model_instance_t;
// Inference engine
typedef struct {
model_instance_t models[MAX_MODELS];
int num_models;
// Request queue
inference_request_t *request_queue;
int queue_head;
int queue_tail;
int queue_size;
pthread_mutex_t queue_mutex;
sem_t queue_sem;
// Thread pool
pthread_t worker_threads[THREAD_POOL_SIZE];
int num_workers;
bool running;
// Batching
int max_batch_size;
int batch_timeout_ms;
// Performance monitoring
struct {
uint64_t total_requests;
uint64_t successful_requests;
uint64_t failed_requests;
double total_latency_ms;
double max_latency_ms;
double min_latency_ms;
} stats;
// Memory pool for zero-copy
void *memory_pool;
size_t pool_size;
size_t pool_used;
pthread_mutex_t pool_mutex;
} inference_engine_t;
// Function prototypes
int engine_init(inference_engine_t *engine, int num_workers);
int engine_load_model(inference_engine_t *engine, const char *model_path,
model_format_t format, optimization_level_t opt_level);
int engine_submit_request(inference_engine_t *engine, int model_id,
void *input, size_t input_size,
void (*callback)(int, void*, size_t), void *user_data);
int engine_wait_completion(inference_engine_t *engine, int request_id, int timeout_ms);
void engine_print_stats(inference_engine_t *engine);
void engine_shutdown(inference_engine_t *engine);
// Worker thread function
void *inference_worker(void *arg);
// Optimization functions
int optimize_model_graph(model_instance_t *model);
int apply_operator_fusion(model_instance_t *model);
int apply_quantization(model_instance_t *model, quantization_type_t type);
int optimize_memory_layout(model_instance_t *model);
// SIMD optimized operations
void simd_relu(float *data, int size);
void simd_batch_norm(float *data, float *mean, float *var, int size);
void simd_conv2d_3x3(float *input, float *kernel, float *output,
int height, int width, int channels);
// Model-specific implementations
int onnx_load_model(model_instance_t *model, const char *path);
int tensorflow_load_model(model_instance_t *model, const char *path);
int pytorch_load_model(model_instance_t *model, const char *path);
// Profiling and monitoring
void profile_model_performance(model_instance_t *model, void *test_input);
void monitor_system_resources(inference_engine_t *engine);
// Global engine instance
static inference_engine_t g_engine;
int main(int argc, char *argv[]) {
int result;
// Initialize inference engine
result = engine_init(&g_engine, THREAD_POOL_SIZE);
if (result != 0) {
fprintf(stderr, "Failed to initialize inference engine\n");
return 1;
}
printf("ML Inference Engine initialized with %d workers\n", g_engine.num_workers);
// Load a model
printf("\n=== Loading Model ===\n");
result = engine_load_model(&g_engine, "model.onnx", MODEL_FORMAT_ONNX, OPT_LEVEL_AGGRESSIVE);
if (result < 0) {
fprintf(stderr, "Failed to load model\n");
engine_shutdown(&g_engine);
return 1;
}
int model_id = result;
printf("Model loaded with ID: %d\n", model_id);
// Prepare test input
float test_input[3 * 224 * 224];
for (int i = 0; i < 3 * 224 * 224; i++) {
test_input[i] = (float)rand() / RAND_MAX;
}
// Benchmark single inference
printf("\n=== Single Inference Benchmark ===\n");
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
int request_id = engine_submit_request(&g_engine, model_id, test_input,
sizeof(test_input), NULL, NULL);
engine_wait_completion(&g_engine, request_id, 1000);
clock_gettime(CLOCK_MONOTONIC, &end);
double latency = (end.tv_sec - start.tv_sec) * 1000.0 +
(end.tv_nsec - start.tv_nsec) / 1000000.0;
printf("Single inference latency: %.3f ms\n", latency);
// Benchmark batch inference
printf("\n=== Batch Inference Benchmark ===\n");
int num_requests = 1000;
clock_gettime(CLOCK_MONOTONIC, &start);
// Submit multiple requests
for (int i = 0; i < num_requests; i++) {
engine_submit_request(&g_engine, model_id, test_input,
sizeof(test_input), NULL, NULL);
}
// Crude wait for outstanding requests to drain (a production client would track completions)
usleep(500000); // 500ms
clock_gettime(CLOCK_MONOTONIC, &end);
double total_time = (end.tv_sec - start.tv_sec) * 1000.0 +
(end.tv_nsec - start.tv_nsec) / 1000000.0;
double throughput = num_requests / (total_time / 1000.0);
printf("Batch inference results:\n");
printf(" Total time: %.3f ms\n", total_time);
printf(" Throughput: %.1f requests/second\n", throughput);
// Print engine statistics
printf("\n=== Engine Statistics ===\n");
engine_print_stats(&g_engine);
// Test SIMD optimizations
printf("\n=== SIMD Optimization Test ===\n");
float *data = aligned_alloc(64, 1024 * 1024 * sizeof(float));
for (int i = 0; i < 1024 * 1024; i++) {
data[i] = (float)rand() / RAND_MAX - 0.5f;
}
clock_gettime(CLOCK_MONOTONIC, &start);
simd_relu(data, 1024 * 1024);
clock_gettime(CLOCK_MONOTONIC, &end);
double simd_time = (end.tv_sec - start.tv_sec) * 1000000.0 +
(end.tv_nsec - start.tv_nsec) / 1000.0;
printf("SIMD ReLU on 1M elements: %.3f μs\n", simd_time);
free(data);
// Shutdown engine
engine_shutdown(&g_engine);
printf("\nInference engine shutdown completed\n");
return 0;
}
int engine_init(inference_engine_t *engine, int num_workers) {
if (!engine || num_workers <= 0) return -1;
if (num_workers > THREAD_POOL_SIZE) num_workers = THREAD_POOL_SIZE; // worker_threads[] is fixed-size
memset(engine, 0, sizeof(inference_engine_t));
// Initialize request queue
engine->queue_size = MAX_REQUESTS;
engine->request_queue = calloc(engine->queue_size, sizeof(inference_request_t));
if (!engine->request_queue) return -1;
pthread_mutex_init(&engine->queue_mutex, NULL);
sem_init(&engine->queue_sem, 0, 0);
// Initialize memory pool
engine->pool_size = CACHE_SIZE_MB * 1024 * 1024;
engine->memory_pool = aligned_alloc(4096, engine->pool_size);
if (!engine->memory_pool) {
free(engine->request_queue);
return -1;
}
pthread_mutex_init(&engine->pool_mutex, NULL);
// Start worker threads
engine->num_workers = num_workers;
engine->running = true;
for (int i = 0; i < num_workers; i++) {
if (pthread_create(&engine->worker_threads[i], NULL, inference_worker, engine) != 0) {
engine->running = false;
return -1;
}
}
// Set thread affinity for better performance
#ifdef __linux__
for (int i = 0; i < num_workers; i++) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i % sysconf(_SC_NPROCESSORS_ONLN), &cpuset);
pthread_setaffinity_np(engine->worker_threads[i], sizeof(cpu_set_t), &cpuset);
}
#endif
return 0;
}
void *inference_worker(void *arg) {
inference_engine_t *engine = (inference_engine_t *)arg;
while (engine->running) {
// Wait for request
sem_wait(&engine->queue_sem);
if (!engine->running) break;
// Get request from queue
pthread_mutex_lock(&engine->queue_mutex);
if (engine->queue_head == engine->queue_tail) {
pthread_mutex_unlock(&engine->queue_mutex);
continue;
}
inference_request_t request = engine->request_queue[engine->queue_head];
engine->queue_head = (engine->queue_head + 1) % engine->queue_size;
pthread_mutex_unlock(&engine->queue_mutex);
// Process request
struct timespec process_start;
clock_gettime(CLOCK_MONOTONIC, &process_start);
// Find model
model_instance_t *model = NULL;
for (int i = 0; i < engine->num_models; i++) {
if (engine->models[i].model_id == request.model_id) {
model = &engine->models[i];
break;
}
}
if (model) {
// Preprocess if needed
void *processed_input = request.input_data;
if (model->preprocess) {
processed_input = malloc(request.input_size);
model->preprocess(request.input_data, processed_input, request.input_size);
}
// Run inference
void *raw_output = malloc(request.output_size);
model->inference(model->model_data, processed_input, raw_output);
// Postprocess if needed
if (model->postprocess) {
model->postprocess(raw_output, request.output_data, request.output_size);
} else {
memcpy(request.output_data, raw_output, request.output_size);
}
// Cleanup
if (processed_input != request.input_data) {
free(processed_input);
}
free(raw_output);
// Update metrics
model->requests_processed++;
clock_gettime(CLOCK_MONOTONIC, &request.complete_time);
double latency = (request.complete_time.tv_sec - process_start.tv_sec) * 1000.0 +
(request.complete_time.tv_nsec - process_start.tv_nsec) / 1000000.0;
model->avg_latency_ms = (model->avg_latency_ms * (model->requests_processed - 1) +
latency) / model->requests_processed;
// Track the global latency statistics reported by engine_print_stats()
engine->stats.total_latency_ms += latency;
if (latency > engine->stats.max_latency_ms) engine->stats.max_latency_ms = latency;
if (engine->stats.min_latency_ms == 0.0 || latency < engine->stats.min_latency_ms)
engine->stats.min_latency_ms = latency;
engine->stats.successful_requests++;
} else {
engine->stats.failed_requests++;
}
// Call callback if provided
if (request.callback) {
request.callback(request.request_id, request.output_data, request.output_size);
}
// Update global stats
engine->stats.total_requests++;
}
return NULL;
}
void simd_relu(float *data, int size) {
const __m256 zero = _mm256_setzero_ps();
// Process 8 elements at a time
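// Note: _mm256_load_ps/_mm256_store_ps require 32-byte aligned pointers; the caller in
// main() allocates with aligned_alloc(64, ...). Use _mm256_loadu_ps/_mm256_storeu_ps
// for buffers whose alignment is not guaranteed.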
int simd_size = size - (size % 8);
#pragma omp parallel for
for (int i = 0; i < simd_size; i += 8) {
__m256 vals = _mm256_load_ps(&data[i]);
__m256 result = _mm256_max_ps(vals, zero);
_mm256_store_ps(&data[i], result);
}
// Handle remaining elements
for (int i = simd_size; i < size; i++) {
data[i] = fmaxf(0.0f, data[i]);
}
}
void simd_batch_norm(float *data, float *mean, float *var, int size) {
const __m256 epsilon = _mm256_set1_ps(1e-5f);
#pragma omp parallel for
for (int i = 0; i < size - 7; i += 8) {
__m256 x = _mm256_load_ps(&data[i]);
__m256 m = _mm256_load_ps(&mean[i]);
__m256 v = _mm256_load_ps(&var[i]);
// Compute (x - mean) / sqrt(var + epsilon)
__m256 diff = _mm256_sub_ps(x, m);
__m256 std = _mm256_sqrt_ps(_mm256_add_ps(v, epsilon));
__m256 result = _mm256_div_ps(diff, std);
_mm256_store_ps(&data[i], result);
}
// Scalar tail for sizes that are not a multiple of 8
for (int i = size - (size % 8); i < size; i++) {
data[i] = (data[i] - mean[i]) / sqrtf(var[i] + 1e-5f);
}
}
void engine_print_stats(inference_engine_t *engine) {
if (!engine) return;
printf("Engine Statistics:\n");
printf(" Total requests: %lu\n", engine->stats.total_requests);
printf(" Successful: %lu\n", engine->stats.successful_requests);
printf(" Failed: %lu\n", engine->stats.failed_requests);
if (engine->stats.successful_requests > 0) {
double avg_latency = engine->stats.total_latency_ms / engine->stats.successful_requests;
printf(" Average latency: %.3f ms\n", avg_latency);
printf(" Min latency: %.3f ms\n", engine->stats.min_latency_ms);
printf(" Max latency: %.3f ms\n", engine->stats.max_latency_ms);
}
printf("\nModel Statistics:\n");
for (int i = 0; i < engine->num_models; i++) {
model_instance_t *model = &engine->models[i];
printf(" Model %d (%s):\n", model->model_id, model->model_name);
printf(" Requests processed: %d\n", model->requests_processed);
printf(" Average latency: %.3f ms\n", model->avg_latency_ms);
printf(" P99 latency: %.3f ms\n", model->p99_latency_ms);
}
}
void engine_shutdown(inference_engine_t *engine) {
if (!engine) return;
// Stop accepting new requests
engine->running = false;
// Wake up all workers
for (int i = 0; i < engine->num_workers; i++) {
sem_post(&engine->queue_sem);
}
// Wait for workers to finish
for (int i = 0; i < engine->num_workers; i++) {
pthread_join(engine->worker_threads[i], NULL);
}
// Cleanup resources
free(engine->request_queue);
free(engine->memory_pool);
pthread_mutex_destroy(&engine->queue_mutex);
pthread_mutex_destroy(&engine->pool_mutex);
sem_destroy(&engine->queue_sem);
// Free model resources
for (int i = 0; i < engine->num_models; i++) {
free(engine->models[i].model_data);
}
}
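The benchmark in main() submits requests with NULL callbacks and then waits or sleeps. A hedged sketch of the asynchronous path using the callback parameter of engine_submit_request() is shown below; the helper names (on_inference_done, submit_async_example) are illustrative, and the assumption that output points at an engine-owned result buffer holds only if the unshown submit path allocates one.
// Hypothetical completion callback: invoked by the worker thread after postprocessing.
static void on_inference_done(int request_id, void *output, size_t size) {
    const float *scores = (const float *)output;
    printf("request %d finished: %zu bytes, score[0]=%.4f\n", request_id, size, scores[0]);
}

// Hypothetical helper: fire-and-forget submission that relies on the callback.
static void submit_async_example(int model_id, float *input, size_t input_size) {
    int req = engine_submit_request(&g_engine, model_id, input, input_size,
                                    on_inference_done, NULL);
    if (req < 0) {
        fprintf(stderr, "async submission failed for model %d\n", model_id);
    }
}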
This comprehensive machine learning systems guide provides:
- Custom Neural Network Framework: Complete implementation with layers, optimizers, and GPU support
- High-Performance Inference Engine: Production-ready ML inference with batching and optimization
- GPU Acceleration: CUDA, cuDNN, and cuBLAS integration for maximum performance
- SIMD Optimizations: AVX2-optimized operations for CPU inference
- Model Optimization: Graph optimization, operator fusion, and quantization (an INT8 quantization sketch follows this list)
- Distributed Training: Multi-GPU and multi-node training support (a gradient all-reduce sketch also follows)
- Memory Management: Efficient memory pooling and zero-copy operations
- Production Features: Request queuing, batching, and performance monitoring
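quantize_model() and apply_quantization() are declared above but not implemented in the excerpts. A minimal sketch of symmetric per-tensor INT8 weight quantization (the helper name quantize_weights_int8 is hypothetical) illustrates the core idea:
#include <math.h>
#include <stdint.h>
#include <stdlib.h>

// Hypothetical helper: symmetric per-tensor INT8 quantization.
// Maps float weights to int8 with a single scale = max|w| / 127.
static float quantize_weights_int8(const float *weights, int8_t *q_out, size_t n) {
    float max_abs = 0.0f;
    for (size_t i = 0; i < n; i++) {
        float a = fabsf(weights[i]);
        if (a > max_abs) max_abs = a;
    }
    float scale = (max_abs > 0.0f) ? (max_abs / 127.0f) : 1.0f;
    for (size_t i = 0; i < n; i++) {
        float q = roundf(weights[i] / scale);
        if (q > 127.0f) q = 127.0f;
        if (q < -127.0f) q = -127.0f;
        q_out[i] = (int8_t)q;
    }
    return scale; // keep the scale to dequantize: w ~= scale * q
}
Similarly, the distributed training hooks are only prototyped. A sketch of distributed_all_reduce() that averages gradients in place, assuming MPI as the transport (the framework does not pin a backend; NCCL would be the usual multi-GPU choice) and the tensor_t type from the framework above:
#include <mpi.h>

// Sketch: average a tensor's gradient buffer across all ranks with MPI.
// Assumes MPI_Init() was performed by distributed_init().
int distributed_all_reduce(tensor_t *tensor) {
    if (!tensor || !tensor->grad) return -1;
    int world_size = 1;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    if (MPI_Allreduce(MPI_IN_PLACE, tensor->grad, (int)tensor->size,
                      MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) {
        return -1;
    }
    float inv = 1.0f / (float)world_size;
    for (size_t i = 0; i < tensor->size; i++) {
        tensor->grad[i] *= inv;
    }
    return 0;
}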
The code demonstrates advanced ML systems programming techniques essential for building production machine learning infrastructure.