zenstruck/messenger-test

用于测试 symfony/messenger 队列的断言和辅助函数。

支持包维护!
kbond

安装次数: 1,731,038

依赖项: 3

建议者: 0

安全性: 0

星标: 223

关注者: 9

分支: 15

公开问题: 10

类型:symfony-bundle

v1.10.0 2024-07-31 14:08 UTC

README

CI Status Code Coverage

提供用于测试 symfony/messenger 队列的断言和辅助函数。

此库提供了一个 TestTransport,默认情况下会拦截发送给它的任何消息。然后您可以检查并断言这些消息。发送的消息将被序列化和反序列化,以进行额外的检查。

该传输还允许处理这些 队列 消息。

安装

  1. 安装库

    composer require --dev zenstruck/messenger-test
  2. 如果未通过 Symfony Flex 自动添加,请在 config/bundles.php 中添加束

    Zenstruck\Messenger\Test\ZenstruckMessengerTestBundle::class => ['test' => true],
  3. 更新 config/packages/messenger.yaml,并将您的 test 环境中的传输(s)覆盖为 test://

    # config/packages/messenger.yaml
    
    # ...
    
    when@test:
        framework:
            messenger:
                transports:
                    async: test://

传输

您可以通过在您的 KernelTestCase/WebTestCase 测试中使用 InteractsWithMessenger 特性与测试传输交互。您可以通过断言队列和消息处理的不同状态(如 "已确认"、"已拒绝" 等)来断言消息处理的不同步骤。

注意:如果您只需要知道是否已发送消息,您可以直接在总线本身上进行断言。

队列断言

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // ...some code that routes messages to your configured transport

        // assert against the queue
        $this->transport()->queue()->assertEmpty();
        $this->transport()->queue()->assertNotEmpty();
        $this->transport()->queue()->assertCount(3);
        $this->transport()->queue()->assertContains(MyMessage::class); // queue contains this message
        $this->transport()->queue()->assertContains(MyMessage::class, 3); // queue contains this message 3 times
        $this->transport()->queue()->assertContains(MyMessage::class, 0); // queue contains this message 0 times
        $this->transport()->queue()->assertNotContains(MyMessage::class); // queue not contains this message

        // access the queue data
        $this->transport()->queue(); // Envelope[]
        $this->transport()->queue()->messages(); // object[] the messages unwrapped from envelope
        $this->transport()->queue()->messages(MyMessage::class); // MyMessage[] just messages matching class
    }
}

处理队列

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // ...some code that routes messages to your configured transport

        // let's assume 3 messages are on this queue
        $this->transport()->queue()->assertCount(3);

        $this->transport()->process(1); // process one message
        $this->transport()->processOrFail(1); // equivalent to above but fails if queue empty

        $this->transport()->queue()->assertCount(2); // queue now only has 2 items

        $this->transport()->process(); // process all messages on the queue
        $this->transport()->processOrFail(); // equivalent to above but fails if queue empty

        $this->transport()->queue()->assertEmpty(); // queue is now empty
    }
}

注意:调用 process() 不仅处理队列上的消息,还处理在处理消息时创建的消息(默认为所有消息或最多 $number)。

