sdpmlab / anser
PHP微服务编排模式库。
Requires
- php: ^7.4||^8.0
- pingyi/json-serializer: ^0.1.2
- predis/predis: ^2.0
- sdpmlab/anser-action: ^0.3.0
README
Anser是一个基于PHP的微服务编排库。您可以使用这个库来管理连接和编排您的微服务。通过Anser库,您可以轻松实现以下目标:
- 抽象每个基于HTTP的微服务的特定类和实现,Anser不会限制您的通信模式。
- 快速组合您的微服务
- 编写按顺序执行的微服务脚本
- 快速采用SAGA模式来设计您的事务逻辑
- 简单的备份机制,在服务中断时进行事务恢复
安装
通过Composer安装Anser库
composer require sdpmlab/anser
快速入门
微服务连接列表
在您的项目中,您必须在执行周期内设置微服务连接列表。您可以通过ServiceList::addLocalService()
方法来设置它。您可以参考我们提供的示例来创建您的微服务连接列表,这将是所有微服务连接的基础。
namespace App\Anser\Config;

use SDPMlab\Anser\Service\ServiceList;

// Register every microservice this project talks to. The first argument is
// the name that service classes refer to (e.g. via $serviceName), followed by
// host and port.
// NOTE(review): the last argument is presumably the "use HTTPS" flag —
// confirm against the Anser-Action ServiceList API.
ServiceList::addLocalService(
    "order_service",
    "localhost",
    8080,
    false
);
ServiceList::addLocalService(
    "product_service",
    "localhost",
    8081,
    false
);
ServiceList::addLocalService(
    "cart_service",
    "localhost",
    8082,
    false
);
ServiceList::addLocalService(
    "payment_service",
    "localhost",
    8083,
    false
);
抽象微服务
在Anser中,您可以通过SimpleService
类抽象微服务的所有端点。我们提供了一个示例,您可以参考它来快速创建微服务类
namespace App\Anser\Services;

use SDPMlab\Anser\Service\SimpleService;
use SDPMlab\Anser\Service\ActionInterface;
use SDPMlab\Anser\Exception\ActionException;
use Psr\Http\Message\ResponseInterface;

/**
 * Wraps the HTTP endpoints of the "order_service" microservice as Anser actions.
 *
 * Every public method only builds and returns an action; nothing in this
 * class sends the request itself.
 */
class OrderService extends SimpleService
{
    // Name under which the service is registered in the service list.
    protected $serviceName = "order_service";

    // Connection settings consumed by the parent class.
    // NOTE(review): exact semantics/units are defined by Anser-Action — confirm there.
    protected $retry = 1;
    protected $retryDelay = 1;
    protected $timeout = 10.0;

    /**
     * Build the action that fetches a single order by its key.
     *
     * On success the "data" field of the decoded JSON body becomes the
     * action's meaning data (null when the field is absent or the body is
     * not valid JSON).
     *
     * @param int    $u_key     Sent as the X-User-Key header.
     * @param string $order_key Interpolated into the request path.
     * @return ActionInterface
     */
    public function getOrder(
        int $u_key,
        string $order_key
    ): ActionInterface {
        $action = $this->getAction("GET", "/api/v2/order/{$order_key}")
            ->addOption("headers", [
                "X-User-Key" => $u_key
            ])
            ->doneHandler(
                function (
                    ResponseInterface $response,
                    ActionInterface $action
                ) {
                    $resBody = $response->getBody()->getContents();
                    $data = json_decode($resBody, true);
                    $action->setMeaningData($data["data"] ?? null);
                }
            )
            ->failHandler($this->getFailHandler());

        return $action;
    }

