fulfillment/laravel-triaged-queues

扩展 Laravel 5 队列,在无法访问时自动切换到任意数量的主机。

0.4.1 2021-03-18 14:20 UTC

README

扩展 Laravel 5.1 队列,在无法访问时自动切换到任意数量的主机。

此包通过添加以下功能扩展 Laravel 5.1 队列

  • 在主主机不可达时,依次尝试连接到多个主机。
  • 如果所有连接的主机都失败,则启用回退到 sync 驱动程序,以便同步处理作业/命令,从而避免数据丢失。
  • (Beanstalkd 仅限) 在作业和套接字超时方面,可以在队列配置中指定额外的 touch() pheanstalk 命令。

安装

需要此包

composer require "fulfillment/laravel-triaged-queues:0.1.*"

添加包后,将 ServiceProvider 添加到 config/app.php 中的 providers 数组

Fulfillment\TriagedQueues\TriagedQueueServiceProvider::class,

并在 config/app.php 中删除 Laravel 的 QueueServiceProvider(如果存在)

'Illuminate\Queue\QueueSeverProvider'

支持的驱动程序

目前仅支持 beanstalkd 驱动程序。欢迎为其他驱动程序提交 PR。

用法

config/queue.php 中的常规队列连接看起来像这样

'beanstalkd' => [
    'driver'         => 'beanstalkd',
    'host'           => env('BEANSTALK_HOST', 'localhost'),
    'queue'          => 'default',
    'ttr'            => 60,
    'socketTimeout'  => null
    ],

TriagedQueues 提供三个条目

hostN

使用此语法将任意数量的主机条目添加到连接中

'host[N]' => '[host value]'

其中 [N] 是您想要尝试的主机的顺序。主主机不需要数字。

'host'  => 'server.domain.com',
'host1' => 'server-fallback1.domain.com',
'host2' => 'server-fallback2.domain.com',
...

fallbackToSync

如果设置了 fallbackToSync 条目并设置为 true,则 TriagedQueues 在所有主机都不可达时将使用 sync 驱动程序。

'host'           => 'server.domain.com',
'host1'          => 'server-fallback1.domain.com',
'host2'          => 'server-fallback2.domain.com',
'fallbackToSync' => true
...

attempts

添加 attempts 条目将使 TriagedQueues 在移动到下一个主机之前尝试 X 次建立连接。

'attempts' => 2

touch() (Beanstalkd 仅限)

修改后的 BeanstalkdJob 包括一个 touch() 方法,该方法将向 beanstalkd 队列发送一个 touch 命令。这重置了作业的剩余时间(TTR 减去运行时间),因此它不会被踢回就绪队列。

socketTimeout (Beanstalkd 仅限)

如果作业运行时间较长,则必须增加作业的 TTR(运行时间)或定期 touch 它以保留它。然而,还有另一个决定作业行为因素:如果客户端在保留作业时断开连接,则无论 TTR 如何,作业都会被踢回就绪队列

因此,对于运行时间超过默认套接字超时(ini 设置中的 60 秒)的作业,必须定期 touch 作业,更改此 ini 设置或使用配置中的 socketTimeout 键值手动指定超时。

自定义队列作业

处理从队列中分发的命令的默认类是 Illuminate\Queue\CallQueuedHandler。然而,此默认功能不允许用户在命令执行之前设置任何逻辑。

使用每个队列连接中的 job 属性,您可以指定一个任意 class@method,该方法将在使用自动有效负载创建时调用而不是 CallQueuedHandler。这意味着这仅在您使用 Bus::dispatch() 或类似方法时才有效,其中分配器为您创建有效负载(并且它不是闭包或原始有效负载)。

配置如下

'beanstalkd' => [
    'driver'         => 'beanstalkd',
    'host'           => env('BEANSTALK_HOST', 'localhost'),
    'queue'          => 'default',
    'ttr'            => 60,
    'socketTimeout'  => null,
    'job'            => 'app\MyCustomHandler@call'
    ],

我建议您从 CallQueuedHandler 继承,并重写 call 方法以简化操作。

使用这个方法的例子可能是在处理作业之前为 Laravel 工作实例设置认证。

class CallAuthenticatableQueuedHandler extends CallQueuedHandler
{
	public function call(Job $job, array $data)
	{
		$command = $this->setJobInstanceIfNecessary(
			$job, unserialize($data['command'])
		);
  
                // pipe an authentication middleware before the command is fired
		$this->dispatcher->pipeThrough([AuthenticateDispatchedJob::class])->dispatchNow($command, function ($handler) use ($job) {
			$this->setJobInstanceIfNecessary($job, $handler);
		});

		if (! $job->isDeletedOrReleased()) {
			$job->delete();
		}
	}
}

贡献

欢迎贡献额外的驱动程序!创建新驱动程序的步骤很简单

  1. Fulfillment\TriagedQueues\Queue\Connectors 中创建一个新类,该类实现了 Illuminate\Queue\Connectors\ConnectorInterface
  2. 实现 ConnectorInterface(即 connect() 函数),确保您的该方法尝试 $config 参数中列出的所有主机。
  3. 如果所有主机尝试均失败,则抛出 NoHostException

然后提交一个 PR,我会非常乐意接受它 :)

许可证

此软件包根据 MIT 许可证 许可。