Linux内核编程--消息队列

2022-05-09 21:43:08 浏览数 (1)

一,关于Linux中的IPC

IPC的意思是“ 进程间通信机制”,Linux内核有三种常用IPC对象可以拿来做进程间通信--消息队列,共享内存,信号量。这三种IPC对象在Linux内核中都以链表的形式存储,它们都有特定的ID来标识(消息队列标识符msqid、共享内存标识符shmid,信号量标识符semid)。

可以用ipcs指令查看当前的ipc对象的状态:

ipcs指令的常见用法:

代码语言:javascript复制
ipcs -q  //只查看消息队列
ipcs -s  //只查看信号量
ipcs -m  //只查看共享内存
ipcrm -q/s/m ipc_id //删除ID对应的IPC
ipcs -l       //查看ipc的最大限制参数

每个 IPC 对象都对应一个ipc_perm结构体, ipc_perm结构体中保留了该ipc对象对应的一些权限信息:

代码语言:javascript复制
struct ipc_perm {
  key_t __key;                 /* Key supplied to semget(2) */
  uid_t uid;                   /* Effective UID of owner */
  gid_t gid;                   /* Effective GID of owner */
  uid_t cuid;                  /* Effective UID of creator */
  gid_t cgid;                  /* Effective GID of creator */
  unsignedshort mode;          /* Read/write permission.*/
  unsignedshort __seq;         /* Sequence number */
};

参数uid/gid/mode是可以由函数msgctl()/semctl()/shmctl()来修改的。

给IPC生成一个ID标识,可以借助ftok()函数

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);

--pathname:ipc用到的文件的文件名,且文件必须存在
--proj_id:生成IPC_ID时使用的子序号

返回:若成功,返回key_t类型的ID值。若失败,返回-1。

代码样例:

代码语言:javascript复制
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

int main()
{
  key_t ipc_key;
  ipc_key = ftok("/dev/null", 666);
  printf ("ipc_key for process = %dn", ipc_key);
  return 0;
}

运行结果:

代码语言:javascript复制
ipc_key for process = 84214791

二,消息队列

概念:

消息队列是一种存放消息体的链表,提供了一个进程向另一个进程发送数据块的方式,这个数据块的类型可以指定。

示意图:

场景一,一个进程把消息体写入消息队列,另一个进程从消息队列读取。

场景二,一个进程把不同类型的消息体写入消息队列,多个进程按指定的类型读取不同的消息体。

消息队列与其他进程通信机制的比较:

与信号量相比,消息队列可以承载更多的通信数据。

与管道的默认接收相比,消息队列可以让接收进程有选择地接收通信数据,还可以设置接收的优先级。当使用消息队列的进程终止时,消息队列不会自动删除。但所有引用管道的进程终止时,管道会自动删除。

与共享内存相比,共享内存的速度更快,因为对共享内存的处理不经过内核调用,而消息队列需要经过内核调用。但是在多核系统上,为了避免产生高速缓存一致性问题,更推荐使用消息队列。

消息队列特点:

(1)消息队列可认为是全局的一个链表,由消息队列标识符进行标识。

(2)消息队列允许一个或多个进程写入或读取消息。

(3)消息队列的声明周期随内核。

(4)消息队列可以实现双向通信。

消息队列使用场景:

1.将业务拆解分离,拆分后的各个业务块儿可以用消息队列进行数据传输。

2.广播模式开发,主节点发布消息,从节点进行消息的订阅和获取。

3.作为缓冲区,将多次请求存起来然后统一处理,避免一个服务被多次请求,从而造成性能上的额外开销。

与消息队列关联的结构体--msqid_ds/mq_attr

msqid_ds常用于SYSTEM_V版的函数

代码语言:javascript复制
struct msqid_ds{
    struct ipc_perm msg_perm;
    time_t msg_stime;               //发送到队列的最后一个消息的时间戳
    time_t msg_rtime;               //从队列中获取的最后一个消息的时间戳
    time_t msg_ctime;               //对队列进行最后一次变动的时间戳
    unsigned long __msg_cbytes;     //在队列上所驻留的字节总数
    msgqnum_t msg_qnum;             //当前队列中消息的数量
    msglen_t msg_qbytes;            //当前队列允许的最大bytes数
    pid_t msg_lspid;                //发送最后一个消息的进程PID
    pid_t msg_lrpid;                //接收最后一个消息的进程PID
};

mq_attr常用于POSIX版的函数

代码语言:javascript复制
struct mq_attr{
    long mq_flags;       //常见取值为0/O_NONBLOCK,0表示阻塞模式,O_NONBLOCK表示非阻塞模式
    long mq_maxmsg;      //消息队列的最大消息数
    long mq_msgsize;     //每个消息最大的字节数
    long mq_curmsgs;     //消息队列的当前消息个数
}

消息队列常用函数(SYSTEM_V版和POSIX版)

1.SYSTEM_V 版

msgget/msgsnd/msgrcv/msgctl

