riverline/worker-bundle

Symfony2 Bundle,用于使用工作进程队列

2.4.0 2021-02-16 11:41 UTC

README

Build Status

Riverline\WorkerBundle 是什么

Riverline\WorkerBundle 为队列提供者添加抽象,并允许创建工作进程以消费队列工作负载。

要求

  • PHP 5.3
  • Symfony 2.x

安装

Riverline\WorkerBundle 与 composer 和任何 prs-0 自动加载器兼容

配置

riverline_worker:
    providers:
        predis:
            class: Riverline\WorkerBundle\Provider\PRedis
            arguments:
                - { host: redis.example.com }
        sqs: #v1
            class: Riverline\WorkerBundle\Provider\AwsSQS
            arguments:
                - { key: xxxxxx, secret: xxxxx }
                - sqs.eu-west-1.amazonaws.com
        sqs: #v3
            class: Riverline\WorkerBundle\Provider\AwsSQSv3
            arguments:
                -
                    version: "latest"
                    region: "us-west-2"
                    credentials:
                        key: "xxxxxx"
                        secret: "xxxxxx"
        gearman:
            class: Riverline\WorkerBundle\Provider\Gearman
            arguments:
                - [ gearman1.example.com, gearman2.examplet.com ]
        amqp: ## WIP
            class: Riverline\WorkerBundle\Provider\AMQP
        semaphore:
            class: Riverline\WorkerBundle\Provider\Semaphore
        activemq:
            class: Riverline\WorkerBundle\Provider\ActiveMQ
            arguments:
                - tcp://localhost:61613
                - login
                - passcode
                - false        # Boolean indicates if message is persistent
                - false        # Boolean indicates if broker statistics plugin is enabled https://activemq.apache.ac.cn/statisticsplugin.html
                - true         # Boolean indicates if sync mode is enabled

    queues:
        queue1:
            name: ThisIsMyQueue
            provider: predis
        queue2:
            name: https://eu-west-1.queue.amazonaws.com/xxxxxx/xxxx
            provider: sqs

使用方法

您可以通过 Symfony 容器访问任何配置好的提供者或队列

<?php

$provider = $this->get('riverline_worker.provider.predis');
$provider->put('ThisIsMyQueue', 'Hello World');

$queue = $this->get('riverline_worker.queue.queue1');
echo $queue->count()." item(s) in the queue";

您可以轻松创建工作进程

<?php

// src/Acme/DemoBundle/Command/DemoWorkerCommand.php

namespace Acme\DemoBundle\Command;

use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Riverline\WorkerBundle\Command\Worker;
use Riverline\WorkerBundle\Command\WorkerControlCodes;

class DemoWorkerCommand extends Worker
{
    protected function configureWorker()
    {
        $this
            // Queue name from the configuration
            ->setQueueName('queue1')

            // Inhered Command methods
            ->setName('demo-worker')
            ->setDescription('Test a worker')
        ;
    }

    protected function executeWorker(InputInterface $input, OutputInterface $output, $workload)
    {
        $output->writeln($workload);

        // Stop worker and dot not process other workloads
        if ($someReasonToStopAndExit)
        {
            return WorkerControlCodes::STOP_EXECUTION;
        }

        // else continue
        return WorkerControlCodes::CAN_CONTINUE;
    }
}

然后,您可以像执行其他命令一样启动您的 worker

$ php app/console demo-worker
Hello World

您可以传递选项。

$ php app/console --worker-wait-timeout=60 --worker-limit=10 --memory-limit=128 --worker-exit-on-exception

此命令将等待 60 秒从队列中获取工作负载,最多处理 10 个工作负载或当使用内存超过 128Mb 时退出,如果 executeWorker() 抛出异常,则退出。