Skip to content

Latest commit

 

History

History
2886 lines (2332 loc) · 108 KB

TUTORIAL.md

File metadata and controls

2886 lines (2332 loc) · 108 KB

MicroPython asyncio: a tutorial

This tutorial is intended for users having varying levels of experience with asyncio and includes a section for complete beginners. It is based on the current version of asyncio, V3.0.0. Most code samples are complete scripts which can be cut and pasted at the REPL.

See this overview for a summary of resources for asyncio including device drivers, debugging aids, and documentation.

The name of the module was formerly uasyncio. To run the demo scripts on old firmware please use

import uasyncio as asyncio

Contents

  1. Introduction
    0.1 Installing asyncio primitives Extensions used in the demos.
  2. Cooperative scheduling
    1.1 Modules
         1.1.1 Primitives
         1.1.2 Demo programs
         1.1.3 Device drivers
  3. asyncio concept
    2.1 Program structure
    2.2 Coroutines and Tasks
         2.2.1 Queueing a task for scheduling
         2.2.2 Running a callback function
         2.2.3 Notes Coros as bound methods. Returning values.
         2.2.4 A typical firmware app Avoiding a minor error
    2.3 Delays
  4. Synchronisation
    3.1 Lock
    3.2 Event
         3.2.1 Wait on multiple events Pause until 1 of N events is set.
    3.3 Coordinating multiple tasks
         3.3.1 gather
    3.4 Semaphore
         3.4.1 BoundedSemaphore
    3.5 Queue
    3.6 ThreadSafeFlag Synchronisation with asynchronous events and interrupts.
         3.6.1 Querying a ThreadSafeFlag Check its state without blocking.
    3.7 Barrier
    3.8 Delay_ms Software retriggerable delay.
    3.9 Message
    3.10 Synchronising to hardware Debouncing switches, pushbuttons, ESP32 touchpads and encoder knobs. Taming ADC's.
  5. Designing classes for asyncio
    4.1 Awaitable classes
         4.1.1 Use in context managers
         4.1.2 Portable code
    4.2 Asynchronous iterators
    4.3 Asynchronous context managers
    4.4 Object scope What happens when an object goes out of scope.
  6. Exceptions timeouts and cancellation
    5.1 Exceptions
         5.1.1 Global exception handler
         5.1.2 Keyboard interrupts
    5.2 Cancellation and Timeouts
         5.2.1 Task cancellation
         5.2.2 Tasks with timeouts
         5.2.3 Cancelling running tasks A "gotcha".
  7. Interfacing hardware
    6.1 Timing issues
    6.2 Polling hardware with a task
    6.3 Using the stream mechanism
         6.3.1 A UART driver example
    6.4 Writing streaming device drivers
    6.5 A complete example: aremote.py A driver for an IR remote control receiver.
    6.6 Driver for HTU21D A temperature and humidity sensor.
  8. Hints and tips
    7.1 Program hangs
    7.2 asyncio retains state
    7.3 Garbage Collection
    7.4 Testing
    7.5 A common error This can be hard to find.
    7.6 Socket programming
         7.6.1 WiFi issues
    7.7 CPython compatibility and the event loop Compatibility with CPython 3.5+
    7.8 Race conditions
    7.9 Undocumented asyncio features
  9. Notes for beginners
    8.1 Problem 1: event loops
    8.2 Problem 2: blocking methods
    8.3 The asyncio approach
    8.4 Scheduling in asyncio
    8.5 Why cooperative rather than pre-emptive?
    8.6 Communication
  10. Polling vs Interrupts A common source of confusion.
  11. Interfacing threaded code Taming blocking functions. Multi core coding.

0. Introduction

Most of this document assumes some familiarity with asynchronous programming. For those new to it an introduction may be found in section 8.

The MicroPython asyncio library comprises a subset of Python's asyncio library. It is designed for use on microcontrollers. As such it has a small RAM footprint and fast context switching with zero RAM allocation. This document describes its use with a focus on interfacing hardware devices. The aim is to design drivers in such a way that the application continues to run while the driver is awaiting a response from the hardware. The application remains responsive to events such as user interaction.

Another major application area for asyncio is in network programming: many guides to this may be found online.

Note that MicroPython is based on Python 3.4 with additions from later versions. This version of asyncio supports a subset of CPython 3.8 asyncio. This document identifies supported features. Except where stated program samples run under MicroPython and CPython 3.8.

This tutorial aims to present a consistent programming style compatible with CPython V3.8 and above.

0.1 Installing asyncio primitives

This repository has optional unofficial primitives and extensions. To install these, connect the target hardware to WiFi and issue:

import mip
mip.install("github:peterhinch/micropython-async/v3/primitives")
mip.install("github:peterhinch/micropython-async/v3/threadsafe")

For non-networked targets use mpremote as described in the official docs.

$ mpremote mip install github:peterhinch/micropython-async/v3/primitives
$ mpremote mip install github:peterhinch/micropython-async/v3/threadsafe

1. Cooperative scheduling

The technique of cooperative multi-tasking is widely used in embedded systems. It offers lower overheads than pre-emptive scheduling and avoids many of the pitfalls associated with truly asynchronous threads of execution.

1.1 Modules

1.1.1 Primitives

The directory primitives contains a Python package containing the following:

  • Synchronisation primitives: "micro" versions of CPython's classes.
  • Additional Python primitives including a software retriggerable delay class and a MicroPython optimised ringbuf_queue.
  • Primitives for interfacing hardware. These comprise classes for debouncing switches and pushbuttons, an Encoder class and an asynchronous ADC class. These are documented here.
  • Primitives for event-based coding which aims to reduce the use of callbacks and is discussed here.

The directory threadsafe includes primitives designed to interface asyncio tasks to code running on other threads. These are documented here.

See above for installation.

1.1.2 Demo programs

The directory as_demos contains various demo programs implemented as a Python package. Copy the directory and its contents to the target hardware.

The first two are the most immediately rewarding as they produce visible results by accessing Pyboard hardware. With all demos, issue ctrl-d between runs to soft reset the hardware.

  1. aledflash.py Flashes three Pyboard LEDs asynchronously for 10s. Requires any Pyboard.
  2. apoll.py A device driver for the Pyboard accelerometer. Demonstrates the use of a task to poll a device. Runs for 20s. Requires a Pyboard V1.x.
  3. roundrobin.py Demo of round-robin scheduling. Also a benchmark of scheduling performance. Runs for 5s on any target.
  4. auart.py Demo of streaming I/O via a Pyboard UART. Requires a link between X1 and X2.
  5. auart_hd.py Use of the Pyboard UART to communicate with a device using a half-duplex protocol e.g. devices such as those using the 'AT' modem command set. Link X1-X4, X2-X3.
  6. gather.py Use of gather. Any target.
  7. iorw.py Demo of a read/write device driver using the stream I/O mechanism. Requires a Pyboard.
  8. rate.py Benchmark for asyncio. Any target.

Demos are run using this pattern:

import as_demos.aledflash

1.1.3 Device drivers

These are installed by copying the as_drivers directory and contents to the target. They have their own documentation as follows:

  1. A driver for GPS modules Runs a background task to read and decode NMEA sentences, providing constantly updated position, course, altitude and time/date information.
  2. HTU21D An I2C temperature and humidity sensor. A task periodically queries the sensor maintaining constantly available values.
  3. NEC IR A decoder for NEC IR remote controls. A callback occurs whenever a valid signal is received.
  4. HD44780 Driver for common character based LCD displays based on the Hitachi HD44780 controller
  5. I2C Use Pyboard I2C slave mode to implement a UART-like asynchronous stream interface. Uses: communication with ESP8266, or (with coding) interface a Pyboard to I2C masters.

2. asyncio concept

The asyncio concept is of cooperative multi-tasking based on coroutines (coros). A coro is similar to a function but is intended to run concurrently with other coros. The illusion of concurrency is achieved by periodically yielding to the scheduler, enabling other coros to be scheduled.

2.1 Program structure

Consider the following example:

import asyncio
async def bar():
    count = 0
    while True:
        count += 1
        print(count)
        await asyncio.sleep(1)  # Pause 1s

asyncio.run(bar())

Program execution proceeds normally until the call to asyncio.run(bar()). At this point, execution is controlled by the scheduler. A line after asyncio.run(bar()) would never be executed. The scheduler runs bar because this has been placed on the scheduler's queue by asyncio.run(bar()). In this trivial example, there is only one task: bar. If there were others, the scheduler would schedule them in periods when bar was paused:

import asyncio
async def bar(x):
    count = 0
    while True:
        count += 1
        print('Instance: {} count: {}'.format(x, count))
        await asyncio.sleep(1)  # Pause 1s

async def main():
    tasks = [None] * 3  # For CPython compaibility must store a reference see 2.2 Note
    for x in range(3):
        tasks[x] = asyncio.create_task(bar(x))
    await asyncio.sleep(10)

asyncio.run(main())

In this example, three instances of bar run concurrently. The asyncio.create_task method returns immediately but schedules the passed coro for execution. When main sleeps for 10s the bar instances are scheduled in turn, each time they yield to the scheduler with await asyncio.sleep(1).

In this instance main() terminates after 10s. This is atypical of embedded asyncio systems. Normally the application is started at power up by a one line main.py and runs forever.

2.2 Coroutines and Tasks

The fundamental building block of asyncio is a coro. This is defined with async def and usually contains at least one await statement. This minimal example waits 1 second before printing a message:

async def bar():
    await asyncio.sleep(1)
    print('Done')

Just as a function does nothing until called, a coro does nothing until awaited or converted to a Task. The create_task method takes a coro as its argument and returns a Task instance, which is scheduled for execution. In

async def foo():
    await coro

coro is run with await pausing until coro has completed. Sometimes coros and tasks are interchangeable: the CPython docs refer to them as awaitable, because either may be the target of an await. Consider this:

import asyncio
async def bar(t):
    print('Bar started: waiting {}secs'.format(t))
    await asyncio.sleep(t)
    print('Bar done')

async def main():
    await bar(1)  # Pauses here until bar is complete
    task = asyncio.create_task(bar(5))
    await asyncio.sleep(0)  # bar has now started
    print('Got here: bar running')  # Can run code here
    await task  # Now we wait for the bar task to complete
    print('All done')
asyncio.run(main())

