voryx/pgasync

PHP 的异步响应式 PostgreSQL 驱动程序(非阻塞)

2.0.6 2023-08-22 11:58 UTC

This package is auto-updated.

Last update: 2024-09-22 21:14:24 UTC


README

Build Status

PgAsync

PHP 的异步响应式 PostgreSQL 库(非阻塞)

这是什么

这是一个 PHP 的异步 PostgreSQL 库。查询方法返回可观察者,允许异步逐行处理数据(以及在数据上执行其他 Rx 操作)。请参见 Rx.PHP。网络和事件处理由 ReactPHP 处理。

这是一个纯 PHP 实现(使用它不需要 PostgreSQL 扩展)。

示例 - 简单查询

$client = new PgAsync\Client([
    "host" => "127.0.0.1",
    "port" => "5432",
    "user"     => "matt",
    "database" => "matt"
]);

$client->query('SELECT * FROM channel')->subscribe(
    function ($row) {
        var_dump($row);
    },
    function ($e) {
        echo "Failed.\n";
    },
    function () {
        echo "Complete.\n";
    }
);

示例 - 参数化查询

$client = new PgAsync\Client([
     "host" => "127.0.0.1",
     "port" => "5432",
     "user"     => "matt",
     "database" => "matt",
     "auto_disconnect" => true //This option will force the client to disconnect as soon as it completes.  The connection will not be returned to the connection pool.

]);

$client->executeStatement('SELECT * FROM channel WHERE id = $1', ['5'])
    ->subscribe(
        function ($row) {
            var_dump($row);
        },
        function ($e) {
            echo "Failed.\n";
        },
        function () {
            echo "Complete.\n";
        }
    );

示例 - LISTEN/NOTIFY

$client = new PgAsync\Client([
     "host" => "127.0.0.1",
     "port" => "5432",
     "user"     => "matt",
     "database" => "matt"
]);

$client->listen('some_channel')
    ->subscribe(function (\PgAsync\Message\NotificationResponse $message) {
        echo $message->getChannelName() . ': ' . $message->getPayload() . "\n";
    });
    
$client->query("NOTIFY some_channel, 'Hello World'")->subscribe();

安装

使用 composer 安装到您的项目中

安装 pgasync: composer require voryx/pgasync

它能做什么

  • 运行查询(CREATE、UPDATE、INSERT、SELECT、DELETE)
  • 排队命令
  • 异步返回结果(使用 Observables - 您将从数据库服务器逐行获取数据)
  • 预处理语句(作为参数化查询)
  • 连接池(基本池)

它目前还不能做什么

  • 事务(实际上,只需获取一个连接,您就可以在该单个连接上运行事务)

接下来是什么

  • 添加更多测试
  • 事务
  • 征服世界

请注意

这是一个异步库。如果您开始 3 个查询(订阅它们的可观察者)

$client->query("SELECT * FROM table1")->subscribe(...);
$client->query("SELECT * FROM table2")->subscribe(...);
$client->query("SELECT * FROM table3")->subscribe(...);

它们几乎会同时开始(您将在完成之前开始接收所有 3 个查询的结果)。如果您想同时运行 3 个查询,这将非常棒,但如果您有一些需要由其他语句修改的信息的查询,这可能会引起竞争条件

$client->query("INSERT INTO invoices(inv_no, customer_id, amount) VALUES('1234A', 1, 35.75)")->subscribe(...);
$client->query("SELECT SUM(amount) AS balance FROM invoices WHERE customer_id = 1")->subscribe(...);

在上面的情况下,您的余额可能包含也可能不包含第一行插入的发票。

您可以通过使用 Rx concat* 操作符来仅在第一个可观察者完成后启动第二个可观察者来避免这种情况

$insert = $client->query("INSERT INTO invoices(inv_no, customer_id, amount) VALUES('1234A', 1, 35.75)");
$select = $client->query("SELECT SUM(amount) AS balance FROM invoices WHERE customer_id = 1");

$insert
    ->concat($select)
    ->subscribe(...);

测试

我们使用 docker 来运行用于测试的 postgresql 实例。要本地运行,只需安装 docker 并从项目根目录运行以下命令

docker-compose -f docker/docker-compose.yml up -d

如果您需要重置数据库,只需停止 docker 实例并删除 docker/database 目录。使用上述命令重新启动 docker,它将重新初始化数据库。

测试不会更改数据库的最终结构,因此您通常不需要这样做。