bluspark/airflow-dag-run-bundle

为Airflow提供HTTP客户端,用于管理DAG运行

v0.0.4 2024-06-14 09:28 UTC

This package is auto-updated.

Last update: 2024-09-14 10:11:38 UTC


README

此包提供了一种方法,可以从Apache Airflow运行中触发新的DAG,以生成导出文件,并异步请求生成的导出文件名。

安装

使用composer

composer require bluspark/airflow-dag-run-bundle

配置

确保您的包已在您的config/bundles.php文件中启用(如果使用Symfony Flex,则会自动完成)

# config/bundles.php
return [
    ...
    Bluspark\AirflowDagRunBundle\BlusparkAirflowDagRunBundle::class => ['all' => true],
];

在您的config/packages目录中添加一个bluspark_airflow_dag_run.yaml文件,以定义以下必需参数

bluspark_airflow_dag_run:
  airflow_host: https://from-config.example.org
  airflow_dag_id: your-dag-id
  airflow_username: user
  airflow_password: !ChangeMe!

您可以通过在您的Symfony项目中运行此命令来获取有关配置参数的更多详细信息

bin/console config:dump bluspark_airflow_dag_run

最后,在您的config/packages/messenger.yaml文件中添加您选择的传输,该传输将处理由包分发的成功消息(如下所述)

framework:
  messenger:

    routing:
      'Bluspark\AirflowDagRunBundle\Scheduler\Message\DagRunMessageExecuted': my_project_transport

如果您使用的是Symfony < 6.4,则包不会使用Scheduler,而是使用通过MessengerBus分发的标准消息。
如果是这样,您必须声明所有用于传输管理的包消息

framework:
 messenger:

   routing:
     'Bluspark\AirflowDagRunBundle\Scheduler\Message\*': my_project_transport

使用

该包提供了一种桥梁服务类,您可以通过依赖注入在项目中使用。

namespace App\Controller;

use Bluspark\AirflowDagRunBundle\Contracts\Bridge\AirflowDagBridgeInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;

final class YourController extends AbstractController
{
    #[Route(path: '/my-export-route')]
    public function __invoke(AirflowDagBridgeInterface $airflowDagBridge): Response
    {
        // ... build your $config array with your export parameters
        $dagRunStatus = $airflowDagBridge->requestNewExportFile($config);
    }
}

导出请求的授权参数为

  • format:期望的导出文件格式(例如:“csv”,“xls”)
  • export:您的导出数据类型(例如:‘pickup’,‘producer’,...)
  • search:要应用于导出文件中包含的数据的筛选器数组
  • extra:您要使用或传递到整个导出过程的数据数组(例如,成功时通知的电子邮件)

不认为其他配置参数是有效的。
一旦在Airflow上请求了导出文件,该包就会使用一个Scheduler周期性消息,每30秒检查一次文件是否已成功创建,并具有自己的处理程序。然后,该包使用Symfony Messenger组件分发出一个包含现在可在S3上可用文件的filename属性的Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted消息。您必须在项目中实现Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted消息的处理程序,并添加您自己的逻辑。

要运行此包使用的Scheduler传输,请勿忘记运行以下命令(仅适用于Symfony版本 >= 6.4)

bin/console messenger:consume scheduler_airflow_dag_run

消息

该包实现了2个不同的消息,需要被消费

  • 一个Bluspark\AirflowDagRunBundle\Message\DagRunChecker消息,旨在由包提供的scheduler_airflow_dag_run计划(提供)消费
  • 一个Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted消息,旨在由项目中选择的Messenger传输消费

许可

本项目根据CeCILL-B许可证许可 - 有关详细信息,请参阅LICENSE文件。

赞助商

Bluspark logo

Bluspark是一个操作聚居地和城市基础设施的SaaS应用程序。它是管理您的基础设施生命周期的完整解决方案,从设计到维护。

Consoneo logo

数字SaaS平台,用于融资和管理能源改造援助