ulabox/gearman-bundle

此包已被废弃且不再维护。作者建议使用 mmoreram/gearman-bundle 包。

Symfony2 的 Gearman 支持。

安装量: 2,669

依赖者: 0

建议者: 0

安全性: 0

星标: 0

关注者: 16

分支: 1

开放问题: 0

类型:symfony-bundle

1.0.0 2015-09-16 14:33 UTC

This package is not auto-updated.

Last update: 2022-02-01 12:27:08 UTC


README

该包提供了 symfony2 项目和 Gearman 作业服务器之间的接口。

快速安装

在 composer.json 文件中要求该包

{
    "require": {
        "ulabox/gearman-bundle": "*",
    }
}

运行 composer update 命令

$ composer update ulabox/gearman-bundle

现在将 Bundle 添加到您的 Kernel 中

<?php
// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        // ...
        new Ulabox\Bundle\GearmanBundle\UlaboxGearmanBundle(),
        // ...
    );
}

Gearman 依赖

使用以下命令安装适用于 Debain/Ubuntu 包 的 Gearman 作业服务器

$ sudo apt-get install gearman-job-server

一旦安装了作业服务器,可以通过运行以下命令来启动

$ service gearman-job-server start

安装 Gearman 驱动

使用以下命令安装 Gearman 驱动

$ pecl install channel://pecl.php.net/gearman-0.8.3

现在我们只需要启用模块。编辑您的 /etc/php5/conf.d/gearman.ini 文件并添加以下行

extension=gearman.so

配置

默认情况下,该包不需要任何配置,但您可以通过编辑 config.yml 来配置它

ulabox_gearman:
    # Define your own servers.
    # By default is set to localhost 127.0.0.1:4730.
    # If annotations defined, will be overwritten.
    servers:
        localhost:
            host: 127.0.0.1
            port: 4730
        server2:
            host: myotherhost
            port: 4567
    # Define the default method to execute a task.
    # By default is set to doBackgroundJob.
    # Available methods (doBackgroundJob, doHighJob, doHighBackgroundJob, doLowJob, doLowBackgroundJob, doNormalJob,
    # addTask, addTaskBackground, addTaskHigh, addTaskHighBackground, addTaskLow, addTaskLowBackground, runTasks).
    default_method: doLowBackgroundJob

    # Define the default number of executions before job dies.
    # By default is set to 100.
    # If annotations defined, will be overwritten.
    iterations: 150

    # Define your workers location directory.
    # By default is set to Gearman/Worker
    worker_dir: MyWorkerDir

    # Define your clients location directory.
    # By default is set to Gearman/Client
    client_dir: MyClientDir

    # Define the way the included event worker triggers the events
    # cli for getting the event dispatcher, request for doing a curl 
    # against the configured request_context URL
    # By default is set to request
    use_via: [cli|request]

编写简单工作器

对于位于 src/Acme/DemoBundle 的包,默认情况下,工作器类应该位于 src/Acme/DemoBundle/Gearman/Worker 中。我们可以创建一个简单的工作器,如下所示

<?php

namespace Acme\DemoBundle\Gearman\Worker;

use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;

/**
 * The worker.
 *
 * @Worker()
 */
class AcmeWorker extends ContainerAwareWorker
{
    /**
     * The hello world job.
     *
     * @param \GearmanJob $job The GearmanJob instance
     * @return boolean
     * @Job()
     */
    public function hello_world(\GearmanJob $job)
    {
        echo "Received hello world job: " . $job->handle() . "\n";

        $workload     = $job->workload();
        $workloadSize = $job->workloadSize();

        echo "Workload: $workload ($workloadSize)\n";

        // This status loop is not needed, just to show how work
        for ($i = 0; $i < $workloadSize; $i ++) {
            echo "Sending status: " . $i . "/$workloadSize completed\n";

            $job->sendStatus($i, $workloadSize);
            sleep(1);
        }

        echo "Result: '".$workload."' loaded\n";

        return true;
    }

    /**
     * This job is never call, because is not marked with @Job annotation.
     *
     * @param  \GearmanJob $job The GearmanJob instance
     * @return boolean
     */
    public function never_call_job(\GearmanJob $job)
    {
        ...
    }
}

注意工作器类如何用 @Worker() 注解标记,以及每个被视为作业的方法也用 @Job() 注解标记。

执行作业

一旦编写了作业,可以通过两种方式运行

通过代码
// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');

// get the generic gearman client
$client = $gearmanManager->getClient('UlaboxGearmanBundle:GearmanClient');

// find your worker
$worker = $gearmanManager->getWorker('AcmeDemoBundle:AcmeWorker');

// now you should tell the client that worker must be run
$client->setWorker($worker);

// and finally do the job
$client->doNormalJob('hello_world', json_encode(array('foo' => 'bar')));

// do the job in backgroud
//$client->doBackgroundJob('hello_world', json_encode(array('foo' => 'bar')));

要查看结果,请打开控制台并运行以下命令

$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker

然后运行上面的代码。

通过命令行

