amtgard/redis-set-queue

由workerman支持的Redis消息SetQueue

v1.0.0 2024-08-10 17:44 UTC

This package is auto-updated.

Last update: 2024-09-19 20:32:16 UTC


README

Code Climate

redis-set-queue

基于redis构建的setqueue

使用方法

这个库设计用于与cron或worker库一起使用,例如workerman (composer require workerman/workerman).

它作为一个通用的pub/sub队列运行,以下是一个技巧:如果任何键已经在队列中,它将不会被重新排队。当重复的键被重新排队时,默认操作是不替换返回现有消息。

带有可选参数$replace = truesend()将用新消息替换现有消息。

在任何情况下,同一时间都不会为同一键存在多条消息。

[注意:键幂等性是尽力而为的。存在一些条件会导致消息被移除或重复。]

RedisPubSubQueueTest.php测试文件中实现了通用使用,但通常期望从两个系统使用

  1. 发布者
  2. 订阅者

订阅者配置

$config = new RedisDataStructureConfig();
$config->setConfig([
    'host' => '127.0.0.1',
    'port' => 36379,
]);
$redis = new Redis();
$redis->pconnect($config->getConfig()['host'], $config->getConfig()['port']);

$publisherName = "TEST";
$readDelay = 100; // microseconds

if ($redis->isConnected()) {
    $hashSetFactory = new RedisHashSetFactory();
    $redrivableQueueFactory = new RedisRedrivableQueueFactory();
    $queue = new SetQueue($publisherName, $config, $hashSetFactory, $redrivableQueueFactory);
    
    $pubSub = new PubSubQueue();
    $pubSub->addQueue($queue);
    $pubSub->redrive($queue->getName());
    
    $callCount = 0;
    $handle = $pubSub->subscribe($queue->getName(), function($key, $message) use (&$callCount) {
        if ($message == "MESSAGE1") {
            $callCount++;
        }
    });
        
    do {
        $pubSub->pump($handle);
        // Run every 100 milliseconds
        usleep($readDelay * 1000);
    } while (true);
}

发布者配置

$config = new RedisDataStructureConfig();
$config->setConfig([
    'host' => '127.0.0.1',
    'port' => 36379,
]);
$redis = new Redis();
$redis->pconnect($config->getConfig()['host'], $config->getConfig()['port']);

$publisherName = "TEST";

if ($redis->isConnected()) {
    $hashSetFactory = new RedisHashSetFactory();
    $redrivableQueueFactory = new RedisRedrivableQueueFactory();
    $queue = new SetQueue($publisherName, $config, $hashSetFactory, $redrivableQueueFactory);
    
    $pubSub = new PubSubQueue();
    $handle = $pubSub->addQueue($queue);

    $pubSub->send($handle, "KEY1", "MESSAGE1");
}