消息队列编程和案例,进程间通信 mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_rece

2024-09-01 11:05:03 浏览数 (2)

一、介绍

mq_open mq_close mq_unlink mq_setattr mq_getattr mq_send mq_receive

是 POSIX 消息队列(POSIX message queues)中用于发送和接收消息的函数。POSIX 消息队列是一种进程间通信(IPC)机制,允许进程以消息的形式交换数据。

哈哈哈哈,先了解一下函数,最后来个案例。

二、mq_open

功能:打开(如果已存在)或创建一个消息队列。

代码语言:javascript复制
#include <mqueue.h>  
#include <fcntl.h>  
#include <sys/stat.h>  

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
  • name:消息队列的名称,必须是以斜杠(/)开头的绝对路径名。
  • oflag:操作标志,可以是 O_NONBLOCK(非阻塞模式)和 O_CREAT(如果队列不存在则创建)的组合。
  • mode:如果 O_CREAT 被设置,则指定新创建队列的权限。
  • attr:指向 mq_attr 结构体的指针,用于指定队列的属性(如最大消息大小和队列容量)。如果为 NULL,则使用默认属性。

返回值:成功时返回消息队列描述符,失败时返回 (mqd_t)-1 并设置 errno。

三、mq_send

功能:用于将一条消息发送到指定的消息队列中。

代码语言:javascript复制
#include <mqueue.h>  

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

mqdes:消息队列描述符,由 mq_open 返回。 msg_ptr:指向要发送的消息的指针。 msg_len:消息的长度(以字节为单位),必须小于或等于消息队列的 mq_msgsize 属性。 msg_prio:消息的优先级,一个无符号整数,值越大优先级越高。

如果函数成功,返回 0;如果失败,返回 -1 并设置 errno 以指示错误。

四、mq_receive

mq_receive 函数用于从指定的消息队列中接收一条消息。

代码语言:javascript复制
#include <mqueue.h>  
  
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

mqdes:消息队列描述符。 msg_ptr:指向接收消息的缓冲区的指针。 msg_len:缓冲区的长度(以字节为单位),应足够大以容纳可能接收到的最大消息。 msg_prio:如果非 NULL,则用于存储接收到的消息的优先级。

如果成功,mq_receive 返回接收到的消息的实际长度(以字节为单位)。如果失败,则返回 -1 并设置 errno。

五、mq_close

功能:关闭消息队列描述符。

代码语言:javascript复制
#include <mqueue.h>  

int mq_close(mqd_t mqdes);
  • mqdes:消息队列描述符。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

六. mq_unlink

功能:删除消息队列。

代码语言:javascript复制
​
#include <mqueue.h>  

int mq_unlink(const char *name);

name:消息队列的名称。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

七、mq_setattr

功能:设置消息队列的属性。

代码语言:javascript复制
​#include <mqueue.h>  

int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

mqdes:消息队列描述符。 newattr:指向新的 mq_attr 结构体的指针。 oldattr:如果非 NULL,则用于存储旧属性的副本。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

八、 mq_getattr

功能:获取消息队列的属性。

代码语言:javascript复制
 ​#include <mqueue.h>  

int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);

mqdes:消息队列描述符。 mqstat:指向 mq_attr 结构体的指针,用于存储队列的属性。

返回值:成功时返回 0,失败时返回 -1 并设置 errno。

九、消息队列案例

接收端代码

代码语言:javascript复制
#include <iostream>
#include <fcntl.h>           // For O_* constants
#include <sys/stat.h>        // For mode constants
#include <mqueue.h>

const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;

int main() {

    int rc;
    struct mq_attr mqAttr;

    printf ("Bringing up server.n");
    rc = mq_unlink (QUEUE_NAME);
    if (rc < 0) {
        printf ("   Warning on server mq_unlink.n");
    }

    mqAttr.mq_maxmsg = 10;
    mqAttr.mq_msgsize = 1024;

    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);
    if (mq == -1) {
        perror("mq_open");
        return 1;
    }

    // 阻塞通信阶段
    std::cout << "Starting in blocking mode." << std::endl;
    char buffer[MAX_MSG_SIZE];
    ssize_t bytesRead;

    // 尝试从消息队列读取消息(阻塞)
    bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
    if (bytesRead == -1) {
        perror("mq_receive in blocking mode");
        mq_close(mq);
        return 1;
    }
    std::cout << "Received in blocking mode: " << buffer << std::endl;


    // 获取当前属性
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1) {
        perror("mq_getattr");
        mq_close(mq);
        return 1;
    }

    // 切换到非阻塞通信阶段
    std::cout << "Switching to non-blocking mode." << std::endl;
    attr.mq_flags |= O_NONBLOCK;
    if (mq_setattr(mq, &attr, nullptr) == -1) {
        perror("mq_setattr");
        mq_close(mq);
        return 1;
    }

    // 尝试从消息队列读取消息(非阻塞)
    bytesRead = mq_receive(mq, buffer, MAX_MSG_SIZE, nullptr);
    if (bytesRead == -1) {
        if (errno == EAGAIN) {
            std::cout << "No message available in the queue in non-blocking mode." << std::endl;
        } else {
            perror("mq_receive in non-blocking mode");
        }
    } else {
        std::cout << "Received message in non-blocking mode: " << buffer << std::endl;
    }


    // 开始销毁消息队列
    std::cout << "开始销毁消息队列" << std::endl;

    // 关闭消息队列
    mq_close(mq);

    // 删除消息队列(可选)
    mq_unlink(QUEUE_NAME);

    return 0;
}

