highcore / temporal-bundle
Temporal 的命令运行器和活动注册器
v1.2.2
2024-07-13 14:46 UTC
Requires
- php: >=8.2
- ext-json: *
- symfony/config: ^6.4 || ^7.0
- symfony/console: ^6.4 || ^7.0
- symfony/dependency-injection: ^6.4 || ^7.0
- symfony/http-kernel: ^6.4 || ^7.0
- symfony/property-info: ^6.4 || ^7.0
- symfony/serializer: ^6.4 || ^7.0
- temporal/sdk: ^2.10
Requires (Dev)
- phpunit/phpunit: ^10.3
Conflicts
README
描述
这是一个官方 PHP SDK 的包装包,带有活动注册器和全配置的工人和工作流程客户端。
目录表(可选)
如果你的 README 很长,添加目录表以便用户更容易找到他们需要的内容。
安装
使用以下命令进行安装 composer require highcore/temporal-bundle
使用方法
创建 config/workflows.php
在此处注册您的流程,例如 symfony 的 config/bundles.php
示例 config/workflows.php
<?php declare(strict_types=1); return [ // ... Temporal\Samples\FileProcessing\FileProcessingWorkflow::class, // ... ];
创建 rr.yaml
version: "3" server: command: "php bin/console temporal:workflow:runtime" user: "backend" # Set up your user, or remove this value group: "backend" # Set up your group, or remove this value temporal: address: "localhost:7233" namespace: 'default' # Configure a temporal namespace (you must create a namespace manually or use the default namespace named "default") activities: num_workers: 4 # Set up your worker count # Set up your values logs: mode: production output: stdout err_output: stderr encoding: json level: error rpc: listen: tcp://0.0.0.0:6001
示例配置
# config/packages/temporal.yaml temporal: # Default address be localhost:7233 address: 'localhost:7233' worker: # Set up custom worker factory if you want to use custom WorkerFactory, # accepts symfony service factory format # # Details - https://symfony.com.cn/doc/current/service_container/factories.html factory: Highcore\TemporalBundle\WorkerFactory # Set up your own consumption queue for your Temporal Worker, you can set ENV or use string value queue: '%env(TEMPORAL_WORKER_QUEUE)%' data-converter: # Set up your custom Temporal\DataConverter\DataConverterInterface implementation class: Temporal\DataConverter\DataConverter # Customize the data converters, DO NOT CHANGE if you do not know what it is # Details - https://legacy-documentation-sdks.temporal.io/typescript/data-converters # # Sorting order from top to bottom is very, very important converters: - Temporal\DataConverter\NullConverter - Temporal\DataConverter\BinaryConverter - Temporal\DataConverter\ProtoJsonConverter - Highcore\TemporalBundle\DataConverter\ClassObjectConverter - Temporal\DataConverter\JsonConverter workflow-client: options: # Set up custom namespace, by default will be used 'default' namespace namespace: monoplace # Set up custom workflow client factory # accepts any class which implements Highcore\TemporalBundle\WorkflowClientFactoryInterface factory: Highcore\TemporalBundle\WorkflowClientFactory
示例活动接口
<?php /** * This file is part of Temporal package. * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Temporal\Samples\FileProcessing; use Temporal\Activity\ActivityInterface; #[ActivityInterface(prefix:"FileProcessing.")] interface StoreActivitiesInterface { /** * Upload file to remote location. * * @param string $localFileName file to upload * @param string $url remote location */ public function upload(string $localFileName, string $url): void; /** * Process file. * * @param string $inputFileName source file name @@return processed file name * @return string */ public function process(string $inputFileName): string; /** * Downloads file to local disk. * * @param string $url remote file location * @return TaskQueueFilenamePair local task queue and downloaded file name */ public function download(string $url): TaskQueueFilenamePair; }
示例活动
<?php # https://github.com/temporalio/samples-php/blob/master/app/src/FileProcessing/StoreActivity.php /** * This file is part of Temporal package. * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Temporal\Samples\FileProcessing; use Psr\Log\LoggerInterface; use Temporal\SampleUtils\Logger; class StoreActivity implements StoreActivitiesInterface { private static string $taskQueue; private LoggerInterface $logger; public function __construct(string $taskQueue = FileProcessingWorkflow::DEFAULT_TASK_QUEUE) { self::$taskQueue = $taskQueue; $this->logger = new Logger(); } public function upload(string $localFileName, string $url): void { if (!is_file($localFileName)) { throw new \InvalidArgumentException("Invalid file type: " . $localFileName); } // Faking upload to simplify sample implementation. $this->log('upload activity: uploaded from %s to %s', $localFileName, $url); } public function process(string $inputFileName): string { try { $this->log('process activity: sourceFile=%s', $inputFileName); $processedFile = $this->processFile($inputFileName); $this->log('process activity: processed file=%s', $processedFile); return $processedFile; } catch (\Throwable $e) { throw $e; } } public function download(string $url): TaskQueueFilenamePair { try { $this->log('download activity: downloading %s', $url); $data = file_get_contents($url); $file = tempnam(sys_get_temp_dir(), 'demo'); file_put_contents($file, $data); $this->log('download activity: downloaded from %s to %s', $url, realpath($file)); return new TaskQueueFilenamePair(self::$taskQueue, $file); } catch (\Throwable $e) { throw $e; } } private function processFile(string $filename): string { // faking processing for simplicity return $filename; } /** * @param string $message * @param mixed ...$arg */ private function log(string $message, ...$arg) { // by default all error logs are forwarded to the application server log and docker log $this->logger->debug(sprintf($message, ...$arg)); } }
示例工作流程接口
<?php /** * This file is part of Temporal package. * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Temporal\Samples\FileProcessing; use Temporal\Workflow\WorkflowInterface; use Temporal\Workflow\WorkflowMethod; #[WorkflowInterface] interface FileProcessingWorkflowInterface { #[WorkflowMethod("FileProcessing")] public function processFile( string $sourceURL, string $destinationURL ); }
示例工作流程
<?php # https://github.com/temporalio/samples-php/blob/master/app/src/FileProcessing/FileProcessingWorkflow.php /** * This file is part of Temporal package. * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Temporal\Samples\FileProcessing; use Carbon\CarbonInterval; use Temporal\Activity\ActivityOptions; use Temporal\Common\RetryOptions; use Temporal\Internal\Workflow\ActivityProxy; use Temporal\Workflow; class FileProcessingWorkflow implements FileProcessingWorkflowInterface { public const DEFAULT_TASK_QUEUE = 'default'; /** @var ActivityProxy|StoreActivitiesInterface */ private $defaultStoreActivities; public function __construct() { $this->defaultStoreActivities = Workflow::newActivityStub( StoreActivitiesInterface::class, ActivityOptions::new() ->withScheduleToCloseTimeout(CarbonInterval::minute(5)) ->withTaskQueue(self::DEFAULT_TASK_QUEUE) ); } public function processFile(string $sourceURL, string $destinationURL) { /** @var TaskQueueFilenamePair $downloaded */ $downloaded = yield $this->defaultStoreActivities->download($sourceURL); $hostSpecificStore = Workflow::newActivityStub( StoreActivitiesInterface::class, ActivityOptions::new() ->withScheduleToCloseTimeout(CarbonInterval::minute(5)) ->withTaskQueue($downloaded->hostTaskQueue) ); // Call processFile activity to zip the file. // Call the activity to process the file using worker-specific task queue. $processed = yield $hostSpecificStore->process($downloaded->filename); // Call upload activity to upload the zipped file. yield $hostSpecificStore->upload($processed, $destinationURL); return 'OK'; } }
使用 symfony 服务容器注册
<?php return static function (ContainerConfigurator $configurator): void { $services = $configurator->services(); $services->defaults() ->public() ->autowire(true) ->autoconfigure(true); $services->set(Temporal\Samples\FileProcessing\StoreActivity::class) // Setting a "label to your activity" will add the activity to the ActivityRegistry, // allowing your employee to use this activity in your Workflow ->tag('temporal.activity.registry');
现在你可以运行
rr serve rr.yaml
并通过以下方式调用工作流程
<?php declare(strict_types=1); namespace Highcore\TemporalBundle\Example; use Temporal\Client\WorkflowClientInterface; use Temporal\Workflow\WorkflowRunInterface; use Temporal\Client\WorkflowOptions; use Temporal\Common\RetryOptions; final class ExampleWorkflowRunner { public function __construct(private readonly WorkflowClientInterface $workflowClient) { } public function run(): void { /** @var \Temporal\Samples\FileProcessing\FileProcessingWorkflowInterface $workflow */ $workflow = $this->workflowClient->newWorkflowStub( \Temporal\Samples\FileProcessing\FileProcessingWorkflowInterface::class, WorkflowOptions::new() ->withRetryOptions( RetryOptions::new() ->withMaximumAttempts(3) ->withNonRetryableExceptions(\LogicException::class) ) ); // Start Workflow async, with no-wait result /** @var WorkflowRunInterface $result */ $result = $this->workflowClient->start($workflow, 'https://example.com/example_file', 's3://s3.example.com'); echo 'Run ID: ' . $result->getExecution()->getRunID(); // Or you can call workflow sync with wait result $result = $workflow->processingFile('https://example.com/example_file', 's3://s3.example.com'); echo $result; // OK } }
更多 PHP 示例请参考 这里
致谢
许可证
MIT 许可证
版权所有 (c) 2023 Highcore.org
在此特此授予任何获得此软件及其相关文档副本(“软件”)的人免费使用软件的权利,不受任何限制,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或出售软件副本的权利,并允许获得软件的人有权使用软件,受以下条件的约束
上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。
软件按“现状”提供,不提供任何明示或暗示的保证,包括但不限于适销性、适用于特定目的和侵权不保证。在任何情况下,作者或版权所有者均不对任何索赔、损害或其他责任负责,无论该责任是基于合同、侵权或其他方式,不论是否与软件或其使用或其它方式有关。