eltaline / bulk
PDO 批量库
README
PDO 批量库 PDO 批量库
用于轻松使用批量/批量插入的工具,支持 ON CONFLICT/ON DUPLICATE KEY/DO NOTHING/INSERT IGNORE, RETURNING SQL 运算符和简单的数学/连接逻辑。
包含与数据库交互的辅助类。
目前支持 PDO Postgresql/MySQL。
安装
通过 composer 安装
cd /path/to/yourproject
composer require eltaline/bulk
或通过本地文件安装
cd /path/to/yourproject
wget -O composer.json https://raw.githubusercontent.com/eltaline/bulk/master/composer-local.json
composer install
需求
此库需要 PHP 7.1 或更高版本。
包概述
此包包含几个辅助函数
PgSQL: PSLIns
, PSLInsNth
, PSLInsUpd
, PSLDel
. MySQL: MSLIns
, MSLInsNth
, MSLInsUpd
, MSLDel
.
这些类基于 PDO
构建,通过在每个查询中执行多个(批量)操作,使用此 API 加速数据库行插入和删除。
测试表方案
在 PostgreSQL/MySQL 中创建测试表。
CREATE TABLE tablename (id integer NOT NULL, name varchar(128), class varchar(128), age integer, height integer, weight integer, PRIMARY KEY(id,name));
重要函数描述
刷新函数非常重要,需要在与队列操作的工作结束时使用。需要写入最后一个队列缓冲区并重置队列缓冲区。
$ins->flush(); // Write last or < queue size, part of data from queue.
队列函数 queue() 或 queuearray() 只能在单个循环中单独使用。
$ins->queue(); // Collect data values until queue size, then write part of data. $ins->queuearray(); // Collect data values(from simple array with single query values of data) until queue size, then write part of data.
可选函数描述
可以在逻辑中使用可选计数器。
$tot = $ins->getTotalOperations(); $aff = $ins->getAffectedRows();
缓冲区和计数器的重置函数,重置队列缓冲区和计数器。
$ins->reset(); // Resetting only counters $ins->resetbuf(); // Resetting only buffer $ins->resetall(); // Resetting buffer and counters
PSLIns & MSLIns
此类利用批量插入到空/临时表中。
- 实现 INSERT ...
- 支持的 RETURNING 运算符
要使用它,请使用以下方式创建 PSLIns
或 MSLIns
实例
- 您的
PDO
连接对象 - 每个批量查询要执行的插入数
- 您的表名
- 要插入的列名
- 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
- 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
- 可选设置 RETURNING 字段或字段或 '*'
- 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)
PSLInsNth & MSLInsNth
此类利用批量插入,避免表中重复键错误。
- 实现 ON CONFLICT DO NOTHING 和 INSERT IGNORE
- 支持的 RETURNING 运算符
要使用它,请使用以下方式创建 PSLInsNth
或 MSLInsNth
实例
- 您的
PDO
连接对象 - 每个批量查询要执行的插入数
- 您的表名
- 要插入的列名
- 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
- 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
- 可选设置 RETURNING 字段或字段或 '*'
- 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)
PSLInsUpd & MSLInsUpd
此类利用批量插入中的简单数学逻辑。
- 实现 ON CONFLICT (唯一/复合键) DO UPDATE SET ... 和 ON DUPLICATE KEY UPDATE ...
- 支持的 + - / * 数学运算符(列+列,列-列,列/列,列*列)
- 支持的 | 连接运算符(列|列|;),其中 ; 是分隔符
- 支持的 RETURNING 运算符
要使用它,请使用以下方式创建 PSLInsUpd
或 MSLInsUpd
实例
- 您的
PDO
连接对象 - 每个批量查询要执行的插入数
- 您的表名
- 要插入的列名
- 表的主键列名(唯一/复合)
- 更新列的列名或列名格式为 column+column 的列名(更新和值添加)
- 可选设置 RETURNING 字段或字段或 '*'
- 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)
PSLDel & MSLDel
此类利用按列值或列值的批量删除。
- 实现 DELETE ...
- 支持的 RETURNING 运算符
要使用它,请使用以下方式创建 PSLDel
或 MSLDel
实例
- 您的
PDO
连接对象 - 每个批量查询要执行的插入数
- 您的表名
- where 子句的列名或列名
- 可选设置空数组(现在不用于 DELETE,但需要为设置 RETURNING 选项留空)
- 可选设置空数组(现在不用于 DELETE,但需要为设置 RETURNING 选项留空)
- 可选设置 RETURNING 字段或字段或 '*'
- 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)
开始
include(__DIR__ . '/vendor/autoload.php'); use PDOBulk\Db\PSLIns; use PDOBulk\Db\PSLInsNth; use PDOBulk\Db\PSLInsUpd; use PDOBulk\Db\PSLDel; use PDOBulk\Db\MSLIns; use PDOBulk\Db\MSLInsNth; use PDOBulk\Db\MSLInsUpd; use PDOBulk\Db\MSLDel; $pdo = new PDO(...); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false); $data = array(); $data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85']; $data[] = ['2', 'Steve', 'Engineer', '36', '190', '95']; $data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];
事务
您还可以使用事务来加速批量查询。
$pdo->beginTransaction(); // Before loop with queue() or queuearray(); $pdo->commit(); // After loop with queue() or queuearray();
此外,使用try-catch(Exception $e)进行事务回滚。
$pdo->beginTransaction(); try { foreach (...) { $ins->queue(...); //$ins->queuearray(...); } } catch (Exception $e) { $pdo->rollBack(); throw $ins; } } try { $ins->flush(); $pdo->commit(); } catch (Exception $e) { $pdo->rollBack(); throw $ins; }
简单操作
支持以下类
PgSQL: PSLIns
、PSLInsNth
、PSLInsUpd
、PSLDel
。MySQL: MSLIns
、PSLInsNth
、MSLInsUpd
、MSLDel
。
- 类PSLInsNth & MSLInsNth实现了ON CONFLICT DO NOTHING & INSERT IGNORE子句。
格式:(连接对象,队列大小,表,[值列])
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); $ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
格式:(连接对象,队列大小,表,[值列])
$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); $ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
格式:(连接对象,队列大小,表,[值列])
$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); $ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
完整示例
include(__DIR__ . '/vendor/autoload.php'); use PDOBulk\Db\PSLIns; use PDOBulk\Db\PSLInsNth; use PDOBulk\Db\PSLInsUpd; use PDOBulk\Db\PSLDel; use PDOBulk\Db\MSLIns; use PDOBulk\Db\MSLInsNth; use PDOBulk\Db\MSLInsUpd; use PDOBulk\Db\MSLDel; $pdo = new PDO(...); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false); $data = array(); $data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85']; $data[] = ['2', 'Steve', 'Engineer', '36', '190', '95']; $data[] = ['3', 'Clara', 'Sniper', '18', '180', '57']; $ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); //$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); //$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']); //$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); //$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); //$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); //$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']); //$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']); foreach ($data as $fields) { $id = $fields[0]; $name = $fields[1]; $class = $fields[2]; $age = $fields[3]; $height = $fields[4]; $weight = $fields[5]; $ins->queue($id, $name, $class, $age, $height, $weight); //$ins->queuearray($fields); } //Bulk Write Complete Operation $res = $ins->flush(); $tot = $ins->getTotalOperations(); $aff = $ins->getAffectedRows(); //Reset function for counters $ins->reset(); print("Total queue operations: " . $tot . "\n"); print("Total affected rows: " . $aff . "\n");
高级操作
支持以下类
PgSQL: PSLInsUpd
。MySQL: MSLInsUpd
。
- 支持对具有前值的字段进行数学/连接操作。
格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']); $ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']); $ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']); $ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']);
完整示例
include(__DIR__ . '/vendor/autoload.php'); use PDOBulk\Db\PSLInsUpd; use PDOBulk\Db\MSLInsUpd; $pdo = new PDO(...); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false); $data = array(); $data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85']; $data[] = ['2', 'Steve', 'Engineer', '36', '190', '95']; $data[] = ['3', 'Clara', 'Sniper', '18', '180', '57']; $ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']); //$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']); foreach ($data as $fields) { $id = $fields[0]; $name = $fields[1]; $class = $fields[2]; $age = $fields[3]; $height = $fields[4]; $weight = $fields[5]; $ins->queue($id, $name, $class, $age, $height, $weight); //$ins->queuearray($fields); } //Bulk Write Complete Operation $res = $ins->flush(); $tot = $ins->getTotalOperations(); $aff = $ins->getAffectedRows(); //Reset function for counters $ins->reset(); print("Total queue operations: " . $tot . "\n"); print("Total affected rows: " . $aff . "\n");
返回操作
支持以下类
PgSQL: PSLIns
, PSLInsNth
, PSLInsUpd
, PSLDel
. MySQL: MSLIns
, MSLInsNth
, MSLInsUpd
, MSLDel
.
使用支持类的其中一个与RETURNING操作符。
- 最后一个参数是PDO获取模式
- 还可以将高级数学/连接操作与返回操作一起使用
支持的获取模式
PDO::FETCH_BOTH, PDO::FETCH_NUM,
PDO::FETCH_ASSOC, PDO::FETCH_UNIQUE,
PDO::FETCH_GROUP, PDO::FETCH_KEY_PAIR,
PDO::FETCH_GROUP|PDO::FETCH_COLUMN,
PDO::FETCH_OBJ
格式:(连接对象,队列大小,表,[值列],[不使用],[不使用],[返回列],[获取模式])
$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); $ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
格式:(连接对象,队列大小,表,[值列],[不使用],[不使用],[返回列],[获取模式])
$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); $ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列],[返回列],[获取模式])
$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']); $ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
格式:(连接对象,队列大小,表,[值列],[现在不使用],[现在不使用],[返回列],[获取模式])
$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); $ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
完整示例
include(__DIR__ . '/vendor/autoload.php'); use PDOBulk\Db\PSLIns; use PDOBulk\Db\PSLInsNth; use PDOBulk\Db\PSLInsUpd; use PDOBulk\Db\PSLDel; use PDOBulk\Db\MSLIns; use PDOBulk\Db\MSLInsNth; use PDOBulk\Db\MSLInsUpd; use PDOBulk\Db\MSLDel; $pdo = new PDO(...); $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false); $data = array(); $data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85']; $data[] = ['2', 'Steve', 'Engineer', '36', '190', '95']; $data[] = ['3', 'Clara', 'Sniper', '18', '180', '57']; $ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']); //$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']); $part = array(); // Define array for merge $res by RETURNING data of column or columns. foreach ($data as $fields) { $id = $fields[0]; $name = $fields[1]; $class = $fields[2]; $age = $fields[3]; $height = $fields[4]; $weight = $fields[5]; $res = $ins->queue($id, $name, $class, $age, $height, $weight); //$res = $ins->queuearray($fields); if(!empty($res)) $part[] = $res; } //Bulk Write Complete Operation $res = $ins->flush(); //Reassemble queue+flush part of RETURNING array if(!empty($res)) $part[] = $res; $retarray = array(); foreach($part as $k => $v) { foreach($v as $km => $vm) { $retarray[]=$vm; } } $tot = $ins->getTotalOperations(); $aff = $ins->getAffectedRows(); //Reset function for counters $ins->reset(); print_r($retarray); print("Total queue operations: " . $tot . "\n"); print("Total affected rows: " . $aff . "\n");
原始SQL
对于设置原始SQL格式的列,您需要在列名后添加后缀:IS_RAW。例如
$ins = new PSLInsUpd($pdo, 1, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','COALESCE(name, \'test\'):IS_RAW'], ['name']); /* Prepared sql: INSERT INTO users ("id", "name", "class", "age", "height", "weight") VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT ("id", COALESCE(name, 'test')) DO UPDATE name = EXCLUDED."name" */
性能提示
为了从该库中获得最大性能,您应该
- 将操作包装在事务中
- 禁用预处理语句的模拟(
PDO::ATTR_EMULATE_PREPARES=false
)
这两个提示结合可以为您带来高达50%的吞吐量提升,关于每秒插入数量。
建议
当使用事务时,建议不要忘记使用以下辅助程序 - try和catch与throw以及$pdo->rollBack();
。
限制
在增加每批查询的操作数量时要小心,因为您可能会遇到这些限制。
$ins = new PSL...($pdo, 1000, 'tablename', ['columnname']);
建议在每批查询插入中使用100-1000个查询。
- PHP的[内存限制]
- MySQL的[max_allowed_packet]
- PDO还有一个限制,即每个语句最多65535个查询参数,实际上将每个查询的操作数限制为
floor(65535 / number of columns)
。
允许的最大查询参数为65535。例如,65535 / 10列 ≈ 10922(为每批查询的最大查询数)。