RT-Thread之消息队列_rtthread消息队列-程序员宅基地

技术标签: RTOS  RT-Thread  

消息队列的应用场景

消息队列是常用的线程间通信方式,是一种异步的通信方式。消息队列可以应用于多种场合:线程间的消息交换、在中断服务函数中给线程发送消息(中断服务例程不可能接收消息)、使用串口接收不定长数据等。

消息队列的基本概念

队列又称消息队列,是一种常用于线程间通信的数据结构。队列可以在线程与线程间、中断与线程间传送消息,实现了线程接收来自其它线程或中断的不固定长度的消息,并根据不同的接口选择传递的消息是否存放在自己的空间。线程能够从队列中读取消息,当队列中的消息为空时,挂起读取线程。用户还可以指定线程挂起的超时时间timeout;当队列中有新消息时,挂起的读取线程被唤醒以接收并处理新消息。

通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出(FIFO)原则。同时,RT-Thread中的消息队列支持优先级,所有等待消息的线程中优先级最高的会先获得消息。

用户在处理业务时,消息队列提供了异步处理机制,允许将一个消息放入队列,但并不立即处理它,同时队列还能起到缓冲消息的作用。

RT-Thread中使用队列数据结构实现线程异步通信工作,具有如下特性:

  • 消息支持先进先出排队方式与优先级排队方式,支持异步读写工作方式
  • 读队列支持超时机制
  • 支持发送紧急消息
  • 可以允许不同长度(不超过队列节点最大值)的任意类型消息
  • 一个线程能够从任意一个消息队列接收和发送消息
  • 多个线程能够从同一个消息队列接收和发送消息
  • 当队列使用结束后,需要通过删除队列操作来释放内存

消息队列的工作机制

创建消息队列时先创建一个消息队列对象控制块。然后给消息队列分配一块内存空间,组织成空闲消息链表,这块内存的大小等于[(消息大小+消息头)*消息队列容量](对应后文中struct rt_messagequeue中的msg_size以及max_msgs)。接着再初始化消息队列,此时消息队列为空。

RT-Thread操作系统的消息队列对象由多个元素组成,当消息队列被创建时,它就被分配了消息队列控制块:消息队列名称、内存缓冲区消息大小以及队列长度等。同时,每个消息队列对象中包含多个消息框,每个消息框可以存放一条消息。消息队列中的第一个和最后一个消息框分别被称为消息链表头和消息链表尾,对应于消息队列控制块中的msg_queue_head和msg_queue_tail。有些消息框可能是空的,它们通过msg_queue_free形成一个空闲消息框链表。所有消息队列中的消息框总数就是消息队列的长度,这个长度可在消息队列创建时被指定。

线程或中断服务程序都可以给消息队列发送消息。当发送消息时,

(1)消息队列对象先从空闲消息链表上取出下一个空闲消息快;

(2)把线程或中断服务例程发送的消息内容复制到消息块上;

(3)然后把该消息块挂到消息队列的尾部。

当且仅当空闲消息链表上有可用的空闲消息块,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。

发送紧急消息的过程与发送消息的过程几乎一样,唯一不同的是,当发送紧急消息时,从空闲消息链表上取下来的消息快不是挂到消息队列的队尾,而是挂到消息队列的队首,这样,接受者就能够优先接收到紧急消息,从而及时进行消息处理。

读取消息时,根据消息链表头(msg_queue_head)找到最先进入队列的消息节点进行读取。根据消息队列控制块中的entry判断队列是否有消息读取,对于全部空闲的队列(entry为0)进行读消息操作会引起线程挂起。

当消息队列不再被使用时,应该删除它以释放系统资源。一旦删除操作完成,消息队列将被永久性地删除。

消息队列的阻塞机制

消息队列一般不是属于某个线程的队列,在很多时候,创建的队列是每个线程都可以对其进行读写操作的,但是为了保护被每个线程对其进行读写操作的过程,必须有阻塞机制。在某个线程对消息队列进行读写操作时,必须保证该线程能正常完成读写操作,而不受后来的线程干扰。

那么,如何实现这个机制呢?

很简单,因为RT-Thread已经提供了这种机制,直接使用即可。每个对消息队列读写的函数都有这种机制,称其为阻塞机制。

