进程间通信--管道

2023-10-17 08:17:38 浏览数 (1)

一.通信

有时候我们需要多个进程协同的去完成某种任务,因此需要进程之间能够相互通信。但是进程之间具有独立性,要让进程之间能通信就要打破这种独立性,所以通信的代价一定是不低的。打破这种独立性就是要让两个不同的进程看到同一份资源,这个资源只能由操作系统来提供。因为如果是某个进程来提供因为独立性,这个资源就只能被提供这个资源的进程看到。

所以不同的通信种类本质是由操作系统的哪一个模块来提供这个资源,比如如果是文件系统来提供就是管道通信。

通信的目的是为了:

1.数据传输:一个进程需要将数据发送给另外一个进程 2.资源共享:多个进程之间共享同一份资源 3.事件通知:当某件事发生时要通知某个进程,比如当子进程退出时要通知父进程来回收资源 4.进程控制:有些进程希望控制另外一个进程,比如调试程序

通信的方式主要有三种:聚焦本地通信的System V(如共享内存),实现跨主机之间通信的POSIX,以及基于文件系统的管道通信。

二.管道

fork创建的子进程会拷贝父进程绝大多数的结构体,但不会将文件拷贝一份,也就是说父子进程可以看到同一份文件。而每一个文件都有它自己的缓冲区,这个文件的缓冲区不就是父子进程看到的同一份资源吗。用于通信的管道文件的本质是一个内存级的文件,它不需要有IO过程,一个进程向缓冲区写,一个进程向缓冲区中读,此时就完成了进程间的通信。只能一个进程写,一个进程读,所以管道是单项通信。此外管道文件的创建需要同时以读和写打开一个文件,因为如果是以只读或者只写方式打开,子进程也就只能继承只读或者只写,无法实现一个进程读一个进程写,也就无法通信,值得一提的是同时以读和写的方式打开一个文件,那么这个文件就会占用两个文件描述符

上图是让父进程写,子进程读。所以关闭了父进程的读端,子进程的写端。

匿名管道(只能用于有血缘关系的进程之间通信)

匿名管道没有名字,而是子进程通过继承父进程的文件描述符表让子进程得到这个文件的地址,所以匿名管道只能用于有血缘关系的进程之间的通信。

1.匿名管道的创建

创建管道文件需要使用系统调用pipe,这样就可以同时以读写方式同时打开一个文件。如果一个进程是用来读的,那么就要关闭它的写端,用来写就要关闭读端。因为一个管道文件只能由一个进程写一个进程读,如果你要让一个进程既能读又能写,那就只能建立两个管道了。

代码语言:javascript复制
#include <unistd.h>
int pipe(int pipefd[2]);
// On success, zero is returned.  On error, -1 is returned, and errno is set appropriately.

pipe的参数是一个输出型的参数,因为读端和写端各要占一个文件描述符,所以传入的参数要是一个有两个元素的数组,此后pipefd[0]就代表该文件的读端,pipefd[1]就代表该文件的写端。

代码语言:javascript复制
#include<iostream>
#include<cstdio>
#include<cstring>
#include<cstdlib>
#include<cassert>
#include<sys/types.h>
#include<sys/wait.h>
#include<string>
#include<unistd.h>
using namespace std;

//本段代码的目的:1.建立匿名管道(让父子进程共享(能看到)同一段资源) 2.利用这个内存级文件实现进程间通信(子进程写入,父进程读取)
//这段共享资源是由操作系统来建立的,因为进程具有独立性,如果由进程来建立,非此进程无法看到
//建立管道文件需要调用系统调用:int pipe(int pipefd[2]);这里的pipefd是一个输入型参数

