Skip to content

Commit

Permalink
Fix listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Nov 26, 2024
1 parent 831a741 commit 346b6f7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
Empty file.
65 changes: 65 additions & 0 deletions dagfactory/listeners/runtime_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

from dagfactory import telemetry

log = LoggingMixin().log


class EventStatus:
SUCCESS = "success"
FAILED = "failed"


class EventType:
DAG_RUN = "dag_run"
TASK_INSTANCE = "task_instance"


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
}

telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.FAILED,
}

telemetry.emit_usage_metrics_if_enabled(EventType.DAG_RUN, additional_telemetry_metrics)


@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
additional_telemetry_metrics = {
"dag_hash": task_instance.dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"operator": task_instance.operator,
}

telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics)


@hookimpl
def on_task_instance_failed(
previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
additional_telemetry_metrics = {
"dag_hash": task_instance.dag_run.dag_hash,
"status": EventStatus.FAILED,
"operator": task_instance.operator,
}

telemetry.emit_usage_metrics_if_enabled(EventType.TASK_INSTANCE, additional_telemetry_metrics)
11 changes: 11 additions & 0 deletions dagfactory/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from airflow.plugins_manager import AirflowPlugin

from dagfactory.listeners import runtime_event


class DagFactoryPlugin(AirflowPlugin):
name = "Listener Plugin"
listeners = [runtime_event]


dagfactory_plugin = DagFactoryPlugin()
4 changes: 2 additions & 2 deletions dev/dags/example_dag_factory.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
default:
default_args:
catchup: false,
owner: "default_owner"
start_date: 2018-03-01
end_date: 2018-03-05
start_date: 2024-11-11
retries: 1
retry_delay_sec: 300
on_success_callback_name: print_hello_from_callback
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ Source = "https://github.com/astronomer/dag-factory"
[tool.hatch.version]
path = "dagfactory/__init__.py"

[project.entry-points."airflow.plugins"]
dagfactory = "dagfactory.plugin:DagFactoryPlugin"

[tool.hatch.build]
sources = ["."]

Expand Down

0 comments on commit 346b6f7

Please sign in to comment.