Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor telemetry to collect events during DAG run and not DAG parsing #300

Merged
merged 16 commits into from
Dec 3, 2024
7 changes: 4 additions & 3 deletions PRIVACY_NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ In addition to Scarf's default data collection, DAG Factory collects the followi
- Python version
- Operating system & machine architecture
- Event type
- Number of DAGs
- Number of TaskGroups
- Number of Tasks
- Number of failed DagRuns
- Number of successful DagRuns
- Total number of tasks associated with each DagRun
- Dag hash

No user-identifiable information (IP included) is stored in Scarf.
4 changes: 2 additions & 2 deletions dagfactory/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}?{query_string}"
TELEMETRY_VERSION = "v1"
# Scarf gateway endpoint; every telemetry field is embedded as a URL path segment
# (no query string), so values must be URL-safe when substituted.
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}"
# Bump this whenever the shape or ordering of TELEMETRY_URL fields changes.
TELEMETRY_VERSION = "v2"
# Seconds to wait for the telemetry HTTP request before giving up.
TELEMETRY_TIMEOUT = 5.0
11 changes: 4 additions & 7 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ def __init__(
self.dag_name: str = dag_name
self.dag_config: Dict[str, Any] = deepcopy(dag_config)
self.default_config: Dict[str, Any] = deepcopy(default_config)
self.tasks_count: int = 0
self.taskgroups_count: int = 0
self._yml_dag = yml_dag

# pylint: disable=too-many-branches,too-many-statements
Expand Down Expand Up @@ -807,19 +805,18 @@ def build(self) -> Dict[str, Union[str, DAG]]:
else:
dag.doc_md += f"\n{subtitle}\n```yaml\n{self._yml_dag}\n```"

# tags parameter introduced in Airflow 1.10.8
if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"):
tatiana marked this conversation as resolved.
Show resolved Hide resolved
dag.tags = dag_params.get("tags", None)
tags = dag_params.get("tags", [])
if "dagfactory" not in tags:
tags.append("dagfactory")
dag.tags = tags

tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"]
self.tasks_count = len(tasks)

# add a property to mark this dag as an auto-generated one
dag.is_dagfactory_auto_generated = True

# create dictionary of task groups
task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag)
self.taskgroups_count = len(task_groups_dict)

# create dictionary to track tasks and set dependencies
tasks_dict: Dict[str, BaseOperator] = {}
Expand Down
22 changes: 0 additions & 22 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from airflow.configuration import conf as airflow_conf
from airflow.models import DAG

from dagfactory import telemetry
from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException

Expand All @@ -30,9 +29,6 @@ class DagFactory:
"""

def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None:
self.dags_count: int = 0
self.tasks_count: int = 0
self.taskgroups_count: int = 0
assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided"
if config_filepath:
DagFactory._validate_config_filepath(config_filepath=config_filepath)
Expand Down Expand Up @@ -132,21 +128,9 @@ def build_dags(self) -> Dict[str, DAG]:
dags[dag["dag_id"]]: DAG = dag["dag"]
except Exception as err:
raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
else:
self.dags_count += 1
self.taskgroups_count += dag_builder.taskgroups_count
self.tasks_count += dag_builder.tasks_count

return dags

def emit_telemetry(self, event_type: str) -> None:
additional_telemetry_metrics = {
"dags_count": self.dags_count,
"tasks_count": self.tasks_count,
"taskgroups_count": self.taskgroups_count,
}
telemetry.emit_usage_metrics_if_enabled(event_type, additional_telemetry_metrics)

# pylint: disable=redefined-builtin
@staticmethod
def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None:
Expand All @@ -168,7 +152,6 @@ def generate_dags(self, globals: Dict[str, Any]) -> None:
"""
dags: Dict[str, Any] = self.build_dags()
self.register_dags(dags, globals)
self.emit_telemetry("generate_dags")

