rsqueue/rsqueue-bundle

Redis Symfony2 队列Bundle,一个简单且灵活的基于Redis的消息队列,适用于symfony2

安装: 11,873

依赖者: 2

建议者: 0

安全性: 0

星级: 11

关注者: 4

分支: 2

开放问题: 4

类型:symfony-bundle

0.1.1 2018-10-08 02:05 UTC

This package is not auto-updated.

Last update: 2024-09-23 07:16:11 UTC


README

基于Redis的简单队列系统

SensioLabsInsight Build Status Scrutinizer Quality Score

目录

  1. 安装/配置
  2. 生产者/消费者
  3. 发布者/订阅者
  4. 事件
  5. 贡献

安装/配置

标签

  • 使用1.0-dev版本进行最新更新。是dev-master的别名。
  • 使用最新的稳定版本标签以保持稳定发布。

安装Redis

wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make

安装PHPRedis

phpredis扩展必须在服务器上安装。
否则composer会提醒你。

git clone git://github.com/nicolasff/phpredis.git
cd phpredis
phpize
./configure
make
sudo make install
cd ..
echo "extension=redis.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"`

安装RSQueue

您必须将require行添加到您的composer.json文件中

"require": {
    "php": ">=5.3.3",
    "symfony/symfony": "2.3.*",
    ...
    "mmoreram/rsqueue-bundle": "dev-master"
},

然后您必须使用composer更新您的项目依赖项

php composer.phar update

并在appkernel.php文件中注册该bundle

return array(
    // ...
    new Mmoreram\RSQueueBundle\RSQueueBundle(),
    // ...
);

配置

在这个版本中,所有连接都是localhost:6379,但尽快将允许配置连接。
您需要配置所有队列和序列化器。
默认序列化器的值是'Json',但也可以使用'PHP'值。还可以通过扩展默认序列化器接口实现自定义序列化器。然后您需要将类的命名空间添加到rs_queue.serializer参数中。

rs_queue:

    # Queues definition
    queues:
        videos: "queues:videos"
        audios: "queues:audios"

    # Serializer definition
    serializer: ~

    # Server configuration. By default, these values
    server:
        redis:
            host: 127.0.0.1
            port: 6379
            database: ~

生产者/消费者

生产者/消费者模型允许您使用默认的rsqueue生产者服务将元素推送到一个/多个队列。
一个元素被推送到一个队列,因此只有一个消费者将弹出一个并处理此元素。

$this->container->get("rs_queue.producer")->produce("videos", "this is my video");
$this->container->get("rs_queue.producer")->produce("audios", "this is my audio");

然后您应该扩展ConsumerCommand,这样您可以定义要监听的队列,并在每种情况下执行哪种操作。

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\ConsumerCommand;

/**
 * Testing consumer command
 */
class TestConsumerCommand extends ConsumerCommand
{

    /**
     * Configuration method
     */
    protected function configure()
    {
        $this
            ->setName('test:consumer')
            ->setDescription('Testing consumer command');
        ;

        parent::configure();
    }

    /**
     * Relates queue name with appropiated method
     */
    public function define()
    {
        $this->addQueue('videos', 'consumeVideo');
    }

    /**
     * If many queues are defined, as Redis respects order of queues, you can shuffle them
     * just overwritting method shuffleQueues() and returning true
     *
     * @return boolean Shuffle before passing to Gearman
     */
    public function shuffleQueues()
    {
        return true;
    }

    /**
     * Consume method with retrieved queue value
     *
     * @param InputInterface  $input   An InputInterface instance
     * @param OutputInterface $output  An OutputInterface instance
     * @param Mixed           $payload Data retrieved and unserialized from queue
     */
    protected function consumeVideo(InputInterface $input, OutputInterface $output, $payload)
    {
        $output->writeln($payload);
    }
}

发布者/订阅者

此模型允许数据广播。这意味着一个或多个订阅者将处理队列中的所有元素,但前提是他们正在监听发布者在发布它们的那一时刻。

$this->container->get("rs_queue.publisher")->publish("audios", "this is my audio");

作为消费者,订阅者必须定义他们想要监听的频道

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\SubscriberCommand;

/**
 * Testing subscriber command
 */
class TestSubscriberCommand extends SubscriberCommand
{

    /**
     * Configuration method
     */
    protected function configure()
    {
        $this
            ->setName('test:subscriber:audios')
            ->setDescription('Testing subscriber audios command');
        ;

        parent::configure();
    }

    /**
     * Relates queue name with appropiated method
     */
    public function define()
    {
        $this->addChannel('audios', 'consumeAudio');
    }

    /**
     * If many queues are defined, as Redis respects order of queues, you can shuffle them
     * just overwritting method shuffleQueues() and returning true
     *
     * @return boolean Shuffle before passing to Gearman
     */
    public function shuffleQueues()
    {
        return true;
    }

    /**
     * subscriber method with retrieved queue value
     *
     * @param InputInterface  $input   An InputInterface instance
     * @param OutputInterface $output  An OutputInterface instance
     * @param Mixed           $payload Data retrieved and unserialized from queue
     */
    protected function consumeAudio(InputInterface $input, OutputInterface $output, $payload)
    {
        $output->writeln($payload);
    }
}

通过扩展PSubscriberCommand,您可以定义模式而不是队列名称。

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Mmoreram\RSQueueBundle\Command\PSubscriberCommand;

/**
 * Testing PSubscriber command
 */
class TestPSubscriberCommand extends PSubscriberCommand
{

    /**
     * Configuration method
     */
    protected function configure()
    {
        $this
            ->setName('test:psubscriber')
            ->setDescription('Testing psubscriber command');
        ;

        parent::configure();
    }

    /**
     * Relates queue name with appropiated method
     */
    public function define()
    {
        $this->addPattern('*', 'consumeAll');
    }

    /**
     * If many queues are defined, as Redis respects order of queues, you can shuffle them
     * just overwritting method shuffleQueues() and returning true
     *
     * @return boolean Shuffle before passing to Gearman
     */
    public function shuffleQueues()
    {
        return true;
    }

    /**
     * Consume method with retrieved queue value
     *
     * @param InputInterface  $input   An InputInterface instance
     * @param OutputInterface $output  An OutputInterface instance
     * @param Mixed           $payload Data retrieved and unserialized from queue
     */
    protected function consumeAll(InputInterface $input, OutputInterface $output, $payload)
    {
        $output->writeln($payload);
    }
}

事件

此bundle使用自定义事件。

/**
 * The rs_queue.consumer is thrown each time a job is consumed by consumer
 *
 * The event listener recieves an
 * Mmoreram\RSQueueBundle\Event\RSQueueConsumerEvent instance
 *
 * @var string
 */
const RSQUEUE_CONSUMER = 'rs_queue.consumer';

/**
 * The rs_queue.subscriber is thrown each time a job is consumed by subscriber
 *
 * The event listener recieves an
 * Mmoreram\RSQueueBundle\Event\RSQueueSubscriberEvent instance
 *
 * @var string
 */
const RSQUEUE_SUBSCRIBER = 'rs_queue.subscriber';

/**
 * The rs_queue.producer is thrown each time a job is consumed by producer
 *
 * The event listener recieves an
 * Mmoreram\RSQueueBundle\Event\RSQueueProducerEvent instance
 *
 * @var string
 */
const RSQUEUE_PRODUCER = 'rs_queue.producer';

/**
 * The rs_queue.publisher is thrown each time a job is consumed by publisher
 *
 * The event listener recieves an
 * Mmoreram\RSQueueBundle\Event\RSQueuePublisherEvent instance
 *
 * @var string
 */
const RSQUEUE_PUBLISHER = 'rs_queue.publisher';

贡献

所有代码都采用Symfony2代码格式,因此每个pull request都必须验证phpcs标准。您应该阅读Symfony2编码标准并安装 CodeSniffer以检查所有代码是否经过验证。

该项目还有一项贡献政策。所有pull request都必须一步一步地解释,以便我们更容易理解并合并pull request。所有新功能都必须使用PHPUnit进行测试。

如果您想贡献,请阅读文档中的贡献代码部分。如果您正在提交pull request,请遵循提交补丁部分的指南,并使用Pull Request模板