eltaline/bulk

PDO 批量库

v1.2.5 2021-01-30 18:59 UTC

This package is auto-updated.

Last update: 2024-08-29 04:43:22 UTC


README

PDO 批量库 PDO 批量库

用于轻松使用批量/批量插入的工具,支持 ON CONFLICT/ON DUPLICATE KEY/DO NOTHING/INSERT IGNORE, RETURNING SQL 运算符和简单的数学/连接逻辑。

包含与数据库交互的辅助类。

目前支持 PDO Postgresql/MySQL。

安装

通过 composer 安装

cd /path/to/yourproject
composer require eltaline/bulk 

或通过本地文件安装

cd /path/to/yourproject
wget -O composer.json https://raw.githubusercontent.com/eltaline/bulk/master/composer-local.json
composer install

需求

此库需要 PHP 7.1 或更高版本。

包概述

此包包含几个辅助函数

PgSQL: PSLIns, PSLInsNth, PSLInsUpd, PSLDel. MySQL: MSLIns, MSLInsNth, MSLInsUpd, MSLDel.

这些类基于 PDO 构建,通过在每个查询中执行多个(批量)操作,使用此 API 加速数据库行插入和删除。

测试表方案

在 PostgreSQL/MySQL 中创建测试表。

CREATE TABLE tablename (id integer NOT NULL, name varchar(128), class varchar(128), age integer, height integer, weight integer, PRIMARY KEY(id,name));

重要函数描述

刷新函数非常重要,需要在与队列操作的工作结束时使用。需要写入最后一个队列缓冲区并重置队列缓冲区。

$ins->flush(); // Write last or < queue size, part of data from queue.

队列函数 queue() 或 queuearray() 只能在单个循环中单独使用。

$ins->queue(); // Collect data values until queue size, then write part of data.
$ins->queuearray(); // Collect data values(from simple array with single query values of data) until queue size, then write part of data.

可选函数描述

可以在逻辑中使用可选计数器。

$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();

缓冲区和计数器的重置函数,重置队列缓冲区和计数器。

$ins->reset(); // Resetting only counters
$ins->resetbuf(); // Resetting only buffer
$ins->resetall(); // Resetting buffer and counters

PSLIns & MSLIns

此类利用批量插入到空/临时表中。

  • 实现 INSERT ...
  • 支持的 RETURNING 运算符

要使用它,请使用以下方式创建 PSLInsMSLIns 实例

  • 您的 PDO 连接对象
  • 每个批量查询要执行的插入数
  • 您的表名
  • 要插入的列名
  • 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
  • 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
  • 可选设置 RETURNING 字段或字段或 '*'
  • 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)

PSLInsNth & MSLInsNth

此类利用批量插入,避免表中重复键错误。

  • 实现 ON CONFLICT DO NOTHING 和 INSERT IGNORE
  • 支持的 RETURNING 运算符

要使用它,请使用以下方式创建 PSLInsNthMSLInsNth 实例

  • 您的 PDO 连接对象
  • 每个批量查询要执行的插入数
  • 您的表名
  • 要插入的列名
  • 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
  • 可选设置空数组(不用于 INSERT,但需要为设置 RETURNING 选项留空)
  • 可选设置 RETURNING 字段或字段或 '*'
  • 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)

PSLInsUpd & MSLInsUpd

此类利用批量插入中的简单数学逻辑。

  • 实现 ON CONFLICT (唯一/复合键) DO UPDATE SET ... 和 ON DUPLICATE KEY UPDATE ...
  • 支持的 + - / * 数学运算符(列+列,列-列,列/列,列*列)
  • 支持的 | 连接运算符(列|列|;),其中 ; 是分隔符
  • 支持的 RETURNING 运算符

要使用它,请使用以下方式创建 PSLInsUpdMSLInsUpd 实例

  • 您的 PDO 连接对象
  • 每个批量查询要执行的插入数
  • 您的表名
  • 要插入的列名
  • 表的主键列名(唯一/复合)
  • 更新列的列名或列名格式为 column+column 的列名(更新和值添加)
  • 可选设置 RETURNING 字段或字段或 '*'
  • 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)

