Skip to content

Commit

Permalink
Airflow 2.0.2 Compatibility (#109)
Browse files Browse the repository at this point in the history
* cicd: bump act versions, add codecov token

* fix: move Context import to TYPECHECKING

* fix: rm Context import, test down to 2.0.2

* fix: add compat for 2.0, bump version
  • Loading branch information
fritz-astronomer authored Sep 12, 2024
1 parent 3da0db0 commit d2dd9e8
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 10 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
with:
python-version: '3.10'
cache: 'pip'
- uses: extractions/setup-just@v1
- uses: extractions/setup-just@v2
- run: just install
- run: |
git fetch origin
Expand All @@ -29,7 +29,9 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- uses: extractions/setup-just@v1
- uses: extractions/setup-just@v2
- run: just install
- run: just test-with-coverage
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
with:
python-version: '3.10'
cache: 'pip'
- uses: extractions/setup-just@v1
- uses: extractions/setup-just@v2
- run: just build
- uses: pypa/gh-action-pypi-publish@release/v1
with:
Expand Down
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.0.5"
__version__ = "2.0.6"


def get_provider_info():
Expand Down
28 changes: 27 additions & 1 deletion astronomer_starship/compat/starship_compatability.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,30 @@ def get_task_instances(self, dag_id: str, offset: int = 0, limit: int = 10):
raise e


class StarshipAirflow20(StarshipAirflow21):
"""
- description does not exist in variables
- queued_at not on dag_run
"""

def variable_attrs(self):
attrs = super().variable_attrs()
del attrs["description"]
return attrs

def dag_runs_attrs(self):
attrs = super().dag_runs_attrs()
if "queued_at" in attrs["dag_runs"]["test_value"][0]:
del attrs["dag_runs"]["test_value"][0]["queued_at"]
return attrs

def dag_run_attrs(self):
attrs = super().dag_run_attrs()
if "queued_at" in attrs:
del attrs["queued_at"]
return attrs


class StarshipAirflow27(StarshipAirflow):
"""
- include_deferred is required in pools
Expand Down Expand Up @@ -1078,8 +1102,10 @@ def __new__(cls, airflow_version: "Union[str, None]" = None) -> StarshipAirflow:
return StarshipAirflow27()
if int(minor) == 2:
return StarshipAirflow22()
if int(minor) <= 1:
if int(minor) == 1:
return StarshipAirflow21()
if int(minor) == 0:
return StarshipAirflow20()
return StarshipAirflow()
else:
raise RuntimeError(f"Unsupported Airflow Version: {airflow_version}")
8 changes: 4 additions & 4 deletions astronomer_starship/providers/starship/operators/starship.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup

from astronomer_starship.providers.starship.hooks.starship import (
StarshipLocalHook,
StarshipHttpHook,
)


# Compatability Notes:
# - @task() is >=AF2.0
# - @task_group is >=AF2.1
Expand All @@ -37,7 +37,7 @@ def __init__(self, variable_key: Union[str, None] = None, **kwargs):
super().__init__(**kwargs)
self.variable_key = variable_key

def execute(self, context: Context) -> Any:
def execute(self, context) -> Any:
logging.info("Getting Variable", self.variable_key)
variables = self.source_hook.get_variables()
variable: Union[dict, None] = (
Expand Down Expand Up @@ -90,7 +90,7 @@ def __init__(self, pool_name: Union[str, None] = None, **kwargs):
super().__init__(**kwargs)
self.pool_name = pool_name

def execute(self, context: Context) -> Any:
def execute(self, context) -> Any:
logging.info("Getting Pool", self.pool_name)
pool: Union[dict, None] = (
[v for v in self.source_hook.get_pools() if v["name"] == self.pool_name]
Expand Down Expand Up @@ -140,7 +140,7 @@ def __init__(self, connection_id: Union[str, None] = None, **kwargs):
super().__init__(**kwargs)
self.connection_id = connection_id

def execute(self, context: Context) -> Any:
def execute(self, context) -> Any:
logging.info("Getting Connection", self.connection_id)
connection: Union[dict, None] = (
[
Expand Down
2 changes: 1 addition & 1 deletion tests/validation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"apache/airflow:2.3.4",
"apache/airflow:2.2.4",
"apache/airflow:2.1.3",
# "apache/airflow:2.0.0",
"apache/airflow:2.0.2",
# # "apache/airflow:1.10.15",
# # "apache/airflow:1.10.10",
]
Expand Down

0 comments on commit d2dd9e8

Please sign in to comment.