javibravo / simple-php-queue
Requires
- php: >=5.5
- psr/log: ~1.0
Requires (Dev)
- aws/aws-sdk-php: ^3.9
- pda/pheanstalk: ^3.1
- phpunit/phpunit: 4.0.*
- predis/predis: ^1.0
Suggests
- ext-redis: Allow work with Redis Locker
- aws/aws-sdk-php: Allow work with AWS Simple Queue Service (SQS) queues
- pda/pheanstalk: Allow work with Beanstalkd queues
- predis/predis: Allow work with Redis queues
This package is not auto-updated.
Last update: 2022-02-01 12:48:32 UTC
README
Simpleue提供了一种非常简单的方式来在PHP中运行工作进程以消费队列(消费者)。该库已开发用于轻松扩展以与不同的队列服务器一起工作,并开放以管理任何类型的作业。
当前实现
- Redis队列适配器。
- AWS SQS队列适配器。
- Beanstalkd队列适配器。
您可以在simpleue-example中找到使用示例。
工作进程
该库有一个工作进程类,它运行一个无限循环(可以通过一些条件停止)并管理处理作业的所有阶段
- 获取下一个作业。
- 执行作业。
- 作业成功后执行...
- 作业失败后执行...
- 执行错误后执行...
- 没有作业时执行...
循环可以通过以下方法在控制下停止
- 停止作业:作业处理器允许定义一个停止作业。
- 最大迭代次数:可以在对象声明时指定。
每个工作进程有一个队列源并管理一种类型的作业。可以使用相同的队列源同时运行多个工作进程。
优雅退出
工作进程还可以处理一些posix信号,例如 SIGINT
和 SIGTERM
,因此当有人尝试手动停止它时(通常在shell中使用C-c
键),它可以优雅地退出(等待当前队列作业完成)。
默认情况下,此行为是禁用的。要启用它,您需要在工作进程类的构造函数中传递一个额外的参数。下面是使用示例。
注意:此功能在HHVM上未测试,因此如果您在HHVM上运行它,可能不会按预期工作。
队列
该库提供了一个接口,允许为不同的队列服务器实现队列连接。目前,该库提供了以下实现
- Redis队列适配器。
- AWS SQS队列适配器。
- Beanstalkd队列适配器。
队列接口管理与队列系统相关的所有内容,并抽象了关于该作业的信息。
它需要队列系统客户端
- Redis:Predis\Client
- AWS SQS:Aws\Sqs\SqsClient
- Beanstalkd:Pheanstalk\Pheanstalk;
以及源 队列名称。消费者将需要额外的队列来管理流程
- 处理队列(仅限Redis):它将存储从源队列中弹出的项,在它被处理时。
- 失败队列:所有失败(根据作业定义)的作业都将添加到该队列。
- 错误队列:在管理过程中抛出异常的所有作业都将添加到该队列。
重要
对于AWS SQS队列,在开始工作之前,所有队列都必须存在。
作业
作业接口用于管理队列中接收到的作业。它必须管理域业务逻辑并定义停止作业。
作业被抽象化从队列系统中,因此相同的作业定义能够与不同的队列接口一起工作。作业始终从队列接收消息体。
如果您有不同的作业类型(发送邮件、裁剪图像等)并且您使用一个队列,您可以定义isMyJob。如果作业不是预期的类型,您可以发送作业回队列。
安装
在您的composer json文件中需要此包
{ "require": { "javibravo/simpleue" : "dev-master", }, }
使用
第一步是定义和实现要管理的作业。
<?php namespace MyProject\MyJob; use Simpleue\Job\Job; class MyJob implements Job { public function manage($job) { ... try { ... } catch ( ... ) { return FALSE; } ... return TRUE; } ... public function isStopJob($job) { if ( ... ) return TRUE; return FALSE; } public function isMyJob($job) { if ( ... ) return TRUE; return FALSE; } ... }
一旦定义了作业,我们就可以定义我们的消费者并开始运行
Redis 消费者
<?php use Predis\Client; use Simpleue\Queue\RedisQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $redisQueue = new RedisQueue( new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')), 'my_queue_name' ); $myNewConsumer = new QueueWorker($redisQueue, new MyJob()); $myNewConsumer->start();
AWS SQS 消费者
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
Beanstalkd 消费者
<?php use Simpleue\Queue\BeanStalkdQueue; use Simpleue\Worker\QueueWorker; use Pheanstalk\Pheanstalk; use MyProject\MyJob; $beanStalkdClient = new Pheanstalk('localhost'); $beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob()); $myNewConsumer->start();
使用最大迭代次数
您可以通过两种方式设置最大迭代次数,下面都展示了
使用设置器方法
$myConsumer = new QueueWorker($myQueue, new MyJob()); $myConsumer->setMaxIterations(10); //any number $myConsumer->start();
使用构造函数参数
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10); $myConsumer->start();
启用优雅退出
要启用优雅退出,请在构造函数中传入一个额外的参数。
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true); $myConsumer->start();
AWS SQS 作业锁定以防止重复
当使用 AWS SQS 标准队列时,即使提供了 MessageVisibilityTimeout,有时工作者也可能接收到重复的消息。为了防止这种重复,您可以将 Redis 或 Memcached 锁定器提供给 SqsQueue 对象。如果您提供了锁定器对象且锁定失败,则作业将发送到错误队列。锁定器提供者不会移除/解锁作业。如有需要,您应手动解锁。您可以使用 getJobUniqId 方法获取作业键。
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Locker\MemcachedLocker; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $memcached = new \Memcached(); $memcached->addServer('localhost', 11211); $memcachedLocker = new MemcachedLocker($memcached); $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30); $sqsQueue->setLocker($memcachedLocker); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
(*) 我们的想法是支持任何队列系统,因此它是开放的。欢迎贡献。