【Swoole系列3.6】进程同步与共享内存

2023-03-03 13:39:38 浏览数 (2)

进程同步与共享内存

通过前面几篇的学习,相信你已经对 Swoole 的进程有了一定的了解。不管是单进程还是进程池,我们都着重讲了进程间的通讯问题。毕竟对于进程来说,它们是内存隔离的,通讯相对来说就是一个很大的问题。而我们之前讲的内容其实都是不使用第三方工具来进行通信的,但其实更方便的方式是直接使用一些第三方工具做为中间的存储媒介,让不同的进程直接去读取这里的内容就可以实现通信的能力了。比如说我们最常见的就是使用 Redis ,不过即使是 Redis ,甚至是使用了连接池,也会有连接建立的过程,所以也并不是最高效的。今天,我们要学习的一个共享内存表格,是 Swoole 提供的一种更高效的数据同步方式。除此之外,我们还要学习另外两个非常常用的进程间同步功能,一个是无锁计数器,另一个就是进程锁。

进程同步

关于进程同步问题,在很早的时候我们就解释过了。当时是以全局变量来讲的,并且解释了为什么在 Swoole 中无法使用传统的那些全局常量。要在进程间实现类似的全局功能,除了后面要讲的 Table 或者外部第三方工具外,还有一些小工具也非常值得我们关注。

进程间无锁计数器(Atomic)

进程间无锁计数器,它是 Swoole 底层提供的一种原子计数操作类,可以方便实现整数的无锁原子增减。原子这个词是不是听着很熟悉?没错,它就是数据库中 ACID 的那个原子性。要么成功,要么失败,原子操作是不会被线程调度或者多进程打断的操作,一旦开始就要运行到结束。

原子计数器其实就是一个简单地放置在共享内存的一种带原子操作能力的计数器功能应用,它就是实现简单的加减赋值操作。

代码语言:javascript复制
$atomic = new SwooleAtomic();

