Ping-Pong Buffers in Rust

A “ping-pong buffer” is a data structure which is very useful in systems programming. The ping-pong buffer is two-element buffer which allows simultaneous access by a single producer and a single consumer. One element is reserved for writing by the producer, and the other element is reserved for reading by the consumer. When writing and reading are finished, the roles of the two elements are swapped (i.e. the one which was written will be next to be read, and the one which was read will be next to be overwritten). This way, the data which is written doesn’t have to be copied to a different memory location in order to be read.

Jason Sachs has an excellent EmbeddedRelated post about ping-pong buffers, although he calls them “revolving fireplaces”. I highly recommend reading that post (and indeed, every article that Jason has written).

Lately, the Rust programming language has been an object of my fascination. As a low-level systems language, Rust excels at providing expressiveness and abstraction without overhead. It’s possible to implement a ping-pong buffer as a Rust “smart pointer” type (similar to the built-in RefCell), which preserves Rust’s memory safety guarantees with a minimum of runtime overhead.

I wrote a Rust library called “atomic_pingpong”, which provides a generic implementation of ping-pong buffer functionality. It uses the built-in atomic types to provide lock-free thread-safety between the producer and consumer. The library is published and documented through crates.io, with source code available from GitHub.

Usage

Let’s create a ping-pong buffer where each element is an array of 100 integers.

let buffer = atomic_pingpong::Buffer::new([0; 100]);

To write data into the buffer, we obtain a mutable reference to a buffer element by calling Buffer::write(). (Actually, write() returns an Option – the call to write() might fail because the ping-pong buffer only allows a single producer to write data at a time. If we know that we are the only producer, we can unwrap() the result of write(). Otherwise, we would have to handle the case of a None return value.)

fn write_values(new_val: i32) {
    let mut arr = buffer.write().unwrap();
    for val in arr.iter_mut() {
        *val = new_val;
    }
}

Similarly, to read data from the buffer, we obtain a reference to a buffer element by calling Buffer::read(). (And once again, this actually returns an Option, because only a single consumer is allowed to read data at a time.)

fn read_values() {
    let arr = buffer.read().unwrap();
    for (i, val) in arr.iter().enumerate() {
        println!("arr[{}] = {}", i, val);
    }
}

In this example, the ping-pong buffer has three critical attributes:

Implementation

The state of the buffer consists of four boolean values. These are stored as four bits in an AtomicU8:

struct BufferState(atomic::AtomicU8);

impl BufferState {
    // Bits of the bitmask:
    const LOCK_READ: u8 = 0b0000_0001;        // reading is in progress
    const LOCK_WRITE: u8 = 0b0000_0010;       // writing is in progress
    const MODE_IS_FLIPPED: u8 = 0b0000_0100;  // buffer is flip-flopped
    const WANT_MODE_CHANGE: u8 = 0b0000_1000; // buffer is ready to flip-flop

    const fn new() -> Self {
        Self(atomic::AtomicU8::new(0))
    }

(A fifth bit of state, NEW_DATA_READY, is present in the library to support applications in which each buffer element should be read precisely once after being written. For simplicity, it’s omitted from this explanation.)

To begin reading (BufferState::lock_read()) or writing (BufferState::lock_write()), we check if the corresponding lock bit is already set. If it is, we return None: we can’t start the requested operation because it’s already in progress. Otherwise, we return Some(true) if the buffer is currently flip-flopped, and Some(false) if it’s not.

    fn lock(&self, condition: fn(u8) -> bool, action: fn(u8) -> u8) -> Option<bool> {
        let mut new_flags = None::<u8>;
        let _ = self.0.fetch_update(
            atomic::Ordering::Acquire,
            atomic::Ordering::Relaxed,
            |flags| {
                if condition(flags) {
                    new_flags = Some(action(flags));
                }
                new_flags
            },
        );
        new_flags.map(|f| f & Self::MODE_IS_FLIPPED != 0)
    }
    fn lock_read(&self, allow_repeated: bool) -> Option<bool> {
        self.lock(
            |flags| flags & Self::LOCK_READ == 0,
            |flags| flags | Self::LOCK_READ,
        )
    }
    fn lock_write(&self, allow_repeated: bool) -> Option<bool> {
        self.lock(
            |flags| flags & Self::LOCK_WRITE == 0,
            |flags| flags | Self::LOCK_WRITE,
        )
    }

When reading or writing is finished, we clear the corresponding flag, and also check if the buffer needs to be flip-flopped:

    fn release(&self, action: fn(u8) -> u8) {
        let _ = self.0.fetch_update(
            atomic::Ordering::Release,
            atomic::Ordering::Relaxed,
            |flags| Some(action(flags)),
        );
    }
    fn release_read(&self) {
        self.release(|mut flags| {
            flags &= !Self::LOCK_READ;
            if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
                flags &= !Self::WANT_MODE_CHANGE;
                flags ^= Self::MODE_IS_FLIPPED;
            }
            flags
        })
    }
    fn release_write(&self) {
        self.release(|mut flags| {
            flags &= !Self::LOCK_WRITE;
            if flags & Self::LOCK_READ == 0 {
                flags &= !Self::WANT_MODE_CHANGE;
                flags ^= Self::MODE_IS_FLIPPED;
            } else {
                flags |= Self::WANT_MODE_CHANGE;
            }
            flags
        })
    }
}

Calls to Buffer::read() and Buffer::write() will, of course, call BufferState::lock_read() and BufferState::lock_write() as needed to begin reading or writing. But how can we make sure unlock_read() or unlock_write() gets called when reading or writing is finished?

The answer to this question is Rust’s “smart pointer” concept. Instead of returning Option<&T> from Buffer<T>::read(), we’ll return an Option<Ref<T>>. The type Ref<T> implements the Deref trait, so it can be used in the same way as a regular &T reference, but it has a special Drop handler which updates the buffer state when it goes out of scope.

pub struct Ref<'a, T> {
    ptr: &'a T,
    state: &'a BufferState,
}

impl<'a, T> Deref for Ref<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.ptr
    }
}

impl<'a, T> Drop for Ref<'a, T> {
    fn drop(&mut self) {
        self.state.release_read();
    }
}

For Buffer::write(), we similarly define RefMut, which implements DerefMut instead of Deref and calls release_write() instead of release_read().

Now we’re ready to define the Buffer struct:

pub struct Buffer<T> {
    ping: UnsafeCell<T>,
    pong: UnsafeCell<T>,
    state: BufferState,
}

We use UnsafeCell<T> instead of T because the Buffer type has interior mutability: you are allowed to mutate a Buffer by calling read() or write(), even if you only have an immutable reference to the buffer.

The last bits of plumbing are straightforward:

impl<T> Buffer<T> {
    fn get_pointer(&self, state: bool, read: bool) -> *mut T {
        // state = false => read ping and write pong
        // state = true  => read pong and write ping
        (if state ^ read { &self.ping } else { &self.pong }).get()
    }
    pub fn read(&self) -> Option<Ref<T>> {
        let mode = buf.state.lock_read()?;
        Some(Ref {
            ptr: unsafe { &*self.get_pointer(mode, true) },
            state: &self.state,
        })
    }
    pub fn write(&self) -> Option<RefMut<T>> {
        let mode = buf.state.lock_write()?;
        Some(RefMut {
            ptr: unsafe { &mut *self.get_pointer(mode, false) },
            state: &self.state,
        })
    }
}