silverslice / redis-queue
Redis队列
v0.1.0
2022-05-20 00:23 UTC
Requires
- php: >=7.4
README
需求
- Redis >= 6.2.0
- phpredis PHP 扩展
安装
composer require silverslice/redis-queue
功能
- 延迟推送消息
- 每个作业的独立重试策略
- 正确处理因内存不足而终止的任务
用法
创建作业类
namespace Silverslice\RedisQueue\Examples\Jobs; use Silverslice\RedisQueue\AbstractJob; class TestJob extends AbstractJob { public $message; public function execute() { echo $this->message . ' ' . date('H:i:s') . "\n"; } }
将作业推送到队列
use Silverslice\RedisQueue\Connection; use Silverslice\RedisQueue\Queue; use Silverslice\RedisQueue\Examples\Jobs\TestJob; require __DIR__ . '/../vendor/autoload.php'; // create connection to Redis $conn = new Connection(); $queue = new Queue($conn); // create job $job = new TestJob(); $job->message = 'My message'; // push to the queue $queue->push($job); // push to the queue with delay 2 seconds $queue->pushWithDelay($job, 2);
运行工作进程
use Silverslice\RedisQueue\AbstractJob; use Silverslice\RedisQueue\Connection; use Silverslice\RedisQueue\Worker; require_once __DIR__ . '/../vendor/autoload.php'; // each consumer in stream need unique name, so we pass name as argument on start worker $options = getopt('', ['name:']); if (!isset($options['name'])) { echo 'Usage: php worker.php --name worker_name' . PHP_EOL; exit(1); } $conn = new Connection(); $conn->consumer = $options['name']; $worker = new Worker($conn); $worker->setDebug(true); $worker->onFail(function (AbstractJob $job, \Throwable $e) { echo '[!] Job failed: ' . serialize($job) . PHP_EOL; echo '[.] Error: ' . $e->getMessage() . PHP_EOL; }); $worker->run();
运行系统工作进程。系统工作进程检查挂起消息并移动延迟消息。应只有一个系统工作进程在运行
use Silverslice\RedisQueue\Connection; use Silverslice\RedisQueue\SystemWorker; require_once __DIR__ . '/../vendor/autoload.php'; $conn = new Connection(); $worker = new SystemWorker($conn); $worker->maxFailures = 3; $worker->setDebug(true); // if job crashes more than maxFailures times $worker->onFail(function($message, $id) { echo '[!] Message rejected: ' . $message . ' (id='. $id .')' . PHP_EOL; }); $worker->run();
您可以在作业类中设置单独的重试逻辑。默认行为:最多重试5次,重试间隔为1秒,乘数为2(1, 2, 4, 8, 16秒)。
class TestJob extends AbstractJob { public $message; public function execute() { } /** * Is job retryable? * * @param int $retries Number of retry * @return bool */ public function isRetryable($retries): bool { return $retries <= 5; } /** * Returns retry delay in seconds * * @param $retries * @return int */ public function getRetryDelay($retries): int { return 1 * 2 ** ($retries - 1); } }
要覆盖作业延迟,请在pushWithDelay
中传递第三个参数为true
use Silverslice\RedisQueue\Connection; use Silverslice\RedisQueue\Queue; use Silverslice\RedisQueue\Examples\Jobs\TestJob; require_once __DIR__ . '/../vendor/autoload.php'; $conn = new Connection(); $queue = new Queue($conn); // send message with delay 10 seconds, we are going to change delay later $job = new TestJob(); $job->message = 'Message with delay'; $queue->pushWithDelay($job, 10, true); // overwrite delay $queue->pushWithDelay($job, 15, true); $date = date('Y-m-d H:i:s'); echo "[$date] Message sent\n";
对于测试/本地开发,SyncQueue类可能很有用。SyncQueue同步执行作业
$queue = new SyncQueue($connection); $job = new TestJob(); $job->message = 'My first job'; // will be executed synchronously $queue->push($job);
请参阅示例
目录以获取更多示例。