There is a crucial difference between create_task and await: the former is synchronous code and returns immediately, with the passed coro being converted to a Task and queued to run "in the background". By contrast, await causes the passed Task or coro to run to completion before the next line executes. Consider these lines of code:

await asyncio.sleep(delay_secs)
await asyncio.sleep(0)

The first causes the code to pause for the duration of the delay, with other tasks being scheduled for this duration. A delay of 0 causes any pending tasks to be scheduled in round-robin fashion before the following line is run. See the roundrobin.py example.

If a Task is run concurrently with .create_task it may be cancelled. The .create_task method returns the Task instance which may be saved for status checking or cancellation. See note below.

In the following code sample three Task instances are created and scheduled for execution. The "Tasks are running" message is immediately printed. The three instances of the task bar appear to run concurrently. In fact, when one pauses, the scheduler grants execution to the next, giving the illusion of concurrency:

import asyncio
async def bar(x):
    count = 0
    while True:
        count += 1
        print('Instance: {} count: {}'.format(x, count))
        await asyncio.sleep(1)  # Pause 1s

async def main():
    tasks = [None] * 3  # For CPython compaibility must store a reference see 2.2 Note
    for x in range(3):
        tasks[x] = asyncio.create_task(bar(x))
    print('Tasks are running')
    await asyncio.sleep(10)

asyncio.run(main())

Note on CPython compatibility

The CPython docs have a warning that a reference to the task instance should be saved for the task's duration. This was raised in this issue. MicroPython asyncio does not suffer from this bug, but writers of code which must work in CPython and MicroPython should take note. Code samples in this doc are CPython-compatible, but following version is valid in MicroPython:

import asyncio
async def bar(x):
    count = 0
    while True:
        count += 1
        print('Instance: {} count: {}'.format(x, count))
        await asyncio.sleep(1)  # Pause 1s

async def main():
    for x in range(3):
        asyncio.create_task(bar(x))  # No reference stored
    print('Tasks are running')
    await asyncio.sleep(10)

asyncio.run(main())

2.2.1 Queueing a task for scheduling

  • asyncio.create_task Arg: the coro to run. The scheduler converts the coro to a Task and queues the task to run ASAP. Return value: the Task instance. It returns immediately. The coro arg is specified with function call syntax with any required arguments passed.
  • asyncio.run Arg: the coro to run. Return value: any value returned by the passed coro. The scheduler queues the passed coro to run ASAP. The coro arg is specified with function call syntax with any required arguments passed. In the current version the run call returns when the task terminates. However, under CPython, the run call does not terminate.
  • await Arg: the task or coro to run. If a coro is passed it must be specified with function call syntax. Starts the task ASAP. The awaiting task blocks until the awaited one has run to completion. As described in section 2.2, it is possible to await a task which has already been started. In this instance, the await is on the task object (function call syntax is not used).

The above are compatible with CPython 3.8 or above.

2.2.2 Running a callback function

Callbacks should be Python functions designed to complete in a short period of time. This is because tasks will have no opportunity to run for the duration. If it is necessary to schedule a callback to run after t seconds, it may be done as follows:

async def schedule(cb, t, *args, **kwargs):
    await asyncio.sleep(t)
    cb(*args, **kwargs)

In this example the callback runs after three seconds:

import asyncio

async def schedule(cbk, t, *args, **kwargs):
    await asyncio.sleep(t)
    cbk(*args, **kwargs)

def callback(x, y):
    print('x={} y={}'.format(x, y))

async def bar():
    asyncio.create_task(schedule(callback, 3, 42, 100))
    for count in range(6):
        print(count)
        await asyncio.sleep(1)  # Pause 1s

asyncio.run(bar())

2.2.3 Notes

Coros may be bound methods. A coro usually contains at least one await statement, but nothing will break (in MicroPython or CPython 3.8) if it has none.

Similarly to a function or method, a coro can contain a return statement. To retrieve the returned data issue:

result = await my_task()

It is possible to await completion of a set of multiple asynchronously running tasks, accessing the return value of each. This is done by asyncio.gather which launches the tasks and pauses until the last terminates. It returns a list containing the data returned by each task:

import asyncio

async def bar(n):
    for count in range(n):
        await asyncio.sleep_ms(200 * n)  # Pause by varying amounts
    print('Instance {} stops with count = {}'.format(n, count))
    return count * count

async def main():
    tasks = (bar(2), bar(3), bar(4))
    print('Waiting for gather...')
    res = await asyncio.gather(*tasks)
    print(res)

asyncio.run(main())

2.2.4 A typical firmware app

Most firmware applications run forever. This requires the coro passed to asyncio.run() to await a non-terminating coro.

To ease debugging, and for CPython compatibility, some "boilerplate" code is suggested in the sample below.

By default, an exception in a task will not stop the application as a whole from running. This can make debugging difficult. The fix shown below is discussed in 5.1.1.

It is bad practice to create a task prior to issuing asyncio.run(). CPython will throw an exception in this case. MicroPython does not, but it's wise to avoid doing this.

Lastly, asyncio retains state. This means that, by default, you need to reboot between runs of an application. This can be fixed with the new_event_loop method discussed in 7.2.

These considerations suggest the following application structure:

import asyncio
from my_app import MyClass

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = MyClass()  # Constructor might create tasks
    task = asyncio.create_task(my_class.foo())  # Or you might do this
    await my_class.run_forever()  # Non-terminating method
try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()  # Clear retained state

2.3 Delays

Where a delay is required in a task there are two options. For longer delays and those where the duration need not be precise, the following should be used:

async def foo(delay_secs, delay_ms):
    await asyncio.sleep(delay_secs)
    print('Hello')
    await asyncio.sleep_ms(delay_ms)

While these delays are in progress the scheduler will schedule other tasks. This is generally highly desirable, but it does introduce uncertainty in the timing as the calling routine will only be rescheduled when the one running at the appropriate time has yielded. The amount of latency depends on the design of the application, but is likely to be on the order of tens or hundreds of ms; this is discussed further in Section 6.

Very precise delays may be issued by using the utime functions sleep_ms and sleep_us. These are best suited for short delays as the scheduler will be unable to schedule other tasks while the delay is in progress.

3 Synchronisation

There is often a need to provide synchronisation between tasks. A common example is to avoid what are known as "race conditions" where multiple tasks compete to access a single resource. These are discussed in section 7.8. Another hazard is the "deadly embrace" where two tasks each wait on the other's completion.

Another synchronisation issue arises with producer and consumer tasks. The producer generates data which the consumer uses. Asyncio provides the Queue object. The producer puts data onto the queue while the consumer waits for its arrival (with other tasks getting scheduled for the duration). The Queue guarantees that items are removed in the order in which they were received. Alternatively, a Barrier instance can be used if the producer must wait until the consumer is ready to access the data.

In simple applications, communication may be achieved with global flags or bound variables. A more elegant approach is to use synchronisation primitives. CPython provides the following classes:

  • Lock - already incorporated in new asyncio.
  • Event - already incorporated.
  • ayncio.gather - already incorporated.
  • Semaphore In this repository.
  • BoundedSemaphore. In this repository.
  • Condition. In this repository.
  • Queue. In this repository.

As the table above indicates, not all are yet officially supported. In the interim, implementations may be found in the primitives directory. The following classes which are non-standard, are also in that directory:

  • Message An ISR-friendly Event with an optional data payload.
  • Barrier Based on a Microsoft class, enables multiple coros to synchronise in a similar (but not identical) way to gather.
  • Delay_ms A useful software-retriggerable monostable, akin to a watchdog. Calls a user callback if not cancelled or regularly retriggered.

A further set of primitives for synchronising hardware are detailed in section 3.9.

To install the primitives, copy the primitives directory and contents to the target. A primitive is loaded by issuing (for example):

from primitives import Semaphore, BoundedSemaphore
from primitives import Queue

When asyncio acquires official versions of the CPython primitives, the invocation lines alone should be changed. E.g.:

from asyncio import Semaphore, BoundedSemaphore
from asyncio import Queue
Note on CPython compatibility

CPython will throw a RuntimeError on first use of a synchronisation primitive that was instantiated prior to starting the scheduler. By contrast, MicroPython allows instantiation in synchronous code executed before the scheduler is started. Early instantiation can be advantageous in low resource environments. For example, a class might have a large buffer and bound Event instances. Such a class should be instantiated early, before RAM fragmentation sets in.

The following provides a discussion of the primitives.

3.1 Lock

This describes the use of the official Lock primitive. This guarantees unique access to a shared resource.

from asyncio import Lock
lock = Lock()

Synchronous methods:

  • locked No args. Returns True if locked.
  • release No args. Releases the lock. See note below.

Asynchronous method:

  • acquire No args. Pauses until the lock has been acquired. Use by executing await lock.acquire().

A task waiting on a lock may be cancelled or may be run subject to a timeout. The normal way to use a Lock is in a context manager. In the following code sample a Lock instance lock has been created and is passed to all tasks wishing to access the shared resource. Each task attempts to acquire the lock, pausing execution until it succeeds.

import asyncio
from asyncio import Lock

async def task(i, lock):
    while 1:
        async with lock:
            print("Acquired lock in task", i)
            await asyncio.sleep(0.5)

async def main():
    lock = Lock()  # The Lock instance
    tasks = [None] * 3  # For CPython compaibility must store a reference see 2.2 Note
    for n in range(1, 4):
        tasks[n - 1] = asyncio.create_task(task(n, lock))
    await asyncio.sleep(10)

asyncio.run(main())  # Run for 10s

Use of a context manager is strongly recommended - otherwise an application must ensure that .release is only ever called when that same task has called .locked. Calling .release on an unlocked Lock will raise a ValueError. Calling it on a Lock which has been locked by another task will cause that second task to produce a ValueError when it attempts to release the Lock or when its context manager exits. Context managers avoid these issues.

For the brave the following illustrates use without a CM.

import asyncio
from asyncio import Lock

async def task(i, lock):
    while 1:
        await lock.acquire()
        print("Acquired lock in task", i)
        await asyncio.sleep(0.5)
        lock.release()

async def main():
    lock = Lock()  # The Lock instance
    tasks = [None] * 3  # For CPython compaibility must store a reference see 2.2 Note
    for n in range(1, 4):
        tasks[n - 1] = asyncio.create_task(task(n, lock))
    await asyncio.sleep(10)

asyncio.run(main())  # Run for 10s