msgget:打开或创建一个消息队列

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

--key: 用来指定返回的消息队列ID
--msgflg:创建时的标志位,例如 IPC_CREAT

返回:若成功,返回消息队列的ID,若失败,返回-1。

msgget()返回消息队列ID后,后面的发生/接收操作都基于这个ID来进行。

key参数可以是IPC_PRIVATE(由系统来指定消息队列ID),或由ftok()创建的IPC_ID,或者可以手工指定。

使用msgget()打开一个现有的队列,msgflag参数指定为0

msgflag参数与操作文件时的传参类似,可以传 IPC_CREAT。msgflag为"IPC_CREAT | IPC_EXCL"时,如果消息队列不存在则新建一个,如果消息队列存在,则报错。

msgsnd:将新消息添加到队列末尾

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg);

--msgid:消息队列ID
--msgp:指向消息结构体的指针
--msgsz:要发送消息的长度
--msgflg:创建时的标志位

返回:若成功,返回0,若失败,返回-1。

msgp参数,指向一个包含消息的结构体,这个结构体除了第一个字段,其他字段都可以由开发者自定义。消息结构体的第一个字段必须是long int类型,接收函数用这个字段来确定消息的类型。

代码语言:javascript复制
struct msgbuf {
    long mtype;       /* message type, must be > 0 */
    char mtext[1];    /* message data */
};

msgsz参数表示除mtype之外msgbuf的大小,一般指mtext的长度。

msgsnd()会将msgp指向的消息结构体复制一份出来,追加到msgid指定的消息队列中。如果消息队列没有被塞满,msgsnd()会立即返回,如果消息队列已满,则函数会阻塞直到有空间可用。也可以在参数msgflg中加入IPC_NOWAIT,让msgsnd()从阻塞模式变为异步模式,如果消息队列已满,则报错返回。

msgrcv:从队列中取出消息

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

--msgid:消息队列ID
--msgp:指向消息结构体的指针,读到的消息存储于此
--msgsz:要接收消息的长度
--msgtyp:接收消息的方式
--msgflg:创建时的标志位

返回:若成功,返回实际读取的字节数,若失败,返回-1。

msgrcv会将读到的消息从指定队列中删除,并将其内容填到*msgp指定的缓存地址中。

msgctl:控制消息队列 & 获得或设置消息队列属性

代码语言:javascript复制
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf);

--msgid:消息队列ID
--cmd:对消息队列的操作命令,例如删除命令IPC_RMID 
--buf:指向msqid_ds结构体的指针

返回:若成功,返回cmd执行后的结果,若失败,返回-1。

msgctl()常用的cmd:

代码语言:javascript复制
IPC_STAT: 获取该消息队列的信息,获取到的信息会储存在结构体msqid_ds类型的buf中
IPC_SET: 设置消息队列的属性,要设置的属性需先在结构体msqid_ds类型的buf中存储,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode、msg_qbytes等。
IPC_RMID:删除消息队列,这个操作可以解除消息队列造成的进程阻塞
IPC_INFO:获得系统对消息队列做的限制

* 消息队列不会在程序退出后自动删除,需要在程序中使用msgctl()进行删除(cmd=IPC_RMID),或者使用命令ipcs -q查看,使用ipcrm -q <id>删除。

2.Posix版

mq_open/mq_close/mq_unlink/mq_send/mq_receive

mq_open:打开或创建一个消息队列

代码语言:javascript复制
#include <mqueue.h>
typedef int mqd_t;
mqd_t mq_open(const char *name, int oflag); 
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

--name:消息队列名字
--oflag:访问权限位
--mode:用户相关的权限位,例如S_IRUSR,S_IWUSR等,如果oflag包含O_CREAT, 才需要指定mode参数;
--attr:指定新队列的属性,若attr为NULL,则使用默认属性

返回:若成功,返回消息队列描述符mqdes。若失败,返回-1。

oflag参数常见取值:

代码语言:javascript复制
O_RDONLY--以只接收消息的形式打开消息队列
O_WRONLY--以只发送消息的形式打开消息队列
O_RDWR--以可接收可发送的形式打开消息队列
还有O_CREAT, O_EXCL, O_NONBLOCK

mq_close:关闭消息队列

代码语言:javascript复制
#include <mqueue.h>
int mq_close(mqd_t mqdes);

--mqdes:要关闭消息队列的描述符

返回:若成功,返回0。若失败,返回-1。

mq_unlink:销毁消息队列

代码语言:javascript复制
#include <mqueue.h>
int mq_unlink(const char *name);

--name:消息队列名字

返回:若成功,返回0。若失败,返回-1。

mq_send:向队列发送消息

代码语言:javascript复制
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

--msg_ptr: 指向需要发送的消息的指针
--msg_len: 消息长度
--msg_prio: 消息的优先级

返回:若成功,返回0。若失败,返回-1。

mq_receive:从队列接收消息

代码语言:javascript复制
#include <mqueue.h>
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

