kogeva/queue-bundle

为 Symfony 框架分叉 IdeasBucket QueueBundle

安装: 2,623

依赖: 0

建议者: 0

安全: 0

星标: 0

关注者: 2

分支: 12

公开问题: 0

类型:kogeva-queue-bundle

v2.0 2019-03-22 16:47 UTC

This package is not auto-updated.

Last update: 2024-09-29 19:59:23 UTC


README

Build Status

简介

这个 QueueBundle 受 Laravel Queue 包的启发很大。事实上,一些文件是直接复制的。所以,向 Taylor Otwell 和 Laravel 团队致敬,因为他们为社区提供了一个出色的包。

安装

您可以通过 composer 安装 QueueBundle

composer require kogeva/queue-bundle

QueueBundle 支持 Symfony 3+、4+ 及以上版本。

配置

####对于 Symfony 3

安装包后,您需要更改 AppKernel.php,添加类似以下的包类条目。

<?php

use Symfony\Component\HttpKernel\Kernel;
use Symfony\Component\Config\Loader\LoaderInterface;

class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = array(
            new Symfony\Bundle\FrameworkBundle\FrameworkBundle(),
            new Symfony\Bundle\SecurityBundle\SecurityBundle(),
            new Symfony\Bundle\TwigBundle\TwigBundle(),
            new Symfony\Bundle\MonologBundle\MonologBundle(),
            new Symfony\Bundle\SwiftmailerBundle\SwiftmailerBundle(),
            new Doctrine\Bundle\DoctrineBundle\DoctrineBundle(),
            new Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle(),
            new IdeasBucket\QueueBundle\IdeasBucketQueueBundle(), // ADD THIS
            new AppBundle\AppBundle(),
        );

        ....

        return $bundles;
    }
}

####对于 Symfony 4

安装包后,您需要更改 config/bundles.php,添加类似以下的包类条目。

<?php

return [
    Symfony\Bundle\FrameworkBundle\FrameworkBundle::class => ['all' => true],
    Sensio\Bundle\FrameworkExtraBundle\SensioFrameworkExtraBundle::class => ['all' => true],
    Symfony\Bundle\WebServerBundle\WebServerBundle::class => ['dev' => true],
    Symfony\Bundle\MakerBundle\MakerBundle::class => ['dev' => true],
    Symfony\Bundle\TwigBundle\TwigBundle::class => ['dev' => true, 'test' => true],
    Symfony\Bundle\WebProfilerBundle\WebProfilerBundle::class => ['dev' => true, 'test' => true],
    Doctrine\Bundle\DoctrineCacheBundle\DoctrineCacheBundle::class => ['all' => true],
    Doctrine\Bundle\DoctrineBundle\DoctrineBundle::class => ['all' => true],
    Doctrine\Bundle\MigrationsBundle\DoctrineMigrationsBundle::class => ['all' => true],
    Symfony\Bundle\SecurityBundle\SecurityBundle::class => ['all' => true],
    Symfony\Bundle\MonologBundle\MonologBundle::class => ['all' => true],
    \IdeasBucket\QueueBundle\IdeasBucketQueueBundle::class => ['all' => true],
];

然后在 config.yml 中,您可以定义每个包含在包中的队列驱动程序的配置,这包括数据库、BeanstalkdAmazon SQSRedis,以及一个同步驱动器,它将立即执行作业(用于本地使用)。还包括一个 null 队列驱动器,它只是简单地丢弃队列作业。

QueueBundle 所需的基本最小配置是配置 cache_handler。基本上,它可以实现这些接口中的任何一个服务。

  • PSR-16 缓存
  • PSR-6 缓存池
  • Doctrine 缓存

对于缓存处理程序,您可以定义如下服务。

# In service.yml or config.yml file
app.cache:
    app: ANY_CACHE_ADAPTER

如果可能,我们推荐使用 PSR-16 缓存接口。

您可以使用任何缓存适配器。有关缓存处理程序的更多信息,请访问 此处此处

QueueBundle 的完整配置如下。

