adtechpotok / enqueue-messenger-adapter-bundle
0.1.22
2018-09-24 15:40 UTC
Requires
- php: >=7.1
- adtechpotok/messenger-adapter: 0.1.21
- adtechpotok/symfony-aware: dev-master
- enqueue/amqp-bunny: ^0.8.31
- opentracing/opentracing: 1.0.0-beta5
- predis/predis: ^1.1
- queue-interop/queue-interop: ^0.6.2
- ramsey/uuid: ^3.8
- symfony/console: ~3.4.6|~4.1
- symfony/dependency-injection: ~3.4.6|~4.1
- symfony/serializer-pack: ^1.0
Requires (Dev)
- phpunit/phpunit: ^7.4@dev
README
此 Symfony Messenger 传输允许您使用 Enqueue 将消息发送和接收至所有支持的代理。
使用方法
- 安装传输
composer req enqueue/messenger-adapter
- 按照常规方式配置 Enqueue 扩展包(查看 Enqueue 扩展包文档)。如果您使用的是食谱,只需配置环境变量来配置
default
Enqueue 传输即可
# .env # ... ###> enqueue/enqueue-bundle ### ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f ###< enqueue/enqueue-bundle ###
- 配置 Messenger 的传输(我们将命名为
amqp
)以使用 Enqueue 的default
传输
# config/packages/messenger.yaml framework: messenger: transports: amqp: enqueue://default
- 路由需要通过消息队列的消息
# config/packages/framework.yaml framework: messenger: # ... routing: 'App\Message\MyMessage': amqp
- 消费!
bin/console messenger:consume-messages amqp
高级使用
配置队列和交换机
在传输 DSN 中,您可以添加额外的配置。以下是一个参考 DSN(注意,值只是为了示例)
enqueue://default
?queue[routingKey][name]=queue_name
&topic[name]=topic_name
&topic[type]=topic|fanout|direct
&deliveryDelay=1800
&delayStrategy=Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Transport\RabbitMq375DelayPluginDelayStrategy
&timeToLive=3600
&receiveTimeout=1000
&priority=1
&maximumPriority=255
&durability=1
# config/packages/messenger.yaml framework: messenger: transports: events: enqueue://default?queue[*][name]=events&topic[name]=events&topic[type]=topic foo.events: enqueue://default?queue[foo][name]=foo.events&topic[name]=events&topic[type]=topic bar.events: enqueue://default?queue[bar][name]=bar.events&topic[name]=events&topic[type]=topic routing: App\Message\EventsMessage: events Foo\Message\EventsMessage: foo.events Bar\Message\EventsMessage: bar.events
在特定主题上发送消息
您可以使用带有您的消息的 TransportConfiguration
封装项目在特定主题上发送消息
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; // ... $this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration( ['topic' => 'specific-topic'] )));
防止消息重复
此软件包提供了一个基于 Redis 乐观锁的重复删除机制(有关文档,请参阅 https://redis.ac.cn/topics/transactions "使用检查和设置进行乐观锁定")。它使用两个您必须自行实现的依赖项。
- 在生产总线中配置 UuidItemSetterMiddleware。它将在封装项目中存储 UUID-4 标识符。
# config/packages/messenger.yaml framework: messenger: buses: messenger.produce: middleware: - Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Middleware\UuidItemSetterMiddleware
- 实现 UniqueIdGetterInterface 接口并将其注册为服务。
该接口将在 LockBasedDeduplicationMiddleware 处理消息时使用。方法 getUniqueId 必须在每次调用时返回一个唯一的 ID。LockBasedDeduplicationMiddleware 将尝试通过此 ID 锁定消息。如果成功,则消息将被处理;否则,您将收到异常。
示例
class IdGenerator implements UniqueIdGetterInterface { /** @var string */ protected $id; public function __construct() { $this->generateId(); } /** * @return string */ public function getUniqueId(): string { return $this->id; } public function generateId(): void { $this->id = uniqid('', true); } }
#config/services.yaml services: SystemBundle\Classes\IdGenerator:
- 注册 redis-locker 和 deduplication 中间件
system.middleware.service.locker: class: Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Service\RedisLockService arguments: - '@REDIS_CLIENT' - 'rabbit_mq_' - 172800 - 'worker_id' messenger.middleware.lock_based_deduplication: class: Adtechpotok\Bundle\EnqueueMessengerAdapterBundle\Middleware\LockBasedDeduplicationMiddleware arguments: - '@system.middleware.service.locker' - '@SystemBundle\Classes\IdGenerator'
其中 @REDIS_CLIENT 是您配置的 redis 客户端。
- 配置 LockBasedDeduplicationMiddleware
common: messenger: buses: messenger.consume: middleware: - messenger.middleware.lock_based_deduplication