3.2 Event

This describes the use of the official Event primitive.

This provides a way for one or more tasks to pause until another one flags them to continue. An Event object is instantiated and made accessible to all tasks using it:

import asyncio
from asyncio import Event

async def waiter(event):
    print('Waiting for event')
    await event.wait()  # Pause here until event is set
    print('Waiter got event.')
    event.clear()  # Flag caller and enable re-use of the event

async def main():
    event = Event()
    task = asyncio.create_task(waiter(event))
    await asyncio.sleep(2)
    print('Setting event')
    event.set()
    await asyncio.sleep(1)
    # Caller can check if event has been cleared
    print('Event is {}'.format('set' if event.is_set() else 'clear'))

asyncio.run(main())

Constructor: no args.
Synchronous Methods:

  • set Initiates the event.
  • clear No args. Clears the event.
  • is_set No args. Returns True if the event is set.

Asynchronous Method:

  • wait Pause until event is set.

Tasks wait on the event by issuing await event.wait(); execution pauses until another one issues event.set(). This causes all tasks waiting on the Event to be queued for execution. Note that the synchronous sequence

event.set()
event.clear()

will cause any tasks waiting on the event to resume in round-robin order. In general, the waiting task should clear the event, as in the waiter example above. This caters for the case where the waiting task has not reached the event at the time when it is triggered. In this instance, by the time the task reaches the event, the task will find it clear and will pause. This can lead to non-deterministic behaviour if timing is marginal.

The Event class is an efficient and effective way to synchronise tasks, but firmware applications often have multiple tasks running while True: loops. The number of Event instances required to synchronise these can multiply. Consider the case of one producer task feeding N consumers. The producer sets an Event to tell the consumer that data is ready; it then needs to wait until all consumers have completed before triggering them again. Consider these approaches:

  1. Each consumer sets an Event on completion. Producer waits until all Events are set before clearing them and setting its own Event.
  2. Consumers do not loop, running to completion. Producer uses gather to instantiate consumer tasks and wait on their completion.
  3. Event instances are replaced with a single Barrier instance.

Solution 1 suffers a proliferation of Events and suffers an inefficient busy-wait where the producer waits on N events. Solution 2 is inefficient with constant creation of tasks. Arguably the Barrier class is the best approach.

WARNING
Event methods must not be called from an interrupt service routine (ISR). The Event class is not thread safe. See ThreadSafeFlag.

3.2.1 Wait on multiple events

The WaitAny primitive allows a task to wait on a list of events. When one of the events is triggered, the task continues. It is effectively a logical or of events.

from primitives import WaitAny
evt1 = Event()
evt2 = Event()
# Launch tasks that might trigger these events
evt = await WaitAny((evt1, evt2)).wait()
# One or other was triggered
if evt is evt1:
    evt1.clear()
    # evt1 was triggered
else:
    evt2.clear()
    # evt2 was triggered

The WaitAll primitive is similar except that the calling task will pause until all passed Events have been set:

from primitives import WaitAll
evt1 = Event()
evt2 = Event()
wa = WaitAll((evt1, evt2)).wait()
# Launch tasks that might trigger these events
await wa
# Both were triggered

Awaiting WaitAll or WaitAny may be cancelled or subject to a timeout.

3.3 Coordinating multiple tasks

Several tasks may be launched together with the launching task pausing until all have completed. The gather mechanism is supported by CPython and MicroPython. CPython 3.11 adds a TaskGroup class which is particularly suited to applications where runtime exceptions may be encountered. It is not yet officially supported by MicroPython.

3.3.1 gather

This official asyncio asynchronous method causes a number of awaitables to run, pausing until all have either run to completion or been terminated by cancellation or timeout. It returns a list of the return values of each task.

Its call signature is

res = await asyncio.gather(*awaitables, return_exceptions=False)

awaitables may comprise tasks or coroutines, the latter being converted to tasks.

The keyword-only boolean arg return_exceptions determines the behaviour in the event of a cancellation or timeout of tasks. If False, the gather terminates immediately, raising the relevant exception which should be trapped by the caller. If True, the gather continues to pause until all have either run to completion or been terminated by cancellation or timeout. In this case, tasks which have been terminated will return the exception object in the list of return values.

The following script may be used to demonstrate this behaviour:

import asyncio

async def barking(n):
    print('Start barking')
    for _ in range(6):
        await asyncio.sleep(1)
    print('Done barking.')
    return 2 * n

async def foo(n):
    print('Start timeout coro foo()')
    while True:
        await asyncio.sleep(1)
        n += 1
    return n

async def bar(n):
    print('Start cancellable bar()')
    while True:
        await asyncio.sleep(1)
        n += 1
    return n

async def do_cancel(task):
    await asyncio.sleep(5)
    print('About to cancel bar')
    task.cancel()

async def main():
    tasks = [asyncio.create_task(bar(70))]
    tasks.append(barking(21))
    tasks.append(asyncio.wait_for(foo(10), 7))
    can = asyncio.create_task(do_cancel(tasks[0]))
    res = None
    try:
        res = await asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.TimeoutError:  # These only happen if return_exceptions is False
        print('Timeout')  # With the default times, cancellation occurs first
    except asyncio.CancelledError:
        print('Cancelled')
    print('Result: ', res)

asyncio.run(main())

3.4 Semaphore

This is currently an unofficial implementation. Its API is as per CPython asyncio.

A semaphore limits the number of tasks which can access a resource. It can be used to limit the number of instances of a particular task which can run concurrently. It performs this using an access counter which is initialised by the constructor and decremented each time a task acquires the semaphore.

Constructor: Optional arg value default 1. Number of permitted concurrent accesses.

Synchronous method:

  • release No args. Increments the access counter.

Asynchronous method:

  • acquire No args. If the access counter is greater than 0, decrements it and terminates. Otherwise waits for it to become greater than 0 before decrementing it and terminating.

The easiest way to use it is with an asynchronous context manager. The following illustrates tasks accessing a resource one at a time:

import asyncio
from primitives import Semaphore

async def foo(n, sema):
    print('foo {} waiting for semaphore'.format(n))
    async with sema:
        print('foo {} got semaphore'.format(n))
        await asyncio.sleep_ms(200)

async def main():
    sema = Semaphore()
    tasks = [None] * 3  # For CPython compaibility must store a reference see 2.2 Note
    for num in range(3):
        tasks[num] = asyncio.create_task(foo(num, sema))
    await asyncio.sleep(2)

asyncio.run(main())

There is a difference between a Semaphore and a Lock. A Lock instance is owned by the coro which locked it: only that coro can release it. A Semaphore can be released by any coro which acquired it.

3.4.1 BoundedSemaphore

This is currently an unofficial implementation. Its API is as per CPython asyncio.

This works identically to the Semaphore class except that if the release method causes the access counter to exceed its initial value, a ValueError is raised.

3.5 Queue

Queue objects provide a means of synchronising producer and consumer tasks: the producer puts data items onto the queue with the consumer removing them. If the queue becomes full, the producer task will block, likewise if the queue becomes empty the consumer will block. Some queue implementations allow producer and consumer to run in different contexts: for example where one runs in an interrupt service routine or on a different thread or core from the asyncio application. Such a queue is termed "thread safe".

The Queue class is an unofficial implementation whose API is a subset of that of CPython's asyncio.Queue. Like asyncio.Queue this class is not thread safe. A queue class optimised for MicroPython is presented in Ringbuf Queue. A thread safe version is documented in ThreadSafeQueue.

Constructor:
Optional arg maxsize=0. If zero, the queue can grow without limit subject to heap size. If maxsize>0 the queue's size will be constrained.

Synchronous methods (immediate return):

  • qsize No arg. Returns the number of items in the queue.
  • empty No arg. Returns True if the queue is empty.
  • full No arg. Returns True if the queue is full.
  • put_nowait Arg: the object to put on the queue. Raises an exception if the queue is full.
  • get_nowait No arg. Returns an object from the queue. Raises an exception if the queue is empty.
  • task_done No arg. Indicate that a task associated with a dequeued item is complete.

Asynchronous methods:

  • put Arg: the object to put on the queue. If the queue is full, it will block until space is available.
  • get No arg. Returns an object from the queue. If the queue is empty, it will block until an object is put on the queue.
  • join No arg. Block until all items in the queue have been received and processed (indicated via task_done).
import asyncio
from primitives import Queue

async def slow_process():
    await asyncio.sleep(2)
    return 42

async def produce(queue):
    print('Waiting for slow process.')
    result = await slow_process()
    print('Putting result onto queue')
    await queue.put(result)  # Put result on queue

async def consume(queue):
    print("Running consume()")
    result = await queue.get()  # Blocks until data is ready
    print('Result was {}'.format(result))

async def queue_go(delay):
    queue = Queue()
    t1 = asyncio.create_task(consume(queue))
    t2 = asyncio.create_task(produce(queue))
    await asyncio.sleep(delay)
    print("Done")

asyncio.run(queue_go(4))

3.6 ThreadSafeFlag

See also Interfacing asyncio to interrupts. Because of this issue the ThreadSafeFlag class does not work under the Unix build.

This official class provides an efficient means of synchronising a task with a truly asynchronous event such as a hardware interrupt service routine or code running in another thread or on another core. It operates in a similar way to Event with the following key differences:

  • It is thread safe: the set event may be called from asynchronous code.
  • It is self-clearing.
  • Only one task may wait on the flag.

Synchronous methods:

  • set Triggers the flag. Like issuing set then clear to an Event.
  • clear Unconditionally clear down the flag.

Asynchronous method:

  • wait Wait for the flag to be set. If the flag is already set then it returns immediately.

Typical usage is having a asyncio task wait on a hard ISR. Only one task should wait on a ThreadSafeFlag. The hard ISR services the interrupting device, sets the ThreadSafeFlag, and quits. A single task waits on the flag. This design conforms with the self-clearing behaviour of the ThreadSafeFlag. Each interrupting device has its own ThreadSafeFlag instance and its own waiting task.

import asyncio
from pyb import Timer

tsf = asyncio.ThreadSafeFlag()

def cb(_):
    tsf.set()

async def foo():
    while True:
        await tsf.wait()
        # Could set an Event here to trigger multiple tasks
        print('Triggered')

tim = Timer(1, freq=1, callback=cb)

asyncio.run(foo())