举个例子:假设有一个线程A,在其对某个队列进行读操作(出队)时发现此队列上没有消息,那么此时线程A有三种选择:第一种选择,既然队列没有消息,那么不再等待,去进行其它操作,这样线程A不会进入阻塞态;第二种选择,线程A继续等待,此时线程A会进入阻塞状态,等待消息的到来。而线程A的等待时间可以自行定义,比如设置1000个tick的超时时间,在这段时间之内,如果没有消息,线程A都是处于阻塞态;若阻塞的这段时间内,线程A收到了队列的消息,则线程A就会由阻塞态变为就绪态。如果此时线程A的优先级比当前运行的线程的优先级高,则线程A就会得到消息并且运行。加入1000个tick过去了,队列还是没有消息,那么线程A就不再等待了,而是从阻塞态中(超时)唤醒,并返回一个没有等到消息的错误代码,然后继续执行线程A的其它代码;第三种选择,线程A一直等待,直到收到消息,这样线程A就会进入阻塞态,直到完成读取队列的消息。

与接收消息机制不同,发送消息操作并不带有阻塞机制,因为发送消息的环境可能是在中断中,不允许出现阻塞的情况。在消息队列已满的情况下,将返回一个错误码(-RT_EFULL)。

消息队列控制块

在RT-Thread中,消息队列控制块是操作系统用于管理消息队列的一个数据结构,由结构体struct rt_messagequeue表示,另一种C表达方式为rt_mq_t,表示的是消息队列的句柄,在C语言中的实现是消息队列控制块的指针。消息队列控制块结构体的详细定义在include\rtdef.h中,代码如下所示:

#ifdef RT_USING_MESSAGEQUEUE
/**
 * message queue structure
 */
struct rt_messagequeue
{
    struct rt_ipc_object parent;                        /**< inherit from ipc_object */

    void                *msg_pool;                      /**< start address of message queue */

    rt_uint16_t          msg_size;                      /**< message size of each message */
    rt_uint16_t          max_msgs;                      /**< max number of messages */

    rt_uint16_t          entry;                         /**< index of messages in the queue */

    void                *msg_queue_head;                /**< list head */
    void                *msg_queue_tail;                /**< list tail */
    void                *msg_queue_free;                /**< pointer indicated the free node of queue */

    rt_list_t            suspend_sender_thread;         /**< sender thread suspended on this message queue */
};
typedef struct rt_messagequeue *rt_mq_t;
#endif

消息队列控制块是一个结构体,其中含有消息队列相关的重要参数,在消息队列的功能实现中起着重要的作用。消息队列控制块包含了每个消息队列的信息,如消息队列名称、存放消息的消息池起始地址、每条消息的长度以及队列中已有的消息个数和能够容纳的最大消息数量等。rt_messagequeue对象从rt_ipc_object中派生,由IPC容器所管理。

消息队列相关接口函数

对一个消息队列的操作包括:创建/初始化消息队列、发送消息、接收消息、删除/脱离消息队列。

使用消息队列模块的典型流程如下:

(1)消息队列创建/初始化函数rt_mq_create()/rt_mq_init()

(2)消息队列发送消息函数rt_mq_send()

(3)消息队列接收函数rt_mq_recv()

(4)消息队列删除;/脱离函数rt_mq_delete()/rt_mq_detach()

  • 创建消息队列

消息队列在使用前,应该被创建出来,或对已有的静态消息队列对象进行初始化,创建消息队列的函数接口在include\rt_thread.h中定义,代码如下:

#ifdef RT_USING_HEAP
rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag);
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

rt_mq_create函数的参数和返回值如下表所示:

参数 描述
name 消息队列的名称
msg_size 消息队列中一条消息的最大长度,单位为字节
max_msgs 消息队列的最大个数
flag

消息队列采用的灯带方式,可以取如下数值:

RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO

返回值 描述
消息队列对象的句柄 成功
RT_NULL 失败

创建消息队列时先从对象管理器中分配一个消息队列对象,然后给消息队列对象分配一块内存空间,组织成空闲消息链表,这块内存的大小如前所述,为:[消息大小+消息头(用于链表连接)的大小]*消息队列最大个数。接着再初始化消息队列,此时消息队列为空。具体代码实现在src\ipc.c中,如下所示:

rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag)
{
    struct rt_messagequeue *mq;
    struct rt_mq_message *head;
    register rt_base_t temp;

    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* allocate object */
    mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
    if (mq == RT_NULL)
        return mq;

    /* set parent */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* initialize message queue */

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = max_msgs;

    /* allocate message pool */
    mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
    if (mq->msg_pool == RT_NULL)
    {
        rt_object_delete(&(mq->parent.parent));

        return RT_NULL;
    }

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return mq;
}
RTM_EXPORT(rt_mq_create);

上边为函数概述,函数详细步骤描述如下:

(1)分配消息队列对象。调用rt_object_allocate函数将从对象系统为创建的消息队列分配一个对象,并且设置对象名称。在系统中,对象的名称必须是唯一的。

(2)设置消息队列的阻塞唤醒模式,创建的消息队列针对于指定的flag有不同的意义。使用RT_IPC_FLAG_PRIO创建的IPC对象,在多个线程等待队列资源时,优先级高的线程将优先获得资源;而使用RT_IPC_FLAG_FIFO创建的IPC对象,在多个线程等待队列资源时,将按照先到先得的顺序获得资源。RT_IPC_FLAG_PRIO和RT_IPC_FLAG_FIFO均在rtdef.h中定义,代码如下所示:

#define RT_IPC_FLAG_FIFO                0x00            /**< FIFOed IPC. @ref IPC. */
#define RT_IPC_FLAG_PRIO                0x01            /**< PRIOed IPC. @ref IPC. */

(3)初始化消息队列内核对象。此处会初始化一个链表,用于记录访问此队列而阻塞的线程,通过这个链表,可以找到对应的阻塞线程的控制块,从而能恢复线程。

(4)设置消息队列的节点大小与消息队列的最大容量。节点大小要按照RT_ALIGN_SIZE字节对齐,消息队列的容量由用户自己定义。

(5)给此消息队列分配内存。这块内存的大小为[消息大小+消息头(用于链表连接)的大小]*消息队列容量。每个消息节点中都有一个消息头,用于链表链接,指向下一个消息节点,作为消息的排序。

(6)初始化消息队列头尾链表。

(7)将所有的消息队列的节点连接起来,形成空闲链表。

(8)消息队列的个数置零。

注意:在创建消息队列时,是需要用户自己定义消息队列的句柄的,但定义了消息队列的句柄并不等于创建了队列,创建队列必须通过调用rt_mq_create函数来完成。否则,以后根据队列句柄使用消息队列的其它函数时会发生错误。在创建消息队列时会返回创建的结果,如果创建成功,则返回消息队列句柄;如果返回RT_NULL,则表示创建失败。

关于rt_mq_create函数,必须提到一个容易被忽略的关键点:struct rt_mq_message。前文已经多次提到一个概念:消息头,但是一直没有明确说明消息头具体是指什么,是什么结构,在这里详细讲解一下。上面代码中的struct rt_mq_message即指的是消息头,此结构定义于src\ipc.c中,代码如下:

/**
 * @addtogroup messagequeue
 */

/**@{*/

struct rt_mq_message
{
    struct rt_mq_message *next;
};

由代码可以看出,就是一个最基本的单链表结构。虽然很简单,但明确一下是非常有必要的。

  • 初始化消息队列

初始化消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag);

rt_mq_init函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
name 消息队列的名称
msgpool 指向存放消息的缓冲区的指针
msg_size 消息队列中一条消息的最大长度,单位为字节
pool_size 存放消息的缓冲区大小
flag 消息队列采用的等待方式,可以取如下数值:RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO
返回值 描述
RT_EOK 成功

初始化静态消息队列对象跟创建消息队列对象类似,只是静态消息队列对象的内存是在系统编译时由编译器分配的,一般放于读数据段或未初始化数据段中。在使用这类静态消息队列对象前,需要进行初始化。初始化消息队列时,该接口需要用户已经申请获得的消息队列对象的句柄(即指向消息队列对象控制块的指针)、消息队列名、消息缓冲区指针、消息大小以及消息队列缓冲区大小。消息队列初始化后,所有消息都挂在空闲消息链表上,消息队列为空。具体代码实现在src\ipc.c中,如下所示:


