nimbly/辛迪加

支持消息派发的简单队列消费者框架。

0.8 2020-04-14 21:23 UTC

This package is auto-updated.

Last update: 2024-09-15 07:55:56 UTC


README

安装

composer require nimbly/syndicate

基本用法

创建队列实例

$queue = new Syndicate\Queue\Sqs(
    "https://queue.url",
    new SqsClient([
        'version' => 'latest',
        'region' => 'us-west-2'
    ])
);

监听队列

监听是一个 阻塞 调用,并以无限循环运行。当有新的消息到达时,您的回调函数将被触发。

$queue->listen(function(Message $message): void {

	/**
	 *
	 *  Process the message...
	 *
	 */

	// Delete the message from Queue.
	$message->delete();

});

设置自定义序列化和反序列化器

默认情况下,辛迪加假定您的消息是 JSON 格式,并将尝试相应地进行自动序列化和反序列化。

但是,如果您的消息是其他格式,您可以提供自己的序列化和反序列化回调函数。

序列化器应用于所有传出消息的有效负载。

$queue->setSerializer(function($message): string {

    return \json_encode($message);

});

反序列化回调函数应用于所有传入消息的有效负载。

例如,要处理从 SQS 反序列化消息有效负载,该消息由 SNS 转发,您可以传递以下反序列化回调函数。

$queue->setDeserializer(function($payload) {

    $payload = \json_decode($payload);

    if( \property_exists($payload, "Type") &&
        $payload->Type === "Notification" ){
        return \json_decode($payload->Message);
    }

    return $payload;

});

关闭队列

您可以使用 shutdown() 方法关闭队列。

队列实例将以安全的方式响应 PCNTL 信号,而不会在消息处理中间中断。您可以在代码中安装信号处理程序,以干净和安全地关闭服务。

\pcntl_signal(
	SIGINT,
	function() use ($queue): void {

		Log::info("[SIGNAL] Shutting down queue.");
		$queue->shutdown();

	}
);

路由和派发

通过使用 DispatcherRouter,您可以将消息传递给特定的处理器。如何路由取决于您和消息格式。

通常,消息将包含消息类型或事件名称 - 这些是路由的关键。

路由器

通过传递一个 callable 路由解析器和键值对数组作为路由定义来创建一个新的 Router 实例。

路由解析器

路由解析器负责接收传入的消息实例,找到匹配的路由以派发消息。

调度程序将遍历所有配置的路由,并将消息和路由传递给解析器。

解析器必须简单返回一个 bool 值,表示消息是否匹配给定的路由。

路由定义

路由定义是一个键值对数组,将任何键映射到 callable、格式为 Full\Namespace\ClassName@methodNamestring 或上述数组的数组。

$router = new Router(function(Message $message, string $routeKey): bool {

    return $message->getPayload()->eventName == $routeKey;

}, [

	'UserLoggedOff' => function(Message $message): void {
		// Do some session cleanup stuff...
	},

	'UserRegistered' => '\App\Handlers\UserHandler@userRegistered',

    'UserClosedAccount' => [
		'\App\Handlers\UserHandler@userAccountClosed',
		'\App\Handlers\NotificationHandler@userAccountClosed'
	]

]);

调度程序

通过传递 Router 实例来创建一个新的 Dispatcher 实例。

$dispatcher = new Dispatcher($router);

PSR-11 容器支持

Dispatcher 可以接受 PSR-11 兼容的 ContainerInterface 实例,在将匹配的消息派发到处理器时用于依赖关系解析。

$dispatcher = new Dispatcher(
	$router,
	$container
);

或者您可以直接调用 setContainer 方法。

$dispatcher->setContainer($container);

Dispatcher 将尝试解析处理器所需的任何依赖关系,包括 Syndicate\Message 实例。

添加默认处理器

如果 Router 无法解析 Message 的路由,则 Dispatcher 将尝试将消息传递给其默认处理器。

默认处理器可以设置为一个 callable,并接受 Message 实例。

$dispatcher->setDefaultHandler(function(Message $message): void {

    Log::critical("No route defined for {$message->getPayload()->eventName}!");
    $message->release();

});

如果消息无法派发且未提供默认处理器,将抛出 DispatchException

与队列一起使用调度程序

$queue->listen(function(Message $message) use ($dispatcher): void {

	$dispatcher->dispatch($message);

});