发送端代码

代码语言:javascript复制
 #include <iostream>
#include <fcntl.h>           // For O_* constants
#include <sys/stat.h>        // For mode constants
#include <mqueue.h>
#include <string.h>

const char* QUEUE_NAME = "/my_message_queue";
const int MAX_MSG_SIZE = 1024;

int main() {
    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);
    if (mq == -1) {
        perror("mq_open");
        return 1;
    }

    // 阻塞通信阶段
    std::cout << "Starting in blocking mode." << std::endl;
    char buffer[MAX_MSG_SIZE];
    ssize_t bytesRead;


    // 发送消息到消息队列(阻塞)
    const char* messageToSend = "Hello from sender in blocking mode!";
    if (mq_send(mq, messageToSend, strlen(messageToSend), 0) == -1) {
        perror("mq_send in blocking mode");
        mq_close(mq);
        return 1;
    }

    // 获取当前属性
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1) {
        perror("mq_getattr");
        mq_close(mq);
        return 1;
    }

    // 切换到非阻塞通信阶段
    std::cout << "Switching to non-blocking mode." << std::endl;
    attr.mq_flags |= O_NONBLOCK;
    if (mq_setattr(mq, &attr, nullptr) == -1) {
        perror("mq_setattr");
        mq_close(mq);
        return 1;
    }


    // 发送消息到消息队列(非阻塞)
    const char* anotherMessageToSend = "Hello again in non-blocking mode!";
    if (mq_send(mq, anotherMessageToSend, strlen(anotherMessageToSend), 0) == -1) {
        if (errno == EAGAIN) {
            std::cout << "Queue is full. Cannot send message in non-blocking mode." << std::endl;
        } else {
            perror("mq_send in non-blocking mode");
        }
    }

    // 开始销毁消息队列
    std::cout << "开始销毁消息队列" << std::endl;

    // 关闭消息队列
    mq_close(mq);

    // 删除消息队列(可选)
    mq_unlink(QUEUE_NAME);

    return 0;
}

要运行这段代码实现进程间通信,可以按照以下步骤进行:

  1. 编译代码: 使用 C 编译器(如 g )编译代码。
代码语言:javascript复制
 g   mqsend.cpp -o mqsend g   mqrecv.cpp -o mqrecv

2. 运行程序: 打开两个终端窗口,分别代表两个不同的进程。 在一个终端中运行编译后的程序:

代码语言:javascript复制
 ./mqrecv

3. 在另一个终端中,稍等片刻后再次运行编译后的程序mqsend。这样两个进程就会尝试通过消息队列进行通信。

代码语言:javascript复制
 ./mqsend

十、Message too long 和 Invalid argument错误处理

10.1 Message too long

mq_recv出现如下错误 Message too long

添加图片注释,不超过 140 字(可选)

怎么解决的呢,大家对比一下前面 发送端案例 和接收端案例 时的 mq_open 。出现错误时, 发送端案例 和接收端案例 mq_open 设置都是如下。

代码语言:javascript复制
​int main() {
    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, nullptr);

接收端案例 mq_open设置修改为如下后,就可以跑了。据说是mq_maxmsg和mq_msgsize的设置导致的。

代码语言:javascript复制
int main() {

    in rc;
    struct mq_attr mqAttr;

    printf ("Bringing up server.n");
    rc = mq_unlink (QUEUE_NAME);
    if (rc < 0) {
        printf ("   Warning on server mq_unlink.n");
    }

    mqAttr.mq_maxmsg = 10;
    mqAttr.mq_msgsize = 1024;

    // 创建消息队列
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, S_IWUSR|S_IRUSR, &mqAttr);

10.2 Invalid argument

当我把mqAttr.mq_maxmsg = 10;改为mqAttr.mq_maxmsg = 100;就报下面的错

添加图片注释,不超过 140 字(可选)

改回去就好。

mq_maxmsg限定消息队列中的最大消息数,mq_msgsize限定每个消息的最大字节数。

10.3 数据不全和数据乱码

消息队列数据传输时,出现了数据不全或数据乱码的情况。

添加图片注释,不超过 140 字(可选)

添加图片注释,不超过 140 字(可选)

将代码中的sizeof修改为strlen就解决了。sizeof和strlen的区别参考这

获取char*字符串指针指向的数组长度时,记得用strlen,而不是sizeof-CSDN博客

十一、结果

当然在一个终端上可以收到另一个终端的消息啦

添加图片注释,不超过 140 字(可选)

queue - mq_receive:消息太长 - Stack Overflow

0 人点赞