bluspark / airflow-dag-run-bundle
为Airflow提供HTTP客户端,用于管理DAG运行
Requires
- php: >=8.1
- symfony/config: ^6.3
- symfony/dependency-injection: ^6.3
- symfony/event-dispatcher: ^6.3
- symfony/http-client: ^6.3
- symfony/http-kernel: ^6.3
- symfony/messenger: ^6.3
- symfony/scheduler: ^6.3
Requires (Dev)
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^10.5
README
此包提供了一种方法,可以从Apache Airflow运行中触发新的DAG,以生成导出文件,并异步请求生成的导出文件名。
安装
使用composer
composer require bluspark/airflow-dag-run-bundle
配置
确保您的包已在您的config/bundles.php
文件中启用(如果使用Symfony Flex,则会自动完成)
# config/bundles.php return [ ... Bluspark\AirflowDagRunBundle\BlusparkAirflowDagRunBundle::class => ['all' => true], ];
在您的config/packages
目录中添加一个bluspark_airflow_dag_run.yaml
文件,以定义以下必需参数
bluspark_airflow_dag_run: airflow_host: https://from-config.example.org airflow_dag_id: your-dag-id airflow_username: user airflow_password: !ChangeMe!
您可以通过在您的Symfony项目中运行此命令来获取有关配置参数的更多详细信息
bin/console config:dump bluspark_airflow_dag_run
最后,在您的config/packages/messenger.yaml
文件中添加您选择的传输,该传输将处理由包分发的成功消息(如下所述)
framework: messenger: routing: 'Bluspark\AirflowDagRunBundle\Scheduler\Message\DagRunMessageExecuted': my_project_transport
如果您使用的是Symfony < 6.4,则包不会使用
Scheduler
,而是使用通过MessengerBus
分发的标准消息。
如果是这样,您必须声明所有用于传输管理的包消息framework: messenger: routing: 'Bluspark\AirflowDagRunBundle\Scheduler\Message\*': my_project_transport
使用
该包提供了一种桥梁服务类,您可以通过依赖注入在项目中使用。
namespace App\Controller; use Bluspark\AirflowDagRunBundle\Contracts\Bridge\AirflowDagBridgeInterface; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\Routing\Annotation\Route; final class YourController extends AbstractController { #[Route(path: '/my-export-route')] public function __invoke(AirflowDagBridgeInterface $airflowDagBridge): Response { // ... build your $config array with your export parameters $dagRunStatus = $airflowDagBridge->requestNewExportFile($config); } }
导出请求的授权参数为
format
:期望的导出文件格式(例如:“csv”,“xls”)export
:您的导出数据类型(例如:‘pickup’,‘producer’,...)search
:要应用于导出文件中包含的数据的筛选器数组extra
:您要使用或传递到整个导出过程的数据数组(例如,成功时通知的电子邮件)
不认为其他配置参数是有效的。
一旦在Airflow上请求了导出文件,该包就会使用一个Scheduler周期性消息,每30秒检查一次文件是否已成功创建,并具有自己的处理程序。然后,该包使用Symfony Messenger组件分发出一个包含现在可在S3上可用文件的filename
属性的Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
消息。您必须在项目中实现Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
消息的处理程序,并添加您自己的逻辑。
要运行此包使用的Scheduler传输,请勿忘记运行以下命令(仅适用于Symfony版本 >= 6.4)
bin/console messenger:consume scheduler_airflow_dag_run
消息
该包实现了2个不同的消息,需要被消费
- 一个
Bluspark\AirflowDagRunBundle\Message\DagRunChecker
消息,旨在由包提供的scheduler_airflow_dag_run
计划(提供)消费 - 一个
Bluspark\AirflowDagRunBundle\Message\DagRunMessageExecuted
消息,旨在由项目中选择的Messenger传输消费
许可
本项目根据CeCILL-B许可证许可 - 有关详细信息,请参阅LICENSE文件。
赞助商
Bluspark是一个操作聚居地和城市基础设施的SaaS应用程序。它是管理您的基础设施生命周期的完整解决方案,从设计到维护。
数字SaaS平台,用于融资和管理能源改造援助