khahwp / queue-bundle
针对Symfony框架的Wealthpark QueueBundle
Requires
- php: >=5.6.0
- ext-mbstring: *
- ext-openssl: *
- khanhwp/common: ~1.3
- symfony/console: ~2.5|~3.0
- symfony/framework-bundle: ^2.7 || ^3.0
- symfony/process: ^2.7 || ^3.0
Requires (Dev)
- aws/aws-sdk-php: ~3.0
- mockery/mockery: ~0.9.4
- pda/pheanstalk: ~3.0
- phpunit/phpunit: ~5.7
- predis/predis: ~1.0
README
简介
此QueueBundle深受Laravel Queue包的启发。实际上,其中一些文件是直接复制的。因此,向Taylor Otwell和Laravel团队致敬,他们为社区提供了一款出色的包。
安装
您可以通过composer安装QueueBundle。
composer require ideasbucket/queue-bundle
QueueBundle支持Symfony 2.8、3.0及以上版本。
配置
安装包后,您需要在您的AppKernel.php
中进行更改,添加如下所示的包类条目。
<?php
use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;
class AppKernel extends Kernel
{
public function registerBundles()
{
$bundles = array(
new Symfony\Bundle\FrameworkBundle\FrameworkBundle(),
new Symfony\Bundle\SecurityBundle\SecurityBundle(),
new Symfony\Bundle\TwigBundle\TwigBundle(),
new Symfony\Bundle\MonologBundle\MonologBundle(),
new Symfony\Bundle\SwiftmailerBundle\SwiftmailerBundle(),
new Doctrine\Bundle\DoctrineBundle\DoctrineBundle(),
new Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle(),
new IdeasBucket\QueueBundle\IdeasBucketQueueBundle(), // ADD THIS
new AppBundle\AppBundle(),
);
....
return $bundles;
}
}
然后在config.yml
中,您可以定义包中包含的每个队列驱动程序的配置,包括数据库、Beanstalkd、Amazon SQS、Redis,以及一个同步驱动程序,该驱动程序将立即执行任务(用于本地使用)。还包括一个null
队列驱动程序,它只是简单地丢弃队列任务。
QueueBundle所需的基本最小配置是配置cache_handler
。基本上,它可以实现这些接口中的任何一个服务。
- PSR-16缓存
- PSR-6缓存池
- Doctrine缓存
对于缓存处理器,您可以定义如下服务。
# In service.yml or config.yml file
app.cache:
app: ANY_CACHE_ADAPTER
如果可能,我们建议使用PSR-16缓存接口。
您可以使用任何缓存适配器。有关缓存处理器更多信息,请访问这里或这里。
QueueBundle的完整配置如下。
ideasbucket_queue:
cache_handler: app.cache
default: sync # default
# Default config for command path you may need to change
# this if you are using Symfony 2.x directory structure.
command_path: %kernel.root_dir%/../bin/
lock_path: ~
lock_service: ideasbucket_queue.filesystem_switch # Default value
connections:
sqs:
driver: sqs
key: YOUR_KEY
secret: YOUR_SECRET
prefix: https://sqs.us-west-2.amazonaws.com/some-id
queue: default
region: us-west-2
redis:
driver: redis
client: YOUR_PREDIS_CLIENT
queue: default
retry_after: 90
beanstalkd:
driver: beanstalkd
host: localhost
port: 11300
persistent: ~
queue: default
retry_after: 90
database:
driver: database
queue: default
repository: YOUR_QUEUE_REPOSITORY
retry_after: 90
# If you want to store failed jobs in database.
#failed_job_repository: FAILED_JOB_REPOSITORY
连接与队列
在开始使用QueueBundle之前,了解“连接”和“队列”之间的区别非常重要。在您的config.yml
中,您可以定义connections
的配置。此选项定义了到后端服务(如Amazon SQS、Beanstalk或Redis)的特定连接。然而,给定的队列连接可以有多个“队列”,可以将其视为不同的任务队列或堆栈。
注意,在config.yml
配置文件中的每个连接配置示例都包含一个queue
属性。这是在将任务发送到给定的连接时,任务将被分发的默认队列。换句话说,如果您未显式指定任务应分发到哪个队列,则任务将被放置在连接配置中定义的queue
属性所指定的队列上。
// This job is sent to the default queue...
$this->get('idb_queue')->push('service_id');
// This job is sent to the "emails" queue...
$this->get('idb_queue')->push('service_id', [], 'emails');
某些应用程序可能永远不需要将作业推送到多个队列,而是更喜欢有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或对作业进行处理进行分段的程序特别有用,因为队列包的队列工作器允许您指定按优先级处理的队列。例如,如果您将作业推送到一个high
队列,您可能会运行一个将它们赋予更高处理优先级的作业。
php console idb_queue:work --queue=high,default
驱动程序前提条件
数据库
为了使用database
队列驱动程序,您需要运行以下命令,这将生成支持队列所需的存储库和实体
php console idb_queue:database
这将生成您cache/output
文件夹中的必要文件,您需要将其移动到适当的位置。然后定义一个服务,该服务的定义将在命令运行结束时显示。
命令假定您正在使用带有注释配置的Doctrine (ORM或ODM)。如果您使用任何其他配置格式,则必须在生成的代码中进行必要的调整。
此外,如果您想使用关系数据库,则需要"doctrine/orm"
;如果您想使用MongoDB,则需要"doctrine/mongodb-odm"
和"doctrine/mongodb-odm-bundle"
。
如果您需要支持Doctrine或其他数据库以外的任何库,请参阅此处
Redis
为了使用redis
队列驱动程序,您需要一个提供Predis
客户端实例的服务。如果您使用SNC RedisBundle,则它将是snc_redis.default_client
,假设您正在使用Predis作为默认Redis客户端。
其他驱动程序先决条件
以下依赖关系对于列出的队列驱动程序是必需的
-
Amazon SQS:
aws/aws-sdk-php ~3.0
-
Beanstalkd:
pda/pheanstalk ~3.0
-
Redis:
predis/predis ~1.0
创建任务
生成任务类
每个作业基本上都是一个实现IdeasBucket\QueueBundle\QueueableInterface
接口的服务。队列包提供了生成作业的命令。
php console idb_queue:create_job
生成的类将位于您在命令期间选择的包的Job
文件夹中。
作业结构
作业类非常简单,它简单地实现了包含只有fire
方法的队列接口,该方法在作业被队列处理时调用。为了开始,让我们看看一个示例作业类。在这个例子中,我们将使用队列发送电子邮件
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* @var \Swift_Mailer
*/
private $mailer;
public function __construct(\Swift_Mailer $mailer)
{
$this->mailer = $mailer;
}
/**
* @param JobsInterface $job
* @param array $data
*/
public function fire(JobsInterface $job, array $data = null)
{
// Create a message
//....
$this->mailer->send($message);
$job->delete();
}
}
然后您需要在您的service.yml文件中定义作业的服务。
services:
app_queue_mailer:
class: AppBundle\Job\QueueMailer
arguments: [ '@mailer']
{note}二进制数据,例如原始图像内容,应在传递给排队作业之前通过
base64_encode
函数进行传递。否则,作业在被放置在队列上时可能无法正确序列化为JSON。
推送/调度作业
一旦您编写了作业类并为其配置了服务,您可以使用idb_queue
服务来调度它。
<?php
namespace AppBundle\Controllers;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
class MailerController extends Controller
{
/**
* The action/method handles the user registration.
*
* @param Request $request
* @Route("/mail", name="mail")
* @Method({"GET", "POST"})
*
* @return Response
*/
public function mail(Request $request)
{
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World']);
// Rest of the code......
}
}
延迟推送/调度
如果您想延迟队列作业的执行。
$tenMinutesLater = (new \DateTime)->modify('10 minute');
$this->get('idb_queue')->later($tenMinutesLater, 'app_queue_mailer', $data);
{note}Amazon SQS队列服务有最长延迟时间为15分钟。
自定义队列与连接
向特定队列推送
通过将作业推送到不同的队列,您可以“分类”您的排队作业,甚至可以优先处理分配给各种队列的工作者的数量。请注意,这并不将作业推送到由您的队列配置文件定义的不同队列“连接”,而只是推送到单个连接中的特定队列
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing');
向特定连接推送
如果您正在处理多个队列连接,您可以指定将作业推送到哪个连接
// On processing and sqs connection
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing', 'sqs');
指定最大任务尝试次数/超时值
最大尝试次数
指定作业可能尝试的最大次数的一种方法是在控制台命令行上使用--tries
开关
php console idb_queue:work --tries=3
但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,则它将覆盖命令行上提供的值。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $maxTries = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $maxTries = 5;
public function getMaxTries()
{
return $this->maxTries;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
超时
同样,可以使用Artisan命令行上的--timeout
开关来指定作业可以运行的最大秒数。
php console idb_queue:work --timeout=30
但是,您也可以在作业类本身上定义作业应运行的最大秒数。如果在作业上指定了超时,则它将覆盖命令行上指定的任何超时。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $timeout = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $timeout = 5;
public function getTimeout()
{
return $this->timeout;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
错误处理
如果在处理作业时抛出异常,作业将自动重新加入队列以便再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由用于idb_queue:work
控制台命令的--tries
开关定义。或者,最大尝试次数也可以在作业类本身上定义。有关运行队列工作者的更多信息,请参阅下面的内容。
运行队列工作进程
QueueBundle包括一个队列工作者,它将在作业被推送到队列时处理新作业。您可以使用idb_queue:work
控制台命令运行工作者。请注意,一旦启动了idb_queue:work
命令,它将继续运行,直到手动停止或您关闭终端。
php console idb_queue:work
您还可以使用
php console idb_queue:listen
{提示} 要将
idb_queue:work
进程永久地在后台运行,您应使用进程监控工具(如Supervisor)来确保队列工作者不会停止运行。
记住,队列工作者是长生命周期的进程,并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在您的部署过程中,请确保重启您的队列工作者。
指定连接和队列
您还可以指定工作者应使用的队列连接。传递给work
命令的连接名称应与您config/queue.php
配置文件中定义的连接之一相对应。
php console idb_queue:work redis
您还可以进一步自定义队列工作者,仅处理特定连接的特定队列。例如,如果您的所有电子邮件都在redis
队列连接上的emails
队列中处理,则可以发出以下命令来启动仅处理该队列的工作者
php console idb_queue:work redis --queue=emails
资源考虑
守护进程队列工作者在处理每个作业之前不会“重新启动”框架。因此,您应在每个作业完成后释放任何重资源。例如,如果您使用GD库进行图像处理,则在完成时,您应使用imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在您的config.yml
中,您可能将redis
连接的默认queue
设置为low
。但是,偶尔您可能希望将作业推送到像这样high
优先级的队列
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'high');
要启动一个工作者,该工作者在继续处理任何低队列作业之前验证所有高队列作业已处理,请向work
命令传递以逗号分隔的队列名称列表
php console idb_queue:work --queue=high,low
队列工作进程与部署
由于队列工作者是长生命周期的进程,它们不会在未重新启动的情况下接收对您的代码的更改。因此,使用队列工作者部署应用程序的最简单方法是,在部署过程中重启工作者。您可以通过发出idb_queue:restart
命令来优雅地重启所有工作者。
php console idb_queue:restart
该命令将指示所有队列工作员在处理完当前作业后优雅地“死亡”,以确保不会丢失现有作业。由于队列工作员将在执行 idb_queue:restart
命令时死亡,因此您应该运行一个进程管理器,例如 Supervisor,来自动重启队列工作员。
任务过期与超时
作业过期
在您的 config.yml
配置文件中,每个队列连接都定义了一个 retry_after
选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果 retry_after
的值设置为 90
,则在90秒内未删除的情况下处理作业,则作业将重新放入队列。通常,您应将 retry_after
的值设置为作业合理完成处理所需的最大秒数。
{note} 不包含
retry_after
值的唯一队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 来重试作业。
工作员超时
idb_queue:work
控制台命令公开了一个 --timeout
选项。该 --timeout
选项指定队列主进程在杀死处理作业的子队列工作员之前将等待多长时间。有时子队列进程可能会因各种原因而“冻结”,例如未响应的外部 HTTP 调用。该 --timeout
选项删除超过指定时间限制的冻结进程
php console idb_queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项不同,但它们协同工作以确保作业不会丢失,并且作业只成功处理一次。
{note}
--timeout
值应始终至少比您的retry_after
配置值短几秒。这将确保在作业重试之前,处理给定作业的工作员总是被杀死。如果您的--timeout
选项比您的retry_after
配置值长,则您的作业可能会被处理两次。
工作员睡眠时长
当队列上有作业可用时,工作员将在作业之间没有任何延迟地持续处理作业。但是,sleep
选项确定在没有任何新作业可用时工作员将“睡眠”多长时间
php console idb_queue:work --sleep=3
监控器配置
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监控器,如果它失败,将自动重启您的 idb_queue:work
进程。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令
sudo apt-get install supervisor
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,指导 Supervisor 如何监控您的进程。例如,让我们创建一个 queue-worker.conf
文件,该文件将启动并监控一个 idb_queue:work
进程
[program:queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/project/acme/bin/console idb_queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=johndoe
numprocs=8
redirect_stderr=true
stdout_logfile=/home/project/acme/var/log/worker.log
在此示例中,numprocs
指令将指示 Supervisor 运行 8 个 idb_queue:work
进程并监控所有这些进程,如果它们失败则自动重启它们。当然,您应该将 command
指令中的 idb_queue:work sqs
部分更改为反映您所需的队列连接。
启动 Supervisor
一旦创建配置文件,您可以使用以下命令更新 Supervisor 配置并启动进程
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start queue-worker:*
有关 Supervisor 的更多信息,请参阅 Supervisor 文档。
处理失败任务
有时您的队列作业会失败。别担心,事情并不总是按计划进行!QueueBundle 包含了一种方便的方式,可以指定作业应尝试的最大次数。在作业尝试次数超过这个数量后,它可以被插入到数据库中。要使用数据库存储失败作业,您应该使用 idb_queue:fail_database
命令
php console idb_queue:fail_database
此命令会询问您一些问题,您需要回答。然后将为您生成必要的文件。过程与使用数据库进行队列类似。
然后,在运行您的 队列工作进程 时,您应该使用 idb_queue:work
命令的 --tries
开关来指定作业应尝试的最大次数。如果您未指定 --tries
选项的值,作业将无限尝试。
php console idb_queue:work redis --tries=3
清理失败任务后的操作
您需要直接在作业类中实现 IdeasBucket\QueueBundle\QueueErrorInterface
,这样您就可以在失败发生时执行特定的作业清理。这是发送警报给您的用户或撤销作业执行的操作的完美位置。导致作业失败的 Exception
将传递给 failed
方法。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
use IdeasBucket\QueueBundle\QueueErrorInterface;
class QueueMailer implements QueueableInterface, QueueErrorInterface
{
public function fire(JobsInterface $job, array $data = null)
{
// ....
$job->delete();
}
public function failed(\Exception $e, $payload = null)
{
// Do something with the error
}
}
重试失败任务
要查看所有已插入数据库的失败作业,您可以使用 idb_queue:failed
控制台命令。
php console idb_queue:failed
idb_queue:failed
命令将列出作业 ID、连接、队列和失败时间。可以使用作业 ID 重新尝试失败的作业。例如,要重新尝试 ID 为 5
的失败作业,请执行以下命令
php console idb_queue:retry 5
要重新尝试所有失败的作业,请执行 idb_queue:retry
命令并传递 all
作为 ID。
php console idb_queue:retry all
如果您想删除一个失败的作业,可以使用 idbb_queue:forget
命令。
php console idb_queue:forget 5
要删除所有失败的作业,可以使用 idb_queue:flush
命令。
php console idb_queue:flush
任务事件
QueueBundle 提供了您可以在作业执行期间监听的事件。这是一个通过电子邮件或 HipChat 通知您的团队或是在作业失败时采取某些行动的好机会。您可以像这样附加监听器。
<?php
namespace AppBundle\EventListener;
use Symfony\Component\EventDispatcher\Event;
/**
* Class QueueListener
*
* @package AppBundle\EventListener
*/
class QueueListener
{
/**
* This method will be called before Job firing
*
* @param Event $event
*/
public function during(Event $event)
{
}
}
然后注册一个服务
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: job_processing, method: during }
- { name: kernel.event_listener, event: job_failed, method: during }
- { name: kernel.event_listener, event: job_exception_occurred, method: during }
- { name: kernel.event_listener, event: job_processed, method: during }
- { name: kernel.event_listener, event: looping, method: during }
- { name: kernel.event_listener, event: worker_stopping, method: during }
当然,您可以在不同的事件期间调用不同的方法。如果您正在使用 Symfony 3.2 或更高版本
,我们建议您使用类常量,如下所示。
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_EXCEPTION_OCCURRED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_FAILED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::LOOPING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::WORKER_STOPPING, method: during }
有关创建事件监听器和注册的更多信息,请访问 Symfony 文档。
其他数据库和库支持
当您执行 idb_queue:database
命令时,它假定您正在使用 Doctrine(ORM 或 ODM)。如果您没有使用 Doctrine 或只想支持其他数据库,那么在 QueueBundle 中,这相当简单。
您需要确保 Repository 实现了接口 IdeasBucket\QueueBundle\Repository\DatabaseQueueRepositoryInterface
,并且 Entity 实现了 IdeasBucket\QueueBundle\Entity\DatabaseQueueEntityInterface
。只要满足这些要求,您就可以使用任何库或数据库。
GIST
即将推出。