ideasbucket_queue:
    cache_handler: cache.app
    default: sync # default
    # Default config for command path you may need to change 
    # this if you are using Symfony 2.x directory structure.
    command_path: '%kernel.root_dir%/../bin/'
    lock_path: ~
    lock_service: ideasbucket_queue.filesystem_switch # Default value
    connections:
        sqs:
            driver: sqs
            key: YOUR_KEY
            secret: YOUR_SECRET
            prefix: https://sqs.us-west-2.amazonaws.com/some-id
            queue: default
            region: us-west-2
        redis:
            driver: redis
            client: YOUR_PREDIS_CLIENT
            queue: default
            retry_after: 90
        beanstalkd:
            driver: beanstalkd
            host: localhost
            port: 11300
            persistent: ~
            queue: default
            retry_after: 90
        database:
            driver: database
            queue: default
            repository: YOUR_QUEUE_REPOSITORY
            retry_after: 90
    # If you want to store failed jobs in database.          
    #failed_job_repository: FAILED_JOB_REPOSITORY

连接与队列

在开始使用 QueueBundle 之前,了解“连接”和“队列”之间的区别很重要。在您的 config.yml 中,您可以定义 connections 的配置。此选项定义了到后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,这些队列可以看作是不同的作业堆栈或堆。

请注意,config.yml配置文件中的每个连接配置示例都包含一个queue属性。这是当任务被发送到指定连接时,任务将被分配到的默认队列。换句话说,如果您没有明确指定任务应该分配到哪个队列,则任务将被放置在连接配置的queue属性中定义的队列。

// This job is sent to the default queue...
$this->get('idb_queue')->push('service_id');

// This job is sent to the "emails" queue...
$this->get('idb_queue')->push('service_id', [], 'emails');

某些应用程序可能永远不需要将任务推送到多个队列,而是更愿意使用一个简单的队列。然而,将任务推送到多个队列对于希望优先处理或细分任务处理的应用程序非常有用,因为队列包的队列工作进程允许您按优先级指定它应该处理的队列。例如,如果您将任务推送到high队列,您可能运行一个赋予它们更高处理优先级的工人。

php console idb_queue:work --queue=high,default

驱动器先决条件

数据库

为了使用database队列驱动程序,您需要运行以下命令,这将生成必要的存储库和实体以支持队列

php console idb_queue:database

这将生成您cache/output文件夹中必要的文件,您需要将它们移动到适当的位置。然后定义一个服务,该服务的定义将在命令运行结束时显示。

该命令假定您正在使用具有注释配置的Doctrine(ORM或ODM)。如果您使用的是其他配置格式,则必须在生成的代码中进行必要的调整。

此外,如果您想使用关系数据库,则需要"doctrine/orm";如果您想使用MongoDB,则需要"doctrine/mongodb-odm""doctrine/mongodb-odm-bundle"

如果您需要支持除了Doctrine或任何其他数据库之外的任何库,请参阅此处

Redis

为了使用redis队列驱动程序,您需要有一个提供Predis客户端实例的服务。如果您使用的是SNC RedisBundle,则将是snc_redis.default_client,前提是您正在使用Predis作为默认的Redis客户端。

其他驱动程序先决条件

以下依赖关系对于列出的队列驱动程序是必需的

  • Amazon SQS: aws/aws-sdk-php ~3.0

  • Beanstalkd: pda/pheanstalk ~3.0

  • Redis: predis/predis ~1.0

创建作业

生成作业类

每个任务基本上是一个实现IdeasBucket\QueueBundle\QueueableInterface接口的服务。队列包提供生成任务的命令。

php console idb_queue:create_job

生成的类将位于您在命令期间选择的包内的Job文件夹中。

任务结构

任务类非常简单,它仅实现了包含只有fire方法的队列接口,该方法在队列处理任务时被调用。为了开始,让我们看一下一个示例任务类。在这个例子中,我们将使用队列发送电子邮件。

<?php

namespace AppBundle\Job;

use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;

class QueueMailer implements QueueableInterface
{
    /**
     * @var \Swift_Mailer
     */
    private $mailer;
    
    public function __construct(\Swift_Mailer $mailer)
    {
        $this->mailer = $mailer;
    }
    
    /**
     * @param JobsInterface $job
     * @param array         $data
     */
    public function fire(JobsInterface $job, array $data = null)
    {
        // Create a message
        //....
        $this->mailer->send($message);
        
        $job->delete();
    }
}

然后您需要在您的service.yml文件中为任务定义一个服务。

services:
    app_queue_mailer:
        class: AppBundle\Job\QueueMailer
        arguments: [ '@mailer']

{note} 二进制数据,如原始图像内容,在传递给队列任务之前应通过base64_encode函数进行传递。否则,当将其放置在队列上时,任务可能无法正确序列化为JSON。

推送/分配任务

