kontoulis/rabbit-manager

独立的 RabbitMQ 命令行管理器

v1.0.1 2015-10-03 17:39 UTC

This package is not auto-updated.

Last update: 2024-09-28 18:47:41 UTC


README

Latest Stable Version Latest Unstable Version License

还有一个独立的 Laravel 5.1 包

Rabbit Manager 是一个独立的 PHP 包,可以轻松管理 RabbitMQ

  • 内置命令行工具。简单的命令可以添加/接收消息到/从 RabbitMQ
  • 作为独立安装,或者将其添加到您的项目
  • 内置消息处理器和代理

使用 composer 安装

$ composer require kontouis/rabbit-manager

或者作为独立使用

$ git clone https://github.com/kontoulis/rabbit-manager
$ cd rabbit-manager
$ sudo composer update

将 bin/rabbit-manager 添加到 /usr/local/bin

$ sudo ln -s /full/path/to/project/bin/rabbit-manager /usr/local/bin/rabbit-manager

使用默认命令或基于这些命令创建自己的命令。

依赖关系

  • 当然需要一个正在运行的 RabbitMQ 实例

测试

别忘了运行测试!

$ vendor/bin/phpunit

使用方法

有两种类型的作业。一种是向队列添加消息,另一种是监听该队列。

命令行

您可以根据包中已定义的命令自行构建自己的命令,然后添加到 src/manager.php 中。

    $application->addCommands(
    	array(
    		new RabbitManager\Commands\QueueAddCommand,
    		new RabbitManager\Commands\QueueListenCommand,
    		new RabbitManager\Commands\YourCustomCommand,
    	)
    );

该包包括 2 个基本命令。您可以从命令行运行这些命令,甚至将它们添加到 supervisor 作为您需要的任何实例的工人

  • queue:add -> 向指定的队列添加消息
$ rabbit-manager queue:add [queueName] [message]
  • queue:listen -> 从队列中消费消息
$ rabbit-manager queue:listen [queueName]

作为库

您可以通过创建自己的 CustomHandler 和 Broker 将包用作库。(然而,您可以用一行的代理将消息添加到队列中)

  • 向队列添加消息
use RabbitManager\Libs\Broker;
use RabbitManager\Libs\Message;

// Your Class and Methods

public function publishMessage($message , $queueName = "Default")
{
    $broker = new Broker(AMPQ_HOST, AMPQ_PORT, AMPQ_USER, AMPQ_PASSWORD , AMPQ_VHOST);
    /* Makes the AMPQ message */
    $msg = new Message($queueName, ["message" => $message]);
    
    /* Sends the message */
    $broker->sendMessage($msg);
    
    $output->writeln('<info>Successfully submitted in queue</info>');
}
  • 消费队列:要消费队列,您可能需要一个从命令行运行的脚本,或者一个可以运行到队列为空的脚本。一个好的做法是有一个只监听队列的脚本和一个处理器来处理每个接收到的消息。您可以将该文件作为工人添加到 supervisor 或直接运行它。您也可以在同一个文件中做所有这些。
use RabbitManager\Libs\Broker;

public function listenToQueue($queueName = "Default" )
// Listening to queue
  $broker = new Broker();
  // Here you tell the broker which handler to call in order to parse the message
  // Use a fully qualified Namespace.
  // The broker will call the tryProcessing() method of the specified Handler
  // for every message received from the queue.
  // The handler in the package is named DefaultHandler
  // Make your own handlers according to your needs
  $broker->listenToQueue(
  	$queueName,
  	array(
  		"\\RabbitManager\\Handlers\\" . $queueName . "Handler"
  	)
  );
use RabbitManager\Libs\Handler;
use RabbitManager\Libs\Message;

class TheNameOfTheQueue extends Handler
{

	/**
	 * Tries to process the incoming message.
	 * @param Message $msg
	 * @return int One of the possible return values defined as Handler
	 * constants.
	 */
	public function tryProcessing(Message $msg)
	{
	  // TODO : Check, modify or validate the message.
	  // If the message is OK, process it
		return $this->handleSuccess($msg->getAMQPMessage()->body);

	}

	/**
	 * @param $msg
	 * @return int
	 */
	protected function handleSuccess($msg)
	{
	  // TODO : Do the processing. Store something in the db,
	  // Send a notification or eanything you are supossed to do with the received message
		echo $msg . "\n";
    
    // Returns and integer to the Broker, and the broker continues accordingly.
    // For a full list of return codes see the section bellow
		return Handler::RV_SUCCEED_CONTINUE;
	}
}

处理器返回值

这些返回值将告诉代理在您处理完消息后要做什么

  /**
	 * Pass this message and proceed with the next
	 */
	const RV_PASS = 1;
	/**
	 * Continue and ignore the failed message
	 */
	const RV_FAILED_CONTINUE = 10;
	/**
	 * We failed to do our job with this message (e.g. failed to store it in the database),
	 * Force exit
	 */
	const RV_FAILED_STOP = 11;
	/**
	 * We failed to do our job with this message (e.g. failed to store it in the database),
	 * put it again in the queue
	 */
	const RV_FAILED_REQUEUE = 12;
	/**
	 * Keep listening to the queue after successfully parsing the message
	 */
	const RV_SUCCEED_CONTINUE = 20;
	/**
	 *  Force stop listening after successfully parsing a message
	 */
	const RV_SUCCEED_STOP = 21;
	/**
	 *
	 */
	const RV_SUCCEED = Handler::RV_SUCCEED_CONTINUE;
	/**
	 *
	 */
	const RV_FAILED = Handler::RV_FAILED_CONTINUE;
	/**
	 *
	 */
	const RV_ACK = Handler::RV_SUCCEED;
	/**
	 *
	 */
	const RV_NACK = Handler::RV_FAILED_STOP;

版本

1.0.1

请随时提供反馈或提出任何问题