n0nag0n / simple-job-queue
A simple library for interfacing with other job queue providers that gives you plenty of flexibility. Currently supports MySQL
1.1.0
2024-06-14 15:11 UTC
Requires
- ext-pdo: *
- pda/pheanstalk: ^4.0
Requires (Dev)
- phpunit/phpunit: ^8.5
README
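I wanted (and needed) a simple job queue that could run on a database, but could also be used with adapters such as beanstalkd/redis when needed. I did not really see any good options for a standalone job queue designed for a database.
Installation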
composer require n0nag0n/simple-job-queue
Usage
Adding a new job
MySQL
<?php

use n0nag0n\Job_Queue;

// 'mysql' is the default queue type
$Job_Queue = new Job_Queue('mysql', [
    'mysql' => [
        'table_name'      => 'new_table_name', // default is job_queue_jobs
        'use_compression' => false,            // default is true, which uses COMPRESS() and UNCOMPRESS() for the payload
    ],
]);

$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

// choose the pipeline the job goes into, then add the payload as a string
$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([
    'something' => 'that',
    'ends'      => 'up',
    'a'         => 'string',
]));
PostgreSQL
<?php

use n0nag0n\Job_Queue;

// the default queue type is 'mysql'; pass 'pgsql' to use PostgreSQL
$Job_Queue = new Job_Queue('pgsql', [
    'pgsql' => [
        'table_name' => 'new_table_name', // default is job_queue_jobs
    ],
]);

$PDO = new PDO('pgsql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([
    'something' => 'that',
    'ends'      => 'up',
    'a'         => 'string',
]));
SQLite3
<?php

use n0nag0n\Job_Queue;

// the default queue type is 'mysql'; pass 'sqlite' to use SQLite3
$Job_Queue = new Job_Queue('sqlite', [
    'sqlite' => [
        'table_name' => 'new_table_name', // default is job_queue_jobs
    ],
]);

$PDO = new PDO('sqlite:example.db');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([
    'something' => 'that',
    'ends'      => 'up',
    'a'         => 'string',
]));
Beanstalkd
<?php

use n0nag0n\Job_Queue;

// the default queue type is 'mysql'; pass 'beanstalkd' to use Beanstalkd
$Job_Queue = new Job_Queue('beanstalkd');

$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$Job_Queue->addQueueConnection($pheanstalk);

$Job_Queue->selectPipeline('send_important_emails');
$Job_Queue->addJob(json_encode([
    'something' => 'that',
    'ends'      => 'up',
    'a'         => 'string',
]));
Running a worker
See the example_worker.php file, or see below.
<?php

$Job_Queue = new n0nag0n\Job_Queue('mysql');

$PDO = new PDO('mysql:dbname=testdb;host=127.0.0.1', 'user', 'pass');
$Job_Queue->addQueueConnection($PDO);

$Job_Queue->watchPipeline('send_important_emails');

while (true) {
    $job = $Job_Queue->getNextJobAndReserve();

    // adjust to whatever makes you sleep better at night
    // (for database queues only, beanstalkd does not need this if statement)
    if (empty($job)) {
        usleep(500000);
        continue;
    }

    echo "Processing {$job['id']}\n";
    $payload = json_decode($job['payload'], true);

    try {
        $result = doSomethingThatDoesSomething($payload);

        if ($result === true) {
            $Job_Queue->deleteJob($job);
        } else {
            // this takes it out of the ready queue and puts it in another queue
            // that can be picked up and "kicked" later.
            $Job_Queue->buryJob($job);
        }
    } catch (Exception $e) {
        $Job_Queue->buryJob($job);
    }
}
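The loop above runs until the process is killed, which can interrupt a job halfway through. As a rough sketch only, the polling loop could be wrapped so that it exits between jobs once a stop condition is met. The helpers `run_worker_loop()` and `stop_on_signal()` are hypothetical names and are not part of simple-job-queue; the signal handling assumes PHP 7.1+ with the pcntl extension, and quietly does nothing without it.

```php
<?php

/**
 * Hypothetical helper (not part of simple-job-queue): polls for jobs until
 * $shouldStop() returns true, and returns how many jobs were handled.
 *
 * $reserve  returns the next job, or an empty value when nothing is ready.
 * $handle   processes one job (delete or bury it inside this callback).
 */
function run_worker_loop(
    callable $reserve,
    callable $handle,
    callable $shouldStop,
    int $idleMicroseconds = 500000
): int {
    $handled = 0;
    while (!$shouldStop()) {
        $job = $reserve();
        if (empty($job)) {
            // Nothing ready: back off briefly instead of hammering the database.
            usleep($idleMicroseconds);
            continue;
        }
        $handle($job);
        $handled++;
    }
    return $handled;
}

/**
 * Hypothetical helper: returns a callable that reports true once SIGTERM or
 * SIGINT has been received. Without the pcntl extension it never reports true.
 */
function stop_on_signal(): callable
{
    $stop = false;
    if (function_exists('pcntl_async_signals')) {
        // Deliver signals asynchronously so no manual dispatch call is needed.
        pcntl_async_signals(true);
        $onSignal = function () use (&$stop): void {
            $stop = true;
        };
        pcntl_signal(SIGTERM, $onSignal);
        pcntl_signal(SIGINT, $onSignal);
    }
    return function () use (&$stop): bool {
        return $stop;
    };
}
```

In the worker, `$reserve` would wrap `$Job_Queue->getNextJobAndReserve()`, `$handle` would hold the try/catch body that deletes or buries the job, and `$shouldStop` would be the callable returned by `stop_on_signal()`. The stop flag is only checked between jobs, so a job that is already running is allowed to finish.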
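Handling long-running processes
Supervisord is going to be your jam. Look up the many articles on how to implement it.
Testing
PHPUnit tests, currently using the sqlite3 examples.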
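As a starting point, a minimal Supervisor program section for a worker script might look like the sketch below. The program name, the script path, the log path, and the number of processes are all placeholder assumptions to adapt to your own setup; none of them come from this library.

```ini
; Hypothetical example: names and paths are placeholders
[program:job_queue_worker]
command=php /path/to/your/project/example_worker.php
; process_name must include process_num when numprocs is greater than 1
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
; restart the worker if it exits or crashes
autorestart=true
; signal sent on stop, and how long to wait before SIGKILL
stopsignal=TERM
stopwaitsecs=30
redirect_stderr=true
stdout_logfile=/var/log/job_queue_worker_%(process_num)02d.log
```

After adding the file to Supervisor's configuration directory, `supervisorctl reread` followed by `supervisorctl update` picks up the new program.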
vendor/bin/phpunit