消息队列 是消息的链接表,存储内核中,由消息标识符标识。 --《UNIX环境高级编程》
简单理解,消息队列就是一堆消息的有序集合,并缓存于内核中。如此一来,多个进程就可通过访问内核来实现多个进程之间的通信。目前存在的消息队列有POSIX与System V标准的接口,本篇主要介绍System V接口的使用。
简介
消息队列的本质是位于内核空间的链表,其中每个节点都是一个独立的消息,每个消息都有类型,相同类型的消息组成一个链表。
当各种各样的消息发出时,就如同下图所示排列在内核空间中。形状看成消息的类型,相同的形状则表示相同的消息类型。
这些看似杂乱无章的消息,通过消息队列发出来后,根据其发送的类型与发送的时间,在接收端中则是有规律的排序。
如上图,内核中杂乱无章的消息,接收端可通过消息类型与发送的顺序来逐一接收处理。可通过消息类型查看指定类型的消息,若指定类型为0,则按时间顺序输出所有接收到的消息。
接口
主要用到msgget、msgsnd、msgrcv和msgctl四个接口。其使用方式man手册说明的比较清晰了,这里简单描述一下函数形式及功能。
msgget
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
主要功能是根据key值获取一个消息队列的ID。msgflag主要有两个值IPC_CREAT 和IPC_EXC,指的是需要新创建消息队列ID。
msgsnd、msgrcv
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);
msgsnd与msgrcv主要用于消息队列的发送与接收。这里需要注意的是发送的msgp一般定义为结构体,首个成员为long型,表示消息的类型。如此msgrcv通过指定msgtype来筛选出需要的消息。
msgctl
代码语言:javascript复制#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msgctl是用来控制消息队列的,其中cmd指进行的操作,buf记录了消息队列的信息。cmd:
- IPC_STAT: 将msg相关的内核信息存储到buf指向的msqid_ds 结构体中。调用者需拥有阅读权限才可读取。
- IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;可设置属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。
- IPC_RMID:删除msqid标识的消息队列
buf:
代码语言:javascript复制struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
如上信息可看到buf中存储了与消息队列相关的属性,设置cmd后,可通过buf拿到这些信息。
实例演示
功能: 用消息队列实现server接client的数据,server可筛选显示指定消息类型的数据。
效果:server接收所有消息:
server 筛选消息类型为2的数据:
注:代码里可将消息类型封装成枚举,此demo作为演示不做过多封装。
总结
消息队列在进程间通信的优势总结起来有以下几点:
- 缓存:数据较大的消息处理起来时间较长,此时将其写入消息队列更快,待系统空闲时再处理。提高系统任务执行效率。
- 送达:消息队列存储的消息,会一直保留在队列中直到消息被处理,且被取走后就会被队列释放。因此无论多少个进程在获取,每个消息仅会被处理一次。
- 排序:消息在队列中一直按照“先入先出”的顺序来执行。因此任务被处理的时序不会错乱。
- 异步:消息队列因为会缓存消息,且顺序处理不会丢失。因此多个进程可通过消息队列实现异步通信,互不阻塞。
代码
client.cpp
代码语言:javascript复制/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name : client.cpp
* Author :
* Version : V1.0
* Description :
* Journal : 2021-03-19 init v1.0
* Brief : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others :
********************************************************************************
*/
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <errno.h>
#include <string.h>
#include "common.h"
const char TEXT[2][50] = {"this is client1!", "this is client2!"};
int main(int argc, char *argv[])
{
int msg_id, key, ret = 0;
struct MsgFrame msg_buf = {0, {0}};
if (argc < 2) {
PRINT_ERR("usage: %s [msgid]n", argv[0]);
goto exit;
}
if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
PRINT_ERR("Params invalid!n");
goto exit;
}
/* Obtain the standard key according to the file path */
key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
if (key < 0) {
PRINT_ERR("ftok failed! errno = %d(%s)n", errno, strerror(errno));
goto exit;
}
msg_id = msgget(key, IPC_EXCL);
if (msg_id < 0) {
PRINT_ERR("msgget failed! errno = %d(%s)n", errno, strerror(errno));
goto exit;
}
do {
memset(&msg_buf, 0x00, sizeof(msg_buf));
#if 0
if (fgets(msg_buf.buffer, sizeof(msg_buf.buffer), stdin) == NULL) {
PRINT_ERR("scanf failed! errno = %d(%s)n", errno, strerror(errno));
goto exit_msgid;
}
#else
msg_buf.type = atoi(argv[1]);
strncpy(msg_buf.buffer, TEXT[msg_buf.type - 1], strlen(TEXT[msg_buf.type-1]));
#endif
ret = msgsnd(msg_id, &msg_buf, sizeof(msg_buf.buffer), IPC_NOWAIT);
if (ret < 0) {
PRINT_ERR("msgnd failed! errno = %d(%s)n", errno, strerror(errno));
goto exit_msgid;
} else {
PRINT_INFO("[Send %ld %ld] %sn", msg_buf.type,
strlen(msg_buf.buffer), msg_buf.buffer);
}
sleep(1);
} while (strncmp(msg_buf.buffer, "end", msg_buf.type) != 0);
exit_msgid:
ret = msgctl(msg_id, IPC_RMID, 0);
if (ret < 0) {
PRINT_ERR("msgctl failed! errno = %d(%s)n", errno, strerror(errno));
}
exit:
return 0;
}
server.cpp
代码语言:javascript复制/*
********************************************************************************
* Copyright (C) 2021, xiang.D <dx_65535@163.com>.
* All right reserved.
*
* File Name : server.cpp
* Author :
* Version : V1.0
* Description :
* Journal : 2021-03-21 init v1.0
* Brief : Blog address: https://blog.csdn.net/qq_38750572?spm=1001.2014.3001.5343
* Others :
********************************************************************************
*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <iostream>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include "common.h"
static int msg_id;
static void SignalHandler(int sig)
{
switch (sig)
{
case SIGSTOP:
if (msg_id != 0) {
msgctl(msg_id, IPC_RMID, 0);
}
break;
default:
break;
}
}
int main(int argc, char *argv[])
{
int key, ret = 0;
long msg_type = 0;
struct MsgFrame msg_recv = {0, {0}};
signal(SIGSTOP, SignalHandler);
if (argc == 1) {
msg_type = 0;
} else if (argc == 2) {
if (strspn(argv[1], "0123456789") != strlen(argv[1])) {
PRINT_ERR("Params invalid!n");
goto exit;
} else {
msg_type = atoi(argv[1]);
//PRINT_INFO("Receive msg type is %ldn", msg_type);
}
} else {
PRINT_ERR("Params invalidn");
goto exit;
}
key = ftok(MSGQ_FILE_PATH, MSGQ_ID);
if (key < 0)
{
PRINT_ERR("ftok failed! errno = %d(%s)n", errno, strerror(errno));
goto exit;
}
msg_id = msgget(key, IPC_CREAT|0666);
if (msg_id < 0) {
PRINT_ERR("msgget failed! errno = %d(%s)n", errno, strerror(errno));
goto exit;
}
do {
memset(msg_recv.buffer, 0x00, MAX_SIZE);
ret = msgrcv(msg_id,(void*)&msg_recv, MAX_SIZE, msg_type, 0);
if (ret != -1) {
PRINT_INFO("[Receive %ld %ld] %sn",
msg_recv.type, strlen(msg_recv.buffer), msg_recv.buffer);
} else {
PRINT_ERR("msgrcv failed!n");
}
} while (1);
exit:
PRINT_INFO("Exit from %sn", __func__);
return 0;
}
最后
用心感悟,认真记录,写好每一篇文章,分享每一框干货。愿每一篇文章不负自己,不负看客!