uecode/qpush-bundle

使用推送队列对 Symfony 进行异步处理

3.0.2 2018-08-13 17:01 UTC

README

Build Status Quality Score Code Coverage Total Downloads

概述

此捆绑包允许您通过简单地标记您的服务和依赖 Symfony 的事件调度器来轻松地从推送队列中消费消息,而无需运行守护进程或后台进程来持续轮询您的队列。

完整文档: qpush-bundle.readthedocs.org

安装

应通过 composer 安装此捆绑包。

composer require uecode/qpush-bundle

更新您的 Symfony 应用程序的 AppKernel.php

UecodeQPushBundle 添加到内核引导序列中的 $bundles 数组。

public function registerBundles()
{
    $bundles = array(
        // ...
        new Uecode\Bundle\QPushBundle\UecodeQPushBundle(),
    );

    return $bundles;
}

基本配置

以下是一个基本配置,它将使用 AWS 或 IronMQ 创建一个名为 my_queue_name 的推送队列。您可以在完整文档中了解支持的提供者和提供者选项。

示例
#app/config.yml

uecode_qpush:
    providers:
        ironmq:
            token:      YOUR_IRON_MQ_TOKEN_HERE
            project_id: YOUR_IRON_MQ_PROJECT_ID_HERE
        aws:
            key:    YOUR_AWS_KEY_HERE
            secret: YOUR_AWS_SECRET_HERE
            region: YOUR_AWS_REGION_HERE
    queues:
        my_queue_key:
            provider: ironmq #or aws
            options:
                queue_name: my_queue_name #optional. the queue name used on the provider
                push_notifications: true
                subscribers:
                    - { endpoint: http://example.com/qpush, protocol: http }

您可以将 aws 密钥和密钥排除在外,以便默认使用 EC2 机器上的 IAM 角色。

发布消息到您的队列

发布消息很简单 - 从容器中获取已注册的 Provider 服务,并在相应的队列上调用 publish 方法。

此捆绑包将您的消息存储为 json 对象,而发布方法期望一个数组,通常是关联数组。

示例
// src/My/Bundle/ExampleBundle/Controller/MyController.php

public function publishAction()
{
    $message = ['foo' => 'bar'];

    // fetch your provider service from the container
    $this->get('uecode_qpush')->get('my_queue_key')->publish($message);

    // you can also fetch it directly
    $this->get('uecode_qpush.my_queue_key')->publish($message);
}

处理队列中的消息

当消息击中您的应用程序时,此捆绑包将调度一个 MessageEvent,这些事件可以由您的服务处理。您需要标记您的服务来处理这些事件。

示例
services:
    my_example_service:
    	class: My\Bundle\ExampleBundle\Service\ExampleService
    	tags:
    		- { name: uecode_qpush.event_listener, event: my_queue_key.message_received, method: onMessageReceived }
示例
// src/My/Bundle/ExampleBundle/Service/ExampleService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;

public function onMessageReceived(MessageEvent $event)
{
    $queue_name = $event->getQueueName();
    $message    = $event->getMessage();

    // do some processing
}

消息对象包含提供者特定的消息 ID、消息正文和一组提供者特定的元数据。

这些属性可以通过消息对象从简单的获取器访问。

示例
// src/My/Bundle/ExampleBundle/Service/ExampleService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;
use Uecode\Bundle\QPushBundle\Message\Message;

public function onMessageReceived(MessageEvent $event)
{
    $id         = $event->getMessage()->getId();
    $body       = $event->getMessage()->getBody();
    $metadata   = $event->getMessage()->getMetadata();

    // do some processing
}

清理队列

一旦所有其他 MessageEvent 的事件监听器都已被调用,捆绑包将自动尝试为您从队列中删除消息。