jackdpeterson / immutable-state-status-tracker
并行工作处理时状态跟踪的ZF2模块
此包尚未发布版本,可用的信息很少。
README
描述:一个用于在多线程工作环境中跟踪状态的ZF2模块。
- 问题描述:当PHP应用程序需要过渡到处理后台处理时,可能需要跟踪具有多个子步骤的工作负载。实现此类状态跟踪系统可能很复杂,并且会大大增加代码的复杂性。此模块旨在以可预测的方式减少复杂性。
- 解决问题的总体方法:消除管理工人或线程需要提供状态信息时的“共享状态”的复杂性。
为什么不可变性很重要?
- “可变性”的概念(在对象实例化后更改变量的能力)可能导致不可预测的结果——特别是在共享存储系统被使用但工作分布在线程和/或系统之间时。问题可能变成,“哪个线程拥有状态对象的‘正确版本’”?答案可能很快变成,“都不是!”
锁定方案已经存在了一段时间。这不是在重新发明轮子吗?
- 底层数据系统可能支持或可能不支持悲观写锁定。此模块采用一种解决缺乏锁定的方法,即从一开始就不需要锁定。
- 悲观写锁定可能非常慢,并且每当工人需要更新整体状态时都需要管理。即使你有悲观写锁定,这也不一定解决谁拥有正确状态对象的问题。我们又回到了起点!
- 事件推送比追踪单个对象操作错误通过分布式工作负载更容易理解,也更容易诊断。
此模块如何解决管理共享状态的问题?
- 创建一个“单个作业”,其中所有“组件”都预先定义。可以将“组件”视为需要发生的单个任务。如果您使用该模块,则应将此“组件”视为SlmQueue中的“作业”。
- 组件任务(SlmQueue中的作业)在状态更改时推送有关特定组件的状态事件。“组件”和“作业_id”是给定工作负载的输入。
- 作业状态在需要时进行计算。当前状态是从每个组件最新的状态事件汇总推断出来的。
安装
安装此模块的最佳方式是使用composer。如果您愿意/需要,也可以将其克隆到您的模块路径。
require: [
...
"jackdpeterson/immutable-state-status-tracker": "dev-master"
...
]
在config/application.config.php中
return array('modules' =>
array(
...
'ImmutableStateStatusTracker'
...
)
);
处理的概念示例
-
假设我们需要执行大量图片的下载和处理。只有在这些任务完成后,我们的更广泛过程才能进入下一阶段(在这种情况下,通知用户)。
processBigImageCollectionJob(由下载图片和调整图片大小组成)。第二个子任务是所有图片下载和处理完成后通知用户。
任务#1 - 下载一组图片(例如,10,000张图片分为100个图片处理块[与下载ImageCollection任务相关的100个组件])
任务#2 - 一次调整100张图片的大小[与处理Image任务相关的100个组件]。
-
只有调整完成后发送通知电子邮件。
完成#1相对简单……我们将创建一个包含200个组件(每个工作100张图片下载,以及一次批量操作中处理100张图片)的工作。
要完成#2,我们将有一个某种类型的进程来扫描提交的工作的状态。在完成或失败后,可以将监视列表中的工作从列表中删除并采取一些操作。
创建状态跟踪工作
protected $statusTracker;
public function __construct(StatusTrackerServiceInterface $statusTracker) {
$this->statusTracker = $statusTracker;
}
...
public function execute() {
$jobToTrack = $this->statusTracker->createJob(array(
'download_images_[0-9]', // saving space here, but this would literally be 10 entries (one for each respective job)
'resize_and_upload_images_[0-99]', // same story -- except we have 100 entries here (let's assume higher computational complexity)
'notification_email'
));
// divide the list out and submit the 100 downloadAndStoreImageCollection job and pass in the identifier for which task id this is
// submit a status tracking job <-- recurring magic happens here ;-)
$newJob->setContents(array(
'status_job_id' => $job->getJobId(),
'shard_number' => 2,
'collection_of_images_pointer' => 'somethingUseful'
));
}
...
添加第一个状态以标记工作正在进行
protected $statusTracker;
public function __construct(StatusTrackerServiceInterface $statusTracker) {
$this->statusTracker = $statusTracker;
}
public function execute() {
$expectedParams = array(
'collection_of_images_list_pointer' => 'someObjectReference (e.g., REDIS key)',
'shard_id' => 35, // this is the identifier that will be used (effectively the shard key).
'status_job_id' => 'something provided by the previous step'
);
// ADD IN A STATUS EVENT!!
$event = $this->statusTracker->addStatusEvent($job->getJobId(), $job->getComponents()[0], StatusEvent::STATUS_IN_PROGRESS, 'Started downloading collection:' .
$inputParams['shard_id']);
}
检查工作的状态
最后一步由实施者决定;然而,一般来说,人们会调用calculateJob()并处理最后的事件和整体状态来决定最合适的行动方案。
public function execute() {
// adding this because the queue may be very quick and we don't want to introduce lots of repeated jobs for no reason.
sleep(120);
$status = $this->statusTracker->calculateJob()
// Push a new status check event into the queue (recursively run this until it appropriately exits) DANGER!!!.
if ($status->getOverallStatus() == CalculatedJobStatus::STATUS_IN_PROGRESS) {
// maybe check that all downloads are done processing
// --> Do something here (e.g., fire off the notification task)!
}
if ($status->getOverallStatus() == CalculatedJobStatus::STATUS_COMPLETED) {
// fire event and then remove the job?
}
if ($status->getOverallStatus() == CalculatedJobStatus::STATUS_FAILED) {
// maybe send this to a special logging facility to notify devs and collect as much data as possible?
}
}
调用statusTracker->calculateJob()返回一个Entity\Calculatedstatus的实例或抛出异常。
计算状态包含一些变量
- (string) overallStatus,
- (Entity\Job) job,
- (array componentName => Entity\StatusEvent) 这是由createdAt值确定的(最高=最新)。
Job是Entity\Job的实例
常见问题解答(F.A.Q.)
此支持[MySQL、MongoDB、磁盘、网络存储、Redis等]吗?
- 欢迎贡献!现在,话虽如此……默认包含两个适配器
- 磁盘——这是为使用网络存储(NFS、GlusterFS、Ceph等)和本地测试目的而设计的。
- DoctrineORM——这可能是今天最常见用例。
- 使用自己的存储适配器意味着您只需要在存储适配器配置中指定不同的ClassName(请参阅config/ISST_*.php示例)。如果您想提交一个新的,PR欢迎。只需确保它有单元测试支持它!
您推荐哪个模块来处理这些队列?
https://github.com/juriansluiman/SlmQueue - SlmQueue模块
我的计算工作没有以完成事件结束,尽管它是我在数据库、文件系统等中看到的最后一个事件。
这是一个已知问题。状态事件的精度达到秒级(1/60分钟)。如果这在您的代码中频繁发生,那么在提交状态事件之前添加[以下代码]应该可以解决这个问题,直到想出更好的解决方案。
sleep(2);
当我调用$serviceManager->get('immutable-state-status-tracker');时收到了500内部错误...但我不知道出了什么问题!
- 请检查您是否已安装了依赖项(使用composer php composer.phar update)。
- 请检查您是否在config/application.config.php中将ImmutableStateStatusTracker定义为ZF2依赖项。
- 现在,对于StorageAdapter特定的问题...请确保您已将配置(vendor/jackdpeterson/config/ISST_[pick_one].config.php)复制到config/autoload/jobStatus.global.php中。
- 如果您仍然存在问题,请检查您的syslog,因为工厂会将信息发送到那里,包括堆栈跟踪以供进一步诊断。在Ubuntu上运行
$ sudo tail -f /var/log/syslog
我刚刚运行了清理操作,但数据库表仍然是(xyz)吉字节长,但MySQL没有释放空间。怎么回事?
- 这是MySQL的一个已知问题。解决方法是暂停您的工作者(停止注入事件)一段时间,然后重新创建或清理表。重命名表,然后从干净状态重新创建它们。另一种方法是截断isst_status_event表和isst_job表。显然,后者是一种更具破坏性的方法;然而,如果需要,它可以快速释放数据。在这个设计背景下...工作通常是短暂的,不需要超过几天。
问题/评论/贡献?
提交一个问题和/或Pull请求!