sebk/small-swoole-patterns

此包已被废弃,不再维护。未建议替代包。
此包最新版本(1.1.1)的许可证信息不可用。

1.1.1 2023-01-04 09:34 UTC

This package is auto-updated.

Last update: 2023-02-26 06:17:05 UTC


README

本项目提供了对开源swoole项目的异步设计模式的实现。

迁移

此库已迁移到 framagit 项目。

新composer包可在 https://packagist.org.cn/packages/small/swoole-patterns 获取

未来的提交将在framagit上完成。

此存储库将在几个月后删除。

安装

安装 openswoole

pecl install openswoole

使用composer安装包

composer require sebk/small-swoole-patterns

模式

数组

映射

这是swoole函数 \Coroutine\map 的演变

这是一个映射数组到回调的类。比 \Coroutine\map 更详细,但允许你在代码中稍后等待。

<?php
use Sebk\SmallSwoolePatterns\Array\Map;

// First map
$map1 = (new Map([1, 2, 3], function($value) {
    return $value - 1;
}))->run();

// Second map
$map2 = (new Map([4, 6, 8], function($value) {
    return $value / 2
}))->run();

// Wait two maps finished
$map1->wait();
$map2->wait();

// Merg results and dump result;
$result = array_merge($map1, $map2);
var_dump($result);

观察者

这是在回调上实现的观察者模式。

use Sebk\SmallSwoolePatterns\Observable\Observable;

// Create callback
$get = function ($host): string {
    return (new Coroutine\Http\Client($host, 80, true))
        ->get('/');
};

// Create observer
$getObserver = (new Observable($get))
    ->subscribe(function(string $html) {
        // left method handle result
        echo $html;
    }, function(\Exception $e) {
        // Right method handle exceptions
        echo 'Can\'t get url : ' . $e->getMessage();
    })
;

$getObserver
    ->run('www.google.com')
    ->run('www.yahoo.com')
    ->run('www.qwant.com')
;

$getObserver->wait();

在这个例子中,我们在异步调用中打印了google、yahoo和qwant的首页。

这是池模式的实现。

池对于管理异步进程以及管理到服务器资源的连接非常有用。

创建池

$pool = new \Sebk\SmallSwoolePatterns\Pool\Pool(
    new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

这里我们创建了

  • A PRedis客户端池(第一个参数)
  • 同时最多10个客户端(第二个参数)。
  • 如果没有更多的客户端可用,池将每100µs尝试锁定一个新客户端(第三个参数)

使用客户端进程

要获取客户端,使用get方法

$client = $pool->get();

你现在已经锁定了客户端,可以使用了

$client->get('my-app:key')

现在我们已经完成使用,我们必须释放客户端

$pool->put($client);

在异步过程中组合起来将是

$pool = new \Sebk\SmallSwoolePatterns\Pool\Pool(
    new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

(new \Sebk\SmallSwoolePatterns\Array\Map(range(1, 100), ($i) use($pool) => {
    $client = $pool->get();
    $client->put('my-app:sum:' . $i, $i +$i);
    $pool->put($client);
}));

在这个用例中,并发连接数不能超过10个,即使是异步进程。

使用一个客户端会破坏异步优势,而使用客户端将等待之前的结束。

每次使用新客户端都可能超载你的内存和服务器。

速率控制

你可以通过速率控制来控制服务器的限制。

激活速率控制

($pool = new \Sebk\SmallSwoolePatterns\Pool\Pool(
    new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
))->activateRateController(100, 10);

在这段代码中,池正在等待,当您获取客户端时不会超过100µs。如果少于这个时间,它将在重试前等待10µs。

使用速率控制来限制服务器

在这个例子中,我们假设您想要连接到提供者的http api。您每分钟的限制是3000个请求。

您可以通过激活单元控制来观察提供者的限制,即使您的代码更快。

($pool = new \Sebk\SmallSwoolePatterns\Pool\Pool(
    new \Sebk\SmallSwoolePatterns\Manager\Connection\HttpClientManager('api.my-provider.net'),
    10,
    100
))->activateRateController(100, 10)
    ->addUnitToControl('minutes', 60, \Sebk\SmallSwoolePatterns\Pool\Enum\RateBehaviour::waiting, 3000);

(new \Sebk\SmallSwoolePatterns\Array\Map(range(1, 100), ($productId) use($pool) => {
    $client = $pool->get();
    $uri = 'getProduct/{productId}'
    $product = json_decode($client->get(str_replace('{productId}', $productId, $uri)));
    $pool->put($client);
    
    return $product;
}))->run()->wait();

资源

您可以使用资源模式来管理资源访问。

$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\GetResourceBehaviour::exceptionIfNotFree);
$resource->releaseResource($ticket);

在异步过程中,您可以等待其他进程解锁资源。

$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\waitingForFree);
$resource->releaseResource($ticket);

或者自行管理等待过程。

$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\GetResourceBehaviour::getTicket);
while ($ticket->isWaiting()) {
    doStuff();
    usleep(100);
}
doResourceStuff();
$resource->releaseResource($ticket);