Writing a Ring Buffer in Rust
A simple multi-threaded ring buffer implementation in Rust.
A ring buffer, also known as a circular buffer, is a fixed-size buffer that holds some kind of data (https://en.wikipedia.org/wiki/Circular_buffer).
The requirements are that the buffer has some fixed size, and that we have at least one consumer of events and at least one producer of events.
The consumer reads events if they exist in the buffer, and the producer continues to produce events unless the buffer is full.
In this task, we implement such a buffer in Rust, then extend it to a multi-threaded version to deepen our understanding of Rust and threading concepts in general.
You can see my complete implementation in this gist.
Ring Buffer
First, let's implement the actual structure for our ring buffer.
Struct
#[derive(Clone)]
struct Job {
    job_id: u32,
}

struct RingBuffer {
    buffer: Vec<Job>,
    capacity: u32,
    size: u32, // always <= capacity
    write_index: u32,
    read_index: u32,
}
Here we have a buffer that we use to store objects; this is just a vector. We have a capacity, which is effectively the maximum size, and a size attribute to track the current number of items. We also need a read index and a write index to know where we are up to with current reads and writes.
Implementation
impl RingBuffer {
    fn new(capacity: u32) -> RingBuffer {
        // Pre-fill every slot with a placeholder job
        let buffer = vec![Job { job_id: 0 }; capacity as usize];
        RingBuffer {
            buffer,
            capacity,
            size: 0,
            write_index: 0,
            read_index: 0,
        }
    }

    fn add(&mut self, job: Job) -> bool {
        if self.size >= self.capacity {
            return false;
        }
        self.buffer[self.write_index as usize] = job;
        self.write_index = (self.write_index + 1) % self.capacity;
        self.size += 1;
        true
    }

    fn read(&mut self) -> Option<&Job> {
        if self.size == 0 {
            return None;
        }
        // Record the slot first: updating the indices while holding a
        // reference into the buffer would not pass the borrow checker.
        let index = self.read_index as usize;
        self.read_index = (self.read_index + 1) % self.capacity;
        self.size -= 1;
        Some(&self.buffer[index])
    }
}
In new, we create a buffer at full capacity, pre-filled with placeholder jobs. Since size starts at 0, the buffer is logically empty.
For add, we need to check that there is room in the buffer. If there is, we add the item, increment the write index, and increment the size.
For read, it's essentially the mirror image. If there is an item to read, we read it, increment the read index, and decrement the size.
Processing
Single Threaded
Next we move to our main function, where we can show the single threaded version.
println!("Single Threaded version!");
let mut jq = RingBuffer::new(8);
// this is the single-threaded version
for i in 0..5 {
    let j = Job { job_id: i };
    if jq.add(j) {
        println!("Added job with id: {i}");
    }
}
// read all the jobs from the queue
while let Some(job) = jq.read() {
    println!("Read job with id: {}", job.job_id);
}
This is relatively straightforward: write some items to the buffer and read them back. Looks good to me.
Single Threaded version!
Added job with id: 0
Added job with id: 1
Added job with id: 2
Added job with id: 3
Added job with id: 4
Read job with id: 0
Read job with id: 1
Read job with id: 2
Read job with id: 3
Read job with id: 4
Multi-Threaded
This is where things get a bit more complicated. How do we use multiple threads in Rust?
We need a few things for this.
First, we create a shared struct.
struct Shared {
    buf: Mutex<RingBuffer>, // protects the data
    not_full: Condvar,
    not_empty: Condvar,
}
Here we have our RingBuffer from before, but this time it's wrapped in a Mutex. This Mutex ensures that only a single thread can perform operations on the RingBuffer at a time.
We also introduce not_full and not_empty, which are Condvars. A Condvar (condition variable) lets a thread block until another thread signals it. For example, a producer can wait on not_full before writing items, and a consumer can wait on not_empty before reading items.
In Python you have threading.Condition, C/C++ has pthread_cond_t and Go has sync.Cond.
Now, we can create our shared object.
println!("Multithreaded version!");
let number_of_jobs = 20;
let num_producers = 2;
let num_consumers = 2;
// run the multi-threaded version
let shared = Arc::new(Shared {
    buf: Mutex::new(RingBuffer::new(5)),
    not_full: Condvar::new(),
    not_empty: Condvar::new(),
});
It's important that we wrap our shared object in an Arc (Atomically Reference Counted). An Arc lets multiple threads own a handle to the same object: each thread gets a clone of the Arc, and the underlying Shared (including the buffer and its read and write indices) is freed only when the last clone is dropped.
Producer
let mut producer_handles = Vec::new();
for prod_id in 0..num_producers {
    let producer_buf: Arc<Shared> = Arc::clone(&shared);
    let prod_handle = std::thread::spawn(move || {
        for i in 0..number_of_jobs {
            // create the job with a unique id
            let job_id = number_of_jobs * prod_id + i;
            let j = Job { job_id };
            // get the mutex
            let mut guard = producer_buf.buf.lock().unwrap();
            // while full, wait (the loop handles spurious wakeups)
            while guard.size == guard.capacity {
                guard = producer_buf.not_full.wait(guard).unwrap();
            }
            // now not full, so add
            guard.add(j);
            println!("Added job with id: {} by producer {}", job_id, prod_id);
            // notify that the queue is not empty anymore
            producer_buf.not_empty.notify_all();
        }
    });
    producer_handles.push(prod_handle);
}
Each producer first grabs its own clone of the shared object. It then creates a unique job, waits until there is capacity to add, and adds the item. Finally, it notifies waiting consumers that the queue is no longer empty.
Consumer
let mut consumer_handles = Vec::new();
for cons_id in 0..num_consumers {
    let consumer_buf = Arc::clone(&shared);
    let consumer_handle = std::thread::spawn(move || {
        let mut jobs_read = 0;
        while jobs_read < number_of_jobs {
            let mut guard = consumer_buf.buf.lock().unwrap();
            // wait for things to not be empty anymore
            while guard.size == 0 {
                guard = consumer_buf.not_empty.wait(guard).unwrap();
            }
            // not empty, so read a job
            let job = guard.read();
            consumer_buf.not_full.notify_all();
            println!(
                "Read job with id: {} by consumer {}",
                job.unwrap().job_id,
                cons_id
            );
            jobs_read += 1;
        }
    });
    consumer_handles.push(consumer_handle);
}
Each consumer also grabs a clone of the shared object, waits until there are items to read, and then reads them. After each read, it notifies waiting producers that the buffer is no longer full.
Threading
Finally, we join these handles, and we have a working multithreaded version.
// Join all consumers first
for handle in consumer_handles {
    handle.join().unwrap();
}
// Join all producers
for handle in producer_handles {
    handle.join().unwrap();
}
We can see from the output that things look like they are working:
Read job with id: 11 by consumer 0
Read job with id: 12 by consumer 0
Read job with id: 13 by consumer 0
Read job with id: 20 by consumer 1
Added job with id: 21 by producer 1
Added job with id: 22 by producer 1
Read job with id: 21 by consumer 0
Read job with id: 22 by consumer 0
Added job with id: 14 by producer 0
Added job with id: 15 by producer 0
Added job with id: 16 by producer 0
Read job with id: 14 by consumer 0
Read job with id: 15 by consumer 0
Added job with id: 17 by producer 0
Added job with id: 18 by producer 0
Added job with id: 19 by producer 0
Read job with id: 16 by consumer 1
Read job with id: 17 by consumer 1
Conclusion and Extensions
So there you go, a multi-threaded ring buffer in Rust.
Some extensions that could be added:
make the ring buffer generic over the stored type
create some benchmarks to inspect performance
play with external libraries like Tokio or crossbeam-queue
improve performance by removing the use of locks, i.e. a lock-free queue; there is some discussion of potential data structures
