phly / phly-redis-task-queue
Requires
- php: ~8.1.0 || ~8.2.0
- dragonmantank/cron-expression: ^3.3.2
- phly/phly-configfactory: ^1.2
- predis/predis: ^1.1.10 || ^2.1
- psr/container: ^1.0 || ^2.0
- psr/event-dispatcher: ^1.0
- psr/log: ^1.0 || ^2.0 || ^3.0
- react/event-loop: ^1.3
- symfony/console: ^6.2
- webmozart/assert: ^1.11
Requires (Dev)
- laminas/laminas-coding-standard: ~2.3.0
- phpunit/phpunit: ^9.6.5
- vimeo/psalm: ^5.0
README
实现了使用Redis的任务队列和crontab运行器,利用PSR-14事件调度器、ReactPHP的事件循环和symfony/console(通过laminas-cli)。
安装
composer require phly/phly-redis-task-queue
为了获得最佳效果,建议安装ext-event扩展。
此外,该组件将自己标记为Laminas组件,在Laminas或Mezzio应用程序中使用时将自动设置使用配置。
使用
创建任务
任务可以是任何类的对象。
例如
namespace Foo; class HelloWorldTask { public function __construct( public readonly string $message ) { } }
序列化任务
为了将任务入队并在以后调度它,您需要能够序列化和反序列化它。为此,您需要创建一个mapper,这是一个实现了Phly\RedisTaskQueue\Mapper\MapperInterface
的类。
interface MapperInterface { /** * Can this implementation hydrate the given array type? * * @psalm-param array{__type: string, ...} $serialized */ public function handlesArray(array $serialized): bool; /** * Can this implementation extract the given object type? */ public function handlesObject(object $object): bool; /** @return array{__type: string, ...} */ public function castToArray(object $object): array; /** @psalm-param array{__type: string, ...} $serialized */ public function castToObject(array $serialized): object; }
在序列化到数组时,返回的数组必须包含成员__type
,其字符串值解析为任务类类型。否则,序列化格式其余部分由您决定。您可以假设当调用hydrate
时,数组包含mapper可以处理的__type
成员。
这些mapper类可以注册到您的DI容器中。一旦完成,您可以将它们注册到配置中,以便它们将自动添加到内部mapper。
return [ 'redis-task-queue' => [ 'mappers' => [ \App\Mapper\TaskMapper::class, ], ], ];
EmptyObjectMapper
此组件为空任务实现提供了灵活的mapper实现,Phly\RedisTaskQueue\Mapper\EmptyObjectMapper
。此类接受单个参数,即响应的任务类名称。在提取时,它将提取一个包含 exactly one member 的数组,即 __type
,其值为类。在填充时,它将不带参数实例化给定的类并返回它。
由于此实现有一个参数,因此您有几种方法可以将其注册到mapper。
首先,您可以创建一个自定义工厂。例如,如果空类命名为 RssFeed
,则可以创建一个 RssFeedMapperFactory
。
class RssFeedMapperFactory { public function __invoke(): EmptyObjectMapper { return new EmptyObjectMapper(RssFeed::class); } }
然后将其映射到服务
return [ 'dependencies' => [ 'factories' => 'rss-feed-mapper' => RssFeedMapperFactory::class, ], ], 'redis-task-queue' => [ 'mappers' => [ 'rss-feed-mapper', ], ], ];
第二种方法是使用Phly\RedisTaskQueue\Mapper\Mapper
类的委托工厂来附加它
class RssFeedMapperDelegator { public function __invoke(ContainerInterface $container, string $requestedName, callable $factory): Mapper { $mapper = $factory(); $mapper->attach(new EmptyObjectMapper(RssFeed::class)); return $mapper; } }
您只需注册委托工厂
return [ 'dependencies' => [ 'delegators' => [ \Phly\RedisTaskQueue\Mapper\Mapper::class => [ RssFeedMapperDelegator::class, ], ], ], ];
将任务入队
我建议将应用程序从RedisTaskQueue
解耦,并使用PSR-14调度器来调度一个包含任务的包装事件。这种方法意味着在开发中,您可以有一个处理延迟事件的替代处理程序,例如记录任务,而不是实际入队它。此外,通过将任务包装在DeferredEvent
中,您将在代码中指示您期望这会异步发生,而不是立即发生。如果您以后决定立即处理此类任务,您可以使用不同的监听器来监听DeferredEvent
,或者您可以从DeferredEvent
中解包特定的任务。
要将任务入队,请通过将其包装在Phly\RedisTaskQueue\EventDispatcher\DeferredEvent
中来调度它
$dispatcher->dispatch(new DeferredEvent($task));
此组件提供了一个监听此事件的监听器:Phly\RedisTaskQueue\EventDispatcher\DeferredEventListener
。您需要将其连接到PSR-14调度器。
为了延迟的目的,将应用程序从库解耦
如果您想“拥有”用于延迟任务的程序代码,并且不希望它依赖于此组件,您可以通过定义自己的
DeferredEvent
或AsyncEvent
类型来实现,然后为该类型创建自己的 PSR-14 监听器。实现方式类似于此库中的 DeferredEventListener。
处理任务
您需要为要排队的事件分配器中的每种任务类型注册一个或多个监听器。例如,基于上面的示例,您可能有一个以下监听器
namespace Foo; class HelloWorldListener { public function __invoke(HelloWorldTask $task): void { error_log(sprintf('Hello, %s', $task->message)); } }
然后您可以通过 PSR-14 监听器提供程序注册此监听器。
运行任务运行器
./vendor/bin/laminas phly:redis-task-queue:task-worker
如果您想有一组任务工作者,我建议使用 supervisord。具有五个工作者的池配置可能如下所示
[program:worker] autostart=true autorestart=unexpected command=vendor/bin/laminas phly:redis-task-queue:task-worker ; Change the following to your application root: directory=/var/www numprocs=5 process_name=%(program_name)s_%(process_num)d redirect_stderr=true
计划任务
此库中的 crontab 实现是通过 phly:redis-task-queue:cron-runner
laminas-cli 命令进行的。它从您的应用程序配置中拉取 crontab 定义,然后每分钟检查是否有到期任务。如果有,它将相关任务入队。
配置
配置是通过“cron.jobs”配置键进行的。每个元素都是一个具有两个键的数组
- schedule:要使用的 crontab 调度;有关概述,请参阅 dragonmantank/cron-expression 写作。
- task:表示要运行的任务的 JSON 字符串。此字符串必须代表可以映射到 PHP 类的 JSON 对象。由于 JSON 解析的工作方式,您需要确保正确转义命名空间分隔符;这通常是一个包含四个反斜杠的序列:
App\\Tasks\\FetchRssFeed
。
例如
return [ 'cron' => [ // Keys are not required for jobs, but are helpful when debugging configuration 'rss' => [ // Fetch every 3 hours at the top of the hour 'schedule' => '0 */3 * * *', 'task' => '{"__type": "App\\\\Tasks\\\\FetchRssFeed", "url": "https://github.com/weierophinney", "headers": {"Accept": "application/atom+xml"}}', ], 'social' => [ // Fetch every 15 minutes 'schedule' => '*/15 * * * *', 'task' => '{"__type": "App\\\\Tasks\\\\FetchSocial"}', ], ], ];
调用 cron 运行器
要调用 cron 运行器,请使用以下命令
./vendor/bin/laminas phly:redis-task-queue:cron-runner
我建议使用 supervisord 运行此命令。当您这样做时,请确保使用 仅一个 工作者,以确保到期时只有一个任务入队。配置可能如下所示
[program:cron] autostart=true autorestart=unexpected command=vendor/bin/laminas phly:redis-task-queue:cron-runner ; Change the following to your application root: directory=/var/www numprocs=1 redirect_stderr=true
配置
以下配置可以通过 config
服务消费
return [ 'redis-task-queue' => [ 'mappers' => [ // Class names of mapper services that can map event types for serialization ], // Float seconds interval between task runner invocations 'task_runner_interval' => 1.0, 'signals' => [ // Integer signals which indicate the task or cron runner should terminate. // In some cases, you may not be able to listen to SIGKILL // (e.g. running under a non-privileged user in supervisord) SIGKILL, SIGINT, SIGTERM, ], ], 'cron' => [ 'jobs' => [ /* Job definitions. * These are each arrays, and can have a named index or not. * Each has the following structure: * [ * 'schedule' => '* * * * *', // valid cron schedule string * 'task' => '{"__type": "...", ...}' // JSON serialization of task to run * ] */ ], ], ];