pdffiller / qless-php
qless 的 PHP 绑定
Requires
- php: >=7.1 < 8.3
- ext-json: *
- ext-pcntl: *
- ext-pcre: *
- ext-posix: *
- ext-sockets: *
- monolog/monolog: ^1.23 || ^2.0
- predis/predis: ^1.1.10
- psr/log: ^1 || ^2 || ^3
- ramsey/uuid: ^3.7 || ^4
- seld/signal-handler: 1.1.*
Requires (Dev)
- phpstan/phpstan: ^0.12.87
- phpunit/phpunit: ^7.5 || ^8.5 || ^9.0
- squizlabs/php_codesniffer: 3.*
Suggests
- ext-proctitle: Needed to support setting process title as an unprivileged user on macOS
Conflicts
- v3.22.0
- v3.21.0
- v3.20.0
- v3.19.0
- v3.18.0
- v3.17.2
- v3.17.1
- v3.17.0
- v3.16.0
- v3.15.0
- v3.14.2
- v3.14.1
- v3.14.0
- v3.13.0
- v3.12.1
- v3.12.0
- v3.11.1
- v3.11.0
- v3.10.1
- v3.10.0
- v3.9.1
- v3.9.0
- v3.8.3
- v3.8.2
- v3.8.1
- v3.8.0
- v3.7.0
- v3.6.1
- v3.6.0
- v3.5.0
- v3.4.0
- v3.3.0
- v3.2.0
- v3.1.0
- v3.0.0
- v2.2.1
- v2.2.0
- v2.1.0
- dev-master / 2.0.x-dev
- v2.0.1
- 2.0.0
- v2.0.0-rc.3
- v2.0.0-rc.2
- v2.0.0-rc.1
- v2.0.0-beta9
- v2.0.0-beta8
- v2.0.0-beta7
- v2.0.0-beta6
- v2.0.0-beta5
- v2.0.0-beta4
- v2.0.0-beta3
- v2.0.0-beta2
- v2.0.0-beta1
- v2.0.0-alpha7
- v2.0.0-alpha6
- v2.0.0-alpha5
- v2.0.0-alpha4
- v2.0.0-alpha3
- v2.0.0-alpha2
- v2.0.0-alpha1
- v1.0.0
- dev-add_php_8.2_support
- dev-fix-cast-error
- dev-light-core-script
- dev-CORE-3728-tag-list-3-14-1
- dev-YaroslavStryhun-patch-1
- dev-feature/default-reservers-options
- dev-release/4.0.0
This package is auto-updated.
Last update: 2024-08-24 10:43:42 UTC
README
qless 的 PHP 绑定。
Qless 是一个基于 Redis 的强大作业队列系统,灵感来源于 resque,但建立在一系列 Lua 脚本之上,在 qless-core 仓库 中维护。务必查看 变更日志。
感谢我们的 贡献者;你们太棒了!
注意:这个库是对 Contatta's qless-php 的完全重构和独立开发版本。对 Contatta/qless-php 代码的版权属于 Ryver, Inc。更多信息请参阅 Contatta/qless-php 许可证。
文档借鉴自 seomoz/qless。
内容
哲学和命名法
job
是一个通过作业 ID 或 jid
识别的工作单元。一个 queue
可以包含多个作业,这些作业计划在某个时间运行,多个等待运行的作业,以及当前正在运行的作业。一个 worker
是主机上的一个进程,具有唯一标识符,它会从队列中请求作业,执行与该作业相关的某些进程,然后将它标记为完成。当它完成后,它可以放入另一个队列。
作业一次只能在一个队列中。那个队列就是它们最后被放入的队列。所以如果一个工作进程正在处理一个作业,而你移动它,工作进程请求完成作业的请求将被忽略。
作业可以被 canceled
,这意味着它消失在空气中,我们永远不会再注意它。一个作业可以被 dropped
,这意味着一个工作进程未能及时心跳或完成作业,或者一个作业可以被 failed
,这意味着主机识别出关于作业的一些系统性的问题状态。工作进程应该只在错误不太可能是瞬态的情况下失败作业;否则,该工作进程应该只是将其丢弃,并让系统回收它。
特性
- 作业不会掉到地上 — 有时工作进程会丢弃作业。Qless 会自动将其捡起来,并将其交给另一个工作进程。
- 标签/跟踪 — 不同的工作有不同的趣味性。跟踪这些工作以获取它们的进度更新。使用有意义的标识符对工作进行标记,以便在UI中快速找到它们。
- 主题 — 将工作发送到多个队列。
- 工作依赖性 — 一个工作可能需要等待另一个工作完成。
- 统计信息 —
qless
自动记录工作等待处理的时间和它们处理的时间。目前,我们跟踪这些时间的计数、平均值、标准偏差和直方图。 - 工作数据临时存储 — 工作信息会保留一段时间,以便您可以回顾工作历史、数据等。
- 优先级 — 优先级相同的工作按插入顺序弹出;更高的优先级意味着它被弹出的速度更快。
- 重试逻辑 — 每个工作都与一个与之关联的重试次数相关联,当它被放入新队列或完成时,这些次数会更新。如果一个工作反复被丢弃,则假定它存在问题,并将自动失败。
- Web应用 — 随着Ruby客户端的出现,有一个基于Sinatra的Web应用,可以让你控制某些操作问题。
- 计划任务 — 在工作等待指定的延迟(默认为0)之前,工作不能被工人弹出。
- 重复性工作 — 虽然计划很好,但我们还支持需要定期重复的工作。
- 通知 — 随着工作完成、失败、放入、弹出等,跟踪的工作在pubsub频道上触发事件。使用这些事件来获取您感兴趣工作的进度通知。
安装
要求
必需的PHP扩展是
支持的PHP版本是:7.1、7.2、7.3、8.0和8.1。
Qless PHP可以通过Composer安装
composer require pdffiller/qless-php
或者,从GitHub检出qless-php以源代码形式安装
git clone git://github.com/pdffiller/qless-php.git
cd qless-php
composer update
注意:master
分支将始终包含最新的不稳定版本。如果您想检查旧版本或正式的标记版本,请切换到相关的版本。
adm
目录包含对系统管理员有用的配置示例。
使用方法
入队作业
首先,创建一个Qless客户端。客户端接受所有在构建Predis\Client客户端时使用的相同参数。
use Qless\Client; // Connect to localhost $client1 = new Client(); // Connect to somewhere else $client2 = new Client('127.0.0.99:1234');
工作应该是定义了perform
方法的类,该方法必须接受一个单个的Qless\Jobs\BaseJob
参数
use Qless\Jobs\BaseJob; class MyJobClass { /** * @param BaseJob $job Is an instance of `Qless\Jobs\BaseJob` and provides access * to the payload data via `$job->getData()`, a means to cancel * the job (`$job->cancel()`), and more. */ public function perform(BaseJob $job): void { // ... echo 'Perform ', $job->getId(), ' job', PHP_EOL; $job->complete(); } }
现在您可以访问队列,并将工作添加到该队列中。
/** * This references a new or existing queue 'testing'. * @var \Qless\Queues\Queue $queue * @var \Qless\Client $client */ $queue = $client->queues['testing']; // Let's add a job, with some data. Returns Job ID $jid = $queue->put(MyJobClass::class, ['hello' => 'howdy']); // $jid here is "696c752a706049cdb227a9fcfe9f681b" /** * Now we can ask for a job. * @var \Qless\Jobs\BaseJob $job */ $job = $queue->pop(); // And we can do the work associated with it! $job->perform(); // Perform 316eb06a30d24d66ad0d33361306a7a1 job
工作数据必须是可序列化为JSON的,并建议您使用散列。下面是支持的作业选项列表。
queue->put()
返回的参数是jid
(工作ID)。每个Qless工作都有一个唯一的jid
,它提供了与现有工作交互的手段
/** * Find an existing job by it's JID * @var string $jid * @var \Qless\Client $client */ $job = $client->jobs[$jid]; // query it to find out details about it: $job->jid; // the job id $job->klass; // the class of the job $job->queue; // the queue the job is in $job->data; // the data for the job $job->history; // the history of what has happened to the job so far $job->dependencies; // the jids of other jobs that must complete before this one $job->dependents; // the jids of other jobs that depend on this one $job->priority; // the priority of this job $job->worker; // the internal worker name (usually consumer identifier) $job->tags; // array of tags for this job $job->expires; // when you must either check in with a heartbeat or turn it in as completed $job->remaining; // the number of retries remaining for this job $job->retries; // the number of retries originally requested $job->tracked; // is job flagged as important $job->failed; // is job flagged as failed // there is a way to get seconds remaining before this job will timeout: $job->ttl(); // you can also change the job in various ways: $job->requeue('some_other_queue'); // move it to a new queue $job->cancel(); // cancel the job $job->tag('foo'); // add a tag $job->untag('foo'); // remove a tag $job->track(); // start tracking current job $job->untrack(); // stop tracking current job
运行工作进程
Qless PHP的分叉工作进程受到了Resque工作进程的极大启发,但由于qless-core lua脚本的强大功能,它要简单得多,您可以编写自己的或使用包含的非分叉工作进程(例如,如果您更愿意通过不为每个工作分叉工作进程来节省内存)。
分叉工作进程
与resque类似...
- 工作进程为每个任务启动一个子进程,以便提供对内存泄漏的容错能力(通过传递
RUN_AS_SINGLE_PROCESS
环境变量来强制Qless不启动子进程。单进程模式应仅在某些测试/开发环境中使用。) - 工作进程更新其进程名以反映其状态,因此您可以使用
ps
查看正在执行的工作进程。 - 工作进程注册了信号处理器,以便您可以发送信号来控制它。
- 工作进程会获得一个队列列表,从中取出作业。
- 工作进程根据传递给工作进程的
Psr\Log\LoggerInterface
实例的设置来记录输出。
Resque使用队列来表示优先级。相比之下,qless内置了优先级支持。因此,工作进程支持两种从队列中弹出作业的策略:有序和轮询。有序预留器会从第一个队列中持续弹出作业,直到该队列为空,然后尝试从第二个队列中弹出作业。轮询预留器会从第一个队列中弹出作业,然后是第二个队列,依此类推。您也可以轻松实现自己的。
要启动工作进程,编写一些实例化工作进程并运行它的PHP代码。例如,您可以编写一个简单的脚本来完成此操作。
// The autoloader line is omitted use Qless\Client; use Qless\Jobs\Reservers\OrderedReserver; use Qless\Workers\ForkingWorker; // Create a client $client = new Client(); // Get the queues you use. // // Create a job reserver; different reservers use different // strategies for which order jobs are popped off of queues $reserver = new OrderedReserver($client->queues, ['testing', 'testing-2', 'testing-3']); $worker = new ForkingWorker($reserver, $client); $worker->run();
非分叉工作进程
Qless PHP还包括一个非分叉工作进程。这可以减轻与外部连接(如Redis或MySQL)相关的问题,并且在使用systemd等外部服务管理器管理工作进程时可能给出更好的结果。使用方法与分叉工作进程非常相似。
// The autoloader line is omitted use Qless\Client; use Qless\Jobs\Reservers\OrderedReserver; use Qless\Workers\SimpleWorker; // Create a client $client = new Client(); // Get the queues you use. // // Create a job reserver; different reservers use different // strategies for which order jobs are popped off of queues $reserver = new OrderedReserver($client->queues, ['testing', 'testing-2', 'testing-3']); $worker = new SimpleWorker($reserver, $client); $worker->run();
信号处理
父进程支持以下POSIX兼容信号。
TERM
:立即关闭,停止处理作业。INT
:立即关闭,停止处理作业。QUIT
:在当前作业处理完成后关闭。USR1
:分叉工作进程:立即杀死分叉的子进程,继续处理作业。USR1
:非分叉工作进程:立即放弃当前作业的进度,继续处理作业。USR2
:不要处理任何新作业,并转储当前回溯。CONT
:在USR2
后再次开始处理作业。
有关信号的详细信息,请参阅signal(7)
。
当使用分叉工作进程时,应向主进程发送这些信号,而不是子进程。
子进程支持USR2
信号,该信号会导致它转储其当前回溯。
当使用非分叉工作进程时,需要正确处理USR1
信号,这意味着不得捕获\Qless\Exceptions\SimpleWorkerContinuationException
类的异常。如果您的工作进程类或作业执行者捕获所有异常或所有可抛出的对象,则需要重新抛出\Qless\Exceptions\SimpleWorkerContinuationException
实例,否则USR1
信号将被忽略。
如果您想更改处理信号的方式,可以子类化您想要使用的Worker
类,并覆盖handleSignal
方法。
作业预留
存在不同的作业预留器。
DefaultReserver
:默认作业预留器。OrderedReserver
:按队列名称对队列进行排序。PriorityReserver
:按优先级对队列进行排序。RoundRobinReserver
:轮询所有提供的队列。ShuffledRoundRobin
:类似于RoundRobinReserver,但会打乱队列的顺序。
自定义作业处理器
可以设置自定义作业处理器来处理作业。为此,请调用 \Qless\Workers\WorkerInterface::registerJobPerformHandler
方法。它的参数应该实现 \Qless\Jobs\PerformAwareInterface
接口。当作业处理器是一个复杂的服务且有依赖关系时,这种方法很方便。让我们看看一个示例,其中我们需要通过外部工厂创建自定义作业处理器。
use Qless\Jobs\Reservers\OrderedReserver; use Qless\Workers\ForkingWorker; /** * @var \Qless\Client $client * @var object $jobHandlerFactory is some complicated factory which knows how to create Job Handler */ $jobHandler = $jobHandlerFactory->createJobHandler(); $reserver = new OrderedReserver($client->queues, 'my-queue'); $worker = new ForkingWorker($reserver, $client); $worker->registerJobPerformHandler($jobHandler); $worker->run();
Web 界面
Qless PHP 不附带 Web 应用程序。然而,由 seomoz/qless 提供了一个受 resque 启发的 Web 应用程序。此外,你可以利用基于 docker 的仪表板。我们计划使用 PHP 框架创建一个强大而优雅的 Web 界面,但这不是最高优先级。
作业依赖
假设你有一个作业依赖于另一个,但任务定义本质上不同。你需要烤一只火鸡,你需要做填充料,但你不能在填充料做好之前烤火鸡。
/** * @var \Qless\Queues\Queue $queue * @var \Qless\Client $client */ $queue = $client->queues['cook']; $jid = $queue->put(MakeStuffing::class, ['lots' => 'of butter']); $queue->put( MakeTurkey::class, // The class with the job perform method. ['with' => 'stuffing'], // An array of parameters for job. null, // The specified job id, if not a specified, a jid will be generated. null, // The specified delay to run job. null, // Number of retries allowed. null, // A greater priority will execute before jobs of lower priority. null, // An array of tags to add to the job. [$jid] // A list of JIDs this job must wait on before executing. );
当填充料作业完成后,火鸡作业被解锁,可以自由处理。
优先级
某些作业需要比其他作业更早地弹出。无论是故障单,还是调试,当你将作业放入队列时,你可以轻松地做到这一点。
/** @var \Qless\Queues\Queue $queue */ $queue->put(MyJobClass::class, ['foo' => 'bar'], null, null, null, 10);
当你想调整一个正在队列中等待的作业的优先级时,会发生什么?
/** @var \Qless\Client $client */ $job = $client->jobs['0c53b0404c56012f69fa482a1427ab7d']; // Now this will get popped before any job of lower priority. $job->priority = 10;
计划作业
如果你不想立即运行作业,而想在未来的某个时间运行,你可以指定一个延迟。
/** * Run at least 10 minutes from now. * * @var \Qless\Queues\Queue $queue */ $queue->put(MyJobClass::class, ['foo' => 'bar'], null, 600);
这并不能保证作业会在正好 10 分钟后运行。你可以通过更改作业的优先级来实现这一点,一旦 10 分钟过去,它就会被放在优先级较低的作业之前。
/** * Run in 10 minutes. * * @var \Qless\Queues\Queue $queue */ $queue->put(MyJobClass::class, ['foo' => 'bar'], null, 600, null, 100);
重复作业
有时仅仅安排一个作业是不够的,你可能想定期运行作业。特别是,你可能有一些批处理操作,需要每小时运行一次,而你并不关心哪个工作器运行它。重复作业的指定方式与其他作业类似。
/** * Run every hour. * * @var \Qless\Queues\Queue $queue */ $jid = $queue->recur(MyJobClass::class, ['widget' => 'warble'], 3600); // $jid here is "696c752a706049cdb227a9fcfe9f681b"
你甚至可以用与普通作业相同的方式访问它们。
/** * @var \Qless\Client $client * @var \Qless\Jobs\RecurringJob $job */ $job = $client->jobs['696c752a706049cdb227a9fcfe9f681b'];
更改实际运行间隔是微不足道的。
/** * I think I only need it to run once every two hours. * * @var \Qless\Jobs\RecurringJob $job */ $job->interval = 7200;
如果你想每小时整点运行它,但现在时间是 2:37,你可以指定一个偏移量,这是第一次弹出作业之前应该等待的时间。
/** * 23 minutes of waiting until it should go. * * @var \Qless\Queues\Queue $queue */ $queue->recur(MyJobClass::class, ['howdy' => 'hello'], 3600, 23 * 60);
重复作业也有优先级,可配置的重试次数和标签。这些设置不适用于重复作业,而是适用于它们创建的作业。在超过一个间隔之后工作器尝试弹出作业的情况下,将创建多个作业。这种想法是,虽然它是完全客户端管理的,但状态不应该依赖于工作器尝试弹出作业的频率。
/** * Recur every minute. * * @var \Qless\Queues\Queue $queue */ $queue->recur(MyJobClass::class, ['lots' => 'of jobs'], 60); // Wait 5 minutes $jobs = $queue->pop(null, 10); echo count($jobs), ' jobs got popped'; // 5 jobs got popped
主题
主题可以帮助你将作业放入不同的队列。首先,你必须创建订阅。你可以使用模式作为主题的名称。符号 *
- 一个单词,#
- 由点 .
分隔的几个单词。例如:first.second.*
,*.second.*
,#.third
。
/** * Subscribe * * @var \Qless\Queues\Queue $queue1 * @var \Qless\Queues\Queue $queue2 * @var \Qless\Queues\Queue $queue3 */ $queue1->subscribe('*.*.apples'); $queue2->subscribe('big.*.apples'); $queue3->subscribe('#.apples');
然后你可以将作业放入所有订阅者。
/** * Put to few queues * * @var \Qless\Topics\Topic * @var \Qless\Client $client */ use Qless\Topics\Topic; $topic = new Topic('big.green.apples', $client); $topic->put('ClassName', ['key' => 'value']); // Put to $queue1, $queue2 and $queue3
你可以调用所有 Queue 的公共方法来处理主题。
配置选项
你可以获取和设置全局(即在同一 Redis 实例的上下文中)配置,以更改心跳行为等。配置选项并不多,但一个重要的选项是作业数据保留多长时间。作业数据在完成 jobs-history
秒后过期,但限制为最后 jobs-history-count
个完成的作业。这些默认为 50k 个作业和 30 天,但根据数量,你的需求可能不同。还可以通过使用 jobs-failed-history
参数配置历史中的失败作业。为了仅保留最后 500 个作业最多 7 天和失败作业最多 3 天。
/** @var \Qless\Client $client */ $client->config['jobs-history'] = 7 * 86400; $client->config['jobs-history-count'] = 500; $client->config['jobs-failed-history'] = 3 * 86400;
标签/跟踪
在qless中,“跟踪”意味着标记一个作业为重要。跟踪的作业在网页界面中有一个专用的标签页,它们在进度推进时也会发出可订阅的事件(下面将详细介绍)。您可以通过网页界面或相应的代码来标记作业。
/** @var \Qless\Client $client */ $client->jobs['b1882e009a3d11e192d0b174d751779d']->track();
作业可以用字符串标记,这些字符串可用于快速搜索。例如,作业可能与客户账户或其他对您的项目有意义的键相关联。
/** @var \Qless\Queues\Queue $queue */ $queue->put(MyJobClass::class, ['tags' => 'aplenty'], null, null, null, null, ['12345', 'foo', 'bar']);
这使得它们在网页界面或代码中都可搜索。
/** @var \Qless\Client $client */ $jids = $client->jobs->tagged('foo');
您也可以随意添加或删除标签。
/** * @var \Qless\Client $client * @var \Qless\Jobs\BaseJob $job */ $job = $client->jobs['b1882e009a3d11e192d0b174d751779d']; $job->tag('howdy', 'hello'); $job->untag('foo', 'bar');
通知
跟踪的作业在发生特定事情时会向特定的pubsub通道发出事件。无论是从队列中弹出、被工作者完成等。
以下是一个例子
/** * @var \Qless\Client $client * @var \Qless\PubSub\Manager $events */ $events = $client->events; $events->on( Qless\PubSub\Manager::EVENT_COMPLETED, function (string $jid) { echo "{$jid} completed"; } ); $events->listen();
熟悉redis pubsub的人会注意到,一旦开始监听,redis连接就只能用于pubsub-y命令。因此,调用Client->events
实际上会创建第二个连接,以便Client
仍然可以像平常一样使用。
/** * @var \Qless\Client $client * @var \Qless\PubSub\Manager $events */ $events = $client->events; $events->on( Qless\PubSub\Manager::EVENT_FAILED, function (string $jid) use ($client) { echo "{$jid} failed in {$client->jobs[$jid]->queue}"; } ); $events->listen();
可能的事件类型与Qless-core定义的类型匹配,并在\Qless\PubSub\Manager
类中定义为常量:EVENT_CANCELED
、EVENT_COMPLETED
、EVENT_FAILED
、EVENT_POPPED
、EVENT_STALLED
、EVENT_PUT
、EVENT_TRACK
、EVENT_UNTRACK
。
事件系统
Qless还有一个基本的事件系统,您的应用程序可以使用它来自定义一些qless内部的行为。事件可以在子进程中处理单个作业之前、之后或周围注入逻辑。例如,当您需要为每个作业重新建立数据库连接时,这可能很有用。
事件有几个主要概念 - entity
、happening
、source
。
entity
是事件发生的对象类型(组件),例如entity
可以是job
、worker
、queue
。happening
是正在发生的行为,例如可以是beforeFork
或beforePerform
。source
是触发事件的主体。
在代码中,事件通过某些类表示。所有事件类都是\Qless\Events\User\AbstractEvent
类的后代。您可以通过调用静态方法getEntityName()
和getHappening()
来获取事件的entity
和happening
,您可以通过调用静态方法getName()
来获取事件的完整名称(由entity
和happening
组成),您可以通过调用getSource()
来获取source
。
此外,还有事件订阅者。订阅者可以是任何类,其方法名称与事件的happening
相匹配(例如:beforeFork(AbstractEvent $event)
)。订阅者也可以是闭包。订阅者的处理方法将只接收一个参数 - 事件,您可以从该事件中获取所需的所有数据。
您可以将订阅者附加到特定事件或事件组(通过事件entity
分组)。
示例:定义一个具有beforeFork
方法的事件,该方法将在您希望处理作业的地方被调用。
use Acme\Database\Connection; use Qless\Events\User\AbstractEvent; class ReEstablishDBConnection { private $connection; public function __construct(Connection $connection) { $this->connection = $connection; } public function beforeFork(AbstractEvent $event): void { $this->connection->connect(); } }
然后,将订阅者附加到worker
事件组。
use Qless\Events\User\Worker\AbstractWorkerEvent; /** @var \Qless\Workers\ForkingWorker $worker */ $worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new ReEstablishDBConnection());
要将订阅者附加到特定事件,您可以这样做
use \Qless\Events\User\Worker\BeforeFork; /** @var \Qless\Workers\ForkingWorker $worker */ $worker->getEventsManager()->attach(BeforeFork::getName(), new ReEstablishDBConnection());
您可以添加任意数量的订阅者。Qless事件系统支持优先级,因此您可以更改默认优先级。
use Qless\Events\User\Worker\AbstractWorkerEvent; /** @var \Qless\Workers\ForkingWorker $worker */ $worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber1(), 150); // More priority $worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber2(), 100); // Normal priority $worker->getEventsManager()->attach(AbstractWorkerEvent::getEntityName(), new MySubscriber10(), 50); // Less priority
每个作业事件
如上所述,Qless支持基于每个实体的事件。因此,如果有一些正交逻辑需要在某些(但不是所有)作业的上下文中运行,则可以提供每个作业的事件。每个作业事件类都是\Qless\Events\User\Job\AbstractJobEvent
的后代,并包含\Qless\Jobs\BaseJob
的实体。要使用$event->getJob()
方法从事件中获取作业。
每个作业的订阅可以像工作者订阅一样定义。
use Qless\Events\User\Job\BeforePerform; use Qless\Jobs\BaseJob; use Qless\Jobs\PerformAwareInterface; use My\Database\Connection; class ReEstablishDBConnection { private $connection; public function __construct(Connection $connection) { $this->connection = $connection; } /** * @param BeforePerform $event * @param BaseJob|PerformAwareInterface $source */ public function beforePerform(BeforePerform $event, $source): void { $this->connection->connect(); } }
要将它们添加到工作类中,首先需要通过订阅所需的事件组使工作类具备事件感知能力。为此,只需实现 setUp
方法并订阅所需的事件。
use Qless\Events\User\Job\AbstractJobEvent; use Qless\Jobs\BaseJob; use Qless\EventsManagerAwareInterface; use Qless\EventsManagerAwareTrait; class EventsDrivenJobHandler implements EventsManagerAwareInterface { use EventsManagerAwareTrait; public function setUp() { $this->getEventsManager()->attach(AbstractJobEvent::getEntityName(), new ReEstablishDBConnection()); } public function perform(BaseJob $job): void { // ... $job->complete(); } }
注意:在此场景中,工作类必须实现 Qless\EventsManagerAwareInterface
。
另一个例子。假设作业的有效负载应该始终包含来自当前上下文的数据。您可以使用 BeforeEnqueue
订阅者轻松修改它。
use Qless\Events\User\Queue\BeforeEnqueue; /** @var \Qless\Client $client */ $client ->getEventsManager() ->attach('queue:beforeEnqueue', function (BeforeEnqueue $event) { $event->getData()['metadata'] = [ 'server_address' => $_SERVER['SERVER_ADDR'], ]; });
事件列表
Qless 中可用的完整事件列表
同步作业处理
如果您希望作业在没有工作者的情况下处理,您可以设置 qless 客户端的同步模式。在您的项目配置中编写如下代码:
/** @var \Qless\Client $client */ $client->config->set('sync-enabled', true);
现在所有作业都将同步处理,无需工作者。
注意:请将此功能用于开发环境中的作业测试。
心跳
当工作者获得作业时,它将获得对该作业的独占锁。这意味着只要工作者在作业上检查进度,该作业就不会分配给任何其他工作者。默认情况下,作业必须每 60 秒报告进度或完成,但这是一个可配置的选项。对于更长的作业,这可能没有意义。
$job = $queue->pop(); // How long until I have to check in? $job->ttl(); // 59 // ... public function perform(BaseJob $job): void { // some code // if job need more time $job->heartbeat(); // some code $job->complete(); } // ...
如果您想设置所有队列的心跳:
// Set 10 minutes $client->config->set('heartbeat', 600);
此外,您还可以为单独的队列设置心跳
$client->queues['test-queue']->heartbeat = 120;
统计数据
qless
的一个很好的功能是可以获取使用情况统计信息。统计数据按天汇总,因此当您需要有关队列的统计数据时,您需要说明是哪个队列以及您谈论的是哪一天。默认情况下,您只需获取今天的统计数据。这些统计数据包括平均作业等待时间、标准偏差和直方图等信息。这些相同的数据也适用于作业完成情况。
// Today stat $client->stats('queue_name', time()); // {"run":{"std":0.027175949738075,"histogram":[...],"mean":0.011884652651273,"count":26},"failures":0,"retries":0,"failed":0,"wait":{"std":56188.180755369,"histogram":[...],"mean":32870.757469205,"count":26}}
时间
Redis 不允许在您将要进行任何数据操作时访问系统时间。然而,Qless 有心跳功能。当发出最多请求的客户端实际上发送当前时间时,所以所有工作者都必须同步。
确保作业唯一性
Qless 自动生成作业 ID,但您可以手动设置。
// automatically $queue->put($className, $data); // manually $queue->put($className, $data, 'abcdef123456');
例如,作业 ID 可以基于 className 和 payload。这将保证 Qless 不会有具有相同类和数据的多份作业。这也帮助在开发环境中进行调试。
设置默认作业选项
- jid
- 延迟
- 优先级
- 标签
- 重试
- 依赖
所有这些选项都有默认值。您还可以直接在工作类上定义默认作业选项。
$queue->put( Job::class, // Class - require ['key1' => 'value1'], // Payload - require 'custom-job-id', // Manually id 10, // Delay 10 seconds 3, // Three retries 7, // Priority ['important', 'media'], // Tags [$jidFirst, $jidSecond] // Depends jobs );
测试作业
您可以使用 同步 来处理测试中的作业。在这种制度下,所有作业将立即运行。
贡献和开发
请参阅 CONTRIBUTING.md。
许可证
qless-php 是开源软件,许可协议为 MIT 许可证。有关更多信息,请参阅 LICENSE.txt
文件。
© 2018-2022 PDFfiller
© 2013-2015 Ryver, Inc
版权所有。