其他传输断言和辅助函数

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Zenstruck\Messenger\Test\InteractsWithMessenger;
use Zenstruck\Messenger\Test\Transport\TestTransport;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // manually send a message to your transport
        $this->transport()->send(new MyMessage());

        // send with stamps
        $this->transport()->send(Envelope::wrap(new MyMessage(), [new SomeStamp()]));

        // send "pre-encoded" message
        $this->transport()->send(['body' => '...']);

        $queue = $this->transport()->queue();
        $dispatched = $this->transport()->dispatched();
        $acknowledged = $this->transport()->acknowledged(); // messages successfully processed
        $rejected = $this->transport()->rejected(); // messages not successfully processed

        // The 4 above variables are all instances of Zenstruck\Messenger\Test\EnvelopeCollection
        // which is a countable iterator with the following api (using $queue for the example).
        // Methods that return Envelope(s) actually return TestEnvelope(s) which is an Envelope
        // decorator (all standard Envelope methods can be used) with some stamp-related assertions.

        // collection assertions
        $queue->assertEmpty();
        $queue->assertNotEmpty();
        $queue->assertCount(3);
        $queue->assertContains(MyMessage::class); // contains this message
        $queue->assertContains(MyMessage::class, 3); // contains this message 3 times
        $queue->assertNotContains(MyMessage::class); // not contains this message

        // helpers
        $queue->count(); // number of envelopes
        $queue->all(); // TestEnvelope[]
        $queue->messages(); // object[] the messages unwrapped from their envelope
        $queue->messages(MyMessage::class); // MyMessage[] just instances of the passed message class

        // get specific envelope
        $queue->first(); // TestEnvelope - first one on the collection
        $queue->first(MyMessage::class); // TestEnvelope - first where message class is MyMessage
        $queue->first(function(Envelope $e): bool {
            return $e->getMessage() instanceof MyMessage && $e->getMessage()->isSomething();
        }); // TestEnvelope - first that matches the filter callback

        // Equivalent to above - use the message class as the filter function typehint to
        // auto-filter to this message type.
        $queue->first(fn(MyMessage $m): bool => $m->isSomething()); // TestEnvelope

        // TestEnvelope stamp assertions
        $queue->first()->assertHasStamp(DelayStamp::class);
        $queue->first()->assertNotHasStamp(DelayStamp::class);

        // reset collected messages on the transport
        $this->transport()->reset();

        // reset collected messages for all transports
        TestTransport::resetAll();

        // fluid assertions on different EnvelopeCollections
        $this->transport()
            ->queue()
                ->assertNotEmpty()
                ->assertContains(MyMessage::class)
            ->back() // returns to the TestTransport
            ->dispatched()
                ->assertEmpty()
            ->back()
            ->acknowledged()
                ->assertEmpty()
            ->back()
            ->rejected()
                ->assertEmpty()
            ->back()
        ;
    }
}

处理异常

默认情况下,当处理失败的消息时,TestTransport 会捕获异常并将其添加到拒绝列表中。您可以通过更改传输 dsn 来更改此行为

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // ...some code that routes messages to your configured transport

        // disable exception catching
        $this->transport()->throwExceptions();

        // if processing fails, the exception will be thrown
        $this->transport()->process(1);

        // re-enable exception catching
        $this->transport()->catchExceptions();
    }
}

您可以在传输 dsn 中默认启用对传输(s)的异常抛出

# config/packages/messenger.yaml

# ...

when@test:
    framework:
        messenger:
            transports:
                async: test://?catch_exceptions=false

解锁模式

默认情况下,发送到 TestTransport 的消息会被拦截并添加到队列中,等待手动处理。您可以通过更改传输 dsn 来更改此行为,以便按发送顺序处理消息

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // disable intercept
        $this->transport()->unblock();

        // ...some code that routes messages to your configured transport
        // ...these messages are handled immediately

        // enable intercept
        $this->transport()->intercept();

        // ...some code that routes messages to your configured transport

        // if messages are on the queue when calling unblock(), they are processed
        $this->transport()->unblock();
    }
}

您可以在传输 dsn 中默认禁用对传输(s)的消息拦截

# config/packages/messenger.yaml

# ...

when@test:
    framework:
        messenger:
            transports:
                async: test://?intercept=false

测试序列化

默认情况下,TestTransport 会测试消息是否可以被序列化和反序列化。您可以通过传输 dsn 禁用此行为

# config/packages/messenger.yaml

# ...

when@test:
    framework:
        messenger:
            transports:
                async: test://?test_serialization=false

多个传输

如果您想测试多个传输,请将它们的所有 dsn 改为 test:// 在您的测试环境中

# config/packages/messenger.yaml

# ...

when@test:
    framework:
        messenger:
            transports:
                low: test://
                high: test://

在您的测试中,通过将名称传递给 transport() 方法

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase // or WebTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        $this->transport('high')->queue();
        $this->transport('low')->dispatched();
    }
}

DelayStamp 的支持

可以在传输的 dsn 中为每个传输启用对 DelayStamp 的支持

# config/packages/messenger.yaml

when@test:
    framework:
        messenger:
            transports:
                async: test://?support_delay_stamp=true

注意

延迟戳支持是在版本 1.8.0 中添加的。

使用时钟

警告

延迟戳支持需要一个 PSR-20 Clock 的实现。

例如,您可以使用 Symfony 的时钟组件

composer require symfony/clock

当使用 Symfony 的时钟组件时,服务将自动配置。否则,您需要手动配置它

# config/services.yaml
services:
    app.clock:
        class: Some\Clock\Implementation
    Psr\Clock\ClockInterface: '@app.clock'

支持 DelayStamp 的代码示例

注意

