rstmpw / ibmmq
IBM WebSphere MQ 的 PHP 客户端
1.2.1
2020-06-05 12:02 UTC
Requires
- php: ^7.0
Suggests
- ext-mqseries: To actually use this library, you'll have to install the custom version of extension mqseries
README
安装
为了运行,需要为 PHP7 修改过的 pecl 扩展 mqseries。
composer require rstmpw/ibmmq
使用
函数可能会抛出 \RuntimeException 或 \InvalidArgumentException
从队列中获取/发送消息
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; // Функция-помощник для формирования правильной структуры параметров подключения. // Если используется внешнее хранилище параметров подключений, то там следует хранить уже // сформированную структуру. $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); // Соединяемся с сервером $mqServer = new MQClient($mqConnOpts); // Открываем очередь $mqQueue = $mqServer->openQueue('QUEUE.NAME'); // Для удаленных очередей: // $mqOueue = $mqServer->openQueue('REMOTE.QUEUE.NAME', [MQSERIES_MQOO_FAIL_IF_QUIESCING, MQSERIES_MQOO_OUTPUT]); // Создаем сообщение и отправляем его в очередь $outMessage = new MQMessage('Somebody message '.time()); echo "\n Data: ".$outMessage->data(); $mqQueue->put($outMessage); echo "\n msgId: ".$outMessage->property('MsgId'); // Получаем сообщение из очереди $inMessage = $mqQueue->get(); echo "\n Data: ".$inMessage->data(); echo "\n msgId: ".$inMessage->property('MsgId'); // Закрываем объект и соединение с сервром // Явным образом можно не закрывать, деструкторы сделают все автоматически $mqQueue->close(); unset($mqServer);
如果只需要将一条消息放入队列,可以不显式打开队列并使用 put1() 函数
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Создаем сообщение $outMessage = new MQMessage('Somebody message '.time()); $mqServer->put1($outMessage, 'QUEUE.NAME'); echo "\n msgId: ".$outMessage->property('MsgId');
向另一个队列管理器发送消息
您的 QM 必须知道如何向另一个管理器发送消息(通过专用远程或传输队列)
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Создаем сообщение $outMessage = new MQMessage('Somebody message '.time()); $mqServer->put1($outMessage, 'QM.REMOT//QUEUE.NAME'); echo "\n msgId: ".$outMessage->property('MsgId');
从队列中获取消息,并按 CorrelId 选择
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); $mqQueue = $mqServer->openQueue('QUEUE.NAME'); // Устанавливаем опции получения сообщения и выборки $getOpts = [ 'Version' => MQSERIES_MQGMO_VERSION_2, 'Options' => [MQSERIES_MQGMO_FAIL_IF_QUIESCING, MQSERIES_MQGMO_WAIT], 'WaitInterval' => 60*1000, //60 sec 'MatchOptions' => [MQSERIES_MQMO_MATCH_CORREL_ID] ]; $MQMD = [ 'MsgId' => MQSERIES_MQMI_NONE, 'CorrelId' => 'NeedCorrelId' ]; // Получаем сообщение из очереди $inMessage = $mqQueue->get($getOpts, $MQMD); echo "\n Data: ".$inMessage->data(); echo "\n msgId: ".$inMessage->property('MsgId'); echo "\n correlId: ".$inMessage->property('CorrelId'); $mqQueue->close(); unset($mqServer);
记录/读取消息属性
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Создаем сообщение и устнавливаем свойства $outMessage = new MQMessage('Somebody message '.time()); $outMessage->property('Priority', 7); $outMessage->property('ReplyToQ', 'RESPONSE.QUEUE'); $outMessage->property('ReplyToQMgr', 'RESPONSE.QM'); // Отправляем сообщение $mqServer->put1($outMessage, 'QUEUE.NAME'); // Получаем сообщение и смотрим свойства $mqQueue = $mqServer->openQueue('QUEUE.NAME'); $inMessage = $mqQueue->get(); echo "\n Data: ".$inMessage->data(); echo "\n msgId: ".$inMessage->property('MsgId'); echo "\n Priority: ".$inMessage->property('Priority'); echo "\n ReplyToQ: ".$inMessage->property('ReplyToQ'); echo "\n ReplyToQMgr: ".$inMessage->property('ReplyToQMgr');
发布主题
与发送消息类似,区别在于使用主题名称而不是队列名称。主题名称由 '/' 分隔的字符串组成,例如 'news/sport/nascar'。
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Открываем топик $mqQueue = $mqServer->openTopic('news/sport/nascar'); // Создаем сообщение и публикуем его в топик $outMessage = new MQMessage('Latest news about nascar racing'); $mqQueue->put($outMessage);
也支持通过 put1 简化调用,但主题名称必须以 'TOPIC::' 开头
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Создаем сообщение и публикуем его в топик $outMessage = new MQMessage('Latest news about nascar racing'); $mqServer->put1($outMessage, 'TOPIC::news/sport/nascar/winners/top/1');
事务
与数据库中的事务类似
<?php use rstmpw\ibmmq\MQClient; use rstmpw\ibmmq\MQMessage; $mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414'); $mqServer = new MQClient($mqConnOpts); // Отправляем 2 сообщения вне транзакции $outMessage = new MQMessage('Put message w/o transaction'); $mqServer->put1($outMessage, 'OTHER.QUEUE.NAME'); $mqServer->put1($outMessage, 'OTHER.QUEUE.NAME'); // Открывем транзакцию $mqServer->begin(); // Получаем сообщение из очереди OTHER.QUEUE.NAME $mqQueue = $mqServer->openQueue('OTHER.QUEUE.NAME'); $mqQueue->get(); // Отправляем сообщение в очередь QUEUE.NAME с использованием put1 $mqServer->put1(new MQMessage('MSG1'), 'QUEUE.NAME'); // Отправляем еще 2 сообщения в очередь QUEUE.NAME и одно оттуда забираем через явное открытие очереди $mqQueueOut = $mqServer->openQueue('QUEUE.NAME'); $mqQueueOut->put(new MQMessage('MSG2')); $mqQueueOut->put(new MQMessage('MSG2')); $mqQueueOut->get(); // Фиксируем транзакцию $mqServer->commit(); // Если считать, что перед запуском скрипта очереди были путые, то в итоге в очереди OTHER.QUEUE.NAME будет одно сообщение, // a в очереди QUEUE.NAME - 2 сообщения. // Если явным образом откатить транзакцию // $mqServer->rollback(); // или явно ее не закоммитить, то в очереди OTHER.QUEUE.NAME будет 2 сообщения, а QUEUE.NAME останется пустой.
依赖
- 库与 PHP 7.0 及以上版本兼容。
- 为了运行,需要为 PHP7 修改过的 pecl 扩展 mqseries。