从源码说swoole进程间通信原理

2020-12-07 15:32:22 浏览数 (1)

  1. 本文件假设你有c 和多进程编程的基础知识。
  2. swoole进程间通信可以使用套接字(swoole_process::write/ swoole_process::read),也可以使用消息队列(push/pop)。本文的只讲述套接字通信部分。
  3. 本文使用的swoole源码为1.9版本

1. swoole_process中的__construct和start究竟做了什么

为了说明swoole进程间是如何使用unix socket进行通信的,我们先从源码入手,看看__construct和start函数究竟做了些什么。对于源码,我们只选取和本问题相关的部分进行解读。

1.1 __construct

代码语言:javascript复制
swoole_process.c 
static PHP_METHOD(swoole_process, __construct)
{
    /*pipe_type即是__constrct的第三参数$create_pipe, 默认为2*/
    long pipe_type = 2;

    if (pipe_type > 0)
    {
        swPipe *_pipe = emalloc(sizeof(swWorker));
        int socket_type = pipe_type == 1 ? SOCK_STREAM : ,SOCK_DGRAM;
        /*创建pipe, 本质是套接字*/
        if (swPipeUnsock_create(_pipe, 1, socket_type) < 0)
        {
            RETURN_FALSE;
        }

        process->pipe_object = _pipe;
        /*获取主进程用于读写的文件描述符*/
        process->pipe_master = _pipe->getFd(_pipe, SW_PIPE_MASTER);
        /*获取工作进程用于读写的文件描述符*/
        process->pipe_worker = _pipe->getFd(_pipe, SW_PIPE_WORKER); 
        process->pipe = process->pipe_master;
        /*对当前swoole_process对象(php中new出的)的属性值pipe进行赋值*/
        zend_update_property_long(swoole_process_class_entry_ptr, getThis(), ZEND_STRL("pipe"), process->pipe_master TSRMLS_CC)
}

swPipeUnsock_create做了什么呢?

代码语言:javascript复制
pipe/PipeUnsock.c 
int swPipeUnsock_create(swPipe *p, int blocking, int protocol)
{

    p->blocking = blocking;
    /*创建本地套接字*/
    ret = socketpair(AF_UNIX, protocol, 0, object->socks);
    if (ret < 0)
    {
        swWarn("socketpair() failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    else
    {
        /*
         * swoole_config.h:#define SW_SOCKET_BUFFER_SIZE      (8*1024*1024)
         * 由此可知套接字buffer大小为8M
         */
        int sbsize = SwooleG.socket_buffer_size;
        swSocket_set_buffer_size(object->socks[0], sbsize);
        swSocket_set_buffer_size(object->socks[1], sbsize);
    }

    return 0;
}

1.2 start

代码语言:javascript复制
swoole_process.c
static PHP_METHOD(swoole_process, start)
{
    swWorker *process = swoole_get_object(getThis());

    /*
     * 创建进程
     * 注意:
     * 1. 创建后父子进程中各有一个swoole_process对象,它们是双胞胎。
     * 2. 子进程继承父进程打开的文件描述符,所以之前__construct时创建的套接子,子进程中也有一份
     */

    pid_t pid = fork();
    if (pid < 0)
    {
        swoole_php_fatal_error(E_WARNING, "fork() failed. Error: %s[%d]", strerror(errno), errno);
        RETURN_FALSE;
    }
    /*主进程逻辑*/
    else if (pid > 0)
    {
        process->pid = pid;
        process->child_process = 0;
        zend_update_property_long(swoole_server_class_entry_ptr, getThis(), ZEND_STRL("pid"), process->pid TSRMLS_CC);
        RETURN_LONG(pid);
    }
    /*子进程逻辑*/
    else
    {
        process->child_process = 1;
        SW_CHECK_RETURN(php_swoole_process_start(process, getThis() TSRMLS_CC));
    }
    RETURN_TRUE;
}

子进程逻辑在php_swoole_process_start()中执行,我们继续看

代码语言:javascript复制
swoole_process.c
int php_swoole_process_start(swWorker *process, zval *object TSRMLS_DC)
{
    /*
     * 设置子进程中的swoole_process用于通信的描述符
     * 对比前文,主进程中的swoole_process使用的是process->pipe_master
     */
    process->pipe = process->pipe_worker;
    process->pid = getpid();

    /*更新子进程中swoole_process(php对象)的相应属性*/
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pid"), process->pid TSRMLS_CC);
    zend_update_property_long(swoole_process_class_entry_ptr, object, ZEND_STRL("pipe"), process->pipe_worker TSRMLS_CC);
}

可见 __construt的主要工作是使用socketpair创建一对套接字,并指定主进程中的swoole_process对象用于读写的套接字。 start的主要工作是,创建子进程,设置子进程中的swoole_process对象用于读写的套接字。

2. 通信原理

2.1 read/write源码解读

代码语言:javascript复制
swoole_process.c
static PHP_METHOD(swoole_process, read)
{
    /*默认每次读写大小*/
    long buf_size = 8192;

    /*上限值*/
    if (buf_size > 65536)
    {
        buf_size = 65536;
    }

    swWorker *process = swoole_get_object(getThis());

    char *buf = emalloc(buf_size   1);
    /*从进程中保留的套接字中读取数据*/
    int ret = read(process->pipe, buf, buf_size);;

    /*设置php函数swoole_process->read返回值*/
    SW_ZVAL_STRINGL(return_value, buf, ret, 0);
}
代码语言:javascript复制
swoole_process.c
static PHP_METHOD(swoole_process, write)
{
    int ret;

    /*以下两种情况的本质都是调用write函数向process-pipe中写入数据*/
    //async write
    if (SwooleG.main_reactor)
    {
        ret = SwooleG.main_reactor->write(SwooleG.main_reactor, process->pipe, data, (size_t) data_len);
    }
    else
    {
        ret = swSocket_write_blocking(process->pipe, data, data_len);
    }

    ZVAL_LONG(return_value, ret);
}

2.2 通信原理总结

swoole进程间使用套接字通信的原理如下: 1. 父进程使用socketpair创建一对套接字 2. 创建子进程时,子进程继承了这对套接字 3. 父子进程使用系统的read,write函数对各自的套接字进行读写完成通信。 4. 对于多个子进程,父进程其实是为每个子进程创建一对套接字用于通信。 5. 子进程之间的通信,比如A向B发消息,本质是fork A进程时,A从父进程处继承了向B发消息的套接字,从而完成了向B的通信。

3.关于SOCK_STREAM与SOCK_DGRAM

3.1 手册中的一个错误

手册中说:默认的方式是流式。但从1.1节的__construct源码中我们可以看到,默认使用的是SOCK_DGRAM方式

3.2 SOCK_STREAM与SOCK_DGRAM的区别

此参数经__construct的第三参数传入,最终作用于socketpair的protocol字段

代码语言:javascript复制
ret = socketpair(AF_UNIX, protocol, 0, object->socks);

在通常意义来说SOCK_STREAM与SOCK_DGRAM分别用于tcp通信和udp通信,前者有序(先发先至),可靠;后者不保证顺序及数据可靠性。但在本地套接字中,由于是本机两进程通信,不会涉及数据丢失,乱序等问题。那么这两个参数的区别在哪呢? 下面是我看到的一个非常清晰明了的解释:

The difference between SOCK_STREAM and SOCK_DGRAM is in the semantics of consuming data out of the socket. Stream socket allows for reading arbitrary number of bytes, but still preserving byte sequence. In other words, a sender might write 4K of data to the socket, and the receiver can consume that data byte by byte. The other way around is true too - sender can write several small messages to the socket that the receiver can consume in one read. Stream socket does not preserve message boundaries. Datagram socket, on the other hand, does preserve these boundaries - one write by the sender always corresponds to one read by the receiver (even if receiver’s buffer given to read(2) or recv(2) is smaller then that message).

也就是说SOCK_STREAM是流式的,数据没有消息边界,发送方多次写入的数据,可能读取方一次就读取了。发送一次写入的数据,读取方可能分多次才读完。回到swoole意味着这种方式下,write与read的次数并不是一一对应的。你需要自己设置边界来切分消息。

SOCK_DGRAM方式,数据天然是有边界的,读写次数一定是一一对应的。回到swoole,意味着这种方式下,只要你的单条消息不超单次读写上限(默认8192字节),就不需要自行设置边界来切分消息。

看一个例子

代码语言:javascript复制
<?php                                                                                 
$process1 = new swoole_process(function($process){                                    
    $i = 1;                                                                           
    while (true) {                                                                    
        $msg = $process->read();                                                      
        echo $msg,"n";                                                               
        echo "read $i timen";                                                        
        $i  ;                                                                         
    }                                                                                 
}, false, 1);                                                                         

$num = 10;                                                                            

$process2 = new swoole_process(function($process) use ($process1, $num){              
    for ($i=0; $i<$num; $i  ){                                                        
        $msg = $process1->write("hello 1 i'm 2;");                                    
        if ($i % 5 == 0){                                                             
            sleep(1);                                                                 
        }                                                                             
    }                                                                                 
});

$process2->start();                                                                   
$process1->start();  

new第三参数设为1,使用SOCK_STREAM通信。运行结果如下:

代码语言:javascript复制
hello 1 i'm 2;
read 1 time
hello 1 i'm 2;hello 1 i'm 2;hello 1 i'm 2;
read 2 time
hello 1 i'm 2;hello 1 i'm 2;
read 3 time
hello 1 i'm 2;hello 1 i'm 2;
read 4 time
hello 1 i'm 2;hello 1 i'm 2;
read 5 time

process2向process1写了10次数据,process1用了5次读完 将new第三参数设为2,使用SOCK_DGRAM通信。运行结果如下:

代码语言:javascript复制
hello 1 i'm 2;
read 1 time
hello 1 i'm 2;
read 2 time
hello 1 i'm 2;
read 3 time
hello 1 i'm 2;
read 4 time
hello 1 i'm 2;
read 5 time
hello 1 i'm 2;
read 6 time
hello 1 i'm 2;
read 7 time
hello 1 i'm 2;
read 8 time
hello 1 i'm 2;
read 9 time
hello 1 i'm 2;
read 10 time

10次写对应10次读。

0 人点赞