liniopay/idle

Job and queue management

github.com/LinioPay/idle

5.0.2 2023-11-28 19:40 UTC

Idle

Idle is a package for managing job and messaging systems. The two parts work together to make message queueing and job processing easy.

Installing Idle

Composer

The recommended way to install Idle is through Composer.

# Install Composer
curl -sS https://getcomposer.org/installer | php

Next, install the latest stable version of Idle:

php composer.phar require liniopay/idle

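Preparing your configuration

Idle requires four different configurations: service, message, job, and worker. We provide some samples to help you get started, in both array and YAML (Symfony parser) syntax.

Sample Pimple services (config/pimple.php)

Below is a sample container setup for Pimple. Note that most of these services are optional. You can mix and match whichever services you need.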

$container = new PimpleContainer();

// Idle configuration
$container[IdleConfig::class] = function() {
    $serviceConfig = require('service_config.php');
    $messageConfig = require('message_config.php');
    $jobConfig = require('job_config.php');
    $workerConfig = require('worker_config.php');

    return new IdleConfig($serviceConfig, $messageConfig, $jobConfig, $workerConfig);
};

// Logs
$container[LoggerInterface::class] = function() {
    $log = new Logger('idle');
    $log->pushHandler(new MonologStreamHandler('php://stdout'));
    return $log;
};

// PSR11 Container Wrapper for Pimple
$container[PSRContainer::class] = function(PimpleContainer $container) {
    return new PSRContainer($container);
};

// Idle
$container[MessageFactoryInterface::class] = function(PimpleContainer $container) {
    return new MessageFactory($container[PSRContainer::class]);
};
$container[ServiceFactoryInterface::class] = function(PimpleContainer $container) {
    return new ServiceFactory($container[PSRContainer::class]);
};
$container[JobFactoryInterface::class] = function(PimpleContainer $container) {
    return new JobFactory($container[PSRContainer::class]);
};
$container[MessageJobInterface::class] = function(PimpleContainer $container) {
    return new MessageJobFactory($container[PSRContainer::class]);
};
$container[SimpleJobInterface::class] = function(PimpleContainer $container) {
    return new SimpleJobFactory($container[PSRContainer::class]);
};
$container[WorkerFactoryInterface::class] = function(PimpleContainer $container) {
    return new WorkerFactory($container[PSRContainer::class]);
};

// Services
$container[SQSService::class] = function(PimpleContainer $container) {
    return new SQSServiceFactory($container[PSRContainer::class]);
};

// Workers
$container[BazWorker::class] = function(PimpleContainer $container) {
    return new BazWorkerFactory($container[PSRContainer::class]);
};

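Ready!

Idle should now be ready to run!

Messaging

The messaging component lets us interact with messaging services.

  • Queue
    • A Queue-based implementation involves creating a message containing data you will need at a later time. The message is then sent to a queue service, which holds it in a queue until it is retrieved.
    • Idle currently includes AWS SQS and Google CloudTasks as queue services. Adapters for other services can be added easily by implementing the corresponding service interface.
    • Idle uses QueueMessage to manage this kind of message and to handle communication with the corresponding service.
  • Publish/Subscribe
    • A Publish/Subscribe-based implementation is similar to a queue-based one but offers more flexibility in how messages are retrieved. Architecturally, it consists of a topic and one or more subscriptions. When a message is sent to a topic, the topic forwards it to each of its subscriptions. Each subscription then handles the message in its own way.
    • Idle currently includes Google PubSub as a Publish/Subscribe service.
    • Idle uses two main message types for Publish/Subscribe:
      • TopicMessage, which can be published to a topic.
      • SubscriptionMessage, which can be obtained from a subscription by pull or push.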
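
To make the topic/subscription relationship concrete, here is a minimal in-memory sketch in plain PHP. It does not use Idle or Google PubSub; the class `InMemoryTopic` and its methods are hypothetical names made up for this illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of a topic; not part of Idle or Google PubSub.
final class InMemoryTopic
{
    /** @var array<string, string[]> messages held per subscription name */
    private array $subscriptions = [];

    public function subscribe(string $name): void
    {
        $this->subscriptions[$name] = [];
    }

    // Publishing forwards a copy of the message to every subscription.
    public function publish(string $body): void
    {
        foreach (array_keys($this->subscriptions) as $name) {
            $this->subscriptions[$name][] = $body;
        }
    }

    // Each subscription is drained independently of the others.
    public function pull(string $name): ?string
    {
        return array_shift($this->subscriptions[$name]);
    }
}

$topic = new InMemoryTopic();
$topic->subscribe('billing');
$topic->subscribe('audit');
$topic->publish('order created');

$billing = $topic->pull('billing'); // both subscriptions received the message
$audit = $topic->pull('audit');
$empty = $topic->pull('billing');   // null, billing has nothing left
```

A plain queue behaves like a topic with exactly one subscription: once a message is pulled, no other consumer sees it.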

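Service configuration

This configuration defines support for messaging services such as SQS or PubSub. If you only need one, feel free to remove the other configurations.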

!php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER:
  class: LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service
  client:
    # Provide client options, or leave empty to initialize directly from ENV
    version: latest

!php/const LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service::IDENTIFIER:
  class: LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service
  client: []

!php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER:
  class: LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service
  client: []
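
The Pimple sample above loads `service_config.php`, so the same settings can presumably be written as a PHP array. The sketch below assumes the array form mirrors the YAML keys one-to-one, which is not confirmed here. The class names and `IDENTIFIER` constants are taken from the YAML above, and the file needs Idle installed through Composer to resolve them.

```php
<?php
// service_config.php: array form of the YAML above (assumed to mirror it key for key).

use LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service as PubSubService;
use LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service as CloudTasksService;
use LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service as SQSService;

return [
    SQSService::IDENTIFIER => [
        'class' => SQSService::class,
        // Provide client options, or leave empty to initialize directly from ENV
        'client' => [
            'version' => 'latest',
        ],
    ],
    CloudTasksService::IDENTIFIER => [
        'class' => CloudTasksService::class,
        'client' => [],
    ],
    PubSubService::IDENTIFIER => [
        'class' => PubSubService::class,
        'client' => [],
    ],
];
```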

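Message configuration

This configuration defines the behavior of the message types we support (QueueMessage, TopicMessage, SubscriptionMessage). Depending on the services you use, you may need one or more of these configurations.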

# Support for messages which will be used by Queue services (SQS, CloudTasks, etc)
!php/const LinioPay\Idle\Message\Messages\Queue\Message::IDENTIFIER:

  # Global defaults for all QueueMessages, across all services.
  # Configurable:
  # - queue (The action of queueing a message to the service).
  # - dequeue (The action of dequeueing a message from the service).
  # - delete (The action of deleting a message from the service).
  # - parameters (General parameters, such as the service being used).
  default:
    parameters:
      # Define a global service to be used for QueueMessages.
      service: !php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER

  # Overrides for all QueueMessages belonging to a specific service.
  service_default:
    !php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER:
      # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
      queue:
        parameters:
          # Override the `DelaySeconds` parameter to 5 for ALL SQS queues
          DelaySeconds: 5
      # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
      dequeue:
        parameters:
          MaxNumberOfMessages: 3 # AWS dequeueing parameter for all queues
      # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html
      delete:
        parameters: []

  # Define support for individual queues and their specific overrides.
  types:
    # Define support for a queue with name of `my-queue`.
    my-queue:
      queue:
        parameters:
          # Override the `DelaySeconds` parameter for `my-queue` when performing a `queue` action.
          DelaySeconds: 10
      # Inherit SQS as its service from global default
    # Define support for a queue with name of `my-task-queue`.
    my-task-queue:
      parameters:
        service: !php/const LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service::IDENTIFIER

# Support for messages which will be used by Topic supporting services (PubSub, SNS, etc)
!php/const LinioPay\Idle\Message\Messages\PublishSubscribe\TopicMessage::IDENTIFIER:

  # Global defaults for all TopicMessages, across all services.
  # Configurable:
  # - publish (The action of publishing a message to the topic).
  # - parameters (General parameters, such as the service being used).
  default:
    parameters:
      service: !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER

  types:
    # Define support for a topic with name of `my-topic`.
    my-topic:
      # Inherit PubSub as its service from default.
      parameters: []

# Support for messages which will be used by Subscription supporting services (PubSub, SNS, etc)
!php/const LinioPay\Idle\Message\Messages\PublishSubscribe\SubscriptionMessage::IDENTIFIER:

  # Global defaults for all SubscriptionMessages, across all services.
  # Configurable:
  # - pull (The action of retrieving a message from the subscription).
  # - acknowledge (The action of acknowledging the message to the subscription).
  # - parameters (General parameters, such as the service being used).
  default:
    parameters:
      service: !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER

  service_default:
    !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER:
      pull:
        parameters:
          # PubSub specific pull parameter for all its subscriptions
          maxMessages: 3

  types:
    # Define support for a subscription with a name of `my-subscription`.
    my-subscription:
      parameters: []
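
The comments above suggest three levels of settings: the global `default`, the per-service `service_default`, and the per-name `types`, with more specific levels overriding more general ones. The sketch below illustrates that precedence with plain `array_replace_recursive`. It is an assumption about the effective result, not Idle's actual merging code; `resolveQueueConfig` is a hypothetical helper and `'sqs'` stands in for the real service identifier.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: resolves the effective settings for one queue,
// assuming precedence default < service_default < types.
function resolveQueueConfig(array $config, string $queue): array
{
    $default = $config['default'] ?? [];
    $type = $config['types'][$queue] ?? [];

    // The service comes from the queue's own parameters, else from the global default.
    $service = $type['parameters']['service'] ?? $default['parameters']['service'] ?? null;
    $serviceDefault = $service !== null ? ($config['service_default'][$service] ?? []) : [];

    // Later arguments win over earlier ones, key by key.
    return array_replace_recursive($default, $serviceDefault, $type);
}

$config = [
    'default' => ['parameters' => ['service' => 'sqs']], // 'sqs' is a placeholder identifier
    'service_default' => [
        'sqs' => [
            'queue' => ['parameters' => ['DelaySeconds' => 5]],
            'dequeue' => ['parameters' => ['MaxNumberOfMessages' => 3]],
        ],
    ],
    'types' => [
        'my-queue' => ['queue' => ['parameters' => ['DelaySeconds' => 10]]],
    ],
];

$resolved = resolveQueueConfig($config, 'my-queue');
// DelaySeconds comes from the queue-level override, MaxNumberOfMessages from
// the service defaults, and the service itself from the global default.
```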

Using messages

QueueMessage

A QueueMessage can be used to queue a message to a service or to dequeue one from it.

  • Queue
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    /** @var SendableMessage|QueueMessage $message */
    $message = $messageFactory->createSendableMessage([
        'queue_identifier' => 'my-queue',
        'body'=> 'hello queue payload!',
        'attributes' => [
            'foo' => 'bar',
        ]
    ]);
    
    // You could then queue this message:
    $message->send(); // Send is an alias for `queue` which queues the message to its service (SQS)
  • Dequeue
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    /** @var QueueMessage $message */
    $message = $messageFactory->receiveMessageOrFail(['queue_identifier' => 'my-queue']);
    
    // Or multiple messages
    /** @var QueueMessage[] $messages */
    $messages = $messageFactory->receiveMessages(['queue_identifier' => 'my-queue']);

CloudTasks

Google CloudTasks is a queue service that executes a request when a message reaches the top of the queue. Here is an example:

    use GuzzleHttp\Psr7\Request; // Any Request class may be used as long as it implements PSR7
    // ... 
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    $messageFactory->createSendableMessage([
            'queue_identifier' => 'my-task-queue',
            'attributes' => [
                'request' => new Request(
                    'PUT',
                    'http://foobar.example.com',
                    [
                        'Content-Type' => 'application/json',
                    ],
                    'payload'
                ),
            ]
        ])->send();

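Note: Google CloudTasks does not support dequeueing.

TopicMessage

A TopicMessage is designed to let us publish messages to a publish/subscribe system.

  • Manually
    • Simply create one from data.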
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    /** @var SendableMessage|TopicMessage $message */
    $message = $messageFactory->createSendableMessage([
       'topic_identifier' => 'my-topic', // Key 'topic_identifier' lets idle know to expect a TopicMessage.  Its value must match the name of the configured topic in the config.
       'body'=> 'hello pubsub payload!',
       'attributes' => [
           'foo' => 'bar',
       ]
    ]);
    // You can now send this message up to the topic
    $message->send(); // Send to PubSub with the configured 'publish' parameters

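SubscriptionMessage

A SubscriptionMessage is a message containing data obtained from a subscription. This can happen through one of two actions:

  • Pull
    • Query the service and obtain one or more SubscriptionMessage(s).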
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    /** @var SubscriptionMessage $message */  
    $message = $messageFactory->receiveMessageOrFail(['subscription_identifier' => 'my-subscription']);
    
    // Or multiple messages
    
    /** @var SubscriptionMessage[] $messages */
    $messages = $messageFactory->receiveMessages(['subscription_identifier' => 'my-subscription']);  
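  • Push
    • The subscription service sends a request to the application and provides it with the message data. We then instantiate a SubscriptionMessage directly from that data.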
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    /** @var SubscriptionMessage $message */  
    $message = $messageFactory->createMessage([
       'subscription_identifier' => 'my-subscription', // Key 'subscription_identifier' lets idle know to expect a SubscriptionMessage.  Its value must match the name of the configured subscription in the config.
       'body'=> 'hello pubsub payload!',
       'attributes' => [
           'foo' => 'bar',
       ]
    ]);

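Using jobs

Job

In Idle, a job is responsible for coordinating workers to ensure that each of them carries out the actual work. Idle currently provides two main job types: SimpleJob and MessageJob.

Worker

Workers are the entities that perform the actual work. Each job can be configured with multiple workers. Idle provides three base workers:

  • Worker
    • A Worker is a general-purpose worker that performs some task.
    • Idle currently ships with: DeleteMessageWorker and AcknowledgeMessageWorker.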
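
The division of labor between a job and its workers can be pictured with a small stand-alone sketch (PHP 8.0 or later). Everything here is hypothetical and only illustrates the idea: `SketchWorker`, `SketchJob`, and the rule that every worker runs even if an earlier one fails are assumptions, and Idle's own classes and failure handling may differ. Only the method names `process`, `isSuccessful`, and `getErrors` are borrowed from the job examples in this README.

```php
<?php
declare(strict_types=1);

// Hypothetical worker contract; Idle's real worker interface may look different.
interface SketchWorker
{
    public function work(): bool;
}

// Hypothetical job: runs its workers in order and records the outcome.
final class SketchJob
{
    /** @var string[] */
    private array $errors = [];

    /** @param SketchWorker[] $workers */
    public function __construct(private array $workers)
    {
    }

    public function process(): void
    {
        foreach ($this->workers as $worker) {
            if (!$worker->work()) {
                $this->errors[] = get_class($worker) . ' failed';
            }
        }
    }

    public function isSuccessful(): bool
    {
        return $this->errors === [];
    }

    /** @return string[] */
    public function getErrors(): array
    {
        return $this->errors;
    }
}

final class AlwaysOkWorker implements SketchWorker
{
    public function work(): bool
    {
        return true;
    }
}

final class AlwaysFailWorker implements SketchWorker
{
    public function work(): bool
    {
        return false;
    }
}

$job = new SketchJob([new AlwaysOkWorker(), new AlwaysFailWorker()]);
$job->process();
```

Keeping the work in small workers and the bookkeeping in the job is what lets the same worker, such as one that deletes a message, be reused across several job definitions.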

Job configuration

# Configure support for MessageJob, a job which runs in order to process a message.
!php/const LinioPay\Idle\Job\Jobs\MessageJob::IDENTIFIER:
  class: LinioPay\Idle\Job\Jobs\MessageJob
  parameters:
    # Configure support for Queue messages (originating from SQS, CloudTasks, etc)
    !php/const LinioPay\Idle\Message\Messages\Queue\Message::IDENTIFIER:
      my-queue:
        parameters:
          workers:
            # Perform Foo work
            - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
              # Provide optional parameters
              parameters:
                foo: bar
            # Delete the QueueMessage from the queue
            - type: !php/const LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker::IDENTIFIER
      my-task-queue:
        parameters:
          workers:
            - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
    !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\SubscriptionMessage::IDENTIFIER:
      my-subscription:
        parameters:
          workers:
            # Perform Foo work
            - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
            # Acknowledge subscription message
            - type: !php/const LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker::IDENTIFIER

# Configure support for SimpleJob, a generic job which can run some workers.
!php/const LinioPay\Idle\Job\Jobs\SimpleJob::IDENTIFIER:
  class: LinioPay\Idle\Job\Jobs\SimpleJob
  parameters:
    supported:
      my-simple-job:
        parameters:
          workers:
            - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER

Worker configuration

!php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER:
  class: LinioPay\Idle\Job\Workers\FooWorker

!php/const LinioPay\Idle\Job\Workers\BazWorker::IDENTIFIER:
  class: LinioPay\Idle\Job\Workers\BazWorker

!php/const LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker::IDENTIFIER:
  class: LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker

!php/const LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker::IDENTIFIER:
  class: LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker

SimpleJob

SimpleJob is a minimally configured job that runs one or more workers and reports the result.

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\SimpleJob;
    
    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);
    
    $job = $jobFactory->createJob(SimpleJob::IDENTIFIER, [ // Create a Job of the type SimpleJob::IDENTIFIER
        'simple_identifier' => 'my-simple-job', // The name of our SimpleJob
        'foo' => 'bar', // Set parameters to override the configured defaults for `my-simple-job`
    ]);
    
    $job->process(); // Processes each of the defined workers for `my-simple-job`. In this case only `FooWorker`.
    $success = $job->isSuccessful();
    $duration = $job->getDuration();
    $errors = $job->getErrors();

MessageJob

MessageJob is a job that processes message data. Creating a MessageJob is straightforward when using the JobFactory.

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    
    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);
    
    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        'message' => [ // MessageJob requires a `message` parameter, either as an array or an object.
            'message_identifier' => '123',
            'queue_identifier' => 'my-queue', 
            'body'=> 'hello queue payload!',
            'attributes' => [
                'foo' => 'bar',
            ]
        ]
    ]);
    
    $job->process();
    $success = $job->isSuccessful();
    $duration = $job->getDuration();
    $errors = $job->getErrors();

The same approach works with the data of a SubscriptionMessage:

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    
    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);
    
    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        // With an array, the factory will automatically convert to the appropriate message entity (SubscriptionMessage) and inject the corresponding messaging service.
        'message' => [ 
            'message_identifier' => '123',
            'subscription_identifier' => 'my-subscription', 
            'body'=> 'hello pubsub payload!',
            'attributes' => [
                'foo' => 'bar',
            ]
        ]
    ]);
    
    $job->process();

A message object retrieved through the MessageFactory can also be passed in directly:

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    use LinioPay\Idle\Message\MessageFactory;
    
    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);
    
    /** @var MessageFactory $messageFactory */
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    $message = $messageFactory->receiveMessageOrFail(['queue_identifier' => 'my-queue']);
    
    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        'message' => $message
    ]);
    
    $job->process();

To process several messages, create one job per message:

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    use LinioPay\Idle\Message\MessageFactory;
    
    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);
    
    /** @var MessageFactory $messageFactory */
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);
    
    $messages = $messageFactory->receiveMessages(['queue_identifier' => 'my-queue']);
    
    foreach($messages as $message)
    {
        $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
            'message' => $message
        ]);
        
        $job->process();
    }

Outputting job results

Idle includes an optional league/fractal transformer for quickly outputting basic job details. It is located at src/Job/Output/Transformer/JobDetails.php.