
mernux/mthread

The mthread Library

The mthread C++ library is a set of tools designed to simplify the development of multithreaded applications. The goal is to provide a small, easy-to-learn API for event-based communication between threads.

mthread has been developed by Mario Laux.

Usage

mthread is a single-file header-only library with no external dependencies for C++11 or higher. You can simply #include "mthread.h" and you are good to go. The standard library header <thread> is required. Everything is defined within the namespace mthread, so you can either use using namespace mthread; or specify the namespace explicitly where needed. Documentation of the public API can be found in the header file, but the following tutorial is probably the better starting point.

TL;DR: As a rule of thumb, if you pass mthread's objects around by value (copying or moving them), you are most likely using mthread correctly. Do not share instances between threads by reference or pointer, and do not use static globals.

If you are looking for a more complex project showing mthread in action, you can check out the worked example in ./nim.

All assets, this tutorial, the example project, and the header file are published under the MIT license. Internal benchmarks and tests are not part of this public repository.

Conceptual Overview

mthread provides three ingredients to make life easier:

  1. Task queues provide the means to sequentialize dynamically generated tasks. They can receive tasks from multiple threads simultaneously and process them one by one, possibly deferring a task until the executing worker is ready.
  2. Event emitters and observers provide a mechanism for exchanging pieces of data between threads. The receiver must not only specify a callback function to handle an event, but must also determine from which task queue that callback shall be invoked. This mostly eliminates the need for additional synchronization from within event handlers.
  3. A synchronizing pointer is a smart pointer that manages access to a resource shared between threads. It ensures that the resource is only accessed by one thread at a time and that it exists for as long as it might potentially be needed.

We use C++17 throughout this tutorial because it allows for a cleaner syntax in many places.

Task Queues

Events such as the completion of a computation, new input from a sensor or from a user interface, or a change of the logical state of an application can happen simultaneously and at unpredictable points in time. In order for a thread to be able to react to such events, it needs a mechanism to receive them, order them, and then process them one at a time once it has the capacity to do so.

[Figure: task queue overview]

A task queue in mthread is managed by a single worker and one or more to-do lists. The worker processes the tasks from the task queue one at a time while the instructing to-do lists can add new tasks, potentially from multiple threads concurrently. There is also an animated version of the above illustration.

Construction

We first demonstrate how a new task queue can be constructed:

// create a new task queue: this function returns a pair
// of a to-do list and a worker
auto tq = make_task_queue();
TodoList & todo_list = tq.first;
Worker & worker = tq.second;

Structured binding declarations allow for a less verbose syntax, which we will use from now on:

// create a new task queue
auto [ todo_list, worker ] = make_task_queue();

To-do lists can be copied and distributed throughout the program. Since task queues are meant to process tasks sequentially, there can only be one worker per task queue. Thus, workers can only be moved.
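The ownership split between copyable to-do lists and a single move-only worker can be modeled with ordinary standard library tools. The following is a minimal sketch under that assumption, not mthread's implementation: the names ToyQueueState, ToyTodoList, ToyWorker and make_toy_task_queue are hypothetical. Both handles share one mutex-protected queue through a std::shared_ptr, and the worker simply deletes its copy operations.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>

// Hypothetical shared state of a task queue (not mthread's internals).
struct ToyQueueState {
    std::mutex mutex;
    std::deque<std::function<void()>> tasks;
};

// Copyable producer handle: every copy refers to the same queue.
class ToyTodoList {
    std::shared_ptr<ToyQueueState> m_state;
public:
    explicit ToyTodoList(std::shared_ptr<ToyQueueState> state)
        : m_state { std::move(state) } {}
    void push(std::function<void()> task) const {
        std::lock_guard<std::mutex> lock { m_state->mutex };
        m_state->tasks.push_back(std::move(task));
    }
};

// Move-only consumer handle: there is exactly one per queue.
class ToyWorker {
    std::shared_ptr<ToyQueueState> m_state;
public:
    explicit ToyWorker(std::shared_ptr<ToyQueueState> state)
        : m_state { std::move(state) } {}
    ToyWorker(ToyWorker const &) = delete;
    ToyWorker & operator=(ToyWorker const &) = delete;
    ToyWorker(ToyWorker &&) noexcept = default;
    ToyWorker & operator=(ToyWorker &&) noexcept = default;

    // Runs all currently queued tasks in FIFO order and returns how many ran.
    std::size_t run_pending() {
        std::size_t n = 0;
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock { m_state->mutex };
                if (m_state->tasks.empty()) return n;
                task = std::move(m_state->tasks.front());
                m_state->tasks.pop_front();
            }
            task(); // executed outside the lock
            ++n;
        }
    }
};

inline std::pair<ToyTodoList, ToyWorker> make_toy_task_queue() {
    auto state = std::make_shared<ToyQueueState>();
    return { ToyTodoList { state }, ToyWorker { state } };
}

static_assert(std::is_copy_constructible<ToyTodoList>::value, "to-do lists can be copied");
static_assert(!std::is_copy_constructible<ToyWorker>::value, "workers cannot be copied");
static_assert(std::is_move_constructible<ToyWorker>::value, "workers can be moved");
```

Deleting the copy operations turns "one worker per queue" into a compile-time rule rather than a convention.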

Adding Tasks

To-do lists push tasks to their associated task queue for future processing:

// add a new task (e.g. a lambda) to the task queue using
// one of its to-do lists
todo_list.push([] {
    std::cout << "Hello from a task.\n";
});

// once the to-do list is no longer used, we can explicitly
// release control over the task queue
todo_list.release();

Now the task queue contains a single task. However, this task will only be executed once the worker is passed to one of the two execution algorithms.

Synchronous Execution

A worker can run synchronously in the thread that calls run_sync:

// pass ownership (move) to the run_sync algorithm
run_sync(std::move(worker));

This will print "Hello from a task." to the console. The run_sync algorithm processes the tasks in order and blocks until ...

  • there are no instructing to-do lists left, and
  • all the scheduled tasks have been processed.

Thus, had we not explicitly released the to-do list, the worker would have kept waiting for new tasks forever.
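To make the two termination criteria of run_sync concrete, here is a sketch that counts live to-do lists in the shared state and lets the worker return once that count is zero and the queue is empty. It assumes a mutex plus std::condition_variable design, which may differ from what mthread actually does; SyncQueueState, CountedTodoList, run_sync_toy and demo_run_sync are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical queue state that also counts the live to-do lists.
struct SyncQueueState {
    std::mutex mutex;
    std::condition_variable cv;
    std::deque<std::function<void()>> tasks;
    std::size_t live_todo_lists = 0;
};

class CountedTodoList {
    std::shared_ptr<SyncQueueState> m_state; // empty after release()

    void attach() {
        if (!m_state) return;
        std::lock_guard<std::mutex> lock { m_state->mutex };
        ++m_state->live_todo_lists;
    }

public:
    explicit CountedTodoList(std::shared_ptr<SyncQueueState> state)
        : m_state { std::move(state) } { attach(); }
    CountedTodoList(CountedTodoList const & other)
        : m_state { other.m_state } { attach(); }
    CountedTodoList & operator=(CountedTodoList const &) = delete; // kept minimal
    ~CountedTodoList() { release(); }

    // Precondition: release() has not been called on this instance.
    void push(std::function<void()> task) const {
        {
            std::lock_guard<std::mutex> lock { m_state->mutex };
            m_state->tasks.push_back(std::move(task));
        }
        m_state->cv.notify_one();
    }

    // Gives up the ability to add tasks and wakes the worker so that it
    // re-checks its termination criteria.
    void release() {
        if (!m_state) return;
        {
            std::lock_guard<std::mutex> lock { m_state->mutex };
            --m_state->live_todo_lists;
        }
        m_state->cv.notify_one();
        m_state.reset();
    }
};

// Runs tasks in FIFO order on the calling thread and returns once no
// to-do list is left AND no scheduled task remains.
inline void run_sync_toy(std::shared_ptr<SyncQueueState> const & state) {
    std::unique_lock<std::mutex> lock { state->mutex };
    for (;;) {
        state->cv.wait(lock, [&state] {
            return !state->tasks.empty() || state->live_todo_lists == 0;
        });
        if (state->tasks.empty()) return; // both criteria hold
        auto task = std::move(state->tasks.front());
        state->tasks.pop_front();
        lock.unlock();
        task();
        lock.lock();
    }
}

inline std::string demo_run_sync() {
    auto state = std::make_shared<SyncQueueState>();
    std::string log; // only modified by tasks, i.e. on the worker's thread
    CountedTodoList todo_list { state };
    todo_list.push([&log] { log += "a"; });
    std::thread producer([copy = todo_list, &log] {
        copy.push([&log] { log += "b"; });
    }); // the thread's copy is dropped once its lambda is destroyed
    todo_list.release();
    run_sync_toy(state); // would block forever without the release above
    producer.join();
    return log;
}
```

In this toy, dropping the last to-do list (or calling release) is what allows run_sync_toy to return, mirroring why the explicit todo_list.release() matters in the tutorial.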

Asynchronous Execution

Alternatively, a worker can be run asynchronously in its own thread by means of run_async:

// pass ownership (move) to the run_async algorithm
ExecutionHandle ex_handle = run_async(std::move(worker));
ex_handle.finish();

This will eventually also print "Hello from a task." to the console. The run_async algorithm spawns a new thread, in which the scheduled tasks are processed in order. run_async never blocks, but returns an execution handle instead, which represents ownership of and control over the execution in the newly created thread. Execution handles can only be moved.

The execution handle can be used to control when and how the execution shall terminate. There are two methods:

  • ex_handle.finish(); prevents new tasks from being added and blocks until all previously scheduled tasks have been completed.
  • ex_handle.abort(); prevents new tasks from being added, discards tasks that are currently scheduled for future processing and blocks until the task that is currently being processed has been completed.

Always call one of these two methods once the worker should stop waiting for incoming tasks. This eliminates the need to make sure that all instructing to-do lists have been dropped. If neither method is called, the destructor of the execution handle blocks until the termination criteria of run_sync are met.
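The difference between finish and abort can be illustrated with a small queue that owns its own thread. This is a hedged sketch, not mthread's run_async: ToyAsyncQueue, demo_finish and demo_abort are hypothetical; the queue owns the thread directly (so, unlike an ExecutionHandle, it is neither copyable nor movable); and its destructor simply calls finish instead of waiting for the run_sync criteria.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical toy: a queue that runs its tasks in its own thread.
class ToyAsyncQueue {
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<std::function<void()>> m_tasks;
    bool m_closed = false;
    std::thread m_thread; // declared last, so it starts after the members above exist

    void loop() {
        std::unique_lock<std::mutex> lock { m_mutex };
        for (;;) {
            m_cv.wait(lock, [this] { return m_closed || !m_tasks.empty(); });
            if (m_tasks.empty()) return; // closed and nothing left to do
            auto task = std::move(m_tasks.front());
            m_tasks.pop_front();
            lock.unlock();
            task(); // run without holding the lock
            lock.lock();
        }
    }

    void close(bool discard_pending) {
        {
            std::lock_guard<std::mutex> lock { m_mutex };
            m_closed = true;                      // reject new tasks
            if (discard_pending) m_tasks.clear(); // abort: drop scheduled tasks
        }
        m_cv.notify_one();
        if (m_thread.joinable()) m_thread.join(); // wait for the worker thread
    }

public:
    ToyAsyncQueue() : m_thread { [this] { loop(); } } {}
    ~ToyAsyncQueue() { finish(); }

    // Returns false once the queue no longer accepts tasks.
    bool push(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock { m_mutex };
            if (m_closed) return false;
            m_tasks.push_back(std::move(task));
        }
        m_cv.notify_one();
        return true;
    }

    void finish() { close(false); } // run everything already scheduled
    void abort() { close(true); }   // only let the currently running task complete
};

inline int demo_finish() {
    int count = 0;
    ToyAsyncQueue queue;
    for (int i = 0; i < 100; ++i) queue.push([&count] { ++count; });
    queue.finish();             // blocks until all 100 increments have run
    assert(!queue.push([] {})); // closed: new tasks are rejected
    return count;
}

inline int demo_abort() {
    int count = 0;
    std::promise<void> started, go;
    std::future<void> started_future = started.get_future();
    std::shared_future<void> go_future = go.get_future().share();
    ToyAsyncQueue queue;
    // a first task that blocks makes the outcome deterministic
    queue.push([&started, go_future] { started.set_value(); go_future.wait(); });
    queue.push([&count] { ++count; });
    started_future.wait();                          // first task is now running
    std::thread aborter([&queue] { queue.abort(); });
    while (queue.push([&count] { ++count; })) {}   // spin until abort has closed the queue
    go.set_value();                                 // let the running task complete
    aborter.join();
    return count;                                   // every pending increment was discarded
}
```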

Example: Concurrent Scheduling

We will now demonstrate how multiple threads can schedule tasks simultaneously:

// create new task queue
auto [ todo_list, worker ] = make_task_queue();

// immediately start executing incoming tasks asynchronously
auto ex_handle = run_async(std::move(worker));

// create a counter, which gets incremented once per
// executed task
std::size_t count = 0;

// make four threads schedule a thousand increments of the
// counter each
std::array<std::thread, 4> scheduling_threads {};
for (auto & t : scheduling_threads) {
    t = std::thread(
        // capture by reference is a red flag (!),
        // see explanation below
        [&count_ref = count, todo_list_copy = todo_list] {
            for (int i = 0; i < 1000; ++i) {
                // schedule an increment
                todo_list_copy.push([&count_ref] {
                    ++count_ref;
                });
            }
        }
    );
}

// wait until the scheduling is complete
for (auto & t : scheduling_threads) t.join();

// wait until all the scheduled tasks are completed
ex_handle.finish();

// observe the effect (prints 4000)
std::cout << count << std::endl;

In this example, four threads simultaneously add a thousand tasks to the task queue via their own to-do list.

Each task holds a reference to a counter, which it increments once executed. Note that this can be done without further synchronization: While the scheduling happens concurrently, the modifications of the counter only happen from within the task queue and thus sequentially in the worker's thread.

We use a reference to count to keep the example simple. However, a lambda capturing a reference (or a raw pointer) is generally a huge red flag in multithreaded programs: Making sure that a reference (or a raw pointer) stays valid is error-prone, even more so across thread boundaries. Consider using a std::shared_ptr in such cases.
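Following this advice, the counter can be owned jointly by the tasks through a std::shared_ptr, so it stays alive as long as any task that might touch it. The lifetime argument does not depend on threads, so this sketch uses a plain std::vector of pending tasks as a stand-in for a task queue (an assumption made for brevity); schedule_increments and demo_shared_counter are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical helper: appends `n` increment tasks to `pending`. Each task
// holds a copy of the shared_ptr, i.e. shares ownership of the counter.
inline std::shared_ptr<std::size_t>
schedule_increments(std::vector<std::function<void()>> & pending, int n) {
    auto count = std::make_shared<std::size_t>(0);
    for (int i = 0; i < n; ++i) {
        pending.push_back([count] { ++*count; });
    }
    return count;
}

inline std::size_t demo_shared_counter() {
    std::vector<std::function<void()>> pending; // stands in for a task queue
    std::weak_ptr<std::size_t> watcher;
    {
        auto count = schedule_increments(pending, 4000);
        watcher = count;
    } // our own shared_ptr is gone here; the tasks still keep the counter alive
    for (auto & task : pending) task(); // stands in for the worker
    std::size_t result = *watcher.lock();
    pending.clear();                    // last owners gone: counter is destroyed
    assert(watcher.expired());
    return result;
}
```

With a reference capture instead, the tasks would dangle as soon as the scope that declared the counter ended.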

Thanks to ex_handle.finish(), we do not have to make sure that all to-do lists are dropped (the original todo_list is still alive at that point). Had we used ex_handle.abort() instead, some tasks could have been discarded and a smaller count might have been observed.

Events

In software design, event-based communication is sometimes considered a decoupling pattern because the part of the program that causes an event does not have to care about who will react to it and how.

Certain types of events may require additional data to fully describe their occurrences. While a button press in a user interface would "just happen", a click somewhere might be accompanied by two integer values describing where exactly the user has clicked.

[Figure: event system overview]

An event in mthread is defined by a set of emitters, a set of observers, and a set of subscriptions. Observers are used to generate subscriptions. A subscription consists of a callback and a to-do list: each time the event is emitted, the to-do list pushes an invocation of the callback to its task queue. There is also an animated version of the above illustration.
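The relationship between emitters, observers and subscriptions can be sketched in a few lines of standard C++. The following is a simplified toy, not mthread's implementation: a subscription is stored as a function that, on every emit, copies the event data and pushes a deferred callback invocation to its target. RecordingTodoList is a hypothetical, single-threaded stand-in for a to-do list that only records tasks; ToyEventState, ToyEmitter, ToyObserver, make_toy_event and demo_click_event are likewise hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded stand-in for a to-do list: copies share
// one vector, and push only records the task for later execution.
struct RecordingTodoList {
    std::shared_ptr<std::vector<std::function<void()>>> tasks =
        std::make_shared<std::vector<std::function<void()>>>();
    void push(std::function<void()> task) const { tasks->push_back(std::move(task)); }
};

// Shared state of one event: one "deliver" function per subscription.
template<typename... Args>
struct ToyEventState {
    std::mutex mutex;
    std::vector<std::function<void(Args const &...)>> subscriptions;
};

template<typename... Args>
class ToyObserver {
    std::shared_ptr<ToyEventState<Args...>> m_state;
public:
    explicit ToyObserver(std::shared_ptr<ToyEventState<Args...>> state)
        : m_state { std::move(state) } {}

    // A subscription pairs a callback with a target (e.g. a to-do list).
    template<typename Callback, typename Target>
    void subscribe(Callback callback, Target target) const {
        std::lock_guard<std::mutex> lock { m_state->mutex };
        m_state->subscriptions.push_back(
            [callback, target] (Args const &... args) {
                // copy the event data and push a deferred invocation
                target.push([callback, args...] { callback(args...); });
            });
    }
};

template<typename... Args>
class ToyEmitter {
    std::shared_ptr<ToyEventState<Args...>> m_state;
public:
    explicit ToyEmitter(std::shared_ptr<ToyEventState<Args...>> state)
        : m_state { std::move(state) } {}

    void emit(Args const &... args) const {
        std::vector<std::function<void(Args const &...)>> snapshot;
        {
            std::lock_guard<std::mutex> lock { m_state->mutex };
            snapshot = m_state->subscriptions; // don't call out while locked
        }
        for (auto const & deliver : snapshot) deliver(args...);
    }
};

// "emit before observe": the emitter comes first in the pair
template<typename... Args>
std::pair<ToyEmitter<Args...>, ToyObserver<Args...>> make_toy_event() {
    auto state = std::make_shared<ToyEventState<Args...>>();
    return { ToyEmitter<Args...> { state }, ToyObserver<Args...> { state } };
}

inline std::string demo_click_event() {
    auto event = make_toy_event<int, int>();
    RecordingTodoList todo_list;
    std::string out;
    event.second.subscribe(
        [&out] (int const & x, int const & y) {
            out += std::to_string(x) + ", " + std::to_string(y) + ";";
        },
        todo_list);
    event.first.emit(42, 101);
    event.first.emit(7, 8);
    // nothing has run yet: the invocations are only queued
    assert(out.empty() && todo_list.tasks->size() == 2);
    for (auto & task : *todo_list.tasks) task(); // play the worker's role
    return out;
}
```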

Construction

Let's say we want to be able to emit, observe, and later also react to a click event:

// create a new emitter-observer-pair for a "click" event
// whose occurrences are described by the two integer pixel
// coordinates of where the user has clicked
EventEmitter<int, int> emitter;
EventObserver<int, int> observer;
make_event(emitter, observer);

Here, we first declare an emitter and an observer. Then the free function make_event connects the two to form a new event. It might be helpful to name the created instances in a way that reflects the concrete event, e.g. click_emitter and click_observer. We omit this for brevity.

There is also an equivalent overload, which might be more convenient depending on the situation:

auto [ emitter, observer ] = make_event<int, int>();

Remember the order as "emit before observe". Emitters and observers can be copied and distributed throughout the program.

Subscribing

In the above example, we might have a thread handling all the program logic that would want to react to the click event. Such a reaction is specified in terms of a callback, which is a function that accepts the event's data arguments.

Conceptually, the callback shall be invoked every time the event occurs. However, our logic thread might be busy with something else at the exact point in time when the click event happens. This is why callbacks in mthread are not called immediately, but rather have their invocation pushed to a task queue:

auto [ todo_list, worker ] = make_task_queue();

// subscribe to the event
Subscription<int, int> sub = observer.subscribe(
    [] (int const & x, int const & y) {
        std::cout << x << ", " << y << '\n';
    },
    todo_list // this creates a copy
);

Here, the receiving thread subscribes a callback to the click event, which accepts the coordinates and simply prints them to the console. For every occurrence of the event, the invocation of this callback will be placed into the task queue via the to-do list. The subscription will remain active for as long as sub exists or until it is explicitly released via sub.release(). Remember that the subscription contains a to-do list: If the instructed worker is run synchronously, it will wait for the event to happen for at least as long as this subscription exists.

The callback does not have to be a lambda; any callable will do as long as it accepts the event's data. Subscriptions can only be moved.
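The lifetime rule for subscriptions (active while the handle exists or until release is called) together with the move-only property is a classic RAII pattern. The sketch below illustrates it with a registry of callbacks. ToyRegistry, ToySubscription, toy_subscribe, toy_emit and demo_subscription_lifetime are hypothetical names, and to keep the example short, the toy invokes callbacks directly instead of pushing them to a task queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical shared state of an event without data arguments.
struct ToyRegistry {
    std::mutex mutex;
    std::size_t next_id = 0;
    std::map<std::size_t, std::function<void()>> callbacks;
};

// Move-only RAII handle: the callback stays registered while it exists.
class ToySubscription {
    std::weak_ptr<ToyRegistry> m_registry; // does not keep the event alive
    std::size_t m_id = 0;

public:
    ToySubscription() = default;
    ToySubscription(std::weak_ptr<ToyRegistry> registry, std::size_t id)
        : m_registry { std::move(registry) }, m_id { id } {}

    ToySubscription(ToySubscription const &) = delete;
    ToySubscription & operator=(ToySubscription const &) = delete;

    // a moved-from weak_ptr is empty, so the source no longer unsubscribes
    ToySubscription(ToySubscription && other) noexcept
        : m_registry { std::move(other.m_registry) }, m_id { other.m_id } {}
    ToySubscription & operator=(ToySubscription && other) noexcept {
        if (this != &other) {
            release();
            m_registry = std::move(other.m_registry);
            m_id = other.m_id;
        }
        return *this;
    }

    ~ToySubscription() { release(); }

    void release() {
        if (auto registry = m_registry.lock()) {
            std::lock_guard<std::mutex> lock { registry->mutex };
            registry->callbacks.erase(m_id);
        }
        m_registry.reset();
    }
};

static_assert(!std::is_copy_constructible<ToySubscription>::value, "subscriptions cannot be copied");
static_assert(std::is_move_constructible<ToySubscription>::value, "subscriptions can be moved");

inline ToySubscription toy_subscribe(std::shared_ptr<ToyRegistry> const & registry,
                                     std::function<void()> callback) {
    std::lock_guard<std::mutex> lock { registry->mutex };
    std::size_t id = registry->next_id++;
    registry->callbacks.emplace(id, std::move(callback));
    return ToySubscription { registry, id };
}

// Calls every registered callback directly (no task queue in this toy).
inline void toy_emit(std::shared_ptr<ToyRegistry> const & registry) {
    std::vector<std::function<void()>> snapshot;
    {
        std::lock_guard<std::mutex> lock { registry->mutex };
        for (auto const & entry : registry->callbacks) snapshot.push_back(entry.second);
    }
    for (auto const & callback : snapshot) callback();
}

inline int demo_subscription_lifetime() {
    auto registry = std::make_shared<ToyRegistry>();
    int calls = 0;
    ToySubscription sub = toy_subscribe(registry, [&calls] { ++calls; });
    toy_emit(registry);                     // calls == 1
    ToySubscription moved = std::move(sub); // still exactly one active subscription
    toy_emit(registry);                     // calls == 2
    moved.release();
    toy_emit(registry);                     // no subscriber left: calls stays 2
    return calls;
}
```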

Emitting

In our click event example, there will be a thread running the graphical user interface. This thread will detect any clicks and can then simply emit the corresponding event to make it known to all subscriptions:

emitter.emit(42, 101);

Intercepting Events

In some cases, it might be desirable to intercept an event right when it is emitted instead of automatically pushing it to a task queue. This can be done by using a custom target that simply provides its own thread-safe .push method (just like the TodoList type in mthread does). To illustrate the point:

struct InvokeImmediately {

    template<typename FT>
    void push(FT && f) const {
        std::forward<FT>(f)();
    }

};

An instance of InvokeImmediately could now be used as a target. Since the target's .push method is invoked from the emitting thread, this will effectively invoke the callbacks immediately from there:

auto [ emitter, observer ] = make_event<>();
InvokeImmediately target;

auto sub = observer.subscribe(
    []() { std::cout << "observed\n"; },
    target
);

// "observed" is printed before "done"
emitter.emit();
std::cout << "done\n";

This example just serves as a quick illustration of the concept. Processing events from within the emitting thread (while they are still being emitted!) is generally not advised. This feature is intended for cases where the invocation of callbacks must be forwarded to external systems that use their own sequentialization mechanisms.
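A more realistic use of a custom target is forwarding invocations to another system's queue. In the sketch below, ExternalLoop is a hypothetical stand-in for such a system (for example, a GUI toolkit's facility for posting work to its main thread), and ForwardToLoop is a target that, like InvokeImmediately, only has to provide a thread-safe push method. Since the sketch does not include mthread, demo_forwarding (also hypothetical) calls push directly from several threads, which play the role of emitters.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical external system with its own sequentialization: work can
// be posted from any thread and is run later by whoever drives the loop.
class ExternalLoop {
    std::mutex m_mutex;
    std::vector<std::function<void()>> m_posted;
public:
    void post(std::function<void()> f) {
        std::lock_guard<std::mutex> lock { m_mutex };
        m_posted.push_back(std::move(f));
    }
    // Runs everything posted so far, in posting order, on the calling thread.
    std::size_t run_posted() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock { m_mutex };
            batch.swap(m_posted);
        }
        for (auto & f : batch) f();
        return batch.size();
    }
};

// Custom target: push is thread-safe because it only forwards to post.
struct ForwardToLoop {
    std::shared_ptr<ExternalLoop> loop;
    template<typename FT>
    void push(FT && f) const {
        loop->post(std::forward<FT>(f));
    }
};

inline int demo_forwarding() {
    auto loop = std::make_shared<ExternalLoop>();
    ForwardToLoop target { loop };
    int counter = 0; // only touched by the thread that runs the loop
    std::vector<std::thread> emitting_threads;
    for (int n = 0; n < 4; ++n) {
        // each thread gets its own copy of the target
        emitting_threads.emplace_back([target, &counter] {
            for (int i = 0; i < 100; ++i) target.push([&counter] { ++counter; });
        });
    }
    for (auto & t : emitting_threads) t.join();
    std::size_t ran = loop->run_posted(); // the external system drains its queue
    assert(ran == 400);
    return counter;
}
```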

Example: Logging

As a little example, we want to show how a logging class could be used to collect and log messages coming from different points in a program. Printing messages originating from different threads to, say, the console requires proper synchronization. The event-based communication model fits this problem well. Let's have a look at the code first:

class Logger {

    EventObserver<std::string> m_message_obs;
    Subscription<std::string> m_message_sub;

    ExecutionHandle m_ex_handle;

    bool is_running;

public:

    Logger(EventObserver<std::string> message_observer)
        : m_message_obs { std::move(message_observer) }
        , is_running { false }
    {}

    Logger(Logger &&) noexcept = default;

    Logger & operator=(Logger &&) noexcept = default;

    ~Logger() {
        stop();
    }

    void start() {

        if (is_running) return;
        is_running = true;

        auto [ todo_list, worker ] = make_task_queue();

        m_message_sub = m_message_obs.subscribe(
            [] (std::string const & message) {
                std::cout << message << std::endl;
            },
            todo_list
        );

        m_ex_handle = run_async(std::move(worker));

    }

    void stop() {

        if (!is_running) return;
        is_running = false;

        m_message_sub.release();
        m_ex_handle.finish();

    }

};

As the Logger type should react to incoming messages, it is constructed from an already existing observer. The public interface is pretty simple: The .start() method subscribes to the message event and starts processing incoming messages asynchronously. The .stop() method simply cancels the subscription and waits until all pending messages have been logged.

Note that the Logger type automatically became move-only as it contains the move-only data member m_ex_handle. This is exactly the behavior that we want because having two copies of the same logger and starting them both would defeat the whole purpose of synchronizing the output of messages.

A simulation could look like this:

// first, create the message event
auto [ msg_emt, msg_obs ] = make_event<std::string>();

// create a logger and start it immediately
Logger logger(msg_obs);
logger.start();

// simulate messages coming from different threads
std::array<std::thread, 4> sending_threads;
for (int n = 0; n < 4; ++n) {
    sending_threads[n] = std::thread([emt = msg_emt, n] {
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(
                std::chrono::milliseconds(4 - n + i)
            );
            emt.emit(
                "Message i=" + std::to_string(i) +
                " from thread n=" + std::to_string(n)
            );
        }
    });
}
for (auto & t : sending_threads) t.join();

// stop the logger once we don't care about new messages
// anymore (since we have waited for the threads to
// complete, we expect all twelve messages to be printed
// before this call completes)
logger.stop();

The emitter msg_emt is, again, passed to the individual threads by value, i.e. each thread uses its own copy. The synchronization is achieved automatically as the actual logging only happens from within the logger's task queue and thus sequentially.

Note that ordering is guaranteed per sender: while messages from different threads may interleave, the messages from a single thread will be logged in the order in which they were emitted.
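The per-thread ordering follows from a simple property of a mutex-protected FIFO: within one thread, each push happens before the next one, so that thread's items enter the queue in program order, while the relative order of different threads is up to the scheduler. The sketch below checks this property with a hypothetical ToyMessageQueue and a hypothetical per_sender_order_preserved function; it illustrates the reasoning and is not a test of mthread itself.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task queue: a mutex-protected FIFO of
// (sender, sequence number) pairs.
class ToyMessageQueue {
    std::mutex m_mutex;
    std::deque<std::pair<int, int>> m_items;
public:
    void push(int sender, int seq) {
        std::lock_guard<std::mutex> lock { m_mutex };
        m_items.emplace_back(sender, seq);
    }
    std::deque<std::pair<int, int>> take_all() {
        std::deque<std::pair<int, int>> out;
        std::lock_guard<std::mutex> lock { m_mutex };
        out.swap(m_items);
        return out;
    }
};

// Lets `senders` threads push `per_sender` numbered items each, then checks
// that every sender's items arrived in increasing order and none is missing.
inline bool per_sender_order_preserved(int senders, int per_sender) {
    ToyMessageQueue queue;
    std::vector<std::thread> threads;
    for (int n = 0; n < senders; ++n) {
        threads.emplace_back([&queue, n, per_sender] {
            for (int i = 0; i < per_sender; ++i) queue.push(n, i);
        });
    }
    for (auto & t : threads) t.join();
    // next expected sequence number for each sender
    std::vector<int> next(static_cast<std::size_t>(senders), 0);
    std::size_t total = 0;
    for (auto const & item : queue.take_all()) {
        auto & expected = next[static_cast<std::size_t>(item.first)];
        if (item.second != expected) return false;
        ++expected;
        ++total;
    }
    return total == static_cast<std::size_t>(senders) * static_cast<std::size_t>(per_sender);
}
```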

Synchronizing Pointers

It is generally advised to minimize the amount of data that is explicitly shared between threads. In many cases it is sufficient to send messages from one thread to another in the form of events. However, if data must be shared, a synchronizing pointer can help to do so safely.

A sync_ptr<T> can be constructed from a resource managed by a std::unique_ptr<T>:

// create a new int resource initialized to 42
auto resource = std::make_unique<int>(42);

// transfer ownership (move) of the resource to a new
// synchronizing pointer
sync_ptr<int> syp { std::move(resource) };

There is an equivalent, more convenient way to construct such a shared resource, namely the make_sync function. It forwards its arguments to the constructor of the managed type (just like std::make_unique does):

// create a new int resource managed by a sync_ptr and
// initialize it to 42
auto syp = make_sync<int>(42);

In order to access a resource managed by a synchronizing pointer, it has to be locked first. This results in a handle representing temporary exclusive ownership. The handle is a std::unique_ptr:

// obtain a handle to the resource (while this handle is
// held, all other calls to lock() will block)
sync_ptr<int>::handle_type up = syp.lock();

// use the handle just like a std::unique_ptr (prints 42)
std::cout << *up << std::endl;

// once the resource is no longer needed, the handle should
// be released such that other threads can obtain a handle
// (this is done automatically once up goes out of scope,
// but we can also give up ownership explicitly)
up.reset();

A handle obtained from a synchronizing pointer represents proper exclusive ownership, and it will not be invalidated if the synchronizing pointer from which it was obtained gets destroyed. The resource will exist for at least as long as the handle is held.
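One plausible way to build such a handle with the standard library is a std::unique_ptr whose deleter unlocks a mutex instead of deleting the resource, while also co-owning the shared state. This sketch is an assumption, not mthread's sync_ptr; toy_sync_ptr, make_toy_sync and demo_sync_ptr are hypothetical names. One limitation is worth noting: a std::mutex must be unlocked by the thread that locked it, so handles from this toy must not be passed to another thread.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy synchronizing pointer (not mthread's sync_ptr).
template<typename T>
class toy_sync_ptr {
    struct State {
        std::mutex mutex;
        std::unique_ptr<T> resource;
    };

    // Deleter of the handle: it unlocks instead of deleting, and it
    // co-owns the state, so the resource survives every toy_sync_ptr
    // for as long as a handle is held.
    struct Unlocker {
        std::shared_ptr<State> state;
        void operator()(T *) const { state->mutex.unlock(); }
    };

    std::shared_ptr<State> m_state; // copies share the same state

public:
    using handle_type = std::unique_ptr<T, Unlocker>;

    toy_sync_ptr() = default; // empty
    explicit toy_sync_ptr(std::unique_ptr<T> resource)
        : m_state { std::make_shared<State>() } {
        // unique_ptr skips its deleter for nullptr, so an empty resource
        // would leave the mutex locked forever
        if (!resource) throw std::invalid_argument("toy_sync_ptr: empty resource");
        m_state->resource = std::move(resource);
    }

    explicit operator bool() const { return static_cast<bool>(m_state); }

    // Blocks until exclusive access is available; the returned handle must
    // be released on the same thread (std::mutex requirement).
    handle_type lock() const {
        if (!m_state) throw std::logic_error("toy_sync_ptr: lock() on empty pointer");
        m_state->mutex.lock();
        return handle_type { m_state->resource.get(), Unlocker { m_state } };
    }
};

template<typename T, typename... Args>
toy_sync_ptr<T> make_toy_sync(Args &&... args) {
    return toy_sync_ptr<T> { std::make_unique<T>(std::forward<Args>(args)...) };
}

inline int demo_sync_ptr() {
    auto syp = make_toy_sync<int>(42);
    std::vector<std::thread> modifying_threads;
    for (int n = 0; n < 4; ++n) {
        modifying_threads.emplace_back([syp_copy = syp] {
            // the temporary handle unlocks at the end of the full expression
            for (int i = 0; i < 1000; ++i) ++*syp_copy.lock();
        });
    }
    for (auto & t : modifying_threads) t.join();
    auto handle = syp.lock();
    syp = toy_sync_ptr<int> {}; // drop the last toy_sync_ptr...
    assert(!syp);
    return *handle;             // ...the handle still keeps the int alive
}
```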

Locking an empty synchronizing pointer will throw an exception. A synchronizing pointer might be empty after default construction or after being moved from. A sync_ptr can be converted to bool just like the smart pointer types unique_ptr and shared_ptr to check whether it is empty.

Now we demonstrate how the resource can be shared between several threads that want to modify it concurrently:

// make four threads increment the shared resource a
// thousand times each
std::array<std::thread, 4> modifying_threads {};
for (auto & t : modifying_threads) {
    // sync_ptr is captured *by value*, i.e. a copy is made
    // (each thread uses its own instance of sync_ptr, but
    // all of them manage the same resource)
    t = std::thread([syp_copy = syp] {
        for (int i = 0; i < 1000; ++i) {
            // obtain ownership and then increment (this
            // could also be done in a single line)
            auto up = syp_copy.lock();
            ++(*up);
            // here, up gets destroyed, which allows other
            // threads to lock their syp_copy
        }
    });
}

// wait until all modifying threads are done
for (auto & t : modifying_threads) t.join();

// observe the effect (prints 4042)
std::cout << *syp.lock() << std::endl;

Note that each thread receives its own copy of syp, which was renamed to syp_copy for emphasis. As a general rule, don't share anything between threads by reference or pointer (including the this-pointer). The types provided by mthread are designed to be passed around by value and copies are cheap.

About

A single-header C++ library for event-driven communication between threads