An example based on one posted by Damien
Link pins X1 and X2 to test.

from machine import Pin, Timer
import asyncio

class AsyncPin:
    def __init__(self, pin, trigger):
        self.pin = pin
        self.flag = asyncio.ThreadSafeFlag()
        self.pin.irq(lambda pin: self.flag.set(), trigger, hard=True)

    async def wait_edge(self):
        await self.flag.wait()

async def foo():
    pin_in = Pin('X1', Pin.IN)
    async_pin = AsyncPin(pin_in, Pin.IRQ_RISING)
    pin_out = Pin('X2', Pin.OUT)  # Toggle pin to test
    t = Timer(-1, period=500, callback=lambda _: pin_out(not pin_out()))
    await asyncio.sleep(0)
    while True:
        await async_pin.wait_edge()
        print('Got edge.')

asyncio.run(foo())

The current implementation provides no performance benefits against polling the hardware: other pending tasks may be granted execution first in round-robin fashion. However the ThreadSafeFlag uses the I/O mechanism. There is a plan to provide a means to reduce the latency such that selected I/O devices are polled every time the scheduler acquires control. This will provide the highest possible level of performance as discussed in Polling vs Interrupts.

Regardless of performance issues, a key use for ThreadSafeFlag is where a hardware device requires the use of an ISR for a μs level response. Having serviced the device, the ISR flags an asynchronous routine, typically processing received data.

See Threadsafe Event for a thread safe class which allows multiple tasks to wait on it.

3.6.1 Querying a ThreadSafeFlag

The ThreadSafeFlag class has no equivalent to Event.is_set. A synchronous function which returns the state of a ThreadSafeFlag instance may be created as follows:

import asyncio
from select import poll, POLLIN
from time import ticks_us, ticks_diff

async def foo(tsf):  # Periodically set the ThreadSafeFlag
    while True:
        await asyncio.sleep(1)
        tsf.set()

def ready(tsf, poller):  # Return a function which returns tsf status
    r = (tsf, POLLIN)
    poller.register(*r)

    def is_rdy():
        return r in poller.ipoll(0)  # Immediate return

    return is_rdy

async def test():
    tsf = asyncio.ThreadSafeFlag()
    tsk = asyncio.create_task(foo(tsf))
    mpoll = poll()
    tsf_ready = ready(tsf, mpoll)  # Create a ready function
    for _ in range(25):  # Run for 5s
        if tsf_ready():
            print("tsf ready")
            t = ticks_us()
            await tsf.wait()
            print(f"got tsf in {ticks_diff(ticks_us(), t)}us")
        else:
            print("Not ready")
            await asyncio.sleep_ms(200)

asyncio.run(test())

The ready closure returns a nonblocking function which tests the status of a passed flag. In this example .wait() is not called until the flag has been set, consequently .wait() returns rapidly.

The select.poll mechanism works because ThreadSafeFlag is subclassed from io.IOBase and has an ioctl method.

3.7 Barrier

This is an unofficial implementation of a primitive supported in CPython 3.11. While similar in purpose to gather there are differences described below.

Its principal purpose is to cause multiple coros to rendezvous at a particular point. For example producer and consumer coros can synchronise at a point where the producer has data available and the consumer is ready to use it. At that point in time, the Barrier can optionally run a callback before releasing the barrier to allow all waiting coros to continue.

Secondly, it can allow a task to pause until one or more other tasks have terminated or passed a particular point. For example an application might want to shut down various peripherals before starting a sleep period. The task wanting to sleep initiates several shut down tasks and waits until they have triggered the barrier to indicate completion. This use case may also be served by gather.

The key difference between Barrier and gather is symmetry: gather is asymmetrical. One task owns the gather and awaits completion of a set of tasks. By contrast, Barrier can be used symmetrically with member tasks pausing until all have reached the barrier. This makes it suited for use in the while True: constructs common in firmware applications. Use of gather would imply instantiating a set of tasks on every pass of the loop.

gather provides access to return values; irrelevant to Barrier because passing a barrier does not imply return. Barrier now has an efficient implementation using Event to suspend waiting tasks.

The following is a typical usage example. A data provider acquires data from some hardware and transmits it concurrently on a number of interfaces. These run at different speeds. The Barrier synchronises these loops. This can run on a Pyboard.

import asyncio
from primitives import Barrier
from machine import UART
import ujson

data = None
async def provider(barrier):
    global data
    n = 0
    while True:
        n += 1  # Get data from some source
        data = ujson.dumps([n, 'the quick brown fox jumps over the lazy dog'])
        print('Provider triggers senders')
        await barrier  # Free sender tasks
        print('Provider waits for last sender to complete')
        await barrier

async def sender(barrier, swriter, n):
    while True:
        await barrier  # Provider has got data
        swriter.write(data)
        await swriter.drain()
        print('UART', n, 'sent', data)
        await barrier  # Trigger provider when last sender has completed

async def main():
    sw1 = asyncio.StreamWriter(UART(1, 9600), {})
    sw2 = asyncio.StreamWriter(UART(2, 1200), {})
    barrier = Barrier(3)
    tasks = [None] * 2  # For CPython compaibility must store a reference see 2.2 Note
    for n, sw in enumerate((sw1, sw2)):
        tasks[n] = asyncio.create_task(sender(barrier, sw, n + 1))
    await provider(barrier)

asyncio.run(main())

Constructor.
Mandatory arg:

  • participants The number of coros which will use the barrier.
    Optional args:
  • func Callback or coroutine to run. Default None.
  • args Tuple of args for the callback. Default ().

Public synchronous methods:

  • busy No args. Returns True if at least one task is waiting on the barrier.
  • trigger No args. The barrier records that the coro has passed the critical point. Returns "immediately".
  • result No args. If a callback was provided, returns the return value from the callback. If a coro, returns the Task instance. See below.

The callback can be a function or a coro. Typically a function will be used; it must run to completion beore the barrier is released. A coro will be promoted to a Task and run asynchronously. The Task may be retrieved (e.g. for cancellation) using the result method.

If a coro waits on a barrier, it should issue an await prior to accessing the result method. To guarantee that the callback has run it is necessary to wait until all participant coros have passed the barrier.

Participant coros issue await my_barrier whereupon execution pauses until all other participants are also waiting on it. At this point any callback will run and then each participant will re-commence execution. See barrier_test and semaphore_test in asyntest.py for example usage.

A special case of Barrier usage is where some coros are allowed to pass the barrier, registering the fact that they have done so. At least one coro must wait on the barrier. That coro will pause until all non-waiting coros have passed the barrier, and all waiting coros have reached it. At that point all waiting coros will resume. A non-waiting coro issues barrier.trigger() to indicate that is has passed the critical point.

3.8 Delay_ms class

This implements the software equivalent of a retriggerable monostable or a watchdog timer. On timeout it can launch a callback or coroutine. It exposes an Event allowing a task to pause until a timeout occurs. The delay period may be altered dynamically.

It may be found in the primitives directory and is documented in Delay_ms class.

3.9 Message

The Message class uses ThreadSafeFlag to provide an object similar to Event with the following differences:

  • .set() has an optional data payload.
  • .set() can be called from another thread, another core, or from an ISR.
  • It is an awaitable class.
  • Payloads may be retrieved in an asynchronous iterator.
  • Multiple tasks can wait on a single Message instance.

It may be found in the threadsafe directory and is documented here.

3.10 Synchronising to hardware

The following hardware-related classes are documented here:

  • ESwitch A debounced switch with an Event interface.
  • Switch A debounced switch which can trigger open and close user callbacks.
  • EButton Debounced pushbutton with Event instances for pressed, released, long press or double-press.
  • Pushbutton Debounced pushbutton with callbacks for pressed, released, long press or double-press.
  • ESP32Touch Extends Pushbutton class to support ESP32 touchpads.
  • Keyboard Interface a crosspoint array of buttons e.g. keypads.
  • SwArray Interface a crosspoint array of pushbuttons or switches.
  • Encoder An asynchronous interface for control knobs with switch contacts configured as a quadrature encoder.
  • AADC Asynchronous ADC. A task can pause until the value read from an ADC goes outside defined bounds. Bounds can be absolute or relative to the current value.

4 Designing classes for asyncio

In the context of device drivers, the aim is to ensure nonblocking operation. The design should ensure that other tasks get scheduled in periods while the driver is waiting for the hardware. For example, a task awaiting data arriving on a UART or a user pressing a button should allow other tasks to be scheduled until the event occurs.

4.1 Awaitable classes

A task can pause execution by waiting on an awaitable object. There is a difference between CPython and MicroPython in the way an awaitable class is defined: see Portable code for a way to write a portable class. This section describes a simpler MicroPython specific solution.

In the following code sample, the __iter__ special method runs for a period. The calling coro blocks, but other coros continue to run. The key point is that __iter__ uses yield from to yield execution to another coro, blocking until it has completed.

import asyncio

class Foo():
    def __iter__(self):
        for n in range(5):
            print('__iter__ called')
            yield from asyncio.sleep(1) # Other tasks get scheduled here
        return 42

async def bar():
    foo = Foo()  # Foo is an awaitable class
    print('waiting for foo')
    res = await foo  # Retrieve result
    print('done', res)

asyncio.run(bar())

4.1.1 Use in context managers

Awaitable objects can be used in synchronous or asynchronous CM's by providing the necessary special methods. The syntax is:

with await awaitable as a:  # The 'as' clause is optional
    # code omitted
async with awaitable as a:  # Asynchronous CM (see below)
    # do something

To achieve this, the __await__ generator should return self. This is passed to any variable in an as clause and also enables the special methods to work.

4.1.2 Portable code

The Python language requires that __await__ is a generator function. In MicroPython generators and tasks are identical, so the solution is to use yield from task(args).

This tutorial aims to offer code portable to CPython 3.8 or above. In CPython tasks and generators are distinct. CPython tasks have an __await__ special method which retrieves a generator. This is portable and was tested under CPython 3.8:

import sys
up = sys.implementation.name == "micropython"
import asyncio

async def times_two(n):  # Coro to await
    await asyncio.sleep(1)
    return 2 * n

class Foo():
    def __await__(self):
        res = 1
        for n in range(5):
            print('__await__ called')
            if up:  # MicroPython
                res = yield from times_two(res)
            else:  # CPython
                res = yield from times_two(res).__await__()
        return res

    __iter__ = __await__

