workerman/redis-queue

基于workerman和Redis支持的消息队列系统,用PHP编写。

v1.2.0 2024-02-28 07:00 UTC

This package is auto-updated.

Last update: 2024-08-28 09:39:16 UTC


README

基于workerman并用Redis支持的PHP编写的消息队列系统。

安装

composer require workerman/redis-queue

用法

test.php

<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');

    $client->subscribe('user-1', function($data) {
        echo "user-1\n";
        var_export($data);
    });

    $client->subscribe('user-2', function($data) {
        echo "user-2\n";
        var_export($data);
    });

    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "consume failure\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });

    Timer::add(1, function() use ($client) {
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

使用命令php test.php startphp test.php start -d运行。

API

__construct (string $address, [array $options])

通过$address和$options创建一个实例。

  • $address例如redis://ip:6379

  • $options是客户端连接选项。默认值

    • auth: 默认 ''
    • db: 默认 0
    • retry_seconds: 消费失败后的重试间隔
    • max_attempts: 消费失败后的最大重试次数

send(String $queue, Mixed $data, [int $dely=0])

向队列发送消息

  • $queue是要发布的队列,String
  • $data是要发布的消息,Mixed
  • $dely是延迟消费的秒数,Int

subscribe(mixed $queue, callable $callback)

订阅一个或多个队列

  • $queue是一个String队列或一个具有队列名称作为键的Array
  • $callback - function (Mixed $data)$datasend($queue, $data)发送的数据。

unsubscribe(mixed $queue)

取消订阅一个或多个队列

onConsumeFailure(callable $callback)

当消费失败时,将触发onConsumeFailure。

  • $callback - function (\Throwable $exception, array $package)$package包含有关数据队列尝试等信息