small/swoole-patterns

该项目为PHP OpenSwoole项目提供异步设计模式的实现。

22.6.0 2024-08-12 23:29 UTC

README

small-swoole-patterns

            

关于

该项目为PHP OpenSwoole项目提供异步设计模式的实现。

安装

安装 openswoole

pecl install openswoole

使用composer安装包

composer require small/swoole-patterns

模式

数组

映射

这是swoole函数 \Coroutine\map 的一种进化。

它是一个映射数组到回调的类。比 \Coroutine\map 更详细,但允许你在代码中稍后等待。

<?php
use Small\SwoolePatterns\Array\Map;

// First map
$map1 = (new Map([1, 2, 3], function($value) {
    return $value - 1;
}))->run();

// Second map
$map2 = (new Map([4, 6, 8], function($value) {
    return $value / 2
}))->run();

// Wait two maps finished
$map1->wait();
$map2->wait();

// Merg results and dump result;
$result = array_merge($map1, $map2);
var_dump($result);

栈设计模式旨在以填充顺序访问数据。有两种类型的栈:先进先出(FIFO)和后进先出(LIFO)。

以下是一个FIFO栈的示例

use Small\SwoolePatterns\Array\Stack;
use Small\SwoolePatterns\Array\Enum\StackType;

$stack = new Stack([], StackType::fifo);

$stack->push(1)
    ->push(2)
    ->push(3)
;
foreach (range(1, 3) as $i) {
    echo $stack->pull() . "\n";
}

将输出

1
2
3

对于LIFO,顺序将被颠倒

use Small\SwoolePatterns\Array\Stack;
use Small\SwoolePatterns\Array\Enum\StackType;

$stack = new Stack([], StackType::lifo);

$stack->push(1)
->push(2)
->push(3)
;
foreach (range(1, 3) as $i) {
    echo $stack->pull() . "\n";
}

将输出

3
2
1

状态

简单使用

此实现的状态设计模式最简单的使用是一个代表系统中一组状态的类,其中包含一个注册表来管理它们。

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\State;

enum ButtonType {
    case pressed;
    case released;
}

$registry = (new StateRegistry());
    ->addState('buttonOk', State::class)
;

$registry->getState('buttonOk')
    ->set(ButtonType::pressed)
;

echo $registry->getState('buttonOk')->get()->name;

将产生

pressed

编码状态

你可以通过实现 StateEncoderInterface 添加一个编码器,如 json_encode 或其他。

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\Contract\StateEncoderInterface;
use Small\SwoolePatterns\State\State;

class JsonState extends State implements StateEncoderInterface
{
    
    public function encodeState(mixed $state): mixed
    {

        return json_encode($state);

    }

    public function decodeState(mixed $encodedState): mixed
    {

        return json_decode($encodedState);

    }
    
    public function getRawState(): string
    {
    
        return $this->state;
    
    }

}

$registry = (new StateRegistry());
    ->addState('array', JsonState::class)
;

$registry->getState('array')->set(['pressed' => true, 'force' => 12.8]);

echo $registry->getState('array')->get()['force'];

将产生

12.8

但调用 getRawState 方法将产生

{"pressed": true, "force": 12.8}

状态改变时执行操作

实现 OnChangeStateInterface 允许你根据状态变化执行操作。

例如,更改另一个状态

use Small\SwoolePatterns\State\StateRegistry;
use Small\SwoolePatterns\State\State;
use Small\SwoolePatterns\State\Contract\OnChangeStateInterface;

enum ActionStateType: int
{

    case first = 1;
    case second = 2;
    case third = 3;

}

class ActionOnState extends State implements OnChangeStateInterface
{

    public function __construct(protected State $triggeredState) {}

    public function set(mixed $state): self {

        if (!$state instanceof ActionStateType) {
            throw new \Exception('Wrong type');
        }

        parent::set($state);

        return $this;

    }

    public function onChange(mixed $stateBefore, mixed $stateAfter): void
    {

        if ($stateBefore < $stateAfter) {
            $this->triggeredState->set('upgrade');
        } else {
            $this->triggeredState->set('downgrade');
        }

    }

}

($registry = new StateRegistry())
->addState(
    'trigger',
    State::class)
->addState(
    'iterator',
    ActionOnState::class,
    $registry->getState('trigger')
);

$registry->getState('iterator')
    ->set(ActionStateType::first);
echo $registry->getState('trigger') . '\n';

$registry->getState('iterator')
    ->set(ActionStateType::third);
echo $registry->getState('trigger') . '\n';

$registry->getState('iterator')
    ->set(ActionStateType::second);
echo $registry->getState('trigger') . '\n';

将产生

upgrade
upgrade
downgrade

观察者

