前言
今年接触了一个策略类手游相关的项目,后端本身计划是使用skynet
进行开发的,后来结合项目的时间紧急程度和客户端开发组讨论后决定使用PHP进行快速开发,后期再使用其他语言框架进行拆分业务;综合考虑最后选用了webman作为主要开发框架。
整体项目分为配置服务
、HTTP-API
服务、websocket
服务三大部分,其中配置管理主要是兼容客户端生成的配置数据进行导入导出转换加载,底层使用MySQL
进行储存,多服务间使用Redis
进行一级缓存,服务进程间使用了基于APCu
的共享缓存,后期我将该共享缓存组件化也贡献给了社区。
【workbunny】共享高速缓存 https://www.workerman.net/plugin/133
Redis
在游戏开发界实际上使用Redis的情况还是比较多的,我们使用Redis主要还是为了将一些数据缓存共享给各个服务器实例:
代码语言:javascript复制 ┌─────┐ ┌─────┐
| A | ────────────> service <──────────── | B |
└─────┘ └─────┘
/ | / |
┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐
| a | | b | | c | ───────> instance <─────── | a | | b | | c |
└───┘ └───┘ └───┘ └───┘ └───┘ └───┘
| | | | | |
1|2 1|2 1|2 ────────> process <──────── 1|2 1|2 1|2
3|4 3|4 3|4 3|4 3|4 3|4
如图所示,我们分为A/B
区服,每个区服下可能存在abc不同的服务器实例,他们需要共享相同的区服配置;每个区服各自管理自己的数据库数据区域/数据库实例;每个区服下的服务器实例对于数据库数据的要求是强需求,且为变动较为频繁的数据内容,与web的微服务有区别,所以我们没有使用类似Nacos或者其他配置中心进行处理,从而用更适配当前场景的Redis作为缓存服务。
同时Redis也可以作为用户登录鉴权相关中的一环,也可以为运营相关功能提供一些辅助,比如使用Redis-Stream
作为消息队列,处理一些事件通知等。
共享内存
在游戏开发中,许多业务都是在内存中进行的计算处理,而我们上述的模式是多进程模式,进程间通讯是一个比较频繁出现的点;一开始解决这个问题是粗暴的将一些固定业务固定在对应的进程上执行,尽可能避免进程间的通讯问题。
后来随着业务逐步的扩大,单纯限制业务是没办法完全实现的,这时候有考虑过使用webman的插件channel
;但实际上channel
基于socket
涉及系统内核态用户态的拷贝等问题,同时受网络影响受限,在一些业务的计算处理上会带来比较高的延迟,包括Redis也同样是这样的问题,我们需要实现数据的零拷贝。
后续我们的目标锁定在了共享内存上,因为共享内存可以轻易的在进程间进行通讯交换,而且不存在深拷贝和网络等问题,效率、性能非常的高,整体微秒级别的响应满足我们的需求;于是我基于PHP的拓展APCu封装了适合我们业务场景的插件包进行使用。
webman-shared-cache
我们的基础应用实现了定时器来从MySQL数据库读取配置信息,定时器的处理器也在读取数据刷入Redis的同时触发共享内存的更新事件,上层业务通过更新事件的回调出发会将Redis的数据刷入共享内存中,以便当前区服实例的各个进程能够使用。
我们使用缓存的场景很多都是MAP数据,所以我在实现插件的时候特别实现了类似Redis-Hash相关的功能:HSet/HGet/HDel/HKeys/HExists
。
由于我们需要一些自增自减的运算,所以也实现了以下功能点:HIncr/HDecr
支持浮点运算。由于APCu的特性所以储存的数据也是支持储存对象数据的;
webman-shared-cache为何使用锁?
APCu(Alternative PHP Cache User Cache)是一个开放源代码的PHP缓存扩展,它提供了一种在PHP应用程序中存储和检索数据的快速方法。它是APC(Alternative PHP Cache)的继任者,专注于用户数据的缓存,而不是opcode缓存。
之前我有和社区的同学们聊过,他们不是很理解为什么我在实现插件的时候自己使用了锁,这是因为APCu本身的自行实现了对它自身函数的原子性操作,但我们使用它的时候是在多进程的环境下,每一个进程内存在多次APCu的操作,为了业务的原子性,我们希望这多次的操作要在一个原子性内完成,所以需要一个锁来进行隔离,以免在多进程的环境下被其他进程的操作污染,整体是类似MySQl
的事务的:
protected static function _HIncr(string $key, string|int $hashKey, int|float $hashValue = 1): bool|int|float
{
$func = __FUNCTION__;
$result = false;
$params = func_get_args();
self::_Atomic($key, function () use (
$key, $hashKey, $hashValue, $func, $params, &$result
) {
$hash = self::_Get($key, []);
if (is_numeric($v = ($hash[$hashKey] ?? 0))) {
$hash[$hashKey] = $result = $v $hashValue;
self::_Set($key, $hash);
}
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => $params,
'result' => null
];
}, true);
return $result;
}
比如上述代码,就是一个Hash key的自增操作,我们需要在读取Hash后在写入,读取和写入应为一体的;
原子性执行函数Atomic
的实现如下:
/**
* 原子操作
* - 无法对锁本身进行原子性操作
* - 只保证handler是否被原子性触发,对其逻辑是否抛出异常不负责
* - handler尽可能避免超长阻塞
* - lockKey会被自动设置特殊前缀#lock#,可以通过Cache::LockInfo进行查询
*
* @param string $lockKey
* @param Closure $handler
* @param bool $blocking
* @return bool
*/
protected static function _Atomic(string $lockKey, Closure $handler, bool $blocking = false): bool
{
$func = __FUNCTION__;
$result = false;
if ($blocking) {
$startTime = time();
while ($blocking) {
// 阻塞保险
if (time() >= $startTime self::$fuse) {return false;}
// 创建锁
apcu_entry($lock = self::GetLockKey($lockKey), function () use (
$lockKey, $handler, $func, &$result, &$blocking
) {
$res = call_user_func($handler);
$result = true;
$blocking = false;
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => [$lockKey, 'Closure'],
'result' => $res
];
});
}
} else {
// 创建锁
apcu_entry($lock = self::GetLockKey($lockKey), function () use (
$lockKey, $handler, $func, &$result
) {
$res = call_user_func($handler);
$result = true;
return [
'timestamp' => microtime(true),
'method' => $func,
'params' => [$lockKey, 'Closure'],
'result' => $res
];
});
}
if ($result) {
apcu_delete($lock);
}
return $result;
}
当使用阻塞模式的时候,我们会在当前进程内使用一个while循环来进行阻塞抢占,为了不将当前进程阻塞死,我们还加入了一个保险,由self::$fuse
提供;
注意
这里在实践过程中需要注意的是,Atomic在传入回调函数时切勿再使用匿名函数作为参数值或者是通过use传入一个匿名函数,如:
代码语言:javascript复制$fuc = function() {
// do something
}
Cache::Atomic('test', function () use ($fuc) {
// do anything
})
APCu底层会对函数参数值或引用参数进行序列化储存,但匿名函数不可以被序列化,所以会抛出一个异常;但你可以通过当前对象的属性值或者静态属性来保存一个匿名函数,然后在Atomic的回调内调用使用。
0.4.x版本
由于目前我使用Webman基于SQLite和共享内存在自行实现一个具备RAFT的轻调度服务插件和服务注册与发现插件,所以特此为其完善增加了Channel特性;
Channel可以辅助实现类似Redis-List、Redis-stream、Redis-Pub/Sub
的功能。
Channel
Channel是个特殊的数据格式,他的格式是固定如下的:
代码语言:javascript复制[
'--default--' => [
'futureId' => null,
'value' => []
],
workerId_1 => [
'futureId' => 1,
'value' => []
],
workerId_2 => [
'futureId' => 1,
'value' => []
],
......
]
它在共享内存中的键默认以**#Channel#**开头。
--default--
是默认储存空间,workerId_1/workerId_2 等是子通道储存空间,命名是由用户代码传入的,这里建议使用workerman自带的workerId即可。- 默认储存空间和子通道储存空间是互斥的,也就是说当存在子通道储存空间时,是不存在--default--的,反之亦然;子通道储存空间是当当前通道存在监听器时生成的,而在监听器产生前,消息会暂存在--default--空间,当监听器创建时,--default--的数据value会被同步到子通道储存空间内,加入value的队头。
- 每一个子通道储存空间的value都是拷贝的,存在相同的数据,各自监听器监听各自的子通道储存空间;消息的发布支持向所有子通道发布,也可以指定子通道进行发布。
- 监听器的底层使用了workerman的定时器,区别与workerman的timer,在event驱动下定时器的间隔是0,也就是一个future,而其他的事件驱动是0.001s为间隔。
实现一个List
由于监听器创建消费是基于workerId的,我们可以通过不同进程创建相同的workerId的监听器来对同一个子通道进行监听:
- A进程使用list作为workerId:
Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的业务逻辑
});
- B进程也同样创建list的workerId监听器:
Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的业务逻辑
});
- 此时Channel test的数据如下:
[
'list' => [
'futureId' => 1,
'value' => []
],
......
]
注意:共享内存中储存的futureId为最后一个监听器创建的futureId;当当前进程需要对监听器进行移除时,请勿使用该数据,对应进程内可以通过
Cache::ChCreateListener()
的返回值获取到当前进程创建的futureId用于移除监听器,不使用共享内存中储存的futureId即可
- 这时任意进程通过
Cache::ChPublish('test', '这是一个测试消息', true);
发送消息,或者指定workerIdCache::ChPublish('test', '这是一个测试消息', true, 'list');
。
实现一个Pub/Sub
- A进程使用workerman的workerId作为workerId:
Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的业务逻辑
});
- B进程使用workerman的workerId作为workerId:
Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
// TODO 你的业务逻辑
});
- 此时Channel test的数据可能如下:
[
1 => [
'futureId' => 1,
'value' => []
],
2 => [
'futureId' => 1,
'value' => []
]
]
- 这时,任意进程通过
Cache::ChPublish('test', '这是一个测试消息', false);
发送消息即可。
注:发送消息第三个参数使用false时,如发送时还未创建监听器,消息则不会储存至Channel,即监听后才可存在消息
实现类似Redis-stream
与Pub/Sub相同,只不过发布消息使用Cache::ChPublish('test', '这是一个测试消息', true);
, 当发布消息指定workerId时,可以实现类似Redis-Stream Group的功能。
注:这里更复杂的功能可能需要对workerId进行变通,不能简单使用workerman自带的workerId,只需要自行规划好即可