riverline / worker-bundle
Symfony2 Bundle,用于使用工作进程队列
2.4.0
2021-02-16 11:41 UTC
Requires
- php: >=7.1.3
- ext-zlib: *
- symfony/config: ^2.0|^3.0|^4.0
- symfony/console: ^2.0|^3.0|^4.0
- symfony/dependency-injection: ^2.0|^3.0|^4.0
- symfony/http-kernel: ^2.0|^3.0|^4.0
Requires (Dev)
- aws/aws-sdk-php: ^3.0
- phpunit/phpunit: ^4.8
- predis/predis: ^1.1
- stomp-php/stomp-php: ^4.3
Suggests
- amazonwebservices/aws-sdk-for-php: ^1.0
- aws/aws-sdk-php: ^3.0
- predis/predis: ^1.0
- stomp-php/stomp-php: ^4.0
README
Riverline\WorkerBundle 是什么
Riverline\WorkerBundle
为队列提供者添加抽象,并允许创建工作进程以消费队列工作负载。
要求
- PHP 5.3
- Symfony 2.x
安装
Riverline\WorkerBundle
与 composer 和任何 prs-0 自动加载器兼容
配置
riverline_worker: providers: predis: class: Riverline\WorkerBundle\Provider\PRedis arguments: - { host: redis.example.com } sqs: #v1 class: Riverline\WorkerBundle\Provider\AwsSQS arguments: - { key: xxxxxx, secret: xxxxx } - sqs.eu-west-1.amazonaws.com sqs: #v3 class: Riverline\WorkerBundle\Provider\AwsSQSv3 arguments: - version: "latest" region: "us-west-2" credentials: key: "xxxxxx" secret: "xxxxxx" gearman: class: Riverline\WorkerBundle\Provider\Gearman arguments: - [ gearman1.example.com, gearman2.examplet.com ] amqp: ## WIP class: Riverline\WorkerBundle\Provider\AMQP semaphore: class: Riverline\WorkerBundle\Provider\Semaphore activemq: class: Riverline\WorkerBundle\Provider\ActiveMQ arguments: - tcp://localhost:61613 - login - passcode - false # Boolean indicates if message is persistent - false # Boolean indicates if broker statistics plugin is enabled https://activemq.apache.ac.cn/statisticsplugin.html - true # Boolean indicates if sync mode is enabled queues: queue1: name: ThisIsMyQueue provider: predis queue2: name: https://eu-west-1.queue.amazonaws.com/xxxxxx/xxxx provider: sqs
使用方法
您可以通过 Symfony 容器访问任何配置好的提供者或队列
<?php $provider = $this->get('riverline_worker.provider.predis'); $provider->put('ThisIsMyQueue', 'Hello World'); $queue = $this->get('riverline_worker.queue.queue1'); echo $queue->count()." item(s) in the queue";
您可以轻松创建工作进程
<?php // src/Acme/DemoBundle/Command/DemoWorkerCommand.php namespace Acme\DemoBundle\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Riverline\WorkerBundle\Command\Worker; use Riverline\WorkerBundle\Command\WorkerControlCodes; class DemoWorkerCommand extends Worker { protected function configureWorker() { $this // Queue name from the configuration ->setQueueName('queue1') // Inhered Command methods ->setName('demo-worker') ->setDescription('Test a worker') ; } protected function executeWorker(InputInterface $input, OutputInterface $output, $workload) { $output->writeln($workload); // Stop worker and dot not process other workloads if ($someReasonToStopAndExit) { return WorkerControlCodes::STOP_EXECUTION; } // else continue return WorkerControlCodes::CAN_CONTINUE; } }
然后,您可以像执行其他命令一样启动您的 worker
$ php app/console demo-worker Hello World
您可以传递选项。
$ php app/console --worker-wait-timeout=60 --worker-limit=10 --memory-limit=128 --worker-exit-on-exception
此命令将等待 60 秒从队列中获取工作负载,最多处理 10 个工作负载或当使用内存超过 128Mb 时退出,如果 executeWorker()
抛出异常,则退出。