此示例使用 symfony/clock 组件,但您可以使用任何其他实现 Psr\Clock\ClockInterface 的实现。

// Let's say somewhere in your app, you register some actions that should occur in the future:

$bus->dispatch(new Enevelope(new TakeSomeAction1(), [DelayStamp::delayFor(new \DateInterval('P1D'))])); // will be handled in 1 day
$bus->dispatch(new Enevelope(new TakeSomeAction2(), [DelayStamp::delayFor(new \DateInterval('P3D'))])); // will be handled in 3 days

// In your test, you can check that the action is not yet performed:

class TestDelayedActions extends KernelTestCase
{
    use InteractsWithMessenger;
    use ClockSensitiveTrait;

    public function testDelayedActions(): void
    {
        // 1. mock the clock, in order to perform sleeps
        $clock = self::mockTime();

        // 2. trigger the action that will dispatch the two messages

        // ...

        // 3. assert nothing happens yet
        $transport = $this->transport('async');

        $transport->process();
        $transport->queue()->assertCount(2);
        $transport->acknowledged()->assertCount(0);

        // 4. sleep, process queue, and assert some messages have been handled
        $clock->sleep(60 * 60 * 24); // wait one day
        $transport->process()->acknowledged()->assertContains(TakeSomeAction1::class);
        $this->asssertTakeSomeAction1IsHandled();

        // TakeSomeAction2 is still in the queue
        $transport->queue()->assertCount(1);

        $clock->sleep(60 * 60 * 24 * 2); // wait two other days
        $transport->process()->acknowledged()->assertContains(TakeSomeAction2::class);
        $this->asssertTakeSomeAction2IsHandled();
    }
}

DelayStamp 和解锁模式

"延迟" 消息不能由解锁机制处理,必须在 sleep() 调用后调用 $transport->process()

启用重试

默认情况下,TestTransport 不重试失败的消息(您的重试设置将被忽略)。您可以通过传输 dsn 禁用此行为

# config/packages/messenger.yaml

when@test:
    framework:
        messenger:
            transports:
                async: test://?disable_retries=false

注意

当使用重试与support_delay_stamp结合时,您必须模拟重试之间的睡眠时间。

总线

除了传输测试外,您还可以对总线进行断言。您可以通过在KernelTestCase / WebTestCase测试中使用相同的InteractsWithMessenger特性来测试消息处理。这在您只需测试是否已通过特定总线分发了消息,但不需要知道处理方式时尤其有用。

这允许您在使用自定义传输的同时断言消息仍然被正确分发。

单个总线

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // ... some code that uses the bus

        // Let's assume two messages are processed
        $this->bus()->dispatched()->assertCount(2);

        $this->bus()->dispatched()->assertContains(MessageA::class, 1);
        $this->bus()->dispatched()->assertContains(MessageB::class, 1);
    }
}

多个总线

如果您使用多个总线,您可以测试特定总线是否已处理其自身的消息。

# config/packages/messenger.yaml

# ...

framework:
    messenger:
        default_bus: bus_c
        buses:
            bus_a: ~
            bus_b: ~
            bus_c: ~

在您的测试中,将名称传递给bus()方法

use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Zenstruck\Messenger\Test\InteractsWithMessenger;

class MyTest extends KernelTestCase
{
    use InteractsWithMessenger;

    public function test_something(): void
    {
        // ... some code that use bus

        // Let's assume two messages are handled by two different buses
        $this->bus('bus-a')->dispatched()->assertCount(1);
        $this->bus('bus-b')->dispatched()->assertCount(1);
        $this->bus('bus-c')->dispatched()->assertCount(0);

        $this->bus('bus-a')->dispatched()->assertContains(MessageA::class, 1);
        $this->bus('bus-b')->dispatched()->assertContains(MessageB::class, 1);
    }
}

故障排除

分离的Doctrine实体

在您的测试中处理与Doctrine实体交互的消息时,您可能会注意到它们在处理后被从对象管理器中分离出来。这是因为DoctrineClearEntityManagerWorkerSubscriber,它在处理消息后清除对象管理器。目前,禁用此功能唯一的方法是在您的test环境中禁用服务

# config/packages/messenger.yaml

# ...

when@test:
    # ...

    services:
        # DoctrineClearEntityManagerWorkerSubscriber service
        doctrine.orm.messenger.event_subscriber.doctrine_clear_entity_manager:
            class: stdClass # effectively disables this service in your test env