quitoque/queue-bundle

Symfony2/3/4 队列包(用于后台任务)支持Mongo(Doctrine ODM)、Mysql(及任何Doctrine ORM)、RabbitMQ、Beanstalkd、Redis,以及... {编写自己的}

安装次数: 7,003

依赖项: 0

建议者: 0

安全: 0

星标: 0

关注者: 3

分支: 34

类型:symfony-bundle

4.10.7 2020-07-16 08:55 UTC

README

Build Status Scrutinizer Code Quality Code Coverage SensioLabsInsight

允许symfony开发者像这样轻松创建后台任务:$worker->later()->process(1,2,3)

4.0 版本发布

查看 更改

从3.0升级: 查看UPGRADING-4.0.md

支持的队列

  • MongoDB通过Doctrine-ODM
  • Mysql / Doctrine 2支持的数据库通过Doctrine-ORM
  • Beanstalkd通过pheanstalk
  • RabbitMQ通过php-amqplib
  • Redis支持通过Predis或PhpRedis,或者通过SncRedisBundle

Trends

简介

此包提供了一种轻松创建和管理队列后台任务的方法

基本功能

  • 易于使用
    • 使用一行或两行代码启动后台任务
    • 轻松添加后台工作服务
      • 将任何代码转换为后台任务只需几行
  • 作业的原子操作
    • 对于基于ORM的队列,这是通过 不依赖事务 来实现的。
  • 管理界面
    • 基于Web的管理界面,可选的性能图表
  • 命令行界面
    • 用于从控制台运行、管理和调试作业的命令
  • 作业归档
    • ORM和ODM管理器具有内置的作业归档功能,用于完成作业
  • 记录工作进程的错误
  • 针对诸如停滞作业、异常作业等各种安全检查
    • 允许通过控制台命令重置停滞和异常作业
  • 内置事件调度器

作业特定功能

  • 失败或异常时的自动重试
    • 如果作业以失败代码退出,可以自动重试
    • 如果需要,相同适用于异常
  • 优先级
    • 作业可以有优先级级别,以便即使是在队列中较晚添加的较高优先级作业也可以首先处理。
  • 未来作业(ODM / ORM / Redis)
    • 作业可以安排在未来某个时间运行
  • 批量
    • 作业可以被“批量”处理,这样即使有多个相同类型的作业排队,也只运行一个作业。
  • 过期
    • 作业可以有一个“过期”时间,这样它们就不会在某个时间点之后运行。
      • (如果队列积压并且作业在某个时间点之后毫无价值,这很有用)
  • 停滞(ODM / ORM)
    • 可以检测到使解释器崩溃或因其他原因被终止的作业
      • 这些作业可以被重新排队以供将来运行。

安装

Symfony 2/3

查看 /Resources/doc/symfony2-3.md

Symfony 4

查看 /Resources/doc/symfony4.md

用法

创建一个工作类,该类将处理后台作业。

示例

  • src/Worker/FibonacciWorker.php: (symfony 4)
  • src/AppBundle/Worker/FibonacciWorker.php: (symfony 2/3)
<?php
namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker

class FibonacciWorker
    extends \Dtc\QueueBundle\Model\Worker
{
    private $filename;
    public function __construct() {
        $this->filename = '/tmp/fib-result.txt';
    }

    public function fibonacciFile($n) {
        $feb = $this->fibonacci($n);
        file_put_contents($this->filename, "{$n}: {$feb}");
    }


    public function fibonacci($n)
    {
        if($n == 0)
            return 0; //F0
        elseif ($n == 1)
            return 1; //F1
        else
            return $this->fibonacci($n - 1) + $this->fibonacci($n - 2);
    }

    public function getName() {
        return 'fibonacci';
    }

    public function getFilename()
    {
        return $this->filename;
    }
}

为作业创建一个DI服务,并将其标记为后台工作进程。

YAML

Symfony 4 和 3.3, 3.4

services:
    # for symfony 3 the class name would likely be AppBundle\Worker\FibonacciWorker
    App\Worker\FibonacciWorker:
        # public: false is possible if you completely use DependencyInjection for access to the service
        public: true
        tags:
            - { name: "dtc_queue.worker" }

Symfony 2, 以及 3.0, 3.1, 3.2

services:
    app.worker.fibonacci:
        class: AppBundle\Worker\FibonacciWorker:
        tags:
            - { name: "dtc_queue.worker" }
XML
<services>
	<!-- ... -->
	<service id="fibonacci_worker" class="FibonacciWorker">
	    <tag name="dtc_queue.worker" />
	</service>
	<!-- ... -->
</services>

创建作业

// Dependency inject the worker or fetch it from the container
$fibonacciWorker = $container->get('App\Worker\FibonacciWorker');

// For Symfony 3.3, 3.4
//     $fibonacciWorker = $container->get('AppBundle\Worker\FibonacciWorker');
//

// For Symfony 2, 3.0, 3.1, 3.2:
//     $fibonacciWorker = $container->get('app.worker.fibonacci');


