lelikptz / async-consumer
基于 Fibers 的异步消费者
v0.1.1
2023-10-20 05:33 UTC
Requires
- php: >=8.1
- ext-curl: *
- ext-http: *
- guzzlehttp/guzzle: ^7.8
- php-amqplib/php-amqplib: ^3.5
- psr/http-message: ^2.0
- psr/log: ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.35
- roave/security-advisories: dev-latest
- vimeo/psalm: ^5.15
README
使用 Fiber 实现的异步 consumer。要使用它,需要实现 TaskInterface。实现应该返回可以并行执行的非阻塞操作的状态。
在 Task.php 中有一个实现 TaskInterface 的示例,其中非阻塞操作是通过 guzzle 发送的 HTTP 请求。
Http\Task 使用示例
实现用于创建请求的工厂
final class Factory implements RequestFactoryInterface { public function __construct(private readonly LoggerInterface $logger) { } public function create(): RequestInterface { $this->logger->info('Some logic for creating request'); return new Request('GET', 'https://www.google.com'); } }
实现用于处理响应和错误的处理器
final class Handler implements ResponseHandlerInterface { public function __construct(private readonly LoggerInterface $logger) { } public function onSuccess(ResponseInterface $response): void { $this->logger->info( sprintf( "Response body: %s; response code: %s", $response->getBody()->getContents(), $response->getStatusCode() ) ); $this->logger->info('Some logic with response'); $this->logger->info('Finish'); } public function onException(RequestException $exception): void { $this->logger->error($exception->getMessage()); } }
任务提供者收集所需的任务,并在就绪时将其返回给消费者
final class Provider implements ProviderInterface { public function __construct(private readonly LoggerInterface $logger) { } public function get(): array { return [ new Task(new Factory($this->logger), new Handler($this->logger)), ]; } }
组装消费者并作为守护进程启动,例如通过 supervisor。
$pollTimeoutInMicroseconds - 查询提供者之间的延迟
$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG)); (new AsyncConsumer(new Provider($logger), new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();
使用 rabbitmq 作为任务提供者的示例
要使用 AMPQProvider.php,我们需要实现 TransformerInterface 以从 AMQPMessage 创建 TaskInterface
final class Transformer implements TransformerInterface { public function __construct(private readonly LoggerInterface $logger) { } public function transform(AMQPMessage $message): TaskInterface { return new Task(new Factory($this->logger), new Handler($this->logger)); } }
组装并启动
$maxBatchSize - 从 rabbitmq 收集的最大批次大小,实际上是并行执行的任务数量
$maxBatchCollectTimeInSeconds - 等待从 rabbitmq 收集批次的时间,如果超时,则使用现有的数据进行处理
$pollTimeoutInMicroseconds - 查询提供者之间的延迟
$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest'); $provider = new AMPQProvider($connection, 'provider', new Transformer($logger)); $logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG)); $batch = new BatchProvider($provider, 10, 5, $pollTimeoutInMicroseconds); (new AsyncConsumer($batch, new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();