eresults/worker-bundle

Symfony Bundle,用于使用工作队列

1.0.4 2021-04-19 20:26 UTC

This package is auto-updated.

Last update: 2024-09-20 04:26:22 UTC


README

eResults\WorkerBundle 为队列提供者添加了抽象,并允许创建工作进程以消费队列工作负载。

要求

  • PHP >=7.4
  • Symfony ^4.4|^5.0

安装

composer req eresults/worker-bundle

配置

eresults_worker:
    providers:
        sqs:
            class: eResults\WorkerBundle\Provider\AwsSQS
            arguments:
                -
                    version: "latest"
                    region: "us-west-2"
                    credentials:
                        key: "xxxxxx"
                        secret: "xxxxxx"
    queues:
        my_queue:
            name: https://eu-west-1.queue.amazonaws.com/xxxxxx/xxxx
            provider: sqs

使用

您可以通过 Symfony 容器访问任何配置的提供者或队列

<?php

$provider = $this->get('eresults_worker.provider.sqs');
$provider->put('ThisIsMyQueue', 'Hello World');

$queue = $this->get('eresults_worker.queue.my_queue');
echo $queue->count()." item(s) in the queue";

您可以轻松创建工作进程

<?php

// src/Acme/DemoBundle/Command/DemoWorkerCommand.php

namespace Acme\DemoBundle\Command;

use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use eResults\WorkerBundle\Command\Worker;

class DemoWorkerCommand extends Worker
{
    protected function configureWorker()
    {
        $this
            // Queue name from the configuration
            ->setQueueName('queue1')

            // Or load the queue directly through your own dependency injection
//            ->setQueue($this->myQueue)

            // Inherited Command methods
            ->setName('demo-worker')
            ->setDescription('Test a worker')
        ;
    }

    protected function doProcess($workload, InputInterface $input, OutputInterface $output): int
    {
        $output->writeln($workload);

        // Stop worker when some end condition is reached
        if ($this->hasSomeReasonToStopAndExit()) {
            return self::STATE_SHUTDOWN;
        }

        // else continue
        return self::STATE_READY;
    }

    protected function onException(Exception $e, $workload): int
    {
        // If an exception occurs, check if the worker can continue running
        if ($e instanceof MyNonFatalException) {
            // Log the exception if appropriate
            // $this->logger->logException($e);

            return self::STATE_READY;
        }

        return self::STATE_EXCEPTION;
    }
 }

然后您可以将工作进程像其他命令一样启动

$ app/console demo-worker
Hello World

您可以传递选项。

$ app/console\
    --workload-timeout=60\
    --workload-limit=10\
    --memory-limit=128\
    --exit-on-exception

此命令等待 60 秒从队列中获取工作负载,最多处理 10 个工作负载,或者在使用的内存超过 128Mb 时退出,如果 executeWorker() 抛出异常,则退出。