Understanding the Tokio Reactor Core

Coredump hosts about every second meetup of Rust Zürichsee, a group helping people interested in the Rust programming language to connect. The group also organizes RustFest Zürich in September.

At our last meetup in the hackerspace we presented our projects. Futures came up twice.

In this post I will present my experiments with the event loop used in the Tokio project.

Tokio's reactor core is currently a single-threaded event loop that is driven until at least one future resolves. To understand the inner workings a little better, I implemented a Future myself.

Preparations

To use the required structs I created a new project with cargo new --lib future_testbed and added the following two crates to Cargo.toml:

[dependencies]
futures = "0.1.13"
tokio-core = "0.1.7"

This allowed me to create test cases in a straightforward way and see when I broke something during development.

NO-OP

My first step was to create a future that resolves immediately.

use futures::{Future, Poll, Async};

/// This is a NO-OP Future
struct Poller;
impl Future for Poller {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        Ok(Async::Ready( () ))
    }
}

You may wonder why it is called Poller. Well, Poller means buffer stop in Swiss German.

Source lines explained

Let us go through it line by line.

use futures::{Future, Poll, Async};

A future in Rust is an implementation of the trait “Future”. “Poll” is a type alias for Result<Async<T>, E>. “Async” is an enum similar to std::option::Option: it is either Ready(T) or NotReady.
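To make the shape of these two types concrete, here is a minimal sketch that rebuilds them with the standard library only. The enum and the alias are local stand-ins that mirror the futures 0.1 definitions; the helpers into_option and describe are hypothetical names used only for illustration.

```rust
// Local stand-ins that mirror the shape of the futures 0.1 types.
// In the real project they come from `use futures::{Async, Poll};`.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

// Hypothetical helper: collapse an `Async<T>` into the `Option<T>` it resembles.
fn into_option<T>(state: Async<T>) -> Option<T> {
    match state {
        Async::Ready(value) => Some(value),
        Async::NotReady => None,
    }
}

// Hypothetical helper: the three outcomes a `Poll` can express.
fn describe(poll: Poll<u32, String>) -> String {
    match poll {
        Ok(Async::Ready(value)) => format!("ready with {}", value),
        Ok(Async::NotReady) => String::from("not ready yet"),
        Err(error) => format!("failed: {}", error),
    }
}
```

So a Poll distinguishes three cases: finished with a value, not finished yet, and failed.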

struct Poller;

Next I created an empty struct. Note this is basically the same as struct Poller { /* no fields */ }. The struct will remain private for this project.
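A small sketch of that equivalence, using only the standard library. PollerBraces is a hypothetical twin of Poller that spells out the empty field list, and the two functions are illustrative names as well.

```rust
use std::mem::size_of;

// `Poller` is the unit-like struct from this post. `PollerBraces` is a
// hypothetical twin that writes out the empty field list.
struct Poller;
struct PollerBraces {}

// A unit-like struct can be built with or without braces.
fn build_both_ways() -> (Poller, Poller) {
    (Poller, Poller {})
}

// Neither form stores any data, so both are zero-sized.
fn sizes() -> (usize, usize) {
    (size_of::<Poller>(), size_of::<PollerBraces>())
}
```

One small difference remains: only the struct Poller; form can be constructed by writing the bare name Poller, while PollerBraces always needs the braces.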

impl Future for Poller {

Now we start implementing the trait.

    type Item = ();
    type Error = ();

These associated types define, in one place, the types the future works with. This is very handy when the interface changes over time, and it directs the default method implementations of the trait to the correct types, see associated types. For this implementation both types are the empty tuple (), which is comparable to the void type in other languages.
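The following sketch shows how a default method picks up the associated types of each implementor. MiniFuture, poll_or, Unit and Pending are hypothetical names and not part of the futures crate. To keep the example free of dependencies, Option stands in for Async.

```rust
// Hypothetical stand-in for a trait with associated types.
trait MiniFuture {
    type Item;
    type Error;

    // Required method; `None` plays the role of "not ready".
    fn poll(&mut self) -> Result<Option<Self::Item>, Self::Error>;

    // Default method: written once in the trait, but typed by whatever
    // `Item` and `Error` the implementor chooses.
    fn poll_or(&mut self, fallback: Self::Item) -> Result<Self::Item, Self::Error> {
        Ok(self.poll()?.unwrap_or(fallback))
    }
}

// Resolves immediately with the empty tuple, like the NO-OP future.
struct Unit;
impl MiniFuture for Unit {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Option<()>, ()> {
        Ok(Some(()))
    }
}

// Never resolves; it picks different types for `Item` and `Error`.
struct Pending;
impl MiniFuture for Pending {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Result<Option<u32>, String> {
        Ok(None)
    }
}
```

Calling Unit.poll_or(()) yields Ok(()) and Pending.poll_or(7) yields Ok(7), although poll_or was written only once.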

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

This is the function the event loop will call every time it would like to make progress. Note that we get a mutable borrow so we can modify the fields later.

Ok(Async::Ready( () ))

Since we would like to complete immediately, we return a good result Ok containing the state Ready, which in turn contains the actual data (), which is nothing.

Testing

Now it is time to put all the parts together.

use tokio_core::reactor::Core;
#[test]
pub fn poller() {
    let mut core = Core::new().unwrap();

    let poller = Poller { };

    assert_eq!(Ok( () ), core.run(poller));
}

We spin up the reactor core and let it run until it completes our future, which should return ().

Poll me n times

The next step is to not complete until a condition is reached. So I created a Future that requires some polling.

struct PollMeNTimes<T> {
    n: u64,
    answer: T
}

First I drafted the structure in a generic way, without placing any restrictions on the type T. In Rust, individual impl blocks can place additional restrictions on the used types depending on the usage, see std::vec::Vec. Then I did my first impl:

impl<T: Copy> Future for PollMeNTimes<T> {
    type Item = T;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("poll: {}", self.n);
        if self.n != 0 {
            self.n -= 1;
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(self.answer))
        }
    }
}
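The impl above puts the Copy bound on the impl block and not on the struct. This is the same pattern Vec uses: Vec<T> itself accepts any T, while resize needs T: Clone and dedup needs T: PartialEq. A minimal sketch of the pattern, where Holder and its methods are hypothetical names:

```rust
// Hypothetical container: the struct itself places no bounds on `T`.
struct Holder<T> {
    value: T,
}

// Available for every `T`.
impl<T> Holder<T> {
    fn new(value: T) -> Self {
        Holder { value }
    }

    fn into_inner(self) -> T {
        self.value
    }
}

// Only available when `T` can be cloned.
impl<T: Clone> Holder<T> {
    fn duplicate(&self) -> (T, T) {
        (self.value.clone(), self.value.clone())
    }
}

// Only available when `T` can be compared for equality.
impl<T: PartialEq> Holder<T> {
    fn holds(&self, other: &T) -> bool {
        self.value == *other
    }
}
```

A Holder of a type that is neither Clone nor PartialEq can still be created and unwrapped; only duplicate and holds are unavailable for it.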

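My first implementation requires the generic type T to implement the Copy trait. Since poll only gets a mutable borrow of self, it cannot move answer out of the struct; with Copy it can simply return a copy. This allows the code to be short.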
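For comparison, here is a sketch of one way to drop the Copy bound: store the answer in an Option and move it out with take(). This is not the code from my project. PollMeNTimesOwned is a hypothetical name, Async is a local stand-in for futures::Async, and poll is an inherent method instead of a trait implementation so that the example needs no crates. Returning Err(()) when polled again after completion is my assumption for this sketch.

```rust
// Local stand-in for `futures::Async`.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical variant of PollMeNTimes that also works for non-Copy answers.
struct PollMeNTimesOwned<T> {
    n: u64,
    answer: Option<T>,
}

impl<T> PollMeNTimesOwned<T> {
    fn poll(&mut self) -> Result<Async<T>, ()> {
        if self.n != 0 {
            self.n -= 1;
            Ok(Async::NotReady)
        } else {
            // `take()` moves the value out and leaves `None` behind, so no
            // copy is needed. A second poll after completion finds `None`
            // and reports an error (an assumption of this sketch).
            self.answer.take().map(Async::Ready).ok_or(())
        }
    }
}
```

The price is a slightly longer struct literal, because the answer has to be wrapped in Some(...) when the future is created.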

Testing

I put the pieces together like last time.

#[test]
pub fn poll_me_0_times() {
    let mut core = Core::new().unwrap();

    let pm0t = PollMeNTimes { n: 0, answer: 42 };

    assert_eq!(Ok( 42 ), core.run(pm0t));
}

This test works as expected. So I created another one:

#[test]
pub fn poll_me_3_times() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let pm3t = PollMeNTimes { n: 3, answer: 42 };

    handle.spawn(Poller);
    handle.spawn(Poller);
    handle.spawn(Poller);

    assert_eq!(Ok( 42 ), core.run(pm3t));
}

This test fails with a timeout. To debug the case I added three NO-OP futures. The Poller futures complete, but after that the core hangs forever.

After reading through the docs many times and asking on the Slack channel, I found the missing piece:

futures::task::park().unpark();

This line unparks the task that tokio_core::reactor::Core is using to handle the future. A task whose future returned Async::NotReady is not polled again until it is unparked, so without this call it will never make progress again. Adding the line to the poll function resolves that issue and the reactor will poll the future for progress until it completes. The complete code:

impl<T: Copy> Future for PollMeNTimes<T> {
    type Item = T;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("poll: {}", self.n);
        if self.n != 0 {
            self.n -= 1;
            futures::task::park().unpark();
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(self.answer))
        }
    }
}
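To convince myself why the unpark call matters, the behavior can be modelled with a toy loop that only polls again when a wake-up was requested. This is a simplified sketch and not how tokio-core is implemented: the Cell<bool> flag stands in for the task handle, the wake_self field and the run function are hypothetical, and the loop returns None where the real reactor would wait forever.

```rust
use std::cell::Cell;

// Local stand-in for `futures::Async`.
enum Async<T> {
    Ready(T),
    NotReady,
}

struct PollMeNTimes {
    n: u64,
    answer: u32,
    // Hypothetical switch: request a wake-up before returning NotReady?
    wake_self: bool,
}

impl PollMeNTimes {
    fn poll(&mut self, wake_requested: &Cell<bool>) -> Async<u32> {
        if self.n != 0 {
            self.n -= 1;
            if self.wake_self {
                // Plays the role of futures::task::park().unpark().
                wake_requested.set(true);
            }
            Async::NotReady
        } else {
            Async::Ready(self.answer)
        }
    }
}

// Toy event loop: the first poll is always scheduled, every further poll
// only happens if the previous one requested a wake-up.
fn run(mut future: PollMeNTimes) -> Option<u32> {
    let wake_requested = Cell::new(true);
    while wake_requested.replace(false) {
        if let Async::Ready(value) = future.poll(&wake_requested) {
            return Some(value);
        }
    }
    // Nobody asked to be polled again: a real reactor would sleep here.
    None
}
```

With wake_self set to true and n set to 3, run returns Some(42) after four polls. With wake_self set to false it gives up after the first poll, which corresponds to the hanging test above.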

While writing this article I noticed that Tokio changed its behavior and the futures::task::park().unpark() call is already deprecated. Development is moving at the speed of light in Tokio.