Skip to content

Latest commit

 

History

History
420 lines (322 loc) · 18 KB

simple-but-unsafe-green-thread-implementation.md

File metadata and controls

420 lines (322 loc) · 18 KB

An implementation of green threads

Before we start I'll mention that the code we write is quite unsafe and is not a "best practice" when writing Rust code. I want to try to make this as safe as possible without introducing a lot of unneeded complexity, so I encourage you dear reader to suggest a PR to the projects repo if you see something that could be done a safer way without making our code too complex.

Lets get going

The first thing we do is to delete our example in our main.rsso we start from scratch and add the following.

#![feature(asm)]
#![feature(naked_functions)]
use std::ptr;

const DEFAULT_STACK_SIZE: usize = 1024 * 1024* 2;
const MAX_THREADS: usize = 4;
static mut RUNTIME: usize = 0;

We enable two features the asmfeature that we covered earlier, and the naked_functionsfeature, that we need to explain.

naked_functions

You see, when Rust compiles a function, it adds a small prologue and epilogue to each function and this causes some issues for us when we switch contexts since we end up with a misaligned stack. This worked fine in our first simple example but once we need to push more functions to the stack we end up with trouble. Marking the a function as #[naked]removes the prologue and epilogue and as you will see with some adjustments it makes the code run on both OSX, Linux and Windows.

{% hint style="info" %} If you are interested you can read more about thenaked_functionsfeature in RFC #1201 {% endhint %}

Our DEFAULT_STACK_SIZEis set to 2 MB which is more than enough for our use. We also set MAX_THREADSto 4 since we don't need more for our example.

The last constant RUNTIMEis a pointer to our runtime (yeah, I know, it's not pretty with a mutable global variable but we need it later and we're only setting this variable on runtime initialization).

Let's start fleshing out something to represent our data:

pub struct Runtime {
    threads: Vec<Thread>,
    current: usize,
}

#[derive(PartialEq, Eq, Debug)]
enum State {
    Available,
    Running,
    Ready,
}

struct Thread {
    id: usize,
    stack: Vec<u8>,
    ctx: ThreadContext,
    state: State,
}

#[derive(Debug, Default)]
#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
}

Runtimeis going to be where our main entry point. We are basically going to create a very small, simple runtime to schedule and switch between our threads. The runtime holds an array of Threadsand a currentfield to indicate which thread we are currently running.

Thread holds data for a thread. Each thread has an id so we can separate them from each other. The stackis similar to what we saw in our first example in earlier chapters. The ctxfield is a context representing the data our CPU needs to resume where it left of on a stack, and a statewhich is our thread state.

Stateis an enum representing the states our threads can be in:

  • Availablemeans the thread is available and ready to be assigned a task if needed.
  • Running means the thread is running
  • Readymeans the thread is ready to move forward and resume execution

ThreadContextholds data for the registers that CPU needs to resume execution on a stack.

{% hint style="info" %} Go back to the chapter Background Information to read about the registers if you don't remember. These are the registers marked as "callee saved" in the specification of the x86-64 arcitecture. {% endhint %}

Lets move on:

impl Thread {
    fn new(id: usize) -> Self {
        Thread {
            id,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Available,
        }
    }
}

This is pretty easy. A new thread starts in the Availablestate indicating it is ready to be assigned a task.

One thing to note is that we allocate our stack here. That is not needed and is not an optimal use of our resources since we allocate memory for threads we might need instead of allocating on first use. However, this keeps complexity down in the parts of our code that has a more important focus than allocating memory for our stack.

{% hint style="warning" %} The important thing to note is that once a stack is allocated it must not move! Nopush()on the vector or any other methods that might trigger a reallocation. In a better version of this code we would make our own type that only exposes the methods we consider safe to use. {% endhint %}

Implementing the Runtime

All the code in this segment is in impl Runtimeblock meaning that they are methods on the Runtimestruct.

impl Runtime {
    pub fn new() -> Self {
        // This will be our base thread, which will be initialized in 
        // the `running` state
        let base_thread = Thread {
            id: 0,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Running,
        };

        let mut threads = vec![base_thread];
        let mut available_threads: Vec<Thread> = (1..MAX_THREADS).map(|i| Thread::new(i)).collect();
        threads.append(&mut available_threads);

        Runtime {
            threads,
            current: 0,
        }
    }

When we instantiate our Runtimewe set up a base thread. This thread will be set to the Runningstate and will make sure we keep the run-time running until all tasks are finished.

Then we instantiate the rest of the threads and set the current thread to 0which is our base thread.

    /// This is cheating a bit, but we need a pointer to our Runtime 
    /// stored so we can call yield on it even if we don't have a 
    /// reference to it.
    pub fn init(&self) {
        unsafe {
            let r_ptr: *const Runtime = self;
            RUNTIME = r_ptr as usize;
        }
    }

Right now we need this. As I mentioned when going through our constants we need this to be able to call yieldlater on. It's not pretty, but we know that our runtime will be alive as long as there is any thread to yieldso as long as we don't abuse this it's safe to do.

