sfcod / jobqueue

Symfony 的异步队列

安装次数: 5,171

依赖: 0

建议者: 0

安全性: 0

星标: 15

关注者: 2

分支: 5

公开问题: 2

类型: symfony-bundle

1.3.0 2022-12-16 13:54 UTC

README

Scrutinizer Code QualityCode Climate

为 Symfony 提供异步队列实现(使用 mongodb 作为主要存储)。

支持的驱动(存储)

配置

注册包配置和所有可用的 "Jobs"。

sfcod_queue:
    drivers:
        redis: 'SfCod\QueueBundle\Connector\RedisConnector'
    connections:
        default: { driver: 'redis', collection: 'queue_jobs', queue: 'default', expire: 360, limit: 2 }  

services:
#    _instanceof:
#        SfCod\QueueBundle\Base\JobInterface:
#            tags: ['sfcod.jobqueue.job']
    App\Job\:
        resource: '../src/Job/*'
        tags: ['sfcod.jobqueue.job']

添加任务到队列

创建自己的 "任务",实现 SfCod\QueueBundle\Base\JobInterface 并运行它

public function someFunc(JobQueue $jobQueue) {
    $data = [...];
    $jobQueue->push(YourJob::class, $data);
}

其中 $data 是任务的有效负载

命令

使用控制台命令运行工作守护进程

$ php bin/console job-queue:work
$ php bin/console job-queue:retry --id=<Job ID>
$ php bin/console job-queue:run-job <Job ID>

其中

  • work - 在循环中运行守护进程的命令;
  • retry - 将所有失败的任务移回队列,可以与 --id 参数一起使用以重试单个任务
  • run-job - 通过 ID 运行单个任务

可用事件

'job_queue_worker.raise_before_job': SfCod\QueueBundle\Event\JobProcessingEvent;
'job_queue_worker.raise_after_job': SfCod\QueueBundle\Event\JobProcessedEvent;
'job_queue_worker.raise_exception_occurred_job': SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
'job_queue_worker.raise_failed_job': SfCod\QueueBundle\Event\JobFailedEvent;
'job_queue_worker.stop': SfCod\QueueBundle\Event\WorkerStoppingEvent;

可配置的服务列表(带有默认参数)

JobQueue
SfCod\QueueBundle\Service\JobQueue:
    public: true
    arguments:
        - '@SfCod\QueueBundle\Service\QueueManager'

SfCod\QueueBundle\Service\JobQueue: 主要任务队列服务

Worker
SfCod\QueueBundle\Worker\Worker:
    arguments:
        - '@SfCod\QueueBundle\Service\QueueManager'
        - '@SfCod\QueueBundle\Service\JobProcess'
        - '@SfCod\QueueBundle\Failer\FailedJobProviderInterface'
        - '@SfCod\QueueBundle\Handler\ExceptionHandlerInterface'
        - '@Symfony\Component\EventDispatcher\EventDispatcherInterface'

SfCod\QueueBundle\Worker\Worker: "work" 命令的异步工作进程

JobProcess
SfCod\QueueBundle\Service\JobProcess:
    arguments:
        - 'console'
        - '%kernel.project_dir%/bin'
        - 'php'
        - ''

JobProcess: 异步队列中作业命令处理程序的默认配置,其中

  • 'console' - 控制台命令的名称
  • '%kernel.project_dir%/bin' - 控制台命令的路径
  • 'php' - 二进制脚本
  • '' - 二进制脚本的参数
Connector
SfCod\QueueBundle\Connector\ConnectorInterface:
    class: SfCod\QueueBundle\Connector\RedisConnector
    arguments:
        - '@SfCod\QueueBundle\Base\JobResolverInterface'
        - '@SfCod\QueueBundle\Base\RedisDriver'

SfCod\QueueBundle\Connector\ConnectorInterface: 队列数据库的连接器

Failer
SfCod\QueueBundle\Failer\FailedJobProviderInterface:
    class: SfCod\QueueBundle\Failer\RedisFailedJobProvider
    arguments:
        - '@SfCod\QueueBundle\Service\RedisDriver'
        - 'queue_jobs_failed'

SfCod\QueueBundle\Failer\FailedJobProviderInterface: 失败任务的存储

Job resolver
SfCod\QueueBundle\Base\JobResolverInterface:
    class: SfCod\QueueBundle\Service\JobResolver
    arguments:
        - '@Symfony\Component\DependencyInjection\ContainerInterface'

SfCod\QueueBundle\Base\JobResolverInterface: 作业解析器,它使用作业的显示名称构建作业,对于默认作业从容器中作为公共服务检索。

异常处理程序
SfCod\QueueBundle\Handler\ExceptionHandlerInterface:
    class: SfCod\QueueBundle\Handler\ExceptionHandler
    arguments:
        - '@Psr\Log\LoggerInterface'

SfCod\QueueBundle\Handler\ExceptionHandlerInterface: 主要异常处理程序,用于记录问题

测试

您可以使用预配的配置 xml 文件运行测试

php bin/phpunit --configuration ./vendor/sfcod/jobqueue/phpunit.xml.dist --bootstrap ./vendor/autoload.php