这是在回调上实现观察者模式。

use Small\SwoolePatterns\Observable\Observable;

// Create callback
$get = function ($host): string {
    return (new Coroutine\Http\Client($host, 80, true))
        ->get('/');
};

// Create observer
$getObserver = (new Observable($get))
    ->subscribe(function(string $html) {
        // left method handle result
        echo $html;
    }, function(\Exception $e) {
        // Right method handle exceptions
        echo 'Can\'t get url : ' . $e->getMessage();
    })
;

$getObserver
    ->run('www.google.com')
    ->run('www.yahoo.com')
    ->run('www.qwant.com')
;

$getObserver->wait();

在这个例子中,我们在异步调用中打印了google、yahoo和qwant的主页。

异步任务

异步任务是指运行异步的函数或方法。

在这个实现中,一个 TaskCollection 类将运行一系列任务,一个 join 方法将等待所有任务完成。

任务可以包含

  • 一个回调
  • 一个静态方法
  • 一个扩展基本 Task 的任务类

例如

use Small\SwoolePatterns\AsyncTask\Task;
use Small\SwoolePatterns\AsyncTask\TaskCollection;

class MaTask extends Task
{

    public function __construct()
    {
        parent::__construct([$this, 'tache']);
    }

    public static function tache(): string
    {

        return 'tester';

    }

}

(new TaskCollection([
    $impots = new Task(fn($url) => $url . '/test', $urlImpots = 'impots.gouv.fr'),
    $yahoo = new Task([static::class, 'getNull']),
    $google = new Task([static::class, 'getUrl'], $urlGoogle = 'google.com'),
    $maTask = new MaTask(),
]))->join();

echo $impots->getResult() . '\n';
echo $maTask->getResult() . '\n';

将输出

impots.gouv.fr/test
tester

这是池模式的实现。

池对于管理异步进程以管理服务器资源中的连接非常有用。

创建池

$pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

这里我们创建了一个

  • 一个 PRedis 客户端池(第一个参数)
  • 同时最多有10个客户端(第二个参数)。
  • 如果没有更多客户端可用,池将尝试每100µs锁定一个新客户端(第三个参数)

使用客户端进程

要获取客户端,请使用 get 方法

$client = $pool->get();

你现在已锁定客户端并可以使用它

$client->get('my-app:key')

现在我们完成了使用,我们必须释放客户端

$pool->put($client);

在异步进程中组合起来将是

$pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
);

(new \Small\SwoolePatterns\Array\Map(range(1, 100), ($i) use($pool) => {
    $client = $pool->get();
    $client->put('my-app:sum:' . $i, $i +$i);
    $pool->put($client);
}));

在这个用例中,即使进程是异步的,并发连接的数量也不能超过10个连接。

使用一个客户端会破坏异步优势,而使用客户端将等待之前的结束。

每次使用新客户端都可能超载你的内存和服务器。

速率控制

你可以使用速率控制来控制服务器的限制。

激活速率控制

($pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'),
    10,
    100
))->activateRateController(100, 10);

在这个代码中,池正在等待你获取的客户端不超过100µs。如果少于100µs,它将在重试前等待10µs。

用于服务器限制的速率控制

对于这个例子,我们将考虑你想要连接到一个http api提供商。你每分钟的限制是3000个请求。

即使您的代码更快,您也可以激活单元控制以观察供应商的限制

($pool = new \Small\SwoolePatterns\Pool\Pool(
    new \Small\SwoolePatterns\Manager\Connection\HttpClientManager('api.my-provider.net'),
    10,
    100
))->activateRateController(100, 10)
    ->addUnitToControl('minutes', 60, \Small\SwoolePatterns\Pool\Enum\RateBehaviour::waiting, 3000);

(new \Small\SwoolePatterns\Array\Map(range(1, 100), ($productId) use($pool) => {
    $client = $pool->get();
    $uri = 'getProduct/{productId}'
    $product = json_decode($client->get(str_replace('{productId}', $productId, $uri)));
    $pool->put($client);
    
    return $product;
}))->run()->wait();

资源

您可以使用资源模式管理资源访问

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\GetResourceBehaviour::exceptionIfNotFree);
$resource->releaseResource($ticket);

在异步过程中,您可以等待其他进程解锁资源

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\waitingForFree);
$resource->releaseResource($ticket);

或自行管理等待的进程

$factory = new \Small\SwoolePatterns\Resource\ResourceFactory();
$resource = $factory->get('testResource1');
$ticket = $resource->acquireResource(\Small\SwoolePatterns\Resource\Enum\GetResourceBehaviour::getTicket);
while ($ticket->isWaiting()) {
    doStuff();
    usleep(100);
}
doResourceStuff();
$resource->releaseResource($ticket);