antidot-fw / message-queue
Antidot 框架的消息总线及 Pub-Sub 集成队列。
Requires
- php: ^7.4.3|^8.0
- beberlei/assert: ^3.2
- enqueue/enqueue: ^0.10.1
- psr/container: ^1.0.0
- psr/event-dispatcher: ^1.0
Requires (Dev)
- enqueue/dbal: ^0.10.1
- enqueue/fs: ^0.10.1
- enqueue/null: ^0.10.1
- enqueue/pheanstalk: ^0.10.2
- enqueue/redis: ^0.10.1
- enqueue/sqs: ^0.10.1
- infection/infection: ^0.24.0
- phpro/grumphp: ~0.17 || ~1.0
- phpstan/phpstan: ^0.11.5 || ^0.12.0
- phpunit/phpunit: ^8.0 || ^9.0
- squizlabs/php_codesniffer: ^3.4
- symfony/var-dumper: ^4.2 || ^5.0
- vimeo/psalm: ^4.9
Suggests
- enqueue/dbal: To use Database queue with dbal connection.
- enqueue/fs: To use Filesystem queue.
- enqueue/null: Null queue for testing purposes.
- enqueue/pheanstalk: To use queue with Beanstalk connection.
- enqueue/redis: To use queue with Redis connection.
- enqueue/sqs: To use queue with AWS SQS service.
This package is auto-updated.
Last update: 2024-08-29 05:30:58 UTC
README
使用enqueue/enqueue为 Antidot 框架实现的消息队列。
composer require antidot-fw/message-queue
消息队列
消息队列是一种异步通信方法。它允许将消息存储在队列系统中,直到它们被消费和销毁。每个消息仅由一个唯一的消费者处理一次。
不同的队列系统
- 空队列
- 文件系统队列
- DBAL 队列
- Redis 队列
- Beanstalk
- Amazon SQS
每种实现将具有不同的配置细节,请参阅具体文档部分。此外,您可以使用 PHP-enqueue 包中实现的所有系统,创建所需的工厂。
用法
您可以定义所需的任何上下文。您可以将每个上下文绑定到不同的队列。一旦创建了上下文类,就可以开始向队列发送作业。作业应包含队列名称、消息类型和消息本身。
<?php declare(strict_types=1); /** @var \Psr\Container\ContainerInterface $container */ $producer = $container->get(\Antidot\Queue\Producer::class); $producer->enqueue(Job::create('some_queue', 'some_message', 'Hola Mundo!!'));
启动监听队列
bin/console queue:start default # "default is the queue name"
现在您可以配置对队列接收到的消息类型的操作。操作是一个可调用的类,它接收一个 JobPayload 作为第一个参数。
作业和生产者
作业是一个负责将给定数据传输到队列的类。它由两个参数组成:队列名称作为单个字符串,以及包含要处理的数据的 JobPayload。JsonPayload 是一个由两个其他参数组成的可 JSON 序列化对象:消息类型和消息数据作为字符串或数组。
一旦有了作业类,就需要将其传递给生产者以将消息发送到队列。请参阅下面的示例。
<?php declare(strict_types=1); use Psr\Container\ContainerInterface; use Antidot\Queue\Producer; /** @var ContainerInterface $container */ /** @var Producer $producer */ $producer = $container->get(Producer::class); // Send String Job of type "some_message_type" to "default" queue. $job1 = Job::create('default', 'some_message_type', 'Hello world!!'); $producer->enqueue($job1); // Send Array Job of type "other_message_type" to "other_queue" queue. $job2 = Job::create('other_queue', 'other_message_type', ['greet' => 'Hello world!!']); $producer->enqueue($job2);
操作
操作是可调用的类,当队列处理给定的消息时将执行。此类有一个唯一参数,即 JobPayload。
<?php declare(strict_types=1); use Antidot\Queue\JobPayload; class SomeMessageTypeAction { public function __invoke(JobPayload $payload): void { // do some stuff with the job payload here. } }
配置
将操作绑定到消息类型。
services: some_action_service: class: Some\Action\Class parameters: queues: contexts: default: message_types: # message_type: action_service some_message: some_action_service
这是默认配置。
parameters: queues: default_context: default contexts: default: message_types: [] context_type: fs # redis|dbal|sqs|beanstalk|null context_service: queue.context.default container: queue.container.default extensions: - Enqueue\Consumption\Extension\LoggerExtension - Enqueue\Consumption\Extension\SignalExtension - Enqueue\Consumption\Extension\LogExtension
特定传输的配置
空队列
因此,为了测试目的,它丢弃任何接收到的作业。此传输类型所需配置的唯一要求是将它设置为上下文。
composer require enqueue/null
文件系统队列
文件系统队列将生产的作业存储在内存中的文件中。它需要存储作业的绝对文件路径。
composer require enqueue/fs
parameters: queues: default_context: default contexts: default: message_types: [] context_type: fs context_params: path: file:///absoute/path/to/writable/dir
DBAL 队列
Doctrine DBAL 队列将生产的作业存储在数据库中。它需要 DBAL 连接服务名称。
composer require enqueue/dbal
parameters: queues: default_context: default contexts: default: message_types: [] context_type: dbal context_params: connection: Doctrine\DBAL\Connection
Redis 队列
Redis 队列将生产的作业存储在 Redis 数据库中。它需要 Redis 连接参数。您可以使用 Predis 或使用 PHP Redis 扩展。
使用 Predis
composer require enqueue/redis predis/predis:^1
parameters: queues: default_context: default contexts: default: message_types: [] context_type: redis context_params: host: localhost port: 6379 scheme_extensions: ['predis']
使用 PHP 扩展
确保您已安装并启用了 PHP Redis 扩展
composer require enqueue/redis
parameters: queues: default_context: default contexts: default: message_types: [] context_type: redis host: localhost port: 6379 scheme_extensions: ['phpredis']
Beanstalk 队列
Beanstalk 队列需要 beanstalk 主机地址和 beanstalk 端口才能工作。它使用 Pheanstalk PHP 库。
composer require enqueue/pheanstalk
parameters: queues: default_context: default contexts: default: message_types: [] context_type: beanstalk context_params: host: localhost port: 11300
Amazon SQS 队列
Amazon SQS 队列在 AWS 上存储生产的作业。它需要 AWS 控制台凭据。您可以根据需要使用 标准队列和 FIFO 队列。在发送作业到工作前,队列必须在 AWS 中存在。
composer require enqueue/sqs
parameters: queues: default_context: default contexts: default: message_types: [] context_type: sqs context_params: key: AWS-KEY secret: AWS-SECRET region: eu-west-3
消费者
工作器是负责监听给定队列以获取消息并逐条处理每条消息的CLI命令。在这个早期版本中,它使用的唯一参数是队列名称以开始监听。
bin/console queue:start queue_name
事件
Antidot框架消息队列使用PSR-14事件调度器,允许监听队列执行过程中发生的不同事件。
- QueueConsumeWasStarted: 在队列开始时调度。
- MessageReceived: 在处理之前,为任何接收到的任务调度。
- MessageProcessed: 在任务处理之后调度,它将包含处理的结果。
扩展
有关扩展的更多信息,请参阅php-enqueue官方文档
日志扩展
您可以在框架默认配置中启用或禁用调试模式日志记录器。它内部使用PSR-3日志接口。
生产环境运行
在生产环境中,通常需要一个守护进程来保持消费者进程的活跃状态。您可以使用Supervisor或其他系统守护进程替代品。
Supervisor
您需要在系统中安装supervisord。然后您需要将消费者配置为supervisor作业。
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /absolute/path/to/app/bin/console queue:start QUEUE_NAME
autostart=true
autorestart=true
user=ubuntu
numprocs=2 # Be cautious, it will block your computer depending on the available simultaneous execution thread it has.
redirect_stderr=true
stdout_logfile=/absolute/path/to/app/var/log/worker.log
stopwaitsecs=3600