rstmpw/ibmmq

IBM WebSphere MQ 的 PHP 客户端

1.2.1 2020-06-05 12:02 UTC

This package is auto-updated.

Last update: 2024-09-05 21:16:34 UTC


README

安装

为了运行,需要为 PHP7 修改过的 pecl 扩展 mqseries

composer require rstmpw/ibmmq

使用

函数可能会抛出 \RuntimeException 或 \InvalidArgumentException

从队列中获取/发送消息

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

// Функция-помощник для формирования правильной структуры параметров подключения.
// Если используется внешнее хранилище параметров подключений, то там следует хранить уже
// сформированную структуру.
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');

// Соединяемся с сервером
$mqServer = new MQClient($mqConnOpts);

// Открываем очередь
$mqQueue = $mqServer->openQueue('QUEUE.NAME');
// Для удаленных очередей:
// $mqOueue = $mqServer->openQueue('REMOTE.QUEUE.NAME', [MQSERIES_MQOO_FAIL_IF_QUIESCING, MQSERIES_MQOO_OUTPUT]);

// Создаем сообщение и отправляем его в очередь
$outMessage = new MQMessage('Somebody message '.time());
echo "\n Data: ".$outMessage->data();
$mqQueue->put($outMessage);
echo "\n msgId: ".$outMessage->property('MsgId');

// Получаем сообщение из очереди
$inMessage = $mqQueue->get();
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');

// Закрываем объект и соединение с сервром
// Явным образом можно не закрывать, деструкторы сделают все автоматически
$mqQueue->close();
unset($mqServer);

如果只需要将一条消息放入队列,可以不显式打开队列并使用 put1() 函数

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Создаем сообщение
$outMessage = new MQMessage('Somebody message '.time());

$mqServer->put1($outMessage, 'QUEUE.NAME');
echo "\n msgId: ".$outMessage->property('MsgId');

向另一个队列管理器发送消息

您的 QM 必须知道如何向另一个管理器发送消息(通过专用远程或传输队列)

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Создаем сообщение
$outMessage = new MQMessage('Somebody message '.time());

$mqServer->put1($outMessage, 'QM.REMOT//QUEUE.NAME');
echo "\n msgId: ".$outMessage->property('MsgId');

从队列中获取消息,并按 CorrelId 选择

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

$mqQueue = $mqServer->openQueue('QUEUE.NAME');

// Устанавливаем опции получения сообщения и выборки
$getOpts = [
    'Version' => MQSERIES_MQGMO_VERSION_2,
    'Options' => [MQSERIES_MQGMO_FAIL_IF_QUIESCING, MQSERIES_MQGMO_WAIT],
    'WaitInterval' => 60*1000, //60 sec
    'MatchOptions' => [MQSERIES_MQMO_MATCH_CORREL_ID]
];
$MQMD = [
    'MsgId' => MQSERIES_MQMI_NONE,
    'CorrelId' => 'NeedCorrelId'
];

// Получаем сообщение из очереди
$inMessage = $mqQueue->get($getOpts, $MQMD);
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');
echo "\n correlId: ".$inMessage->property('CorrelId');

$mqQueue->close();
unset($mqServer);

记录/读取消息属性

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Создаем сообщение и устнавливаем свойства
$outMessage = new MQMessage('Somebody message '.time());
$outMessage->property('Priority', 7);
$outMessage->property('ReplyToQ', 'RESPONSE.QUEUE');
$outMessage->property('ReplyToQMgr', 'RESPONSE.QM');

// Отправляем сообщение
$mqServer->put1($outMessage, 'QUEUE.NAME');

// Получаем сообщение и смотрим свойства
$mqQueue = $mqServer->openQueue('QUEUE.NAME');
$inMessage = $mqQueue->get();
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');
echo "\n Priority: ".$inMessage->property('Priority');
echo "\n ReplyToQ: ".$inMessage->property('ReplyToQ');
echo "\n ReplyToQMgr: ".$inMessage->property('ReplyToQMgr');

发布主题

与发送消息类似,区别在于使用主题名称而不是队列名称。主题名称由 '/' 分隔的字符串组成,例如 'news/sport/nascar'。

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Открываем топик
$mqQueue = $mqServer->openTopic('news/sport/nascar');

// Создаем сообщение и публикуем его в топик
$outMessage = new MQMessage('Latest news about nascar racing');
$mqQueue->put($outMessage);

也支持通过 put1 简化调用,但主题名称必须以 'TOPIC::' 开头

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Создаем сообщение и публикуем его в топик
$outMessage = new MQMessage('Latest news about nascar racing');
$mqServer->put1($outMessage, 'TOPIC::news/sport/nascar/winners/top/1');

事务

与数据库中的事务类似

<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;

$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);

// Отправляем 2 сообщения вне транзакции
$outMessage = new MQMessage('Put message w/o transaction');
$mqServer->put1($outMessage, 'OTHER.QUEUE.NAME');
$mqServer->put1($outMessage, 'OTHER.QUEUE.NAME');

// Открывем транзакцию
$mqServer->begin();

// Получаем сообщение из очереди OTHER.QUEUE.NAME
$mqQueue = $mqServer->openQueue('OTHER.QUEUE.NAME');
$mqQueue->get();

// Отправляем сообщение в очередь QUEUE.NAME с использованием put1
$mqServer->put1(new MQMessage('MSG1'), 'QUEUE.NAME');

// Отправляем еще 2 сообщения в очередь QUEUE.NAME и одно оттуда забираем через явное открытие очереди
$mqQueueOut = $mqServer->openQueue('QUEUE.NAME');
$mqQueueOut->put(new MQMessage('MSG2'));
$mqQueueOut->put(new MQMessage('MSG2'));
$mqQueueOut->get();

// Фиксируем транзакцию
$mqServer->commit();
// Если считать, что перед запуском скрипта очереди были путые, то в итоге в очереди OTHER.QUEUE.NAME будет одно сообщение,
// a в очереди QUEUE.NAME - 2 сообщения.
// Если явным образом откатить транзакцию
// $mqServer->rollback();
// или явно ее не закоммитить, то в очереди OTHER.QUEUE.NAME будет 2 сообщения, а QUEUE.NAME останется пустой.

依赖

  • 库与 PHP 7.0 及以上版本兼容。
  • 为了运行,需要为 PHP7 修改过的 pecl 扩展 mqseries