打开第一个控制台并运行

$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker

现在打开另一个控制台并运行

$ php app/console gearman:client:execute --client=UlaboxGearmanBundle:GearmanClient:hello_world --worker=AcmeDemoBundle:AcmeWorker --params="{\"foo\": \"bar\" }"

命令行有一些选项,你可以在命令行部分查看更多详细信息。

注解

工作器注解
  • servers: 服务器数组
  • iterations: 作业死亡前的执行次数
<?php

namespace Acme\DemoBundle\Gearman\Worker;

use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;

/**
 * The worker.
 *
 * @Worker(servers={"127.0.0.1:4730"}, iterations=10)
 */
class AcmeWorker extends ContainerAwareWorker
{
    ....
}
客户端注解
  • worker: 工作器名称
  • servers: 服务器数组
<?php

namespace Acme\DemoBundle\Gearman\Client;

use Ulabox\Bundle\GearmanBundle\Annotation\Client;
use Ulabox\Bundle\GearmanBundle\Model\Client as BaseClient;

/**
 * The client
 *
 * @Client(worker="MyWorker", servers={"127.0.0.1:4730"})
 */
class AcmeClient extends BaseClient
    ....
}
作业注解
  • name: 作业名称
<?php

namespace Acme\DemoBundle\Gearman\Worker;

use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;

/**
 * The worker.
 *
 * @Worker(servers={"127.0.0.1:4730"}, iterations=10)
 */
class AcmeWorker extends ContainerAwareWorker
{
    /**
     * The hello world job.
     *
     * @param \GearmanJob $job The GearmanJob instance
     * @return boolean
     *
     * @Job(name="acme_hello_world")
     */
    public function hello_world(\GearmanJob $job)
    {
        ....
    }
}

编写简单客户端

对于位于src/Acme/DemoBundle的包,客户端类默认应位于src/Acme/DemoBundle/Gearman/Client中。客户端类默认与同名的Worker类关联,例如,AcmeClient将与AcmeWorker关联。

<?php

namespace Acme\DemoBundle\Gearman\Client;

use Ulabox\Bundle\GearmanBundle\Annotation\Client;
use Ulabox\Bundle\GearmanBundle\Model\Client as BaseClient;

/**
 * The client
 *
 * @Client()
 */
class AcmeClient extends BaseClient
{
    public function hello_world_status($status)
    {
        print_r("AcmeClient::Status ".$status."\n");
    }

    public function hello_world_data($task)
    {
        print_r("AcmeClient::Data: ".$task->data()."\n");
    }

    public function hello_world_fail($task)
    {
        print_r("AcmeClient::Failed: ".$task->jobHandle()."\n");
    }

    public function hello_world_success($result)
    {
        print_r("AcmeClient::Success: ".$result);
    }
}

注意客户端类有一个回调方法,用于在Worker执行任务时通知,但只有在使用doNormalJob方法执行任务时才有意义。

通过客户端执行任务

一旦编写了作业,可以通过两种方式运行

通过代码
// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');

// get the acme client
$client = $gearmanManager->getClient('AcmeDemoBundle:AcmeClient');

// and finally do the job
$client->doNormalJob('hello_world', json_encode(array('foo' => 'bar')));

// do the job in backgroud
//$client->doBackgroundJob('hello_world', json_encode(array('foo' => 'bar')));

要查看结果,请打开控制台并运行以下命令

$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker

然后运行上面的代码。

通过命令行

打开第一个控制台并运行

$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker

现在打开另一个控制台并运行

$ php app/console gearman:client:execute --client=AcmeDemoBundle:AcmeClient:hello_world  --params="{\"foo\": \"bar\" }"

运行任务

将任务添加到与其他任务并行运行非常简单。向之前的Worker添加一个新任务

<?php

namespace Acme\DemoBundle\Gearman\Worker;

use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;

/**
 * The worker.
 *
 * @Worker(servers={"127.0.0.1:4730"}, iterations=10)
 */
class AcmeWorker extends ContainerAwareWorker
{
    /**
     * Execute a job.
     *
     * @param \GearmanJob $job The GearmanJob instance
     *
     * @return boolean
     *
     * @Job()
     */
    public function hello_world(\GearmanJob $job)
    {
        echo "Received hello world job: " . $job->handle() . "\n";

        $workload     = $job->workload();
        $workloadSize = $job->workloadSize();

        echo "Workload: $workload ($workloadSize)\n";

        // This status loop is not needed, just to show how work
        for ($i = 0; $i < $workloadSize; $i ++) {
            echo "Sending status: " . $i . "/$workloadSize completed\n";

            $job->sendStatus($i, $workloadSize);
            sleep(1);
        }

        echo "Result: '".$workload."' loaded\n";

        return true;
    }

    /**
     * Execute a job.
     *
     * @param \GearmanJob $job The GearmanJob instance
     *
     * @return boolean
     *
     * @Job()
     */
    public function send_newsletter(\GearmanJob $job)
    {
        $users = json_decode($job->workload(), true);

        foreach ($users as $name => $email) {
            echo "Be sent an email to $name\n";
            // send the email
            echo "The email have been send to $email\n\n";
            sleep(1);
        }

        echo count($users)." mails have been sent \n";

        return true;
    }
}

