<?php
require 'vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
// __construct(
// $host, ip
// $port, 端口号
// $user, 用户名
// $password, 密码
// $vhost = '/', 虚拟主机
// $insist = false,
// $login_method = 'AMQPLAIN',
// $login_response = null,
// $locale = 'en_US',
// $connection_timeout = 3.0,
// $read_write_timeout = 130.0,
// $context = null,
// $keepalive = false,
// $heartbeat = 60,
// $channel_rpc_timeout = 0.0,
// $ssl_protocol = null
// )
$connection = new AMQPStreamConnection('172.18.0.228', 5672, 'test', 'test');
$channel = $connection->channel();
// exchange_declare(
// $exchange, 交换器 exchange 名称
// $type, 交换器类型
// $passive = false, passive true 只检测不创建 false 创建
// $durable = false, durable true 为 持久化
// $auto_delete = true, exclusive 设置true 将会变成私有
// $internal = false,
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// $msg = new AMQPMessage('Hello World!',[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]);
// fanout 类型交换器 为广播类型
// $channel->basic_publish($msg, 'exchange', 'routing_key');
// queue_declare(
// $queue = '', 队列名称
// $passive = false, passive true 只检测不创建 false 创建
// $durable = false, durable true 为 持久化
// $exclusive = false, exclusive 设置true 将会变成私有
// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
$channel->queue_declare('hello', false, true, false, false);
// __construct(
// $body = '', 消息内容
// $properties = [
// delivery_mode 投递模式 delivery mode 设置为 2 (AMQPMessage::DELIVERY_MODE_PERSISTENT) 标记持久化
// ]
//
// )
$msg = new AMQPMessage('Hello World!',[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]);
// basic_publish(
// $msg, 消息内容
// $exchange = '', 交换器名称
// $routing_key = '', 路由键 (routing key)
// $mandatory = false,
// $immediate = false,
// $ticket = null
// )
$channel->basic_publish($msg, 'exchange_name', 'routing_key');
echo " [x] Sent 'Hello World!'n";
//关闭信道,关闭amqp
$channel->close();
$connection->close();
require 'vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
// __construct(
// $host, ip
// $port, 端口号
// $user, 用户名
// $password, 密码
// $vhost = '/', 虚拟主机
// $insist = false,
// $login_method = 'AMQPLAIN',
// $login_response = null,
// $locale = 'en_US',
// $connection_timeout = 3.0,
// $read_write_timeout = 130.0,
// $context = null,
// $keepalive = false,
// $heartbeat = 60,
// $channel_rpc_timeout = 0.0,
// $ssl_protocol = null
// )
$connection = new AMQPStreamConnection('172.18.0.228', 5672, 'test', 'test');
$channel = $connection->channel();
// exchange_declare(
// $exchange, 交换器 exchange 名称
// $type, 交换器类型
// $passive = false, passive true 只检测不创建 false 创建
// $durable = false, durable true 为 持久化
// $auto_delete = true, exclusive 设置true 将会变成私有
// $internal = false,
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// queue_declare(
// $queue = '', 队列名称
// $passive = false, passive true 只检测不创建 false 创建
// $durable = false, durable true 为 持久化
// $exclusive = false, exclusive 设置true 将会变成私有
// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// queue_bind(
// $queue, 队列名称
// $exchange, 交换器 exchange 名称
// $routing_key = '', 路由键 (routing key)
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
// fanout 类型交换器 为广播类型
$channel->queue_bind($queue_name, 'exchange',"routing_key");
// queue_declare(
// $queue = '', 队列名称
// $passive = false, passive true 只检测不创建 false 创建
// $durable = false, durable true 为 持久化
// $exclusive = false, exclusive 设置true 将会变成私有
// $auto_delete = true, auto-delete true 自动删除 最后一个取消订阅就会移除队列
// $nowait = false,
// $arguments = array(),
// $ticket = null
// )
$channel->queue_declare('hello', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL Cn";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "n";
// basic_ack(
// $delivery_tag, 投递标签 delivery mode 投递模式
// $multiple = false 多个
// )
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
// RabbitMQ中的概念,channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
// basic_qos(
// $prefetch_size,
// $prefetch_count,
// $a_global
// )
$channel->basic_qos(null, 1, null);
// basic_consume(
// $queue = '', 队列名称
// $consumer_tag = '',
// $no_local = false,
// $no_ack = false, 使用false必须 ack 主动确认消息 成功送达
// $exclusive = false, exclusive 设置true 将会变成私有
// $nowait = false,
// $callback = null, 回调函数
// $ticket = null,
// $arguments = array()
// )
$channel->basic_consume('queue_name', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}