int main()
{
    int fds[2];//建立匿名管道
    int n=pipe(fds);
    assert(n==0);//必须要创建成功

    pid_t id=fork();
    assert(id>=0);

    if(id==0)
    {
        //子进程读数据,首先要关闭写端,pipefd[0]就是读,pipefd[1]是写
        int cnt=0;
        close(fds[0]);
        while(true)
        {
            //sleep(1000);//子进程一直不写,让父进程一直读
             const char*s="hello parent, I am a child ";
             char buffer[1024];//建立一个临时缓冲区,先向缓冲区中写数据,在让系统调用将这个临时缓冲区的数据写到管道文件中
             snprintf(buffer,sizeof(buffer),"%d:子进程第(%d)次向父进程写入:%s",getpid(),cnt  ,s);//向临时缓冲区中写入数据
             write(fds[1],buffer,strlen(buffer));
             //cout<<"子进程第(%d)次写入"<<cnt<<endl;
             sleep(5);
        }

        //到这里就完成了数据的写入,为了不影响后面的代码,直接让子进程退出
        close(fds[1]);
        exit(0);

    }

    //这里就是父进程了,首先要回收子进程的资源,然后父进程是读端,所以要关闭写端
   
    close(fds[1]);
    
    while(true)
    {
        //读取
       // sleep(1000);//让子进程一直写,父进程一直不读
        char Readbuffer[1024];
        ssize_t len=read(fds[0],Readbuffer,sizeof(Readbuffer)-1);
        if(len>0)
        {
            Readbuffer[len]=0;//因为我输入的是字符串,所以在最后一个位置补,但其实操作系统不关心你输入的是什么类型的数据,它是按照字节流处理的

            cout<<getpid()<<"父进程读取完毕,内容为:"<<Readbuffer<<endl;
        }

    }
    int status=0;
    pid_t ret = waitpid(id,&status,0);
    close(fds[1]);
    return 0;
}
2.匿名管道的读取情况

1.在不关闭写端的情况下一直不向管道文件中写入,那么读端就会阻塞式读取(一定要读取到数据才会往下继续执行)

2.在不关闭读端的情况,一直向管道中写但不读取,文件的缓冲区满以后会一直等待读端来读取

3.在关闭写端的时候,一旦读端将缓冲区的数据读完就会读到0然后退出

4.在关闭读端的情况下,尝试用写端去写入会被操作系统发送信号杀死

3.管道的特征

1.只能用于具有血缘关系的进程之间的通信,是由父进程创建管道文件以后再调用fork创建子进程,让子进程继承父进程的文件描述符表使得父子进程能看到同一份文件

2.管道文件的生命周期随进程,进程销毁了管道文件也就被销毁了

3.管道提供的是字节流式

4.管道是半双工(单向通信)

5.管道有同步和互斥机制对共享资源进行保护

如何理解cat file|grep ”hello"? cat file会创建一个进程,这个进程会读取file文件并将读取到的内容写到到|管道文件中,grep也是一个进程,这个进程会到|管道文件中读取数据。无论是cat还是grep,它们都是操作系统的子进程。

4.基于匿名管道的简单进程池

设计一个由父进程负载均衡式的给子进程装载任务的简单进程池:

1.首先要让父进程创建一批管道和一批子进程,一个管道对应一个子进程 2.建立一批任务,将任务装载到一个函数指针数组中 3.将函数指针数组的下标作为数据写到管道文件中 4.让子进程去管道文件中读取code,再让子进程拿着code去函数指针数组中查找任务并执行 5.子进程结束后需要父进程回收资源

代码语言:javascript复制
#include<iostream>
#include<string>
#include<vector>
#include<cstring>
#include<cstdlib>
#include<cassert>
#include<unistd.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/wait.h>
using namespace std;

#define PROCESS_NUM 5
#define MAKESEED() srand((unsigned long)time(nullptr)^getpid())
//写一个简单的进程池,装载n个任务,然后将n个任务随机(负载均衡)的派给我的子进程

typedef void(*func_t)();

//设置一些任务

void downloadTask()
{
    cout<<getpid()<<"这是一个下载任务"<<endl;
    sleep(1);
}

void IoTask()
{
    cout<<getpid()<<"这是一个IO任务"<<endl;
    sleep(1);
}

void flushTask()
{
    cout<<getpid()<<"这是一个刷新任务"<<endl;
    sleep(1);
}

void popTask()
{
    cout<<getpid()<<"这是一个删除任务"<<endl;
    sleep(1);
}

void loadTask(vector<func_t> *task)
{
    //要将任务装载到一个函数指针数组中,用vector存储
    assert(task);//不能为空
    //将任务函数插入到指针数组中
    task->push_back(downloadTask);
    task->push_back(IoTask);
    task->push_back(flushTask);
    task->push_back(popTask);

}


