quitoque / queue-bundle
Symfony2/3/4 队列包(用于后台任务)支持Mongo(Doctrine ODM)、Mysql(及任何Doctrine ORM)、RabbitMQ、Beanstalkd、Redis,以及... {编写自己的}
Requires
- php: >=5.6
- cocur/background-process: >=0.7
- dstuecken/php7ify: >=1.1
- mmucklo/grid-bundle: >=4.2.0
- sensio/framework-extra-bundle: 2.*|3.*|4.*|5.*
- symfony/framework-bundle: >=2.7|>=3.3|4.*
Requires (Dev)
- beberlei/doctrineextensions: ^1.0
- doctrine/annotations: <1.5
- doctrine/cache: <1.7
- doctrine/collections: <1.5
- doctrine/common: <2.8
- doctrine/dbal: <2.6
- doctrine/doctrine-bundle: >=1.8.1
- doctrine/instantiator: <1.1
- doctrine/mongodb-odm: ^1.2
- doctrine/orm: ^2.4
- friendsofphp/php-cs-fixer: dev-master
- pda/pheanstalk: ^3.1
- php-amqplib/php-amqplib: ^2.6
- phpunit/php-code-coverage: ^4.0
- phpunit/phpunit: ^5.7.0
- predis/predis: ^v1.1.1
- snc/redis-bundle: ^2.0
- symfony/proxy-manager-bridge: >=2.7|>=3.3|4.*
- symfony/templating: 2.*|3.*|4.*
Suggests
- ext-redis: Alternative redis library
- alcaeus/mongo-php-adapter: If trying to use MongoDB ODM on PHP 7.0 or greater
- beberlei/DoctrineExtensions: Alternative for YEAR, MONTH, DAY, HOUR, MINUTE if using JobTiming trends
- doctrine/mongodb-odm: If using mongo db
- doctrine/orm: If using an RDBMS
- oro/doctrine-extensions: For YEAR, MONTH, DAY, HOUR, MINUTE date functions if using JobTiming trends
- pda/pheanstalk: If using beanstalkd
- php-amqplib/php-amqplib: If using RabbitMQ
- predis/predis: If using redis
- snc/redis-bundle: If using redis
- dev-master
- 4.10.7
- 4.10.6
- 4.10.5
- 4.10.4
- 4.10.3
- 4.10.2
- 4.10.1
- 4.10.0
- 4.9.1
- 4.9.0
- 4.8.1
- 4.8.0
- 4.7.1
- 4.7.0
- 4.6.1
- 4.6.0
- 4.5.3
- 4.5.2
- 4.5.1
- 4.5.0
- 4.4.1
- 4.4.0
- 4.3.2
- 4.3.1
- 4.3.0
- 4.2.0
- 4.1.0
- 4.0.3
- 4.0.2
- 4.0.1
- 4.0.0
- 3.1.3
- 3.1.2
- 3.1.1
- 3.1.0
- 3.0.4
- 3.0.2
- 3.0.1
- 3.0.0
- 2.7.13
- 2.7.12
- 2.7.11
- 2.7.10
- 2.7.9
- 2.7.8
- 2.7.7
- 2.7.6
- 2.7.5
- 2.7.4
- 2.7.3
- 2.7.2
- 2.7.1
- 2.7.0
- 2.6.7
- 2.6.6
- 2.6.5
- 2.6.4
- 2.6.3
- 2.6.2
- 2.6.1
- 2.6.0
- 2.5.3
- 2.5.2
- 2.5.1
- 2.5.0
- 2.4.0
- 2.3.3
- 2.3.2
- 2.3.1
- 2.3.0
- 2.2.0
- 2.1.1
- 2.1.0
- 2.0.1
- 2.0.0
- 1.0.1
- 0.9.5
- 0.9.4
- 0.9.3
- 0.9.2
- 0.9.1
- 0.9.0
- dev-feature/admin-ui-updates
This package is not auto-updated.
Last update: 2021-09-23 15:07:51 UTC
README
允许symfony开发者像这样轻松创建后台任务:
$worker->later()->process(1,2,3)
4.0 版本发布
查看 更改
从3.0升级: 查看UPGRADING-4.0.md
支持的队列
- MongoDB通过Doctrine-ODM
- Mysql / Doctrine 2支持的数据库通过Doctrine-ORM
- Beanstalkd通过pheanstalk
- RabbitMQ通过php-amqplib
- Redis支持通过Predis或PhpRedis,或者通过SncRedisBundle
简介
此包提供了一种轻松创建和管理队列后台任务的方法
基本功能
- 易于使用
- 使用一行或两行代码启动后台任务
- 轻松添加后台工作服务
- 将任何代码转换为后台任务只需几行
- 作业的原子操作
- 对于基于ORM的队列,这是通过 不依赖事务 来实现的。
- 管理界面
- 基于Web的管理界面,可选的性能图表
- 命令行界面
- 用于从控制台运行、管理和调试作业的命令
- 作业归档
- ORM和ODM管理器具有内置的作业归档功能,用于完成作业
- 记录工作进程的错误
- 针对诸如停滞作业、异常作业等各种安全检查
- 允许通过控制台命令重置停滞和异常作业
- 内置事件调度器
作业特定功能
- 失败或异常时的自动重试
- 如果作业以失败代码退出,可以自动重试
- 如果需要,相同适用于异常
- 优先级
- 作业可以有优先级级别,以便即使是在队列中较晚添加的较高优先级作业也可以首先处理。
- 。
- 未来作业(ODM / ORM / Redis)
- 作业可以安排在未来某个时间运行
- 批量
- 作业可以被“批量”处理,这样即使有多个相同类型的作业排队,也只运行一个作业。
- 过期
- 作业可以有一个“过期”时间,这样它们就不会在某个时间点之后运行。
- (如果队列积压并且作业在某个时间点之后毫无价值,这很有用)
- 作业可以有一个“过期”时间,这样它们就不会在某个时间点之后运行。
- 停滞(ODM / ORM)
- 可以检测到使解释器崩溃或因其他原因被终止的作业
- 这些作业可以被重新排队以供将来运行。
- 可以检测到使解释器崩溃或因其他原因被终止的作业
安装
Symfony 2/3
查看 /Resources/doc/symfony2-3.md
Symfony 4
用法
创建一个工作类,该类将处理后台作业。
示例
- src/Worker/FibonacciWorker.php: (symfony 4)
- src/AppBundle/Worker/FibonacciWorker.php: (symfony 2/3)
<?php namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker class FibonacciWorker extends \Dtc\QueueBundle\Model\Worker { private $filename; public function __construct() { $this->filename = '/tmp/fib-result.txt'; } public function fibonacciFile($n) { $feb = $this->fibonacci($n); file_put_contents($this->filename, "{$n}: {$feb}"); } public function fibonacci($n) { if($n == 0) return 0; //F0 elseif ($n == 1) return 1; //F1 else return $this->fibonacci($n - 1) + $this->fibonacci($n - 2); } public function getName() { return 'fibonacci'; } public function getFilename() { return $this->filename; } }
为作业创建一个DI服务,并将其标记为后台工作进程。
YAML
Symfony 4 和 3.3, 3.4
services: # for symfony 3 the class name would likely be AppBundle\Worker\FibonacciWorker App\Worker\FibonacciWorker: # public: false is possible if you completely use DependencyInjection for access to the service public: true tags: - { name: "dtc_queue.worker" }
Symfony 2, 以及 3.0, 3.1, 3.2
services: app.worker.fibonacci: class: AppBundle\Worker\FibonacciWorker: tags: - { name: "dtc_queue.worker" }
XML
<services> <!-- ... --> <service id="fibonacci_worker" class="FibonacciWorker"> <tag name="dtc_queue.worker" /> </service> <!-- ... --> </services>
创建作业
// Dependency inject the worker or fetch it from the container $fibonacciWorker = $container->get('App\Worker\FibonacciWorker'); // For Symfony 3.3, 3.4 // $fibonacciWorker = $container->get('AppBundle\Worker\FibonacciWorker'); // // For Symfony 2, 3.0, 3.1, 3.2: // $fibonacciWorker = $container->get('app.worker.fibonacci'); // Basic Examples $fibonacciWorker->later()->fibonacci(20); $fibonacciWorker->later()->fibonacciFile(20); // Batch Example $fibonacciWorker->batchLater()->fibonacci(20); // Batch up runs into a single run // Timed Example $fibonacciWorker->later(90)->fibonacci(20); // Run 90 seconds later // Priority // Note: whether 1 == High or Low priority is configurable, but by default it is High $fibonacciWorker->later(0, 1); // As soon as possible, High priority $fibonacciWorker->later(0, 125); // Medium priority $fibonacciWorker->later(0, 255); // Low priority // Advanced Usage Example: // (If the job is not processed by $expireTime, then don't execute it ever...) $expireTime = time() + 3600; $fibonacciWorker->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all
创建作业 - 其他信息
有关创建作业的进一步说明,包括如何在 命令行 中创建作业,请参阅
运行作业
建议您将以下控制台命令放入后台执行
bin/console dtc:queue:run -d 120
# the -d parameter is the number of seconds to run # For example you could put the above command into cron or a cron-like system to run every 2 minutes # # There are a number of other parameters that could be passed to dtc:queue:run run this for a full list: bin/console dtc:queue:run --help
修剪作业
对于基于ODM和ORM的存储,存档表和常规作业队列可能需要定期修剪。
对于生产中的Mongo,使用固定集合或TTL索引可能是明智的。
对于MySQL,您可以创建一个事件定期删除数据。
尽管如此,还有一些命令可以执行类似操作(也可以放入周期性的cron作业中)
bin/console dtc:queue:prune old --older 1m # (deletes jobs older than one month from the Archive table) # May be needed if jobs stall out: bin/console dtc:queue:prune stalled # If you're recording runs...this is recommended: bin/console dtc:queue:prune stalled_runs # If you're recording runs...another recommendation bin/console dtc:queue:prune old_runs --older 1m # If you're recording timings bin/console dtc:queue:prune old_job_timings --older 1m # You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour) # if you have too many jobs flowing through the system.
bin/console dtc:queue:prune --help # lists other prune commands
调试
这些命令可能有助于调试队列问题
bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only) bin/console dtc:queue:reset # resets errored and/or stalled jobs # This is really only good for bin/console dtc:queue:run --id={jobId} # (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)
跟踪运行情况
每次运行都可以在ORM/ODM支持的数据存储中的表中跟踪。
配置方法: app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: manager: # run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb) # If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run # to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save # run: orm # other possible option is "odm" (i.e. mongodb) # # (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:
MongoDB DocumentManager
更改文档管理器
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: odm: document_manager: {something} # default is "default"
MySQL / ORM 设置
从4.0开始,ORM需要启用bcmath扩展
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: manager: job: orm
更改EntityManager
dtc_queue: orm: entity_manager: {something} # default is "default"
注意:如果您未启用自动映射,可能需要在config.yml中的映射部分添加DtcQueueBundle。
doctrine: #... orm: #... mappings: DtcQueueBundle: ~
关于非ORM设置的说明
如果您计划使用ODM或Redis或其他配置,但已在其他地方启用了Doctrine ORM,建议您使用schema_filter配置参数,以便模式转储和/或迁移差异不包含那些表(请参阅问题#77)。
例如。
doctrine: # ... dbal: # ... schema_filter: ~^(?!dtc_)~
(如果您已经有一个schema_filter,则只需将“dtc_”前缀添加到其中。)
Beanstalk配置
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: beanstalkd: host: beanstalkd tube: some-tube-name [optional] manager: job: beanstalkd
RabbitMQ 配置
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: manager: job: rabbit_mq rabbit_mq: host: rabbitmq port: 5672 user: guest password: guest vhost: "/" [optional defaults to "/"] ssl: [optional defaults to false - toggles to use AMQPSSLConnection] options: [optional options to pass to AMQPStreamConnection or AMQPSSLConnection] ssl_options: [optional extra ssl options to pass to AMQPSSLConnection] queue_args: [optional] queue: [optional queue name] passive: [optional defaults to false] durable: [optional defaults to true] exlusive: [optional defaults to false] auto_delete: [optional defaults to false] exchange_args: [optional] exchange: [optional queue name] type: [optional defaults to "direct"] passive: [optional defaults to false] durable: [optional defaults to true] auto_delete: [optional defaults to false]
Redis 配置
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: manager: job: redis redis: # choose one of the below snc_redis, predis, or phpredis snc_redis: type: predis alias: default predis: # choose one of dns or connection_parameters dsn: redis:// connection_parameters: scheme: tcp host: localhost port: 6379 path: ~ database: ~ password: ~ async: false persistent: false timeout: 5.0 read_write_timeout: ~ alias: ~ weight: ~ iterable_multibulk: false throw_errors: true phpredis: # minimum fill host and port if needed host: localhost port: 6379 timeout: 0 retry_interval: ~ read_timeout: 0 auth: ~
自定义任务和管理器
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4)
dtc_queue: class_job: Some\Job\ClassName [optional] manager: job: some_name [optional] # (create your own manager service and name or alias it: # dtc_queue.manager.job.<some_name> and put # <some_name> in the manager: job field above)
重命名数据库或表名
- 扩展以下内容
Dtc\QueueBundle\Document\Job
Dtc\QueueBundle\Document\JobArchive
或者
Dtc\QueueBundle\Entity\Job
Dtc\QueueBundle\Entity\JobArchive
(根据您是否使用 MongoDB 或 ORM)
- 适当地更改类的参数
<?php namespace App\Entity; // Or whatever use Dtc\QueueBundle\Entity\Job as BaseJob; use Doctrine\ORM\Mapping as ORM; /** * @ORM\Entity * @ORM\Table(name="job_some_other_name", indexes={@ORM\Index(name="job_crc_hash_idx", columns={"crcHash","status"}), * @ORM\Index(name="job_priority_idx", columns={"priority","whenAt"}), * @ORM\Index(name="job_when_idx", columns={"whenAt"}), * @ORM\Index(name="job_status_idx", columns={"status","whenAt"})}) */ class Job extends BaseJob { } // ... similarly for Entity\JobArchive if necessary
<?php namespace App\Document; use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM; use Dtc\QueueBundle\Document\Job as BaseJob; /** * @ODM\Document(db="my_db", collection="my_job_collection") */ class Job extends BaseJob { } // ... similarly for Document\JobArchive if necessary
- 将新类添加到 config.yml
# config.yml # ... dtc_queue: class_job: App\Entity\Job class_job_archive: App\Entity\JobArchive
任务事件订阅者
在长时间运行的脚本中监听事件,以清除 doctrine 管理器或发送关于任务状态的电子邮件很有用。要添加任务事件订阅者,创建一个新的服务,标记为 dtc_queue.event_subscriber
services: voices.queue.listener.clear_manager: class: ClearManagerSubscriber arguments: - '@service_container' tags: - { name: dtc_queue.event_subscriber, connection: default }
ClearManagerSubscriber.php
<?php use Dtc\QueueBundle\EventDispatcher\Event; use Dtc\QueueBundle\EventDispatcher\EventSubscriberInterface; use Symfony\Component\DependencyInjection\ContainerInterface; class ClearManagerSubscriber implements EventSubscriberInterface { private $container; public function __construct(ContainerInterface $container) { $this->container = $container; } public function onPostJob(Event $event) { $managerIds = [ 'doctrine.odm.mongodb.document_manager', 'doctrine.orm.default_entity_manager', 'doctrine.orm.content_entity_manager' ]; foreach ($managerIds as $id) { $manager = $this->container->get($id); $manager->clear(); } } public static function getSubscribedEvents() { return array( Event::POST_JOB => 'onPostJob', ); } }
作为 upstart 服务运行
- 在 /etc/init/ 目录中创建以下文件。PHP 在内存管理和垃圾回收方面非常糟糕:为了处理内存不足的问题,一次运行 20 个任务。(或者一个可管理的任务大小)
# /etc/init/queue.conf author "David Tee" description "Queue worker service, run 20 jobs at a time, process timeout of 3600" respawn start on startup script /{path to}/console dtc:queue:run --max_count 20 -v -t 3600>> /var/logs/queue.log 2>&1 end script
- 重新加载配置:sudo initctl reload-configuration
- 启动脚本:sudo start queue
管理
您可以在 routing.yml 文件中注册管理路由以查看队列状态。添加以下内容
dtc_queue: resource: '@DtcQueueBundle/Resources/config/routing.yml'
测试
您可以在源文件夹中键入 bin/phpunit 来运行单元测试。如果您想运行带有 MongoDB 的集成测试,您需要在本地主机上设置 MongoDB 服务器并运行
bin/phpunit Tests/Document/JobManagerTest.php
如果您想运行 Beanstalkd 集成测试,您需要运行一个本地空实例的 beanstalkd 以进行测试。
sudo service beanstalkd restart; BEANSTALD_HOST=localhost bin/phpunit Tests/BeanStalkd/JobManagerTest.php
完整配置
请参阅 /Resources/doc/full-configuration.md
许可
此软件包受 MIT 许可证的约束(请参阅 Resources/meta/LICENSE 下的 LICENSE 文件)。
致谢
最初由 @dtee 编写,由 @mmucklo 增强和维护
