Building Resilient Rust Microservices with Circuit Breakers and Priority Queues
In modern distributed systems, microservices must cope with unreliable dependencies and varying workloads. Two patterns — circuit breaker and priority queues — help make services more resilient and responsive. This comprehensive guide explores how to build a Rust microservice that combines both, ensuring graceful degradation under failure and preferential handling of critical tasks.
Introduction
Microservices architecture brings significant advantages but also introduces complex failure modes. When services depend on one another, failures can cascade through the system. Similarly, when a service processes different types of workloads, we need a way to ensure that critical tasks receive preferential treatment.
In this article, we’ll implement two essential resilience patterns using Rust:
Circuit Breaker Pattern: Prevents cascading failures by detecting when a dependent service is unavailable and “breaking the circuit” to avoid repeated failed calls.
Priority Queue Pattern: Ensures that high-priority tasks are processed before lower-priority ones, even under heavy load.
By combining these patterns, we’ll create a microservice that can gracefully handle failures and prioritize critical work.
Section 1: Understanding the Resilience Patterns
Before diving into implementation, let’s understand how these patterns work and why they’re essential for reliable microservices.
The Circuit Breaker Pattern
The circuit breaker pattern, popularized by Michael Nygard in “Release It!”, helps prevent system failures from cascading when a dependent service becomes unavailable or slow. It works similarly to an electrical circuit breaker:
- Closed State (Normal Operation): Calls pass through to the dependent service
- Open State (Failure Mode): After detecting failures, calls immediately fail without attempting to reach the dependent service
- Half-Open State (Recovery): After a timeout, the circuit allows a limited number of test calls to determine if the service has recovered
This pattern is vital for building resilient microservices because it:
- Prevents overwhelming an already struggling service
- Fails fast when a dependency is down
- Provides time for the dependency to recover
- Reduces latency by avoiding timeout delays
The Priority Queue Pattern
The priority queue pattern ensures that critical tasks are processed before less important ones. Unlike a standard FIFO queue, a priority queue:
- Assigns a priority level to each task
- Orders tasks by their priority rather than arrival time
- Always processes the highest-priority task next
This pattern is essential when:
- Some requests are more critical than others
- System resources are limited
- You want to provide differentiated service levels
Section 2: Project Setup and Dependencies
Let’s start by setting up our Rust project and adding the necessary dependencies.
Creating a New Rust Project
cargo new resilient-service
cd resilient-service
Adding Dependencies to Cargo.toml
[package]
name = "resilient-service"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.28", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
warp = "0.3"
futures = "0.3"
uuid = { version = "1.3", features = ["v4", "serde"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
priority-queue = "1.3"
metrics = "0.21"
metrics-exporter-prometheus = "0.12"
thiserror = "1.0"
Our dependencies include:
- tokio: Asynchronous runtime for Rust
- reqwest: HTTP client for making outbound requests
- serde: Serialization/deserialization framework
- warp: Web server framework
- futures: Utilities for working with asynchronous code
- uuid: For generating unique identifiers
- tracing: For application logging and diagnostics
- priority-queue: For implementing the priority queue
- metrics: For collecting performance metrics
- thiserror: For ergonomic error handling
Section 3: Implementing the Circuit Breaker
The circuit breaker will protect our service from cascading failures when dependent services are unavailable.
Defining Circuit Breaker States and Errors
First, let’s define the possible states of our circuit breaker and the errors it can return:
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum CircuitBreakerError<E> {
#[error("circuit breaker is open")]
Open,
#[error("underlying service error: {0}")]
ServiceError(E),
}
enum CircuitState {
Closed { failures: u32 },
Open { until: Instant },
HalfOpen,
}
pub struct CircuitBreaker {
state: Mutex<CircuitState>,
failure_threshold: u32,
reset_timeout: Duration,
}
Implementing the Circuit Breaker
Now, let’s implement the circuit breaker with methods to create it and execute calls through it:
impl CircuitBreaker {
pub fn new(failure_threshold: u32, reset_timeout: Duration) -> Self {
CircuitBreaker {
state: Mutex::new(CircuitState::Closed { failures: 0 }),
failure_threshold,
reset_timeout,
}
}
pub async fn call<F, T, E>(&self, f: F) -> Result<T, CircuitBreakerError<E>>
where
F: FnOnce() -> futures::future::BoxFuture<'static, Result<T, E>>,
E: std::fmt::Debug,
{
let mut state = self.state.lock().await;
match &*state {
CircuitState::Open { until } if Instant::now() < *until => {
tracing::debug!("Circuit is open, failing fast");
return Err(CircuitBreakerError::Open);
}
CircuitState::Open { .. } => {
tracing::debug!("Circuit is changing from open to half-open");
*state = CircuitState::HalfOpen;
}
_ => {}
}
drop(state); // Release the lock before making the call
// Make the actual call to the protected service
match f().await {
Ok(result) => {
// On success, reset the circuit if it was half-open
let mut state = self.state.lock().await;
if matches!(*state, CircuitState::HalfOpen) {
tracing::info!("Circuit is closing after successful call in half-open state");
*state = CircuitState::Closed { failures: 0 };
} else if let CircuitState::Closed { failures } = &mut *state {
// Reset failure count on successful call
*failures = 0;
}
Ok(result)
}
Err(err) => {
// On failure, update the circuit state
let mut state = self.state.lock().await;
match &mut *state {
CircuitState::Closed { failures } => {
*failures += 1;
tracing::debug!("Circuit in closed state, failure count: {}", *failures);
if *failures >= self.failure_threshold {
let until = Instant::now() + self.reset_timeout;
tracing::warn!(
"Circuit is opening after {} consecutive failures",
*failures
);
*state = CircuitState::Open { until };
}
}
CircuitState::HalfOpen => {
let until = Instant::now() + self.reset_timeout;
tracing::warn!("Circuit is re-opening after test call failed in half-open state");
*state = CircuitState::Open { until };
}
_ => {}
}
Err(CircuitBreakerError::ServiceError(err))
}
}
}
pub async fn get_state(&self) -> String {
let state = self.state.lock().await;
match &*state {
CircuitState::Closed { failures } => {
format!("Closed (failures: {})", failures)
}
CircuitState::Open { until } => {
let now = Instant::now();
if &now < until {
format!(
"Open (reopening in {} ms)",
until.duration_since(now).as_millis()
)
} else {
"Open (ready to test)".to_string()
}
}
CircuitState::HalfOpen => "Half-Open".to_string(),
}
}
}
Section 4: Building the Priority Queue System
Next, let’s implement the priority queue to ensure that critical tasks get processed first.
Defining Job Types
First, we’ll define our job structure and the queue that will manage it:
use priority_queue::PriorityQueue;
use serde::{Deserialize, Serialize};
use std::cmp::{Ordering, Reverse};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct Job {
pub id: Uuid,
pub created_at: chrono::DateTime<chrono::Utc>,
pub payload: serde_json::Value,
pub priority: u32,
}
impl PartialOrd for Job {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Job {
fn cmp(&self, other: &Self) -> Ordering {
// First compare by priority (high to low)
let prio_cmp = other.priority.cmp(&self.priority);
if prio_cmp != Ordering::Equal {
return prio_cmp;
}
// If priorities are equal, compare by creation time (old to new)
self.created_at.cmp(&other.created_at)
}
}
Notice that we implement Ord for Job to compare first by priority (reversed, so higher numbers are processed first) and then by creation time (so older jobs are processed first when priorities are equal).
Implementing the Job Queue
Now, let’s implement the priority queue itself:
pub struct JobQueue {
queue: Mutex<PriorityQueue<Job, u32>>,
metrics: metrics::Counter,
}
impl JobQueue {
pub fn new() -> Self {
JobQueue {
queue: Mutex::new(PriorityQueue::new()),
metrics: metrics::counter!("job_queue_size"),
}
}
pub async fn push(&self, job: Job) {
let mut queue = self.queue.lock().await;
queue.push(job.clone(), job.priority);
self.metrics.increment(1);
tracing::debug!(
job_id = %job.id,
priority = job.priority,
"Added job to priority queue"
);
}
pub async fn pop(&self) -> Option<Job> {
let mut queue = self.queue.lock().await;
if let Some((job, _)) = queue.pop() {
self.metrics.decrement(1);
tracing::debug!(
job_id = %job.id,
priority = job.priority,
"Removed job from priority queue"
);
Some(job)
} else {
None
}
}
pub async fn size(&self) -> usize {
let queue = self.queue.lock().await;
queue.len()
}
}
Section 5: Creating the HTTP API with Warp
Now, let’s build the HTTP API for our service using Warp:
use std::convert::Infallible;
use std::sync::Arc;
use warp::{Filter, Rejection, Reply};
async fn handle_enqueue(
json: serde_json::Value,
job_queue: Arc<JobQueue>,
breaker: Arc<CircuitBreaker>,
) -> Result<impl Reply, Rejection> {
// Check if the payload has an "emergency" flag
let is_emergency = json
.get("emergency")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// Assign priority based on the emergency flag
let priority = if is_emergency { 10 } else { 1 };
// Try to validate with external service, protected by circuit breaker
let validation_result = breaker
.call(|| {
Box::pin(async {
let client = reqwest::Client::new();
let resp = client
.post("http://validation-service/validate")
.json(&json)
.timeout(Duration::from_millis(500))
.send()
.await?
.error_for_status()?;
Ok::<bool, reqwest::Error>(resp.status().is_success())
})
})
.await;
match validation_result {
Ok(_) => {
// Create and enqueue the job
let job = Job {
id: Uuid::new_v4(),
created_at: chrono::Utc::now(),
payload: json,
priority,
};
job_queue.push(job.clone()).await;
// Return the job info
Ok(warp::reply::json(&job))
}
Err(CircuitBreakerError::Open) => {
// Circuit is open, return a 503 Service Unavailable
tracing::warn!("Circuit breaker is open, returning 503");
Ok(warp::reply::with_status(
warp::reply::json(&serde_json::json!({
"error": "Service temporarily unavailable, try again later",
"code": "CIRCUIT_OPEN"
})),
warp::http::StatusCode::SERVICE_UNAVAILABLE,
))
}
Err(CircuitBreakerError::ServiceError(err)) => {
// Validation service returned an error
tracing::error!("Validation service error: {:?}", err);
Ok(warp::reply::with_status(
warp::reply::json(&serde_json::json!({
"error": "Failed to validate job",
"code": "VALIDATION_ERROR"
})),
warp::http::StatusCode::BAD_GATEWAY,
))
}
}
}
// Helper function to share state with route handlers
fn with<T: Clone + Send>(
item: T,
) -> impl Filter<Extract = (T,), Error = Infallible> + Clone {
warp::any().map(move || item.clone())
}
// Route to check circuit breaker status
async fn handle_circuit_status(
breaker: Arc<CircuitBreaker>,
) -> Result<impl Reply, Rejection> {
let state = breaker.get_state().await;
Ok(warp::reply::json(&serde_json::json!({
"state": state
})))
}
// Route to check queue status
async fn handle_queue_status(
job_queue: Arc<JobQueue>,
) -> Result<impl Reply, Rejection> {
let size = job_queue.size().await;
Ok(warp::reply::json(&serde_json::json!({
"size": size
})))
}
Section 6: Creating the Worker Process
Next, we need a worker process that consumes jobs from the queue:
async fn process_job(job: Job) -> Result<(), Box<dyn std::error::Error>> {
tracing::info!(
job_id = %job.id,
priority = job.priority,
"Processing job"
);
// Simulate processing time based on job complexity
// Higher priority jobs might be simpler and faster to process
let complexity = job.payload.get("complexity")
.and_then(|v| v.as_f64())
.unwrap_or(1.0);
let processing_time = Duration::from_millis((100.0 * complexity) as u64);
tokio::time::sleep(processing_time).await;
// Simulate occasional failures
if rand::random::<f64>() < 0.05 {
tracing::warn!(job_id = %job.id, "Job processing failed");
return Err("Job processing failed".into());
}
tracing::info!(job_id = %job.id, "Job processed successfully");
Ok(())
}
async fn worker_loop(job_queue: Arc<JobQueue>) {
loop {
// Try to get a job from the queue
if let Some(job) = job_queue.pop().await {
let job_id = job.id;
// Process the job, with metrics
let timer = metrics::histogram!("job_processing_time_ms").start();
let result = process_job(job).await;
let elapsed = timer.stop();
match result {
Ok(_) => {
metrics::counter!("jobs_processed_success").increment(1);
}
Err(e) => {
tracing::error!(job_id = %job_id, error = ?e, "Job processing error");
metrics::counter!("jobs_processed_failure").increment(1);
}
}
} else {
// No jobs available, sleep a bit to avoid CPU spinning
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
Section 7: Putting It All Together
Finally, let’s put everything together in our main.rs file:
use std::sync::Arc;
use std::time::Duration;
use warp::Filter;
mod circuit_breaker;
mod job_queue;
use circuit_breaker::CircuitBreaker;
use job_queue::{Job, JobQueue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing for logging
tracing_subscriber::fmt::init();
// Initialize metrics for monitoring
let metrics_recorder = metrics_exporter_prometheus::PrometheusBuilder::new()
.with_endpoint("/metrics")
.build()?;
metrics_recorder.install()?;
// Create shared resources
let job_queue = Arc::new(JobQueue::new());
let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(30)));
// Create HTTP routes
let enqueue_route = warp::path("jobs")
.and(warp::post())
.and(warp::body::json())
.and(with(job_queue.clone()))
.and(with(circuit_breaker.clone()))
.and_then(handle_enqueue);
let circuit_status_route = warp::path("circuit")
.and(warp::get())
.and(with(circuit_breaker.clone()))
.and_then(handle_circuit_status);
let queue_status_route = warp::path("queue")
.and(warp::get())
.and(with(job_queue.clone()))
.and_then(handle_queue_status);
// Combine all routes
let routes = enqueue_route
.or(circuit_status_route)
.or(queue_status_route)
.with(warp::cors().allow_any_origin())
.with(warp::log("api"));
// Start the worker in a background task
let worker_queue = job_queue.clone();
tokio::spawn(async move {
worker_loop(worker_queue).await;
});
// Start the HTTP server
tracing::info!("Starting server on 0.0.0.0:3030");
warp::serve(routes).run(([0, 0, 0, 0], 3030)).await;
Ok(())
}
Section 8: Testing the Service
Let’s create some test scenarios to verify our service’s resilience:
Testing the Circuit Breaker
We can simulate a failing dependency and verify that the circuit opens after multiple failures:
# Send a request when the validation service is down
curl -X POST http://localhost:3030/jobs \
-H "Content-Type: application/json" \
-d '{"task": "important task", "complexity": 2}'
# Check the circuit status (should be Open after multiple failures)
curl http://localhost:3030/circuit
Testing Priority Processing
We can send multiple jobs with different priorities and verify that high-priority jobs are processed first:
# Send a regular job
curl -X POST http://localhost:3030/jobs \
-H "Content-Type: application/json" \
-d '{"task": "normal task", "complexity": 2}'
# Send an emergency job (should be processed first)
curl -X POST http://localhost:3030/jobs \
-H "Content-Type: application/json" \
-d '{"task": "critical task", "complexity": 2, "emergency": true}'
# Check the queue status
curl http://localhost:3030/queue
Section 9: Advanced Enhancements
Once you have the basic system working, consider these enhancements for a production-grade service:
Persistent Queue Storage
For durability, consider persisting jobs to disk or a database. This ensures that jobs aren’t lost if the service restarts:
use sqlx::{PgPool, postgres::PgPoolOptions};
pub struct PersistentJobQueue {
memory_queue: JobQueue,
db_pool: PgPool,
}
impl PersistentJobQueue {
pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
// Create database connection pool
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(database_url)
.await?;
// Initialize the in-memory queue
let memory_queue = JobQueue::new();
// Create the queue table if it doesn't exist
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS jobs (
id UUID PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
payload JSONB NOT NULL,
priority INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
)
"#,
)
.execute(&pool)
.await?;
// Load pending jobs from the database into memory
let jobs = sqlx::query_as!(
StoredJob,
r#"
SELECT id, created_at, payload, priority
FROM jobs
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
"#,
)
.fetch_all(&pool)
.await?;
let queue = Self {
memory_queue,
db_pool: pool,
};
// Restore jobs to in-memory queue
for stored_job in jobs {
let job = Job {
id: stored_job.id,
created_at: stored_job.created_at,
payload: stored_job.payload,
priority: stored_job.priority as u32,
};
queue.memory_queue.push(job).await;
}
Ok(queue)
}
pub async fn push(&self, job: Job) -> Result<(), sqlx::Error> {
// Store in database first
sqlx::query!(
r#"
INSERT INTO jobs (id, created_at, payload, priority, status)
VALUES ($1, $2, $3, $4, 'pending')
"#,
job.id,
job.created_at,
job.payload,
job.priority as i32,
)
.execute(&self.db_pool)
.await?;
// Then add to in-memory queue
self.memory_queue.push(job).await;
Ok(())
}
pub async fn pop(&self) -> Option<Job> {
// Get from in-memory queue
if let Some(job) = self.memory_queue.pop().await {
// Update status in database
let _ = sqlx::query!(
r#"
UPDATE jobs
SET status = 'processing'
WHERE id = $1
"#,
job.id,
)
.execute(&self.db_pool)
.await;
Some(job)
} else {
None
}
}
pub async fn complete(&self, job_id: Uuid, success: bool) -> Result<(), sqlx::Error> {
let status = if success { "completed" } else { "failed" };
sqlx::query!(
r#"
UPDATE jobs
SET status = $1
WHERE id = $2
"#,
status,
job_id,
)
.execute(&self.db_pool)
.await?;
Ok(())
}
}
Exponential Backoff with Jitter
Improve the circuit breaker by adding exponential backoff with jitter for more effective recovery:
pub struct CircuitBreaker {
state: Mutex<CircuitState>,
failure_threshold: u32,
initial_backoff: Duration,
max_backoff: Duration,
current_backoff: Mutex<Duration>,
}
impl CircuitBreaker {
pub fn new(
failure_threshold: u32,
initial_backoff: Duration,
max_backoff: Duration,
) -> Self {
CircuitBreaker {
state: Mutex::new(CircuitState::Closed { failures: 0 }),
failure_threshold,
initial_backoff,
max_backoff,
current_backoff: Mutex::new(initial_backoff),
}
}
async fn calculate_next_backoff(&self) -> Duration {
let mut current = self.current_backoff.lock().await;
// Calculate next backoff with jitter (between 75% and 100% of calculated time)
let next_backoff = std::cmp::min(
*current * 2,
self.max_backoff
);
// Add jitter (between 75% and 100% of the backoff)
let jitter_factor = 0.75 + (rand::random::<f64>() * 0.25);
let with_jitter = Duration::from_millis(
(next_backoff.as_millis() as f64 * jitter_factor) as u64
);
*current = next_backoff;
with_jitter
}
async fn reset_backoff(&self) {
let mut current = self.current_backoff.lock().await;
*current = self.initial_backoff;
}
}
Adaptive Circuit Breaker
Make the circuit breaker more adaptive by considering response latency, not just failures:
enum CircuitState {
Closed {
failures: u32,
slow_calls: u32,
total_calls: u32,
},
Open { until: Instant },
HalfOpen,
}
pub struct AdaptiveCircuitBreaker {
state: Mutex<CircuitState>,
failure_threshold: u32,
slow_call_threshold: Duration,
slow_call_rate_threshold: f64, // e.g., 0.5 means 50% of calls are slow
reset_timeout: Duration,
}
impl AdaptiveCircuitBreaker {
// Implementation details similar to before, but tracking slow calls too
}
Conclusion
Building resilient microservices requires more than just writing correct code. It requires designing for failure and ensuring that your system can gracefully handle issues with dependencies and prioritize critical work.
By implementing the circuit breaker and priority queue patterns in your Rust microservices, you can:
- Prevent cascading failures when dependencies fail
- Ensure that your most critical tasks get processed first
- Fail fast and provide faster responses to users, even during partial outages
- Gradually recover from failures without overwhelming recovering services
These patterns are essential building blocks for any production-grade microservice architecture, especially in environments with strict reliability requirements.
The Rust programming language, with its strong type system, performance characteristics, and async support, provides an excellent foundation for implementing these resilience patterns efficiently and safely.
To further enhance your microservice’s resilience, consider exploring additional patterns such as bulkheads, health checks, timeouts, and retry mechanisms. Each pattern addresses different aspects of resilience, and when combined, they create a robust system capable of withstanding various failure modes.