    /**
     * Build the action that creates an order.
     *
     * On success the "orderID" field of the decoded JSON body becomes the
     * action's meaning data (null when the field is absent or the body is
     * not valid JSON).
     *
     * @param int    $u_key    Sent as the X-User-Key header.
     * @param int    $p_key    Sent as "p_key" in the JSON body.
     * @param int    $amount   Sent as "amount" in the JSON body.
     * @param int    $price    Sent as "price" in the JSON body.
     * @param string $orch_key Sent as the Orch-Key header.
     * @return ActionInterface
     */
    public function createOrder(
        int $u_key,
        int $p_key,
        int $amount,
        int $price,
        string $orch_key
    ): ActionInterface {
        $action = $this->getAction("POST", "/api/v2/order")
            ->addOption("json", [
                "p_key" => $p_key,
                "price" => $price,
                "amount" => $amount
            ])
            ->addOption("headers", [
                "X-User-Key" => $u_key,
                "Orch-Key" => $orch_key
            ])
            ->doneHandler(
                function (
                    ResponseInterface $response,
                    ActionInterface $action
                ) {
                    $resBody = $response->getBody()->getContents();
                    $data = json_decode($resBody, true);
                    $action->setMeaningData($data["orderID"] ?? null);
                }
            )
            ->failHandler($this->getFailHandler());

        return $action;
    }

    /**
     * Build the action that sends DELETE /api/v2/order, identifying the
     * order through the X-User-Key and Orch-Key headers.
     *
     * @param string $u_key    Sent as the X-User-Key header.
     * @param string $orch_key Sent as the Orch-Key header.
     * @return ActionInterface
     */
    public function deleteOrderByOrchKey(
        string $u_key,
        string $orch_key
    ): ActionInterface {
        $action = $this->getAction('DELETE', "/api/v2/order")
            ->addOption("headers", [
                "X-User-Key" => $u_key,
                "Orch-Key" => $orch_key
            ])
            ->doneHandler(
                // The handler must be typed with the imported ActionInterface;
                // a bare "Action" would resolve to a class in this namespace
                // that does not exist.
                function (
                    ResponseInterface $response,
                    ActionInterface $action
                ) {
                    $resBody = $response->getBody()->getContents();
                    $data = json_decode($resBody, true);
                    $action->setMeaningData($data["data"] ?? null);
                }
            )
            ->failHandler($this->getFailHandler());

        return $action;
    }

    /**
     * Shared fail handler: logs the error at "critical" level and stores the
     * error message as the failed action's meaning data.
     *
     * @return callable
     */
    protected function getFailHandler(): callable
    {
        return function (
            ActionException $e
        ) {
            log_message("critical", $e->getMessage());
            $e->getAction()->setMeaningData([
                "message" => $e->getMessage()
            ]);
        };
    }
}
您可以直接参考Anser-Action
库来了解Anser提供的处理微服务连接的机制。
编排微服务
在Anser中,您可以通过Orchestrator
类来编排您的微服务。我们提供了一个示例,您可以参考它来快速创建编排类
<?php

namespace App\Anser\Orchestrators\V2;

use App\Anser\Sagas\V2\CreateOrderSaga;
use SDPMlab\Anser\Orchestration\Orchestrator;
use App\Anser\Services\V2\ProductService;
use App\Anser\Services\V2\PaymentService;
use App\Anser\Services\V2\OrderService;
use Exception;
use SDPMlab\Anser\Orchestration\OrchestratorInterface;
use SDPMlab\Anser\Orchestration\Saga\Cache\CacheFactory;

/**
 * Orchestrates the "create order" flow across the product, payment and order
 * services. Steps 1-3 are plain checks; steps 4-7 run inside a saga
 * (CreateOrderSaga) and each names the compensation method to run for it.
 */
class CreateOrderOrchestrator extends Orchestrator
{
    /**
     * The service of product.
     *
     * @var ProductService
     */
    protected ProductService $productService;

    /**
     * The service of payment.
     *
     * @var PaymentService
     */
    protected PaymentService $paymentService;

    /**
     * The service of order.
     *
     * @var OrderService
     */
    protected OrderService $orderService;