--msg_ptr: 指向接收到的消息的指针
--msg_len: 消息长度
--msg_prio: 消息的优先级

返回:若成功,返回接收到消息的字节数。若失败,返回-1。

如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间或该调用被信号打断。如果消息队列为空, mq_receive()函数将阻塞,直到队列有新的消息被放进来。如果创建消息队列时,oflag传入了O_NONBLOCK,则两个函数不会阻塞,而是会立马报错返回。

mq_notify:为空队列中的消息建立异步通知机制

代码语言:javascript复制
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

返回:若成功,返回0。若失败,返回-1。

一次mq_notify注册只会触发一次异步事件,此后再遇到阻塞不会触发异步通知。

mq_getattr/mq_setattr:获取/设置消息队列的属性

代码语言:javascript复制
#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

返回:若成功,返回0。若失败,返回-1。

代码样例:

Demo1:

producer.c

代码语言:javascript复制
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/msg.h>
#include<sys/ipc.h>
struct mymesg{
  long int mtype;
  char mtext[512];
};
int main()
{
  int id = 0;
  struct mymesg test_msg;
  key_t key = ftok("/tmp",66);
  id = msgget(key,IPC_CREAT | 0666);
  if(id == -1)
  {
    printf("create msg error n");
    return 0;
  }
  while(1)
  {
    char msg[512];
    memset(msg,0,sizeof(msg));
    test_msg.mtype = 1;
    printf("input message:");
    fgets(msg,sizeof(msg),stdin);
    strcpy(test_msg.mtext,msg);
    if(msgsnd(id,(void *)&test_msg,512,0) < 0)
    {
      printf("send msg error n");
      return 0;
    }
    if(strncmp(msg,"QUIT",4) == 0)
      break;
  }
  if(msgctl(id,IPC_RMID,NULL) < 0)
  {
    printf("del msg error n");
    return 0;
  }
  return 0;
}

consumer.c

代码语言:javascript复制
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/msg.h>
#include<sys/ipc.h>
struct mymesg{
  long int mtype;
  char mtext[512];
};
int main()
{
  int id = 0;
  struct mymesg msg_test;
  key_t key = ftok("/tmp",66);
  id = msgget(key,0666|IPC_CREAT);
  if(id == -1)
  {
    printf("open msg error n");
    return 0;
  }
  while(1)
  {
    if(msgrcv(id,(void *)&msg_test,512,1,0) < 0)
    {
      printf("receive msg error n");
      return 0;
    }
    printf("data:%sn",msg_test.mtext);
    if(strncmp(msg_test.mtext,"QUIT",4) ==0)
      break;
  }
  return 0;
}

Demo2:

代码语言:javascript复制
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
    mqd_t mqID;
    mqID = mq_open("/testmQueue", O_RDWR | O_CREAT, 0666, NULL);
    if (mqID < 0)
    {
        cout<<"open message queue error..."<<strerror(errno)<<endl;
        return -1;
    }
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        cout<<"get the message queue attribute error"<<endl;
        return -1;
    }
    cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
    cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
    cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
    cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
}

运行结果:

代码语言:javascript复制
mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0

Demo3:

代码语言:javascript复制
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
    mqd_t mqID;
    mqID = mq_open("/mq_test", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);
    if (mqID < 0)
    {
        if (errno == EEXIST)
        {
            mq_unlink("/mq_test");
            mqID = mq_open("/mq_test", O_RDWR | O_CREAT, 0666, NULL);
        }
        else
        {
            cout<<"open message queue error..."<<strerror(errno)<<endl;
            return -1;
        }
    }
    if (fork() == 0)
    {
        mq_attr mqAttr;
        mq_getattr(mqID, &mqAttr);
        char *buf = new char[mqAttr.mq_msgsize];
        for (int i = 1; i <= 3;   i)
        {
            if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)
            {
                cout<<"receive message  failed. ";
                cout<<"error info:"<<strerror(errno)<<endl;
                continue;
            }
            cout<<"receive message "<<i<<": "<<buf<<endl;   
        }
        return 0;
    }
    char msg[] = "Hello, Message Queue.";
    for (int i = 1; i <= 3;   i)
    {
        if (mq_send(mqID, msg, sizeof(msg), i) < 0)
        {
            cout<<"send message "<<i<<" failed. ";
            cout<<"error info:"<<strerror(errno)<<endl;
        }
        cout<<"send message "<<i<<" success. "<<endl;   
        sleep(1);
    }

    return 0;
}

运行结果

代码语言:javascript复制
send message 1 success.
receive message 1: Hello, Message Queue.
send message 2 success.
receive message 2: Hello, Message Queue.
send message 3 success.
receive message 3: Hello, Message Queue.

参考教程:

《UNIX环境高级编程第3版》

https://programs.team/linux-message-queue-programming.html

https://www.tutorialspoint.com/inter_process_communication/

https://programmer.ink/think/linux-message-queue-for-interprocess-communication.html

0 人点赞