javibravo / simpleue
PHP 包,以简单的方式管理队列任务
Requires
- php: >=5.5
- psr/log: ~1.0
Requires (Dev)
- aws/aws-sdk-php: ^3.9
- pda/pheanstalk: ^3.1
- phpunit/phpunit: 4.0.*
- predis/predis: ^1.0
Suggests
- ext-redis: Allow work with Redis Locker
- aws/aws-sdk-php: Allow work with AWS Simple Queue Service (SQS) queues
- pda/pheanstalk: Allow work with Beanstalkd queues
- predis/predis: Allow work with Redis queues
This package is not auto-updated.
Last update: 2024-09-14 18:20:35 UTC
README
Simpleue 提供了一种非常简单的方式来运行工作进程(消费者)以消费队列(队列)。该库已被开发,以便易于扩展以与不同的队列服务器一起工作,并开放以管理任何类型的作业。
当前实现
- Redis 队列适配器。
- AWS SQS 队列适配器。
- Beanstalkd 队列适配器。
您可以在 simpleue-example 中找到使用示例。
工作进程
该库有一个工作进程类,它运行一个无限循环(可以通过某些条件停止)并管理处理作业的所有阶段
- 获取下一个作业。
- 执行作业。
- 作业成功后执行...
- 作业失败后执行...
- 执行错误后执行...
- 没有作业时执行...
循环可以通过以下方法在控制下停止
- 停止作业:作业处理器允许定义一个停止作业。
- 最大迭代次数:可以在声明对象时指定。
每个工作进程有一个队列源并管理一种类型的作业。可以使用相同的队列源同时运行多个工作进程。
优雅退出
工作进程还能够处理一些 POSIX 信号,例如 SIGINT
和 SIGTERM
,以便在有人尝试手动停止它时(通常在 shell 中使用 C-c
键组合)优雅地退出(等待当前队列作业完成)。
默认情况下,此行为是禁用的。要启用它,您需要在工作进程类的构造函数中传递一个额外的参数。下面是示例。
注意:此功能尚未在 HHVM 上进行测试,因此如果您在 HHVM 上运行它,可能不会按预期工作。
队列
该库提供了一个接口,允许实现不同队列服务器的队列连接。目前,该库提供了以下实现
- Redis 队列适配器。
- AWS SQS 队列适配器。
- Beanstalkd 队列适配器。
队列接口管理与队列系统相关的所有操作,并抽象了关于该作业的信息。
它需要队列系统客户端
- Redis:Predis\Client
- AWS SQS:Aws\Sqs\SqsClient
- Beanstalkd:Pheanstalk\Pheanstalk;
以及源 队列名称。消费者需要额外的队列来管理进程
- 处理队列(仅限 Redis):它将存储从源队列弹出的项目,在它被处理时。
- 失败队列:所有失败的作业(根据作业定义)都将添加到此队列。
- 错误队列:所有在管理过程中抛出异常的作业都将添加到此队列。
重要
对于 AWS SQS 队列,所有队列必须在开始工作之前存在。
作业
作业接口用于管理队列中接收到的作业。它必须管理领域业务逻辑并定义 停止作业。
作业从队列系统中抽象出来,因此相同的作业定义可以与不同的队列接口一起工作。作业始终从队列接收消息体。
如果您有不同类型的作业(发送邮件、裁剪图像等)并使用一个队列,您可以定义 isMyJob。如果作业不是预期类型,您可以将其发送回队列。
安装
在您的composer json文件中要求该包
{ "require": { "javibravo/simpleue" : "dev-master", }, }
用法
第一步是定义和实现要管理的作业。
<?php namespace MyProject\MyJob; use Simpleue\Job\Job; class MyJob implements Job { public function manage($job) { ... try { ... } catch ( ... ) { return FALSE; } ... return TRUE; } ... public function isStopJob($job) { if ( ... ) return TRUE; return FALSE; } public function isMyJob($job) { if ( ... ) return TRUE; return FALSE; } ... }
一旦定义了作业,我们就可以定义我们的消费者并开始运行
Redis消费者
<?php use Predis\Client; use Simpleue\Queue\RedisQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $redisQueue = new RedisQueue( new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')), 'my_queue_name' ); $myNewConsumer = new QueueWorker($redisQueue, new MyJob()); $myNewConsumer->start();
AWS SQS消费者
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
Beanstalkd消费者
<?php use Simpleue\Queue\BeanStalkdQueue; use Simpleue\Worker\QueueWorker; use Pheanstalk\Pheanstalk; use MyProject\MyJob; $beanStalkdClient = new Pheanstalk('localhost'); $beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob()); $myNewConsumer->start();
使用最大迭代次数
您可以通过两种方式设置最大迭代次数,下面都展示了
使用setter方法
$myConsumer = new QueueWorker($myQueue, new MyJob()); $myConsumer->setMaxIterations(10); //any number $myConsumer->start();
使用构造函数参数
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10); $myConsumer->start();
启用优雅退出
要启用优雅退出,请在构造函数中传入一个额外的参数。
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true); $myConsumer->start();
AWS SQS作业锁定以防止重复
当使用AWS SQS标准队列时,有时即使提供了MessageVisibilityTimeout,工作者仍然会接收到重复的消息。为了防止这种重复,您可以向SqsQueue对象提供Redis或Memcached锁定器。如果您提供了锁定器对象且锁定失败,则作业将被发送到错误队列。锁定器提供者不会删除/解锁作业。如果需要,您应手动解锁。您可以使用getJobUniqId方法获取作业键。
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Locker\MemcachedLocker; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $memcached = new \Memcached(); $memcached->addServer('localhost', 11211); $memcachedLocker = new MemcachedLocker($memcached); $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30); $sqsQueue->setLocker($memcachedLocker); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
(*) 该想法是支持任何队列系统,因此它是开放的。欢迎贡献。