hnto / bick
带有额外选项的RabbitMQ包装器(批量发布、跟踪、消息持久化)
Requires
- php: ^7.1.3
- ext-json: *
- php-amqplib/php-amqplib: ^2.8
- ramsey/uuid: ^3.8
This package is auto-updated.
Last update: 2024-09-17 00:00:02 UTC
README
Bick是用官方php-amqlib库构建的RabbitMQ包装器。除了简单的设置、发布和消费包装器外,Bick还可以用于批量发布消息、跟踪它们等。
安装
composer require hnto/bick
可能性
- 使用一个Bick类进行简单的设置、发布和消费
- 批量发布消息
- 存储在你选择的数据存储中
- 优先处理消息
- 跟踪消息
- 将错误消息存储在你选择的数据存储中,以供进一步研究
设置
- 通过提供
BickConnectionSetup实例化一个Bick类BickConnectionSetup需要一个包含以下值的配置- 主机
- 端口
- 用户
- 密码
- 虚拟主机(默认"/")
- 通过提供队列、交换机和绑定来执行
setup方法 Bick在连接故障时抛出异常
示例
try { $bick = new Bick(new BickConnectionSetup([ 'host' => 'localhost', 'port' => '5672', 'user' => 'guest', 'pass' => 'guest' ])); $bick->setup([ 'queues' => [ new QueueSetup('mailing'), new QueueSetup('process-file') ], 'exchanges' => [ new ExchangeSetup('default') ], 'bindings' => [ new QueueBindExchangeSetup('mailing', 'default', 'mail'), new QueueBindExchangeSetup('process-file', 'default', 'process-csv'), new QueueBindExchangeSetup('process-file', 'default', 'process-image'), new QueueBindExchangeSetup('process-file', 'default', 'process-pdf'), ] ]); } catch (BickException $exception) { }
发布消息
Bick对象可以用来获取一个BickPublisherInterface并发布一个消息。发布者内部的publish方法需要一个BickMessageInterface对象,并在无效的BickMessage上抛出异常
注意:当使用默认的BickPublisher类并且不想使用持久化时,请使用发布者方法的BickPublisher::persistMessages(false)将其设置为false。
示例
try { $publisher = $bick->publisher(MyPublisher::class); //Set the persistence adapter $publisher->setPersistenceAdapter($adapter); //Or if you don't want to persist $publisher->persistMessages(false); //When using the default BickPublisher object //you can set a set of available publishing options //Available publishing options $publisher->setPublishingOptions( //With this option you provide a valid callback that will be //executed upon an ACK from the broker when publishing a message. //Due note: this does not mean that the message was successfully routed //to a queue. Only that it was accepted by the broker. 'callback_ack' => function(AMQPMessage $msg), //With this option you provide a valid callback that will be //executed upon a NACK from the broker when publishing a message. //A NACK happens in exceptional cases that a message was //rejected by the broker. 'callback_nack' => function(AMQPMessage $msg), //With this option you provide a valid callback that will be //executed when the broker returns a message. //This usually happens if the message was, for some reason, //unroutable. This means that the message could not be routed //to a certain queue. This callback receives more info that the two above. //You receive a reply code, a reply text, the exchange the message //was published to, the routing key used and the AMQPMessage itself. callback_return => function($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $msg), ); $publisher->publish(new BickMessage([ 'body' => ['headers' => [], 'subject' => 'test', 'body' => '<html><body>Test</body></html>'], 'meta' => ['mail' => 'test@example.org'], 'exchange' => 'default', 'routingKey' => 'mail' ])); } catch (BickException $exception) { }
发布消息(批量)
Bick对象还提供了批量发布消息的选项。逻辑与发布单个消息时相同。区别在于你发送一个BickMessage对象数组。
示例
try { $publisher = $bick->publisher(BickPublisher::class); //Set the persistence adapter $publisher->setPersistenceAdapter($adapter); //Or if you don't want to persist $publisher->persistMessages(false); $publisher->publishBatch([ new BickMessage([ 'body' => ['users' => [1, 2, 33]], 'meta' => ['mail' => 'info@users.com'], 'queue' => 'process-file', 'exchange' => 'default', 'routingKey' => 'process-csv' ]), new BickMessage([ 'body' => ['body' => 'test'], 'meta' => ['mail' => 'info@users.com'], 'queue' => 'mailing', 'exchange' => 'default', 'routingKey' => 'mail' ]) ]); } catch (BickException $exception) { }
消费消息
为了消费消息,你必须创建自己的消费者类。此类必须实现BickConsumerInterface和BickConnectionShutdownInterface。为了更容易设置,你也可以扩展抽象的BickConsumer。你需要实现的唯一方法是process方法。此方法接受一个BickMessageInterface。在此方法中,你需要返回一个状态。这些状态可以在BickMessageInterface中找到,作为常量。可以通过执行Bick对象的方法consumer来获取消费者类。要开始消费消息,请在消费者类中执行consume方法。
注意:当使用抽象的BickConsumer时,如果你不想使用持久化,请将受保护的成员变量$persist设置为false。
示例
//MailingConsumer class MyConsumer extends BickConsumer { /** * Must contain the queue name * * @var string */ protected $queue = 'mailing'; protected $persist = false; /** * @inheritdoc */ public function process(BickMessageInterface $message): int { //Do something with the message return BickMessage::MESSAGE_ACK; } } //Consume messages $consumer = $bick->consumer(MyConsumer::class); //Set the persistence adapter (not requred if $persist is set to false) $consumer->setPersistenceAdapter($adapter); //Consume $consumer->consume('my-queue');
当使用BickConsumer时,你可以将原始的AMQP消息“翻译”为你自己的消息,该消息实现了BickMessageInterface。这样,你就可以完全控制消息体内部的内容以及更多信息。请求默认的BickConsumer时,将设置标准翻译器以创建默认的BickMessage对象。可以通过setTranslator方法替换默认翻译器。
$consumer->setTranslator(new MyTranslator()); class MyTranslator implements BickMessageTranslatorInterface { public function translate(AMQPMesage $msg): BickMessageInterface { //Do your own thing with the messsage //and return a valid BickMessageInterface object } }
消息持久化
为了跟踪消息而持久化消息
在Bick中,您可以通过提供一个数据存储适配器对象将消息持久化到数据存储中。这个适配器可以是您想要的任何类型(MySQL、Redis、文件)。BickPublisher 和 BickConsumer 对象已经提供了使用您的适配器中的 persist 方法将消息持久化到数据存储的实现。它们唯一需要的是在类 setAdapter 中设置的 Adapter 变量。这个适配器必须实现 PersistenceAdapterInterface 接口。您如何保存这些数据,由您自行决定。
示例
//PersistenceAdapterInterface ... public function persist(BickMessage $message): void; ... public function update(BickMessage $message): void; ... public function analyse(BickMessage $message, BickMessageFault $fault): void; ... //BickPublisher public function persist(BickMessage $message) { .... //Your adapter method "persist" will receive a BickMessage object //persist is used to insert a new message into your datastorage $this->getPersistenceAdapter()->persist($message); } //BickConsumer public function persist(BickMessage $message): void { ... Your adapter method "update" will receive a BickMessage object //update is used to update an existing message in your datastorage $this->getPersistenceAdapter()->update($message); } }
在NACK后持久化消息以进行分析
在Bick中,您可以持久化一个“NACK”消息,并提供有关错误的可选信息。为了实现此功能,您的消费者必须实现 MessageFaultAnalyseInterface 接口。该接口要求您定义一个名为 analyse 的方法,该方法需要一个 BickMessage 对象。此外,您必须将一个 BickMessageFault 对象赋值给成员变量 protected $fault。这个错误将和消息一起发送到数据存储;这个类实现了 JsonSerializable。默认情况下,BickConsumer 已经实现了这个功能。所以如果您扩展它,当返回 NACK 状态时,您的消息将自动存储到数据存储中。
示例
//MessageFaultAnalyseInterface ... public function analyse(BickMessage $message, BickMessageFault $fault): void; //MailingConsumer class MailingConsumer extends BickConsumer { ... public function process(BickMessage $message): int { //Do something with the message //Fault message (string|array), fault code (integer) $this->fault = new BickMessageFault('fault message', 1); return BickMessage::MESSAGE_NACK; ...
Bick适配器
Bick有一个自己的适配器用于保存/跟踪消息。这是一个连接到您选择的数据库关系型数据库的PDO适配器(MySQL/MariaDB/Postgre)。
SQL表
-- Create syntax for TABLE 'bick_batch' CREATE TABLE `bick_batch` ( `id` varchar(255) NOT NULL DEFAULT '', `messages` int(10) DEFAULT '1', `status` int(1) DEFAULT '0', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Create syntax for TABLE 'bick_message' CREATE TABLE `bick_message` ( `id` varchar(255) NOT NULL DEFAULT '', `message` text, `status` int(1) NOT NULL DEFAULT '0', `batchId` varchar(255) NOT NULL DEFAULT '', PRIMARY KEY (`id`), KEY `batchId` (`batchId`), CONSTRAINT `batchId` FOREIGN KEY (`batchId`) REFERENCES `bick_batch` (`id`) ON DELETE CASCADE ON UPDATE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Create syntax for TABLE 'bick_analyse' CREATE TABLE `bick_analyse` ( `id` int(255) unsigned NOT NULL AUTO_INCREMENT, `message` text NOT NULL, `fault` text, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
示例
//Bick\Service\Storage\BickStorage ... //This method is executed upon publishing a message public function persist(BickMessage $message) { ... //Insert into batch ... //Insert into message ... //This method is executed upon consuming a queue public function update(BickMessage $message) { //Update message ... //Check if all messages of batch have been completed ... //Update the batch ...
您可以按这种方式实现Bick适配器
$dsn = 'mysql:host=host;dbname=name'; $username = 'username'; $password = 'password'; //Optional options $options = array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8', ); $dbh = new PDO($dsn, $username, $password, $options); //Instantiate the adapter and give it a PDO object $adapter = new \Bick\Service\Storage\BickStorage($dbh); //Set the adapter in the publisher $bick->publisher(BickPublisher::class) ->setPersistenceAdapter($adapter); //Set the adapter in the consumer $bick->consumer(MyConsumer::class) ->setPersistenceAdapter($adapter);
事实
- 在实例化一个
Bick对象时,配置将被保存- 直到请求一个动作,才会创建与RabbitMQ的连接
- 在设置时,将打开连接并请求一个通道
- 随后关闭通道和连接
- 在发布消息时,将打开连接并请求一个通道
- 随后关闭通道和连接
- 在消费队列时,将打开连接并请求一个通道
- 之后不会关闭通道和连接。如果消费者停止,这些连接将被关闭。
待办事项
- 实现当消息(批量)完成或出现故障时发送事件或通知
- 在批量中优先处理消息
- 仪表板用于查看错误消息、编辑它们并根据需要重新发布