machobearstudio / simpleue
PHP包,以简单的方式管理队列任务
Requires
- php: >=5.6
- ext-pcntl: *
- psr/log: ^1.0 || >=2.0
Requires (Dev)
- aws/aws-sdk-php: ^3.9
- pda/pheanstalk: ^3.1
- phpunit/phpunit: 4.0.*
- predis/predis: ^1.0
Suggests
- ext-redis: Allow work with Redis Locker
- aws/aws-sdk-php: Allow work with AWS Simple Queue Service (SQS) queues
- pda/pheanstalk: Allow work with Beanstalkd queues
- predis/predis: Allow work with Redis queues
This package is not auto-updated.
Last update: 2024-09-24 03:03:01 UTC
README
Simpleue提供了一种非常简单的方式来运行工作者以消费队列(消费者)在PHP中。该库已被开发以轻松扩展以与不同的队列服务器一起工作,并对外开放以管理任何类型的作业。
当前实现
- Redis队列适配器。
- AWS SQS队列适配器。
- Beanstalkd队列适配器。
您可以在simpleue-example中找到一个使用示例
工作者
该库有一个工作者类,它运行一个无限循环(可以通过一些条件停止)并管理处理作业的所有阶段
- 获取下一个作业。
- 执行作业。
- 作业成功后执行...
- 作业失败后执行...
- 执行错误后执行...
- 没有作业时执行...
循环可以通过以下方法在控制下停止
- 停止作业:作业处理器允许定义一个停止作业。
- 最大迭代次数:可以在声明对象时指定。
每个工作者有一个队列源,并管理一种类型的作业。可以使用相同的队列源同时运行多个工作者。
优雅退出
工作者还可以处理一些posix信号,例如SIGINT
和SIGTERM
,以便在有人尝试手动停止它时(通常在shell中使用C-c
键),它可以优雅地退出(等待当前队列作业完成)。
此行为默认禁用。要启用它,您需要在工作者类的构造函数中传递一个额外的参数。下面是使用示例。
注意:此功能在HHVM上未测试,因此如果您在HHVM上运行,可能无法按预期工作
队列
该库提供了一种接口,允许实现不同队列服务器的队列连接。目前该库提供了以下实现
- Redis队列适配器。
- AWS SQS队列适配器。
- Beanstalkd队列适配器。
队列接口管理与队列系统相关的所有内容,并抽象了与该作业相关的内容。
它需要队列系统客户端
- Redis:Predis\Client
- AWS SQS:Aws\Sqs\SqsClient
- Beanstalkd:Pheanstalk\Pheanstalk;
以及源队列名称。消费者需要额外的队列来管理进程
- 处理队列(仅限Redis):它将存储从源队列弹出的项目,在处理过程中。
- 失败队列:所有失败的作业(根据作业定义)都将添加到该队列。
- 错误队列:所有在管理过程中抛出异常的作业都将添加到该队列。
重要
对于AWS SQS队列,所有队列必须在开始工作之前存在。
作业
作业接口用于管理队列中接收到的作业。它必须管理域业务逻辑并定义停止作业。
作业从队列系统中抽象出来,因此相同的作业定义可以与不同的队列接口一起工作。作业始终从队列接收消息体。
如果您有不同的作业类型(发送邮件、裁剪图像等)并使用一个队列,您可以定义isMyJob
。如果作业不是预期的类型,可以将作业发送回队列。
安装
在您的composer json文件中需要此包
{ "require": { "javibravo/simpleue" : "dev-master", }, }
使用
第一步是定义和实现要管理的作业
。
<?php namespace MyProject\MyJob; use Simpleue\Job\Job; class MyJob implements Job { public function manage($job) { ... try { ... } catch ( ... ) { return FALSE; } ... return TRUE; } ... public function isStopJob($job) { if ( ... ) return TRUE; return FALSE; } public function isMyJob($job) { if ( ... ) return TRUE; return FALSE; } ... }
一旦定义了作业,我们就可以定义我们的消费者并开始运行
Redis消费者
<?php use Predis\Client; use Simpleue\Queue\RedisQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $redisQueue = new RedisQueue( new Client(array('host' => 'localhost', 'port' => 6379, 'schema' => 'tcp')), 'my_queue_name' ); $myNewConsumer = new QueueWorker($redisQueue, new MyJob()); $myNewConsumer->start();
AWS SQS消费者
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
Beanstalkd 消费者
<?php use Simpleue\Queue\BeanStalkdQueue; use Simpleue\Worker\QueueWorker; use Pheanstalk\Pheanstalk; use MyProject\MyJob; $beanStalkdClient = new Pheanstalk('localhost'); $beanStalkdQueue = new BeanStalkdQueue($beanStalkdClient, 'my_queue_name'); $myNewConsumer = new QueueWorker($beanStalkdQueue, new MyJob()); $myNewConsumer->start();
使用 maxIterations
您可以通过两种方式设置最大迭代次数,下面都进行了展示
使用 Setter 方法
$myConsumer = new QueueWorker($myQueue, new MyJob()); $myConsumer->setMaxIterations(10); //any number $myConsumer->start();
使用构造函数参数
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10); $myConsumer->start();
启用优雅退出
要启用优雅退出,请在构造函数中传递一个额外的参数。
$myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true); $myConsumer->start();
AWS SQS 作业锁定以防止重复
当使用 AWS SQS 标准队列时,有时即使提供了 MessageVisibilityTimeout,工作者仍然可能会接收到重复的消息。为了防止这种重复,您可以将 Redis Locker 传递给 SqsQueue 对象。如果您提供了 locker 对象并且锁定失败,则作业将被发送到错误队列。锁提供者不会删除/解锁作业。如果需要,您应该手动解锁。您可以使用 getJobUniqId 方法获取作业键。
<?php use Aws\Sqs\SqsClient; use Simpleue\Queue\SqsQueue; use Simpleue\Locker\RedisLocker; use Simpleue\Worker\QueueWorker; use MyProject\MyJob; $redis = new \Redis(); $redis->addServer('localhost'); $redisLocker = new RedisLocker($redis); $sqsClient = new SqsClient([ 'profile' => 'aws-profile', 'region' => 'eu-west-1', 'version' => 'latest' ]); $sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30); $sqsQueue->setLocker($redisLocker); $myNewConsumer = new QueueWorker($sqsQueue, new MyJob()); $myNewConsumer->start();
(*) 该想法是支持任何队列系统,因此对此是开放的。欢迎贡献。