    /**
     * The user key of this orchestrator (set in definition()).
     *
     * @var int|null
     */
    public $user_key = null;

    /**
     * The product information returned by the "get_product_info" step.
     *
     * @var array
     */
    public array $product_data = [];

    /**
     * The order key, taken from the "create_order" step result.
     *
     * @var int|null
     */
    public $order_key;

    /**
     * The product key.
     *
     * @var int|null
     */
    public $product_key;

    /**
     * The product price * amount.
     *
     * @var int
     */
    public $total = 0;

    /**
     * The product amount.
     *
     * @var int
     */
    public $product_amount = 0;

    /**
     * The payment key, taken from the "create_payment" step result.
     *
     * @var string|null
     */
    public $payment_key = null;

    public function __construct()
    {
        $this->productService = new ProductService();
        $this->paymentService = new PaymentService();
        $this->orderService = new OrderService();
    }

    /**
     * Define the steps of the orchestration.
     *
     * @param int|null $product_key    Key of the product being ordered.
     * @param int|null $product_amount Number of items being ordered.
     * @param int|null $user_key       Key of the ordering user.
     * @throws Exception When any of the three parameters is null.
     */
    protected function definition(?int $product_key = null, ?int $product_amount = null, ?int $user_key = null)
    {
        if (is_null($product_key) || is_null($user_key) || is_null($product_amount)) {
            throw new Exception("The parameters of product or user_key fail.");
        }

        // Keep the inputs on the orchestrator so the saga's compensation
        // methods can read them later.
        $this->user_key = $user_key;
        $this->product_amount = $product_amount;
        $this->product_key = $product_key;

        $this->setServerName("Anser_Server_1");

        // Step 1. Check the product inventory balance.
        $this->setStep()->addAction(
            "product_check",
            $this->productService->checkProductInventory($product_key, $product_amount)
        );

        // Step 2. Get product info.
        $this->setStep()->addAction(
            "get_product_info",
            $this->productService->getProduct($product_key)
        );

        // Step 3. Check the user wallet balance.
        // The action depends on step 2's result, so it is built by a closure
        // that receives the runtime orchestrator.
        $this->setStep()->addAction(
            "wallet_check",
            static function (
                OrchestratorInterface $runtimeOrch
            ) use (
                $user_key,
                $product_amount
            ) {
                $product_data = $runtimeOrch->getStepAction("get_product_info")->getMeaningData();

                $runtimeOrch->product_data = $product_data;
                $runtimeOrch->total = $product_data["price"] * $product_amount;

                return $runtimeOrch->paymentService->checkWalletBalance($user_key, $runtimeOrch->total);
            }
        );

        // Start the saga.
        $this->transStart(CreateOrderSaga::class);

        // Step 4. Create order.
        $this->setStep()
            ->setCompensationMethod("orderCreateCompensation")
            ->addAction(
                "create_order",
                static function (
                    OrchestratorInterface $runtimeOrch
                ) use (
                    $user_key,
                    $product_amount,
                    $product_key
                ) {
                    return $runtimeOrch->orderService->createOrder(
                        $user_key,
                        $product_key,
                        $product_amount,
                        $runtimeOrch->product_data["price"],
                        $runtimeOrch->getOrchestratorNumber()
                    );
                }
            );

        // Step 5. Create payment.
        $this->setStep()
            ->setCompensationMethod("paymentCreateCompensation")
            ->addAction(
                "create_payment",
                static function (
                    OrchestratorInterface $runtimeOrch
                ) use (
                    $user_key,
                    $product_amount
                ) {
                    // Remember the order key produced by step 4.
                    $runtimeOrch->order_key = $runtimeOrch->getStepAction("create_order")->getMeaningData();

                    return $runtimeOrch->paymentService->createPayment(
                        $user_key,
                        $runtimeOrch->order_key,
                        $product_amount,
                        $runtimeOrch->total,
                        $runtimeOrch->getOrchestratorNumber()
                    );
                }
            );

        // Step 6. Reduce the product inventory amount.
        $this->setStep()
            ->setCompensationMethod("productInventoryReduceCompensation")
            ->addAction(
                "reduce_product_amount",
                static function (
                    OrchestratorInterface $runtimeOrch
                ) use (
                    $product_key,
                    $product_amount
                ) {
                    // Remember the payment key produced by step 5.
                    $runtimeOrch->payment_key = $runtimeOrch->getStepAction("create_payment")->getMeaningData();

                    return $runtimeOrch->productService->reduceInventory(
                        $product_key,
                        $product_amount,
                        $runtimeOrch->getOrchestratorNumber()
                    );
                }
            );

        // Step 7. Reduce the user wallet balance.
        $this->setStep()
            ->setCompensationMethod("walletBalanceReduceCompensation")
            ->addAction(
                "reduce_wallet_balance",
                static function (
                    OrchestratorInterface $runtimeOrch
                ) use (
                    $user_key
                ) {
                    return $runtimeOrch->paymentService->reduceWalletBalance(
                        $user_key,
                        $runtimeOrch->total,
                        $runtimeOrch->getOrchestratorNumber()
                    );
                }
            );

        $this->transEnd();
    }

