innmind / amqp
AMQP 客户端
5.1.0
2024-06-26 09:17 UTC
Requires
- php: ~8.2
- innmind/filesystem: ~7.0
- innmind/immutable: ~5.7
- innmind/io: ~2.6
- innmind/math: ~6.0
- innmind/media-type: ~2.0
- innmind/operating-system: ~5.0
- innmind/stream: ~4.0
- innmind/time-continuum: ~3.1
- innmind/url: ~4.1
- ramsey/uuid: ~4.0
Requires (Dev)
- innmind/black-box: ~5.5
- innmind/coding-standard: ~2.0
- phpunit/phpunit: ~10.2
- psr/log: ~3.0
- vimeo/psalm: ~5.15
Suggests
- psr/log: To log what's happening through the connection
README
这是一个实现了 AMQP 协议版本 0.9
的 AMQP 客户端。
该实现的目的是提供一个易于使用和阅读的 PHP 实现(以方便使用和可读性),并明确区分 AMQP 模型、传输层和用户 API。
注意:如果没有 php-amqplib
的帮助,这个实现是无法完成的,它极大地帮助理解了传输层的细节。
重要:如果您正在使用 RabbitMQ,请注意它并没有完全实现规范,Qos
和 Recover
方法尚未实现。如果您在 Value
实现中遇到问题,请注意 ShortString
、SignedLongLongInteger
和 SignedShortInteger
在某些方法上(例如用作消息头)会生成服务器错误。
重要:您必须使用 vimeo/psalm
来确保正确使用此库。
安装
composer require innmind/amqp
用法
use Innmind\AMQP\{ Factory, Command\DeclareExchange, Command\DeclareQueue, Command\Bind, Command\Publish, Model\Basic\Message, Model\Exchange\Type, }; use Innmind\Socket\Internet\Transport; use Innmind\TimeContinuum\Earth\ElapsedPeriod; use Innmind\OperatingSystem\Factory as OSFactory; use Innmind\Url\Url; use Innmind\Immutable\Str; $os = OSFactory::build(); $client = Factory::of($os) ->make( Transport::tcp(), Url::of('amqp://guest:guest@localhost:5672/'), new ElapsedPeriod(1000), // timeout ) ->with(DeclareExchange::of('crawler', Type::direct)) ->with(DeclareQueue::of('parser')) ->with(Bind::of('crawler', 'parser')) ->with(Publish::one(Message::of(Str::of('https://github.com')))->to('crawler')) ->run(null) ->match( static fn() => null, // success static fn($failure) => throw new \RuntimeException($failure::class), );
上面的示例将声明一个名为 crawler
的交换机和队列 parser
,它将从我们的交换机接收消息。最后,它将带有负载 http://github.com/
的消息发布到 crawler
(服务器将路由它到 parser
)。
要消费消息,您有两种方法
use Innminq\AMQP\{ Command\Get, Command\Consume, Consumer\Continuation, Model\Basic\Message, }; $state = $client ->with(Get::of('parser')->handle(static function($state, Message $message, Continuation $continuation) { $state = $message->body()->toString(); return $continuation->ack($state); })) ->run(null) // <- this argument will passed as the state to the handler above ->match( static fn($state) => $state, static fn($failure) => throw new \RuntimeException($failure::class), ); echo $state; // will print "http://github.com/" // or $client ->with(Consume::of('crawler')->handle(static function($state, Message $message, Continuation $continuation) { doStuff($message); return $continuation->reject($state); // to reject the message return $continuation->requeue($state); // put the message back in the queue so it can be redelivered return $continuation->cancel($state); // instruct to stop receiving messages (current will be acknowledged first) })) ->run(null) ->match( static fn() => null, // in this case only reachable when you cancel the consumer static fn($failure) => throw new \RuntimeException($failure::class), );
在 get
回调中也可以使用 reject()
和 requeue()
。
请自由查看 Command
命名空间以探索所有功能。
基准测试
在 MacBookPro18,2 (M1 Max, 32Gb RAM) 上运行 make benchmark
,在容器中运行 RabbitMQ(通过 docker for mac)产生以下结果
make benchmark
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.48978996276855
Consuming 4000:
php benchmark/consumer.php
Pid: 701, Count: 4000, Time: 2.3580
相比之下,php-amqplib
产生以下结果
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.15483689308167
Consuming 4000:
php benchmark/consumer.php
Pid: 46862, Count: 4000, Time: 0.2366
所以,纯 函数是有代价的!
注意:这两个基准测试都使用了手动消息确认