amtgard / redis-set-queue
由workerman支持的Redis消息SetQueue
v1.0.0
2024-08-10 17:44 UTC
Requires
- php: >=8.1
- ext-redis: *
- jedibc/optional: ^1.0
Requires (Dev)
- phake/phake: ^4.5
- phpunit/phpunit: ^11.2
README
redis-set-queue
基于redis构建的setqueue
使用方法
这个库设计用于与cron或worker库一起使用,例如workerman (composer require workerman/workerman
).
它作为一个通用的pub/sub队列运行,以下是一个技巧:如果任何键已经在队列中,它将不会被重新排队。当重复的键被重新排队时,默认操作是不替换返回现有消息。
带有可选参数$replace = true
的send()
将用新消息替换现有消息。
在任何情况下,同一时间都不会为同一键存在多条消息。
[注意:键幂等性是尽力而为的。存在一些条件会导致消息被移除或重复。]
在RedisPubSubQueueTest.php
测试文件中实现了通用使用,但通常期望从两个系统使用
- 发布者
- 订阅者
订阅者配置
$config = new RedisDataStructureConfig(); $config->setConfig([ 'host' => '127.0.0.1', 'port' => 36379, ]); $redis = new Redis(); $redis->pconnect($config->getConfig()['host'], $config->getConfig()['port']); $publisherName = "TEST"; $readDelay = 100; // microseconds if ($redis->isConnected()) { $hashSetFactory = new RedisHashSetFactory(); $redrivableQueueFactory = new RedisRedrivableQueueFactory(); $queue = new SetQueue($publisherName, $config, $hashSetFactory, $redrivableQueueFactory); $pubSub = new PubSubQueue(); $pubSub->addQueue($queue); $pubSub->redrive($queue->getName()); $callCount = 0; $handle = $pubSub->subscribe($queue->getName(), function($key, $message) use (&$callCount) { if ($message == "MESSAGE1") { $callCount++; } }); do { $pubSub->pump($handle); // Run every 100 milliseconds usleep($readDelay * 1000); } while (true); }
发布者配置
$config = new RedisDataStructureConfig(); $config->setConfig([ 'host' => '127.0.0.1', 'port' => 36379, ]); $redis = new Redis(); $redis->pconnect($config->getConfig()['host'], $config->getConfig()['port']); $publisherName = "TEST"; if ($redis->isConnected()) { $hashSetFactory = new RedisHashSetFactory(); $redrivableQueueFactory = new RedisRedrivableQueueFactory(); $queue = new SetQueue($publisherName, $config, $hashSetFactory, $redrivableQueueFactory); $pubSub = new PubSubQueue(); $handle = $pubSub->addQueue($queue); $pubSub->send($handle, "KEY1", "MESSAGE1"); }