fsc/batch

一个包含用于批量处理的类的库。

0.2.0 2013-02-13 23:37 UTC

This package is auto-updated.

Last update: 2024-09-12 03:56:12 UTC


README

Build Status

PHP 5.3库,帮助您运行大规模批量操作。

它需要一个PagerfantaAdapterInterface(doctrine orm、propel、array、solarium等...可用)作为数据源,将数据以批量大小为单位的切片获取,然后通过调用您提供的回调函数对每个上下文进行批量处理。

<?php

use FSC\Batch\Batch;
use FSC\Batch\Event\ExecuteEvent;

$usersAdapter = ...;
$solrIndex = ...;

$batch = new Batch($usersAdapter, function (ExecuteEvent $event) use ($solrIndexer) {
    $solrIndexer->indexUser($event->getContext());
});
$batch->run(10); // Execute in batch of 10

功能

  • 使用symfony2事件调度器的事件系统,可以轻松添加行为

包含的监听器

  • ProgressEventListener:在每个批次的末尾显示进度、已用时间和估计剩余时间。
  • DoctrineEventListener:在每个批次的末尾
    • flush()对象管理器,将所有内容同时保存(在某些情况下可能提高性能)
    • clear()对象管理器,以避免内存泄漏

额外功能

  • 为doctrine ORM添加一个PagerfantaAdapter,该适配器使用id的范围查询而不是LIMIT/OFFSET遍历表。随着OFFSET的增长,LIMIT/OFFSET会降低查询时间,而范围查询的时间保持一致。

请注意,这个库是一个WIP,需要更多的测试。

示例

简单批量处理

<?php

require_once __DIR__.'/../vendor/autoload.php';

use FSC\Batch\Batch;
use FSC\Batch\Event\ExecuteEvent;
use FSC\Batch\EventListener\ProgressEventListener;
use Pagerfanta\Adapter\ArrayAdapter;
use Symfony\Component\Console\Output\ConsoleOutput;

$passwords = range(1, 100);
$hashes = array();

$batch = new Batch(new ArrayAdapter($passwords), function (ExecuteEvent $event) use (&$hashes) {
    $hashes[] = crypt($event->getContext(), '$2a$10$');
});
$batch->getEventDispatcher()->addSubscriber(new ProgressEventListener(new ConsoleOutput()));

$batch->run(10);

将输出

$ php examples/array_closure.php
Batch run start. 100 jobs [Mem: 0.52 MB]
[ 10/100] [ 10.00 %] ([Δ 0.83 sec] - [Elapsed 0.83 sec] - [Remaining   7 secs]) [Mem:  0.52 MB]
[ 20/100] [ 20.00 %] ([Δ 0.83 sec] - [Elapsed    1 sec] - [Remaining   6 secs]) [Mem:  0.52 MB]
[ 30/100] [ 30.00 %] ([Δ 0.83 sec] - [Elapsed   2 secs] - [Remaining   5 secs]) [Mem:  0.52 MB]
[ 40/100] [ 40.00 %] ([Δ 0.83 sec] - [Elapsed   3 secs] - [Remaining   4 secs]) [Mem:  0.52 MB]
[ 50/100] [ 50.00 %] ([Δ 0.82 sec] - [Elapsed   4 secs] - [Remaining   4 secs]) [Mem:  0.52 MB]
[ 60/100] [ 60.00 %] ([Δ 0.83 sec] - [Elapsed   4 secs] - [Remaining   3 secs]) [Mem:  0.52 MB]
[ 70/100] [ 70.00 %] ([Δ 0.83 sec] - [Elapsed   5 secs] - [Remaining   2 secs]) [Mem:  0.79 MB]
[ 80/100] [ 80.00 %] ([Δ 0.83 sec] - [Elapsed   6 secs] - [Remaining    1 sec]) [Mem:  0.79 MB]
[ 90/100] [ 90.00 %] ([Δ 0.83 sec] - [Elapsed   7 secs] - [Remaining 0.83 sec]) [Mem:  0.79 MB]
[100/100] [100.00 %] ([Δ 0.82 sec] - [Elapsed   8 secs] - [Remaining    0 sec]) [Mem:  0.79 MB]
Batch run end. took 0.82 sec [Mem: 0.79 MB]

在symfony命令中使用Doctrine ORM批量处理

此示例使用DoctrineEventListener,在每个批次的末尾flush(保存所有内容)并clear对象管理器(避免内存问题)。我们还使用了一个自定义的PagerfantaAdapter:DoctrineBatchAdapter,该适配器使用范围查询(id > 100 AND id < 200)而不是LIMIT/OFFSET,以避免随着OFFSET的增长而增加查询时间。

<?php

use FSC\Batch\Batch;
use FSC\Batch\Command\BatchCommand;
use FSC\Batch\EventListener\DoctrineEventListener;
use FSC\Batch\Event\ContextEvent;
use Pagerfanta\Adapter\DoctrineORMAdapter;

class UserIndexSolrCommand extends BatchCommand
{
    protected function createBatch()
    {
        $em = $this->getContainer()->get('doctrine')->getEntityManager();
        $qb = $em->getRepository('User')->createQueryBuilder('u');

        $batch = new Batch(new DoctrineORMAdapter($qb), array($this, 'indexUser'));
        $batch->getEventDispatcher()->addSubscriber(new DoctrineEventListener($em));

        return $batch;
    }

    public function indexUser(ContextEvent $event)
    {
        $user = $event->getContext();
        // Index this user!
    }
}