zenstruck / messenger-test
用于测试 symfony/messenger 队列的断言和辅助函数。
Requires
- php: >=8.1
- symfony/deprecation-contracts: ^2.2|^3.0
- symfony/framework-bundle: ^5.4|^6.0|^7.0
- symfony/messenger: ^5.4|^6.0|^7.0
- zenstruck/assert: ^1.0
Requires (Dev)
- phpstan/phpstan: ^1.4
- phpunit/phpunit: ^9.6.0
- symfony/browser-kit: ^5.4|^6.0|^7.0
- symfony/clock: ^6.3|^7.0
- symfony/phpunit-bridge: ^5.4|^6.0|^7.0
- symfony/yaml: ^5.4|^6.0|^7.0
Suggests
- symfony/clock: A PSR-20 clock implementation in order to support DelayStamp.
Conflicts
- symfony/framework-bundle: 5.4.5|6.0.5
README
提供用于测试 symfony/messenger
队列的断言和辅助函数。
此库提供了一个 TestTransport
,默认情况下会拦截发送给它的任何消息。然后您可以检查并断言这些消息。发送的消息将被序列化和反序列化,以进行额外的检查。
该传输还允许处理这些 队列 消息。
安装
-
安装库
composer require --dev zenstruck/messenger-test
-
如果未通过 Symfony Flex 自动添加,请在
config/bundles.php
中添加束Zenstruck\Messenger\Test\ZenstruckMessengerTestBundle::class => ['test' => true],
-
更新
config/packages/messenger.yaml
,并将您的test
环境中的传输(s)覆盖为test://
# config/packages/messenger.yaml # ... when@test: framework: messenger: transports: async: test://
传输
您可以通过在您的 KernelTestCase
/WebTestCase
测试中使用 InteractsWithMessenger
特性与测试传输交互。您可以通过断言队列和消息处理的不同状态(如 "已确认"、"已拒绝" 等)来断言消息处理的不同步骤。
注意:如果您只需要知道是否已发送消息,您可以直接在总线本身上进行断言。
队列断言
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { // ...some code that routes messages to your configured transport // assert against the queue $this->transport()->queue()->assertEmpty(); $this->transport()->queue()->assertNotEmpty(); $this->transport()->queue()->assertCount(3); $this->transport()->queue()->assertContains(MyMessage::class); // queue contains this message $this->transport()->queue()->assertContains(MyMessage::class, 3); // queue contains this message 3 times $this->transport()->queue()->assertContains(MyMessage::class, 0); // queue contains this message 0 times $this->transport()->queue()->assertNotContains(MyMessage::class); // queue not contains this message // access the queue data $this->transport()->queue(); // Envelope[] $this->transport()->queue()->messages(); // object[] the messages unwrapped from envelope $this->transport()->queue()->messages(MyMessage::class); // MyMessage[] just messages matching class } }
处理队列
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { // ...some code that routes messages to your configured transport // let's assume 3 messages are on this queue $this->transport()->queue()->assertCount(3); $this->transport()->process(1); // process one message $this->transport()->processOrFail(1); // equivalent to above but fails if queue empty $this->transport()->queue()->assertCount(2); // queue now only has 2 items $this->transport()->process(); // process all messages on the queue $this->transport()->processOrFail(); // equivalent to above but fails if queue empty $this->transport()->queue()->assertEmpty(); // queue is now empty } }
注意:调用 process()
不仅处理队列上的消息,还处理在处理消息时创建的消息(默认为所有消息或最多 $number
)。
其他传输断言和辅助函数
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; use Zenstruck\Messenger\Test\InteractsWithMessenger; use Zenstruck\Messenger\Test\Transport\TestTransport; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { // manually send a message to your transport $this->transport()->send(new MyMessage()); // send with stamps $this->transport()->send(Envelope::wrap(new MyMessage(), [new SomeStamp()])); // send "pre-encoded" message $this->transport()->send(['body' => '...']); $queue = $this->transport()->queue(); $dispatched = $this->transport()->dispatched(); $acknowledged = $this->transport()->acknowledged(); // messages successfully processed $rejected = $this->transport()->rejected(); // messages not successfully processed // The 4 above variables are all instances of Zenstruck\Messenger\Test\EnvelopeCollection // which is a countable iterator with the following api (using $queue for the example). // Methods that return Envelope(s) actually return TestEnvelope(s) which is an Envelope // decorator (all standard Envelope methods can be used) with some stamp-related assertions. // collection assertions $queue->assertEmpty(); $queue->assertNotEmpty(); $queue->assertCount(3); $queue->assertContains(MyMessage::class); // contains this message $queue->assertContains(MyMessage::class, 3); // contains this message 3 times $queue->assertNotContains(MyMessage::class); // not contains this message // helpers $queue->count(); // number of envelopes $queue->all(); // TestEnvelope[] $queue->messages(); // object[] the messages unwrapped from their envelope $queue->messages(MyMessage::class); // MyMessage[] just instances of the passed message class // get specific envelope $queue->first(); // TestEnvelope - first one on the collection $queue->first(MyMessage::class); // TestEnvelope - first where message class is MyMessage $queue->first(function(Envelope $e): bool { return $e->getMessage() instanceof MyMessage && $e->getMessage()->isSomething(); }); // TestEnvelope - first that matches the filter callback // Equivalent to above - use the message class as the filter function typehint to // auto-filter to this message type. $queue->first(fn(MyMessage $m): bool => $m->isSomething()); // TestEnvelope // TestEnvelope stamp assertions $queue->first()->assertHasStamp(DelayStamp::class); $queue->first()->assertNotHasStamp(DelayStamp::class); // reset collected messages on the transport $this->transport()->reset(); // reset collected messages for all transports TestTransport::resetAll(); // fluid assertions on different EnvelopeCollections $this->transport() ->queue() ->assertNotEmpty() ->assertContains(MyMessage::class) ->back() // returns to the TestTransport ->dispatched() ->assertEmpty() ->back() ->acknowledged() ->assertEmpty() ->back() ->rejected() ->assertEmpty() ->back() ; } }
处理异常
默认情况下,当处理失败的消息时,TestTransport
会捕获异常并将其添加到拒绝列表中。您可以通过更改传输 dsn 来更改此行为
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { // ...some code that routes messages to your configured transport // disable exception catching $this->transport()->throwExceptions(); // if processing fails, the exception will be thrown $this->transport()->process(1); // re-enable exception catching $this->transport()->catchExceptions(); } }
您可以在传输 dsn 中默认启用对传输(s)的异常抛出
# config/packages/messenger.yaml # ... when@test: framework: messenger: transports: async: test://?catch_exceptions=false
解锁模式
默认情况下,发送到 TestTransport
的消息会被拦截并添加到队列中,等待手动处理。您可以通过更改传输 dsn 来更改此行为,以便按发送顺序处理消息
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { // disable intercept $this->transport()->unblock(); // ...some code that routes messages to your configured transport // ...these messages are handled immediately // enable intercept $this->transport()->intercept(); // ...some code that routes messages to your configured transport // if messages are on the queue when calling unblock(), they are processed $this->transport()->unblock(); } }
您可以在传输 dsn 中默认禁用对传输(s)的消息拦截
# config/packages/messenger.yaml # ... when@test: framework: messenger: transports: async: test://?intercept=false
测试序列化
默认情况下,TestTransport
会测试消息是否可以被序列化和反序列化。您可以通过传输 dsn 禁用此行为
# config/packages/messenger.yaml # ... when@test: framework: messenger: transports: async: test://?test_serialization=false
多个传输
如果您想测试多个传输,请将它们的所有 dsn 改为 test://
在您的测试环境中
# config/packages/messenger.yaml # ... when@test: framework: messenger: transports: low: test:// high: test://
在您的测试中,通过将名称传递给 transport()
方法
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase // or WebTestCase { use InteractsWithMessenger; public function test_something(): void { $this->transport('high')->queue(); $this->transport('low')->dispatched(); } }
对 DelayStamp
的支持
可以在传输的 dsn 中为每个传输启用对 DelayStamp
的支持
# config/packages/messenger.yaml when@test: framework: messenger: transports: async: test://?support_delay_stamp=true
注意
延迟戳支持是在版本 1.8.0 中添加的。
使用时钟
警告
延迟戳支持需要一个 PSR-20 Clock 的实现。
例如,您可以使用 Symfony 的时钟组件
composer require symfony/clock
当使用 Symfony 的时钟组件时,服务将自动配置。否则,您需要手动配置它
# config/services.yaml services: app.clock: class: Some\Clock\Implementation Psr\Clock\ClockInterface: '@app.clock'
支持 DelayStamp
的代码示例
注意
此示例使用 symfony/clock
组件,但您可以使用任何其他实现 Psr\Clock\ClockInterface
的实现。
// Let's say somewhere in your app, you register some actions that should occur in the future: $bus->dispatch(new Enevelope(new TakeSomeAction1(), [DelayStamp::delayFor(new \DateInterval('P1D'))])); // will be handled in 1 day $bus->dispatch(new Enevelope(new TakeSomeAction2(), [DelayStamp::delayFor(new \DateInterval('P3D'))])); // will be handled in 3 days // In your test, you can check that the action is not yet performed: class TestDelayedActions extends KernelTestCase { use InteractsWithMessenger; use ClockSensitiveTrait; public function testDelayedActions(): void { // 1. mock the clock, in order to perform sleeps $clock = self::mockTime(); // 2. trigger the action that will dispatch the two messages // ... // 3. assert nothing happens yet $transport = $this->transport('async'); $transport->process(); $transport->queue()->assertCount(2); $transport->acknowledged()->assertCount(0); // 4. sleep, process queue, and assert some messages have been handled $clock->sleep(60 * 60 * 24); // wait one day $transport->process()->acknowledged()->assertContains(TakeSomeAction1::class); $this->asssertTakeSomeAction1IsHandled(); // TakeSomeAction2 is still in the queue $transport->queue()->assertCount(1); $clock->sleep(60 * 60 * 24 * 2); // wait two other days $transport->process()->acknowledged()->assertContains(TakeSomeAction2::class); $this->asssertTakeSomeAction2IsHandled(); } }
DelayStamp
和解锁模式
"延迟" 消息不能由解锁机制处理,必须在 sleep()
调用后调用 $transport->process()
。
启用重试
默认情况下,TestTransport
不重试失败的消息(您的重试设置将被忽略)。您可以通过传输 dsn 禁用此行为
# config/packages/messenger.yaml when@test: framework: messenger: transports: async: test://?disable_retries=false
注意
当使用重试与support_delay_stamp
结合时,您必须模拟重试之间的睡眠时间。
总线
除了传输测试外,您还可以对总线进行断言。您可以通过在KernelTestCase
/ WebTestCase
测试中使用相同的InteractsWithMessenger
特性来测试消息处理。这在您只需测试是否已通过特定总线分发了消息,但不需要知道处理方式时尤其有用。
这允许您在使用自定义传输的同时断言消息仍然被正确分发。
单个总线
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase { use InteractsWithMessenger; public function test_something(): void { // ... some code that uses the bus // Let's assume two messages are processed $this->bus()->dispatched()->assertCount(2); $this->bus()->dispatched()->assertContains(MessageA::class, 1); $this->bus()->dispatched()->assertContains(MessageB::class, 1); } }
多个总线
如果您使用多个总线,您可以测试特定总线是否已处理其自身的消息。
# config/packages/messenger.yaml # ... framework: messenger: default_bus: bus_c buses: bus_a: ~ bus_b: ~ bus_c: ~
在您的测试中,将名称传递给bus()
方法
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase; use Zenstruck\Messenger\Test\InteractsWithMessenger; class MyTest extends KernelTestCase { use InteractsWithMessenger; public function test_something(): void { // ... some code that use bus // Let's assume two messages are handled by two different buses $this->bus('bus-a')->dispatched()->assertCount(1); $this->bus('bus-b')->dispatched()->assertCount(1); $this->bus('bus-c')->dispatched()->assertCount(0); $this->bus('bus-a')->dispatched()->assertContains(MessageA::class, 1); $this->bus('bus-b')->dispatched()->assertContains(MessageB::class, 1); } }
故障排除
分离的Doctrine实体
在您的测试中处理与Doctrine实体交互的消息时,您可能会注意到它们在处理后被从对象管理器中分离出来。这是因为DoctrineClearEntityManagerWorkerSubscriber
,它在处理消息后清除对象管理器。目前,禁用此功能唯一的方法是在您的test
环境中禁用服务
# config/packages/messenger.yaml # ... when@test: # ... services: # DoctrineClearEntityManagerWorkerSubscriber service doctrine.orm.messenger.event_subscriber.doctrine_clear_entity_manager: class: stdClass # effectively disables this service in your test env