背景
需要做项目迁移时,例如laravel迁移至hyperf时,因为基本上都是一步一步迁移的,仍有例如支付回调等依旧在laravel框架中进行消费的情况。需要接管处理消息的queue进行数据格式改造,利用构造同样命名空间的job去进行投递,他会序列化数据,可以debug一下内容哦,然后投递至rabbitMQ后,laravel进行消费就好啦。其中hyperf的版本背景为2.1
话不多说开干
- 在app下建立Job目录为例,大家可以根据情况来
- 在Job目录下建立Job.php,复制以下代码
<?php
declare(strict_types=1);
namespace AppJob;
/**
* Class Job
* @package AppJob
*/
class Job
{
protected $job;
public $connection;
public $queue;
public $delay;
/**
* Job constructor.
*/
public function __invoke()
{
$this->job = null;
$this->connection = null;
$this->queue = null;
$this->delay = null;
}
/**
* Set the desired delay for the job.
*
* @param DateTime|int|null $delay
* @return $this
*/
public function delay($delay)
{
$this->delay = $delay;
return $this;
}
/**
* Set the desired queue for the job.
*
* @param string|null $queue
* @return $this
*/
public function onQueue($queue)
{
$this->queue = $queue;
return $this;
}
/**
* Set the desired connection for the job.
*
* @param string|null $connection
* @return $this
*/
public function onConnection($connection)
{
$this->connection = $connection;
return $this;
}
}
PHP
Copy
- 接管Producer.php,继续创建Producer.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConnection;
use HyperfAmqpMessageProducerMessageInterface;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;
use HyperfAmqpBuilder;
/**
* 生产者
* Class Producer
* @package AppJob
*/
class Producer extends Builder
{
public $exchange_type;
public $exchange_passive;
public $exchange_durable;
public $exchange_auto_delete;
public $queue_passive;
public $queue_durable;
public $queue_exclusive;
public $queue_auto_delete;
public $queue_nowait;
public function checkExchange($channel, $producerMessage)
{
$exchange = $producerMessage->getExchange();
$queue = $producerMessage->getQueue();
$routingKey = $producerMessage->getRoutingKey();
$ttl = $producerMessage->getTtl();
$this->exchange_type = env('RABBITMQ_EXCHANGE_TYPE', 'direct');
$this->exchange_passive = env('RABBITMQ_EXCHANGE_PASSIVE', false);
$this->exchange_durable = env('RABBITMQ_EXCHANGE_DURABLE', true);
$this->exchange_auto_delete = env('RABBITMQ_EXCHANGE_PASSIVE', false);
$this->queue_passive = env('RABBITMQ_QUEUE_PASSIVE', false);
$this->queue_durable = env('RABBITMQ_QUEUE_DURABLE', true);
$this->queue_exclusive = env('RABBITMQ_QUEUE_EXCLUSIVE', false);
$this->queue_auto_delete = env('RABBITMQ_QUEUE_AUTODELETE', false);
//定义交换器
$channel->exchange_declare($exchange, $this->exchange_type, $this->exchange_passive, $this->exchange_durable, $this->exchange_auto_delete);
//定义队列
$channel->queue_declare($queue, $this->queue_passive, $this->queue_durable, $this->queue_exclusive, $this->queue_auto_delete);
//绑定队列到交换器上
$channel->queue_bind($queue, $exchange, $routingKey);
if ($ttl > 0) {
// $delayExchange = 'delayed_exchange_' . $exchange;
// $delayQueue = 'delayed_queue_' . $queue . '_' . $ttl;
// $delayRoutingKey = $routingKey . $ttl;
$delayExchange = $exchange;
$delayQueue = $queue . '_deferred_' . $ttl;
$delayRoutingKey = $delayQueue;
//定义延迟交换器
$channel->exchange_declare($delayExchange, $this->exchange_type, $this->exchange_passive, $this->exchange_durable, $this->exchange_auto_delete);
//定义延迟队列
$channel->queue_declare($delayQueue, $this->queue_passive, $this->queue_durable, $this->queue_exclusive, $this->queue_auto_delete, false, new AMQPTable(array(
"x-dead-letter-exchange" => $exchange,
"x-dead-letter-routing-key" => $routingKey,
"x-message-ttl" => $ttl * 1000,
)));
//绑定延迟队列到交换器上
$channel->queue_bind($delayQueue, $delayExchange, $delayRoutingKey);
$producerMessage->setExchange($delayExchange);
$producerMessage->setRoutingKey($delayRoutingKey);
}
}
/**
* @param ProducerMessageInterface $producerMessage
* @param $routingKey
* @param $exchange
* @param bool $confirm
* @param int $timeout
* @return bool
* @throws Exception
* @throws Throwable
*/
public function produce(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5): bool
{
return retry(1, function () use ($exchange, $routingKey, $producerMessage, $confirm, $timeout) {
return $this->produceMessage($producerMessage, $routingKey, $exchange, $confirm, $timeout);
});
}
/**
* @param ProducerMessageInterface $producerMessage
* @param $routingKey
* @param $exchange
* @param bool $confirm
* @param int $timeout
* @return bool
* @throws Throwable
*/
private function produceMessage(ProducerMessageInterface $producerMessage, $routingKey, $exchange, bool $confirm = false, int $timeout = 5)
{
$result = false;
$this->injectMessageProperty($producerMessage, $routingKey, $exchange);
$delay = $producerMessage->getTtl();
if ($delay > 0) {
$message = new AMQPMessage($producerMessage->payload(), array_merge($producerMessage->getProperties(), [
'expiration' => $delay * 1000,
]));
} else {
$message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
}
// $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
$pool = $this->getConnectionPool($producerMessage->getPoolName());
/** @var Connection $connection */
$connection = $pool->get();
if ($confirm) {
$channel = $connection->getConfirmChannel();
} else {
$channel = $connection->getChannel();
}
$channel->set_ack_handler(function () use (&$result) {
$result = true;
});
try {
// 检测交换机和队列
$this->checkExchange($channel, $producerMessage);
$channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
$channel->wait_for_pending_acks_returns($timeout);
} catch (Throwable $exception) {
// Reconnect the connection before release.
$connection->reconnect();
throw $exception;
} finally {
$connection->release();
}
return $confirm ? $result : true;
}
private function injectMessageProperty(ProducerMessageInterface $producerMessage, $routingKey, $exchange)
{
$producerMessage->setRoutingKey($routingKey);
$producerMessage->setExchange($exchange);
}
}
PHP
Copy
- 接管ProducerMessage.php,继续创建ProducerMessage.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfAmqpConstants;
use HyperfAmqpMessageMessage;
use HyperfAmqpMessageProducerMessageInterface;
/**
* 生产消息
* Class Job
* @package AppJob
*/
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
/**
* @var string
*/
protected $payload = '';
/**
* @var string
*/
protected $routingKey = '';
/**
* @var array
*/
protected $properties
= [
'content_type' => 'text/plain',
'delivery_mode' => Constants::DELIVERY_MODE_PERSISTENT,
];
public function getProperties(): array
{
return $this->properties;
}
public function setPayload($data): self
{
$this->payload = $data;
return $this;
}
public function payload(): string
{
return $this->serialize();
}
public function serialize(): string
{
return json_encode($this->payload);
}
/**
* @var integer 延迟时间(秒)
*/
protected $ttl = 0;
public function setTtl($ttl)
{
$this->ttl = $ttl;
return $this;
}
public function getTtl()
{
return $this->ttl;
}
protected $queue = 'default';
public function setQueue($name)
{
$this->queue = $name;
return $this;
}
public function getQueue()
{
return $this->queue;
}
}
PHP
Copy
- 序列化数据,创建SerializeJobData.php,复制以下代码进去
<?php
declare(strict_types=1);
namespace AppJob;
use HyperfUtilsStr;
/**
* 序列化队列数据
* Class SerializeJobData
* @package AppJob
*/
class SerializeJobData extends ProducerMessage
{
public function __construct($job)
{
// 设置不同 pool
$this->poolName = 'default';
/**
* 当驱动为redis时
* use IlluminateSupportStr;
* 'id' => Str::random(32),'attempts' => 0,
*/
if (env('QUEUE_DRIVER', 'rabbitmq') == 'rabbitmq') {
$this->payload = [
'displayName' => get_class($job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job)
]
];
} else {
$this->payload = [
'displayName' => get_class($job),
'job' => 'IlluminateQueueCallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job)
],
'id' => Str::random(32),
'attempts' => 0
];
}
}
}
PHP
Copy
- 创建助手函数 注意我的内容哦 按需修改
if (!function_exists('producerPushData')) {
/**
* 投递信息
* @param ProducerMessageInterface $message 消息
* @param string $routingKey 默认 default
* @param string $exchange 所投入的queue
* @param bool $confirm 是否需要确认
* @param int $timeout 超时时间
* @return bool
* @throws Throwable
*/
function producerPushData($message, $routingKey = 'default', $exchange = '', bool $confirm = false, int $timeout = 5)
{
$exchange = !empty($exchange) ? $exchange : env('RABBITMQ_EXCHANGE_NAME', 'sweetheart');
return make(Producer::class)->produce($message, $routingKey, $exchange, $confirm, $timeout);
}
}
PHP
Copy
- 使用方式 注意需要和laravel/lumen 保持同样的命名空间哦
创建的job需要继承 ```AppJobJob``
代码语言:javascript复制<?php
declare(strict_types=1);
use AppJobJob;
/**
* Class TestJob
*/
class TestJob extends Job
{
/**
* @var
*/
protected $data;
/**
* TestJob constructor.
* @param $data
*/
public function __construct($data)
{
$this->data = $data;
}
/**
* 处理逻辑
*/
public function __handle()
{
}
}
PHP
Copy
代码语言:javascript复制use AppJobSerializeJobData;
use TestJob;
$data = [];
$job = new TestJob($data);
producerPushData((new SerializeJobData($job)));
PHP
Copy