async def bar():
    foo = Foo()  # foo is awaitable
    print('waiting for foo')
    res = await foo  # Retrieve value
    print('done', res)

asyncio.run(bar())

In __await__, yield from asyncio.sleep(1) was allowed in CPython 3.6. In V3.8 it produces a syntax error. It must now be put in the task as in the above example.

4.2 Asynchronous iterators

These provide a means of returning a finite or infinite sequence of values and could be used as a means of retrieving successive data items as they arrive from a read-only device. An asynchronous iterable calls asynchronous code in its next method. The class must conform to the following requirements:

  • It has an __aiter__ method returning the asynchronous iterator.
  • It has an __anext__ method which is a task - i.e. defined with async def and containing at least one await statement. To stop the iteration, it must raise a StopAsyncIteration exception.

Successive values are retrieved with async for as below:

import asyncio
class AsyncIterable:
    def __init__(self):
        self.data = (1, 2, 3, 4, 5)
        self.index = 0

    def __aiter__(self):  # See note below
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        await asyncio.sleep(0.1)  # Other tasks get to run
        if self.index >= len(self.data):
            return None
        x = self.data[self.index]
        self.index += 1
        return x

async def run():
    ai = AsyncIterable()
    async for x in ai:
        print(x)
asyncio.run(run())

The __aiter__ method was formerly an asynchronous method. CPython 3.6 accepts synchronous or asynchronous methods. CPython 3.8 and MicroPython require synchronous code ref.

Asynchronous comprehensions PEP530, supported in CPython 3.6, are not yet supported in MicroPython.

4.3 Asynchronous context managers

Classes can be designed to support asynchronous context managers. These are CM's having enter and exit procedures which are tasks. An example is the Lock class. Such a class has an __aenter__ task which is logically required to run asynchronously. To support the asynchronous CM protocol its __aexit__ method also must be a task. Such classes are accessed from within a task with the following syntax:

async def bar(lock):
    async with lock as obj:  # "as" clause is optional, no real point for a lock
        print('In context manager')

As with normal context managers an exit method is guaranteed to be called when the context manager terminates, whether normally or via an exception. To achieve this, the special methods __aenter__ and __aexit__ must be defined, both being tasks waiting on a task or awaitable object. This example comes from the Lock class:

    async def __aenter__(self):
        await self.acquire()  # a coro defined with async def
        return self

    async def __aexit__(self, *args):
        self.release()  # A synchronous method

If the async with has an as variable clause the variable receives the value returned by __aenter__. The following is a complete example:

import asyncio

class Foo:
    def __init__(self):
        self.data = 0

    async def acquire(self):
        await asyncio.sleep(1)
        return 42

    async def __aenter__(self):
        print('Waiting for data')
        self.data = await self.acquire()
        return self

    def close(self):
        print('Exit')

    async def __aexit__(self, *args):
        print('Waiting to quit')
        await asyncio.sleep(1)  # Can run asynchronous
        self.close()  # or synchronous methods

async def bar():
    foo = Foo()
    async with foo as f:
        print('In context manager')
        res = f.data
    print('Done', res)

asyncio.run(bar())

4.4 Object scope

If an object launches a task and that object goes out of scope, the task will continue to be scheduled. The task will run to completion or until cancelled. If this is undesirable consider writing a deinit method to cancel associated running tasks. Applications can call deinit, for example in a try...finally block or in a context manager.

5 Exceptions timeouts and cancellation

These topics are related: asyncio enables the cancellation of tasks, and the application of a timeout to a task, by throwing an exception to the task.

5.1 Exceptions

Consider a task foo created with asyncio.create_task(foo()). This task might await other tasks, with potential nesting. If an exception occurs, it will propagate up the chain until it reaches foo. This behaviour is as per function calls: the exception propagates up the call chain until trapped. If the exception is not trapped, the foo task stops with a traceback. Crucially other tasks continue to run.

This does not apply to the main task started with asyncio.run. If an exception propagates to that task, the scheduler will stop. This can be demonstrated as follows:

import asyncio

async def bar():
    await asyncio.sleep(0)
    1/0  # Crash

async def foo():
    await asyncio.sleep(0)
    print('Running bar')
    await bar()
    print('Does not print')  # Because bar() raised an exception

async def main():
    task = asyncio.create_task(foo())
    for _ in range(5):
        print('Working')  # Carries on after the exception
        await asyncio.sleep(0.5)
    1/0  # Stops the scheduler
    await asyncio.sleep(0)
    print('This never happens')
    await asyncio.sleep(0)

asyncio.run(main())

If main issued await foo() rather than create_task(foo()) the exception would propagate to main. Being untrapped, the scheduler, and hence the script, would stop.

Warning

Using throw or close to throw an exception to a task is unwise. It subverts asyncio by forcing the task to run, and possibly terminate, when it is still queued for execution.

5.1.1 Global exception handler

During development, it is often best if untrapped exceptions stop the program rather than merely halting a single task. This can be achieved by setting a global exception handler. This debug aid is not CPython compatible:

import asyncio
import sys

def _handle_exception(loop, context):
    print('Global handler')
    sys.print_exception(context["exception"])
    #loop.stop()
    sys.exit()  # Drastic - loop.stop() does not work when used this way

async def bar():
    await asyncio.sleep(0)
    1/0  # Crash

async def main():
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(_handle_exception)
    task = asyncio.create_task(bar())
    for _ in range(5):
        print('Working')
        await asyncio.sleep(0.5)

asyncio.run(main())

5.1.2 Keyboard interrupts

There is a "gotcha" illustrated by the following code sample. If allowed to run to completion, it works as expected.

import asyncio
async def foo():
    await asyncio.sleep(3)
    print('About to throw exception.')
    1/0

async def bar():
    try:
        await foo()
    except ZeroDivisionError:
        print('foo was interrupted by zero division')  # Happens
        raise  # Force shutdown to run by propagating to loop.
    except KeyboardInterrupt:
        print('foo was interrupted by ctrl-c')  # NEVER HAPPENS
        raise

async def shutdown():
    print('Shutdown is running.')  # Happens in both cases
    await asyncio.sleep(1)
    print('done')

try:
    asyncio.run(bar())
except ZeroDivisionError:
    asyncio.run(shutdown())
except KeyboardInterrupt:
    print('Keyboard interrupt at loop level.')
    asyncio.run(shutdown())

However, issuing a keyboard interrupt causes the exception to go to the outermost scope. This is because asyncio.sleep causes execution to be transferred to the scheduler. Consequently, applications requiring cleanup code in response to a keyboard interrupt should trap the exception at the outermost scope.

5.2 Cancellation and Timeouts

Cancellation and timeouts work by throwing an exception to the task. This is unlike a normal exception. If a task cancels another, the running task continues to execute until it yields to the scheduler. Task cancellation occurs at that point, whether or not the cancelled task is scheduled for execution: a task waiting on (say) an Event or a sleep will be cancelled.

For tasks launched with .create_task the exception is transparent to the user: the task simply stops as described above. It is possible to trap the exception, for example to perform cleanup code, typically in a finally clause. The exception thrown to the task is asyncio.CancelledError in both cancellation and timeout. There is no way for the task to distinguish between these two cases.

As stated above, for a task launched with .create_task, trapping the error is optional. Where a task is awaited, to avoid a halt it must be trapped within the task, within the awaiting scope, or both. In the last case, the task must re-raise the exception after trapping so that the error can again be trapped in the outer scope.

5.2.1 Task cancellation

The Task class has a cancel method. This throws a CancelledError to the task. This works with nested tasks. Usage is as follows:

import asyncio
async def printit():
    print('Got here')
    await asyncio.sleep(1)

async def foo():
    while True:
        await printit()
        print('In foo')

async def bar():
    foo_task = asyncio.create_task(foo())  # Create task from task
    await asyncio.sleep(4)  # Show it running
    foo_task.cancel()
    await asyncio.sleep(0)
    print('foo is now cancelled.')
    await asyncio.sleep(4)  # Proof!

asyncio.run(bar())

The exception may be trapped as follows:

import asyncio
async def printit():
    print('Got here')
    await asyncio.sleep(1)

async def foo():
    try:
        while True:
            await printit()
    except asyncio.CancelledError:
        print('Trapped cancelled error.')
        raise  # Enable check in outer scope
    finally:  # Usual way to do cleanup
        print('Cancelled - finally')

async def bar():
    foo_task = asyncio.create_task(foo())
    await asyncio.sleep(4)
    foo_task.cancel()
    await asyncio.sleep(0)
    print('Task is now cancelled')
asyncio.run(bar())

As of firmware V1.18, the current_task() method is supported. This enables a task to pass itself to other tasks, enabling them to cancel it. It also facilitates the following pattern:

class Foo:
    async def run(self):
        self.task = asyncio.current_task()
        # code omitted

    def cancel(self):
        self.task.cancel()

5.2.2 Tasks with timeouts

Timeouts are implemented by means of asyncio methods .wait_for() and .wait_for_ms(). These take as arguments a task and a timeout in seconds or ms respectively. If the timeout expires, a asyncio.CancelledError is thrown to the task, while the caller receives a TimeoutError. Trapping the exception in the task is optional. The caller must trap the TimeoutError, otherwise the exception will interrupt program execution.

import asyncio

async def forever():
    try:
        print('Starting')
        while True:
            await asyncio.sleep_ms(300)
            print('Got here')
    except asyncio.CancelledError:  # Task sees CancelledError
        print('Trapped cancelled error.')
        raise
    finally:  # Usual way to do cleanup
        print('forever timed out')

async def foo():
    try:
        await asyncio.wait_for(forever(), 3)
    except asyncio.TimeoutError:  # Mandatory error trapping
        print('foo got timeout')  # Caller sees TimeoutError
    await asyncio.sleep(2)

asyncio.run(foo())

5.2.3 Cancelling running tasks

This useful technique can provoke counter intuitive behaviour. Consider a task foo created using create_task. Then tasks bar, cancel_me (and possibly others) are created with code like:

async def bar():
    await foo
    # more code

All will pause waiting for foo to terminate. If any one of the waiting tasks is cancelled, the cancellation will propagate to foo. This would be expected behaviour if foo were a coro. The fact that it is a running task means that the cancellation impacts the tasks waiting on it; it actually causes their cancellation. Again, if foo were a coro and a task or coro was waiting on it, cancelling foo would be expected to propagate to the caller. In the context of running tasks, this may be unwelcome.

