nimbly / 辛迪加
支持消息派发的简单队列消费者框架。
Requires
- php: >=7.2
- ext-json: *
- psr/container: ^1.0
Requires (Dev)
- aws/aws-sdk-php: ~3.0
- google/cloud-pubsub: ^1.7
- iron-io/iron_mq: ^4.0
- microsoft/azure-storage-queue: ^1.3
- nimbly/carton: ^1.0
- pda/pheanstalk: ~3.0
- php-coveralls/php-coveralls: ^2.1
- phploc/phploc: ^5.0
- phpunit/phpunit: ^8.0
- predis/predis: ~1.0
- squizlabs/php_codesniffer: ^3.4
- symfony/var-dumper: ^4.2
- vimeo/psalm: ^3.1
Suggests
- aws/aws-sdk-php: To add support for AWS Simple Queue Service (SQS).
- google/cloud-pubsub: To add support for Google Cloud Pubsub.
- microsoft/azure-storage-queue: To add support for Microsoft Azure Storage Queue.
- pda/pheanstalk: To add support for Beanstalkd queue.
- predis/predis: To add support for Redis based queue and pub/sub functionality.
README
安装
composer require nimbly/syndicate
基本用法
创建队列实例
$queue = new Syndicate\Queue\Sqs( "https://queue.url", new SqsClient([ 'version' => 'latest', 'region' => 'us-west-2' ]) );
监听队列
监听是一个 阻塞 调用,并以无限循环运行。当有新的消息到达时,您的回调函数将被触发。
$queue->listen(function(Message $message): void { /** * * Process the message... * */ // Delete the message from Queue. $message->delete(); });
设置自定义序列化和反序列化器
默认情况下,辛迪加假定您的消息是 JSON 格式,并将尝试相应地进行自动序列化和反序列化。
但是,如果您的消息是其他格式,您可以提供自己的序列化和反序列化回调函数。
序列化器应用于所有传出消息的有效负载。
$queue->setSerializer(function($message): string { return \json_encode($message); });
反序列化回调函数应用于所有传入消息的有效负载。
例如,要处理从 SQS 反序列化消息有效负载,该消息由 SNS 转发,您可以传递以下反序列化回调函数。
$queue->setDeserializer(function($payload) { $payload = \json_decode($payload); if( \property_exists($payload, "Type") && $payload->Type === "Notification" ){ return \json_decode($payload->Message); } return $payload; });
关闭队列
您可以使用 shutdown()
方法关闭队列。
队列实例将以安全的方式响应 PCNTL 信号,而不会在消息处理中间中断。您可以在代码中安装信号处理程序,以干净和安全地关闭服务。
\pcntl_signal( SIGINT, function() use ($queue): void { Log::info("[SIGNAL] Shutting down queue."); $queue->shutdown(); } );
路由和派发
通过使用 Dispatcher
和 Router
,您可以将消息传递给特定的处理器。如何路由取决于您和消息格式。
通常,消息将包含消息类型或事件名称 - 这些是路由的关键。
路由器
通过传递一个 callable
路由解析器和键值对数组作为路由定义来创建一个新的 Router
实例。
路由解析器
路由解析器负责接收传入的消息实例,找到匹配的路由以派发消息。
调度程序将遍历所有配置的路由,并将消息和路由传递给解析器。
解析器必须简单返回一个 bool
值,表示消息是否匹配给定的路由。
路由定义
路由定义是一个键值对数组,将任何键映射到 callable
、格式为 Full\Namespace\ClassName@methodName
的 string
或上述数组的数组。
$router = new Router(function(Message $message, string $routeKey): bool { return $message->getPayload()->eventName == $routeKey; }, [ 'UserLoggedOff' => function(Message $message): void { // Do some session cleanup stuff... }, 'UserRegistered' => '\App\Handlers\UserHandler@userRegistered', 'UserClosedAccount' => [ '\App\Handlers\UserHandler@userAccountClosed', '\App\Handlers\NotificationHandler@userAccountClosed' ] ]);
调度程序
通过传递 Router
实例来创建一个新的 Dispatcher
实例。
$dispatcher = new Dispatcher($router);
PSR-11 容器支持
Dispatcher
可以接受 PSR-11 兼容的 ContainerInterface
实例,在将匹配的消息派发到处理器时用于依赖关系解析。
$dispatcher = new Dispatcher( $router, $container );
或者您可以直接调用 setContainer
方法。
$dispatcher->setContainer($container);
Dispatcher
将尝试解析处理器所需的任何依赖关系,包括 Syndicate\Message
实例。
添加默认处理器
如果 Router
无法解析 Message
的路由,则 Dispatcher
将尝试将消息传递给其默认处理器。
默认处理器可以设置为一个 callable
,并接受 Message
实例。
$dispatcher->setDefaultHandler(function(Message $message): void { Log::critical("No route defined for {$message->getPayload()->eventName}!"); $message->release(); });
如果消息无法派发且未提供默认处理器,将抛出 DispatchException
。
与队列一起使用调度程序
$queue->listen(function(Message $message) use ($dispatcher): void { $dispatcher->dispatch($message); });