zwirek/reactphp-limiter

使用 ReactPHP 限制并发作业执行的类集

v0.1.0 2024-01-04 21:43 UTC

This package is auto-updated.

Last update: 2024-09-04 23:22:47 UTC


README

为与 ReactPHP 一起使用而设计的限制器集

目录

  1. 简介
  2. 用法
  3. 限制器接口

简介

此存储库包含多个为与 ReactPHP 一起使用而设计的限制器类。每个限制器都必须有一个注册的处理程序(可调用对象),该处理程序将在达到限制之前立即被调用。任何超出限制的额外请求都将排队。针对不同的目的存在不同的限制器

  • 速率限制器:限制给定限制的并发处理程序执行
  • 时间窗口限制器:对于时间窗口,将处理程序执行限制到指定的限制。
  • 突变限制器:限制针对资源的处理程序执行。

用法

速率限制器

将并发处理程序执行限制到给定限制

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\RateLimiter(2, function($counter) use ($loop) {
    $deferred = new \React\Promise\Deferred();

    echo 'execute for ', $counter, PHP_EOL;

    $loop->addTimer(2, function() use ($deferred, $counter) {
        $deferred->resolve('return ' . $counter . PHP_EOL);
    });

    return $deferred->promise();
});

for ($i = 1; $i <= 10; $i++) {
    $limiter->handle($i)->then(function($resolve) { echo $resolve; });
}

$loop->run();

处理程序将同时在同一时间被调用两次。

可以通过设置溢出限制来限制等待执行的作业的数量

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\RateLimiter(2, function($counter) use ($loop) {
    $deferred = new \React\Promise\Deferred();

    echo 'execute for ', $counter, PHP_EOL;

    $loop->addTimer(2, function() use ($deferred, $counter) {
        $deferred->resolve('return ' . $counter . PHP_EOL);
    });

    return $deferred->promise();
}, 5);

for ($i = 1; $i <= 10; $i++) {
    $limiter->handle($i)
        ->then(
            function ($resolve) {
                echo $resolve;
            },
            function (OverflowException $exception) use ($i) {
                echo 'Overflow limit reached for call ', $i, PHP_EOL;
            }
        );
}

$loop->run();

超出限制的调用将立即被拒绝。

时间窗口限制器

此限制器负责在时间窗口内限制给定限制下的处理程序执行。例如,可以将作业执行限制为每1分钟100次。

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\TimeWindowLimiter(2, 500, function($counter) use ($loop) {
    $deferred = new \React\Promise\Deferred();

    echo 'execute for ', $counter, PHP_EOL;

    $loop->addTimer(1, function() use ($deferred, $counter) {
        $deferred->resolve('return ' . $counter . PHP_EOL);
    });

    return $deferred->promise();
});

for ($i = 1; $i <= 10; $i++) {
    $limiter->handle($i)->then(function($resolve) { echo $resolve; });
}

$loop->run();

在此示例中,处理程序每半秒被调用两次。下一次调用将立即在下一个窗口开始时开始,即使前一个窗口的作业处于挂起状态。

可以通过设置溢出限制来限制等待执行的作业的数量。

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\TimeWindowLimiter(2, 500, function($counter) use ($loop) {
    $deferred = new \React\Promise\Deferred();

    echo 'execute for ', $counter, PHP_EOL;

    $loop->addTimer(1, function() use ($deferred, $counter) {
        $deferred->resolve('return ' . $counter . PHP_EOL);
    });

    return $deferred->promise();
}, 5);

for ($i = 1; $i <= 10; $i++) {
    $limiter->handle($i)
        ->then(
            function ($resolve) {
                echo $resolve;
            },
            function (OverflowException $exception) use ($i) {
                echo 'Overflow limit reached for call ', $i, PHP_EOL;
            }
        );
}

$loop->run();

突变限制器

此限制器可以限制特定资源的并发作业调用。资源可以是任何东西,如文件、连接、数据库中的行,因为限制器需要额外的回调来返回资源ID。资源ID必须是字符串、整数或浮点数。

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\MutationLimiter(
    function($counter, $resource) use ($loop) {
        $deferred = new \React\Promise\Deferred();

        echo 'execute counter ', $counter, ' for resource ', $resource, PHP_EOL;

        $loop->addTimer(1, function() use ($deferred, $counter) {
            $deferred->resolve('return ' . $counter . PHP_EOL);
        });

        return $deferred->promise();
    },
    function($counter, $resource) {
        return $resource;
    }
);

$successCallback = function ($resolve) {
    echo $resolve;
};

for ($i = 1; $i <= 5; $i++) {
    $limiter->handle($i, 1)->then($successCallback);
    $limiter->handle($i, 2)->then($successCallback);
    $limiter->handle($i, 3)->then($successCallback);
}

$loop->run();

第二个回调负责提供资源ID。它接收与处理程序回调相同的参数。这样,就可以根据给定的数据解析资源ID。

可以通过设置溢出限制来限制每个资源等待执行的作业的数量。

$loop = \React\EventLoop\Loop::get();

$limiter = new \Zwirek\Limiter\MutationLimiter(
    function($counter, $resource) use ($loop) {
        $deferred = new \React\Promise\Deferred();

        echo 'execute counter ', $counter, ' for resource ', $resource, PHP_EOL;

        $loop->addTimer(1, function() use ($deferred, $counter) {
            $deferred->resolve('return ' . $counter . PHP_EOL);
        });

        return $deferred->promise();
    },
    function($counter, $resource) {
        return $resource;
    },
    4
);

$successCallback = function ($resolve) {
    echo $resolve;
};
$failureCallback = function (OverflowException $exception) {
    echo $exception->getMessage(), PHP_EOL;
};

for ($i = 1; $i <= 5; $i++) {
    $limiter->handle($i, 1)->then($successCallback, $failureCallback);
    $limiter->handle($i, 2)->then($successCallback, $failureCallback);
    $limiter->handle($i, 3)->then($successCallback, $failureCallback);
}

$loop->run();

限制器接口

每个限制器类都实现了 \Zwirek\Limiter\Limiter 接口。限制器接口只有一个公共方法。

\Zwirek\Limiter\Limiter::handle(mixed ...$arguments): \React\Promise\Promise

处理程序可以带有零个或多个参数。重要的是用与注册的处理程序回调相同的参数数量调用 ::handle