    pub fn run(&mut self) -> ! {
        while self.t_yield() {}
        std::process::exit(0);
    }

This is where we start running our run-time. It will continually call t_yield()until it returns falsewhich means that there are no more work to do and we can exit the process.

    fn t_return(&mut self) {
        if self.current != 0 {
            self.threads[self.current].state = State::Available;
            self.t_yield();
        } 
    }

This is our return function that we call when the thread is finished. returnis another reserved keyword in Rust so we name this t_return(). Make a note that the user of our threads does not call this, we set up our stack so this is called when the task is done.

If the calling thread is the base_thread we don't do anything. Our runtime will call yieldfor us on the base thread. If it's called from a spawned thread we know it's finished since all threads have a guard function on top of their stack (which we'll show further down) and the only place this function is called is on our guardfunction.

We set its state to Available letting the runtime know it's ready to be assigned a new task and then immediately call t_yield which will schedule a new thread to be run.

Next: our yieldfunction:

    fn t_yield(&mut self) -> bool {
        let mut pos = self.current;
        while self.threads[pos].state != State::Ready {
            pos += 1;

            if pos == self.threads.len() {
                pos = 0;
            }
            if pos == self.current {
                return false;
            }
        }

        if self.threads[self.current].state != State::Available {
            self.threads[self.current].state = State::Ready;
        }

        self.threads[pos].state = State::Running;
        let old_pos = self.current;
        self.current = pos;

        unsafe {
            switch(&mut self.threads[old_pos].ctx, &self.threads[pos].ctx);
        }

        true
    }

This is the heart of our run-time. We have to name this t_yieldsince yieldis a reserved keyword in Rust.

Here we go through all the threads and see if anyone is in the Ready state which indicates it has a task it is ready to make progress on. This could be a database call that has returned in a real world application.

If no thread is Ready we're all done. This is an extremely simple scheduler using only a round-robin algorithm, a real scheduler might have a much more sophisticated way of deciding what task to run next.

{% hint style="info" %} This is a very naive implementation tailor made for our example. What happens if our thread is not ready to make progress (not in a Ready state) and still waiting for a response from i.e. a database?

It's not too difficult to work around this, instead of running our code directly when a thread is Ready we could instead poll it for a status. For example it could return IsReady if it's really ready to run orPendingif it's waiting for some operation to finish. In the latter case we could just leave it in it's Ready state to get polled again later. Does this sound familiar? If you've read about how Futures in rust works, we are starting to connect some dots on how this all fits together. {% endhint %}

If we find a thread that's ready to be run we change the state of the current thread from Running to Ready.

Then we call switch which will save the current context (the old context) and load the new context into the CPU. The new context is either a new task, or all the information the CPU needs to resume work on an existing task.

Next up is our spawn()function:

pub fn spawn(&mut self, f: fn()) {
        let available = self
            .threads
            .iter_mut()
            .find(|t| t.state == State::Available)
            .expect("no available thread.");

        let size = available.stack.len();
        let s_ptr = available.stack.as_mut_ptr();

        unsafe {
            ptr::write(s_ptr.offset((size - 8) as isize) as *mut u64, guard as u64);
            ptr::write(s_ptr.offset((size - 16) as isize) as *mut u64, f as u64);
            available.ctx.rsp = s_ptr.offset((size - 16) as isize) as u64;
        }

        available.state = State::Ready;
    }
}

While t_yield is the logically interesting function I think this the technically most interesting.

When we spawn a new thread we first check if there are any available threads (threads in Available state). If we run out of threads we panic in this scenario but there are several (better) ways to handle that. We keep things simple for now.

When we find an available thread we get the stack length and a pointer to our u8 byte-array.

In the next segment we have to use some unsafe functions. First we write the address to our guard function that will be called when the task we provide finishes and the function returns. Then we write the address to fwhich is the function we pass inn and want to run.

{% hint style="info" %} Remember how we explained how the stack works in The Stackchapter. We want the ffunction to be the first to run so we set the base pointer to fand make sure it's 16 byte aligned. We then push the address to guardfunction. This is not 16 byte aligned but when freturns the CPU will read the next address as the return address of fand resume execution there. {% endhint %}

Third, we set the value of rsp which is the stack pointer to the address of our provided function so we start executing that first when we are scheduled to run.

Lastly we set the state as Ready which means we have work to do and that we are ready to do it. Remember, it's up to our "scheduler" to actually start up this thread.

We're now finished implementing our Runtime, if you got all this you basically understand how green threads work. However there are still a few details needed to implement them.

Guard and switch functions

#[cfg_attr(any(target_os="windows", target_os="linux"), naked)]
fn guard() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        let rt = &mut *rt_ptr;
        println!("THREAD {} FINISHED.", rt.threads[rt.current].id);
        rt.t_return();
    };
}