(new SwooleProcess(function (SwooleProcess $worker) use ($atomic) {
   while($atomic->get() < 5){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;

}))->start();

(new SwooleProcess(function (SwooleProcess $worker) use ($atomic) {
   while($atomic->get() < 10){
       $atomic->add();
       echo "Atomic Now: {$atomic->get()}, pid: {$worker->pid}", PHP_EOL;
       sleep(1);
   }
   echo "Shutdown {$worker->pid}", PHP_EOL;
}))->start();

SwooleProcess::wait();
SwooleProcess::wait();

// [root@localhost source]# php 3.6进程同步与共享内存.php
// Atomic Now: 1, pid: 1469
// Atomic Now: 2, pid: 1468
// Atomic Now: 3, pid: 1468
// Atomic Now: 4, pid: 1469
// Atomic Now: 5, pid: 1468
// Atomic Now: 6, pid: 1469
// Shutdown 1468
// Atomic Now: 7, pid: 1469
// Atomic Now: 8, pid: 1469
// Atomic Now: 9, pid: 1469
// Atomic Now: 10, pid: 1469
// Shutdown 1469

没什么特别的内容,你可以把 Atomic 对象就看成是一个 Int ,不过它比 Int 要小,只是 32 位的无符号整型。如果需要的数字很大的话,可以使用 SwooleAtomicLong ,这是 64 位的有符号整型对象。但是 Long 格式的对象不支持下面的 wait() 和 weakup() 操作。

代码语言:javascript复制
$atomic = new SwooleAtomic();

//$atomic->cmpset(0, 1);

(new SwooleProcess(function (SwooleProcess $worker) use ($atomic) {
   $atomic->wait(3);
   echo "Shutdown wait Process: {$worker->pid}", PHP_EOL;

}))->start();

(new SwooleProcess(function (SwooleProcess $worker) use ($atomic) {
   sleep(2);
   $atomic->wakeup();
//    $atomic->cmpset(0, 1);
   echo "Shutdown other Process: {$worker->pid}", PHP_EOL;
}))->start();

SwooleProcess::wait();
SwooleProcess::wait();

//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Shutdown other Process: 1511
//  Shutdown wait Process: 1510

这两个方法操作是什么意思呢?当 atomic 的值为 0 时,如果调用了 wait() 就会启动进入等待状态。等待什么呢?当调用了 weakup() 方法或者将 atomic 的值设置为 1 后,wait() 就会结束。如果一开始 atomic 的值就不为 0 的话,那么 wait() 方法也是不会起作用的。

在这段测试代码中,我们最后输出的结果是 other 先执行,也就是等待 2 秒后,调用了 weakup() 方法之后,前面那个在内部调用了 wait() 方法的进程才结束。wait() 的参数表示要等待多久,如果设置为 -1 的话就是永久等待,否则就是按参数值的秒数等待,超时后直接就不等了,继续运行。在这里,你可以测试在一开始就将 atomic 的值设为非 0 值,也就是注释中调用 cmpset() 方法的那一行打开。然后再运行的话,就会发现 wait() 不起作用了,直接第一个进程就运行完了。

其实,这个功能就可以实现一种锁的能力,但它并不是特别的灵活,毕竟一会要 wait() 一会要 weakup() 的,而且说不定哪里就直接把值改掉了。我们在 Swoole 中还有更方便的直接操作锁的功能,就是我们下面要讲的进程间锁。

进程间锁(Lock)

锁操作对于多进程、多线程相关的操作非常重要。为什么呢?并行执行代码最重要的一个问题就是可能同时去修改一个东西,有可能是数据库、有可能是内存数据、有可能是文件资源,本身我们使用的 MySQL 就有各种锁机制,同时 MySQL 也是以多线程的方式来处理的,包括事务处理机制、等级等等,都是为了解决高并发情况下数据读写出现的各种问题。这个以后我们学习 MySQL 相关的内容时再仔细全面地好好学习。而在程序代码中,内存操作和文件操作出现同时操作并且出现冲突是最常见的情况,比如说我们两个进程同时修改一个值,一个进程改为 2 ,一个进程改为 3 ,最后的结果哪个才对呢?

这种情况其实是要基于业务场景看的,它是累加变 3 了?还是 3 是代表一种状态?不管怎么样,两个进程的处理是应该有先后顺序的,不能让他们同时去操作,真正的同时操作所获得的结果就是模糊的,是无法被我们推演预知的。在这个时候,我们就可以为这种操作加锁,让同一时刻内只能有一个进程来操作这个资源,这样它的结果是确定,而不是模糊的。

代码语言:javascript复制
$lock = new SwooleLock();

(new SwooleProcess(function (SwooleProcess $worker) use ($lock) {
    echo "Process: {$worker->pid} Wait", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked", microtime(true), PHP_EOL;
    sleep(3);
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

(new SwooleProcess(function (SwooleProcess $worker) use ($lock) {
    sleep(1);
    echo "Process: {$worker->pid} Wait ", PHP_EOL;
    $lock->lock();
    echo "Process: {$worker->pid} Locked",microtime(true), PHP_EOL;
    $lock->unlock();
    echo "Process: {$worker->pid} exit;", PHP_EOL;
}))->start();

SwooleProcess::wait();
SwooleProcess::wait();

//[root@localhost source]# php 3.6进程同步与共享内存.php
//Process: 1611 Wait
//Process: 1611 Locked1640572026.9681
//Process: 1612 Wait
//Process: 1611 exit;
//Process: 1612 Locked1640572029.9771
//Process: 1612 exit;

在这段测试代码中,使用 SwooleLock 对象的 lock() 和 unlock() 方法来进行加锁和释放锁。第一个进程启动后加锁,然后休息 3 秒,第二个进程进来后也想加锁,但是第一个进程已经加上锁了,所以它要等待第一个进程释放掉锁,这里也可以看到是 3 秒后,第二个进程才获得了锁。

这一块学习过 C/C 或者 Java、Go 的同学应该很容易理解,如果有一些 IO 资源操作,特别是写数据这种,一定要加锁避免多个进程同时写数据产生混乱。同时,进程间锁是无法在协程中使用的,在这个锁中尽量不要使用协程相关的 API ,否则很容易产生 死锁

更多资料大家可以查阅官方文档以及搜索相关知识进行更加深入的学习了解。

共享内存(Table)

上面的无锁计数器和锁功能,其实都是为了进程间共享数据或者通信提供的一些功能。比如说只是简单地数字累加就完全可以使用计数器,而操作同一个句柄文件时,就加个锁,在这个文件中所有的进程都可以读取到它的数据。其实这样也是一种进程间通信和数据同步的方式。除了这些以外,Swoole 还原生提供了一个 Table 工具,它是直接基于共享内存和锁实现的超高性能内存数据结构。可以解决多进程/多线程数据共享和同步加锁的问题。

它的特点是性能强悍,内置行锁自旋锁(不需要再单独加锁操作),支持多进程,是进程间共享数据,并进行通信的强大工具。

代码语言:javascript复制
$table = new SwooleTable(1024);
$table->column('worker_id', SwooleTable::TYPE_INT);
$table->column('count', SwooleTable::TYPE_INT);
$table->column('data', SwooleTable::TYPE_STRING, 64);
$table->create();

$ppid = getmypid();
$table->set($ppid, ['worker_id'=>getmypid(), 'count'=>0, 'data'=>"这里是 " . $ppid]);


(new SwooleProcess(function (SwooleProcess $worker) use ($table) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>0, 'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->incr($worker->pid, 'count');
    print_r($table->get($worker->pid));
}))->start();

(new SwooleProcess(function (SwooleProcess $worker) use ($table, $ppid) {
    $table->set($worker->pid, ['worker_id'=>$worker->pid, 'count'=>3, 'data'=>"这里是 {$worker->pid}"]);
    sleep(1);
    $table->decr($worker->pid, 'count');
    print_r($table->get($worker->pid));
    sleep(1);

    echo "{$worker->pid} 内部循环:", PHP_EOL;
    foreach($table as $t){
        print_r($t);
    }
    if($table->exist($ppid)){
        $table->del($ppid);
    }
}))->start();

SwooleProcess::wait();
SwooleProcess::wait();

echo "Talbe 数量:",$table->count(), PHP_EOL;
echo "主进程循环:", PHP_EOL;
foreach($table as $t){
    print_r($t);
}
echo "Table 状态:", PHP_EOL;
print_r($table->stats());

在实例化了 SwooleTable 对象之后,我们需要指定列的信息。实例化参数是 Table 的最大行数,这个行数不一定是准的,与预留的内存大小有关。注意,它不是动态分配内存的,是直接实例化的时候开辟一块固定的内容空间,需要前期规划好我们需要的内存空间。指定列的操作特别像是数据库建表的操作,这一步是可以方便地在内存中序列化数据。

然后,我们就可以通过 set() 方法设置每一行一行的数据。在不同的进程中,数据都是共享的,都是可以查看到的。

最后它还实现了迭代器相关的功能,可以 foreach() 遍历,也可以通过 count() 返回数量信息,通过 stats() 返回状态信息。

最后输出的结果应该是下面这样的。

代码语言:javascript复制
//  [root@localhost source]# php 3.6进程同步与共享内存.php
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  1552 内部循环:
//  Array
//  (
//      [worker_id] => 1550
//      [count] => 0
//      [data] => 这里是 1550
//  )
//  Array
//  (
//     [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//     [count] => 2
//     [data] => 这里是 1552
//  )
//  Talbe 数量:2
//  主进程循环:
//  Array
//  (
//      [worker_id] => 1551
//      [count] => 1
//      [data] => 这里是 1551
//  )
//  Array
//  (
//      [worker_id] => 1552
//      [count] => 2
//      [data] => 这里是 1552
//  )
//  Table 状态:
//  Array
//  (
//      [num] => 2
//      [conflict_count] => 0
//      [conflict_max_level] => 0
//      [insert_count] => 3
//      [update_count] => 2
//      [delete_count] => 1
//     [available_slice_num] => 204
//     [total_slice_num] => 204
//  )

总结

今天学习的内容是进程间同步相关的内容,有了这些其实进程间的通信及同步操作就方便了很多。但是需要注意的是,Atomic 和 Lock 如果在服务器应用中使用,不要在 onReceive 等回调函数中创建,否则内存可能会持续增长,也就是传说中的内存泄露溢出。为什么呢?其实就是因为它们是进程全局共享的,不回收,如果一直创建的话,进程不停止就会不停地创建,最后撑爆整个应用及服务器的物理内存。

好了,进程相关的基础内容其实我们已经学习得差不多了,接下来我们就进入下一个大的篇章,也就是协程相关内容的学习。

测试代码:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole进程/source/3.6进程同步与共享内存.php

参考文档:

https://wiki.swoole.com/#/process/process_pool

https://wiki.swoole.com/#/process/process_manager

0 人点赞