lapaliv/laravel-bulk-upsert


README

注释

您需要频繁地插入模型集合吗?我经常有这个任务。Laravel 提供了批量插入/更新/上次的解决方案,但它使用的算法与 eloquent 不同。例如,当我们使用 $model->create() 时,我们可以在观察器中补充数据,但当我们使用 Model::query()->insert() 时,它不起作用。

第二个问题出在字段数量上。在将字段传递给构建器的插入/更新/上次的操作之前,我们需要对字段进行对齐。这并不总是方便。

第三个问题出在插入后获取插入行的方式上。Laravel 不会返回它们。当然,如果您只有一个唯一的列,这不会是很大的问题,但在其他情况下,您需要编写相当大的 SQL 查询来选择它们,这将花费一些时间。

正因为如此,我编写了这个库来解决这些问题。使用这个库,您可以将模型集合保存起来,并同时在创建/创建、更新/更新、保存/保存、删除/删除、恢复/恢复、强制删除/强制删除等 eloquent 事件中使用 eloquent 事件。而且您不需要在之前准备字段数量。

简单来说,这个库运行得像这样

foreach($models as $model){
    $model->save();
}

但每次只对数据库进行少量查询。

功能

  • 触发 eloquent 事件创建/更新/上次的集合

    • 创建 / 创建成功,
    • 更新 / 更新成功,
    • 保存 / 保存成功,
    • 删除 / 删除成功,
    • 恢复 / 恢复成功;
    • 强制删除 / 强制删除成功;

    以及一些新的事件

    • 创建多个 / 创建多个成功,
    • 更新多个 / 更新多个成功,
    • 保存多个 / 保存多个成功,
    • 删除多个 / 删除多个成功,
    • 恢复多个 / 恢复多个成功;
    • 强制删除多个 / 强制删除多个成功;
  • 如果您不使用 eloquent 事件,则会在保存到数据库之前自动对传输的字段进行对齐。

  • 从数据库中选择插入的行

1.x 版本的文档

您可以在这里查看 1.x 版本的文档

需求

  • 数据库
    • MySQL: 5.7+
    • PostgreSQL 9.6+
    • SQLite 3.32+
  • PHP: 8.0+
  • Laravel: 8.0+

安装

您可以通过 composer 安装此包

composer require lapaliv/laravel-bulk-upsert

开始使用

在您的模型(s)中使用 Lapaliv\BulkUpsert\Bulkable 特性

namespace App\Models;

use Illuminate\Database\Eloquent\Model;
use Lapaliv\BulkUpsert\Bulkable;

class User extends Model {
    use Bulkable;
}

用法

创建实例

您可以通过四种方式来创建实例

use App\Models\User;

$bulk = User::query()->bulk();
use App\Models\User;

$bulk = User::bulk();
use App\Models\User;
use Lapaliv\BulkUpsert\Bulk;

$bulk = new Bulk(User::class);
use App\Models\User;
use Lapaliv\BulkUpsert\Bulk;

$bulk = new Bulk(new User());

创建/插入

准备数据

$data = [
    ['email' => 'john@example.com', 'name' => 'John'],
    ['email' => 'david@example.com', 'name' => 'David'],
];

您可以简单地创建这些用户。

$bulk->uniqueBy('email')->create($data);

您可以创建并返回这些用户。

$users = $bulk->uniqueBy('email')->createAndReturn($data);

// $users is Illuminate\Database\Eloquent\Collection<App\Models\User>

您可以累积行,直到有足够的行可以写入。

$chunkSize = 100;
$bulk->uniqueBy('email')
    ->chunk($chunkSize);

foreach($data as $item) {
    // The method `createOrAccumulate` will create rows
    // only when it accumulates the `$chunkSize` rows. 
    $bulk->createOrAccumulate($item);
}

// The createAccumulated method will create all accumulated rows,
// even if their quantity is less than `$chunkSize`.
$bulk->createAccumulated();

更新

准备数据

$data = [
    ['id' => 1, 'email' => 'steve@example.com', 'name' => 'Steve'],
    ['id' => 2, 'email' => 'jack@example.com', 'name' => 'Jack'],
];

您可以简单地更新这些用户。

$bulk->update($data);

您可以更新这些用户并返回模型集合。

$users = $bulk->updateAndReturn($data);

// $users is Illuminate\Database\Eloquent\Collection<App\Models\User>

您可以累积行,直到有足够的行可以写入。

$chunkSize = 100;
$bulk->chunk($chunkSize);

foreach($data as $item) {
    // The method `updateOrAccumulate` will update rows
    // only when it accumulates the `$chunkSize` rows. 
    $bulk->updateOrAccumulate($item);
}

// The updateAccumulated method will update all accumulated rows,
// even if their quantity is less than `$chunkSize`.
$bulk->updateAccumulated();

额外方法

有一种额外的方法可以更新数据库中的数据

User::query()
    ->whereIn('id', [1,2,3,4])
    ->selectAndUpdateMany(
        values: ['role' => null],
    );

这种方法从数据库加载数据,并通过查询更新找到的行。

上插(更新和插入)

准备数据

$data = [
    ['email' => 'jacob@example.com', 'name' => 'Jacob'],
    ['id' => 1, 'email' => 'oscar@example.com', 'name' => 'Oscar'],
];

您可以简单地上插这些用户。

$bulk->uniqueBy(['email'])
    ->upsert($data);

您可以上插这些用户并返回模型集合。

$users = $bulk->uniqueBy(['email'])
    ->upsertAndReturn($data);

// $users is Illuminate\Database\Eloquent\Collection<App\Models\User>

您也可以累积行,直到有足够的行可以写入。

$chunkSize = 100;
$bulk->uniqueBy(['email'])
    ->chunk($chunkSize);

foreach($data as $item) {
    // The method `upsertOrAccumulate` will upsert rows
    // only when it accumulates the `$chunkSize` rows. 
    $bulk->upsertOrAccumulate($item);
}

// The upsertAccumulated method will upsert all accumulated rows,
// even if their quantity is less than `$chunkSize`.
$bulk->upsertAccumulated();

强制/软删除(自 v2.1.0 以来)

准备数据

$data = [
    ['email' => 'jacob@example.com', 'name' => 'Jacob'],
    ['id' => 1, 'email' => 'oscar@example.com', 'name' => 'Oscar'],
];
$bulk->create($data);

您可以直接删除这些用户。如果您的模型使用了特性 Illuminate\Database\Eloquent\SoftDeletes,则您的模型将进行软删除,否则将强制删除。

$bulk->uniqueBy(['email'])
    ->delete($data);

或者,您可以强制删除它们。

$bulk->uniqueBy(['email'])
    ->forceDelete($data);

您也可以累积行,直到它们足够多以至于可以删除。

$chunkSize = 100;
$bulk->uniqueBy(['email'])
    ->chunk($chunkSize);

foreach($data as $item) {
    $bulk->deleteOrAccumulate($item);
    // or $bulk->forceDeleteOrAccumulate($item);
}

$bulk->deleteAccumulated();
// or $bulk->forceDeleteAccumulated();

监听器

事件顺序

回调函数调用的顺序是

  • onSaving
  • onCreatingonUpdating
  • onDeleting
  • onForceDeleting
  • onRestoring
  • onSavingMany
  • onCreatingManyonUpdatingMany
  • onDeletingMany
  • onForceDeletingMany
  • onRestoringMany
  • onCreatedonUpdated
  • onDeleted
  • onForceDeleted
  • onRestored
  • onCreatedManyonUpdatedMany
  • onDeletedMany
  • onForceDeletedMany
  • onRestoredMany
  • onSavedMany

如何监听事件

您可以通过三种方式监听库中的事件

如何监听事件:批量回调

use App\Models\User;
use Lapaliv\BulkUpsert\Collections\BulkRows;

$bulk
    // The callback runs before creating.
    // If your callback returns `false` then the model won't be created
    // and deleted (if `deleted_at` was filled in)
    ->onCreating(fn(User $user) => /* ... */)
    
    // The callback runs after creating.
    ->onCreated(fn(User $user) => /* ... */)
    
    // The callback runs before updating.
    // If your callback returns `false` then the model won't be updated,
    // deleted (if `deleted_at` was filled in) and restored.
    ->onUpdating(fn(User $user) => /* ... */)
    
    // The callback runs after updating.
    ->onUpdated(fn(User $user) => /* ... */)
    
    // The callback runs before deleting.
    // If your callback returns `false` then the model won't be deleted,
    // but it doesn't affect the upserting.
    ->onDeleting(fn(User $user) => /* ... */)
    
    // The callback runs before force deleting.
    // If your callback returns `false` then the model won't be deleted,
    ->onForceDeleting(fn(User $user) => /* ... */)
    
    // The callback runs after deleting.
    ->onDeleted(fn(User $user) => /* ... */)
    
    // The callback runs after force deleting.
    ->onForceDeleted(fn(User $user) => /* ... */)
    
    // The callback runs before upserting.
    ->onSaving(fn(User $user) => /* ... */)
    
    // The callback runs after upserting.
    ->onSaved(fn(User $user) => /* ... */)

    // Runs before creating.
    // If the callback returns `false` then these models won't be created.
    ->onCreatingMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after creating.
    ->onCreatedMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs before updating.
    // If the callback returns `false` then these models won't be updated.
    ->onUpdatingMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after updating.
    ->onUpdatedMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs before deleting.
    // If the callback returns `false` then these models won't be deleted,
    // but it doesn't affect the upserting.
    ->onDeletingMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs before force deleting.
    // If the callback returns `false` then these models won't be deleted.
    ->onForceDeletingMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after deleting.
    ->onDeletedMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after force deleting.
    ->onForceDeletedMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs before restoring.
    // If the callback returns `false` then these models won't be restored,
    // but it doesn't affect the upserting.
    ->onRestoringMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after restoring.
    ->onRestoredMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs before upserting.
    // If the callback returns `false` then these models won't be upserting,
    ->onSavingMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

    // Runs after upserting.
    ->onSavedMany(fn(Collection $users, BulkRows $bulkRows) => /* .. */)

