cgentry / message_queue
简单的消息队列接口。包括类似 SILEX 的守护系统
Requires
- php: >5.3.3
- ext-amqp: *
This package is not auto-updated.
Last update: 2024-09-28 14:22:37 UTC
README
消息服务类模仿 SILEX 框架。它们允许根据 AMQP 报头中的 AppID 字段简单路由消息。系统基于层次结构构建,允许程序员针对不同复杂度级别进行编码。
MessageQueue
MessageQueue 是一个库,允许您连接到 amqp 库并维护连接。它将在需要时尝试重新连接,并提供比标准 PHP 库更简单的接口。
MessageExceptions
此部分包含库中可能发生的所有异常。这里没有可用的代码,程序员应避免在他们的代码中使用这些异常。
ApiProcess
ApiProcess 封装将被调用的每个守护进程。它提供了运行接口,用于调用和解析参数引用。单独来看,只有几个有趣的接口,并且仅在与 MessageDaemon 结合使用时。
MessageDaemon
守护类实例将调用例程、队列和处理绑定到一个简单的类中。该类扩展了 MessageQueue 类,并使用 ApiProcess 保存处理信息。
设置
为了使用服务,您必须实例化和初始化服务。所有服务都尝试为您执行错误处理。它们提供相当简单的接口来设置服务。要使用 MessageDaemon 类,您需要了解如何使用 MessageQueue 类。
MessageQueue
$srv = new MessageQueue();
实例化一个新的消息队列对象。
$srv->setConnectionString( ‘amqp://host’ );
设置主机的连接字符串。这些值会在对象内部分割和处理,而不是需要设置多个值。值包括:amqp://username:password@host:port/vhost 任何一个都可以省略,将由 PHP 默认值填充。(参见 PHP 文档 https://php.ac.cn/manual/en/amqp.configuration.php 中的连接部分)
设置后,您可以调用 getConnection() 来检索一个 \AMQPConnection 对象。除非您需要连接对象,否则不需要创建一个;它们将自动为您创建。(仅在需要连接对象时调用。它将为您内部处理。)
$srv->getConnection();
通道
设置后,您可以调用 getChannel() 来检索一个 \AMQPChannel 对象。每个消息服务对象只有一个通道对象。这些对象会内部存储和检查,以确保它们有效且已打开。当您调用 getChannel 时,如果没有打开的通道,它将调用 getConnection() 来创建一个新的通道。不需要同时调用两个。(仅在需要连接对象时调用。它将为您内部处理。)
$srv->getChannel();
交换
在您可以将队列绑定到它们之前,交换必须存在,因此即使您没有意图发布消息,也可能需要创建交换。
$srv->createExchange(array( ‘name’=>’xname’, ‘parm’ => ‘value’ ) );
参数是一个键值对数组,值可以是简单变量或数组。每个交换都必须附加一个名称。这是强制性的,即使它是空的('')。不需要添加 declare,因为交换函数在创建时总是会发出 declare。选项有
- type => value(必需)
- name=>string(必需)
- flags => 整数值
- argument => array( string key , mixed value)
- arguments => array( key => value , key=>value…)
- declare => true(不是必需的)
您不能从 createExchange() 方法中调用 delete。
示例
$exParm = array( 'name' => 'xtest' , 'type' => \AMQP_EX_TYPE_DIRECT ); $qd = new MessageQueue(); $qd->setConnectionString( 'amqp://127.0.0.1' ) ->createExchange( $exParm )
一旦创建了交换,您就可以通过 getExchange() 来获取它。这允许您直接操作交换或使用它来发布消息。
$exchange = $srv->getExchange( ‘name’ );
名称是必需的,并且必须与创建交易所时传递的名称匹配。
交易所关联了一些实用函数。
$src->setAppId( ‘string’ );
应用程序ID是消息路由的方式。字符串用于发布消息时。
$srv->publish(‘name’ , $message, $route_key, $flags = \AMQP_NOPARAM, $attributes = array() );
这相当于
$attributes[ ‘timestamp’] = time(); // This is set by calling setAppId( ‘appid’); $attributes[ ‘appid’ ] = ‘appid’; $srv->getExchange( ‘name’ ) ->publish( $message, $route_key , $flags , $attributes );
队列
队列是用来接收消息的,必须在使用之前创建。MessageQueue中没有执行消息检索的函数。
$srv->createQueue( array( parms) );
参数可以是
- name => ‘string’ 与 setName( string ) 相同;
- declare => true 与 declare() 相同。这不是必需的,无论如何都会被调用。
- flags => integer 与 AMQPQueue setFlags 的值相同;
- argument => array( key, value) 与 setArgument( key, value) 相同;
- arguments => array( key=>value, key=>value) 与 setArguments( array() ) 相同;
- bind => array() 与 bind( array() ) 相同;
$srv->getQueue()
这将返回一个对象 \AMQPQueue,它是通过 createQueue( … ) 创建的;实用函数
$srv->setDebug( Boolean );
Set debugging output to on (true) or off (false)
$srv->isDebug();
返回 true(调试开启)或 false(调试关闭);
$srv->printDebug( $msg );
如果开启调试,则打印出 $msg。这将记录到错误日志中,并带有‘Debug: ‘前缀。$srv->isConnected(); 返回 true 或 false,根据是否存在活动连接和通道。它不检查队列的连接性。$srv->isQueueConnected(); 返回 true 或 false,根据是否存在活动连接和通道,并且队列看起来是打开的。实际上它并不测试队列连接,因为这需要访问消息。另见控制处理。消息守护进程
消息守护进程基于 MessageQueue 类并添加了额外的功能。必须使用创建队列和交易所的所有函数来创建这些对象。守护进程的关键是在消息信封头部找到 AppId 匹配时运行例程。
- 当例程匹配 AppID 时,将注册一个例程来运行。这是通过 match() 函数注册的。
- 当例程始终运行时,通过 always() 函数注册。
- 如果未发生匹配,将注册一个例程来运行。这是通过 noMatch() 函数注册的。此外,每个注册的例程都可以注册 before() 和 after() 例程,在主例程运行时执行。
一般的调用约定是
$srv = new \LWare\Queue\MessageDaemon(); $srv->match( ‘OnAppId’ , function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // processing routine } , ‘OptionalTagID’ );
匹配也可以是 always 或 noMatch。match() 函数的返回值将是一个 \LWare\Queue\ApiProcess 对象。第一个参数是它将与之匹配的内容,第二个是处理例程,第三个是可选的唯一标签(如果提供)。
通过从 match/always/noMatch 的返回值链式调用,您可以添加预处理和后处理例程:$srv->match( ‘OnAppId’ , function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 处理例程 } , ‘OptionalTagID’ ) ->before( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 预处理例程 } ) , ->before( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 预处理例程 } ), ->after( function ( \AMQPEnvelope $env , \AMQPQueue $queue ) { // 后处理例程 } ) ; 要添加注册其他函数,必须单独编码它们。
$srv->match(…); $srv->match(…); $srv->always(…);
可能发生多个匹配(一个以上的 AppId 匹配)。它将按照定义的顺序运行它们。以下可能是函数/方法参数的定义:\AMQPEnvelope 从 get 或 consume 调用返回的消息(信封)。\AMQPQueue 从 get 或 consume 的队列对象。用于确认消息。MessageDaemon 消息守护进程对象。用于控制处理。
名称和顺序无关紧要,因为反射用于确定参数。
$srv->runOne()
获取消息并运行任何匹配的例程。这将只处理一条消息。
$srv->run( $options )
获取所有消息并运行任何匹配的程序。这会调用 consume(),直到收到停止指示。$options 会被传递给 consume() 程序,只能是不传递或 AMQPNOACK。(参见 php AMQP 文档。)如何在程序间传递数据
有几种方法可以在程序间传递数据
- 使用 php $GLOBALS 来存储值。这是最简单的方法,并且不需要额外的代码。
- 使用 php closure uses( &$values ) 来存储值。MessageDaemon 类中没有内置的机制来处理这个问题。为了确保值被正确初始化,你可以在 always 或 before handling 中使用。例如
// 在开始之前设置和初始化守护程序存储
$srv->always( ‘setup’ , function (){ $GLOBALS[‘daemon’] = array(); } ); // add in all the other steps… $srv->match( ‘appid’ , ... ); // Post cleanup so storage isn’t just sitting around $srv->always( ‘setup’ , function (){ $GLOBALS[‘daemon’] = array(); } );
这解决了共享存储问题,而无需额外的机制。控制处理 为了实现更精细的处理控制,存在 MessageDaemon 处理机制。为了使用它们,你的程序必须在参数中包含 MessageDaemon 的引用。你可以调用的程序有
$d->setProcessingFlag( $flag );
设置处理标志为 $flag。
$d->getProcessingFlag( bool $printableform );
获取处理标志。如果你传递 true,它将返回用于调试的可打印字符串。
$d->checkProcessingBits( $flag );
如果 $flag 中的位被设置,则返回 true。这允许进行简单的位测试以用于控制。
$d->clearProcessingBits( $flag );
清除与 $flag 对应的已设置的位。
标志可以是以下之一
示例 --------// The exchange parameters $exParm = array( 'name' => 'xtest' , 'type' => \AMQP_EX_TYPE_DIRECT );$qd = new MessageDaemon();
/* Optionally set debugging for testing purposes */ $qd->setDebug( false ) ->setConnectionString( 'amqp://127.0.0.1' ) ->createExchange( $exParm ) ->createQueue( array( 'name'=> 'qtest' , 'bind' => array( 'xtest' , 'ktest') ) ) ->appId( $appid ,function( \AMQPQueue $q , \AMQPEnvelope $env, MessageDaemon $d ) { // Processing here…
$q->ack( $env->getDeliveryTag() ); } ); $qd->run(); // endless run