面向品牌 / 队列组件包
Symfony 框架的 IdeasBucket 队列组件包
Requires
- php: >=5.6.0
- ext-mbstring: *
- ext-openssl: *
- ideasbucket/common: ~1.1
- psr/simple-cache: 1.0.0
- symfony/console: ~2.5|~3.0|~4.0 || ~5.0
- symfony/framework-bundle: ^2.7 || ^3.0 || ^4.0 || ~5.0
- symfony/process: ^2.7 || ^3.0 || ^4.0 || ~5.0
Requires (Dev)
- aws/aws-sdk-php: ~3.0
- mockery/mockery: ~0.9.4
- pda/pheanstalk: ~3.0
- phpunit/phpunit: ~5.7
- predis/predis: ~1.0
README
简介
此队列组件包受到 Laravel 队列组件包的极大启发。实际上,一些文件是直接复制的。所以,向 Taylor Otwell 和 Laravel 团队致敬,他们为社区提供了一个出色的组件包。
安装
您可以通过 composer 安装 QueueBundle
composer require ideasbucket/queue-bundle
QueueBundle 支持 Symfony 2.8、3.0 及以上版本。
配置
安装组件包后,您需要修改您的 AppKernel.php
文件,添加类似以下的组件类条目。
<?php
use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;
class AppKernel extends Kernel
{
public function registerBundles()
{
$bundles = array(
new Symfony\Bundle\FrameworkBundle\FrameworkBundle(),
new Symfony\Bundle\SecurityBundle\SecurityBundle(),
new Symfony\Bundle\TwigBundle\TwigBundle(),
new Symfony\Bundle\MonologBundle\MonologBundle(),
new Symfony\Bundle\SwiftmailerBundle\SwiftmailerBundle(),
new Doctrine\Bundle\DoctrineBundle\DoctrineBundle(),
new Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle(),
new IdeasBucket\QueueBundle\IdeasBucketQueueBundle(), // ADD THIS
new AppBundle\AppBundle(),
);
....
return $bundles;
}
}
然后在 config.yml
中,您可以定义包含在组件包中的每个队列驱动程序的配置,包括数据库、Beanstalkd、Amazon SQS、Redis 和一个同步驱动程序,该驱动程序将立即执行任务(用于本地使用)。还包含一个 null
队列驱动程序,它简单地丢弃队列任务。
QueueBundle 所需的基本最小配置是配置 cache_handler
。基本上,它可以实现以下接口中的任何一个服务。
- PSR-16 缓存
- PSR-6 缓存池
- Doctrine 缓存
对于缓存处理程序,您可以定义如下服务。
# In service.yml or config.yml file
app.cache:
app: ANY_CACHE_ADAPTER
如果可能,我们建议使用 PSR-16 缓存接口。
您可以使用任何缓存适配器。有关缓存处理程序的更多信息,请访问 此处 或 此处。
QueueBundle 的完整配置如下。
ideasbucket_queue:
cache_handler: app.cache
default: sync # default
# Default config for command path you may need to change
# this if you are using Symfony 2.x directory structure.
command_path: %kernel.root_dir%/../bin/
lock_path: ~
lock_service: ideasbucket_queue.filesystem_switch # Default value
connections:
sqs:
driver: sqs
key: YOUR_KEY
secret: YOUR_SECRET
prefix: https://sqs.us-west-2.amazonaws.com/some-id
queue: default
region: us-west-2
redis:
driver: redis
client: YOUR_PREDIS_CLIENT
queue: default
retry_after: 90
beanstalkd:
driver: beanstalkd
host: localhost
port: 11300
persistent: ~
queue: default
retry_after: 90
database:
driver: database
queue: default
repository: YOUR_QUEUE_REPOSITORY
retry_after: 90
# If you want to store failed jobs in database.
#failed_job_repository: FAILED_JOB_REPOSITORY
连接与队列
在开始使用 QueueBundle 之前,了解“连接”和“队列”之间的区别非常重要。在您的 config.yml
中,您可以定义 connections
的配置。此选项定义了对后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。但是,任何给定的队列连接可能有多个“队列”,可以将其视为不同的任务队列。
请注意,config.yml
配置文件中的每个连接配置示例都包含一个 queue
属性。这是在将任务发送到给定连接时,任务将被调度到的默认队列。换句话说,如果您未显式指定任务应调度到哪个队列,则任务将被放置在连接配置的 queue
属性中定义的队列上。
// This job is sent to the default queue...
$this->get('idb_queue')->push('service_id');
// This job is sent to the "emails" queue...
$this->get('idb_queue')->push('service_id', [], 'emails');
某些应用程序可能永远不需要将作业推送到多个队列,而是更喜欢只有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为QueueBundle队列工作器允许您指定它应该按优先级处理的队列。例如,如果您将作业推送到一个high
队列,您可能运行一个给予它们更高处理优先级的工人
php console idb_queue:work --queue=high,default
驱动程序前提条件
数据库
为了使用database
队列驱动程序,您需要运行以下命令,该命令将生成支持队列所需的存储库和实体
php console idb_queue:database
这将生成您cache/output
文件夹中必要的文件,您需要将它们移动到适当的位置。然后定义一个服务,其定义将在命令运行结束时显示。
命令假定您正在使用带有注解配置的Doctrine(ORM或ODM)。如果您使用的是其他配置格式,那么您必须在生成的代码中进行必要的调整。
此外,如果您想使用关系数据库
,则需要"doctrine/orm"
;如果您想使用MongoDB
,则需要"doctrine/mongodb-odm"
和"doctrine/mongodb-odm-bundle"
。
如果您需要支持除Doctrine或任何其他数据库以外的其他库,请参阅此处
Redis
为了使用redis
队列驱动程序,您需要拥有任何提供Predis
客户端实例的服务。如果您使用SNC RedisBundle,那么它将是snc_redis.default_client
,假设您使用Predis作为默认的Redis客户端。
其他驱动程序先决条件
以下依赖关系是所需的队列驱动程序
-
Amazon SQS:
aws/aws-sdk-php ~3.0
-
Beanstalkd:
pda/pheanstalk ~3.0
-
Redis:
predis/predis ~1.0
创建任务
生成任务类
每个作业基本上是一个实现了IdeasBucket\QueueBundle\QueueableInterface
接口的服务。QueueBundle提供了生成作业的命令。
php console idb_queue:create_job
生成的类将位于您在命令期间选择的bundle中的Job
文件夹内。
作业结构
作业类非常简单,它只实现了包含只有fire
方法的Queueable接口,当作业由队列处理时调用该方法。为了入门,让我们看看一个示例作业类。在这个例子中,我们将使用队列发送电子邮件。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* @var \Swift_Mailer
*/
private $mailer;
public function __construct(\Swift_Mailer $mailer)
{
$this->mailer = $mailer;
}
/**
* @param JobsInterface $job
* @param array $data
*/
public function fire(JobsInterface $job, array $data = null)
{
// Create a message
//....
$this->mailer->send($message);
$job->delete();
}
}
然后您需要在您的service.yml文件中为作业定义一个服务。
services:
app_queue_mailer:
class: AppBundle\Job\QueueMailer
arguments: [ '@mailer']
{note} 二进制数据,如原始图像内容,应在传递给排队作业之前通过
base64_encode
函数进行传递。否则,作业在放置在队列上时可能无法正确序列化为JSON。
推送/调度作业
一旦您编写了作业类并配置了它的服务,您可以使用idb_queue
服务来调度它。
<?php
namespace AppBundle\Controllers;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
class MailerController extends Controller
{
/**
* The action/method handles the user registration.
*
* @param Request $request
* @Route("/mail", name="mail")
* @Method({"GET", "POST"})
*
* @return Response
*/
public function mail(Request $request)
{
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World']);
// Rest of the code......
}
}
延迟推送/调度
如果您想延迟队列作业的执行。
$tenMinutesLater = (new \DateTime)->modify('10 minute');
$this->get('idb_queue')->later($tenMinutesLater, 'app_queue_mailer', $data);
{note} Amazon SQS队列服务有最大延迟时间为15分钟。
自定义队列和连接
调度到特定队列
通过将作业推送到不同的队列,您可以“分类”您的排队作业,甚至可以优先考虑分配给各种队列的工人数。请注意,这不会将作业推送到由您的队列配置文件定义的不同队列“连接”,而只会推送到单个连接中的特定队列。
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing');
调度到特定连接
如果您正在处理多个队列连接,您可以选择将作业推送到哪个连接。
// On processing and sqs connection
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing', 'sqs');
指定最大任务尝试次数/超时值
最大尝试次数
指定一个作业可能尝试的最大次数的方法之一是通过控制台命令行上的--tries
开关。
php console idb_queue:work --tries=3
然而,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将覆盖命令行上提供的值。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $maxTries = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $maxTries = 5;
public function getMaxTries()
{
return $this->maxTries;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
超时
同样,可以使用Artisan命令行上的--timeout
开关来指定作业可以运行的最大秒数。
php console idb_queue:work --timeout=30
但是,您也可以在作业类本身上定义作业应允许运行的最大秒数。如果作业上指定了超时,它将覆盖命令行上指定的任何超时。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $timeout = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $timeout = 5;
public function getTimeout()
{
return $this->timeout;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
错误处理
如果在处理作业时抛出异常,作业将被自动释放回队列,以便可以再次尝试。作业将继续被释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由用于idb_queue:work
控制台命令的--tries
开关定义。或者,最大尝试次数可以在作业类本身上定义。有关运行队列工作者的更多信息,请参见下文。
运行队列工作进程
QueueBundle包含一个队列工作者,它将在新作业被推送到队列时处理它们。您可以使用idb_queue:work
控制台命令来运行工作者。请注意,一旦idb_queue:work
命令开始运行,它将继续运行,直到手动停止或您关闭终端。
php console idb_queue:work
您还可以使用
php console idb_queue:listen
{提示}为了永久在后台运行
idb_queue:work
进程,您应该使用进程监视器(例如Supervisor),以确保队列工作者不会停止运行。
请记住,队列工作者是长时间运行的进程,并在内存中存储启动的应用程序状态。因此,它们在启动后不会注意到代码库中的更改。所以,在您的部署过程中,务必重启您的队列工作者。
指定连接和队列
您还可以指定工作者应使用的队列连接。传递给work
命令的连接名称应与您的config/queue.php
配置文件中定义的连接之一相匹配。
php console idb_queue:work redis
您还可以通过只为给定连接处理特定的队列来进一步自定义您的队列工作者。例如,如果您的所有电子邮件都在redis
队列连接上的emails
队列中处理,您可以使用以下命令启动一个只处理该队列的工作者。
php console idb_queue:work redis --queue=emails
资源考虑事项
守护进程队列工作者在处理每个作业之前不会“重新启动”框架。因此,您应该在每个作业完成后释放任何重型资源。例如,如果您使用GD库进行图像处理,您应该在完成后使用imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在您的config.yml
中,您可能将redis
连接的默认queue
设置为low
。但是,偶尔您可能希望将作业推送到一个high
优先级队列,如下所示。
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'high');
为了启动一个工作者,它将验证所有high
队列作业在继续处理任何low
队列上的作业之前都已处理,请将队列名称的逗号分隔列表传递给work
命令。
php console idb_queue:work --queue=high,low
队列工作进程和部署
由于队列工作进程是长期运行的过程,它们不会在未重启的情况下自动获取代码更改。因此,使用队列工作进程部署应用程序的最简单方法是,在部署过程中重启工作进程。您可以通过发出idb_queue:restart
命令来优雅地重启所有工作进程。
php console idb_queue:restart
此命令将指示所有队列工作进程在完成当前作业后优雅地“死亡”,以防止丢失现有作业。由于队列工作进程将在执行idb_queue:restart
命令时“死亡”,因此您应该运行一个进程管理器,如Supervisor,来自动重启队列工作进程。
任务过期和超时
作业过期
在您的config.yml
配置文件中,每个队列连接都定义了一个retry_after
选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果将retry_after
的值设置为90
,则如果作业在90秒内未删除且正在处理,则将作业重新放入队列。通常,您应该将retry_after
值设置为作业合理完成处理所需的最大秒数。
{note} 没有包含
retry_after
值的唯一队列连接是Amazon SQS。SQS将根据默认可见性超时重试作业,该超时由AWS控制台管理。
工作进程超时
idb_queue:work
控制台命令公开了--timeout
选项。该--timeout
选项指定队列主进程在杀死正在处理作业的子队列工作进程之前将等待多长时间。有时子队列进程可能因各种原因而“冻结”,例如未响应的外部HTTP调用。《code> --timeout选项删除超过指定时间限制的冻结进程。
php console idb_queue:work --timeout=60
retry_after
配置选项和--timeout
CLI选项不同,但它们协同工作以确保作业不会丢失,并且作业只成功处理一次。
{note}
--timeout
值应始终至少比您的retry_after
配置值短几秒。这将确保在作业重试之前始终杀死处理特定作业的工作进程。如果您的--timeout
选项比您的retry_after
配置值长,则您的作业可能会被处理两次。
工作进程睡眠持续时间
当队列上有作业可用时,工作进程将在作业之间没有延迟的情况下继续处理作业。然而,《code> sleep选项确定在没有新作业可用的情况下,工作进程将“睡眠”多长时间。
php console idb_queue:work --sleep=3
Supervisor 配置
安装Supervisor
Supervisor是Linux操作系统的进程监控器,如果它失败,将自动重启您的idb_queue:work
进程。要在Ubuntu上安装Supervisor,您可以使用以下命令
sudo apt-get install supervisor
配置Supervisor
Supervisor配置文件通常存储在/etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,以指导Supervisor如何监控您的进程。例如,让我们创建一个queue-worker.conf
文件,该文件启动并监控一个idb_queue:work
进程。
[program:queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/project/acme/bin/console idb_queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=johndoe
numprocs=8
redirect_stderr=true
stdout_logfile=/home/project/acme/var/log/worker.log
在此示例中,numprocs
指令将指导Supervisor运行8个idb_queue:work
进程并监控所有这些进程,如果它们失败,则自动重启它们。当然,您应该更改command
指令中的idb_queue:work sqs
部分,以反映您想要的队列连接。
启动Supervisor
一旦创建配置文件,您可以使用以下命令更新Supervisor配置并启动进程
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start queue-worker:*
有关Supervisor的更多信息,请参阅Supervisor文档。
处理失败的任务
有时您的队列作业会失败。不用担心,事情并不总是按计划进行!QueueBundle提供了一个方便的方式来指定作业应该尝试的最大次数。当作业超过这个尝试次数后,它可以被插入到数据库中。要使用数据库来存储失败的作业,您应该使用idb_queue:fail_database
命令
php console idb_queue:fail_database
该命令会提出一些问题,您需要回答。然后为您生成必要的文件。过程与使用数据库进行队列类似。
然后,在运行您的队列工作进程时,您应该在idb_queue:work
命令上使用--tries
开关来指定作业应该尝试的最大次数。如果您没有指定--tries
选项的值,作业将无限期尝试。
php console idb_queue:work redis --tries=3
清理失败的任务
您需要在您的作业类中直接实现IdeasBucket\QueueBundle\QueueErrorInterface
,这样您就可以在失败发生时执行特定于作业的清理操作。这是向用户发送警报或撤销作业执行的操作的完美位置。导致作业失败的Exception
将被传递给failed
方法。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
use IdeasBucket\QueueBundle\QueueErrorInterface;
class QueueMailer implements QueueableInterface, QueueErrorInterface
{
public function fire(JobsInterface $job, array $data = null)
{
// ....
$job->delete();
}
public function failed(\Exception $e, $payload = null)
{
// Do something with the error
}
}
重试失败的任务
要查看已插入到您数据库中的所有失败的作业,您可以使用idb_queue:failed
控制台命令
php console idb_queue:failed
idb_queue:failed
命令将列出作业ID、连接、队列和失败时间。可以使用作业ID重试失败的作业。例如,要重试ID为5
的失败作业,请发出以下命令
php console idb_queue:retry 5
要重试所有失败的作业,请执行idb_queue:retry
命令并传递all
作为ID
php console idb_queue:retry all
如果您想删除一个失败的作业,您可以使用idbb_queue:forget
命令
php console idb_queue:forget 5
要删除所有失败的作业,您可以使用idb_queue:flush
命令
php console idb_queue:flush
任务事件
QueueBundle提供了一些事件,您可以在作业执行期间监听这些事件。这是一个通过电子邮件或HipChat通知您的团队或在工作失败时采取某些操作的绝佳机会。您可以通过以下方式附加监听器。
<?php
namespace AppBundle\EventListener;
use Symfony\Component\EventDispatcher\Event;
/**
* Class QueueListener
*
* @package AppBundle\EventListener
*/
class QueueListener
{
/**
* This method will be called before Job firing
*
* @param Event $event
*/
public function during(Event $event)
{
}
}
然后注册一个服务
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: job_processing, method: during }
- { name: kernel.event_listener, event: job_failed, method: during }
- { name: kernel.event_listener, event: job_exception_occurred, method: during }
- { name: kernel.event_listener, event: job_processed, method: during }
- { name: kernel.event_listener, event: looping, method: during }
- { name: kernel.event_listener, event: worker_stopping, method: during }
当然,您可以在不同的事件中调用不同的方法。如果您使用的是Symfony 3.2或更高版本
,我们建议您使用类常量,如下所示。
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_EXCEPTION_OCCURRED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_FAILED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::LOOPING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::WORKER_STOPPING, method: during }
有关创建事件监听器和注册的更多信息,请访问Symfony文档。
其他数据库和库支持
当您执行idb_queue:database
命令时,它假定您正在使用Doctrine(ORM或ODM)。如果您没有使用Doctrine或只想支持其他数据库,那么在QueueBundle中就相对简单。
您需要确保Repository实现了IdeasBucket\QueueBundle\Repository\DatabaseQueueRepositoryInterface
接口,并且Entity实现了IdeasBucket\QueueBundle\Entity\DatabaseQueueEntityInterface
接口。只要满足这些要求,您就可以使用任何库或数据库。
GIST
即将推出。