Here we meet our first portability issue. [cfg_attr(any(target_os="windows", target_os="linux"), naked)] is a conditional compilation attribute. If the target OS is Windows or Linux we compile this function with the #[naked]attribute, if not we don't compile it with the attribute. This way the code runs fine on Windows, the Rust Playground and on my mac.

The function means that the function we passed in has returned and that means our thread is finished running its task so we de-references our Runtimeand call t_return(). We could have made a function that did some additional work when a thread is finished but right now our t_return()function does all we need. It marks our thread as Available(if it's not our base thread) and yieldsso we can resume work on a different thread.

pub fn yield_thread() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_yield();
    };
}

This is just a helper function that lets us call yieldfrom an arbitrary place in our code. This is pretty unsafe though, if we call this and our Runtimeis not initialized yet or the runtime is dropped it will cause a panic(). However making this safer is not a priority for us just to get our example up and running.

We are very soon at the finish line, just one more function to go. This one should be possible to understand without much comments if you've gone through the previous chapters:

#[naked]
unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
    asm!("
        mov     %rsp, 0x00($0)
        mov     %r15, 0x08($0)
        mov     %r14, 0x10($0)
        mov     %r13, 0x18($0)
        mov     %r12, 0x20($0)
        mov     %rbx, 0x28($0)
        mov     %rbp, 0x30($0)

        mov     0x00($1), %rsp
        mov     0x08($1), %r15
        mov     0x10($1), %r14
        mov     0x18($1), %r13
        mov     0x20($1), %r12
        mov     0x28($1), %rbx
        mov     0x30($1), %rbp
        ret
        
        "
    : "=*m"(old)
    : "r"(new)
    :
    : "alignstack" // needed to work on windows
    );
}

So here is our inline Assembly. As you remember from our first example this is just a bit more elaborate where we first read out the values of all the registers we need and then sets all the register values to the register values we saved when we suspended execution on the "new" thread.

This is essentially all we need to do to save and resume execution.

{% hint style="info" %} Most of this inline assembly is explained in the end of the chapter An example we can build upon so if this seems foreign to you, go and read that part of the chapter and come back. {% endhint %}

There are two things in this function that differs from our first function:

The "=*m" constraint on our output parameter is new. As i warned before, inline assembly can be a bit gnarly, but this indicates that we provide a pointer to a memory location so we want to de-reference the memory location and write the values to the de-referenced location.

0x00($1) # 0
0x08($1) # 8
0x10($1) # 16
0x18($1) # 24

I mentioned this briefly, but here you see it in action. These are hexnumbers indicating the offset from the memory pointer to which we want to read/write. I wrote down the base-10 numbers as comments so you see we only offset the pointer in 8 byte steps which is the same size as the u64fields on our ThreadContextstruct.

This is also why it's important to annotate ThreadContextwith #[repr(C)]so we know that the data will be represented in memory this way and we write to the right field. The Rust ABI makes no guarantee that they are represented in the same order in memory, however the C-ABI does.

The main function

fn main() {
    let mut runtime = Runtime::new();
    runtime.init();

    runtime.spawn(|| {
        println!("THREAD 1 STARTING");
        let id = 1;
        for i in 0..10 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
    });

    runtime.spawn(|| {
        println!("THREAD 2 STARTING");
        let id = 2;
        for i in 0..15 {
            println!("thread: {} counter: {}", id, i);
            yield_thread();
        }
    });
    
    runtime.run();
}

As you see here we initialize our runtime and spawn two threads one that counts to 10 and yieldsbetween each count, and one that counts to 15. When we cargo runour project we should get the following output:

Finished dev [unoptimized + debuginfo] target(s) in 2.17s
Running `target/debug/green_threads`
THREAD 1 STARTING
thread: 1 counter: 0
THREAD 2 STARTING
thread: 2 counter: 0
thread: 1 counter: 1
thread: 2 counter: 1
thread: 1 counter: 2
thread: 2 counter: 2
thread: 1 counter: 3
thread: 2 counter: 3
thread: 1 counter: 4
thread: 2 counter: 4
thread: 1 counter: 5
thread: 2 counter: 5
thread: 1 counter: 6
thread: 2 counter: 6
thread: 1 counter: 7
thread: 2 counter: 7
thread: 1 counter: 8
thread: 2 counter: 8
thread: 1 counter: 9
thread: 2 counter: 9
THREAD 1 FINISHED.
thread: 2 counter: 10
thread: 2 counter: 11
thread: 2 counter: 12
thread: 2 counter: 13
thread: 2 counter: 14
THREAD 2 FINISHED.

Beautiful!! Our threads alternates since they yield control on each count until thread 1 finishes and thread 2 counts the last numbers before it finishes its task.

Congratulations

You have now implemented a super simple, but working, example of green threads. It was quite a ride we had to take, but if you came this far and read through everything you deserve a little break. Thanks for reading!