adtechpotok/enqueue-messenger-adapter-bundle

0.1.22 2018-09-24 15:40 UTC

README

此 Symfony Messenger 传输允许您使用 Enqueue 将消息发送和接收至所有支持的代理。

使用方法

  1. 安装传输
composer req enqueue/messenger-adapter
  1. 按照常规方式配置 Enqueue 扩展包(查看 Enqueue 扩展包文档)。如果您使用的是食谱,只需配置环境变量来配置 default Enqueue 传输即可
# .env
# ...

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
  1. 配置 Messenger 的传输(我们将命名为 amqp)以使用 Enqueue 的 default 传输
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
  1. 路由需要通过消息队列的消息
# config/packages/framework.yaml
framework:
    messenger:
        # ...

        routing:
            'App\Message\MyMessage': amqp
  1. 消费!
bin/console messenger:consume-messages amqp

高级使用

配置队列和交换机

在传输 DSN 中,您可以添加额外的配置。以下是一个参考 DSN(注意,值只是为了示例)

enqueue://default
	?queue[routingKey][name]=queue_name
	&topic[name]=topic_name
    &topic[type]=topic|fanout|direct
    &deliveryDelay=1800
    &delayStrategy=Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Transport\RabbitMq375DelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1
    &maximumPriority=255
    &durability=1
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            events: enqueue://default?queue[*][name]=events&topic[name]=events&topic[type]=topic
            foo.events: enqueue://default?queue[foo][name]=foo.events&topic[name]=events&topic[type]=topic
            bar.events: enqueue://default?queue[bar][name]=bar.events&topic[name]=events&topic[type]=topic

        routing:
            App\Message\EventsMessage: events
            Foo\Message\EventsMessage: foo.events
            Bar\Message\EventsMessage: bar.events

在特定主题上发送消息

您可以使用带有您的消息的 TransportConfiguration 封装项目在特定主题上发送消息

use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration(
    ['topic' => 'specific-topic']
)));

防止消息重复

此软件包提供了一个基于 Redis 乐观锁的重复删除机制(有关文档,请参阅 https://redis.ac.cn/topics/transactions "使用检查和设置进行乐观锁定")。它使用两个您必须自行实现的依赖项。

  1. 在生产总线中配置 UuidItemSetterMiddleware。它将在封装项目中存储 UUID-4 标识符。
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.produce:
                middleware:
                    - Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Middleware\UuidItemSetterMiddleware
  1. 实现 UniqueIdGetterInterface 接口并将其注册为服务。

该接口将在 LockBasedDeduplicationMiddleware 处理消息时使用。方法 getUniqueId 必须在每次调用时返回一个唯一的 ID。LockBasedDeduplicationMiddleware 将尝试通过此 ID 锁定消息。如果成功,则消息将被处理;否则,您将收到异常。

示例

class IdGenerator implements UniqueIdGetterInterface
{
    /** @var string */
    protected $id;

    public function __construct()
    {
        $this->generateId();
    }

    /**
     * @return string
     */
    public function getUniqueId(): string
    {
        return $this->id;
    }

    public function generateId(): void
    {
        $this->id = uniqid('', true);
    }
}
#config/services.yaml
services:
    SystemBundle\Classes\IdGenerator:
  1. 注册 redis-locker 和 deduplication 中间件
system.middleware.service.locker:
    class: Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Service\RedisLockService
    arguments:
        - '@REDIS_CLIENT'
        - 'rabbit_mq_'
        - 172800
        - 'worker_id'

messenger.middleware.lock_based_deduplication:
    class: Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Middleware\LockBasedDeduplicationMiddleware
    arguments:
        - '@system.middleware.service.locker'
        - '@SystemBundle\Classes\IdGenerator'

其中 @REDIS_CLIENT 是您配置的 redis 客户端。

  1. 配置 LockBasedDeduplicationMiddleware
common:
    messenger:
        buses:
            messenger.consume:
                middleware:
                    - messenger.middleware.lock_based_deduplication