Swoole 连接池

v1.0.4 2021-10-12 04:37 UTC

This package is auto-updated.

Last update: 2024-09-12 10:45:19 UTC


README

基于 Swoole 的带有均衡器功能实现的连接池

参数

  • int maxActive (默认: 2) - 同时可以从此池分配的最大活动连接数
  • int minActive (默认: 0) - 应始终保持池中的最小建立连接数
  • float maxWaitTime (默认: 5.0) - 池将等待的最大秒数(当没有可用连接时)才抛出异常,返回连接。零值(0.0)将禁用等待超时。
  • float validationInterval (默认: 5.0) - 在空闲连接验证/清洁定时器运行之间休眠的秒数。此值不应低于 1 秒。零值将禁用验证连接定时器。
  • int maxIdleTime (默认: 60) - 连接在池中空闲的最短时间(秒),在此时间内它才有资格被关闭。零值将禁用空闲连接释放。
  • int maxLifeTime (默认: 0) - 连接在池中存在的最大时间(秒),在此时间内它才有资格被关闭。零值将禁用过期连接释放。
  • bool testOnBorrow (默认: true) - 在从池中借用对象之前对其进行验证的指示。如果对象验证失败,则将其从池中移除,并尝试借用另一个。
  • bool testOnReturn (默认: true) - 在将对象返回到池之前对其进行验证的指示
  • bool resetConnections (默认: false) - 当从池中借用连接时,将连接重置为其初始状态

所有这些参数都可以在运行时更改。

API

  • init - 初始化(启动)连接池
  • close - 关闭(停止)连接池
  • getStats - 获取连接池统计信息
  • pop (默认可见性: protected) - 从池中借用连接。在等待空闲连接超时时可能会抛出 BorrowTimeoutException(请参阅 maxWaitTime 参数文档)。当连接池关闭时可能会抛出 PoolIsClosedException
  • push (默认可见性: protected) - 将连接返回到池中

获取器

  • getIdleCount - 获取空闲连接数
  • getTotalCount - 获取由池创建的连接数
  • getMaxActive - 读取 maxActive 参数文档
  • getMinActive - 读取 minActive 参数文档
  • getMaxWaitTime - 读取 maxWaitTime 参数文档
  • getMaxIdleTime - 读取 maxIdleTime 参数文档
  • getMaxLifeTime - 读取 maxLifeTime 参数文档
  • getValidationInterval - 读取 validationInterval 参数文档
  • getTestOnBorrow - 读取 testOnBorrow 参数文档
  • getTestOnReturn - 读取 testOnReturn 参数文档
  • getResetConnections - 读取 resetConnections 参数文档

设置器

  • setMaxActive - 读取 maxActive 参数文档
  • setMinActive - 读取 minActive 参数文档
  • setMaxWaitTime - 读取 maxWaitTime 参数文档
  • setMaxIdleTime - 读取 maxIdleTime 参数文档
  • setMaxLifeTime - 读取 maxLifeTime 参数文档
  • setValidationInterval - 读取 validationInterval 参数文档
  • setTestOnBorrow - 读取 testOnBorrow 参数文档
  • setTestOnReturn - 读取 testOnReturn 参数文档
  • setResetConnections - 读取 resetConnections 参数文档

完整示例(HTTP连接池)

<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use MakiseCo\Connection\ConnectionConfig;
use MakiseCo\Connection\ConnectionConfigInterface;
use MakiseCo\Connection\ConnectionInterface;
use MakiseCo\Connection\ConnectorInterface;
use MakiseCo\Pool\Exception\BorrowTimeoutException;
use MakiseCo\Pool\Pool;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Http\Client;

use function Swoole\Coroutine\run;

class HttpResponse
{
    public string $content;
    public int $statusCode;

    public function __construct(string $content, int $statusCode)
    {
        $this->content = $content;
        $this->statusCode = $statusCode;
    }
}

interface HttpConnectionInterface extends ConnectionInterface
{
    /**
     * Perform GET request
     *
     * @param string $path
     *
     * @return HttpResponse
     *
     * @throws RuntimeException on connection problems
     */
    public function get(string $path): HttpResponse;
}

class HttpConnectionConfig extends ConnectionConfig
{
    private float $timeout;
    private bool $ssl;

    public function __construct(string $host, int $port, ?bool $ssl = null, float $timeout = 30)
    {
        $this->timeout = $timeout;

        if ($ssl === null) {
            $ssl = $port === 443;
        }

        $this->ssl = $ssl;

        parent::__construct($host, $port, null, null, null);
    }

    public function getTimeout(): float
    {
        return $this->timeout;
    }

    public function getSsl(): bool
    {
        return $this->ssl;
    }

    public function getConnectionString(): string
    {
        return '';
    }
}

class HttpConnection implements HttpConnectionInterface
{
    private Client $client;
    private int $lastUsedAt;
    private bool $isClosed = false;

