PHP异步非阻塞MySQL客户端连接池

2024-09-10 21:03:09 浏览数 (1)

概述

AMPHP是一个事件驱动的PHP库集合,设计时考虑了纤程和并发性。amphp/mysql是一个异步MySQL客户端。该库通过在可用连接的可伸缩池中透明地分发查询来实现并发查询。客户端透明地将这些查询分布在一个可扩展的可用连接池中,并使用100%的用户态PHP,没有外部扩展依赖性(例如ext/mysqliext/pdo等)。

特征

  • 公开一个非阻塞API,用于并发发出多个MySQL查询
  • 透明的连接池克服了MySQL的基本同步连接协议
  • MySQL传输编码支持(gzip,TLS加密)
  • 支持参数化预处理语句
  • 带有提交和回滚事件钩子的嵌套事务
  • 无缓冲结果以减少大型结果集的内存使用
  • 完整的MySQL协议支持,包括所有可用的异步命令

安装

此包可以作为Composer依赖项安装

代码语言:javascript复制
composer require amphp/mysql

使用

入门使用

代码语言:javascript复制
<?php
/**
 * @desc mysql.php
 * @author Tinywan(ShaoBo Wan)
 * @date 2024/8/16 11:19
 */
declare(strict_types=1);

require 'vendor/autoload.php';

use AmpMysqlMysqlConfig;
use AmpMysqlMysqlConnectionPool;

$config = MysqlConfig::fromString(
    "host=127.0.0.1 user=root password=123456 db=test"
);

$pool = new MysqlConnectionPool($config);

$statement = $pool->prepare("SELECT * FROM mall_member WHERE member_time = :member_time Limit 10");
$timeOne = microtime(true);
$result = $statement->execute(['member_time' => 0]);
foreach ($result as $key => $row) {
    echo '[x] ['.$key.'] '.$row['member_name'].PHP_EOL;
}
$timeTwo = microtime(true);

echo '[x] Run Time Result : ' . ($timeTwo - $timeOne) . PHP_EOL;

执行结果

代码语言:javascript复制
[x] [0] 12161435
[x] [1] 开源技术小栈
[x] [2] 12161435
[x] [3] 12161435
[x] [4] T1800082
[x] [5] 12161435
[x] [6] 12161435
[x] [7] 12161387
[x] [8] 12161235
[x] [9] 12161149
[x] Run Time Result : 0.045973062515259

迭代器

代码语言:javascript复制
<?php

require 'support/bootstrap.php';

use AmpFuture;
use AmpMysqlMysqlConfig;
use AmpMysqlMysqlConnectionPool;
use function Ampasync;

$db = new MysqlConnectionPool(MysqlConfig::fromAuthority(DB_HOST, DB_USER, DB_PASS, DB_NAME));

$db->query("DROP TABLE IF EXISTS tmp");

/* Create table and insert a few rows */
/* we need to wait until table is finished, so that we can insert. */
$db->query("CREATE TABLE IF NOT EXISTS tmp (a INT(10), b INT(10))");

print "Table successfully created." . PHP_EOL;

$statement = $db->prepare("INSERT INTO tmp (a, b) VALUES (?, ? * 2)");

$future = [];
foreach (range(1, 5) as $num) {
    $future[] = async(fn () => $statement->execute([$num, $num]));
}

/* wait until everything is inserted */
$results = Futureawait($future);

print "Insertion successful (if it wasn't, an exception would have been thrown by now)" . PHP_EOL;

$result = $db->query("SELECT a, b FROM tmp");

foreach ($result as $row) {
    var_dump($row);
}

$db->query("DROP TABLE tmp");

$db->close();

事务支持

代码语言:javascript复制
<?php

require 'support/bootstrap.php';
require 'support/generic-table.php';

use AmpMysqlMysqlConfig;
use AmpMysqlMysqlConnectionPool;

$db = new MysqlConnectionPool(MysqlConfig::fromAuthority(DB_HOST, DB_USER, DB_PASS, DB_NAME));

/* create same table than in 3-generic-with-yield.php */
createGenericTable($db);

$transaction = $db->beginTransaction();

$transaction->execute("INSERT INTO tmp VALUES (?, ? * 2)", [6, 6]);

$result = $transaction->execute("SELECT * FROM tmp WHERE a >= ?", [5]); // Two rows should be returned.

foreach ($result as $row) {
    var_dump($row);
}

$transaction->rollback();

// Run same query again, should only return a single row since the other was rolled back.
$result = $db->execute("SELECT * FROM tmp WHERE a >= ?", [5]);

foreach ($result as $row) {
    var_dump($row);
}

$db->close();

0 人点赞