// Basic Examples
$fibonacciWorker->later()->fibonacci(20);
$fibonacciWorker->later()->fibonacciFile(20);

// Batch Example
$fibonacciWorker->batchLater()->fibonacci(20); // Batch up runs into a single run

// Timed Example
$fibonacciWorker->later(90)->fibonacci(20); // Run 90 seconds later

// Priority
//    Note: whether 1 == High or Low priority is configurable, but by default it is High
$fibonacciWorker->later(0, 1); // As soon as possible, High priority
$fibonacciWorker->later(0, 125); // Medium priority
$fibonacciWorker->later(0, 255); // Low priority

// Advanced Usage Example:
//  (If the job is not processed by $expireTime, then don't execute it ever...)
$expireTime = time() + 3600;
$fibonacciWorker->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all
创建作业 - 其他信息

有关创建作业的进一步说明,包括如何在 命令行 中创建作业,请参阅

/Resources/doc/create-job.md

运行作业

建议您将以下控制台命令放入后台执行

bin/console dtc:queue:run -d 120
# the -d parameter is the number of seconds to run
#  For example you could put the above command into cron or a cron-like system to run every 2 minutes
#
# There are a number of other parameters that could be passed to dtc:queue:run run this for a full list:
bin/console dtc:queue:run --help

修剪作业

对于基于ODM和ORM的存储,存档表和常规作业队列可能需要定期修剪。

对于生产中的Mongo,使用固定集合TTL索引可能是明智的。

对于MySQL,您可以创建一个事件定期删除数据。

尽管如此,还有一些命令可以执行类似操作(也可以放入周期性的cron作业中)

bin/console dtc:queue:prune old --older 1m
# (deletes jobs older than one month from the Archive table)

# May be needed if jobs stall out:
bin/console dtc:queue:prune stalled

# If you're recording runs...this is recommended:
bin/console dtc:queue:prune stalled_runs

# If you're recording runs...another recommendation
bin/console dtc:queue:prune old_runs --older 1m

# If you're recording timings
bin/console dtc:queue:prune old_job_timings --older 1m

# You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour)
#  if you have too many jobs flowing through the system.
bin/console dtc:queue:prune --help # lists other prune commands

调试

这些命令可能有助于调试队列问题

bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only)
bin/console dtc:queue:reset # resets errored and/or stalled jobs

# This is really only good for
bin/console dtc:queue:run --id={jobId}

# (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)

跟踪运行情况

每次运行都可以在ORM/ODM支持的数据存储中的表中跟踪。

配置方法: app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    manager:
        # run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb)
        #   If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run
        #   to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save
        #
        run: orm # other possible option is "odm" (i.e. mongodb)
    #
    # (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:

MongoDB DocumentManager

更改文档管理器

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    odm:
        document_manager: {something} # default is "default"

MySQL / ORM 设置

从4.0开始,ORM需要启用bcmath扩展

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    manager:
       job: orm

更改EntityManager

dtc_queue:
    orm:
        entity_manager: {something} # default is "default"

注意:如果您未启用自动映射,可能需要在config.yml中的映射部分添加DtcQueueBundle。

doctrine:
   #...
   orm:
       #...
       mappings:
           DtcQueueBundle: ~

关于非ORM设置的说明

如果您计划使用ODM或Redis或其他配置,但已在其他地方启用了Doctrine ORM,建议您使用schema_filter配置参数,以便模式转储和/或迁移差异不包含那些表(请参阅问题#77)。

例如。

doctrine:
   # ...
   dbal:
       # ...
       schema_filter: ~^(?!dtc_)~

(如果您已经有一个schema_filter,则只需将“dtc_”前缀添加到其中。)

Beanstalk配置

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    beanstalkd:
        host: beanstalkd
        tube: some-tube-name [optional]
    manager:
        job: beanstalkd

RabbitMQ 配置

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    manager:
        job: rabbit_mq
    rabbit_mq:
        host: rabbitmq
        port: 5672
        user: guest
        password: guest
        vhost: "/" [optional defaults to "/"]
        ssl: [optional defaults to false - toggles to use AMQPSSLConnection]
        options: [optional options to pass to AMQPStreamConnection or AMQPSSLConnection]
        ssl_options: [optional extra ssl options to pass to AMQPSSLConnection]
        queue_args: [optional]
            queue: [optional queue name]
            passive: [optional defaults to false]
            durable: [optional defaults to true]
            exlusive: [optional defaults to false]
            auto_delete: [optional defaults to false]
        exchange_args: [optional]
            exchange: [optional queue name]
            type: [optional defaults to "direct"]
            passive: [optional defaults to false]
            durable: [optional defaults to true]
            auto_delete: [optional defaults to false]

Redis 配置

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    manager:
        job: redis
    redis:
        # choose one of the below snc_redis, predis, or phpredis
        snc_redis:
           type: predis
           alias: default
        predis:
            # choose one of dns or connection_parameters
            dsn: redis://
            connection_parameters:
                scheme: tcp
                host: localhost
                port: 6379
                path: ~
                database: ~
                password: ~
                async: false
                persistent: false
                timeout: 5.0
                read_write_timeout: ~
                alias: ~
                weight: ~
                iterable_multibulk: false
                throw_errors: true
        phpredis:
            # minimum fill host and port if needed
            host: localhost
            port: 6379
            timeout: 0
            retry_interval: ~
            read_timeout: 0
            auth: ~

自定义任务和管理器

app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)