一旦您编写了您的任务类并为其配置了服务,您就可以使用idb_queue服务来分配它。

<?php

namespace AppBundle\Controllers;

use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

class MailerController extends Controller
{
    /**
     * The action/method handles the user registration.
     *
     * @param Request $request
     * @Route("/mail", name="mail")
     * @Method({"GET", "POST"})
     *
     * @return Response
     */
    public function mail(Request $request)
    {
        $this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World']);
        
        // Rest of the code......
    }
}

延迟推送/分配

如果您想延迟队列中任务的执行。

$tenMinutesLater = (new \DateTime)->modify('10 minute');
$this->get('idb_queue')->later($tenMinutesLater, 'app_queue_mailer', $data);

{note} Amazon SQS队列服务有最长延迟时间为15分钟。

自定义队列与连接

分配到特定队列

通过将任务推送到不同的队列,您可以“分类”您的排队任务,甚至可以根据分配给各个队列的工作者数量进行优先级排序。请注意,这并不是将任务推送到由队列配置文件定义的不同队列“连接”,而只是推送到单个连接中的特定队列。

$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing');

派送到特定连接

如果您正在使用多个队列连接,您可以选择将任务推送到哪个连接。

// On processing and sqs connection
$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'processing', 'sqs');

指定最大尝试次数/超时值

最大尝试次数

指定任务最多可以尝试多少次的一种方法是通过控制台命令行上的--tries开关。

php console idb_queue:work --tries=3

但是,您也可以通过在任务类本身上定义最大尝试次数来采取更细致的方法。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的值。

<?php

namespace AppBundle\Job;

use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
    
class QueueMailer implements QueueableInterface
{
    /**
     * The number of max times the job may be attempted.
     *
     * @var int
     */
    public $maxTries = 5;
    
    /**
     * @param JobsInterface $job
     * @param mixed         $data
     */
    public function fire(JobsInterface $job, $data = null)
    {
        // ....
                     
        $job->delete();
    }
}

如果您不喜欢使用公共属性。

namespace AppBundle\Job;
    
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
        
class QueueMailer implements QueueableInterface
{
    /**
     * The number of max times the job may be attempted.
     *
     * @var int
     */
     private $maxTries = 5;
     
     public function getMaxTries()
     {
         return $this->maxTries;
     }
     
     /**
      * @param JobsInterface $job
      * @param mixed         $data
      */
     public function fire(JobsInterface $job, $data = null)
     {
         // ....
         
         $job->delete();
     }
}

超时

同样,可以使用Artisan命令行上的--timeout开关指定任务可以运行的最大秒数。

php console idb_queue:work --timeout=30

但是,您也可以在任务类本身上定义任务应该允许运行的最大秒数。如果在任务上指定了超时,它将优先于命令行上指定的任何超时。

<?php

namespace AppBundle\Job;

use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
    
class QueueMailer implements QueueableInterface
{
    /**
     * The number of max times the job may be attempted.
     *
     * @var int
     */
    public $timeout = 5;
    
    /**
     * @param JobsInterface $job
     * @param mixed         $data
     */
    public function fire(JobsInterface $job, $data = null)
    {
        // ....
                     
        $job->delete();
    }
}

如果您不喜欢使用公共属性。

namespace AppBundle\Job;
    
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
        
class QueueMailer implements QueueableInterface
{
    /**
     * The number of max times the job may be attempted.
     *
     * @var int
     */
     private $timeout = 5;
     
     public function getTimeout()
     {
         return $this->timeout;
     }
     
     /**
      * @param JobsInterface $job
      * @param mixed         $data
      */
     public function fire(JobsInterface $job, $data = null)
     {
         // ....
         
         $job->delete();
     }
}

错误处理

如果在处理任务时抛出异常,任务将自动重新放入队列以便再次尝试。任务将继续被释放,直到它尝试了应用程序允许的最大次数。最大尝试次数由在idb_queue:work控制台命令中使用的--tries开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作者的更多信息,请见下文。

运行队列工作进程

QueueBundle包括一个队列工作者,该工作者会处理被推送到队列上的新任务。您可以使用idb_queue:work控制台命令来运行工作者。请注意,一旦idb_queue:work命令开始运行,它将一直运行,直到手动停止或您关闭终端。

php console idb_queue:work

您还可以使用

php console idb_queue:listen    

{提示} 为了使idb_queue:work进程永久在后台运行,您应该使用进程监控工具,如Supervisor,以确保队列工作者不会停止运行。