PSLDel & MSLDel

此类利用按列值或列值的批量删除。

  • 实现 DELETE ...
  • 支持的 RETURNING 运算符

要使用它,请使用以下方式创建 PSLDelMSLDel 实例

  • 您的 PDO 连接对象
  • 每个批量查询要执行的插入数
  • 您的表名
  • where 子句的列名或列名
  • 可选设置空数组(现在不用于 DELETE,但需要为设置 RETURNING 选项留空)
  • 可选设置空数组(现在不用于 DELETE,但需要为设置 RETURNING 选项留空)
  • 可选设置 RETURNING 字段或字段或 '*'
  • 可选设置 RETURNING 子句的 PDO 获取模式(默认为空)

开始

include(__DIR__ . '/vendor/autoload.php');

use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;

$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);

$data = array();

$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];

事务

您还可以使用事务来加速批量查询。

$pdo->beginTransaction(); // Before loop with queue() or queuearray();
$pdo->commit(); // After loop with queue() or queuearray();

此外,使用try-catch(Exception $e)进行事务回滚。

$pdo->beginTransaction();

try {

    foreach (...) { 
	$ins->queue(...);
	//$ins->queuearray(...);
    }

} catch (Exception $e) {
	$pdo->rollBack();
	throw $ins;
    }

}

try {

    $ins->flush();
    $pdo->commit();

} catch (Exception $e) {

    $pdo->rollBack();
    throw $ins;

}

简单操作

支持以下类

PgSQL: PSLInsPSLInsNthPSLInsUpdPSLDel。MySQL: MSLInsPSLInsNthMSLInsUpdMSLDel

  • 类PSLInsNth & MSLInsNth实现了ON CONFLICT DO NOTHING & INSERT IGNORE子句。

格式:(连接对象,队列大小,表,[值列])

$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);

格式:(连接对象,队列大小,表,[值列])

$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);

格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列])

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);

格式:(连接对象,队列大小,表,[值列])

$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);

完整示例

include(__DIR__ . '/vendor/autoload.php');

use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;

$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);

$data = array();

$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];

$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
//$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight']);
//$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight']);

foreach ($data as $fields) {

    $id = $fields[0];
    $name = $fields[1];
    $class = $fields[2];
    $age = $fields[3];
    $height = $fields[4];
    $weight = $fields[5];

    $ins->queue($id, $name, $class, $age, $height, $weight);
    //$ins->queuearray($fields);

}

//Bulk Write Complete Operation

$res = $ins->flush();

$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();

//Reset function for counters

$ins->reset();

print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");

高级操作

支持以下类

PgSQL: PSLInsUpd。MySQL: MSLInsUpd

  • 支持对具有前值的字段进行数学/连接操作。

格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列])

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight-weight']);

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight*weight']);

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight/weight']);

完整示例

include(__DIR__ . '/vendor/autoload.php');

use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\MSLInsUpd;

$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);

$data = array();

$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class|class|;','weight+weight']);

foreach ($data as $fields) {

    $id = $fields[0];
    $name = $fields[1];
    $class = $fields[2];
    $age = $fields[3];
    $height = $fields[4];
    $weight = $fields[5];

    $ins->queue($id, $name, $class, $age, $height, $weight);
    //$ins->queuearray($fields);

}

//Bulk Write Complete Operation

$res = $ins->flush();

$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();

//Reset function for counters

$ins->reset();

print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");

返回操作

支持以下类

PgSQL: PSLIns, PSLInsNth, PSLInsUpd, PSLDel. MySQL: MSLIns, MSLInsNth, MSLInsUpd, MSLDel.

使用支持类的其中一个与RETURNING操作符。

  • 最后一个参数是PDO获取模式
  • 还可以将高级数学/连接操作与返回操作一起使用

支持的获取模式

PDO::FETCH_BOTH, PDO::FETCH_NUM,
PDO::FETCH_ASSOC, PDO::FETCH_UNIQUE,
PDO::FETCH_GROUP, PDO::FETCH_KEY_PAIR,
PDO::FETCH_GROUP|PDO::FETCH_COLUMN,
PDO::FETCH_OBJ

