anik / amqp
php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。
Requires
- php: ^7.2|^8.0
- php-amqplib/php-amqplib: ^3.2
Requires (Dev)
- phpunit/phpunit: ^8.5|^9.5
README
anik/amqp

anik/amqp
是一个php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。
注意
之前,该软件包可以直接与Laravel、Laravel Zero、Lumen一起使用。从v2
版本开始,移除了Laravel的支持。如果您正在寻找与Laravel的集成实现,可以使用anik/laravel-amqp。如果您正在使用此软件包与Laravel一起,并希望升级到Laravel 9,如果您以后想迁移到anik/laravel-amqp
,请考虑使用anik/amqp-to-laravel-amqp。
示例
查看存储库以获取示例。
要求
- PHP
^7.2 | ^8.0
- PHP-AMQPLib
^3.0
安装
要安装此软件包,请运行
composer require anik/amqp
文档
对于V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90
连接
要创建AMQP连接,您可以使用
Anik\Amqp\AmqpConnectionFactory::make
Anik\Amqp\AmqpConnectionFactory::makeFromArray
<?php use Anik\Amqp\AmqpConnectionFactory; use PhpAmqpLib\Connection\AMQPLazySSLConnection; $host = '127.0.0.1'; $port = 5672; $user = 'user'; $password = 'password'; $vhost = '/'; $options = []; // options to be proxied to the amqp connection class $ofClass = AMQPLazySSLConnection::class; $connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass); $hosts = [ [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ], [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ] ]; // With AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple host $connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);
交换
此外,还有四个特定的交换类。
Anik\Amqp\Exchanges\Direct
用于direct交换。Anik\Amqp\Exchanges\Fanout
用于fanout交换。Anik\Amqp\Exchanges\Headers
用于headers交换。Anik\Amqp\Exchanges\Topic
用于topic交换。
您仍然可以使用Anik\Amqp\Exchanges\Exchange
基类来创建自己的交换。
要实例化交换,您可以这样做
<?php use Anik\Amqp\Exchanges\Exchange; use Anik\Amqp\Exchanges\Fanout; use Anik\Amqp\Exchanges\Topic; $exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT); $exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]); $exchange = new Topic('anik.amqp.topic.exchange'); $exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);
在创建交换实例时
Exchange::make
- 必须在给定的数组中存在name
和type
键。Topic::make
Fanout::make
Headers::make
Direct::make
- 必须在给定的数组中存在name
键。
Anik\Amqp\Exchanges\Exchange
包含一些预定义的交换类型,您可以将其用作参考。
TYPE_DIRECT
用于direct类型。TYPE_TOPIC
用于topic类型。TYPE_FANOUT
用于fanout类型。TYPE_HEADERS
用于headers类型。
当创建交换实例时,Exchange::make
方法还接受以下键。
declare
类型:bool
。默认:false
。如果您想声明交换。passive
类型:bool
。默认:false
。如果交换是被动。durable
类型:bool
。默认:true
。如果交换是持久的。auto_delete
类型:bool
。默认:false
。如果交换应该自动删除。internal
类型:bool
。默认:false
。如果交换是内部的。no_wait
类型:bool
。默认:false
。如果客户端不应等待服务器的回复。arguments
类型:array
。默认:[]
。ticket
类型:null | integer
。默认:null
。
您也可以使用 $exchange->reconfigure($options)
重新配置交换实例。该 $options
数组接受上述键。
此外,您还可以使用以下方法配置您的交换实例。
setName
- 接受:string
。实例化后更改交换名称的唯一方式。setDeclare
- 接受:bool
。setType
- 接受:bool
。setPassive
- 接受:bool
。setDurable
- 接受:bool
。setAutoDelete
- 接受:bool
。setInternal
- 接受:bool
。setNowait
- 接受:bool
。setArguments
- 接受:array
。setTicket
- 接受:null | integer
。
队列
要实例化队列,您可以这样做
<?php use Anik\Amqp\Queues\Queue; $queue = new Queue('anik.amqp.direct.exchange.queue'); $queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);
使用以下方式创建队列实例时
Queue::make
- 给定数组中必须存在name
键。
Queue::make
方法在创建队列实例时也接受以下键。
declare
类型:bool
。默认:false
。如果您想声明队列。passive
类型:bool
。默认:false
。如果队列是被动。durable
类型:bool
。默认:true
。如果队列是持久的。exclusive
类型:bool
。默认:false
。如果队列是排他的。auto_delete
类型:bool
。默认:false
。如果队列应该自动删除。no_wait
类型:bool
。默认:false
。如果客户端不应等待服务器的回复。arguments
类型:array
。默认:[]
。ticket
类型:null | integer
。默认:null
。
您也可以使用 $queue->reconfigure($options)
重新配置队列实例。该 $options
数组接受上述键。
此外,您还可以使用以下方法配置您的队列实例。
setName
- 接受:string
。实例化后更改队列名称的唯一方式。setDeclare
- 接受:bool
。setType
- 接受:bool
。setPassive
- 接受:bool
。setDurable
- 接受:bool
。setExclusive
- 接受:bool
。setAutoDelete
- 接受:bool
。setNowait
- 接受:bool
。setArguments
- 接受:array
。setTicket
- 接受:null | integer
。
Qos
要实例化 Qos,您可以这样做
<?php use Anik\Amqp\Qos\Qos; $prefetchSize = 0; $prefetchCount = 0; $global = false; $qos = new Qos($prefetchSize, $prefetchCount, $global); $qos = Queue::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);
Qos::make
方法在创建 qos 实例时也接受以下键。
prefetch_size
类型:int
。默认:0
。prefetch_count
类型:int
。默认:0
。global
类型:bool
。默认:true
。
您也可以使用 $qos->reconfigure($options)
重新配置 qos 实例。该 $options
数组接受上述键。
此外,您还可以使用以下方法配置您的 qos 实例。
setPrefetchCount
- 接受:int
。setPrefetchSize
- 接受:int
。setGlobal
- 接受:bool
。
发布/生产消息
要发布/生产消息,您需要 Anik\Amqp\Producer
实例。要实例化该类
<?php use Anik\Amqp\Producer; $producer = new Producer($connection, $channel);
构造函数接受
$connection
类型:PhpAmqpLib\Connection\AbstractConnection
。必需。$channel
类型:null | PhpAmqpLib\Channel\AMQPChannel
。可选。
如果未提供 $channel
或为 null,则类使用来自 $connection
的通道。
一旦实例化生产者类,您可以使用setChannel
设置通道。该方法接受PhpAmqpLib\Channel\AMQPChannel
实例。
发布消息有三种方式
批量发布
Producer::publishBatch
- 用于批量发布多条消息。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);
$messages
类型:Anik\Amqp\Producible[]
。如果其中任何消息不是Producible
接口的类型,将抛出错误。$routingKey
类型:string
。路由键。默认''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。$options
类型:array
。运行时配置。- 键
exchange
- 接受:array
。- 如果您传递
null
作为$exchange
,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange
以及$options['exchange']
,则交换实例将根据$options['exchange']
中可用的值相应地重新配置。键与Exchange::make
的$options
相同。
- 如果您传递
- 键
publish
- 接受:array
。- 键
mandatory
默认false
。 - 键
immediate
默认false
。 - 键
ticket
默认null
。 - 键
batch_count
。默认:500
。在发布批次之前创建一个包含X
条消息的批次。
- 键
- 键
发布
Producer::publish
- 发布单条消息。在底层使用Producer::publishBatch
。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publish($message, $routingKey, $exchange, $options);
$message
类型:Anik\Amqp\Producible
。$routingKey
类型:string
。路由键。默认''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。$options
类型:array
。运行时配置。- 键
exchange
- 接受:array
。- 如果您传递
null
作为$exchange
,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange
以及$options['exchange']
,则交换实例将根据$options['exchange']
中可用的值相应地重新配置。键与Exchange::make
的$options
相同。
- 如果您传递
- 键
publish
- 接受:array
。- 键
mandatory
默认false
。 - 键
immediate
默认false
。 - 键
ticket
默认null
。
- 键
- 键
发布基本消息
Producer::publishBasic
- 使用AMQPChannel::basic_publish
方法发布单条消息。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);
$message
类型:Anik\Amqp\Producible
。$routingKey
类型:string
。路由键。默认''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。$options
类型:array
。运行时配置。- 键
exchange
- 接受:array
。- 如果您传递
null
作为$exchange
,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange
以及$options['exchange']
,则交换实例将根据$options['exchange']
中可用的值相应地重新配置。键与Exchange::make
的$options
相同。
- 如果您传递
- 键
publish
- 接受:array
。- 键
mandatory
默认false
。 - 键
immediate
默认false
。 - 键
ticket
默认null
。
- 键
- 键
ProducibleMessage:Producible接口的实现
该软件包附带Anik\Amqp\ProducibleMessage
,它是Anik\Amqp\Producible
接口的通用实现。
您可以像这样实例化该类
<?php use Anik\Amqp\ProducibleMessage; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $msg = new ProducibleMessage('take my message to rabbitmq'); $msg = new ProducibleMessage('take my message to rabbitmq', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); $msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['key' => 'value']), ]);
消费者
为了消费消息,您需要Anik\Amqp\Consumer
实例。要实例化该类
<?php use Anik\Amqp\Consumer; $consumer = new Consumer($connection, $channel, $options);
构造函数接受
$connection
类型:PhpAmqpLib\Connection\AbstractConnection
。必需。$channel
类型:null | PhpAmqpLib\Channel\AMQPChannel
。可选。$options
类型:array
。可选。消费者配置。tag
类型:string
。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())
。用于设置消费者标签。no_local
类型:bool
。默认false
。no_ack
类型:bool
。默认false
。exclusive
类型:bool
。默认false
。no_wait
类型:bool
。默认false
。arguments
类型:bool
。默认[]
。ticket
类型:null | int
。默认null
。
如果未提供 $channel
或为 null,则类使用来自 $connection
的通道。
一旦实例化消费者类,您就可以访问以下方法。
setChannel
- 接受:PhpAmqpLib\Channel\AMQPChannel
实例。reconfigure
- 接受:array
。用于重新配置实例。有效的键与构造函数的选项键相同。setConsumerTag
- 接受:string
。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())
。setNoLocal
- 接受:bool
。默认false
。setNoAck
- 接受:bool
。默认false
。setExclusive
- 接受:bool
。默认false
。setNowait
- 接受:bool
。默认false
。setArguments
- 接受:array
。默认[]
。setTicket
- 接受:null | int
。默认null
。
为了消费消息
<?php use Anik\Amqp\Consumer; (new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);
$handler
类型:Anik\Amqp\Consumable
。$bindingKey
类型:string
。绑定键。默认''
(空字符串)。$exchange
类型:null | Anik\Amqp\Exchanges\Exchange
。$queue
类型:null | Anik\Amqp\Queues\Queue
。$qos
类型:null | Anik\Amqp\Qos\Qos
。$options
类型:array
。运行时配置。consumer
- 接受:array
。键与Consumer::__construct
的选项相同。exchange
- 接受:array
。键与Exchange::make
的选项相同。- 如果您传递
null
作为$exchange
,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange
以及$options['exchange']
,则交换实例将根据$options['exchange']
中可用的值相应地重新配置。
- 如果您传递
队列
- 接受:数组
。键与Queue::make
的选项相同。- 如果传递
null
作为$queue
,则必须通过此键提供有效配置以在底层创建队列。如果传递带有 Queue 实例的$queue
和$options['queue']
,队列实例将根据$options['queue']
中提供的值进行相应重新配置。
- 如果传递
qos
- 接受:数组
。键与Qos::make
的选项相同。- 如果传递带有 Qos 实例的
$qos
和$options['qos']
,Qos 实例将相应重新配置。如果$qos
为null
且$options['qos']
包含值,则将对通道应用 QoS。如果$qos
为null
且$options['qos']
不可用,通道将不会应用任何 QoS
- 如果传递带有 Qos 实例的
bind
- 接受:数组
。用于将队列绑定到交换机。no_wait
。默认false
。arguments
。默认[]
。ticket
。默认null
。
consume
- 接受:数组
。以下值将传递给AMQPChannel::wait()
。allowed_methods
默认null
。non_blocking
默认false
。timeout
默认0
。
可消费消息:实现可消费接口
该软件包包含 Anik\Amqp\ConsumableMessage
,它是 Anik\Amqp\Consumable
接口的一个通用实现。
您可以像这样实例化该类
<?php use Anik\Amqp\ConsumableMessage; // use PhpAmqpLib\Message\AMQPMessage; $msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) { echo $message->getMessageBody() . PHP_EOL; echo $message->getRoutingKey() . PHP_EOL; $message->ack(); // Alternatively, $original->ack(); /** * Method: `decodeMessage` * Returns: * - `array` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessage()); /** * Method: `decodeMessageAsObject` * Returns: * - `\stdClass` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessageAsObject()); });
注意:在未设置 AMQPMessage 的情况下调用 ConsumableMessage
实例的任何方法将抛出异常。
有问题吗?
如果您发现任何问题/错误/缺失功能,请提交问题,如果可能,请提交 PR。