pda / pheanstalk
beanstalkd 队列的 PHP 客户端
Requires
- php: >=8.1.0
- ext-mbstring: *
Requires (Dev)
Suggests
- ext-sockets: Socket implementation works best for long running processes
- v5.x-dev
- v5.0.6
- v5.0.5
- v5.0.4
- v5.0.3
- v5.0.2
- v5.0.1
- v5.0.0
- v5.0.0-rc.1
- v5.0.0-beta.2
- v5.0.0-beta.1
- v5.0.0-alpha2
- v5.0.0-alpha1
- v4.x-dev
- v4.0.5
- v4.0.4
- v4.0.3
- v4.0.2
- v4.0.1
- v4.0.0
- v4.0.0-alpha.3
- v4.0.0-alpha.2
- v4.0.0-alpha.1
- v3.2.1
- v3.2.0
- v3.1.0
- v3.0.2
- v3.0.1
- v3.0.0
- v2.1.1
- v2.1.0
- v2.0.0
- v2.0.0-rc1
- dev-fix-readme-snippets
- dev-fix-ci
- dev-chore-code-cleanup
This package is auto-updated.
Last update: 2024-09-05 11:29:34 UTC
README
Pheanstalk 5 是一个纯 PHP 8.1+ 客户端,用于与 beanstalkd 工作队列 1.12 及以后的版本一起使用。在 2021/2022/2023 年,它几乎完全重写,以下目标在心中
- 完全类型化
- 使用 PHPStan 和 Psalm 通过严格的规则集进行静态分析
- 将不同的角色分割成独立的模块
使用示例
生产者
use Pheanstalk\Pheanstalk; use Pheanstalk\Values\TubeName; $pheanstalk = Pheanstalk::create('127.0.0.1'); $tube = new TubeName('testtube'); // Queue a Job $pheanstalk->useTube($tube); $pheanstalk->put("job payload goes here\n"); $pheanstalk->useTube($tube); $pheanstalk->put( data: json_encode(['test' => 'data'], JSON_THROW_ON_ERROR), priority: Pheanstalk::DEFAULT_PRIORITY, delay: 30, timeToRelease: 60 );
消费者/工作者
use Pheanstalk\Pheanstalk; use Pheanstalk\Values\TubeName; $pheanstalk = Pheanstalk::create('127.0.0.1'); $tube = new TubeName('testtube'); // we want jobs from 'testtube' only. $pheanstalk->watch($tube); // this hangs until a Job is produced. $job = $pheanstalk->reserve(); try { $jobPayload = $job->getData(); // do work. echo "Starting job with payload: {$jobPayload}\n"; sleep(2); // If it's going to take a long time, periodically // tell beanstalk we're alive to stop it rescheduling the job. $pheanstalk->touch($job); sleep(2); // eventually we're done, delete job. $pheanstalk->delete($job); } catch(\Exception $e) { // handle exception. // and let some other worker retry. $pheanstalk->release($job); }
消费者/工作者的 Systemd 配置
请注意,这并不旨在涵盖所有可能的场景或配置。
[Unit]
Description=My App Worker
[Service]
User=deployer
Group=www-data
Restart=always
ExecStart=/usr/bin/php /var/www/html/worker.php
[Install]
WantedBy=multi-user.target
运行测试
确保您已安装 docker-compose。
> composer test
历史
Pheanstalk 4
2018 年,Sam Mousa 承担了维护 Pheanstalk 的责任。
Pheanstalk 4.0 停止支持旧版本的 PHP。其中包含以下更改(以及其他更改)
- 严格的 PHP 类型提示
- 作业 ID 的值对象
- 无副作用函数
- 停止支持持久连接
- 增加对多种套接字实现的支持(流扩展、套接字扩展、fsockopen)
停止支持持久连接
持久连接是一种特性,在多个请求之间保持 TCP 连接活跃以减少 TCP 连接设置的开销。在重用 TCP 连接时,我们必须始终保证应用程序协议,在本例中是 beanstalks 的协议处于适当状态。这是很难的,在某些情况下甚至是不可能的;至少这意味着我们必须执行一些导致往返的测试。例如,一个刚刚发送了命令 PUT 0 4000
的连接。beanstalk 服务器现在将读取 4000 个字节,但如果 PHP 脚本在此写入过程中崩溃,下一个请求将分配此 TCP 套接字。现在,为了将连接重置为已知状态,它曾经订阅默认通道:use default
。由于 beanstalk 服务器期望 4000 个字节,它将只写入此命令到作业并等待更多字节。
为了防止这类问题,最简单的解决方案是不使用持久连接。
丢弃连接处理
根据所使用的套接字实现,我们可能无法启用 TCP 保持活动状态。如果没有 TCP 保持活动状态,我们就没有办法检测到丢失的连接,底层操作系统可能要等待长达 15 分钟才能决定一个没有发送数据包的 TCP 连接已被断开。当使用支持读取超时的套接字实现时,例如使用套接字扩展的 SocketSocket
,我们使用读取和写入超时来检测损坏的连接;beanstalk 协议的问题在于它允许在长时间内不发送数据包。解决方案是捕获这些连接异常并重新连接或使用具有小于读取/写入超时的 reserveWithTimeout()
。
作业运行器的示例代码可能如下(这是真实的生产代码)
use Pheanstalk\Pheanstalk; use Pheanstalk\Values\TubeName; interface Task { } interface TaskFactory { public function fromData(string $data): Task; } interface CommandBus { public function handle(Task $task): void; } function run(\Pheanstalk\PheanstalkSubscriber $pheanstalk, CommandBus $commandBus, TaskFactory $taskFactory): void { /** * @phpstan-ignore-next-line */ while (true) { $job = $pheanstalk->reserveWithTimeout(50); if (isset($job)) { try { $task = $taskFactory->fromData($job->getData()); $commandBus->handle($task); echo "Deleting job: {$job->getId()}\n"; $pheanstalk->delete($job); } catch (\Throwable $t) { echo "Burying job: {$job->getId()}\n"; $pheanstalk->bury($job); } } } }
在此,连接错误将导致进程退出(并由任务管理器重启)。
具有副作用的功能
在版本 4 中,已经删除了具有副作用的功能,例如 putInTube
内部执行了多项操作
- 切换到通道
- 将作业放入新通道
在这个例子中,管路(tube)的含义发生了变化,这意味着连接现在处于不同的状态。这并不直观,迫使任何连接用户始终需要切换/检查当前管路。这种方法的另一个问题是处理错误更困难。如果发生异常,不清楚我们是否已经切换了管路。
迁移到v4
在大多数情况下,迁移应该相对简单
- 更改构造函数,可以选择使用静态构造函数,使用DI容器构建依赖项,或者手动实例化它们。
- 由于
reserve()
不再接受timeout
参数,因此将具有超时的reserve()
实例更改为reserveWithTimeout(int $timeout)
。 - 运行您的测试,或使用静态分析器测试对不再存在的函数的调用。
- 确保您处理连接异常(这并不是V4的新功能,只是在V4中,由于默认使用具有读写超时的套接字实现,您将遇到更多的异常)。
Pheanstalk 3
Pheanstalk是一个纯PHP 7.1+客户端,用于beanstalkd工作队列。自2008年底以来,它一直在积极开发,并被许多人用于生产。
由Paul Annesley创建,Pheanstalk经过严格的单元测试,并使用封装的、可维护的面向对象设计编写。社区反馈、错误报告和补丁导致了2010年的稳定1.0版本、2013年的2.0版本和2014年的3.0版本。
Pheanstalk 3.0引入了PHP命名空间、PSR-1和PSR-2编码标准,以及PSR-4自动加载标准。
支持beanstalkd的最新版本1.10。实现了beanstalkd 1.3协议文档中指定的所有命令和响应。