diff --git a/dagfactory/listeners/__init__.py b/dagfactory/listeners/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagfactory/listeners/runtime_event.py b/dagfactory/listeners/runtime_event.py new file mode 100644 index 0000000..1584159 --- /dev/null +++ b/dagfactory/listeners/runtime_event.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from airflow.listeners import hookimpl +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import TaskInstanceState + +from dagfactory import telemetry + +log = LoggingMixin().log + + +class EventStatus: + SUCCESS = "success" + FAILED = "failed" + + +class EventType: + DAG_RUN = "dag_run" + TASK_INSTANCE = "task_instance" + + +@hookimpl +def on_dag_run_success(dag_run: DagRun, msg: str): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics) + + +@hookimpl +def on_dag_run_failed(dag_run: DagRun, msg: str): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics) + + +@hookimpl +def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session): + additional_telemetry_metrics = { + "dag_hash": task_instance.dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "operator": task_instance.operator, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics) + + +@hookimpl +def on_task_instance_failed( + previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session +): + additional_telemetry_metrics = { + "dag_hash": task_instance.dag_run.dag_hash, + "status": EventStatus.FAILED, + "operator": task_instance.operator, + } + + telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics) diff --git a/dagfactory/plugin/__init__.py b/dagfactory/plugin/__init__.py new file mode 100644 index 0000000..04dc4a6 --- /dev/null +++ b/dagfactory/plugin/__init__.py @@ -0,0 +1,11 @@ +from airflow.plugins_manager import AirflowPlugin + +from dagfactory.listeners import runtime_event + + +class DagFactoryPlugin(AirflowPlugin): + name = "Listener Plugin" + listeners = [runtime_event] + + +dagfactory_plugin = DagFactoryPlugin() diff --git a/dev/dags/example_dag_factory.yml b/dev/dags/example_dag_factory.yml index e962854..f0c43ea 100644 --- a/dev/dags/example_dag_factory.yml +++ b/dev/dags/example_dag_factory.yml @@ -1,8 +1,8 @@ default: default_args: + catchup: false, owner: "default_owner" - start_date: 2018-03-01 - end_date: 2018-03-05 + start_date: 2024-11-11 retries: 1 retry_delay_sec: 300 on_success_callback_name: print_hello_from_callback diff --git a/pyproject.toml b/pyproject.toml index bb2a1e0..cd5b87f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,9 @@ Source = "https://github.com/astronomer/dag-factory" [tool.hatch.version] path = "dagfactory/__init__.py" +[project.entry-points."airflow.plugins"] +dagfactory = "dagfactory.plugin:DagFactoryPlugin" + [tool.hatch.build] sources = ["."]