From 5cd6c5d1b7b4705cbbad826f5bc5404cf9fdeaab Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 18 Dec 2024 19:52:31 +0800 Subject: [PATCH] Implement Asset.ref for name or URI references This allows us to refer to an asset without needing the original object, making it easier to schedule against an asset. --- .../api_connexion/endpoints/asset_endpoint.py | 2 +- .../core_api/routes/public/assets.py | 2 +- airflow/assets/manager.py | 33 +- airflow/dag_processing/collection.py | 71 +- .../0052_3_0_0_add_asset_reference_models.py | 93 + airflow/models/asset.py | 126 +- airflow/models/dag.py | 30 +- airflow/models/taskinstance.py | 50 +- airflow/serialization/serialized_objects.py | 9 +- airflow/timetables/base.py | 7 +- airflow/utils/context.py | 62 +- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3310 +++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- .../airflow/sdk/definitions/asset/__init__.py | 130 +- .../sdk/definitions/asset/decorators.py | 12 +- task_sdk/src/airflow/sdk/definitions/dag.py | 18 +- .../tests/defintions/test_asset_decorators.py | 8 +- tests/models/test_dag.py | 23 + 20 files changed, 2274 insertions(+), 1720 deletions(-) create mode 100644 airflow/migrations/versions/0052_3_0_0_add_asset_reference_models.py diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 64930b1249468..9336a8afd36ca 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -349,7 +349,7 @@ def create_asset_event(session: Session = NEW_SESSION) -> APIResponse: extra = json_body.get("extra", {}) extra["from_rest_api"] = True asset_event = asset_manager.register_asset_change( - asset=asset_model.to_public(), + asset=asset_model, timestamp=timestamp, extra=extra, session=session, diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index a258f2bd07c5b..40e2fc5ce6cff 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -232,7 +232,7 @@ def create_asset_event( timestamp = timezone.utcnow() assets_event = asset_manager.register_asset_change( - asset=asset_model.to_public(), + asset=asset_model, timestamp=timestamp, extra=body.extra, session=session, diff --git a/airflow/assets/manager.py b/airflow/assets/manager.py index 99de69176a878..cecd76b07c3f4 100644 --- a/airflow/assets/manager.py +++ b/airflow/assets/manager.py @@ -20,7 +20,7 @@ from collections.abc import Collection, Iterable from typing import TYPE_CHECKING -from sqlalchemy import exc, select +from sqlalchemy import exc, or_, select from sqlalchemy.orm import joinedload from airflow.configuration import conf @@ -31,7 +31,9 @@ AssetEvent, AssetModel, DagScheduleAssetAliasReference, + DagScheduleAssetNameReference, DagScheduleAssetReference, + DagScheduleAssetUriReference, ) from airflow.models.dagbag import DagPriorityParsingRequest from airflow.stats import Stats @@ -42,7 +44,7 @@ from airflow.models.dag import DagModel from airflow.models.taskinstance import TaskInstance - from airflow.sdk.definitions.asset import Asset, AssetAlias + from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey class AssetManager(LoggingMixin): @@ -106,7 +108,7 @@ def register_asset_change( cls, *, task_instance: TaskInstance | None = None, - asset: Asset, + asset: Asset | AssetModel | AssetUniqueKey, extra=None, aliases: Collection[AssetAlias] = (), source_alias_names: Iterable[str] | None = None, @@ -119,7 +121,9 @@ def register_asset_change( For local assets, look them up, record the asset event, queue dagruns, and broadcast the asset event """ - asset_model = session.scalar( + from airflow.models.dag import DagModel + + asset_model: AssetModel | None = session.scalar( select(AssetModel) .where(AssetModel.name == asset.name, AssetModel.uri == asset.uri) .options( @@ -154,6 +158,7 @@ def register_asset_change( dags_to_queue_from_asset = { ref.dag for ref in asset_model.consuming_dags if ref.dag.is_active and not ref.dag.is_paused } + dags_to_queue_from_asset_alias = set() if source_alias_names: asset_alias_models = session.scalars( @@ -174,11 +179,27 @@ def register_asset_change( if alias_ref.dag.is_active and not alias_ref.dag.is_paused } - cls.notify_asset_changed(asset=asset) + dags_to_queue_from_asset_ref = set( + session.scalars( + select(DagModel) + .join(DagModel.schedule_asset_name_references, isouter=True) + .join(DagModel.schedule_asset_uri_references, isouter=True) + .where( + or_( + DagScheduleAssetNameReference.name == asset.name, + DagScheduleAssetUriReference.uri == asset.uri, + ) + ) + ) + ) + + cls.notify_asset_changed(asset=asset_model.to_public()) Stats.incr("asset.updates") - dags_to_queue = dags_to_queue_from_asset | dags_to_queue_from_asset_alias + dags_to_queue = ( + dags_to_queue_from_asset | dags_to_queue_from_asset_alias | dags_to_queue_from_asset_ref + ) cls._queue_dagruns(asset_id=asset_model.id, dags_to_queue=dags_to_queue, session=session) return asset_event diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index fff1cc64adfae..b5586d8b9ae2a 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -32,7 +32,7 @@ import traceback from typing import TYPE_CHECKING, NamedTuple -from sqlalchemy import and_, delete, exists, func, select, tuple_ +from sqlalchemy import and_, delete, exists, func, insert, select, tuple_ from sqlalchemy.exc import OperationalError from sqlalchemy.orm import joinedload, load_only @@ -42,7 +42,9 @@ AssetAliasModel, AssetModel, DagScheduleAssetAliasReference, + DagScheduleAssetNameReference, DagScheduleAssetReference, + DagScheduleAssetUriReference, TaskOutletAssetReference, ) from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag @@ -50,7 +52,7 @@ from airflow.models.dagwarning import DagWarningType from airflow.models.errors import ParseImportError from airflow.models.trigger import Trigger -from airflow.sdk.definitions.asset import Asset, AssetAlias +from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUriRef from airflow.triggers.base import BaseTrigger from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries from airflow.utils.sqlalchemy import with_row_locks @@ -495,6 +497,8 @@ class AssetModelOperation(NamedTuple): schedule_asset_references: dict[str, list[Asset]] schedule_asset_alias_references: dict[str, list[AssetAlias]] + schedule_asset_name_references: set[tuple[str, str]] # dag_id, ref_name. + schedule_asset_uri_references: set[tuple[str, str]] # dag_id, ref_uri. outlet_references: dict[str, list[tuple[str, Asset]]] assets: dict[tuple[str, str], Asset] asset_aliases: dict[str, AssetAlias] @@ -510,6 +514,18 @@ def collect(cls, dags: dict[str, DAG]) -> Self: dag_id: [alias for _, alias in dag.timetable.asset_condition.iter_asset_aliases()] for dag_id, dag in dags.items() }, + schedule_asset_name_references={ + (dag_id, ref.name) + for dag_id, dag in dags.items() + for ref in dag.timetable.asset_condition.iter_asset_refs() + if isinstance(ref, AssetNameRef) + }, + schedule_asset_uri_references={ + (dag_id, ref.uri) + for dag_id, dag in dags.items() + for ref in dag.timetable.asset_condition.iter_asset_refs() + if isinstance(ref, AssetUriRef) + }, outlet_references={ dag_id: [ (task_id, outlet) @@ -614,6 +630,57 @@ def add_dag_asset_alias_references( if alias_id not in orm_refs ) + def add_dag_asset_name_uri_references(self, *, session: Session) -> None: + orm_name_refs = set( + session.scalars( + select(DagScheduleAssetNameReference.dag_id, DagScheduleAssetNameReference.name).where( + DagScheduleAssetNameReference.dag_id.in_( + dag_id for dag_id, _ in self.schedule_asset_name_references + ) + ) + ) + ) + new_name_refs = self.schedule_asset_name_references - orm_name_refs + old_name_refs = orm_name_refs - self.schedule_asset_name_references + if old_name_refs: + session.execute( + delete(DagScheduleAssetNameReference).where( + tuple_(DagScheduleAssetNameReference.dag_id, DagScheduleAssetNameReference.name).in_( + old_name_refs + ) + ) + ) + if new_name_refs: + session.execute( + insert(DagScheduleAssetNameReference), + [{"dag_id": d, "name": n} for d, n in new_name_refs], + ) + + orm_uri_refs = set( + session.scalars( + select(DagScheduleAssetUriReference.dag_id, DagScheduleAssetUriReference.uri).where( + DagScheduleAssetUriReference.dag_id.in_( + dag_id for dag_id, _ in self.schedule_asset_uri_references + ) + ) + ) + ) + new_uri_refs = self.schedule_asset_uri_references - orm_uri_refs + old_uri_refs = orm_uri_refs - self.schedule_asset_uri_references + if old_uri_refs: + session.execute( + delete(DagScheduleAssetUriReference).where( + tuple_(DagScheduleAssetUriReference.dag_id, DagScheduleAssetUriReference.uri).in_( + old_uri_refs + ) + ) + ) + if new_uri_refs: + session.execute( + insert(DagScheduleAssetUriReference), + [{"dag_id": d, "uri": u} for d, u in new_uri_refs], + ) + def add_task_asset_references( self, dags: dict[str, DagModel], diff --git a/airflow/migrations/versions/0052_3_0_0_add_asset_reference_models.py b/airflow/migrations/versions/0052_3_0_0_add_asset_reference_models.py new file mode 100644 index 0000000000000..df46e6236d1f2 --- /dev/null +++ b/airflow/migrations/versions/0052_3_0_0_add_asset_reference_models.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add asset reference models. + +Revision ID: 38770795785f +Revises: 038dc8bc6284 +Create Date: 2024-12-18 11:12:50.639369 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import StringID +from airflow.utils.sqlalchemy import UtcDateTime + +# revision identifiers, used by Alembic. +revision = "38770795785f" +down_revision = "038dc8bc6284" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + +ASSET_STR_FIELD = sa.String(length=1500).with_variant( + sa.String(length=1500, collation="latin1_general_cs"), "mysql" +) + + +def upgrade(): + """Add asset reference models.""" + op.create_table( + "dag_schedule_asset_name_reference", + sa.Column("name", ASSET_STR_FIELD, primary_key=True, nullable=False), + sa.Column("dag_id", StringID(), primary_key=True, nullable=False), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("name", "dag_id", name="dsanr_pkey"), + sa.ForeignKeyConstraint( + columns=("dag_id",), + refcolumns=["dag.dag_id"], + name="dsanr_dag_id_fkey", + ondelete="CASCADE", + ), + ) + op.create_index( + "idx_dag_schedule_asset_name_reference_dag_id", + "dag_schedule_asset_name_reference", + ["dag_id"], + unique=False, + ) + + op.create_table( + "dag_schedule_asset_uri_reference", + sa.Column("uri", ASSET_STR_FIELD, primary_key=True, nullable=False), + sa.Column("dag_id", StringID(), primary_key=True, nullable=False), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("uri", "dag_id", name="dsanr_pkey"), + sa.ForeignKeyConstraint( + columns=("dag_id",), + refcolumns=["dag.dag_id"], + name="dsanr_dag_id_fkey", + ondelete="CASCADE", + ), + ) + op.create_index( + "idx_dag_schedule_asset_uri_reference_dag_id", + "dag_schedule_asset_uri_reference", + ["dag_id"], + unique=False, + ) + + +def downgrade(): + """Unadd asset reference models.""" + op.drop_table("dag_schedule_asset_name_reference") + op.drop_table("dag_schedule_asset_uri_reference") diff --git a/airflow/models/asset.py b/airflow/models/asset.py index 1c1f9558a54f3..1fa4c20c49b73 100644 --- a/airflow/models/asset.py +++ b/airflow/models/asset.py @@ -43,17 +43,29 @@ if TYPE_CHECKING: from collections.abc import Iterable + from typing import Any from sqlalchemy.orm import Session def fetch_active_assets_by_name(names: Iterable[str], session: Session) -> dict[str, Asset]: return { - asset_model[0].name: asset_model[0].to_public() - for asset_model in session.execute( + asset_model.name: asset_model.to_public() + for asset_model in session.scalars( select(AssetModel) .join(AssetActive, AssetActive.name == AssetModel.name) - .where(AssetActive.name.in_(name for name in names)) + .where(AssetActive.name.in_(names)) + ) + } + + +def fetch_active_assets_by_uri(uris: Iterable[str], session: Session) -> dict[str, Asset]: + return { + asset_model.uri: asset_model.to_public() + for asset_model in session.scalars( + select(AssetModel) + .join(AssetActive, AssetActive.uri == AssetModel.uri) + .where(AssetActive.uri.in_(uris)) ) } @@ -68,6 +80,22 @@ def expand_alias_to_assets(alias_name: str, session: Session) -> Iterable[AssetM return [] +def resolve_ref_to_asset( + *, + name: str | None = None, + uri: str | None = None, + session: Session, +) -> AssetModel | None: + if name is None and uri is None: + raise TypeError("either name or uri is required") + stmt = select(AssetModel).where(AssetModel.active.has()) + if name is not None: + stmt = stmt.where(AssetModel.name == name) + if uri is not None: + stmt = stmt.where(AssetModel.uri == uri) + return session.scalar(stmt) + + alias_association_table = Table( "asset_alias_asset", Base.metadata, @@ -323,6 +351,98 @@ def for_asset(cls, asset: AssetModel) -> AssetActive: return cls(name=asset.name, uri=asset.uri) +class DagScheduleAssetNameReference(Base): + """Reference from a DAG to an asset name reference of which it is a consumer.""" + + name = Column( + String(length=1500).with_variant( + String( + length=1500, + # latin1 allows for more indexed length in mysql + # and this field should only be ascii chars + collation="latin1_general_cs", + ), + "mysql", + ), + primary_key=True, + nullable=False, + ) + dag_id = Column(StringID(), primary_key=True, nullable=False) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + + dag = relationship("DagModel", back_populates="schedule_asset_name_references") + + __tablename__ = "dag_schedule_asset_name_reference" + __table_args__ = ( + PrimaryKeyConstraint(name, dag_id, name="dsanr_pkey"), + ForeignKeyConstraint( + columns=(dag_id,), + refcolumns=["dag.dag_id"], + name="dsanr_dag_id_fkey", + ondelete="CASCADE", + ), + Index("idx_dag_schedule_asset_name_reference_dag_id", dag_id), + ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, self.__class__): + return self.name == other.name and self.dag_id == other.dag_id + return NotImplemented + + def __hash__(self): + return hash(self.__mapper__.primary_key) + + def __repr__(self): + args = [f"{x.name}={getattr(self, x.name)!r}" for x in self.__mapper__.primary_key] + return f"{self.__class__.__name__}({', '.join(args)})" + + +class DagScheduleAssetUriReference(Base): + """Reference from a DAG to an asset URI reference of which it is a consumer.""" + + uri = Column( + String(length=1500).with_variant( + String( + length=1500, + # latin1 allows for more indexed length in mysql + # and this field should only be ascii chars + collation="latin1_general_cs", + ), + "mysql", + ), + primary_key=True, + nullable=False, + ) + dag_id = Column(StringID(), primary_key=True, nullable=False) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + + dag = relationship("DagModel", back_populates="schedule_asset_uri_references") + + __tablename__ = "dag_schedule_asset_uri_reference" + __table_args__ = ( + PrimaryKeyConstraint(uri, dag_id, name="dsaur_pkey"), + ForeignKeyConstraint( + columns=(dag_id,), + refcolumns=["dag.dag_id"], + name="dsanr_dag_id_fkey", + ondelete="CASCADE", + ), + Index("idx_dag_schedule_asset_uri_reference_dag_id", dag_id), + ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, self.__class__): + return self.name == other.name and self.dag_id == other.dag_id + return NotImplemented + + def __hash__(self): + return hash(self.__mapper__.primary_key) + + def __repr__(self): + args = [f"{x.name}={getattr(self, x.name)!r}" for x in self.__mapper__.primary_key] + return f"{self.__class__.__name__}({', '.join(args)})" + + class DagScheduleAssetAliasReference(Base): """References from a DAG to an asset alias of which it is a consumer.""" diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1decc2db683ea..d0c6533a10332 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -90,7 +90,7 @@ clear_task_instances, ) from airflow.models.tasklog import LogTemplate -from airflow.sdk.definitions.asset import Asset, AssetAlias, BaseAsset +from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, BaseAsset from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.security import permissions @@ -1858,6 +1858,7 @@ def bulk_write_to_db( orm_dags = dag_op.find_orm_dags(session=session) # Refetch so relationship is up to date. asset_op.add_dag_asset_references(orm_dags, orm_assets, session=session) asset_op.add_dag_asset_alias_references(orm_dags, orm_asset_aliases, session=session) + asset_op.add_dag_asset_name_uri_references(session=session) asset_op.add_task_asset_references(orm_dags, orm_assets, session=session) asset_op.add_asset_trigger_references(orm_assets, session=session) session.flush() @@ -2080,6 +2081,16 @@ class DagModel(Base): back_populates="dag", cascade="all, delete, delete-orphan", ) + schedule_asset_name_references = relationship( + "DagScheduleAssetNameReference", + back_populates="dag", + cascade="all, delete, delete-orphan", + ) + schedule_asset_uri_references = relationship( + "DagScheduleAssetUriReference", + back_populates="dag", + cascade="all, delete, delete-orphan", + ) schedule_assets = association_proxy("schedule_asset_references", "asset") task_outlet_asset_references = relationship( "TaskOutletAssetReference", @@ -2275,7 +2286,7 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ """ from airflow.models.serialized_dag import SerializedDagModel - def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: + def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict[AssetUniqueKey, bool]) -> bool | None: # if dag was serialized before 2.9 and we *just* upgraded, # we may be dealing with old version. In that case, # just wait for the dag to be reserialized. @@ -2286,22 +2297,17 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: return None # this loads all the ADRQ records.... may need to limit num dags - all_records = session.scalars(select(AssetDagRunQueue)).all() - by_dag = defaultdict(list) - for r in all_records: + by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list) + for r in session.scalars(select(AssetDagRunQueue)): by_dag[r.target_dag_id].append(r) - del all_records - dag_statuses = {} + dag_statuses: dict[str, dict[AssetUniqueKey, bool]] = {} for dag_id, records in by_dag.items(): - dag_statuses[dag_id] = {x.asset.uri: True for x in records} - ser_dags = SerializedDagModel.get_latest_serialized_dags( - dag_ids=list(dag_statuses.keys()), session=session - ) + dag_statuses[dag_id] = {AssetUniqueKey.from_asset(x.asset): True for x in records} + ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session) for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.asset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 24ebcebcc373f..a0b1284fa93ba 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -101,7 +101,7 @@ from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import LazyXComSelectSequence, XCom from airflow.plugins_manager import integrate_macros_plugins -from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey +from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef from airflow.sentry import Sentry from airflow.settings import task_instance_mutation_hook from airflow.stats import Stats @@ -2731,6 +2731,10 @@ def _register_asset_changes_int( # This tuple[asset uri, extra] to sets alias names mapping is used to find whether # there're assets with same uri but different extra that we need to emit more than one asset events. asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set) + + asset_name_refs: set[str] = set() + asset_uri_refs: set[str] = set() + for obj in ti.task.outlets or []: ti.log.debug("outlet obj %s", obj) # Lineage can have other types of objects besides assets @@ -2741,6 +2745,10 @@ def _register_asset_changes_int( extra=events[obj].extra, session=session, ) + elif isinstance(obj, AssetNameRef): + asset_name_refs.add(obj.name) + elif isinstance(obj, AssetUriRef): + asset_uri_refs.add(obj.uri) elif isinstance(obj, AssetAlias): for asset_alias_event in events[obj].asset_alias_events: asset_alias_name = asset_alias_event.source_alias_name @@ -2748,10 +2756,10 @@ def _register_asset_changes_int( frozen_extra = frozenset(asset_alias_event.extra.items()) asset_alias_names[(asset_unique_key, frozen_extra)].add(asset_alias_name) - asset_models: dict[AssetUniqueKey, AssetModel] = { - AssetUniqueKey.from_asset(asset_obj): asset_obj - for asset_obj in session.scalars( - select(AssetModel).where( + existing_aliased_assets: set[AssetUniqueKey] = { + AssetUniqueKey(name=name, uri=uri) + for name, uri in session.execute( + select(AssetModel.name, AssetModel.uri).where( tuple_(AssetModel.name, AssetModel.uri).in_( (key.name, key.uri) for key, _ in asset_alias_names ) @@ -2761,31 +2769,47 @@ def _register_asset_changes_int( if missing_assets := [ asset_unique_key.to_asset() for asset_unique_key, _ in asset_alias_names - if asset_unique_key not in asset_models + if asset_unique_key not in existing_aliased_assets ]: - asset_models.update( - (AssetUniqueKey.from_asset(asset_obj), asset_obj) - for asset_obj in asset_manager.create_assets(missing_assets, session=session) - ) + asset_manager.create_assets(missing_assets, session=session) ti.log.warning("Created new assets for alias reference: %s", missing_assets) session.flush() # Needed because we need the id for fk. for (unique_key, extra_items), alias_names in asset_alias_names.items(): - asset_obj = asset_models[unique_key] ti.log.info( 'Creating event for %r through aliases "%s"', - asset_obj, + unique_key, ", ".join(alias_names), ) asset_manager.register_asset_change( task_instance=ti, - asset=asset_obj, + asset=unique_key, aliases=[AssetAlias(name=name) for name in alias_names], extra=dict(extra_items), session=session, source_alias_names=alias_names, ) + # Handle events derived from references. + asset_stmt = select(AssetModel).where(AssetModel.name.in_(asset_name_refs), AssetModel.active.has()) + for asset_model in session.scalars(asset_stmt): + ti.log.info("Creating event through asset name reference %r", asset_model.name) + asset_manager.register_asset_change( + task_instance=ti, + asset=asset_model, + extra=events[asset_model].extra, + session=session, + ) + asset_stmt = select(AssetModel).where(AssetModel.uri.in_(asset_uri_refs), AssetModel.active.has()) + for asset_model in session.scalars(asset_stmt): + ti.log.info("Creating event for through asset URI reference %r", asset_model.uri) + asset_manager.register_asset_change( + task_instance=ti, + asset=asset_model, + extra=events[asset_model].extra, + session=session, + ) + def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session): """Prepare Task for Execution.""" if TYPE_CHECKING: diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index fa0984f6d5f1e..eb6c548eeadf2 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -58,8 +58,9 @@ AssetAliasUniqueKey, AssetAll, AssetAny, - AssetRef, + AssetNameRef, AssetUniqueKey, + AssetUriRef, BaseAsset, ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator @@ -760,8 +761,10 @@ def serialize( elif isinstance(var, BaseAsset): serialized_asset = encode_asset_condition(var) return cls._encode(serialized_asset, type_=serialized_asset.pop("__type")) - elif isinstance(var, AssetRef): + elif isinstance(var, AssetNameRef): return cls._encode({"name": var.name}, type_=DAT.ASSET_REF) + elif isinstance(var, AssetUriRef): + return cls._encode({"uri": var.uri}, type_=DAT.ASSET_REF) elif isinstance(var, SimpleTaskInstance): return cls._encode( cls.serialize(var.__dict__, strict=strict), @@ -878,7 +881,7 @@ def deserialize(cls, encoded_var: Any) -> Any: elif type_ == DAT.ASSET_ALL: return AssetAll(*(decode_asset_condition(x) for x in var["objects"])) elif type_ == DAT.ASSET_REF: - return AssetRef(name=var["name"]) + return Asset.ref(**var) elif type_ == DAT.SIMPLE_TASK_INSTANCE: return SimpleTaskInstance(**cls.deserialize(var)) elif type_ == DAT.CONNECTION: diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py index c719f92437be1..38655fb73b3d4 100644 --- a/airflow/timetables/base.py +++ b/airflow/timetables/base.py @@ -26,7 +26,7 @@ from pendulum import DateTime from sqlalchemy.orm import Session - from airflow.sdk.definitions.asset import Asset, AssetAlias + from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetRef from airflow.serialization.dag_dependency import DagDependency from airflow.utils.types import DagRunType @@ -53,7 +53,7 @@ def __and__(self, other: BaseAsset) -> BaseAsset: def as_expression(self) -> Any: return None - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: return False def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: @@ -62,6 +62,9 @@ def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: return iter(()) + def iter_asset_refs(self) -> Iterator[AssetRef]: + return iter(()) + def iter_dag_dependencies(self, source, target) -> Iterator[DagDependency]: return iter(()) diff --git a/airflow/utils/context.py b/airflow/utils/context.py index c6cf2db498532..5957b48103901 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -44,16 +44,25 @@ from sqlalchemy import and_, select from airflow.exceptions import RemovedInAirflow3Warning -from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel, fetch_active_assets_by_name +from airflow.models.asset import ( + AssetAliasModel, + AssetEvent, + AssetModel, + fetch_active_assets_by_name, + fetch_active_assets_by_uri, +) from airflow.sdk.definitions.asset import ( Asset, AssetAlias, AssetAliasUniqueKey, + AssetNameRef, AssetRef, AssetUniqueKey, + AssetUriRef, BaseAssetUniqueKey, ) from airflow.utils.db import LazySelectSequence +from airflow.utils.session import create_session from airflow.utils.types import NOTSET if TYPE_CHECKING: @@ -200,6 +209,8 @@ class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor :meta private: """ + _asset_ref_cache: dict[AssetRef, AssetUniqueKey] = {} + def __init__(self) -> None: self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} @@ -220,6 +231,8 @@ def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor: hashable_key = AssetUniqueKey.from_asset(key) elif isinstance(key, AssetAlias): hashable_key = AssetAliasUniqueKey.from_asset_alias(key) + elif isinstance(key, AssetRef): + hashable_key = self._resolve_asset_ref(key) else: raise TypeError(f"Key should be either an asset or an asset alias, not {type(key)}") @@ -227,6 +240,28 @@ def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor: self._dict[hashable_key] = OutletEventAccessor(extra={}, key=hashable_key) return self._dict[hashable_key] + def _resolve_asset_ref(self, ref: AssetRef) -> AssetUniqueKey: + with contextlib.suppress(KeyError): + return self._asset_ref_cache[ref] + + refs_to_cache: list[AssetRef] + with create_session() as session: + if isinstance(ref, AssetNameRef): + asset = session.scalar( + select(AssetModel).where(AssetModel.name == ref.name, AssetModel.active.has()) + ) + refs_to_cache = [ref, AssetUriRef(asset.uri)] + elif isinstance(ref, AssetUriRef): + asset = session.scalar( + select(AssetModel).where(AssetModel.uri == ref.uri, AssetModel.active.has()) + ) + refs_to_cache = [ref, AssetNameRef(asset.name)] + else: + raise TypeError(f"Unimplemented asset ref: {type(ref)}") + for ref in refs_to_cache: + self._asset_ref_cache[ref] = unique_key = AssetUniqueKey.from_asset(asset) + return unique_key + class LazyAssetEventSelectSequence(LazySelectSequence[AssetEvent]): """ @@ -264,17 +299,23 @@ def __init__(self, inlets: list, *, session: Session) -> None: self._asset_aliases = {} _asset_ref_names: list[str] = [] + _asset_ref_uris: list[str] = [] for inlet in inlets: if isinstance(inlet, Asset): self._assets[AssetUniqueKey.from_asset(inlet)] = inlet elif isinstance(inlet, AssetAlias): self._asset_aliases[AssetAliasUniqueKey.from_asset_alias(inlet)] = inlet - elif isinstance(inlet, AssetRef): + elif isinstance(inlet, AssetNameRef): _asset_ref_names.append(inlet.name) + elif isinstance(inlet, AssetUriRef): + _asset_ref_uris.append(inlet.uri) if _asset_ref_names: for _, asset in fetch_active_assets_by_name(_asset_ref_names, self._session).items(): self._assets[AssetUniqueKey.from_asset(asset)] = asset + if _asset_ref_uris: + for _, asset in fetch_active_assets_by_uri(_asset_ref_uris, self._session).items(): + self._assets[AssetUniqueKey.from_asset(asset)] = asset def __iter__(self) -> Iterator[Asset | AssetAlias]: return iter(self._inlets) @@ -298,11 +339,20 @@ def __getitem__(self, key: int | Asset | AssetAlias | AssetRef) -> LazyAssetEven asset_alias = self._asset_aliases[AssetAliasUniqueKey.from_asset_alias(obj)] join_clause = AssetEvent.source_aliases where_clause = AssetAliasModel.name == asset_alias.name - elif isinstance(obj, AssetRef): - # TODO: handle the case that Asset uri is different from name - asset = self._assets[AssetUniqueKey.from_asset(Asset(name=obj.name))] + elif isinstance(obj, AssetNameRef): + try: + asset = next(a for k, a in self._assets.items() if k.name == obj.name) + except StopIteration: + raise KeyError(obj) from None join_clause = AssetEvent.asset - where_clause = and_(AssetModel.name == asset.name, AssetModel.uri == asset.uri) + where_clause = and_(AssetModel.name == asset.name, AssetModel.active.has()) + elif isinstance(obj, AssetUriRef): + try: + asset = next(a for k, a in self._assets.items() if k.uri == obj.uri) + except StopIteration: + raise KeyError(obj) from None + join_clause = AssetEvent.asset + where_clause = and_(AssetModel.uri == asset.uri, AssetModel.active.has()) else: raise ValueError(key) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 3cd3c206a66d2..1a6e698c7daa1 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "038dc8bc6284", + "3.0.0": "38770795785f", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index b503e8dfaf91a..1960a09bc7a98 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -ba10504bc54d15b2faca37ae9db172848a498e471bbf332e031715f728158ff8 \ No newline at end of file +c90ee05191d0beaf59f468d230d39a3bd77778c692558d388f75589ebc6ed68c \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 9f19f0f920a5c..0989ced6c8680 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -304,1096 +304,1096 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger - -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL + +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} - + task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 - + task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance_note - -task_instance_note - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 @@ -1536,891 +1536,947 @@ {0,1} - + dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 - + dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 - + dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 - + dag--asset_dag_run_queue - -0..N -1 + +0..N +1 - + +dag_schedule_asset_name_reference + +dag_schedule_asset_name_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + + + +dag--dag_schedule_asset_name_reference + +0..N +1 + + + +dag_schedule_asset_uri_reference + +dag_schedule_asset_uri_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + + + +dag--dag_schedule_asset_uri_reference + +0..N +1 + + + dag_version - -dag_version - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +version_number + + [INTEGER] + NOT NULL - + dag--dag_version - -0..N -1 + +0..N +1 - + dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL - + dag--dag_tag - -0..N -1 + +0..N +1 - + dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL - + dag--dag_owner_attributes - -0..N -1 + +0..N +1 - + dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL - + dag--dag_warning - -0..N -1 + +0..N +1 - + dag_version--task_instance - -0..N -{0,1} + +0..N +{0,1} - + dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] - + dag_version--dag_run - -0..N -{0,1} + +0..N +{0,1} - + dag_code - -dag_code - -id - - [UUID] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL + +dag_code + +id + + [UUID] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL + +source_code_hash + + [VARCHAR(32)] + NOT NULL - + dag_version--dag_code - -0..N -1 + +0..N +1 - + serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +processor_subdir + + [VARCHAR(2000)] - + dag_version--serialized_dag - -0..N -1 + +0..N +1 - + dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 - + dag_run--task_instance - -0..N -1 + +0..N +1 - + dag_run--task_instance - -0..N -1 + +0..N +1 - + backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL - + dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} - + dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] - + dag_run--dag_run_note - -1 -1 + +1 +1 - + dag_run--task_reschedule - -0..N -1 + +0..N +1 - + dag_run--task_reschedule - -0..N -1 + +0..N +1 - + log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL - + log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} - + backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} - + backfill--backfill_dag_run - -0..N -1 + +0..N +1 - + session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] - + alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL - + ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] - + ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL - + ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] - + ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] - + ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL - + ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL - + ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index edd166e0bf418..df1cc275ebbba 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``038dc8bc6284`` (head) | ``e229247a6cb1`` | ``3.0.0`` | update trigger_timeout column in task_instance table to UTC. | +| ``38770795785f`` (head) | ``038dc8bc6284`` | ``3.0.0`` | Add asset reference models. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``038dc8bc6284`` | ``e229247a6cb1`` | ``3.0.0`` | update trigger_timeout column in task_instance table to UTC. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``e229247a6cb1`` | ``eed27faa34e3`` | ``3.0.0`` | Add DagBundleModel. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py index c7d3906cf2e56..5b0cbb4a784d9 100644 --- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py +++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py @@ -38,15 +38,21 @@ from airflow.models.asset import AssetModel from airflow.triggers.base import BaseTrigger + AttrsInstance = attrs.AttrsInstance +else: + AttrsInstance = object + __all__ = [ "Asset", "Dataset", "Model", - "AssetRef", "AssetAlias", "AssetAll", "AssetAny", + "AssetNameRef", + "AssetRef", + "AssetUriRef", ] @@ -230,7 +236,7 @@ def as_expression(self) -> Any: """ raise NotImplementedError - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: raise NotImplementedError def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: @@ -239,6 +245,9 @@ def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: raise NotImplementedError + def iter_asset_refs(self) -> Iterator[AssetRef]: + raise NotImplementedError + def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]: """ Iterate a base asset as dag dependency. @@ -340,6 +349,24 @@ def __init__( self.__attrs_init__(name=name, uri=uri, **kwargs) + @overload + @staticmethod + def ref(*, name: str) -> AssetNameRef: ... + + @overload + @staticmethod + def ref(*, uri: str) -> AssetUriRef: ... + + @staticmethod + def ref(*, name: str = "", uri: str = "") -> AssetRef: + if name and uri: + raise TypeError("Asset reference must be made to either name or URI, not both") + if name: + return AssetNameRef(name) + if uri: + return AssetUriRef(uri) + raise TypeError("Asset reference expects keyword argument 'name' or 'uri'") + def __fspath__(self) -> str: return self.uri @@ -388,8 +415,11 @@ def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: return iter(()) - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: - return statuses.get(self.uri, False) + def iter_asset_refs(self) -> Iterator[AssetRef]: + return iter(()) + + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: + return statuses.get(AssetUniqueKey.from_asset(self), False) def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]: """ @@ -405,13 +435,73 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe ) -@attrs.define(kw_only=True) -class AssetRef: - """Reference to an asset.""" +class AssetRef(BaseAsset, AttrsInstance): + """ + Reference to an asset. + + This class is not intended to be instantiated directly. Call ``Asset.ref`` + instead to create one of the subclasses. + + :meta private: + """ + + def as_expression(self) -> Any: + return {"asset_ref": attrs.asdict(self)} + + def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: + return iter(()) + + def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: + return iter(()) + + def iter_asset_refs(self) -> Iterator[AssetRef]: + yield self + + def _resolve_asset(self, *, session: Session | None = None) -> Asset | None: + from airflow.models.asset import resolve_ref_to_asset + from airflow.utils.session import create_session + + with contextlib.nullcontext(session) if session else create_session() as session: + asset = resolve_ref_to_asset(**attrs.asdict(self), session=session) + return asset.to_public() if asset else None + + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: + if asset := self._resolve_asset(session=session): + return asset.evaluate(statuses=statuses, session=session) + return False + + def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]: + (dependency_id,) = attrs.astuple(self) + if asset := self._resolve_asset(): + yield DagDependency( + source=f"asset-ref:{dependency_id}" if source else "asset", + target="asset" if source else f"asset-ref:{dependency_id}", + dependency_type="asset", + dependency_id=asset.name, + ) + else: + yield DagDependency( + source=source or "asset-ref", + target=target or "asset-ref", + dependency_type="asset-ref", + dependency_id=dependency_id, + ) + + +@attrs.define() +class AssetNameRef(AssetRef): + """Name reference to an asset.""" name: str +@attrs.define() +class AssetUriRef(AssetRef): + """URI reference to an asset.""" + + uri: str + + class Dataset(Asset): """A representation of dataset dependencies between workflows.""" @@ -447,7 +537,7 @@ def as_expression(self) -> Any: """ return {"alias": {"name": self.name, "group": self.group}} - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: return any(x.evaluate(statuses=statuses, session=session) for x in self._resolve_assets(session)) def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: @@ -456,6 +546,9 @@ def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: yield self.name, self + def iter_asset_refs(self) -> Iterator[AssetRef]: + return iter(()) + def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]: """ Iterate an asset alias and its resolved assets as dag dependency. @@ -498,27 +591,20 @@ def __init__(self, *objects: BaseAsset) -> None: raise TypeError("expect asset expressions in condition") self.objects = objects - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: return self.agg_func(x.evaluate(statuses=statuses, session=session) for x in self.objects) def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: - seen: set[AssetUniqueKey] = set() # We want to keep the first instance. for o in self.objects: - for k, v in o.iter_assets(): - if k in seen: - continue - yield k, v - seen.add(k) + yield from o.iter_assets() def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: - """Filter asset aliases in the condition.""" - seen: set[str] = set() # We want to keep the first instance. for o in self.objects: - for k, v in o.iter_asset_aliases(): - if k in seen: - continue - yield k, v - seen.add(k) + yield from o.iter_asset_aliases() + + def iter_asset_refs(self) -> Iterator[AssetRef]: + for o in self.objects: + yield from o.iter_asset_refs() def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]: """ diff --git a/task_sdk/src/airflow/sdk/definitions/asset/decorators.py b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py index 531e097fd99f8..1f1d90883240b 100644 --- a/task_sdk/src/airflow/sdk/definitions/asset/decorators.py +++ b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py @@ -23,7 +23,7 @@ import attrs from airflow.providers.standard.operators.python import PythonOperator -from airflow.sdk.definitions.asset import Asset, AssetRef, BaseAsset +from airflow.sdk.definitions.asset import Asset, AssetNameRef, AssetRef, BaseAsset if TYPE_CHECKING: from collections.abc import Callable, Collection, Iterator, Mapping @@ -49,7 +49,7 @@ def from_definition(cls, definition: AssetDefinition | MultiAssetDefinition) -> return cls( task_id="__main__", inlets=[ - AssetRef(name=inlet_asset_name) + Asset.ref(name=inlet_asset_name) for inlet_asset_name in inspect.signature(definition._function).parameters if inlet_asset_name not in ("self", "context") ], @@ -75,7 +75,7 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: from airflow.models.asset import fetch_active_assets_by_name from airflow.utils.session import create_session - asset_names = {asset_ref.name for asset_ref in self.inlets if isinstance(asset_ref, AssetRef)} + asset_names = {asset_ref.name for asset_ref in self.inlets if isinstance(asset_ref, AssetNameRef)} if "self" in inspect.signature(self.python_callable).parameters: asset_names.add(self._definition_name) @@ -122,7 +122,7 @@ def __attrs_post_init__(self) -> None: with self._source.create_dag(dag_id=self._function.__name__): _AssetMainOperator.from_definition(self) - def evaluate(self, statuses: dict[str, bool], *, session: Session | None = None) -> bool: + def evaluate(self, statuses: dict[AssetUniqueKey, bool], *, session: Session | None = None) -> bool: return all(o.evaluate(statuses=statuses, session=session) for o in self._source.outlets) def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]: @@ -133,6 +133,10 @@ def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]: for o in self._source.outlets: yield from o.iter_asset_aliases() + def iter_asset_refs(self) -> Iterator[AssetRef]: + for o in self._source.outlets: + yield from o.iter_asset_refs() + def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]: for obj in self._source.outlets: yield from obj.iter_dag_dependencies(source=source, target=target) diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py index 2b1270d6f25b5..f49e42aa0eb30 100644 --- a/task_sdk/src/airflow/sdk/definitions/dag.py +++ b/task_sdk/src/airflow/sdk/definitions/dag.py @@ -53,7 +53,7 @@ ) from airflow.models.param import DagParam, ParamsDict from airflow.sdk.definitions.abstractoperator import AbstractOperator -from airflow.sdk.definitions.asset import Asset, AssetAlias, BaseAsset +from airflow.sdk.definitions.asset import AssetAll, BaseAsset from airflow.sdk.definitions.baseoperator import BaseOperator from airflow.sdk.types import NOTSET from airflow.timetables.base import Timetable @@ -92,12 +92,7 @@ DagStateChangeCallback = Callable[[Context], None] ScheduleInterval = Union[None, str, timedelta, relativedelta] -ScheduleArg = Union[ - ScheduleInterval, - Timetable, - BaseAsset, - Collection[Union["Asset", "AssetAlias"]], -] +ScheduleArg = Union[ScheduleInterval, Timetable, BaseAsset, Collection[BaseAsset]] _DAG_HASH_ATTRS = frozenset( @@ -492,8 +487,6 @@ def _validate_max_active_runs(self, _, max_active_runs): @timetable.default def _default_timetable(instance: DAG): - from airflow.sdk.definitions.asset import AssetAll - schedule = instance.schedule # TODO: Once # delattr(self, "schedule") @@ -502,8 +495,11 @@ def _default_timetable(instance: DAG): elif isinstance(schedule, BaseAsset): return AssetTriggeredTimetable(schedule) elif isinstance(schedule, Collection) and not isinstance(schedule, str): - if not all(isinstance(x, (Asset, AssetAlias)) for x in schedule): - raise ValueError("All elements in 'schedule' should be assets or asset aliases") + if not all(isinstance(x, BaseAsset) for x in schedule): + raise ValueError( + "All elements in 'schedule' should be either assets, " + "asset references, or asset aliases" + ) return AssetTriggeredTimetable(AssetAll(*schedule)) else: return _create_timetable(schedule, instance.timezone) diff --git a/task_sdk/tests/defintions/test_asset_decorators.py b/task_sdk/tests/defintions/test_asset_decorators.py index 2e714237b4193..5cac8b8181364 100644 --- a/task_sdk/tests/defintions/test_asset_decorators.py +++ b/task_sdk/tests/defintions/test_asset_decorators.py @@ -21,7 +21,7 @@ import pytest from airflow.models.asset import AssetModel -from airflow.sdk.definitions.asset import Asset, AssetRef +from airflow.sdk.definitions.asset import Asset from airflow.sdk.definitions.asset.decorators import _AssetMainOperator, asset @@ -171,7 +171,7 @@ def test_from_definition(self, example_asset_func_with_valid_arg_as_inlet_asset) ) op = _AssetMainOperator.from_definition(definition) assert op.task_id == "__main__" - assert op.inlets == [AssetRef(name="inlet_asset_1"), AssetRef(name="inlet_asset_2")] + assert op.inlets == [Asset.ref(name="inlet_asset_1"), Asset.ref(name="inlet_asset_2")] assert op.outlets == [definition] assert op.python_callable == example_asset_func_with_valid_arg_as_inlet_asset assert op._definition_name == "example_asset_func" @@ -183,7 +183,7 @@ def test_from_definition_multi(self, example_asset_func_with_valid_arg_as_inlet_ )(example_asset_func_with_valid_arg_as_inlet_asset) op = _AssetMainOperator.from_definition(definition) assert op.task_id == "__main__" - assert op.inlets == [AssetRef(name="inlet_asset_1"), AssetRef(name="inlet_asset_2")] + assert op.inlets == [Asset.ref(name="inlet_asset_1"), Asset.ref(name="inlet_asset_2")] assert op.outlets == [Asset(name="a"), Asset(name="b")] assert op.python_callable == example_asset_func_with_valid_arg_as_inlet_asset assert op._definition_name == "example_asset_func" @@ -215,7 +215,7 @@ def __exit__(self, *args, **kwargs): op = _AssetMainOperator( task_id="__main__", - inlets=[AssetRef(name="inlet_asset_1"), AssetRef(name="inlet_asset_2")], + inlets=[Asset.ref(name="inlet_asset_1"), Asset.ref(name="inlet_asset_2")], outlets=[asset_definition], python_callable=example_asset_func_with_valid_arg_as_inlet_asset, definition_name="example_asset_func", diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 534cfa6756637..f8a0b14232218 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2273,6 +2273,29 @@ def test_dags_needing_dagruns_asset_aliases(self, dag_maker, session): dag_models = query.all() assert dag_models == [dag_model] + @pytest.mark.parametrize("ref", [Asset.ref(name="1"), Asset.ref(uri="s3://bucket/assets/1")]) + def test_dags_needing_dagruns_asset_refs(self, dag_maker, session, ref): + asset = Asset(name="1", uri="s3://bucket/assets/1") + + with dag_maker(dag_id="producer", schedule=None, session=session): + EmptyOperator(task_id="op", outlets=asset) + + dr: DagRun = dag_maker.create_dagrun() + + with dag_maker(dag_id="consumer", schedule=ref, max_active_runs=1): + pass + + # Nothing from the upstream yet, no runs needed. + assert session.scalars(select(AssetDagRunQueue.target_dag_id)).all() == [] + query, _ = DagModel.dags_needing_dagruns(session) + assert query.all() == [] + + # Upstream triggered, now we need a run. + dr.get_task_instance("op").run() + assert session.scalars(select(AssetDagRunQueue.target_dag_id)).all() == ["consumer"] + query, _ = DagModel.dags_needing_dagruns(session) + assert [dm.dag_id for dm in query] == ["consumer"] + def test_max_active_runs_not_none(self): dag = DAG( dag_id="test_max_active_runs_not_none",