From 7b7a3702fc6cdc54a5deec7445b5e1c1710e32a5 Mon Sep 17 00:00:00 2001 From: zhangkang Date: Wed, 26 Apr 2023 17:15:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=BC=98=E5=85=88=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- components/libc/posix/ipc/Kconfig | 1 + components/libc/posix/ipc/mqueue.c | 7 +-- include/rtthread.h | 18 +++++++ src/Kconfig | 8 ++- src/ipc.c | 86 ++++++++++++++++++++++++------ 5 files changed, 100 insertions(+), 20 deletions(-) diff --git a/components/libc/posix/ipc/Kconfig b/components/libc/posix/ipc/Kconfig index a1e4fc163236..cf3135e53a15 100644 --- a/components/libc/posix/ipc/Kconfig +++ b/components/libc/posix/ipc/Kconfig @@ -24,6 +24,7 @@ config RT_USING_POSIX_PIPE_SIZE config RT_USING_POSIX_MESSAGE_QUEUE bool "Enable posix message queue " select RT_USING_POSIX_CLOCK + select RT_USING_MESSAGEQUEUE_PRIORITY default n config RT_USING_POSIX_MESSAGE_SEMAPHORE diff --git a/components/libc/posix/ipc/mqueue.c b/components/libc/posix/ipc/mqueue.c index 22d285477098..49ce6bf98d64 100644 --- a/components/libc/posix/ipc/mqueue.c +++ b/components/libc/posix/ipc/mqueue.c @@ -232,7 +232,7 @@ ssize_t mq_receive(mqd_t id, char *msg_ptr, size_t msg_len, unsigned *msg_prio) return -1; } - result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER); + result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE); if (result >= 0) return rt_strlen(msg_ptr); @@ -255,7 +255,7 @@ int mq_send(mqd_t id, const char *msg_ptr, size_t msg_len, unsigned msg_prio) return -1; } - result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len); + result = rt_mq_send_wait_prio(mqdes->mq, (void *)msg_ptr, msg_len, msg_prio, 0, RT_UNINTERRUPTIBLE); if (result == RT_EOK) return 0; @@ -287,7 +287,8 @@ ssize_t mq_timedreceive(mqd_t id, if (abs_timeout != RT_NULL) tick = rt_timespec_to_tick(abs_timeout); - result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick); + result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, tick, RT_UNINTERRUPTIBLE); + if (result >= 0) return rt_strlen(msg_ptr); diff --git a/include/rtthread.h b/include/rtthread.h index dfcd1c413ce6..ea3ee6f81a75 100644 --- a/include/rtthread.h +++ b/include/rtthread.h @@ -463,6 +463,9 @@ struct rt_mq_message { struct rt_mq_message *next; rt_ssize_t length; +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + rt_int32_t prio; +#endif }; #define RT_MQ_BUF_SIZE(msg_size, max_msgs) \ @@ -515,6 +518,21 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout); rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg); + +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY +rt_err_t rt_mq_send_wait_prio(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag); +rt_err_t rt_mq_recv_prio(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag); +#endif #endif /* defunct */ diff --git a/src/Kconfig b/src/Kconfig index 5cabfdbd722d..4fd6de3ff9a3 100644 --- a/src/Kconfig +++ b/src/Kconfig @@ -313,10 +313,16 @@ menu "Inter-Thread communication" bool "Enable mailbox" default y - config RT_USING_MESSAGEQUEUE + menuconfig RT_USING_MESSAGEQUEUE bool "Enable message queue" default y + if RT_USING_MESSAGEQUEUE + config RT_USING_MESSAGEQUEUE_PRIORITY + bool "Enable message queue priority" + default n + endif + config RT_USING_SIGNALS bool "Enable signals" select RT_USING_MEMPOOL diff --git a/src/ipc.c b/src/ipc.c index 3a5a0efa6132..04656ffdbc3b 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -3174,11 +3174,12 @@ RTM_EXPORT(rt_mq_delete); * @warning This function can be called in interrupt context and thread * context. */ -static rt_err_t _rt_mq_send_wait(rt_mq_t mq, - const void *buffer, - rt_size_t size, - rt_int32_t timeout, - int suspend_flag) +static rt_err_t _rt_mq_send_wait(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag) { rt_base_t level; struct rt_mq_message *msg; @@ -3304,6 +3305,33 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq, /* disable interrupt */ level = rt_hw_interrupt_disable(); +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + msg->prio = prio; + if (mq->msg_queue_head == RT_NULL) + mq->msg_queue_head = msg; + + struct rt_mq_message *node, *prevNode = RT_NULL; + for (node = mq->msg_queue_head; node != RT_NULL; node = node->next) + { + if (node->prio < msg->prio) + { + if (prevNode == RT_NULL) + mq->msg_queue_head = msg; + else + prevNode->next = msg; + msg->next = node; + break; + } + if (node->next == RT_NULL) + { + if (node != msg) + node->next = msg; + mq->msg_queue_tail = msg; + break; + } + prevNode = node; + } +#else /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { @@ -3316,6 +3344,7 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq, /* if the head is empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; +#endif if(mq->entry < RT_MQ_ENTRY_MAX) { @@ -3352,7 +3381,7 @@ rt_err_t rt_mq_send_wait(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE); } RTM_EXPORT(rt_mq_send_wait); @@ -3361,7 +3390,7 @@ rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE); } RTM_EXPORT(rt_mq_send_wait_interruptible); @@ -3370,7 +3399,7 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE); + return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_KILLABLE); } RTM_EXPORT(rt_mq_send_wait_killable); /** @@ -3540,11 +3569,12 @@ RTM_EXPORT(rt_mq_urgent); * @return Return the real length of the message. When the return value is larger than zero, the operation is successful. * If the return value is any other values, it means that the mailbox release failed. */ -static rt_ssize_t _rt_mq_recv(rt_mq_t mq, - void *buffer, - rt_size_t size, - rt_int32_t timeout, - int suspend_flag) +static rt_ssize_t _rt_mq_recv(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag) { struct rt_thread *thread; rt_base_t level; @@ -3675,6 +3705,10 @@ static rt_ssize_t _rt_mq_recv(rt_mq_t mq, /* copy message */ rt_memcpy(buffer, GET_MESSAGEBYTE_ADDR(msg), len); +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY + if (prio != RT_NULL) + *prio = msg->prio; +#endif /* disable interrupt */ level = rt_hw_interrupt_disable(); /* put message to free list */ @@ -3709,7 +3743,7 @@ rt_ssize_t rt_mq_recv(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE); } RTM_EXPORT(rt_mq_recv); @@ -3718,7 +3752,7 @@ rt_ssize_t rt_mq_recv_interruptible(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_INTERRUPTIBLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE); } RTM_EXPORT(rt_mq_recv_interruptible); @@ -3727,8 +3761,28 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq, rt_size_t size, rt_int32_t timeout) { - return _rt_mq_recv(mq, buffer, size, timeout, RT_KILLABLE); + return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_KILLABLE); } +#ifdef RT_USING_MESSAGEQUEUE_PRIORITY +rt_err_t rt_mq_send_wait_prio(rt_mq_t mq, + const void *buffer, + rt_size_t size, + rt_int32_t prio, + rt_int32_t timeout, + int suspend_flag) +{ + return _rt_mq_send_wait(mq, buffer, size, prio, timeout, suspend_flag); +} +rt_err_t rt_mq_recv_prio(rt_mq_t mq, + void *buffer, + rt_size_t size, + rt_int32_t *prio, + rt_int32_t timeout, + int suspend_flag) +{ + return _rt_mq_recv(mq, buffer, size, prio, timeout, suspend_flag); +} +#endif RTM_EXPORT(rt_mq_recv_killable); /** * @brief This function will set some extra attributions of a messagequeue object.