repo2 / query-reactor
此包已被废弃,不再维护。未建议替代包。
异步和非阻塞MySQL查询执行器。
v1.1
2015-02-08 23:41 UTC
Requires
- php: >=5.4
- psr/log: *
- repo2/query-builder: *
This package is not auto-updated.
Last update: 2020-09-15 19:25:34 UTC
README
Query Reactor是一个非阻塞MySQL查询执行器。该框架简单且快速。您只需实现Query
或使用GenericQuery
。
use Psr\Log\NullLogger; use Repo2\QueryBuilder; use Repo2\QueryReactor; $driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger()); $controller = new QueryReactor\Controller\PoolingController([ 'host' => 'localhost', 'username' => 'root', 'passwd' => '', 'dbname' => 'test' ]); $reactor = new QueryReactor\Reactor($driver, $controller); $expression = QueryBuilder\select('user', ['id', 'name']); $query = new QueryReactor\Query\GenericQuery( $expression, // on fulfill function (QueryReactor\Result $result) { foreach ($result->traverse() as $row) { echo $row['id'], ' -> ', $row['name'], PHP_EOL; } }, // on error function (\Exception $err) { throw $err; } ); $reactor->execQuery($query); $reactor->await();
目录
安装
使用Composer安装
{ "require": { "repo2/query-reactor": "*" } }
组件
该库需要repo2/query-builder。
驱动
Driver
提供与底层DB API的集成。API必须支持非阻塞查询执行。
当前框架仅实现了
mysqli
驱动。
无Reactor和Controller的驱动使用
use Psr\Log\NullLogger; use Repo2\QueryBuilder; use Repo2\QueryReactor; $driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger()); $expression = QueryBuilder\select('user', ['id', 'name']); $link = $driver->connect( ['host' => 'localhost', 'dbname' => 'test'], 'root', 'some_secret_passwd' ); $driver->query($link, $expression); do { list($read, $error) = $driver->poll([$link]); foreach ($error as $link) { throw $driver->error($link); } foreach ($read as $link) { $result = $driver->read($link); foreach ($result->traverse() as $row) { echo $row['id'], ' -> ', $row['name'], PHP_EOL; } } } while (!$read && !$error);
控制器
Controller
提供驱动连接和查询执行的协调。
框架包括PoolingController
。控制器提供了连接池和查询队列的基本逻辑。
查询
Query
提供查询定义和结果处理。
getExpression
此方法返回查询表达式。
function Query::getExpression()
resolve
此方法处理查询结果,并可创建子查询。
function Query::resolve(QueryReactor\Result $result)
返回
\Iterator
、Query
或null
GenericQuery的示例
use Repo2\QueryBuilder; use Repo2\QueryReactor; $query = new QueryReactor\Query\GenericQuery( // select all users QueryBuilder\select('user', ['id', 'name']), // on fulfill function (QueryReactor\Result $result) { foreach ($result->traverse() as $row) { // output a user echo $row['id'], ' -> ', $row['name'], PHP_EOL; yield new QueryReactor\Query\GenericQuery( // update account amount by random value QueryBuilder\update('account', ['amount' => mt_rand(10, 100)]) ->where( QueryBuilder\equal('user_id', $row['id']) ) ) } } ); $reactor = new QueryReactor\Reactor($driver, $controller); $reactor->execQuery($query); $reactor->await();
reject
此方法处理查询错误。
function Query::reject(\Exception $error)
返回void
分片
框架支持通过ShardingController
进行分片。
要开始分片,请执行以下3个简单步骤
-
use Repo2\QueryBuilder; use Repo2\QueryReactor; class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery { public static $table = 'user'; public $id; public function resolve(QueryReactor\Result $result) { foreach ($result->traverse() as $row) { echo $row['id'], ' -> ', $row['name'], PHP_EOL; } } public function reject(\Exception $err) { throw $err; } public function getExpression() { return QueryBuilder\select(self::$table, ['id', 'name']) ->where(QueryBuilder\equal('id', $this->id)); } public function getDistributionName() { return self::$table; } public function getDistributionId() { return $this->id; } }
-
创建自己的
ShardingService
use Repo2\QueryReactor; class SimpleShardingService implements QueryReactor\Sharding\ShardingService { public static $primary = [ 'host' => 'localhost', 'username' => 'root', 'passwd' => '', 'dbname' => 'test' ]; public static $shards = [ 'user' => [ ['id' => 1, 'dbname' => 'test1'], ['id' => 2, 'dbname' => 'test2'], ['id' => 3, 'dbname' => 'test3'] ] ]; public function selectGlobal() { return self::$primary; } public function selectShard($distributionName, $distributionValue) { $shards = self::$shards[$distributionName]; $shard = $shards[$distributionValue % count($shards)]; return $shard + self::$primary; } }
-
初始化控制器
use Repo2\QueryReactor; $controller = new QueryReactor\Sharding\ShardingController( new SimpleShardingService(), QueryReactor\Controller\PoolingController::class ); $reactor = new QueryReactor\Reactor($driver, $controller); $reactor->execQuery(new UserQuery($userId)); $reactor->await();
限制条件
该框架有一些限制
- 没有预编译语句。
- 结果中没有“最后插入ID”。