The behaviour is "correct": CPython asyncio behaves identically. Ref this forum thread.

6 Interfacing hardware

At heart, all interfaces between asyncio and external asynchronous events rely on polling. This is because of the cooperative nature of asyncio scheduling: the task which is expected to respond to the event can only acquire control after another task has relinquished it. There are two ways to handle this.

  • Implicit polling: when a task yields and the scheduler acquires control, the scheduler checks for an event. If it has occurred it schedules a waiting task. This is the approach used by ThreadSafeFlag.
  • Explicit polling: a user task does busy-wait polling on the hardware.

At its simplest, explicit polling may consist of code like this:

async def poll_my_device():
    global my_flag  # Set by device ISR
    while True:
        if my_flag:
            my_flag = False
            # service the device
        await asyncio.sleep(0)

In place of a global, an instance variable or an instance of an awaitable class might be used. Explicit polling is discussed further below.

Implicit polling is more efficient and may gain further from planned improvements to I/O scheduling. Aside from the use of ThreadSafeFlag, it is possible to write code which uses the same technique. This is by designing the driver to behave like a stream I/O device such as a socket or UART, using stream I/O. This polls devices using Python's select.poll system: because polling is done in C it is faster and more efficient than explicit polling. The use of stream I/O is discussed here.

Owing to its efficiency, implicit polling most benefits fast I/O device drivers: streaming drivers can be written for many devices not normally considered as streaming devices section 6.4.

There are hazards involved with approaches to interfacing ISR's which appear to avoid polling. It is invalid to issue create_task or to trigger an Event in an ISR as these can cause a race condition in the scheduler.

6.1 Timing issues

Both explicit and implicit polling are currently based on round-robin scheduling. Assume I/O is operating concurrently with N user tasks each of which yields with a zero delay. When I/O has been serviced it will next be polled once all user tasks have been scheduled. The implied latency needs to be considered in the design. I/O channels may require buffering, with an ISR servicing the hardware in real time from buffers and tasks filling or emptying the buffers in slower time.

The possibility of overrun also needs to be considered: this is the case where something being polled by a task occurs more than once before the task is actually scheduled.

Another timing issue is the accuracy of delays. If a task issues

    await asyncio.sleep_ms(t)
    # next line

the scheduler guarantees that execution will pause for at least tms. The actual delay may be greater depending on the system state when t expires. If, at that time, all other tasks are waiting on nonzero delays, the next line will immediately be scheduled. But if other tasks are pending execution (either because they issued a zero delay or because their time has also elapsed) they may be scheduled first. This introduces a timing uncertainty into the sleep() and sleep_ms() functions. The worst-case value for this overrun may be calculated by summing, for every other task, the worst-case execution time between yielding to the scheduler.

6.2 Polling hardware with a task

This is a simple approach, but is most appropriate to hardware which may be polled at a relatively low rate. This is primarily because polling with a short (or zero) polling interval may cause the task to consume more processor time than is desirable.

The example apoll.py demonstrates this approach by polling the Pyboard accelerometer at 100ms intervals. It performs some simple filtering to ignore noisy samples and prints a message every two seconds if the board is not moved.

Further examples may be found in the primitives directory, notably switch.py and pushbutton.py: drivers for switch and pushbutton devices.

An example of a driver for a device capable of reading and writing is shown below. For ease of testing Pyboard UART 4 emulates the notional device. The driver implements a RecordOrientedUart class, where data is supplied in variable length records consisting of bytes instances. The object appends a delimiter before sending and buffers incoming data until the delimiter is received. This is a demo and is an inefficient way to use a UART compared to stream I/O.

For the purpose of demonstrating asynchronous transmission we assume the device being emulated has a means of checking that transmission is complete and that the application requires that we wait on this. Neither assumption is true in this example but the code fakes it with await asyncio.sleep(0.1).

Link pins X1 and X2 to run.

import asyncio
from pyb import UART

class RecordOrientedUart():
    DELIMITER = b'\0'
    def __init__(self):
        self.uart = UART(4, 9600)
        self.data = b''

    def __iter__(self):  # Not __await__ issue #2678
        data = b''
        while not data.endswith(self.DELIMITER):
            yield from asyncio.sleep(0) # Necessary because:
            while not self.uart.any():
                yield from asyncio.sleep(0) # timing may mean this is never called
            data = b''.join((data, self.uart.read(self.uart.any())))
        self.data = data

    async def send_record(self, data):
        data = b''.join((data, self.DELIMITER))
        self.uart.write(data)
        await self._send_complete()

    # In a real device driver we would poll the hardware
    # for completion in a loop with await asyncio.sleep(0)
    async def _send_complete(self):
        await asyncio.sleep(0.1)

    def read_record(self):  # Synchronous: await the object before calling
        return self.data[0:-1] # Discard delimiter

async def run():
    foo = RecordOrientedUart()
    rx_data = b''
    await foo.send_record(b'A line of text.')
    for _ in range(20):
        await foo  # Other tasks are scheduled while we wait
        rx_data = foo.read_record()
        print('Got: {}'.format(rx_data))
        await foo.send_record(rx_data)
        rx_data = b''

asyncio.run(run())

6.3 Using the stream mechanism

A stream is an abstraction of a device interface which consists of a realtime source of bytes. Examples include UARTs, I2S devices and sockets. Many streams are continuous: an I2S microphone will source data until switched off and the interface is closed. Streams are supported by asyncio.StreamReader and asyncio.StreamWriter classes.

This section applies to platforms other than the Unix build. The latter handles stream I/O in a different way described here. Code samples may not run under the Unix build until it is made more compatible with other platforms.

The stream mechanism can be illustrated using a Pyboard UART. This code sample demonstrates concurrent I/O on one UART. To run, link Pyboard pins X1 and X2 (UART Txd and Rxd).

import asyncio
from machine import UART
uart = UART(4, 9600, timeout=0)  # timeout=0 prevents blocking at low baudrates

async def sender():
    swriter = asyncio.StreamWriter(uart, {})
    while True:
        swriter.write('Hello uart\n')
        await swriter.drain()  # Transmission starts now.
        await asyncio.sleep(2)

async def receiver():
    sreader = asyncio.StreamReader(uart)
    while True:
        res = await sreader.readline()
        print('Received', res)

async def main():
    rx = asyncio.create_task(receiver())
    tx = asyncio.create_task(sender())
    await asyncio.sleep(10)
    print('Quitting')
    tx.cancel()
    rx.cancel()
    await asyncio.sleep(1)
    print('Done')

asyncio.run(main())

The .readline method will pause until \n is received.

StreamWriter write methods

Writing to a StreamWriter occurs in two stages. The synchronous .write method concatenates data for later transmission. The asynchronous .drain causes transmission. To avoid allocation call .drain after each call to .write. If multiple tasks are to write to the same StreamWriter, the best solution is to implement a shared Queue. Each task writes to the Queue and a single task waits on it, issuing .write and .drain whenever data is queued. Do not have multiple tasks calling .drain concurrently: this can result in data corruption for reasons detailed here.

The mechanism works because the device driver (written in C) implements the following methods: ioctl, read, readline and write. See Writing streaming device drivers for details on how such drivers may be written in Python.

StreamReader read methods

The StreamReader read methods fall into two categories depending on whether they wait for a specific end condition. Thus .readline pauses until a newline byte has been received, .read(-1) waits for EOF, and readexactly waits for a precise number of bytes. Other methods return the number of bytes available at the time they are called (upto a maximum). Consider the following fragment:

async def foo(device):
    sr = StreamReader(device)
    data = sr.read(20)

When read is issued, task foo is descheduled. Other tasks are scheduled, resulting in a delay. During that period, depending on the stream source, bytes may be received. The hardware or the device driver may buffer the data, at some point flagging their availability. When the concurrent tasks permit, asyncio polls the device. If data is available foo is rescheduled and pending data is returned. It should be evident that the number of bytes returned and the duration of the pause are variable.

There are also implications for application and device driver design: in the period while the task is descheduled, incoming data must be buffered to avoid data loss. For example in the case of a UART an interrupt service routine buffers incoming characters. To avoid data loss the size of the read buffer should be set based on the maximum latency caused by other tasks along with the baudrate. The buffer size can be reduced if hardware flow control is available.

StreamReader read timeout

It is possible to apply a timeout to a stream. One approach is to subclass StreamReader as follows:

class StreamReaderTo(asyncio.StreamReader):
    def __init__(self, source):
        super().__init__(source)
        self._delay_ms = Delay_ms()  # Allocate once only

    # Task cancels itself if timeout elapses without a byte being received
    async def readintotim(self, buf: bytearray, toms: int) -> int:  # toms: timeout in ms
        mvb = memoryview(buf)
        timer = self._delay_ms
        timer.callback(asyncio.current_task().cancel)
        timer.trigger(toms)  # Start cancellation timer
        n = 0
        nbytes = len(buf)
        try:
            while n < nbytes:
                n += await super().readinto(mvb[n:])
                timer.trigger(toms)  # Retrigger when bytes received
        except asyncio.CancelledError:
            pass
        timer.stop()
        return n

This adds a .readintotim asynchronous method. Like .readinto it reads into a supplied buffer but the read is subject to a timeout to in ms. The read pauses until either the buffer is full or until bytes stop arriving for a time longer than to. The method returns the number of bytes received. If fewer bytes were received than would fill the buffer, a timeout occurred. The script stream_to.py demonstrates this.

6.3.1 A UART driver example

The program auart_hd.py illustrates a method of communicating with a half duplex device such as one responding to the modem 'AT' command set. Half duplex means that the device never sends unsolicited data: its transmissions are always in response to a command from the master.

The device is emulated, enabling the test to be run on a Pyboard with two wire links.

The (highly simplified) emulated device responds to any command by sending four lines of data with a pause between each, to simulate slow processing.

The master sends a command, but does not know in advance how many lines of data will be returned. It starts a retriggerable timer, which is retriggered each time a line is received. When the timer times out it is assumed that the device has completed transmission, and a list of received lines is returned.

The case of device failure is also demonstrated. This is done by omitting the transmission before awaiting a response. After the timeout an empty list is returned. See the code comments for more details.

6.4 Writing streaming device drivers

