Linux进程间通信 消息队列

2021-07-30 15:38:22 浏览数 (1)

消息队列 是消息的链接表,存储内核中,由消息标识符标识。 --《UNIX环境高级编程》

简单理解,消息队列就是一堆消息的有序集合,并缓存于内核中。如此一来,多个进程就可通过访问内核来实现多个进程之间的通信。目前存在的消息队列有POSIX与System V标准的接口,本篇主要介绍System V接口的使用。

简介

消息队列的本质是位于内核空间的链表,其中每个节点都是一个独立的消息,每个消息都有类型,相同类型的消息组成一个链表。

当各种各样的消息发出时,就如同下图所示排列在内核空间中。形状看成消息的类型,相同的形状则表示相同的消息类型。

这些看似杂乱无章的消息,通过消息队列发出来后,根据其发送的类型与发送的时间,在接收端中则是有规律的排序。

如上图,内核中杂乱无章的消息,接收端可通过消息类型与发送的顺序来逐一接收处理。可通过消息类型查看指定类型的消息,若指定类型为0,则按时间顺序输出所有接收到的消息。

接口

主要用到msgget、msgsnd、msgrcv和msgctl四个接口。其使用方式man手册说明的比较清晰了,这里简单描述一下函数形式及功能。

msgget

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

主要功能是根据key值获取一个消息队列的ID。msgflag主要有两个值IPC_CREAT 和IPC_EXC,指的是需要新创建消息队列ID。

msgsnd、msgrcv

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
               int msgflg);

msgsnd与msgrcv主要用于消息队列的发送与接收。这里需要注意的是发送的msgp一般定义为结构体,首个成员为long型,表示消息的类型。如此msgrcv通过指定msgtype来筛选出需要的消息。

msgctl

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msgctl是用来控制消息队列的,其中cmd指进行的操作,buf记录了消息队列的信息。cmd:

  • IPC_STAT: 将msg相关的内核信息存储到buf指向的msqid_ds 结构体中。调用者需拥有阅读权限才可读取。
  • IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;可设置属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。
  • IPC_RMID:删除msqid标识的消息队列

buf:

代码语言:javascript复制
struct msqid_ds {
    struct ipc_perm msg_perm;     /* Ownership and permissions */
 time_t          msg_stime;    /* Time of last msgsnd(2) */
 time_t          msg_rtime;    /* Time of last msgrcv(2) */
 time_t          msg_ctime;    /* Time of last change */
 unsigned long   __msg_cbytes; /* Current number of bytes in
         queue (nonstandard) */
 msgqnum_t       msg_qnum;     /* Current number of messages
         in queue */
 msglen_t        msg_qbytes;   /* Maximum number of bytes
         allowed in queue */
 pid_t           msg_lspid;    /* PID of last msgsnd(2) */
 pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

如上信息可看到buf中存储了与消息队列相关的属性,设置cmd后,可通过buf拿到这些信息。

实例演示

功能: 用消息队列实现server接client的数据,server可筛选显示指定消息类型的数据。

效果:server接收所有消息:

server 筛选消息类型为2的数据:

注:代码里可将消息类型封装成枚举,此demo作为演示不做过多封装。

总结

消息队列在进程间通信的优势总结起来有以下几点:

  • 缓存:数据较大的消息处理起来时间较长,此时将其写入消息队列更快,待系统空闲时再处理。提高系统任务执行效率。
  • 送达:消息队列存储的消息,会一直保留在队列中直到消息被处理,且被取走后就会被队列释放。因此无论多少个进程在获取,每个消息仅会被处理一次。
  • 排序:消息在队列中一直按照“先入先出”的顺序来执行。因此任务被处理的时序不会错乱。
  • 异步:消息队列因为会缓存消息,且顺序处理不会丢失。因此多个进程可通过消息队列实现异步通信,互不阻塞。

代码

client.cpp

代码语言:javascript复制
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : client.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-19 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <errno.h>
#include <string.h>
#include "common.h"

const char TEXT[2][50] = {"this is client1!", "this is client2!"};

int main(int argc, char *argv[])
{
    int msg_id, key, ret = 0;
    struct MsgFrame msg_buf = {0, {0}};

    if (argc < 2) {
        PRINT_ERR("usage: %s [msgid]n", argv[0]);
        goto exit;
    }

    if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
        PRINT_ERR("Params invalid!n");
        goto exit;
    }

