anik / amqp
php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。
Requires
- php: ^7.2|^8.0
- php-amqplib/php-amqplib: ^3.2
Requires (Dev)
- phpunit/phpunit: ^8.5|^9.5
README
anik/amqp

anik/amqp是一个php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。
注意
之前,该软件包可以直接与Laravel、Laravel Zero、Lumen一起使用。从v2版本开始,移除了Laravel的支持。如果您正在寻找与Laravel的集成实现,可以使用anik/laravel-amqp。如果您正在使用此软件包与Laravel一起,并希望升级到Laravel 9,如果您以后想迁移到anik/laravel-amqp,请考虑使用anik/amqp-to-laravel-amqp。
示例
查看存储库以获取示例。
要求
- PHP
^7.2 | ^8.0 - PHP-AMQPLib
^3.0
安装
要安装此软件包,请运行
composer require anik/amqp
文档
对于V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90
连接
要创建AMQP连接,您可以使用
Anik\Amqp\AmqpConnectionFactory::makeAnik\Amqp\AmqpConnectionFactory::makeFromArray
<?php use Anik\Amqp\AmqpConnectionFactory; use PhpAmqpLib\Connection\AMQPLazySSLConnection; $host = '127.0.0.1'; $port = 5672; $user = 'user'; $password = 'password'; $vhost = '/'; $options = []; // options to be proxied to the amqp connection class $ofClass = AMQPLazySSLConnection::class; $connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass); $hosts = [ [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ], [ 'host' => $host, 'port' => $port, 'user' => $user, 'password' => $password, 'vhost' => $vhost, ] ]; // With AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple host $connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);
交换
此外,还有四个特定的交换类。
Anik\Amqp\Exchanges\Direct用于direct交换。Anik\Amqp\Exchanges\Fanout用于fanout交换。Anik\Amqp\Exchanges\Headers用于headers交换。Anik\Amqp\Exchanges\Topic用于topic交换。
您仍然可以使用Anik\Amqp\Exchanges\Exchange基类来创建自己的交换。
要实例化交换,您可以这样做
<?php use Anik\Amqp\Exchanges\Exchange; use Anik\Amqp\Exchanges\Fanout; use Anik\Amqp\Exchanges\Topic; $exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT); $exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]); $exchange = new Topic('anik.amqp.topic.exchange'); $exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);
在创建交换实例时
Exchange::make- 必须在给定的数组中存在name和type键。Topic::makeFanout::makeHeaders::makeDirect::make- 必须在给定的数组中存在name键。
Anik\Amqp\Exchanges\Exchange包含一些预定义的交换类型,您可以将其用作参考。
TYPE_DIRECT用于direct类型。TYPE_TOPIC用于topic类型。TYPE_FANOUT用于fanout类型。TYPE_HEADERS用于headers类型。
当创建交换实例时,Exchange::make方法还接受以下键。
declare类型:bool。默认:false。如果您想声明交换。passive类型:bool。默认:false。如果交换是被动。durable类型:bool。默认:true。如果交换是持久的。auto_delete类型:bool。默认:false。如果交换应该自动删除。internal类型:bool。默认:false。如果交换是内部的。no_wait类型:bool。默认:false。如果客户端不应等待服务器的回复。arguments类型:array。默认:[]。ticket类型:null | integer。默认:null。
您也可以使用 $exchange->reconfigure($options) 重新配置交换实例。该 $options 数组接受上述键。
此外,您还可以使用以下方法配置您的交换实例。
setName- 接受:string。实例化后更改交换名称的唯一方式。setDeclare- 接受:bool。setType- 接受:bool。setPassive- 接受:bool。setDurable- 接受:bool。setAutoDelete- 接受:bool。setInternal- 接受:bool。setNowait- 接受:bool。setArguments- 接受:array。setTicket- 接受:null | integer。
队列
要实例化队列,您可以这样做
<?php use Anik\Amqp\Queues\Queue; $queue = new Queue('anik.amqp.direct.exchange.queue'); $queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);
使用以下方式创建队列实例时
Queue::make- 给定数组中必须存在name键。
Queue::make 方法在创建队列实例时也接受以下键。
declare类型:bool。默认:false。如果您想声明队列。passive类型:bool。默认:false。如果队列是被动。durable类型:bool。默认:true。如果队列是持久的。exclusive类型:bool。默认:false。如果队列是排他的。auto_delete类型:bool。默认:false。如果队列应该自动删除。no_wait类型:bool。默认:false。如果客户端不应等待服务器的回复。arguments类型:array。默认:[]。ticket类型:null | integer。默认:null。
您也可以使用 $queue->reconfigure($options) 重新配置队列实例。该 $options 数组接受上述键。
此外,您还可以使用以下方法配置您的队列实例。
setName- 接受:string。实例化后更改队列名称的唯一方式。setDeclare- 接受:bool。setType- 接受:bool。setPassive- 接受:bool。setDurable- 接受:bool。setExclusive- 接受:bool。setAutoDelete- 接受:bool。setNowait- 接受:bool。setArguments- 接受:array。setTicket- 接受:null | integer。
Qos
要实例化 Qos,您可以这样做
<?php use Anik\Amqp\Qos\Qos; $prefetchSize = 0; $prefetchCount = 0; $global = false; $qos = new Qos($prefetchSize, $prefetchCount, $global); $qos = Queue::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);
Qos::make 方法在创建 qos 实例时也接受以下键。
prefetch_size类型:int。默认:0。prefetch_count类型:int。默认:0。global类型:bool。默认:true。
您也可以使用 $qos->reconfigure($options) 重新配置 qos 实例。该 $options 数组接受上述键。
此外,您还可以使用以下方法配置您的 qos 实例。
setPrefetchCount- 接受:int。setPrefetchSize- 接受:int。setGlobal- 接受:bool。
发布/生产消息
要发布/生产消息,您需要 Anik\Amqp\Producer 实例。要实例化该类
<?php use Anik\Amqp\Producer; $producer = new Producer($connection, $channel);
构造函数接受
$connection类型:PhpAmqpLib\Connection\AbstractConnection。必需。$channel类型:null | PhpAmqpLib\Channel\AMQPChannel。可选。
如果未提供 $channel 或为 null,则类使用来自 $connection 的通道。
一旦实例化生产者类,您可以使用setChannel设置通道。该方法接受PhpAmqpLib\Channel\AMQPChannel实例。
发布消息有三种方式
批量发布
Producer::publishBatch - 用于批量发布多条消息。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);
$messages类型:Anik\Amqp\Producible[]。如果其中任何消息不是Producible接口的类型,将抛出错误。$routingKey类型:string。路由键。默认''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。$options类型:array。运行时配置。- 键
exchange- 接受:array。- 如果您传递
null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make的$options相同。
- 如果您传递
- 键
publish- 接受:array。- 键
mandatory默认false。 - 键
immediate默认false。 - 键
ticket默认null。 - 键
batch_count。默认:500。在发布批次之前创建一个包含X条消息的批次。
- 键
- 键
发布
Producer::publish - 发布单条消息。在底层使用Producer::publishBatch。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publish($message, $routingKey, $exchange, $options);
$message类型:Anik\Amqp\Producible。$routingKey类型:string。路由键。默认''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。$options类型:array。运行时配置。- 键
exchange- 接受:array。- 如果您传递
null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make的$options相同。
- 如果您传递
- 键
publish- 接受:array。- 键
mandatory默认false。 - 键
immediate默认false。 - 键
ticket默认null。
- 键
- 键
发布基本消息
Producer::publishBasic - 使用AMQPChannel::basic_publish方法发布单条消息。
<?php use Anik\Amqp\Producer; (new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);
$message类型:Anik\Amqp\Producible。$routingKey类型:string。路由键。默认''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。$options类型:array。运行时配置。- 键
exchange- 接受:array。- 如果您传递
null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make的$options相同。
- 如果您传递
- 键
publish- 接受:array。- 键
mandatory默认false。 - 键
immediate默认false。 - 键
ticket默认null。
- 键
- 键
ProducibleMessage:Producible接口的实现
该软件包附带Anik\Amqp\ProducibleMessage,它是Anik\Amqp\Producible接口的通用实现。
您可以像这样实例化该类
<?php use Anik\Amqp\ProducibleMessage; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; $msg = new ProducibleMessage('take my message to rabbitmq'); $msg = new ProducibleMessage('take my message to rabbitmq', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); $msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['key' => 'value']), ]);
消费者
为了消费消息,您需要Anik\Amqp\Consumer实例。要实例化该类
<?php use Anik\Amqp\Consumer; $consumer = new Consumer($connection, $channel, $options);
构造函数接受
$connection类型:PhpAmqpLib\Connection\AbstractConnection。必需。$channel类型:null | PhpAmqpLib\Channel\AMQPChannel。可选。$options类型:array。可选。消费者配置。tag类型:string。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())。用于设置消费者标签。no_local类型:bool。默认false。no_ack类型:bool。默认false。exclusive类型:bool。默认false。no_wait类型:bool。默认false。arguments类型:bool。默认[]。ticket类型:null | int。默认null。
如果未提供 $channel 或为 null,则类使用来自 $connection 的通道。
一旦实例化消费者类,您就可以访问以下方法。
setChannel- 接受:PhpAmqpLib\Channel\AMQPChannel实例。reconfigure- 接受:array。用于重新配置实例。有效的键与构造函数的选项键相同。setConsumerTag- 接受:string。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())。setNoLocal- 接受:bool。默认false。setNoAck- 接受:bool。默认false。setExclusive- 接受:bool。默认false。setNowait- 接受:bool。默认false。setArguments- 接受:array。默认[]。setTicket- 接受:null | int。默认null。
为了消费消息
<?php use Anik\Amqp\Consumer; (new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);
$handler类型:Anik\Amqp\Consumable。$bindingKey类型:string。绑定键。默认''(空字符串)。$exchange类型:null | Anik\Amqp\Exchanges\Exchange。$queue类型:null | Anik\Amqp\Queues\Queue。$qos类型:null | Anik\Amqp\Qos\Qos。$options类型:array。运行时配置。consumer- 接受:array。键与Consumer::__construct的选项相同。exchange- 接受:array。键与Exchange::make的选项相同。- 如果您传递
null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。
- 如果您传递
队列- 接受:数组。键与Queue::make的选项相同。- 如果传递
null作为$queue,则必须通过此键提供有效配置以在底层创建队列。如果传递带有 Queue 实例的$queue和$options['queue'],队列实例将根据$options['queue']中提供的值进行相应重新配置。
- 如果传递
qos- 接受:数组。键与Qos::make的选项相同。- 如果传递带有 Qos 实例的
$qos和$options['qos'],Qos 实例将相应重新配置。如果$qos为null且$options['qos']包含值,则将对通道应用 QoS。如果$qos为null且$options['qos']不可用,通道将不会应用任何 QoS
- 如果传递带有 Qos 实例的
bind- 接受:数组。用于将队列绑定到交换机。no_wait。默认false。arguments。默认[]。ticket。默认null。
consume- 接受:数组。以下值将传递给AMQPChannel::wait()。allowed_methods默认null。non_blocking默认false。timeout默认0。
可消费消息:实现可消费接口
该软件包包含 Anik\Amqp\ConsumableMessage,它是 Anik\Amqp\Consumable 接口的一个通用实现。
您可以像这样实例化该类
<?php use Anik\Amqp\ConsumableMessage; // use PhpAmqpLib\Message\AMQPMessage; $msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) { echo $message->getMessageBody() . PHP_EOL; echo $message->getRoutingKey() . PHP_EOL; $message->ack(); // Alternatively, $original->ack(); /** * Method: `decodeMessage` * Returns: * - `array` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessage()); /** * Method: `decodeMessageAsObject` * Returns: * - `\stdClass` if message body contains valid json * - `null` if json could not be decoded */ var_dump($message->decodeMessageAsObject()); });
注意:在未设置 AMQPMessage 的情况下调用 ConsumableMessage 实例的任何方法将抛出异常。
有问题吗?
如果您发现任何问题/错误/缺失功能,请提交问题,如果可能,请提交 PR。