Skip to content

Commit

Permalink
day of week fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gibsondan committed Jan 29, 2024
1 parent 9bfd001 commit ce5a40f
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def pack(
self, datetime: Optional[datetime], whitelist_map: WhitelistMap, descent_path: str
) -> Optional[Mapping[str, Any]]:
if datetime:
check.invariant(datetime.tzinfo is not None)
check.invariant(datetime.tzinfo is not None, "No timezone set")
pendulum_datetime = pendulum.instance(datetime, tz=datetime.tzinfo)
timezone_name = pendulum_datetime.timezone.name

Expand Down
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/_seven/compat/pendulum.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import datetime
from contextlib import contextmanager
from unittest import mock

import packaging.version
import pendulum
from typing_extensions import TypeAlias

_IS_PENDULUM_1 = (
_IS_PENDULUM_2 = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 1
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 2
)

_IS_PENDULUM_3 = (
hasattr(pendulum, "__version__")
and getattr(packaging.version.parse(getattr(pendulum, "__version__")), "major") == 3
)

# pendulum 1 has no __version__ property
_IS_PENDULUM_1 = not _IS_PENDULUM_2 and not _IS_PENDULUM_3

POST_TRANSITION = pendulum.tz.POST_TRANSITION if _IS_PENDULUM_3 else pendulum.POST_TRANSITION
PRE_TRANSITION = pendulum.tz.PRE_TRANSITION if _IS_PENDULUM_3 else pendulum.PRE_TRANSITION
TRANSITION_ERROR = pendulum.tz.TRANSITION_ERROR if _IS_PENDULUM_3 else pendulum.TRANSITION_ERROR
Expand Down Expand Up @@ -98,7 +102,8 @@ def create_pendulum_time(year, month, day, *args, **kwargs):
@contextmanager
def pendulum_freeze_time(t):
if _IS_PENDULUM_3:
yield from pendulum.travel_to(t, freeze=True)
with mock.patch("pendulum.now", return_value=t):
yield
else:
with pendulum.test(t) as frozen_time:
yield frozen_time
Expand All @@ -112,3 +117,11 @@ def to_timezone(dt: PendulumDateTime, tz: str):
if timestamp < 0:
return pendulum.from_timestamp(0, tz=tz) + datetime.timedelta(seconds=timestamp)
return pendulum.from_timestamp(dt.timestamp(), tz=tz)


def get_crontab_day_of_week(dt: PendulumDateTime) -> int:
if _IS_PENDULUM_3:
# In pendulum 3, 0-6 is Monday-Sunday (unlike crontab, where 0-6 is Sunday-Saturday)
return (dt.day_of_week + 1) % 7
else:
return dt.day_of_week
5 changes: 3 additions & 2 deletions python_modules/dagster/dagster/_utils/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
TRANSITION_ERROR,
PendulumDateTime,
create_pendulum_time,
get_crontab_day_of_week,
)

# Monthly schedules with 29-31 won't reliably run every month
Expand Down Expand Up @@ -292,9 +293,9 @@ def _find_weekly_schedule_time(
minute,
pendulum_date.day,
)

# Move to the correct day of the week
current_day_of_week = new_time.day_of_week
current_day_of_week = get_crontab_day_of_week(new_time)

if day_of_week != current_day_of_week:
if ascending:
new_time = new_time.add(days=(day_of_week - current_day_of_week) % 7)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._daemon import get_default_daemon_logger
from dagster._daemon.sensor import execute_sensor_iteration, execute_sensor_iteration_loop
from dagster._seven.compat.pendulum import create_pendulum_time, pendulum_freeze_time, to_timezone
from dagster._seven.compat.pendulum import (
_IS_PENDULUM_3,
create_pendulum_time,
pendulum_freeze_time,
to_timezone,
)

from .conftest import create_workspace_load_target

