phelixjuma / php-enqueue
这是PHP中一个简单但健壮的基于Redis的作业队列实现。
v4.1.6
2024-08-20 16:19 UTC
Requires
- php: ^7.4 || ^8.0
- ext-json: *
- ext-pcntl: *
- doctrine/annotations: ^2.0
- monolog/monolog: ^3.4
- pda/pheanstalk: ^5.0
- predis/predis: ^1.1
- psr/log: ^2.0
- symfony/console: ^6.0
- vlucas/phpdotenv: ^5.3
- webmozart/assert: ^1.11
Requires (Dev)
- phpunit/phpunit: ^9.0
- dev-master
- v4.1.6
- v4.1.5
- v4.1.4
- v4.1.3
- v4.1.2
- v4.1.1
- v4.1.0
- v4.0.20
- v4.0.19
- v4.0.18
- v4.0.17
- v4.0.16
- v4.0.15
- v4.0.14
- v4.0.13
- v4.0.12
- v4.0.11
- v4.0.10
- v4.0.9
- v4.0.8
- v4.0.7
- v4.0.6
- v4.0.5
- v4.0.4
- v4.0.3
- v4.0.2
- v4.0.1
- v4.0.0
- v3.0.19
- v3.0.18
- v3.0.17
- v3.0.16
- 3.0.15
- 3.0.14
- 3.0.13
- 3.0.12
- 3.0.11
- v3.0.10
- v3.0.9
- v3.0.8
- v3.0.7
- v3.0.6
- v3.0.5
- v3.0.4
- v3.0.3
- v3.0.0
- v2.0.9
- v2.0.8
- v2.0.7
- v2.0.6
- v2.0.5
- v2.0.4
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- v1.1.0
- v1.0.28
- v1.0.27
- v1.0.26
- v1.0.25
- v1.0.24
- v1.0.23
- v1.0.22
- v1.0.21
- v1.0.20
- v1.0.19
- v1.0.18
- v1.0.17
- v1.0.16
- v1.0.15
- v1.0.14
- v1.0.13
- v1.0.12
- v1.0.11
- v1.0.10
- v1.0.9
- v1.0.8
- v1.0.7
- v1.0.6
- v1.0.5
- v1.0.4
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
This package is auto-updated.
Last update: 2024-09-20 16:31:53 UTC
README
这是PHP中一个简单但健壮的基于Redis的作业队列实现。
为什么还需要另一个作业队列包?因为,我尝试了所有我能找到的顶级选项,但没有一个完全符合:一些顶级建议的选项已经超过5年没有维护,并且它们的依赖项导致我的其他包产生了大量冲突,所以我构建了一个新的包,为了我自己。
此包的后端是redis。
要求
- PHP >= 7.1
- vlucas/phpdotenv
- predis/predis
- symfony/console
- amphp/parallel
安装
composer require phelixjuma/php-enqueue
使用说明
1. 运行Worker
php-enqueue是事件驱动的。作业被调度并在redis中安排。必须设置一个Worker以“永远运行”。这个Worker监听任何传入的作业并执行它。作业执行是通过amphp/parallel包并发进行的,这允许并发执行多个作业
要设置Worker,请运行以下命令
./bin/worker --queue=name_of_queue --threaded=1 --concurrency=1 --max_retries=3 --log_path=/path/to/log log_level=100
注意,Worker接受参数,例如
- 队列:Worker监听的队列名称。每个Worker只能监听一个队列。这允许你拥有多个Worker处理不同的队列,这为你的作业执行引入了并行化
- 线程化:值为1表示作业将以多线程方式运行(非阻塞)。0表示以阻塞方式运行作业。如果你的作业不依赖于任何全局变量,请使用多线程,否则将其设置为0(阻塞执行)
- 并发性:定义单个Worker在给定时间可以处理的并发作业数量
- 最大重试次数:如果作业失败,通常是通过抛出异常,它将被重试,直到达到这里定义的最大次数。默认情况下,不进行重试
- 日志路径:日志应放入的目录路径。指定目录路径而不是日志文件,并确保php有权限写入该目录
- 日志级别:按照monolog日志级别
注意
- 你可以使用像supervisord这样的服务来运行Worker并监视它们,这样,如果Worker本身失败,它可以自动重新启动。
- 当你更新任何依赖Worker的代码库的部分时,请注意,除非重新启动Worker,否则更新不会反映出来
2. 管理作业
您有命令行选项来管理任务。
2.1 查看作业列表
./bin/manager enqueue:list --queue=queue_name
2.2 添加新作业
./bin/manager enqueue:add --queue=queue_name --class=job_class_name --parameters=job_args
2.3 从队列中删除作业
./bin/manager enqueue:remove --queue=queue_name --taskId=task_id
2.4 列出失败的作业
./bin/manager enqueue:failed:list --queue=queue_name
2.5 重新排队失败的作业
./bin/manager enqueue:failed:requeue --queue=queue_name
2.6 删除所有失败的作业
./bin/manager enqueue:failed:purge --queue=queue_name
3. 作业
每个作业类都实现了JobInterface。需要定义的类是
- setUp() - 在作业执行之前所需的任何设置,
- perform() - 作业运行的实际逻辑,
- tearDown() - 要执行的任何后处理任务。
3.1 示例作业类
class EmailJob implements JobInterface { /** * @var Logger */ private $logger; public function setUp(Task $task) { $this->logger = new Logger("email_job"); $this->logger->pushHandler(new StreamHandler('/path/to/log/email_job.log', Logger::DEBUG)); } public function perform(Task $task) { // Actual logic to send an email goes here $this->logger->info("Performing email job. Args: ".json_encode($task->getArgs())); } public function tearDown(Task $task) { } }
3.2 调度作业的脚本
use Phelixjuma\Enqueue\Jobs\EmailJob; use Phelixjuma\Enqueue\RedisQueue; use Phelixjuma\Enqueue\Task; use Predis\Client; // Some global section where redis and queue are defined $redis = new Client('tcp://127.0.0.1:6379'); $queue = new RedisQueue($redis); // Actual section to queue the email job task $queue ->setName('test_queue') ->enqueue(new Task(new EmailJob(), ['some_arg' => 'some_value']));
注意:更好的方法是将php-enqueue包装在一个enqueue服务中。在这个服务中,你定义redis和队列,使它们可重用。以下是一个此类服务的示例
<?php namespace \Some\Name\Space\Service; use Phelixjuma\Enqueue\Jobs\EmailJob; use Phelixjuma\Enqueue\RedisQueue; use Phelixjuma\Enqueue\Task; use Predis\Client; final class EnqueueService { const DEFAULT_QUEUE = 'default'; private static $instance = null; private RedisQueue|null $queue = null; /** * @return EnqueueService|null */ private static function getInstance(): ?EnqueueService { if (self::$instance === null) { self::$instance = new self(); } return self::$instance; } /** * @param $job * @param $args * @param string $queueName * @return void */ public static function enqueue($job, $args, string $queueName=self::DEFAULT_QUEUE): void { self::getInstance() ->init() ->queue ->setName($queueName) ->enqueue(new Task($job, $args));; } /** * @return $this */ private function init(): EnqueueService { if ($this->queue !== null) { return $this; } $redisHost = getenv("REDIS_HOST"); $redisPort = getenv("REDIS_PORT"); $redis = new Client("tcp://$redisHost:$redisPort"); $this->queue = new RedisQueue($redis); return $this; } } // Use the service within your application to add a job to a queue as EnqueueService::enqueue(new EmailJob(), ['email' => 'test@gmail.com', 'subject' => 'php-enqueue works!', 'message' => 'Thank you for this amazing package']);
4. 关于作业的说明
- 此包使用Redis来排队作业。
- 这意味着在排队之前将作业及其参数序列化,并在获取时反序列化
- 由于PHP处理序列化/反序列化的方式,你的作业和参数不应包含非序列化的实例
- 不可序列化的一个良好例子是pdo。所以如果你的工作类注入了一个pdo实例到数据库类,那么工作队列将失败
- 良好的实践是不在Job类中注入任何类,并避免使用类实例作为参数
- 相反,使用setUp()方法与你的依赖注入容器一起实例化你需要的其他类。
致谢
- Phelix Juma (jp@docusift.ai)