diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 4ab90292a48dd..8acd5f4f17cac 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -51,6 +51,9 @@ import useDagRuns from "./useDagRuns"; import useHistoricalMetricsData from "./useHistoricalMetricsData"; import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; import useEventLogs from "./useEventLogs"; +import useTaskAnomaliesData from "./useTaskAnomaliesData"; +import useDagAnomaliesData from "./useDagAnomaliesData"; +import useResourceAnomaliesData from "./useResourceAnomaliesData"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -98,4 +101,7 @@ export { useTaskXcomEntry, useTaskXcomCollection, useEventLogs, + useTaskAnomaliesData, + useDagAnomaliesData, + useResourceAnomaliesData, }; diff --git a/airflow/www/static/js/api/useDagAnomaliesData.ts b/airflow/www/static/js/api/useDagAnomaliesData.ts new file mode 100644 index 0000000000000..6a38f0bb55a48 --- /dev/null +++ b/airflow/www/static/js/api/useDagAnomaliesData.ts @@ -0,0 +1,40 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+import axios, { AxiosResponse } from "axios";
+import { useQuery } from "react-query";
+import type { DagAnomaliesData } from "src/types";
+
+import { getMetaValue } from "src/utils";
+
+const url = getMetaValue("dag_anomalies_data_url");
+
+const useDagAnomaliesData = (startDate: string, endDate: string) =>
+  useQuery(
+    ["dag_anomalies_data", startDate, endDate],
+    async () =>
+      axios.get<AxiosResponse, DagAnomaliesData>(url, {
+        params: { start_date: startDate, end_date: endDate },
+      }),
+    {
+      refetchInterval: 60 * 1000,
+    }
+  );
+
+export default useDagAnomaliesData;
diff --git a/airflow/www/static/js/api/useResourceAnomaliesData.ts b/airflow/www/static/js/api/useResourceAnomaliesData.ts
new file mode 100644
index 0000000000000..16a3f1940d089
--- /dev/null
+++ b/airflow/www/static/js/api/useResourceAnomaliesData.ts
@@ -0,0 +1,40 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import axios, { AxiosResponse } from "axios";
+import { useQuery } from "react-query";
+import type { ResourceAnomaliesData } from "src/types";
+
+import { getMetaValue } from "src/utils";
+
+const url = getMetaValue("resource_anomalies_data_url");
+
+const useResourceAnomaliesData = (startDate: string, endDate: string) =>
+  useQuery(
+    ["resource_anomalies_data", startDate, endDate],
+    async () =>
+      axios.get<AxiosResponse, ResourceAnomaliesData>(url, {
+        params: { start_date: startDate, end_date: endDate },
+      }),
+    {
+      refetchInterval: 60 * 1000,
+    }
+  );
+
+export default useResourceAnomaliesData;
diff --git a/airflow/www/static/js/api/useTaskAnomaliesData.ts b/airflow/www/static/js/api/useTaskAnomaliesData.ts
new file mode 100644
index 0000000000000..0f71bcff6e389
--- /dev/null
+++ b/airflow/www/static/js/api/useTaskAnomaliesData.ts
@@ -0,0 +1,40 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import axios, { AxiosResponse } from "axios";
+import { useQuery } from "react-query";
+import type { TaskAnomaliesData } from "src/types";
+
+import { getMetaValue } from "src/utils";
+
+const url = getMetaValue("task_anomalies_data_url");
+
+const useTaskAnomaliesData = (startDate: string, endDate: string) =>
+  useQuery(
+    ["task_anomalies_data", startDate, endDate],
+    async () =>
+      axios.get<AxiosResponse, TaskAnomaliesData>(url, {
+        params: { start_date: startDate, end_date: endDate },
+      }),
+    {
+      refetchInterval: 60 * 1000,
+    }
+  );
+
+export default useTaskAnomaliesData;
diff --git a/airflow/www/static/js/cluster-activity/dag-anomalies/PieChart.tsx b/airflow/www/static/js/cluster-activity/dag-anomalies/PieChart.tsx
new file mode 100644
index 0000000000000..ca3f61bb54d48
--- /dev/null
+++ b/airflow/www/static/js/cluster-activity/dag-anomalies/PieChart.tsx
@@ -0,0 +1,147 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +import React from "react"; +import { + Box, + BoxProps, + Card, + CardBody, + CardHeader, + Heading, + useTheme, +} from "@chakra-ui/react"; +import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts"; +import type { DagAnomaliesData } from "src/types"; +import { camelCase, mapKeys } from "lodash"; + +interface SeriesPoint { + name: string; + value: number; +} + +type SeriesData = Array; + +const camelCaseColorPalette = mapKeys(stateColors, (_, k) => camelCase(k)); + +const formatData = ( + data: DagAnomaliesData[keyof DagAnomaliesData] | undefined +): [number, SeriesData] => { + if (data === undefined) return [0, []]; + + let sum = 0; + const formattedData: { name: string; value: number }[] = []; + Object.entries(data).forEach(([k, v]) => { + sum += v; + formattedData.push({ + name: k, + value: v, + }); + }); + formattedData.sort((a: SeriesPoint, b: SeriesPoint) => b.value - a.value); + return [sum, formattedData]; +}; + +interface Props extends BoxProps { + title: string; + data?: DagAnomaliesData[keyof DagAnomaliesData]; + colorPalette?: { + [key: string]: string; + }; +} + +const PieChart = ({ + title, + data, + colorPalette = camelCaseColorPalette, + ...rest +}: Props) => { + const theme = useTheme(); + const [sum, formattedData] = formatData(data); + const option: ReactEChartsProps["option"] = { + title: { + text: `on a total of ${sum}`, + left: "right", + top: "bottom", + textStyle: { + fontSize: "14px", + color: theme.colors.gray["500"], + }, + }, + tooltip: { + trigger: "item", + }, + legend: { + left: "center", + type: "scroll", + }, + color: formattedData?.map((d) => { + let color = colorPalette[d.name]; + if (color === undefined) { + // eslint-disable-next-line no-console + console.warn( + `The color for ${d.name} is missing from the palette, defaulting to black` + ); + color = "black"; + } + return color; + }), + series: [ + { + name: title, + type: "pie", + radius: ["35%", "60%"], + avoidLabelOverlap: false, + top: "0%", + 
itemStyle: { + borderRadius: 5, + borderColor: "#fff", + borderWidth: 2, + }, + label: { + show: false, + position: "center", + }, + emphasis: { + label: { + show: true, + fontSize: 16, + fontWeight: "bold", + }, + }, + data: formattedData, + }, + ], + }; + + return ( + + + + {title} + + + + + + + ); +}; + +export default PieChart; diff --git a/airflow/www/static/js/cluster-activity/dag-anomalies/index.tsx b/airflow/www/static/js/cluster-activity/dag-anomalies/index.tsx new file mode 100644 index 0000000000000..66c15ab9cdb9d --- /dev/null +++ b/airflow/www/static/js/cluster-activity/dag-anomalies/index.tsx @@ -0,0 +1,97 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import React from "react"; +import { + Card, + CardBody, + CardHeader, + Heading, + Text, + Flex, + Table, + Tbody, + Tr, + Td, + Code, + Box, + Thead, + Th, +} from "@chakra-ui/react"; +import InfoTooltip from "src/components/InfoTooltip"; +import FilterBar from "src/cluster-activity/nav/FilterBar"; +import useFilters from "src/cluster-activity/useFilters"; +import { useDagAnomaliesData } from "src/api"; +import PieChart from "src/cluster-activity/historical-metrics/PieChart"; +import LoadingWrapper from "src/components/LoadingWrapper"; +import { SimpleStatus } from "src/dag/StatusBox"; +import { ClipboardText } from "src/components/Clipboard"; +import { formatDuration, getDuration } from "src/datetime_utils"; +import Time from "src/components/Time"; + +const DagAnomalies = () => { + const { + filters: { startDate, endDate }, + } = useFilters(); + const { data, isError } = useDagAnomaliesData(startDate, endDate); + return ( + + + + + Dag Anomalies + {/* */} + + + + {/* */} + + + + + + + + + + + + {(data?.dagAnomalies || []).map((d) => ( + + + + + ))} + +
Dag IdDuration
{d.dagId} + {d.reason} +
+
+
+
+
+
+
+ ); +}; + +export default DagAnomalies; diff --git a/airflow/www/static/js/cluster-activity/index.tsx b/airflow/www/static/js/cluster-activity/index.tsx index 01b21b0dd8403..b6db86e8a3431 100644 --- a/airflow/www/static/js/cluster-activity/index.tsx +++ b/airflow/www/static/js/cluster-activity/index.tsx @@ -25,6 +25,9 @@ import createCache from "@emotion/cache"; import { Flex, Heading } from "@chakra-ui/react"; import App from "src/App"; +import TaskAnomalies from "src/cluster-activity/task-anomalies"; +import DagAnomalies from "src/cluster-activity/dag-anomalies"; +import ResourceAnomalies from "src/cluster-activity/resource-anomalies"; import LiveMetrics from "./live-metrics"; import HistoricalMetrics from "./historical-metrics"; @@ -43,6 +46,12 @@ const ClusterActivity = () => ( flexDirection="column" justifyContent="space-between" > + + Anomaly Detection + + + + Cluster Activity diff --git a/airflow/www/static/js/cluster-activity/resource-anomalies/index.tsx b/airflow/www/static/js/cluster-activity/resource-anomalies/index.tsx new file mode 100644 index 0000000000000..811ea090885b2 --- /dev/null +++ b/airflow/www/static/js/cluster-activity/resource-anomalies/index.tsx @@ -0,0 +1,97 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React, { useState, useEffect } from "react"; +import { + Card, + CardBody, + Flex, + Table, + Tbody, + Tr, + Td, + Box, + Thead, + Th, + CardHeader, + Heading, +} from "@chakra-ui/react"; +import useFilters from "src/cluster-activity/useFilters"; +import { useResourceAnomaliesData } from "src/api"; +import LoadingWrapper from "src/components/LoadingWrapper"; +import InfoTooltip from "src/components/InfoTooltip"; + +const ResourceAnomalies = () => { + const { + filters: { startDate, endDate }, + } = useFilters(); + const { data: fetchedData, isError } = useResourceAnomaliesData( + startDate, + endDate + ); + + const [data, setData] = useState({}); // Assuming fetchedData should be an object + + useEffect(() => { + if ( + fetchedData && + typeof fetchedData === "object" && + !Array.isArray(fetchedData) + ) { + setData(fetchedData); + } + }, [fetchedData]); + const dataEntries = data ? Object.entries(data) : []; + + return ( + + + + + Resource Anomalies + + + + + + + + + + {dataEntries.map(([key, value]) => ( + + + + + ))} + +
{key}{value.toString()}
+
+
+
+
+
+
+ ); +}; + +export default ResourceAnomalies; diff --git a/airflow/www/static/js/cluster-activity/task-anomalies/PieChart.tsx b/airflow/www/static/js/cluster-activity/task-anomalies/PieChart.tsx new file mode 100644 index 0000000000000..cb591bcedaf20 --- /dev/null +++ b/airflow/www/static/js/cluster-activity/task-anomalies/PieChart.tsx @@ -0,0 +1,147 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import React from "react"; +import { + Box, + BoxProps, + Card, + CardBody, + CardHeader, + Heading, + useTheme, +} from "@chakra-ui/react"; +import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts"; +import type { TaskAnomaliesData } from "src/types"; +import { camelCase, mapKeys } from "lodash"; + +interface SeriesPoint { + name: string; + value: number; +} + +type SeriesData = Array; + +const camelCaseColorPalette = mapKeys(stateColors, (_, k) => camelCase(k)); + +const formatData = ( + data: TaskAnomaliesData[keyof TaskAnomaliesData] | undefined +): [number, SeriesData] => { + if (data === undefined) return [0, []]; + + let sum = 0; + const formattedData: { name: string; value: number }[] = []; + Object.entries(data).forEach(([k, v]) => { + sum += v; + formattedData.push({ + name: k, + value: v, + }); + }); + formattedData.sort((a: SeriesPoint, b: SeriesPoint) => b.value - a.value); + return [sum, formattedData]; +}; + +interface Props extends BoxProps { + title: string; + data?: TaskAnomaliesData[keyof TaskAnomaliesData]; + colorPalette?: { + [key: string]: string; + }; +} + +const PieChart = ({ + title, + data, + colorPalette = camelCaseColorPalette, + ...rest +}: Props) => { + const theme = useTheme(); + const [sum, formattedData] = formatData(data); + const option: ReactEChartsProps["option"] = { + title: { + text: `on a total of ${sum}`, + left: "right", + top: "bottom", + textStyle: { + fontSize: "14px", + color: theme.colors.gray["500"], + }, + }, + tooltip: { + trigger: "item", + }, + legend: { + left: "center", + type: "scroll", + }, + color: formattedData?.map((d) => { + let color = colorPalette[d.name]; + if (color === undefined) { + // eslint-disable-next-line no-console + console.warn( + `The color for ${d.name} is missing from the palette, defaulting to black` + ); + color = "black"; + } + return color; + }), + series: [ + { + name: title, + type: "pie", + radius: ["35%", "60%"], + avoidLabelOverlap: false, + top: 
"0%", + itemStyle: { + borderRadius: 5, + borderColor: "#fff", + borderWidth: 2, + }, + label: { + show: false, + position: "center", + }, + emphasis: { + label: { + show: true, + fontSize: 16, + fontWeight: "bold", + }, + }, + data: formattedData, + }, + ], + }; + + return ( + + + + {title} + + + + + + + ); +}; + +export default PieChart; diff --git a/airflow/www/static/js/cluster-activity/task-anomalies/index.tsx b/airflow/www/static/js/cluster-activity/task-anomalies/index.tsx new file mode 100644 index 0000000000000..bd41fe8036108 --- /dev/null +++ b/airflow/www/static/js/cluster-activity/task-anomalies/index.tsx @@ -0,0 +1,152 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import React, { useState, useEffect } from "react"; +import { + Card, + CardBody, + CardHeader, + Flex, + Heading, + Table, + Thead, + Tr, + Th, + Tbody, + Td, + Box, + Tooltip, +} from "@chakra-ui/react"; +import InfoTooltip from "src/components/InfoTooltip"; +import FilterBar from "src/cluster-activity/nav/FilterBar"; +import useFilters from "src/cluster-activity/useFilters"; +import { useTaskAnomaliesData } from "src/api"; +import LoadingWrapper from "src/components/LoadingWrapper"; + +const mockTaskAnomalies = [ + { + taskId: "task_0", + period: "Last Month", + duration: 960, + deviation: "1.8 SDs", + median: 850, + }, + { + taskId: "task_1", + period: "Last Month", + duration: 1200, + deviation: "2.0 SDs", + median: 1150, + }, + { + taskId: "task_2", + period: "Last Week", + duration: 300, + deviation: "1.5 SDs", + median: 250, + }, +]; + +const TaskAnomalies = () => { + const { + filters: { startDate, endDate }, + } = useFilters(); + const { data: fetchedData, isError } = useTaskAnomaliesData( + startDate, + endDate + ); + + const [data, setData] = useState([]); + + useEffect(() => { + setData(mockTaskAnomalies); + }, [fetchedData]); + + return ( + + + + + Task Anomalies + + + + + + + + + + + + + + + + + + + + {data && + data.map((d) => ( + + + + + + + + ))} + +
Task IDPeriodDuration (s)Median (s)Deviation
+ + {d.taskId} + + + + {d.period} + + + + {d.duration} + + + + {d.median} + + + + {d.deviation} + +
+
+
+
+
+
+
+ ); +}; + +export default TaskAnomalies; diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts index 926db4760d985..3867a4067e396 100644 --- a/airflow/www/static/js/types/index.ts +++ b/airflow/www/static/js/types/index.ts @@ -214,6 +214,39 @@ interface HistoricalMetricsData { }; } +interface TaskAnomaliesData { + dagRunStates: { + [K in CamelCase]: number; + }; + dagRunTypes: { + [K in CamelCase]: number; + }; + taskInstanceStates: { + [K in TaskState extends string ? CamelCase : never]: number; + }; +} + +interface ResourceAnomaliesData { + resourceAnomalies: [ + worker: string, + scheduler: string, + trigger: string, + webserver: string + ]; +} + +interface TakingLongTimeData { + dagId: string; + latestMonthMedian: number; + allTimeMedian: number; + allTimeSd: number; + medianMult: number; + reason: string; +} +interface DagAnomaliesData { + dagAnomalies: [TakingLongTimeData]; +} + export type { API, Dag, @@ -230,4 +263,7 @@ export type { TaskState, KeyboardShortcutKeys, KeyboardShortcutIdentifier, + TaskAnomaliesData, + DagAnomaliesData, + ResourceAnomaliesData, }; diff --git a/airflow/www/templates/airflow/cluster_activity.html b/airflow/www/templates/airflow/cluster_activity.html index 0f99b186955a9..b60b618b2905e 100644 --- a/airflow/www/templates/airflow/cluster_activity.html +++ b/airflow/www/templates/airflow/cluster_activity.html @@ -28,6 +28,9 @@ + + + {% endblock %} {% block content %} diff --git a/airflow/www/templates/airflow/code_analysis.html b/airflow/www/templates/airflow/code_analysis.html new file mode 100644 index 0000000000000..0fc7ad7cf301c --- /dev/null +++ b/airflow/www/templates/airflow/code_analysis.html @@ -0,0 +1,62 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +#} + +{% extends "airflow/dag.html" %} +{% block page_title %}{{ dag.dag_id }} - Grid - {{ appbuilder.app_name }}{% endblock %} +{% from 'appbuilder/loading_dots.html' import loading_dots %} + +{% block head_meta %} + {{ super() }} + + + + + +{% endblock %} + +{% block content %} + {{ super() }} +
+
+ +
+

{{ ask_airflow_response.response }}

+
+ + + +
+

Relevant Sources:

+
    + {% for source in ask_airflow_response.sources %} +
  • + {{ source.name }} +
    +
  • + {% endfor %} +
+ +
+ +
+{% endblock %} + +{% block tail_js %} + {{ super()}} +{% endblock %} diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 939809ce81ea8..5e1eae831b5d6 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -216,6 +216,9 @@

Code +
  • + + SmartSuggest Code Analysis
  • Audit Log
  • diff --git a/airflow/www/templates/airflow/task_instance.html b/airflow/www/templates/airflow/task_instance.html index 44a764b63ada3..6fc927ae91cf0 100644 --- a/airflow/www/templates/airflow/task_instance.html +++ b/airflow/www/templates/airflow/task_instance.html @@ -56,6 +56,8 @@

  • XCom
  • +
  • + SmartSuggest Log Analysis

  • {% endblock %} diff --git a/airflow/www/templates/airflow/ti_log_help.html b/airflow/www/templates/airflow/ti_log_help.html new file mode 100644 index 0000000000000..6785654ee9146 --- /dev/null +++ b/airflow/www/templates/airflow/ti_log_help.html @@ -0,0 +1,65 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +#} + +{% extends "airflow/task_instance.html" %} + +{% block title %}Logs - {{ appbuilder.app_name }}{% endblock %} + +{% block head_meta %} + {{ super() }} + + +{% endblock %} + +{% block content %} + {{ super() }} + {% if ask_airflow_response %} +
    +
    {{ ask_airflow_response.response }}
    + {% if ask_airflow_response.sources %} +

    Relevant Sources:

    +
      + {% for source in ask_airflow_response.sources %} +
    • + {{ source.name }} +
      +
    • + + {% endfor %} +
    + + {% endif %} +
    + {% else %} +
    Loading analysis from Airflow Doctor...
    + {% endif %} +{% endblock %} diff --git a/airflow/www/views.py b/airflow/www/views.py index a08871ce5bdf8..bb93231577ff2 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -43,6 +43,7 @@ import lazy_object_proxy import nvd3 import re2 +import requests import sqlalchemy as sqla from croniter import croniter from flask import ( @@ -281,6 +282,50 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): } +super_ugly_query = """ +set TIMEZONE='utc'; +SELECT + latest_month.dag_id, +-- latest_month.month, + latest_month.median, + all_time.median, + all_time.sd, + (latest_month.median - all_time.median) / all_time.sd as median_mult +FROM ( + SELECT + a.dag_id, + a.month, + a.median_runtime_seconds AS median + FROM ( + SELECT + dag_run.dag_id, + cast(date_trunc('month', dag_run.start_date) AS DATE) AS "month", + percentile_cont(0.5) WITHIN GROUP ( ORDER BY extract('epoch' FROM dag_run.end_date - dag_run.start_date) ) AS median_runtime_seconds + FROM dag_run + GROUP BY + 1, 2 + ) a + WHERE + a.month + INTERVAL '1 month' = date_trunc('month', current_date)::DATE + ) AS latest_month +JOIN ( + SELECT + a.dag_id, + percentile_cont(.5) WITHIN GROUP (ORDER BY a.duration) AS median, + stddev(a.duration) AS sd + FROM ( + SELECT + dag_run.dag_id, + extract('epoch' FROM dag_run.end_date - dag_run.start_date) AS duration + FROM dag_run + ) a + GROUP BY 1 + ) all_time + ON all_time.dag_id = latest_month.dag_id + AND latest_month.median > (all_time.median + all_time.sd) +""" + + def _safe_parse_datetime(v, *, allow_empty=False, strict=True) -> datetime.datetime | None: """ Parse datetime and return error message for invalid dates. 
@@ -1654,6 +1699,118 @@ def log(self, session: Session = NEW_SESSION): wrapped=conf.getboolean("webserver", "default_wrap"), ) + @expose("/log_help") + @auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS) + @provide_session + def log_help(self, session: Session = NEW_SESSION): + """Retrieve suggestion for based on log.""" + dag_id = request.args["dag_id"] + task_id = request.args.get("task_id") + map_index = request.args.get("map_index", -1, type=int) + execution_date = request.args.get("execution_date") + try_number = request.args.get("try_number", type=int) + metadata_str = request.args.get("metadata", "{}") + + # Validate JSON metadata + try: + metadata: dict = json.loads(metadata_str) or {} + except json.decoder.JSONDecodeError: + return {"error": "Invalid JSON metadata"}, 400 + + if execution_date: + dttm = _safe_parse_datetime(execution_date) + else: + dttm = None + + dag_model = DagModel.get_dagmodel(dag_id) + + ti = session.scalar( + select(models.TaskInstance) + .filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index) + .limit(1) + ) + ask_airflow_response = {} + log_tuples: list[tuple[tuple[str, str]]] = [] + if ti is not None: + try: + task_log_reader = TaskLogReader() + if task_log_reader.supports_read: + log_tuples, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) + + logs = [] + for log_chunk in log_tuples: + for timestamp, message in log_chunk: + logs.append(f"{timestamp} {message}") + + combined_logs = "\n".join(logs) + if combined_logs: + ask_airflow_response = self._analyze_log_with_airflow_doctor(combined_logs) + + except Exception as e: + logs = [f"Error fetching logs: {e}"] + + root = request.args.get("root", "") + + return self.render_template( + "airflow/ti_log_help.html", + show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"), + logs=log_tuples, + ask_airflow_response=ask_airflow_response, + dag=dag_model, + title="Log by attempts", + dag_id=dag_id, + 
task_id=task_id, + execution_date=execution_date, + map_index=map_index, + root=root, + wrapped=conf.getboolean("webserver", "default_wrap"), + ) + + def _send_initial_ask_astro_request(self, prompt): + """Send the initial POST request to the API.""" + url = "https://ask-astro-dev-sxjq32dlaa-uk.a.run.app/requests" + headers = { + "accept": "application/json", + "Content-Type": "application/json", + } + data = {"prompt": prompt} + + response = requests.post(url, headers=headers, json=data) + response.raise_for_status() + return response.json() + + def _get_ask_astro_request_status(self, request_uuid): + """Poll the API for the status of a request.""" + url = f"https://ask-astro-dev-sxjq32dlaa-uk.a.run.app/requests/{request_uuid}" + headers = {"accept": "application/json"} + + while True: + response = requests.get(url, headers=headers) + # This will raise an exception for HTTP errors + response.raise_for_status() + response_data = response.json() + + status = response_data.get("status") + if status == "complete": + return response_data + elif status != "in_progress": + # If status is neither "complete" nor "in_progress", something unexpected happened + raise Exception(f"Unexpected status: {status}") + + def _analyze_log_with_airflow_doctor(self, log_data): + """Send log data to Airflow Doctor and waits for the analysis to complete.""" + prompt = ( + "Highlight any error in the following airflow task log? 
If there are any errors how can we fix it?\n" + + log_data + ) + initial_response = self._send_initial_ask_astro_request(prompt) + + request_uuid = initial_response.get("request_uuid") + if not request_uuid: + raise ValueError("No request UUID in response") + + return self._get_ask_astro_request_status(request_uuid) + @expose("/redirect_to_external_log") @auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS) @provide_session @@ -2828,6 +2985,64 @@ def grid(self, dag_id: str, session: Session = NEW_SESSION): ), ) + @expose("/code_analysis/") + @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) + @gzipped + @provide_session + def code_analysis(self, dag_id: str, session: Session = NEW_SESSION): + """Get Dag's grid view.""" + from airflow.models.dagcode import DagCode + + dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session) + dag_model = DagModel.get_dagmodel(dag_id, session=session) + if not dag: + flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error") + return redirect(url_for("Airflow.index")) + wwwutils.check_import_errors(dag.fileloc, session) + wwwutils.check_dag_warnings(dag.dag_id, session) + + root = request.args.get("root") + if root: + dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True) + + num_runs = request.args.get("num_runs", type=int) + if num_runs is None: + num_runs = conf.getint("webserver", "default_dag_run_display_number") + + doc_md = wwwutils.wrapped_markdown(getattr(dag, "doc_md", None)) + + task_log_reader = TaskLogReader() + if task_log_reader.supports_external_link: + external_log_name = task_log_reader.log_handler.log_name + else: + external_log_name = None + + dag_code = DagCode.get_code_by_fileloc(dag.fileloc) + ask_airflow_response = None + if dag_code: + prompt = ( + "Highlight if there is any problems (for example: review idempotency, check is secrets " + "are exposed, keep tasks atomic,avoid top-level code in your DAG file, use of a " + "consistent method 
for task dependencies, use DAG name and start date properly, set " + "retries, use deferable operators in possible cases etc) with the following DAG code? " + "Rewrite the DAG with good DAG writing practices. \n" + ) + dag_code + ask_airflow_response = self._analyze_log_with_airflow_doctor(prompt) + + return self.render_template( + "airflow/code_analysis.html", + show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"), + root=root, + dag=dag, + ask_airflow_response=ask_airflow_response, + doc_md=doc_md, + num_runs=num_runs, + show_external_log_redirect=task_log_reader.supports_external_link, + external_log_name=external_log_name, + dag_model=dag_model, + auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"), + ) + @expose("/calendar") def legacy_calendar(self): """Redirect from url param.""" @@ -3560,6 +3775,165 @@ def historical_metrics_data(self): {"Content-Type": "application/json; charset=utf-8"}, ) + @expose("/object/task_anomalies_data") + @auth.has_access_view(AccessView.CLUSTER_ACTIVITY) + def task_anomalies_data(self): + """Return cluster activity historical metrics.""" + start_date = _safe_parse_datetime(request.args.get("start_date")) + end_date = _safe_parse_datetime(request.args.get("end_date")) + + with create_session() as session: + # DagRuns + dag_run_types = session.execute( + select(DagRun.run_type, func.count(DagRun.run_id)) + .where( + DagRun.start_date >= start_date, + func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date, + ) + .group_by(DagRun.run_type) + ).all() + + dag_run_states = session.execute( + select(DagRun.state, func.count(DagRun.run_id)) + .where( + DagRun.start_date >= start_date, + func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date, + ) + .group_by(DagRun.state) + ).all() + + # TaskInstances + task_instance_states = session.execute( + select(TaskInstance.state, func.count(TaskInstance.run_id)) + .join(TaskInstance.dag_run) + .where( + DagRun.start_date >= 
start_date, + func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date, + ) + .group_by(TaskInstance.state) + ).all() + + data = { + "dag_run_types": { + **{dag_run_type.value: 0 for dag_run_type in DagRunType}, + **dict(dag_run_types), + }, + "dag_run_states": { + **{dag_run_state.value: 0 for dag_run_state in DagRunState}, + **dict(dag_run_states), + }, + "task_instance_states": { + "no_status": 0, + **{ti_state.value: 0 for ti_state in TaskInstanceState}, + **{ti_state or "no_status": sum_value for ti_state, sum_value in task_instance_states}, + }, + } + + return ( + htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + + @expose("/object/dag_anomalies_data") + @auth.has_access_view(AccessView.CLUSTER_ACTIVITY) + def dag_anomalies_data(self): + """Return cluster activity historical metrics.""" + start_date = _safe_parse_datetime(request.args.get("start_date")) + end_date = _safe_parse_datetime(request.args.get("end_date")) + + with create_session() as session: + result = session.execute(super_ugly_query).all() + cols = [ + "dag_id", + "latest_month_median", + "all_time_median", + "all_time_sd", + "median_mult", + ] + + result = [dict(zip(cols, row)) for row in result] + for r in result: + mult = r["median_mult"] + mult = round(mult, 1) + latest_med = r["latest_month_median"] + all_time_med = r["all_time_median"] + r[ + "reason" + ] = f"Latest month this DAG took {latest_med}, which is {mult} standard deviations longer than all time median of {all_time_med}." 
+ # todo ideas: + # month over month variation + # day of the week variation + # time of day variation + dags = session.scalars(select(DagModel.dag_id)).all() + slog = logging.getLogger("sqlalchemy") + slog.setLevel(logging.INFO) + slog.setLevel(logging.WARNING) + data = { + "dag_anomalies": result, + } + + return ( + htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + + @expose("/object/resource_anomalies_data") + @auth.has_access_view(AccessView.CLUSTER_ACTIVITY) + def resource_anomalies_data(self): + mock_data = [ + {"worker_id": "1", "size": "2Gi", "used": "1.95Gi", "available": "0.05Gi", "time": 1709709345}, + {"worker_id": "2", "size": "2Gi", "used": "0.5Gi", "available": "1.5Gi", "time": 1709709345}, + {"worker_id": "3", "size": "2Gi", "used": "0.4Gi", "available": "1.6Gi", "time": 1709709345}, + {"worker_id": "4", "size": "2Gi", "used": "0.6Gi", "available": "1.4Gi", "time": 1709709345}, + {"worker_id": "1", "size": "2Gi", "used": "1.95Gi", "available": "0.05Gi", "time": 1709709445}, + {"worker_id": "2", "size": "2Gi", "used": "0.5Gi", "available": "1.5Gi", "time": 1709709445}, + {"worker_id": "3", "size": "2Gi", "used": "0.4Gi", "available": "1.6Gi", "time": 1709709445}, + {"worker_id": "4", "size": "2Gi", "used": "0.6Gi", "available": "1.4Gi", "time": 1709709445}, + {"worker_id": "1", "size": "2Gi", "used": "1.95Gi", "available": "0.05Gi", "time": 1709709545}, + {"worker_id": "2", "size": "2Gi", "used": "0.5Gi", "available": "1.5Gi", "time": 1709709545}, + {"worker_id": "3", "size": "2Gi", "used": "0.4Gi", "available": "1.6Gi", "time": 1709709545}, + {"worker_id": "4", "size": "2Gi", "used": "0.6Gi", "available": "1.4Gi", "time": 1709709545}, + {"worker_id": "1", "size": "2Gi", "used": "1.95Gi", "available": "0.05Gi", "time": 1709709645}, + {"worker_id": "2", "size": "2Gi", "used": "0.5Gi", "available": "1.5Gi", "time": 1709709645}, + {"worker_id": "3", "size": "2Gi", 
"used": "0.4Gi", "available": "1.6Gi", "time": 1709709645}, + {"worker_id": "4", "size": "2Gi", "used": "0.6Gi", "available": "1.4Gi", "time": 1709709645}, + ] + prompt = f""" + based on the worker memory consumption suggest which worker memory can be increase or decrease + {mock_data} in short 1 sentence + """ + response = "" + try: + init_response = self._send_initial_ask_astro_request(prompt) + request_uuid = init_response.get("request_uuid") + if not request_uuid: + return () + response = self._get_ask_astro_request_status(request_uuid) + except: + pass + + worker_suggestion = "" + response_on_worker = response.get("response") + cost_prompt = f"Based on the following suggestion. If all workers currently are A20 on Astro Deployments.What should be the ideal worker type based on following suggestion. {response_on_worker} and following worker memory profile {mock_data}. Explain in short 2 sentences." + try: + cost_init_response = self._send_initial_ask_astro_request(cost_prompt) + cost_request_uuid = cost_init_response.get("request_uuid") + if not cost_request_uuid: + return () + worker_suggestion = self._get_ask_astro_request_status(cost_request_uuid) + except: + pass + + json_response = { + "Workers": response.get("response"), + "Cost": "Currently all workers on Astro deployment are of type A20. " + + worker_suggestion.get("response"), + } + return ( + htmlsafe_json_dumps(json_response, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + @expose("/object/next_run_datasets/") @auth.has_access_dag("GET", DagAccessEntity.RUN) @auth.has_access_dataset("GET")