    /**
     * Build the result of the orchestration.
     *
     * @return array Array with a single "data" entry holding status,
     *               order_key, product_data and total; when the run was not
     *               successful it also carries isCompensationSuccess.
     */
    protected function defineResult()
    {
        $data = [
            "data" => [
                "status" => $this->isSuccess(),
                "order_key" => $this->order_key,
                "product_data" => $this->product_data,
                "total" => $this->total
            ]
        ];

        if (!$this->isSuccess()) {
            $data["data"]["isCompensationSuccess"] = $this->isCompensationSuccess();
        }

        return $data;
    }
}
在上面的示例中,我们可以看到在definition
方法中,我们使用setStep()
方法来定义每个步骤的行为,并使用addAction()
方法来定义每个步骤所需的逻辑。
在addAction()
中,您可以通过传递两种类型来实现不同的编排需求
- 传递SDPMlab\Anser\Service\ActionInterface的实例。当微服务编排器执行此步骤时,它将直接使用此Action实例与微服务进行通信。
- 传递callable。当微服务编排器执行此步骤时,它将执行这个闭包,并将运行时编排器传递给它。您可以通过运行时编排器实例获取其他步骤的数据以满足更多逻辑要求,并在最后返回SDPMlab\Anser\Service\ActionInterface的实例。
使用transStart()
方法启动SAGA事务,并在transEnd()
方法中结束SAGA事务。然后,您可以使用setCompensationMethod()
方法来定义每个步骤的补偿行为。当步骤失败时,将自动执行补偿行为。
定义补偿
在上面的示例中,我们可以看到在definition
方法中,我们使用setCompensationMethod()
方法来定义每个步骤的补偿行为。当步骤失败时,将自动执行补偿行为。
您必须继承SDPMlab\Anser\Orchestration\Saga\SimpleSaga类来定义您的补偿逻辑,并通过补偿逻辑中的getOrchestrator()方法获取运行时编排器实例。您可以通过运行时编排器实例获取其他步骤的数据以满足更多逻辑要求。
namespace App\Anser\Sagas\V2;

use SDPMlab\Anser\Orchestration\Saga\SimpleSaga;
use App\Anser\Services\V2\OrderService;
use App\Anser\Services\V2\ProductService;
use App\Anser\Services\V2\PaymentService;

/**
 * Compensation logic for the create-order orchestration. Each method reads
 * what it needs from the runtime orchestrator, builds the compensating
 * action on the matching service and executes it with do().
 */
