The simple Brain

Now we have everything in place to implement a first functional simple Brain.

/// A quite naive Brain that should process the `Future`s wrapped in a `Thought`
struct Brain {
    /// the sender side of the mpmc channel to pass the ``Thought``s that
    /// require processing
    sender: mpmc::Sender<Arc<Thought>>,
    /// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
    /// from to process them
    receiver: mpmc::Receiver<Arc<Thought>>,
}

impl Brain {
    fn default() -> Self {
        let (sender, receiver) = mpmc::channel();

        Self {
            sender,
            receiver,
        }
    }

    /// Add a new `Future` to the [Brain], so it can be processed
    fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
        // ensure the given Future is getting a fixed position on the HEAP
        let thinkable = Box::pin(thinkable);
        // create the Thought
        let thought = Arc::new(
            Thought {
                    thinkable: DataLock::new(thinkable),
                    sender: self.sender.clone(),
                }
            );
        // push the Thought to the list of thoughts to think on
        self.sender.send(thought);
    }
}

impl Brain {
    /// Do the actual thinking - check for Thoughts that waits for processing
    /// and drive them to completion
    fn do_thinking(&self) {
        // check if there is a new Thought available in the channel
        while let Ok(thought) = self.receiver.recv() {
            // create the Waker from the current Thought
            let waker = Wakeable::into_waker(&thought);
            // create the Context from the given Waker
            let mut context = Context::from_waker(&waker);
            // lock the Future contained in the Thought and poll it
            let mut thinkable = thought.thinkable.lock();
            if let Poll::Pending = thinkable.as_mut().poll(&mut context) {
                // if the state is Poll::Pending we just unlock the Future
                drop(thinkable);
                // in case it will be still valid and required to re-process this
                // Thought the Waker will resend it through the channel
            }
        }
    }
}

The usage of this simple brain is pretty much similar to the first attempt. We implement an example Future and a simple async function that can be spawned to the Brain and is awaiting the example Future.

struct GiveNumberFuture {
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        println!("polled {} time(s)", this.current_tries + 1);
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(20)
        }
    }
}

async fn main_thought() {
    let future = GiveNumberFuture {
        give_after_tries: 10,
        current_tries: 0,
    };

    let number = future.await;
    println!("waited for {}", number);
}

And finally the actual main function utilizing the Brain to process the Future.

//! # Naive Async Processing
//!
//! Implementing a simple *Brain*

extern crate alloc;

use core::{
    future::Future,
    task::{Poll, Context},
    pin::Pin,
};

use alloc::{
    boxed::Box,
    sync::Arc,
};

use ruspiro_channel::mpmc;
use ruspiro_lock::DataLock;

mod thought;
use thought::*;

mod wakeable;
use wakeable::Wakeable;

/// A quite naive Brain that should process the `Future`s wrapped in a `Thought`
struct Brain {
    /// the sender side of the mpmc channel to pass the ``Thought``s that
    /// require processing
    sender: mpmc::Sender<Arc<Thought>>,
    /// the receiver side of the mpmc channel the ``Brain`` picks ``Thought``s
    /// from to process them
    receiver: mpmc::Receiver<Arc<Thought>>,
}

impl Brain {
    fn default() -> Self {
        let (sender, receiver) = mpmc::channel();

        Self {
            sender,
            receiver,
        }
    }

    /// Add a new `Future` to the [Brain], so it can be processed
    fn think_on(&self, thinkable: impl Future<Output = ()> + 'static) {
        // ensure the given Future is getting a fixed position on the HEAP
        let thinkable = Box::pin(thinkable);
        // create the Thought
        let thought = Arc::new(
            Thought {
                    thinkable: DataLock::new(thinkable),
                    sender: self.sender.clone(),
                }
            );
        // push the Thought to the list of thoughts to think on
        self.sender.send(thought);
    }
}

impl Brain {
    /// Do the actual thinking - check for Thoughts that waits for processing
    /// and drive them to completion
    fn do_thinking(&self) {
        // check if there is a new Thought available in the channel
        while let Ok(thought) = self.receiver.recv() {
            // create the Waker from the current Thought
            let waker = Wakeable::into_waker(&thought);
            // create the Context from the given Waker
            let mut context = Context::from_waker(&waker);
            // lock the Future contained in the Thought and poll it
            let mut thinkable = thought.thinkable.lock();
            if let Poll::Pending = thinkable.as_mut().poll(&mut context) {
                // if the state is Poll::Pending we just unlock the Future
                drop(thinkable);
                // in case it will be still valid and required to re-process this
                // Thought the Waker will resend it through the channel
            }
        }
    }
}

struct GiveNumberFuture {
    give_after_tries: u32,
    current_tries: u32,
}

impl Future for GiveNumberFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        println!("polled {} time(s)", this.current_tries + 1);
        if this.give_after_tries > this.current_tries + 1 {
            this.current_tries += 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(20)
        }
    }
}

async fn main_thought() {
    let future = GiveNumberFuture {
        give_after_tries: 10,
        current_tries: 0,
    };

    let number = future.await;
    println!("waited for {}", number);
}

fn main() {
    println!("Hello, world!");

    let brain = Brain::default();

    brain.think_on(main_thought());

    loop {
        brain.do_thinking();
    }
}

Running this will yield the result

Hello, world!
polled 1 time(s)
polled 2 time(s)
polled 3 time(s)
polled 4 time(s)
polled 5 time(s)
polled 6 time(s)
polled 7 time(s)
polled 8 time(s)
polled 9 time(s)
polled 10 time(s)
waited for 20