如何监听事件:模型回调

您还可以使用模型回调。它们几乎相同。只需移除前缀 on。例如

use App\Models\User;
use Lapaliv\BulkUpsert\Collections\BulkRows;

User::saving(fn(User $user) => /* .. */);
User::savingMany(
    fn(Collection $users, BulkRows $bulkRows) => /* .. */
);

如何监听事件:观察者

您还可以使用观察者。例如

namespace App\Observers;

use App\Models\User;
use Lapaliv\BulkUpsert\Collections\BulkRows;

class UserObserver {
    public function creating(User $user) {
        // ..
    }
    
    public function creatingMany(Collection $users, BulkRows $bulkRows) {
        // ..
    }
}

示例

您可以观察过程。库支持基本的 eloquent 事件和一些额外事件,这些事件为您提供了访问模型集合的权限。接受类型为 BulkRows 的额外参数的监听器。这是一个包含您的原始数据(属性 original)、模型(属性 model)以及用于保存的唯一属性(属性 unique)的 Lapaliv\BulkUpsert\Entities\BulkRow 类的集合。如果,例如,您在数据中有些关系,这可以帮助您继续保存。

让我们看看示例

namespace App\Models;

use Illuminate\Database\Eloquent\Model;

class Comment extends Model {
    // ...
    protected $fillable = ['user_id', 'text', 'uuid'];
    // ...
}

class User extends Model {
    // ...
    protected $fillable = ['email', 'name'];
    // ...
}
namespace App\Observers;

use Illuminate\Database\Eloquent\Collection;
use Lapaliv\BulkUpsert\Collections\BulkRows;
use Lapaliv\BulkUpsert\Entities\BulkRow;

class UserObserver {
    public function savedMany(Collection $users, BulkRows $bulkRows): void {
        $rawComments = [];
        
        $bulkRows->each(
            function(BulkRow $bulkRow) use(&$rawComments): void {
                $bulkRow->original['user_id'] = $bulkRow->model->id;
                $rawComments[] = $bulkRow->original;
            }
        )
        
        Comment::query()
            ->bulk()
            ->uniqueBy(['uuid'])
            ->upsert($rawComments);
    }
}
$data = [
    [
        'id' => 1,
        'email' => 'tom@example.com',
        'name' => 'Tom',
        'comments' => [
            ['text' => 'First comment', 'uuid' => 'c0753127-45af-43ac-9664-60b5b2dbf0e5'],
            ['text' => 'Second comment', 'uuid' => 'e95d7e15-1e9f-44c5-9978-7641a3792669'],
        ],
    ]
];

User::query()
    ->uniqueBy(['email'])
    ->upsert($data);

在这个示例中,您有一个链。在执行插入更新操作后,库将运行 UserObserver::savedMany(),其中代码将准备评论并执行插入更新操作。

API

use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Collection;

class Bulk {

    public function __construct(Model|string $model);
    
    /**
     * Sets the chunk size.
     */
    public function chunk(int $size = 100): static;
    
    /**
     * Defines the unique attributes of the rows.
     * @param callable|string|string[]|string[][] $attributes
     */
    public function uniqueBy(string|array|callable $attributes): static;
    
    /**
     * Defines the alternatives of the unique attributes.
     * @param callable|string|string[]|string[][] $attributes
     */
    public function orUniqueBy(string|array|callable $attributes): static;
    
    /**
     * Sets enabled events.
     * @param string[] $events
     */
    public function setEvents(array $events): static;
    
    /**
     * Disables the next events: `saved`, `created`, `updated`, `deleted`, `restored`.
     */
    public function disableModelEndEvents(): static;
    
    /**
     * Disables the specified events or the all if `$events` equals `null`.
     * @param string[]|null $events
     */
    public function disableEvents(array $events = null): static;
    
