vrnvgasu / php-rabbit-handler
处理队列
2.0.0
2022-03-22 14:10 UTC
Requires
- php: >=8.1.0
- php-amqplib/php-amqplib: >=3.2
Requires (Dev)
- phpunit/phpunit: ^9.5
- rector/rector: ^0.12.18
README
composer require vrnvgasu/php-rabbit-handler
在 ./Examples 目录中查看示例。
连接
创建新的连接。
$connection = new Connection('localhost', 5672, 'guest', 'guest');
队列
创建新的队列。
$queue = new Queue('hello', false, false, false, false);
如果需要从 rabbit 创建唯一的队列,请将 '' 作为第一个变量传递。
交换机
创建新的交换机。
$exchange = new Exchange('direct_logs', 'direct', false, false, false);
绑定队列和交换机
创建新的绑定。
$binding = new Binding();
您可以在绑定中创建传递路由键。
$binding = new Binding('key');
然后您必须创建辅助函数并传递给他 $connection, $queue, $exchange 和 $binding。
$helper = new AMQPHelper($connection, $queue, $exchange, $binding);
消费
创建新的消费。
$consume = new Consume('', false, false,false, false);
您可以将回调函数传递给 Consume 构造函数以处理来自 rabbit 的消息。
$consume = new Consume('', false, false, false, false, function($msg) { print_r('Message: ' . $msg->body); });
或者您可以从 Consumer 扩展您的消费者类并声明 callback 方法。
class TestConsumer extends Consumer { /** * @param AMQPMessage $msg */ public function callback(AMQPMessage $msg): void { print_r(' [x] Received ' . $msg->body . "\n"); if (!$this->consume->getNoAck()) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } } }
然后您必须创建消费者并传递给他 $helper 和 $consume。
$consumer = new Consumer($helper, $publication);
然后调用 execute 方法。
$consumer->execute();
如果您想在一段时间后中断处理器的执行,请使用以下方法
$seconds = 60; $consumer->execute(true, null, false, $seconds);
这将中断执行,在处理最后一个消息后指定的时间。
生产
创建新的发布。
$publication = new Publication([], 'hello');
然后您必须创建生产者并传递给他 $helper 和 $publication。
$producer = new Producer($helper, $consume);
您需要为生产者提供有效载荷。生产者可以从实现 JobInterface 的对象中获取有效载荷。
您可以将您的作业对象传递给 Producer 对象
$producer->execute(TestJob::dispatch($param));
或者您可以将生产者传递给 execute 方法。
TestJob::dispatch($param)->execute($producer);
作业
为生产者准备有效载荷。
分配
首先,您需要将准备数据传输到 dispatch。
use ATC\Jobs\TestJob; TestJob::dispatch($param1, $param2, $param3);
类声明
您的 TestJob 必须扩展 Job
class TestJob extends BaseJob { // }
处理
在 TestJob 中,您必须声明带有您在 dispatch 中传递的参数的 handle 方法。handle 将在 dispatch 中执行。
protected function handle($param1, $param2, $param3) { // Parameters will be passed to this class automatically // You can do something with them and get some $data // In the same method, you need to transfer the payload to the method `payload` for `producer` $this->setPayload($data); }