soyuka/symfony-messenger-redis

symfony/messenger 组件的 Redis 适配器

1.0.10 2018-05-23 08:50 UTC

This package is auto-updated.

Last update: 2024-09-10 20:49:12 UTC


README

Build Status

大家好!这个包已经被弃用,因为 symfony 现在已经有一个 Redis 转发器。它从 symfony 4.3.0-BETA1 版本开始可用。

这是 symfony/messenger 组件的一个实验性的 Redis 接收器/发送器。

快速开始

目前,我们提供了一个预配置了接收器和发送器的 Messenger 组件的包。

composer require symfony/messenger soyuka/symfony-messenger-redis

添加该包 new Soyuka\RedisMessengerAdapter\Bundle\RedisMessengerAdapterBundle()

还需要 redis 扩展。

添加以下配置

redis_messenger_adapter:
    messages:
        'App\Message\Foo': 'foo_queue'

添加消息处理器

<?php

namespace App\MessageHandler;

use App\Message\Foo;

final class FooHandler
{
    public function __invoke(Foo $message)
    {
    }
}

标记它

services:
  App\MessageHandler\FooHandler:
      tags:
          - { name: messenger.message_handler }

完成了!

运行 bin/console messenger:consume-messages redis_messenger.receiver.foo_queue 并从总线中分发消息

<?php
$bus->dispatch(new Foo());

配置参考

redis_messenger_adapter:
    redis:
        url: '127.0.0.1'
        port: 6379
        serializer: !php/const \Redis::SERIALIZER_IGBINARY # default is \Redis::SERIALIZER_PHP
    messages:
        'App\Message\Foo': 'foo_queue'
        'App\Message\Bar':
            queue: 'bar_queue'
            ttl: 10000
            blockingTimeout: 1000

内部细节

相关讨论: https://twitter.com/jderusse/status/980768426116485122

发送器使用列表并使用 RPUSH(向列表尾部添加值)。接收器使用 BRPOPLPUSH,它读取列表的最后一个元素并将其添加到另一个列表(queue_processing)的头部。如果列表中没有元素,它将阻塞连接,直到出现新元素或超时。当超时时,它类似于某种“ping”(待定等待 26632 合并和 $handle(null))。在每次迭代中,我们将检查 queue_processing 列表。对于此队列中的每个项目,我们都有一个在 redis 中具有给定 ttl 的对应 key。如果键已过期,则项目将从 queue_processingLREM(移除)并放回原始队列以再次处理。此解决方案有助于避免消息丢失。

我开始了一个 RedisAdapter,可能会在 messenger 文档和 AMQP 适配器合并后添加到 symfony。