pda/

pheanstalk

beanstalkd 队列的 PHP 客户端

v5.0.6 2024-09-05 11:27 UTC

README

Latest Stable Version Total Downloads Scrutinizer Code Quality Code Coverage Build Status

Pheanstalk 5 是一个纯 PHP 8.1+ 客户端,用于与 beanstalkd 工作队列 1.12 及以后的版本一起使用。在 2021/2022/2023 年,它几乎完全重写,以下目标在心中

  • 完全类型化
  • 使用 PHPStan 和 Psalm 通过严格的规则集进行静态分析
  • 将不同的角色分割成独立的模块

使用示例

生产者

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube       = new TubeName('testtube');

// Queue a Job
$pheanstalk->useTube($tube);
$pheanstalk->put("job payload goes here\n");

$pheanstalk->useTube($tube);
$pheanstalk->put(
    data: json_encode(['test' => 'data'], JSON_THROW_ON_ERROR),
    priority: Pheanstalk::DEFAULT_PRIORITY,
    delay: 30,
    timeToRelease: 60
);

消费者/工作者

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube       = new TubeName('testtube');

// we want jobs from 'testtube' only.
$pheanstalk->watch($tube);

// this hangs until a Job is produced.
$job = $pheanstalk->reserve();

try {
    $jobPayload = $job->getData();
    // do work.
    
    echo "Starting job with payload: {$jobPayload}\n";

    sleep(2);
    // If it's going to take a long time, periodically
    // tell beanstalk we're alive to stop it rescheduling the job.
    $pheanstalk->touch($job);
    sleep(2);

    // eventually we're done, delete job.
    $pheanstalk->delete($job);
}
catch(\Exception $e) {
    // handle exception.
    // and let some other worker retry.
    $pheanstalk->release($job); 
}

消费者/工作者的 Systemd 配置

请注意,这并不旨在涵盖所有可能的场景或配置。

[Unit]
Description=My App Worker

[Service]
User=deployer
Group=www-data
Restart=always
ExecStart=/usr/bin/php /var/www/html/worker.php

[Install]
WantedBy=multi-user.target

运行测试

确保您已安装 docker-compose。

> composer test

历史

Pheanstalk 4

2018 年,Sam Mousa 承担了维护 Pheanstalk 的责任。

Pheanstalk 4.0 停止支持旧版本的 PHP。其中包含以下更改(以及其他更改)

  • 严格的 PHP 类型提示
  • 作业 ID 的值对象
  • 无副作用函数
  • 停止支持持久连接
  • 增加对多种套接字实现的支持(流扩展、套接字扩展、fsockopen)

停止支持持久连接

持久连接是一种特性,在多个请求之间保持 TCP 连接活跃以减少 TCP 连接设置的开销。在重用 TCP 连接时,我们必须始终保证应用程序协议,在本例中是 beanstalks 的协议处于适当状态。这是很难的,在某些情况下甚至是不可能的;至少这意味着我们必须执行一些导致往返的测试。例如,一个刚刚发送了命令 PUT 0 4000 的连接。beanstalk 服务器现在将读取 4000 个字节,但如果 PHP 脚本在此写入过程中崩溃,下一个请求将分配此 TCP 套接字。现在,为了将连接重置为已知状态,它曾经订阅默认通道:use default。由于 beanstalk 服务器期望 4000 个字节,它将只写入此命令到作业并等待更多字节。

为了防止这类问题,最简单的解决方案是不使用持久连接。

丢弃连接处理

根据所使用的套接字实现,我们可能无法启用 TCP 保持活动状态。如果没有 TCP 保持活动状态,我们就没有办法检测到丢失的连接,底层操作系统可能要等待长达 15 分钟才能决定一个没有发送数据包的 TCP 连接已被断开。当使用支持读取超时的套接字实现时,例如使用套接字扩展的 SocketSocket,我们使用读取和写入超时来检测损坏的连接;beanstalk 协议的问题在于它允许在长时间内不发送数据包。解决方案是捕获这些连接异常并重新连接或使用具有小于读取/写入超时的 reserveWithTimeout()

作业运行器的示例代码可能如下(这是真实的生产代码)

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

interface Task {

}
interface TaskFactory {
    public function fromData(string $data): Task;
}
interface CommandBus {
    public function handle(Task $task): void;
}

function run(\Pheanstalk\PheanstalkSubscriber $pheanstalk, CommandBus $commandBus, TaskFactory $taskFactory): void
{
    /**
     * @phpstan-ignore-next-line
     */
    while (true) {
        $job = $pheanstalk->reserveWithTimeout(50);
        if (isset($job)) {
            try {

                $task = $taskFactory->fromData($job->getData());
                $commandBus->handle($task);
                echo "Deleting job: {$job->getId()}\n";
                $pheanstalk->delete($job);
            } catch (\Throwable $t) {
                echo "Burying job: {$job->getId()}\n";
                $pheanstalk->bury($job);
            }
        }
    }
}

在此,连接错误将导致进程退出(并由任务管理器重启)。

具有副作用的功能

在版本 4 中,已经删除了具有副作用的功能,例如 putInTube 内部执行了多项操作

  1. 切换到通道
  2. 将作业放入新通道

在这个例子中,管路(tube)的含义发生了变化,这意味着连接现在处于不同的状态。这并不直观,迫使任何连接用户始终需要切换/检查当前管路。这种方法的另一个问题是处理错误更困难。如果发生异常,不清楚我们是否已经切换了管路。

迁移到v4

在大多数情况下,迁移应该相对简单

  • 更改构造函数,可以选择使用静态构造函数,使用DI容器构建依赖项,或者手动实例化它们。
  • 由于reserve()不再接受timeout参数,因此将具有超时的reserve()实例更改为reserveWithTimeout(int $timeout)
  • 运行您的测试,或使用静态分析器测试对不再存在的函数的调用。
  • 确保您处理连接异常(这并不是V4的新功能,只是在V4中,由于默认使用具有读写超时的套接字实现,您将遇到更多的异常)。

Pheanstalk 3

Pheanstalk是一个纯PHP 7.1+客户端,用于beanstalkd工作队列。自2008年底以来,它一直在积极开发,并被许多人用于生产。

Paul Annesley创建,Pheanstalk经过严格的单元测试,并使用封装的、可维护的面向对象设计编写。社区反馈、错误报告和补丁导致了2010年的稳定1.0版本、2013年的2.0版本和2014年的3.0版本。

Pheanstalk 3.0引入了PHP命名空间、PSR-1和PSR-2编码标准,以及PSR-4自动加载标准。

支持beanstalkd的最新版本1.10。实现了beanstalkd 1.3协议文档中指定的所有命令和响应。