trkisf2/remote-collection-stream

该包最新版本(v0.2)没有提供许可信息。

连续将远程集合以用户定义的块的形式流式传输,并使用生成器返回它们。

v0.2 2017-06-21 14:51 UTC

This package is not auto-updated.

Last update: 2024-09-24 17:54:32 UTC


README

连续将远程集合以用户定义的块的形式流式传输,并使用生成器返回它们。

使用方法

$stream = new Stream();

$campaignsCollectionGenerator = $stream->stream(
    new StreamConfiguration(self::BATCH_FETCH_SIZE_CAMPAIGNS),
    function ($offset, $limit) use ($job) {
        return $this->campaigns->getAll($job, $offset, $limit);
    }
);

用例,实现示例

假设我们想要以块的形式将一个较大的MySQL表(几百万行)完整地流式传输到消息队列中,而不因内存限制而崩溃。

/** @var Chunk $campaignChunk */
foreach ($this->chunksProvider->streamCampaigns($job) as $campaignChunk) {
    try {
        echo $campaignChunk->key() . "\n";

        if ($this->chunksStorage->storeChunk($campaignChunk)) {
            $this->campaignQueue->insert($campaignChunk->serializedValue());
        }
    } catch (StoreChunkException $exception) {
        // Retry the process later
    }
}

ChunksProvider看起来像

/**
 * Continuously yields Campaign Chunk objects.
 *
 * @param Job $job
 *
 * @return \Generator
 */
public function streamCampaigns(Job $job): \Generator
{
    $stream = new Stream();

    $campaignsCollectionGenerator = $stream->stream(
        new StreamConfiguration(self::BATCH_FETCH_SIZE_CAMPAIGNS),
        function ($offset, $limit) use ($job) {
            return $this->campaigns->getAll($job, $offset, $limit);
        }
    );

    foreach ($campaignsCollectionGenerator as $collection) {
        yield new Chunk($collection);
    }
}

并且 $this->campaigns 是一个简单的Repository,它使用 $offset 和 $limit 从MySQL中获取数据。块对象只是一个自定义的DTO。

/**
 * @param Job $job
 * @param int $offset
 * @param int $limit
 *
 * @return LegacyCampaignsCollection
 */
public function getAll(Job $job, int $offset, int $limit) : LegacyCampaignsCollection;

或者它可以是内存中的模拟集合

class DummyCollectionRepository
{
    private $allElements = [];

    /**
     * @param int $elementsCount
     */
    public function __construct(int $elementsCount)
    {
        for ($i = 1; $i <= $elementsCount; $i++) {
            $this->allElements[] = $i;
        }
    }

    /**
     * @param int $offset
     * @param int $limit
     *
     * @return DummyCollection
     */
    public function getAll(int $offset, int $limit): DummyCollection
    {
        $collection      = new DummyCollection();
        $collectionChunk = array_slice($this->allElements, $offset, $limit);

        foreach ($collectionChunk as $element) {
            $collection->addElement($element);
        }

        return $collection;
    }
}

关于实现的一些最后的话

Stream对象使用 可调用对象 获取新的集合,这有点宽松,因为我无法通过接口强制执行可调用对象的参数,尽管这让我感觉像是一个JS开发者,但它给实现方带来了很好的优势+还有一个验证过程,验证传入的参数数量。

能够结合自己的参数调用

function ($offset, $limit) use ($job) {
    return $this->campaigns->getAll($job, $offset, $limit);
}

如果Stream对象在构造函数中需要某种“CollectionRepository”,那么你将不得不在Repository中使用setter、可空属性和其他邪恶的东西,以便使用额外的参数,如查询中的过滤器等。

包含测试。