The stream I/O mechanism is provided to support I/O to stream devices. Its typical use is to support streaming I/O devices such as UARTs and sockets. The mechanism may be employed by drivers of any device which needs to be polled: the polling is delegated to the scheduler which uses select to schedule the handlers for any devices which are ready. This is more efficient than running multiple tasks each polling a device, partly because select is written in C but also because the task performing the polling is descheduled until the poll object returns a ready status.

A device driver capable of employing the stream I/O mechanism may support StreamReader, StreamWriter instances or both. A readable device must provide at least one of the following methods. Note that these are synchronous methods. The ioctl method (see below) ensures that they are only called if data is available. The methods should return as fast as possible with as much data as is available.

readline() Return as many characters as are available up to and including any newline character. Required if you intend to use StreamReader.readline(). It should return a maximum of one line.
read(n) Return as many characters as are available but no more than n. Required to use StreamReader.read() or StreamReader.readexactly()

A writeable driver must provide this synchronous method:
write Arg buf: the buffer to write. This can be a memoryview.
It should return immediately. The return value is the number of characters actually written (may well be 1 if the device is slow). The ioctl method ensures that this is only called if the device is ready to accept data.

Note that this has changed relative to asyncio V2. Formerly write had two additional mandatory args. Existing code will fail because Stream.drain calls write with a single arg (which can be a memoryview).

All devices must provide an ioctl method which polls the hardware to determine its ready status. A typical example for a read/write driver is:

import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class MyIO(io.IOBase):
    # Methods omitted
    def ioctl(self, req, arg):  # see ports/stm32/uart.c
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                if hardware_has_at_least_one_char_to_read:
                    ret |= MP_STREAM_POLL_RD
            if arg & MP_STREAM_POLL_WR:
                if hardware_can_accept_at_least_one_write_character:
                    ret |= MP_STREAM_POLL_WR
        return ret

The following is a complete awaitable delay class.

import asyncio
import utime
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class MillisecTimer(io.IOBase):
    def __init__(self):
        self.end = 0
        self.sreader = asyncio.StreamReader(self)

    def __iter__(self):
        await self.sreader.read(1)

    def __call__(self, ms):
        self.end = utime.ticks_add(utime.ticks_ms(), ms)
        return self

    def read(self, _):
        return "a"

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
                    ret |= MP_STREAM_POLL_RD
        return ret

async def timer_test(n):
    timer = MillisecTimer()
    for x in range(n):
        await timer(100)  # Pause 100ms
        print(x)

asyncio.run(timer_test(20))

This currently confers no benefit over await asyncio.sleep_ms(), however if asyncio implements fast I/O scheduling it will be capable of more precise timing. This is because I/O will be tested on every scheduler call. Currently it is polled once per complete pass, i.e. when all other pending tasks have run in round-robin fashion.

It is possible to use I/O scheduling to associate an event with a callback. This is more efficient than a polling loop because the task doing the polling is descheduled until ioctl returns a ready status. The following runs a callback when a pin changes state.

import asyncio
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

class PinCall(io.IOBase):
    def __init__(self, pin, *, cb_rise=None, cbr_args=(), cb_fall=None, cbf_args=()):
        self.pin = pin
        self.cb_rise = cb_rise
        self.cbr_args = cbr_args
        self.cb_fall = cb_fall
        self.cbf_args = cbf_args
        self.pinval = pin.value()
        self.sreader = asyncio.StreamReader(self)
        self.task = asyncio.create_task(self.run())

    async def run(self):
        while True:
            await self.sreader.read(1)

    def read(self, _):
        v = self.pinval
        if v and self.cb_rise is not None:
            self.cb_rise(*self.cbr_args)
            return
        if not v and self.cb_fall is not None:
            self.cb_fall(*self.cbf_args)

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                v = self.pin.value()
                if v != self.pinval:
                    self.pinval = v
                    ret = MP_STREAM_POLL_RD
        return ret

Once again latency can be high: if implemented fast I/O scheduling will improve this.

The demo program iorw.py illustrates a complete example.

6.5 A complete example: aremote.py

See aremote.py documented here. This is a complete device driver: a receiver/decoder for an infra red remote controller. The following notes are salient points regarding its asyncio usage.

A pin interrupt records the time of a state change (in μs) and sends a Message, passing the time when the first state change occurred. A task waits on the Message, yields for the duration of a data burst, then decodes the stored data before calling a user-specified callback.

Passing the time to the Message instance enables the task to compensate for any asyncio latency when setting its delay period.

6.6 HTU21D environment sensor

This chip provides accurate measurements of temperature and humidity. The driver is documented here. It has a continuously running task which updates temperature and humidity bound variables which may be accessed "instantly".

The chip takes on the order of 120ms to acquire both data items. The driver works asynchronously by triggering the acquisition and using await asyncio.sleep(t) prior to reading the data. This allows other tasks to run while acquisition is in progress.

import as_drivers.htu21d.htu_test

7 Hints and tips

7.1 Program hangs

Hanging usually occurs because a task has blocked without yielding: this will hang the entire system. When developing, it is useful to have a task which periodically toggles an onboard LED. This provides confirmation that the scheduler is running.

7.2 asyncio retains state

If a asyncio application terminates, the state is retained. Embedded code seldom terminates, but in testing, it is useful to re-run a script without the need for a soft reset. This may be done as follows:

import asyncio

async def main():
    await asyncio.sleep(5)  # Dummy test script

def test():
    try:
        asyncio.run(main())
    except KeyboardInterrupt:  # Trapping this is optional
        print('Interrupted')  # or pass
    finally:
        asyncio.new_event_loop()  # Clear retained state

It should be noted that clearing retained state is not a panacea. Re-running complex applications may require the state to be retained.

7.3 Garbage Collection

You may want to consider running a task which issues:

    gc.collect()
    gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

This assumes import gc has been issued. The purpose of this is discussed here.

7.4 Testing

It's advisable to test that a device driver yields control when you intend it to. This can be done by running one or more instances of a dummy task which runs a loop printing a message, and checking that it runs in the periods when the driver is blocking:

async def rr(n):
    while True:
        print('Roundrobin ', n)
        await asyncio.sleep(0)

As an example of the type of hazard which can occur, in the RecordOrientedUart example above, the __await__ method was originally written as:

    def __await__(self):
        data = b''
        while not data.endswith(self.DELIMITER):
            while not self.uart.any():
                yield from asyncio.sleep(0)
            data = b''.join((data, self.uart.read(self.uart.any())))
        self.data = data

In testing, this hogged execution until an entire record was received. This was because uart.any() always returned a nonzero quantity. By the time it was called, characters had been received. The solution was to yield execution in the outer loop:

    def __await__(self):
        data = b''
        while not data.endswith(self.DELIMITER):
            yield from asyncio.sleep(0) # Necessary because:
            while not self.uart.any():
                yield from asyncio.sleep(0) # timing may mean this is never called
            data = b''.join((data, self.uart.read(self.uart.any())))
        self.data = data

It is perhaps worth noting that this error would not have been apparent had data been sent to the UART at a slow rate rather than via a loopback test. Welcome to the joys of realtime programming.

7.5 A common error

If a function or method is defined with async def and subsequently called as if it were a regular (synchronous) callable, MicroPython does not issue an error message. This is by design. A coro instance is created and discarded, typically leading to a program silently failing to run correctly:

import asyncio
async def foo():
    await asyncio.sleep(1)
    print('done')

async def main():
    foo()  # Should read: await foo

asyncio.run(main())

7.6 Socket programming

There are two basic approaches to socket programming under asyncio. By default sockets block until a specified read or write operation completes. asyncio supports blocking sockets by using select.poll to prevent them from blocking the scheduler. In most cases it is simplest to use this mechanism. Note that the asyncio stream mechanism employs it. Example client and server code may be found in the client_server directory.

Note that socket.getaddrinfo currently blocks. The time will be minimal in the example code but if a DNS lookup is required the blocking period could be substantial.

The second approach to socket programming is to use nonblocking sockets. This adds complexity but is necessary in some applications, notably where connectivity is via WiFi (see below).

Support for TLS on nonblocking sockets is platform dependent. It works on ESP32, Pyboard D and ESP8266.

The use of nonblocking sockets requires some attention to detail. If a nonblocking read is performed, because of server latency, there is no guarantee that all (or any) of the requested data is returned. Likewise writes may not proceed to completion.

Hence asynchronous read and write methods need to iteratively perform the nonblocking operation until the required data has been read or written. In practice a timeout is likely to be required to cope with server outages.

7.6.1 WiFi issues

The asyncio stream mechanism is not good at detecting WiFi outages. I have found it necessary to use nonblocking sockets to achieve resilient operation and client reconnection in the presence of outages.

This doc describes issues I encountered in WiFi applications which keep sockets open for long periods, and outlines a solution.

This repo offers a resilent asynchronous MQTT client which ensures message integrity over WiFi outages. This repo provides a simple asynchronous full-duplex serial channel between a wirelessly connected client and a wired server with guaranteed message delivery.

7.7 CPython compatibility and the event loop

The samples in this tutorial are compatible with CPython 3.8. If you need compatibility with versions 3.5 or above, the asyncio.run() method is absent. Replace:

asyncio.run(my_task())

with:

loop = asyncio.get_event_loop()
loop.run_until_complete(my_task())

The create_task method is a member of the event_loop instance. Replace

asyncio.create_task(my_task())

with

loop = asyncio.get_event_loop()
loop.create_task(my_task())

Event loop methods are supported in asyncio and in CPython 3.8 but are deprecated. To quote from the official docs:

Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior (reference).

This doc offers better alternatives to get_event_loop if you can confine support to CPython V3.8+.

There is an event loop method run_forever which takes no args and causes the event loop to run. This is supported by asyncio. This has use cases, notably when all of an application's tasks are instantiated in other modules.

7.8 Race conditions

These occur when coroutines compete for access to a resource, each using the resource in a mutually incompatible manner.

This behaviour can be demonstrated by running the switch test. In test_sw() coroutines are scheduled by events. If the switch is cycled rapidly the LED behaviour may seem surprising. This is because each time the switch is closed, a coro is launched to flash the red LED, and on each open event, a coro is launched for the green LED. With rapid cycling a new coro instance will commence while one is still running against the same LED. This race condition leads to the LED behaving erratically.