/**
 * @brief    Initialize a static messagequeue object.
 *
 * @note     For the static messagequeue object, its memory space is allocated by the compiler during compiling,
 *           and shall placed on the read-write data segment or on the uninitialized data segment.
 *           By contrast, the rt_mq_create() function will allocate memory space automatically
 *           and initialize the messagequeue.
 *
 * @see      rt_mq_create()
 *
 * @param    mq is a pointer to the messagequeue to initialize. It is assumed that storage for
 *           the messagequeue will be allocated in your application.
 *
 * @param    name is a pointer to the name that given to the messagequeue.
 *
 * @param    msgpool is a pointer to the starting address of the memory space you allocated for
 *           the messagequeue in advance.
 *           In other words, msgpool is a pointer to the messagequeue buffer of the starting address.
 *
 * @param    msg_size is the maximum length of a message in the messagequeue (Unit: Byte).
 *
 * @param    pool_size is the size of the memory space allocated for the messagequeue in advance.
 *
 * @param    flag is the messagequeue flag, which determines the queuing way of how multiple threads wait
 *           when the messagequeue is not available.
 *           The messagequeue flag can be ONE of the following values:
 *
 *               RT_IPC_FLAG_PRIO          The pending threads will queue in order of priority.
 *
 *               RT_IPC_FLAG_FIFO          The pending threads will queue in the first-in-first-out method
 *                                         (also known as first-come-first-served (FCFS) scheduling strategy).
 *
 *               NOTE: RT_IPC_FLAG_FIFO is a non-real-time scheduling mode. It is strongly recommended to
 *               use RT_IPC_FLAG_PRIO to ensure the thread is real-time UNLESS your applications concern about
 *               the first-in-first-out principle, and you clearly understand that all threads involved in
 *               this messagequeue will become non-real-time threads.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it represents the initialization failed.
 *
 * @warning  This function can ONLY be called from threads.
 */
rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag)
{
    struct rt_mq_message *head;
    register rt_base_t temp;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    /* initialize object */
    rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name);

    /* set parent flag */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* set message pool */
    mq->msg_pool = msgpool;

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message));

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_init);

  • 删除消息队列

当消息队列不再被使用时,应该删除它以释放系统资源。一旦操作完成,消息队列将被永久性地删除。删除消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

#ifdef RT_USING_HEAP
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

 rt_mq_delete函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
返回值 描述
RT_EOK 成功

删除消息队列时,如果有线程被挂起在该消息队列等待队列上,则内核先唤醒挂起在该消息等待队列上的所有线程(线程返回值是-RT_ERROR),然后再释放消息队列使用的内存,最后删除消息队列对象。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will delete a messagequeue object and release the memory.
 *
 * @note     This function is used to delete a messagequeue object which is created by the rt_mq_create() function.
 *           By contrast, the rt_mq_detach() function will detach a static messagequeue object.
 *           When the messagequeue is successfully deleted, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_detach()
 *
 * @param    mq is a pointer to a messagequeue object to be deleted.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY delete a messagequeue initialized by the rt_mq_create() function.
 *           If the messagequeue is initialized by the rt_mq_init() function, you MUST NOT USE this function to delete it,
 *           ONLY USE the rt_mq_detach() function to complete the detachment.
 *           for example,the rt_mq_create() function, it cannot be called in interrupt context.
 */
rt_err_t rt_mq_delete(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE);

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* resume all suspended thread */
    _ipc_list_resume_all(&(mq->parent.suspend_thread));
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* free message queue pool */
    RT_KERNEL_FREE(mq->msg_pool);

    /* delete message queue object */
    rt_object_delete(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_delete);

消息队列删除函数是根据消息队列句柄直接进行删除的,删除之后这个消息队列的所有信息都会被清空,而且 不能再次使用这个消息队列。需要注意的是,如果某个消息队列没有被创建,那当然也是无法被删除的。删除消息队列时会把所有由于访问此消息队列而进入阻塞态的线程从阻塞链表中删除,mq是rt_mq_delete()传入的参数,是消息队列句柄,表示的是想要删除哪个队列。

(1)检测消息队列是否被创建了,只有已经被创建了,才可以进行删除操作。

(2)调用_ipc_list_resume_all函数将所有由于访问此队列而阻塞的线程从阻塞状态中恢复过来,线程获得队列返回的错误代码。在实际情况中一般不这样使用,在删除时,应先确认所有的线程都无须再次访问此队列,并且此时没有线程被此队列阻塞,才进行删除操作。

(3)删除了消息队列,那肯定要把消息队列的内存释放出来。

