anik/amqp

php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。

v2.3 2023-03-08 00:36 UTC

This package is auto-updated.

Last update: 2024-09-09 13:16:53 UTC


README

anik/amqp codecov PHP Version Require Total Downloads Latest Stable Version

anik/amqp是一个php-amqplib包装器,简化了对RabbitMQ的消耗。使用RabbitMQ的简便方式。

注意

之前,该软件包可以直接与Laravel、Laravel Zero、Lumen一起使用。从v2版本开始,移除了Laravel的支持。如果您正在寻找与Laravel的集成实现,可以使用anik/laravel-amqp。如果您正在使用此软件包与Laravel一起,并希望升级到Laravel 9,如果您以后想迁移到anik/laravel-amqp,请考虑使用anik/amqp-to-laravel-amqp

示例

查看存储库以获取示例。

要求

  • PHP ^7.2 | ^8.0
  • PHP-AMQPLib ^3.0

安装

要安装此软件包,请运行

composer require anik/amqp

文档

对于V1: https://medium.com/@sirajul.anik/rabbitmq-for-php-developers-c17cd019a90

连接

要创建AMQP连接,您可以使用

  • Anik\Amqp\AmqpConnectionFactory::make
  • Anik\Amqp\AmqpConnectionFactory::makeFromArray
<?php

use Anik\Amqp\AmqpConnectionFactory;
use PhpAmqpLib\Connection\AMQPLazySSLConnection;

$host = '127.0.0.1';
$port = 5672;
$user = 'user';
$password = 'password';
$vhost = '/';
$options = []; // options to be proxied to the amqp connection class
$ofClass = AMQPLazySSLConnection::class;

$connection = AmqpConnectionFactory::make($host, $port, $user, $password, $vhost, $options, $ofClass);
$hosts = [
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ],
    [
        'host' => $host,
        'port' => $port,
        'user' => $user,
        'password' => $password,
        'vhost' => $vhost,
    ]
];

// With AmqpConnectionFactory::makeFromArray method, you can try to connect to multiple host
$connection = AmqpConnectionFactory::makeFromArray($hosts, $options, $ofClass);

交换

此外,还有四个特定的交换类。

  • Anik\Amqp\Exchanges\Direct用于direct交换。
  • Anik\Amqp\Exchanges\Fanout用于fanout交换。
  • Anik\Amqp\Exchanges\Headers用于headers交换。
  • Anik\Amqp\Exchanges\Topic用于topic交换。

您仍然可以使用Anik\Amqp\Exchanges\Exchange基类来创建自己的交换。

要实例化交换,您可以这样做

<?php

use Anik\Amqp\Exchanges\Exchange;
use Anik\Amqp\Exchanges\Fanout;
use Anik\Amqp\Exchanges\Topic;

$exchange = new Exchange('anik.amqp.direct.exchange', Exchange::TYPE_DIRECT);

$exchange = Exchange::make(['name' => 'anik.amqp.direct.exchange', 'type' => Exchange::TYPE_DIRECT]);

$exchange = new Topic('anik.amqp.topic.exchange');

$exchange = Fanout::make(['name' => 'anik.amqp.fanout.exchange']);

在创建交换实例时

  • Exchange::make - 必须在给定的数组中存在nametype键。
  • Topic::make Fanout::make Headers::make Direct::make - 必须在给定的数组中存在name键。

Anik\Amqp\Exchanges\Exchange包含一些预定义的交换类型,您可以将其用作参考。

  • TYPE_DIRECT用于direct类型。
  • TYPE_TOPIC用于topic类型。
  • TYPE_FANOUT用于fanout类型。
  • TYPE_HEADERS用于headers类型。

当创建交换实例时,Exchange::make方法还接受以下键。

  • declare 类型:bool。默认:false。如果您想声明交换。
  • passive 类型:bool。默认:false。如果交换是被动。
  • durable 类型:bool。默认:true。如果交换是持久的。
  • auto_delete 类型:bool。默认:false。如果交换应该自动删除。
  • internal 类型:bool。默认:false。如果交换是内部的。
  • no_wait 类型:bool。默认:false。如果客户端不应等待服务器的回复。
  • arguments 类型:array。默认:[]
  • ticket 类型:null | integer。默认:null

您也可以使用 $exchange->reconfigure($options) 重新配置交换实例。该 $options 数组接受上述键。

此外,您还可以使用以下方法配置您的交换实例。

  • setName - 接受:string。实例化后更改交换名称的唯一方式。
  • setDeclare - 接受:bool
  • setType - 接受:bool
  • setPassive - 接受:bool
  • setDurable - 接受:bool
  • setAutoDelete - 接受:bool
  • setInternal - 接受:bool
  • setNowait - 接受:bool
  • setArguments - 接受:array
  • setTicket - 接受:null | integer

队列

要实例化队列,您可以这样做

<?php

use Anik\Amqp\Queues\Queue;