This is a hazard of asynchronous programming. In some situations, it is desirable to launch a new instance on each button press or switch closure, even if other instances are still incomplete. In other cases it can lead to a race condition, leading to the need to code an interlock to ensure that the desired behaviour occurs. The programmer must define the desired behaviour.

In the case of this test program it might be to ignore events while a similar one is running, or to extend the timer to prolong the LED illumination. Alternatively a subsequent button press might be required to terminate the illumination. The "right" behaviour is application dependent.

7.9 Undocumented asyncio features

These may be subject to change.

A Task instance has a .done() method that returns True if the task has terminated (by running to completion, by throwing an exception or by being cancelled).

If a task has completed, a .data bound variable holds any result which was returned by the task. If the task throws an exception or is cancelled .data holds the exception (or CancelledError).

8 Notes for beginners

These notes are intended for those new to asynchronous code. They start by outlining the problems which schedulers seek to solve, and give an overview of the asyncio approach to a solution.

Section 8.5 discusses the relative merits of asyncio and the _thread module and why you may prefer to use cooperative (asyncio) over pre-emptive (_thread) scheduling.

8.1 Problem 1: event loops

A typical firmware application runs continuously and is required to respond to external events. These might include a voltage change on an ADC, the arrival of a hard interrupt, a character arriving on a UART, or data being available on a socket. These events occur asynchronously and the code must be able to respond regardless of the order in which they occur. Further the application may be required to perform time-dependent tasks such as flashing LED's.

The obvious way to do this is with an event loop. The following is not practical code but serves to illustrate the general form of an event loop.

def event_loop():
    led_1_time = 0
    led_2_time = 0
    switch_state = switch.state()  # Current state of a switch
    while True:
        time_now = utime.time()
        if time_now >= led_1_time:  # Flash LED #1
            led1.toggle()
            led_1_time = time_now + led_1_period
        if time_now >= led_2_time:  # Flash LED #2
            led2.toggle()
            led_2_time = time_now + led_2_period
        # Handle LEDs 3 upwards

        if switch.value() != switch_state:
            switch_state = switch.value()
            # do something
        if uart.any():
            # handle UART input

This works for simple examples but event loops rapidly become unwieldy as the number of events increases. They also violate the principles of object oriented programming by lumping much of the program logic in one place rather than associating code with the object being controlled. We want to design a class for an LED capable of flashing which could be put in a module and imported. An OOP approach to flashing an LED might look like this:

import pyb
class LED_flashable():
    def __init__(self, led_no):
        self.led = pyb.LED(led_no)

    def flash(self, period):
        while True:
            self.led.toggle()
            # somehow wait for period but allow other
            # things to happen at the same time

A cooperative scheduler such as asyncio enables classes such as this to be created.

8.2 Problem 2: blocking methods

Assume you need to read a number of bytes from a socket. If you call socket.read(n) with a default blocking socket it will "block" (i.e. fail to return) until n bytes have been received. During this period the application will be unresponsive to other events.

With asyncio and a non-blocking socket you can write an asynchronous read method. The task requiring the data will (necessarily) block until it is received but during that period other tasks will be scheduled enabling the application to remain responsive.

8.3 The asyncio approach

The following class provides for an LED which can be turned on and off, and which can also be made to flash at an arbitrary rate. A LED_async instance has a run method which can be considered to run continuously. The LED's behaviour can be controlled by methods on(), off() and flash(secs).

import pyb
import asyncio

class LED_async():
    def __init__(self, led_no):
        self.led = pyb.LED(led_no)
        self.rate = 0
        self.task = asyncio.create_task(self.run())

    async def run(self):
        while True:
            if self.rate <= 0:
                await asyncio.sleep_ms(200)
            else:
                self.led.toggle()
                await asyncio.sleep_ms(int(500 / self.rate))

    def flash(self, rate):
        self.rate = rate

    def on(self):
        self.led.on()
        self.rate = 0

    def off(self):
        self.led.off()
        self.rate = 0

Note that on(), off() and flash() are conventional synchronous methods. They change the behaviour of the LED but return immediately. The flashing occurs "in the background". This is explained in detail in the next section.

The class conforms with the OOP principle of keeping the logic associated with the device within the class. Further, the way asyncio works ensures that while the LED is flashing the application can respond to other events. The example below flashes the four Pyboard LED's at different rates while also responding to the USR button which terminates the program.

import pyb
import asyncio
from led_async import LED_async  # Class as listed above

async def main():
    leds = [LED_async(n) for n in range(1, 4)]
    for n, led in enumerate(leds):
        led.flash(0.7 + n/4)
    sw = pyb.Switch()
    while not sw.value():
        await asyncio.sleep_ms(100)

asyncio.run(main())

In contrast to the event loop example the logic associated with the switch is in a function separate from the LED functionality. Note the code used to start the scheduler:

asyncio.run(main())  # Execution passes to tasks.
 # It only continues here once main() terminates, when the
 # scheduler has stopped.

8.4 Scheduling in asyncio

Python 3.5 and MicroPython support the notion of an asynchronous function, known as a task. A task normally includes at least one await statement.

async def hello():
    for _ in range(10):
        print('Hello world.')
        await asyncio.sleep(1)

This function prints the message ten times at one second intervals. While the function is paused pending the time delay asyncio will schedule other tasks, providing an illusion of concurrency.

When a task issues await asyncio.sleep_ms() or await asyncio.sleep() the current task pauses: it is placed on a queue which is ordered on time due, and execution passes to the task at the top of the queue. The queue is designed so that even if the specified sleep is zero other due tasks will run before the current one is resumed. This is "fair round-robin" scheduling. It is common practice to issue await asyncio.sleep(0) in loops to ensure a task doesn't hog execution. The following shows a busy-wait loop which waits for another task to set the global flag. Alas it monopolises the CPU preventing other tasks from running:

async def bad_code():
    global flag
    while not flag:
        pass
    flag = False
    # code omitted

The problem here is that while the flag is False the loop never yields to the scheduler so no other task will get to run. The correct approach is:

async def good_code():
    global flag
    while not flag:
        await asyncio.sleep(0)
    flag = False
    # code omitted

For the same reason it's bad practice to issue delays like utime.sleep(1) because that will lock out other tasks for 1s; use await asyncio.sleep(1). Note that the delays implied by asyncio methods sleep and sleep_ms can overrun the specified time. This is because while the delay is in progress other tasks will run. When the delay period completes, execution will not resume until the running task issues await or terminates. A well-behaved task will always issue await at regular intervals. Where a precise delay is required, especially one below a few ms, it may be necessary to use utime.sleep_us(us).

8.5 Why cooperative rather than pre-emptive?

The initial reaction of beginners to the idea of cooperative multi-tasking is often one of disappointment. Surely pre-emptive is better? Why should I have to explicitly yield control when the Python virtual machine can do it for me?

My background is in hardware interfacing: I am not a web developer. I found this video to be an interesting beginner-level introduction to asynchronous web programming which discusses the relative merits of cooperative and pre-emptive scheduling in that environment.

When it comes to embedded systems the cooperative model has two advantages. Firstly, it is lightweight. It is possible to have large numbers of tasks because unlike descheduled threads, paused tasks contain little state. Secondly it avoids some of the subtle problems associated with pre-emptive scheduling. In practice, cooperative multi-tasking is widely used, notably in user interface applications.

To make a case for the defence a pre-emptive model has one advantage: if someone writes

for x in range(1000000):
    # do something time consuming

it won't lock out other threads. Under cooperative schedulers, the loop must explicitly yield control every so many iterations e.g. by putting the code in a task and periodically issuing await asyncio.sleep(0).

Alas this benefit of pre-emption pales into insignificance compared to the drawbacks. Some of these are covered in the documentation on writing interrupt handlers. In a pre-emptive model every thread can interrupt every other thread, changing data which might be used in other threads. It is generally much easier to find and fix a lockup resulting from a task which fails to yield than locating the sometimes deeply subtle and rarely occurring bugs which can occur in pre-emptive code.

To put this in simple terms, if you write a MicroPython task, you can be sure that variables won't suddenly be changed by another task: your task has complete control until it issues await asyncio.sleep(0).

Bear in mind that interrupt handlers are pre-emptive. This applies to both hard and soft interrupts, either of which can occur at any point in your code.

An eloquent discussion of the evils of threading may be found in threads are bad.

8.6 Communication

In non-trivial applications, tasks need to communicate. Conventional Python techniques can be employed. These include the use of global variables or declaring tasks as object methods: these can then share instance variables. Alternatively a mutable object may be passed as a task argument.

Pre-emptive systems mandate specialist classes to achieve "thread safe" communications; in a cooperative system these are seldom required.

9. Polling vs Interrupts

The role of interrupts in cooperative systems has proved to be a source of confusion in the forum. The merit of an interrupt service routine (ISR) is that it runs very soon after the event causing it. On a Pyboard, Python code may be running 15μs after a hardware change, enabling prompt servicing of hardware and accurate timing of signals.

The question arises whether it is possible to use interrupts to cause a task to be scheduled at reduced latency. It is easy to show that, in a cooperative scheduler, interrupts offer no latency benefit compared to polling the hardware directly.

The reason for this is that a cooperative scheduler only schedules tasks when another task has yielded control. Consider a system with a number of concurrent tasks, where the longest any task blocks before yielding to the scheduler is Nms. In such a system, even with an ideal scheduler, the worst-case latency between a hardware event occurring and its handling task being scheduled is Nms, assuming that the mechanism for detecting the event adds no latency of its own.

In practice, N is likely to be on the order of many ms. On fast hardware there will be a negligible performance difference between polling the hardware and polling a flag set by an ISR. On hardware such as ESP8266 and ESP32 the ISR approach will probably be slower owing to the long and variable interrupt latency of these platforms.

Using an ISR to set a flag is probably best reserved for situations where an ISR is already needed for other reasons.

The above comments refer to an ideal scheduler. Currently asyncio is not in this category, with worst-case latency being > Nms. The conclusions remain valid.

This, along with other issues, is discussed in Interfacing asyncio to interrupts.

10. Interfacing threaded code

In the context of a asyncio application, the _thread module has two main uses:

  1. Defining code to run on another core (currently restricted to RP2).
  2. Handling blocking functions. The technique assigns the blocking function to another thread. The asyncio system continues to run, with a single task paused pending the result of the blocking method.

These techniques, and thread-safe classes to enable their use, are presented in this doc.