ideasbucket / queue-bundle
IdeasBucket QueueBundle for Symfony 框架
Requires
- php: >=5.6.0
- ext-mbstring: *
- ext-openssl: *
- ideasbucket/common: ~1.1
- psr/simple-cache: 1.0.0
- symfony/console: ~2.5|~3.0
- symfony/framework-bundle: ^2.7 || ^3.0
- symfony/process: ^2.7 || ^3.0
Requires (Dev)
- aws/aws-sdk-php: ~3.0
- mockery/mockery: ~0.9.4
- pda/pheanstalk: ~3.0
- phpunit/phpunit: ~5.7
- predis/predis: ~1.0
This package is not auto-updated.
Last update: 2024-09-18 21:00:02 UTC
README
介绍
这个 QueueBundle 受 Laravel Queue 包的启发很大。实际上,一些文件是直接复制的。因此,向 Taylor Otwell 和 Laravel 团队致敬,因为他们为社区提供了一个出色的包。
安装
您可以通过 composer 安装 QueueBundle。
composer require ideasbucket/queue-bundle
QueueBundle 支持 Symfony 2.8、3.0 及以上版本。
配置
一旦您安装了该包,您需要在 AppKernel.php
中进行更改,通过添加类似以下的包类条目。
<?php
use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;
class AppKernel extends Kernel
{
public function registerBundles()
{
$bundles = array(
new Symfony\Bundle\FrameworkBundle\FrameworkBundle(),
new Symfony\Bundle\SecurityBundle\SecurityBundle(),
new Symfony\Bundle\TwigBundle\TwigBundle(),
new Symfony\Bundle\MonologBundle\MonologBundle(),
new Symfony\Bundle\SwiftmailerBundle\SwiftmailerBundle(),
new Doctrine\Bundle\DoctrineBundle\DoctrineBundle(),
new Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle(),
new IdeasBucket\QueueBundle\IdeasBucketQueueBundle(), // ADD THIS
new AppBundle\AppBundle(),
);
....
return $bundles;
}
}
然后在 config.yml
中,您可以定义包中包含的每个队列驱动程序的配置,包括数据库、Beanstalkd、Amazon SQS、Redis 以及一个同步驱动程序,该驱动程序将立即执行作业(用于本地使用)。还包含一个 null
队列驱动程序,它简单地丢弃队列中的作业。
QueueBundle 所需的基本最小配置是配置 cache_handler
。基本上,它可以实现这些接口中的任何一个的服务。
- PSR-16 缓存
- PSR-6 缓存池
- Doctrine 缓存
对于缓存处理器,您可以定义如下服务。
# In service.yml or config.yml file
app.cache:
app: ANY_CACHE_ADAPTER
如果可能,我们推荐使用 PSR-16 缓存接口。
您可以使用任何缓存适配器。有关缓存处理器更多信息,请访问 这里 或 这里。
QueueBundle 的完整配置如下。
ideasbucket_queue:
cache_handler: app.cache
default: sync # default
# Default config for command path you may need to change
# this if you are using Symfony 2.x directory structure.
command_path: %kernel.root_dir%/../bin/
lock_path: ~
lock_service: ideasbucket_queue.filesystem_switch # Default value
connections:
sqs:
driver: sqs
key: YOUR_KEY
secret: YOUR_SECRET
prefix: https://sqs.us-west-2.amazonaws.com/some-id
queue: default
region: us-west-2
redis:
driver: redis
client: YOUR_PREDIS_CLIENT
queue: default
retry_after: 90
beanstalkd:
driver: beanstalkd
host: localhost
port: 11300
persistent: ~
queue: default
retry_after: 90
database:
driver: database
queue: default
repository: YOUR_QUEUE_REPOSITORY
retry_after: 90
# If you want to store failed jobs in database.
#failed_job_repository: FAILED_JOB_REPOSITORY
连接与队列
在开始使用 QueueBundle 之前,理解 "连接" 和 "队列" 之间的区别很重要。在您的 config.yml
中,您可以定义 connections
的配置。此选项定义了到后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可以有多个 "队列",这些队列可以被视为不同的作业堆栈或堆。
请注意,config.yml
配置文件中的每个连接配置示例都包含一个 queue
属性。这是作业被发送到给定连接时默认派遣到的队列。换句话说,如果您不明确指定作业应派遣到哪个队列,则作业将被放置在连接配置中 queue
属性定义的队列上。
// This job is sent to the default queue...
$this->get('idb_queue')->push('service_id');
// This job is sent to the "emails" queue...
$this->get('idb_queue')->push('service_id', [], 'emails');
某些应用程序可能永远不需要将工作推送到多个队列,而是更喜欢有一个简单的队列。然而,将工作推送到多个队列对于希望优先处理或分割工作处理的程序特别有用,因为QueueBundle队列工作进程允许您指定按优先级处理的队列。例如,如果您将工作推送到high
队列,您可能会运行一个赋予它们更高处理优先级的worker。
php console idb_queue:work --queue=high,default
驱动程序先决条件
数据库
为了使用database
队列驱动程序,您需要运行以下命令,这将生成支持队列所需的存储库和实体:
php console idb_queue:database
这将生成您cache/output
文件夹中必要的文件,您需要将其移动到适当的文件夹。然后定义一个在命令运行结束时显示的服务定义。
该命令假定您正在使用具有注释配置的Doctrine(ORM或ODM)。如果您使用任何其他配置格式,则必须在生成的代码中进行必要的调整。
此外,如果您想使用关系数据库,则需要"doctrine/orm"
;如果您想使用MongoDB,则需要"doctrine/mongodb-odm"
和"doctrine/mongodb-odm-bundle"
。
如果出于任何原因您需要支持除了Doctrine或任何其他数据库之外的任何库,请参阅此处。
Redis
为了使用redis
队列驱动程序,您需要有一个提供Predis
客户端实例的服务。如果您使用SNC RedisBundle,则默认Redis客户端将是snc_redis.default_client
。
其他驱动程序先决条件
以下依赖项对于列出的队列驱动程序是必需的:
-
Amazon SQS:
aws/aws-sdk-php ~3.0
-
Beanstalkd:
pda/pheanstalk ~3.0
-
Redis:
predis/predis ~1.0
创建作业
生成作业类
每个工作基本上都是一个实现了IdeasBucket\QueueBundle\QueueableInterface
接口的服务。QueueBundle提供了生成工作的命令。
php console idb_queue:create_job
生成的类将位于您在命令期间选择的bundle中的Job
文件夹内。
工作结构
工作类非常简单,它仅仅实现了一个包含只有一个fire
方法的Queueable接口,该方法是当工作被队列处理时调用的。为了入门,让我们看看一个示例工作类。在这个例子中,我们将使用队列发送电子邮件。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* @var \Swift_Mailer
*/
private $mailer;
public function __construct(\Swift_Mailer $mailer)
{
$this->mailer = $mailer;
}
/**
* @param JobsInterface $job
* @param array $data
*/
public function fire(JobsInterface $job, array $data = null)
{
// Create a message
//....
$this->mailer->send($message);
$job->delete();
}
}
然后您需要在您的service.yml文件中定义一个用于该工作的工作服务。
services:
app_queue_mailer:
class: AppBundle\Job\QueueMailer
arguments: [ '@mailer']
{note} 二进制数据,如原始图像内容,在传递给队列工作之前应该通过
base64_encode
函数进行编码。否则,当将其放置在队列上时,工作可能无法正确序列化为JSON。
推送/调度工作
一旦您编写了工作类并为其配置了服务,您可以使用idb_queue
服务来调度它。
<?php
namespace AppBundle\Controllers;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
class MailerController extends Controller
{
/**
* The action/method handles the user registration.
*
* @param Request $request
* @Route("/mail", name="mail")
* @Method({"GET", "POST"})
*
* @return Response
*/
public function mail(Request $request)
{
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World']);
// Rest of the code......
}
}
延迟推送/调度
如果您想延迟队列中工作的执行。
$tenMinutesLater = (new \DateTime)->modify('10 minute');
$this->get('idb_queue')->later($tenMinutesLater, 'app_queue_mailer', $data);
{note} Amazon SQS队列服务有最大延迟时间为15分钟。
自定义队列与连接
向特定队列推送
通过将工作推送到不同的队列,您可以“分类”您的队列工作,甚至可以优先考虑分配给各种队列的工作者的数量。请注意,这不会将工作推送到由您的队列配置文件定义的不同队列“连接”,而只是推送到单个连接中的特定队列。
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing');
向特定连接推送
如果您正在处理多个队列连接,您可以指定将工作推送到哪个连接。
// On processing and sqs connection
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing', 'sqs');
指定最大作业尝试次数/超时值
最大尝试次数
指定一个工作可以尝试的最大次数的方法之一是在控制台命令行中使用--tries
开关。
php console idb_queue:work --tries=3
然而,您可以通过在作业类本身上定义最大尝试次数来采取更精细的方法。如果指定了作业上的最大尝试次数,则它将优先于命令行提供的值。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $maxTries = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $maxTries = 5;
public function getMaxTries()
{
return $this->maxTries;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
超时
同样,您可以使用 Artisan 命令行的 --timeout
开关来指定作业可以运行的最大秒数。
php console idb_queue:work --timeout=30
然而,您也可以在作业类本身上定义作业允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时。
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
public $timeout = 5;
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
如果您不喜欢使用公共属性。
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
class QueueMailer implements QueueableInterface
{
/**
* The number of max times the job may be attempted.
*
* @var int
*/
private $timeout = 5;
public function getTimeout()
{
return $this->timeout;
}
/**
* @param JobsInterface $job
* @param mixed $data
*/
public function fire(JobsInterface $job, $data = null)
{
// ....
$job->delete();
}
}
错误处理
如果在处理作业期间抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到达到您应用程序允许的最大尝试次数。最大尝试次数由 idb_queue:work
控制台命令上的 --tries
开关定义。或者,最大尝试次数也可以在作业类本身上定义。有关运行队列工作者的更多信息,请见下文。
运行队列工作进程
QueueBundle 包含一个队列工作者,它会处理队列上推送的新作业。您可以使用 idb_queue:work
控制台命令来运行工作者。请注意,一旦 idb_queue:work
命令开始运行,它将继续运行,直到手动停止或您关闭终端。
php console idb_queue:work
您还可以使用
php console idb_queue:listen
{提示} 为了永久在后台运行
idb_queue:work
进程,您应该使用进程监控程序,如 Supervisor,以确保队列工作者不会停止运行。
记住,队列工作者是长期运行的进程,并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。所以,在您的部署过程中,务必重启您的队列工作者。
指定连接和队列
您还可以指定工作者应使用哪个队列连接。传递给 work
命令的连接名称应与您的 config/queue.php
配置文件中定义的连接之一相对应。
php console idb_queue:work redis
您还可以通过只为给定连接处理特定的队列来进一步自定义您的队列工作者。例如,如果您的所有电子邮件都在 redis
队列连接上的 emails
队列中处理,您可以使用以下命令启动仅处理该队列的工作者
php console idb_queue:work redis --queue=emails
资源考虑
守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每次作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,您应该在完成后使用 imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在您的 config.yml
中,您可能将 redis
连接的默认 queue
设置为 low
。然而,有时您可能希望将作业推送到 high
优先级队列,如下所示
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'high');
要启动一个工作者,该工作者在继续处理 low
队列上的任何作业之前先验证所有 high
队列作业已处理,请将队列名称的逗号分隔列表传递给 work
命令
php console idb_queue:work --queue=high,low
队列工作进程与部署
由于队列工作者是长期运行的进程,它们在未重启的情况下不会获取代码更改。因此,使用队列工作者部署应用程序的最简单方法是,在部署过程中重启工作者。您可以通过发出 idb_queue:restart
命令来优雅地重启所有工作者。
php console idb_queue:restart
该命令将指示所有队列工作者在处理完当前工作后优雅地“死亡”,以确保不会丢失现有工作。由于队列工作者将在执行idb_queue:restart
命令时死亡,因此您应该运行进程管理器,如Supervisor,来自动重启队列工作者。
作业过期与超时
工作到期
在您的config.yml
配置文件中,每个队列连接定义了一个retry_after
选项。此选项指定队列连接在重试正在处理的工作之前应等待多少秒。例如,如果将retry_after
的值设置为90
,则在90秒内未删除的处理工作将被释放回队列。通常,您应将retry_after
值设置为工作完成处理所需的最大合理秒数。
{注意} 不包含
retry_after
值的唯一队列连接是Amazon SQS。SQS将根据在AWS控制台中管理的默认可见性超时重试工作。
工作者超时
idb_queue:work
控制台命令公开了一个--timeout
选项。该--timeout
选项指定队列主进程在杀死正在处理工作的子队列工作者之前将等待多长时间。有时子队列进程可能因为各种原因而“冻结”,例如未响应的外部HTTP调用。《code>--timeout选项将超过指定时间限制的冻结进程移除。
php console idb_queue:work --timeout=60
retry_after
配置选项和--timeout
CLI选项不同,但它们协同工作以确保工作不会丢失,并且工作只会成功处理一次。
{注意}
--timeout
值应始终至少比您的retry_after
配置值短几秒。这将确保处理特定工作的工作者总是在工作重试之前被杀死。如果您的--timeout
选项比您的retry_after
配置值长,则您的工作可能会被处理两次。
工作者休眠时间
当队列上有工作可用时,工作者将在它们之间没有任何延迟地持续处理工作。然而,《code>sleep选项决定了如果不存在新工作,工作者将“休眠”多长时间。
php console idb_queue:work --sleep=3
监督者配置
安装Supervisor
Supervisor是Linux操作系统的进程监控器,如果它失败,将自动重启您的idb_queue:work
进程。要在Ubuntu上安装Supervisor,您可以使用以下命令
sudo apt-get install supervisor
配置Supervisor
Supervisor配置文件通常存储在/etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,以指导Supervisor如何监控您的进程。例如,让我们创建一个queue-worker.conf
文件,以启动和监控一个idb_queue:work
进程
[program:queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/project/acme/bin/console idb_queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=johndoe
numprocs=8
redirect_stderr=true
stdout_logfile=/home/project/acme/var/log/worker.log
在此示例中,numprocs
指令将指导Supervisor运行8个idb_queue:work
进程并监控所有进程,如果它们失败,将自动重启它们。当然,您应该将command
指令中的idb_queue:work sqs
部分更改为反映您希望的队列连接。
启动Supervisor
一旦创建配置文件,您可以使用以下命令更新Supervisor配置并启动进程
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start queue-worker:*
有关Supervisor的更多信息,请参阅Supervisor文档。
处理失败作业
有时您的队列作业会失败。不要担心,事情并不总是按计划进行!QueueBundle 包括一种方便的方式来指定一个作业应该尝试的最大次数。当一个作业尝试次数超过这个数量后,它可以被插入到数据库中。要使用数据库来存储失败的作业,您应该使用 idb_queue:fail_database
命令
php console idb_queue:fail_database
这个命令将提出一些问题,您需要回答。然后为您生成必要的文件。过程与使用数据库进行队列类似。
然后,在运行您的 队列工作进程 时,您应该在 idb_queue:work
命令中使用 --tries
开关来指定一个作业应该尝试的最大次数。如果您没有指定 --tries
选项的值,作业将无限尝试
php console idb_queue:work redis --tries=3
清理失败作业后的操作
您需要在您的作业类中直接实现 IdeasBucket\QueueBundle\QueueErrorInterface
,这样您就可以在失败发生时执行特定于作业的清理工作。这是向用户发送警报或撤销作业执行的完美位置。失败的 Exception
将传递给 failed
方法
<?php
namespace AppBundle\Job;
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
use IdeasBucket\QueueBundle\QueueErrorInterface;
class QueueMailer implements QueueableInterface, QueueErrorInterface
{
public function fire(JobsInterface $job, array $data = null)
{
// ....
$job->delete();
}
public function failed(\Exception $e, $payload = null)
{
// Do something with the error
}
}
重试失败作业
要查看所有已插入到您的数据库中的失败作业,您可以使用 idb_queue:failed
控制台命令
php console idb_queue:failed
idb_queue:failed
命令将列出作业 ID、连接、队列和失败时间。可以使用作业 ID 来重试失败的作业。例如,要重试具有 ID 5
的失败作业,请发出以下命令
php console idb_queue:retry 5
要重试所有失败的作业,执行 idb_queue:retry
命令并将 all
作为 ID 传递
php console idb_queue:retry all
如果您想删除一个失败的作业,可以使用 idbb_queue:forget
命令
php console idb_queue:forget 5
要删除所有失败的作业,您可以使用 idb_queue:flush
命令
php console idb_queue:flush
作业事件
QueueBundle 提供了您可以在作业执行期间监听的一些事件。这是通过电子邮件或 HipChat 通知您的团队或如果作业失败采取一些行动的绝佳机会。您可以像这样附加监听器。
<?php
namespace AppBundle\EventListener;
use Symfony\Component\EventDispatcher\Event;
/**
* Class QueueListener
*
* @package AppBundle\EventListener
*/
class QueueListener
{
/**
* This method will be called before Job firing
*
* @param Event $event
*/
public function during(Event $event)
{
}
}
然后注册一个服务
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: job_processing, method: during }
- { name: kernel.event_listener, event: job_failed, method: during }
- { name: kernel.event_listener, event: job_exception_occurred, method: during }
- { name: kernel.event_listener, event: job_processed, method: during }
- { name: kernel.event_listener, event: looping, method: during }
- { name: kernel.event_listener, event: worker_stopping, method: during }
当然,您可以在不同的事件中调用不同的方法。如果您正在使用 3.2 或以上版本的 Symfony
,我们建议您使用类常量,如下所示。
appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_EXCEPTION_OCCURRED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_FAILED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSED, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::LOOPING, method: during }
- { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::WORKER_STOPPING, method: during }
有关创建事件监听器和注册的更多信息,请访问 Symfony 文档。
其他数据库和库支持
当您执行 idb_queue:database
命令时,它假定您正在使用 Doctrine(ORM 或 ODM)。如果您没有使用 Doctrine 或只想支持其他数据库,那么在 QueueBundle 中它相当简单。
您需要确保仓库实现接口 IdeasBucket\QueueBundle\Repository\DatabaseQueueRepositoryInterface
,并且实体实现 IdeasBucket\QueueBundle\Entity\DatabaseQueueEntityInterface
。只要满足这些要求,您就可以使用任何库或数据库。
GIST
即将推出。