ozaretskii/php-snake-mqp

PHP 异步消息队列和数据处理库

v0.2.0-beta 2022-07-20 18:27 UTC

This package is auto-updated.

Last update: 2024-09-26 03:57:19 UTC


README

SnakeMQP 是一个用于通过队列异步运行和管理 PHP 任务的库。您只需几分钟就可以轻松地将代码中的任何部分集成异步执行。这个库为您提供所有必要的工具,让您可以立即开始享受异步任务处理,或在此基础上构建更复杂和可扩展的任务管理系统。

通常,我们的代码执行需要更多的时间或资源,这超出了我们针对每个相同 HTTP 请求可以承担的范围。在这种情况下,将最重或耗时最长的操作部分移至后台可以是一个明显的解决方案。后台执行允许您通过使用多个工作进程来同时处理多个任务,或者当执行顺序至关重要时,执行“链式”任务(在当前任务完成后将下一个任务放入队列)。

您可以通过将它们放入不同的队列名称并分配特定任务或队列的优先级来简单地管理您许多类型的任务。延迟执行在某些特定情况下也非常有用。为处理特定队列名称指定的专用工作进程可以使您立即处理重要任务,同时处理其他队列,以确保项目至关重要的部分始终按时运行。

SnakeMQP 允许您创建任意数量和类型的独立“工作进程”,可以从项目代码、CLI 或 REST API 中调用。这意味着您可以将代码中的任何部分的异步执行集成,即使受到底层服务器访问的限制。您的工作进程可以由简单的 crontab 脚本、CURL、外部授权的 API 调用或 Unix 守护进程(如 Supervisor 或 Systemd)触发。

使用 SnakeMQP 的好处

  • 易于集成到任何基于 PHP 的项目中
  • 高度可管理的队列系统
  • 任务延迟执行
  • 拥有仅处理特定队列或任务(具有特定优先级)的专用工作进程
  • 动态集成“工作进程”以满足项目需求

快速示例

放置任务以供进一步处理(基于 MySQL 的队列)

<?php
// Require the Composer autoloader.
require 'vendor/autoload.php';

use Ozaretskii\PhpSnakeMqp\adapters\MysqlQueueAdapter;
use Ozaretskii\PhpSnakeMqp\SnakeClient;

// Initiate queue adapter
$adapter = new MysqlQueueAdapter([
    'host' => 'localhost',
    'user' => 'root',
    'password' => 'root',
    'database' => 'test_database',
    'port' => '',
    'socket' => '',
]);
// create table if not existing yet (can be called just once per lifetime).
$adapter->prepare();
// Placing tasks into the queue
$hello = function ($arg1, $arg2) {
    var_dump($arg1);
    return $arg2;
};
$client = new SnakeClient($adapter);
// queueing closure
$client->addQueue($hello, ['function output', 'function result']);
/**
 * Worker:
 * - output: string(15) "function output"
 * - result: "function result"
 */
// queueing regular function
$client->addQueue('var_dump', ['text1', ['data' => 'array']]);
/** Worker:
 * - output: 
 * string(5) "text1"
 * array(1) {
 *   ["data"]=> string(5) "array"
 * } 
 * - result: null
 */

运行工作进程(基于 MySQL 的队列)

<?php
// Initiate queue adapter
$adapter = new MysqlQueueAdapter([
    'host' => 'localhost',
    'user' => 'root',
    'password' => 'root',
    'database' => 'test_database',
    'port' => '',
    'socket' => '',
]);
// process 10 jobs from the queue and print results
foreach ($client->processJobsInQueue(10) as $job) {
    if ($job->isSuccessful()) {
        var_dump("Job $job->id has finished.");
        var_dump("Results: ", $job->result);
        var_dump("Output: ", $job->printedOutput);
    } else {
        var_dump("Job $job->id has failed with an error.");
        var_dump("Error: ", $job->result);
        var_dump("Output: ", $job->printedOutput);
    }
}

获取帮助

如果您遇到错误,需要帮助将 SnakeMQP 集成到您的项目中,或者您需要向库中请求额外的功能,请随时在 https://github.com/ozaretskii/php-snake-mqp/issues 提交任何问题。任何 PR 都欢迎。希望这能满足您的需求。谢谢。