tobento / service-queue
用于处理后台作业的队列系统。
Requires
- php: >=8.0
- psr/clock: ^1.0
- psr/container: ^2.0
- psr/simple-cache: 2 - 3
- tobento/service-autowire: ^1.0.9
Requires (Dev)
- mockery/mockery: ^1.6
- phpunit/phpunit: ^9.5
- tobento/service-cache: ^1.0
- tobento/service-clock: ^1.0
- tobento/service-collection: ^1.0.5
- tobento/service-console: ^1.0.3
- tobento/service-container: ^1.0.6
- tobento/service-database: ^1.1.2
- tobento/service-encryption: ^1.0
- tobento/service-event: ^1.0
- tobento/service-storage: ^1.2.5
- vimeo/psalm: ^4.0
Suggests
- tobento/service-cache: May be used for unique jobs
- tobento/service-clock: To support storage queue
- tobento/service-console: To run queue worker via console commands
- tobento/service-database: To support storage queue factory
- tobento/service-encryption: To support job encryption
- tobento/service-event: Used for console commands to write events
- tobento/service-storage: To support storage queue
README
用于处理后台作业的队列系统。
目录
入门
运行此命令以添加队列服务项目的最新版本。
composer require tobento/service-queue
要求
- PHP 8.0 或更高版本
亮点
- 框架无关,与任何项目兼容
- 解耦设计
文档
创建作业
作业
您可以使用 Job::class
来创建作业。
使用命名作业
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\JobInterface; $job = new Job( name: 'sample', payload: ['key' => 'value'], ); var_dump($job instanceof JobInterface); // bool(true)
接下来,您需要添加一个 作业处理器 来处理作业
使用作业处理器
首先,创建作业处理器
use Tobento\Service\Queue\JobHandlerInterface; use Tobento\Service\Queue\JobInterface; final class Mail implements JobHandlerInterface { public function __construct( private MailerInterface $mailer, private MessageFacotryInterface $messageFactory, ) {} public function handleJob(JobInterface $job): void { $message = $this->messageFactory->createFromArray($job->getPayload()); $this->mailer->send($message); } public static function toPayload(MessageInterface $message): array { return $message->jsonSerialize(); } }
最后,创建作业
use Tobento\Service\Queue\Job; $job = new Job( name: Mail::class, payload: Mail::toPayload($message), );
可调用作业
您可以使用 CallableJob::class
来创建作业。
类构造函数的参数必须是可选的 null|(type)
,如果它们无法由容器解析!
use Tobento\Service\Queue\CallableJob; use Tobento\Service\Queue\JobInterface; final class MailJob extends CallableJob { public function __construct( private null|MessageInterface $message = null, ) {} public function handleJob( JobInterface $job, MailerInterface $mailer, MessageFacotryInterface $messageFactory, ): void { $message = $messageFactory->createFromArray($job->getPayload()); $mailer->send($message); } public function getPayload(): array { if (is_null($this->message)) { return []; // or throw exception } return $this->message->jsonSerialize(); } public function renderTemplate(): static { // render template logic ... return $this; } }
创建作业
$job = (new MailJob($message)) ->renderTemplate();
作业参数
您可以使用提供的参数提供基本作业功能,或者 创建自定义参数 来添加新功能或定制现有功能以满足您的需求。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Duration(seconds: 10)) ->parameter(new Parameter\Retry(max: 2));
参数辅助方法
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\JobInterface; $job = (new Job(name: 'sample')) ->queue(name: 'secondary') ->data(['key' => 'value']) ->duration(seconds: 10) ->retry(max: 2) ->delay(seconds: 5) ->unique() ->priority(100) ->pushing(function() {}) ->encrypt();
如果您使用的是 可调用作业,您可以使用 __construct
方法指定默认参数
use Tobento\Service\Queue\JobHandlerInterface; use Tobento\Service\Queue\Parameter; final class SampleJob extends CallableJob { public function __construct() { $this->duration(seconds: 10); $this->retry(max: 2); // or using its classes: $this->parameter(new Parameter\Priority(100)); } //... }
延迟参数
使用延迟参数设置作业需要延迟的秒数。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Delay(seconds: 60)) // or using helper method: ->delay(seconds: 60);
支持延迟的队列
数据参数
使用数据参数添加额外的作业数据。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Data(['key' => 'value'])) // or using helper method: ->data(['key' => 'value']);
持续时间参数
使用持续时间参数设置作业需要处理的近似时间。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Duration(seconds: 10)) // or using helper method: ->duration(seconds: 10);
如果作业无法运行,失败的作业处理器会将作业重新排队,以防止超时。
加密参数
加密参数使用 服务加密 来加密作业数据。
它将加密以下数据
- 作业有效负载
- 数据参数 的值
首先,安装服务
composer require tobento/service-encryption
然后,将加密器绑定到 作业处理器 使用的容器
示例使用服务容器作为容器
use Tobento\Service\Queue\JobProcessor; use Tobento\Service\Container\Container; use Tobento\Service\Encryption\EncrypterInterface; $container = new Container(); $container->set(EncrypterInterface::class, function() { // create enrcypter: return $encrypter; }); $jobProcessor = new JobProcessor($container);
查看加密实现部分以了解更多信息。
最后,将参数添加到您的任务中
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Encrypt()) // or using helper method: ->encrypt();
您可以创建一个自定义加密参数来使用另一个加密器或自定义加密。
监控参数
监控参数由Worker添加,可以用于记录有关任务的数据,例如运行时间(秒)和内存使用情况。例如,该参数由Work Command用于将其数据写入控制台。
use Tobento\Service\Queue\Parameter\Monitor; if ($job->parameters()->has(Monitor::class)) { $monitor = $job->parameters()->get(Monitor::class); $runtimeInSeconds = $monitor->runtimeInSeconds(); $memoryUsage = $monitor->memoryUsage(); }
优先级参数
使用优先级参数指定任务的优先级。优先级更高的任务将优先处理。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Priority(100)) // or using helper method: ->priority(100);
推送参数
使用推送参数指定在任务推送到队列之前执行的处理器。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\JobInterface; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Pushing( handler: function(JobInterface $job, AnyResolvableClass $foo): JobInterface { return $job; }, // you may set a priority. Higher gets executed first: priority: 100, // 0 is default )) // or using helper method: ->pushing(handler: function() {}, priority: 100);
队列参数
使用队列参数指定将任务推送到的队列。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Queue(name: 'secondary')) // or using helper method: ->queue(name: 'secondary');
当任务推送到队列时,Job Processor会自动添加此参数。
重试参数
使用重试参数指定最大重试次数。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Retry(max: 2)) // or using helper method: ->retry(max: 2);
Failed Job Handler使用此参数处理重试。
唯一参数
当同一任务的另一个实例仍在队列中或正在处理时,唯一参数会阻止新的重复任务进入队列。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\Unique( // A unique id. If null it uses the job id. id: null, // null|string )) // or using helper method: ->unique(id: null);
该参数需要将CacheInterface::class
绑定到传递给JobProcessor的容器中。
use Tobento\Service\Queue\JobProcessor; use Tobento\Service\Container\Container; use Psr\SimpleCache\CacheInterface; use Tobento\Service\Cache\Simple\Psr6Cache; use Tobento\Service\Cache\ArrayCacheItemPool; use Tobento\Service\Clock\SystemClock; $container = new Container(); $container->set(CacheInterface::class, function() { // create cache: return new Psr6Cache( pool: new ArrayCacheItemPool( clock: new SystemClock(), ), namespace: 'default', ttl: null, ); }); $jobProcessor = new JobProcessor($container);
无重叠参数
如果您添加了无重叠参数,则同一时间只会处理该任务的一个实例,以防止重叠。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\Parameter; $job = (new Job(name: 'sample')) ->parameter(new Parameter\WithoutOverlapping( // A unique id. If null it uses the job id. id: null, // null|string )) // or using helper method: ->withoutOverlapping(id: null);
该参数需要将CacheInterface::class
绑定到传递给JobProcessor的容器中。
use Tobento\Service\Queue\JobProcessor; use Tobento\Service\Container\Container; use Psr\SimpleCache\CacheInterface; use Tobento\Service\Cache\Simple\Psr6Cache; use Tobento\Service\Cache\ArrayCacheItemPool; use Tobento\Service\Clock\SystemClock; $container = new Container(); $container->set(CacheInterface::class, function() { // create cache: return new Psr6Cache( pool: new ArrayCacheItemPool( clock: new SystemClock(), ), namespace: 'default', ttl: null, ); }); $jobProcessor = new JobProcessor($container);
调度作业
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\QueueInterface; class SomeService { public function createJob(QueueInterface $queue): void { $job = new Job( name: 'sample', payload: ['key' => 'value'], ); $queue->push($job); } }
您可以考虑将其中一个队列绑定到容器作为默认的QueueInterface
实现,否则您需要使用 QueuesInterface(队列集合)将任务推送到特定队列。
use Tobento\Service\Queue\Job; use Tobento\Service\Queue\QueuesInterface; use Tobento\Service\Queue\QueueException; class SomeService { public function createJob(QueuesInterface $queues): void { $job = new Job(name: 'sample'); $queues->queue(name: 'secondary')->push($job); // throws QueueException if not exists. // or $queues->get(name: 'secondary')?->push($job); // or you may check if queue exists before: if ($queues->has(name: 'secondary')) { $queues->queue(name: 'secondary')->push($job); } } }
队列
内存队列
InMemoryQueue::class
将任务存储在内存中。
use Tobento\Service\Queue\InMemoryQueue; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\JobProcessorInterface; $queue = new InMemoryQueue( name: 'inmemeory', jobProcessor: $jobProcessor, // JobProcessorInterface priority: 100, ); var_dump($queue instanceof QueueInterface); // bool(true)
空队列
NullQueue::class
不会排队任何任务,因此任务根本不会处理。
use Tobento\Service\Queue\NullQueue; use Tobento\Service\Queue\QueueInterface; $queue = new NullQueue( name: 'null', priority: 100, ); var_dump($queue instanceof QueueInterface); // bool(true)
存储队列
StorageQueue::class
使用存储服务来存储任务。
首先,您需要安装存储服务
composer require tobento/service-storage
接下来,您可以安装时钟服务或使用其他实现
composer require tobento/service-clock
最后,创建队列
use Tobento\Service\Queue\Storage; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\JobProcessorInterface; use Tobento\Service\Storage\StorageInterface; use Psr\Clock\ClockInterface; $queue = new Storage\Queue( name: 'storage', jobProcessor: $jobProcessor, // JobProcessorInterface storage: $storage, // StorageInterface clock: $clock, // ClockInterface table: 'jobs', priority: 100, ); var_dump($queue instanceof QueueInterface); // bool(true)
存储需要以下表列
同步队列
SyncQueue::class
会立即调度任务,而不将其排队。
use Tobento\Service\Queue\SyncQueue; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\JobProcessorInterface; use Psr\EventDispatcher\EventDispatcherInterface; $queue = new SyncQueue( name: 'sync', jobProcessor: $jobProcessor, // JobProcessorInterface eventDispatcher: null, // null|EventDispatcherInterface priority: 100, ); var_dump($queue instanceof QueueInterface); // bool(true)
队列集合 (Queues)
默认队列
use Tobento\Service\Queue\Queues; use Tobento\Service\Queue\QueuesInterface; use Tobento\Service\Queue\QueueInterface; $queues = new Queues( $queue, // QueueInterface $anotherQueue, // QueueInterface ); var_dump($queues instanceof QueuesInterface); // bool(true) var_dump($queue instanceof QueueInterface); // bool(true)
懒队列
LazyQueues::class
仅在需要时创建队列。
use Tobento\Service\Queue\LazyQueues; use Tobento\Service\Queue\QueuesInterface; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\QueueFactoryInterface; use Tobento\Service\Queue\QueueFactory; use Tobento\Service\Queue\SyncQueue; use Tobento\Service\Queue\NullQueue; use Psr\Container\ContainerInterface; $queues = new LazyQueues( container: $container, // ContainerInterface queues: [ // using a factory: 'primary' => [ // factory must implement QueueFactoryInterface 'factory' => QueueFactory::class, 'config' => [ 'queue' => SyncQueue::class, 'priority' => 100, ], ], // using a closure: 'secondary' => static function (string $name, ContainerInterface $c): QueueInterface { // create queue ... return $queue; }, // or you may sometimes just create the queue (not lazy): 'null' => new NullQueue(name: 'null'), ], ); var_dump($queues instanceof QueuesInterface); // bool(true) var_dump($queue instanceof QueueInterface); // bool(true)
您可以查看队列工厂以了解更多信息。
队列工厂
队列工厂
use Tobento\Service\Queue\QueueFactory; use Tobento\Service\Queue\QueueFactoryInterface; use Tobento\Service\Queue\JobProcessorInterface; $factory = new QueueFactory( jobProcessor: $jobProcessor // JobProcessorInterface ); var_dump($factory instanceof QueueFactoryInterface); // bool(true)
查看Job Processor以了解更多信息。
创建队列
use Tobento\Service\Queue\InMemoryQueue; use Tobento\Service\Queue\NullQueue; use Tobento\Service\Queue\SyncQueue; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\QueueException; $queue = $factory->createQueue(name: 'primary', config: [ // specify the queue you want to create: 'queue' => InMemoryQueue::class, //'queue' => NullQueue::class, //'queue' => SyncQueue::class, // you may specify a priority: 'priority' => 200, ]); var_dump($queue instanceof QueueInterface); // bool(true) // or throws QueueException on failure.
存储队列工厂
use Tobento\Service\Queue\Storage\QueueFactory; use Tobento\Service\Queue\QueueFactoryInterface; use Tobento\Service\Queue\JobProcessorInterface; use Tobento\Service\Database\DatabasesInterface; use Psr\Clock\ClockInterface; $factory = new QueueFactory( jobProcessor: $jobProcessor, // JobProcessorInterface clock: $clock, // ClockInterface databases: null, // null|DatabasesInterface ); var_dump($factory instanceof QueueFactoryInterface); // bool(true)
查看Job Processor以了解更多信息。
创建JsonFileStorage::class
队列
use Tobento\Service\Storage\JsonFileStorage; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\QueueException; $queue = $factory->createQueue(name: 'primary', config: [ // specify the table storage: 'table' => 'queue', // specify the storage: 'storage' => JsonFileStorage::class, 'dir' => 'home/private/storage/', // you may specify a priority: 'priority' => 200, ]); var_dump($queue instanceof QueueInterface); // bool(true) // or throws QueueException on failure.
创建InMemoryStorage::class
队列
use Tobento\Service\Storage\InMemoryStorage; use Tobento\Service\Storage\PdoMySqlStorage; use Tobento\Service\Storage\PdoMariaDbStorage; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\QueueException; $queue = $factory->createQueue(name: 'primary', config: [ // specify the table storage: 'table' => 'queue', // specify the storage: 'storage' => InMemoryStorage::class, // you may specify a priority: 'priority' => 200, ]); var_dump($queue instanceof QueueInterface); // bool(true) // or throws QueueException on failure.
创建PdoMySqlStorage::class
或PdoMariaDbStorage::class
队列
use Tobento\Service\Storage\PdoMySqlStorage; use Tobento\Service\Storage\PdoMariaDbStorage; use Tobento\Service\Queue\QueueInterface; use Tobento\Service\Queue\QueueException; $queue = $factory->createQueue(name: 'primary', config: [ // specify the table storage: 'table' => 'queue', // specify the storage: 'storage' => PdoMySqlStorage::class, //'storage' => PdoMariaDbStorage::class, // specify the name of the database used: 'database' => 'name', // you may specify a priority: 'priority' => 200, ]); var_dump($queue instanceof QueueInterface); // bool(true) // or throws QueueException on failure.
作业处理器
JobProcessor::class
负责处理任务。
use Tobento\Service\Queue\JobProcessor; use Tobento\Service\Queue\JobProcessorInterface; use Tobento\Service\Queue\JobHandlerInterface; use Psr\Container\ContainerInterface; $jobProcessor = new JobProcessor( container: $container // ContainerInterface ); var_dump($jobProcessor instanceof JobProcessorInterface); // bool(true)
添加作业处理器
您可以添加针对命名任务的作业处理器。
use Tobento\Service\Queue\JobHandlerInterface; $jobProcessor->addJobHandler( name: 'sample', handler: SampleHandler::class, // string|JobHandlerInterface );
处理器示例
use Tobento\Service\Queue\JobHandlerInterface; use Tobento\Service\Queue\JobInterface; final class SampleHandler implements JobHandlerInterface { public function handleJob(JobInterface $job): void { // handle job } }
失败的作业处理器
FailedJobHandler::class
负责处理失败的作业。
use Tobento\Service\Queue\FailedJobHandler; use Tobento\Service\Queue\FailedJobHandlerInterface; use Tobento\Service\Queue\QueuesInterface; $handler = new FailedJobHandler( queues: $queues, // QueuesInterface ); var_dump($handler instanceof FailedJobHandlerInterface); // bool(true)
当失败的作业超过重试参数中定义的尝试次数后,作业将被丢弃。
您可以通过扩展 FailedJobHandler::class
并使用 finallyFailed
方法来处理最终失败的作业,将作业存储到数据库中或简单地记录它们。
use Tobento\Service\Queue\FailedJobHandler; use Tobento\Service\Queue\QueuesInterface; use Tobento\Service\Queue\JobInterface; use Psr\Log\LoggerInterface; class LogFailedJobHandler extends FailedJobHandler { public function __construct( protected null|QueuesInterface $queues = null, protected null|LoggerInterface $logger = null, ) {} protected function finallyFailed(JobInterface $job, \Throwable $e): void { if (is_null($this->logger)) { return; } $this->logger->error( sprintf('Job %s with the id %s failed: %s', $job->getName(), $job->getId(), $e->getMessage()), [ 'name' => $job->getName(), 'id' => $job->getId(), 'payload' => $job->getPayload(), 'parameters' => $job->parameters()->jsonSerialize(), 'exception' => $e, ] ); } /** * Handle exception thrown by the worker e.g. */ public function handleException(\Throwable $e): void { if (is_null($this->logger)) { return; } $this->logger->error( sprintf('Queue exception: %s', $e->getMessage()), [ 'exception' => $e, ] ); } }
工作者
Worker::class
处理队列中的作业。
use Tobento\Service\Queue\Worker; use Tobento\Service\Queue\QueuesInterface; use Tobento\Service\Queue\JobProcessorInterface; use Tobento\Service\Queue\FailedJobHandlerInterface; use Psr\EventDispatcher\EventDispatcherInterface; $worker = new Worker( queues: $queues, // QueuesInterface jobProcessor: $jobProcessor, // JobProcessorInterface failedJobHandler: $failedJobHandler, // null|FailedJobHandlerInterface eventDispatcher: $eventDispatcher, // null|EventDispatcherInterface );
运行中的工作者
use Tobento\Service\Queue\WorkerOptions; $status = $worker->run( // specify the name of the queue you wish to use. // If null, it uses all queues by its priority, highest first. queue: 'name', // null|string // specify the options: options: new WorkerOptions( // The maximum amount of RAM the worker may consume: memory: 128, // The maximum number of seconds a worker may run: timeout: 60, // The number of seconds to wait in between polling the queue: sleep: 3, // The maximum number of jobs to run, 0 (unlimited): maxJobs: 0, // Indicates if the worker should stop when the queue is empty: stopWhenEmpty: false, ), ); // you may exit: exit($status);
使用命令运行工作者
控制台
您可以通过控制台服务使用以下命令。
为了快速开始,请考虑使用以下两个应用程序包
否则,您需要安装控制台服务并自行设置控制台。
工作命令
从所有队列中运行作业
php app queue:work
仅从特定队列中运行作业
php app queue:work --queue=primary
可用选项
清除命令
删除队列中的所有作业
php app queue:clear
仅删除特定队列中的作业
php app queue:clear --queue=primary --queue=secondary
事件
可用事件
use Tobento\Service\Queue\Event;
只需确保您已将事件分派器传递给工作者(Worker)!
了解更多
创建自定义作业参数
您可以通过扩展 Parameter::class
来创建自定义参数
use Tobento\Service\Queue\Parameter\Parameter; class SampleParameter extends Parameter { // }
可存储参数
通过实现 JsonSerializable
接口,您的参数将被存储并在处理作业时可用。
use Tobento\Service\Queue\Parameter\Parameter; use JsonSerializable; class SampleParameter extends Parameter implements JsonSerializable { public function __construct( private string $value, ) {} /** * Serializes the object to a value that can be serialized natively by json_encode(). * Will be used to create the parameter by the parameters factory. * So it must much its __construct method. * * @return array */ public function jsonSerialize(): array { return ['value' => $this->value]; } }
可失败接口
通过实现 Failable
接口,您可以处理失败的作业。
use Tobento\Service\Queue\Parameter\Parameter; use Tobento\Service\Queue\Parameter\Failable; use Tobento\Service\Queue\JobInterface; use Throwable; class SampleParameter extends Parameter implements Failable { /** * Returns the failed job handler. * * @return callable */ public function getFailedJobHandler(): callable { return [$this, 'processFailedJob']; } /** * Process failed job. * * @param JobInterface $job * @param Throwable $e * @param ... any parameters resolvable by your container. * @return void */ public function processFailedJob(JobInterface $job, Throwable $e): void { // } }
查看 Tobento\Service\Queue\Parameter\Delay::class
以查看其实现。
可弹出接口
通过实现 Poppable
接口,您可以在作业从队列中弹出后处理作业。
use Tobento\Service\Queue\Parameter\Parameter; use Tobento\Service\Queue\Parameter\Poppable; use Tobento\Service\Queue\JobInterface; use Tobento\Service\Queue\QueueInterface; use JsonSerializable; class SampleParameter extends Parameter implements Poppable, JsonSerializable { /** * Returns the popping job handler. * * @return callable */ public function getPoppingJobHandler(): callable { return [$this, 'poppingJob']; } /** * Popping job. * * @param JobInterface $job * @param QueueInterface $queue * @param ... any parameters resolvable by your container. * @return null|JobInterface */ public function poppingJob(JobInterface $job, QueueInterface $queue): null|JobInterface { // called after the job is popped from the queue. // If returning null, the job gets not processed. return $job; } /** * Implemented as the parameter gets stored. Otherwise popping job handler gets not executed. */ public function jsonSerialize(): array { return []; } }
查看 Tobento\Service\Queue\Parameter\Encrypt::class
以查看其实现。
可处理接口
通过实现 Processable
接口,您可以在处理作业时进行操作。
use Tobento\Service\Queue\Parameter\Parameter; use Tobento\Service\Queue\Parameter\Processable; use Tobento\Service\Queue\JobInterface; use JsonSerializable; class SampleParameter extends Parameter implements Processable, JsonSerializable { /** * Returns the before process job handler. * * @return null|callable */ public function getBeforeProcessJobHandler(): null|callable { return [$this, 'beforeProcessJob']; // or return null if not required } /** * Returns the after process job handler. * * @return null|callable */ public function getAfterProcessJobHandler(): null|callable { return [$this, 'afterProcessJob']; // or return null if not required } /** * Before process job handler. * * @param JobInterface $job * @return JobInterface */ public function beforeProcessJob(JobInterface $job): JobInterface { return $job; } /** * After process job handler. * * @param JobInterface $job * @return JobInterface */ public function afterProcessJob(JobInterface $job): JobInterface { return $job; } /** * Implemented as the parameter gets stored. Otherwise handlers gets not executed. */ public function jsonSerialize(): array { return []; } }
查看 Tobento\Service\Queue\Parameter\Duration::class
以查看其实现。
可推送接口
通过实现 Pushable
接口,您可以在作业被推送到队列之前进行处理。
use Tobento\Service\Queue\Parameter\Parameter; use Tobento\Service\Queue\Parameter\Pushable; use Tobento\Service\Queue\JobInterface; use Tobento\Service\Queue\QueueInterface; class SampleParameter extends Parameter implements Pushable { /** * Returns the pushing job handler. * * @return callable */ public function getPushingJobHandler(): callable { return [$this, 'pushingJob']; } /** * Pushing job. * * @param JobInterface $job * @param QueueInterface $queue * @param ... any parameters resolvable by your container. * @return JobInterface */ public function pushingJob(JobInterface $job, QueueInterface $queue): JobInterface { return $job; } }
查看 Tobento\Service\Queue\Parameter\Pushing::class
以查看其实现。
可分块作业示例
此示例展示了使用数据参数存储其处理数据来创建可分块作业的可能方法。
use Tobento\Service\Queue\CallableJob; use Tobento\Service\Queue\Parameter; use Tobento\Service\Queue\QueuesInterface; final class ChunkableJob extends CallableJob { public function handleJob( JobInterface $job, QueuesInterface $queues, // Repository $repository, ): void { if (! $job->parameters()->has(Parameter\Data::class)) { // first time running job: $data = new Parameter\Data([ //'total' => $repository->count(), 'total' => 500, 'number' => 100, 'offset' => 0, ]); $job->parameters()->add($data); } else { $data = $job->parameters()->get(Parameter\Data::class); } $total = $data->get(key: 'total', default: 0); $number = $data->get(key: 'number', default: 100); $offset = $data->get(key: 'offset', default: 0); // Handle Job: //$items = $repository->findAll(limit: [$number, $offset]); $items = range($offset, $number); // For demo we use range foreach($items as $item) { // do sth } // Update offset: $data->set(key: 'offset', value: $offset+$number); // Repush to queue if not finished: if ($offset < $total) { $queues->queue( name: $job->parameters()->get(Parameter\Queue::class)->name() )->push($job); } } public function getPayload(): array { return []; } }