swoft使用rabbitmq消息队列

2021-01-12 14:36:58 浏览数 (1)

官方其实是实现了swoft/amqp组件,但是你会在sowft的官方文档里发现,根本找不到有任何关于它的使用说明。而且当使用composer require sowft/amqp你会发现无法安装成功,还会颇有嘲讽的提示你composer里没有找到这货。

需要手动增加如下配置到composer.json中。解决来源:https://github.com/swoft-cloud/swoft/issues/1376。

代码语言:javascript复制
{
    "repositories": {
        "swoft-amqp": {
            "type": "git",
              "url": "https://github.com/swoft-cloud/swoft-amqp.git"
        }
    }
}

在安装过程中,本地cygwin测试环境即使配了上面的地址,能下载README.md啥的,唯独无法下载最关键的swoft-amqpsrc的文件夹,最后没办法只能直接从git下载将src文件夹放到vendor中。

嘲讽 1

没有找到官方关于swoft/amqp的文档,因此只能从看源码摸索配置。 嘲讽再 1

安装好后,在bean.php中增加如下配置:

代码语言:javascript复制
    'amqp'              => [
        'class'    => SwoftAmqpClient::class,
        'host'     => '127.0.0.1',
        'port'     => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'queue' => 'test_queue',
        'exchange' => 'exchange_test',
        'route' => 'example-test-routing-key',
        'type' => AMQPExchangeType::DIRECT,
//        'channels' => [],
    ],
    'amqp.pool' => [
        'class' => SwoftAmqpPool::class,
        'client' => bean('amqp'),
    ],

主要的几个方法: 在sowftAmqpAmqp中可以看到。

代码语言:javascript复制
 * @method static Connection channel(string $channel = null)
 * @method static void push(string $message, array $prop = [], string $route = '')
 * @method static string|null pop()
 * @method static void listen(Closure $callback = null)
 */

swoft框架中如何将数据路由到指定的队列? 在原有的bean.phpamqp配置项中增加配置channels:

代码语言:javascript复制
 'amqp'              => [
      'class'    => SwoftAmqpClient::class,
      'host'     => '127.0.0.1',
      'port'     => 5672,
      'user' => 'guest',
      'password' => 'guest',
      'vhost' => '/',
      'queue' => 'test_queue',
      'exchange' => 'exchange_test',
      'route' => 'example-test-routing-key',
      'type' => AMQPExchangeType::DIRECT,
      'channels' => [ 
          'channel_1'=>[  //对应的key即为channel_id
              'exchange' => 'exchange_test_02',
              'queue' => 'subway',
          ],
      ],
  ],

在具体的逻辑代码中在获取$channel时指定需要获取的channels里的配置channle_id

代码语言:javascript复制
    public function index(Response $response): Response
    {
        $channel = Amqp::channel('channel_1');
        CLog::info('run method: '.__METHOD__);
        for($i=0; $i<10; $i  )
        {
            $channel->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'example-test-routing-key' );
        }

        $name = 'steve';
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }
    /**
     * @RequestMapping("listen")
     *
     * @param Response $response
     *
     * @return Response
     */
    public function listen(Response $response): Response
    {
        CLog::info('run method: '.__METHOD__);
        $channel = Amqp::channel('channel_1');
        $channel->listen(function ($message){
            //$message:数据结构(json_encode)之后
            //{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
            CLog::info('message:'. json_encode($message));
        });

        $name = 'steve';
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }

rabbitMQ的pubsub(发布订阅)模式:

  • 类型设置typefanout。具体配置可参考bean.php相关部分。
  • 消息发布者。参考代码AppHttpControllerTestAmqp中的pub()方法的实现。
代码语言:javascript复制
 public function pub(Request $request): Response
    {
        $content = $request->get('content', 'this-is-a-test');
        $channel = Amqp::connection('amqp.pubsub')->channel();
        for($i=0; $i<1; $i  )
        {
            $channel->push($content);
        }
        $name = __METHOD__;
        return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
    }
  • 消息订阅者。参考代码AppProcessSub1ProcressAppProcessSub2Procress
代码语言:javascript复制
    public function run(Process $process): void
    {
        $channel = Amqp::connection('amqp.pubsub')->channel('channel_pubsub_02');
        $channel->listen(function ($message){
            //$message:数据结构(json_encode)之后
            //{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
//            CLog::info('sub2 message:'. json_encode($message));
            CLog::info('sub2 message:'. $message->body);
        });
    }

0 人点赞