
feat(airbyte-cdk): Add Global Parent State Cursor #39593

Merged · 33 commits · Sep 6, 2024

Commits
60d8c79
Add GlobalParentCursor
tolik0 Jun 19, 2024
9467198
Move `global_parent_cursor` to incremental sync
tolik0 Jun 19, 2024
2a5d5e5
Move last slice flag to StreamSlice
tolik0 Jun 19, 2024
f6fbd1d
Fix format
tolik0 Jun 19, 2024
d72b817
Fix docs
tolik0 Jun 19, 2024
af78dda
Add Slack changes
tolik0 Jul 9, 2024
f4f5b78
Small fix for Slack state migration
tolik0 Jul 9, 2024
72fc51f
Add Jira changes
tolik0 Jul 11, 2024
4f4461b
Add local filtering for Global Parent cursor
tolik0 Jul 11, 2024
2ff6f0e
Fix formatting
tolik0 Jul 12, 2024
8b3cf0b
Fix description
tolik0 Jul 12, 2024
bcd0645
Fix warnings
tolik0 Jul 16, 2024
5f15d19
Rename class and update the docs
tolik0 Jul 31, 2024
1d90e13
Fix mypy errors
tolik0 Jul 31, 2024
de255ce
Update docs
tolik0 Aug 1, 2024
85fe4cc
Add unit tests
tolik0 Aug 1, 2024
dd42be7
Delete connector changes
tolik0 Aug 1, 2024
b536e2c
Fix format
tolik0 Aug 1, 2024
23ea1ce
Delete Slack changes
tolik0 Aug 1, 2024
596b9c2
Add docs and fix small errors
tolik0 Aug 2, 2024
9f2d882
Update docs
tolik0 Aug 6, 2024
cdc0d6d
Add lookback window
tolik0 Aug 14, 2024
c2491ec
Update the docs with the lookback window
tolik0 Aug 14, 2024
6883e9b
Update incremental sync docs
tolik0 Aug 15, 2024
a2b210d
Update field description
tolik0 Aug 15, 2024
4cde7f0
Update class doc for GlobalSubstreamCursor
tolik0 Aug 15, 2024
1b60b3e
Refactor for concurrent CDK compatibility
tolik0 Aug 19, 2024
70bc219
Update docstring for stream_slices
tolik0 Aug 19, 2024
eed4423
Add comment with sequence for stream slices
tolik0 Aug 20, 2024
933af1e
Delete wrong change
tolik0 Aug 20, 2024
e4d5172
Fix Timer
tolik0 Sep 6, 2024
015114c
Fix tests
tolik0 Sep 6, 2024
95cd310
Fix formatting
tolik0 Sep 6, 2024
Changes from all commits
@@ -815,6 +815,11 @@ definitions:
         description: Set to True if the target API does not accept queries where the start time equal the end time.
         type: boolean
         default: False
+      global_substream_cursor:
+        title: Whether to store cursor as one value instead of per partition
+        description: This setting optimizes performance when the parent stream has thousands of partitions by storing the cursor as a single value rather than per partition. Notably, the substream state is updated only at the end of the sync, which helps prevent data loss in case of a sync failure. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/incremental-syncs).
+        type: boolean
+        default: false
       lookback_window:
         title: Lookback Window
         description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.

@girarda (Contributor) commented on `global_substream_cursor`, Aug 21, 2024:

@tolik0 I thought we decided we wouldn't add this field to the schema and would automatically switch to the global cursor when hitting the maximum number of partitions.

I'd prefer keeping this field hidden because it exposes internals of how the connector works that users shouldn't have to worry about.
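For intuition, here is a rough sketch of the trade-off this flag controls. The shapes below are illustrative only (they are not taken from this PR's tests): per-partition state keeps one cursor entry per parent record, while the global cursor keeps a single value plus a lookback window.

import json

# Per-partition state: grows with the number of parent records.
per_partition_state = {
    "states": [
        {"partition": {"parent_id": "1"}, "cursor": {"updated_at": "2024-05-01T00:00:00Z"}},
        {"partition": {"parent_id": "2"}, "cursor": {"updated_at": "2024-05-03T00:00:00Z"}},
        # ... one entry per partition, potentially thousands
    ]
}

# Global substream state: one cursor value for all partitions, plus the
# lookback window derived from the previous sync's duration (in seconds).
global_state = {
    "state": {"updated_at": "2024-05-03T00:00:00Z"},
    "lookback_window": 132,
}

print(json.dumps(global_state, indent=2))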
@@ -48,11 +48,16 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
"""

def __init__(
self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any
self,
date_time_based_cursor: DatetimeBasedCursor,
per_partition_cursor: Optional[PerPartitionCursor] = None,
is_global_substream_cursor: bool = False,
**kwargs: Any,
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._per_partition_cursor = per_partition_cursor
self.is_global_substream_cursor = is_global_substream_cursor

@property
def _cursor_field(self) -> str:
@@ -102,6 +107,10 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice)
         # self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
         partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
         return partition_state.get(self._cursor_field) if partition_state else None

+        if self.is_global_substream_cursor:
+            return stream_state.get("state", {}).get(self._cursor_field)  # type: ignore # state is inside a dict for GlobalSubstreamCursor
+
         return stream_state.get(self._cursor_field)

Contributor commented on `if self.is_global_substream_cursor:`:

I know this predates this PR, but it has an impact on it, so I think we should address this point regarding the method _get_state_value: every time we add a new type of cursor, we will need to remember to come here and update this method. It is even worse for a dev who wants to implement their own cursor: they might need a modified version of the CDK because they could need to update this method.

I think the root cause of all this is that we've added a method select_state, which basically exposes the internals of the cursor, and now, since the internals are specific to the type of cursor, anyone consuming this method needs to know about them.

Why don't we use Cursor.filter_records in filter_records instead? That would let us avoid the use of select_state and therefore this problem.
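One possible shape of the direction this comment points at, sketched with simplified types: instead of reaching into the cursor's state via select_state, the filter asks the cursor directly. The protocol and function names below are hypothetical; only should_be_synced itself is real, and this PR implements it on GlobalSubstreamCursor further down.

from typing import Any, Iterable, Protocol


class SupportsShouldBeSynced(Protocol):
    def should_be_synced(self, record: Any) -> bool: ...


def filter_records_via_cursor(cursor: SupportsShouldBeSynced, records: Iterable[Any]) -> Iterable[Any]:
    # The cursor encapsulates its own state layout, so adding a new cursor
    # type would not require touching this filter.
    for record in records:
        if cursor.should_be_synced(record):
            yield record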

     def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
@@ -4,7 +4,16 @@

 from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
+from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor
 from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
 from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor, ChildPartitionResumableFullRefreshCursor

-__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor", "ResumableFullRefreshCursor", "ChildPartitionResumableFullRefreshCursor"]
+__all__ = [
+    "CursorFactory",
+    "DatetimeBasedCursor",
+    "DeclarativeCursor",
+    "GlobalSubstreamCursor",
+    "PerPartitionCursor",
+    "ResumableFullRefreshCursor",
+    "ChildPartitionResumableFullRefreshCursor"
+]
@@ -4,6 +4,7 @@

 import datetime
 from dataclasses import InitVar, dataclass, field
+from datetime import timedelta
 from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union

 from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type

@@ -15,7 +16,7 @@
 from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
 from airbyte_cdk.sources.message import MessageRepository
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
-from isodate import Duration, parse_duration
+from isodate import Duration, duration_isoformat, parse_duration


@dataclass
@@ -363,3 +364,17 @@ def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
             return True
         else:
             return False

+    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
+        """
+        Updates the lookback window based on a given number of seconds if the new duration
+        is greater than the currently configured lookback window.
+
+        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
+        """
+        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
+        config_lookback = parse_duration(self._lookback_window.eval(self.config) if self._lookback_window else "P0D")
+
+        # Check if the new runtime lookback window is greater than the current config lookback
+        if parse_duration(runtime_lookback_window) > config_lookback:
+            self._lookback_window = InterpolatedString.create(runtime_lookback_window, parameters={})
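To make the comparison in set_runtime_lookback_window concrete, here is a small standalone sketch of the isodate round-trip, with invented values:

from datetime import timedelta

from isodate import duration_isoformat, parse_duration

# Suppose the previous sync took 3600 seconds and no lookback is configured.
runtime_lookback_window = duration_isoformat(timedelta(seconds=3600))
config_lookback = parse_duration("P0D")

print(runtime_lookback_window)  # e.g. "PT1H"
# The runtime window wins, so the cursor widens its lookback to one hour.
print(parse_duration(runtime_lookback_window) > config_lookback)  # True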
@@ -0,0 +1,281 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import threading
import time
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


class Timer:
    """
    A simple timer class that measures elapsed time in seconds using a high-resolution performance counter.
    """

    def __init__(self) -> None:
        self._start: Optional[int] = None

    def start(self) -> None:
        self._start = time.perf_counter_ns()

    def finish(self) -> int:
        if self._start:
            return int((time.perf_counter_ns() - self._start) // 1e9)
Comment on lines +27 to +28:

Suggested change:
-        if self._start:
-            return int((time.perf_counter_ns() - self._start) // 1e9)
+        if self._start != -1:
+            return (time.perf_counter_ns() - self._start) // 1_000_000_000

        else:
Comment on lines +27 to +29:

Suggested change:
-        if self._start:
-            return int((time.perf_counter_ns() - self._start) // 1e9)
-        else:
+        if self._start is None:
+            raise RuntimeError("Global substream cursor timer not started")
+        return (time.perf_counter_ns() - self._start) // 1_000_000_000

            raise RuntimeError("Global substream cursor timer not started")
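A side note on the two suggestions above: floor division by the float literal 1e9 yields a float, which is why the original line needs the int(...) wrapper, while dividing by the integer 1_000_000_000 stays an int. A quick standalone check:

import time

start = time.perf_counter_ns()
elapsed_ns = time.perf_counter_ns() - start

print(type(elapsed_ns // 1e9))            # <class 'float'>; hence the int(...) wrapper
print(type(elapsed_ns // 1_000_000_000))  # <class 'int'>; no wrapper needed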


class GlobalSubstreamCursor(DeclarativeCursor):
    """
    The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
    This class is beneficial for streams with many partitions, as it allows the state to be managed globally
    instead of per partition, simplifying state management and reducing the size of state messages.

    This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.

    Warnings:
    - This class enforces a minimal lookback window for substreams, based on the duration of the previous sync, to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
    - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
    - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
    """

    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
        self._stream_cursor = stream_cursor
        self._partition_router = partition_router
        self._timer = Timer()
        self._lock = threading.Lock()
        self._slice_semaphore = threading.Semaphore(0)  # Start with 0, indicating no slices being tracked
        self._all_slices_yielded = False
        self._lookback_window: Optional[int] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates stream slices, ensuring the last slice is properly flagged and processed.

        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
        is called only after all slices have been processed.

        We expect the following events:
        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
        """
        previous_slice = None

        slice_generator = (
            StreamSlice(partition=partition, cursor_slice=cursor_slice)
            for partition in self._partition_router.stream_slices()
            for cursor_slice in self._stream_cursor.stream_slices()
        )
        self._timer.start()

        for slice in slice_generator:
            if previous_slice is not None:
                # Release the semaphore to indicate that a slice has been yielded
                self._slice_semaphore.release()
                yield previous_slice

            # Store the current slice as the previous slice for the next iteration
            previous_slice = slice

        # After all slices have been generated, release the semaphore one final time
        # and flag that all slices have been yielded
        self._slice_semaphore.release()
        self._all_slices_yielded = True

        # Yield the last slice
        if previous_slice is not None:
            yield previous_slice

Contributor commented on `self._slice_semaphore.release()`:

I like that we have added a comment, but I'm still not sure it is sufficient to understand why we release. Should we be more explicit about why? It feels like it has something to do with not releasing on the first slice (see `if previous_slice is not None` at line 70), but it is not clear to me.

Contributor Author replied:

The semaphore is released every time before yielding the slice to increase the counter by one, indicating one more slice to process.

Contributor replied:

Ok. I think my brain fart is that we set `self._all_slices_yielded = True` before we have actually yielded all the slices, but the fact that we release the semaphore just before prevents `close_slice` from actually closing the slice. Can we expand on the comment? This might be overkill, but there is a lot of decision/sequencing of operations here that would be worth making explicit. Something like:

We expect the following events:
* Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
* Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
* Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
* Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too

Contributor Author replied:

Thanks, great comment. I will add it to the docstring.
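A toy, CDK-free sketch of the release-before-yield sequencing discussed in the thread above; all names here are illustrative:

import threading

semaphore = threading.Semaphore(0)  # counts slices yielded but not yet closed
all_yielded = False

def produce(slices):
    global all_yielded
    previous = None
    for current in slices:
        if previous is not None:
            semaphore.release()  # one more slice in flight
            yield previous
        previous = current
    semaphore.release()  # account for the last slice *before* flipping the flag
    all_yielded = True   # set before the final yield: the caller may stop iterating early
    if previous is not None:
        yield previous

def close_one():
    semaphore.acquire()  # pairs with exactly one release
    if all_yielded and semaphore._value == 0:
        print("all slices closed; safe to emit the global state")

for s in produce(["a", "b", "c"]):
    close_one()  # prints only after the third slice is closed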

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the initial state for the cursors.

        This method initializes the state for the global cursor using the provided stream state.

        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    },
                    "lookback_window": 132
                }
        """
        if not stream_state:
            return

        if "lookback_window" in stream_state:
            self._lookback_window = stream_state["lookback_window"]
            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])

        self._stream_cursor.set_initial_state(stream_state["state"])

        # Set parent state for partition routers based on parent streams
        self._partition_router.set_initial_state(stream_state)

    def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
        """
        Modifies the stream cursor's lookback window based on the duration of the previous sync.
        This adjustment ensures the cursor is set to the minimal lookback window necessary for
        avoiding missing data.

        Parameters:
            lookback_window (int): The lookback duration in seconds to be set, derived from
                the previous sync.

        Raises:
            ValueError: If the cursor does not support dynamic lookback window adjustments.
        """
        if hasattr(self._stream_cursor, "set_runtime_lookback_window"):
            self._stream_cursor.set_runtime_lookback_window(lookback_window)
        else:
            raise ValueError("The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method")

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        self._stream_cursor.observe(StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record)

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Close the current stream slice.

        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
        only after reading all slices. This ensures that we do not miss any child records from a later parent record
        if the child cursor is earlier than a record from the first parent record.

        Args:
            stream_slice (StreamSlice): The stream slice to be closed.
            *args (Any): Additional arguments.
        """
        with self._lock:
            self._slice_semaphore.acquire()
            if self._all_slices_yielded and self._slice_semaphore._value == 0:
                self._lookback_window = self._timer.finish()
                self._stream_cursor.close_slice(StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args)
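A hedged illustration of the data-loss case this docstring guards against, with invented dates (string comparison works here because the timestamps are ISO 8601):

# Child records per parent partition, discovered in this order:
children = {"parent_A": "2024-01-10", "parent_B": "2024-01-05"}

# If the global cursor closed after each slice, finishing parent_A first
# would advance it to 2024-01-10 ...
cursor_after_parent_a = children["parent_A"]

# ... making parent_B's older record look already synced on a resumed sync:
print(cursor_after_parent_a > children["parent_B"])  # True; the record could be skipped

# Closing the wrapped stream cursor once, after the final slice, avoids this.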

    def get_stream_state(self) -> StreamState:
        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}

        parent_state = self._partition_router.get_stream_state()
        if parent_state:
            state["parent_state"] = parent_state

        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window

        return state

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # stream_slice is ignored as cursor is global
        return self._stream_cursor.get_stream_state()

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_params(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request params")
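The | between the two calls above is Python 3.9's mapping union (PEP 584); the right-hand side wins on key collisions. For example, with invented values:

partition_params = {"channel_id": "C123"}           # from the partition router
cursor_params = {"oldest": "2023-05-27T00:00:00Z"}  # from the datetime cursor

merged = partition_params | cursor_params
print(merged)  # {'channel_id': 'C123', 'oldest': '2023-05-27T00:00:00Z'}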

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_headers(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request headers")

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        if stream_slice:
            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_data(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body data")

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_json(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body json")

    def should_be_synced(self, record: Record) -> bool:
        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        return self._stream_cursor.is_greater_than_or_equal(
            self._convert_record_to_cursor_record(first), self._convert_record_to_cursor_record(second)
        )

    @staticmethod
    def _convert_record_to_cursor_record(record: Record) -> Record:
        return Record(
            record.data,
            StreamSlice(partition={}, cursor_slice=record.associated_slice.cursor_slice) if record.associated_slice else None,
        )
@@ -1107,6 +1107,11 @@ class DatetimeBasedCursor(BaseModel):
         description='Set to True if the target API does not accept queries where the start time equal the end time.',
         title='Whether to skip requests if the start time equals the end time',
     )
+    global_substream_cursor: Optional[bool] = Field(
+        False,
+        description='This setting optimizes performance when the parent stream has thousands of partitions by storing the cursor as a single value rather than per partition. Notably, the substream state is updated only at the end of the sync, which helps prevent data loss in case of a sync failure. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/incremental-syncs).',
+        title='Whether to store cursor as one value instead of per partition',
+    )
     lookback_window: Optional[str] = Field(
         None,
         description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',