makise-co / pool
Swoole 连接池
v1.0.4
2021-10-12 04:37 UTC
Requires
- php: >=7.4
- ext-swoole: >=4.4
- makise-co/connection: ^1.0
- makise-co/ev-primitives: ^1.0
Requires (Dev)
- phpstan/phpstan: ^0.12.38
- phpunit/phpunit: ^9.3
- swoole/ide-helper: ^4.5
- symfony/var-dumper: ^5.0
README
基于 Swoole 的带有均衡器功能实现的连接池
参数
- int maxActive (默认: 2) - 同时可以从此池分配的最大活动连接数
- int minActive (默认: 0) - 池中应始终保持的最小已建立连接数
- float maxWaitTime (默认: 5.0) - 当没有可用连接时,池在抛出异常之前等待连接被归还的最大秒数。零值(0.0)将禁用等待超时。
- float validationInterval (默认: 5.0) - 空闲连接验证/清理定时器两次运行之间休眠的秒数。此值不应低于 1 秒。零值将禁用连接验证定时器。
- int maxIdleTime (默认: 60) - 连接在池中空闲的最短时间(秒),超过此时间后它才有资格被关闭。零值将禁用空闲连接释放。
- int maxLifeTime (默认: 0) - 连接在池中存在的最长时间(秒),超过此时间后它才有资格被关闭。零值将禁用过期连接释放。
- bool testOnBorrow (默认: true) - 指示在从池中借用对象之前是否对其进行验证。如果对象验证失败,则将其从池中移除,并尝试借用另一个。
- bool testOnReturn (默认: true) - 指示在将对象返回到池之前是否对其进行验证
- bool resetConnections (默认: false) - 当从池中借用连接时,将连接重置为其初始状态
所有这些参数都可以在运行时更改。
API
- init - 初始化(启动)连接池
- close - 关闭(停止)连接池
- getStats - 获取连接池统计信息
- pop (默认可见性:protected) - 从池中借用连接。在等待空闲连接超时时可能会抛出 BorrowTimeoutException(请参阅 maxWaitTime 参数文档)。当连接池关闭时可能会抛出 PoolIsClosedException。
- push (默认可见性:protected) - 将连接返回到池中
获取器
- getIdleCount - 获取空闲连接数
- getTotalCount - 获取由池创建的连接数
- getMaxActive - 参见 maxActive 参数文档
- getMinActive - 参见 minActive 参数文档
- getMaxWaitTime - 参见 maxWaitTime 参数文档
- getMaxIdleTime - 参见 maxIdleTime 参数文档
- getMaxLifeTime - 参见 maxLifeTime 参数文档
- getValidationInterval - 参见 validationInterval 参数文档
- getTestOnBorrow - 参见 testOnBorrow 参数文档
- getTestOnReturn - 参见 testOnReturn 参数文档
- getResetConnections - 参见 resetConnections 参数文档
设置器
- setMaxActive - 参见 maxActive 参数文档
- setMinActive - 参见 minActive 参数文档
- setMaxWaitTime - 参见 maxWaitTime 参数文档
- setMaxIdleTime - 参见 maxIdleTime 参数文档
- setMaxLifeTime - 参见 maxLifeTime 参数文档
- setValidationInterval - 参见 validationInterval 参数文档
- setTestOnBorrow - 参见 testOnBorrow 参数文档
- setTestOnReturn - 参见 testOnReturn 参数文档
- setResetConnections - 参见 resetConnections 参数文档
完整示例(HTTP连接池)
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use MakiseCo\Connection\ConnectionConfig;
use MakiseCo\Connection\ConnectionConfigInterface;
use MakiseCo\Connection\ConnectionInterface;
use MakiseCo\Connection\ConnectorInterface;
use MakiseCo\Pool\Exception\BorrowTimeoutException;
use MakiseCo\Pool\Pool;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Http\Client;

use function Swoole\Coroutine\run;

/**
 * Plain value object carrying the body and status code of an HTTP response.
 */
class HttpResponse
{
    public string $content;

    public int $statusCode;

    public function __construct(string $content, int $statusCode)
    {
        $this->content = $content;
        $this->statusCode = $statusCode;
    }
}

/**
 * A poolable connection that can perform HTTP GET requests.
 */
interface HttpConnectionInterface extends ConnectionInterface
{
    /**
     * Perform GET request
     *
     * @param string $path
     *
     * @return HttpResponse
     *
     * @throws RuntimeException on connection problems
     */
    public function get(string $path): HttpResponse;
}

/**
 * Connection settings for the HTTP client: host, port, SSL flag and request timeout.
 */
class HttpConnectionConfig extends ConnectionConfig
{
    private float $timeout;

    private bool $ssl;

    /**
     * @param string    $host
     * @param int       $port
     * @param bool|null $ssl     when null, SSL is enabled only if the port is 443
     * @param float     $timeout value later passed to the client as its 'timeout' setting
     */
    public function __construct(string $host, int $port, ?bool $ssl = null, float $timeout = 30)
    {
        $this->timeout = $timeout;

        if ($ssl === null) {
            $ssl = $port === 443;
        }

        $this->ssl = $ssl;

        // The three remaining parent constructor arguments are not used by this example.
        parent::__construct($host, $port, null, null, null);
    }

    public function getTimeout(): float
    {
        return $this->timeout;
    }

    public function getSsl(): bool
    {
        return $this->ssl;
    }

    public function getConnectionString(): string
    {
        return '';
    }
}

/**
 * Pooled connection wrapping a Swoole coroutine HTTP client.
 */
class HttpConnection implements HttpConnectionInterface
{
    private Client $client;

    private int $lastUsedAt;

    private bool $isClosed = false;

