eresults / worker-bundle
Symfony Bundle,用于使用工作队列
1.0.4
2021-04-19 20:26 UTC
Requires
- php: ^7.4|^8.0
- ext-zlib: *
- symfony/config: ^4.0|^5.0
- symfony/console: ^4.0|^5.0
- symfony/dependency-injection: ^4.0|^5.0
- symfony/http-kernel: ^4.0|^5.0
Requires (Dev)
- aws/aws-sdk-php: ^3.142
- phpunit/phpunit: ^9.0
Suggests
- aws/aws-sdk-php: ^3.0
README
eResults\WorkerBundle 为队列提供者添加了抽象,并允许创建工作进程以消费队列工作负载。
要求
- PHP >=7.4
- Symfony ^4.4|^5.0
安装
composer req eresults/worker-bundle
配置
eresults_worker: providers: sqs: class: eResults\WorkerBundle\Provider\AwsSQS arguments: - version: "latest" region: "us-west-2" credentials: key: "xxxxxx" secret: "xxxxxx" queues: my_queue: name: https://eu-west-1.queue.amazonaws.com/xxxxxx/xxxx provider: sqs
使用
您可以通过 Symfony 容器访问任何配置的提供者或队列
<?php $provider = $this->get('eresults_worker.provider.sqs'); $provider->put('ThisIsMyQueue', 'Hello World'); $queue = $this->get('eresults_worker.queue.my_queue'); echo $queue->count()." item(s) in the queue";
您可以轻松创建工作进程
<?php // src/Acme/DemoBundle/Command/DemoWorkerCommand.php namespace Acme\DemoBundle\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use eResults\WorkerBundle\Command\Worker; class DemoWorkerCommand extends Worker { protected function configureWorker() { $this // Queue name from the configuration ->setQueueName('queue1') // Or load the queue directly through your own dependency injection // ->setQueue($this->myQueue) // Inherited Command methods ->setName('demo-worker') ->setDescription('Test a worker') ; } protected function doProcess($workload, InputInterface $input, OutputInterface $output): int { $output->writeln($workload); // Stop worker when some end condition is reached if ($this->hasSomeReasonToStopAndExit()) { return self::STATE_SHUTDOWN; } // else continue return self::STATE_READY; } protected function onException(Exception $e, $workload): int { // If an exception occurs, check if the worker can continue running if ($e instanceof MyNonFatalException) { // Log the exception if appropriate // $this->logger->logException($e); return self::STATE_READY; } return self::STATE_EXCEPTION; } }
然后您可以将工作进程像其他命令一样启动
$ app/console demo-worker Hello World
您可以传递选项。
$ app/console\
--workload-timeout=60\
--workload-limit=10\
--memory-limit=128\
--exit-on-exception
此命令等待 60 秒从队列中获取工作负载,最多处理 10 个工作负载,或者在使用的内存超过 128Mb 时退出,如果 executeWorker() 抛出异常,则退出。