ilepilin/queue

PHP AMQP Facade

1.0.1 2021-03-03 09:43 UTC

This package is auto-updated.

Last update: 2024-09-29 05:09:29 UTC


README

RabbitMQ 队列模块。

主要特点

模块允许执行延迟计算,并可以在单个项目或项目之间传递信息。

模块不依赖于任何框架,仅需要 PHP 5.4 或更高版本,以及 bcmathpdo_mysql 扩展。

容错性

为了确保容错性,模块支持同时使用不同的队列驱动。

在配置中首先指定的驱动将被用作主驱动。其余的驱动将作为备用。

模块中包含 RabbitMQ 和 MySQL 驱动。如果需要添加自己的驱动,需要在 QueueFacade 配置中指定它,并且它必须实现 DriverInterface 接口。

项目间消息传递

要从一个项目向另一个项目传递消息,它们必须连接到同一个 rabbitmq 服务器(并且对于备用队列具有共同的 MySQL 连接)。

可能存在一个问题,即不同项目中的 Payload 类的命名空间不同。为了避免错误,需要将发送消息的项目中的 Payload 类与接收消息的项目中的相同 Payload 类关联起来。

这可以通过在接收消息的项目中配置驱动器的 payloadMap 属性来实现。

'payloadMap' => [
    'libs\queue\component1\Payload' => 'common\components\queue\component1\Payload',
    'libs\queue\component2\Payload' => 'common\components\queue\component2\Payload',
],

开始使用

要使用该模块,需要以下内容:

安装

要安装模块,需要将其添加到 composer

php composer.phar require ilepilin/queue

或添加到 composer.json 文件并运行更新

"ilepilin/queue": "1.*"
composer update ilepilin

配置

模块的主要组件 QueueFacade 可以用作单例,并通过 Service Locator 访问。

以下是一个为 Yii2 项目提供的配置示例

'components' => [
    ...
    
    'queue' => [
        'class' => '\ilepilin\queue\QueueFacade',
        'drivers' => [
            [
                'class' => '\ilepilin\queue\driver\RabbitMQ',
                'host' => '127.0.0.1',
                'port' => 5672,
                'user' => 'rabbitmq',
                'password' => '******',
                'payloadMap' => [
                    'libs\queue\component1\Payload' => 'common\components\queue\component1\Payload',
                ],
                'loggerClass' => '\yii\log\Logger',
            ],
            [
                'class' => '\ilepilin\queue\driver\MySQL',
                'username' => 'project_user',
                'password' => '******',
                'host' => '127.0.0.1',
                'port' => 3307, // если порт не стандартный
                'dbname' => 'project_db',
                'payloadMap' => [
                    'libs\queue\component1\Payload' => 'common\components\queue\component1\Payload',
                ],
                'loggerClass' => '\yii\log\Logger',
                
                // указать true, когда таблица для резервной очереди будет создана. 
                // в противном случае, при каждом использовании драйвера будет лишний SQL запрос для проверки существования таблицы
                'isTableCreated' => true,
            ]
        ],
    ],
    
    ...
]

在当前示例中安装了 2 个驱动

  • RabbitMQ 作为主驱动;
  • MySQL 作为备用驱动。

使用

如上例所示的配置好的 QueueFacade 可以按以下方式方便地使用

添加到队列

/** @var \ilepilin\queue\BasePayload $payload */
$payload = new Payload($data);

Yii::$app->queue->push(Worker::channelName(), $payload);

从队列中提取

处理队列消息的示例

/** @var \ilepilin\queue\WorkerInterface $worker */
$worker = new \path\to\Worker();

/** @var \ilepilin\queue\QueueFacade $facade */
$facade = Yii::$app->get('queue');

$listener = new \ilepilin\queue\listener\Listener($facade, $worker);
$listener->handle();

// $listener->handle(\ilepilin\queue\driver\MySQL::getCode()); // для резервных очередей

handle() 方法中,Listener 将尝试通过指定的驱动获取最旧的消息。

如果没有指定驱动,则将使用配置中指定的第一个驱动 - 在本例中是 RabbitMQ

成功获取消息后,Listener 将将其发送到处理,传递给 $worker->work()

消息的后台处理

在文件 daemon.php 中有处理队列消息的示例。

通过 CLI 启动,第一个参数需要传递通道名称。

php daemon.php channel_name

为了方便控制守护进程,建议使用 supervisord。

为此,需要为每个守护进程在 /etc/supervisord.d/conf.d/ 中创建一个配置文件

[program:daemon_name]
command=/usr/bin/php -q /path/to/daemon.php channel_name
umprocs=1
autostart=true
autorestart=true
startretries=10
user = user1
group = group1
startsecs = 0
stdout_logfile=/path/to/daemon_name.log

添加配置后,可以运行以下命令来启动

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start demon_name

处理备用队列的消息

备用队列的消息处理可以通过 cron 来执行。

以下是一个示例脚本,该脚本通过 cron 执行,用于将消息从备用队列发送到主队列

$channelNames = [
    Worker1::channelName(),
    Worker2::channelName(),
    Worker3::channelName(),
    ...
];

/** @var \ilepilin\queue\QueueFacade $facade */
$facade = Yii::$app->queue;

$driver = $facade->getDriver(\ilepilin\queue\driver\MySQL::getCode());

foreach ($channelNames as $channelName) {
    while ($message = $driver->pop($channelName)) {
        $facade->push($channelName, $message->data, 300);
    }
}

许可协议

版权所有 (c) 2019 Lepilin Igor

本许可证允许获得本软件及其相关文档副本(以下简称“软件”)的个人免费使用软件,不受限制,包括无限使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本的权利,以及提供给本软件的个人,在遵守以下条件的情况下。

上述版权声明和这些条件必须包含在所有副本或相关部分的软件中。

本软件“按原样”提供,不提供任何明示或暗示的保证,包括适销性、特定用途的适用性和非侵权性,但不仅限于此。在任何情况下,作者或权利所有者不对因使用软件或与软件相关的任何行为而产生的任何索赔、损害或要求承担责任,包括合同、侵权或其他情况。