setono/doctrine-orm-batcher

用于处理 Doctrine 中大量集合的库


README

Latest Version on Packagist Software License Build Status

当您需要处理大量实体,并且可能以异步方式时,请使用此库。

为什么我们需要这个库?为什么不直接使用像 Pagerfanta 这样的分页库或 Doctrine 中的常规 批量处理 呢?

嗯,因为当表变得太大时,MySQL 对 LIMIT 和 OFFSET 的处理不是很好。至于 Doctrine 的批量处理能力,这个库非常具有指导性。它会在基于消息的架构中工作得非常好,在这种架构中,大型处理很可能会以异步方式进行。

那么它是如何工作的呢?它使用 seek 方法 来分页结果。

安装

$ composer require setono/doctrine-orm-batcher

用法

有两种方法来获取结果:获取一组 id 或获取一个集合(无论是 id 还是实体)。

id 范围

范围是 id 的上下限。这通常用于异步环境,其中您将发送包含上下限的消息,以便消息的消费者可以轻松地根据这些界限获取相应的实体。

示例

您想处理所有的 Product 实体。该查询构建器可能如下所示

<?php
use Doctrine\ORM\EntityManagerInterface;

/** @var EntityManagerInterface $em */

$qb = $em->createQueryBuilder();
$qb->select('o')->from(Product::class, 'o');

# OR even simpler:
# $qb = $productRepository->createQueryBuilder('o');

现在将查询构建器注入到 id 范围批处理器中,并发送消息

<?php
use Setono\DoctrineORMBatcher\Batch\RangeBatch;
use Setono\DoctrineORMBatcher\Batcher\Collection\ObjectCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Collection\IdCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\NaiveIdRangeBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\IdRangeBatcher;
use Setono\DoctrineORMBatcher\Factory\BatcherFactory;

class ProcessProductBatchMessage
{
    private $batch;
    
    public function __construct(RangeBatch $batch)
    {
        $this->batch = $batch;        
    }
    
    public function getBatch(): RangeBatch
    {
        return $this->batch;
    }
}

$factory = new BatcherFactory(
    ObjectCollectionBatcher::class,
    IdCollectionBatcher::class,
    NaiveIdRangeBatcher::class,
    IdRangeBatcher::class
);
$idRangeBatcher = $factory->createIdRangeBatcher($qb);

/** @var RangeBatch[] $batches */
$batches = $idRangeBatcher->getBatches(50);
foreach ($batches as $batch) {
    $commandBus->dispatch(new ProcessProductBatchMessage($batch));
}

然后在某个地方,某个消费者将接收到这条消息并处理产品

<?php
use Setono\DoctrineORMBatcher\Query\QueryRebuilderInterface;

class ProcessProductBatchMessageHandler
{
    public function __invoke(ProcessProductBatchMessage $message)
    {
        /** @var QueryRebuilderInterface $queryRebuilder */
        $q = $queryRebuilder->rebuild($message->getBatch());
        $products = $q->getResult();
        
        foreach ($products as $product) {
            // process $product
        }
    }
}

这种方法非常快,但如果您有复杂的查询,可能更容易使用集合批处理器。

id 集合

应用于异步处理通过复杂查询选择的集合。

示例

您只想处理启用的 Product 实体。

<?php

use Doctrine\ORM\EntityManagerInterface;
use Setono\DoctrineORMBatcher\Factory\BatcherFactory;
use Setono\DoctrineORMBatcher\Batch\CollectionBatch;
use Setono\DoctrineORMBatcher\Batcher\Collection\ObjectCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Collection\IdCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\NaiveIdRangeBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\IdRangeBatcher;

class ProcessEnabledProductBatchMessage
{
    /** @var CollectionBatch */
    private $batch;
    
    public function __construct(CollectionBatch $batch)
    {
        $this->batch = $batch;        
    }
    
    public function getBatch(): CollectionBatch
    {
        return $this->batch;
    }
}

/** @var EntityManagerInterface $em */
$qb = $em->createQueryBuilder();
$qb->select('o')
    ->from(Product::class, 'o')
    ->where('o.enabled = 1')
;
$factory = new BatcherFactory(
    ObjectCollectionBatcher::class,
    IdCollectionBatcher::class,
    NaiveIdRangeBatcher::class,
    IdRangeBatcher::class
);
$idCollectionBatcher = $factory->createIdCollectionBatcher($qb);

/** @var CollectionBatch[] $batches */
$batches = $idCollectionBatcher->getBatches(50);
foreach ($batches as $batch) {
    $commandBus->dispatch(new ProcessEnabledProductBatchMessage($batch));
}

然后在某个地方,某个消费者将接收到这条消息并处理产品

<?php
use Setono\DoctrineORMBatcher\Query\QueryRebuilderInterface;

class ProcessProductBatchMessageHandler
{
    public function __invoke(ProcessEnabledProductBatchMessage $message)
    {
        /** @var QueryRebuilderInterface $queryRebuilder */
        $q = $queryRebuilder->rebuild($message->getBatch());
        $products = $q->getResult();
        
        foreach ($products as $product) {
            // process $product
        }
    }
}

对象集合

应用于立即处理通过复杂查询选择的对象。

示例

您只想立即处理启用的 Product 实体。

<?php

use Doctrine\ORM\EntityManagerInterface;
use Setono\DoctrineORMBatcher\Factory\BatcherFactory;
use Setono\DoctrineORMBatcher\Batch\CollectionBatch;
use Setono\DoctrineORMBatcher\Batcher\Collection\ObjectCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Collection\IdCollectionBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\NaiveIdRangeBatcher;
use Setono\DoctrineORMBatcher\Batcher\Range\IdRangeBatcher;

/** @var EntityManagerInterface $em */
$qb = $em->createQueryBuilder();
$qb->select('o')
    ->from(Product::class, 'o')
    ->where('o.enabled = 1')
;
$factory = new BatcherFactory(
    ObjectCollectionBatcher::class,
    IdCollectionBatcher::class,
    NaiveIdRangeBatcher::class,
    IdRangeBatcher::class
);
$collectionBatcher = $factory->createObjectCollectionBatcher($qb);

/** @var CollectionBatch[] $batches */
$batches = $collectionBatcher->getBatches(50);
foreach ($batches as $batch) {
    /** @var Product $product */
    foreach ($batch->getCollection() as $product) {
        // process $product
    }
}

框架集成