Hi, I was testing the newest Airflow version (2.9.1) for compatibility with our project, which is written using astro-sdk 1.8.0, and I found that dataset-aware scheduling, which worked fine under Airflow 2.8.4, stopped working in the new environment.
I wrote several small examples to illustrate this:

- A file is loaded into the database, thus triggering a dataset change.
- Table data gets copied to another table, again triggering a dataset change.
- A simple Airflow dataset is modified directly.

The full DAG file:
```python
import pendulum
import pandas as pd

from airflow.models import DAG
from airflow.decorators import task
from airflow.datasets import Dataset

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata

types_table = Table(name='types', conn_id='source_db', metadata=Metadata(schema='stage'))
types_copy_table = Table(name='types_copy', conn_id='source_db', metadata=Metadata(schema='stage'))
dataset = Dataset("myscheme://myhost?table=mytable")


@task
def print_triggering_dataset_events(triggering_dataset_events=None):
    """Print out dataset trigger information."""
    for dataset, event_list in triggering_dataset_events.items():
        print(f'Dataset: {dataset}')
        print(f'Events: {event_list}')


with DAG(
    dag_id='load_file',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    # Load a file into the TYPES table. This modifies the `types_table`
    # dataset and should trigger the corresponding DAG.
    aql.load_file(File(path='./dags/test.csv'), output_table=types_table)

with DAG(
    dag_id='triggered_by_file_load',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_table],
    catchup=False,
    tags=['testing']
) as dag:
    # This DAG is to be initiated by `types_table` dataset modifications.
    print_triggering_dataset_events()

with DAG(
    dag_id='copy-table',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    # Load all data from the TYPES table and save it into a new TYPES_COPY
    # table. This modifies the `types_copy_table` dataset and should trigger
    # the corresponding DAG.
    @aql.run_raw_sql(results_format='pandas_dataframe')
    def load_table(table: Table):
        return '''select * from {{table}}'''

    @aql.dataframe
    def save_data(data: pd.DataFrame):
        return data

    data = load_table(types_table)
    save_data(data, output_table=types_copy_table)

with DAG(
    dag_id='triggered_by_copy_table',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_copy_table],
    catchup=False,
    tags=['testing']
) as dag:
    # This DAG is to be initiated by `types_copy_table` dataset modifications.
    print_triggering_dataset_events()

with DAG(
    dag_id='dataset_triggerer',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    # Simply trigger `dataset` dataset changes to run the corresponding DAG.
    @dag.task(outlets=[dataset])
    def trigger_dataset_event():
        print('Triggering event')

    trigger_dataset_event()

with DAG(
    dag_id='triggered_by_dataset',
    start_date=pendulum.today().add(days=-1),
    schedule=[dataset],
    catchup=False,
    tags=['testing']
) as dag:
    # This DAG is to be initiated by `dataset` dataset modifications.
    print_triggering_dataset_events()
```
Under Airflow 2.8.4 everything works just fine: the dependent DAGs start after the dataset changes.
However, under Airflow 2.9.1 only the last pair of DAGs (the one using a plain Airflow Dataset) works as expected. The DAGs that rely on Astro SDK tables are not triggered at all.
Obviously, no code is changed; I only modify the base image in the Dockerfile used to build the environment (`FROM apache/airflow:slim-2.8.4-python3.10` to `FROM apache/airflow:slim-2.9.1-python3.10`).
I could not find any clue about this in the Airflow logs.
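Since the logs show nothing, here is a minimal sketch of how one could dig deeper by inspecting what actually got registered in the metadata database. This is an assumption on my part (not part of the repro): it uses Airflow's own ORM models and must run inside the Airflow environment with DB access configured.

```python
# Minimal debugging sketch: list the datasets Airflow knows about and the
# dataset events it recorded. If the Astro SDK tables do not show up here
# under 2.9.1, the Table -> Dataset registration itself is what broke.
from airflow.models.dataset import DatasetEvent, DatasetModel
from airflow.utils.session import create_session

with create_session() as session:
    print("Registered datasets:")
    for ds in session.query(DatasetModel).all():
        print(f"  {ds.uri}")

    print("Recorded dataset events:")
    for event in session.query(DatasetEvent).all():
        print(f"  dataset_id={event.dataset_id}, timestamp={event.timestamp}")
```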
Please help solve this. Thanks!
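For completeness, a possible interim workaround (untested; it simply mirrors the pattern from the last DAG pair, which still works) would be to emit an explicit Airflow Dataset event next to the Astro SDK operation and schedule the consumer on that dataset instead of on the Table object. The URI below is made up for illustration; it is not one the Astro SDK generates.

```python
from airflow.datasets import Dataset
from airflow.decorators import task

# Hypothetical explicit dataset standing in for the implicit Table -> Dataset
# conversion; the URI is illustrative only.
types_dataset = Dataset("myscheme://source_db?table=types&schema=stage")

@task(outlets=[types_dataset])
def mark_types_updated():
    # Runs right after aql.load_file(); its only job is to emit the event.
    print("types table updated")

# Producer DAG: aql.load_file(...) >> mark_types_updated()
# Consumer DAG: schedule=[types_dataset]
```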
Versions
Astro-SDK: 1.8.0
Airflow: [2.8.4, 2.9.1]
Python: 3.10