sebk/ small-swoole-patterns
此包已被废弃,不再维护。未建议替代包。
此包最新版本(1.1.1)的许可证信息不可用。
1.1.1
2023-01-04 09:34 UTC
Requires
- php: >=8.1
Requires (Dev)
- phpunit/phpunit: 9.*
- predis/predis: 2.*
Suggests
- ext-openswoole: *
- ext-swoole: *
- predis/predis: 2.*
README
本项目提供了对开源swoole项目的异步设计模式的实现。
迁移
此库已迁移到 framagit 项目。
新composer包可在 https://packagist.org.cn/packages/small/swoole-patterns 获取
未来的提交将在framagit上完成。
此存储库将在几个月后删除。
安装
安装 openswoole
pecl install openswoole
使用composer安装包
composer require sebk/small-swoole-patterns
模式
数组
映射
这是swoole函数 \Coroutine\map 的演变
这是一个映射数组到回调的类。比 \Coroutine\map 更详细,但允许你在代码中稍后等待。
<?php use Sebk\SmallSwoolePatterns\Array\Map; // First map $map1 = (new Map([1, 2, 3], function($value) { return $value - 1; }))->run(); // Second map $map2 = (new Map([4, 6, 8], function($value) { return $value / 2 }))->run(); // Wait two maps finished $map1->wait(); $map2->wait(); // Merg results and dump result; $result = array_merge($map1, $map2); var_dump($result);
观察者
这是在回调上实现的观察者模式。
use Sebk\SmallSwoolePatterns\Observable\Observable; // Create callback $get = function ($host): string { return (new Coroutine\Http\Client($host, 80, true)) ->get('/'); }; // Create observer $getObserver = (new Observable($get)) ->subscribe(function(string $html) { // left method handle result echo $html; }, function(\Exception $e) { // Right method handle exceptions echo 'Can\'t get url : ' . $e->getMessage(); }) ; $getObserver ->run('www.google.com') ->run('www.yahoo.com') ->run('www.qwant.com') ; $getObserver->wait();
在这个例子中,我们在异步调用中打印了google、yahoo和qwant的首页。
池
这是池模式的实现。
池对于管理异步进程以及管理到服务器资源的连接非常有用。
创建池
$pool = new \Sebk\SmallSwoolePatterns\Pool\Pool( new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'), 10, 100 );
这里我们创建了
- A PRedis客户端池(第一个参数)
- 同时最多10个客户端(第二个参数)。
- 如果没有更多的客户端可用,池将每100µs尝试锁定一个新客户端(第三个参数)
使用客户端进程
要获取客户端,使用get方法
$client = $pool->get();
你现在已经锁定了客户端,可以使用了
$client->get('my-app:key')
现在我们已经完成使用,我们必须释放客户端
$pool->put($client);
在异步过程中组合起来将是
$pool = new \Sebk\SmallSwoolePatterns\Pool\Pool( new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'), 10, 100 ); (new \Sebk\SmallSwoolePatterns\Array\Map(range(1, 100), ($i) use($pool) => { $client = $pool->get(); $client->put('my-app:sum:' . $i, $i +$i); $pool->put($client); }));
在这个用例中,并发连接数不能超过10个,即使是异步进程。
使用一个客户端会破坏异步优势,而使用客户端将等待之前的结束。
每次使用新客户端都可能超载你的内存和服务器。
速率控制
你可以通过速率控制来控制服务器的限制。
激活速率控制
($pool = new \Sebk\SmallSwoolePatterns\Pool\Pool( new \Sebk\SmallSwoolePatterns\Manager\Connection\PRedisClientManager('tcp://my-redis.net'), 10, 100 ))->activateRateController(100, 10);
在这段代码中,池正在等待,当您获取客户端时不会超过100µs。如果少于这个时间,它将在重试前等待10µs。
使用速率控制来限制服务器
在这个例子中,我们假设您想要连接到提供者的http api。您每分钟的限制是3000个请求。
您可以通过激活单元控制来观察提供者的限制,即使您的代码更快。
($pool = new \Sebk\SmallSwoolePatterns\Pool\Pool( new \Sebk\SmallSwoolePatterns\Manager\Connection\HttpClientManager('api.my-provider.net'), 10, 100 ))->activateRateController(100, 10) ->addUnitToControl('minutes', 60, \Sebk\SmallSwoolePatterns\Pool\Enum\RateBehaviour::waiting, 3000); (new \Sebk\SmallSwoolePatterns\Array\Map(range(1, 100), ($productId) use($pool) => { $client = $pool->get(); $uri = 'getProduct/{productId}' $product = json_decode($client->get(str_replace('{productId}', $productId, $uri))); $pool->put($client); return $product; }))->run()->wait();
资源
您可以使用资源模式来管理资源访问。
$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory(); $resource = $factory->get('testResource1'); $ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\GetResourceBehaviour::exceptionIfNotFree); $resource->releaseResource($ticket);
在异步过程中,您可以等待其他进程解锁资源。
$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory(); $resource = $factory->get('testResource1'); $ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\waitingForFree); $resource->releaseResource($ticket);
或者自行管理等待过程。
$factory = new \Sebk\SmallSwoolePatterns\Resource\ResourceFactory(); $resource = $factory->get('testResource1'); $ticket = $resource->acquireResource(\Sebk\SmallSwoolePatterns\Resource\Enum\GetResourceBehaviour::getTicket); while ($ticket->isWaiting()) { doStuff(); usleep(100); } doResourceStuff(); $resource->releaseResource($ticket);