Skip to content

Commit

Permalink
消息队列支持优先级
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangkang authored and zhkag committed Jun 25, 2023
1 parent d8a3b7c commit 7b7a370
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 20 deletions.
1 change: 1 addition & 0 deletions components/libc/posix/ipc/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ config RT_USING_POSIX_PIPE_SIZE
config RT_USING_POSIX_MESSAGE_QUEUE
bool "Enable posix message queue <mqueue.h>"
select RT_USING_POSIX_CLOCK
select RT_USING_MESSAGEQUEUE_PRIORITY
default n

config RT_USING_POSIX_MESSAGE_SEMAPHORE
Expand Down
7 changes: 4 additions & 3 deletions components/libc/posix/ipc/mqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ ssize_t mq_receive(mqd_t id, char *msg_ptr, size_t msg_len, unsigned *msg_prio)
return -1;
}

result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER);
result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, RT_WAITING_FOREVER, RT_UNINTERRUPTIBLE);
if (result >= 0)
return rt_strlen(msg_ptr);

Expand All @@ -255,7 +255,7 @@ int mq_send(mqd_t id, const char *msg_ptr, size_t msg_len, unsigned msg_prio)
return -1;
}

result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len);
result = rt_mq_send_wait_prio(mqdes->mq, (void *)msg_ptr, msg_len, msg_prio, 0, RT_UNINTERRUPTIBLE);
if (result == RT_EOK)
return 0;

Expand Down Expand Up @@ -287,7 +287,8 @@ ssize_t mq_timedreceive(mqd_t id,
if (abs_timeout != RT_NULL)
tick = rt_timespec_to_tick(abs_timeout);

result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick);
result = rt_mq_recv_prio(mqdes->mq, msg_ptr, msg_len, (rt_int32_t *)msg_prio, tick, RT_UNINTERRUPTIBLE);

if (result >= 0)
return rt_strlen(msg_ptr);

Expand Down
18 changes: 18 additions & 0 deletions include/rtthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ struct rt_mq_message
{
struct rt_mq_message *next;
rt_ssize_t length;
#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
rt_int32_t prio;
#endif
};

#define RT_MQ_BUF_SIZE(msg_size, max_msgs) \
Expand Down Expand Up @@ -515,6 +518,21 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout);
rt_err_t rt_mq_control(rt_mq_t mq, int cmd, void *arg);

#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
rt_err_t rt_mq_send_wait_prio(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t prio,
rt_int32_t timeout,
int suspend_flag);
rt_err_t rt_mq_recv_prio(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t *prio,
rt_int32_t timeout,
int suspend_flag);
#endif
#endif

/* defunct */
Expand Down
8 changes: 7 additions & 1 deletion src/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,16 @@ menu "Inter-Thread communication"
bool "Enable mailbox"
default y

config RT_USING_MESSAGEQUEUE
menuconfig RT_USING_MESSAGEQUEUE
bool "Enable message queue"
default y

if RT_USING_MESSAGEQUEUE
config RT_USING_MESSAGEQUEUE_PRIORITY
bool "Enable message queue priority"
default n
endif

