Test standard provider with Airflow 2.8 and 2.9
The standard provider now has a minimum Airflow version of 2.8 (since #43553),
but we have not been testing it against Airflow 2.8 and 2.9.
potiuk committed Nov 5, 2024
1 parent ed3accb commit 7021195
Showing 20 changed files with 224 additions and 165 deletions.
6 changes: 3 additions & 3 deletions contributing-docs/testing/unit_tests.rst
@@ -1173,11 +1173,11 @@ are not part of the public API. We deal with it in one of the following ways:

 .. code-block:: python

-    from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
+    from tests_common.test_utils.compat import AIRFLOW_V_2_9_PLUS

-    @pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="The tests should be skipped for Airflow < 2.8")
-    def some_test_that_only_works_for_airflow_2_8_plus():
+    @pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="The tests should be skipped for Airflow < 2.9")
+    def some_test_that_only_works_for_airflow_2_9_plus():
         pass

 4) Sometimes, the tests should only be run when airflow is installed from the sources in main.
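Note: the AIRFLOW_V_2_9_PLUS flag above comes from tests_common.test_utils.compat. A minimal sketch of how such a flag can be derived, assuming it mirrors the packaging-based AIRFLOW_V_2_10_PLUS and AIRFLOW_V_3_0_PLUS constants defined in python.py further down in this commit:

    # Sketch of a version-gate flag such as AIRFLOW_V_2_9_PLUS; base_version
    # strips dev/pre-release suffixes so "2.9.0.dev0" still compares cleanly.
    from packaging.version import Version

    from airflow import __version__ as airflow_version

    AIRFLOW_V_2_9_PLUS = Version(Version(airflow_version).base_version) >= Version("2.9.0")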
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/global_constants.py
@@ -574,13 +574,13 @@ def get_airflow_extras():
     {
         "python-version": "3.9",
         "airflow-version": "2.8.4",
-        "remove-providers": "cloudant fab edge standard",
+        "remove-providers": "cloudant fab edge",
         "run-tests": "true",
     },
     {
         "python-version": "3.9",
         "airflow-version": "2.9.3",
-        "remove-providers": "cloudant edge standard",
+        "remove-providers": "cloudant edge",
         "run-tests": "true",
     },
     {
58 changes: 41 additions & 17 deletions providers/src/airflow/providers/standard/operators/python.py
@@ -54,15 +54,16 @@
 from airflow.settings import _ENABLE_AIP_44
 from airflow.typing_compat import Literal
 from airflow.utils import hashlib_wrapper
-from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
+from airflow.utils.context import context_copy_partial, context_merge
 from airflow.utils.file import get_unique_dag_module_name
-from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
-from airflow.utils.process_utils import execute_in_subprocess
+from airflow.utils.operator_helpers import KeywordParameters
+from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs
 from airflow.utils.session import create_session

 log = logging.getLogger(__name__)

 AIRFLOW_VERSION = Version(airflow_version)
+AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
 AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")

 if TYPE_CHECKING:
@@ -187,7 +188,15 @@ def __init__(
     def execute(self, context: Context) -> Any:
         context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
         self.op_kwargs = self.determine_kwargs(context)
-        self._asset_events = context_get_outlet_events(context)
+
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.utils.context import context_get_outlet_events
+
+            self._asset_events = context_get_outlet_events(context)
+        elif AIRFLOW_V_2_10_PLUS:
+            from airflow.utils.context import context_get_outlet_events
+
+            self._dataset_events = context_get_outlet_events(context)

         return_value = self.execute_callable()
         if self.show_return_value_in_logs:
@@ -206,7 +215,15 @@ def execute_callable(self) -> Any:
         :return: the return value of the call.
         """
-        runner = ExecutionCallableRunner(self.python_callable, self._asset_events, logger=self.log)
+        try:
+            from airflow.utils.operator_helpers import ExecutionCallableRunner
+
+            asset_events = self._asset_events if AIRFLOW_V_3_0_PLUS else self._dataset_events
+
+            runner = ExecutionCallableRunner(self.python_callable, asset_events, logger=self.log)
+        except ImportError:
+            # Handle pre-Airflow 2.10 case where ExecutionCallableRunner was not available
+            return self.python_callable(*self.op_args, **self.op_kwargs)
         return runner.run(*self.op_args, **self.op_kwargs)


@@ -551,18 +568,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
         env_vars.update(self.env_vars)

         try:
-            execute_in_subprocess(
-                cmd=[
-                    os.fspath(python_path),
-                    os.fspath(script_path),
-                    os.fspath(input_path),
-                    os.fspath(output_path),
-                    os.fspath(string_args_path),
-                    os.fspath(termination_log_path),
-                    os.fspath(airflow_context_path),
-                ],
-                env=env_vars,
-            )
+            cmd: list[str] = [
+                os.fspath(python_path),
+                os.fspath(script_path),
+                os.fspath(input_path),
+                os.fspath(output_path),
+                os.fspath(string_args_path),
+                os.fspath(termination_log_path),
+                os.fspath(airflow_context_path),
+            ]
+            if AIRFLOW_V_2_10_PLUS:
+                execute_in_subprocess(
+                    cmd=cmd,
+                    env=env_vars,
+                )
+            else:
+                execute_in_subprocess_with_kwargs(
+                    cmd=cmd,
+                    env=env_vars,
+                )
         except subprocess.CalledProcessError as e:
             if e.returncode in self.skip_on_exit_code:
                 raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
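Note: the version gate above is needed because execute_in_subprocess only accepts an env keyword on newer Airflow versions, while execute_in_subprocess_with_kwargs forwards extra keyword arguments to subprocess.Popen. A hedged alternative sketch, feature-detecting the signature instead of hardcoding a version cutoff (assuming only that the newer helper exposes an env parameter):

    # Sketch: detect whether the helper accepts ``env`` instead of gating on version.
    import inspect

    from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs


    def run_python_subprocess(cmd: list[str], env: dict[str, str]) -> None:
        if "env" in inspect.signature(execute_in_subprocess).parameters:
            # Newer Airflow: the helper takes env directly.
            execute_in_subprocess(cmd=cmd, env=env)
        else:
            # Older Airflow: the kwargs variant forwards env to subprocess.Popen.
            execute_in_subprocess_with_kwargs(cmd=cmd, env=env)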
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/date_time.py
@@ -18,10 +18,27 @@
 from __future__ import annotations

 import datetime
+from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, NoReturn, Sequence

+from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.base import StartTriggerArgs
+
+try:
+    from airflow.triggers.base import StartTriggerArgs
+except ImportError:
+    # TODO: Remove this when min airflow version is 2.10.0 for standard provider
+    @dataclass
+    class StartTriggerArgs:  # type: ignore[no-redef]
+        """Arguments required for start task execution from triggerer."""
+
+        trigger_cls: str
+        next_method: str
+        trigger_kwargs: dict[str, Any] | None = None
+        next_kwargs: dict[str, Any] | None = None
+        timeout: datetime.timedelta | None = None
+
+
 from airflow.triggers.temporal import DateTimeTrigger
 from airflow.utils import timezone

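For orientation, a hypothetical example of populating the StartTriggerArgs dataclass above (the trigger path and values are illustrative only, not taken from this commit):

    # Hypothetical StartTriggerArgs usage; all values below are illustrative.
    import datetime

    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.temporal.DateTimeTrigger",
        next_method="execute_complete",
        trigger_kwargs={"moment": datetime.datetime(2024, 11, 5, tzinfo=datetime.timezone.utc)},
        next_kwargs=None,
        timeout=datetime.timedelta(hours=1),
    )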
@@ -125,7 +142,9 @@ def execute(self, context: Context) -> NoReturn:
             trigger=DateTimeTrigger(
                 moment=timezone.parse(self.target_time),
                 end_from_trigger=self.end_from_trigger,
-            ),
+            )
+            if AIRFLOW_V_3_0_PLUS
+            else DateTimeTrigger(moment=timezone.parse(self.target_time)),
         )

     def execute_complete(self, context: Context, event: Any = None) -> None:
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time.py
@@ -18,10 +18,27 @@
 from __future__ import annotations

 import datetime
+from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, NoReturn

+from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.base import StartTriggerArgs
+
+try:
+    from airflow.triggers.base import StartTriggerArgs
+except ImportError:
+    # TODO: Remove this when min airflow version is 2.10.0 for standard provider
+    @dataclass
+    class StartTriggerArgs:  # type: ignore[no-redef]
+        """Arguments required for start task execution from triggerer."""
+
+        trigger_cls: str
+        next_method: str
+        trigger_kwargs: dict[str, Any] | None = None
+        next_kwargs: dict[str, Any] | None = None
+        timeout: datetime.timedelta | None = None
+
+
 from airflow.triggers.temporal import DateTimeTrigger
 from airflow.utils import timezone

@@ -102,7 +119,9 @@ def __init__(

     def execute(self, context: Context) -> NoReturn:
         self.defer(
-            trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
+            trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger)
+            if AIRFLOW_V_3_0_PLUS
+            else DateTimeTrigger(moment=self.target_datetime),
             method_name="execute_complete",
         )

10 changes: 8 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time_delta.py
@@ -23,6 +23,7 @@

 from airflow.configuration import conf
 from airflow.exceptions import AirflowSkipException
+from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
 from airflow.sensors.base import BaseSensorOperator
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
@@ -81,7 +82,10 @@ def execute(self, context: Context) -> bool | NoReturn:
             # If the target datetime is in the past, return immediately
             return True
         try:
-            trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
+            if AIRFLOW_V_3_0_PLUS:
+                trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
+            else:
+                trigger = DateTimeTrigger(moment=target_dttm)
         except (TypeError, ValueError) as e:
             if self.soft_fail:
                 raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
@@ -121,7 +125,9 @@ def __init__(
     def execute(self, context: Context) -> None:
         if self.deferrable:
             self.defer(
-                trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
+                trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True)
+                if AIRFLOW_V_3_0_PLUS
+                else TimeDeltaTrigger(self.time_to_wait),
                 method_name="execute_complete",
             )
         else:
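The same conditional, passing end_from_trigger only on Airflow 3.0+, now appears in several sensors above. A hypothetical helper (not part of this commit) could factor it out:

    # Hypothetical helper consolidating the repeated version-gated trigger construction.
    from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
    from airflow.triggers.temporal import DateTimeTrigger


    def build_datetime_trigger(moment, end_from_trigger: bool) -> DateTimeTrigger:
        if AIRFLOW_V_3_0_PLUS:
            # end_from_trigger is only understood by Airflow 3.0+ triggers.
            return DateTimeTrigger(moment=moment, end_from_trigger=end_from_trigger)
        return DateTimeTrigger(moment=moment)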
@@ -24,15 +24,11 @@
 from airflow.cli import cli_parser
 from airflow.providers.amazon.aws.auth_manager.cli.avp_commands import init_avp, update_schema

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
 from tests_common.test_utils.config import conf_vars

 mock_boto3 = Mock()

-pytestmark = [
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Test requires Airflow 2.8+"),
-    pytest.mark.skip_if_database_isolation_mode,
-]
+pytestmark = pytest.mark.skip_if_database_isolation_mode


 @pytest.mark.db_test
31 changes: 10 additions & 21 deletions providers/tests/amazon/aws/auth_manager/test_aws_auth_manager.py
@@ -23,6 +23,15 @@
 from flask import Flask, session
 from flask_appbuilder.menu import MenuItem

+from airflow.auth.managers.models.resource_details import (
+    AccessView,
+    ConfigurationDetails,
+    ConnectionDetails,
+    DagAccessEntity,
+    DagDetails,
+    PoolDetails,
+    VariableDetails,
+)
 from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
 from airflow.providers.amazon.aws.auth_manager.avp.facade import AwsAuthManagerAmazonVerifiedPermissionsFacade
 from airflow.providers.amazon.aws.auth_manager.aws_auth_manager import AwsAuthManager
@@ -39,30 +48,10 @@
 from airflow.www import app as application
 from airflow.www.extensions.init_appbuilder import init_appbuilder

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS, AIRFLOW_V_2_9_PLUS
+from tests_common.test_utils.compat import AIRFLOW_V_2_9_PLUS
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.www import check_content_in_response

-try:
-    from airflow.auth.managers.models.resource_details import (
-        AccessView,
-        ConfigurationDetails,
-        ConnectionDetails,
-        DagAccessEntity,
-        DagDetails,
-        PoolDetails,
-        VariableDetails,
-    )
-except ImportError:
-    if not AIRFLOW_V_2_8_PLUS:
-        pytest.skip(
-            "Skipping tests that require airflow.auth.managers.models.resource_details for Airflow < 2.8.0",
-            allow_module_level=True,
-        )
-    else:
-        raise
-
-
 if TYPE_CHECKING:
     from airflow.auth.managers.base_auth_manager import ResourceMethod
     from airflow.auth.managers.models.resource_details import AssetDetails
@@ -34,8 +34,6 @@
 from airflow.utils import db, timezone
 from airflow.utils.types import DagRunType

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
-

 @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
 def test_spark_kubernetes_operator(mock_kubernetes_hook, data_file):
@@ -780,9 +778,6 @@ def test_resolve_application_file_template_non_dictionary(dag_maker, tmp_path, b
 @pytest.mark.parametrize(
     "use_literal_value", [pytest.param(True, id="literal-value"), pytest.param(False, id="whitespace-compat")]
 )
-@pytest.mark.skipif(
-    not AIRFLOW_V_2_8_PLUS, reason="Skipping tests that require LiteralValue for Airflow < 2.8.0"
-)
 def test_resolve_application_file_real_file(
     create_task_instance_of_operator, tmp_path, use_literal_value, session
 ):
@@ -815,9 +810,6 @@ def test_resolve_application_file_real_file(

 @pytest.mark.db_test
-@pytest.mark.skipif(
-    not AIRFLOW_V_2_8_PLUS, reason="Skipping tests that require LiteralValue for Airflow < 2.8.0"
-)
 def test_resolve_application_file_real_file_not_exists(create_task_instance_of_operator, tmp_path, session):
     application_file = (tmp_path / "test-application-file.yml").resolve().as_posix()
     from airflow.template.templater import LiteralValue
6 changes: 0 additions & 6 deletions providers/tests/common/sql/hooks/test_dbapi.py
@@ -30,12 +30,6 @@
 from airflow.models import Connection
 from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler, fetch_one_handler

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
-
-pytestmark = [
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Tests for Airflow 2.8.0+ only"),
-]
-

 class DbApiHookInProvider(DbApiHook):
     conn_name_attr = "test_conn_id"
5 changes: 0 additions & 5 deletions providers/tests/common/sql/hooks/test_sql.py
@@ -32,11 +32,6 @@
 from airflow.utils.session import provide_session

 from providers.tests.common.sql.test_utils import mock_hook
-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
-
-pytestmark = [
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Tests for Airflow 2.8.0+ only"),
-]

 TASK_ID = "sql-operator"
 HOST = "host"
6 changes: 0 additions & 6 deletions providers/tests/common/sql/hooks/test_sqlparse.py
@@ -20,12 +20,6 @@

 from airflow.providers.common.sql.hooks.sql import DbApiHook

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
-
-pytestmark = [
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Tests for Airflow 2.8.0+ only"),
-]
-

 @pytest.mark.parametrize(
     "line,parsed_statements",
3 changes: 1 addition & 2 deletions providers/tests/common/sql/operators/test_sql.py
@@ -45,15 +45,14 @@
 from airflow.utils.session import create_session
 from airflow.utils.state import State

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS, AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
 from tests_common.test_utils.providers import get_provider_min_airflow_version

 if AIRFLOW_V_3_0_PLUS:
     from airflow.utils.types import DagRunTriggeredByType

 pytestmark = [
     pytest.mark.db_test,
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Tests for Airflow 2.8.0+ only"),
     pytest.mark.skip_if_database_isolation_mode,
 ]
6 changes: 0 additions & 6 deletions providers/tests/common/sql/operators/test_sql_execute.py
@@ -34,12 +34,6 @@
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.providers.openlineage.extractors.base import OperatorLineage

-from tests_common.test_utils.compat import AIRFLOW_V_2_8_PLUS
-
-pytestmark = [
-    pytest.mark.skipif(not AIRFLOW_V_2_8_PLUS, reason="Tests for Airflow 2.8.0+ only"),
-]
-

 DATE = "2017-04-20"
 TASK_ID = "sql-operator"
