socloz / nsq-bundle
基于nsqphp的Symfony2消息队列组件
dev-master
2016-06-30 13:01 UTC
Requires
- php: >=5.3.2
- socloz/nsqphp: dev-socloz
Requires (Dev)
- symfony/class-loader: 2.2.x-dev
This package is not auto-updated.
Last update: 2024-09-24 03:33:42 UTC
README
此扩展允许您的Symfony2应用程序与NSQ队列系统协同工作。它是对NSQPHP(PHP的NSQ客户端库)的一个轻量级封装。
安装
在composer.json中添加依赖项
{ "require": { "socloz/nsq-bundle": "dev-master" } }
在您的应用程序内核中启用扩展
<?php // app/AppKernel.php public function registerBundles() { $bundles = array( // ... new Socloz\NsqBundle\SoclozNsqBundle(), ); }
配置
# app/config.yml socloz_nsq: lookupd_hosts: ["localhost:4161"] # list of nsqlookupd hosts, optional. If omitted the consumers # will use the "publish_to" list (see below) of the topic. topics: foo: # topic name publish_to: [localhost] # list of nsqd hosts to publish to requeue_strategy: # requeuing strategy on failure max_attempts: 5 # maximum number of attemps per message delays: # requeuing delays in ms - 5000 - 10000 - 60000 # 4st (and last) requeue will use the last specified delay (ie 60000)
指定消费者
通过配置
# app/config.yml socloz_nsq: topics: foo: consumers: channel1: amce_consumer1 # service id channel2: amce_consumer2 # ...
通过服务定义
# AcmeBundle/Resources/config/services.yml amce_consumer1: class: AcmeBundle\Consumer1 tags: - { name: socloz.nsq.consumer, topic: foo, channel: acme.channel1 }
使用方法
生产者端
<?php use nsqphp\Message\Message; // ... $topic = $container->get('socloz.nsq')->getTopic('foo'); // or simply $topic = $container->get('socloz.nsq.topic.foo'); // It can also be a fancy way to directly inject a topic as service dependency // instead of injecting both the topic manager and the topic name. // then $topic->publish('message payload');
消费者端
<?php // AcmeBundle/Consumer1.php namespace AcmeBundle; use nsqphp\Exception; use Socloz\NsqBundle\Consumer\ConsumerInterface; class Consumer1 implements ConsumerInterface { public function consume($topic, $channel, $payload) { if (...) { // Stops message processing and finishes it in normal way (as a // successful message process) throw new Exception\ExpiredMessageException(); } if (...) { // Forces requeue of the message with the specified delay (and thus // bypasses the topic's requeue strategy) throw new Exception\RequeueMessageException(10); } } }
为特定主题运行消费者
app/console socloz:nsq:topic:consume <topic> [-c <channel>]
延迟消息
延迟消息通过一个中间主题支持。当您发布一个带延迟的消息时,它将首先发布到指定主机上的一个专门的“隐藏”主题(命名为“__socloz_delayed”)。在此消息准备好之前,它将重新入队,剩余的延迟将继续。一旦消息准备好,它最终将发布到业务主题。
配置
以下是延迟消息主题的默认配置。
# app/config.yml socloz_nsq: delayed_messages_topic: publish_to: localhost # single host requeue_strategy: # 4 retries with 10s interval max_attempts: 5 delays: [10000]
发布一个带延迟的消息
<?php // ... $topic->publish( 'message payload', 600 // will be ready in 10 minutes );
运行为特定主题设计的消费者
app/console socloz:nsq:delayed:consume
其他
获取给定主题的消费者列表
app/console socloz:nsq:topic:list <topic>
测试
目前单元测试需要一个运行在本地主机、端口4150上的nsqd。您还需要PHPUnit和composer。
php composer.phar install --dev
phpunit
存根
当您需要测试依赖于SoclozNsqBundle的组件时,您可以通过配置启用存根模式。
# app/config_test.yml socloz_nsq: stub: true
每个主题的存根使用内存中的消息列表。发布者端没有任何变化。在消费者端,您必须以这种方式调用consume():
<?php // ... $topic->consume( array(), // channel whitelist (optional) array( 'limit' => 5, // number of messages to process (default=1) 'ignore_delay' => false, // whether to skip delayed messages (default=true) 'filter' => function ($payload) { // filter callback (default=null) // if false is returned, the message // will be skipped // ... }, 'consumers' => array( // overriding consumers list (default=empty list) 'acme.channel1' => function ($payload) { // ... }, ), ), );
以下辅助方法也可用:
<?php // clear message list $topic->clear(); // delete messages filtered by the given callback $topic->delete(function ($payload) { // ... });