    /* Obtain the standard key according to the file path */
    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0) {
        PRINT_ERR("ftok failed! errno = %d(%s)n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_EXCL);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(&msg_buf, 0x00, sizeof(msg_buf));
#if 0
        if (fgets(msg_buf.buffer, sizeof(msg_buf.buffer), stdin) == NULL) {
            PRINT_ERR("scanf failed! errno = %d(%s)n", errno, strerror(errno));
            goto exit_msgid;
        }
#else
        msg_buf.type = atoi(argv[1]);
        strncpy(msg_buf.buffer, TEXT[msg_buf.type - 1], strlen(TEXT[msg_buf.type-1]));
#endif
        ret = msgsnd(msg_id, &msg_buf, sizeof(msg_buf.buffer), IPC_NOWAIT);
        if (ret < 0) {
            PRINT_ERR("msgnd failed! errno = %d(%s)n", errno, strerror(errno));
            goto exit_msgid;
        } else {
            PRINT_INFO("[Send %ld %ld] %sn", msg_buf.type,
                                        strlen(msg_buf.buffer), msg_buf.buffer);
        }
        sleep(1);
    } while (strncmp(msg_buf.buffer, "end", msg_buf.type) != 0);

exit_msgid:
    ret = msgctl(msg_id, IPC_RMID, 0);
    if (ret < 0) {
        PRINT_ERR("msgctl failed! errno = %d(%s)n", errno, strerror(errno));
    }
exit:
    return 0;
}

server.cpp

代码语言:javascript复制
/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name   : server.cpp
* Author      :
* Version     : V1.0
* Description :
* Journal     : 2021-03-21 init v1.0
* Brief       : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others      :
********************************************************************************
*/

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include "common.h"

static int msg_id;

static void SignalHandler(int sig)
{
    switch (sig)
    {
      case SIGSTOP:
        if (msg_id != 0) {
            msgctl(msg_id, IPC_RMID, 0);
        }
        break;
      default:
        break;
    }
}

int main(int argc, char *argv[])
{
    int key, ret = 0;
    long msg_type = 0;
    struct MsgFrame msg_recv = {0, {0}};

    signal(SIGSTOP, SignalHandler);

    if (argc == 1) {
       msg_type = 0;
    } else if (argc == 2) {
        if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
            PRINT_ERR("Params invalid!n");
            goto exit;
        } else {
            msg_type = atoi(argv[1]);
            //PRINT_INFO("Receive msg type is %ldn", msg_type);
        }
    } else {
        PRINT_ERR("Params invalidn");
        goto exit;
    }

    key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
    if (key < 0)
    {
        PRINT_ERR("ftok failed! errno = %d(%s)n", errno, strerror(errno));
        goto exit;
    }

    msg_id = msgget(key, IPC_CREAT|0666);
    if (msg_id < 0) {
        PRINT_ERR("msgget failed! errno = %d(%s)n", errno, strerror(errno));
        goto exit;
    }

    do {
        memset(msg_recv.buffer, 0x00, MAX_SIZE);
        ret = msgrcv(msg_id,(void*)&msg_recv, MAX_SIZE, msg_type, 0);
        if (ret != -1) {
            PRINT_INFO("[Receive %ld %ld] %sn",
                       msg_recv.type, strlen(msg_recv.buffer), msg_recv.buffer);
        } else {
            PRINT_ERR("msgrcv failed!n");
        }
    } while (1);

exit:
    PRINT_INFO("Exit from %sn", __func__);
    return 0;
}

最后

用心感悟,认真记录,写好每一篇文章,分享每一框干货。愿每一篇文章不负自己,不负看客!

0 人点赞