    public function __construct(Client $client)
    {
        $this->client = $client;
        $this->lastUsedAt = time();
    }

    public function __destruct()
    {
        $this->close();
    }

    public function isAlive(): bool
    {
        return !$this->isClosed;
    }

    public function close(): void
    {
        if ($this->isClosed) {
            return;
        }

        $this->isClosed = true;
        $this->client->close();
    }

    public function getLastUsedAt(): int
    {
        return $this->lastUsedAt;
    }

    public function resetSession(): void
    {
    }

    public static function connect(HttpConnectionConfig $config): self
    {
        $client = new Client($config->getHost(), $config->getPort(), $config->getSsl());
        $client->set(['timeout' => $config->getTimeout()]);

        return new self($client);
    }

    /**
     * @param string $path
     * @return HttpResponse
     *
     * @throws RuntimeException on connection errors
     */
    public function get(string $path): HttpResponse
    {
        $this->lastUsedAt = time();

        $this->client->get($path);

        $code = $this->client->getStatusCode();
        $this->checkStatusCode($code);

        return new HttpResponse(
            $this->client->getBody(),
            $this->client->getStatusCode()
        );
    }

    private function checkStatusCode(int $code): void
    {
        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_CONNECT_FAILED) {
            throw new RuntimeException('Connection failed');
        }

        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_REQUEST_TIMEOUT) {
            throw new RuntimeException('Request timeout');
        }

        if ($code === SWOOLE_HTTP_CLIENT_ESTATUS_SERVER_RESET) {
            throw new RuntimeException('Server has closed connection unexpectedly');
        }
    }
}

class HttpConnector implements ConnectorInterface
{
    /**
     * @param HttpConnectionConfig|ConnectionConfigInterface $config
     * @return HttpConnection
     */
    public function connect(ConnectionConfigInterface $config): HttpConnection
    {
        return HttpConnection::connect($config);
    }
}

class HttpPool extends Pool implements HttpConnectionInterface
{
    protected function createDefaultConnector(): HttpConnector
    {
        return new HttpConnector();
    }

    /**
     * {@inheritDoc}
     *
     * @throws BorrowTimeoutException
     */
    public function get(string $path): HttpResponse
    {
        $connection = $this->pop();

        try {
            return $connection->get($path);
        } finally {
            $this->push($connection);
        }
    }
}

run(static function () {
    $httpPool = new HttpPool(new HttpConnectionConfig('google.com', 80));
    $httpPool->setMaxActive(4);
    $httpPool->init();

    $tasks = [
        '/',
        '/help',
        '/search',
        '/test',
        '/query',
        '/images',
        '/videos',
        '/mail',
    ];

    $ch = new Channel();

    $start = microtime(true);

    foreach ($tasks as $task) {
        Coroutine::create(static function (Channel $ch, string $path) use ($httpPool) {
            $result = new class {
                public string $task;
                public ?HttpResponse $success = null;
                public ?Throwable $fail = null;
            };

            $result->task = $path;

            try {
                $result->success = $httpPool->get($path);
                $ch->push($result);
            } catch (Throwable $e) {
                $result->fail = $e;
                $ch->push($result);
            }
        }, $ch, $task);
    }

    $results = [];
    for ($i = 0, $iMax = \count($tasks); $i < $iMax; $i++) {
        $results[] = $ch->pop();
    }

    $end = microtime(true);

    foreach ($results as $result) {
        if ($result->fail !== null) {
            printf("Task: %s failed with: %s\n", $result->task, $result->fail->getMessage());

            continue;
        }

        printf("Task: %s returned %d status code\n", $result->task, $result->success->statusCode);
    }

    printf("\nResults fetched in %.4f secs\n\n", round($end - $start, 4));

    $stats = $httpPool->getStats();

    printf("Connections limit = %d\n", $stats->maxActive);
    printf("Connections count = %d\n", $stats->totalCount);
    printf("Idle connections = %d\n", $stats->idle);
    printf("Busy connections = %d\n", $stats->inUse);

    printf("Total wait time for an available connections = %f secs\n", $stats->waitDuration);
    printf("Total wait count = %d\n", $stats->waitCount);
    printf("Average wait time per one connection = %f secs\n", $stats->waitDuration / $stats->waitCount);

    $httpPool->close();
});

输出为

Task: /test returned 404 status code
Task: / returned 301 status code
Task: /search returned 301 status code
Task: /help returned 404 status code
Task: /query returned 404 status code
Task: /images returned 301 status code
Task: /mail returned 301 status code
Task: /videos returned 404 status code

Results fetched in 0.1483 secs

Connections limit = 4
Connections count = 4
Idle connections = 4
Busy connections = 0
Total wait time for an available connections = 0.380008 secs
Total wait count = 4
Average wait time per one connection = 0.095002 secs