Introduction

To understand how the PDM microphone works and to evaluate the quality of the captured audio, you can use the pdm_serial.py script to record a few seconds of audio and save it to a WAV file. The repo includes an example recording named microphone-results.wav.

❗ Make sure you have scipy, numpy and pyserial installed, and that you have set the correct serial port in the script. If you don't know the port name, run the code below in a Python console:

from serial.tools.list_ports import comports

for port in comports():
        print(port)

The Recognizer Script 🧐

Samples from the Arduino Nano 33 BLE Sense PDM microphone arrive at a sample rate of 16 kHz, 16 bits per sample. The PDM code fills a buffer of 512 samples and passes it to Serial.write(), which sends 1024 bytes; the same buffer is then refilled for the next burst. On the host, each read therefore takes 1024 bytes and converts them into a NumPy array of 512 numpy.int16 items. To record for a given duration, we perform fsamp * seconds / bufsize reads (one per burst) and store the results in a NumPy buffer. This is why it's more convenient to count 1024-byte bursts than seconds when approximating the listening duration. One second at 16 kHz is 31.25 bursts, which the script rounds up to 32 (the conversion constant below).

# Serial COMM
baudrate = 115200

# Signal processing variables
bufsize = 512           # Size of the NumPy array 'x' (samples per burst)
seconds_to_reset = 200  # Length of the circular history buffer 'y', in (approximate) seconds
conversion = 32         # 32 bursts * 512 samples ≈ 1 second at a 16 kHz sample rate (exactly 1.024 s)
N = seconds_to_reset * conversion * bufsize     # Samples in 'y' (≈ seconds_to_reset seconds of recording)
samp = False            # True while a triggered capture is in progress
i = 0                   # Bursts captured since the trigger
n = 0                   # Index of the current burst slot in 'y'
listening_for = 1.5     # Capture length in seconds; multiplied by 'conversion' to get bursts
trigger_volume = 17000  # Start listening when any sample in a burst reaches this value

# Preparing buffers
y = np.zeros(int(N), np.int16)                                           # Circular history
z = np.zeros(int((listening_for + 1) * conversion * bufsize), np.int16)  # 1 s of history + capture
t = np.zeros(int(listening_for * conversion * bufsize), np.int16)        # Capture after the trigger
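As a sanity check on these constants, the arithmetic can be worked out in a few lines of Python. This is only a back-of-the-envelope sketch. It assumes the values above (16 kHz, 16-bit mono, 512-sample bursts), and every variable name not present in the script is illustrative. The UART figure assumes 8N1 framing, which is 10 bits on the wire per byte.

```python
# Back-of-the-envelope check of the receiver's constants (values copied from the script).
fsamp = 16_000            # samples per second
bytes_per_sample = 2      # numpy.int16
bufsize = 512             # samples per Serial.write() burst
conversion = 32           # bursts treated as "one second"
seconds_to_reset = 200
listening_for = 1.5

bytes_per_second = fsamp * bytes_per_sample        # bytes the board must deliver each second
bursts_per_second = fsamp / bufsize                # 31.25, which the script rounds up to 32
conversion_seconds = conversion * bufsize / fsamp  # real duration of "one second": 1.024 s

history_samples = seconds_to_reset * conversion * bufsize  # length of y
history_seconds = history_samples / fsamp                  # 204.8 s of circular history
history_bytes = history_samples * bytes_per_sample         # about 6.5 MB of RAM

z_samples = int((listening_for + 1) * conversion * bufsize)  # 1 s of history + the capture
z_seconds = z_samples / fsamp                                # length of each clip sent to Wit.ai

uart_bytes_per_second = 115_200 / 10   # nominal 8N1 UART throughput at 115200 baud

print(f"{bytes_per_second} B/s needed, {bursts_per_second} bursts/s, "
      f"'one second' = {conversion_seconds:.3f} s, y = {history_seconds} s "
      f"({history_bytes} bytes), z = {z_seconds} s")
```

The last figure suggests that a classic 115200-baud UART could not carry this 32000 B/s stream. As far as I know, the Nano 33 BLE Sense's Serial runs over native USB (CDC ACM, hence the /dev/ttyACM* name), where the baudrate value is not what limits throughput.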

We use asyncio's StreamReader and StreamWriter as the interface to the serial port. The try/except block handles connection failures. For details, read the source code of the serial_asyncio_fast.open_serial_connection() coroutine.

try:
    reader, writer = await serial_asyncio_fast.open_serial_connection(url=serial_port_ACM, baudrate=baudrate)
    print(writer.transport.get_extra_info("serial"))
except Exception as exc:    # e.g. serial.SerialException when the port is missing or busy
    print(f"Problem with serial connection: {exc}")
    writer = None
    event.clear()

If the connection is established, we start receiving data and handle any timeouts.

BE CAREFUL WITH TIMEOUTS!

In CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time. With three threads, then, only one is actually running at any given moment. If the speech recognizer thread (shown below) keeps the CPU busy for too long, the asyncio thread has to wait for the GIL. By the time the GIL is released and control returns to the asyncio loop, the read may have exceeded its deadline, triggering a coroutine timeout. The serial connection is then restarted, and the script stops responding for a few seconds.

That's why, when working in Python, I prefer an online API like Wit.AI. It turns a CPU-bound task (audio transcription) into an IO-bound one: the thread just waits for a response from Meta's servers. While it waits on the network, the GIL is released in favor of the asyncio loop, so the script stays highly responsive to voice input. If you opt for an offline speech recognizer, consider multiprocessing instead of multithreading.

while event.is_set() and not stop.is_set():

    deadline = loop.time() + 2        # Two-second timeout, enough in my tests
    try:
        async with asyncio.timeout_at(deadline):
            ## Don't hog the CPU: the data transfer happens in the background, so read only when enough data is in the Linux buffer.
            ## "await asyncio.sleep(0)" would just hand control to an event loop with nothing to do; better to pause it entirely.
            time.sleep(0.032)
            data = await reader.readexactly(bufsize * 2)    # 512 samples of 16 bits each == 1024 bytes
    except Exception as exc:
        print(f"Maybe a timeout ({exc!r}), closing...")
        break

Whoa, time.sleep(0.032) in a coroutine!? That pauses the entire event loop! Yes, and that's exactly what I want. The CPU isn't what moves the incoming serial data. The USB peripheral, DMA and interrupts on my host (a Raspberry Pi 4) do that, managed by Linux. Without the sleep, the script blocks on select() (a blocking syscall) and resumes whenever the file descriptor becomes readable. readexactly() keeps going back to select() until the exact number of bytes has been read. With the sleep, the CPU is free for other tasks while the external hardware fills the input buffer and notifies Linux. When the script wakes up, enough data is already buffered, so readexactly() can complete without blocking on select() again, and processing can begin.

But wait, the pyserial package doesn't have a readexactly() method!
Right: readexactly() belongs to asyncio's StreamReader. open_serial_connection() creates a StreamReader, wraps it in a StreamReaderProtocol, and passes that protocol to the create_serial_connection() coroutine. Every time data arrives, the event loop calls the data_received() method of the StreamReaderProtocol, which hands the data to the StreamReader. Once at least the requested number of bytes is buffered, readexactly() returns exactly that many bytes to the awaiting coroutine (our reader).

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
        """Helper class to adapt between Protocol and StreamReader.
        
        (This is a helper class instead of making StreamReader itself a
        Protocol subclass, because the StreamReader has other potential
        uses, and to prevent the user of the StreamReader to accidentally
        call inappropriate methods of the protocol.)
        """

        def __init__(...):
                ...

        def data_received(self, data):
                reader = self._stream_reader
                if reader is not None:
                    reader.feed_data(data)        # reader is an asyncio.StreamReader instance

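So, in the Python source, the protocol calls feed_data(), which appends the incoming bytes to the read buffer and wakes up the waiting coroutine. readexactly() then checks whether at least n bytes are buffered. If not, it suspends again, cooperating with the other coroutines on the loop. If so, it returns exactly n bytes and leaves any surplus in the buffer for the next call. If EOF arrives first, it raises IncompleteReadError.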

class StreamReader:

        def __init__(...):
                ...

        def feed_data(self, data):
                assert not self._eof, 'feed_data after feed_eof'
        
                if not data:
                    return
        
                self._buffer.extend(data)
                self._wakeup_waiter()
        
                if (self._transport is not None and
                        not self._paused and
                        len(self._buffer) > 2 * self._limit):
                    try:
                        self._transport.pause_reading()
                    except NotImplementedError:
                        # The transport can't be paused.
                        # We'll just have to buffer all data.
                        # Forget the transport so we don't keep trying.
                        self._transport = None
                    else:
                        self._paused = True

        async def readexactly(self, n):
                """Read exactly `n` bytes.
        
                Raise an IncompleteReadError if EOF is reached before `n` bytes can be
                read. The IncompleteReadError.partial attribute of the exception will
                contain the partial read bytes.
        
                if n is zero, return empty bytes object.
        
                Returned value is not limited with limit, configured at stream
                creation.
        
                If stream was paused, this function will automatically resume it if
                needed.
                """
                if n < 0:
                    raise ValueError('readexactly size can not be less than zero')
        
                if self._exception is not None:
                    raise self._exception
        
                if n == 0:
                    return b''
        
                while len(self._buffer) < n:
                    if self._eof:
                        incomplete = bytes(self._buffer)
                        self._buffer.clear()
                        raise exceptions.IncompleteReadError(incomplete, n)
        
                    await self._wait_for_data('readexactly')
        
                if len(self._buffer) == n:
                    data = bytes(self._buffer)
                    self._buffer.clear()
                else:
                    data = bytes(memoryview(self._buffer)[:n])
                    del self._buffer[:n]
                self._maybe_resume_transport()
                return data

So to learn more, check the Python source code and the official doc to see how the state machine is used under the hood.   🤓

Once 1024 bytes are received, NumPy comes into play and converts them into an array of 512 samples of 16 bits each. We also save the incoming samples in the y buffer, which keeps the history of the sampled audio.

        # Data in input is buffered as 16bit, so 1024 bytes are coming at burst
        x = np.frombuffer(data, np.int16)
        # Continuous data recording
        y[n * bufsize: (n+1) * bufsize] = x
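Here is a small, self-contained illustration of the conversion. It assumes the board sends little-endian signed 16-bit samples (the nRF52840 on the Nano 33 BLE Sense is a little-endian Cortex-M4). np.int16 uses the host's native byte order, which matches on a Raspberry Pi or an x86 PC; spelling the dtype as '<i2' makes the assumption explicit.

```python
import struct

import numpy as np

# Four samples as the board would emit them (assumed little-endian signed 16-bit).
samples = [0, 1, -1, 32767]
raw = struct.pack("<4h", *samples)           # 8 bytes on the wire

x_native = np.frombuffer(raw, np.int16)      # what the script does (host byte order)
x_le = np.frombuffer(raw, dtype="<i2")       # same, with the byte order made explicit

# A full burst: 1024 bytes become 512 samples.
burst = np.frombuffer(bytes(1024), np.int16)

print(x_le.tolist(), burst.size, x_le.flags.writeable)
```

np.frombuffer() over a bytes object returns a read-only view. That's fine here, because the script only copies x into y and never modifies it in place.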

Now for the fun part: digital signal processing. We have present data (x) and past data (y). If the present data is loud enough, we start sampling for the time given by the listening_for variable, collecting the samples (via the t buffer) in the z buffer. For instance, when I say "ACCENDI LUCE", the "C" is usually what crosses the volume threshold, so the beginning of the keyword could be cut off. This is where y (the history) comes into play: when sampling starts, the first part of z is filled with the second of audio preceding the trigger. I'm using fixed-size NumPy arrays, so this state machine is faster and more efficient than growing arrays with numpy.append(). Going from dynamically to statically sized arrays cut CPU utilization by 10% on both Windows and Linux.

        '''
            Run-to-completion state machine, non-blocking
        '''
        if x.max(0) >= trigger_volume and not samp:
            # Too loud, start listening for <listening_for> seconds
            samp = True

        if samp:
            # Listen and collect data
            if i < listening_for * conversion:
                # Also collect the second before the activation
                if i == 0:
                    if n >= conversion: z[:bufsize * conversion] = y[(n - conversion) * bufsize: n * bufsize]
                    else: z[:bufsize * conversion] = np.roll(y, conversion * bufsize, 0)[n * bufsize: (n + conversion) * bufsize]   # Wrap around the start of y
                t[i * bufsize: (i+1) * bufsize] = x
                i += 1

            # Use >= to be sure to enter this state
            if i >= listening_for * conversion:
                # Send to the speech recognizer thread and reset
                z[bufsize * conversion:] = t
                samp = False
                i = 0
                audio_queue.put(z.copy())   # Copy: z is reused by the next capture while the recognizer may still be reading it

        if n < conversion * seconds_to_reset - 1:
            n += 1
        else:
            n = 0

When enough data has been collected, it is sent through a Queue to the recognizer thread, which is waiting for it, and all the variables are reset so that a new capture can start. y works as a circular buffer: when n reaches the end of y, it wraps back to 0, and the oldest samples are overwritten.

The speech recognizer loop runs in another thread!

Next, we send the audio to Wit.Ai and search the returned transcription for matching keywords to decide what to do with the bedroom light. The light is controlled through a Shelly Plus 1 relay connected to an MQTT broker on a Raspberry Pi 4. The recognizer script runs on the same Pi, as a systemd service managed via systemctl. This is why paho connects to localhost.

while True:
        audio_sample = audio_queue.get()    # Retrieve the next audio processing job from the main thread
        if audio_sample is None: break      # Sentinel sent by ask_exit()

        audio = sr.AudioData(audio_sample.tobytes(), fsamp, 2)  # Raw 16-bit PCM: 2 bytes per sample
        voice = ''

        # Recognize speech using Wit.ai
        WIT_AI_KEY = engine_KEY  # Wit.ai keys are 32-character uppercase alphanumeric strings
        try:
            voice = str(r.recognize_wit_new(audio, key=WIT_AI_KEY)).lower()
        except sr.UnknownValueError:
            print("Wit.ai could not understand audio")
        except sr.RequestError as e:
            print("Could not request results from Wit.ai service; {0}".format(e))
        except Exception as e:
            print("Unexpected recognizer error; {0}".format(e))
            continue
        else:
            if voice != '':
                if all(x in voice for x in matches_on): mqttc.publish(topic=shelly_id+"/command/switch:0", payload="on", qos=2)        # global mqttc
                elif all(x in voice for x in matches_off): mqttc.publish(topic=shelly_id+"/command/switch:0", payload="off", qos=2)

❗ You cannot use the Python receiver as is. ❗

I've modified the source code of the SpeechRecognition library because of a deprecation warning from Wit.AI. Until the pull request is merged and released on PyPI, you'll need to replace the library's __init__.py with the revised __init__.py. The original file is installed by pip install SpeechRecognition into the site-packages folder of your Python environment. If that's too much hassle, you can simply replace the try block with the snippet below. It also works with the original, unmodified __init__.py.

        try:
            voice = str(r.recognize_wit(audio, key=WIT_AI_KEY)).lower() 

❗ The script searches for an Arduino device attached to a serial port, connects to it automatically, and handles any disconnection on its own. You won't need to change anything.
There are several ways to detect serial ports; the most straightforward one is outlined in this pull request. Here, however, I also check that each port can actually be opened. Ports that raise serial.SerialException (or OSError) when opened are skipped.

import glob
import sys

import serial

'''
    Find serial port where PDM MIC is attached
'''
def serial_ports():
    # Lists serial port names
    if sys.platform.startswith('win'):
        ports = ['COM%s' % (i + 1) for i in range(256)]
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
        # this excludes your current terminal "/dev/tty"
        ports = glob.glob('/dev/tty[A-Za-z]*')
    elif sys.platform.startswith('darwin'):
        ports = glob.glob('/dev/tty.*')
    else:
        raise EnvironmentError('Unsupported platform')

    result = []
    for port in ports:
        try:
            s = serial.Serial(port)     # Opening the port proves it exists and is free
            s.close()
            result.append(port)
        except (OSError, serial.SerialException):
            pass
    return result

In the main loop, signals such as CTRL+C (SIGINT) or SIGTERM (sent by sudo systemctl stop <name of the service>) stop the script. In every other case, the script restarts gracefully: it stops the coroutine and the event loop, then creates new ones. Before reusing the serial port, the script runs fuser -k through subprocess to kill any process still holding it. Otherwise, Linux may complain that multiple processes are hogging the serial port and may give the device a different name (for example /dev/ttyACM1 instead of /dev/ttyACM0).

'''
    Main loop
'''
def loop():
    global event
    global audio_queue
    global stop
    
    while True:

        event.set()

        serial_port = ''
        for serial_port_name in serial_ports():
            if "ACM" in serial_port_name:
                serial_port = serial_port_name
                
        subprocess.run(["fuser", "-k", serial_port])    # Kill processes that are using the MIC, if any
        time.sleep(1)   # Give the OS time to start other services

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)      # Handle external signal, also SIGTERM from OS
            
        try:
            loop.run_until_complete(receiver(loop, serial_port))
        except:
            pass

        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)

        while not audio_queue.empty():
            audio_queue.get()           # Empty the queue to restart listening without interfering with old samples
            
        time.sleep(2)
        loop.close()

        if stop.is_set():
            break

'''
    Closing the asyncio loop and stopping other threads for safe exit, releasing the serial connection
'''
def ask_exit():
    global audio_queue
    global event
    global stop
    global mqttc

    event.clear()           # Stop coroutine
    audio_queue.put(None)   # Stop the recognizer worker
    stop.set()              # Gracefully stop the loop and all other asyncio task in background
    time.sleep(1)
    mqttc.disconnect()      # Stop the MQTT loop on the other thread
    mqttc.loop_stop()
    print("Exiting...")
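The shutdown path can be exercised on its own. This minimal sketch is POSIX-only, because loop.add_signal_handler() isn't available on Windows. It registers handlers for SIGINT and SIGTERM the same way loop() does, then simulates systemctl stop by sending SIGTERM to its own process. An asyncio.Event stands in for the script's event and stop flags; it is an illustrative substitute, not the script's code.

```python
import asyncio
import os
import signal


async def main() -> str:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()                      # stands in for the script's event/stop flags

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # Unix only, as in loop()
    try:
        # Simulate `systemctl stop`: deliver SIGTERM to this very process shortly after start.
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGTERM)
        await asyncio.wait_for(stop.wait(), timeout=5)
        return "stopped cleanly"
    finally:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)     # restore default handling before the loop closes


result = asyncio.run(main())
print(result)
```

Because the handler only sets a flag, the coroutine can finish its current iteration and release the serial port on its own terms instead of being killed mid-read.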

Curiosities

  • If you run the script on Windows, using the read(1024) or readexactly(1024) methods of the reader returned by open_serial_connection makes no difference. The pySerial asyncio library on Windows relies on busy polling: the loop asks the OS every 5 ms for new samples until it has 1024 bytes, the library's default limit. ❗ loop.add_signal_handler is only available on Unix-like systems, so earlier versions of this receiver didn't work on Windows as is, and that part of the code had to be removed. Windows can still interrupt the script even without signal handlers, and the code also works under WSL. As of the latest commit, the code is fully cross-platform.

  • On Linux, note that asyncio returns as soon as the file descriptor becomes readable, so read() is not guaranteed to yield the requested number of bytes. Using read(1024) this way offers low latency, but it exposes the signal processing code to potential bugs. You can mitigate this by using x.size instead of bufsize, but that still may not be the best solution. In the end, switching to readexactly() made no noticeable difference. It was all conjecture 🤦‍♂️.

  • On Linux, changing the event loop from default to uvloop dropped the average CPU usage by 1.5%.

  • 🦎 If you're aiming for low latency, consider moving speech recognition from online to offline, on the device itself, as demonstrated here. However, it's not necessarily a significantly better approach than the one described so far, and it can be worse.

  • If you wish to work with the raw PCM samples from the Arduino, please refer to the binary file sample shared in the test_golang directory. You can listen to it using the instructions provided below.

My use case

The code runs on a Raspberry Pi 4, which is connected to the microphone. The MQTT section will connect to the broker already running on the Raspberry Pi, which is why I'm connecting to 'localhost'.

Thanks to asyncio, uvloop, the quantum leap provided by this commit, the modified SpeechRecognition library using the new low-latency Wit.AI API, urllib3 for sending requests, Paho MQTT 2.0, and pausing the event loop until enough data is in the serial input buffer, I've achieved lower latency than ever before. From the voice command ("ACCENDI LUCE" or "SPEGNERE LE LUCI") to the action takes an average of 1.4 seconds (thanks also, of course, to the 1 Gbit Ethernet on my LAN)!

The resource consumption metrics reported by the Linux top -i command are:

  • max 1.7% of CPU usage
  • min 0% of CPU usage
  • average 0.7% of CPU usage
  • 1.4-1.5% of memory usage

See the Go data about this.

Try listening to what the Arduino Nano 33 BLE Sense produces:

ffplay -f s16le -ar 16000 -autoexit microphone-results.wav
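If you have raw PCM instead, such as the binary sample in the test_golang directory, you can wrap it in a WAV header with Python's standard wave module so that any player can open it without format flags. This sketch assumes mono, 16-bit little-endian samples at 16 kHz, the same parameters passed to ffplay above. pcm_to_wav() is a hypothetical helper.

```python
import wave


def pcm_to_wav(raw: bytes, path_or_file, rate: int = 16_000) -> None:
    """Wrap raw mono 16-bit little-endian PCM (s16le) in a WAV container."""
    with wave.open(path_or_file, "wb") as wav:
        wav.setnchannels(1)      # mono PDM microphone
        wav.setsampwidth(2)      # 16-bit samples
        wav.setframerate(rate)   # 16 kHz, as passed to ffplay with -ar 16000
        wav.writeframes(raw)
```

For example, with hypothetical file names: read the bytes of sample.bin and call pcm_to_wav(data, "sample.wav"). The resulting file plays in ffplay without -f or -ar.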

Required Packages

Install the necessary packages into your Python environment. The packages used are:

  • Pyserial master branch (pip install git+https://github.com/pyserial/pyserial)
  • Pyserial asyncio fast because it is more up-to-date (pip install git+https://github.com/home-assistant-libs/pyserial-asyncio-fast)
  • Numpy
  • Speech Recognition fork master branch (pip install git+https://github.com/TIT8/speech_recognition)
  • Paho MQTT

References