lelikptz/async-consumer

基于 Fibers 的异步消费者

v0.1.1 2023-10-20 05:33 UTC

This package is auto-updated.

Last update: 2024-09-26 13:19:57 UTC


README

使用 Fiber 实现的异步 consumer。要使用它,需要实现 TaskInterface。实现应该返回可以并行执行的非阻塞操作的状态。

Task.php 中有一个实现 TaskInterface 的示例,其中非阻塞操作是通过 guzzle 发送的 HTTP 请求。

Http\Task 使用示例

实现用于创建请求的工厂

final class Factory implements RequestFactoryInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function create(): RequestInterface
    {
        $this->logger->info('Some logic for creating request');

        return new Request('GET', 'https://www.google.com');
    }
}

实现用于处理响应和错误的处理器

final class Handler implements ResponseHandlerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function onSuccess(ResponseInterface $response): void
    {
        $this->logger->info(
            sprintf(
                "Response body: %s; response code: %s",
                $response->getBody()->getContents(),
                $response->getStatusCode()
            )
        );
        $this->logger->info('Some logic with response');
        $this->logger->info('Finish');
    }

    public function onException(RequestException $exception): void
    {
        $this->logger->error($exception->getMessage());
    }
}

任务提供者收集所需的任务,并在就绪时将其返回给消费者

final class Provider implements ProviderInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function get(): array
    {
        return [
            new Task(new Factory($this->logger), new Handler($this->logger)),
        ];
    }
}

组装消费者并作为守护进程启动,例如通过 supervisor。

$pollTimeoutInMicroseconds - 查询提供者之间的延迟

$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
(new AsyncConsumer(new Provider($logger), new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();

使用 rabbitmq 作为任务提供者的示例

要使用 AMPQProvider.php,我们需要实现 TransformerInterface 以从 AMQPMessage 创建 TaskInterface

final class Transformer implements TransformerInterface
{
    public function __construct(private readonly LoggerInterface $logger)
    {
    }

    public function transform(AMQPMessage $message): TaskInterface
    {
        return new Task(new Factory($this->logger), new Handler($this->logger));
    }
}

组装并启动

$maxBatchSize - 从 rabbitmq 收集的最大批次大小,实际上是并行执行的任务数量

$maxBatchCollectTimeInSeconds - 等待从 rabbitmq 收集批次的时间,如果超时,则使用现有的数据进行处理

$pollTimeoutInMicroseconds - 查询提供者之间的延迟

$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$provider = new AMPQProvider($connection, 'provider', new Transformer($logger));
$logger = new ConsoleLogger(new ConsoleOutput(OutputInterface::VERBOSITY_DEBUG));
$batch = new BatchProvider($provider, 10, 5, $pollTimeoutInMicroseconds);

(new AsyncConsumer($batch, new FiberExecutor(), $pollTimeoutInMicroseconds, $logger))->consume();