请记住,队列工作者是长期运行进程,并在内存中存储已启动的应用程序状态。因此,它们在启动后不会注意到代码库中的更改。所以,在您的部署过程中,请确保重启您的队列工作者

指定连接和队列

您还可以指定工作者应使用的队列连接。传递给work命令的连接名称应与在您的config/queue.php配置文件中定义的连接之一相匹配。

php console idb_queue:work redis

您还可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在redis队列连接上的emails队列中处理,您可以发出以下命令来启动一个只处理该队列的工作者:

php console idb_queue:work redis --queue=emails

资源考虑

守护进程队列工作者在处理每个任务之前不会“重新启动”框架。因此,您应该在每个任务完成后释放任何重资源。例如,如果您使用GD库进行图像处理,您应该在完成时使用imagedestroy释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在您的 config.yml 配置文件中,您可能将 redis 连接的默认 queue 设置为 low。然而,偶尔您可能希望将作业推送到 high 优先级队列,如下所示

$this->get('idb_queue')->push('app_queue_mailer', ['message' => 'Hello World'], 'high');

要启动一个工作进程,该进程会在继续处理 low 队列中的任何作业之前,验证所有 high 队列作业是否已处理,请将队列名称的逗号分隔列表传递给 work 命令

php console idb_queue:work --queue=high,low

队列工作进程与部署

由于队列工作进程是长时间运行的进程,它们不会在没有重新启动的情况下自动获取代码更改。因此,部署使用队列工作进程的应用程序的最简单方法是在部署过程中重启工作进程。您可以通过发出 idb_queue:restart 命令优雅地重启所有工作进程

php console idb_queue:restart

此命令将指示所有队列工作进程在完成当前作业后优雅地“死亡”,以确保不会丢失现有作业。由于队列工作进程将在执行 idb_queue:restart 命令时死亡,因此您应该运行进程管理器,例如 Supervisor,以自动重启队列工作进程。

作业过期与超时

作业过期

在您的 config.yml 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待多少秒。例如,如果 retry_after 的值为 90,则如果作业处理了 90 秒而没有被删除,则作业将重新进入队列。通常,您应将 retry_after 值设置为作业合理完成处理所需的最大秒数。

{note} 不包含 retry_after 值的唯一队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重新尝试作业。

工作进程超时

idb_queue:work 控制台命令公开了一个 --timeout 选项。该 --timeout 选项指定队列主进程在终止正在处理作业的子队列工作进程之前将等待多长时间。有时子队列进程可能会因为各种原因而“冻结”,例如没有响应的外部 HTTP 调用。该 --timeout 选项会删除超过指定时间限制的冻结进程

php console idb_queue:work --timeout=60

retry_after 配置选项和 --timeout 命令行选项不同,但它们协同工作以确保作业不会丢失,并且作业只成功处理一次。

{note} --timeout 的值应始终比 retry_after 配置值短几秒钟。这将确保处理特定作业的工作进程在作业重试之前被终止。如果您的 --timeout 选项比 retry_after 配置值长,则您的作业可能会被处理两次。

工作进程休眠时间

当队列上有作业可用时,工作进程将在没有延迟的情况下连续处理作业。然而,如果没有任何新作业可用,则 sleep 选项确定工作进程将“休眠”多长时间

php console idb_queue:work --sleep=3

监督者配置

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监控器,如果它失败,将自动重启您的 idb_queue:work 进程。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令

sudo apt-get install supervisor

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,这些文件指导 supervisor 如何监控您的进程。例如,让我们创建一个 queue-worker.conf 文件,该文件启动并监控一个 idb_queue:work 进程

