Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for pendulum 3 #19456

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import datetime
from typing import TYPE_CHECKING, AbstractSet, Optional, Sequence, Tuple

import pendulum

from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._seven.compat.pendulum import Period
from dagster._utils.schedules import cron_string_iterator

if TYPE_CHECKING:
Expand All @@ -27,7 +26,7 @@ def get_execution_period_for_policy(
freshness_policy: FreshnessPolicy,
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> pendulum.Period:
) -> Period:
if freshness_policy.cron_schedule:
tick_iterator = cron_string_iterator(
start_timestamp=current_time.timestamp(),
Expand All @@ -41,18 +40,18 @@ def get_execution_period_for_policy(
tick = next(tick_iterator)
required_data_time = tick - freshness_policy.maximum_lag_delta
if effective_data_time is None or effective_data_time < required_data_time:
return pendulum.Period(start=required_data_time, end=tick)
return Period(start=required_data_time, end=tick)

else:
# occurs when asset is missing
if effective_data_time is None:
return pendulum.Period(
return Period(
# require data from at most maximum_lag_delta ago
start=current_time - freshness_policy.maximum_lag_delta,
# this data should be available as soon as possible
end=current_time,
)
return pendulum.Period(
return Period(
# we don't want to execute this too frequently
start=effective_data_time + 0.9 * freshness_policy.maximum_lag_delta,
end=max(effective_data_time + freshness_policy.maximum_lag_delta, current_time),
Expand All @@ -64,7 +63,7 @@ def get_execution_period_and_evaluation_data_for_policies(
policies: AbstractSet[FreshnessPolicy],
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> Tuple[Optional[pendulum.Period], Optional["TextRuleEvaluationData"]]:
) -> Tuple[Optional[Period], Optional["TextRuleEvaluationData"]]:
"""Determines a range of times for which you can kick off an execution of this asset to solve
the most pressing constraint, alongside a maximum number of additional constraints.
"""
Expand All @@ -84,7 +83,7 @@ def get_execution_period_and_evaluation_data_for_policies(
if merged_period is None:
merged_period = period
elif period.start <= merged_period.end:
merged_period = pendulum.Period(
merged_period = Period(
start=max(period.start, merged_period.start),
end=period.end,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import datetime
from typing import AbstractSet, NamedTuple, Optional

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import create_pendulum_timezone
from dagster._utils.schedules import (
is_valid_cron_schedule,
reverse_cron_string_iterator,
Expand Down Expand Up @@ -122,7 +121,7 @@ def __new__(
)
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(cron_schedule_timezone) # type: ignore
create_pendulum_timezone(cron_schedule_timezone) # type: ignore
except Exception as e:
raise DagsterInvalidDefinitionError(
"Invalid cron schedule timezone '{cron_schedule_timezone}'. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
cast,
)

import pendulum
from typing_extensions import TypeAlias

import dagster._check as check
Expand All @@ -29,6 +28,7 @@
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import create_pendulum_timezone
from dagster._utils import IHasInternalInit, ensure_gen
from dagster._utils.merger import merge_dicts
from dagster._utils.schedules import is_valid_cron_schedule
Expand Down Expand Up @@ -692,7 +692,7 @@ def _execution_fn(context: ScheduleEvaluationContext) -> RunRequestIterator:
if self._execution_timezone:
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(self._execution_timezone) # type: ignore
create_pendulum_timezone(self._execution_timezone) # type: ignore
except Exception as e:
raise DagsterInvalidDefinitionError(
f"Invalid execution timezone {self._execution_timezone} for {name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
unpack_value,
)
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_2,
_IS_PENDULUM_2_OR_NEWER,
PRE_TRANSITION,
PendulumDateTime,
create_pendulum_time,
to_timezone,
Expand Down Expand Up @@ -146,7 +147,7 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
# Pendulum 1.x erroneously believes that there are two instances of the *second* hour after
# a datetime transition, so to work around this we calculate the timestamp of the next
# microsecond of the given datetime.
dt_microsecond = dt.microsecond + 1 if not _IS_PENDULUM_2 else dt.microsecond
dt_microsecond = dt.microsecond + 1 if not _IS_PENDULUM_2_OR_NEWER else dt.microsecond
dt = create_pendulum_time(
dt.year,
dt.month,
Expand All @@ -156,9 +157,9 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
dt.second,
dt_microsecond,
tz=tz,
dst_rule=pendulum.PRE_TRANSITION,
dst_rule=PRE_TRANSITION,
)
if not _IS_PENDULUM_2:
if not _IS_PENDULUM_2_OR_NEWER:
dt = dt.add(microseconds=-1)
return dt

Expand Down
104 changes: 81 additions & 23 deletions python_modules/dagster/dagster/_seven/compat/pendulum.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,109 @@
import pendulum
from typing_extensions import TypeAlias

_IS_PENDULUM_2 = (
_IS_PENDULUM_2_OR_NEWER = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 2
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") >= 2
)

_IS_PENDULUM_3_OR_NEWER = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") >= 3
)


PRE_TRANSITION = pendulum.tz.PRE_TRANSITION if _IS_PENDULUM_3_OR_NEWER else pendulum.PRE_TRANSITION
POST_TRANSITION = (
pendulum.tz.POST_TRANSITION if _IS_PENDULUM_3_OR_NEWER else pendulum.POST_TRANSITION
)
TRANSITION_ERROR = (
pendulum.tz.TRANSITION_ERROR if _IS_PENDULUM_3_OR_NEWER else pendulum.TRANSITION_ERROR
)


@contextmanager
def mock_pendulum_timezone(override_timezone):
if _IS_PENDULUM_2:
if _IS_PENDULUM_2_OR_NEWER:
with pendulum.tz.test_local_timezone(pendulum.tz.timezone(override_timezone)):
yield
else:
with pendulum.tz.LocalTimezone.test(pendulum.Timezone.load(override_timezone)):
yield


@contextmanager
def pendulum_test(mock):
if _IS_PENDULUM_3_OR_NEWER:
with pendulum.travel_to(mock, freeze=True):
yield
else:
with pendulum.test(mock):
yield


def create_pendulum_time(year, month, day, *args, **kwargs):
if "tz" in kwargs and "dst_rule" in kwargs and not _IS_PENDULUM_2:
tz = pendulum.timezone(kwargs.pop("tz"))
dst_rule = kwargs.pop("dst_rule")

return pendulum.instance(
tz.convert(
datetime.datetime(
year,
month,
day,
*args,
**kwargs,
),
dst_rule=dst_rule,
# pendulum <2.0
if not _IS_PENDULUM_2_OR_NEWER:
if "tz" in kwargs and "dst_rule" in kwargs:
tz = pendulum.timezone(kwargs.pop("tz"))
dst_rule = kwargs.pop("dst_rule")

return pendulum.instance(
tz.convert(
datetime.datetime(
year,
month,
day,
*args,
**kwargs,
),
dst_rule=dst_rule,
)
)
else:
pendulum.create(year, month, day, *args, **kwargs)

# pendulum >=3.0
elif _IS_PENDULUM_3_OR_NEWER:
raise_on_unknown_times = False
fold = 1

if "dst_rule" in kwargs:
dst_rule = kwargs.pop("dst_rule")
raise_on_unknown_times = dst_rule == TRANSITION_ERROR
if dst_rule == PRE_TRANSITION:
fold = 0
elif dst_rule == POST_TRANSITION:
fold = 1

return pendulum.datetime(
year,
month,
day,
*args,
fold=fold,
raise_on_unknown_times=raise_on_unknown_times,
**kwargs,
)

return (
pendulum.datetime(year, month, day, *args, **kwargs)
if _IS_PENDULUM_2
else pendulum.create(year, month, day, *args, **kwargs)
)
# pendulum >=2.0,<3.0
else:
return pendulum.datetime(year, month, day, *args, **kwargs)


def create_pendulum_timezone(*args, **kwargs):
if _IS_PENDULUM_3_OR_NEWER:
return pendulum.timezone(*args, **kwargs)
else:
return pendulum.tz.timezone(*args, **kwargs)


PendulumDateTime: TypeAlias = (
pendulum.DateTime if _IS_PENDULUM_2 else pendulum.Pendulum # type: ignore[attr-defined]
pendulum.DateTime if _IS_PENDULUM_2_OR_NEWER else pendulum.Pendulum # type: ignore[attr-defined]
)

Period: TypeAlias = pendulum.Interval if _IS_PENDULUM_3_OR_NEWER else pendulum.Period # type: ignore[attr-defined]


# Workaround for issue with .in_tz() in pendulum:
# https://github.com/sdispater/pendulum/issues/535
Expand Down
28 changes: 17 additions & 11 deletions python_modules/dagster/dagster/_utils/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

import dagster._check as check
from dagster._core.definitions.partition import ScheduleType
from dagster._seven.compat.pendulum import PendulumDateTime, create_pendulum_time
from dagster._seven.compat.pendulum import (
POST_TRANSITION,
PRE_TRANSITION,
TRANSITION_ERROR,
PendulumDateTime,
create_pendulum_time,
)

# Monthly schedules with 29-31 won't reliably run every month
MAX_DAY_OF_MONTH_WITH_GUARANTEED_MONTHLY_INTERVAL = 28
Expand Down Expand Up @@ -99,7 +105,7 @@ def _replace_date_fields(
0,
0,
tz=pendulum_date.timezone_name,
dst_rule=pendulum.TRANSITION_ERROR,
dst_rule=TRANSITION_ERROR,
)
except pendulum.tz.exceptions.NonExistingTime: # type: ignore
# If we fall on a non-existant time (e.g. between 2 and 3AM during a DST transition)
Expand All @@ -114,7 +120,7 @@ def _replace_date_fields(
0,
0,
tz=pendulum_date.timezone_name,
dst_rule=pendulum.TRANSITION_ERROR,
dst_rule=TRANSITION_ERROR,
)
except pendulum.tz.exceptions.AmbiguousTime: # type: ignore
# For consistency, always choose the latter of the two possible times during a fall DST
Expand All @@ -129,7 +135,7 @@ def _replace_date_fields(
0,
0,
tz=pendulum_date.timezone_name,
dst_rule=pendulum.POST_TRANSITION,
dst_rule=POST_TRANSITION,
)

return new_time
Expand Down Expand Up @@ -288,7 +294,7 @@ def _find_weekly_schedule_time(
)

# Move to the correct day of the week
current_day_of_week = new_time.day_of_week
current_day_of_week = new_time.isoweekday() % 7
if day_of_week != current_day_of_week:
if ascending:
new_time = new_time.add(days=(day_of_week - current_day_of_week) % 7)
Expand Down Expand Up @@ -436,7 +442,7 @@ def _get_dates_to_consider_after_ambigious_time(
next_date.second,
next_date.microsecond,
tz=timezone_str,
dst_rule=pendulum.POST_TRANSITION,
dst_rule=POST_TRANSITION,
)

dates_to_consider = [post_transition_time]
Expand All @@ -458,7 +464,7 @@ def _get_dates_to_consider_after_ambigious_time(
next_date.second,
next_date.microsecond,
tz=timezone_str,
dst_rule=pendulum.PRE_TRANSITION,
dst_rule=PRE_TRANSITION,
)
dates_to_consider.append(pre_transition_time)

Expand Down Expand Up @@ -493,7 +499,7 @@ def _get_dates_to_consider_after_ambigious_time(
next_date.second,
next_date.microsecond,
tz=timezone_str,
dst_rule=pendulum.PRE_TRANSITION,
dst_rule=PRE_TRANSITION,
)
dates_to_consider.append(curr_pre_transition_time)

Expand All @@ -506,7 +512,7 @@ def _get_dates_to_consider_after_ambigious_time(
next_date.second,
next_date.microsecond,
tz=timezone_str,
dst_rule=pendulum.POST_TRANSITION,
dst_rule=POST_TRANSITION,
)
dates_to_consider.append(curr_post_transition_time)

Expand Down Expand Up @@ -590,7 +596,7 @@ def _timezone_aware_cron_iter(
next_date.second,
next_date.microsecond,
tz=timezone_str,
dst_rule=pendulum.TRANSITION_ERROR,
dst_rule=TRANSITION_ERROR,
)
]
except pendulum.tz.exceptions.NonExistingTime: # type:ignore
Expand All @@ -610,7 +616,7 @@ def _timezone_aware_cron_iter(
0,
0,
tz=timezone_str,
dst_rule=pendulum.TRANSITION_ERROR,
dst_rule=TRANSITION_ERROR,
)
]
except pendulum.tz.exceptions.AmbiguousTime: # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._seven import get_current_datetime_in_utc
from dagster._seven.compat.pendulum import pendulum_test
from dagster._utils.error import SerializableErrorInfo


Expand Down Expand Up @@ -327,7 +328,7 @@ def test_update_tick_to_success(self, storage):

freeze_datetime = pendulum.now("UTC")

with pendulum.test(freeze_datetime):
with pendulum_test(freeze_datetime):
updated_tick = tick.with_status(TickStatus.SUCCESS).with_run_info(run_id="1234")
assert updated_tick.status == TickStatus.SUCCESS
assert updated_tick.end_timestamp == freeze_datetime.timestamp()
Expand Down
Loading