gdbots / pbjx
pbj 的消息工具
Requires
- php: >=8.1
- ext-json: *
- gdbots/query-parser: ^3.0
- gdbots/schemas: ^3.0
- psr/log: ^1|^2|^3
- symfony/event-dispatcher: ^6.4 || ^7.0
Requires (Dev)
- aws/aws-sdk-php: ^3.138
- gdbots/acme-schemas: ^3.0
- phpunit/phpunit: ^10.5
- ruflin/elastica: ^7.1
- dev-master
- v4.2.0
- v4.1.1
- v4.1.0
- v4.0.0
- v3.1.6
- v3.1.5
- v3.1.4
- v3.1.3
- v3.1.2
- v3.1.1
- v3.1.0
- v3.0.1
- v3.0.0
- v2.3.14
- v2.3.13
- v2.3.12
- v2.3.11
- v2.3.10
- v2.3.9
- v2.3.8
- v2.3.7
- v2.3.6
- v2.3.5
- v2.3.4
- v2.3.3
- v2.3.2
- v2.3.1
- v2.3.0
- v2.2.5
- v2.2.4
- v2.2.3
- v2.2.2
- v2.2.1
- v2.2.0
- v2.1.1
- v2.1.0
- v2.0.1
- v2.0.0
- v1.0.1
- v1.0.0
- v0.3.0
- v0.2.2
- v0.2.1
- v0.2.0
- v0.1.1
- v0.1.0
This package is auto-updated.
Last update: 2024-08-24 05:55:36 UTC
README
此库为 Pbj 提供消息工具。
Pbj 代表 "Private Business Json"。Pbjc 代表 "Private Business Json Compiler",一个从模式配置创建 php 类的工具。Pbjx 代表 "Private Business Json Exchanger",一个通过各种传输和存储引擎交换 pbj 的工具。
使用此库假设您已经使用 Pbjc 创建并编译了自己的 pbj 类。
Pbjx 主要方法
- send: 向单个收件人异步发送消息,无返回有效载荷。
- publish: 异步广播消息,可以被订阅。
- request: 向单个收件人同步发送消息,预期有返回有效载荷。
如果您已配置 调度器,则这些方法也将工作
- sendAt: 将命令计划在稍后时间发送。
- cancelJobs: 通过作业 ID 取消先前计划的任务。
如果您的项目使用 Symfony,请使用 gdbots/pbjx-bundle-php 来简化集成。
传输
当交换 pbj(即消息)时,使用传输执行该操作。您的应用程序/领域逻辑不应该直接处理传输。
可用传输
- AWS Firehose
- AWS Kinesis
- 内存
路由器
某些传输需要路由器来确定交付通道(流名称、gearman 通道等),以通过路由器路由消息。路由器实现可以针对消息类型(命令、事件、请求)进行固定,也可以根据 pbj 消息本身的内容特定。
例如
interface Router { public function forCommand(Message $command): string; ... }
Pbjx::send
处理命令(如果传输支持,则异步)。
使用 send 方法时,意味着存在单个处理该命令的处理程序,换句话说,如果存在 "PublishArticle" 命令,则必须有一个处理该命令的服务。
在 gdbots/pbjx-bundle-php 中,使用
SchemaCurie
来推导服务 ID。
所有命令处理程序都必须实现 Gdbots\Pbjx\CommandHandler
。
"PublishArticle" 命令的处理程序示例
<?php declare(strict_types = 1); final class PublishArticleHandler implements CommandHandler { protected function handleCommand(Message $command, Pbjx $pbjx): void { // handle the command here } }
调用命令处理程序永远不会直接进行(除了在单元测试中)。在这个虚构的例子中,您可能有一个控制器,该控制器创建并发送命令。
<?php declare(strict_types = 1); final class ArticleController extends Controller { /** * @Route("/articles/{article_id}/publish", requirements={"article_id": "^[0-9A-Fa-f]+$"}) * @Method("POST") * @Security("is_granted('acme:blog:command:publish-article')") * * @param Request $request * * @return Response */ public function publishAction(Request $request): Response { $command = PublishArticleV1::create()->set('article_id', $request->attributes->get('article_id')); $this->getPbjx()->send($command); $this->addFlash('success', 'Article was published'); return $this->redirectToRoute('app_article_index'); } }
Pbjx::publish
向所有订阅者发布事件(如果传输支持,则异步)。除非发生致命错误,否则所有订阅者都将收到事件。
发布/订阅 是使用的模式。这很重要,因为它可能看起来像 Symfony EventDispatcher,但 Pbjx 订阅者不能像 Symfony 订阅者/监听器那样停止事件的传播。
Pbjx 发布的事件与应用程序或 "生命周期" 事件不同。这些是您的 "领域事件"。您在这里使用的名称应该对大多数人来说都有意义,包括非开发人员。例如,MoneyDeposited、ArticlePublished、AccountClosed、UserUpgraded 等。
订阅Pbjx发布的活动需要您知道该活动或其混入的SchemaCurie
。
继续上面的示例,让我们假设PublishArticleHandler
创建并发布了一个名为ArticlePublished
的活动,其curie为"acme:blog:event:article-published"
。在您的订阅者中,您可以监听以下任何一个:
- acme:blog:event:article-published:v1
- acme:blog:event:article-published
- acme:blog:* 所有"acme:blog"命名空间中的事件
- * 所有事件
以及任何其混入
- vendor:package:mixin:some-event:v1
- vendor:package:mixin:some-event
所有pbjx事件订阅者的方法签名应该是事件接口然后是Pbjx服务本身。
<?php declare(strict_types = 1); namespace Acme\Blog; use Gdbots\Pbjx\EventSubscriber; final class MyEventSubscriber implements EventSubscriber { public function onArticlePublished(Message $event, Pbjx $pbjx): void { // do something with this event. } public static function getSubscribedEvents() { return [ 'acme:blog:event:article-published' => 'onArticlePublished', ]; } }
当订阅多个事件时,可以使用方便的EventSubscriberTrait
,它将自动调用接收到的任何事件匹配的方法,例如:"onUserRegistered"、"onUserUpdated"、"onUserDeleted"。
Pbjx::request
同步处理请求并返回响应。如果传输支持,它可能不在当前进程(例如gearman)中运行。这与上面的"send"类似,但在这个情况下,必须返回一个响应。
所有请求处理者必须实现Gdbots\Pbjx\RequestHandler
。
“GetArticleRequest”处理器的示例
<?php declare(strict_types = 1); final class GetArticleRequestHandler implements RequestHandler { protected function handleRequest(Message $request, Pbjx $pbjx): Message { $response = GetArticleResponseV1::create(); // imaginary repository $article = $this->repository->getArticle($request->get('article_id')); return $response->set('article', $article); } }
调用请求处理器永远不会直接进行(除非在单元测试中)。
<?php declare(strict_types = 1); final class ArticleController extends Controller { /** * @Route("/articles/{article_id}", requirements={"article_id": "^[0-9A-Fa-f]+$"}) * @Method("GET") * @Security("is_granted('acme:blog:request:get-article-request')") * * @param Request $request * * @return Response */ public function getAction(Request $request): Response { $getArticleRequest = GetArticleRequestV1::create()->set('article_id', $request->attributes->get('article_id')); $getArticleResponse = $this->getPbjx()->request($getArticleRequest); return $this->render('article.html.twig', ['article' => $getArticleResponse->get('article')]); } }
Pbjx生命周期事件
当消息被处理(发送、发布、请求)时,它会经历一个生命周期,允许在"进程中"进行修改和验证。订阅这些事件的方法类似于Symfony事件订阅者/监听器的工作,并且可以停止传播。
生命周期事件名称(您的订阅者/监听器必须绑定到的)都具有标准格式,例如:"gdbots:pbjx:mixin:command.bind"。这些命名方式与SimpleEventBus
的命名方式相同。参见SimplePbjx::trigger
方法,了解如何实现这一点。
生命周期事件按发生顺序如下:
bind
在"bind"事件中,必须由运行环境绑定到消息的数据,这些数据通常来自环境变量、http请求本身、请求上下文等。这是一个低成本的操作(在大多数情况下)。
绑定用户代理、IP地址、表单输入是"bind"事件中的良好示例。
validate
在允许处理消息之前,应该对其进行验证。这通常是实施业务规则的地方。这不仅仅是模式验证(由pbj为您执行)。
检查权限通常在这个事件中完成。在Symfony应用程序中,这将是在您运行
AuthorizationChecker
和/或安全投票的地方。其他示例
- 在"AddProductToCart"中检查库存水平
- 在"UpdateArticle"上实施乐观并发控制
- 在"RegisterUser"上检查可用的用户名
- 在"SubmitContactForm"上验证catchpa
- 在"UploadVideo"上限制上传文件大小
enrich
一旦您决定要处理消息,您可以执行额外的丰富操作,这些操作之前可能是昂贵的或不值得做的。丰富是最终阶段,因此一旦完成,消息将被冻结并传输。
Geo2Ip丰富、情感分析、将相关数据添加到事件中等是"enrich"事件的良好用途。
事件存储
发布事件并存储/检索它们是一个如此常见的需求,以至于我们添加了Pbjx::getEventStore
方法,以便在没有对处理程序、订阅者等进行额外服务连接的情况下提供该服务。
该服务将在您调用该方法时才实例化,因此没有性能惩罚,因为Pbjx内置了此功能。目前,唯一可用的实现是DynamoDb。
事件存储的关键概念
流
事件被附加到一个通过StreamId
(见下文)标识的流中,并且按照该流中事件的occurred_at
字段排序。这并不是(请原谅我用了一个十美元的词)一个“单调递增的无间隙序列号”序列(v1, v2, ..., v10)。当然,你可以通过自己的实现实现这种设计,但对于原始的设计/实现,更重要的是能够从许多不同的服务器(包括不同的区域)收集数据,而不必与集中式的ID/序列号服务进行协调。
occurred_at
字段由10位(Unix时间戳)和6位微秒数拼接而成。
为了减少碰撞的可能性,当预期数据量很大时,我们会使用更细粒度的流ID。例如,编辑CMS文章的用户与投票的用户相比,你知道投票的用户写入量会高得多。
只要满足以下条件,事件存储可以有数十亿个流而不会出现性能问题:
- 你附加事件的StreamIds分布足够广泛,以避免热点键。
- 单个流上的事件总量不超过10GB(这个限制来自DynamoDb)。你可以使用基于日期的存储来处理长期存在的流,例如events-YYYYMM。
- 底层存储已配置以处理应用程序的数据大小和读写速率。
重要:一个事件可能存在于一个或多个流中。StreamId不是事件本身的属性。
StreamId
流ID代表一个事件流。为了我们的目的,ID的部分由冒号分隔,但可以很容易地转换为SNS、Kafka等可接受的格式。
可能只希望使用流ID的部分(例如主题)进行广播。
使用分区和可选的子分区可以将所有这些记录组合到存储中,并保证它们的顺序与它们添加到流的顺序完全相同。
StreamId格式:vendor:topic:partition:sub-partition
示例
"twitter" (供应商),"user.timeline" (主题),"homer-simpson" (分区),"yyyymm" (子分区)
twitter:user.timeline:homer-simpson:201501 twitter:user.timeline:homer-simpson:201502 twitter:user.timeline:homer-simpson:201503
"acme" (供应商),"bank-account" (主题),"homer-simpson" (分区)
acme:bank-account:homer-simpson
"acme" (供应商),"poll.votes" (主题),"batman-vs-superman" (分区),"yyyymm.[0-9a-f][0-9a-f]" (子分区)
请注意,这里的子分区是两个十六进制数字,允许有256个不同的流ID。当你需要避免热点键并且整体分区的顺序不重要时很有用。
acme:poll.votes:batman-vs-superman:20160301.0a acme:poll.votes:batman-vs-superman:20160301.1b acme:poll.votes:batman-vs-superman:20160301.c2
StreamSlice
从事件存储中获取数据是通过从单个流或所有流或获取流的一部分来完成的。将StreamSlice视为php函数array_slice。你可以从流中获取正向和反向的切片,并且事件按occurred_at
字段排序。
EventStore::putEvents
在大多数情况下,你会在命令处理程序中向事件存储写入。继续上面的发布文章示例,让我们实际上创建一个事件。
<?php declare(strict_types = 1); final class PublishArticleHandler implements CommandHandler { protected function handleCommand(Message $command, Pbjx $pbjx): void { // in this example it's ultra basic, create the event and push it to a stream $event = ArticlePublishedV1::create()->set('article_id', $command->get('article_id')); // copies contextual data from the previous message (ctx_* fields) $pbjx->copyContext($command, $event); $streamId = StreamId::fromString(sprintf('acme:article:%s', $command->get('article_id'))); $pbjx->getEventStore()->putEvents($streamId, [$event]); // after the event is persisted it will be published either via a // two phase commit or a publisher reading the EventStore streams // (DynamoDb streams for example) } }
需要注意的是,这里没有修改文章的状态,我们只是在流上放置了一个事件。事件订阅者会监听这些事件,然后更新文章。这只是处理状态更新的一种方式。
这种设计意味着最终一致性。
EventSearch
事件存储处理存储和检索事件,但很多时候您还需要能够搜索事件。例如评论、产品评论、帖子反应、调查回复等。许多用例都可以从索引中受益。
类似于事件存储,事件搜索服务仅在请求时才会实例化。目前我们唯一的实现是ElasticSearch。要使用此功能,您要索引的事件必须使用 "gdbots:pbjx:mixin:indexed" 混合。当使用 gdbots/pbjx-bundle-php 时,您可以通过简单的配置选项启用索引。
搜索事件通常在请求处理器中完成。以下是一个搜索事件的示例
protected function handleRequest(Message $request, Pbjx $pbjx): Message { $parsedQuery = ParsedQuery::fromArray(json_decode($request->get('parsed_query_json', '{}'), true)); $response = SearchCommentsResponseV1::create(); $pbjx->getEventSearch()->searchEvents( $request, $parsedQuery, $response, [SchemaCurie::fromString('acme:blog:event:comment-posted')] ); return $response; }
调度器
可以使用 sendAt
方法调度命令以在将来发送。您以通常的方式构建一个命令,决定它应该在何时发送,并可选择提供一个自己的jobId。
目前唯一的可用实现是DynamoDb,它还依赖于AWS Step Functions。未来将开源状态机和lambda任务的一个示例实现。现在,这里是一个定义在CloudFormation中的简单状态机,它可以处理延迟发送。
# Expects you to define your own SchedulerRole and SchedulerFunction resource SchedulerStateMachine: Type: AWS::StepFunctions::StateMachine DependsOn: SchedulerRole Properties: StateMachineName: !Sub 'acme-${AppEnv}-pbjx-scheduler' RoleArn: !GetAtt 'SchedulerRole.Arn' DefinitionString: !Sub |- { "StartAt": "WaitUntil", "States": { "WaitUntil" : { "Type": "Wait", "TimestampPath": "$.send_at", "Next": "Send" }, "Send": { "Type": "Task", "Resource": "${SchedulerFunction.Arn}", "TimeoutSeconds": 60, "End": true } } }
在上面的状态机中,SchedulerFunction.Arn
将是一个Lambda函数,它接收状态机的输入,然后将存储在DynamoDb中的有效载荷发送到您的 pbjx端点。
{ "send_at": "2016-08-18T17:33:00Z", "job_id": "my-job-id-or-an-autogenerated-uuid" }
如果 send_at
在一年以后,Lambda将不得不处理重启执行。这是目前Step Functions的一个限制。为了简化这个过程,有效载荷中包含了将使用未来的日期。
{ "send_at": "2017-12-25T12:00:00Z", "job_id": "my-job-id-or-an-autogenerated-uuid", "resend_at": [ "2018-12-25T12:00:00Z", "2019-12-25T12:00:00Z", "2020-12-25T12:00:00Z" ] }
当存在且非空的 resend_at
数组时
- 从
resend_at
数组中移除第一个元素 - 将其用作新的
send_at
输入值 - 再次开始执行(提供新的输入)
- 使用
job_id
的键更新dynamodb记录,并带有新的executionArn。
如果没有
resend_at
数组或它为空,按正常处理。
JobId
如果您向sendAt方法提供了自己的jobId,它将自动停止使用该id的现有作业。这确保了对于给定的过程只有一个作业存在。例如,发布或过期内容时,您想要取消旧的计划命令。
取消作业
sendAt方法返回一个job id(或简单地返回您提供的id)。此id是取消命令(使用cancelJobs
方法)所必需的,目前没有其他方法可以查询作业并取消结果。
Pbjx调试器
当使用Pbjx开发应用程序时,您需要能够看到正在交换的内容。《PbjxDebugger》将推送包含所有正在处理的pbj数据的“调试”消息。在使用此服务时,必须提供一个Psr\Log\LoggerInterface
记录器。
如果您使用monolog,可以将所有这些条目路由到自己的文件,以json行分隔格式。这是推荐的用法,因为它可以使用其他出色的工具,如jq。
用于Symfony的示例yaml配置
services: monolog_json_formatter: class: Monolog\Formatter\JsonFormatter arguments: [!php/const:Monolog\Formatter\JsonFormatter::BATCH_MODE_NEWLINES] monolog: handlers: pbjx_debugger: type: stream path: '%kernel.logs_dir%/pbjx-debugger.log' level: debug formatter: monolog_json_formatter channels: ['pbjx.debugger']