From 3c299268f8d5271fce6599ce3e6f38bbafb15ba8 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 2 Dec 2024 20:23:31 +0530 Subject: [PATCH 1/7] Render YML DAG config as DAG Docs --- dagfactory/dagbuilder.py | 13 ++++++++++++- dagfactory/dagfactory.py | 15 +++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index a685ecf..464bb58 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -115,12 +115,15 @@ class DagBuilder: in the YAML file """ - def __init__(self, dag_name: str, dag_config: Dict[str, Any], default_config: Dict[str, Any]) -> None: + def __init__( + self, dag_name: str, dag_config: Dict[str, Any], default_config: Dict[str, Any], yml_dag: str = "" + ) -> None: self.dag_name: str = dag_name self.dag_config: Dict[str, Any] = deepcopy(dag_config) self.default_config: Dict[str, Any] = deepcopy(default_config) self.tasks_count: int = 0 self.taskgroups_count: int = 0 + self._yml_dag = yml_dag # pylint: disable=too-many-branches,too-many-statements def get_dag_params(self) -> Dict[str, Any]: @@ -795,6 +798,14 @@ def build(self) -> Dict[str, Union[str, DAG]]: ) dag.doc_md = doc_md_callable(**dag_params.get("doc_md_python_arguments", {})) + # Render YML DAG in DAG Docs + if self._yml_dag: + subtitle = "## YML DAG" + if dag.doc_md is None: + dag.doc_md = subtitle + "\n```yaml\n" + self._yml_dag + else: + dag.doc_md = dag.doc_md + subtitle + "\n```yaml\n" + self._yml_dag + # tags parameter introduced in Airflow 1.10.8 if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"): dag.tags = dag_params.get("tags", None) diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index fdb7c11..43f84c7 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -33,6 +33,8 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] self.dags_count: int = 0 self.tasks_count: int = 0 self.taskgroups_count: int = 0 + # self.dag_py = None + self._config_filepath = config_filepath assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) @@ -40,6 +42,18 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] if config: self.config: Dict[str, Any] = config + @staticmethod + def _serialise_config_md(dag_name, dag_config, default_config): + if dag_config.get("task_groups") == {}: + del dag_config["task_groups"] + default_config = {"default": default_config} + default_config = yaml.dump(default_config, default_flow_style=False, allow_unicode=True, sort_keys=False) + dag_config = {dag_name: dag_config} + dag_config = yaml.dump(dag_config, default_flow_style=False, allow_unicode=True, sort_keys=False) + dag_yml = default_config + "\n" + dag_config + print(dag_yml) + return dag_yml + @staticmethod def _validate_config_filepath(config_filepath: str) -> None: """ @@ -104,6 +118,7 @@ def build_dags(self) -> Dict[str, DAG]: dag_name=dag_name, dag_config=dag_config, default_config=default_config, + yml_dag=self._serialise_config_md(dag_name, dag_config, default_config), ) try: dag: Dict[str, Union[str, DAG]] = dag_builder.build() From 996927b57fd42700e2fc63580f7c25cdc0d89c4a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 2 Dec 2024 23:24:35 +0530 Subject: [PATCH 2/7] Fix tests --- dagfactory/dagbuilder.py | 5 +++-- dagfactory/dagfactory.py | 16 +++++++++++----- tests/test_dagfactory.py | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 464bb58..4d6c6c9 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -801,10 +801,11 @@ def build(self) -> Dict[str, Union[str, DAG]]: # Render YML DAG in DAG Docs if self._yml_dag: subtitle = "## YML DAG" + if dag.doc_md is None: - dag.doc_md = subtitle + "\n```yaml\n" + self._yml_dag + dag.doc_md = f"{subtitle}\n```yaml\n{self._yml_dag}\n```" else: - dag.doc_md = dag.doc_md + subtitle + "\n```yaml\n" + self._yml_dag + dag.doc_md += f"\n\n{subtitle}\n```yaml\n{self._yml_dag}\n```" # tags parameter introduced in Airflow 1.10.8 if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"): diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 43f84c7..01a2611 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -33,7 +33,6 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] self.dags_count: int = 0 self.tasks_count: int = 0 self.taskgroups_count: int = 0 - # self.dag_py = None self._config_filepath = config_filepath assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" if config_filepath: @@ -44,14 +43,21 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] @staticmethod def _serialise_config_md(dag_name, dag_config, default_config): + # Remove empty task_groups if it exists if dag_config.get("task_groups") == {}: del dag_config["task_groups"] + + # Convert default_config to YAML format default_config = {"default": default_config} - default_config = yaml.dump(default_config, default_flow_style=False, allow_unicode=True, sort_keys=False) + default_config_yaml = yaml.dump(default_config, default_flow_style=False, allow_unicode=True, sort_keys=False) + + # Convert dag_config to YAML format dag_config = {dag_name: dag_config} - dag_config = yaml.dump(dag_config, default_flow_style=False, allow_unicode=True, sort_keys=False) - dag_yml = default_config + "\n" + dag_config - print(dag_yml) + dag_config_yaml = yaml.dump(dag_config, default_flow_style=False, allow_unicode=True, sort_keys=False) + + # Combine the two YAML outputs with appropriate formatting + dag_yml = default_config_yaml + "\n" + dag_config_yaml + return dag_yml @staticmethod diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 53dc78a..14e8f7c 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -339,14 +339,14 @@ def test_doc_md_file_path(): generated_doc_md = globals()["example_dag2"].doc_md with open(DOC_MD_FIXTURE_FILE, "r") as file: expected_doc_md = file.read() - assert generated_doc_md == expected_doc_md + assert expected_doc_md in generated_doc_md def test_doc_md_callable(): td = dagfactory.DagFactory(TEST_DAG_FACTORY) td.generate_dags(globals()) expected_doc_md = globals()["example_dag3"].doc_md - assert str(td.get_dag_configs()["example_dag3"]["doc_md_python_arguments"]) == expected_doc_md + assert str(td.get_dag_configs()["example_dag3"]["doc_md_python_arguments"]) in expected_doc_md def test_schedule_interval(): From 6d0466e6bb5611ed86df598d05b80dc1c2cd9301 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 2 Dec 2024 23:38:32 +0530 Subject: [PATCH 3/7] Add tests --- tests/fixtures/dag_md_docs.yml | 30 ++++++++++++++++++++++++++++++ tests/test_dagfactory.py | 10 ++++++++++ 2 files changed, 40 insertions(+) create mode 100644 tests/fixtures/dag_md_docs.yml diff --git a/tests/fixtures/dag_md_docs.yml b/tests/fixtures/dag_md_docs.yml new file mode 100644 index 0000000..7842eec --- /dev/null +++ b/tests/fixtures/dag_md_docs.yml @@ -0,0 +1,30 @@ +default: + concurrency: 1 + dagrun_timeout_sec: 600 + default_args: + end_date: 2018-03-05 + owner: default_owner + retries: 1 + retry_delay_sec: 300 + start_date: 2018-03-01 + default_view: tree + max_active_runs: 1 + orientation: LR + schedule_interval: 0 1 * * * + +example_dag2: + schedule_interval: None + tasks: + task_1: + bash_command: echo 1 + operator: airflow.operators.bash_operator.BashOperator + task_2: + bash_command: echo 2 + dependencies: + - task_1 + operator: airflow.operators.bash_operator.BashOperator + task_3: + bash_command: echo 3 + dependencies: + - task_1 + operator: airflow.operators.bash_operator.BashOperator diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 14e8f7c..a664add 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -443,3 +443,13 @@ def test_load_yaml_dags_default_suffix_succeed(caplog): dags_folder="tests/fixtures", ) assert "Loading DAGs from tests/fixtures" in caplog.messages + + +def test_yml_dag_rendering_in_docs(): + dag_path = os.path.join(here, "fixtures/dag_md_docs.yml") + td = dagfactory.DagFactory(dag_path) + td.generate_dags(globals()) + generated_doc_md = globals()["example_dag2"].doc_md + with open(dag_path, "r") as file: + expected_doc_md = "## YML DAG\n```yaml\n" + file.read() + "\n```" + assert generated_doc_md == expected_doc_md From 08c52a2c2337d5dc8c23f3b169d8cecc4ddf49c9 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 3 Dec 2024 18:13:19 +0530 Subject: [PATCH 4/7] Add a comment for task_groups injection --- dagfactory/dagfactory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 01a2611..6f1bfea 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -44,6 +44,8 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] @staticmethod def _serialise_config_md(dag_name, dag_config, default_config): # Remove empty task_groups if it exists + # We inject it if not supply by user + # https://github.com/astronomer/dag-factory/blob/e53b456d25917b746d28eecd1e896595ae0ee62b/dagfactory/dagfactory.py#L102 if dag_config.get("task_groups") == {}: del dag_config["task_groups"] From 36bb7b2ddfc30ac24f032030ca92b5f84d331b61 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 3 Dec 2024 18:13:45 +0530 Subject: [PATCH 5/7] Add a comment for task_groups injection --- dagfactory/dagfactory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 6f1bfea..9c17162 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -33,7 +33,6 @@ def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] self.dags_count: int = 0 self.tasks_count: int = 0 self.taskgroups_count: int = 0 - self._config_filepath = config_filepath assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided" if config_filepath: DagFactory._validate_config_filepath(config_filepath=config_filepath) From 38da84200ba11e5d2be114147a1857410e82eda3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 3 Dec 2024 19:06:53 +0530 Subject: [PATCH 6/7] Add more tests --- dagfactory/dagbuilder.py | 2 +- tests/test_dagfactory.py | 41 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 4d6c6c9..bced37c 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -805,7 +805,7 @@ def build(self) -> Dict[str, Union[str, DAG]]: if dag.doc_md is None: dag.doc_md = f"{subtitle}\n```yaml\n{self._yml_dag}\n```" else: - dag.doc_md += f"\n\n{subtitle}\n```yaml\n{self._yml_dag}\n```" + dag.doc_md += f"\n{subtitle}\n```yaml\n{self._yml_dag}\n```" # tags parameter introduced in Airflow 1.10.8 if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"): diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index a664add..2a72a95 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -334,12 +334,49 @@ def test_variables_as_arguments_dag(): def test_doc_md_file_path(): + dag_config = """ +## YML DAG +```yaml +default: + concurrency: 1 + dagrun_timeout_sec: 600 + default_args: + end_date: 2018-03-05 + owner: default_owner + retries: 1 + retry_delay_sec: 300 + start_date: 2018-03-01 + default_view: tree + max_active_runs: 1 + orientation: LR + schedule_interval: 0 1 * * * + +example_dag2: + doc_md_file_path: /Users/pankaj/Documents/astro_code/dag-factory/tests/fixtures/mydocfile.md + schedule_interval: None + tasks: + task_1: + bash_command: echo 1 + operator: airflow.operators.bash_operator.BashOperator + task_2: + bash_command: echo 2 + dependencies: + - task_1 + operator: airflow.operators.bash_operator.BashOperator + task_3: + bash_command: echo 3 + dependencies: + - task_1 + operator: airflow.operators.bash_operator.BashOperator + +```""" + td = dagfactory.DagFactory(TEST_DAG_FACTORY) td.generate_dags(globals()) generated_doc_md = globals()["example_dag2"].doc_md with open(DOC_MD_FIXTURE_FILE, "r") as file: - expected_doc_md = file.read() - assert expected_doc_md in generated_doc_md + expected_doc_md = file.read() + dag_config + assert generated_doc_md == expected_doc_md def test_doc_md_callable(): From fefafbbf7d4e9627348fa2ff6ff1f539607134dd Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 3 Dec 2024 19:09:55 +0530 Subject: [PATCH 7/7] Remove hardcoded path --- tests/test_dagfactory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 2a72a95..78f96de 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -334,7 +334,7 @@ def test_variables_as_arguments_dag(): def test_doc_md_file_path(): - dag_config = """ + dag_config = f""" ## YML DAG ```yaml default: @@ -352,7 +352,7 @@ def test_doc_md_file_path(): schedule_interval: 0 1 * * * example_dag2: - doc_md_file_path: /Users/pankaj/Documents/astro_code/dag-factory/tests/fixtures/mydocfile.md + doc_md_file_path: {DOC_MD_FIXTURE_FILE} schedule_interval: None tasks: task_1: