Demystifying Async/Await in Rust
This book explains the background and implementation details needed to understand how the async/await programming pattern works in Rust.
The contents of this book are based on the author's experience implementing the async/await programming style and execution model with the current version of the Rust programming language on a bare metal embedded system.
I hope the following chapters will give everyone who is curious about what happens behind the scenes of async/await some insight into this complex and sometimes mystical-seeming topic.
Introduction
One of the main promises of asynchronous, parallel processing is that it can increase the overall performance of your program. Although this does not hold for every situation, most of us would like to find out whether it gives the desired boost for our specific challenge. However, designing and writing a program that benefits from asynchronous execution is often challenging and sometimes requires knowledge of how the underlying bits and pieces fit together.
This book assumes that the reader is already familiar with the Rust programming language and its main concepts and syntax.
Motivation
At the time of writing I am working on a Raspberry Pi OS written as much as possible in Rust. Especially in the space of microcomputers and embedded systems, where GPIOs are used to access sensors and the like, the OS or program often idles while waiting for external events and blocks any further processing while doing so. So the decision was made that one of the main features of this OS will be to utilise the 4 cores of the Raspberry Pi's processor as well as possible and thus allow asynchronous and parallel processing (without writing a full threading stack). But writing a custom operating system means that one needs to deal with some restrictions. The one with the most impact on the usage of Rust as a programming language is that it is not possible to use the standard library. So everything needs to be built with #![no_std] in mind.
While Rust's support for asynchronous processing has been available for a while as part of the standard library, it only recently became available in no_std environments thanks to the continued efforts of the Rust language group. Even though the whole async/await topic in Rust was quite a mystery to me, I started to design and implement a runtime for my Raspberry Pi OS. This allowed me to understand all the different pieces (like async fn() and future.await) that make up the asynchronous features available to the developer with a concise syntax, and to see how everything fits together.
To share the experience and insights that led to this de-mystification, I've written this book to help others have the same kind of a-ha 💡 effect.
Terminology
Before we start implementing our own runtime as part of the custom operating system with the help of the following chapters, let's cover some terms to ensure a common understanding throughout this book.
Future
A Future represents processing whose result may become available at a future point in time. The actual processing result needs to be actively requested. This is called polling and is implemented as the poll function on the Future trait. The result of the poll function represents the state of the Future. It can either be Ready, yielding the actual result of the processing, or Pending, indicating that the actual result is not yet available.
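To make the two states tangible, here is a minimal sketch that polls a hand-written Future in a loop. It assumes a hosted environment with the standard library; the names ReadyOnSecondPoll, NoopWake and poll_until_ready are made up for this illustration and are not part of the runtime built later in the book.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; sufficient when we poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical example future: Pending on the first poll, Ready on the second.
struct ReadyOnSecondPoll {
    polled_once: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled_once {
            Poll::Ready(42)
        } else {
            self.polled_once = true;
            Poll::Pending
        }
    }
}

/// Polls the future until it is ready; returns (number of polls, value).
pub fn poll_until_ready() -> (u32, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(ReadyOnSecondPoll { polled_once: false });
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return (polls, value);
        }
    }
}
```

Calling poll_until_ready() yields (2, 42): one poll that reports Pending and a second one that reports Ready with the value.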
Executor
The Executor is the workhorse of asynchronous processing. It pretty much represents the runtime that continuously requests the result of every Future until it returns the Ready state. Registering a Future with the Executor so that it can be run to completion is called spawning. As part of this, the Future is wrapped into a structure that allows proper handling of processing and re-spawning of pending futures. In the Rust libraries this wrapper is typically called a Task.
Async
The term async is a keyword in the Rust programming language. It tells the compiler that a function can be processed asynchronously. This keyword can be seen as syntactic sugar: every function preceded by the async keyword is translated into a function that returns a Future containing the processing logic of the original function.
Let's picture it with a very lightweight example. Assume we will have a function that should return a number, but should be able to be processed asynchronously. With the keyword mentioned we could write this code:
async fn give_number() -> u32 {
    100
}

fn main() {}
With the async hint the compiler will de-sugar the function into one that returns a Future, as demonstrated by the following code.
use core::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

fn give_number() -> impl Future<Output = u32> {
    GiveNumberFuture {}
}

struct GiveNumberFuture {}

impl Future for GiveNumberFuture {
    // the return type of the original async function
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // the expression returning the value of the async function wrapped
        // into the Poll::Ready state
        Poll::Ready(100)
    }
}
The given example of de-sugaring the async function is for demonstration purposes only and might not match the actual code generated by the compiler.
Await
The term await is another keyword in the Rust programming language. It tells the compiler that the current sequential processing shall be paused until the value of an asynchronous computation is available. Once the value is available, processing continues. Like async, the await keyword is syntactic sugar for the developer writing code to be run asynchronously. It is applied to an expression that evaluates to a Future and hints the compiler to generate code that polls the state of the Future and only continues once the value is ready. As waiting for the result of a Future itself requires the capability of asynchronous processing, this keyword can only be used within an async fn (or an async block).
Let's illustrate the usage:
async fn give_number() -> u32 {
100
}
async fn wait_for_number() {
let number = give_number().await;
println!("Number: {}", number);
}
So the wait_for_number function needs to be async as well to be able to contain await points. While await-ing the value pauses the execution of this code, the current thread or processor core is free to pick up other things to do until the executor decides to revisit this await point to check whether progress can be made.
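The pausing and resuming at an await point can be observed by polling an async function by hand. The following is a sketch under the assumption of a hosted environment with the standard library; YieldOnce, two_steps and trace_await are hypothetical names used only for this illustration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper future: Pending on the first poll, Ready afterwards.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

async fn two_steps(log: Rc<RefCell<Vec<&'static str>>>) {
    log.borrow_mut().push("before await");
    YieldOnce(false).await;
    log.borrow_mut().push("after await");
}

/// Polls `two_steps` twice and records what happens in between.
pub fn trace_await() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(two_steps(Rc::clone(&log)));

    // First poll: runs up to the await point, then reports Pending.
    assert!(future.as_mut().poll(&mut cx).is_pending());
    log.borrow_mut().push("caller is free to do other work");
    // Second poll: resumes right after the await point and completes.
    assert!(future.as_mut().poll(&mut cx).is_ready());

    let result = log.borrow().clone();
    result
}
```

The recorded order is "before await", "caller is free to do other work", "after await", which shows that the caller regains control while the async function is parked at its await point.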
Let's Build a Runtime
While all the theory and definitions are great to have and ensure that we are on the same page with respect to the terminology we use, nothing beats practical examples to fully understand or learn something new.
So the purpose of the upcoming chapters is to implement a runtime / executor that enables the use of asynchronous programming in upcoming projects. So let's start defining the first pieces and implement the executor step by step.
The main building blocks will be the Brain and Thoughts.
The Thought
The Thought is an entity the Brain is able to process. It is a kind of wrapper that contains the thing that needs to be thought on: the Future. This Future requires polling until it unveils its result. As the Future contained in the Thought will be shared across threads or cores of my Raspberry Pi, it needs to be stored in heap memory and pinned at its location to prevent it from moving around in memory.
//! # A Thought
//!
//! A Thought is an entity that wraps a `Future` the *Brain* could *think* on.
//!
extern crate alloc;
use core::{
future::Future,
pin::Pin,
};
use alloc::boxed::Box;
pub struct Thought {
/// This is the actual thing the brain should process as part of the Thought
pub thinkable: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
You might be wondering why the Future that is assigned to the Thought has a fixed Output type, namely the unit type ().
The initial intuition might indicate that this is wrong! How could a Future with the unit return type ever yield an actual value our code might await at some point? You are right to question this, and I also struggled at this point at first. So let's try to explain why this actually is correct!
The Requirement: At some point the Brain is required to maintain a list of Thoughts that require processing. As the Future that is part of the Thought ultimately will be part of the list as well, its associated type needs to be fully specified to allow it to participate in the list.
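A small sketch may help to see this requirement in code. It assumes the standard library for brevity; the alias BoxedThinkable and the functions first, second and collect_thinkables are hypothetical names for this illustration. Two async functions have different concrete Future types, yet both fit into one Vec once the Output type is fixed and the concrete type is erased behind a trait object.

```rust
use std::future::Future;
use std::pin::Pin;

/// One fixed element type for every future stored in the list.
type BoxedThinkable = Pin<Box<dyn Future<Output = ()> + 'static>>;

async fn first() {}

async fn second() {
    // a value-yielding future can still be awaited on the inside
    let _number: u32 = async { 7 }.await;
}

/// Collects two futures of different concrete types into one Vec.
pub fn collect_thinkables() -> Vec<BoxedThinkable> {
    let mut list: Vec<BoxedThinkable> = Vec::new();
    list.push(Box::pin(first()));
    list.push(Box::pin(second()));
    list
}
```

With a generic Output type in the element type, the Vec could not hold futures with different outputs side by side.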
Why is this correct: In a typical sequential execution model the process flow starts by entering the main function and continues until it reaches the end of the main function, which typically does not return any value (leaving aside error codes and the like for the time being). However, within the main function you are free to call functions that return values, work with those values and do further processing. But in the end the program does not yield a value at all. From this we can draw an analogy to the asynchronous world. The Thought (and its Future) is kind of the async representation of the synchronous main function. Within the Thought's Future we can embed other Futures that yield values, wait on them, process those values etc. But ultimately the Thought itself does not return anything. However, the advantage of Thoughts in the async programming model is that we can throw as many of them as we like onto the Brain. And the Brain can decide which Thought to process next and which needs to be parked as it still waits for a value inside its processing to become available. Throwing new Thoughts onto the Brain is also called spawning.
So the conclusion is: It's totally fine and absolutely correct that the Thought stores a Future that does not yield any result.
If a Future embeds another Future and awaits its result before processing can continue, this is also called chaining of Futures. Chaining of Futures in the async world is comparable to function calls in the synchronous world, where a function can only continue once the called function returns. The chaining of Futures rarely needs to be implemented manually, as this is done by the compiler when de-sugaring the await points within an async function.
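As a sketch of such a chain, assuming the standard library is available: double_number awaits give_number much like a function calling another function. The names double_number, NoopWake and run_chain are hypothetical and only serve this illustration.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

async fn give_number() -> u32 {
    100
}

/// Chains onto `give_number`: continues only once its value is available.
async fn double_number() -> u32 {
    give_number().await * 2
}

/// Polls the outer future once; polling it drives the inner future as well.
pub fn run_chain() -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(double_number());
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("no future in this chain ever returns Pending"),
    }
}
```

Polling only the outermost Future is enough: the code generated for the await point forwards the poll to the embedded Future.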
With the Thought defined let's try to implement our first version of a Brain in the next chapter.
The First Naive Brain
With the first building blocks in place we might be able to sketch our first version of a Brain.
The first part we need to define is the struct that will contain the list of Thoughts that are about to be processed. We wrap them into an Option to distinguish the items that require processing from the ones that are already finished, so there is no need to think on them any further.
struct Brain {
    /// the list of `Thought`s that require processing
    thoughts: Vec<Option<Thought>>,
}
Now we can implement a function that is able to take a Future, wrap it into a Thought and push it to the list.
impl Brain {
/// Add a new `Future` to the [Brain], so it can be processed
fn think_on(&mut self, thinkable: impl Future<Output = ()> + 'static) {
// ensure the given Future is getting a fixed position on the HEAP
let thinkable = Box::pin(thinkable);
// create the Thought
let thought = Thought {
thinkable,
};
// push the Thought to the list of thoughts to think on
self.thoughts.push(Some(thought));
}
}
Finally the Brain requires a function that allows processing of the list of Thoughts. This function will iterate over the items of Brain::thoughts and will call the poll function of the Future contained in the respective Thought. If this polling yields a Poll::Pending state, the Thought will be kept in place in the list and is polled again in the next cycle.
impl Brain {
    /// Do the actual thinking - check for Thoughts that are waiting to be
    /// processed and drive them to completion
    fn do_thinking(&mut self) {
        // run through the list of Thoughts that require thinking
        for maybe_thought in self.thoughts.iter_mut() {
            // the Thought needs to be mutable to get a pinned mutable
            // reference to its Future
            if let Some(mut thought) = maybe_thought.take() {
                // polling the Future requires some kind of Context, we will
                // discuss this in the next chapter; `cx` is not yet defined
                if let Poll::Pending = thought.thinkable.as_mut().poll(cx) {
                    // as long as the state is Poll::Pending we put the
                    // Thought back in place for the next round
                    *maybe_thought = Some(thought);
                }
            }
        }
    }
}
This first sketch of the Brain has several flaws. One of them, for example, is that the poll function of the Future requires a Context to be passed. Without having this in place the code will not compile.
However, before dealing with the different challenges of the above code let's have a look at how we would actually use the Brain.
As a first step we will define a Future that returns a constant value after it has been polled for a fixed number of tries. Nothing really asynchronous here, you are totally right, but let's start simple.
struct GiveNumberFuture {
give_after_tries: u32,
current_tries: u32,
}
impl Future for GiveNumberFuture {
type Output = u32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("polled {} time(s)", this.current_tries + 1);
if this.give_after_tries > this.current_tries + 1 {
this.current_tries += 1;
Poll::Pending
} else {
Poll::Ready(20)
}
}
}
This Future does indeed return a value. So we need to embed it into a Future that does not return any value and can be spawned to the Brain. The simplest way to do so is to create an async function that does not return any value and awaits the result of the GiveNumberFuture in it, like so:
async fn main_thought() {
let future = GiveNumberFuture {
give_after_tries: 10,
current_tries: 0,
};
let number = future.await;
println!("waited for {}", number);
}
Within the main function we can now create our Brain and tell it to think on the main thought, which will ultimately wait for the GiveNumberFuture to yield its result.
fn main() {
println!("Hello, world!");
let mut brain = Brain {
thoughts: Vec::new(),
};
brain.think_on(main_thought());
loop {
brain.do_thinking();
}
}
Assuming the first sketch of our Brain would already compile and run, it would create the following output:
Hello, world!
polled 1 time(s)
polled 2 time(s)
polled 3 time(s)
polled 4 time(s)
polled 5 time(s)
polled 6 time(s)
polled 7 time(s)
polled 8 time(s)
polled 9 time(s)
polled 10 time(s)
waited for 20
The Issues of the Naive Brain
- The missing Context prevents it from compiling. And what is it used for, by the way?
- The Vec used to store the Thoughts may grow endlessly without further handling.
- The Brain requires mutable access to allow adding new Thoughts and processing them.
Let's tackle them one by one in the next chapters.
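For readers who want to see the naive approach run before moving on, the following sketch fills the missing Context with a waker that does nothing. This is a stopgap under stated assumptions, not the solution developed in this book: it relies on the standard library (std::task::Wake), uses a hypothetical NoopWake type, and lets do_thinking return the number of pending Thoughts so the loop can terminate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

struct Thought {
    thinkable: Pin<Box<dyn Future<Output = ()> + 'static>>,
}

struct Brain {
    thoughts: Vec<Option<Thought>>,
}

impl Brain {
    fn think_on(&mut self, thinkable: impl Future<Output = ()> + 'static) {
        let thinkable = Box::pin(thinkable);
        self.thoughts.push(Some(Thought { thinkable }));
    }

    /// One pass over all Thoughts; returns how many are still pending.
    fn do_thinking(&mut self) -> usize {
        // assumption: a do-nothing waker, since this Brain polls everything anyway
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut pending = 0;
        for slot in self.thoughts.iter_mut() {
            if let Some(mut thought) = slot.take() {
                if thought.thinkable.as_mut().poll(&mut cx).is_pending() {
                    *slot = Some(thought);
                    pending += 1;
                }
            }
        }
        pending
    }
}

struct GiveNumberFuture {
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            Poll::Pending
        } else {
            Poll::Ready(20)
        }
    }
}

/// Runs the Brain until nothing is pending; returns the number of passes.
pub fn run_naive_brain() -> u32 {
    let mut brain = Brain { thoughts: Vec::new() };
    brain.think_on(async {
        let future = GiveNumberFuture { give_after_tries: 10, current_tries: 0 };
        assert_eq!(future.await, 20);
    });
    let mut passes = 1;
    while brain.do_thinking() > 0 {
        passes += 1;
    }
    passes
}
```

The sketch needs 10 passes for the single Thought, matching the 10 polls in the output shown earlier. It still busy-polls, and the Vec never shrinks, so the remaining issues from the list stay open.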
The Context and the Waker
The first issue of our naive Brain is that it does not compile for an obvious reason: the signature of the poll function requires a Context to be passed and we were not able to provide one so far. But what is this Context about?
The Context
In the current version of Rust, the Context that is handed over to the Future while polling it only contains a reference to a Waker. This Waker can be used to wake the processing of a Future at a later point in time, when the result of this Future is likely to be present.
Our initial implementation of the Brain in the last chapter took a Future that returned a constant value after it had been polled 10 times. Every time this polling returned Poll::Pending the Brain ensured that the next processing cycle would again invoke the poll function of this Future. This is quite inefficient, as continuously polling the Future will likely waste resources and processing capabilities of the Brain. In a real-world scenario it will more likely be an event (a timer, an I/O event, an external GPIO interrupt) that indicates that the requested result of a Future is available.
But how could the Brain know that the event for a specific Future has been raised and that the wrapping Thought therefore needs re-processing? This is where the Context and the Waker it contains come into play. The Brain will create a Waker for each Thought that is about to be processed and pass it as part of the Context to the Future that is polled. It is now the responsibility of the Future to store this Waker and use it to signal to the Brain that it needs to re-process the Thought this Future is wrapped into. The most common use case is to register this Waker with a system I/O event handler or an interrupt handler. So if processing the Thought returns Poll::Pending, the Brain can park it until it gets woken by the Waker.
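The handshake between a Future and its event source can be sketched as follows. This assumes the standard library; EventState, EventFuture, fire_event and CountingWake are hypothetical names, and fire_event merely stands in for an interrupt or I/O handler. The counting waker only records wake-ups, where a real Brain would re-queue the Thought.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the future and the (hypothetical) event source.
#[derive(Default)]
struct EventState {
    fired: bool,
    waker: Option<Waker>,
}

/// Future that becomes ready once the event has fired.
struct EventFuture(Arc<Mutex<EventState>>);

impl Future for EventFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.lock().unwrap();
        if state.fired {
            Poll::Ready(())
        } else {
            // remember the latest waker so the event source can signal us
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Stand-in for an interrupt or I/O handler.
fn fire_event(state: &Arc<Mutex<EventState>>) {
    let mut state = state.lock().unwrap();
    state.fired = true;
    if let Some(waker) = state.waker.take() {
        waker.wake();
    }
}

/// Counts wake-ups; a real executor would re-queue the task here.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (pending before the event, wake-ups seen, ready after the event).
pub fn event_demo() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let state = Arc::new(Mutex::new(EventState::default()));
    let mut future = Box::pin(EventFuture(Arc::clone(&state)));

    let pending_before = future.as_mut().poll(&mut cx).is_pending();
    fire_event(&state);
    let wakes = counter.0.load(Ordering::SeqCst);
    let ready_after = future.as_mut().poll(&mut cx).is_ready();
    (pending_before, wakes, ready_after)
}
```

The Future is polled exactly twice: once before the event, where it stores the Waker, and once after the single wake-up.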
It's also possible that multiple Thoughts share the same Waker if their re-processing is likely to depend on the same event, but for the sake of simplicity we stick to the creation of an individual Waker for each Thought the Brain will process.
The Waker
The high-level concept of a Waker is fairly straightforward. A Waker defines the behavior of a thing (in our case the Thought) that provides the methods to get woken. This behavior is defined by the Wakeable trait. The high-level intuition of a Waker looks like this:
type Waker = Arc<dyn Wakeable>;
pub trait Wakeable {
/// Wake a Wakeable and consuming the [Arc]
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}
/// Wake the Wakeable keeping the Arc ref count intact
fn wake_by_ref(self: &'_ Arc<Self>);
}
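On hosted systems the standard library offers a trait of the same shape, std::task::Wake, which can serve as a point of comparison. The sketch below assumes std is available; Flag and wake_demo are hypothetical names. It illustrates the difference between the consuming and the non-consuming wake method by looking at the Arc reference count.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical wakeable thing: remembers whether it has been woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    /// Consuming variant: the Arc is dropped when this function returns.
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Non-consuming variant: the reference count stays intact.
    fn wake_by_ref(self: &Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Returns (woken, ref count while the Waker is alive, ref count afterwards).
pub fn wake_demo() -> (bool, usize, usize) {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));

    waker.wake_by_ref();
    let woken = flag.0.load(Ordering::SeqCst);
    let count_while_waker_alive = Arc::strong_count(&flag);

    // consumes the Waker and with it the Arc it holds
    waker.wake();
    let count_after_consuming_wake = Arc::strong_count(&flag);

    (woken, count_while_waker_alive, count_after_consuming_wake)
}
```

The reference count is 2 while the Waker exists (our handle plus the one inside the Waker) and drops back to 1 after the consuming wake.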
There are several challenges that need to be solved for a Waker to allow its usage in a general way, as the fact that it is tied to a Thought is kind of arbitrary from the Waker's point of view. So if you are curious about the low-level details, feel free to continue reading.
When polling a generic Future we've already seen that the Waker will be passed as part of the Context. Just as the Future inside the Thought can't carry any generic output type, because we would like to store it in some sort of list in the Brain, the Waker can't hold any generic type information either.
The solution to this problem is to use trait objects like Arc<dyn Wakeable>. But as this still carries the type information, we need the raw version of the trait object to be stored. How can this be achieved? Well, the answer lies in the definition of a trait object. At the raw memory level it is nothing more than a pointer to the actual data of the trait object together with a V(irtual Function)Table. The VTable as such is a list of function pointers where the very first parameter passed is the pointer to the actual data of the object the function belongs to. This type-erased representation of a Waker is provided by the Rust core library as RawWaker.
For reference the definitions from the core library below:
pub struct RawWaker {
data: *const (),
vtable: &'static RawWakerVTable,
}
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
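Before wiring a real object into these structures, it may help to see the smallest possible RawWaker: one whose four VTable functions ignore the data pointer entirely. This is a sketch using the standard library paths (the same items exist under core::task); noop_clone, noop, NOOP_VTABLE, noop_raw_waker, noop_waker and poll_once_with_noop are names made up for this illustration.

```rust
use std::future::Future;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Clone: hand out another RawWaker with the same (unused) data pointer.
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    noop_raw_waker()
}

/// Used for wake, wake_by_ref and drop: nothing to do, nothing to free.
unsafe fn noop(_data: *const ()) {}

/// Order of the entries: clone, wake, wake_by_ref, drop.
static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw_waker() -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_VTABLE)
}

pub fn noop_waker() -> Waker {
    // SAFETY: no VTable function ever dereferences the data pointer, so the
    // contract of RawWaker is trivially upheld.
    unsafe { Waker::from_raw(noop_raw_waker()) }
}

/// Polls an immediately ready future once, using the hand-built Waker.
pub fn poll_once_with_noop() -> Poll<u32> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(async { 5u32 });
    let result = future.as_mut().poll(&mut cx);
    result
}
```

A Waker for a Thought follows the same layout; the difference is that the data pointer then refers to a reference-counted object and each VTable function has to keep the reference count correct.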
Being Wakeable
So the first step in making the Thought wakeable is to create the functions that will make up its VTable. All those functions have one thing in common: they get the pointer to the current Wakeable as a type-erased raw pointer. This needs to be cast back into a typed raw pointer and from there to its Arc representation. This is actually safe, as the only way this raw pointer could have been created is from the corresponding Arc::into_raw, as shown later.
Function to Clone
The first function required will clone the RawWaker from the Wakeable. Being able to create clones of the Waker enables them to be stored as part of interrupt handlers or I/O event handlers to allow waking the Thoughts from within them.
unsafe fn clone<T: Wakeable>(wakeable: *const ()) -> RawWaker {
let wakeable: *const T = wakeable.cast();
let wakeable_ref: &Arc<T> = &*ManuallyDrop::new(
Arc::from_raw(wakeable)
);
Arc::clone(wakeable_ref).into_raw_waker()
}
Function to Wake
The second function required will call the wake function of the Wakeable trait that will actually be implemented for the Thought. This function will consume the Waker (its Arc) when called. It is most likely called on a cloned Waker, for example inside an interrupt handler.
unsafe fn wake<T: Wakeable>(wakeable: *const ()) {
// transfer the raw pointer back into its typed pointer
let wakeable: *const T = wakeable.cast();
let wakeable: Arc<T> = Arc::from_raw(wakeable);
// wake the wakeable
Wakeable::wake(wakeable);
}
There is also a non-consuming version of the wake function that should be used when the current Wakeable should not be consumed (for example because it is the only existing reference, like the one directly stored within the Context).
unsafe fn wake_by_ref<T: Wakeable>(wakeable: *const ()) {
// transfer the raw pointer back into its typed pointer
let wakeable: *const T = wakeable.cast();
let wakeable_ref = &*ManuallyDrop::new(Arc::from_raw(wakeable));
Wakeable::wake_by_ref(wakeable_ref);
}
Function to Drop
When handing out clones of the Wakeable it was necessary to wrap the reconstructed Arc in ManuallyDrop so that the reference count is not decremented automatically. For this very reason we also need to implement the drop function for those clones. It simply drops the Arc we rebuild from the raw pointer.
unsafe fn drop<T: Wakeable>(wakeable: *const ()) {
// transfer the raw pointer back into its typed pointer
let wakeable: *const T = wakeable.cast();
core::mem::drop(Arc::from_raw(wakeable));
}
With the functions that make up the VTable in place, it's now possible to create a RawWaker that is a type-erased version of a trait object representing the Wakeable.
fn into_raw_waker(self: Arc<Self>) -> RawWaker {
let raw_wakeable: *const () = Arc::into_raw(self).cast();
let raw_wakeabe_vtable = &Self::WAKER_VTABLE;
RawWaker::new(
raw_wakeable,
raw_wakeabe_vtable,
)
}
This function will be the only way to construct the RawWaker from a Wakeable. It uses Arc::into_raw to convert the Arc into a raw pointer, which makes it safe to convert the raw pointers passed to the VTable functions back into an Arc using the Arc::from_raw function. To keep things together that belong together, we define a private trait that covers the VTable as well as the into_raw_waker function. In the following listing the details of the VTable functions are omitted for brevity.
trait WakeableTraitObject: Wakeable + Sized {
/// build the RawWaker from the Wakeable consuming the [Arc] of it
fn into_raw_waker(self: Arc<Self>) -> RawWaker {
let raw_wakeable: *const () = Arc::into_raw(self).cast();
let raw_wakeabe_vtable = &Self::WAKER_VTABLE;
RawWaker::new(
raw_wakeable,
raw_wakeabe_vtable,
)
}
/// specifying the VTable for this Wakeable
const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
{
unsafe fn clone<T: Wakeable>(wakeable: *const ()) -> RawWaker {
/* details omitted */
}
clone::<Self>
},
{
unsafe fn wake<T: Wakeable>(wakeable: *const ()) {
/* details omitted */
}
wake::<Self>
},
{
unsafe fn wake_by_ref<T: Wakeable>(wakeable: *const ()) {
/* details omitted */
}
wake_by_ref::<Self>
},
{
unsafe fn drop<T: Wakeable>(wakeable: *const ()) {
/* details omitted */
}
drop::<Self>
}
);
}
Finally we provide a blanket implementation so that all types that implement the Wakeable trait also implement the WakeableTraitObject trait.
impl<T: Wakeable> WakeableTraitObject for T {}
With this in place we can now provide a function as part of the Wakeable trait that allows direct conversion from a Wakeable into a Waker.
pub trait Wakeable: Sized {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}
fn wake_by_ref(self: &'_ Arc<Self>);
fn into_waker(self: &Arc<Self>) -> Waker {
unsafe {
Waker::from_raw(
Self::into_raw_waker(Arc::clone(self))
)
}
}
}
Waking the Wakeable
With the Wakeable trait we can now define our Thought to be able to get woken, right?
impl Wakeable for Thought {
fn wake_by_ref(self: &Arc<Self>) {
// this Thought shall be able to get woken. This would require the Brain
// to re-process it. How to achieve this? How to push to the Brain?
// Should we pass a borrow of the Thought list of the Brain to the
// Thought?
}
}
Even though we made the Thought wakeable and could implement the waking functionality, we run into the next issue with our initial naive Brain implementation.
The reason is that we store the Thoughts that require processing in a Vec within the Brain. To add entries to this list we would require mutually exclusive access to it, and we would need to share this access across different Thoughts. One possible way to address this is to use an Arc and a mutex-like lock around this Vec. But this will not really solve the problem either, as the Brain always holds this lock while processing the list of Thoughts. Therefore it is very unlikely that the Waker will ever get a chance to acquire the lock for adding its related Thought back to the things the Brain needs to process. Kind of a dilemma, right?
But there is a solution to this. If we carefully check the requirements for the list of Thoughts within the Brain, we see that it acts more like a queue. The Brain picks up the things that need processing from the front of the queue and the Wakeable pushes itself to the end of the queue once woken.
So what we will need is the implementation of a queue!
The queue has 2 sides, one that allows popping of Thoughts and one that allows pushing of Thoughts. Both ends need to be shareable independently, for example with the Thoughts that need to push themselves to the queue again once they are woken. The perfect candidate here is a channel. There are several kinds and implementations of channels available in the open source community. Based on our specific use case, where we might run the Brain on bare metal in a no_std environment, likely on different cores, and need to push Thoughts to this channel from different Wakers, we require a Multi Producer Multi Consumer channel that preferably is implemented in a lock-free fashion.
Check out the next chapter for the details.
Multi Producer Multi Consumer (MPMC) Channel
The MPMC channel allows adding new entries from any core or thread (multi producer) and picking the next entry from any core or thread (multi consumer), and it works like a FIFO queue. There are surely multiple possible approaches to implementing such a channel in a non-blocking way. I will present quite a simple one here that has proven to work at least for all my current use cases in a bare metal no_std environment. The source code can be found here.
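To illustrate only the interface such a channel offers (cloneable ends, FIFO order, a non-blocking receive), here is a deliberately simple sketch. Note the assumptions: it uses the standard library and a Mutex around a VecDeque, so it is neither lock-free nor suitable for no_std, and it is not the implementation referred to above. The names Sender, Receiver, channel and fifo_demo are chosen for this illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Shared FIFO storage behind both ends of the channel.
type Shared<T> = Arc<Mutex<VecDeque<T>>>;

pub struct Sender<T>(Shared<T>);
pub struct Receiver<T>(Shared<T>);

// Both ends can be cloned, which gives multiple producers and consumers.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender(Arc::clone(&self.0))
    }
}

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        Receiver(Arc::clone(&self.0))
    }
}

impl<T> Sender<T> {
    /// Appends a value to the end of the queue.
    pub fn send(&self, value: T) {
        self.0.lock().unwrap().push_back(value);
    }
}

impl<T> Receiver<T> {
    /// Non-blocking: returns Err(()) when the queue is currently empty.
    pub fn recv(&self) -> Result<T, ()> {
        self.0.lock().unwrap().pop_front().ok_or(())
    }
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(VecDeque::new()));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

/// Sends from two Sender handles and drains the queue in FIFO order.
pub fn fifo_demo() -> Vec<u32> {
    let (sender, receiver) = channel();
    let second_sender = sender.clone();
    sender.send(1);
    second_sender.send(2);
    sender.send(3);

    let mut received = Vec::new();
    while let Ok(value) = receiver.recv() {
        received.push(value);
    }
    received
}
```

The values come out in the order 1, 2, 3 regardless of which Sender handle pushed them. A lock-free variant keeps this interface but replaces the Mutex with atomic operations.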
Revisit the Brain
With the channel in place we can adapt the Brain to keep the Sender and the Receiver side of it. The Sender will be shared with the Waker while the Receiver is used by the Brain only.
struct Brain {
/// the sender side of the mpmc channel to pass the ``Thought``s that
/// require processing
sender: mpmc::Sender<Arc<Thought>>,
/// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
/// from to process them
receiver: mpmc::Receiver<Arc<Thought>>,
}
Well, you might wonder why there is a Multi Producer Multi Consumer channel if the consuming part is used by the Brain only. The reason is that we will use the Brain to run its code on different cores and therefore a multi-consumer channel is indeed required.
Revisit the Wakeable
In addition to updating the Brain, which now stores the two sides of the channel, we can also store the Sender of the channel with each Thought, which allows it to push itself back to the queue of things the Brain should process.
The updated Thought will look like this:
pub struct Thought {
/// This is the actual thing the brain should process as part of the Thought
pub thinkable: DataLock<Pin<Box<dyn Future<Output = ()> + 'static>>>,
/// The sender side of the queue of the `Brain` to push myself for
/// re-processing
pub sender: mpmc::Sender<Arc<Thought>>,
}
The Wakeable trait, which adds the wake behavior to the Thought, can now finally be implemented:
impl Wakeable for Thought {
fn wake_by_ref(self: &Arc<Self>) {
let clone = Arc::clone(self);
self.sender.send(clone);
}
}
The final bit now is to always pass the Sender to the Thought once the Brain is requested to think on one:
fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
// ensure the given Future is getting a fixed position on the HEAP
let thinkable = Box::pin(thinkable);
// create the Thought
let thought = Arc::new(
Thought {
thinkable: DataLock::new(thinkable),
sender: self.sender.clone(),
}
);
// push the Thought to the list of thoughts to think on
self.sender.send(thought);
}
The simple Brain
Now we have everything in place to implement a first functional simple Brain.
/// A quite naive Brain that should process the `Future`s wrapped in a `Thought`
struct Brain {
/// the sender side of the mpmc channel to pass the ``Thought``s that
/// require processing
sender: mpmc::Sender<Arc<Thought>>,
/// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
/// from to process them
receiver: mpmc::Receiver<Arc<Thought>>,
}
impl Brain {
fn default() -> Self {
let (sender, receiver) = mpmc::channel();
Self {
sender,
receiver,
}
}
/// Add a new `Future` to the [Brain], so it can be processed
fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
// ensure the given Future is getting a fixed position on the HEAP
let thinkable = Box::pin(thinkable);
// create the Thought
let thought = Arc::new(
Thought {
thinkable: DataLock::new(thinkable),
sender: self.sender.clone(),
}
);
// push the Thought to the list of thoughts to think on
self.sender.send(thought);
}
}
impl Brain {
/// Do the actual thinking - check for Thoughts that wait for processing
/// and drive them to completion
fn do_thinking(&self) {
// check if there is a new Thought available in the channel
while let Ok(thought) = self.receiver.recv() {
// create the Waker from the current Thought
let waker = Wakeable::into_waker(&thought);
// create the Context from the given Waker
let mut context = Context::from_waker(&waker);
// lock the Future contained in the Thought and poll it
let mut thinkable = thought.thinkable.lock();
if let Poll::Pending = thinkable.as_mut().poll(&mut context) {
// if the state is Poll::Pending we just unlock the Future
drop(thinkable);
// in case it will be still valid and required to re-process this
// Thought the Waker will resend it through the channel
}
}
}
}
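For readers who want to experiment on a hosted system first, the same design can be approximated with standard library parts. The sketch below is an assumption-laden stand-in, not the bare metal implementation: std::sync::mpsc replaces the MPMC channel (so there is only a single consumer), std::task::Wake replaces the Wakeable trait, Mutex replaces DataLock, and the Future is kept in an Option so that a late wake-up cannot poll a finished Future again. The function run_simple_brain is a hypothetical helper.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Thinkable = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

struct Thought {
    thinkable: Mutex<Option<Thinkable>>,
    sender: Mutex<Sender<Arc<Thought>>>,
}

impl Wake for Thought {
    /// Waking a Thought pushes it back to the queue of the Brain.
    fn wake(self: Arc<Self>) {
        let sender = self.sender.lock().unwrap().clone();
        // ignore the error in case the Brain is already gone
        let _ = sender.send(self);
    }
}

struct Brain {
    sender: Sender<Arc<Thought>>,
    receiver: Receiver<Arc<Thought>>,
}

impl Brain {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Brain { sender, receiver }
    }

    fn think_on(&self, thinkable: impl Future<Output = ()> + Send + 'static) {
        let boxed: Thinkable = Box::pin(thinkable);
        let thought = Arc::new(Thought {
            thinkable: Mutex::new(Some(boxed)),
            sender: Mutex::new(self.sender.clone()),
        });
        let _ = self.sender.send(thought);
    }

    /// Processes Thoughts until the queue is empty; returns the poll count.
    fn do_thinking(&self) -> usize {
        let mut polls = 0;
        while let Ok(thought) = self.receiver.try_recv() {
            let waker = Waker::from(Arc::clone(&thought));
            let mut cx = Context::from_waker(&waker);
            let mut slot = thought.thinkable.lock().unwrap();
            if let Some(mut future) = slot.take() {
                polls += 1;
                if future.as_mut().poll(&mut cx).is_pending() {
                    // keep the Future; the Waker re-queues the Thought
                    *slot = Some(future);
                }
            }
        }
        polls
    }
}

struct GiveNumberFuture {
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(20)
        }
    }
}

/// Spawns one Thought and returns how often its Future was polled.
pub fn run_simple_brain() -> usize {
    let brain = Brain::new();
    brain.think_on(async {
        let future = GiveNumberFuture { give_after_tries: 10, current_tries: 0 };
        assert_eq!(future.await, 20);
    });
    brain.do_thinking()
}
```

The Thought is polled 10 times in total, and every re-poll is triggered by the Future waking itself rather than by the Brain blindly revisiting it.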
The usage of this simple Brain is pretty much the same as in the first attempt. We implement an example Future and a simple async function that can be spawned to the Brain and awaits the example Future.
struct GiveNumberFuture {
give_after_tries: u32,
current_tries: u32,
}
impl Future for GiveNumberFuture {
type Output = u32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("polled {} time(s)", this.current_tries + 1);
if this.give_after_tries > this.current_tries + 1 {
this.current_tries += 1;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(20)
}
}
}
async fn main_thought() {
let future = GiveNumberFuture {
give_after_tries: 10,
current_tries: 0,
};
let number = future.await;
println!("waited for {}", number);
}
And finally the actual main function utilizing the Brain to process the Future.
//! # Naive Async Processing
//!
//! Implementing a simple *Brain*
extern crate alloc;
use core::{
future::Future,
task::{Poll, Context},
pin::Pin,
};
use alloc::{
boxed::Box,
sync::Arc,
};
use ruspiro_channel::mpmc;
use ruspiro_lock::DataLock;
mod thought;
use thought::*;
mod wakeable;
use wakeable::Wakeable;
/// A quite naive Brain that should process the `Future`s wrapped in a `Thought`
struct Brain {
/// the sender side of the mpmc channel to pass the ``Thought``s that
/// require processing
sender: mpmc::Sender<Arc<Thought>>,
/// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
/// from to process them
receiver: mpmc::Receiver<Arc<Thought>>,
}
impl Brain {
fn default() -> Self {
let (sender, receiver) = mpmc::channel();
Self {
sender,
receiver,
}
}
/// Add a new `Future` to the [Brain], so it can be processed
fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
// ensure the given Future is getting a fixed position on the HEAP
let thinkable = Box::pin(thinkable);
// create the Thought
let thought = Arc::new(
Thought {
thinkable: DataLock::new(thinkable),
sender: self.sender.clone(),
}
);
// push the Thought to the list of thoughts to think on
self.sender.send(thought);
}
}
impl Brain {
/// Do the actual thinking - check for Thoughts that waits for processing
/// and drive them to completion
fn do_thinking(&self) {
// check if there is a new Thought available in the channel
while let Ok(thought) = self.receiver.recv() {
// create the Waker from the current Thought
let waker = Wakeable::into_waker(&thought);
// create the Context from the given Waker
let mut context = Context::from_waker(&waker);
// lock the Future contained in the Thought and poll it
let mut thinkable = thought.thinkable.lock();
if let Poll::Pending = thinkable.as_mut().poll(&mut context) {
// if the state is Poll::Pending we just unlock the Future
drop(thinkable);
// in case it will be still valid and required to re-process this
// Thought the Waker will resend it through the channel
}
}
}
}
struct GiveNumberFuture {
give_after_tries: u32,
current_tries: u32,
}
impl Future for GiveNumberFuture {
type Output = u32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("polled {} time(s)", this.current_tries + 1);
if this.give_after_tries > this.current_tries + 1 {
this.current_tries += 1;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(20)
}
}
}
async fn main_thought() {
let future = GiveNumberFuture {
give_after_tries: 10,
current_tries: 0,
};
let number = future.await;
println!("waited for {}", number);
}
fn main() {
println!("Hello, world!");
let brain = Brain::default();
brain.think_on(main_thought());
loop {
brain.do_thinking();
}
}
Running this yields the following output:
Hello, world!
polled 1 time(s)
polled 2 time(s)
polled 3 time(s)
polled 4 time(s)
polled 5 time(s)
polled 6 time(s)
polled 7 time(s)
polled 8 time(s)
polled 9 time(s)
polled 10 time(s)
waited for 20
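To see why the output stops after exactly ten polls, the example Future can also be polled by hand, without any Brain. The following is only a minimal sketch under a few assumptions: the standard library is available, CountingWaker is a hypothetical waker built on std::task::Wake that does nothing but count wake-ups, and drive_by_hand is a made-up helper name that is not part of the Brain.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker (not part of the Brain) that only counts wake-ups.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

struct GiveNumberFuture {
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            // request another poll before reporting Pending
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(20)
        }
    }
}

/// Polls the Future in a plain loop and returns (result, polls, wake-ups).
fn drive_by_hand(give_after_tries: u32) -> (u32, usize, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(Arc::clone(&counter));
    let mut context = Context::from_waker(&waker);
    let mut future = Box::pin(GiveNumberFuture { give_after_tries, current_tries: 0 });
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(number) = future.as_mut().poll(&mut context) {
            return (number, polls, counter.wakes.load(Ordering::SeqCst));
        }
    }
}

fn main() {
    let (number, polls, wakes) = drive_by_hand(10);
    // prints "got 20 after 10 polls and 9 wake-ups"
    println!("got {} after {} polls and {} wake-ups", number, polls, wakes);
}
```

The Future asks to be woken nine times and becomes ready on the tenth poll. In the Brain each of those wake-ups sends the Thought through the channel again, which is what produces the ten "polled" lines above.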
Proving the Runtime
Now that we have implemented our own simple runtime / executor to support the async programming paradigm, it's time to prove that it really works. Since this does not require deploying anything to actual embedded hardware, we can run the test within a normal operating system environment.
To do so we use a small binary crate utilizing our Brain implementation. The presence of different cores can be simulated by running different threads.
So the main function in this validation will look like this:
fn main() {
println!("Hello, world!");
let brain = Arc::new(
brain::Brain::default()
);
// assume we have 4 cores on the target system -> spawn 3 threads
// the current thread is the 4th core though
for _ in 0..3 {
let cloned_brain = Arc::clone(&brain);
std::thread::spawn( move || {
loop {
cloned_brain.do_thinking();
}
});
}
// just spawn 4 async executions ...
brain.think_on(main_thought(10, 10));
brain.think_on(main_thought(20, 5));
brain.think_on(main_thought(30, 6));
brain.think_on(main_thought(40, 3));
// use the current thread as the 4th core
loop {
brain.do_thinking();
// we could also spawn new [Thought]s here or from within an
// async fn to keep the Brain busy ..
}
}
To look behind the scenes a bit, we adjust the implementation of our Future, which hands out a number after a certain number of polls, so that it also prints the core/thread on which the current poll is executed.
struct GiveNumberFuture {
number_to_give: u32,
give_after_tries: u32,
current_tries: u32,
}
impl Future for GiveNumberFuture {
type Output = u32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("polled {} time(s) - now on {:?}",
this.current_tries + 1,
std::thread::current().id()
);
if this.give_after_tries > this.current_tries + 1 {
this.current_tries += 1;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(this.number_to_give)
}
}
}
async fn main_thought(number: u32, tries: u32) {
let future = GiveNumberFuture {
number_to_give: number,
give_after_tries: tries,
current_tries: 0,
};
let number = future.await;
println!("waited for {}", number);
}
Running this example now yields the following output:
Hello, world!
polled 1 time(s) - now on ThreadId(1)
polled 1 time(s) - now on ThreadId(1)
polled 2 time(s) - now on ThreadId(1)
polled 3 time(s) - now on ThreadId(1)
waited for 40
polled 1 time(s) - now on ThreadId(3)
polled 2 time(s) - now on ThreadId(1)
polled 3 time(s) - now on ThreadId(3)
polled 4 time(s) - now on ThreadId(1)
polled 5 time(s) - now on ThreadId(3)
polled 1 time(s) - now on ThreadId(2)
polled 2 time(s) - now on ThreadId(3)
polled 3 time(s) - now on ThreadId(2)
polled 4 time(s) - now on ThreadId(3)
polled 5 time(s) - now on ThreadId(2)
waited for 20
polled 6 time(s) - now on ThreadId(1)
waited for 30
polled 2 time(s) - now on ThreadId(4)
polled 3 time(s) - now on ThreadId(3)
polled 4 time(s) - now on ThreadId(2)
polled 5 time(s) - now on ThreadId(1)
polled 6 time(s) - now on ThreadId(3)
polled 7 time(s) - now on ThreadId(2)
polled 8 time(s) - now on ThreadId(1)
polled 9 time(s) - now on ThreadId(2)
polled 10 time(s) - now on ThreadId(3)
waited for 10
You can clearly see how the different cores/threads pick up the work to drive a waiting Future to completion.
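The same experiment can be reproduced with nothing but the standard library. The following is a rough sketch, not the Brain from this book: it assumes a mutex-protected VecDeque in place of the mpmc channel, std::sync::Mutex in place of DataLock, and std::task::Wake in place of the Wakeable trait. The open counter and the run_brain helper are hypothetical additions so that the worker threads stop once every Thought has finished.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::{future::Future, pin::Pin, thread};

type Thinkable = Pin<Box<dyn Future<Output = ()> + Send>>;
// Assumption: a locked queue stands in for the book's mpmc channel.
type Queue = Arc<Mutex<VecDeque<Arc<Thought>>>>;

struct Thought {
    // `None` once the Future has completed, so it is never polled again.
    thinkable: Mutex<Option<Thinkable>>,
    queue: Queue,
}

impl Wake for Thought {
    // Waking a Thought re-queues it for the next free worker.
    fn wake(self: Arc<Self>) {
        let queue = Arc::clone(&self.queue);
        queue.lock().unwrap().push_back(self);
    }
}

struct GiveNumberFuture {
    number_to_give: u32,
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(this.number_to_give)
        }
    }
}

async fn main_thought(number: u32, tries: u32, results: Arc<Mutex<Vec<u32>>>) {
    let future = GiveNumberFuture { number_to_give: number, give_after_tries: tries, current_tries: 0 };
    let number = future.await;
    results.lock().unwrap().push(number);
}

// Worker loop: runs until `open`, the count of unfinished Thoughts, hits zero.
fn do_thinking(queue: &Queue, open: &AtomicUsize) {
    while open.load(Ordering::SeqCst) > 0 {
        let next = queue.lock().unwrap().pop_front();
        let thought = match next {
            Some(thought) => thought,
            None => { thread::yield_now(); continue; }
        };
        let waker = Waker::from(Arc::clone(&thought));
        let mut context = Context::from_waker(&waker);
        let mut slot = thought.thinkable.lock().unwrap();
        let finished = match slot.as_mut() {
            Some(thinkable) => thinkable.as_mut().poll(&mut context).is_ready(),
            None => false,
        };
        if finished {
            *slot = None;
            open.fetch_sub(1, Ordering::SeqCst);
        }
    }
}

// Drives the four example Thoughts on `workers` threads; returns sorted results.
fn run_brain(workers: usize) -> Vec<u32> {
    let queue: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let results = Arc::new(Mutex::new(Vec::new()));
    let jobs = [(10, 10), (20, 5), (30, 6), (40, 3)];
    let open = Arc::new(AtomicUsize::new(jobs.len()));
    for (number, tries) in jobs {
        let thinkable: Thinkable = Box::pin(main_thought(number, tries, Arc::clone(&results)));
        let thought = Thought { thinkable: Mutex::new(Some(thinkable)), queue: Arc::clone(&queue) };
        queue.lock().unwrap().push_back(Arc::new(thought));
    }
    let mut handles = Vec::new();
    for _ in 0..workers {
        let (queue, open) = (Arc::clone(&queue), Arc::clone(&open));
        handles.push(thread::spawn(move || do_thinking(&queue, &open)));
    }
    for handle in handles { handle.join().unwrap(); }
    let mut numbers = results.lock().unwrap().clone();
    numbers.sort();
    numbers
}

fn main() {
    println!("collected {:?}", run_brain(4));
}
```

The order in which the Thoughts complete depends on thread scheduling, which is why the sketch sorts the collected numbers before returning them. A real bare metal setup would keep the lock-free channel, since a blocking mutex is usually not an option there.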
Conclusion
Well, I hope the mystery of async/await is now gone. If you are now inspired to write your own executor or runtime, just feel free to do so. With these basic concepts you could also extend the functionality of the Brain to support the more advanced features that async processing often requires.
Thanks for reading this small book - happy async coding ...
The Author
André Borrmann (aka 2ndTaleStudio).
I started learning the Rust programming language in 2019. Back then I was implementing a C++ bare metal kernel / OS on my Raspberry Pi and often ran into memory issues that were hard to track down due to the lack of proper debugging tools in the bare metal environment. A colleague of mine introduced Rust to me with its promises of concise code and memory safety, a clear ownership model and compile time checks that make it hard to run into data races and memory issues. So I gave it a try and have never regretted it!
In my job I use several programming languages such as ABAP, C/C++, Java, JavaScript/TypeScript and Scala; however, Rust is now my favorite.