trkisf2 / remote-collection-stream
该包最新版本(v0.2)没有提供许可信息。
连续将远程集合以用户定义的块的形式流式传输,并使用生成器返回它们。
v0.2
2017-06-21 14:51 UTC
Requires
- php: >=7.1
Requires (Dev)
- phpunit/phpunit: ^6.2
This package is not auto-updated.
Last update: 2024-09-24 17:54:32 UTC
README
连续将远程集合以用户定义的块的形式流式传输,并使用生成器返回它们。
使用方法
$stream = new Stream(); $campaignsCollectionGenerator = $stream->stream( new StreamConfiguration(self::BATCH_FETCH_SIZE_CAMPAIGNS), function ($offset, $limit) use ($job) { return $this->campaigns->getAll($job, $offset, $limit); } );
用例,实现示例
假设我们想要以块的形式将一个较大的MySQL表(几百万行)完整地流式传输到消息队列中,而不因内存限制而崩溃。
/** @var Chunk $campaignChunk */ foreach ($this->chunksProvider->streamCampaigns($job) as $campaignChunk) { try { echo $campaignChunk->key() . "\n"; if ($this->chunksStorage->storeChunk($campaignChunk)) { $this->campaignQueue->insert($campaignChunk->serializedValue()); } } catch (StoreChunkException $exception) { // Retry the process later } }
ChunksProvider看起来像
/** * Continuously yields Campaign Chunk objects. * * @param Job $job * * @return \Generator */ public function streamCampaigns(Job $job): \Generator { $stream = new Stream(); $campaignsCollectionGenerator = $stream->stream( new StreamConfiguration(self::BATCH_FETCH_SIZE_CAMPAIGNS), function ($offset, $limit) use ($job) { return $this->campaigns->getAll($job, $offset, $limit); } ); foreach ($campaignsCollectionGenerator as $collection) { yield new Chunk($collection); } }
并且 $this->campaigns 是一个简单的Repository,它使用 $offset 和 $limit 从MySQL中获取数据。块对象只是一个自定义的DTO。
/** * @param Job $job * @param int $offset * @param int $limit * * @return LegacyCampaignsCollection */ public function getAll(Job $job, int $offset, int $limit) : LegacyCampaignsCollection;
或者它可以是内存中的模拟集合
class DummyCollectionRepository { private $allElements = []; /** * @param int $elementsCount */ public function __construct(int $elementsCount) { for ($i = 1; $i <= $elementsCount; $i++) { $this->allElements[] = $i; } } /** * @param int $offset * @param int $limit * * @return DummyCollection */ public function getAll(int $offset, int $limit): DummyCollection { $collection = new DummyCollection(); $collectionChunk = array_slice($this->allElements, $offset, $limit); foreach ($collectionChunk as $element) { $collection->addElement($element); } return $collection; } }
关于实现的一些最后的话
Stream对象使用 可调用对象 获取新的集合,这有点宽松,因为我无法通过接口强制执行可调用对象的参数,尽管这让我感觉像是一个JS开发者,但它给实现方带来了很好的优势+还有一个验证过程,验证传入的参数数量。
能够结合自己的参数调用
function ($offset, $limit) use ($job) { return $this->campaigns->getAll($job, $offset, $limit); }
如果Stream对象在构造函数中需要某种“CollectionRepository”,那么你将不得不在Repository中使用setter、可空属性和其他邪恶的东西,以便使用额外的参数,如查询中的过滤器等。
包含测试。