    public function __construct(Client $client)
    {
        $this->client = $client;
        $this->lastUsedAt = time();
    }

    public function __destruct()
    {
        $this->close();
    }

    /**
     * The connection counts as alive until close() has been called on it.
     */
    public function isAlive(): bool
    {
        return !$this->isClosed;
    }

    /**
     * Close the underlying client; calling it again is a no-op.
     */
    public function close(): void
    {
        if ($this->isClosed) {
            return;
        }

        $this->isClosed = true;
        $this->client->close();
    }

    /**
     * Unix timestamp of construction or of the most recent get() call.
     */
    public function getLastUsedAt(): int
    {
        return $this->lastUsedAt;
    }

    /**
     * Intentionally empty: this example keeps no per-session state to reset.
     */
    public function resetSession(): void
    {
    }

    /**
     * Build a connection from the given config (host, port, SSL flag, timeout).
     */
    public static function connect(HttpConnectionConfig $config): self
    {
        $client = new Client($config->getHost(), $config->getPort(), $config->getSsl());
        $client->set(['timeout' => $config->getTimeout()]);

        return new self($client);
    }

    /**
     * @param string $path
     * @return HttpResponse
     *
     * @throws RuntimeException on connection errors
     */
    public function get(string $path): HttpResponse
    {
        $this->lastUsedAt = time();

        $this->client->get($path);

        // Read the status once; it is both validated and returned to the caller.
        $code = $this->client->getStatusCode();
        $this->checkStatusCode($code);

        return new HttpResponse($this->client->getBody(), $code);
    }

    /**
     * Translate the client's special failure status codes into exceptions.
     *
     * @throws RuntimeException when the code equals one of the SWOOLE_HTTP_CLIENT_ESTATUS_* constants checked below
     */
    private function checkStatusCode(int $code): void
    {
        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_CONNECT_FAILED) {
            throw new RuntimeException('Connection failed');
        }

        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_REQUEST_TIMEOUT) {
            throw new RuntimeException('Request timeout');
        }

        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_SERVER_RESET) {
            throw new RuntimeException('Server has closed connection unexpectedly');
        }
    }
}

/**
 * Connector that creates HttpConnection instances for the pool.
 */
class HttpConnector implements ConnectorInterface
{
    /**
     * @param HttpConnectionConfig|ConnectionConfigInterface $config
     * @return HttpConnection
     */
    public function connect(ConnectionConfigInterface $config): HttpConnection
    {
        return HttpConnection::connect($config);
    }
}

/**
 * Connection pool exposing the same get() API as a single HTTP connection.
 */
class HttpPool extends Pool implements HttpConnectionInterface
{
    protected function createDefaultConnector(): HttpConnector
    {
        return new HttpConnector();
    }

    /**
     * {@inheritDoc}
     *
     * @throws BorrowTimeoutException
     */
    public function get(string $path): HttpResponse
    {
        $connection = $this->pop();

        try {
            return $connection->get($path);
        } finally {
            // Hand the connection back whether the request succeeded or threw.
            $this->push($connection);
        }
    }
}

run(static function () {
    $httpPool = new HttpPool(new HttpConnectionConfig('google.com', 80));
    $httpPool->setMaxActive(4);
    $httpPool->init();

    $tasks = [
        '/',
        '/help',
        '/search',
        '/test',
        '/query',
        '/images',
        '/videos',
        '/mail',
    ];

    $ch = new Channel();

    $start = microtime(true);

    // One coroutine per path; each pushes exactly one result object to the channel.
    foreach ($tasks as $task) {
        Coroutine::create(static function (Channel $ch, string $path) use ($httpPool) {
            $result = new class {
                public string $task;

                public ?HttpResponse $success = null;

                public ?Throwable $fail = null;
            };

            $result->task = $path;

            try {
                $result->success = $httpPool->get($path);
                $ch->push($result);
            } catch (Throwable $e) {
                $result->fail = $e;
                $ch->push($result);
            }
        }, $ch, $task);
    }

    // Collect one result per task, in completion order.
    $results = [];
    for ($i = 0, $iMax = \count($tasks); $i < $iMax; $i++) {
        $results[] = $ch->pop();
    }

    $end = microtime(true);

    foreach ($results as $result) {
        if ($result->fail !== null) {
            printf("Task: %s failed with: %s\n", $result->task, $result->fail->getMessage());

            continue;
        }

        printf("Task: %s returned %d status code\n", $result->task, $result->success->statusCode);
    }

    printf("\nResults fetched in %.4f secs\n\n", round($end - $start, 4));

    $stats = $httpPool->getStats();

    // waitCount is zero when no borrower ever had to wait; avoid dividing by zero in that case.
    $averageWait = $stats->waitCount > 0
        ? $stats->waitDuration / $stats->waitCount
        : 0.0;

    printf("Connections limit = %d\n", $stats->maxActive);
    printf("Connections count = %d\n", $stats->totalCount);
    printf("Idle connections = %d\n", $stats->idle);
    printf("Busy connections = %d\n", $stats->inUse);
    printf("Total wait time for an available connections = %f secs\n", $stats->waitDuration);
    printf("Total wait count = %d\n", $stats->waitCount);
    printf("Average wait time per one connection = %f secs\n", $averageWait);

    $httpPool->close();
});
输出为
Task: /test returned 404 status code
Task: / returned 301 status code
Task: /search returned 301 status code
Task: /help returned 404 status code
Task: /query returned 404 status code
Task: /images returned 301 status code
Task: /mail returned 301 status code
Task: /videos returned 404 status code
Results fetched in 0.1483 secs
Connections limit = 4
Connections count = 4
Idle connections = 4
Busy connections = 0
Total wait time for an available connections = 0.380008 secs
Total wait count = 4
Average wait time per one connection = 0.095002 secs