innmind/amqp

AMQP 客户端

5.1.0 2024-06-26 09:17 UTC

README

Build Status codecov Type Coverage

这是一个实现了 AMQP 协议版本 0.9 的 AMQP 客户端。

该实现的目的是提供一个易于使用和阅读的 PHP 实现(以方便使用和可读性),并明确区分 AMQP 模型、传输层和用户 API。

注意:如果没有 php-amqplib 的帮助,这个实现是无法完成的,它极大地帮助理解了传输层的细节。

重要:如果您正在使用 RabbitMQ,请注意它并没有完全实现规范,QosRecover 方法尚未实现。如果您在 Value 实现中遇到问题,请注意 ShortStringSignedLongLongIntegerSignedShortInteger 在某些方法上(例如用作消息头)会生成服务器错误。

文档

重要:您必须使用 vimeo/psalm 来确保正确使用此库。

安装

composer require innmind/amqp

用法

use Innmind\AMQP\{
    Factory,
    Command\DeclareExchange,
    Command\DeclareQueue,
    Command\Bind,
    Command\Publish,
    Model\Basic\Message,
    Model\Exchange\Type,
};
use Innmind\Socket\Internet\Transport;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\OperatingSystem\Factory as OSFactory;
use Innmind\Url\Url;
use Innmind\Immutable\Str;

$os = OSFactory::build();
$client = Factory::of($os)
    ->make(
        Transport::tcp(),
        Url::of('amqp://guest:guest@localhost:5672/'),
        new ElapsedPeriod(1000), // timeout
    )
    ->with(DeclareExchange::of('crawler', Type::direct))
    ->with(DeclareQueue::of('parser'))
    ->with(Bind::of('crawler', 'parser'))
    ->with(Publish::one(Message::of(Str::of('https://github.com')))->to('crawler'))
    ->run(null)
    ->match(
        static fn() => null, // success
        static fn($failure) => throw new \RuntimeException($failure::class),
    );

上面的示例将声明一个名为 crawler 的交换机和队列 parser,它将从我们的交换机接收消息。最后,它将带有负载 http://github.com/ 的消息发布到 crawler(服务器将路由它到 parser)。

要消费消息,您有两种方法

use Innminq\AMQP\{
    Command\Get,
    Command\Consume,
    Consumer\Continuation,
    Model\Basic\Message,
};

$state = $client
    ->with(Get::of('parser')->handle(static function($state, Message $message, Continuation $continuation) {
        $state = $message->body()->toString();

        return $continuation->ack($state);
    }))
    ->run(null) // <- this argument will passed as the state to the handler above
    ->match(
        static fn($state) => $state,
        static fn($failure) => throw new \RuntimeException($failure::class),
    );
echo $state; // will print "http://github.com/"
// or
$client
    ->with(Consume::of('crawler')->handle(static function($state, Message $message, Continuation $continuation) {
        doStuff($message);

        return $continuation->reject($state); // to reject the message
        return $continuation->requeue($state); // put the message back in the queue so it can be redelivered
        return $continuation->cancel($state); // instruct to stop receiving messages (current will be acknowledged first)
    }))
    ->run(null)
    ->match(
        static fn() => null, // in this case only reachable when you cancel the consumer
        static fn($failure) => throw new \RuntimeException($failure::class),
    );

get 回调中也可以使用 reject()requeue()

请自由查看 Command 命名空间以探索所有功能。

基准测试

在 MacBookPro18,2 (M1 Max, 32Gb RAM) 上运行 make benchmark,在容器中运行 RabbitMQ(通过 docker for mac)产生以下结果

make benchmark
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.48978996276855
Consuming 4000:
php benchmark/consumer.php
Pid: 701, Count: 4000, Time: 2.3580

相比之下,php-amqplib 产生以下结果

Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.15483689308167
Consuming 4000:
php benchmark/consumer.php
Pid: 46862, Count: 4000, Time: 0.2366

所以, 函数是有代价的!

注意:这两个基准测试都使用了手动消息确认