antidot-fw/message-queue

Antidot 框架的消息总线及 Pub-Sub 集成队列。

1.0.0 2021-08-14 15:52 UTC

README

Scrutinizer Code Quality Code Coverage Type Coverage Psalm Level Build Status Code Intelligence Status

使用enqueue/enqueue为 Antidot 框架实现的消息队列。

composer require antidot-fw/message-queue

消息队列

消息队列是一种异步通信方法。它允许将消息存储在队列系统中,直到它们被消费和销毁。每个消息仅由一个唯一的消费者处理一次。

不同的队列系统

  • 空队列
  • 文件系统队列
  • DBAL 队列
  • Redis 队列
  • Beanstalk
  • Amazon SQS

每种实现将具有不同的配置细节,请参阅具体文档部分。此外,您可以使用 PHP-enqueue 包中实现的所有系统,创建所需的工厂。

用法

您可以定义所需的任何上下文。您可以将每个上下文绑定到不同的队列。一旦创建了上下文类,就可以开始向队列发送作业。作业应包含队列名称、消息类型和消息本身。

<?php

declare(strict_types=1);

/** @var \Psr\Container\ContainerInterface $container */
$producer = $container->get(\Antidot\Queue\Producer::class);
$producer->enqueue(Job::create('some_queue', 'some_message', 'Hola Mundo!!')); 

启动监听队列

bin/console queue:start default # "default is the queue name"

现在您可以配置对队列接收到的消息类型的操作。操作是一个可调用的类,它接收一个 JobPayload 作为第一个参数。

作业和生产者

作业是一个负责将给定数据传输到队列的类。它由两个参数组成:队列名称作为单个字符串,以及包含要处理的数据的 JobPayload。JsonPayload 是一个由两个其他参数组成的可 JSON 序列化对象:消息类型和消息数据作为字符串或数组。

一旦有了作业类,就需要将其传递给生产者以将消息发送到队列。请参阅下面的示例。

<?php

declare(strict_types=1);

use Psr\Container\ContainerInterface;
use Antidot\Queue\Producer;

/** @var ContainerInterface $container */
/** @var Producer $producer */
$producer = $container->get(Producer::class);

// Send String Job of type "some_message_type" to "default" queue.
$job1 = Job::create('default', 'some_message_type', 'Hello world!!');
$producer->enqueue($job1);

// Send Array Job of type "other_message_type" to "other_queue" queue.
$job2 = Job::create('other_queue', 'other_message_type', ['greet' => 'Hello world!!']);
$producer->enqueue($job2);

操作

操作是可调用的类,当队列处理给定的消息时将执行。此类有一个唯一参数,即 JobPayload。

<?php

declare(strict_types=1);

use Antidot\Queue\JobPayload;

class SomeMessageTypeAction
{
    public function __invoke(JobPayload $payload): void
    {
        // do some stuff with the job payload here.
    }
}

配置

将操作绑定到消息类型。

services:
  some_action_service:
    class: Some\Action\Class
parameters:
  queues:
    contexts:
      default:
        message_types:
          # message_type: action_service
          some_message: some_action_service

这是默认配置。

parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: fs # redis|dbal|sqs|beanstalk|null
        context_service: queue.context.default
        container: queue.container.default
        extensions:
          - Enqueue\Consumption\Extension\LoggerExtension
          - Enqueue\Consumption\Extension\SignalExtension
          - Enqueue\Consumption\Extension\LogExtension

特定传输的配置

空队列

因此,为了测试目的,它丢弃任何接收到的作业。此传输类型所需配置的唯一要求是将它设置为上下文。

composer require enqueue/null

文件系统队列

文件系统队列将生产的作业存储在内存中的文件中。它需要存储作业的绝对文件路径。

composer require enqueue/fs
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: fs
        context_params:
          path: file:///absoute/path/to/writable/dir

DBAL 队列

Doctrine DBAL 队列将生产的作业存储在数据库中。它需要 DBAL 连接服务名称。

composer require enqueue/dbal
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: dbal
        context_params:
          connection: Doctrine\DBAL\Connection

Redis 队列

Redis 队列将生产的作业存储在 Redis 数据库中。它需要 Redis 连接参数。您可以使用 Predis 或使用 PHP Redis 扩展

使用 Predis

composer require enqueue/redis predis/predis:^1
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: redis
        context_params:
            host: localhost
            port: 6379
            scheme_extensions: ['predis']

使用 PHP 扩展

确保您已安装并启用了 PHP Redis 扩展

composer require enqueue/redis
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: redis
            host: localhost
            port: 6379
            scheme_extensions: ['phpredis']

Beanstalk 队列

Beanstalk 队列需要 beanstalk 主机地址和 beanstalk 端口才能工作。它使用 Pheanstalk PHP 库。

composer require enqueue/pheanstalk
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: beanstalk
        context_params:
          host: localhost
          port: 11300

Amazon SQS 队列

Amazon SQS 队列在 AWS 上存储生产的作业。它需要 AWS 控制台凭据。您可以根据需要使用 标准队列和 FIFO 队列。在发送作业到工作前,队列必须在 AWS 中存在。

composer require enqueue/sqs
parameters:
  queues:
    default_context: default
    contexts:
      default:
        message_types: []
        context_type: sqs
        context_params:
          key: AWS-KEY
          secret: AWS-SECRET
          region: eu-west-3

消费者

工作器是负责监听给定队列以获取消息并逐条处理每条消息的CLI命令。在这个早期版本中,它使用的唯一参数是队列名称以开始监听。

bin/console queue:start queue_name

事件

Antidot框架消息队列使用PSR-14事件调度器,允许监听队列执行过程中发生的不同事件。

  • QueueConsumeWasStarted: 在队列开始时调度。
  • MessageReceived: 在处理之前,为任何接收到的任务调度。
  • MessageProcessed: 在任务处理之后调度,它将包含处理的结果。

扩展

有关扩展的更多信息,请参阅php-enqueue官方文档

日志扩展

您可以在框架默认配置中启用或禁用调试模式日志记录器。它内部使用PSR-3日志接口。

生产环境运行

在生产环境中,通常需要一个守护进程来保持消费者进程的活跃状态。您可以使用Supervisor或其他系统守护进程替代品。

Supervisor

您需要在系统中安装supervisord。然后您需要将消费者配置为supervisor作业。

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /absolute/path/to/app/bin/console queue:start QUEUE_NAME
autostart=true
autorestart=true
user=ubuntu
numprocs=2 # Be cautious, it will block your computer depending on the available simultaneous execution thread it has.
redirect_stderr=true
stdout_logfile=/absolute/path/to/app/var/log/worker.log
stopwaitsecs=3600