def clean_dags(self, globals: Dict[str, Any]) -> None:
"""
Expand All @@ -192,10 +175,6 @@ def clean_dags(self, globals: Dict[str, Any]) -> None:
for dag_to_remove in dags_to_remove:
del globals[dag_to_remove]

self.emit_telemetry("clean_dags")

# pylint: enable=redefined-builtin


def load_yaml_dags(
globals_dict: Dict[str, Any],
Expand Down Expand Up @@ -229,5 +208,4 @@ def load_yaml_dags(
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_file_path)
else:
factory.emit_telemetry("load_yaml_dags")
logging.info("DAG loaded: %s", config_file_path)
Empty file.
49 changes: 49 additions & 0 deletions dagfactory/listeners/runtime_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

from dagfactory import telemetry


class EventStatus:
    """Status labels attached to telemetry events for a DagRun outcome."""

    SUCCESS = "success"
    FAILED = "failed"


# Telemetry event type emitted by the DagRun listener hooks below.
DAG_RUN = "dag_run"


def is_dagfactory_dag(dag: DAG | None = None) -> bool:
    """Return True if *dag* was generated by DAG Factory.

    DAG Factory stamps every DAG it builds with the "dagfactory" tag, so the
    check is a simple tag lookup.

    :param dag: the DAG to inspect; may be ``None`` (e.g. when the DagRun's
        DAG could not be resolved), in which case ``False`` is returned.
    """
    # Guard both a missing DAG and a DAG whose ``tags`` attribute is None —
    # the original ``dag.tags`` access raised AttributeError/TypeError here.
    if dag is None:
        return False
    return "dagfactory" in (dag.tags or [])


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """Airflow listener hook: emit a telemetry event when a DagRun succeeds.

    Only fires for DAGs generated by DAG Factory (identified by the
    "dagfactory" tag); runs of other DAGs are ignored.

    :param dag_run: the DagRun that just completed successfully
    :param msg: listener message supplied by Airflow (unused)
    """
    dag = dag_run.get_dag()
    if not is_dagfactory_dag(dag):
        return
    # NOTE(review): the original body contained stray GitHub review-thread
    # text inside this dict literal; it has been removed.
    additional_telemetry_metrics = {
        "dag_hash": dag_run.dag_hash,
        "status": EventStatus.SUCCESS,
        "task_count": len(dag.task_ids),
    }

    telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """Airflow listener hook: emit a telemetry event when a DagRun fails.

    Only DAG Factory-generated DAGs (tagged "dagfactory") are reported.

    :param dag_run: the DagRun that just failed
    :param msg: listener message supplied by Airflow (unused)
    """
    failed_dag = dag_run.get_dag()
    if is_dagfactory_dag(failed_dag):
        telemetry.emit_usage_metrics_if_enabled(
            DAG_RUN,
            {
                "dag_hash": dag_run.dag_hash,
                "status": EventStatus.FAILED,
                "task_count": len(failed_dag.task_ids),
            },
        )
11 changes: 11 additions & 0 deletions dagfactory/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from airflow.plugins_manager import AirflowPlugin

from dagfactory.listeners import runtime_event


class DagFactoryPlugin(AirflowPlugin):
    """Airflow plugin that registers the DAG Factory runtime-event listeners.

    Registering ``runtime_event`` as a listener module lets its
    ``on_dag_run_success`` / ``on_dag_run_failed`` hooks receive DagRun
    lifecycle events during execution (rather than at DAG-parse time).
    """

    name = "Dag Factory Plugin"
    listeners = [runtime_event]


# Module-level instance picked up via the "airflow.plugins" entry point.
dagfactory_plugin = DagFactoryPlugin()
8 changes: 5 additions & 3 deletions dagfactory/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import platform
from urllib import parse
from urllib.parse import urlencode

import httpx
Expand All @@ -24,7 +25,7 @@ def collect_standard_usage_metrics() -> dict[str, object]:
"""
metrics = {
"dagfactory_version": dagfactory.__version__,
"airflow_version": airflow_version,
"airflow_version": parse.quote(airflow_version),
"python_version": platform.python_version(),
"platform_system": platform.system(),
"platform_machine": platform.machine(),
Expand All @@ -44,7 +45,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool:
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
logging.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
Expand All @@ -64,8 +65,9 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str,
"""
if should_emit():
metrics = collect_standard_usage_metrics()
metrics["type"] = event_type
metrics["event_type"] = event_type
metrics["variables"].update(additional_metrics)
metrics.update(additional_metrics)
is_success = emit_usage_metrics(metrics)
return is_success
else:
Expand Down
6 changes: 3 additions & 3 deletions dev/dags/example_dag_factory.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
default:
default_args:
catchup: false,
owner: "default_owner"
start_date: 2018-03-01
end_date: 2018-03-05
start_date: 2024-11-11
retries: 1
retry_delay_sec: 300
retry_delay_sec: 30
on_success_callback_name: print_hello_from_callback
on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
concurrency: 1
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ Source = "https://github.com/astronomer/dag-factory"
[tool.hatch.version]
path = "dagfactory/__init__.py"

[project.entry-points."airflow.plugins"]
dagfactory = "dagfactory.plugin:DagFactoryPlugin"

[tool.hatch.build]
sources = ["."]

Expand Down
2 changes: 1 addition & 1 deletion tests/test_dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def test_build():
assert len(actual["dag"].tasks) == 3
assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_2", "task_3"}
if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"):
assert actual["dag"].tags == ["tag1", "tag2"]
assert actual["dag"].tags == ["tag1", "tag2", "dagfactory"]


def test_get_dag_params_dag_with_task_group():
Expand Down
15 changes: 0 additions & 15 deletions tests/test_dagfactory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime
import logging
import os
from unittest.mock import patch

import pytest
from airflow import __version__ as AIRFLOW_VERSION
Expand Down Expand Up @@ -459,20 +458,6 @@ def test_load_invalid_yaml_logs_error(caplog):
assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"]


@patch("dagfactory.telemetry.emit_usage_metrics_if_enabled")
def test_load_yaml_dags_succeed(mock_emit_usage_metrics_if_enabled):
load_yaml_dags(
globals_dict=globals(),
dags_folder="tests/fixtures",
suffix=["dag_factory_variables_as_arguments.yml"],
)

# Confirm the representative telemetry for all the DAGs defined in the desired YAML is being sent
args = mock_emit_usage_metrics_if_enabled.call_args.args
assert args[0] == "load_yaml_dags"
assert args[1] == {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0}


def test_load_yaml_dags_default_suffix_succeed(caplog):
caplog.set_level(logging.INFO)
load_yaml_dags(
Expand Down
24 changes: 18 additions & 6 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,19 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
"python_version": "3.11",
"platform_system": "darwin",
"platform_machine": "amd64",
"variables": {"a": 1, "b": 2},
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
mock_httpx_get.assert_called_once_with(
"https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D",
f"""https://astronomer.gateway.scarf.sh/dag-factory/v2/0.2.0a1/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""",
timeout=5.0,
follow_redirects=True,
)
assert not is_success
log_msg = "Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D. Status code: 404. Message: Non existent URL"
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v2/0.2.0a1/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL"""
assert caplog.text.startswith("WARNING")
assert log_msg in caplog.text

Expand All @@ -74,7 +78,10 @@ def test_emit_usage_metrics_succeeds(caplog):
"python_version": "3.11",
"platform_system": "darwin",
"platform_machine": "amd64",
"variables": {"a": 1, "b": 2},
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
assert is_success
Expand All @@ -91,11 +98,16 @@ def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog):


@patch("dagfactory.telemetry.should_emit", return_value=True)
@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "variables": {}})
@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}})
@patch("dagfactory.telemetry.emit_usage_metrics")
def test_emit_usage_metrics_if_enabled_succeeds(
mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit
):
assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"})
mock_emit_usage_metrics.assert_called_once()
assert mock_emit_usage_metrics.call_args.args[0] == {"k1": "v1", "variables": {"k2": "v2"}, "type": "any"}
assert mock_emit_usage_metrics.call_args.args[0] == {
"k1": "v1",
"k2": "v2",
"event_type": "any",
"variables": {"k2": "v2"},
}
Loading