Concepts Snippets
This chapter comes after the abstract concepts on purpose.
The goal is to see how the concepts look in Rust once the theory is already clear. These snippets are intentionally direct. Some are minimal, and some combine multiple abstractions in one place so you can see how the pieces interact in a real design.
Queue with VecDeque<T>
This is the direct in-memory queue shape.
```rust
use std::collections::VecDeque;

fn main() {
    let mut queue: VecDeque<&str> = VecDeque::new();
    queue.push_back("job-1");
    queue.push_back("job-2");
    queue.push_back("job-3");

    while let Some(job) = queue.pop_front() {
        println!("processing {job}");
    }
}
```
Per-client tracking with HashMap<K, V>
This is the core mapping shape: map a unit to its current state.
```rust
use std::collections::HashMap;

fn main() {
    let mut requests_per_client: HashMap<&str, u32> = HashMap::new();

    *requests_per_client.entry("alice").or_insert(0) += 1;
    *requests_per_client.entry("alice").or_insert(0) += 1;
    *requests_per_client.entry("bob").or_insert(0) += 1;

    println!("{requests_per_client:?}");
}
```
Queue plus map
Many systems need both ordering and lookup:
- a queue for pending work
- a map for tracking metadata about that work
```rust
use std::collections::{HashMap, VecDeque};

fn main() {
    let mut pending: VecDeque<u64> = VecDeque::from([101, 102, 103]);
    let mut status: HashMap<u64, &'static str> = HashMap::new();

    while let Some(job_id) = pending.pop_front() {
        status.insert(job_id, "running");
        println!("job {job_id} is {}", status[&job_id]);
        status.insert(job_id, "done");
    }

    println!("{status:?}");
}
```
Threaded channel
This is the standard producer-consumer handoff in threaded Rust.
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        tx.send("job-1".to_string()).unwrap();
        tx.send("job-2".to_string()).unwrap();
    });

    for message in rx {
        println!("received {message}");
    }

    producer.join().unwrap();
}
```
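The "mpsc" in the name stands for multi-producer, single-consumer: the sender can be cloned, so several threads can feed the same receiver. A minimal sketch of that shape (the producer count and message format here are illustrative choices, not part of the API):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Clone the sender once per producer; every clone feeds the same receiver.
    let mut producers = Vec::new();
    for producer_id in 0..3 {
        let tx = tx.clone();
        producers.push(thread::spawn(move || {
            tx.send(format!("job from producer {producer_id}")).unwrap();
        }));
    }

    // Drop the original sender so the receive loop ends once all clones are gone.
    drop(tx);

    for message in rx {
        println!("received {message}");
    }

    for producer in producers {
        producer.join().unwrap();
    }
}
```

The explicit drop(tx) matters: the for loop over rx only terminates when every sender, including the original, has been dropped.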
Async channel with Tokio
This is the async version of the same delivery idea.
```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(8);

    let producer = tokio::spawn(async move {
        tx.send("job-1".to_string()).await.unwrap();
        tx.send("job-2".to_string()).await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("received {message}");
    }

    producer.await.unwrap();
}
```
Bounded channel and backpressure
When capacity is small, the sender cannot run ahead forever.
```rust
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    let sender = tokio::spawn(async move {
        for i in 0..3 {
            let start = Instant::now();
            tx.send(i).await.unwrap();
            println!("sent {i} after waiting {:?}", start.elapsed());
        }
    });

    let receiver = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("handling {value}");
            sleep(Duration::from_millis(200)).await;
        }
    });

    sender.await.unwrap();
    receiver.await.unwrap();
}
```
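The same backpressure exists in threaded std Rust: mpsc::sync_channel takes a capacity, and send blocks when the buffer is full. A std-only sketch of the same experiment (the capacity of 1 and the 200 ms handling delay mirror the async version above and are illustrative choices):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Capacity 1: each send blocks until the receiver has made room.
    let (tx, rx) = mpsc::sync_channel::<u32>(1);

    let sender = thread::spawn(move || {
        for i in 0..3 {
            let start = Instant::now();
            tx.send(i).unwrap();
            println!("sent {i} after waiting {:?}", start.elapsed());
        }
    });

    let receiver = thread::spawn(move || {
        for value in rx {
            println!("handling {value}");
            thread::sleep(Duration::from_millis(200));
        }
    });

    sender.join().unwrap();
    receiver.join().unwrap();
}
```

SyncSender also offers try_send, which returns an error immediately instead of blocking when the buffer is full, useful when the producer would rather shed load than wait.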
Shared state with Arc<Mutex<T>>
Use this when multiple threads need coordinated mutable access to the same value.
```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = Vec::new();

    for _ in 0..4 {
        let shared = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let mut guard = shared.lock().unwrap();
            *guard += 1;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("final count = {}", *counter.lock().unwrap());
}
```
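When the shared value is just a counter, a mutex is heavier than needed: an atomic integer gives the same coordinated access without a lock. A sketch of the same count with AtomicU32 (the thread count of 4 mirrors the mutex example; Relaxed ordering is enough here because only the final value matters, not ordering relative to other memory):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicU32::new(0));
    let mut handles = Vec::new();

    for _ in 0..4 {
        let shared = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // fetch_add is a single atomic read-modify-write; no lock needed.
            shared.fetch_add(1, Ordering::Relaxed);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("final count = {}", counter.load(Ordering::Relaxed));
}
```

The mutex version stays the right choice as soon as the shared state is more than a single machine word, or when several fields must change together.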
Worker pool shape
This is the usual scratch-built shape:
- producers submit jobs
- a queue or channel holds pending work
- workers repeatedly pull and process
```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = u32;

fn main() {
    let (tx, rx) = mpsc::channel::<Job>();
    let shared_rx = Arc::new(Mutex::new(rx));
    let mut handles = Vec::new();

    for worker_id in 0..2 {
        let rx = Arc::clone(&shared_rx);
        handles.push(thread::spawn(move || loop {
            let message = rx.lock().unwrap().recv();
            match message {
                Ok(job) => println!("worker {worker_id} processed job {job}"),
                Err(_) => break,
            }
        }));
    }

    for job in 1..=5 {
        tx.send(job).unwrap();
    }
    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }
}
```
Simple rate limiter state
This is the direct state model behind a fixed-window limiter.
```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug)]
struct Entry {
    window_start: Instant,
    count: u32,
}

fn allow(
    state: &mut HashMap<String, Entry>,
    client: &str,
    limit: u32,
    window: Duration,
) -> bool {
    let now = Instant::now();
    let entry = state.entry(client.to_string()).or_insert(Entry {
        window_start: now,
        count: 0,
    });

    if now.duration_since(entry.window_start) >= window {
        entry.window_start = now;
        entry.count = 0;
    }

    if entry.count >= limit {
        return false;
    }
    entry.count += 1;
    true
}

fn main() {
    let mut state = HashMap::new();
    for _ in 0..4 {
        println!("{}", allow(&mut state, "client-a", 3, Duration::from_secs(1)));
    }
}
```
State machine with enum
This is the clean Rust shape for explicit states.
```rust
enum JobState {
    Pending,
    Running,
    Done,
    Failed(String),
}

fn describe(state: &JobState) -> &str {
    match state {
        JobState::Pending => "pending",
        JobState::Running => "running",
        JobState::Done => "done",
        JobState::Failed(_) => "failed",
    }
}

fn main() {
    let state = JobState::Running;
    println!("{}", describe(&state));
}
```
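An enum does more than name the states: match makes the compiler check that every state is handled when you write a transition function. A sketch of that shape, with transition rules invented purely for illustration (Pending advances to Running, Running to Done, and terminal states stay put):

```rust
#[derive(Debug, PartialEq)]
enum JobState {
    Pending,
    Running,
    Done,
    Failed(String),
}

// Advance one step. The rules here are illustrative, not from a real scheduler:
// terminal states (Done, Failed) are returned unchanged.
fn advance(state: JobState) -> JobState {
    match state {
        JobState::Pending => JobState::Running,
        JobState::Running => JobState::Done,
        other => other,
    }
}

fn main() {
    let mut state = JobState::Pending;
    state = advance(state);
    state = advance(state);
    println!("{state:?}");
}
```

If a new variant is added and the match stops being exhaustive, the compiler reports it, which is the main payoff of modeling states this way.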
Error states with Result<T, E>
This is the same idea applied to success and failure.
```rust
fn parse_port(input: &str) -> Result<u16, std::num::ParseIntError> {
    input.parse::<u16>()
}

fn main() {
    match parse_port("8080") {
        Ok(port) => println!("valid port: {port}"),
        Err(err) => println!("invalid port: {err}"),
    }
}
```
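Matching at every call site gets noisy once results are chained; the ? operator propagates the Err case to the caller so the happy path reads linearly. A minimal sketch (the host:port input format and the default port of 80 are invented for this example):

```rust
use std::num::ParseIntError;

// Split "host:port" and parse the port; `?` returns early with the error,
// so only the success path continues past that line.
fn parse_host_port(input: &str) -> Result<(String, u16), ParseIntError> {
    let (host, port) = input.split_once(':').unwrap_or((input, "80"));
    let port = port.parse::<u16>()?; // error propagates to the caller
    Ok((host.to_string(), port))
}

fn main() {
    match parse_host_port("example.com:8080") {
        Ok((host, port)) => println!("{host} on port {port}"),
        Err(err) => println!("invalid input: {err}"),
    }
}
```

The caller still gets a full Result, so the decision about how to handle failure moves to wherever it actually belongs.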
Putting multiple abstractions together
This combines:
- a channel for delivery
- a queue-like stream of work
- a state enum for explicit system states
- a map for per-job tracking
```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

#[derive(Debug)]
enum JobState {
    Queued,
    Running,
    Done,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(8);
    let mut jobs: HashMap<u64, JobState> = HashMap::new();

    for job_id in 1..=3 {
        jobs.insert(job_id, JobState::Queued);
        tx.send(job_id).await.unwrap();
    }
    drop(tx);

    while let Some(job_id) = rx.recv().await {
        jobs.insert(job_id, JobState::Running);
        println!("running job {job_id}");
        jobs.insert(job_id, JobState::Done);
    }

    println!("{jobs:?}");
}
```
What to notice
- queues are about ordering
- maps are about association
- channels are about delivery across concurrency boundaries
- bounded capacity creates backpressure
- enums make states explicit
- Result<T, E> makes success and error states explicit
- Arc<Mutex<T>> is shared mutable state with coordinated access
- these abstractions are often combined, not used in isolation