3slab/vdm-library-flysystem-transport-bundle

飞系统传输

3.0.1 2021-07-02 19:46 UTC

This package is auto-updated.

Last update: 2024-09-29 05:49:56 UTC


README

Build Status

此 symfony messenger 扩展提供了一种从 flysystem 兼容源 拉取数据的传输。

注意:它使用 flysystem 2.x

安装

composer require 3slab/vdm-library-flysystem-transport-bundle

配置参考

framework:
    messenger:
        transports:
            consumer:
                dsn: "vdm+flysystem://default.storage"
                retry_strategy:
                    max_retries: 0
                options:
                    flysystem_executor: ~ 

飞系统执行器

飞系统执行器允许您根据您的 messenger.yaml 文件中的传输定义自定义飞系统传输的行为。一些示例用例可能包括您可能想要递归列出内容或不列出。或者您想在 ack 时从源删除已处理的文件。

当声明传输时,如果您没有设置自定义的 flysystem_executor 选项,则使用默认的 DefaultFlysystemExecutor,它会对飞系统存储中的所有文件创建一个浅层列表,并在将文件元数据传递给 FlysystemMessage 实例的处理程序之前逐个下载每个文件。

您可以通过提供一个扩展 Vdm\Bundle\LibraryFlysystemTransportBundle\Executor\AbstractFlysystemExecutorVdm\Bundle\LibraryFlysystemTransportBundle\Executor\DefaultFlysystemExecutor 的类来覆盖此行为,如果您只想调整默认行为。

以下示例基于默认执行器,但在存储中执行深度列表而不是默认的浅度列表。

namespace App\Executor;

namespace Vdm\Bundle\LibraryFlysystemTransportBundle\Executor;

use League\Flysystem\FilesystemReader;
use Symfony\Component\Messenger\Envelope;
use Vdm\Bundle\LibraryFlysystemTransportBundle\Message\FlysystemMessage;

class CustomFlysystemExecutor extends DefaultFlysystemExecutor
{
    /**
     * {@inheritDoc}
     * @throws \League\Flysystem\FilesystemException
     */
    public function get(): iterable
    {
        $files = $this->listContents('/', FilesystemReader::LIST_DEEP);

        usort($files, function ($a, $b) {
            return ($a->path() < $b->path()) ? -1 : 1;
        });

        foreach ($files as $key => $file) {
            $file = $this->download($file);

            $isLast = array_key_last($files) === $key;

            $message = new FlysystemMessage($file);
            yield $this->getEnvelope($message, $isLast);
        }
    }
}

您的自定义执行器需要执行以下两个重要操作

  • 产生一个新的信封
  • 如果您想处理完最后一个消息后停止,请向产生的信封添加一个 StopAfterHandleStamp 约束(如果不这样做,messenger 工作员可能会循环并再次执行它)。这可以通过具有 $isLast 参数的 getEnvelope 方法自动完成。

注意:多亏了产生系统,您可以在执行函数中实现循环,并一次返回一个条目

注意:您可以在自定义执行器中保持状态

然后在您的项目 messenger.yaml 传输定义中引用此自定义执行器

framework:
    messenger:
        transports:
            ftp-storage-source:
                options:
                    flysystem_executor: App\Executor\CustomFlysystemExecutor