Skip to content

Commit

Permalink
AIP-84 Graph Data update datamodel (apache#44459)
Browse files Browse the repository at this point in the history
* AIP-84 Update Structure Datamodel

* Update task sdk task_group_to_dict
  • Loading branch information
pierrejeambrun authored Dec 2, 2024
1 parent 3c1124e commit c0f2826
Show file tree
Hide file tree
Showing 17 changed files with 485 additions and 515 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,21 @@ class EdgeResponse(BaseModel):
target_id: str


class NodeValueResponse(BaseModel):
"""Graph Node Value responses."""

isMapped: bool | None = None
label: str | None = None
labelStyle: str | None = None
style: str | None = None
tooltip: str | None = None
rx: int
ry: int
clusterLabelPos: str | None = None
setupTeardownType: Literal["setup", "teardown"] | None = None


class NodeResponse(BaseModel):
"""Node serializer for responses."""

children: list[NodeResponse] | None = None
id: str | None
value: NodeValueResponse
is_mapped: bool | None = None
label: str | None = None
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "sensor", "task", "task_group"]


class GraphDataResponse(BaseModel):
"""Graph Data serializer for responses."""
class StructureDataResponse(BaseModel):
"""Structure Data serializer for responses."""

edges: list[EdgeResponse]
nodes: NodeResponse
nodes: list[NodeResponse]
arrange: Literal["BT", "LR", "RL", "TB"]
118 changes: 49 additions & 69 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/ui/graph/graph_data:
/ui/structure/structure_data:
get:
tags:
- Graph
summary: Graph Data
description: Get Graph Data.
operationId: graph_data
- Structure
summary: Structure Data
description: Get Structure Data.
operationId: structure_data
parameters:
- name: dag_id
in: query
Expand Down Expand Up @@ -236,7 +236,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/GraphDataResponse'
$ref: '#/components/schemas/StructureDataResponse'
'400':
content:
application/json:
Expand Down Expand Up @@ -7594,30 +7594,6 @@ components:
- name
title: FastAPIAppResponse
description: Serializer for Plugin FastAPI App responses.
GraphDataResponse:
properties:
edges:
items:
$ref: '#/components/schemas/EdgeResponse'
type: array
title: Edges
nodes:
$ref: '#/components/schemas/NodeResponse'
arrange:
type: string
enum:
- BT
- LR
- RL
- TB
title: Arrange
type: object
required:
- edges
- nodes
- arrange
title: GraphDataResponse
description: Graph Data serializer for responses.
HTTPExceptionResponse:
properties:
detail:
Expand Down Expand Up @@ -7810,66 +7786,43 @@ components:
- type: string
- type: 'null'
title: Id
value:
$ref: '#/components/schemas/NodeValueResponse'
type: object
required:
- id
- value
title: NodeResponse
description: Node serializer for responses.
NodeValueResponse:
properties:
isMapped:
is_mapped:
anyOf:
- type: boolean
- type: 'null'
title: Ismapped
title: Is Mapped
label:
anyOf:
- type: string
- type: 'null'
title: Label
labelStyle:
anyOf:
- type: string
- type: 'null'
title: Labelstyle
style:
anyOf:
- type: string
- type: 'null'
title: Style
tooltip:
anyOf:
- type: string
- type: 'null'
title: Tooltip
rx:
type: integer
title: Rx
ry:
type: integer
title: Ry
clusterLabelPos:
anyOf:
- type: string
- type: 'null'
title: Clusterlabelpos
setupTeardownType:
setup_teardown_type:
anyOf:
- type: string
enum:
- setup
- teardown
- type: 'null'
title: Setupteardowntype
title: Setup Teardown Type
type:
type: string
enum:
- join
- sensor
- task
- task_group
title: Type
type: object
required:
- rx
- ry
title: NodeValueResponse
description: Graph Node Value responses.
- id
- type
title: NodeResponse
description: Node serializer for responses.
PatchTaskInstanceBody:
properties:
dry_run:
Expand Down Expand Up @@ -8219,6 +8172,33 @@ components:
- latest_scheduler_heartbeat
title: SchedulerInfoResponse
description: Scheduler info serializer for responses.
StructureDataResponse:
properties:
edges:
items:
$ref: '#/components/schemas/EdgeResponse'
type: array
title: Edges
nodes:
items:
$ref: '#/components/schemas/NodeResponse'
type: array
title: Nodes
arrange:
type: string
enum:
- BT
- LR
- RL
- TB
title: Arrange
type: object
required:
- edges
- nodes
- arrange
title: StructureDataResponse
description: Structure Data serializer for responses.
TaskCollectionResponse:
properties:
tasks:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
from airflow.api_fastapi.core_api.routes.ui.config import config_router
from airflow.api_fastapi.core_api.routes.ui.dags import dags_router
from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router
from airflow.api_fastapi.core_api.routes.ui.graph import graph_data_router
from airflow.api_fastapi.core_api.routes.ui.structure import structure_router