$queue = new Queue('anik.amqp.direct.exchange.queue');

$queue = Queue::make(['name' => 'anik.amqp.direct.exchange.queue']);

使用以下方式创建队列实例时

  • Queue::make - 给定数组中必须存在 name 键。

Queue::make 方法在创建队列实例时也接受以下键。

  • declare 类型:bool。默认:false。如果您想声明队列。
  • passive 类型:bool。默认:false。如果队列是被动。
  • durable 类型:bool。默认:true。如果队列是持久的。
  • exclusive 类型:bool。默认:false。如果队列是排他的。
  • auto_delete 类型:bool。默认:false。如果队列应该自动删除。
  • no_wait 类型:bool。默认:false。如果客户端不应等待服务器的回复。
  • arguments 类型:array。默认:[]
  • ticket 类型:null | integer。默认:null

您也可以使用 $queue->reconfigure($options) 重新配置队列实例。该 $options 数组接受上述键。

此外,您还可以使用以下方法配置您的队列实例。

  • setName - 接受:string。实例化后更改队列名称的唯一方式。
  • setDeclare - 接受:bool
  • setType - 接受:bool
  • setPassive - 接受:bool
  • setDurable - 接受:bool
  • setExclusive - 接受:bool
  • setAutoDelete - 接受:bool
  • setNowait - 接受:bool
  • setArguments - 接受:array
  • setTicket - 接受:null | integer

Qos

要实例化 Qos,您可以这样做

<?php

use Anik\Amqp\Qos\Qos;

$prefetchSize = 0;
$prefetchCount = 0;
$global = false;

$qos = new Qos($prefetchSize, $prefetchCount, $global);

$qos = Queue::make(['prefetch_size' => $prefetchSize, 'prefetch_count' => $prefetchCount, 'global' => $global]);

Qos::make 方法在创建 qos 实例时也接受以下键。

  • prefetch_size 类型:int。默认:0
  • prefetch_count 类型:int。默认:0
  • global 类型:bool。默认:true

您也可以使用 $qos->reconfigure($options) 重新配置 qos 实例。该 $options 数组接受上述键。

此外,您还可以使用以下方法配置您的 qos 实例。

  • setPrefetchCount - 接受:int
  • setPrefetchSize - 接受:int
  • setGlobal - 接受:bool

发布/生产消息

要发布/生产消息,您需要 Anik\Amqp\Producer 实例。要实例化该类

<?php

use Anik\Amqp\Producer;

$producer = new Producer($connection, $channel);

构造函数接受

  • $connection 类型:PhpAmqpLib\Connection\AbstractConnection。必需。
  • $channel 类型:null | PhpAmqpLib\Channel\AMQPChannel。可选。

如果未提供 $channel 或为 null,则类使用来自 $connection 的通道。

一旦实例化生产者类,您可以使用setChannel设置通道。该方法接受PhpAmqpLib\Channel\AMQPChannel实例。

发布消息有三种方式

批量发布

Producer::publishBatch - 用于批量发布多条消息。

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBatch($messages, $routingKey, $exchange, $options);
  • $messages 类型:Anik\Amqp\Producible[]。如果其中任何消息不是Producible接口的类型,将抛出错误。
  • $routingKey 类型:string。路由键。默认''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange
  • $options 类型:array。运行时配置。
    • exchange - 接受:array
      • 如果您传递null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make$options相同。
    • publish - 接受:array
      • mandatory 默认 false
      • immediate 默认 false
      • ticket 默认 null
      • batch_count。默认:500。在发布批次之前创建一个包含X条消息的批次。

发布

Producer::publish - 发布单条消息。在底层使用Producer::publishBatch

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publish($message, $routingKey, $exchange, $options);
  • $message 类型:Anik\Amqp\Producible
  • $routingKey 类型:string。路由键。默认''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange
  • $options 类型:array。运行时配置。
    • exchange - 接受:array
      • 如果您传递null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make$options相同。
    • publish - 接受:array
      • mandatory 默认 false
      • immediate 默认 false
      • ticket 默认 null

发布基本消息

Producer::publishBasic - 使用AMQPChannel::basic_publish方法发布单条消息。

<?php

use Anik\Amqp\Producer;

(new Producer($connection))->publishBasic($message, $routingKey, $exchange, $options);
  • $message 类型:Anik\Amqp\Producible
  • $routingKey 类型:string。路由键。默认''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange
  • $options 类型:array。运行时配置。
    • exchange - 接受:array
      • 如果您传递null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。键与Exchange::make$options相同。
    • publish - 接受:array
      • mandatory 默认 false
      • immediate 默认 false
      • ticket 默认 null

ProducibleMessage:Producible接口的实现

该软件包附带Anik\Amqp\ProducibleMessage,它是Anik\Amqp\Producible接口的通用实现。

您可以像这样实例化该类

<?php

use Anik\Amqp\ProducibleMessage;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new ProducibleMessage('take my message to rabbitmq');

