pmg/queue-pheanstalk

适用于 Pheanstalk 和 Beanstalkd 的 pmg/queue 驱动实现

v6.0.0 2023-01-12 15:59 UTC

README

一个由 pmg/queue 支持的驱动程序,后端为 PheanstalkBeanstalkd

有关整个队列系统工作方式的文档,请参阅 pmg/queue readme

有关如何将所有内容连接起来的示例,请参阅 examples 目录。

快速示例

Pheanstalk 是一个 PHP 库,用于与 Beanstalkd 交互。 PheanstalkDriver 允许您利用 Beanstalkd 作为队列后端。

use Pheanstalk\Pheanstalk;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\PheanstalkDriver;
use PMG\Queue\Serializer\NativeSerializer;
use PMG\Queue\Serializer\SigningSerializer;

// ...

$serilizer = new NativeSerializer('this is the secret key');

$driver = new PheanstalkDriver(new \Pheanstalk\Pheanstalk('localhost'), $serializer, [
    // how long easy message has to execute in seconds
    'ttr'               => 100,

    // the "priority" of the message. High priority messages are
    // consumed first.
    'priority'          => 1024,

    // The delay between inserting the message and when it
    // becomes available for consumption
    'delay'             => 0,

    // The ttr for retries jobs
    'retry-ttr'         => 100,

    // the priority for retried jobs
    'retry-priority'    => 1024,

    // When jobs fail, they are "burieds" in beanstalkd with this priority
    'fail-priority'     => 1024,

    // A call to `dequeue` blocks for this number of seconds. A zero or
    // falsy value will block until a job becomes available
    'reserve-timeout'   => 10,
]);

// $handler instanceof PMG\Queue\MessageHandler
$consumer = new DefaultConsumer($driver, $handler);

处理失败的消息

默认情况下,PheanstalkDriver 会将传递给 PheanstalkDriver::fail 的任何消息 埋藏。如果没有其他问责制措施在您的队列系统中,这通常是一件好事。

尽管如此,pmg/queue 在将工作项踢回就绪状态方面对您没有任何帮助。如果您的队列实现周围有其他问责制措施,并且您希望仅在重试后删除失败的消息,请使用不同的 FailureStrategy

use Pheanstalk\Pheanstalk;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\PheanstalkDriver;
use PMG\Queue\Driver\Pheanstalk\DeleteFailureStrategy;
use PMG\Queue\Serializer\NativeSerializer;

// ...

$serilizer = new NativeSerializer('this is the secret key');
$failureStrategy = new DeleteFailureStrategy();

$driver = new PheanstalkDriver(new \Pheanstalk\Pheanstalk('localhost'), $serializer, [
    // as above
], $failureStrategy);

// $handler instanceof PMG\Queue\MessageHandler
$consumer = new DefaultConsumer($driver, $handler);

如果需要不同的解决方案,请随意实现 PMG\Queue\Driver\Pheanstalk\FailureStrategy