cgentry/message_queue

简单的消息队列接口。包括类似 SILEX 的守护系统

v1.0.1 2013-09-09 13:57 UTC

This package is not auto-updated.

Last update: 2024-09-28 14:22:37 UTC


README

消息服务类模仿 SILEX 框架。它们允许根据 AMQP 报头中的 AppID 字段简单路由消息。系统基于层次结构构建,允许程序员针对不同复杂度级别进行编码。

MessageQueue

MessageQueue 是一个库,允许您连接到 amqp 库并维护连接。它将在需要时尝试重新连接,并提供比标准 PHP 库更简单的接口。

MessageExceptions

此部分包含库中可能发生的所有异常。这里没有可用的代码,程序员应避免在他们的代码中使用这些异常。

ApiProcess

ApiProcess 封装将被调用的每个守护进程。它提供了运行接口,用于调用和解析参数引用。单独来看,只有几个有趣的接口,并且仅在与 MessageDaemon 结合使用时。

MessageDaemon

守护类实例将调用例程、队列和处理绑定到一个简单的类中。该类扩展了 MessageQueue 类,并使用 ApiProcess 保存处理信息。

设置

为了使用服务,您必须实例化和初始化服务。所有服务都尝试为您执行错误处理。它们提供相当简单的接口来设置服务。要使用 MessageDaemon 类,您需要了解如何使用 MessageQueue 类。

MessageQueue

 $srv = new MessageQueue();

实例化一个新的消息队列对象。

$srv->setConnectionString( ‘amqp://host’ );

设置主机的连接字符串。这些值会在对象内部分割和处理,而不是需要设置多个值。值包括:amqp://username:password@host:port/vhost 任何一个都可以省略,将由 PHP 默认值填充。(参见 PHP 文档 https://php.ac.cn/manual/en/amqp.configuration.php 中的连接部分)

设置后,您可以调用 getConnection() 来检索一个 \AMQPConnection 对象。除非您需要连接对象,否则不需要创建一个;它们将自动为您创建。(仅在需要连接对象时调用。它将为您内部处理。)

$srv->getConnection();

通道

设置后,您可以调用 getChannel() 来检索一个 \AMQPChannel 对象。每个消息服务对象只有一个通道对象。这些对象会内部存储和检查,以确保它们有效且已打开。当您调用 getChannel 时,如果没有打开的通道,它将调用 getConnection() 来创建一个新的通道。不需要同时调用两个。(仅在需要连接对象时调用。它将为您内部处理。)

$srv->getChannel();

交换

在您可以将队列绑定到它们之前,交换必须存在,因此即使您没有意图发布消息,也可能需要创建交换。

$srv->createExchange(array( ‘name’=>’xname’, ‘parm’ => ‘value’ ) );

参数是一个键值对数组,值可以是简单变量或数组。每个交换都必须附加一个名称。这是强制性的,即使它是空的('')。不需要添加 declare,因为交换函数在创建时总是会发出 declare。选项有

  • type => value(必需)
  • name=>string(必需)
  • flags => 整数值
  • argument => array( string key , mixed value)
  • arguments => array( key => value , key=>value…)
  • declare => true(不是必需的)

您不能从 createExchange() 方法中调用 delete。

示例

$exParm = array(
            'name' => 'xtest'  ,
            'type' => \AMQP_EX_TYPE_DIRECT
        );
        $qd = new MessageQueue();
        $qd->setConnectionString( 'amqp://127.0.0.1' )
           ->createExchange( $exParm )

一旦创建了交换,您就可以通过 getExchange() 来获取它。这允许您直接操作交换或使用它来发布消息。

$exchange = $srv->getExchange( ‘name’ );

名称是必需的,并且必须与创建交易所时传递的名称匹配。

交易所关联了一些实用函数。

$src->setAppId( ‘string’ );

应用程序ID是消息路由的方式。字符串用于发布消息时。

$srv->publish(‘name’ , 
$message, 
$route_key, 
$flags = \AMQP_NOPARAM,
$attributes = array()
);

这相当于

$attributes[ ‘timestamp’] = time();
// This is set by calling setAppId( ‘appid’);
$attributes[ ‘appid’ ] = ‘appid’;  
$srv->getExchange( ‘name’ )
    ->publish( $message, $route_key , $flags , $attributes );

队列

队列是用来接收消息的,必须在使用之前创建。MessageQueue中没有执行消息检索的函数。

$srv->createQueue( array( parms) );

参数可以是

  • name => ‘string’ 与 setName( string ) 相同;
  • declare => true 与 declare() 相同。这不是必需的,无论如何都会被调用。
  • flags => integer 与 AMQPQueue setFlags 的值相同;
  • argument => array( key, value) 与 setArgument( key, value) 相同;
  • arguments => array( key=>value, key=>value) 与 setArguments( array() ) 相同;
  • bind => array() 与 bind( array() ) 相同;
$srv->getQueue()

这将返回一个对象 \AMQPQueue,它是通过 createQueue( … ) 创建的;实用函数

$srv->setDebug( Boolean );
Set debugging output to on (true) or off (false)
$srv->isDebug();

返回 true(调试开启)或 false(调试关闭);

$srv->printDebug( $msg );

如果开启调试,则打印出 $msg。这将记录到错误日志中,并带有‘Debug: ‘前缀。$srv->isConnected(); 返回 true 或 false,根据是否存在活动连接和通道。它不检查队列的连接性。$srv->isQueueConnected(); 返回 true 或 false,根据是否存在活动连接和通道,并且队列看起来是打开的。实际上它并不测试队列连接,因为这需要访问消息。另见控制处理。消息守护进程

消息守护进程基于 MessageQueue 类并添加了额外的功能。必须使用创建队列和交易所的所有函数来创建这些对象。守护进程的关键是在消息信封头部找到 AppId 匹配时运行例程。

  • 当例程匹配 AppID 时,将注册一个例程来运行。这是通过 match() 函数注册的。
  • 当例程始终运行时,通过 always() 函数注册。
  • 如果未发生匹配,将注册一个例程来运行。这是通过 noMatch() 函数注册的。此外,每个注册的例程都可以注册 before() 和 after() 例程,在主例程运行时执行。

一般的调用约定是

$srv = new \LWare\Queue\MessageDaemon();
$srv->match( ‘OnAppId’ , 
function ( \AMQPEnvelope $env , \AMQPQueue $queue ) 
{
		// processing routine
	} ,
	‘OptionalTagID’
	);

匹配也可以是 always 或 noMatch。match() 函数的返回值将是一个 \LWare\Queue\ApiProcess 对象。第一个参数是它将与之匹配的内容,第二个是处理例程,第三个是可选的唯一标签(如果提供)。

通过从 match/always/noMatch 的返回值链式调用,您可以添加预处理和后处理例程:$srv->match( ‘OnAppId’ , function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 处理例程 } , ‘OptionalTagID’ ) ->before( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 预处理例程 } ) , ->before( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 预处理例程 } ), ->after( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 后处理例程 } ) ; 要添加注册其他函数,必须单独编码它们。