class CreateOrderSaga extends SimpleSaga
{
    /**
     * The Compensation function for order creating.
     *
     * @return void
     */
    public function orderCreateCompensation(): void
    {
        $orchestrator = $this->getOrchestrator();
        $orderService = new OrderService();

        $orchestratorNumber = $orchestrator->getOrchestratorNumber();
        $user_key = $orchestrator->user_key;

        // NOTE(review): the OrderService example in this document names its
        // delete method deleteOrderByOrchKey(); confirm that the V2 service
        // really exposes deleteOrderByRuntimeOrch().
        $orderService->deleteOrderByRuntimeOrch($user_key, $orchestratorNumber)->do();
    }

    /**
     * The Compensation function for product inventory reducing.
     *
     * @return void
     */
    public function productInventoryReduceCompensation(): void
    {
        $orchestrator = $this->getOrchestrator();
        $productService = new ProductService();

        $orchestratorNumber = $orchestrator->getOrchestratorNumber();
        $product_amount = $orchestrator->product_amount;

        // TODO: an error condition is still missing here — the inventory
        // should only be compensated when the failure was a 5XX error.
        $productService->addInventoryByRuntimeOrch($product_amount, $orchestratorNumber)->do();
    }

    /**
     * The Compensation function for user wallet balance reducing.
     *
     * @return void
     */
    public function walletBalanceReduceCompensation(): void
    {
        $orchestrator = $this->getOrchestrator();
        $paymentService = new PaymentService();

        $orchestratorNumber = $orchestrator->getOrchestratorNumber();
        $user_key = $orchestrator->user_key;
        $total = $orchestrator->total;

        // TODO: an error condition is still missing here — the wallet balance
        // should only be compensated when the failure was a 5XX error.
        $paymentService->increaseWalletBalance($user_key, $total, $orchestratorNumber)->do();
    }

    /**
     * The Compensation function for payment creating.
     *
     * @return void
     */
    public function paymentCreateCompensation(): void
    {
        $orchestrator = $this->getOrchestrator();
        $paymentService = new PaymentService();

        $orchestratorNumber = $orchestrator->getOrchestratorNumber();
        $user_key = $orchestrator->user_key;

        $paymentService->deletePaymentByRuntimeOrch($user_key, $orchestratorNumber)->do();
    }
}
执行微服务编排逻辑
根据您所使用的框架,您需要在某个地方执行您的编排器。
以下是一个粗略示例
use App\Anser\Orchestrators\V2\CreateOrderOrchestrator;

/**
 * Rough example of an HTTP entry point that runs the create-order
 * orchestrator. BaseController and ResponseTrait are not defined in this
 * example; they are expected to come from the host framework.
 */
class CreateOrderController extends BaseController
{
    use ResponseTrait;

    /**
     * Read the order request, run the orchestrator and respond with the
     * value returned by build().
     */
    public function createOrder()
    {
        $data = $this->request->getJSON(true);

        // Missing fields become null so the orchestrator's own parameter
        // check rejects the request instead of an undefined-index warning.
        $product_key = $data["product_key"] ?? null;
        // "product_amount" is the documented field name; the misspelled
        // "product_amout" is still accepted for existing clients.
        $product_amount = $data["product_amount"] ?? $data["product_amout"] ?? null;
        $user_key = $this->request->getHeaderLine("X-User-Key");

        $userOrch = new CreateOrderOrchestrator();
        $result = $userOrch->build($product_key, $product_amount, $user_key);

        return $this->respond($result);
    }
}
我们可以在createOrder()方法中看到,我们创建了Orchestrator实例new CreateOrderOrchestrator(),并使用build()方法启动与Saga事务的服务协作。在build()方法中,我们传递了三个参数:product_key、product_amount和user_key,这些参数将在definition()方法中使用。
最后,在build()方法完成后,您会得到返回值。这个返回值来自defineResult()处理的数据。
以上是Anser Orchestrator Saga的完整功能示例。您可以使用这个示例来了解如何使用Anser Orchestrator。
许可证
Anser是在MIT许可证下发布的。请参阅附带文件中的LICENSE。