eventio / bbq
PHP 消息队列抽象库
Requires
- php: >=5.3.0
Requires (Dev)
- iron-io/iron_mq: ~1.4.5
- pda/pheanstalk: ~2.1
- predis/predis: ~0.8
This package is not auto-updated.
Last update: 2024-09-14 14:39:58 UTC
README
BBQ 是一个用于 PHP (5.3+) 的消息队列抽象库。该库支持不同类型的队列,您可以根据应用程序的需要混合使用。
由于实际的队列服务被抽象化,您可以在不同的环境中使用不同的队列类型。
安装
"require": {
"eventio/bbq": "dev-master"
}
对于 Symfony2 项目,您可以使用 EventioBBQBundle
基本用法
您至少需要一个队列,并通过 BBQ() 进行注册。BBQ 队列充当消息队列服务的抽象层。
<?php
$bbq = new BBQ();
$queue = new DirectoryQueue('tasks', '/var/lib/bbq/email_tasks');
$bbq->registerQueue($queue);
队列注册后,您可以通过以下方式消费队列:
- 推送消息;或
- 获取消息
推送消息
$bbq->pushJob('tasks', new StringPayload('New task payload'));
BBQ::pushJob() 接受两个参数
- 要推送工作的队列 ID
- 工作负载
获取消息
$job = $bbq->fetchJob('tasks');
$payload = $job->getPayload();
echo $payload; // Outputs "New task payload"
$bbq->deleteJob($job);
BBQ::fetchJob() 接受两个参数
- 获取工作的队列 ID(必填)
- 可选的等待队列任务的超时时间(秒)
如您在示例中看到的那样,在成功处理工作后,您应明确地使用 deleteJob($job) 删除工作。否则,工作最有可能被返回到队列。实际行为取决于队列类型。
队列
BBQ 队列结合了消息队列服务和由服务托管的实际队列。当您在应用程序中使用 BBQ 时,您不需要知道哪个服务实际上托管了队列。您可以在不同的环境中轻松地使用不同的队列类型(开发、测试、生产)。
请参阅以下支持的队列类型。
每个队列都使用 BBQ() 注册,并带有 ID。ID 是任何标识您应用程序中队列的字符串。
支持的队列类型
DirectoryQueue
$queue = new DirectoryQueue('queue_id', '/tmp/queue');
DirectoryQueue 在给定目录中的文件中持久化工作。
已获取但未删除的工作将作为新工作返回到队列。
PheanstalkTubeQueue (beanstalkd)
$pheanstalk = new \Pheanstalk_Pheanstalk('127.0.0.1');
$queue = new PheanstalkTubeQueue('queue_id', $pheanstalk, 'tube_name');
PheanstalkTubeQueue 使用 Pheanstalk 库 访问配置的 beanstalkd 服务器及其中的一个 tube。您需要将 \Pheanstalk_Pheanstalk 的实例和 tube 的名称传递给构造函数。
当脚本结束时,未删除但已获取的工作将返回到队列。
RedisQueue (Redis 服务器)
$redis = new \Predis\Client();
$queueListKey = 'queue_key';
$queue = new RedisQueue('queue_id', $redis, $queueListKey);
RedisQueue 使用 Predis PHP 库 访问配置的 Redis 服务器。实际队列是通过 Redis 列实现的。
pushJob()使用[LPUSH](https://redis.ac.cn/commands/lpush)将工作负载添加到列表fetchJob()使用[BRPOPLPUSH](https://redis.ac.cn/commands/brpoplpush)或[RPOPLPUSH](https://redis.ac.cn/commands/rpoplpush)获取工作finalizeJob()使用[LREM](https://redis.ac.cn/commands/lrem)从处理队列中删除工作releaseJob()使用[RPOPLPUSH](https://redis.ac.cn/commands/lrem)将工作移回列表队列
当脚本结束时,未删除但已获取的工作将返回到队列。
RedisQueue 使用处理队列的概念来确保即使在客户端失败的情况下也能保证队列的可靠性。处理队列仅存在于特定的键中,在 fetchJob() 和 finalizeJob()(或 releaseJob())调用之间。处理队列的键名在 fetchJob() 调用中自动构建,并默认遵循以下模式 <queue_name>:<host_name>:<pid>(随机唯一字符串)。 了解更多关于可靠队列模式的信息。
队列配置
您可以通过向队列构造函数传递第四个参数来进一步自定义队列配置。
$queue = new RedisQueue('queue_id', $redis, $queueListKey, $configuration);
$configuration 应该是一个关联数组。默认配置(和可能的变量)如下。
$configuration = array(
'processing_queue_key_prefix' => '%q:%h:%p',
'allow_infinite_blocking' => false,
'skip_shutdown_release' => false,
);
processing_queue_key_prefix:处理队列键的前缀模式。有几个占位符会被实际值替换:%q 主要队列名称,%h 主机名和 %p PHP进程ID。
allow_infinite_blocking:默认情况下,如果您没有为 fetchJob() 传递任何超时(或 NULL 或 0 或 false),RedisQueue 将执行非阻塞的 [RPOPLPUSH](https://redis.ac.cn/commands/rpoplpush) 调用,而不是阻塞的 [BRPOPLPUSH](https://redis.ac.cn/commands/brpoplpush)。如果队列为空,函数将立即返回。如果将 allow_infinite_blocking 设置为 true 并为 fetchJob() 传递无超时,队列将强制使用 [BRPOPLPUSH](https://redis.ac.cn/commands/brpoplpush),即使没有超时(=无限阻塞)。请谨慎使用。
ship_shutdown_release:默认情况下,队列注册了一个调用,将可能未释放和未完成的但已检索的工作项放回队列。设置为 true 以禁用此功能。
IronMQQueue
$ironMQ = new \IronMQ(array(
'token' => 'YOUR_IRONMQ_TOKEN',
'project_id' => 'YOUR_IRONMQ_PROJECT_ID'
));
$queue = new IronMQQueue('queue_id', $ironMQ, 'queue_name');
IronMQQueue 通过 HTTP(S) 接口访问 Iron.io 的 IronMQ 服务。
在超时后,未删除但已检索的工作项如果未被删除,将返回到队列中。默认超时时间为 60 秒。
数组队列
$queue = new ArrayQueue('queue_id');
消息存储在队列对象内部的数组中。显然,这种类型的队列在 PHP 进程之间不是持久的,主要用于测试。
脚本结束时,整个队列将被销毁。
贡献
由于库处于非常初级的阶段,您非常欢迎通过以下方式贡献工作:
- 修复错误
- 编写新的测试
- 实现新的队列类型
- 对代码提出想法和评论
许可
版权所有 Eventio Oy,Ville Mattila,2013
在 MIT 许可证 下发布