From 346b6f75ce9da91497715a2d165d71177d27a075 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 21 Nov 2024 14:32:34 +0530 Subject: [PATCH] Fix listeners --- dagfactory/listeners/__init__.py | 0 dagfactory/listeners/runtime_event.py | 65 +++++++++++++++++++++++++++ dagfactory/plugin/__init__.py | 11 +++++ dev/dags/example_dag_factory.yml | 4 +- pyproject.toml | 3 ++ 5 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 dagfactory/listeners/__init__.py create mode 100644 dagfactory/listeners/runtime_event.py create mode 100644 dagfactory/plugin/__init__.py diff --git a/dagfactory/listeners/__init__.py b/dagfactory/listeners/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py new file mode 100644 index 00000000..15841592 --- /dev/null +++ b/dagfactory/listeners/runtime_event.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from airflow.listeners import hookimpl +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import TaskInstanceState + +from dagfactory import telemetry + +log = LoggingMixin().log + + +class EventStatus: + SUCCESS = "success" + FAILED = "failed" + + +class EventType: + DAG_RUN = "dag_run" + TASK_INSTANCE = "task_instance" + + +@hookimpl +def on_dag_run_success(dag_run: DagRun, msg: str): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics) + + +@hookimpl +def on_dag_run_failed(dag_run: DagRun, msg: str): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics) + + +@hookimpl +def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session): + additional_telemetry_metrics = { + "dag_hash": task_instance.dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "operator": task_instance.operator, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics) + + +@hookimpl +def on_task_instance_failed( + previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session +): + additional_telemetry_metrics = { + "dag_hash": task_instance.dag_run.dag_hash, + "status": EventStatus.FAILED, + "operator": task_instance.operator, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics) diff --git a/dagfactory/plugin/__init__.py b/dagfactory/plugin/__init__.py new file mode 100644 index 00000000..04dc4a6b --- /dev/null +++ b/dagfactory/plugin/__init__.py @@ -0,0 +1,11 @@ +from airflow.plugins_manager import AirflowPlugin + +from dagfactory.listeners import runtime_event + + +class DagFactoryPlugin(AirflowPlugin): + name = "Listener Plugin" + listeners = [runtime_event] + + +dagfactory_plugin = DagFactoryPlugin() diff --git a/dev/dags/example_dag_factory.yml b/dev/dags/example_dag_factory.yml index e9628548..f0c43ea6 100644 --- a/dev/dags/example_dag_factory.yml +++ b/dev/dags/example_dag_factory.yml @@ -1,8 +1,8 @@ default: default_args: + catchup: false, owner: "default_owner" - start_date: 2018-03-01 - end_date: 2018-03-05 + start_date: 2024-11-11 retries: 1 retry_delay_sec: 300 on_success_callback_name: print_hello_from_callback diff --git a/pyproject.toml b/pyproject.toml index bb2a1e0e..cd5b87fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,9 @@ Source = "https://github.com/astronomer/dag-factory" [tool.hatch.version] path = "dagfactory/__init__.py" +[project.entry-points."airflow.plugins"] +dagfactory = "dagfactory.plugin:DagFactoryPlugin" + [tool.hatch.build] sources = ["."]