一旦编写了任务,可以按照以下方式运行

// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');

// get the acme client
$client = $gearmanManager->getClient('AcmeDemoBundle:AcmeClient');

// add multiple background tasks
$client->addTaskBackground('hello_world', json_encode(array('foo' => 'bar')));
$client->addTaskBackground('send_newsletter', json_encode(array('Ivan' => 'ivannis.suarez@gmail.com', 'Ulabox' => 'info@ulabox.com')));

// run tasks
$client->runTasks();

要查看结果,请打开控制台并运行以下命令

$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker

然后运行上面的代码。

命令

显示所有注册的Worker

$ php app/console gearman:worker:list

执行一个任务

$ php app/console gearman:client:execute --help

执行一个Worker

$ php app/console gearman:worker:execute --help

异步事件

利用Gearman Job Server的潜力,我们引入了一个新的EventDispatcherAsync,它取代了默认的symfony EventDispatcher。新的事件派发器有一个名为dispatchAsync的新方法,该方法将事件发送到gearman队列,gearman处理它并将其发送回PHP应用程序,应用程序最终重建事件并异步运行它。

配置

要使用异步事件,首先要做的是导入gearman包的routing.yml文件。

# app/config/routing.yml
gearman_bundle:
    resource: @UlaboxGearmanBundle/Resources/config/routing.yml
    prefix:   /_gearman

然后在您的security.yml文件中的访问控制中添加路由。

# app/config/security.yml
security:
    # ...
    access_control:
        - { path: ^/_gearman*, roles: IS_AUTHENTICATED_ANONYMOUSLY, ips: [127.0.0.1, ::1] }

在您的config.yml中启用异步事件派发器。

# app/config/config.yml
ulabox_gearman:
    enable_asynchronous_event_dispatcher: true

EventWorker负责处理事件并将其发送回应用程序。此Worker需要从命令行生成一个URL,这要求我们全局设置请求上下文。

# app/config/parameters.yml
parameters:
    router.request_context.host: example.org
    router.request_context.scheme: https
    router.request_context.base_url: my/path

现在,一切配置就绪后,下一步是在命令行中运行EventWorker

$ php app/console gearman:worker:execute --worker=UlaboxGearmanBundle:EventWorker

EventWorker应始终运行,因此我们建议使用如supervisor之类的工具确保Worker始终运行。

编写异步事件

异步事件是一个简单的事件,唯一的区别是必须实现Ulabox\Bundle\GearmanBundle\Dispatcher\AsyncEventInterface接口。这里我们展示了一个小例子

<?php

namespace Acme\DemoBundle\Event;

use Ulabox\Bundle\GearmanBundle\Dispatcher\AsyncEventInterface;
use Symfony\Component\EventDispatcher\Event;
use Acme\DemoBundle\Entity\User;

/**
 * The custom async event.
 */
class FooEvent extends Event implements AsyncEventInterface
{
    /**
     * The user entity
     *
     * @var User
     */
    protected $user;

    /**
     * Construstor.
     *
     * @param User $user The user instance
     */
    public function __construct(User $user)
    {
        $this->user = $user;
    }

    /**
     * {@inheritdoc}
     */
    public function getUser()
    {
        return $this->user;
    }

    /**
     * {@inheritdoc}
     */
    public function getArguments()
    {
        return array(
            'user' => $this->user
        );
    }

    /**
     * {@inheritdoc}
     */
    public function setArguments(array $args = array())
    {
        $this->user = $args['user'];
    }
}

注意,该类实现了AsyncEventInterface的两个方法(getArguments,setArguments),这些方法用于内部重建事件。

创建事件监听器
<?php

namespace Acme\DemoBundle\EventListener;

use Acme\DemoBundle\Event\FooEvent;

/**
 * Foo listener
 */
class FooListener
{
    /**
     * Listener on foo event
     *
     * @param FooEvent $event The event
     */
    public function onFoo(FooEvent $event)
    {
        $user = $event->getUser();
        // do something
    }
}

现在类创建完成后,只需将其注册为服务即可

# app/config/config.yml
services:
    acme.listener.foo_event:
        class: Acme\DemoBundle\EventListener\FooListener
        tags:
            - { name: kernel.event_listener, event: foo.event, method: onFoo }
派发事件

dispatchAsync()方法开始处理过程,并异步通知所有给定事件的监听器

<?php

namespace Acme\DemoBundle\Controller;

use Acme\DemoBundle\Event\FooEvent;

/**
 * Demo controller
 */
class DemoController
{
    public function someAction()
    {
        ...

        $user = $this->get('security.context')->getToken()->getUser();
        $eventDispatcher = $this->get('event_dispatcher');

        // dispatch the event
        $eventDispatcher->dispatchAsync('foo.event', new FooEvent($user));

        ...
    }
}