soyuka / symfony-messenger-redis
symfony/messenger 组件的 Redis 适配器
Requires
- php: >=7.1.0
- ext-redis: *
Requires (Dev)
- phpunit/phpunit: ^7.1@dev
- symfony/config: ^4.1@dev
- symfony/dependency-injection: ^4.1@dev
- symfony/messenger: ^4.0@dev
- symfony/property-access: ^4.1@dev
- symfony/serializer: ^4.1@dev
Suggests
- ext-igbinary: *
README
大家好!这个包已经被弃用,因为 symfony 现在已经有一个 Redis 转发器。它从 symfony 4.3.0-BETA1
版本开始可用。
这是 symfony/messenger 组件的一个实验性的 Redis 接收器/发送器。
快速开始
目前,我们提供了一个预配置了接收器和发送器的 Messenger 组件的包。
composer require symfony/messenger soyuka/symfony-messenger-redis
添加该包 new Soyuka\RedisMessengerAdapter\Bundle\RedisMessengerAdapterBundle()
。
还需要 redis 扩展。
添加以下配置
redis_messenger_adapter: messages: 'App\Message\Foo': 'foo_queue'
添加消息处理器
<?php namespace App\MessageHandler; use App\Message\Foo; final class FooHandler { public function __invoke(Foo $message) { } }
标记它
services: App\MessageHandler\FooHandler: tags: - { name: messenger.message_handler }
完成了!
运行 bin/console messenger:consume-messages redis_messenger.receiver.foo_queue
并从总线中分发消息
<?php $bus->dispatch(new Foo());
配置参考
redis_messenger_adapter: redis: url: '127.0.0.1' port: 6379 serializer: !php/const \Redis::SERIALIZER_IGBINARY # default is \Redis::SERIALIZER_PHP messages: 'App\Message\Foo': 'foo_queue' 'App\Message\Bar': queue: 'bar_queue' ttl: 10000 blockingTimeout: 1000
内部细节
相关讨论: https://twitter.com/jderusse/status/980768426116485122
发送器使用列表并使用 RPUSH
(向列表尾部添加值)。接收器使用 BRPOPLPUSH
,它读取列表的最后一个元素并将其添加到另一个列表(queue_processing
)的头部。如果列表中没有元素,它将阻塞连接,直到出现新元素或超时。当超时时,它类似于某种“ping”(待定等待 26632 合并和 $handle(null)
)。在每次迭代中,我们将检查 queue_processing
列表。对于此队列中的每个项目,我们都有一个在 redis 中具有给定 ttl
的对应 key
。如果键已过期,则项目将从 queue_processing
中 LREM
(移除)并放回原始队列以再次处理。此解决方案有助于避免消息丢失。
我开始了一个 RedisAdapter,可能会在 messenger 文档和 AMQP 适配器合并后添加到 symfony。
- symfony/symfony#26632(AMQP 适配器 PR)
- symfony/symfony-docs#9437(messenger 文档)
- https://github.com/symfony/symfony/tree/master/src/Symfony/Component/Messenger(messenger 组件代码)