//上一个类,这个类中存放进程的名字,pid以及fds

class Proc
{
public:
    //只写一个构造函数即可
    Proc(pid_t id,int writefd)
    :_pid(id)
    ,_writefd(writefd)
    {
        //让名字有唯一标识,可以通过id和num来设置
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"proc,pid(%d),fd(%d),第(%d)次:",_pid,_writefd,num  );
        _name=buffer;
    }

public:
    static int num;
    string _name;
    pid_t _pid;
    int _writefd;
};
int Proc::num=0;

//子进程通过拿到一个整型变量code来去函数指针中查找对应的下标来执行不同的任务
//也就是说,父进程只需要向管道文件中写入一个整形code给子进程读取即可

void sendTask(const Proc &process, int taskNum)
{
   cout << "send task num: " << taskNum << " send to -> " << process._name << endl;
    int n = write(process._writefd, &taskNum, sizeof(taskNum));
    assert(n == sizeof(int));
    (void)n;
}

//从管道文件中读取数据
int getTask(int readFd)
{
    int code=0;
    ssize_t n=read(readFd,&code,sizeof(code));
    if(n==4)
        return code;
    else if(n<=0)
        return -1;
    else   
        return 0;
}

void CreateProcess(vector<Proc>*out,vector<func_t>&funcMap)
{
    //为了避免连续创建子进程时下一个子进程有上一个子进程管道文件的写端,建立一个关闭描述符的数组
    vector<int> deleteFd;
    //要创建子进程和建立管道,因为是父进程写,子进程读
    for(int i=0;i<PROCESS_NUM;i  )
    {
        int fds[2];
        int p=pipe(fds);
        assert(p==0);

        pid_t id=fork();
        assert(id!=-1);

        if(id==0)
        {
            //子进程,关闭写端
            close(fds[1]);
            for(int i=0;i<deleteFd.size();i  )
                close(deleteFd[i]);
            

            //读取是一种阻塞式的
            while(true)
            {
                int code = getTask(fds[0]);

                //拿到code判断是否合法,合法就去函数指针数组中找这个任务,然后执行
                if(code>=0&&code<funcMap.size())
                {
                    funcMap[code]();
                }
                else if(code==-1)
                {
                    break;
                }
            }

            //在子进程做完任务后,要将自己的读描述符关掉
           
            exit(0);
        }
         //父进程关闭读端
        close(fds[0]);
        Proc sub(id,fds[1]);//用当前这个子进程的id和写文件描述符来初始化一个Proc对象
        out->push_back(sub);//将这个对象插入到out数组中
        deleteFd.push_back(fds[1]);
    }

   
}

void loadbalance(const vector<Proc> &procmap,const vector<func_t> &funcmap,int count)
{
    //如何均衡的将任务分配给子进程呢?要么采用轮询的办法,要么采用随机数,这里采用随机数
    //在使用rand函数之前,首先要种随机数种子
    int procnum=procmap.size();
    int funcnum=funcmap.size();//为了随机数不越界我需要知道进程和任务的个数

   while(count--)
   {
     int procid=rand()%procnum;
     int taskid=rand()%funcnum;
     sendTask(procmap[procid],taskid);
     sleep(1);
   }

    //装载任务结束了以后,要让进程们退出就只要关闭写端,让读端读到0就行
    for(int i=0;i<procnum;i  )
        close(procmap[i]._writefd);
}

void waitprocess(const vector<Proc>& processmap)
{
    for(int i=0;i<processmap.size();i  )
    {
        waitpid(processmap[i]._pid,nullptr,0);
        cout<<"等待成功:"<<processmap[i]._pid<<endl;
    }
}

int main()
{
    //上来直接种下随机数种子
    MAKESEED();
    //要有一个数组存放子进程和管道文件的映射关系
    vector<Proc> procMap;
    //要有一个数组存放函数指针
    vector<func_t>funcMap;
    loadTask(&funcMap);//把任务装载到函数指针数组中方便后面子进程使用
    
    //首先我要创建管道和子进程
    CreateProcess(&procMap,funcMap);
    
    int cnt=3;//表示我需要装载多少个任务
    //然后我要将任务均衡分配给每一个子进程
    loadbalance(procMap,funcMap,cnt);

    //回收子进程
    waitprocess(procMap);
    return 0;
}