$msg = new ProducibleMessage('take my message to rabbitmq', [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$msg = (new ProducibleMessage())->setMessage('take my message to rabbitmq')->setProperties([
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers' => new AMQPTable(['key' => 'value']),
]);

消费者

为了消费消息,您需要Anik\Amqp\Consumer实例。要实例化该类

<?php

use Anik\Amqp\Consumer;

$consumer = new Consumer($connection, $channel, $options);

构造函数接受

  • $connection 类型:PhpAmqpLib\Connection\AbstractConnection。必需。
  • $channel 类型:null | PhpAmqpLib\Channel\AMQPChannel。可选。
  • $options 类型:array。可选。消费者配置。
    • tag 类型:string。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())。用于设置消费者标签。
    • no_local 类型:bool。默认 false
    • no_ack 类型:bool。默认 false
    • exclusive 类型:bool。默认 false
    • no_wait 类型:bool。默认 false
    • arguments 类型:bool。默认 []
    • ticket 类型:null | int。默认 null

如果未提供 $channel 或为 null,则类使用来自 $connection 的通道。

一旦实例化消费者类,您就可以访问以下方法。

  • setChannel - 接受:PhpAmqpLib\Channel\AMQPChannel实例。
  • reconfigure - 接受:array。用于重新配置实例。有效的键与构造函数的选项键相同。
  • setConsumerTag - 接受:string。默认:sprintf("anik.amqp_consumer_%s_%s", gethostname(), getmypid())
  • setNoLocal - 接受:bool。默认 false
  • setNoAck - 接受:bool。默认 false
  • setExclusive - 接受:bool。默认 false
  • setNowait - 接受:bool。默认 false
  • setArguments - 接受:array。默认 []
  • setTicket - 接受:null | int。默认 null

为了消费消息

<?php

use Anik\Amqp\Consumer;

(new Consumer($connection, $channel, $options))->consume($handler, $bindingKey, $exchange, $queue, $qos, $options);
  • $handler 类型:Anik\Amqp\Consumable
  • $bindingKey 类型:string。绑定键。默认 ''(空字符串)。
  • $exchange 类型:null | Anik\Amqp\Exchanges\Exchange
  • $queue 类型:null | Anik\Amqp\Queues\Queue
  • $qos 类型:null | Anik\Amqp\Qos\Qos
  • $options 类型:array。运行时配置。
    • consumer - 接受:array。键与Consumer::__construct的选项相同。
    • exchange - 接受:array。键与Exchange::make的选项相同。
      • 如果您传递null作为$exchange,则必须通过此键提供有效的配置来在底层创建一个交换。如果您传递带有交换实例的$exchange以及$options['exchange'],则交换实例将根据$options['exchange']中可用的值相应地重新配置。
    • 队列 - 接受: 数组。键与 Queue::make 的选项相同。
      • 如果传递 null 作为 $queue,则必须通过此键提供有效配置以在底层创建队列。如果传递带有 Queue 实例的 $queue$options['queue'],队列实例将根据 $options['queue'] 中提供的值进行相应重新配置。
    • qos - 接受: 数组。键与 Qos::make 的选项相同。
      • 如果传递带有 Qos 实例的 $qos$options['qos'],Qos 实例将相应重新配置。如果 $qosnull$options['qos'] 包含值,则将对通道应用 QoS。如果 $qosnull$options['qos'] 不可用,通道将不会应用任何 QoS
    • bind - 接受: 数组。用于将队列绑定到交换机。
      • no_wait。默认 false
      • arguments。默认 []
      • ticket。默认 null
    • consume - 接受: 数组。以下值将传递给 AMQPChannel::wait()
      • allowed_methods 默认 null
      • non_blocking 默认 false
      • timeout 默认 0

可消费消息:实现可消费接口

该软件包包含 Anik\Amqp\ConsumableMessage,它是 Anik\Amqp\Consumable 接口的一个通用实现。

您可以像这样实例化该类

<?php

use Anik\Amqp\ConsumableMessage;
// use PhpAmqpLib\Message\AMQPMessage;

$msg = new ConsumableMessage(function (ConsumableMessage $message/*, AMQPMessage $original*/) {
    echo $message->getMessageBody() . PHP_EOL;
    echo $message->getRoutingKey() . PHP_EOL;
    $message->ack();
    // Alternatively, $original->ack();

    /** 
     * Method: `decodeMessage` 
     * Returns:
     *      - `array` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessage());

    /** 
     * Method: `decodeMessageAsObject` 
     * Returns:
     *      - `\stdClass` if message body contains valid json
     *      - `null` if json could not be decoded 
     */
    var_dump($message->decodeMessageAsObject());
});

注意:在未设置 AMQPMessage 的情况下调用 ConsumableMessage 实例的任何方法将抛出异常。

有问题吗?

如果您发现任何问题/错误/缺失功能,请提交问题,如果可能,请提交 PR。