mmoreram / rsqueue-bundle
Redis Symfony2 Queue Bundle,一个简单且基于Redis的消息队列,用于symfony2
Requires
- php: >=5.3.3
- doctrine/annotations: ^1.1.2
- symfony/config: ~2.3|~3.0
- symfony/console: ~2.3|~3.0
- symfony/dependency-injection: ~2.3|~3.0
- symfony/event-dispatcher: ~2.3|~3.0
- symfony/framework-bundle: ~2.3|~3.0
- symfony/http-kernel: ~2.3|~3.0
Requires (Dev)
- symfony/phpunit-bridge: ~2.7
This package is auto-updated.
Last update: 2024-08-29 04:03:19 UTC
README
警告!!
迁移到 https://github.com/rsqueue/RSQueueBundle
此包将很快被弃用
RSQueueBundle for Symfony
基于Redis的简单队列系统
目录
安装/配置
标签
- 使用
1.0-dev
版本进行最新更新。它是dev-master
的别名。 - 使用最新的稳定版本标签以保持稳定发布。
安装Redis
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
安装PHPRedis
phpredis扩展必须在您的服务器上安装。
否则composer会提醒您。
git clone git://github.com/nicolasff/phpredis.git cd phpredis phpize ./configure make sudo make install cd .. echo "extension=redis.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"`
安装RSQueue
您必须将require行添加到您的composer.json文件中
"require": { "php": ">=5.3.3", "symfony/symfony": "2.3.*", ... "mmoreram/rsqueue-bundle": "dev-master" },
然后您必须使用composer更新您的项目依赖项
php composer.phar update
并在appkernel.php文件中注册该捆绑包
return array( // ... new Mmoreram\RSQueueBundle\RSQueueBundle(), // ... );
配置
在此第一个版本中,所有连接都是localhost:6379,但将尽快可配置连接。
您需要配置所有队列和序列化器。
默认情况下,序列化器的值为'Json',但也可以使用'PHP'值。还可以通过扩展默认序列化器接口实现自定义序列化器。然后您需要将类的命名空间添加到rs_queue.serializer参数中。
rs_queue: # Queues definition queues: videos: "queues:videos" audios: "queues:audios" # Serializer definition serializer: ~ # Server configuration. By default, these values server: redis: host: 127.0.0.1 port: 6379 database: ~
生产者/消费者
生产者/消费者模型允许您使用默认的rsqueue生产者服务向一个/多个队列中生产元素。
一个元素被推入一个队列,因此只有一个消费者将弹出并处理此元素。
$this->container->get("rs_queue.producer")->produce("videos", "this is my video"); $this->container->get("rs_queue.producer")->produce("audios", "this is my audio");
然后您应该扩展ConsumerCommand,这样您就可以定义要监听的队列,以及每种情况下要执行的操作。
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Mmoreram\RSQueueBundle\Command\ConsumerCommand; /** * Testing consumer command */ class TestConsumerCommand extends ConsumerCommand { /** * Configuration method */ protected function configure() { $this ->setName('test:consumer') ->setDescription('Testing consumer command'); ; parent::configure(); } /** * Relates queue name with appropiated method */ public function define() { $this->addQueue('videos', 'consumeVideo'); } /** * If many queues are defined, as Redis respects order of queues, you can shuffle them * just overwritting method shuffleQueues() and returning true * * @return boolean Shuffle before passing to Gearman */ public function shuffleQueues() { return true; } /** * Consume method with retrieved queue value * * @param InputInterface $input An InputInterface instance * @param OutputInterface $output An OutputInterface instance * @param Mixed $payload Data retrieved and unserialized from queue */ protected function consumeVideo(InputInterface $input, OutputInterface $output, $payload) { $output->writeln($payload); } }
发布者/订阅者
此模型允许数据广播。这意味着一个或多个订阅者将处理队列中的所有元素,但只有当它们在发布者发布它们的时候正在监听。
$this->container->get("rs_queue.publisher")->publish("audios", "this is my audio");
并且,作为消费者,订阅者必须定义他们想要监听的频道
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Mmoreram\RSQueueBundle\Command\SubscriberCommand; /** * Testing subscriber command */ class TestSubscriberCommand extends SubscriberCommand { /** * Configuration method */ protected function configure() { $this ->setName('test:subscriber:audios') ->setDescription('Testing subscriber audios command'); ; parent::configure(); } /** * Relates queue name with appropiated method */ public function define() { $this->addChannel('audios', 'consumeAudio'); } /** * If many queues are defined, as Redis respects order of queues, you can shuffle them * just overwritting method shuffleQueues() and returning true * * @return boolean Shuffle before passing to Gearman */ public function shuffleQueues() { return true; } /** * subscriber method with retrieved queue value * * @param InputInterface $input An InputInterface instance * @param OutputInterface $output An OutputInterface instance * @param Mixed $payload Data retrieved and unserialized from queue */ protected function consumeAudio(InputInterface $input, OutputInterface $output, $payload) { $output->writeln($payload); } }
通过扩展PSubscriberCommand,您可以使用模式而不是队列名称。
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Mmoreram\RSQueueBundle\Command\PSubscriberCommand; /** * Testing PSubscriber command */ class TestPSubscriberCommand extends PSubscriberCommand { /** * Configuration method */ protected function configure() { $this ->setName('test:psubscriber') ->setDescription('Testing psubscriber command'); ; parent::configure(); } /** * Relates queue name with appropiated method */ public function define() { $this->addPattern('*', 'consumeAll'); } /** * If many queues are defined, as Redis respects order of queues, you can shuffle them * just overwritting method shuffleQueues() and returning true * * @return boolean Shuffle before passing to Gearman */ public function shuffleQueues() { return true; } /** * Consume method with retrieved queue value * * @param InputInterface $input An InputInterface instance * @param OutputInterface $output An OutputInterface instance * @param Mixed $payload Data retrieved and unserialized from queue */ protected function consumeAll(InputInterface $input, OutputInterface $output, $payload) { $output->writeln($payload); } }
事件
此捆绑包使用自定义事件。
/** * The rs_queue.consumer is thrown each time a job is consumed by consumer * * The event listener recieves an * Mmoreram\RSQueueBundle\Event\RSQueueConsumerEvent instance * * @var string */ const RSQUEUE_CONSUMER = 'rs_queue.consumer'; /** * The rs_queue.subscriber is thrown each time a job is consumed by subscriber * * The event listener recieves an * Mmoreram\RSQueueBundle\Event\RSQueueSubscriberEvent instance * * @var string */ const RSQUEUE_SUBSCRIBER = 'rs_queue.subscriber'; /** * The rs_queue.producer is thrown each time a job is consumed by producer * * The event listener recieves an * Mmoreram\RSQueueBundle\Event\RSQueueProducerEvent instance * * @var string */ const RSQUEUE_PRODUCER = 'rs_queue.producer'; /** * The rs_queue.publisher is thrown each time a job is consumed by publisher * * The event listener recieves an * Mmoreram\RSQueueBundle\Event\RSQueuePublisherEvent instance * * @var string */ const RSQUEUE_PUBLISHER = 'rs_queue.publisher';
贡献
所有代码都是按Symfony2代码格式编写的,因此每个pull request都必须验证phpcs标准。您应该阅读Symfony2编码标准并安装此 CodeSniffer以检查所有代码是否已验证。
此外,还有一项关于为此项目做出贡献的政策。所有pull request都必须一步步解释,以便我们更容易理解并合并pull request。所有新功能都必须使用PHPUnit进行测试。
如果您想做出贡献,请阅读文档中的贡献代码部分。如果您正在提交pull request,请遵循提交补丁部分的指南,并使用Pull Request模板。