[program:queue-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/project/acme/bin/console idb_queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=johndoe
numprocs=8
redirect_stderr=true
stdout_logfile=/home/project/acme/var/log/worker.log

在这个例子中,numprocs 指令将指示 Supervisor 运行 8 个 idb_queue:work 进程,并监控所有进程。如果进程失败,它们将自动重启。当然,您应该将 command 指令中的 idb_queue:work sqs 部分更改为反映您想要的队列连接。

启动 Supervisor

一旦创建配置文件,您可以使用以下命令更新 Supervisor 配置并启动进程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start queue-worker:*

有关 Supervisor 的更多信息,请参阅 Supervisor 文档

处理失败的作业

有时您的队列作业可能会失败。不要担心,事情并不总是按计划进行!QueueBundle 包括一个方便的方式来指定一个作业应该尝试的最大次数。一旦作业尝试次数超过此数量,它就可以插入数据库。要使用数据库存储失败作业,您应该使用 idb_queue:fail_database 命令

php console idb_queue:fail_database

此命令将提出一些问题,您必须回答。然后将为您生成必要的文件。过程与使用数据库进行队列类似。

然后,当运行您的 队列工作进程 时,您应该在 idb_queue:work 命令中使用 --tries 开关指定一个作业应该尝试的最大次数。如果您不指定 --tries 选项的值,作业将无限尝试。

php console idb_queue:work redis --tries=3

清理失败的作业

您需要在您的作业类上直接实现 IdeasBucket\QueueBundle\QueueErrorInterface,这样当发生失败时,您就可以执行特定的清理操作。这是通知您的用户或撤销作业执行操作的理想位置。导致作业失败的 Exception 将传递给 failed 方法。

<?php

namespace AppBundle\Job;
    
use IdeasBucket\QueueBundle\QueueableInterface;
use IdeasBucket\QueueBundle\Job\JobsInterface;
use IdeasBucket\QueueBundle\QueueErrorInterface;
        
class QueueMailer implements QueueableInterface, QueueErrorInterface
{
    public function fire(JobsInterface $job, array $data = null)
    {
        // ....
         
        $job->delete();
    }
    
    public function failed(\Exception $e, $payload = null)
    {
       // Do something with the error
    }
}

重试失败的作业

要查看已插入到数据库中的所有失败作业,您可以使用 idb_queue:failed 控制台命令

php console idb_queue:failed

idb_queue:failed 命令将列出作业 ID、连接、队列和失败时间。可以使用作业 ID 重新尝试失败的作业。例如,要重新尝试 ID 为 5 的失败作业,请发出以下命令

php console idb_queue:retry 5

要重新尝试所有失败作业,请执行 idb_queue:retry 命令并传递 all 作为 ID

php console idb_queue:retry all

如果您想删除一个失败的作业,您可以使用 idbb_queue:forget 命令

php console idb_queue:forget 5

要删除所有失败的作业,您可以使用 idb_queue:flush 命令

php console idb_queue:flush

作业事件

QueueBundle 提供了您可以在作业执行期间监听的一些事件。这是一个通过电子邮件或 HipChat 通知您的团队或当作业失败时执行某些操作的好机会。您可以使用以下方式附加监听器。

<?php

namespace AppBundle\EventListener;

use Symfony\Component\EventDispatcher\Event;

/**
 * Class QueueListener
 *
 * @package AppBundle\EventListener
 */
class QueueListener
{
    /**
     * This method will be called before Job firing
     *
     * @param Event $event
     */
    public function during(Event $event)
    {
        
    }
}

然后注册一个服务

appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
    - { name: kernel.event_listener, event: job_processing, method: during }
    - { name: kernel.event_listener, event: job_failed, method: during }
    - { name: kernel.event_listener, event: job_exception_occurred, method: during }
    - { name: kernel.event_listener, event: job_processed, method: during }
    - { name: kernel.event_listener, event: looping, method: during }
    - { name: kernel.event_listener, event: worker_stopping, method: during }

当然,您可以在不同事件期间调用不同的方法。如果您正在使用 Symfony 3.2 或更高版本,我们建议您使用类常量,如下所示。

appbundle.queue_listener:
class: AppBundle\EventListener\QueueListener
tags:
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_EXCEPTION_OCCURRED, method: during }
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_FAILED, method: during }
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSED, method: during }
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::JOB_PROCESSING, method: during }
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::LOOPING, method: during }
    - { name: kernel.event_listener, event: !php/const:IdeasBucket\QueueBundle\Event\EventList::WORKER_STOPPING, method: during }

有关创建事件监听器和注册的更多信息,请访问 Symfony 文档

其他数据库和库支持

当您执行 idb_queue:database 命令时,它假设您正在使用 Doctrine(ORM 或 ODM)。如果出于任何原因您不使用 Doctrine 或只想支持其他数据库,那么在 QueueBundle 中它相当简单。

您需要确保的是,存储库实现接口 IdeasBucket\QueueBundle\Repository\DatabaseQueueRepositoryInterface,并且实体实现 IdeasBucket\QueueBundle\Entity\DatabaseQueueEntityInterface。只要满足这些要求,您就可以使用任何库或数据库。

GIST

即将推出。