Multi Producer Multi Consumer (MPMC) Channel

The MPMC channel lets any core or thread add new entries (multi producer) and any core or thread pick the next entry (multi consumer), and it behaves like a FIFO queue. There are several ways to implement such a channel in a non-blocking way. I will present quite a simple one here that has proven to work for all my current use cases in a bare metal no_std environment. The source code can be found here.
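To make the interface concrete, here is a minimal sketch of what such a channel could look like. It is not the implementation from the linked source: it uses `std` and a `Mutex` around a `VecDeque`, so it is blocking and not `no_std`. The names `channel` and `recv` are assumptions chosen for illustration; only `Sender`, `Receiver` and `send` appear in this article.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Sending half. Cloning it yields another producer on the same queue.
pub struct Sender<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
}

/// Receiving half. Cloning it yields another consumer on the same queue.
pub struct Receiver<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender { queue: Arc::clone(&self.queue) }
    }
}

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        Receiver { queue: Arc::clone(&self.queue) }
    }
}

/// Hypothetical constructor: creates both halves of one shared FIFO queue.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    (Sender { queue: Arc::clone(&queue) }, Receiver { queue })
}

impl<T> Sender<T> {
    /// Appends a value at the back of the queue.
    pub fn send(&self, value: T) {
        self.queue.lock().unwrap().push_back(value);
    }
}

impl<T> Receiver<T> {
    /// Hypothetical name: takes the oldest value, or `None` if the queue is empty.
    pub fn recv(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

Both halves only hold a shared reference to the queue, which is why they can be cloned freely and handed to any number of producers and consumers.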

Revisit the Brain

With the channel in place we can adapt the Brain to keep both the Sender and the Receiver side of it. The Sender will be shared with the Waker, while the Receiver is used by the Brain only.

struct Brain {
    /// the sender side of the mpmc channel to pass the `Thought`s that
    /// require processing
    sender: mpmc::Sender<Arc<Thought>>,
    /// the receiver side of the mpmc channel the `Brain` picks `Thought`s
    /// from to process them
    receiver: mpmc::Receiver<Arc<Thought>>,
}

Note: You might wonder why we need a multi-producer multi-consumer channel if the consuming side is used by the Brain only. The reason is that the Brain will run its code on different cores, so a multi-consumer channel is indeed required.
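The multi-consumer requirement can be illustrated with a small sketch in which several workers pull from one shared queue. This uses `std` threads and a `Mutex`-protected `VecDeque` as a stand-in for cores and the non-blocking channel; the function name `drain_on_workers` is made up for this example.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Drains `items` with `workers` threads that all pop from one shared queue.
/// Returns, per worker, the items that this worker picked up.
fn drain_on_workers(items: Vec<u32>, workers: usize) -> Vec<Vec<u32>> {
    let queue = Arc::new(Mutex::new(VecDeque::from(items)));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut picked = Vec::new();
                loop {
                    // hold the lock only while popping, not while "processing"
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(item) => picked.push(item),
                        None => break,
                    }
                }
                picked
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Which worker ends up with which item is not deterministic, but every item is picked up exactly once. That is the property the Brain relies on when it processes Thoughts on several cores.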

Revisit the Wakeable

Besides storing the two sides of the channel in the Brain, we can now also store the Sender with each Thought, which allows the Thought to push itself back onto the queue of things the Brain should process.

The updated Thought will look like this:

pub struct Thought {
    /// This is the actual thing the brain should process as part of the Thought
    pub thinkable: DataLock<Pin<Box<dyn Future<Output = ()> + 'static>>>,
    /// The sender side of the queue of the `Brain` to push myself for
    /// re-processing
    pub sender: mpmc::Sender<Arc<Thought>>,
}

With that, the Wakeable trait can finally be implemented to add the wake behavior to the Thought:

impl Wakeable for Thought {
    fn wake_by_ref(self: &Arc<Self>) {
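        // waking only has a reference to the Thought, so clone the Arc to
        // get an owned handle that can be moved into the channel
        let clone = Arc::clone(self);
        // push the Thought back to the queue of the Brain for re-processing
        self.sender.send(clone);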
    }
}
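For comparison, the same idea can be sketched with the standard library's `std::task::Wake` trait, which plays a similar role to the `Wakeable` trait used here. This is an illustration under assumptions, not the code of this series: `Task`, `Queue` and `wake_twice` are hypothetical names, and a `Mutex<VecDeque>` stands in for the mpmc channel.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Stand-in for the Brain's queue of things to process.
type Queue = Arc<Mutex<VecDeque<Arc<Task>>>>;

/// Stand-in for a Thought: it knows the queue it has to push itself onto.
struct Task {
    id: u32,
    queue: Queue,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // clone the Arc so the queue gets its own owned handle to the task
        self.queue.lock().unwrap().push_back(Arc::clone(self));
    }
}

/// Wakes one task twice and returns the ids found in the queue afterwards.
fn wake_twice(id: u32) -> Vec<u32> {
    let queue: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let task = Arc::new(Task { id, queue: Arc::clone(&queue) });

    // std builds a Waker from any Arc<impl Wake + Send + Sync + 'static>
    let waker = Waker::from(task);
    waker.wake_by_ref();
    waker.wake();

    let ids = queue.lock().unwrap().iter().map(|t| t.id).collect();
    ids
}
```

Each wake call enqueues the task once more, so a task that is woken twice before it is processed also appears twice in the queue.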

The final bit is to pass the Sender to each Thought whenever the Brain is requested to think on one:

fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
    // ensure the given Future is getting a fixed position on the heap
    let thinkable = Box::pin(thinkable);
    // create the Thought and hand it a clone of the Sender so it can
    // re-queue itself when woken
    let thought = Arc::new(Thought {
        thinkable: DataLock::new(thinkable),
        sender: self.sender.clone(),
    });
    // push the Thought to the list of thoughts to think on
    self.sender.send(thought);
}
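To see how the pieces could fit together end to end, here is a rough, self-contained sketch that runs on a hosted system. It is not the Brain from this series: it relies on `std`, uses `Mutex` in place of `DataLock`, a `Mutex<VecDeque>` in place of the mpmc channel, and `std::task::Wake` in place of `Wakeable`. The `run` method, the `YieldOnce` future and the `demo` function are assumptions added for illustration.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Thinkable = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
type Queue = Arc<Mutex<VecDeque<Arc<Thought>>>>;

struct Thought {
    thinkable: Mutex<Thinkable>,
    queue: Queue,
}

impl Wake for Thought {
    fn wake(self: Arc<Self>) {
        // push myself back for re-processing
        let queue = Arc::clone(&self.queue);
        queue.lock().unwrap().push_back(self);
    }
}

struct Brain {
    queue: Queue,
}

impl Brain {
    fn new() -> Self {
        Brain { queue: Arc::new(Mutex::new(VecDeque::new())) }
    }

    fn think_on(&self, thinkable: impl Future<Output = ()> + Send + 'static) {
        let thinkable: Thinkable = Box::pin(thinkable);
        let thought = Arc::new(Thought {
            thinkable: Mutex::new(thinkable),
            queue: Arc::clone(&self.queue),
        });
        self.queue.lock().unwrap().push_back(thought);
    }

    /// Hypothetical processing loop: polls queued thoughts until the queue
    /// is empty and returns how many polls were needed.
    fn run(&self) -> usize {
        let mut polls = 0;
        loop {
            let next = self.queue.lock().unwrap().pop_front();
            let thought = match next {
                Some(thought) => thought,
                None => break,
            };
            let waker = Waker::from(Arc::clone(&thought));
            let mut cx = Context::from_waker(&waker);
            let mut thinkable = thought.thinkable.lock().unwrap();
            let _ = thinkable.as_mut().poll(&mut cx);
            polls += 1;
        }
        polls
    }
}

/// Test future: pending on the first poll (after waking itself), then ready.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Runs two thoughts that each yield once; returns (polls, finished thoughts).
fn demo() -> (usize, usize) {
    let brain = Brain::new();
    let done = Arc::new(AtomicUsize::new(0));
    for _ in 0..2 {
        let done = Arc::clone(&done);
        brain.think_on(async move {
            YieldOnce { yielded: false }.await;
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    let polls = brain.run();
    (polls, done.load(Ordering::SeqCst))
}
```

Each of the two thoughts is polled twice: once until it yields and re-queues itself through its waker, and once more to completion. `demo()` therefore returns `(4, 2)`.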