mle86 / wq-amqp
使用php-amqplib连接器为mle86/wq提供的AMQP模块
v1.4.0
2022-06-28 08:56 UTC
Requires
- php: >=8.0
- mle86/wq: ^v1.0.0 || >=v0.21.0 <1.0.0
- php-amqplib/php-amqplib: ^3 || ^2.20
Requires (Dev)
- php-coveralls/php-coveralls: ^2.5
- phpunit/phpunit: ^9
- roave/security-advisories: dev-master
README
此包包含PHP类mle86\WQ\WorkServerAdapter\AMQPWorkServer
。
它通过实现其WorkServerAdapter
接口来补充mle86/wq包。
它通过php-amqplib/php-amqplib包连接到AMQP服务器,例如RabbitMQ。
它为立即交付的工作在默认交换机("")中创建持久队列,并在自定义交换机“_phpwq._delay_exchange
”中的持久队列“_phpwq._delayed
”中为延迟作业创建持久队列。使用storeJob()
存储的作业也总是持久的。空队列或交换机不会自动删除。
版本和兼容性
这是mle86/wq-amqp
的1.4.0版本。
它是为mle86/wq
的1.0.0版本开发的,并且应该与其所有未来的1.x版本兼容。
安装和依赖
$ composer require mle86/wq-amqp
它需要PHP 8.0+。
它依赖于mle86/wq和php-amqplib/php-amqplib,而后者又需要PHP扩展mbstring、sockets和bcmath。
类参考
class mle86\WQ\WorkServerAdapter\AMQPWorkServer implements WorkServerAdapter
public function __construct (AMQPStreamConnection $connection)
构造函数。接收一个已经配置好的AMQPStreamConnection
实例来工作。不会尝试建立连接本身 - 使用connect()
工厂方法来代替。public static function connect ($host = 'localhost', $port = 5672, $user = 'guest', $password = 'guest', $vhost = '/', $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $connection_timeout = 3.0, $read_write_timeout = 3.0, $context = null, $keepalive = false, $heartbeat = 0)
工厂方法。请参阅AMQPStreamConnection::__construct以获取参数描述。
在WorkServerAdapter
接口中记录的接口方法
public function storeJob (string $workQueue, Job $job, int $delay = 0)
public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT): ?QueueEntry
public function buryEntry (QueueEntry $entry)
public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
public function deleteEntry (QueueEntry $entry)
用法示例
<?php use mle86\WQ\WorkServerAdapter\AMQPWorkServer; use mle86\WQ\WorkProcessor; use mle86\WQ\Job\Job; $processor = new WorkProcessor( AMQPWorkServer::connect("localhost") ); while (true) { $processor->processNextJob("mail", function(Job $job) { $job->...; }); }
此代码将无限期地执行本地AMQP服务器“mail
”队列中所有可用的作业。然而,如果其中一个作业抛出异常,它将终止 - 您可能想在processNextJob()
调用周围添加日志记录try-catch块,如WQ的“快速入门”示例中所示。