silverstripe / messagequeue
适用于 SilverStripe 应用的简单轻量级消息队列机制,支持长运行过程执行队列操作、安装间消息发送以及与外部消息系统(如 ApacheMQ)的双向交互。
Requires
This package is not auto-updated.
Last update: 2024-07-10 21:43:03 UTC
README
消息队列模块
简介
MessageQueue 模块为 SilverStripe 应用提供了一种简单、轻量级的消息队列机制。它支持以下特性
- 为长运行过程执行队列操作
- 在 SilverStripe 安装间发送消息
- 与外部消息系统(如 ApacheMQ)的双向交互。
需求
- 如果使用 Stomp,则需要 Stomp.php(注意:仅限实验性使用)
- processOnShutdown 选项需要 *nix 或 OS X(在 Windows 上无法使用)
安装
将 messagequeue 解压缩到 SilverStripe 应用的基本文件夹中。默认配置适用于,以下有文档说明。确保其名称为 "messagequeue"。在 mysite/_config.php 中,放置设置模块接口配置的任何代码。
关键特性
模块的行为主要由应用配置决定,配置通过 MessageQueue::add_interface() 和 MessageQueue::remove_interface() 方法设置。这些方法用于设置一个或多个命名的 接口。
每个接口提供一个或多个命名的 队列。每个接口还指定了一个 实现,这是一个实现消息队列或与外部消息队列组件(如 ApacheMQ)接口的类。模块包括三个实现
- SimpleDBMQ - 在 SilverStripe 数据库中实现消息队列。此实现没有外部依赖。
- SimpleInterSSMQ - 实现将消息发送到另一个 SilverStripe 安装,通过 HTTP 进行。
- StompMQ - 使用 Stomp 协议实现与外部消息队列组件(如 ApacheMQ)的接口。
有两个主要操作
- send - 应用可以将消息发送到队列。
- receive - 可以从队列接收消息,并将其传递给应用。
消息格式
消息封装在 MessageFrame 对象中,其中包含
- headers - name => value 对的映射。
- body - 消息体。
发送行为
使用 MessageQueue.send() 发送消息。首先确定要发送的接口,根据队列名称。选定的接口是第一个其 queueName 选项与提供的队列名称匹配的接口。队列名称可以是正则表达式、队列名称数组、单个队列名称或 null。如果为 null,则匹配队列名称,因此作为通配符。
接口配置还包括编码。send() 将使用此编码来编码消息。默认为 "php-serialize",它调用 PHP 函数来编码消息。这意味着可以传递任何 PHP 对象并作为消息传递和接收,这对于应用向自身发送消息非常有用。
一旦消息编码,它将通过指定的实现类发送。
接收行为
接收消息有两个不同的阶段
- 从队列实现获取消息。检索委托给实现类。
- 将消息传递给应用。这是通过 MessageQueue 类集中完成的,以确保行为一致。
消息接收可以根据情况以不同的方式触发。
- 通过cron作业或其他外部触发器。
- 在PHP关闭后启动的子进程中(这样可以在不引入用户交互延迟的情况下,几乎同时处理应用程序中的事件队列,并且对于缓冲队列,可以将消息发送到远程系统,而不在页面请求过程中发送)。
- 在应用程序中程序化地执行。
通常以批量方式接收消息,可以选择限制这一点。一旦通过实现类接收一组消息,每个消息将根据接口上的编码选项进行解码,并尝试发送。
有3种消息发送选项。
- 如果消息是具有execute()方法的PHP对象,则执行该方法。
- 如果接口提供了回调,则使用消息作为参数调用该回调。
- 如果接口指定应用程序自行获取其消息,则仅在应用程序请求时才发送消息(此选项适用于应用程序的正常页面请求执行需要与用户交互同步处理消息的情况)。
注意
- 队列实现类保证检索任何一组消息都是原子的,这样多个进程可以处理队列,但可以保证每个消息发送尝试只执行一次。消息只能成功发送一次。
输出缓冲
在向远程系统发送消息时,通常有益于缓冲输出消息并在发送PHP请求之外发送它们。输出缓冲配置非常简单,涉及设置另一个队列作为缓冲区。例如
MessageQueue::add_interface("default", array( "queues" => array("remote"), "implementation" => "SimpleInterSSMQ", "implementation_options" => array( "remoteServer" => "http://myothersite.com/SimpleInterSSMQ_Accept" ), "encoding" => "php_serialize", "send" => array( "buffer" => "remote_buffer", "onShutdown" => "flush" ), "delivery" => array( "onerror" => array("log") ) )); MessageQueue::add_interface("buffer", array( "queues" => array("remote_buffer"), "implementation" => "SimpleDBMQ", "send" => array( "onShutdown" => "none" ), "delivery" => array( "onerror" => array("log") ) ));
主要要点是
- 队列
remote
在发送选项中指定了一个缓冲区。这是缓冲队列的名称。 onShutdown
指定队列应在关闭时刷新,这将在另一个进程中完成。- 使用SimpleDBMQ接口创建另一个队列,即remote_buffer。该队列配置为不在关闭时处理(不应显式消费)
当向remote
发送消息时,实际上消息被发送到缓冲队列。当remote
队列刷新时,它将读取缓冲队列上排队的信息,然后在此点通过实际配置的接口(在这种情况下,SimpleInterSSMQ)发送它们。
异常处理
如果在发送过程中发生异常(即在对象执行execute()方法或回调期间),接口配置中的'onerror'部分确定要执行的操作。通常,这是一个可以包括以下内容的命令数组:
- 通过SS_Log::log()记录错误
- 丢弃消息
- 重新排队消息,在同一个队列(用于重试)或另一个队列(例如,可能有一个错误队列)
自动执行消息
为了方便,提供了一个名为MethodInvocationMessage的类,该类封装了以下形式之一的方法调用
- 带有参数的静态方法调用
- 对DataObject实例的实例方法调用,带有参数
- 对任意对象的实例方法调用,带有参数
当接收到此类对象的消息时,它将自动执行,而不是通过回调将消息发送到应用程序。该类的另一个特点是捕获用户错误并作为异常抛出,因此用户错误受消息处理引擎中真实异常相同的异常处理约束。
一般来说,任何可序列化和实现MessageExecutable的类都可以以这种方式调用。这对于在长时间运行过程中轻松创建要执行的操作或命令非常有用。这些消息被认为是“自发送”的,尽管它们受异常处理约束。
配置选项和示例
默认配置
默认配置是
MessageQueue::add_interface("default", array( "queues" => "/.*/", "implementation" => "SimpleDBMQ", "encoding" => "php_serialize", "send" => array( "onShutdown" => "all" ), "delivery" => array( "onerror" => array( "log" ) ) ));
实际行为是任何发送到队列的消息将在PHP关闭时处理。(注意:如果设置了此选项,则从另一个PHP关闭函数发送的消息将不会被消费)。
示例1
MessageQueue::send( "myqueue", new MethodInvocationMessage("MyClass", "someStatic", "p1", 2) );
这将导致从PHP关闭启动的子进程中调用MyClass::someStatic("p1", 2)静态方法。错误将被记录。
注意:如果您不想在您的网站上使用默认行为,您必须在添加您想使用的接口之前调用MessageQueue::remove_interface("default")。
多个队列
MessageQueue::add_interface("myinterface", array( "queues" => array("queue1", "queue2"), "implementation" => "SimpleDBMQ", "encoding" => "php_serialize", "delivery" => array( "onerror" => array( "log", "requeue" => "queue2" ) ) )); MessageQueue::add_interface("default", array( "queues" => "/.*/", "implementation" => "SimpleDBMQ", "encoding" => "php_serialize", "send" => array( "onShutdown" => "all" ), "delivery" => array( "onerror" => array( "log" ) )));
此配置有两个显式命名的队列,queue1和queue2。它们不会在关闭时处理,因此必须显式调用MessageQueue_Consume来处理接收到的这些队列的消息。这些队列中的任一队列上的错误将被记录并重新入队到queue2。发送到任何其他队列的消息将由第二个接口处理,该接口在PHP关闭时处理。
SimpleInterSSMQ
此接口提供了一种简单的方法,可以从一个SilverStripe安装发送消息到另一个安装,而不需要安装任何额外的软件。它通过发送者向目标控制器发起HTTP请求并接受消息来实现。
发送示例配置如下
MessageQueue::add_interface("default", array( "queues" => array("mydest"), "implementation" => "SimpleInterSSMQ", "implementation_options" => array( "remoteServer" => "http://mydestination.com/SimpleInterSSMQ_Accept" ), "encoding" => "php_serialize", "send" => array( "buffer" => "mydest_buffer", "onShutdown" => "flush" ), "delivery" => array( "onerror" => array( "log" ) ) ));
MessageQueue::add_interface("buffer", array( "queues" => array("mydest_buffer"), "implementation" => "SimpleDBMQ", "delivery" => array( "onerror" => array("log") ) ));
它设置了一个名为mydest
的队列。implementation_options
指定了远程接受控制器。此示例还指定了一个缓冲队列mydest_buffer
。当消息发送到mydest
时,它们被缓冲到mydest_buffer
中,并且实际上是通过PHP关闭启动的进程,以更好地提高用户体验。
在目标处,mysite/_config.php中应包含以下内容:SimpleInterSSMQ_Accept::setEnabled(true);
这是必需的,因为出于安全目的,默认情况下禁用了SimpleInterSSMQ_Accept控制器。
从源发送消息是一个简单的消息发送
MessageQueue::send("mydest", $someObject);
甚至是一个自我调用的消息
MessageQueue::send("mydest", new MethodInvocationMessage("SomeClass", "someMethod", $parameter));
ApacheMQ
(此示例不完整。我们需要记录如何通过认证细节,以及如何使用Stomp的持久性客户端功能。)
MessageQueue::add_interface("myinterface", array( "queues" => array( "stompqueue1", "stompqueue2" ), "implementation" => "StompMQ", "encoding" => "raw", "delivery" => array( "onerror" => array( "log", "requeue" => "queue2" ) ) )); MessageQueue::add_interface("default", array( "queues" => "background", "implementation" => "SimpleDBMQ", "encoding" => "php_serialize", "send" => array( "onShutdown" => "all" ), "delivery" => array( "onerror" => array( "log" ) ) ));
在此示例中,定义了两个队列"stompqueue1"和"stompqueue2",并通过StompMQ类处理消息处理。这些队列名称直接传递给Stomp,因此它们是外部识别的队列名称,而不仅仅是SilverStripe应用程序内部的队列名称。
第二个接口提供了"后台"队列,与之前的内部排队和处理在关闭时相同。
错误处理选项
以下配置片段显示了当前可用的处理投递异常的形式。
...
"delivery" => array(
"log",
"requeue",
"requeue" => "errorQueue",
"callback" => array("MyClass", "method"),
"drop"
),
...
这是一个这些形式的命令列表,因此可以采取多个操作。
- "log"通过SS_Log::log记录消息。请注意,SS_Log有记录错误的位置选项,包括通知电子邮件。这需要在应用程序中单独配置。
- "requeue"将消息放回相同的队列以供稍后处理。(现有的队列行为将排除在相同队列消费调用中再次执行的消息)
- "requeue" => "queue"将消息放入命名的队列以供稍后处理。
- "callback" => $method调用指定的方法。$method应该是一个有效的回调定义。回调函数接收两个参数,异常对象和未能成功投递的消息帧。
- "drop"不执行任何操作。如果单独使用,异常消息将被丢弃。
指定投递的回调
...
"delivery" => array(
"callback" => array("MyClass", "method")
),
...
使用此选项,任何接收到的且未实现MethodExecutable的消息将被传递到指定的回调函数。该值是call_user_func_array
的方法指定符,因此可以通过提供类名和静态方法名称来识别静态函数。回调的签名是function callback($msgFrame, $config)
它传递了传入的消息帧(解码)和接收该接口的配置。
自定义关闭处理
如果您希望在关闭时处理消息,但您的应用程序需要一个排队消息的关闭函数,那么就会有问题,因为关闭函数是按照PHP执行它们的顺序执行的。处理这个问题的一个方法如下。在接口配置中,使用registerShutdown属性。
...
"send" => array(
"onShutdown" => "all",
"registerShutdown" => false,
)
...
然后在您的自定义关闭函数中执行以下操作
MessageQueue::send("myqueue", new MethodInvocationMessage("MyClass", "my_method")); // force MessageQueue to spawn the process that handles the messages as it // normally would on shutdown. MessageQueue::consume_on_shutdown();
重新触发队列处理
当在关闭时发送消息时,默认行为是启动一个进程来发送队列中的所有消息。有时您可能想要限制单个进程中发送的消息数量,但仍想发送所有消息。例如,如果您必须对大量对象执行内存密集型操作,尝试在一个进程中发送所有消息可能会导致PHP耗尽内存。消息队列提供了retrigger
和onShutdownMessageLimit
选项,可以用来解决这个问题。onShutdownMessageLimit
设置由单个PHP进程异步执行时发送的队列中项目数量的限制。retrigger
会导致异步进程在队列中仍有未发送消息时启动进一步的进程。
要配置此行为,请执行以下操作
...
"send" => array(
"onShutdown" => "all",
"retrigger" => "yes", // on consume, retrigger if there are more items
"onShutdownMessageLimit" => "1" // one message per async process
)
...
注意
- 在一般情况下,初始关闭将导致一系列同步PHP进程,这些进程最终将清除队列中的所有消息。
- 这并不保证只有一个消费者。如果有两个单独的HTTP请求都向队列发送消息,可能会出现两个进程都在发送消息的情况(然而,给定消息只会由其中之一执行。)如果进程可能产生不利交互,则必须谨慎处理。
队列语法
接口配置中的"queues"选项可以是以下形式之一
"queues" => "myqueue"
指定一个命名队列。"queues" => array("queue1", "queue2")
指定命名队列的列表。- `"queues" => "/.*AppQ$/"指定一个正则表达式,用于匹配队列名称。正则表达式必须以正斜杠开始和结束。在这个例子中,任何以AppQ结尾的队列名称,例如MyAppQ,都会与接口匹配。
指定重新排队以发送
...
"delivery" => array(
"requeue" => array(
"queue" => "otherQueue",
"immediate" => true
)
),
...
使用此选项,您可以在尝试发送时将其放入另一个队列。如果设置了立即选项,则在进程中尝试发送该队列中的消息。如果immediate为false,则不会尝试从其他队列立即发送消息 - 它将根据该队列的发送执行规则进行发送。
启动消息队列处理
在队列上可以启动两种不同的进程,如下所示
flush
仅对缓冲队列有效,并将导致缓冲消息发送到实际目的地。consume
将导致检索和发送队列上的消息。
通常,可以在队列上执行这两个操作中的一个或两个。
在PHP关闭时
要在启动发送的进程的PHP关闭时启动队列处理,您需要在接口配置上设置onShutdown
选项。onShutdown
可以是一个字符串的单个选项,也可以是一个选项字符串的数组。有效的选项有
flush
- 启用队列的刷新。consume
- 启用队列的消费。all
- 刷新和消费none
- 不执行任何操作 - 不会调用任何进程。
刷新在消费之前调用。
默认情况下,这会在子进程中调用MessageQueue_Process控制器,使用'sake'。此进程可以在主请求进程完成后继续执行。
在某些环境中(尤其是在存在多个编译方式不同的PHP二进制文件的环境中——例如MacOS X内置与MAMP,这是一个典型的例子),'sake' 可能无法正常工作。如果遇到这种情况,您可以在 mysite/_config.php 中调用以下代码:
MessageQueue::set_onshutdown_option("phppath", $pathToPhp);
如果设置了此选项,子进程将使用提供的php二进制文件而不是sake。
注意:这可能在不同开发、测试和生产环境中有所不同。
外部使用Sake
可以使用以下命令处理队列中的消息:
sake MessageQueue_Process "queue=myqueue&actions=all"
这将清除并消费 myqueue 上的所有消息,并将它们传递给应用程序。
您可以限制处理的消息条数。
sake MessageQueue_Process "queue=myqueue&limit=10&actions=consume"
actions
查询字段可以是一个包含 flush
、consume
、all
或 none
的逗号分隔列表。
您可以使用cron调度队列消费。
外部使用wget
在没有外部php二进制文件的环境(例如,只有mod_php)中,您可能需要使用wget来初始化对MessageQueue_Consume控制器的调用。
待办事项
- 完成StompMQ并对其进行测试。目前不支持的功能包括身份验证。
- 更多测试案例
- 队列消费选项,以原子方式处理消息。也就是说,一次处理一条消息,这样如果出现故障,我们就不会丢失大量消息。我们还可以在实现类中使用事务,以确保完全失败时消息仍然存在。优点:更健壮。缺点:性能较差(开销较大)。
- 重试异常消息的选项。可能是重试频率、最大重试次数。需要做到以下几点:使用头部信息存储信息(但无法保证不同实现者之间的一致性);或将它推送到实现层进行处理。
已知问题
MacOS X上MAMP的PHP关闭时消息消费问题
如果消息队列在关闭时似乎正在清除,但消息没有被传递(例如,回调没有执行),请启用调试。如果您看到以下信息:
符号未找到:__cg_jpeg_resync_to_restart
您需要确保 /Applications/MAMP/Library/bin/envvars 包含以下内容:
DYLD_LIBRARY_PATH="/Applications/MAMP/Library/lib:$DYLD_LIBRARY_PATH"
export DYLD_FALLBACK_LIBRARY_PATH=/Applications/MAMP/Library/lib
诊断PHP关闭时消息队列处理问题
默认情况下,当启动一个进程来在PHP关闭时清除队列时,该进程将输出重定向到 /dev/null。为了帮助调试这些进程,可以通过调用 MessageQueue::set_debugging 来设置写入日志文件的目录,并将stdout和stderr都重定向到该目录中的实际文件。
远程发送的数据对象
目前,当数据对象作为消息体发送时,它会序列化为一个指定类和ID的数据对象。当发送到远程系统时,无法保证该类存在,或者存在具有该类和ID的对象。请添加一个配置选项,可能是一个不同的序列化,以便消息通过值序列化,而不是引用,以用于远程发送。
待办事项
- 允许从缓冲队列接收消息,尤其是远程消息。
- SimpleInterSSMQ实现拉取行为,而不仅仅是推送。
- 在文档中提供使用SimpleInterSSMQ捕获onPublish并在远程站点上重新发布的示例。
- 为队列提供单个消费者,以确保没有两条消息同时由不同的进程发送。