ulabox / gearman-bundle
Requires
- php: >=5.3.3
- doctrine/common: ~2.2
- jms/metadata: >=0.11
- jms/serializer-bundle: 0.12.*@dev
- symfony/framework-bundle: ~2.3
Requires (Dev)
- phpspec/php-diff: dev-master
- phpspec/phpspec2: dev-develop
This package is not auto-updated.
Last update: 2022-02-01 12:27:08 UTC
README
该包提供了 symfony2 项目和 Gearman 作业服务器之间的接口。
快速安装
在 composer.json 文件中要求该包
{
"require": {
"ulabox/gearman-bundle": "*",
}
}
运行 composer update 命令
$ composer update ulabox/gearman-bundle
现在将 Bundle 添加到您的 Kernel 中
<?php
// app/AppKernel.php
public function registerBundles()
{
$bundles = array(
// ...
new Ulabox\Bundle\GearmanBundle\UlaboxGearmanBundle(),
// ...
);
}
Gearman 依赖
使用以下命令安装适用于 Debain/Ubuntu 包 的 Gearman 作业服务器
$ sudo apt-get install gearman-job-server
一旦安装了作业服务器,可以通过运行以下命令来启动
$ service gearman-job-server start
安装 Gearman 驱动
使用以下命令安装 Gearman 驱动
$ pecl install channel://pecl.php.net/gearman-0.8.3
现在我们只需要启用模块。编辑您的 /etc/php5/conf.d/gearman.ini 文件并添加以下行
extension=gearman.so
配置
默认情况下,该包不需要任何配置,但您可以通过编辑 config.yml 来配置它
ulabox_gearman:
# Define your own servers.
# By default is set to localhost 127.0.0.1:4730.
# If annotations defined, will be overwritten.
servers:
localhost:
host: 127.0.0.1
port: 4730
server2:
host: myotherhost
port: 4567
# Define the default method to execute a task.
# By default is set to doBackgroundJob.
# Available methods (doBackgroundJob, doHighJob, doHighBackgroundJob, doLowJob, doLowBackgroundJob, doNormalJob,
# addTask, addTaskBackground, addTaskHigh, addTaskHighBackground, addTaskLow, addTaskLowBackground, runTasks).
default_method: doLowBackgroundJob
# Define the default number of executions before job dies.
# By default is set to 100.
# If annotations defined, will be overwritten.
iterations: 150
# Define your workers location directory.
# By default is set to Gearman/Worker
worker_dir: MyWorkerDir
# Define your clients location directory.
# By default is set to Gearman/Client
client_dir: MyClientDir
# Define the way the included event worker triggers the events
# cli for getting the event dispatcher, request for doing a curl
# against the configured request_context URL
# By default is set to request
use_via: [cli|request]
编写简单工作器
对于位于 src/Acme/DemoBundle 的包,默认情况下,工作器类应该位于 src/Acme/DemoBundle/Gearman/Worker 中。我们可以创建一个简单的工作器,如下所示
<?php
namespace Acme\DemoBundle\Gearman\Worker;
use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;
/**
* The worker.
*
* @Worker()
*/
class AcmeWorker extends ContainerAwareWorker
{
/**
* The hello world job.
*
* @param \GearmanJob $job The GearmanJob instance
* @return boolean
* @Job()
*/
public function hello_world(\GearmanJob $job)
{
echo "Received hello world job: " . $job->handle() . "\n";
$workload = $job->workload();
$workloadSize = $job->workloadSize();
echo "Workload: $workload ($workloadSize)\n";
// This status loop is not needed, just to show how work
for ($i = 0; $i < $workloadSize; $i ++) {
echo "Sending status: " . $i . "/$workloadSize completed\n";
$job->sendStatus($i, $workloadSize);
sleep(1);
}
echo "Result: '".$workload."' loaded\n";
return true;
}
/**
* This job is never call, because is not marked with @Job annotation.
*
* @param \GearmanJob $job The GearmanJob instance
* @return boolean
*/
public function never_call_job(\GearmanJob $job)
{
...
}
}
注意工作器类如何用 @Worker() 注解标记,以及每个被视为作业的方法也用 @Job() 注解标记。
执行作业
一旦编写了作业,可以通过两种方式运行
通过代码
// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');
// get the generic gearman client
$client = $gearmanManager->getClient('UlaboxGearmanBundle:GearmanClient');
// find your worker
$worker = $gearmanManager->getWorker('AcmeDemoBundle:AcmeWorker');
// now you should tell the client that worker must be run
$client->setWorker($worker);
// and finally do the job
$client->doNormalJob('hello_world', json_encode(array('foo' => 'bar')));
// do the job in backgroud
//$client->doBackgroundJob('hello_world', json_encode(array('foo' => 'bar')));
要查看结果,请打开控制台并运行以下命令
$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker
然后运行上面的代码。
通过命令行
打开第一个控制台并运行
$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker
现在打开另一个控制台并运行
$ php app/console gearman:client:execute --client=UlaboxGearmanBundle:GearmanClient:hello_world --worker=AcmeDemoBundle:AcmeWorker --params="{\"foo\": \"bar\" }"
命令行有一些选项,你可以在命令行部分查看更多详细信息。
注解
工作器注解
- servers: 服务器数组
- iterations: 作业死亡前的执行次数
<?php
namespace Acme\DemoBundle\Gearman\Worker;
use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;
/**
* The worker.
*
* @Worker(servers={"127.0.0.1:4730"}, iterations=10)
*/
class AcmeWorker extends ContainerAwareWorker
{
....
}
客户端注解
- worker: 工作器名称
- servers: 服务器数组
<?php
namespace Acme\DemoBundle\Gearman\Client;
use Ulabox\Bundle\GearmanBundle\Annotation\Client;
use Ulabox\Bundle\GearmanBundle\Model\Client as BaseClient;
/**
* The client
*
* @Client(worker="MyWorker", servers={"127.0.0.1:4730"})
*/
class AcmeClient extends BaseClient
....
}
作业注解
- name: 作业名称
<?php
namespace Acme\DemoBundle\Gearman\Worker;
use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;
/**
* The worker.
*
* @Worker(servers={"127.0.0.1:4730"}, iterations=10)
*/
class AcmeWorker extends ContainerAwareWorker
{
/**
* The hello world job.
*
* @param \GearmanJob $job The GearmanJob instance
* @return boolean
*
* @Job(name="acme_hello_world")
*/
public function hello_world(\GearmanJob $job)
{
....
}
}
编写简单客户端
对于位于src/Acme/DemoBundle的包,客户端类默认应位于src/Acme/DemoBundle/Gearman/Client中。客户端类默认与同名的Worker类关联,例如,AcmeClient将与AcmeWorker关联。
<?php
namespace Acme\DemoBundle\Gearman\Client;
use Ulabox\Bundle\GearmanBundle\Annotation\Client;
use Ulabox\Bundle\GearmanBundle\Model\Client as BaseClient;
/**
* The client
*
* @Client()
*/
class AcmeClient extends BaseClient
{
public function hello_world_status($status)
{
print_r("AcmeClient::Status ".$status."\n");
}
public function hello_world_data($task)
{
print_r("AcmeClient::Data: ".$task->data()."\n");
}
public function hello_world_fail($task)
{
print_r("AcmeClient::Failed: ".$task->jobHandle()."\n");
}
public function hello_world_success($result)
{
print_r("AcmeClient::Success: ".$result);
}
}
注意客户端类有一个回调方法,用于在Worker执行任务时通知,但只有在使用doNormalJob方法执行任务时才有意义。
通过客户端执行任务
一旦编写了作业,可以通过两种方式运行
通过代码
// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');
// get the acme client
$client = $gearmanManager->getClient('AcmeDemoBundle:AcmeClient');
// and finally do the job
$client->doNormalJob('hello_world', json_encode(array('foo' => 'bar')));
// do the job in backgroud
//$client->doBackgroundJob('hello_world', json_encode(array('foo' => 'bar')));
要查看结果,请打开控制台并运行以下命令
$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker
然后运行上面的代码。
通过命令行
打开第一个控制台并运行
$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker
现在打开另一个控制台并运行
$ php app/console gearman:client:execute --client=AcmeDemoBundle:AcmeClient:hello_world --params="{\"foo\": \"bar\" }"
运行任务
将任务添加到与其他任务并行运行非常简单。向之前的Worker添加一个新任务
<?php
namespace Acme\DemoBundle\Gearman\Worker;
use Ulabox\Bundle\GearmanBundle\Model\ContainerAwareWorker;
use Ulabox\Bundle\GearmanBundle\Annotation\Worker;
use Ulabox\Bundle\GearmanBundle\Annotation\Job;
/**
* The worker.
*
* @Worker(servers={"127.0.0.1:4730"}, iterations=10)
*/
class AcmeWorker extends ContainerAwareWorker
{
/**
* Execute a job.
*
* @param \GearmanJob $job The GearmanJob instance
*
* @return boolean
*
* @Job()
*/
public function hello_world(\GearmanJob $job)
{
echo "Received hello world job: " . $job->handle() . "\n";
$workload = $job->workload();
$workloadSize = $job->workloadSize();
echo "Workload: $workload ($workloadSize)\n";
// This status loop is not needed, just to show how work
for ($i = 0; $i < $workloadSize; $i ++) {
echo "Sending status: " . $i . "/$workloadSize completed\n";
$job->sendStatus($i, $workloadSize);
sleep(1);
}
echo "Result: '".$workload."' loaded\n";
return true;
}
/**
* Execute a job.
*
* @param \GearmanJob $job The GearmanJob instance
*
* @return boolean
*
* @Job()
*/
public function send_newsletter(\GearmanJob $job)
{
$users = json_decode($job->workload(), true);
foreach ($users as $name => $email) {
echo "Be sent an email to $name\n";
// send the email
echo "The email have been send to $email\n\n";
sleep(1);
}
echo count($users)." mails have been sent \n";
return true;
}
}
一旦编写了任务,可以按照以下方式运行
// get the gearman manager
$gearmanManager = $this->container->get('ulabox_gearman.manager');
// get the acme client
$client = $gearmanManager->getClient('AcmeDemoBundle:AcmeClient');
// add multiple background tasks
$client->addTaskBackground('hello_world', json_encode(array('foo' => 'bar')));
$client->addTaskBackground('send_newsletter', json_encode(array('Ivan' => 'ivannis.suarez@gmail.com', 'Ulabox' => 'info@ulabox.com')));
// run tasks
$client->runTasks();
要查看结果,请打开控制台并运行以下命令
$ php app/console gearman:worker:execute --worker=AcmeDemoBundle:AcmeWorker
然后运行上面的代码。
命令
显示所有注册的Worker
$ php app/console gearman:worker:list
执行一个任务
$ php app/console gearman:client:execute --help
执行一个Worker
$ php app/console gearman:worker:execute --help
异步事件
利用Gearman Job Server的潜力,我们引入了一个新的EventDispatcherAsync,它取代了默认的symfony EventDispatcher。新的事件派发器有一个名为dispatchAsync的新方法,该方法将事件发送到gearman队列,gearman处理它并将其发送回PHP应用程序,应用程序最终重建事件并异步运行它。
配置
要使用异步事件,首先要做的是导入gearman包的routing.yml文件。
# app/config/routing.yml
gearman_bundle:
resource: @UlaboxGearmanBundle/Resources/config/routing.yml
prefix: /_gearman
然后在您的security.yml文件中的访问控制中添加路由。
# app/config/security.yml
security:
# ...
access_control:
- { path: ^/_gearman*, roles: IS_AUTHENTICATED_ANONYMOUSLY, ips: [127.0.0.1, ::1] }
在您的config.yml中启用异步事件派发器。
# app/config/config.yml
ulabox_gearman:
enable_asynchronous_event_dispatcher: true
EventWorker负责处理事件并将其发送回应用程序。此Worker需要从命令行生成一个URL,这要求我们全局设置请求上下文。
# app/config/parameters.yml
parameters:
router.request_context.host: example.org
router.request_context.scheme: https
router.request_context.base_url: my/path
现在,一切配置就绪后,下一步是在命令行中运行EventWorker
$ php app/console gearman:worker:execute --worker=UlaboxGearmanBundle:EventWorker
EventWorker应始终运行,因此我们建议使用如supervisor之类的工具确保Worker始终运行。
编写异步事件
异步事件是一个简单的事件,唯一的区别是必须实现Ulabox\Bundle\GearmanBundle\Dispatcher\AsyncEventInterface接口。这里我们展示了一个小例子
<?php
namespace Acme\DemoBundle\Event;
use Ulabox\Bundle\GearmanBundle\Dispatcher\AsyncEventInterface;
use Symfony\Component\EventDispatcher\Event;
use Acme\DemoBundle\Entity\User;
/**
* The custom async event.
*/
class FooEvent extends Event implements AsyncEventInterface
{
/**
* The user entity
*
* @var User
*/
protected $user;
/**
* Construstor.
*
* @param User $user The user instance
*/
public function __construct(User $user)
{
$this->user = $user;
}
/**
* {@inheritdoc}
*/
public function getUser()
{
return $this->user;
}
/**
* {@inheritdoc}
*/
public function getArguments()
{
return array(
'user' => $this->user
);
}
/**
* {@inheritdoc}
*/
public function setArguments(array $args = array())
{
$this->user = $args['user'];
}
}
注意,该类实现了AsyncEventInterface的两个方法(getArguments,setArguments),这些方法用于内部重建事件。
创建事件监听器
<?php
namespace Acme\DemoBundle\EventListener;
use Acme\DemoBundle\Event\FooEvent;
/**
* Foo listener
*/
class FooListener
{
/**
* Listener on foo event
*
* @param FooEvent $event The event
*/
public function onFoo(FooEvent $event)
{
$user = $event->getUser();
// do something
}
}
现在类创建完成后,只需将其注册为服务即可
# app/config/config.yml
services:
acme.listener.foo_event:
class: Acme\DemoBundle\EventListener\FooListener
tags:
- { name: kernel.event_listener, event: foo.event, method: onFoo }
派发事件
dispatchAsync()方法开始处理过程,并异步通知所有给定事件的监听器
<?php
namespace Acme\DemoBundle\Controller;
use Acme\DemoBundle\Event\FooEvent;
/**
* Demo controller
*/
class DemoController
{
public function someAction()
{
...
$user = $this->get('security.context')->getToken()->getUser();
$eventDispatcher = $this->get('event_dispatcher');
// dispatch the event
$eventDispatcher->dispatchAsync('foo.event', new FooEvent($user));
...
}
}