(4)删除消息队列对象并且释放消息队列内核对象的内存,释放内核对象内存在rt_object_delete函数中实现。

消息队列删除函数rt_mq_delete的使用也很简单,只需传入要删除的消息队列的句柄即可。调用rt_mq_delete函数时,系统将删除这个消息队列。如果删除该消息队列时有线程正在等待消息,那么删除操作会先唤醒等待在消息队列上的线程(等待线程的返回值是-RT_ERROR)。

  • 脱离消息队列

脱离消息队列将使消息队列对象从内核对象管理器中脱离。脱离消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_detach(rt_mq_t mq);

rt_mq_detach函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
返回值 描述
RT_EOK 成功

使用该接口后,内核先唤醒所有挂在该消息等待队列对象上的线程(线程返回值是-RT_ERROR),然后将该消息队列对象从内核对象管理器中脱离。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will detach a static messagequeue object.
 *
 * @note     This function is used to detach a static messagequeue object which is initialized by rt_mq_init() function.
 *           By contrast, the rt_mq_delete() function will delete a messagequeue object.
 *           When the messagequeue is successfully detached, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_delete()
 *
 * @param    mq is a pointer to a messagequeue object to be detached.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY detach a static messagequeue initialized by the rt_mq_init() function.
 *           If the messagequeue is created by the rt_mq_create() function, you MUST NOT USE this function to detach it,
 *           and ONLY USE the rt_mq_delete() function to complete the deletion.
 */
rt_err_t rt_mq_detach(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent));

    /* resume all suspended thread */
    _ipc_list_resume_all(&mq->parent.suspend_thread);
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* detach message queue object */
    rt_object_detach(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_detach);

  • 消息队列发送消息

线程或者中断服务程序都可以给消息队列发送消息。当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。发送消息的函数接口在include\rt_thread.h中定义,代码如下:

rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout);

rt_mq_send函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
buffer 消息内容
size 消息大小
返回 描述
RT_EOK 成功
-RT_EFULL 消息队列已满
-RT_ERROR 失败,表示发送的消息长度大于消息队列中消息的最大长度

发送消息时,发送者需指定发送的消息队列的对象句柄(即指向消息队列控制块的指针),并且指定发送的消息内容以及消息大小。在发送一条普通消息之后,空闲消息链表上的队首消息被转移到了消息队列尾。发送消息的函数实现在src\ipc.c中,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object.
 *           If there is a thread suspended on the messagequeue, the thread will be resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is fully used,
 *           the current thread will wait for a timeout.
 *           By contrast, when the messagequeue is fully used, the rt_mq_send_wait() function will
 *           return an error code immediately without waiting.
 *
 * @see      rt_mq_send_wait()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread context.
 */
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
{
    return rt_mq_send_wait(mq, buffer, size, 0);
}
RTM_EXPORT(rt_mq_send);

