mgdigital / busque
一个灵活的现代PHP7命令队列和调度器,基于Redis构建
Requires
- php: >=7.0
- psr/log: ~1.0
Requires (Dev)
- behat/behat: ~3.0
- henrikbjorn/phpspec-code-coverage: ^3.0
- league/tactician: ~1.0
- phpspec/phpspec: ~3.0
- phpunit/phpunit: ~5.1
- predis/predis: ~1.0
- squizlabs/php_codesniffer: ~2.6
Suggests
- ext-redis: To use the better PHPRedis driver
README
一个灵活的现代PHP7命令队列和调度器,基于Redis构建
我创建了BusQue,因为我发现中型的PHP应用程序缺乏简单的消息队列选择。
BusQue的名字意味着命令总线+消息队列。它被设计为与Tactician和Redis一起使用,可以使用PHPRedis或Predis客户端,以及PHP serialize()等序列化程序,但也欢迎使用其他适配器。
我发现其他队列中缺少的一个关键功能是能够为作业分配一个唯一的ID,允许同一作业被排队多次,但在最后插入后只执行一次。
MGDigitalBusQueBundle 提供了与Symfony框架的集成。
安装
使用composer安装
composer require mgdigital/busque
或获取Symfony包
composer require mgdigital/busque-bundle
您还需要一个Redis服务器来运行队列。
用法
要使用BusQue,首先需要使用其依赖项实例化BusQue\Implementation
的一个实例。基本配置可能如下所示
<?php use MGDigital\BusQue as BusQue; // The preferred client is PHPRedis: $client = new Redis(); $adapter = new BusQue\Redis\PHPRedis\PHPRedisAdapter($client); // A Predis adepter is included, although Predis can have issues when used in long-running processes. // $client = new Predis\Client(); // $adapter = new BusQue\Redis\Predis\PredisAdapter($client); $driver = new BusQue\Redis\RedisDriver($adapter); // The PHP serializer should fit most use cases: $serializer = new BusQue\Serializer\PHPCommandSerializer(); // The MD5 generator creates an ID unique to the serialized command: $idGenerator = new BusQue\IdGenerator\Md5IdGenerator($serializer); $implementation = new BusQue\Implementation( // Puts all commands into the "default" queue: new BusQue\QueueResolver\SimpleQueueResolver('default'), $serializer, $idGenerator, // The Redis driver is used as both the queue and scheduler: $driver, $driver, // Always returns the current time: new BusQue\SystemClock(), // Inject your command bus here: new BusQue\Tactician\CommandBusAdapter($commandBus), // Inject your logger here: new Psr\Log\NullLogger() ); $busQue = new BusQue\BusQue($implementation);
BusQue\Handler\QueuedCommandHandler
和BusQue\Handler\ScheduledCommandHandler
类还需要与您的命令总线(Tactician)注册。有关使用命令总线的更多信息,请参阅Tactician网站。
如果您使用的是Symfony包,则上述所有操作都已为您完成,您可以直接从容器中获取busque
服务。
排队一个命令
SendEmailCommand
是您已配置Tactician来处理的命令
<?php $command = new SendEmailCommand('joe@example.com', 'Hello Joe!'); $commandBus->handle(new BusQue\QueuedCommand($command)); // or $busQue->queueCommand($command);
运行队列工作进程
<?php $busQue->workQueue('default'); // Hello Joe!
或在您的Symfony应用程序中运行app/console busque:queue_worker default
您需要为每个队列运行至少一个工作进程,可以使用类似supervisord的工具。
提示:如果您想在控制台看到工作进程处理的命令,请在Tactician中配置一些日志中间件,然后使用--verbose
选项运行busque:queue_worker
命令。
调度一个命令
<?php $commandBus->handle(new BusQue\ScheduledCommand($command, new \DateTime('+1 minute'))); // or $busQue->scheduleCommand($command, new \DateTime('+1 minute'));
运行调度器工作进程
只需要一个调度器工作进程来管理所有队列的调度。调度器工作进程的唯一任务是排队即将到期的命令。必须同时运行一个队列工作进程来处理这些命令。
<?php $busQue->workSchedule(); // 1 minute later... Hello Joe!
或在您的Symfony应用程序中运行app/console busque:scheduler_worker
需要标识符的命令
此命令在产品库存水平发生变化时每次都被排队,但我们给命令分配了一个ID
<?php $productId = 123; $command = new SyncStockLevelsWithExternalApiCommand($productId); $uniqueCommandId = 'SyncStock' . $productId; $commandBus->handle(new BusQue\QueuedCommand($command, $uniqueCommandId));
如果您没有指定唯一的命令ID,将自动生成一个。
假设队列正忙,在产品库存水平第二次改变之前还没有时间处理这个命令,那会怎么样?我们最不想看到的就是这条消息的重复进入队列,库存水平只需要同步一次。
由于我们通过产品ID识别命令,它将在任意给定时间只允许在队列(或调度器)中存在一次。
相反,如果您想要能够多次发出相同的命令,并确保队列工作器将运行每个命令副本,您必须确保每个命令副本都有唯一的ID。
这种行为如下
- 同一ID的命令一次只能排队或调度
- 如果当前有一个相同ID的命令正在进行,则可以排队一个新的相同ID的命令
- 当队列遇到ID已正在进行的命令时,该命令将被重新插入队列的末尾
- 当调度一个已调度的ID的命令时,原始调度的命令将被新调度的命令替换
使用MD5IdGenerator将生成一个始终与命令及其负载唯一一致的ID。如果需要不同的行为,可以使用其他ID生成器。
检查队列长度
我们还可以检查任何队列中的项目数量
<?php echo $busQue->getQueuedCount($queueName); // 0
列出队列
如果不存在,队列会自动创建,使用从QueueResolverInterface
适配器返回的任意队列名称。工作器可以在队列还不存在时工作。您需要确保如果生成了新的队列名称,那么必须有工作器来接收该队列中的命令。
<?php $queues = $busQue->listQueues(); // ['SendEmailCommand', 'SyncStockCommand']
取消命令
如果您想以任何原因取消一个命令,您可以使用以下调用移除它的所有痕迹
<?php $busQue->purgeCommand($queueName, $uniqueCommandId);
清空队列
<?php $busQue->deleteQueue($queueName);
列出队列中当前命令的ID
<?php $ids = $busQue->listQueuedIds($queueName); // ['command1id', 'command2id']
列出当前正在进行的命令的ID
<?php $ids = $busQue->listInProgressIds($queueName); // []
根据其ID从队列中读取命令
此方法返回从BusQue基于其队列名称和ID的反序列化命令,保持队列中的任何消息不受影响,如果命令在命令存储中未找到,则抛出BusQue\CommandNotFoundException
。
<?php $command = $busQue->getCommand($queueName, $uniqueCommandId);
更多方便的方法可以在BusQue\BusQue
类中找到。
测试
查看Travis CI上的测试套件输出
运行phpspec测试套件
bin/phpspec run -f pretty
并运行Behat验收套件
bin/behat
默认情况下,Behat套件将测试与PHPRedis的集成。也可以测试与Predis的集成
bin/behat --profile predis
默认情况下,这些测试将尝试向redis://redis:6379
的Redis实例写入。您可以通过提供一个扩展BusQue\Features\Context\AbstractPHPRedisContext
或BusQue\Features\Context\AbstractPredisContext
的FeatureContext
类来配置替代测试客户端。
Docker
包括了一个基本的docker环境用于测试。
cd docker docker-compose -f ./docker-compose.yml up docker exec -ti busque-php composer install docker exec -ti busque-php bin/behat
警告
- 我刚写了这个,所以可能还有一些我没有遇到的问题。API仍然会随着我解决这些问题而发生变化。我打算在时间允许的情况下改进其健壮性并扩展其功能,也欢迎pull请求!
- 我还没有在生产中使用这个,但我打算很快!祝你好运 :)