官方其实是实现了swoft/amqp
组件,但是你会在sowft的官方文档里发现,根本找不到有任何关于它的使用说明。而且当使用composer require sowft/amqp
你会发现无法安装成功,还会颇有嘲讽的提示你composer里没有找到这货。
需要手动增加如下配置到composer.json
中。解决来源:https://github.com/swoft-cloud/swoft/issues/1376。
{
"repositories": {
"swoft-amqp": {
"type": "git",
"url": "https://github.com/swoft-cloud/swoft-amqp.git"
}
}
}
在安装过程中,本地cygwin测试环境即使配了上面的地址,能下载README.md
啥的,唯独无法下载最关键的swoft-amqp
中src
的文件夹,最后没办法只能直接从git下载将src
文件夹放到vendor
中。
嘲讽 1
没有找到官方关于swoft/amqp
的文档,因此只能从看源码摸索配置。
嘲讽再 1
安装好后,在bean.php
中增加如下配置:
'amqp' => [
'class' => SwoftAmqpClient::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'queue' => 'test_queue',
'exchange' => 'exchange_test',
'route' => 'example-test-routing-key',
'type' => AMQPExchangeType::DIRECT,
// 'channels' => [],
],
'amqp.pool' => [
'class' => SwoftAmqpPool::class,
'client' => bean('amqp'),
],
主要的几个方法:
在sowftAmqpAmqp
中可以看到。
* @method static Connection channel(string $channel = null)
* @method static void push(string $message, array $prop = [], string $route = '')
* @method static string|null pop()
* @method static void listen(Closure $callback = null)
*/
swoft框架中如何将数据路由到指定的队列?
在原有的bean.php
的amqp
配置项中增加配置channels
:
'amqp' => [
'class' => SwoftAmqpClient::class,
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'queue' => 'test_queue',
'exchange' => 'exchange_test',
'route' => 'example-test-routing-key',
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_test_02',
'queue' => 'subway',
],
],
],
在具体的逻辑代码中在获取$channel
时指定需要获取的channels
里的配置channle_id
:
public function index(Response $response): Response
{
$channel = Amqp::channel('channel_1');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i )
{
$channel->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'example-test-routing-key' );
}
$name = 'steve';
return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
}
/**
* @RequestMapping("listen")
*
* @param Response $response
*
* @return Response
*/
public function listen(Response $response): Response
{
CLog::info('run method: '.__METHOD__);
$channel = Amqp::channel('channel_1');
$channel->listen(function ($message){
//$message:数据结构(json_encode)之后
//{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
CLog::info('message:'. json_encode($message));
});
$name = 'steve';
return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
}
rabbitMQ的pubsub(发布订阅)模式:
- 类型设置
type
为fanout
。具体配置可参考bean.php
相关部分。 - 消息发布者。参考代码
AppHttpControllerTestAmqp
中的pub()
方法的实现。
public function pub(Request $request): Response
{
$content = $request->get('content', 'this-is-a-test');
$channel = Amqp::connection('amqp.pubsub')->channel();
for($i=0; $i<1; $i )
{
$channel->push($content);
}
$name = __METHOD__;
return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
}
- 消息订阅者。参考代码
AppProcessSub1Procress
与AppProcessSub2Procress
。
public function run(Process $process): void
{
$channel = Amqp::connection('amqp.pubsub')->channel('channel_pubsub_02');
$channel->listen(function ($message){
//$message:数据结构(json_encode)之后
//{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
// CLog::info('sub2 message:'. json_encode($message));
CLog::info('sub2 message:'. $message->body);
});
}