pmg/queue-tactician

使用Tactician运行异步作业

v6.1.0 2024-04-09 21:03 UTC

This package is auto-updated.

Last update: 2024-09-10 00:31:06 UTC


README

这是一个中间件,用于将Tacticianpmg/queue集成。

安装和用法

使用composer安装。

composer require pmg/queue-tactician

要使用它,请在默认命令处理中间件之前将中间件添加到中间件链中。

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Producer;
use PMG\Queue\Tactician\QueueingMiddleware;

/** @var Producer */
$producer = createAQueueProducerSomehow();

$bus = new CommandBus([
    new QueueingMiddleware($producer),
    new CommandHandlerMiddleware(/*...*/),
]);

入队命令

实现PMG\Queue\Message的任何命令将通过生产者放入队列,并且不会调用其他中间件。

use PMG\Queue\Message;

final class DoLongRunningStuff implements Message
{
    /**
     * {@inheritdoc}
     */
    public function getName()
    {
        return 'LongRunningStuff';
    }
}

// goes right into the queue
$bus->handle(new DoLongRunningStuff());

出队(消费)命令

要使用Tactician通过消费者处理消息,请使用PMG\Queue\Handler\TacticianHandler

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;

/** @var League\Tactician\CommandBus $bus */
$handler = new TacticianHandler($bus);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

$consumer->run();

以上假设CommandBus实例仍然安装了QueueingMiddleware。如果没有,您需要使用自己的处理程序来调用命令总线,可能通过CallableHandler

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;

// no QueueingMiddleware!
$differentBus = new CommandBus([
    new CommandHandlerMiddleware(/*...*/),
]);

$handler = new CallableHandler([$bus, 'handle']);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

$consumer->run();

小心不要用PcntlForkingHandler包装此处理程序

命令总线的共享实例意味着,如果子进程被分叉以处理消息,那么诸如打开数据库连接之类的事情可能会引起问题。

相反,更好的选择是为每条消息创建一个新的命令总线。CreatingTacticianHandler可以为您做到这一点。

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;
use PMG\Queue\Tactician\QueuedCommand;
use PMG\Queue\Tactician\QueueingMiddleware;
use PMG\Queue\Handler\CreatingTacticianHandler;

$handler = new CreatingTacticianHandler(function () {
    // this is invoked for every message
    return new CommandBus([
        new QueueingMiddleware(createAProduerSomehow()),
        new CommandHandlerMiddlware(/* ... */)
    ]);
});

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

$consumer->run();