dtc_queue:
    class_job: Some\Job\ClassName [optional]
    manager:
        job: some_name [optional]
    # (create your own manager service and name or alias it:
    #   dtc_queue.manager.job.<some_name> and put
    #   <some_name> in the manager: job field above)

重命名数据库或表名

  1. 扩展以下内容
Dtc\QueueBundle\Document\Job
Dtc\QueueBundle\Document\JobArchive

或者

Dtc\QueueBundle\Entity\Job
Dtc\QueueBundle\Entity\JobArchive

(根据您是否使用 MongoDB 或 ORM)

  1. 适当地更改类的参数
<?php
namespace App\Entity; // Or whatever

use Dtc\QueueBundle\Entity\Job as BaseJob;
use Doctrine\ORM\Mapping as ORM;

/**
 * @ORM\Entity
 * @ORM\Table(name="job_some_other_name", indexes={@ORM\Index(name="job_crc_hash_idx", columns={"crcHash","status"}),
 *                  @ORM\Index(name="job_priority_idx", columns={"priority","whenAt"}),
 *                  @ORM\Index(name="job_when_idx", columns={"whenAt"}),
 *                  @ORM\Index(name="job_status_idx", columns={"status","whenAt"})})
 */
class Job extends BaseJob {
}

// ... similarly for Entity\JobArchive if necessary
<?php
namespace App\Document;

use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM;
use Dtc\QueueBundle\Document\Job as BaseJob;

/**
 * @ODM\Document(db="my_db", collection="my_job_collection")
 */
class Job extends BaseJob
{
}

// ... similarly for Document\JobArchive if necessary
  1. 将新类添加到 config.yml
# config.yml
# ...
dtc_queue:
    class_job: App\Entity\Job
    class_job_archive: App\Entity\JobArchive

任务事件订阅者

在长时间运行的脚本中监听事件,以清除 doctrine 管理器或发送关于任务状态的电子邮件很有用。要添加任务事件订阅者,创建一个新的服务,标记为 dtc_queue.event_subscriber

services:
    voices.queue.listener.clear_manager:
        class: ClearManagerSubscriber
        arguments:
            - '@service_container'
        tags:
            - { name: dtc_queue.event_subscriber, connection: default }

ClearManagerSubscriber.php

<?php
use Dtc\QueueBundle\EventDispatcher\Event;
use Dtc\QueueBundle\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

class ClearManagerSubscriber
    implements EventSubscriberInterface
{
    private $container;
    public function __construct(ContainerInterface $container) {
        $this->container = $container;
    }

    public function onPostJob(Event $event)
    {
        $managerIds = [
            'doctrine.odm.mongodb.document_manager',
            'doctrine.orm.default_entity_manager',
            'doctrine.orm.content_entity_manager'
        ];

        foreach ($managerIds as $id) {
            $manager = $this->container->get($id);
            $manager->clear();
        }
    }

    public static function getSubscribedEvents()
    {
        return array(
            Event::POST_JOB => 'onPostJob',
        );
    }
}

作为 upstart 服务运行

  1. 在 /etc/init/ 目录中创建以下文件。PHP 在内存管理和垃圾回收方面非常糟糕:为了处理内存不足的问题,一次运行 20 个任务。(或者一个可管理的任务大小)
# /etc/init/queue.conf

author "David Tee"
description "Queue worker service, run 20 jobs at a time, process timeout of 3600"

respawn
start on startup

script
        /{path to}/console dtc:queue:run --max_count 20 -v -t 3600>> /var/logs/queue.log 2>&1
end script
  1. 重新加载配置:sudo initctl reload-configuration
  2. 启动脚本:sudo start queue

管理

您可以在 routing.yml 文件中注册管理路由以查看队列状态。添加以下内容

dtc_queue:
    resource: '@DtcQueueBundle/Resources/config/routing.yml'

测试

您可以在源文件夹中键入 bin/phpunit 来运行单元测试。如果您想运行带有 MongoDB 的集成测试,您需要在本地主机上设置 MongoDB 服务器并运行

bin/phpunit Tests/Document/JobManagerTest.php

如果您想运行 Beanstalkd 集成测试,您需要运行一个本地空实例的 beanstalkd 以进行测试。

sudo service beanstalkd restart; BEANSTALD_HOST=localhost bin/phpunit Tests/BeanStalkd/JobManagerTest.php

完整配置

请参阅 /Resources/doc/full-configuration.md

许可

此软件包受 MIT 许可证的约束(请参阅 Resources/meta/LICENSE 下的 LICENSE 文件)。

致谢

最初由 @dtee 编写,由 @mmucklo 增强和维护