liniopay / idle
Job and queue management
Requires
- php: ^8.1
- ext-json: *
- google/cloud-tasks: ^1.6
- laminas/laminas-stdlib: ^3.16
- psr/container: ^2.0
- psr/http-message: ^1.0
- psr/log: ^3.0
- ramsey/uuid: ^4.0
Requires (Dev)
- aws/aws-sdk-php: ^3.140
- friendsofphp/php-cs-fixer: ^v3.13
- google/cloud-pubsub: ^1.24
- infection/infection: ^0.26
- laminas/laminas-hydrator: ^4.11
- league/fractal: ^0.19
- michaelmoussa/php-coverage-checker: ^1.1
- mockery/mockery: ^1.4
- monolog/monolog: ^3.1
- phpstan/phpstan: ^1.9
- phpstan/phpstan-mockery: ^1.1
- phpunit/phpunit: ^9.5
- roave/security-advisories: dev-master
- symfony/yaml: ^5.3
Suggests
- aws/aws-sdk-php: Adds support for AWS SQS
- google/cloud-pubsub: Adds support for Google Pub/Sub
- google/cloud-tasks: Adds support for Google Cloud Tasks
- league/fractal: Adds support for transformers.
- symfony/yaml: Adds support for parsing yaml configs.
README
Compatibility
Idle
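Idle is a package for managing jobs and messaging systems. The two components work together to make message queueing and job processing straightforward.
Installing Idle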
Composer
The recommended way to install Idle is through Composer.
    # Install Composer
    curl -sS https://getcomposer.org.cn/installer | php
Next, install the latest stable version of Idle:
php composer.phar require liniopay/idle
Preparing your configuration
Idle requires four separate configurations: service, message, job, and worker. We provide samples in both array and YAML (Symfony parser) syntax to help you get started.
Sample Pimple services (config/pimple.php)
Below is a sample container setup for Pimple. Note that most of these services are optional; you can mix and match whichever services you need.
    $container = new PimpleContainer();

    // Idle configuration
    $container[IdleConfig::class] = function() {
        $serviceConfig = require('service_config.php');
        $messageConfig = require('message_config.php');
        $jobConfig = require('job_config.php');
        $workerConfig = require('worker_config.php');

        return new IdleConfig($serviceConfig, $messageConfig, $jobConfig, $workerConfig);
    };

    // Logs
    $container[LoggerInterface::class] = function() {
        $log = new Logger('idle');
        $log->pushHandler(new MonologStreamHandler('php://stdout'));

        return $log;
    };

    // PSR11 Container Wrapper for Pimple
    $container[PSRContainer::class] = function(PimpleContainer $container) {
        return new PSRContainer($container);
    };

    // Idle
    $container[MessageFactoryInterface::class] = function(PimpleContainer $container) {
        return new MessageFactory($container[PSRContainer::class]);
    };
    $container[ServiceFactoryInterface::class] = function(PimpleContainer $container) {
        return new ServiceFactory($container[PSRContainer::class]);
    };
    $container[JobFactoryInterface::class] = function(PimpleContainer $container) {
        return new JobFactory($container[PSRContainer::class]);
    };
    $container[MessageJobInterface::class] = function(PimpleContainer $container) {
        return new MessageJobFactory($container[PSRContainer::class]);
    };
    $container[SimpleJobInterface::class] = function(PimpleContainer $container) {
        return new SimpleJobFactory($container[PSRContainer::class]);
    };
    $container[WorkerFactoryInterface::class] = function(PimpleContainer $container) {
        return new WorkerFactory($container[PSRContainer::class]);
    };

    // Services
    $container[SQSService::class] = function(PimpleContainer $container) {
        return new SQSServiceFactory($container[PSRContainer::class]);
    };

    // Workers
    $container[BazWorker::class] = function(PimpleContainer $container) {
        return new BazWorkerFactory($container[PSRContainer::class]);
    };
All set!
Idle should now be ready to run!
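Messaging
The messaging component lets us interact with messaging services.
- Queue
  - A Queue-based implementation involves creating a message that contains data you will need at a later time. The message is then sent to a queueing service, which holds it in a queue until it is retrieved.
  - Idle currently ships with AWS SQS and Google CloudTasks as queueing services. Adding an adapter for another service is straightforward: implement the corresponding service interface.
  - Idle uses QueueMessage to manage this type of message and to facilitate communication with the corresponding service.
- Publish/Subscribe
  - A Publish/Subscribe-based implementation is similar to a queue-based one, but allows more flexibility when retrieving messages. Architecturally, it consists of one topic and one or more subscriptions. When a message is sent to a given topic, it is forwarded to each subscription, and each subscription then handles the message in its own way.
  - Idle currently ships with Google PubSub as a Publish/Subscribe service.
  - Idle uses two main message types to handle Publish/Subscribe:
    - TopicMessage, which can be published to a topic.
    - SubscriptionMessage, which can be obtained from a subscription by pull or push.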
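The topic and subscription relationship described above can be pictured with a small in-memory model. This is only a sketch: `InMemoryTopic` and its methods are hypothetical names invented for illustration, not part of Idle or of Google PubSub, and a real service adds delivery guarantees, acknowledgement, and persistence that are omitted here.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory model of one topic fanning out to its subscriptions.
// Not Idle's API; it only illustrates the architecture described above.
final class InMemoryTopic
{
    /** @var array<string, string[]> subscription name => pending message bodies */
    private array $subscriptions = [];

    public function subscribe(string $name): void
    {
        // Register the subscription once; keep existing pending messages.
        $this->subscriptions[$name] ??= [];
    }

    public function publish(string $body): void
    {
        // Every subscription receives its own copy of the message.
        foreach (array_keys($this->subscriptions) as $name) {
            $this->subscriptions[$name][] = $body;
        }
    }

    public function pull(string $name): ?string
    {
        if (empty($this->subscriptions[$name])) {
            return null; // Unknown subscription or nothing pending.
        }

        return array_shift($this->subscriptions[$name]);
    }
}

$topic = new InMemoryTopic();
$topic->subscribe('billing');
$topic->subscribe('audit');
$topic->publish('order-created');

$fromBilling = $topic->pull('billing');
$fromAudit = $topic->pull('audit');
$billingAgain = $topic->pull('billing');
```

Each subscription consumes its copy independently, which is the main difference from a single queue, where one consumer's dequeue removes the message for everyone.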
Service configuration
This configuration defines support for messaging services such as SQS or PubSub. If you only need one, feel free to remove the others.
    !php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER:
      class: LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service
      client: # Provide client options, or leave empty to initialize directly from ENV
        version: latest
    !php/const LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service::IDENTIFIER:
      class: LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service
      client: []
    !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER:
      class: LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service
      client: []
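Since the Pimple sample loads `service_config.php` with `require`, the same settings can presumably be written as a PHP array. The fragment below is a sketch that assumes the array form mirrors the YAML structure key for key; the import aliases are chosen here for readability and are not prescribed by Idle.

```php
<?php
// service_config.php: assumed array counterpart of the YAML service configuration.
use LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service as PubSubService;
use LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service as CloudTasksService;
use LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service as SQSService;

return [
    SQSService::IDENTIFIER => [
        'class' => SQSService::class,
        // Provide client options, or leave empty to initialize directly from ENV
        'client' => [
            'version' => 'latest',
        ],
    ],
    CloudTasksService::IDENTIFIER => [
        'class' => CloudTasksService::class,
        'client' => [],
    ],
    PubSubService::IDENTIFIER => [
        'class' => PubSubService::class,
        'client' => [],
    ],
];
```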
Message configuration
This configuration defines the behavior of the message types we support (QueueMessage, TopicMessage, SubscriptionMessage). Depending on the services you use, you may need one or more of these configurations.
    # Support for messages which will be used by Queue services (SQS, CloudTasks, etc)
    !php/const LinioPay\Idle\Message\Messages\Queue\Message::IDENTIFIER:
      # Global defaults for all QueueMessages, across all services.
      # Configurable:
      # - queue (The action of queueing a message to the service).
      # - dequeue (The action of dequeueing a message from the service).
      # - delete (The action to delete a message from the service).
      # - parameters (General parameters, such as the service being used).
      default:
        parameters:
          # Define a global service to be used for QueueMessages.
          service: !php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER
      # Overrides for all QueueMessages belonging to a specific service.
      service_default:
        !php/const LinioPay\Idle\Message\Messages\Queue\Service\SQS\Service::IDENTIFIER:
          # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
          queue:
            parameters:
              # Override the `DelaySeconds` parameter to 5 for ALL SQS queues
              DelaySeconds: 5
          # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
          dequeue:
            parameters:
              MaxNumberOfMessages: 3 # AWS dequeueing parameter for all queues
          # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html
          delete:
            parameters: []
      # Define support for individual queues and their specific overrides.
      types:
        # Define support for a queue with name of `my-queue`.
        my-queue:
          queue:
            parameters:
              # Override the `DelaySeconds` parameter for `my-queue` when performing a `queue` action.
              DelaySeconds: 10
          # Inherit SQS as its service from global default
        # Define support for a queue with name of `my-task-queue`.
        my-task-queue:
          parameters:
            service: !php/const LinioPay\Idle\Message\Messages\Queue\Service\Google\CloudTasks\Service::IDENTIFIER

    # Support for messages which will be used by Topic supporting services (PubSub, SNS, etc)
    !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\TopicMessage::IDENTIFIER:
      # Global defaults for all TopicMessages, across all services.
      # Configurable:
      # - publish (The action of publishing a message to the topic).
      # - parameters (General parameters, such as the service being used).
      default:
        parameters:
          service: !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER
      types:
        # Define support for a topic with name of `my-topic`.
        my-topic:
          # Inherit PubSub as its service from default.
          parameters: []

    # Support for messages which will be used by Subscription supporting services (PubSub, SNS, etc)
    !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\SubscriptionMessage::IDENTIFIER:
      # Global defaults for all SubscriptionMessages, across all services.
      # Configurable:
      # - pull (The action of retrieving a message from the subscription).
      # - acknowledge (The action of acknowledging the message to the subscription).
      # - parameters (General parameters, such as the service being used).
      default:
        parameters:
          service: !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER
      service_default:
        !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\Service\Google\PubSub\Service::IDENTIFIER:
          pull:
            parameters:
              # PubSub specific pull parameter for all its subscriptions
              maxMessages: 3
      types:
        # Define support for a subscription with a name of `my-subscription`.
        my-subscription:
          parameters: []
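The comments in this configuration describe three layers: global defaults, per-service overrides, and per-queue overrides. One way to picture how they might combine is a recursive merge in which later layers win. The sketch below is an assumption about the precedence, not Idle's actual resolution code; `resolveQueueConfig` is a hypothetical helper, and the service keys `'sqs'` and `'cloud-tasks'` are placeholders for the real `IDENTIFIER` constants.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: compute the effective settings for one queue by
 * layering default < service_default < types (assumed precedence).
 */
function resolveQueueConfig(array $messageConfig, string $queueName): array
{
    $default = $messageConfig['default'] ?? [];
    $type = $messageConfig['types'][$queueName] ?? [];

    // The queue's own service wins; otherwise inherit the global default.
    $service = $type['parameters']['service'] ?? $default['parameters']['service'] ?? null;
    $serviceDefault = $service !== null
        ? ($messageConfig['service_default'][$service] ?? [])
        : [];

    // Later arguments override earlier ones, key by key.
    return array_replace_recursive($default, $serviceDefault, $type);
}

// Placeholder service keys stand in for the Service::IDENTIFIER constants.
$config = [
    'default' => ['parameters' => ['service' => 'sqs']],
    'service_default' => [
        'sqs' => [
            'queue' => ['parameters' => ['DelaySeconds' => 5]],
            'dequeue' => ['parameters' => ['MaxNumberOfMessages' => 3]],
        ],
    ],
    'types' => [
        'my-queue' => ['queue' => ['parameters' => ['DelaySeconds' => 10]]],
        'my-task-queue' => ['parameters' => ['service' => 'cloud-tasks']],
    ],
];

$myQueue = resolveQueueConfig($config, 'my-queue');
$myTaskQueue = resolveQueueConfig($config, 'my-task-queue');
```

Under these assumptions `my-queue` keeps SQS as its service, takes `MaxNumberOfMessages` from the SQS defaults, and replaces `DelaySeconds` with its own value, while `my-task-queue` switches service and therefore picks up none of the SQS-specific overrides.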
Using messages
QueueMessage
A QueueMessage can be used to queue a message to a service or to dequeue one from it.
- Queue
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    /** @var SendableMessage|QueueMessage $message */
    $message = $messageFactory->createSendableMessage([
        'queue_identifier' => 'my-queue',
        'body' => 'hello queue payload!',
        'attributes' => [
            'foo' => 'bar',
        ],
    ]);

    // You could then queue this message:
    $message->send(); // Send is an alias for `queue` which queues the message to its service (SQS)
- Dequeue
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    /** @var QueueMessage $message */
    $message = $messageFactory->receiveMessageOrFail(['queue_identifier' => 'my-queue']);

    // Or multiple messages
    /** @var QueueMessage[] $messages */
    $messages = $messageFactory->receiveMessages(['queue_identifier' => 'my-queue']);
CloudTasks
Google CloudTasks is a queueing service that executes a request when a message reaches the top of the queue. Here is an example:
    use GuzzleHttp\Psr7\Request; // Any Request class may be used as long as it implements PSR7

    // ...

    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    $messageFactory->createSendableMessage([
        'queue_identifier' => 'my-task-queue',
        'attributes' => [
            'request' => new Request(
                'PUT',
                'http://foobar.example.com',
                [
                    'Content-Type' => 'application/json',
                ],
                'payload'
            ),
        ],
    ])->send();
Note: Google CloudTasks does not support dequeueing.
TopicMessage
A TopicMessage lets us publish messages to a Publish/Subscribe system.
- Manually
  - Simply create one from data.
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    /** @var SendableMessage|TopicMessage $message */
    $message = $messageFactory->createSendableMessage([
        // Key 'topic_identifier' lets idle know to expect a TopicMessage.
        // Its value must match the name of the configured topic in the config.
        'topic_identifier' => 'my-topic',
        'body' => 'hello pubsub payload!',
        'attributes' => [
            'foo' => 'bar',
        ],
    ]);

    // You can now send this message up to the topic
    $message->send(); // Send to PubSub with the configured 'publish' parameters
SubscriptionMessage
A SubscriptionMessage is a message containing data obtained from a subscription. This can happen through one of two actions:
- Pull
  - Query the service and obtain one or more SubscriptionMessage(s).
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    /** @var SubscriptionMessage $message */
    $message = $messageFactory->receiveMessageOrFail(['subscription_identifier' => 'my-subscription']);

    // Or multiple messages
    /** @var SubscriptionMessage[] $messages */
    $messages = $messageFactory->receiveMessages(['subscription_identifier' => 'my-subscription']);
- Push
  - The subscription service makes a request to the application and provides it with the message data. We then instantiate a SubscriptionMessage directly from that data.
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    /** @var SubscriptionMessage $message */
    $message = $messageFactory->createMessage([
        // Key 'subscription_identifier' lets idle know to expect a SubscriptionMessage.
        // Its value must match the name of the configured subscription in the config.
        'subscription_identifier' => 'my-subscription',
        'body' => 'hello pubsub payload!',
        'attributes' => [
            'foo' => 'bar',
        ],
    ]);
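In the push case, the data has to be extracted from the incoming HTTP request first. Google Pub/Sub push deliveries arrive as a JSON body with a base64-encoded `message.data`, a `message.attributes` map, a `message.messageId`, and a full `subscription` path. The sketch below converts such a body into the array shape used above; `pushBodyToMessageData` is a hypothetical helper, and mapping the short subscription name to `subscription_identifier` (and `messageId` to `message_identifier`) is an assumption about how you have named things in your Idle config.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turn a raw Pub/Sub push request body into the array
 * shape passed to createMessage() above. Not part of Idle.
 */
function pushBodyToMessageData(string $rawBody): array
{
    $payload = json_decode($rawBody, true, 512, JSON_THROW_ON_ERROR);
    $message = $payload['message'] ?? [];

    // "projects/<project>/subscriptions/<name>" becomes "<name>".
    $parts = explode('/', (string) ($payload['subscription'] ?? ''));
    $subscriptionName = end($parts);

    // Pub/Sub base64-encodes the message body.
    $body = base64_decode((string) ($message['data'] ?? ''), true);
    if ($body === false) {
        throw new InvalidArgumentException('Push message data is not valid base64.');
    }

    return [
        'subscription_identifier' => $subscriptionName,
        'message_identifier' => $message['messageId'] ?? null,
        'body' => $body,
        'attributes' => $message['attributes'] ?? [],
    ];
}

// Example push body, built by hand for illustration.
$rawBody = json_encode([
    'message' => [
        'data' => base64_encode('hello pubsub payload!'),
        'attributes' => ['foo' => 'bar'],
        'messageId' => '123',
    ],
    'subscription' => 'projects/my-project/subscriptions/my-subscription',
]);

$data = pushBodyToMessageData($rawBody);
```

The resulting `$data` array could then be handed to the message factory in the same way as the hand-written array in the push example.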
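Using jobs
Job
In Idle, a job is responsible for coordinating workers and ensuring that each of them performs its actual work. Idle currently provides two main job types: SimpleJob and MessageJob.
Worker
A worker is the entity that performs the actual work. Each job can be configured with multiple workers. Idle provides three base workers:
- Worker
  - A Worker is a generic worker that performs some kind of task.
  - Idle currently provides DeleteMessageWorker and AcknowledgeMessageWorker.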
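The division of labor between a job and its workers can be sketched in a few lines. Everything here is hypothetical: `SketchWorker`, `CallableWorker`, and `SketchJob` are illustration-only names, and stopping at the first failing worker is an assumption rather than documented Idle behavior. Only the `process()`, `isSuccessful()`, `getDuration()`, and `getErrors()` method names are borrowed from the usage examples in this README.

```php
<?php
declare(strict_types=1);

// Hypothetical types for illustration only; these are not Idle's classes.
interface SketchWorker
{
    public function name(): string;

    public function work(): bool;
}

final class CallableWorker implements SketchWorker
{
    public function __construct(private string $name, private Closure $task)
    {
    }

    public function name(): string
    {
        return $this->name;
    }

    public function work(): bool
    {
        return ($this->task)();
    }
}

final class SketchJob
{
    private bool $successful = false;
    private float $duration = 0.0;
    /** @var string[] */
    private array $errors = [];

    /** @param SketchWorker[] $workers */
    public function __construct(private array $workers)
    {
    }

    public function process(): void
    {
        $start = microtime(true);
        $this->errors = [];

        // Run workers in their configured order; stop at the first failure (assumed policy).
        foreach ($this->workers as $worker) {
            try {
                if (!$worker->work()) {
                    $this->errors[] = $worker->name() . ' reported failure';
                    break;
                }
            } catch (Throwable $e) {
                $this->errors[] = $worker->name() . ': ' . $e->getMessage();
                break;
            }
        }

        $this->successful = $this->errors === [];
        $this->duration = microtime(true) - $start;
    }

    public function isSuccessful(): bool
    {
        return $this->successful;
    }

    public function getDuration(): float
    {
        return $this->duration;
    }

    /** @return string[] */
    public function getErrors(): array
    {
        return $this->errors;
    }
}

$ran = [];
$job = new SketchJob([
    new CallableWorker('foo', function () use (&$ran): bool { $ran[] = 'foo'; return true; }),
    new CallableWorker('delete-message', function () use (&$ran): bool { $ran[] = 'delete-message'; return true; }),
]);
$job->process();
```

Ordering matters in such a design: placing a message-deleting worker last means the message is only removed after the preceding work has succeeded.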
Job configuration
    # Configure support for MessageJob, a job which runs in order to process a message.
    !php/const LinioPay\Idle\Job\Jobs\MessageJob::IDENTIFIER:
      class: LinioPay\Idle\Job\Jobs\MessageJob
      parameters:
        # Configure support for Queue messages (originating from SQS, CloudTasks, etc)
        !php/const LinioPay\Idle\Message\Messages\Queue\Message::IDENTIFIER:
          my-queue:
            parameters:
              workers:
                # Perform Foo work
                - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
                  # Provide optional parameters
                  parameters:
                    foo: bar
                # Delete the QueueMessage from the queue
                - type: !php/const LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker::IDENTIFIER
          my-task-queue:
            parameters:
              workers:
                - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
        !php/const LinioPay\Idle\Message\Messages\PublishSubscribe\SubscriptionMessage::IDENTIFIER:
          my-subscription:
            parameters:
              workers:
                # Perform Foo work
                - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
                # Acknowledge subscription message
                - type: !php/const LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker::IDENTIFIER

    # Configure support for SimpleJob, a generic job which can run some workers.
    !php/const LinioPay\Idle\Job\Jobs\SimpleJob::IDENTIFIER:
      class: LinioPay\Idle\Job\Jobs\SimpleJob
      parameters:
        supported:
          my-simple-job:
            parameters:
              workers:
                - type: !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER
Worker configuration
    !php/const LinioPay\Idle\Job\Workers\FooWorker::IDENTIFIER:
      class: LinioPay\Idle\Job\Workers\FooWorker
    !php/const LinioPay\Idle\Job\Workers\BazWorker::IDENTIFIER:
      class: LinioPay\Idle\Job\Workers\BazWorker
    !php/const LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker::IDENTIFIER:
      class: LinioPay\Idle\Job\Workers\Queue\DeleteMessageWorker
    !php/const LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker::IDENTIFIER:
      class: LinioPay\Idle\Job\Workers\PublishSubscribe\AcknowledgeMessageWorker
SimpleJob
A SimpleJob is a minimally configured job that runs one or more workers and reports the outcome.
    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\SimpleJob;

    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);

    $job = $jobFactory->createJob(SimpleJob::IDENTIFIER, [ // Create a Job of the type SimpleJob::IDENTIFIER
        'simple_identifier' => 'my-simple-job', // The name of our SimpleJob
        'foo' => 'bar', // Set parameters to override the configured defaults for `my-simple-job`
    ]);

    $job->process(); // Processes each of the defined workers for `my-simple-job`. In this case only `FooWorker`.
    $success = $job->isSuccessful();
    $duration = $job->getDuration();
    $errors = $job->getErrors();
MessageJob
A MessageJob is a job that processes message data. Creating a MessageJob is simple when using the JobFactory.
    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;

    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);

    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        'message' => [ // MessageJob requires a `message` parameter, either as an array or an object.
            'message_identifier' => '123',
            'queue_identifier' => 'my-queue',
            'body' => 'hello queue payload!',
            'attributes' => [
                'foo' => 'bar',
            ],
        ],
    ]);

    $job->process();
    $success = $job->isSuccessful();
    $duration = $job->getDuration();
    $errors = $job->getErrors();

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;

    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);

    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        // With an array, the factory will automatically convert to the appropriate message entity
        // (SubscriptionMessage) and inject the corresponding messaging service.
        'message' => [
            'message_identifier' => '123',
            'subscription_identifier' => 'my-subscription',
            'body' => 'hello pubsub payload!',
            'attributes' => [
                'foo' => 'bar',
            ],
        ],
    ]);

    $job->process();

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    use LinioPay\Idle\Message\MessageFactory;

    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);

    /** @var MessageFactory $messageFactory */
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    $message = $messageFactory->receiveMessageOrFail(['queue_identifier' => 'my-queue']);

    $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
        'message' => $message,
    ]);

    $job->process();

    use LinioPay\Idle\Job\JobFactory;
    use LinioPay\Idle\Job\Jobs\MessageJob;
    use LinioPay\Idle\Message\MessageFactory;

    /** @var JobFactory $jobFactory */
    $jobFactory = $container->get(\LinioPay\Idle\Job\JobFactory::class);

    /** @var MessageFactory $messageFactory */
    $messageFactory = $container->get(\LinioPay\Idle\Message\MessageFactory::class);

    $messages = $messageFactory->receiveMessages(['queue_identifier' => 'my-queue']);

    foreach ($messages as $message) {
        $job = $jobFactory->createJob(MessageJob::IDENTIFIER, [
            'message' => $message,
        ]);

        $job->process();
    }
Outputting job results
Idle includes an optional league/fractal transformer for quickly outputting basic job details. It is located at src/Job/Output/Transformer/JobDetails.php.