格式:(连接对象,队列大小,表,[值列],[不使用],[不使用],[返回列],[获取模式])

$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);

格式:(连接对象,队列大小,表,[值列],[不使用],[不使用],[返回列],[获取模式])

$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);

格式:(连接对象,队列大小,表,[值列],[冲突/重复列],[更新列],[返回列],[获取模式])

$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);

格式:(连接对象,队列大小,表,[值列],[现在不使用],[现在不使用],[返回列],[获取模式])

$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);

完整示例

include(__DIR__ . '/vendor/autoload.php');

use PDOBulk\Db\PSLIns;
use PDOBulk\Db\PSLInsNth;
use PDOBulk\Db\PSLInsUpd;
use PDOBulk\Db\PSLDel;
use PDOBulk\Db\MSLIns;
use PDOBulk\Db\MSLInsNth;
use PDOBulk\Db\MSLInsUpd;
use PDOBulk\Db\MSLDel;

$pdo = new PDO(...);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);

$data = array();

$data[] = ['1', 'Mark', 'Soldier' , '24', '175', '85'];
$data[] = ['2', 'Steve', 'Engineer', '36', '190', '95'];
$data[] = ['3', 'Clara', 'Sniper', '18', '180', '57'];

$ins = new PSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new PSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLIns($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLInsNth($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLInsUpd($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','name'], ['class','weight'], ['id,name'], ['PDO::FETCH_ASSOC']);
//$ins = new MSLDel($pdo, 1000, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], [], [], ['id,name'], ['PDO::FETCH_ASSOC']);

$part = array(); // Define array for merge $res by RETURNING data of column or columns.

foreach ($data as $fields) {

    $id = $fields[0];
    $name = $fields[1];
    $class = $fields[2];
    $age = $fields[3];
    $height = $fields[4];
    $weight = $fields[5];

    $res = $ins->queue($id, $name, $class, $age, $height, $weight);
    //$res = $ins->queuearray($fields);

    if(!empty($res)) $part[] = $res;

}

//Bulk Write Complete Operation

$res = $ins->flush();

//Reassemble queue+flush part of RETURNING array

if(!empty($res)) $part[] = $res;

$retarray = array();

foreach($part as $k => $v) {
    foreach($v as $km => $vm) {
        $retarray[]=$vm;
    }
}

$tot = $ins->getTotalOperations();
$aff = $ins->getAffectedRows();

//Reset function for counters

$ins->reset();

print_r($retarray);

print("Total queue operations: " . $tot . "\n");
print("Total affected rows: " . $aff . "\n");

原始SQL

对于设置原始SQL格式的列,您需要在列名后添加后缀:IS_RAW。例如

$ins = new PSLInsUpd($pdo, 1, 'users', ['id', 'name', 'class', 'age', 'height', 'weight'], ['id','COALESCE(name, \'test\'):IS_RAW'], ['name']);
/*
  Prepared sql:
  INSERT INTO users ("id", "name", "class", "age", "height", "weight") VALUES (?, ?, ?, ?, ?, ?)
  ON CONFLICT ("id", COALESCE(name, 'test')) DO UPDATE name = EXCLUDED."name"
*/

性能提示

为了从该库中获得最大性能,您应该

  • 将操作包装在事务中
  • 禁用预处理语句的模拟(PDO::ATTR_EMULATE_PREPARES=false

这两个提示结合可以为您带来高达50%的吞吐量提升,关于每秒插入数量。

建议

当使用事务时,建议不要忘记使用以下辅助程序 - try和catch与throw以及$pdo->rollBack();

限制

在增加每批查询的操作数量时要小心,因为您可能会遇到这些限制。

$ins = new PSL...($pdo, 1000, 'tablename', ['columnname']);

建议在每批查询插入中使用100-1000个查询。

  • PHP的[内存限制]
  • MySQL的[max_allowed_packet]
  • PDO还有一个限制,即每个语句最多65535个查询参数,实际上将每个查询的操作数限制为floor(65535 / number of columns)

允许的最大查询参数为65535。例如,65535 / 10列 ≈ 10922(为每批查询的最大查询数)。

END