diff --git a/airflow/api_fastapi/core_api/datamodels/ui/graph.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py similarity index 75% rename from airflow/api_fastapi/core_api/datamodels/ui/graph.py rename to airflow/api_fastapi/core_api/datamodels/ui/structure.py index b4d7587b35e7b..e3df958a1a81f 100644 --- a/airflow/api_fastapi/core_api/datamodels/ui/graph.py +++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py @@ -30,31 +30,21 @@ class EdgeResponse(BaseModel): target_id: str -class NodeValueResponse(BaseModel): - """Graph Node Value responses.""" - - isMapped: bool | None = None - label: str | None = None - labelStyle: str | None = None - style: str | None = None - tooltip: str | None = None - rx: int - ry: int - clusterLabelPos: str | None = None - setupTeardownType: Literal["setup", "teardown"] | None = None - - class NodeResponse(BaseModel): """Node serializer for responses.""" children: list[NodeResponse] | None = None id: str | None - value: NodeValueResponse + is_mapped: bool | None = None + label: str | None = None + tooltip: str | None = None + setup_teardown_type: Literal["setup", "teardown"] | None = None + type: Literal["join", "sensor", "task", "task_group"] -class GraphDataResponse(BaseModel): - """Graph Data serializer for responses.""" +class StructureDataResponse(BaseModel): + """Structure Data serializer for responses.""" edges: list[EdgeResponse] - nodes: NodeResponse + nodes: list[NodeResponse] arrange: Literal["BT", "LR", "RL", "TB"] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 20c450cf0a2bc..87a63a3c47951 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -194,13 +194,13 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /ui/graph/graph_data: + /ui/structure/structure_data: get: tags: - - Graph - summary: Graph Data - description: Get Graph Data. - operationId: graph_data + - Structure + summary: Structure Data + description: Get Structure Data. + operationId: structure_data parameters: - name: dag_id in: query @@ -236,7 +236,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/GraphDataResponse' + $ref: '#/components/schemas/StructureDataResponse' '400': content: application/json: @@ -7594,30 +7594,6 @@ components: - name title: FastAPIAppResponse description: Serializer for Plugin FastAPI App responses. - GraphDataResponse: - properties: - edges: - items: - $ref: '#/components/schemas/EdgeResponse' - type: array - title: Edges - nodes: - $ref: '#/components/schemas/NodeResponse' - arrange: - type: string - enum: - - BT - - LR - - RL - - TB - title: Arrange - type: object - required: - - edges - - nodes - - arrange - title: GraphDataResponse - description: Graph Data serializer for responses. HTTPExceptionResponse: properties: detail: @@ -7810,66 +7786,43 @@ components: - type: string - type: 'null' title: Id - value: - $ref: '#/components/schemas/NodeValueResponse' - type: object - required: - - id - - value - title: NodeResponse - description: Node serializer for responses. - NodeValueResponse: - properties: - isMapped: + is_mapped: anyOf: - type: boolean - type: 'null' - title: Ismapped + title: Is Mapped label: anyOf: - type: string - type: 'null' title: Label - labelStyle: - anyOf: - - type: string - - type: 'null' - title: Labelstyle - style: - anyOf: - - type: string - - type: 'null' - title: Style tooltip: anyOf: - type: string - type: 'null' title: Tooltip - rx: - type: integer - title: Rx - ry: - type: integer - title: Ry - clusterLabelPos: - anyOf: - - type: string - - type: 'null' - title: Clusterlabelpos - setupTeardownType: + setup_teardown_type: anyOf: - type: string enum: - setup - teardown - type: 'null' - title: Setupteardowntype + title: Setup Teardown Type + type: + type: string + enum: + - join + - sensor + - task + - task_group + title: Type type: object required: - - rx - - ry - title: NodeValueResponse - description: Graph Node Value responses. + - id + - type + title: NodeResponse + description: Node serializer for responses. PatchTaskInstanceBody: properties: dry_run: @@ -8219,6 +8172,33 @@ components: - latest_scheduler_heartbeat title: SchedulerInfoResponse description: Scheduler info serializer for responses. + StructureDataResponse: + properties: + edges: + items: + $ref: '#/components/schemas/EdgeResponse' + type: array + title: Edges + nodes: + items: + $ref: '#/components/schemas/NodeResponse' + type: array + title: Nodes + arrange: + type: string + enum: + - BT + - LR + - RL + - TB + title: Arrange + type: object + required: + - edges + - nodes + - arrange + title: StructureDataResponse + description: Structure Data serializer for responses. TaskCollectionResponse: properties: tasks: diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow/api_fastapi/core_api/routes/ui/__init__.py index 0fa150b465397..2b22cc541206b 100644 --- a/airflow/api_fastapi/core_api/routes/ui/__init__.py +++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py @@ -21,7 +21,7 @@ from airflow.api_fastapi.core_api.routes.ui.config import config_router from airflow.api_fastapi.core_api.routes.ui.dags import dags_router from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router -from airflow.api_fastapi.core_api.routes.ui.graph import graph_data_router +from airflow.api_fastapi.core_api.routes.ui.structure import structure_router ui_router = AirflowRouter(prefix="/ui") @@ -29,4 +29,4 @@ ui_router.include_router(config_router) ui_router.include_router(dags_router) ui_router.include_router(dashboard_router) -ui_router.include_router(graph_data_router) +ui_router.include_router(structure_router) diff --git a/airflow/api_fastapi/core_api/routes/ui/graph.py b/airflow/api_fastapi/core_api/routes/ui/structure.py similarity index 79% rename from airflow/api_fastapi/core_api/routes/ui/graph.py rename to airflow/api_fastapi/core_api/routes/ui/structure.py index 10dbfb150c711..a6ed936b92dfe 100644 --- a/airflow/api_fastapi/core_api/routes/ui/graph.py +++ b/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -20,35 +20,37 @@ from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.core_api.datamodels.ui.graph import GraphDataResponse +from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.utils.dag_edges import dag_edges from airflow.utils.task_group import task_group_to_dict -graph_data_router = AirflowRouter(tags=["Graph"], prefix="/graph") +structure_router = AirflowRouter(tags=["Structure"], prefix="/structure") -@graph_data_router.get( - "/graph_data", +@structure_router.get( + "/structure_data", include_in_schema=False, responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]), ) -def graph_data( +def structure_data( session: SessionDep, dag_id: str, request: Request, root: str | None = None, include_upstream: bool = False, include_downstream: bool = False, -) -> GraphDataResponse: - """Get Graph Data.""" +) -> StructureDataResponse: + """Get Structure Data.""" dag = request.app.state.dag_bag.get_dag(dag_id) if root: dag = dag.partial_subset( task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream ) - nodes = task_group_to_dict(dag.task_group) + nodes = [ + task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label) + ] edges = dag_edges(dag) data = { @@ -57,4 +59,4 @@ def graph_data( "edges": edges, } - return GraphDataResponse(**data) + return StructureDataResponse(**data) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index f7e3576019e50..766c08ae9092b 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -16,13 +16,13 @@ import { DashboardService, EventLogService, ExtraLinksService, - GraphService, ImportErrorService, JobService, MonitorService, PluginService, PoolService, ProviderService, + StructureService, TaskInstanceService, TaskService, VariableService, @@ -327,15 +327,16 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = ( useDashboardServiceHistoricalMetricsKey, ...(queryKey ?? [{ endDate, startDate }]), ]; -export type GraphServiceGraphDataDefaultResponse = Awaited< - ReturnType +export type StructureServiceStructureDataDefaultResponse = Awaited< + ReturnType >; -export type GraphServiceGraphDataQueryResult< - TData = GraphServiceGraphDataDefaultResponse, +export type StructureServiceStructureDataQueryResult< + TData = StructureServiceStructureDataDefaultResponse, TError = unknown, > = UseQueryResult; -export const useGraphServiceGraphDataKey = "GraphServiceGraphData"; -export const UseGraphServiceGraphDataKeyFn = ( +export const useStructureServiceStructureDataKey = + "StructureServiceStructureData"; +export const UseStructureServiceStructureDataKeyFn = ( { dagId, includeDownstream, @@ -349,7 +350,7 @@ export const UseGraphServiceGraphDataKeyFn = ( }, queryKey?: Array, ) => [ - useGraphServiceGraphDataKey, + useStructureServiceStructureDataKey, ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]), ]; export type BackfillServiceListBackfillsDefaultResponse = Awaited< diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 54603f0b160dd..2bf0cf3695c39 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -15,13 +15,13 @@ import { DashboardService, EventLogService, ExtraLinksService, - GraphService, ImportErrorService, JobService, MonitorService, PluginService, PoolService, ProviderService, + StructureService, TaskInstanceService, TaskService, VariableService, @@ -407,17 +407,17 @@ export const prefetchUseDashboardServiceHistoricalMetrics = ( queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }), }); /** - * Graph Data - * Get Graph Data. + * Structure Data + * Get Structure Data. * @param data The data for the request. * @param data.dagId * @param data.root * @param data.includeUpstream * @param data.includeDownstream - * @returns GraphDataResponse Successful Response + * @returns StructureDataResponse Successful Response * @throws ApiError */ -export const prefetchUseGraphServiceGraphData = ( +export const prefetchUseStructureServiceStructureData = ( queryClient: QueryClient, { dagId, @@ -432,14 +432,14 @@ export const prefetchUseGraphServiceGraphData = ( }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseGraphServiceGraphDataKeyFn({ + queryKey: Common.UseStructureServiceStructureDataKeyFn({ dagId, includeDownstream, includeUpstream, root, }), queryFn: () => - GraphService.graphData({ + StructureService.structureData({ dagId, includeDownstream, includeUpstream, diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 3992c3a2edf47..6b9645fcb5ef8 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -21,13 +21,13 @@ import { DashboardService, EventLogService, ExtraLinksService, - GraphService, ImportErrorService, JobService, MonitorService, PluginService, PoolService, ProviderService, + StructureService, TaskInstanceService, TaskService, VariableService, @@ -523,18 +523,18 @@ export const useDashboardServiceHistoricalMetrics = < ...options, }); /** - * Graph Data - * Get Graph Data. + * Structure Data + * Get Structure Data. * @param data The data for the request. * @param data.dagId * @param data.root * @param data.includeUpstream * @param data.includeDownstream - * @returns GraphDataResponse Successful Response + * @returns StructureDataResponse Successful Response * @throws ApiError */ -export const useGraphServiceGraphData = < - TData = Common.GraphServiceGraphDataDefaultResponse, +export const useStructureServiceStructureData = < + TData = Common.StructureServiceStructureDataDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], >( @@ -553,12 +553,12 @@ export const useGraphServiceGraphData = < options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseGraphServiceGraphDataKeyFn( + queryKey: Common.UseStructureServiceStructureDataKeyFn( { dagId, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => - GraphService.graphData({ + StructureService.structureData({ dagId, includeDownstream, includeUpstream, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 185272ef0c2f2..8bc0c526b2f1c 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -15,13 +15,13 @@ import { DashboardService, EventLogService, ExtraLinksService, - GraphService, ImportErrorService, JobService, MonitorService, PluginService, PoolService, ProviderService, + StructureService, TaskInstanceService, TaskService, VariableService, @@ -498,18 +498,18 @@ export const useDashboardServiceHistoricalMetricsSuspense = < ...options, }); /** - * Graph Data - * Get Graph Data. + * Structure Data + * Get Structure Data. * @param data The data for the request. * @param data.dagId * @param data.root * @param data.includeUpstream * @param data.includeDownstream - * @returns GraphDataResponse Successful Response + * @returns StructureDataResponse Successful Response * @throws ApiError */ -export const useGraphServiceGraphDataSuspense = < - TData = Common.GraphServiceGraphDataDefaultResponse, +export const useStructureServiceStructureDataSuspense = < + TData = Common.StructureServiceStructureDataDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], >( @@ -528,12 +528,12 @@ export const useGraphServiceGraphDataSuspense = < options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseGraphServiceGraphDataKeyFn( + queryKey: Common.UseStructureServiceStructureDataKeyFn( { dagId, includeDownstream, includeUpstream, root }, queryKey, ), queryFn: () => - GraphService.graphData({ + StructureService.structureData({ dagId, includeDownstream, includeUpstream, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index ae2eeaf1687cc..0f704159c8dc5 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2838,30 +2838,6 @@ export const $FastAPIAppResponse = { description: "Serializer for Plugin FastAPI App responses.", } as const; -export const $GraphDataResponse = { - properties: { - edges: { - items: { - $ref: "#/components/schemas/EdgeResponse", - }, - type: "array", - title: "Edges", - }, - nodes: { - $ref: "#/components/schemas/NodeResponse", - }, - arrange: { - type: "string", - enum: ["BT", "LR", "RL", "TB"], - title: "Arrange", - }, - }, - type: "object", - required: ["edges", "nodes", "arrange"], - title: "GraphDataResponse", - description: "Graph Data serializer for responses.", -} as const; - export const $HTTPExceptionResponse = { properties: { detail: { @@ -3162,19 +3138,7 @@ export const $NodeResponse = { ], title: "Id", }, - value: { - $ref: "#/components/schemas/NodeValueResponse", - }, - }, - type: "object", - required: ["id", "value"], - title: "NodeResponse", - description: "Node serializer for responses.", -} as const; - -export const $NodeValueResponse = { - properties: { - isMapped: { + is_mapped: { anyOf: [ { type: "boolean", @@ -3183,7 +3147,7 @@ export const $NodeValueResponse = { type: "null", }, ], - title: "Ismapped", + title: "Is Mapped", }, label: { anyOf: [ @@ -3196,28 +3160,6 @@ export const $NodeValueResponse = { ], title: "Label", }, - labelStyle: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Labelstyle", - }, - style: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Style", - }, tooltip: { anyOf: [ { @@ -3229,26 +3171,7 @@ export const $NodeValueResponse = { ], title: "Tooltip", }, - rx: { - type: "integer", - title: "Rx", - }, - ry: { - type: "integer", - title: "Ry", - }, - clusterLabelPos: { - anyOf: [ - { - type: "string", - }, - { - type: "null", - }, - ], - title: "Clusterlabelpos", - }, - setupTeardownType: { + setup_teardown_type: { anyOf: [ { type: "string", @@ -3258,13 +3181,18 @@ export const $NodeValueResponse = { type: "null", }, ], - title: "Setupteardowntype", + title: "Setup Teardown Type", + }, + type: { + type: "string", + enum: ["join", "sensor", "task", "task_group"], + title: "Type", }, }, type: "object", - required: ["rx", "ry"], - title: "NodeValueResponse", - description: "Graph Node Value responses.", + required: ["id", "type"], + title: "NodeResponse", + description: "Node serializer for responses.", } as const; export const $PatchTaskInstanceBody = { @@ -3755,6 +3683,34 @@ export const $SchedulerInfoResponse = { description: "Scheduler info serializer for responses.", } as const; +export const $StructureDataResponse = { + properties: { + edges: { + items: { + $ref: "#/components/schemas/EdgeResponse", + }, + type: "array", + title: "Edges", + }, + nodes: { + items: { + $ref: "#/components/schemas/NodeResponse", + }, + type: "array", + title: "Nodes", + }, + arrange: { + type: "string", + enum: ["BT", "LR", "RL", "TB"], + title: "Arrange", + }, + }, + type: "object", + required: ["edges", "nodes", "arrange"], + title: "StructureDataResponse", + description: "Structure Data serializer for responses.", +} as const; + export const $TaskCollectionResponse = { properties: { tasks: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c5e494fe11992..0536c0afa8032 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -34,8 +34,8 @@ import type { RecentDagRunsResponse, HistoricalMetricsData, HistoricalMetricsResponse, - GraphDataData, - GraphDataResponse2, + StructureDataData, + StructureDataResponse2, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, @@ -665,24 +665,24 @@ export class DashboardService { } } -export class GraphService { +export class StructureService { /** - * Graph Data - * Get Graph Data. + * Structure Data + * Get Structure Data. * @param data The data for the request. * @param data.dagId * @param data.root * @param data.includeUpstream * @param data.includeDownstream - * @returns GraphDataResponse Successful Response + * @returns StructureDataResponse Successful Response * @throws ApiError */ - public static graphData( - data: GraphDataData, - ): CancelablePromise { + public static structureData( + data: StructureDataData, + ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/ui/graph/graph_data", + url: "/ui/structure/structure_data", query: { dag_id: data.dagId, root: data.root, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1f1838d08706c..6977b4de6368d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -680,17 +680,6 @@ export type FastAPIAppResponse = { [key: string]: unknown | string; }; -/** - * Graph Data serializer for responses. - */ -export type GraphDataResponse = { - edges: Array; - nodes: NodeResponse; - arrange: "BT" | "LR" | "RL" | "TB"; -}; - -export type arrange = "BT" | "LR" | "RL" | "TB"; - /** * HTTPException Model used for error response. */ @@ -773,24 +762,15 @@ export type JobResponse = { export type NodeResponse = { children?: Array | null; id: string | null; - value: NodeValueResponse; -}; - -/** - * Graph Node Value responses. - */ -export type NodeValueResponse = { - isMapped?: boolean | null; + is_mapped?: boolean | null; label?: string | null; - labelStyle?: string | null; - style?: string | null; tooltip?: string | null; - rx: number; - ry: number; - clusterLabelPos?: string | null; - setupTeardownType?: "setup" | "teardown" | null; + setup_teardown_type?: "setup" | "teardown" | null; + type: "join" | "sensor" | "task" | "task_group"; }; +export type type = "join" | "sensor" | "task" | "task_group"; + /** * Request body for Clear Task Instances endpoint. */ @@ -930,6 +910,17 @@ export type SchedulerInfoResponse = { latest_scheduler_heartbeat: string | null; }; +/** + * Structure Data serializer for responses. + */ +export type StructureDataResponse = { + edges: Array; + nodes: Array; + arrange: "BT" | "LR" | "RL" | "TB"; +}; + +export type arrange = "BT" | "LR" | "RL" | "TB"; + /** * Task collection serializer for responses. */ @@ -1425,14 +1416,14 @@ export type HistoricalMetricsData = { export type HistoricalMetricsResponse = HistoricalMetricDataResponse; -export type GraphDataData = { +export type StructureDataData = { dagId: string; includeDownstream?: boolean; includeUpstream?: boolean; root?: string | null; }; -export type GraphDataResponse2 = GraphDataResponse; +export type StructureDataResponse2 = StructureDataResponse; export type ListBackfillsData = { dagId: string; @@ -2456,14 +2447,14 @@ export type $OpenApiTs = { }; }; }; - "/ui/graph/graph_data": { + "/ui/structure/structure_data": { get: { - req: GraphDataData; + req: StructureDataData; res: { /** * Successful Response */ - 200: GraphDataResponse; + 200: StructureDataResponse; /** * Bad Request */ diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py index 3597c7f893cd2..3d3738f5a8d7d 100644 --- a/airflow/utils/task_group.py +++ b/airflow/utils/task_group.py @@ -81,6 +81,60 @@ def task_group_to_dict(task_item_or_group): """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" from airflow.models.abstractoperator import AbstractOperator from airflow.models.mappedoperator import MappedOperator + from airflow.sensors.base import BaseSensorOperator + + if isinstance(task := task_item_or_group, AbstractOperator): + setup_teardown_type = {} + is_mapped = {} + node_type = {"type": "task"} + if task.is_setup is True: + setup_teardown_type["setup_teardown_type"] = "setup" + elif task.is_teardown is True: + setup_teardown_type["setup_teardown_type"] = "teardown" + if isinstance(task, MappedOperator): + is_mapped["is_mapped"] = True + if isinstance(task, BaseSensorOperator): + node_type["type"] = "sensor" + return { + "id": task.task_id, + "label": task.label, + **is_mapped, + **setup_teardown_type, + **node_type, + } + + task_group = task_item_or_group + is_mapped = isinstance(task_group, MappedTaskGroup) + children = [ + task_group_to_dict(child) for child in sorted(task_group.children.values(), key=lambda t: t.label) + ] + + if task_group.upstream_group_ids or task_group.upstream_task_ids: + # This is the join node used to reduce the number of edges between two TaskGroup. + children.append({"id": task_group.upstream_join_id, "label": "", "type": "join"}) + + if task_group.downstream_group_ids or task_group.downstream_task_ids: + # This is the join node used to reduce the number of edges between two TaskGroup. + children.append({"id": task_group.downstream_join_id, "label": "", "type": "join"}) + + return { + "id": task_group.group_id, + "label": task_group.label, + "tooltip": task_group.tooltip, + "is_mapped": is_mapped, + "children": children, + "type": "task_group", + } + + +def task_group_to_dict_legacy(task_item_or_group): + """ + Legacy function to create a nested dict representation of this TaskGroup and its children used to construct the Graph. + + TODO: To remove for airflow 3 once the legacy UI is deleted. + """ + from airflow.models.abstractoperator import AbstractOperator + from airflow.models.mappedoperator import MappedOperator if isinstance(task := task_item_or_group, AbstractOperator): setup_teardown_type = {} @@ -106,7 +160,8 @@ def task_group_to_dict(task_item_or_group): task_group = task_item_or_group is_mapped = isinstance(task_group, MappedTaskGroup) children = [ - task_group_to_dict(child) for child in sorted(task_group.children.values(), key=lambda t: t.label) + task_group_to_dict_legacy(child) + for child in sorted(task_group.children.values(), key=lambda t: t.label) ] if task_group.upstream_group_ids or task_group.upstream_task_ids: diff --git a/airflow/www/views.py b/airflow/www/views.py index 57f7b3c204f31..5fc189800f652 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -132,7 +132,7 @@ from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.strings import to_boolean -from airflow.utils.task_group import TaskGroup, task_group_to_dict +from airflow.utils.task_group import TaskGroup, task_group_to_dict_legacy from airflow.utils.timezone import td_format, utcnow from airflow.utils.types import NOTSET, DagRunTriggeredByType from airflow.version import version @@ -3243,7 +3243,7 @@ def graph_data(self): task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream ) - nodes = task_group_to_dict(dag.task_group) + nodes = task_group_to_dict_legacy(dag.task_group) edges = dag_edges(dag) data = { diff --git a/task_sdk/src/airflow/sdk/definitions/taskgroup.py b/task_sdk/src/airflow/sdk/definitions/taskgroup.py index 7395b34174028..fd02a4c94e714 100644 --- a/task_sdk/src/airflow/sdk/definitions/taskgroup.py +++ b/task_sdk/src/airflow/sdk/definitions/taskgroup.py @@ -146,7 +146,10 @@ def __attrs_post_init__(self): if self.parent_group: self.parent_group.add(self) if self.parent_group.default_args: - self.default_args = {**self.parent_group.default_args, **self.default_args} + self.default_args = { + **self.parent_group.default_args, + **self.default_args, + } if self._group_id: self.used_group_ids.add(self.group_id) @@ -235,7 +238,9 @@ def add(self, task: DAGNode) -> DAGNode: if self.dag: if task.dag is not None and self.dag is not task.dag: raise RuntimeError( - "Cannot mix TaskGroups from different DAGs: %s and %s", self.dag, task.dag + "Cannot mix TaskGroups from different DAGs: %s and %s", + self.dag, + task.dag, ) task.dag = self.dag if task.children: @@ -268,7 +273,10 @@ def label(self) -> str | None: return self._group_id def update_relative( - self, other: DependencyMixin, upstream: bool = True, edge_modifier: EdgeModifier | None = None + self, + other: DependencyMixin, + upstream: bool = True, + edge_modifier: EdgeModifier | None = None, ) -> None: """ Override TaskMixin.update_relative. @@ -463,7 +471,10 @@ def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: from airflow.serialization.enums import DagAttributeTypes from airflow.serialization.serialized_objects import TaskGroupSerialization - return DagAttributeTypes.TASK_GROUP, TaskGroupSerialization.serialize_task_group(self) + return ( + DagAttributeTypes.TASK_GROUP, + TaskGroupSerialization.serialize_task_group(self), + ) def hierarchical_alphabetical_sort(self): """ @@ -475,7 +486,8 @@ def hierarchical_alphabetical_sort(self): :return: list of tasks in hierarchical alphabetical order """ return sorted( - self.children.values(), key=lambda node: (not isinstance(node, TaskGroup), node.node_id) + self.children.values(), + key=lambda node: (not isinstance(node, TaskGroup), node.node_id), ) def topological_sort(self): @@ -626,28 +638,28 @@ def task_group_to_dict(task_item_or_group): """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" from airflow.models.abstractoperator import AbstractOperator from airflow.models.mappedoperator import MappedOperator + from airflow.sensors.base import BaseSensorOperator if isinstance(task := task_item_or_group, AbstractOperator): setup_teardown_type = {} is_mapped = {} + node_type = {"type": "task"} if task.is_setup is True: - setup_teardown_type["setupTeardownType"] = "setup" + setup_teardown_type["setup_teardown_type"] = "setup" elif task.is_teardown is True: - setup_teardown_type["setupTeardownType"] = "teardown" + setup_teardown_type["setup_teardown_type"] = "teardown" if isinstance(task, MappedOperator): - is_mapped["isMapped"] = True + is_mapped["is_mapped"] = True + if isinstance(task, BaseSensorOperator): + node_type["type"] = "sensor" return { "id": task.task_id, - "value": { - "label": task.label, - "labelStyle": f"fill:{task.ui_fgcolor};", - "style": f"fill:{task.ui_color};", - "rx": 5, - "ry": 5, - **is_mapped, - **setup_teardown_type, - }, + "label": task.label, + **is_mapped, + **setup_teardown_type, + **node_type, } + task_group = task_item_or_group is_mapped = isinstance(task_group, MappedTaskGroup) children = [ @@ -655,43 +667,18 @@ def task_group_to_dict(task_item_or_group): ] if task_group.upstream_group_ids or task_group.upstream_task_ids: - children.append( - { - "id": task_group.upstream_join_id, - "value": { - "label": "", - "labelStyle": f"fill:{task_group.ui_fgcolor};", - "style": f"fill:{task_group.ui_color};", - "shape": "circle", - }, - } - ) + # This is the join node used to reduce the number of edges between two TaskGroup. + children.append({"id": task_group.upstream_join_id, "label": "", "type": "join"}) if task_group.downstream_group_ids or task_group.downstream_task_ids: # This is the join node used to reduce the number of edges between two TaskGroup. - children.append( - { - "id": task_group.downstream_join_id, - "value": { - "label": "", - "labelStyle": f"fill:{task_group.ui_fgcolor};", - "style": f"fill:{task_group.ui_color};", - "shape": "circle", - }, - } - ) + children.append({"id": task_group.downstream_join_id, "label": "", "type": "join"}) return { "id": task_group.group_id, - "value": { - "label": task_group.label, - "labelStyle": f"fill:{task_group.ui_fgcolor};", - "style": f"fill:{task_group.ui_color}", - "rx": 5, - "ry": 5, - "clusterLabelPos": "top", - "tooltip": task_group.tooltip, - "isMapped": is_mapped, - }, + "label": task_group.label, + "tooltip": task_group.tooltip, + "is_mapped": is_mapped, "children": children, + "type": "task_group", } diff --git a/tests/api_fastapi/core_api/routes/ui/test_graph.py b/tests/api_fastapi/core_api/routes/ui/test_graph.py deleted file mode 100644 index 85dabd3ac6afa..0000000000000 --- a/tests/api_fastapi/core_api/routes/ui/test_graph.py +++ /dev/null @@ -1,198 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pendulum -import pytest - -from airflow.models import DagBag -from airflow.operators.empty import EmptyOperator - -from tests_common.test_utils.db import clear_db_runs - -pytestmark = pytest.mark.db_test - -DAG_ID = "test_dag_id" - - -@pytest.fixture(autouse=True, scope="module") -def examples_dag_bag(): - # Speed up: We don't want example dags for this module - - return DagBag(include_examples=False, read_dags_from_db=True) - - -@pytest.fixture(autouse=True) -def clean(): - clear_db_runs() - yield - clear_db_runs() - - -@pytest.fixture -def make_dag(dag_maker, session, time_machine): - with dag_maker( - dag_id=DAG_ID, - serialized=True, - session=session, - start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), - ): - EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") - - dag_maker.dagbag.sync_to_db() - - -class TestGraphDataEndpoint: - @pytest.mark.parametrize( - "params, expected", - [ - ( - {"dag_id": DAG_ID}, - { - "arrange": "LR", - "edges": [ - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_1", - "target_id": "task_2", - }, - ], - "nodes": { - "children": [ - { - "children": None, - "id": "task_1", - "value": { - "clusterLabelPos": None, - "isMapped": None, - "label": "task_1", - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:#e8f7e4;", - "tooltip": None, - }, - }, - { - "children": None, - "id": "task_2", - "value": { - "clusterLabelPos": None, - "isMapped": None, - "label": "task_2", - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:#e8f7e4;", - "tooltip": None, - }, - }, - ], - "id": None, - "value": { - "clusterLabelPos": "top", - "isMapped": False, - "label": None, - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:CornflowerBlue", - "tooltip": "", - }, - }, - }, - ), - ( - { - "dag_id": DAG_ID, - "root": "unknown_task", - }, - { - "arrange": "LR", - "edges": [], - "nodes": { - "children": [], - "id": None, - "value": { - "clusterLabelPos": "top", - "isMapped": False, - "label": None, - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:CornflowerBlue", - "tooltip": "", - }, - }, - }, - ), - ( - { - "dag_id": DAG_ID, - "root": "task_1", - "filter_upstream": False, - "filter_downstream": False, - }, - { - "arrange": "LR", - "edges": [], - "nodes": { - "children": [ - { - "children": None, - "id": "task_1", - "value": { - "clusterLabelPos": None, - "isMapped": None, - "label": "task_1", - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:#e8f7e4;", - "tooltip": None, - }, - }, - ], - "id": None, - "value": { - "clusterLabelPos": "top", - "isMapped": False, - "label": None, - "labelStyle": "fill:#000;", - "rx": 5, - "ry": 5, - "setupTeardownType": None, - "style": "fill:CornflowerBlue", - "tooltip": "", - }, - }, - }, - ), - ], - ) - @pytest.mark.usefixtures("make_dag") - def test_historical_metrics_data(self, test_client, params, expected): - response = test_client.get("/ui/graph/graph_data", params=params) - assert response.status_code == 200 - assert response.json() == expected diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py new file mode 100644 index 0000000000000..202f8b207a78b --- /dev/null +++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum +import pytest + +from airflow.models import DagBag +from airflow.operators.empty import EmptyOperator + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + +DAG_ID = "test_dag_id" + + +@pytest.fixture(autouse=True, scope="module") +def examples_dag_bag(): + # Speed up: We don't want example dags for this module + + return DagBag(include_examples=False, read_dags_from_db=True) + + +@pytest.fixture(autouse=True) +def clean(): + clear_db_runs() + yield + clear_db_runs() + + +@pytest.fixture +def make_dag(dag_maker, session, time_machine): + with dag_maker( + dag_id=DAG_ID, + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") + + dag_maker.dagbag.sync_to_db() + + +class TestStructureDataEndpoint: + @pytest.mark.parametrize( + "params, expected", + [ + ( + {"dag_id": DAG_ID}, + { + "arrange": "LR", + "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "task_2", + }, + ], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "setup_teardown_type": None, + "tooltip": None, + "type": "task", + }, + { + "children": None, + "id": "task_2", + "is_mapped": None, + "label": "task_2", + "setup_teardown_type": None, + "tooltip": None, + "type": "task", + }, + ], + }, + ), + ( + { + "dag_id": DAG_ID, + "root": "unknown_task", + }, + {"arrange": "LR", "edges": [], "nodes": []}, + ), + ( + { + "dag_id": DAG_ID, + "root": "task_1", + "filter_upstream": False, + "filter_downstream": False, + }, + { + "arrange": "LR", + "edges": [], + "nodes": [ + { + "children": None, + "id": "task_1", + "is_mapped": None, + "label": "task_1", + "setup_teardown_type": None, + "tooltip": None, + "type": "task", + }, + ], + }, + ), + ], + ) + @pytest.mark.usefixtures("make_dag") + def test_historical_metrics_data(self, test_client, params, expected): + response = test_client.get("/ui/structure/structure_data", params=params) + assert response.status_code == 200 + assert response.json() == expected diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py index 00caac703a40e..7b5c960f9c298 100644 --- a/tests/utils/test_task_group.py +++ b/tests/utils/test_task_group.py @@ -35,7 +35,7 @@ from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator from airflow.utils.dag_edges import dag_edges -from airflow.utils.task_group import TaskGroup, task_group_to_dict +from airflow.utils.task_group import TaskGroup, task_group_to_dict, task_group_to_dict_legacy from tests.models import DEFAULT_DATE from tests_common.test_utils.compat import BashOperator, PythonOperator @@ -54,7 +54,7 @@ def my_task(): return my_task.override(task_id=name)() -EXPECTED_JSON = { +EXPECTED_JSON_LEGACY = { "id": None, "value": { "label": None, @@ -168,6 +168,41 @@ def my_task(): ], } +EXPECTED_JSON = { + "id": None, + "label": None, + "tooltip": "", + "is_mapped": False, + "children": [ + { + "id": "group234", + "label": "group234", + "tooltip": "", + "is_mapped": False, + "children": [ + { + "id": "group234.group34", + "label": "group34", + "tooltip": "", + "is_mapped": False, + "children": [ + {"id": "group234.group34.task3", "label": "task3", "type": "task"}, + {"id": "group234.group34.task4", "label": "task4", "type": "task"}, + {"id": "group234.group34.downstream_join_id", "label": "", "type": "join"}, + ], + "type": "task_group", + }, + {"id": "group234.task2", "label": "task2", "type": "task"}, + {"id": "group234.upstream_join_id", "label": "", "type": "join"}, + ], + "type": "task_group", + }, + {"id": "task1", "label": "task1", "type": "task"}, + {"id": "task5", "label": "task5", "type": "task"}, + ], + "type": "task_group", +} + def test_build_task_group_context_manager(): logical_date = pendulum.parse("20200101") @@ -199,6 +234,7 @@ def test_build_task_group_context_manager(): assert set(dag.task_group.children.keys()) == {"task1", "group234", "task5"} assert group34.group_id == "group234.group34" + assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY assert task_group_to_dict(dag.task_group) == EXPECTED_JSON @@ -220,17 +256,21 @@ def test_build_task_group(): task1 >> group234 group34 >> task5 + assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY assert task_group_to_dict(dag.task_group) == EXPECTED_JSON -def extract_node_id(node, include_label=False): +def extract_node_id(node, include_label=False, from_legacy=False): ret = {"id": node["id"]} if include_label: - ret["label"] = node["value"]["label"] + if from_legacy: + ret["label"] = node["value"]["label"] + else: + ret["label"] = node["label"] if "children" in node: children = [] for child in node["children"]: - children.append(extract_node_id(child, include_label=include_label)) + children.append(extract_node_id(child, include_label=include_label, from_legacy=from_legacy)) ret["children"] = children @@ -267,7 +307,7 @@ def test_build_task_group_with_prefix(): assert group234.get_child_by_label("group34") == group34 assert group4.get_child_by_label("task4") == task4 - assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == { + expected_node_id = { "id": None, "label": None, "children": [ @@ -297,6 +337,12 @@ def test_build_task_group_with_prefix(): ], } + assert ( + extract_node_id(task_group_to_dict_legacy(dag.task_group), include_label=True, from_legacy=True) + == expected_node_id + ) + assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == expected_node_id + def test_build_task_group_with_task_decorator(): """ @@ -341,7 +387,7 @@ def task_5(): assert tsk_1.operator in tsk_3.operator.upstream_list assert tsk_5.operator in tsk_4.operator.downstream_list - assert extract_node_id(task_group_to_dict(dag.task_group)) == { + expected_node_id = { "id": None, "children": [ { @@ -359,6 +405,9 @@ def task_5(): ], } + assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == expected_node_id + assert extract_node_id(task_group_to_dict(dag.task_group)) == expected_node_id + edges = dag_edges(dag) assert sorted((e["source_id"], e["target_id"]) for e in edges) == [ ("group234.downstream_join_id", "task_5"), @@ -398,7 +447,7 @@ def test_sub_dag_task_group(): subdag = dag.partial_subset(task_ids_or_regex="task5", include_upstream=True, include_downstream=False) - assert extract_node_id(task_group_to_dict(subdag.task_group)) == { + expected_node_id = { "id": None, "children": [ { @@ -420,6 +469,9 @@ def test_sub_dag_task_group(): ], } + assert extract_node_id(task_group_to_dict_legacy(subdag.task_group), from_legacy=True) == expected_node_id + assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id + edges = dag_edges(subdag) assert sorted((e["source_id"], e["target_id"]) for e in edges) == [ ("group234.group34.downstream_join_id", "task5"), @@ -485,10 +537,11 @@ def test_dag_edges(): group_d << group_c - nodes = task_group_to_dict(dag.task_group) + nodes_legacy = task_group_to_dict_legacy(dag.task_group) + nodes = task_group_to_dict_legacy(dag.task_group) edges = dag_edges(dag) - assert extract_node_id(nodes) == { + expected_node_id = { "id": None, "children": [ { @@ -532,6 +585,9 @@ def test_dag_edges(): ], } + assert extract_node_id(nodes_legacy) == expected_node_id + assert extract_node_id(nodes, from_legacy=False) == expected_node_id + assert sorted((e["source_id"], e["target_id"]) for e in edges) == [ ("group_a.downstream_join_id", "group_c.upstream_join_id"), ("group_a.group_b.downstream_join_id", "group_a.task5"), @@ -787,6 +843,7 @@ def section_2(value2): ], } + assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids @@ -968,6 +1025,7 @@ def section_2(value): ], } + assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids @@ -1062,6 +1120,7 @@ def task_group3(): ], } + assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids @@ -1124,6 +1183,7 @@ def task_group1(name: str): ], } + assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids @@ -1593,13 +1653,25 @@ def work(): ... assert set(t1.operator.downstream_task_ids) == set() assert set(t2.operator.downstream_task_ids) == set() - def get_nodes(group): - d = task_group_to_dict(group) + def get_nodes(group, from_legacy=False): + if from_legacy: + d = task_group_to_dict_legacy(group) + else: + d = task_group_to_dict(group) new_d = {} new_d["id"] = d["id"] new_d["children"] = [{"id": x["id"]} for x in d["children"]] return new_d + assert get_nodes(g1, from_legacy=True) == { + "id": "group_1", + "children": [ + {"id": "group_1.setup_1"}, + {"id": "group_1.setup_2"}, + {"id": "group_1.downstream_join_id"}, + ], + } + assert get_nodes(g1) == { "id": "group_1", "children": [