aligent/async-bundle

用于扩展Oro的异步处理的包

安装量: 1,098

依赖者: 0

建议者: 0

安全: 0

星标: 2

关注者: 27

分支: 2

开放问题: 1

类型:symfony-bundle


README

此包提供以下功能

  1. 一个抽象处理器类,可以捕获RetryableExceptions,并在3次(默认)重试失败后将失败的作业放入数据库中,以便进行审查、修复然后重新排队。

  2. Webhook集成

安装说明

  1. 使用Composer安装此模块

     composer require aligent/async-bundle
    
  2. 清除缓存

     php bin/console cache:clear
    
  3. 运行迁移

     php bin/console oro:migration:load --force
    

用法

确保您的类继承自AbstractRetryableProcessor,并在execute函数中执行正常的作业处理。

class TestJobProcessor extends AbstractRetryableProcessor
{
    /**
     * @param MessageInterface $message
     * @return string
     * @throws RetryableException
     */
    public function execute(MessageInterface $message)
    {
        $body = JSON::decode($message->getBody());
        
        try {
            // Process the job here
            $this->importSomeEntities($body);
        } catch (\Exception $e) {
            throw new RetryableException(
                'This job failed and needs a retry',
                0,
                $e
            );
        }
    }
}

默认情况下,AbstractRetryableProcessor将从RetryableException中获取传递的异常并存储跟踪信息。但是,如果您没有捕获异常,您可以直接使用RetryableException,它将存储此点的跟踪信息和消息。

class TestJobProcessor extends AbstractRetryableProcessor
{
    /**
     * @param MessageInterface $message
     * @return string
     * @throws RetryableException
     */
    public function execute(MessageInterface $message)
    {
        $body = JSON::decode($message->getBody());
        
        // Process the job here
        $success = $this->importSomeEntities($body);
        
        if (!$success) {
            throw new RetryableException('This job failed and needs a retry');
        }
    }
}

您也可以通过覆盖您自己的类中的const MAX_RETRIES = 3;来增加或减少重试次数。

设置Webhook传输

转到“系统 -> 集成 -> 管理集成”,然后点击“创建集成”。选择“Webhook”作为集成类型,并填写所有必填字段。

要启用,请将状态设置为“活动”。

Webhook Integration Form

完成后,您现在必须接收请求URL的webhook事件。

分发webhook事件

  1. EntityEventListener监听任何创建/更新/删除事件;检查是否存在相关的webhook传输并处于活动状态;如果是,则将消息推送到队列
Aligent\AsyncEventsBundle\EventListener\EntityEventListener:
lazy: true
arguments:
- '@Aligent\AsyncEventsBundle\Provider\WebhookConfigProvider'
- '@oro_message_queue.client.message_producer'
- '@logger'
tags:
- { name: doctrine.event_listener, event: onFlush }
- { name: doctrine.event_listener, event: postFlush }
  1. WebhookEntityProcessor构建一个有效载荷并发送一个webhook事件作为对排队消息的响应。
Aligent\AsyncEventsBundle\Async\WebhookEntityProcessor:
parent: Aligent\AsyncEventsBundle\Async\AbstractRetryableProcessor
calls:
- [setConfigProvider, ['@Aligent\AsyncEventsBundle\Provider\WebhookConfigProvider']]
- [setTransport, ['@Aligent\AsyncEventsBundle\Integration\WebhookTransport']]
- [setSerializer, ['@oro_importexport.serializer']]
tags:
- { name: 'oro_message_queue.client.message_processor' }
  1. 如果您需要丰富有效载荷数据(例如,为顾客用户添加地址),您可以使用OroImportExportBundle并添加一些自定义规范化程序来添加/转换有效载荷数据。

支持

如果您对此包有任何问题,请创建一个 GitHub问题

贡献

任何贡献都备受赞赏。贡献代码的最佳方式是在GitHub上打开一个 pull request

开发者

Adam Hall adam.hall@aligent.com.au

许可证

MIT