Skip to content

Commit

Permalink
Test standard provider with Airflow 2.8 and 2.9 (apache#43556)
Browse files Browse the repository at this point in the history
The standard provider has now min version of Airflow = 2.8
since apache#43553, but we have not tested it for Airflow 2.8 and 2.9.
  • Loading branch information
potiuk authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent 477ad40 commit 7070eb7
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 108 deletions.
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,13 @@ def get_airflow_extras():
{
"python-version": "3.9",
"airflow-version": "2.8.4",
"remove-providers": "cloudant fab edge standard",
"remove-providers": "cloudant fab edge",
"run-tests": "true",
},
{
"python-version": "3.9",
"airflow-version": "2.9.3",
"remove-providers": "cloudant edge standard",
"remove-providers": "cloudant edge",
"run-tests": "true",
},
{
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/howto/operator/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ With some limitations, you can also use ``Context`` in virtual environments.

You can also use ``get_current_context()`` in the same way as before, but with some limitations.

* Requires ``pydantic>=2``.
* Requires ``apache-airflow>=3.0.0``.

* Set ``use_airflow_context`` to ``True`` to call ``get_current_context()`` in the virtual environment.

Expand Down
9 changes: 9 additions & 0 deletions providers/src/airflow/providers/standard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from packaging.version import Version

from airflow import __version__ as airflow_version

AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
84 changes: 56 additions & 28 deletions providers/src/airflow/providers/standard/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast

import lazy_object_proxy
from packaging.version import Version

from airflow import __version__ as airflow_version
from airflow.exceptions import (
AirflowConfigException,
AirflowException,
Expand All @@ -50,21 +48,19 @@
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.providers.standard import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import Literal
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs
from airflow.utils.session import create_session

log = logging.getLogger(__name__)

AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")

if TYPE_CHECKING:
from pendulum.datetime import DateTime

Expand Down Expand Up @@ -187,7 +183,15 @@ def __init__(
def execute(self, context: Context) -> Any:
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
self.op_kwargs = self.determine_kwargs(context)
self._asset_events = context_get_outlet_events(context)

if AIRFLOW_V_3_0_PLUS:
from airflow.utils.context import context_get_outlet_events

self._asset_events = context_get_outlet_events(context)
elif AIRFLOW_V_2_10_PLUS:
from airflow.utils.context import context_get_outlet_events

self._dataset_events = context_get_outlet_events(context)

return_value = self.execute_callable()
if self.show_return_value_in_logs:
Expand All @@ -206,7 +210,15 @@ def execute_callable(self) -> Any:
:return: the return value of the call.
"""
runner = ExecutionCallableRunner(self.python_callable, self._asset_events, logger=self.log)
try:
from airflow.utils.operator_helpers import ExecutionCallableRunner

asset_events = self._asset_events if AIRFLOW_V_3_0_PLUS else self._dataset_events

runner = ExecutionCallableRunner(self.python_callable, asset_events, logger=self.log)
except ImportError:
# Handle Pre Airflow 3.10 case where ExecutionCallableRunner was not available
return self.python_callable(*self.op_args, **self.op_kwargs)
return runner.run(*self.op_args, **self.op_kwargs)


Expand Down Expand Up @@ -348,7 +360,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
"ds_nodash",
"expanded_ti_count",
"inlets",
"map_index_template",
"next_ds",
"next_ds_nodash",
"outlets",
Expand Down Expand Up @@ -551,18 +562,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
env_vars.update(self.env_vars)

try:
execute_in_subprocess(
cmd=[
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
],
env=env_vars,
)
cmd: list[str] = [
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
]
if AIRFLOW_V_2_10_PLUS:
execute_in_subprocess(
cmd=cmd,
env=env_vars,
)
else:
execute_in_subprocess_with_kwargs(
cmd=cmd,
env=env_vars,
)
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
Expand Down Expand Up @@ -697,10 +715,15 @@ def __init__(
raise AirflowException(
"Passing non-string types (e.g. int or float) as python_version not supported"
)

if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
raise AirflowException(
"The `use_airflow_context=True` is only supported in Airflow 3.0.0 and later."
)
if use_airflow_context and (not expect_airflow and not system_site_packages):
error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False."
raise AirflowException(error_msg)
raise AirflowException(
"The `use_airflow_context` parameter is set to True, but "
"expect_airflow and system_site_packages are set to False."
)
if not requirements:
self.requirements: list[str] = []
elif isinstance(requirements, str):
Expand Down Expand Up @@ -976,9 +999,14 @@ def __init__(
):
if not python:
raise ValueError("Python Path must be defined in ExternalPythonOperator")
if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
raise AirflowException(
"The `use_airflow_context=True` is only supported in Airflow 3.0.0 and later."
)
if use_airflow_context and not expect_airflow:
error_msg = "use_airflow_context is set to True, but expect_airflow is set to False."
raise AirflowException(error_msg)
raise AirflowException(
"The `use_airflow_context` parameter is set to True, but expect_airflow is set to False."
)
self.python = python
self.expect_pendulum = expect_pendulum
super().__init__(
Expand Down
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/date_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,27 @@
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn, Sequence

from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

Expand Down Expand Up @@ -125,7 +142,9 @@ def execute(self, context: Context) -> NoReturn:
trigger=DateTimeTrigger(
moment=timezone.parse(self.target_time),
end_from_trigger=self.end_from_trigger,
),
)
if AIRFLOW_V_3_0_PLUS
else DateTimeTrigger(moment=timezone.parse(self.target_time)),
)

def execute_complete(self, context: Context, event: Any = None) -> None:
Expand Down
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,27 @@
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn

from airflow.providers.standard import AIRFLOW_V_2_10_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

Expand Down Expand Up @@ -102,7 +119,9 @@ def __init__(

def execute(self, context: Context) -> NoReturn:
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger)
if AIRFLOW_V_2_10_PLUS
else DateTimeTrigger(moment=self.target_datetime),
method_name="execute_complete",
)

Expand Down
10 changes: 8 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -81,7 +82,10 @@ def execute(self, context: Context) -> bool | NoReturn:
# If the target datetime is in the past, return immediately
return True
try:
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
if AIRFLOW_V_3_0_PLUS:
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
else:
trigger = DateTimeTrigger(moment=target_dttm)
except (TypeError, ValueError) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
Expand Down Expand Up @@ -121,7 +125,9 @@ def __init__(
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True)
if AIRFLOW_V_3_0_PLUS
else TimeDeltaTrigger(self.time_to_wait),
method_name="execute_complete",
)
else:
Expand Down
1 change: 0 additions & 1 deletion providers/tests/common/sql/operators/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

pytestmark = [
pytest.mark.db_test,
pytest.mark.skipif(reason="Tests for Airflow 2.8.0+ only"),
pytest.mark.skip_if_database_isolation_mode,
]

Expand Down
12 changes: 3 additions & 9 deletions providers/tests/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType

BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
if not AIRFLOW_V_2_10_PLUS:
BASH_OPERATOR_PATH = "airflow.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.operators.python"


class SafeStrDict(dict):
def __str__(self):
Expand Down Expand Up @@ -276,7 +270,7 @@ def test_get_fully_qualified_class_name():
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

result = get_fully_qualified_class_name(BashOperator(task_id="test", bash_command="exit 0;"))
assert result == f"{BASH_OPERATOR_PATH}.BashOperator"
assert result == "airflow.providers.standard.operators.bash.BashOperator"

result = get_fully_qualified_class_name(OpenLineageAdapter())
assert result == "airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter"
Expand All @@ -292,8 +286,8 @@ def test_is_operator_disabled(mock_disabled_operators):
assert is_operator_disabled(op) is False

mock_disabled_operators.return_value = {
f"{BASH_OPERATOR_PATH}.BashOperator",
f"{PYTHON_OPERATOR_PATH}.PythonOperator",
"airflow.providers.standard.operators.bash.BashOperator",
"airflow.providers.standard.operators.python.PythonOperator",
}
assert is_operator_disabled(op) is True

Expand Down
5 changes: 1 addition & 4 deletions providers/tests/openlineage/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType

from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator, PythonOperator
from tests_common.test_utils.compat import BashOperator, PythonOperator
from tests_common.test_utils.mock_operators import MockOperator

BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
if not AIRFLOW_V_2_10_PLUS:
BASH_OPERATOR_PATH = "airflow.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.operators.python"


class CustomOperatorForTest(BashOperator):
Expand Down
Loading

0 comments on commit 7070eb7

Please sign in to comment.