Skip to content

Commit

Permalink
Add pendulum 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
gibsondan committed Jan 29, 2024
1 parent 41f1f3c commit a9839bc
Show file tree
Hide file tree
Showing 36 changed files with 497 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,17 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
"storage_tests_sqlalchemy_1_4",
"daemon_sensor_tests",
"daemon_tests",
"definitions_tests_old_pendulum",
"definitions_tests",
"definitions_tests_pendulum_1",
"definitions_tests_pendulum_2",
"general_tests",
"general_tests_old_protobuf",
"scheduler_tests",
"scheduler_tests_old_pendulum",
"scheduler_tests_pendulum_1",
"scheduler_tests_pendulum_2",
"execution_tests",
"storage_tests",
"type_signature_tests",
"definitions_tests",
"asset_defs_tests",
"launcher_tests",
"logging_tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
all_daemons_healthy,
)
from dagster._seven.compat.pendulum import pendulum_freeze_time
from utils import start_daemon


Expand All @@ -23,5 +24,5 @@ def test_heartbeat():
+ DEFAULT_DAEMON_HEARTBEAT_TOLERANCE_SECONDS
+ 5
)
with pendulum.test(frozen_datetime):
with pendulum_freeze_time(frozen_datetime):
assert all_daemons_healthy(instance) is False
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.workspace.context import WorkspaceRequestContext
from dagster._seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime
from dagster._seven.compat.pendulum import create_pendulum_time
from dagster._seven.compat.pendulum import create_pendulum_time, pendulum_freeze_time
from dagster._utils import Counter, traced_counter
from dagster_graphql.implementation.utils import UserFacingGraphQLError
from dagster_graphql.test.utils import (
Expand Down Expand Up @@ -697,7 +697,7 @@ def test_unloadable_schedule(graphql_context):

stopped_origin = _get_unloadable_schedule_origin("unloadable_stopped")

with pendulum.test(initial_datetime):
with pendulum_freeze_time(initial_datetime):
instance.add_instigator_state(running_instigator_state)

instance.add_instigator_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dagster._core.workspace.context import WorkspaceRequestContext
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration
from dagster._seven.compat.pendulum import pendulum_freeze_time
from dagster._utils import Counter, traced_counter
from dagster._utils.error import SerializableErrorInfo
from dagster_graphql.implementation.utils import UserFacingGraphQLError
Expand Down Expand Up @@ -1062,15 +1063,15 @@ def test_sensor_tick_range(graphql_context: WorkspaceRequestContext):

now = pendulum.now("US/Central")
one = now.subtract(days=2).subtract(hours=1)
with pendulum.test(one):
with pendulum_freeze_time(one):
_create_tick(graphql_context)

two = now.subtract(days=1).subtract(hours=1)
with pendulum.test(two):
with pendulum_freeze_time(two):
_create_tick(graphql_context)

three = now.subtract(hours=1)
with pendulum.test(three):
with pendulum_freeze_time(three):
_create_tick(graphql_context)

result = execute_dagster_graphql(
Expand Down Expand Up @@ -1174,7 +1175,7 @@ def test_sensor_ticks_filtered(graphql_context: WorkspaceRequestContext):
)

now = pendulum.now("US/Central")
with pendulum.test(now):
with pendulum_freeze_time(now):
_create_tick(graphql_context) # create a success tick

# create a started tick
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import datetime
from typing import TYPE_CHECKING, AbstractSet, Optional, Sequence, Tuple

import pendulum

from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._seven.compat.pendulum import (
PendulumInterval,
)
from dagster._utils.schedules import cron_string_iterator

if TYPE_CHECKING:
Expand All @@ -27,7 +28,7 @@ def get_execution_period_for_policy(
freshness_policy: FreshnessPolicy,
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> pendulum.Period:
) -> PendulumInterval:
if freshness_policy.cron_schedule:
tick_iterator = cron_string_iterator(
start_timestamp=current_time.timestamp(),
Expand All @@ -41,18 +42,18 @@ def get_execution_period_for_policy(
tick = next(tick_iterator)
required_data_time = tick - freshness_policy.maximum_lag_delta
if effective_data_time is None or effective_data_time < required_data_time:
return pendulum.Period(start=required_data_time, end=tick)
return PendulumInterval(start=required_data_time, end=tick)

else:
# occurs when asset is missing
if effective_data_time is None:
return pendulum.Period(
return PendulumInterval(
# require data from at most maximum_lag_delta ago
start=current_time - freshness_policy.maximum_lag_delta,
# this data should be available as soon as possible
end=current_time,
)
return pendulum.Period(
return PendulumInterval(
# we don't want to execute this too frequently
start=effective_data_time + 0.9 * freshness_policy.maximum_lag_delta,
end=max(effective_data_time + freshness_policy.maximum_lag_delta, current_time),
Expand All @@ -64,7 +65,7 @@ def get_execution_period_and_evaluation_data_for_policies(
policies: AbstractSet[FreshnessPolicy],
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> Tuple[Optional[pendulum.Period], Optional["TextRuleEvaluationData"]]:
) -> Tuple[Optional[PendulumInterval], Optional["TextRuleEvaluationData"]]:
"""Determines a range of times for which you can kick off an execution of this asset to solve
the most pressing constraint, alongside a maximum number of additional constraints.
"""
Expand All @@ -84,7 +85,7 @@ def get_execution_period_and_evaluation_data_for_policies(
if merged_period is None:
merged_period = period
elif period.start <= merged_period.end:
merged_period = pendulum.Period(
merged_period = PendulumInterval(
start=max(period.start, merged_period.start),
end=period.end,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import datetime
from typing import AbstractSet, NamedTuple, Optional

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import pendulum_create_timezone
from dagster._utils.schedules import (
is_valid_cron_schedule,
reverse_cron_string_iterator,
Expand Down Expand Up @@ -122,7 +121,7 @@ def __new__(
)
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(cron_schedule_timezone) # type: ignore
pendulum_create_timezone(cron_schedule_timezone) # type: ignore
except Exception as e:
raise DagsterInvalidDefinitionError(
"Invalid cron schedule timezone '{cron_schedule_timezone}'. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
cast,
)

import pendulum
from typing_extensions import TypeAlias

import dagster._check as check
Expand All @@ -29,6 +28,7 @@
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._serdes import whitelist_for_serdes
from dagster._seven.compat.pendulum import pendulum_create_timezone
from dagster._utils import IHasInternalInit, ensure_gen
from dagster._utils.merger import merge_dicts
from dagster._utils.schedules import is_valid_cron_schedule
Expand Down Expand Up @@ -692,7 +692,7 @@ def _execution_fn(context: ScheduleEvaluationContext) -> RunRequestIterator:
if self._execution_timezone:
try:
# Verify that the timezone can be loaded
pendulum.tz.timezone(self._execution_timezone) # type: ignore
pendulum_create_timezone(self._execution_timezone)
except Exception as e:
raise DagsterInvalidDefinitionError(
f"Invalid execution timezone {self._execution_timezone} for {name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
unpack_value,
)
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_2,
_IS_PENDULUM_1,
PRE_TRANSITION,
PendulumDateTime,
create_pendulum_time,
to_timezone,
Expand Down Expand Up @@ -146,7 +147,7 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
# Pendulum 1.x erroneously believes that there are two instances of the *second* hour after
# a datetime transition, so to work around this we calculate the timestamp of the next
# microsecond of the given datetime.
dt_microsecond = dt.microsecond + 1 if not _IS_PENDULUM_2 else dt.microsecond
dt_microsecond = dt.microsecond + 1 if _IS_PENDULUM_1 else dt.microsecond
dt = create_pendulum_time(
dt.year,
dt.month,
Expand All @@ -156,9 +157,9 @@ def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> PendulumDateTime:
dt.second,
dt_microsecond,
tz=tz,
dst_rule=pendulum.PRE_TRANSITION,
dst_rule=PRE_TRANSITION,
)
if not _IS_PENDULUM_2:
if _IS_PENDULUM_1:
dt = dt.add(microseconds=-1)
return dt

Expand Down
73 changes: 63 additions & 10 deletions python_modules/dagster/dagster/_seven/compat/pendulum.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,42 @@
import pendulum
from typing_extensions import TypeAlias

_IS_PENDULUM_2 = (
_IS_PENDULUM_1 = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 2
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 1
)

_IS_PENDULUM_3 = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 3
)

POST_TRANSITION = pendulum.tz.POST_TRANSITION if _IS_PENDULUM_3 else pendulum.POST_TRANSITION
PRE_TRANSITION = pendulum.tz.PRE_TRANSITION if _IS_PENDULUM_3 else pendulum.PRE_TRANSITION
TRANSITION_ERROR = pendulum.tz.TRANSITION_ERROR if _IS_PENDULUM_3 else pendulum.TRANSITION_ERROR


def pendulum_create_timezone(tz_name: str):
if _IS_PENDULUM_3:
from pendulum.tz.timezone import Timezone

return Timezone(tz_name)
else:
return pendulum.tz.timezone(tz_name) # type: ignore


@contextmanager
def mock_pendulum_timezone(override_timezone):
if _IS_PENDULUM_2:
with pendulum.tz.test_local_timezone(pendulum.tz.timezone(override_timezone)):
if _IS_PENDULUM_1:
with pendulum.tz.LocalTimezone.test(pendulum.Timezone.load(override_timezone)):
yield
else:
with pendulum.tz.LocalTimezone.test(pendulum.Timezone.load(override_timezone)):
with pendulum.tz.test_local_timezone(pendulum.tz.timezone(override_timezone)):
yield


def create_pendulum_time(year, month, day, *args, **kwargs):
if "tz" in kwargs and "dst_rule" in kwargs and not _IS_PENDULUM_2:
if "tz" in kwargs and "dst_rule" in kwargs and _IS_PENDULUM_1:
tz = pendulum.timezone(kwargs.pop("tz"))
dst_rule = kwargs.pop("dst_rule")

Expand All @@ -39,18 +57,53 @@ def create_pendulum_time(year, month, day, *args, **kwargs):
)
)

if "dst_rule" in kwargs and _IS_PENDULUM_3:
dst_rule = kwargs.pop("dst_rule")
if dst_rule == PRE_TRANSITION:
kwargs["fold"] = 0
elif dst_rule == POST_TRANSITION:
kwargs["fold"] = 1
elif dst_rule == TRANSITION_ERROR:
tz_name = kwargs.pop("tz")
assert tz_name
return pendulum.instance(
pendulum_create_timezone(tz_name).convert(
datetime.datetime(
year,
month,
day,
*args,
**kwargs,
),
raise_on_unknown_times=True,
)
)

return (
pendulum.datetime(year, month, day, *args, **kwargs)
if _IS_PENDULUM_2
else pendulum.create(year, month, day, *args, **kwargs)
pendulum.create(year, month, day, *args, **kwargs)
if _IS_PENDULUM_1
else pendulum.datetime(year, month, day, *args, **kwargs)
)


PendulumDateTime: TypeAlias = (
pendulum.DateTime if _IS_PENDULUM_2 else pendulum.Pendulum # type: ignore[attr-defined]
pendulum.Pendulum if _IS_PENDULUM_1 else pendulum.DateTime # type: ignore[attr-defined]
)

PendulumInterval: TypeAlias = (
pendulum.Interval if _IS_PENDULUM_3 else pendulum.Period # type: ignore[attr-defined]
)


@contextmanager
def pendulum_freeze_time(t):
if _IS_PENDULUM_3:
yield from pendulum.travel_to(t, freeze=True)
else:
with pendulum.test(t) as frozen_time:
yield frozen_time


# Workaround for issue with .in_tz() in pendulum:
# https://github.com/sdispater/pendulum/issues/535
def to_timezone(dt: PendulumDateTime, tz: str):
Expand Down
Loading

0 comments on commit a9839bc

Please sign in to comment.