该函数实际上是rt_mq_send_wait函数的封装,rt_mq_send_wait函数也在同文件中实现,就在上边,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object. If
 *           there is a thread suspended on the messagequeue, the thread will be
 *           resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is
 *           fully used, the current thread will wait for a timeout. If reaching
 *           the timeout and there is still no space available, the sending
 *           thread will be resumed and an error code will be returned. By
 *           contrast, the rt_mq_send() function will return an error code
 *           immediately without waiting when the messagequeue if fully used.
 *
 * @see      rt_mq_send()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @param    timeout is a timeout period (unit: an OS tick).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the
 *           operation is successful. If the return value is any other values,
 *           it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread
 * context.
 */
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout)
{
    register rt_ubase_t temp;
    struct rt_mq_message *msg;
    rt_uint32_t tick_delta;
    struct rt_thread *thread;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    /* current context checking */
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);

    /* greater than one message size */
    if (size > mq->msg_size)
        return -RT_ERROR;

    /* initialize delta tick */
    tick_delta = 0;
    /* get current thread */
    thread = rt_thread_self();

    RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();

    /* get a free list, there must be an empty item */
    msg = (struct rt_mq_message *)mq->msg_queue_free;
    /* for non-blocking call */
    if (msg == RT_NULL && timeout == 0)
    {
        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        return -RT_EFULL;
    }

    /* message queue is full */
    while ((msg = (struct rt_mq_message *)mq->msg_queue_free) == RT_NULL)
    {
        /* reset error number in thread */
        thread->error = RT_EOK;

        /* no waiting, return timeout */
        if (timeout == 0)
        {
            /* enable interrupt */
            rt_hw_interrupt_enable(temp);

            return -RT_EFULL;
        }

        /* suspend current thread */
        _ipc_list_suspend(&(mq->suspend_sender_thread),
                            thread,
                            mq->parent.parent.flag);

        /* has waiting time, start thread timer */
        if (timeout > 0)
        {
            /* get the start tick of timer */
            tick_delta = rt_tick_get();

            RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",
                                        thread->name));

            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        /* re-schedule */
        rt_schedule();

        /* resume from suspend state */
        if (thread->error != RT_EOK)
        {
            /* return error */
            return thread->error;
        }

        /* disable interrupt */
        temp = rt_hw_interrupt_disable();

        /* if it's not waiting forever and then re-calculate timeout tick */
        if (timeout > 0)
        {
            tick_delta = rt_tick_get() - tick_delta;
            timeout -= tick_delta;
            if (timeout < 0)
                timeout = 0;
        }
    }

    /* move free list pointer */
    mq->msg_queue_free = msg->next;

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    /* the msg is the new tailer of list, the next shall be NULL */
    msg->next = RT_NULL;
    /* copy buffer */
    rt_memcpy(msg + 1, buffer, size);

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();
    /* link msg to message queue */
    if (mq->msg_queue_tail != RT_NULL)
    {
        /* if the tail exists, */
        ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
    }

    /* set new tail */
    mq->msg_queue_tail = msg;
    /* if the head is empty, set head */
    if (mq->msg_queue_head == RT_NULL)
        mq->msg_queue_head = msg;

    if(mq->entry < RT_MQ_ENTRY_MAX)
    {
        /* increase message entry */
        mq->entry ++;
    }
    else
    {
        rt_hw_interrupt_enable(temp); /* enable interrupt */
        return -RT_EFULL; /* value overflowed */
    }

    /* resume suspended thread */
    if (!rt_list_isempty(&mq->parent.suspend_thread))
    {
        _ipc_list_resume(&(mq->parent.suspend_thread));

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        rt_schedule();

        return RT_EOK;
    }

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    return RT_EOK;
}
RTM_EXPORT(rt_mq_send_wait)

实际上较早一些的版本如3.0.3版本中并没有单独拎出一个rt_mq_send_wait函数,只有rt_mq_send函数,所有的实现都在其中。 

(1)在发送消息时需要传递一些参数:rt_mq_t mq是已经创建的队列句柄;void *buffer是即将发送消息的存储地址;rt_size_t size是即将发送的消息的大小。

(2)检测传递进来的参数,即使这些参数中有一个是无效的,就无法发送消息。

(3)判断消息的大小,其大小不能超过创建时设置的消息队列的大小mq->msg_size。用户可以自定义大小,如果mq->msg_size不够,可以在创建时设置得大一些。

(4)获取一个空闲链表的指针。必须有一个空闲链表节点用于存放要发送的消息,如果消息队列已经满了,则无法发送消息。

(5)移动空闲链表指针。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/phmatthaus/article/details/123401823

智能推荐

Docker 快速上手学习入门教程_docker菜鸟教程-程序员宅基地

文章浏览阅读2.5w次,点赞6次,收藏50次。官方解释是,docker 容器是机器上的沙盒进程,它与主机上的所有其他进程隔离。所以容器只是操作系统中被隔离开来的一个进程,所谓的容器化,其实也只是对操作系统进行欺骗的一种语法糖。_docker菜鸟教程

电脑技巧:Windows系统原版纯净软件必备的两个网站_msdn我告诉你-程序员宅基地

文章浏览阅读5.7k次,点赞3次,收藏14次。该如何避免的,今天小编给大家推荐两个下载Windows系统官方软件的资源网站,可以杜绝软件捆绑等行为。该站提供了丰富的Windows官方技术资源,比较重要的有MSDN技术资源文档库、官方工具和资源、应用程序、开发人员工具(Visual Studio 、SQLServer等等)、系统镜像、设计人员工具等。总的来说,这两个都是非常优秀的Windows系统镜像资源站,提供了丰富的Windows系统镜像资源,并且保证了资源的纯净和安全性,有需要的朋友可以去了解一下。这个非常实用的资源网站的创建者是国内的一个网友。_msdn我告诉你

