From 3a765f438429e575c131f326f7acf81e08254cea Mon Sep 17 00:00:00 2001 From: Krzysztof Chruscinski Date: Thu, 14 Jan 2021 07:59:52 +0100 Subject: [PATCH] lib: os: Add mpsc (multiple producer, single consumer) packet buffer Added module for storing variable length packets in a ring buffer. Implementation assumes multiple producing contexts and single consumer. API provides zero copy functionality with alloc, commit, claim, free scheme. Additionally, there are functions optimized for storing single word packets and packets consisting of a word and a pointer. Buffer can work in two modes: saturation or overwriting the oldest packets when buffer has no space to allocate for a new buffer. Signed-off-by: Krzysztof Chruscinski --- include/sys/mpsc_packet.h | 63 ++++++ include/sys/mpsc_pbuf.h | 245 +++++++++++++++++++++++ lib/os/CMakeLists.txt | 2 + lib/os/Kconfig | 8 + lib/os/mpsc_pbuf.c | 404 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 722 insertions(+) create mode 100644 include/sys/mpsc_packet.h create mode 100644 include/sys/mpsc_pbuf.h create mode 100644 lib/os/mpsc_pbuf.c diff --git a/include/sys/mpsc_packet.h b/include/sys/mpsc_packet.h new file mode 100644 index 000000000000..cb815a5d5c71 --- /dev/null +++ b/include/sys/mpsc_packet.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef ZEPHYR_INCLUDE_SYS_MPSC_PACKET_H_ +#define ZEPHYR_INCLUDE_SYS_MPSC_PACKET_H_ + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Multi producer, single consumer packet header + * @defgroup mpsc_packet MPSC (Multi producer, single consumer) packet header + * @ingroup mpsc_buf + * @{ + */ + +/** @brief Number of bits in the first word which are used by the buffer. */ +#define MPSC_PBUF_HDR_BITS 2 + +/** @brief Header that must be added to the first word in each packet. + * + * This fields are controlled by the packet buffer and unless specified must + * not be used. Fields must be added at the top of the packet header structure. + */ +#define MPSC_PBUF_HDR \ + uint32_t valid: 1; \ + uint32_t busy: 1 + +/** @brief Generic packet header. */ +struct mpsc_pbuf_hdr { + MPSC_PBUF_HDR; + uint32_t data: 32 - MPSC_PBUF_HDR_BITS; +}; + +/** @brief Skip packet used internally by the packet buffer. */ +struct mpsc_pbuf_skip { + MPSC_PBUF_HDR; + uint32_t len: 32 - MPSC_PBUF_HDR_BITS; +}; + +/** @brief Generic packet header. */ +union mpsc_pbuf_generic { + struct mpsc_pbuf_hdr hdr; + struct mpsc_pbuf_skip skip; + uint32_t raw; +}; + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* ZEPHYR_INCLUDE_SYS_MPSC_PACKET_H_ */ diff --git a/include/sys/mpsc_pbuf.h b/include/sys/mpsc_pbuf.h new file mode 100644 index 000000000000..11d9b0e79da0 --- /dev/null +++ b/include/sys/mpsc_pbuf.h @@ -0,0 +1,245 @@ + +/* + * Copyright (c) 2021 Nordic Semiconductor ASA + * + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef ZEPHYR_INCLUDE_SYS_MPSC_PBUF_H_ +#define ZEPHYR_INCLUDE_SYS_MPSC_PBUF_H_ + +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Multi producer, single consumer packet buffer API + * @defgroup mpsc_buf MPSC (Multi producer, single consumer) packet buffer API + * @ingroup kernel_apis + * @{ + */ + +/* + * Multi producer, single consumer packet buffer allows to allocate variable + * length consecutive space for storing a packet. When space is allocated + * it can be filled by the user (except for the first 2 bits) and when packet + * is ready it is commited. It is allowed to allocate another packet before + * commiting the previous one. + * + * If buffer is full and packet cannot be allocated then null is returned unless + * overwrite mode is selected. In that mode, oldest entry are dropped (user is + * notified) until allocation succeeds. It can happen that candidate for + * dropping is currently being claimed. In that case, it is ommited and next + * packet is dropped and claimed packet is marked as invalid when freeing. + * + * Reading packets is performed in two steps. First packet is claimed. Claiming + * returns pointer to the packet within the buffer. Packet is freed when no + * longer in use. + */ + +/**@defgroup MPSC_PBUF_FLAGS MPSC packet buffer flags + * @{ */ + +/** @brief Flag indicating that buffer size is power of 2. + * + * When buffer size is power of 2 then optimizations are applied. + */ +#define MPSC_PBUF_SIZE_POW2 BIT(0) + +/** @brief Flag indicating buffer full policy. + * + * If flag is set then when allocating from a full buffer oldest packets are + * dropped. When flag is not set then allocation returns null. + */ +#define MPSC_PBUF_MODE_OVERWRITE BIT(1) + +/**@} */ + +/* Forward declaration */ +struct mpsc_pbuf_buffer; + +/** @brief Callback prototype for getting length of a packet. + * + * @param packet User packet. + * + * @return Size of the packet in 32 bit words. + */ +typedef uint32_t (*mpsc_pbuf_get_wlen)(union mpsc_pbuf_generic *packet); + +/** @brief Callback called when packet is dropped. + * + * @param buffer Packet buffer. + * + * @param packet Packet that is being dropped. + */ +typedef void (*mpsc_pbuf_notify_drop)(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic *packet); + +/** @brief MPSC packet buffer structure. */ +struct mpsc_pbuf_buffer { + /** Temporary write index. */ + uint32_t tmp_wr_idx; + + /** Write index. */ + uint32_t wr_idx; + + /** Temporary read index. */ + uint32_t tmp_rd_idx; + + /** Read index. */ + uint32_t rd_idx; + + /** Flags. */ + uint32_t flags; + + /** Lock. */ + struct k_spinlock lock; + + /** User callback called whenever packet is dropped. */ + mpsc_pbuf_notify_drop notify_drop; + + /** Callback for getting packet length. */ + mpsc_pbuf_get_wlen get_wlen; + + /* Buffer. */ + uint32_t *buf; + + /* Buffer size in 32 bit words. */ + uint32_t size; + + struct k_sem sem; +}; + +/** @brief MPSC packet buffer configuration. */ +struct mpsc_pbuf_buffer_config { + /* Pointer to a memory used for storing packets. */ + uint32_t *buf; + + /* Buffer size in 32 bit words. */ + uint32_t size; + + /* Callbacks. */ + mpsc_pbuf_notify_drop notify_drop; + mpsc_pbuf_get_wlen get_wlen; + + /* Configuration flags. */ + uint32_t flags; +}; + +/** @brief Initnialize a packet buffer. + * + * @param buffer Buffer. + * + * @param config Configuration. + */ +void mpsc_pbuf_init(struct mpsc_pbuf_buffer *buffer, + const struct mpsc_pbuf_buffer_config *config); + +/** @brief Allocate a packet. + * + * If a buffer is configured to overwrite mode then if there is no space to + * allocated a new buffer, oldest packets are dropped. Otherwise allocation + * fails and null pointer is returned. + * + * @param buffer Buffer. + * + * @param wlen Number of words to allocate. + * + * @param timeout Timeout. If called from thread context it will pend for given + * timeout if packet cannot be allocated before dropping the oldest or + * returning null. + * + * @return Pointer to the allocated space or null if it cannot be allocated. + */ +union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, + size_t wlen, k_timeout_t timeout); + +/** @brief Commit a packet. + * + * @param buffer Buffer. + * + * @param packet Pointer to a packet allocated by @ref mpsc_pbuf_alloc. + */ +void mpsc_pbuf_commit(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic *packet); + +/** @brief Put single word packet into a buffer. + * + * Function is optimized for storing a packet which fit into a single word. + * Note that 2 bits of that word is used by the buffer. + * + * @param buffer Buffer. + * + * @param word Packet content consisting of MPSC_PBUF_HDR with valid bit set + * and data on remaining bits. + */ +void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic word); + +/** @brief Put a packet consisting of a word and a pointer. + * * + * Function is optimized for storing packet consisting of a word and a pointer. + * Note that 2 bits of a first word is used by the buffer. + * + * @param buffer Buffer. + * + * @param word First word of a packet consisting of MPSC_PBUF_HDR with valid + * bit set and data on remaining bits. + * + * @param data User data. + */ +void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic word, void *data); + +/** @brief Put a packet into a buffer. + * + * Copy data into a buffer. + * Note that 2 bits of a first word is used by the buffer. + * + * @param buffer Buffer. + * + * @param data First word of data must contain MPSC_PBUF_HDR with valid set. + * + * @param wlen Packet size in words. + */ +void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, + uint32_t *data, size_t wlen); + +/** @brief Claim the first pending packet. + * + * @param buffer Buffer. + */ +union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer); + +/** @brief Free a packet. + * + * @param buffer Buffer. + * + * @param packet Packet. + */ +void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic *packet); + +/** @brief Check if there are any message pending. + * + * @param buffer Buffer. + * + * @retval true if pending. + * @retval false if no message is pending. + */ +bool mpsc_pbuf_is_pending(struct mpsc_pbuf_buffer *buffer); + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* ZEPHYR_INCLUDE_SYS_MPSC_PBUF_H_ */ diff --git a/lib/os/CMakeLists.txt b/lib/os/CMakeLists.txt index 155e26c58814..dcd7b4930fd7 100644 --- a/lib/os/CMakeLists.txt +++ b/lib/os/CMakeLists.txt @@ -35,6 +35,8 @@ zephyr_sources_ifdef(CONFIG_ASSERT assert.c) zephyr_sources_ifdef(CONFIG_USERSPACE mutex.c user_work.c) +zephyr_sources_ifdef(CONFIG_MPSC_PBUF mpsc_pbuf.c) + zephyr_sources_ifdef(CONFIG_SCHED_DEADLINE p4wq.c) zephyr_library_include_directories( diff --git a/lib/os/Kconfig b/lib/os/Kconfig index dfffc03b976d..262f71aaebb3 100644 --- a/lib/os/Kconfig +++ b/lib/os/Kconfig @@ -60,6 +60,14 @@ config PRINTK_SYNC interleaving with concurrent usage from another CPU or an preempting interrupt. +config MPSC_PBUF + bool "Multi producer, single consumer packet buffer" + select TIMEOUT_64BIT + help + Enable usage of mpsc packet buffer. Packet buffer is capable of + storing variable length packets in a circular way and operate directly + on the buffer memory. + rsource "Kconfig.cbprintf" endmenu diff --git a/lib/os/mpsc_pbuf.c b/lib/os/mpsc_pbuf.c new file mode 100644 index 000000000000..67695b8c17ab --- /dev/null +++ b/lib/os/mpsc_pbuf.c @@ -0,0 +1,404 @@ +/* + * Copyright (c) 2021 Nordic Semiconductor + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include + +void mpsc_pbuf_init(struct mpsc_pbuf_buffer *buffer, + const struct mpsc_pbuf_buffer_config *cfg) +{ + int err; + + memset(buffer, 0, offsetof(struct mpsc_pbuf_buffer, buf)); + buffer->get_wlen = cfg->get_wlen; + buffer->notify_drop = cfg->notify_drop; + buffer->buf = cfg->buf; + buffer->size = cfg->size; + buffer->flags = cfg->flags; + + if (is_power_of_two(buffer->size)) { + buffer->flags |= MPSC_PBUF_SIZE_POW2; + } + + err = k_sem_init(&buffer->sem, 0, 1); + __ASSERT_NO_MSG(err == 0); +} + +static inline bool free_space(struct mpsc_pbuf_buffer *buffer, uint32_t *res) +{ + if (buffer->rd_idx > buffer->tmp_wr_idx) { + *res = buffer->rd_idx - buffer->tmp_wr_idx - 1; + + return false; + } else if (!buffer->rd_idx) { + *res = buffer->size - buffer->tmp_wr_idx - 1; + return false; + } + + *res = buffer->size - buffer->tmp_wr_idx; + + return true; +} + +static inline bool available(struct mpsc_pbuf_buffer *buffer, uint32_t *res) +{ + if (buffer->tmp_rd_idx <= buffer->wr_idx) { + *res = (buffer->wr_idx - buffer->tmp_rd_idx); + + return false; + } + + *res = buffer->size - buffer->tmp_rd_idx; + + return true; +} + +static inline bool is_valid(union mpsc_pbuf_generic *item) +{ + return item->hdr.valid; +} + +static inline bool is_invalid(union mpsc_pbuf_generic *item) +{ + return !item->hdr.valid && !item->hdr.busy; +} + +static inline uint32_t idx_inc(struct mpsc_pbuf_buffer *buffer, + uint32_t idx, uint32_t val) +{ + uint32_t i = idx + val; + + if (buffer->flags & MPSC_PBUF_SIZE_POW2) { + return i & (buffer->size - 1); + } + + return (i >= buffer->size) ? i - buffer->size : i; +} + +static inline uint32_t idx_dec(struct mpsc_pbuf_buffer *buffer, + uint32_t idx, uint32_t val) +{ + uint32_t i = idx - val; + + if (buffer->flags & MPSC_PBUF_SIZE_POW2) { + return idx & (buffer->size - 1); + } + + return (i >= buffer->size) ? i + buffer->size : i; +} + +static inline uint32_t get_skip(union mpsc_pbuf_generic *item) +{ + if (item->hdr.busy && !item->hdr.valid) { + return item->skip.len; + } + + return 0; +} + +static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen) +{ + union mpsc_pbuf_generic skip = { + .skip = { .valid = 0, .busy = 1, .len = wlen } + }; + + buffer->buf[buffer->tmp_wr_idx] = skip.raw; + buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen); + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen); +} + +static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer, + uint32_t free_wlen) +{ + union mpsc_pbuf_generic *item; + uint32_t rd_wlen; + uint32_t skip_wlen; + + item = (union mpsc_pbuf_generic *)&buffer->buf[buffer->rd_idx]; + skip_wlen = get_skip(item); + + rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item); + if (skip_wlen) { + item = NULL; + } else if (item->hdr.busy) { + /* item is currently processed and cannot be overwritten. */ + add_skip_item(buffer, free_wlen + 1); + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen); + buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, rd_wlen); + + /* Get next itme followed the busy one. */ + uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen); + + item = (union mpsc_pbuf_generic *)&buffer->buf[next_rd_idx]; + skip_wlen = get_skip(item); + if (skip_wlen) { + item = NULL; + rd_wlen += skip_wlen; + } else { + rd_wlen += buffer->get_wlen(item); + } + } + + buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen); + buffer->tmp_rd_idx = buffer->rd_idx; + + return item; +} + +void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic item) +{ + bool cont; + uint32_t free_wlen; + k_spinlock_key_t key; + union mpsc_pbuf_generic *dropped_item = NULL; + + do { + cont = false; + key = k_spin_lock(&buffer->lock); + (void)free_space(buffer, &free_wlen); + if (free_wlen) { + buffer->buf[buffer->tmp_wr_idx] = item.raw; + buffer->tmp_wr_idx = idx_inc(buffer, + buffer->tmp_wr_idx, 1); + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1); + } else if (buffer->flags & MPSC_PBUF_MODE_OVERWRITE) { + dropped_item = drop_item_locked(buffer, free_wlen); + cont = true; + } else { + /* empty */ + } + + k_spin_unlock(&buffer->lock, key); + + if (cont && dropped_item) { + /* Notify about item being dropped. */ + buffer->notify_drop(buffer, dropped_item); + } + } while (cont); + +} + +union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, + size_t wlen, k_timeout_t timeout) +{ + union mpsc_pbuf_generic *item = NULL; + union mpsc_pbuf_generic *dropped_item = NULL; + bool cont; + uint32_t free_wlen; + + do { + k_spinlock_key_t key; + bool wrap; + + cont = false; + key = k_spin_lock(&buffer->lock); + wrap = free_space(buffer, &free_wlen); + + if (free_wlen >= wlen) { + item = + (union mpsc_pbuf_generic *)&buffer->buf[buffer->tmp_wr_idx]; + item->hdr.valid = 0; + item->hdr.busy = 0; + buffer->tmp_wr_idx = idx_inc(buffer, + buffer->tmp_wr_idx, wlen); + } else if (wrap) { + add_skip_item(buffer, free_wlen); + cont = true; + } else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) && + !k_is_in_isr()) { + int err; + + k_spin_unlock(&buffer->lock, key); + err = k_sem_take(&buffer->sem, timeout); + key = k_spin_lock(&buffer->lock); + if (err == 0) { + cont = true; + } + } else if (buffer->flags & MPSC_PBUF_MODE_OVERWRITE) { + dropped_item = drop_item_locked(buffer, free_wlen); + cont = true; + } + + k_spin_unlock(&buffer->lock, key); + + if (cont && dropped_item) { + /* Notify about item being dropped. */ + buffer->notify_drop(buffer, dropped_item); + dropped_item = NULL; + } + } while (cont); + + return item; +} + +void mpsc_pbuf_commit(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic *item) +{ + uint32_t wlen = buffer->get_wlen(item); + + k_spinlock_key_t key = k_spin_lock(&buffer->lock); + + item->hdr.valid = 1; + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen); + k_spin_unlock(&buffer->lock, key); +} + +void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic item, void *data) +{ + static const size_t l = + (sizeof(item) + sizeof(data)) / sizeof(uint32_t); + union mpsc_pbuf_generic *dropped_item = NULL; + bool cont; + + do { + k_spinlock_key_t key; + uint32_t free_wlen; + bool wrap; + + cont = false; + key = k_spin_lock(&buffer->lock); + wrap = free_space(buffer, &free_wlen); + + if (free_wlen >= l) { + buffer->buf[buffer->tmp_wr_idx] = item.raw; + void **p = + (void **)&buffer->buf[buffer->tmp_wr_idx + 1]; + + *p = data; + buffer->tmp_wr_idx = + idx_inc(buffer, buffer->tmp_wr_idx, l); + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l); + } else if (wrap) { + add_skip_item(buffer, free_wlen); + cont = true; + } else if (buffer->flags & MPSC_PBUF_MODE_OVERWRITE) { + dropped_item = drop_item_locked(buffer, free_wlen); + cont = true; + } else { + /* empty */ + } + + k_spin_unlock(&buffer->lock, key); + + if (cont && dropped_item) { + /* Notify about item being dropped. */ + buffer->notify_drop(buffer, dropped_item); + dropped_item = NULL; + } + } while (cont); +} + +void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, uint32_t *data, + size_t wlen) +{ + bool cont; + union mpsc_pbuf_generic *dropped_item = NULL; + + do { + uint32_t free_wlen; + k_spinlock_key_t key; + bool wrap; + + cont = false; + key = k_spin_lock(&buffer->lock); + wrap = free_space(buffer, &free_wlen); + + if (free_wlen >= wlen) { + memcpy(&buffer->buf[buffer->tmp_wr_idx], data, + wlen * sizeof(uint32_t)); + buffer->tmp_wr_idx = + idx_inc(buffer, buffer->tmp_wr_idx, wlen); + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen); + } else if (wrap) { + add_skip_item(buffer, free_wlen); + cont = true; + } else if (buffer->flags & MPSC_PBUF_MODE_OVERWRITE) { + dropped_item = drop_item_locked(buffer, free_wlen); + cont = true; + } + + k_spin_unlock(&buffer->lock, key); + + if (cont && dropped_item) { + /* Notify about item being dropped. */ + buffer->notify_drop(buffer, dropped_item); + dropped_item = NULL; + } + } while (cont); +} + +union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer) +{ + union mpsc_pbuf_generic *item; + bool cont; + + do { + uint32_t a; + k_spinlock_key_t key; + bool wrap; + + cont = false; + key = k_spin_lock(&buffer->lock); + wrap = available(buffer, &a); + item = (union mpsc_pbuf_generic *) + &buffer->buf[buffer->tmp_rd_idx]; + + if (!a || is_invalid(item)) { + item = NULL; + } else { + uint32_t skip = get_skip(item); + + if (skip || !is_valid(item)) { + uint32_t inc = + skip ? skip : buffer->get_wlen(item); + + buffer->tmp_rd_idx = + idx_inc(buffer, buffer->tmp_rd_idx, inc); + buffer->rd_idx = + idx_inc(buffer, buffer->rd_idx, inc); + cont = true; + } else { + item->hdr.busy = 1; + buffer->tmp_rd_idx = + idx_inc(buffer, buffer->tmp_rd_idx, + buffer->get_wlen(item)); + } + } + + k_spin_unlock(&buffer->lock, key); + } while (cont); + + return item; +} + +void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer, + union mpsc_pbuf_generic *item) +{ + uint32_t wlen = buffer->get_wlen(item); + k_spinlock_key_t key = k_spin_lock(&buffer->lock); + + item->hdr.valid = 0; + if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) || + ((uint32_t *)item == &buffer->buf[buffer->rd_idx])) { + item->hdr.busy = 0; + buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen); + } else { + item->skip.len = wlen; + } + + k_spin_unlock(&buffer->lock, key); + k_sem_give(&buffer->sem); +} + +bool mpsc_pbuf_is_pending(struct mpsc_pbuf_buffer *buffer) +{ + uint32_t a; + + (void)available(buffer, &a); + + return a ? true : false; +} +