一,关于Linux中的IPC
IPC的意思是“ 进程间通信机制”,Linux内核有三种常用IPC对象可以拿来做进程间通信--消息队列,共享内存,信号量。这三种IPC对象在Linux内核中都以链表的形式存储,它们都有特定的ID来标识(消息队列标识符msqid、共享内存标识符shmid,信号量标识符semid)。
可以用ipcs指令查看当前的ipc对象的状态:
ipcs指令的常见用法:
代码语言:javascript复制ipcs -q //只查看消息队列
ipcs -s //只查看信号量
ipcs -m //只查看共享内存
ipcrm -q/s/m ipc_id //删除ID对应的IPC
ipcs -l //查看ipc的最大限制参数
每个 IPC 对象都对应一个ipc_perm结构体, ipc_perm结构体中保留了该ipc对象对应的一些权限信息:
代码语言:javascript复制struct ipc_perm {
key_t __key; /* Key supplied to semget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsignedshort mode; /* Read/write permission.*/
unsignedshort __seq; /* Sequence number */
};
参数uid/gid/mode是可以由函数msgctl()/semctl()/shmctl()来修改的。
给IPC生成一个ID标识,可以借助ftok()函数
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
--pathname:ipc用到的文件的文件名,且文件必须存在
--proj_id:生成IPC_ID时使用的子序号
返回:若成功,返回key_t类型的ID值。若失败,返回-1。
代码样例:
代码语言:javascript复制#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
int main()
{
key_t ipc_key;
ipc_key = ftok("/dev/null", 666);
printf ("ipc_key for process = %dn", ipc_key);
return 0;
}
运行结果:
代码语言:javascript复制ipc_key for process = 84214791
二,消息队列
概念:
消息队列是一种存放消息体的链表,提供了一个进程向另一个进程发送数据块的方式,这个数据块的类型可以指定。
示意图:
场景一,一个进程把消息体写入消息队列,另一个进程从消息队列读取。
场景二,一个进程把不同类型的消息体写入消息队列,多个进程按指定的类型读取不同的消息体。
消息队列与其他进程通信机制的比较:
与信号量相比,消息队列可以承载更多的通信数据。
与管道的默认接收相比,消息队列可以让接收进程有选择地接收通信数据,还可以设置接收的优先级。当使用消息队列的进程终止时,消息队列不会自动删除。但所有引用管道的进程终止时,管道会自动删除。
与共享内存相比,共享内存的速度更快,因为对共享内存的处理不经过内核调用,而消息队列需要经过内核调用。但是在多核系统上,为了避免产生高速缓存一致性问题,更推荐使用消息队列。
消息队列特点:
(1)消息队列可认为是全局的一个链表,由消息队列标识符进行标识。
(2)消息队列允许一个或多个进程写入或读取消息。
(3)消息队列的声明周期随内核。
(4)消息队列可以实现双向通信。
消息队列使用场景:
1.将业务拆解分离,拆分后的各个业务块儿可以用消息队列进行数据传输。
2.广播模式开发,主节点发布消息,从节点进行消息的订阅和获取。
3.作为缓冲区,将多次请求存起来然后统一处理,避免一个服务被多次请求,从而造成性能上的额外开销。
与消息队列关联的结构体--msqid_ds/mq_attr
msqid_ds常用于SYSTEM_V版的函数
代码语言:javascript复制struct msqid_ds{
struct ipc_perm msg_perm;
time_t msg_stime; //发送到队列的最后一个消息的时间戳
time_t msg_rtime; //从队列中获取的最后一个消息的时间戳
time_t msg_ctime; //对队列进行最后一次变动的时间戳
unsigned long __msg_cbytes; //在队列上所驻留的字节总数
msgqnum_t msg_qnum; //当前队列中消息的数量
msglen_t msg_qbytes; //当前队列允许的最大bytes数
pid_t msg_lspid; //发送最后一个消息的进程PID
pid_t msg_lrpid; //接收最后一个消息的进程PID
};
mq_attr常用于POSIX版的函数
代码语言:javascript复制struct mq_attr{
long mq_flags; //常见取值为0/O_NONBLOCK,0表示阻塞模式,O_NONBLOCK表示非阻塞模式
long mq_maxmsg; //消息队列的最大消息数
long mq_msgsize; //每个消息最大的字节数
long mq_curmsgs; //消息队列的当前消息个数
}
消息队列常用函数(SYSTEM_V版和POSIX版)
1.SYSTEM_V 版
msgget/msgsnd/msgrcv/msgctl
msgget:打开或创建一个消息队列
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
--key: 用来指定返回的消息队列ID
--msgflg:创建时的标志位,例如 IPC_CREAT
返回:若成功,返回消息队列的ID,若失败,返回-1。
msgget()返回消息队列ID后,后面的发生/接收操作都基于这个ID来进行。
key参数可以是IPC_PRIVATE(由系统来指定消息队列ID),或由ftok()创建的IPC_ID,或者可以手工指定。
使用msgget()打开一个现有的队列,msgflag参数指定为0
msgflag参数与操作文件时的传参类似,可以传 IPC_CREAT。msgflag为"IPC_CREAT | IPC_EXCL"时,如果消息队列不存在则新建一个,如果消息队列存在,则报错。
msgsnd:将新消息添加到队列末尾
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg);
--msgid:消息队列ID
--msgp:指向消息结构体的指针
--msgsz:要发送消息的长度
--msgflg:创建时的标志位
返回:若成功,返回0,若失败,返回-1。
msgp参数,指向一个包含消息的结构体,这个结构体除了第一个字段,其他字段都可以由开发者自定义。消息结构体的第一个字段必须是long int类型,接收函数用这个字段来确定消息的类型。
代码语言:javascript复制struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
msgsz参数表示除mtype之外msgbuf的大小,一般指mtext的长度。
msgsnd()会将msgp指向的消息结构体复制一份出来,追加到msgid指定的消息队列中。如果消息队列没有被塞满,msgsnd()会立即返回,如果消息队列已满,则函数会阻塞直到有空间可用。也可以在参数msgflg中加入IPC_NOWAIT,让msgsnd()从阻塞模式变为异步模式,如果消息队列已满,则报错返回。
msgrcv:从队列中取出消息
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
--msgid:消息队列ID
--msgp:指向消息结构体的指针,读到的消息存储于此
--msgsz:要接收消息的长度
--msgtyp:接收消息的方式
--msgflg:创建时的标志位
返回:若成功,返回实际读取的字节数,若失败,返回-1。
msgrcv会将读到的消息从指定队列中删除,并将其内容填到*msgp指定的缓存地址中。
msgctl:控制消息队列 & 获得或设置消息队列属性
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msgid, int cmd, struct msqid_ds *buf);
--msgid:消息队列ID
--cmd:对消息队列的操作命令,例如删除命令IPC_RMID
--buf:指向msqid_ds结构体的指针
返回:若成功,返回cmd执行后的结果,若失败,返回-1。
msgctl()常用的cmd:
代码语言:javascript复制IPC_STAT: 获取该消息队列的信息,获取到的信息会储存在结构体msqid_ds类型的buf中
IPC_SET: 设置消息队列的属性,要设置的属性需先在结构体msqid_ds类型的buf中存储,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode、msg_qbytes等。
IPC_RMID:删除消息队列,这个操作可以解除消息队列造成的进程阻塞
IPC_INFO:获得系统对消息队列做的限制
* 消息队列不会在程序退出后自动删除,需要在程序中使用msgctl()进行删除(cmd=IPC_RMID),或者使用命令ipcs -q查看,使用ipcrm -q <id>删除。
2.Posix版
mq_open/mq_close/mq_unlink/mq_send/mq_receive
mq_open:打开或创建一个消息队列
代码语言:javascript复制#include <mqueue.h>
typedef int mqd_t;
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
--name:消息队列名字
--oflag:访问权限位
--mode:用户相关的权限位,例如S_IRUSR,S_IWUSR等,如果oflag包含O_CREAT, 才需要指定mode参数;
--attr:指定新队列的属性,若attr为NULL,则使用默认属性
返回:若成功,返回消息队列描述符mqdes。若失败,返回-1。
oflag参数常见取值:
代码语言:javascript复制O_RDONLY--以只接收消息的形式打开消息队列
O_WRONLY--以只发送消息的形式打开消息队列
O_RDWR--以可接收可发送的形式打开消息队列
还有O_CREAT, O_EXCL, O_NONBLOCK
mq_close:关闭消息队列
代码语言:javascript复制#include <mqueue.h>
int mq_close(mqd_t mqdes);
--mqdes:要关闭消息队列的描述符
返回:若成功,返回0。若失败,返回-1。
mq_unlink:销毁消息队列
代码语言:javascript复制#include <mqueue.h>
int mq_unlink(const char *name);
--name:消息队列名字
返回:若成功,返回0。若失败,返回-1。
mq_send:向队列发送消息
代码语言:javascript复制#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
--msg_ptr: 指向需要发送的消息的指针
--msg_len: 消息长度
--msg_prio: 消息的优先级
返回:若成功,返回0。若失败,返回-1。
mq_receive:从队列接收消息
代码语言:javascript复制#include <mqueue.h>
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
--msg_ptr: 指向接收到的消息的指针
--msg_len: 消息长度
--msg_prio: 消息的优先级
返回:若成功,返回接收到消息的字节数。若失败,返回-1。
如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间或该调用被信号打断。如果消息队列为空, mq_receive()函数将阻塞,直到队列有新的消息被放进来。如果创建消息队列时,oflag传入了O_NONBLOCK,则两个函数不会阻塞,而是会立马报错返回。
mq_notify:为空队列中的消息建立异步通知机制
代码语言:javascript复制#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
返回:若成功,返回0。若失败,返回-1。
一次mq_notify注册只会触发一次异步事件,此后再遇到阻塞不会触发异步通知。
mq_getattr/mq_setattr:获取/设置消息队列的属性
代码语言:javascript复制#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
返回:若成功,返回0。若失败,返回-1。
代码样例:
Demo1:
producer.c
代码语言:javascript复制#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/msg.h>
#include<sys/ipc.h>
struct mymesg{
long int mtype;
char mtext[512];
};
int main()
{
int id = 0;
struct mymesg test_msg;
key_t key = ftok("/tmp",66);
id = msgget(key,IPC_CREAT | 0666);
if(id == -1)
{
printf("create msg error n");
return 0;
}
while(1)
{
char msg[512];
memset(msg,0,sizeof(msg));
test_msg.mtype = 1;
printf("input message:");
fgets(msg,sizeof(msg),stdin);
strcpy(test_msg.mtext,msg);
if(msgsnd(id,(void *)&test_msg,512,0) < 0)
{
printf("send msg error n");
return 0;
}
if(strncmp(msg,"QUIT",4) == 0)
break;
}
if(msgctl(id,IPC_RMID,NULL) < 0)
{
printf("del msg error n");
return 0;
}
return 0;
}
consumer.c
代码语言:javascript复制#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/msg.h>
#include<sys/ipc.h>
struct mymesg{
long int mtype;
char mtext[512];
};
int main()
{
int id = 0;
struct mymesg msg_test;
key_t key = ftok("/tmp",66);
id = msgget(key,0666|IPC_CREAT);
if(id == -1)
{
printf("open msg error n");
return 0;
}
while(1)
{
if(msgrcv(id,(void *)&msg_test,512,1,0) < 0)
{
printf("receive msg error n");
return 0;
}
printf("data:%sn",msg_test.mtext);
if(strncmp(msg_test.mtext,"QUIT",4) ==0)
break;
}
return 0;
}
Demo2:
代码语言:javascript复制#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
mqd_t mqID;
mqID = mq_open("/testmQueue", O_RDWR | O_CREAT, 0666, NULL);
if (mqID < 0)
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
mq_attr mqAttr;
if (mq_getattr(mqID, &mqAttr) < 0)
{
cout<<"get the message queue attribute error"<<endl;
return -1;
}
cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
}
运行结果:
代码语言:javascript复制mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0
Demo3:
代码语言:javascript复制#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
mqd_t mqID;
mqID = mq_open("/mq_test", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);
if (mqID < 0)
{
if (errno == EEXIST)
{
mq_unlink("/mq_test");
mqID = mq_open("/mq_test", O_RDWR | O_CREAT, 0666, NULL);
}
else
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
}
if (fork() == 0)
{
mq_attr mqAttr;
mq_getattr(mqID, &mqAttr);
char *buf = new char[mqAttr.mq_msgsize];
for (int i = 1; i <= 3; i)
{
if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)
{
cout<<"receive message failed. ";
cout<<"error info:"<<strerror(errno)<<endl;
continue;
}
cout<<"receive message "<<i<<": "<<buf<<endl;
}
return 0;
}
char msg[] = "Hello, Message Queue.";
for (int i = 1; i <= 3; i)
{
if (mq_send(mqID, msg, sizeof(msg), i) < 0)
{
cout<<"send message "<<i<<" failed. ";
cout<<"error info:"<<strerror(errno)<<endl;
}
cout<<"send message "<<i<<" success. "<<endl;
sleep(1);
}
return 0;
}
运行结果
代码语言:javascript复制send message 1 success.
receive message 1: Hello, Message Queue.
send message 2 success.
receive message 2: Hello, Message Queue.
send message 3 success.
receive message 3: Hello, Message Queue.
参考教程:
《UNIX环境高级编程第3版》
https://programs.team/linux-message-queue-programming.html
https://www.tutorialspoint.com/inter_process_communication/
https://programmer.ink/think/linux-message-queue-for-interprocess-communication.html