写这样的代码很容易存在一个这样的问题:

因为子进程会拷贝父进程的文件描述符表,也就是说当父进程创建一个管道文件后,假设写端是3文件描述符,此时我再创建一个子进程,此时子进程的文件描述符表中的3也会指向那个管道文件,也就说这个管道文件的写端被两个进程所指向了,当我关闭父进程的写端后,我所期望的是子进程读到0,然后退出;但是由于还有其他进程的指向这个管道文件,所以该子进程无法直接读到0,此时子进程就会阻塞式的等待读。

解决办法:

建立一个vector数组,每当我创建一个管道文件,就将这个管道文件的写端描述符插入到这个vector数组中,然后在子进程中关闭这个文件描述符对应的文件。因为进程具有独立性,所以在子进程中关闭并不会影响父进程。这样就又回到只有一个进程指向管道文件的写端,一个进程指向管道文件的读端,这时当我关闭父进程的写端时,子进程就可以通过读到0而退出了。

有名管道(用于没有血缘关系的进程间的通信)

如果要在两个毫无关系的进程之间通信就需要使用有名管道,因为有名管道有名字,所以它的唯一标识就是路径 文件名(匿名管道的唯一标识是地址)。让两个毫无关联的进程打开同一个文件,一个写一个读,这就是有名管道。

1.有名管道的建立和删除

有名管道的通过调用mkfifo来实现,删除使用unlink

2.通过一段程序来了解有名管道

其实有名管道就是两个进程去打开同一个文件,这个文件不需要IO,是一个内存级文件,因为文件是被进程所共享的,所以文件发生变化的时候,进程可以感知到

下面通过客户端向往文件中写入数据,服务端从文件中读取数据来感受命名管道:

1.name_pipe.hpp

代码语言:javascript复制
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NAMED_PIPE "/tmp/mypipe.106"

bool createFifo(const std::string &path)
{
    umask(0);
    int n = mkfifo(path.c_str(), 0600);
    if (n == 0)
        return true;
    else
    {
        std::cout << "errno: " << errno << " err string: " << strerror(errno) << std::endl;
        return false;
    }
}

void removeFifo(const std::string &path)
{
    int n = unlink(path.c_str());
    assert(n == 0); // debug , release 里面就没有了
    (void)n;
}

2.client.cc

代码语言:javascript复制
#include"name_pipe.hpp"

int main()
{
    std::cout << "client begin" << std::endl;
    int wfd = open(NAMED_PIPE, O_WRONLY);
    std::cout << "client end" << std::endl;
    if(wfd < 0) exit(1); 

    //write
    char buffer[1024];
    while(true)
    {
        std::cout << "Please Say# ";
        fgets(buffer, sizeof(buffer), stdin); // abcdn
        if(strlen(buffer) > 0) buffer[strlen(buffer)-1] = 0;
        ssize_t n = write(wfd, buffer, strlen(buffer));
        assert(n == strlen(buffer));
        (void)n;
    }

    close(wfd);
    
    return 0;
}

3.server.cc

代码语言:javascript复制
#include"name_pipe.hpp"

//这个是服务端,只要负责读取就行,这和从文件中读取数据差不多

int main()
{
    bool r = createFifo(NAMED_PIPE);
    assert(r);
    (void)r;

    std::cout << "server begin" << std::endl;
    int rfd = open(NAMED_PIPE, O_RDONLY);
    std::cout << "server end" << std::endl;
    if(rfd < 0) exit(1);

    //read
    char buffer[1024];
    while(true)
    {
        ssize_t s = read(rfd, buffer, sizeof(buffer)-1);
        if(s > 0)
        {
            buffer[s] = 0;
            std::cout << "client->server# " << buffer << std::endl;
        }
        else if(s == 0)
        {
            std::cout << "client quit, me too!" << std::endl;
            break;
        }
        else
        {
            std::cout << "err string: " << strerror(errno) << std::endl;
            break;
        }
    }

    close(rfd);

    // sleep(10);
    removeFifo(NAMED_PIPE);
    return 0;
}

4.结果展示:

0 人点赞