tobento/service-queue

A queue system for processing jobs in the background.

1.0.4 2024-09-28 13:09 UTC

Last update: 2024-09-28 13:10:26 UTC


Getting Started

Run this command to add the latest version of the queue service to your project.

composer require tobento/service-queue

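Requirements

  • PHP 8.0 or greater

Highlights

  • Framework-agnostic, works with any project
  • Decoupled design

Documentation

Creating Jobs

Job

You may use the Job::class to create jobs.

Using named jobs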

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = new Job(
    name: 'sample',
    payload: ['key' => 'value'],
);

var_dump($job instanceof JobInterface);
// bool(true)

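Next, you will need to add a job handler for the named job (see the Adding Job Handlers section).

Using a job handler

First, create the job handler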

use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

final class Mail implements JobHandlerInterface
{
    public function __construct(
        private MailerInterface $mailer,
        private MessageFactoryInterface $messageFactory,
    ) {}

    public function handleJob(JobInterface $job): void
    {
        $message = $this->messageFactory->createFromArray($job->getPayload());
        
        $this->mailer->send($message);
    }
    
    public static function toPayload(MessageInterface $message): array
    {
        return $message->jsonSerialize();
    }
}

Finally, create the job

use Tobento\Service\Queue\Job;

$job = new Job(
    name: Mail::class,
    payload: Mail::toPayload($message),
);

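Callable Job

You may create jobs by extending the CallableJob::class.

Constructor parameters must be optional, typed as null|(type), if they cannot be resolved by the container!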

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;

final class MailJob extends CallableJob
{
    public function __construct(
        private null|MessageInterface $message = null,
    ) {}

    public function handleJob(
        JobInterface $job,
        MailerInterface $mailer,
        MessageFactoryInterface $messageFactory,
    ): void {
        $message = $messageFactory->createFromArray($job->getPayload());
        
        $mailer->send($message);
    }
    
    public function getPayload(): array
    {
        if (is_null($this->message)) {
            return []; // or throw exception
        }
        
        return $this->message->jsonSerialize();
    }
    
    public function renderTemplate(): static
    {
        // render template logic ...
        return $this;
    }
}

Create the job

$job = (new MailJob($message))
    ->renderTemplate();

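Job Parameters

You may use the available parameters, which provide basic job features, or create custom parameters to add new features or customize existing ones to suit your needs.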

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    ->parameter(new Parameter\Retry(max: 2));

Parameter helper methods

Job and Callable Job support the following helper methods

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;

$job = (new Job(name: 'sample'))
    ->queue(name: 'secondary')
    ->data(['key' => 'value'])
    ->duration(seconds: 10)
    ->retry(max: 2)
    ->delay(seconds: 5)
    ->unique()
    ->priority(100)
    ->pushing(function() {})
    ->encrypt();

If you are using a Callable Job, you may set default parameters in the __construct method

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\Parameter;

final class SampleJob extends CallableJob
{
    public function __construct()
    {
        $this->duration(seconds: 10);
        $this->retry(max: 2);
        
        // or using its classes:
        $this->parameter(new Parameter\Priority(100));
    }

    //...
}

Delay Parameter

Use the delay parameter to set the number of seconds by which the job is delayed.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Delay(seconds: 60))
    // or using helper method:
    ->delay(seconds: 60);

Queues supporting delay

Data Parameter

Use the data parameter to add additional job data.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Data(['key' => 'value']))
    // or using helper method:
    ->data(['key' => 'value']);

Duration Parameter

Use the duration parameter to set the approximate time, in seconds, the job needs to process.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Duration(seconds: 10))
    // or using helper method:
    ->duration(seconds: 10);

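The Failed Job Handler requeues the job if it cannot be run, to prevent timeouts.

Encrypt Parameter

The encrypt parameter uses the Encryption Service to encrypt the job data.

It will encrypt the following data

First, install the service

composer require tobento/service-encryption

Then, bind the encrypter to the container used by the Job Processor

Example using the Container Service as container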

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Tobento\Service\Encryption\EncrypterInterface;

$container = new Container();
$container->set(EncrypterInterface::class, function() {
    // create encrypter:
    return $encrypter;
});

$jobProcessor = new JobProcessor($container);

Check out the encryption implementation section to learn more.

Finally, add the parameter to your job

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Encrypt())
    // or using helper method:
    ->encrypt();

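You may create a custom encrypt parameter to use another encrypter or to customize encryption.

Monitor Parameter

The monitor parameter is added by the Worker and may be used to log data about the job, such as its runtime in seconds and its memory usage. For example, the Work Command uses this parameter to write its data to the console.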

use Tobento\Service\Queue\Parameter\Monitor;

if ($job->parameters()->has(Monitor::class)) {
    $monitor = $job->parameters()->get(Monitor::class);
    $runtimeInSeconds = $monitor->runtimeInSeconds();
    $memoryUsage = $monitor->memoryUsage();
}

Priority Parameter

Use the priority parameter to specify a job's priority. Jobs with a higher priority are processed first.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Priority(100))
    // or using helper method:
    ->priority(100);
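
The ordering rule (higher priority first) can be illustrated without the queue package. The following is a minimal sketch using PHP's built-in SplPriorityQueue; the job names and priority values are made up for illustration, and this is not a description of how the queues in this package are implemented.

```php
<?php

declare(strict_types=1);

// Hypothetical job names mapped to their priorities (illustration only).
$jobs = [
    'newsletter' => 10,
    'password-reset' => 100,
    'report' => 50,
];

// SplPriorityQueue hands out the element with the highest priority first.
$queue = new SplPriorityQueue();

foreach ($jobs as $name => $priority) {
    $queue->insert($name, $priority);
}

// Collect the names in the order they would be processed.
$order = [];

while (!$queue->isEmpty()) {
    $order[] = $queue->extract();
}

print_r($order);
```

The job with priority 100 comes out first and the one with priority 10 last, which mirrors the rule stated for the priority parameter.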

Pushing Parameter

Use the pushing parameter to specify a handler that is executed before the job is pushed to the queue.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
    
$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Pushing(
        handler: function(JobInterface $job, AnyResolvableClass $foo): JobInterface {
            return $job;
        },
        
        // you may set a priority. Higher gets executed first:
        priority: 100, // 0 is default
    ))
    
    // or using helper method:
    ->pushing(handler: function() {}, priority: 100);

Queue Parameter

Use the queue parameter to specify the queue the job is pushed to.

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Queue(name: 'secondary'))
    // or using helper method:
    ->queue(name: 'secondary');

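The Job Processor adds this parameter automatically when a job is pushed to the queue.

Retry Parameter

Use the retry parameter to specify the maximum number of retries.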

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Retry(max: 2))
    // or using helper method:
    ->retry(max: 2);

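The Failed Job Handler uses this parameter to handle retries.

Unique Parameter

The unique parameter prevents any new duplicate job from entering the queue while another instance of the job is queued or being processed.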

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\Unique(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->unique(id: null);

The parameter requires a CacheInterface::class to be bound to the container passed to the JobProcessor.

Example using the Cache Service and the Container Service

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);
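
To make the uniqueness behaviour more concrete, here is a minimal sketch of a uniqueness guard. It does not use this package or a PSR-16 cache: UniqueGuard and its methods are hypothetical names, and a plain array stands in for the cache.

```php
<?php

declare(strict_types=1);

// Hypothetical guard: a plain array stands in for the cache.
final class UniqueGuard
{
    /** @var array<string, bool> */
    private array $locks = [];

    // Returns true if the job may be queued, false if a duplicate is pending.
    public function acquire(string $uniqueId): bool
    {
        if (isset($this->locks[$uniqueId])) {
            return false;
        }

        $this->locks[$uniqueId] = true;

        return true;
    }

    // Called once the job has been processed (or has finally failed).
    public function release(string $uniqueId): void
    {
        unset($this->locks[$uniqueId]);
    }
}

$guard = new UniqueGuard();

$first = $guard->acquire('sample');   // queued
$second = $guard->acquire('sample');  // rejected, the first one is still pending
$guard->release('sample');            // first job is done
$third = $guard->acquire('sample');   // queued again

var_dump($first, $second, $third);
```

A real cache adds what the array cannot: the lock is shared between processes and can expire, so a crashed worker does not block the job forever.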

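Without Overlapping Parameter

If you add the without overlapping parameter, only one instance of the job is processed at a time, which prevents overlaps.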

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\Parameter;

$job = (new Job(name: 'sample'))
    ->parameter(new Parameter\WithoutOverlapping(
        // A unique id. If null it uses the job id.
        id: null, // null|string
    ))
    // or using helper method:
    ->withoutOverlapping(id: null);

The parameter requires a CacheInterface::class to be bound to the container passed to the JobProcessor.

Example using the Cache Service and the Container Service

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Container\Container;
use Psr\SimpleCache\CacheInterface;
use Tobento\Service\Cache\Simple\Psr6Cache;
use Tobento\Service\Cache\ArrayCacheItemPool;
use Tobento\Service\Clock\SystemClock;

$container = new Container();
$container->set(CacheInterface::class, function() {
    // create cache:
    return new Psr6Cache(
        pool: new ArrayCacheItemPool(
            clock: new SystemClock(),
        ),
        namespace: 'default',
        ttl: null,
    );
});

$jobProcessor = new JobProcessor($container);

Dispatching Jobs

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueueInterface;

class SomeService
{
    public function createJob(QueueInterface $queue): void
    {
        $job = new Job(
            name: 'sample',
            payload: ['key' => 'value'],
        );

        $queue->push($job);
    }
}

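You may consider binding one of the queues to the container as the default QueueInterface implementation. Otherwise, you need to use Queues to dispatch jobs on a specific queue.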

use Tobento\Service\Queue\Job;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueException;

class SomeService
{
    public function createJob(QueuesInterface $queues): void
    {
        $job = new Job(name: 'sample');
        
        $queues->queue(name: 'secondary')->push($job);
        // throws QueueException if not exists.
        
        // or
        $queues->get(name: 'secondary')?->push($job);
        
        // or you may check if queue exists before:
        if ($queues->has(name: 'secondary')) {
            $queues->queue(name: 'secondary')->push($job);
        }
    }
}

Queue

In Memory Queue

The InMemoryQueue::class stores jobs in memory.

use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$queue = new InMemoryQueue(
    name: 'inmemory',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

Null Queue

The NullQueue::class does not queue any jobs, so jobs are never processed.

use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\QueueInterface;

$queue = new NullQueue(
    name: 'null',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

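Storage Queue

The Storage\Queue::class uses the Storage Service to store jobs.

First, you will need to install the Storage Service

composer require tobento/service-storage

Next, you may install the Clock Service or use another implementation

composer require tobento/service-clock

Finally, create the queue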

use Tobento\Service\Queue\Storage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Storage\StorageInterface;
use Psr\Clock\ClockInterface;

$queue = new Storage\Queue(
    name: 'storage',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    storage: $storage, // StorageInterface
    clock: $clock, // ClockInterface
    table: 'jobs',
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

The storage requires the following table columns

Sync Queue

The SyncQueue::class dispatches jobs immediately without queuing them.

use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$queue = new SyncQueue(
    name: 'sync',
    jobProcessor: $jobProcessor, // JobProcessorInterface
    eventDispatcher: null, // null|EventDispatcherInterface
    priority: 100,
);

var_dump($queue instanceof QueueInterface);
// bool(true)

Queues

Default Queues

use Tobento\Service\Queue\Queues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;

$queues = new Queues(
    $queue, // QueueInterface
    $anotherQueue, // QueueInterface
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)

Lazy Queues

The LazyQueues::class creates queues only on demand.

use Tobento\Service\Queue\LazyQueues;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\NullQueue;
use Psr\Container\ContainerInterface;

$queues = new LazyQueues(
    container: $container, // ContainerInterface
    queues: [
        // using a factory:
        'primary' => [
            // factory must implement QueueFactoryInterface
            'factory' => QueueFactory::class,
            'config' => [
                'queue' => SyncQueue::class,
                'priority' => 100,
            ],
        ],
        
        // using a closure:
        'secondary' => static function (string $name, ContainerInterface $c): QueueInterface {
            // create queue ...
            return $queue;
        },
        
        // or you may sometimes just create the queue (not lazy):
        'null' => new NullQueue(name: 'null'),
    ],
);

var_dump($queues instanceof QueuesInterface);
// bool(true)

var_dump($queue instanceof QueueInterface);
// bool(true)

You may check out the Queue Factory section to learn more.
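
The idea of creating queues only on demand can be sketched in plain PHP. The example below is independent of this package: LazyRegistry is a hypothetical class, the factories are closures, and simple objects stand in for queues. It only illustrates the create-on-first-access pattern, not the internals of LazyQueues.

```php
<?php

declare(strict_types=1);

// Hypothetical registry: factories are closures, queues are plain objects here.
final class LazyRegistry
{
    /** @var array<string, object> */
    private array $created = [];

    /** @param array<string, Closure> $factories */
    public function __construct(
        private array $factories,
    ) {}

    public function has(string $name): bool
    {
        return isset($this->factories[$name]);
    }

    public function get(string $name): ?object
    {
        if (!$this->has($name)) {
            return null;
        }

        // Create the queue on first access only, then reuse it.
        return $this->created[$name] ??= ($this->factories[$name])($name);
    }
}

$calls = 0;

$registry = new LazyRegistry([
    'primary' => function (string $name) use (&$calls): object {
        $calls++;
        return (object) ['name' => $name];
    },
]);

$callsBeforeAccess = $calls;          // nothing has been created yet
$a = $registry->get('primary');       // the factory runs
$b = $registry->get('primary');       // the created instance is reused
$missing = $registry->get('unknown'); // null for unknown names

var_dump($callsBeforeAccess, $calls, $a === $b, $missing);
```

The factory runs once, on the first call to get(), so queues that are never requested cost nothing to configure.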

Queue Factory

Queue Factory

use Tobento\Service\Queue\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor // JobProcessorInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)

Check out the Job Processor section to learn more.

Create queue

use Tobento\Service\Queue\InMemoryQueue;
use Tobento\Service\Queue\NullQueue;
use Tobento\Service\Queue\SyncQueue;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the queue you want to create:
    'queue' => InMemoryQueue::class,
    //'queue' => NullQueue::class,
    //'queue' => SyncQueue::class,
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

Storage Queue Factory

use Tobento\Service\Queue\Storage\QueueFactory;
use Tobento\Service\Queue\QueueFactoryInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Database\DatabasesInterface;
use Psr\Clock\ClockInterface;

$factory = new QueueFactory(
    jobProcessor: $jobProcessor, // JobProcessorInterface
    clock: $clock, // ClockInterface
    databases: null, // null|DatabasesInterface
);

var_dump($factory instanceof QueueFactoryInterface);
// bool(true)

Check out the Job Processor section to learn more.

Create a JsonFileStorage::class queue

use Tobento\Service\Storage\JsonFileStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => JsonFileStorage::class,
    'dir' => 'home/private/storage/',
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

Create an InMemoryStorage::class queue

use Tobento\Service\Storage\InMemoryStorage;
use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => InMemoryStorage::class,
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

Create a PdoMySqlStorage::class or PdoMariaDbStorage::class queue

use Tobento\Service\Storage\PdoMySqlStorage;
use Tobento\Service\Storage\PdoMariaDbStorage;
use Tobento\Service\Queue\QueueInterface;
use Tobento\Service\Queue\QueueException;

$queue = $factory->createQueue(name: 'primary', config: [
    // specify the table storage:
    'table' => 'queue',
    
    // specify the storage:
    'storage' => PdoMySqlStorage::class,
    //'storage' => PdoMariaDbStorage::class,
    
    // specify the name of the database used:
    'database' => 'name',
    
    // you may specify a priority:
    'priority' => 200,
]);

var_dump($queue instanceof QueueInterface);
// bool(true)
// or throws QueueException on failure.

Job Processor

The JobProcessor::class is responsible for processing jobs.

use Tobento\Service\Queue\JobProcessor;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\JobHandlerInterface;
use Psr\Container\ContainerInterface;

$jobProcessor = new JobProcessor(
    container: $container // ContainerInterface
);

var_dump($jobProcessor instanceof JobProcessorInterface);
// bool(true)

Adding Job Handlers

You may add job handlers for named jobs.

use Tobento\Service\Queue\JobHandlerInterface;

$jobProcessor->addJobHandler(
    name: 'sample',
    handler: SampleHandler::class, // string|JobHandlerInterface
);

Handler example

use Tobento\Service\Queue\JobHandlerInterface;
use Tobento\Service\Queue\JobInterface;

final class SampleHandler implements JobHandlerInterface
{
    public function handleJob(JobInterface $job): void
    {
        // handle job
    }
}

Failed Job Handler

The FailedJobHandler::class is responsible for handling failed jobs.

use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Tobento\Service\Queue\QueuesInterface;

$handler = new FailedJobHandler(
    queues: $queues, // QueuesInterface
);

var_dump($handler instanceof FailedJobHandlerInterface);
// bool(true)

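Once a failed job has exceeded the number of attempts defined by the Retry Parameter, the job is discarded.

You may handle finally failed jobs by extending the FailedJobHandler::class and using the finallyFailed method, for example to store the jobs in a database or simply log them.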

use Tobento\Service\Queue\FailedJobHandler;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobInterface;
use Psr\Log\LoggerInterface;

class LogFailedJobHandler extends FailedJobHandler
{
    public function __construct(
        protected null|QueuesInterface $queues = null,
        protected null|LoggerInterface $logger = null,
    ) {}
    
    protected function finallyFailed(JobInterface $job, \Throwable $e): void
    {
        if (is_null($this->logger)) {
            return;
        }

        $this->logger->error(
            sprintf('Job %s with the id %s failed: %s', $job->getName(), $job->getId(), $e->getMessage()),
            [
                'name' => $job->getName(),
                'id' => $job->getId(),
                'payload' => $job->getPayload(),
                'parameters' => $job->parameters()->jsonSerialize(),
                'exception' => $e,
            ]
        );
    }
    
    /**
     * Handle exception thrown by the worker e.g.
     */
    public function handleException(\Throwable $e): void
    {
        if (is_null($this->logger)) {
            return;
        }

        $this->logger->error(
            sprintf('Queue exception: %s', $e->getMessage()),
            [
                'exception' => $e,
            ]
        );
    }
}
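
The retry rule described in this section (retry until the maximum is exceeded, then treat the job as finally failed) might be sketched as follows. This is plain PHP, independent of the package: the function name decideOnFailure and its 'retry' and 'discard' return values are assumptions made for illustration, not the FailedJobHandler's actual internals.

```php
<?php

declare(strict_types=1);

// Hypothetical decision helper.
// $retried: how many retries have already been attempted.
// $maxRetries: the maximum taken from the retry parameter.
function decideOnFailure(int $retried, int $maxRetries): string
{
    return $retried < $maxRetries ? 'retry' : 'discard';
}

// Simulate a job that always fails, with a maximum of 2 retries.
$maxRetries = 2;
$retried = 0;
$log = [];

while (true) {
    $decision = decideOnFailure($retried, $maxRetries);
    $log[] = $decision;

    if ($decision === 'discard') {
        break; // this is the point where finallyFailed would come into play
    }

    $retried++;
}

print_r($log);
```

With a maximum of 2, the job is retried twice and discarded on the third failure.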

Worker

The Worker::class processes the jobs in the queues.

use Tobento\Service\Queue\Worker;
use Tobento\Service\Queue\QueuesInterface;
use Tobento\Service\Queue\JobProcessorInterface;
use Tobento\Service\Queue\FailedJobHandlerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;

$worker = new Worker(
    queues: $queues, // QueuesInterface
    jobProcessor: $jobProcessor, // JobProcessorInterface
    failedJobHandler: $failedJobHandler, // null|FailedJobHandlerInterface
    eventDispatcher: $eventDispatcher, // null|EventDispatcherInterface
);

Running Worker

use Tobento\Service\Queue\WorkerOptions;

$status = $worker->run(
    // specify the name of the queue you wish to use.
    // If null, it uses all queues by its priority, highest first.
    queue: 'name', // null|string
    
    // specify the options:
    options: new WorkerOptions(
        // The maximum amount of RAM the worker may consume:
        memory: 128,
        
        // The maximum number of seconds a worker may run:
        timeout: 60,
        
        // The number of seconds to wait in between polling the queue:
        sleep: 3,
        
        // The maximum number of jobs to run, 0 (unlimited):
        maxJobs: 0,
        
        // Indicates if the worker should stop when the queue is empty:
        stopWhenEmpty: false,
    ),
);

// you may exit:
exit($status);
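
How the options might interact in a worker loop can be sketched in plain PHP. The helper below is hypothetical and independent of the package: shouldStop, its parameters and its return values are illustrative names, and it assumes the memory limit is given in megabytes and the timeout in seconds, which the option comments above do not state explicitly.

```php
<?php

declare(strict_types=1);

// Hypothetical helper deciding whether a worker loop should stop.
// Assumes the memory limit is in megabytes and the timeout in seconds.
// Returns the reason for stopping, or null to keep running.
function shouldStop(
    int $memoryLimitMb,
    int $timeout,
    int $maxJobs,
    bool $stopWhenEmpty,
    float $startedAt,
    float $now,
    int $jobsProcessed,
    int $memoryUsedBytes,
    bool $queueIsEmpty,
): ?string {
    if ($memoryUsedBytes / 1024 / 1024 >= $memoryLimitMb) {
        return 'memory';
    }

    if ($now - $startedAt >= $timeout) {
        return 'timeout';
    }

    // 0 means unlimited.
    if ($maxJobs > 0 && $jobsProcessed >= $maxJobs) {
        return 'maxJobs';
    }

    if ($stopWhenEmpty && $queueIsEmpty) {
        return 'empty';
    }

    return null;
}

$used = 16 * 1024 * 1024; // 16 MB in bytes

$keepRunning = shouldStop(128, 60, 0, false, 0.0, 10.0, 5, $used, true);
$timedOut = shouldStop(128, 60, 0, false, 0.0, 61.0, 5, $used, false);
$emptied = shouldStop(128, 60, 0, true, 0.0, 10.0, 5, $used, true);

var_dump($keepRunning, $timedOut, $emptied);
```

With stopWhenEmpty set to false, an empty queue does not stop the loop; the worker would sleep for the configured number of seconds and poll again.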

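Running Worker Using Commands

If you want to run the worker using commands, check out the Console Work Command section.

Console

You may use the following commands with the Console Service.

To get started quickly, consider using the following two app bundles

Otherwise, you need to install the Console Service and set up your console yourself.

Work Command

Run jobs from all queues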

php app queue:work

Run jobs from a specific queue only

php app queue:work --queue=primary

Available options

Clear Command

Delete all jobs from all queues

php app queue:clear

Delete jobs from specific queues only

php app queue:clear --queue=primary --queue=secondary

Events

Available events

use Tobento\Service\Queue\Event;

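Just make sure you pass an event dispatcher to your worker.

Learn More

Creating Custom Job Parameters

You may create custom parameters by extending the Parameter::class.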

use Tobento\Service\Queue\Parameter\Parameter;

class SampleParameter extends Parameter
{
    //
}

Storable parameters

By implementing the JsonSerializable interface, your parameter will be stored and will be available when the job is processed.

use Tobento\Service\Queue\Parameter\Parameter;
use JsonSerializable;

class SampleParameter extends Parameter implements JsonSerializable
{
    public function __construct(
        private string $value,
    ) {}
    
    /**
     * Serializes the object to a value that can be serialized natively by json_encode().
     * Will be used to create the parameter by the parameters factory.
     * So it must match its __construct method.
     *
     * @return array
     */
    public function jsonSerialize(): array
    {
        return ['value' => $this->value];
    }
}
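
Why the serialized keys need to match the constructor can be seen in a small round trip. The sketch below is plain PHP and does not use the package: SampleValue is a hypothetical stand-in that does not extend Parameter, and rebuilding the object through ReflectionClass::newInstanceArgs is an assumption about how a parameters factory could work, not a description of the package's factory.

```php
<?php

declare(strict_types=1);

// Standalone stand-in for a storable parameter (does not extend the package's Parameter).
final class SampleValue implements JsonSerializable
{
    public function __construct(
        private string $value,
    ) {}

    public function value(): string
    {
        return $this->value;
    }

    // The keys must match the constructor's parameter names.
    public function jsonSerialize(): array
    {
        return ['value' => $this->value];
    }
}

$original = new SampleValue('foo');

// Store: encode the serialized data.
$stored = json_encode($original);

// Restore: decode and pass the data to the constructor as named arguments
// (string keys are interpreted as parameter names as of PHP 8.0).
$data = json_decode($stored, true);
$restored = (new ReflectionClass(SampleValue::class))->newInstanceArgs($data);

var_dump($stored, $restored->value());
```

If jsonSerialize returned a key that is not a constructor parameter name, the restore step would throw an error, which is why the two have to stay in sync.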

Failable interface

By implementing the Failable interface, you can handle failed jobs.

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Failable;
use Tobento\Service\Queue\JobInterface;
use Throwable;

class SampleParameter extends Parameter implements Failable
{
    /**
     * Returns the failed job handler.
     *
     * @return callable
     */
    public function getFailedJobHandler(): callable
    {
        return [$this, 'processFailedJob'];
    }
    
    /**
     * Process failed job.
     *
     * @param JobInterface $job
     * @param Throwable $e
     * @param ... any parameters resolvable by your container.
     * @return void
     */
    public function processFailedJob(JobInterface $job, Throwable $e): void
    {
        //
    }
}

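Check out Tobento\Service\Queue\Parameter\Delay::class to see its implementation.

Poppable interface

By implementing the Poppable interface, you can handle jobs after they are popped from the queue.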

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Poppable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;
use JsonSerializable;

class SampleParameter extends Parameter implements Poppable, JsonSerializable
{
    /**
     * Returns the popping job handler.
     *
     * @return callable
     */
    public function getPoppingJobHandler(): callable
    {
        return [$this, 'poppingJob'];
    }
    
    /**
     * Popping job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return null|JobInterface
     */
    public function poppingJob(JobInterface $job, QueueInterface $queue): null|JobInterface
    {
        // called after the job is popped from the queue.
        // If null is returned, the job is not processed.
        return $job;
    }
    
    /**
     * Implemented so that the parameter gets stored. Otherwise the popping job handler is not executed.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}

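Check out Tobento\Service\Queue\Parameter\Encrypt::class to see its implementation.

Processable interface

By implementing the Processable interface, you can hook into job processing, before and after the job is handled.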

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Processable;
use Tobento\Service\Queue\JobInterface;
use JsonSerializable;

class SampleParameter extends Parameter implements Processable, JsonSerializable
{
    /**
     * Returns the before process job handler.
     *
     * @return null|callable
     */
    public function getBeforeProcessJobHandler(): null|callable
    {
        return [$this, 'beforeProcessJob'];
        // or return null if not required
    }
    
    /**
     * Returns the after process job handler.
     *
     * @return null|callable
     */
    public function getAfterProcessJobHandler(): null|callable
    {
        return [$this, 'afterProcessJob'];
        // or return null if not required
    }
    
    /**
     * Before process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function beforeProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }
    
    /**
     * After process job handler.
     *
     * @param JobInterface $job
     * @return JobInterface
     */
    public function afterProcessJob(JobInterface $job): JobInterface
    {
        return $job;
    }
    
    /**
     * Implemented so that the parameter gets stored. Otherwise the handlers are not executed.
     */
    public function jsonSerialize(): array
    {
        return [];
    }
}

Check out Tobento\Service\Queue\Parameter\Duration::class to see its implementation.

Pushable interface

By implementing the Pushable interface, you can handle jobs before they are pushed to the queue.

use Tobento\Service\Queue\Parameter\Parameter;
use Tobento\Service\Queue\Parameter\Pushable;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\QueueInterface;

class SampleParameter extends Parameter implements Pushable
{
    /**
     * Returns the pushing job handler.
     *
     * @return callable
     */
    public function getPushingJobHandler(): callable
    {
        return [$this, 'pushingJob'];
    }
    
    /**
     * Pushing job.
     *
     * @param JobInterface $job
     * @param QueueInterface $queue
     * @param ... any parameters resolvable by your container.
     * @return JobInterface
     */
    public function pushingJob(JobInterface $job, QueueInterface $queue): JobInterface
    {
        return $job;
    }
}

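Check out Tobento\Service\Queue\Parameter\PushingJob::class to see its implementation.

Chunkable Job Example

This example shows a possible way to create chunkable jobs, using the Data Parameter to store the processing data.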

use Tobento\Service\Queue\CallableJob;
use Tobento\Service\Queue\JobInterface;
use Tobento\Service\Queue\Parameter;
use Tobento\Service\Queue\QueuesInterface;

final class ChunkableJob extends CallableJob
{
    public function handleJob(
        JobInterface $job,
        QueuesInterface $queues,
        // Repository $repository,
    ): void {
        if (! $job->parameters()->has(Parameter\Data::class)) {
            // first time running job:
            $data = new Parameter\Data([
                //'total' => $repository->count(),
                'total' => 500,
                'number' => 100,
                'offset' => 0,
            ]);
            
            $job->parameters()->add($data);
        } else {
            $data = $job->parameters()->get(Parameter\Data::class);
        }
        
        $total = $data->get(key: 'total', default: 0);
        $number = $data->get(key: 'number', default: 100);
        $offset = $data->get(key: 'offset', default: 0);
                
        // Handle Job:
        //$items = $repository->findAll(limit: [$number, $offset]);
        // For demo we use range to produce the items of the current chunk:
        $items = range($offset, $offset + $number - 1);
        
        foreach($items as $item) {
            // do sth
        }
        
        // Update offset so the next run starts with the next chunk:
        $nextOffset = $offset + $number;
        $data->set(key: 'offset', value: $nextOffset);
        
        // Repush to queue only if there are items left:
        if ($nextOffset < $total) {
            $queues->queue(
                name: $job->parameters()->get(Parameter\Queue::class)->name()
            )->push($job);
        }
    }
    
    public function getPayload(): array
    {
        return [];
    }    
}
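
The sequence of chunks such a job walks through can be checked in isolation. The following sketch is plain PHP without the package: chunkRuns is a hypothetical helper that lists the offset and item count of each run needed to cover a given total.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: returns the [offset, count] pairs needed to cover $total items.
function chunkRuns(int $total, int $number): array
{
    if ($number < 1) {
        throw new InvalidArgumentException('The chunk size must be at least 1.');
    }

    $runs = [];

    for ($offset = 0; $offset < $total; $offset += $number) {
        // The last chunk may be smaller than $number.
        $runs[] = [$offset, min($number, $total - $offset)];
    }

    return $runs;
}

$runs = chunkRuns(total: 250, number: 100);

print_r($runs);
```

For 250 items and a chunk size of 100, three runs are needed, the last one covering the remaining 50 items. A chunkable job would be pushed back to the queue after each run except the last.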

Credits