skydiablo/async-event-dispatcher-bundle

以异步方式处理所有事件

1.1 2017-02-22 09:44 UTC

This package is auto-updated.

Last update: 2024-09-27 16:04:16 UTC


README

AsyncEventDispatcher是一个symfony扩展包,它提供了一个简单的异步事件处理方式。以异步方式处理所有现有或新的事件。只需像平常一样定义事件处理器并在异步范围内运行即可。

A) 下载Bundle

打开您的命令行,切换到您的项目目录,并执行以下命令以下载此bundle的最新稳定版本

$ composer require skydiablo/async-event-dispatcher-bundle

此命令要求您全局安装了Composer,如Composer文档中的安装章节中所述。

B) 启用Bundle

通过在您的项目中的app/AppKernel.php文件中添加以下行来启用该bundle

// app/AppKernel.php
class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = array(
            // ...
            new SkyDiablo\AsyncEventDispatcherBundle\AsyncEventDispatcherBundle(),
        );

        // ...
    }
}

C) 配置Bundle

目前只实现了一个事件队列:Amazon AWS SQS。如果您需要另一个,请随意实现它并提交一个pull request。

async_event_dispatcher:
    queue:
        awssqs:
            queue_url: http://...AWS...QUEUE...URL
            long_polling_timeout: 10 # some time in seconds...
            sqs_client: aws_sqs_client_service_name

D) 使用

  • 只需创建您的事件和监听器,如此处所述

  • 不再像这样标记您的监听器:kernel.event_listener,只需添加'.async'后缀:kernel.event_listener.async

    • 同样适用于订阅者标记:kernel.event_subscriber.async
  • 发布事件序列化器

    • 对于简单和静态事件,只需使用GenericEventSerializer。定义您的序列化器如下

        services:
            your_custom_serializer:
                class: SkyDiablo\AsyncEventDispatcherBundle\Serializer\GenericEventSerializer
                tags:
                    - { name: async_event_serializer, event: your_simple_event_name }
      
    • 对于更复杂的事件,例如具有复杂对象或doctrine实体的对象,需要一个自定义事件序列化器。您可以选择如何序列化和反序列化事件数据。创建您的序列化器并实现SkyDiablo\AsyncEventDispatcherBundle\Serializer\EventSerializerInterface接口。如上所述定义此新序列化器。

        class FooEventSerializer implements EventSerializerInterface
        {
        
            /**
             * @var FooRepository
             */
            private $fooRepository;
        
            /**
             * FooEventSerializer constructor.
             * @param FooRepository $fooRepository
             */
            public function __construct(FooRepository $fooRepository)
            {
                $this->fooRepository = $fooRepository;
            }
        
            /**
             * @param Event $event
             * @param string $eventName
             * @return QueueItemInterface
             */
            public function serialize(Event $event, $eventName)
            {
                if ($event instanceof FooEvent) {
                    return new QueueItem($eventName, $event->getFoo()->getId()); // just serialize the entity id
                }
            }
        
            /**
             * @param QueueItemInterface $queueItem
             * @return Event
             */
            public function deserialize(QueueItemInterface $queueItem)
            {
                $fooId = (int)$queueItem->getData(); // "getData()" contains the result from serialize function
                if($fooId) { // a valid number greater zero? 
                    if($foo = $this->fooRepository->getById($fooId)) { // try to load foo entity by id
                        return new FooEvent($foo); // create the event object that will be triggered async
                    }
                }
            }
        }
      

    从现在起,所有标记为异步处理的监听器/订阅者将在异步范围内运行。注意:没有序列化器的事件将被忽略,永远不会被处理!

  • 异步事件处理的核心是一个CLI命令。以循环间隔(例如cronjob-style)运行此命令

         $ php bin/console async_event_dispatcher --iterate-amount 5
    

    此命令将在本次运行中处理5个事件,默认设置为每次运行处理10个事件。

就这样!

E) 额外

在某些情况下,您可能需要事件抛出的请求的作用域。为了在异步事件处理器中使用该请求的作用域,请执行以下操作

/**
 * Inject in a propper way
 * @var \Symfony\Component\HttpFoundation\RequestStack
 */
private $requestStack;

public function someEventHandler(YourEvent $event) {
    // You can request the current request in the async event handler like this
    $currentRequest = $this->requestStack->getCurrentRequest();
}

有一些简单的解决方案

  • 在您的自定义事件中实现SkyDiablo\AsyncEventDispatcherBundle\Event\AsyncRequestScopeEventInterface接口

  • 在订阅者配置中定义请求作用域

     class ExceptionSubscriber implements EventSubscriberInterface
     {
       public static function getSubscribedEvents()
       {
           // return the subscribed events, their methods and priorities
           return array(
              KernelEvents::EXCEPTION => array(
                  array('processException', 10, true), // The third parameter "true" activates the request scope
              )
           );
       }
    
  • 在事件监听器标签内定义请求作用域

      # app/config/services.yml
      services:
          app.exception_listener:
              class: AppBundle\EventListener\ExceptionListener
              tags:
                  # set "async-request" to "true" or "false"
                  - { name: kernel.event_listener.async, event: kernel.exception, async-request: true }
    

如果您无法或不想异步处理事件,只需实现此接口

    SkyDiablo\AsyncEventDispatcherBundle\Event\AsyncEventInterface

并在isAllowAsync函数中返回"false"。然后此事件将永远不会被异步调用!