RabbitMq-PhpAmqpLib笔记(注:只做笔记-简易手册使用,不做教程)

2019-12-18 17:32:05 浏览数 (1)

<?php

require 'vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

use PhpAmqpLibMessageAMQPMessage;

// __construct(

//   $host, ip

//   $port, 端口号

//   $user, 用户名

//   $password, 密码

//   $vhost = '/', 虚拟主机

//   $insist = false,

//   $login_method = 'AMQPLAIN',

//   $login_response = null,

//   $locale = 'en_US',

//   $connection_timeout = 3.0,

//   $read_write_timeout = 130.0,

//   $context = null,

//   $keepalive = false,

//   $heartbeat = 60,

//   $channel_rpc_timeout = 0.0,

//   $ssl_protocol = null

// )

$connection = new AMQPStreamConnection('172.18.0.228', 5672, 'test', 'test');

$channel = $connection->channel();

// exchange_declare(

//   $exchange, 交换器 exchange 名称

//   $type, 交换器类型

//   $passive = false, passive true 只检测不创建 false 创建

//   $durable = false, durable true 为 持久化

//   $auto_delete = true, exclusive 设置true 将会变成私有

//   $internal = false,

//   $nowait = false,

//   $arguments = array(),

//   $ticket = null

// )

$channel->exchange_declare('exchange_name', 'direct', false, true, false);

// $msg = new AMQPMessage('Hello World!',[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]);

// fanout 类型交换器 为广播类型

// $channel->basic_publish($msg, 'exchange', 'routing_key');

// queue_declare(

// $queue = '', 队列名称

// $passive = false, passive true 只检测不创建 false 创建

// $durable = false, durable true 为 持久化

// $exclusive = false, exclusive 设置true 将会变成私有

// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列 

// $nowait = false,

// $arguments = array(),

// $ticket = null

// )

$channel->queue_declare('hello', false, true, false, false);

// __construct(

//  $body = '',  消息内容

//  $properties = [

//  delivery_mode  投递模式  delivery mode 设置为 2 (AMQPMessage::DELIVERY_MODE_PERSISTENT) 标记持久化

//  ]

// 

// )

$msg = new AMQPMessage('Hello World!',[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]);

// basic_publish(

//   $msg, 消息内容

//   $exchange = '', 交换器名称

//   $routing_key = '', 路由键 (routing key)

//   $mandatory = false,

//   $immediate = false,

//   $ticket = null

// )

$channel->basic_publish($msg, 'exchange_name', 'routing_key');

echo " [x] Sent 'Hello World!'n";

//关闭信道,关闭amqp

$channel->close();

$connection->close();

require 'vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

// __construct(

//   $host, ip

//   $port, 端口号

//   $user, 用户名

//   $password, 密码

//   $vhost = '/', 虚拟主机

//   $insist = false,

//   $login_method = 'AMQPLAIN',

//   $login_response = null,

//   $locale = 'en_US',

//   $connection_timeout = 3.0,

//   $read_write_timeout = 130.0,

//   $context = null,

//   $keepalive = false,

//   $heartbeat = 60,

//   $channel_rpc_timeout = 0.0,

//   $ssl_protocol = null

// )

$connection = new AMQPStreamConnection('172.18.0.228', 5672, 'test', 'test');

$channel = $connection->channel();

// exchange_declare(

//   $exchange, 交换器 exchange 名称

//   $type, 交换器类型

//   $passive = false, passive true 只检测不创建 false 创建

//   $durable = false, durable true 为 持久化

//   $auto_delete = true, exclusive 设置true 将会变成私有

//   $internal = false,

//   $nowait = false,

//   $arguments = array(),

//   $ticket = null

// )

$channel->exchange_declare('exchange_name', 'direct', false, true, false);

// queue_declare(

// $queue = '', 队列名称

// $passive = false, passive true 只检测不创建 false 创建

// $durable = false, durable true 为 持久化

// $exclusive = false, exclusive 设置true 将会变成私有

// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列 

// $nowait = false,

// $arguments = array(),

// $ticket = null

// )

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// queue_bind(

//   $queue, 队列名称

//   $exchange, 交换器 exchange 名称

//   $routing_key = '', 路由键 (routing key)

//   $nowait = false,

//   $arguments = array(),

//   $ticket = null

// )

// fanout 类型交换器 为广播类型

$channel->queue_bind($queue_name, 'exchange',"routing_key");

// queue_declare(

// $queue = '', 队列名称

// $passive = false, passive true 只检测不创建 false 创建

// $durable = false, durable true 为 持久化

// $exclusive = false, exclusive 设置true 将会变成私有

// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列 

// $nowait = false,

// $arguments = array(),

// $ticket = null

// )

$channel->queue_declare('hello', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL Cn";

$callback = function ($msg) {

echo ' [x] Received ', $msg->body, "n";

// basic_ack(

//  $delivery_tag,  投递标签 delivery mode 投递模式

//  $multiple = false  多个

// )

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

// RabbitMQ中的概念,channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。

// basic_qos(

//  $prefetch_size, 

//  $prefetch_count, 

//  $a_global

// )

$channel->basic_qos(null, 1, null);

// basic_consume(

//   $queue = '', 队列名称

//   $consumer_tag = '',

//   $no_local = false,

//   $no_ack = false, 使用false必须 ack 主动确认消息 成功送达

//   $exclusive = false, exclusive 设置true 将会变成私有

//   $nowait = false,

//   $callback = null, 回调函数 

//   $ticket = null,

//   $arguments = array()

// )

$channel->basic_consume('queue_name', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {

$channel->wait();

}

0 人点赞