config RT_USING_SIGNALS
bool "Enable signals"
select RT_USING_MEMPOOL
Expand Down
86 changes: 70 additions & 16 deletions src/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3174,11 +3174,12 @@ RTM_EXPORT(rt_mq_delete);
* @warning This function can be called in interrupt context and thread
* context.
*/
static rt_err_t _rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t timeout,
int suspend_flag)
static rt_err_t _rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t prio,
rt_int32_t timeout,
int suspend_flag)
{
rt_base_t level;
struct rt_mq_message *msg;
Expand Down Expand Up @@ -3304,6 +3305,33 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq,

/* disable interrupt */
level = rt_hw_interrupt_disable();
#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
msg->prio = prio;
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;

struct rt_mq_message *node, *prevNode = RT_NULL;
for (node = mq->msg_queue_head; node != RT_NULL; node = node->next)
{
if (node->prio < msg->prio)
{
if (prevNode == RT_NULL)
mq->msg_queue_head = msg;
else
prevNode->next = msg;
msg->next = node;
break;
}
if (node->next == RT_NULL)
{
if (node != msg)
node->next = msg;
mq->msg_queue_tail = msg;
break;
}
prevNode = node;
}
#else
/* link msg to message queue */
if (mq->msg_queue_tail != RT_NULL)
{
Expand All @@ -3316,6 +3344,7 @@ static rt_err_t _rt_mq_send_wait(rt_mq_t mq,
/* if the head is empty, set head */
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;
#endif

if(mq->entry < RT_MQ_ENTRY_MAX)
{
Expand Down Expand Up @@ -3352,7 +3381,7 @@ rt_err_t rt_mq_send_wait(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE);
return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE);
}
RTM_EXPORT(rt_mq_send_wait);

Expand All @@ -3361,7 +3390,7 @@ rt_err_t rt_mq_send_wait_interruptible(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_INTERRUPTIBLE);
return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE);
}
RTM_EXPORT(rt_mq_send_wait_interruptible);

Expand All @@ -3370,7 +3399,7 @@ rt_err_t rt_mq_send_wait_killable(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_send_wait(mq, buffer, size, timeout, RT_KILLABLE);
return _rt_mq_send_wait(mq, buffer, size, 0, timeout, RT_KILLABLE);
}
RTM_EXPORT(rt_mq_send_wait_killable);
/**
Expand Down Expand Up @@ -3540,11 +3569,12 @@ RTM_EXPORT(rt_mq_urgent);
* @return Return the real length of the message. When the return value is larger than zero, the operation is successful.
* If the return value is any other values, it means that the mailbox release failed.
*/
static rt_ssize_t _rt_mq_recv(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t timeout,
int suspend_flag)
static rt_ssize_t _rt_mq_recv(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t *prio,
rt_int32_t timeout,
int suspend_flag)
{
struct rt_thread *thread;
rt_base_t level;
Expand Down Expand Up @@ -3675,6 +3705,10 @@ static rt_ssize_t _rt_mq_recv(rt_mq_t mq,
/* copy message */
rt_memcpy(buffer, GET_MESSAGEBYTE_ADDR(msg), len);

#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
if (prio != RT_NULL)
*prio = msg->prio;
#endif
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* put message to free list */
Expand Down Expand Up @@ -3709,7 +3743,7 @@ rt_ssize_t rt_mq_recv(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_recv(mq, buffer, size, timeout, RT_UNINTERRUPTIBLE);
return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_UNINTERRUPTIBLE);
}
RTM_EXPORT(rt_mq_recv);

Expand All @@ -3718,7 +3752,7 @@ rt_ssize_t rt_mq_recv_interruptible(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_recv(mq, buffer, size, timeout, RT_INTERRUPTIBLE);
return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_INTERRUPTIBLE);
}
RTM_EXPORT(rt_mq_recv_interruptible);

Expand All @@ -3727,8 +3761,28 @@ rt_ssize_t rt_mq_recv_killable(rt_mq_t mq,
rt_size_t size,
rt_int32_t timeout)
{
return _rt_mq_recv(mq, buffer, size, timeout, RT_KILLABLE);
return _rt_mq_recv(mq, buffer, size, 0, timeout, RT_KILLABLE);
}
#ifdef RT_USING_MESSAGEQUEUE_PRIORITY
rt_err_t rt_mq_send_wait_prio(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t prio,
rt_int32_t timeout,
int suspend_flag)
{
return _rt_mq_send_wait(mq, buffer, size, prio, timeout, suspend_flag);
}
rt_err_t rt_mq_recv_prio(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t *prio,
rt_int32_t timeout,
int suspend_flag)
{
return _rt_mq_recv(mq, buffer, size, prio, timeout, suspend_flag);
}
#endif
RTM_EXPORT(rt_mq_recv_killable);
/**
* @brief This function will set some extra attributions of a messagequeue object.
Expand Down

0 comments on commit 7b7a370

Please sign in to comment.