socloz/nsq-bundle

基于nsqphp的Symfony2消息队列组件

dev-master 2016-06-30 13:01 UTC

This package is not auto-updated.

Last update: 2024-09-24 03:33:42 UTC


README

此扩展允许您的Symfony2应用程序与NSQ队列系统协同工作。它是对NSQPHP(PHP的NSQ客户端库)的一个轻量级封装。

安装

在composer.json中添加依赖项

{
    "require": {
        "socloz/nsq-bundle": "dev-master"
    }
}

在您的应用程序内核中启用扩展

<?php
// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        // ...
        new Socloz\NsqBundle\SoclozNsqBundle(),
    );
}

配置

# app/config.yml

socloz_nsq:
  lookupd_hosts: ["localhost:4161"] # list of nsqlookupd hosts, optional. If omitted the consumers
                                    # will use the "publish_to" list (see below) of the topic.
  topics:
    foo:                      # topic name
      publish_to: [localhost] # list of nsqd hosts to publish to
      requeue_strategy:       # requeuing strategy on failure
        max_attempts: 5       # maximum number of attemps per message
        delays:               # requeuing delays in ms
          - 5000
          - 10000
          - 60000
          # 4st (and last) requeue will use the last specified delay (ie 60000)

指定消费者

通过配置

# app/config.yml

socloz_nsq:
  topics:
    foo:
      consumers:
        channel1: amce_consumer1 # service id
        channel2: amce_consumer2
        # ...

通过服务定义

# AcmeBundle/Resources/config/services.yml

amce_consumer1:
  class: AcmeBundle\Consumer1
  tags:
    -  { name: socloz.nsq.consumer, topic: foo, channel: acme.channel1 }

使用方法

生产者端

<?php

use nsqphp\Message\Message;

// ...

$topic = $container->get('socloz.nsq')->getTopic('foo');
// or simply
$topic = $container->get('socloz.nsq.topic.foo');
// It can also be a fancy way to directly inject a topic as service dependency
// instead of injecting both the topic manager and the topic name.

// then
$topic->publish('message payload');

消费者端

<?php
// AcmeBundle/Consumer1.php

namespace AcmeBundle;

use nsqphp\Exception;
use Socloz\NsqBundle\Consumer\ConsumerInterface;

class Consumer1 implements ConsumerInterface
{
    public function consume($topic, $channel, $payload)
    {
        if (...) {
            // Stops message processing and finishes it in normal way (as a
            // successful message process)
            throw new Exception\ExpiredMessageException();
        }

        if (...) {
            // Forces requeue of the message with the specified delay (and thus
            // bypasses the topic's requeue strategy)
            throw new Exception\RequeueMessageException(10);
        }
    }
}

为特定主题运行消费者

app/console socloz:nsq:topic:consume <topic> [-c <channel>]

延迟消息

延迟消息通过一个中间主题支持。当您发布一个带延迟的消息时,它将首先发布到指定主机上的一个专门的“隐藏”主题(命名为“__socloz_delayed”)。在此消息准备好之前,它将重新入队,剩余的延迟将继续。一旦消息准备好,它最终将发布到业务主题。

配置

以下是延迟消息主题的默认配置。

# app/config.yml

socloz_nsq:
  delayed_messages_topic:
    publish_to: localhost # single host
    requeue_strategy:     # 4 retries with 10s interval
      max_attempts: 5
      delays: [10000]

发布一个带延迟的消息

<?php
// ...
$topic->publish(
    'message payload',
    600 // will be ready in 10 minutes
);

运行为特定主题设计的消费者

app/console socloz:nsq:delayed:consume

其他

获取给定主题的消费者列表

app/console socloz:nsq:topic:list <topic>

测试

目前单元测试需要一个运行在本地主机、端口4150上的nsqd。您还需要PHPUnit和composer。

php composer.phar install --dev
phpunit

存根

当您需要测试依赖于SoclozNsqBundle的组件时,您可以通过配置启用存根模式。

# app/config_test.yml

socloz_nsq:
  stub: true

每个主题的存根使用内存中的消息列表。发布者端没有任何变化。在消费者端,您必须以这种方式调用consume():

<?php
// ...
$topic->consume(
    array(), // channel whitelist (optional)
    array(
        'limit' => 5, // number of messages to process (default=1)
        'ignore_delay' => false, // whether to skip delayed messages (default=true)
        'filter' => function ($payload) { // filter callback (default=null)
                                          // if false is returned, the message
                                          // will be skipped
            // ...
        },
        'consumers' => array( // overriding consumers list (default=empty list)
            'acme.channel1' => function ($payload) {
                // ...
            },
        ),
    ),
);

以下辅助方法也可用:

<?php
// clear message list
$topic->clear();

// delete messages filtered by the given callback
$topic->delete(function ($payload) {
    // ...
});