    /**
     * Disables the specified event.
     */
    public function disableEvent(string $event): static;
    
    /**
     * Enables the specified events or the all if `$events` is empty.
     * @param string[]|null $events
     */
    public function enableEvents(array $events = null): static;
    
    /**
     * Enables the specified event.
     */
    public function enableEvent(string $event): static;
    
    /**
     * Sets the list of attribute names which should update.
     * @param string[] $attributes
     */
    public function updateOnly(array $attributes): static;
    
    /**
     * Sets the list of attribute names which shouldn't update.
     * @param string[] $attributes
     */
    public function updateAllExcept(array $attributes): static;
    
    /**
     * Enables soft deleted rows into select.
     * 
     * @return $this
     */
    public function withTrashed(): static;
    
    /**
     * Creates the rows.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function create(iterable $rows, bool $ignoreConflicts = false): static;
    
    /**
     * Creates the rows if their quantity is greater than or equal to the chunk size.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function createOrAccumulate(iterable $rows, bool $ignoreConflicts = false): static;
    
    /**
     * Creates the rows and returns them.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @param string[] $columns columns that should be selected from the database
     * @return Collection<Model>
     * @throws BulkException
     */
    public function createAndReturn(iterable $rows, array $columns = ['*'], bool $ignoreConflicts = false): Collection;
    
    /**
     * Creates the all accumulated rows.
     * @throws BulkException
     */
    public function createAccumulated(): static;
    
    /**
     * Updates the rows.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function update(iterable $rows): static;
    
    /**
     * Updates the rows if their quantity is greater than or equal to the chunk size.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function updateOrAccumulate(iterable $rows): static;
    
    /**
     * Updates the rows and returns them.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @param string[] $columns columns that should be selected from the database
     * @return Collection<Model>
     * @throws BulkException
     */
    public function updateAndReturn(iterable $rows, array $columns = ['*']): Collection;
    
    /**
     * Updates the all accumulated rows.
     * @throws BulkException
     */
    public function updateAccumulated(): static;
    
    /**
     * Upserts the rows.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function upsert(iterable $rows): static;
    
    /**
     * Upserts the rows if their quantity is greater than or equal to the chunk size.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     */
    public function upsertOrAccumulate(iterable $rows): static;
    
    /**
     * Upserts the rows and returns them.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @param string[] $columns columns that should be selected from the database
     * @return Collection<Model>
     * @throws BulkException
     */
    public function upsertAndReturn(iterable $rows, array $columns = ['*']): Collection;
    
    /**
     * Upserts the all accumulated rows.
     * @throws BulkException
     */
    public function upsertAccumulated(): static;
    
    /**
     * Deletes the rows.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     * @since 2.1.0
     */
    public function delete(iterable $rows): static;
    
    /**
     * Deletes the rows if their quantity is greater than or equal to the chunk size.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     * @since 2.1.0
     */
    public function deleteOrAccumulate(iterable $rows): static;
    
    /**
     * Force deletes the rows.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     * @since 2.1.0
     */
    public function forceDelete(iterable $rows): static;
    
    /**
     * Force deletes the rows if their quantity is greater than or equal to the chunk size.
     * @param iterable<int|string, Model|stdClass|array<string, mixed>|object> $rows
     * @throws BulkException
     * @since 2.1.0
     */
    public function forceDeleteOrAccumulate(iterable $rows): static;
    
    /**
     * Deletes the all accumulated rows.
     *
     * @throws BulkException
     * @since 2.1.0
     */
    public function deleteAccumulated(): static;
    
    /**
     * Deletes the all accumulated rows.
     *
     * @throws BulkException
     * @since 2.1.0
     */
    public function forceDeleteAccumulated(): static;
    
    /**
     * Saves the all accumulated rows.
     * @throws BulkException
     */
    public function saveAccumulated(): static;   
}
namespace Lapaliv\BulkUpsert\Entities;

class BulkRow {
    /**
     * The upserting/upserted model.
     * @var Model 
     */
    public Model $model;
    
    /**
     * The original item from `iterable rows`. 
     * @var array|object|stdClass|Model 
     */
    public mixed $original;
    
    /**
     * Unique fields which were used for upserting.
     * @var string[] 
     */
    public array $unique;
}

待办事项

  • 批量恢复
  • 批量触摸
  • 批量更新而不更新时间戳
  • 支持 DB::raw() 作为值
  • 支持自定义数据库驱动
  • 更新行并返回更新数量

测试

您可以检查 操作 或在您的笔记本电脑上运行它

git clone https://github.com/lapaliv/laravel-bulk-upsert.git
cp .env.example .env
docker-compose up -d
./vendor/bin/phpunit