ui_router = AirflowRouter(prefix="/ui")

ui_router.include_router(assets_router)
ui_router.include_router(config_router)
ui_router.include_router(dags_router)
ui_router.include_router(dashboard_router)
ui_router.include_router(graph_data_router)
ui_router.include_router(structure_router)
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,37 @@

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.graph import GraphDataResponse
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict

graph_data_router = AirflowRouter(tags=["Graph"], prefix="/graph")
structure_router = AirflowRouter(tags=["Structure"], prefix="/structure")


@graph_data_router.get(
"/graph_data",
@structure_router.get(
"/structure_data",
include_in_schema=False,
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]),
)
def graph_data(
def structure_data(
session: SessionDep,
dag_id: str,
request: Request,
root: str | None = None,
include_upstream: bool = False,
include_downstream: bool = False,
) -> GraphDataResponse:
"""Get Graph Data."""
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
if root:
dag = dag.partial_subset(
task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream
)

nodes = task_group_to_dict(dag.task_group)
nodes = [
task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label)
]
edges = dag_edges(dag)

data = {
Expand All @@ -57,4 +59,4 @@ def graph_data(
"edges": edges,
}

return GraphDataResponse(**data)
return StructureDataResponse(**data)
17 changes: 9 additions & 8 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
StructureService,
TaskInstanceService,
TaskService,
VariableService,
Expand Down Expand Up @@ -327,15 +327,16 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = (
useDashboardServiceHistoricalMetricsKey,
...(queryKey ?? [{ endDate, startDate }]),
];
export type GraphServiceGraphDataDefaultResponse = Awaited<
ReturnType<typeof GraphService.graphData>
export type StructureServiceStructureDataDefaultResponse = Awaited<
ReturnType<typeof StructureService.structureData>
>;
export type GraphServiceGraphDataQueryResult<
TData = GraphServiceGraphDataDefaultResponse,
export type StructureServiceStructureDataQueryResult<
TData = StructureServiceStructureDataDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useGraphServiceGraphDataKey = "GraphServiceGraphData";
export const UseGraphServiceGraphDataKeyFn = (
export const useStructureServiceStructureDataKey =
"StructureServiceStructureData";
export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
includeDownstream,
Expand All @@ -349,7 +350,7 @@ export const UseGraphServiceGraphDataKeyFn = (
},
queryKey?: Array<unknown>,
) => [
useGraphServiceGraphDataKey,
useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
Expand Down
14 changes: 7 additions & 7 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
StructureService,
TaskInstanceService,
TaskService,
VariableService,
Expand Down Expand Up @@ -407,17 +407,17 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }),
});
/**
* Graph Data
* Get Graph Data.
* Structure Data
* Get Structure Data.
* @param data The data for the request.
* @param data.dagId
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
* @returns GraphDataResponse Successful Response
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
export const prefetchUseGraphServiceGraphData = (
export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
Expand All @@ -432,14 +432,14 @@ export const prefetchUseGraphServiceGraphData = (
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseGraphServiceGraphDataKeyFn({
queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
includeDownstream,
includeUpstream,
root,
}),
queryFn: () =>
GraphService.graphData({
StructureService.structureData({
dagId,
includeDownstream,
includeUpstream,
Expand Down
Loading

0 comments on commit c0f2826

Please sign in to comment.