highcore/temporal-bundle

Command runner and activity registry for Temporal

v1.2.2 2024-07-13 14:46 UTC

Last update: 2024-09-13 15:14:18 UTC


README

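Description

This is a wrapper bundle for the official Temporal PHP SDK. It provides an activity registry and a fully configurable worker and workflow client.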

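Installation

Install the bundle with the following command: composer require highcore/temporal-bundle

Usage

Create config/workflows.php

Register your workflows in this file, in the same way Symfony registers bundles in config/bundles.php.

Example config/workflows.php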

<?php

declare(strict_types=1);

return [
    // ...
    Temporal\Samples\FileProcessing\FileProcessingWorkflow::class,
    // ...
];
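
The file simply returns a list of workflow class names, so any consumer can read it with a plain require. The sketch below shows one way such a file could be loaded and sanity-checked. The loadWorkflowList helper is hypothetical and not part of the bundle, and a temporary file stands in for config/workflows.php.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): load a PHP file that returns
 * a list of class names and reject anything that is not a non-empty string.
 *
 * @return list<string>
 */
function loadWorkflowList(string $path): array
{
    $entries = require $path;

    if (!is_array($entries)) {
        throw new \UnexpectedValueException("$path must return an array");
    }

    foreach ($entries as $entry) {
        if (!is_string($entry) || $entry === '') {
            throw new \UnexpectedValueException("$path must contain only class names");
        }
    }

    return array_values($entries);
}

// Demo with a temporary file standing in for config/workflows.php.
$path = tempnam(sys_get_temp_dir(), 'workflows');
file_put_contents($path, "<?php\nreturn [\\stdClass::class, \\ArrayObject::class];\n");

$workflows = loadWorkflowList($path);
unlink($path);

echo implode(', ', $workflows), PHP_EOL;
```

How the bundle itself reads the file may differ; the point is only that the format is an ordinary PHP array of fully qualified class names.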

Create rr.yaml

version: "3"

server:
  command: "php bin/console temporal:workflow:runtime"
  user: "backend" # Set up your user, or remove this value
  group: "backend" # Set up your group, or remove this value

temporal:
  address: "localhost:7233"
  namespace: 'default' # Configure a temporal namespace (you must create a namespace manually or use the default namespace named "default")
  activities:
    num_workers: 4 # Set up your worker count

# Set up your values
logs:
  mode: production
  output: stdout
  err_output: stderr
  encoding: json
  level: error

rpc:
  listen: tcp://0.0.0.0:6001

Example configuration

# config/packages/temporal.yaml
temporal:
  # The default address is localhost:7233
  address: 'localhost:7233'
  worker:
    # Set up custom worker factory if you want to use custom WorkerFactory, 
    # accepts symfony service factory format 
    #
    # Details - https://symfony.com.cn/doc/current/service_container/factories.html
    factory: Highcore\TemporalBundle\WorkerFactory
    # Set up your own consumption queue for your Temporal Worker, you can set ENV or use string value
    queue: '%env(TEMPORAL_WORKER_QUEUE)%'
    data-converter:
      # Set up your custom Temporal\DataConverter\DataConverterInterface implementation
      class: Temporal\DataConverter\DataConverter
      # Customize the data converters, DO NOT CHANGE if you do not know what it is
      # Details - https://legacy-documentation-sdks.temporal.io/typescript/data-converters
      #
      # Sorting order from top to bottom is very, very important
      converters:
        - Temporal\DataConverter\NullConverter
        - Temporal\DataConverter\BinaryConverter
        - Temporal\DataConverter\ProtoJsonConverter
        - Highcore\TemporalBundle\DataConverter\ClassObjectConverter
        - Temporal\DataConverter\JsonConverter
  workflow-client:
    options:
      # Set a custom namespace; the 'default' namespace is used if this is omitted
      namespace: monoplace

    # Set up custom workflow client factory
    # accepts any class which implements Highcore\TemporalBundle\WorkflowClientFactoryInterface
    factory: Highcore\TemporalBundle\WorkflowClientFactory
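
The configuration comments stress that the order of the converters list matters. A minimal sketch of why, assuming the list is tried from top to bottom and the first converter that accepts a value wins: a catch-all converter placed too early hides the more specific ones after it. The SketchConverter classes and the encodeWithChain function are hypothetical stand-ins for illustration, not the SDK's classes.

```php
<?php

declare(strict_types=1);

// Hypothetical mini-converters for illustration only; these are NOT the SDK classes.
interface SketchConverter
{
    /** Returns an encoded string, or null when this converter does not handle the value. */
    public function encode(mixed $value): ?string;
}

final class SketchNullConverter implements SketchConverter
{
    public function encode(mixed $value): ?string
    {
        // Only handles null; declines everything else.
        return $value === null ? 'null-encoding' : null;
    }
}

final class SketchJsonConverter implements SketchConverter
{
    public function encode(mixed $value): ?string
    {
        // Catch-all: accepts anything json_encode can handle, including null.
        return 'json:' . json_encode($value, JSON_THROW_ON_ERROR);
    }
}

/**
 * Try each converter in list order and return the first non-null result.
 *
 * @param list<SketchConverter> $converters
 */
function encodeWithChain(array $converters, mixed $value): string
{
    foreach ($converters as $converter) {
        $encoded = $converter->encode($value);
        if ($encoded !== null) {
            return $encoded;
        }
    }

    throw new \RuntimeException('No converter accepted the value');
}

$specificFirst = [new SketchNullConverter(), new SketchJsonConverter()];
$catchAllFirst = [new SketchJsonConverter(), new SketchNullConverter()];

echo encodeWithChain($specificFirst, null), PHP_EOL;       // null-encoding
echo encodeWithChain($catchAllFirst, null), PHP_EOL;       // json:null
echo encodeWithChain($specificFirst, ['a' => 1]), PHP_EOL; // json:{"a":1}
```

With the catch-all first, the null value never reaches the dedicated converter. This is consistent with the example configuration listing JsonConverter last.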

Example activity interface

<?php

/**
 * This file is part of Temporal package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Temporal\Samples\FileProcessing;

use Temporal\Activity\ActivityInterface;

#[ActivityInterface(prefix:"FileProcessing.")]
interface StoreActivitiesInterface
{
    /**
     * Upload file to remote location.
     *
     * @param string $localFileName file to upload
     * @param string $url remote location
     */
    public function upload(string $localFileName, string $url): void;

    /**
     * Process file.
     *
     * @param string $inputFileName source file name
     * @return string processed file name
     */
    public function process(string $inputFileName): string;

    /**
     * Downloads file to local disk.
     *
     * @param string $url remote file location
     * @return TaskQueueFilenamePair local task queue and downloaded file name
     */
    public function download(string $url): TaskQueueFilenamePair;
}

Example activity

<?php
# https://github.com/temporalio/samples-php/blob/master/app/src/FileProcessing/StoreActivity.php

/**
 * This file is part of Temporal package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Temporal\Samples\FileProcessing;

use Psr\Log\LoggerInterface;
use Temporal\SampleUtils\Logger;

class StoreActivity implements StoreActivitiesInterface
{
    private static string $taskQueue;
    private LoggerInterface $logger;

    public function __construct(string $taskQueue = FileProcessingWorkflow::DEFAULT_TASK_QUEUE)
    {
        self::$taskQueue = $taskQueue;
        $this->logger = new Logger();
    }

    public function upload(string $localFileName, string $url): void
    {
        if (!is_file($localFileName)) {
            throw new \InvalidArgumentException("Invalid file type: " . $localFileName);
        }

        // Faking upload to simplify sample implementation.
        $this->log('upload activity: uploaded from %s to %s', $localFileName, $url);
    }

    public function process(string $inputFileName): string
    {
        try {
            $this->log('process activity: sourceFile=%s', $inputFileName);
            $processedFile = $this->processFile($inputFileName);
            $this->log('process activity: processed file=%s', $processedFile);

            return $processedFile;
        } catch (\Throwable $e) {
            throw $e;
        }
    }

    public function download(string $url): TaskQueueFilenamePair
    {
        try {
            $this->log('download activity: downloading %s', $url);

            $data = file_get_contents($url);
            $file = tempnam(sys_get_temp_dir(), 'demo');

            file_put_contents($file, $data);

            $this->log('download activity: downloaded from %s to %s', $url, realpath($file));

            return new TaskQueueFilenamePair(self::$taskQueue, $file);
        } catch (\Throwable $e) {
            throw $e;
        }
    }

    private function processFile(string $filename): string
    {
        // faking processing for simplicity
        return $filename;
    }

    /**
     * @param string $message
     * @param mixed ...$arg
     */
    private function log(string $message, ...$arg)
    {
        // by default all error logs are forwarded to the application server log and docker log
        $this->logger->debug(sprintf($message, ...$arg));
    }
}

Example workflow interface

<?php

/**
 * This file is part of Temporal package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Temporal\Samples\FileProcessing;

use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

#[WorkflowInterface]
interface FileProcessingWorkflowInterface
{
    #[WorkflowMethod("FileProcessing")]
    public function processFile(
        string $sourceURL,
        string $destinationURL
    );
}

Example workflow

<?php
# https://github.com/temporalio/samples-php/blob/master/app/src/FileProcessing/FileProcessingWorkflow.php

/**
 * This file is part of Temporal package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Temporal\Samples\FileProcessing;

use Carbon\CarbonInterval;
use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;
use Temporal\Internal\Workflow\ActivityProxy;
use Temporal\Workflow;

class FileProcessingWorkflow implements FileProcessingWorkflowInterface
{
    public const DEFAULT_TASK_QUEUE = 'default';

    /** @var ActivityProxy|StoreActivitiesInterface */
    private $defaultStoreActivities;

    public function __construct()
    {
        $this->defaultStoreActivities = Workflow::newActivityStub(
            StoreActivitiesInterface::class,
            ActivityOptions::new()
                ->withScheduleToCloseTimeout(CarbonInterval::minute(5))
                ->withTaskQueue(self::DEFAULT_TASK_QUEUE)
        );
    }

    public function processFile(string $sourceURL, string $destinationURL)
    {
        /** @var TaskQueueFilenamePair $downloaded */
        $downloaded = yield $this->defaultStoreActivities->download($sourceURL);

        $hostSpecificStore = Workflow::newActivityStub(
            StoreActivitiesInterface::class,
            ActivityOptions::new()
                ->withScheduleToCloseTimeout(CarbonInterval::minute(5))
                ->withTaskQueue($downloaded->hostTaskQueue)
        );

        // Call processFile activity to zip the file.
        // Call the activity to process the file using worker-specific task queue.
        $processed = yield $hostSpecificStore->process($downloaded->filename);

        // Call upload activity to upload the zipped file.
        yield $hostSpecificStore->upload($processed, $destinationURL);

        return 'OK';
    }
}

Register with the Symfony service container

<?php

use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;

return static function (ContainerConfigurator $configurator): void {
    $services = $configurator->services();
    $services->defaults()
        ->public()
        ->autowire(true)
        ->autoconfigure(true);

    $services->set(Temporal\Samples\FileProcessing\StoreActivity::class)
        // Tagging the activity adds it to the ActivityRegistry,
        // allowing your worker to use this activity in your workflows
        ->tag('temporal.activity.registry');
};

Now you can run

rr serve -c rr.yaml

and invoke the workflow as follows

<?php
declare(strict_types=1);

namespace Highcore\TemporalBundle\Example;

use Temporal\Client\WorkflowClientInterface;
use Temporal\Workflow\WorkflowRunInterface;
use Temporal\Client\WorkflowOptions;
use Temporal\Common\RetryOptions;

final class ExampleWorkflowRunner {

    public function __construct(private readonly WorkflowClientInterface $workflowClient)
    {
    }
    
    public function run(): void
    {
        /** @var \Temporal\Samples\FileProcessing\FileProcessingWorkflowInterface $workflow */
        $workflow = $this->workflowClient->newWorkflowStub(
            \Temporal\Samples\FileProcessing\FileProcessingWorkflowInterface::class, 
            WorkflowOptions::new()
                ->withRetryOptions(
                    RetryOptions::new()
                        ->withMaximumAttempts(3)
                        ->withNonRetryableExceptions([\LogicException::class])
                )
        );
        
        // Start the workflow asynchronously, without waiting for the result
        /** @var WorkflowRunInterface $result */
        $result = $this->workflowClient->start($workflow, 'https://example.com/example_file', 's3://s3.example.com');
        
        echo 'Run ID: ' . $result->getExecution()->getRunID();
        
        // Or call the workflow synchronously and wait for the result
        $result = $workflow->processFile('https://example.com/example_file', 's3://s3.example.com');
        
        echo $result; // OK
    }

}

For more PHP examples, see the temporalio/samples-php repository referenced in the samples above.

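Credits

License

MIT License

Copyright (c) 2023 Highcore.org

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.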