Multi Producer Multi Consumer (MPMC) Channel
The MPMC channel allows adding new entries from any core or thread (multi producer), picking the next entry by any core or thread (multi consumer) and works like a FIFO queue. There are for sure multiple approaches possible to implement such a channel in a non-blocking way. I will present quite a simple one here that has proven to work at least for all my current use cases in a bare metal no_std environment. The source code can be found here.
Revisit the Brain
With the channel in place we can adopt the Brain to keep the Sender and the Receiver side of it. The Sender will be shared with the Waker while the Receiver is used by the Brain only.
struct Brain {
/// the sender side of the mpmc channel to pass the ``Thought``s that
/// require processing
sender: mpmc::Sender<Arc<Thought>>,
/// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
/// from to process them
receiver: mpmc::Receiver<Arc<Thought>>,
}
Well you might wonder why there is a Multi-Sender-Multi-Consumer channel if the consuming part is used by the
Brainonly? The reason is, that we will use theBrainto run it's code on different cores and therefor indeed a multi-consumer is required.
Revisit the Wakeable
In addition to updating the Brain storing now the two sides of the channel we also can now store the Sender of the channel with each Thought which allows the same to push itself back to the queue of things the Brain should process.
The updated Thought will look like this:
pub struct Thought {
/// This is the actual thing the brain should process as part of the Thought
pub thinkable: DataLock<Pin<Box<dyn Future<Output = ()> + 'static>>>,
/// The sender side of the queue of the `Brain` to push myself for
/// re-processing
pub sender: mpmc::Sender<Arc<Thought>>,
}
The implementation of the Wakeable trait to add the wake behavior to it can now finally be implemented:
impl Wakeable for Thought {
fn wake_by_ref(self: &Arc<Self>) {
let clone = Arc::clone(self);
self.sender.send(clone);
}
}
The final bit now is to always pass the Sender to the Thought once the Brain is requested to think on one:
fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
// ensure the given Future is getting a fixed position on the HEAP
let thinkable = Box::pin(thinkable);
// create the Thought
let thought = Arc::new(
Thought {
thinkable: DataLock::new(thinkable),
sender: self.sender.clone(),
}
);
// push the Thought to the list of thoughts to think on
self.sender.send(thought);
}
Well you might wonder why there is a Multi-Sender-Multi-Consumer channel if the consuming part is used by the