repo2/query-reactor

This package is abandoned and no longer maintained. No replacement package was suggested.

Asynchronous and non-blocking MySQL query executor.

v1.1 2015-02-08 23:41 UTC

This package is not auto-updated.

Last update: 2020-09-15 19:25:34 UTC


README


Query Reactor is a non-blocking MySQL query executor. The framework is simple and fast. All you need to do is implement Query or use GenericQuery:

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$controller = new QueryReactor\Controller\PoolingController([
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
]);

$reactor = new QueryReactor\Reactor($driver, $controller);

$expression = QueryBuilder\select('user', ['id', 'name']);

$query = new QueryReactor\Query\GenericQuery(
    $expression,
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    },
    // on error
    function (\Exception $err) {
        throw $err;
    }
);

$reactor->execQuery($query);

$reactor->await();

Table of contents

  1. Installation
  2. Components
  3. Sharding
  4. Limitations

Installation

Install with Composer:

{
    "require": {
        "repo2/query-reactor": "*"
    }
}

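Components

The library requires repo2/query-builder.

Driver

The Driver integrates the framework with a low-level DB API. That API must support non-blocking query execution.

Currently, the framework implements only the mysqli driver.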
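For context, the kind of non-blocking API such a driver needs is what PHP's mysqli extension exposes through MYSQLI_ASYNC, mysqli_poll() and reap_async_query(). The following is a minimal sketch of that raw API and not the library's implementation. The function name runAsync and its timeout parameter are hypothetical, and running it assumes the mysqli extension (with mysqlnd) and a reachable MySQL server.

```php
<?php
// Hypothetical helper (not part of query-reactor): runs one query
// asynchronously on an existing connection and returns all rows.
function runAsync(mysqli $link, string $sql, int $timeoutSeconds = 1): array
{
    // MYSQLI_ASYNC makes query() return immediately instead of blocking.
    if ($link->query($sql, MYSQLI_ASYNC) === false) {
        throw new RuntimeException($link->error);
    }

    do {
        // mysqli_poll() modifies its arrays by reference, so rebuild them each pass.
        $read = $error = $reject = [$link];
        $ready = mysqli_poll($read, $error, $reject, $timeoutSeconds);
        if ($ready === false) {
            throw new RuntimeException('mysqli_poll() failed');
        }
        if (count($reject) > 0) {
            throw new RuntimeException('connection has no pending async query');
        }
        // $ready === 0 means the timeout expired; other work could run here.
    } while ($ready === 0);

    // Collect the result of the finished query.
    $result = $link->reap_async_query();
    if ($result === false) {
        throw new RuntimeException($link->error);
    }
    if ($result === true) {
        return []; // statement without a result set, e.g. UPDATE
    }
    $rows = $result->fetch_all(MYSQLI_ASSOC);
    $result->free();
    return $rows;
}
```

A call could look like runAsync(new mysqli('localhost', 'root', '', 'test'), 'SELECT id, name FROM user'). With several connections, the same mysqli_poll() call can watch all of them at once, which is what makes a reactor-style loop possible.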

Using the driver without Reactor and Controller

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$expression = QueryBuilder\select('user', ['id', 'name']);

$link = $driver->connect(
    ['host' => 'localhost', 'dbname' => 'test'],
    'root',
    'some_secret_passwd'
);

$driver->query($link, $expression);

do {
    list($read, $error) = $driver->poll([$link]);
    foreach ($error as $link) {
        throw $driver->error($link);
    }
    foreach ($read as $link) {
        $result = $driver->read($link);
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }
} while (!$read && !$error);

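Controller

The Controller coordinates driver connections and query execution.

The framework includes PoolingController, which provides basic logic for a connection pool and a query queue.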
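To illustrate the general idea of a connection pool combined with a query queue, here is a minimal, library-independent sketch. The class TinyPool and its methods are hypothetical and do not reflect PoolingController's actual API or internals. Strings stand in for queries, and a counter stands in for real connections.

```php
<?php
// Hypothetical stand-in for a pooling controller: at most $limit connections
// are busy at once; extra queries wait in a FIFO queue.
final class TinyPool
{
    private $limit;
    private $busy = 0;
    private $queue;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
        $this->queue = new SplQueue();
    }

    // Returns the query if a connection is free so the caller can start it;
    // otherwise queues the query and returns null.
    public function submit(string $sql): ?string
    {
        if ($this->busy < $this->limit) {
            $this->busy++;
            return $sql;
        }
        $this->queue->enqueue($sql);
        return null;
    }

    // Called when a connection finishes its query. Returns the next waiting
    // query (the connection stays busy), or null if the connection goes idle.
    public function release(): ?string
    {
        if (!$this->queue->isEmpty()) {
            return $this->queue->dequeue();
        }
        $this->busy--;
        return null;
    }

    public function busy(): int
    {
        return $this->busy;
    }

    public function waiting(): int
    {
        return count($this->queue);
    }
}
```

With a limit of 2, the first two submitted queries start immediately and the third waits until release() hands it a freed connection.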

Query

Query provides the query definition and the result processing.

getExpression

This method returns the query expression.

function Query::getExpression()

Returns ExpressionInterface

resolve

This method processes the query result and can create subqueries.

function Query::resolve(QueryReactor\Result $result)

Returns \Iterator|Query|null
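The three possible return shapes (an iterator of subqueries, a single subquery, or nothing) can be handled uniformly by a caller. The sketch below shows one way to do that; it is an assumption for illustration, not the Reactor's actual code. The helper collectSubqueries is hypothetical, and plain strings stand in for Query objects.

```php
<?php
// Hypothetical helper: turns whatever a resolve()-style callback returned
// into a flat list of follow-up queries.
function collectSubqueries($returned): array
{
    if ($returned === null) {
        return []; // nothing to follow up
    }
    if ($returned instanceof Iterator) {
        // Generators implement Iterator; drop keys to get a plain list.
        return iterator_to_array($returned, false);
    }
    return [$returned]; // a single follow-up query
}

// A generator-based handler, similar in shape to an "on fulfill" callback.
// Calling it returns a Generator that yields one stand-in subquery per row.
$onFulfill = function (array $rows) {
    foreach ($rows as $row) {
        yield 'subquery for user ' . $row['id'];
    }
};
```

Because a function containing yield returns a Generator, a resolve implementation that yields subqueries naturally satisfies the \Iterator case.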

Example of GenericQuery

use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$query = new QueryReactor\Query\GenericQuery(
    // select all users
    QueryBuilder\select('user', ['id', 'name']),
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            // output a user
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            yield new QueryReactor\Query\GenericQuery(
                // update account amount by random value
                QueryBuilder\update('account', ['amount' => mt_rand(10, 100)])
                ->where(
                    QueryBuilder\equal('user_id', $row['id'])
                )
            );
        }
    }
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery($query);
$reactor->await();

reject

This method handles query errors.

function Query::reject(\Exception $error)

Returns void

Sharding

The framework supports sharding via ShardingController.

To start sharding, follow these 3 simple steps:

  1. Implement ShardedQuery

    use Repo2\QueryBuilder;
    use Repo2\QueryReactor;
    
    class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery
    {
        public static $table = 'user';
    
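        public $id;
    
        // The usage in step 3 passes the user id to the constructor.
        public function __construct($id)
        {
            $this->id = $id;
        }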
    
        public function resolve(QueryReactor\Result $result)
        {
            foreach ($result->traverse() as $row) {
                echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            }
        }
    
        public function reject(\Exception $err)
        {
            throw $err;
        }
    
        public function getExpression()
        {
            return QueryBuilder\select(self::$table, ['id', 'name'])
            ->where(QueryBuilder\equal('id', $this->id));
        }
    
        public function getDistributionName()
        {
            return self::$table;
        }
    
        public function getDistributionId()
        {
            return $this->id;
        }
    }
  2. Create your own ShardingService

    use Repo2\QueryReactor;
    
    class SimpleShardingService implements QueryReactor\Sharding\ShardingService
    {
        public static $primary = [
            'host' => 'localhost',
            'username' => 'root',
            'passwd' => '',
            'dbname' => 'test'
        ];
    
        public static $shards = [
            'user' => [
                ['id' => 1, 'dbname' => 'test1'],
                ['id' => 2, 'dbname' => 'test2'],
                ['id' => 3, 'dbname' => 'test3']
            ]
        ];
    
        public function selectGlobal()
        {
            return self::$primary;
        }
    
        public function selectShard($distributionName, $distributionValue)
        {
            $shards = self::$shards[$distributionName];
            $shard = $shards[$distributionValue % count($shards)];
            return $shard + self::$primary;
        }
    }
  3. Initialize the controller

    use Repo2\QueryReactor;
    
    $controller = new QueryReactor\Sharding\ShardingController(
        new SimpleShardingService(),
        QueryReactor\Controller\PoolingController::class
    );
    
    $reactor = new QueryReactor\Reactor($driver, $controller);
    $reactor->execQuery(new UserQuery($userId));
    $reactor->await();
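The shard selection in SimpleShardingService (step 2) relies on two small mechanics: a modulo over the shard list and PHP's array union operator. The standalone sketch below reproduces just that logic with the same data shapes so it can be run without the library. The function name pickShard is hypothetical.

```php
<?php
// Hypothetical standalone version of the selectShard() logic from step 2.
function pickShard(array $shards, array $primary, int $distributionValue): array
{
    // Modulo maps a non-negative integer id onto one of the configured shards.
    $shard = $shards[$distributionValue % count($shards)];
    // Array union keeps the left-hand keys, so the shard's 'dbname' wins
    // and the remaining connection settings come from $primary.
    return $shard + $primary;
}

$primary = [
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
];

$shards = [
    ['id' => 1, 'dbname' => 'test1'],
    ['id' => 2, 'dbname' => 'test2'],
    ['id' => 3, 'dbname' => 'test3']
];

$params = pickShard($shards, $primary, 4); // 4 % 3 = 1, the second shard
echo $params['dbname'], PHP_EOL; // prints "test2"
```

Note that this simple scheme remaps most ids whenever the number of shards changes, and that a negative id would produce a negative index, so it suits fixed shard counts and non-negative ids.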

Limitations

The framework has some limitations:

  • No prepared statements.
  • No "last insert ID" in the result.

Source: https://github.com/Repo2/query-reactor