V prúdenia vzduchu, snažím sa, aby funkcia, ktorá je určená na generovanie DAGs v súbore:
dynamic_dags.py:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
Potom v inom súbore, snažím sa importovať dags zo samostatného súboru:
load_dags.py:
from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
Avšak, dags nie sú zobrazené až na web UI. Ale, keby som ich do jedného súboru, ako je uvedené nižšie kód, to bude fungovať:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
Som zvedavý, prečo robí to v dvoch samostatných súborov nefunguje.