$srv->match(…);
$srv->match(…);
$srv->always(…);

可能发生多个匹配(一个以上的 AppId 匹配)。它将按照定义的顺序运行它们。以下可能是函数/方法参数的定义:\AMQPEnvelope 从 get 或 consume 调用返回的消息(信封)。\AMQPQueue 从 get 或 consume 的队列对象。用于确认消息。MessageDaemon 消息守护进程对象。用于控制处理。

名称和顺序无关紧要,因为反射用于确定参数。

$srv->runOne()

获取消息并运行任何匹配的例程。这将只处理一条消息。

	$srv->run( $options )

获取所有消息并运行任何匹配的程序。这会调用 consume(),直到收到停止指示。$options 会被传递给 consume() 程序,只能是不传递或 AMQPNOACK。(参见 php AMQP 文档。)如何在程序间传递数据

有几种方法可以在程序间传递数据

  • 使用 php $GLOBALS 来存储值。这是最简单的方法,并且不需要额外的代码。
  • 使用 php closure uses( &$values ) 来存储值。MessageDaemon 类中没有内置的机制来处理这个问题。为了确保值被正确初始化,你可以在 always 或 before handling 中使用。例如

// 在开始之前设置和初始化守护程序存储

$srv->always( ‘setup’ , function (){ $GLOBALS[‘daemon’] = array(); } );
// add in all the other steps…
$srv->match( ‘appid’ , ... );
// Post cleanup so storage isn’t just sitting around
$srv->always( ‘setup’ , function (){ $GLOBALS[‘daemon’] = array(); } );

这解决了共享存储问题,而无需额外的机制。控制处理 为了实现更精细的处理控制,存在 MessageDaemon 处理机制。为了使用它们,你的程序必须在参数中包含 MessageDaemon 的引用。你可以调用的程序有

$d->setProcessingFlag( $flag );

设置处理标志为 $flag。

$d->getProcessingFlag( bool $printableform );

获取处理标志。如果你传递 true,它将返回用于调试的可打印字符串。

$d->checkProcessingBits( $flag );

如果 $flag 中的位被设置,则返回 true。这允许进行简单的位测试以用于控制。

$d->clearProcessingBits( $flag );

清除与 $flag 对应的已设置的位。

标志可以是以下之一

示例 --------
// The exchange parameters
$exParm = array(
            'name' => 'xtest'  ,
            'type' => \AMQP_EX_TYPE_DIRECT
        );

$qd = new MessageDaemon();

/* Optionally set debugging for testing purposes */ $qd->setDebug( false ) ->setConnectionString( 'amqp://127.0.0.1' ) ->createExchange( $exParm ) ->createQueue( array( 'name'=> 'qtest' , 'bind' => array( 'xtest' , 'ktest') ) ) ->appId( $appid ,function( \AMQPQueue $q , \AMQPEnvelope $env, MessageDaemon $d ) { // Processing here…
$q->ack( $env->getDeliveryTag() ); } ); $qd->run(); // endless run