diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py
index 4d5bd0a56e8d7..158a47644463d 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -159,38 +159,21 @@ def depends(self, dag_run_ids: list[str] = Query(None)) -> DagRunIdsFilter:
         return self.set_value(dag_run_ids)
 
 
-class DagRunRunTypesFilter(BaseParam[list[str]]):
+class DagRunRunTypesFilter(BaseParam[Optional[list[str]]]):
     """Filter on dag run run_types."""
 
-    def __init__(self, model: Base, value: list[str] | None = None, skip_none: bool = True) -> None:
+    def __init__(self, value: list[str] | None = None, skip_none: bool = True) -> None:
         super().__init__(value, skip_none)
-        self.model = model
 
     def to_orm(self, select: Select) -> Select:
         if self.value and self.skip_none:
-            return select.where(self.model.run_type.in_(self.value))
+            return select.where(DagRun.run_type.in_(self.value))
         return select
 
     def depends(self, run_types: list[str] = Query(None)) -> DagRunRunTypesFilter:
         return self.set_value(run_types)
 
 
-class DagRunRunStatesFilter(BaseParam[list[str]]):
-    """Filter on dag run_states."""
-
-    def __init__(self, model: Base, value: list[str] | None = None, skip_none: bool = True) -> None:
-        super().__init__(value, skip_none)
-        self.model = model
-
-    def to_orm(self, select: Select) -> Select:
-        if self.value and self.skip_none:
-            return select.where(self.model.state.in_(self.value))
-        return select
-
-    def depends(self, run_states: list[str] = Query(None)) -> DagRunRunStatesFilter:
-        return self.set_value(run_states)
-
-
 class TaskIdsFilter(BaseParam[list[str]]):
     """Filter on task ids."""
 
@@ -786,8 +769,7 @@ def depends_float(
 QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
 QueryDagIdsFilter = Annotated[DagIdsFilter, Depends(DagIdsFilter(DagRun).depends)]
 QueryDagRunStateFilter = Annotated[DagRunStateFilter, Depends(DagRunStateFilter().depends)]
-QueryDagRunRunTypesFilter = Annotated[DagRunRunTypesFilter, Depends(DagRunRunTypesFilter(DagRun).depends)]
-QueryDagRunRunStatesFilter = Annotated[DagRunRunStatesFilter, Depends(DagRunRunStatesFilter(DagRun).depends)]
+QueryDagRunRunTypesFilter = Annotated[DagRunRunTypesFilter, Depends(DagRunRunTypesFilter().depends)]
 
 # DAGWarning
 QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, Depends(_DagIdFilter(DagWarning.dag_id).depends)]
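
With the `model` parameter gone, `DagRunRunTypesFilter` is hard-bound to `DagRun.run_type`, and the deleted `DagRunRunStatesFilter` is replaced at the route level by the pre-existing `DagRunStateFilter`. A minimal sketch of how the reworked filter composes onto a query, assuming direct use outside FastAPI (the `dag_id` literal and filter values are illustrative, not part of this change):

```python
# Hedged sketch: applying the hard-bound filter to a DagRun query.
from sqlalchemy import select

from airflow.api_fastapi.common.parameters import DagRunRunTypesFilter
from airflow.models import DagRun

# After this change the constructor no longer takes a model argument.
run_types_filter = DagRunRunTypesFilter(value=["scheduled", "manual"])

stmt = select(DagRun).where(DagRun.dag_id == "example_dag")  # dag_id is illustrative
stmt = run_types_filter.to_orm(stmt)  # appends WHERE dag_run.run_type IN ('scheduled', 'manual')
```

Dropping the constructor argument also simplifies the `Annotated` dependency at the bottom of the file, since no model has to be threaded through `Depends`.
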
diff --git a/airflow/api_fastapi/core_api/datamodels/ui/grid.py b/airflow/api_fastapi/core_api/datamodels/ui/grid.py
index ca615a490ca8e..74b937f7afcc4 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/grid.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/grid.py
@@ -23,18 +23,6 @@
 from pydantic import BaseModel
 
 
-class GridTaskInstance(BaseModel):
-    """Task Instance model for the Grid UI."""
-
-    run_id: str
-    task_id: str
-    try_number: int
-    start_date: datetime | None
-    end_date: datetime | None
-    queued_dttm: datetime | None
-    state: str | None
-
-
 class GridTaskInstanceSummary(BaseModel):
     """Task Instance Summary model for the Grid UI."""
 
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index bd3d1eda5f542..e9b218580630a 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -194,7 +194,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPExceptionResponse'
-  /ui/grid/:
+  /ui/grid/{dag_id}:
     get:
       tags:
       - Grid
@@ -203,16 +203,18 @@ paths:
       operationId: grid_data
       parameters:
       - name: dag_id
-        in: query
+        in: path
         required: true
         schema:
           type: string
           title: Dag Id
       - name: base_date
         in: query
-        required: true
+        required: false
         schema:
-          type: string
+          anyOf:
+          - type: string
+          - type: 'null'
           title: Base Date
       - name: root
        in: query
@@ -244,14 +246,14 @@ paths:
           items:
             type: string
           title: Run Types
-      - name: run_states
+      - name: state
         in: query
         required: false
         schema:
           type: array
           items:
             type: string
-          title: Run States
+          title: State
       - name: offset
         in: query
         required: false
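
Per the regenerated spec, `dag_id` moves from a query parameter into the path, `base_date` becomes optional (the server falls back to the current time, as the route change below shows), and the `run_states` filter is renamed to `state`. A hedged sketch of a request against the reworked endpoint (host, port, and auth handling are illustrative, not part of this change):

```python
# Hedged sketch: calling the reworked grid endpoint.
import requests

resp = requests.get(
    "http://localhost:8080/ui/grid/example_dag",  # dag_id is now part of the path
    params={
        # base_date omitted: the endpoint now defaults to "now" server-side
        "run_types": ["scheduled", "manual"],
        "state": ["success", "failed"],  # renamed from run_states
        "limit": 25,
        "offset": 0,
    },
)
resp.raise_for_status()
grid = resp.json()  # a GridResponse payload with runs and task-instance summaries
```
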
diff --git a/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow/api_fastapi/core_api/routes/ui/grid.py
index 7d3c2f381ab5b..f8ed98cd88eba 100644
--- a/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -24,26 +24,25 @@
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Request, status
-from sqlalchemy import select
+from sqlalchemy import func, select
 from sqlalchemy.orm import Session
+from sqlalchemy.sql.operators import ColumnOperators
 from typing_extensions import Any
 
 from airflow import DAG
 from airflow.api_fastapi.common.db.common import get_session, paginated_select
 from airflow.api_fastapi.common.parameters import (
-    DateTimeQuery,
-    QueryDagRunRunStatesFilter,
+    OptionalDateTimeQuery,
     QueryDagRunRunTypesFilter,
+    QueryDagRunStateFilter,
     QueryLimit,
     QueryOffset,
     SortParam,
 )
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.ui.grid import (
-    GridDAGRun,
     GridDAGRunwithTIs,
     GridResponse,
-    GridTaskInstance,
     GridTaskInstanceSummary,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
@@ -51,26 +50,28 @@
 from airflow.exceptions import AirflowConfigException
 from airflow.models import DagRun, TaskInstance
 from airflow.models.baseoperator import BaseOperator
+from airflow.models.taskmap import TaskMap
+from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState
-from airflow.utils.task_group import TaskGroup
+from airflow.utils.task_group import MappedTaskGroup, TaskGroup
 
 grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
 
 
 @grid_router.get(
-    "/",
+    "/{dag_id}",
     include_in_schema=False,
     responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]),
 )
 def grid_data(
     dag_id: str,
-    base_date: DateTimeQuery,
     run_types: QueryDagRunRunTypesFilter,
-    run_states: QueryDagRunRunStatesFilter,
+    run_states: QueryDagRunStateFilter,
     session: Annotated[Session, Depends(get_session)],
     offset: QueryOffset,
     request: Request,
     num_runs: QueryLimit,
+    base_date: OptionalDateTimeQuery = None,
     root: str | None = None,
     filter_upstream: bool = False,
     filter_downstream: bool = False,
@@ -86,9 +87,7 @@ def grid_data(
         task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
     )
 
-    if num_runs is None:
-        num_runs = QueryLimit(conf.getint("webserver", "default_dag_run_display_number"))
-
+    current_time = timezone.utcnow()
     # Retrieve, sort and encode the previous DAG Runs
     base_query = (
         select(
@@ -103,23 +102,41 @@ def grid_data(
             DagRun.dag_version_id.label("version_number"),
         )
         .select_from(DagRun)
-        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= base_date)
+        .where(DagRun.dag_id == dag.dag_id, DagRun.logical_date <= func.coalesce(base_date, current_time))
+        .order_by(DagRun.id.desc())
     )
 
+    def get_dag_run_sort_param():
+        """Build the ordering expressions used to sort the DAG runs."""
+
+        def _get_run_ordering_expr(name: str) -> ColumnOperators:
+            """Return the descending ordering expression for one run-ordering column."""
+            expr = DagRun.__mapper__.columns[name]
+            # Data interval columns are NULL for runs created before 2.3, but SQL's
+            # NULL-sorting logic would make those old runs always appear first. In a
+            # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+            # not efficient, so instead the columns are coalesced into logical_date,
+            # which is good enough in most cases.
+            if name in ("data_interval_start", "data_interval_end"):
+                expr = func.coalesce(expr, DagRun.logical_date)
+            return expr.desc()
+
+        ordering_expression = (_get_run_ordering_expr(name) for name in dag.timetable.run_ordering)
+        # The caller appends DagRun.id.desc() to these expressions as a final tie-breaker.
+        return ordering_expression
+
     dag_runs_select_filter, _ = paginated_select(
-        select=base_query,
+        statement=base_query.order_by(*get_dag_run_sort_param(), DagRun.id.desc()),
         filters=[
             run_types,
             run_states,
         ],
-        order_by=SortParam(allowed_attrs=["id"], model=DagRun),
+        order_by=None,
         offset=offset,
         limit=num_runs,
     )
 
     dag_runs = session.execute(dag_runs_select_filter)
-    # Validate the DAG Runs to have consistent data
-    dag_runs = [GridDAGRun(**dag_run) for dag_run in dag_runs.all()]
 
     # Check if there are any DAG Runs with given criteria to eliminate unnecessary queries/errors
     if not dag_runs:
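
The `coalesce` trick above is worth calling out: `data_interval_start`/`data_interval_end` can be NULL for runs created before Airflow 2.3, and sorting on them directly would float those old runs to the top. A self-contained sketch of the resulting ordering (the `dag_id` literal is illustrative):

```python
# Hedged sketch: NULL-safe descending ordering of DAG runs.
from sqlalchemy import func, select

from airflow.models import DagRun

# NULL data intervals fall back to logical_date, so pre-2.3 runs sort in place.
order_expr = func.coalesce(DagRun.data_interval_end, DagRun.logical_date).desc()

stmt = (
    select(DagRun)
    .where(DagRun.dag_id == "example_dag")
    .order_by(order_expr, DagRun.id.desc())  # id breaks ties deterministically
)
```

This mirrors `_get_run_ordering_expr` above: each column named in `dag.timetable.run_ordering` gets the same coalesce treatment before being sorted descending.
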
@@ -130,7 +147,7 @@ def grid_data(
 
     # Retrieve, sort and encode the Task Instances
     tis_of_dag_runs, _ = paginated_select(
-        select=select(
+        statement=select(
             TaskInstance.run_id,
             TaskInstance.task_id,
             TaskInstance.try_number,
@@ -149,28 +166,9 @@ def grid_data(
 
     task_instances = session.execute(tis_of_dag_runs)
 
-    # Validate the task instances to have consistent data
-    task_instances = [GridTaskInstance(**ti) for ti in task_instances]
-
-    ## Additional logic to calculate the overall state and task count dict of states
-    priority: list[None | TaskInstanceState] = [
-        TaskInstanceState.FAILED,
-        TaskInstanceState.UPSTREAM_FAILED,
-        TaskInstanceState.UP_FOR_RETRY,
-        TaskInstanceState.UP_FOR_RESCHEDULE,
-        TaskInstanceState.QUEUED,
-        TaskInstanceState.SCHEDULED,
-        TaskInstanceState.DEFERRED,
-        TaskInstanceState.RUNNING,
-        TaskInstanceState.RESTARTING,
-        None,
-        TaskInstanceState.SUCCESS,
-        TaskInstanceState.SKIPPED,
-        TaskInstanceState.REMOVED,
-    ]
-
     @cache
     def get_task_group_children_getter() -> operator.methodcaller:
+        """Get the Task Group Children Getter for the DAG."""
         sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological")
         if sort_order == "topological":
             return operator.methodcaller("topological_sort")
@@ -180,110 +178,137 @@ def get_task_group_children_getter() -> operator.methodcaller:
 
     @cache
     def get_task_group_map() -> dict[str, dict[str, Any]]:
+        """Get the Task Group Map for the DAG."""
         task_nodes = {}
 
-        def _fill_task_group_map_list(
-            task_node: BaseOperator | TaskGroup | None, parent_node: BaseOperator | TaskGroup | None
+        def _fill_task_group_map(
+            task_node: BaseOperator | MappedTaskGroup | TaskMap | None,
+            parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
         ):
+            """Recursively fill the Task Group Map."""
             if task_node is None:
                 return
 
             if isinstance(task_node, BaseOperator):
-                task_nodes.update(
-                    {
-                        task_node.task_id: {
-                            "is_group": False,
-                            "parent_id": parent_node.node_id if parent_node else None,
-                            "task_count": 1,
-                        }
-                    }
-                )
+                task_nodes[task_node.task_id] = {
+                    "is_group": False,
+                    "parent_id": parent_node.node_id if parent_node else None,
+                    "task_count": 1,
+                }
                 return
             elif isinstance(task_node, TaskGroup):
-                task_nodes.update(
-                    {
-                        task_node.node_id: {
-                            "is_group": True,
-                            "parent_id": parent_node.node_id if parent_node else None,
-                            "task_count": len(
-                                [
-                                    child
-                                    for child in get_task_group_children_getter()(task_node)
-                                    if isinstance(child, BaseOperator)
-                                ]
-                            ),
-                        }
-                    }
-                )
+                task_nodes[task_node.node_id] = {
+                    "is_group": True,
+                    "parent_id": parent_node.node_id if parent_node else None,
+                    "task_count": len([child for child in get_task_group_children_getter()(task_node)]),
+                }
                 return [
-                    _fill_task_group_map_list(child, task_node)
+                    _fill_task_group_map(task_node=child, parent_node=task_node)
                    for child in get_task_group_children_getter()(task_node)
                 ]
 
         for node in [child for child in get_task_group_children_getter()(dag.task_group)]:
-            _fill_task_group_map_list(node, None)
+            _fill_task_group_map(task_node=node, parent_node=None)
 
         return task_nodes
 
-    task_group_map = get_task_group_map()
+    # Generate Grouped Task Instances
+    task_node_map = get_task_group_map()
+    parent_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
+    all_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
 
     for ti in task_instances:
-        if (
-            task_group_map[ti.task_id]["is_group"]
-            or not task_group_map[ti.task_id]["parent_id"]
-            and task_group_map[ti.task_id]["is_group"]
-        ):
-            ti.task_id = ti.task_id
-        elif task_group_map[ti.task_id]["parent_id"] and not task_group_map[ti.task_id]["is_group"]:
-            ti.task_id = task_group_map[ti.task_id]["parent_id"]
+        all_tis[(ti.task_id, ti.run_id)].append(ti)
+        parent_id = task_node_map[ti.task_id]["parent_id"]
+        if not parent_id and task_node_map[ti.task_id]["is_group"]:
+            parent_tis[(ti.task_id, ti.run_id)].append(ti)
+        elif parent_id and task_node_map[parent_id]["is_group"]:
+            parent_tis[(parent_id, ti.run_id)].append(ti)
 
-    grouped_tis: dict[tuple, list[GridTaskInstance]] = collections.defaultdict(
-        list,
-        (
-            ((task_id, run_id), list(tis))
-            for (task_id, run_id), tis in itertools.groupby(
-                task_instances, key=lambda ti: (ti.task_id, ti.run_id)
-            )
-        ),
+    # Extend subgroup task instances to parent task instances to calculate the aggregate states
+    task_group_map = {k: v for k, v in task_node_map.items() if v["is_group"]}
+    parent_tis.update(
+        {
+            (task_id_parent, run_id): parent_tis[(task_id_parent, run_id)] + parent_tis[(task_id, run_id)]
+            for task_id, task_map in task_group_map.items()
+            if task_map["is_group"]
+            for (task_id_parent, run_id), tis in parent_tis.items()
+            if task_id_parent == task_map["parent_id"]
+        }
     )
 
-    task_instance_summaries: dict[str, list[GridTaskInstanceSummary]] = {
-        ti.run_id: [] for ti in task_instances
-    }
+    def fill_task_instance_summaries(
+        grouped_task_instances: dict[tuple[str, str], list],
+        task_instance_summaries_to_fill: dict[str, list],
+    ):
+        ## Additional logic to calculate the overall state and task count dict of states
+        priority: list[None | TaskInstanceState] = [
+            TaskInstanceState.FAILED,
+            TaskInstanceState.UPSTREAM_FAILED,
+            TaskInstanceState.UP_FOR_RETRY,
+            TaskInstanceState.UP_FOR_RESCHEDULE,
+            TaskInstanceState.QUEUED,
+            TaskInstanceState.SCHEDULED,
+            TaskInstanceState.DEFERRED,
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.RESTARTING,
+            None,
+            TaskInstanceState.SUCCESS,
+            TaskInstanceState.SKIPPED,
+            TaskInstanceState.REMOVED,
+        ]
 
-    for (task_id, run_id), tis in grouped_tis.items():
-        overall_state = next(
-            (state.value for ti in tis for state in priority if state is not None and ti.state == state), None
-        )
-        ti_try_number = max([ti.try_number for ti in tis])
-        ti_start_date = min([ti.start_date for ti in tis if ti.start_date])
-        ti_end_date = max([ti.end_date for ti in tis if ti.end_date])
-        ti_queued_dttm = min([ti.queued_dttm for ti in tis if ti.queued_dttm], default=None)
-        all_states = {"no_status" if state is None else state.name.lower(): 0 for state in priority}
-        all_states.update(
-            {
-                "no_status" if state is None else state.name.lower(): len(
-                    [ti for ti in tis if ti.state == state]
-                )
-                for state in priority
-            }
-        )
+        for (task_id, run_id), tis in grouped_task_instances.items():
+            overall_state = next(
+                (state.value for ti in tis for state in priority if state is not None and ti.state == state),
+                None,
+            )
+            ti_try_number = max([ti.try_number for ti in tis])
+            ti_start_date = min([ti.start_date for ti in tis if ti.start_date], default=None)
+            ti_end_date = max([ti.end_date for ti in tis if ti.end_date], default=None)
+            ti_queued_dttm = min([ti.queued_dttm for ti in tis if ti.queued_dttm], default=None)
+            all_states = {"no_status" if state is None else state.name.lower(): 0 for state in priority}
+            all_states.update(
+                {
+                    "no_status" if state is None else state.name.lower(): len(
+                        [ti for ti in tis if ti.state == state]
+                    )
+                    for state in priority
+                }
+            )
 
-        task_instance_summaries[run_id].append(
-            GridTaskInstanceSummary(
-                task_id=task_id,
-                try_number=ti_try_number,
-                start_date=ti_start_date,
-                end_date=ti_end_date,
-                queued_dttm=ti_queued_dttm,
-                states=all_states,
-                task_count=task_group_map[task_id]["task_count"],
-                overall_state=overall_state,
+            task_instance_summaries_to_fill[run_id].append(
+                GridTaskInstanceSummary(
+                    task_id=task_id,
+                    try_number=ti_try_number,
+                    start_date=ti_start_date,
+                    end_date=ti_end_date,
+                    queued_dttm=ti_queued_dttm,
+                    states=all_states,
+                    task_count=task_node_map[task_id]["task_count"],
+                    overall_state=overall_state,
+                )
             )
-        )
+
+    # Create the Task Instance Summaries to be used in the Grid Response
+    task_instance_summaries: dict[str, list] = {
+        run_id: [] for (_, run_id), _ in itertools.chain(parent_tis.items(), all_tis.items())
+    }
+
+    # Fill the Task Instance Summaries, parent task instances first,
+    # since the grouped task instance summaries build on them
+    fill_task_instance_summaries(
+        grouped_task_instances=parent_tis,
+        task_instance_summaries_to_fill=task_instance_summaries,
+    )
+    # Then fill the Task Instance Summaries for the grouped task instances
+    fill_task_instance_summaries(
+        grouped_task_instances=all_tis,
+        task_instance_summaries_to_fill=task_instance_summaries,
+    )
 
     # Aggregate the Task Instances by DAG Run
     grid_dag_runs = [
         GridDAGRunwithTIs(
-            **dag_run.model_dump(),
+            **dag_run,
             task_instances=task_instance_summaries[dag_run.run_id],
         )
         for dag_run in dag_runs
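
The aggregation in `fill_task_instance_summaries` reduces each group's task-instance states to a single display state. Below is a hedged, priority-first sketch of that reduction; note that the route's generator expression iterates task instances in the outer loop, so its result can differ from this reading when instances within a group disagree. The priority list itself is copied from the diff.

```python
# Hedged sketch: the priority-based overall-state reduction used above.
from __future__ import annotations

from airflow.utils.state import TaskInstanceState

PRIORITY: list[TaskInstanceState | None] = [
    TaskInstanceState.FAILED,
    TaskInstanceState.UPSTREAM_FAILED,
    TaskInstanceState.UP_FOR_RETRY,
    TaskInstanceState.UP_FOR_RESCHEDULE,
    TaskInstanceState.QUEUED,
    TaskInstanceState.SCHEDULED,
    TaskInstanceState.DEFERRED,
    TaskInstanceState.RUNNING,
    TaskInstanceState.RESTARTING,
    None,
    TaskInstanceState.SUCCESS,
    TaskInstanceState.SKIPPED,
    TaskInstanceState.REMOVED,
]


def overall_state(states: list[TaskInstanceState | None]) -> str | None:
    """Pick the highest-priority state present among a group's task instances."""
    for state in PRIORITY:
        if state is not None and state in states:
            return state.value
    return None


# One failed leaf is enough to render the whole group as "failed".
assert overall_state([TaskInstanceState.SUCCESS, TaskInstanceState.FAILED]) == "failed"
```
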
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index cb7f5c7a53724..2e5774af61e79 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -15,6 +15,7 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
+  GridService,
   ImportErrorService,
   JobService,
   MonitorService,
@@ -325,6 +326,53 @@ export const UseConfigServiceGetConfigValueKeyFn = (
   useConfigServiceGetConfigValueKey,
   ...(queryKey ?? [{ accept, option, section }]),
 ];
+export type GridServiceGridDataDefaultResponse = Awaited<
+  ReturnType<typeof GridService.gridData>
+>;
+export type GridServiceGridDataQueryResult<
+  TData = GridServiceGridDataDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useGridServiceGridDataKey = "GridServiceGridData";
+export const UseGridServiceGridDataKeyFn = (
+  {
+    baseDate,
+    dagId,
+    filterDownstream,
+    filterUpstream,
+    limit,
+    offset,
+    root,
+    runTypes,
+    state,
+  }: {
+    baseDate?: string;
+    dagId: string;
+    filterDownstream?: boolean;
+    filterUpstream?: boolean;
+    limit?: number;
+    offset?: number;
+    root?: string;
+    runTypes?: string[];
+    state?: string[];
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useGridServiceGridDataKey,
+  ...(queryKey ?? [
+    {
+      baseDate,
+      dagId,
+      filterDownstream,
+      filterUpstream,
+      limit,
+      offset,
+      root,
+      runTypes,
+      state,
+    },
+  ]),
+];
 export type BackfillServiceListBackfillsDefaultResponse = Awaited<
   ReturnType<typeof BackfillService.listBackfills>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 8f423b4c083de..b6e1ae6683b80 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -15,6 +15,7 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
+  GridService,
   ImportErrorService,
   JobService,
   MonitorService,
@@ -405,6 +406,71 @@ export const prefetchUseConfigServiceGetConfigValue = (
     }),
     queryFn: () => ConfigService.getConfigValue({ accept, option, section }),
   });
+/**
+ * Grid Data
+ * Return grid data.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.baseDate
+ * @param data.root
+ * @param data.filterUpstream
+ * @param data.filterDownstream
+ * @param data.runTypes
+ * @param data.state
+ * @param data.offset
+ * @param data.limit
+ * @returns GridResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseGridServiceGridData = (
+  queryClient: QueryClient,
+  {
+    baseDate,
+    dagId,
+    filterDownstream,
+    filterUpstream,
+    limit,
+    offset,
+    root,
+    runTypes,
+    state,
+  }: {
+    baseDate?: string;
+    dagId: string;
+    filterDownstream?: boolean;
+    filterUpstream?: boolean;
+    limit?: number;
+    offset?: number;
+    root?: string;
+    runTypes?: string[];
+    state?: string[];
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseGridServiceGridDataKeyFn({
+      baseDate,
+      dagId,
+      filterDownstream,
+      filterUpstream,
+      limit,
+      offset,
+      root,
+      runTypes,
+      state,
+    }),
+    queryFn: () =>
+      GridService.gridData({
+        baseDate,
+        dagId,
+        filterDownstream,
+        filterUpstream,
+        limit,
+        offset,
+        root,
+        runTypes,
+        state,
+      }),
+  });
 /**
  * List Backfills
  * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index d644beb729488..a4acf64c815c8 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -20,6 +20,7 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
+  GridService,
   ImportErrorService,
   JobService,
   MonitorService,
@@ -519,6 +520,80 @@ export const useConfigServiceGetConfigValue = <
       ConfigService.getConfigValue({ accept, option, section }) as TData,
     ...options,
   });
+/**
+ * Grid Data
+ * Return grid data.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.baseDate
+ * @param data.root
+ * @param data.filterUpstream
+ * @param data.filterDownstream
+ * @param data.runTypes
+ * @param data.state
+ * @param data.offset
+ * @param data.limit
+ * @returns GridResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGridData = <
+  TData = Common.GridServiceGridDataDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    baseDate,
+    dagId,
+    filterDownstream,
+    filterUpstream,
+    limit,
+    offset,
+    root,
+    runTypes,
+    state,
+  }: {
+    baseDate?: string;
+    dagId: string;
+    filterDownstream?: boolean;
+    filterUpstream?: boolean;
+    limit?: number;
+    offset?: number;
+    root?: string;
+    runTypes?: string[];
+    state?: string[];
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery({
+    queryKey: Common.UseGridServiceGridDataKeyFn(
+      {
+        baseDate,
+        dagId,
+        filterDownstream,
+        filterUpstream,
+        limit,
+        offset,
+        root,
+        runTypes,
+        state,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      GridService.gridData({
+        baseDate,
+        dagId,
+        filterDownstream,
+        filterUpstream,
+        limit,
+        offset,
+        root,
+        runTypes,
+        state,
+      }) as TData,
+    ...options,
+  });
 /**
  * List Backfills
  * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 11386ab5d1fbc..925c4347ee897 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -15,6 +15,7 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
+  GridService,
   ImportErrorService,
   JobService,
   MonitorService,
@@ -496,6 +497,80 @@ export const useConfigServiceGetConfigValueSuspense = <
       ConfigService.getConfigValue({ accept, option, section }) as TData,
     ...options,
   });
+/**
+ * Grid Data
+ * Return grid data.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.baseDate
+ * @param data.root
+ * @param data.filterUpstream
+ * @param data.filterDownstream
+ * @param data.runTypes
+ * @param data.state
+ * @param data.offset
+ * @param data.limit
+ * @returns GridResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGridDataSuspense = <
+  TData = Common.GridServiceGridDataDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    baseDate,
+    dagId,
+    filterDownstream,
+    filterUpstream,
+    limit,
+    offset,
+    root,
+    runTypes,
+    state,
+  }: {
+    baseDate?: string;
+    dagId: string;
+    filterDownstream?: boolean;
+    filterUpstream?: boolean;
+    limit?: number;
+    offset?: number;
+    root?: string;
+    runTypes?: string[];
+    state?: string[];
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseSuspenseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery({
+    queryKey: Common.UseGridServiceGridDataKeyFn(
+      {
+        baseDate,
+        dagId,
+        filterDownstream,
+        filterUpstream,
+        limit,
+        offset,
+        root,
+        runTypes,
+        state,
+      },
+      queryKey,
+    ),
+    queryFn: () =>
+      GridService.gridData({
+        baseDate,
+        dagId,
+        filterDownstream,
+        filterUpstream,
+        limit,
+        offset,
+        root,
+        runTypes,
+        state,
+      }) as TData,
+    ...options,
+  });
 /**
  * List Backfills
  * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 024c2f2b58e1f..d8f0697e21075 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -672,7 +672,7 @@ export class GridService {
    * @param data.filterUpstream
    * @param data.filterDownstream
    * @param data.runTypes
-   * @param data.runStates
+   * @param data.state
    * @param data.offset
    * @param data.limit
    * @returns GridResponse Successful Response
@@ -683,15 +683,17 @@ export class GridService {
   ): CancelablePromise<GridDataResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/ui/grid/",
-      query: {
+      url: "/ui/grid/{dag_id}",
+      path: {
         dag_id: data.dagId,
+      },
+      query: {
         base_date: data.baseDate,
         root: data.root,
         filter_upstream: data.filterUpstream,
         filter_downstream: data.filterDownstream,
         run_types: data.runTypes,
-        run_states: data.runStates,
+        state: data.state,
         offset: data.offset,
         limit: data.limit,
       },
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index ee6a3630a3d05..806f4896bed9b 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1410,15 +1410,15 @@ export type GetConfigValueData = {
 export type GetConfigValueResponse = Config;
 
 export type GridDataData = {
-  baseDate: string;
+  baseDate?: string | null;
   dagId: string;
   filterDownstream?: boolean;
   filterUpstream?: boolean;
   limit?: number;
   offset?: number;
   root?: string | null;
-  runStates?: Array<string>;
   runTypes?: Array<string>;
+  state?: Array<string>;
 };
 
 export type GridDataResponse = GridResponse;
 
@@ -2433,7 +2433,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/ui/grid/": {
+  "/ui/grid/{dag_id}": {
     get: {
       req: GridDataData;
       res: {