Expand Down Expand Up @@ -1541,6 +1546,7 @@ def test_custom_interval_sensor(executor, instance, workspace_context, external_
validate_tick(ticks[0], external_sensor, expected_datetime, TickStatus.SKIPPED)


@pytest.mark.skipif(_IS_PENDULUM_3, reason="pendulum.set_test_now not supported in pendulum 3")
def test_custom_interval_sensor_with_offset(
monkeypatch, executor, instance, workspace_context, external_repo
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def my_graph():
def test_daily_schedule():
@daily_partitioned_config(start_date="2021-05-05")
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()

Expand Down Expand Up @@ -102,7 +102,7 @@ def _repo():
def test_daily_schedule_with_offsets():
@daily_partitioned_config(start_date="2021-05-05", minute_offset=15, hour_offset=2)
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-05"
Expand Down Expand Up @@ -140,7 +140,7 @@ def _repo():
def test_hourly_schedule():
@hourly_partitioned_config(start_date=datetime(2021, 5, 5))
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-05-00:00"
Expand Down Expand Up @@ -178,7 +178,7 @@ def _repo():
def test_hourly_schedule_with_offsets():
@hourly_partitioned_config(start_date=datetime(2021, 5, 5), minute_offset=20)
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-05-00:20"
Expand Down Expand Up @@ -212,7 +212,7 @@ def _repo():
def test_weekly_schedule():
@weekly_partitioned_config(start_date="2021-05-05")
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-09"
Expand Down Expand Up @@ -250,7 +250,7 @@ def test_weekly_schedule_with_offsets():
start_date="2021-05-05", minute_offset=10, hour_offset=13, day_offset=3
)
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-05"
Expand Down Expand Up @@ -286,7 +286,7 @@ def _repo():
def test_monthly_schedule():
@monthly_partitioned_config(start_date="2021-05-05")
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-06-01"
Expand Down Expand Up @@ -324,7 +324,7 @@ def test_monthly_schedule_late_in_month():
start_date="2021-05-05", minute_offset=15, hour_offset=16, day_offset=31
)
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-31"
Expand All @@ -336,7 +336,7 @@ def test_monthly_schedule_with_offsets():
start_date="2021-05-05", minute_offset=15, hour_offset=16, day_offset=12
)
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

keys = my_partitioned_config.get_partition_keys()
assert keys[0] == "2021-05-12"
Expand Down Expand Up @@ -395,7 +395,7 @@ def test_future_tick():

@daily_partitioned_config(start_date="2021-05-05")
def my_partitioned_config(start, end):
return {"start": str(start), "end": str(end)}
return {"start": start.isoformat(), "end": end.isoformat()}

my_schedule = schedule_for_partitioned_config(my_partitioned_config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._serdes import deserialize_value, serialize_value, whitelist_for_serdes
from dagster._seven.compat.pendulum import create_pendulum_time
from dagster._utils import utc_datetime_from_timestamp
from dagster._seven.compat.pendulum import create_pendulum_time, pendulum_freeze_time
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE

DATE_FORMAT = "%Y-%m-%d"
Expand Down Expand Up @@ -1483,12 +1482,15 @@ def test_datetime_field_serializer():
class Foo(NamedTuple):
dt: datetime

utc_datetime_with_no_timezone = Foo(
dt=utc_datetime_from_timestamp(pendulum.now("UTC").timestamp())
)
with pytest.raises(CheckError, match="not a valid IANA timezone"):
utc_datetime_with_no_timezone = Foo(dt=datetime.now())
with pytest.raises(CheckError, match="No timezone set"):
serialize_value(utc_datetime_with_no_timezone)

assert (
deserialize_value(serialize_value(Foo(dt=pendulum.now("US/Pacific")))).dt.timezone_name
== "US/Pacific"
)


def test_cannot_pickle_time_window_partitions_def():
import datetime
Expand Down Expand Up @@ -1522,5 +1524,5 @@ def test_get_partition_keys_not_in_subset_empty_subset() -> None:
time_windows_subset = TimeWindowPartitionsSubset(
partitions_def, num_partitions=0, included_time_windows=[]
)
with pendulum.test(create_pendulum_time(2023, 1, 1)):
with pendulum_freeze_time(create_pendulum_time(2023, 1, 1)):
assert time_windows_subset.get_partition_keys_not_in_subset(partitions_def) == []

0 comments on commit ce5a40f

Please sign in to comment.