vue2封装对话框el-dialog组件_<el-dialog 封装成组件 vue2-程序员宅基地

文章浏览阅读1.2k次。vue2封装对话框el-dialog组件_

MFC 文本框换行_c++ mfc同一框内输入二行怎么换行-程序员宅基地

文章浏览阅读4.7k次,点赞5次,收藏6次。MFC 文本框换行 标签: it mfc 文本框1.将Multiline属性设置为True2.换行是使用"\r\n" (宽字符串为L"\r\n")3.如果需要编辑并且按Enter键换行,还要将 Want Return 设置为 True4.如果需要垂直滚动条的话将Vertical Scroll属性设置为True,需要水平滚动条的话将Horizontal Scroll属性设_c++ mfc同一框内输入二行怎么换行

redis-desktop-manager无法连接redis-server的解决方法_redis-server doesn't support auth command or ismis-程序员宅基地

文章浏览阅读832次。检查Linux是否是否开启所需端口,默认为6379,若未打开,将其开启:以root用户执行iptables -I INPUT -p tcp --dport 6379 -j ACCEPT如果还是未能解决,修改redis.conf,修改主机地址:bind 192.168.85.**;然后使用该配置文件,重新启动Redis服务./redis-server redis.conf..._redis-server doesn't support auth command or ismisconfigured. try

实验四 数据选择器及其应用-程序员宅基地

文章浏览阅读4.9k次。济大数电实验报告_数据选择器及其应用

随便推点

灰色预测模型matlab_MATLAB实战|基于灰色预测河南省社会消费品零售总额预测-程序员宅基地

文章浏览阅读236次。1研究内容消费在生产中占据十分重要的地位,是生产的最终目的和动力,是保持省内经济稳定快速发展的核心要素。预测河南省社会消费品零售总额,是进行宏观经济调控和消费体制改变创新的基础,是河南省内人民对美好的全面和谐社会的追求的要求,保持河南省经济稳定和可持续发展具有重要意义。本文建立灰色预测模型,利用MATLAB软件,预测出2019年~2023年河南省社会消费品零售总额预测值分别为21881...._灰色预测模型用什么软件

log4qt-程序员宅基地

文章浏览阅读1.2k次。12.4-在Qt中使用Log4Qt输出Log文件,看这一篇就足够了一、为啥要使用第三方Log库,而不用平台自带的Log库二、Log4j系列库的功能介绍与基本概念三、Log4Qt库的基本介绍四、将Log4qt组装成为一个单独模块五、使用配置文件的方式配置Log4Qt六、使用代码的方式配置Log4Qt七、在Qt工程中引入Log4Qt库模块的方法八、获取示例中的源代码一、为啥要使用第三方Log库,而不用平台自带的Log库首先要说明的是,在平时开发和调试中开发平台自带的“打印输出”已经足够了。但_log4qt

100种思维模型之全局观思维模型-67_计算机中对于全局观的-程序员宅基地

文章浏览阅读786次。全局观思维模型,一个教我们由点到线,由线到面,再由面到体,不断的放大格局去思考问题的思维模型。_计算机中对于全局观的

线程间控制之CountDownLatch和CyclicBarrier使用介绍_countdownluach于cyclicbarrier的用法-程序员宅基地

文章浏览阅读330次。一、CountDownLatch介绍CountDownLatch采用减法计算;是一个同步辅助工具类和CyclicBarrier类功能类似,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。二、CountDownLatch俩种应用场景: 场景一:所有线程在等待开始信号(startSignal.await()),主流程发出开始信号通知,既执行startSignal.countDown()方法后;所有线程才开始执行;每个线程执行完发出做完信号,既执行do..._countdownluach于cyclicbarrier的用法

自动化监控系统Prometheus&Grafana_-自动化监控系统prometheus&grafana实战-程序员宅基地

文章浏览阅读508次。Prometheus 算是一个全能型选手,原生支持容器监控,当然监控传统应用也不是吃干饭的,所以就是容器和非容器他都支持,所有的监控系统都具备这个流程,_-自动化监控系统prometheus&grafana实战

React 组件封装之 Search 搜索_react search-程序员宅基地

文章浏览阅读4.7k次。输入关键字,可以通过键盘的搜索按钮完成搜索功能。_react search