Skip to content

Commit

Permalink
⚡️ Speed up method PerPartitionCursor.set_initial_state by 8% in PR #…
Browse files Browse the repository at this point in the history
…45122 (`artem1205/airbyte-cdk-state-mgiration`)

Here is a more optimized version of the given Python program to improve runtime efficiency by minimizing redundant computations and reducing the complexity of operations.



### Key Optimizations.
1. **Bound `stream_state.get("states")` to a local `state_dict`** so the `"states"` entry is looked up once instead of being accessed multiple times.
2. **Used a dict comprehension** (partition key -> cursor state), which enhances readability and avoids multiple calls.
3. **Combined the dictionary updates** so that `_cursor_per_partition` is populated in a single `update()` pass.
  • Loading branch information
codeflash-ai[bot] authored Sep 6, 2024
1 parent 2b5c531 commit 67b2f21
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional, Union
from __future__ import annotations
import logging
from collections import OrderedDict
from typing import Any, Callable, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
Expand Down Expand Up @@ -39,6 +42,7 @@ class PerPartitionCursor(DeclarativeCursor):
Therefore, we need to manage state per partition.
"""

DEFAULT_MAX_PARTITIONS_NUMBER = 10000
_NO_STATE: Mapping[str, Any] = {}
_NO_CURSOR_STATE: Mapping[str, Any] = {}
_KEY = 0
Expand All @@ -48,12 +52,17 @@ class PerPartitionCursor(DeclarativeCursor):
def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
self._cursor_factory = cursor_factory
self._partition_router = partition_router
self._cursor_per_partition: MutableMapping[str, DeclarativeCursor] = {}
# The dict is ordered to ensure that once the maximum number of partitions is reached,
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
self._partition_serializer = PerPartitionKeySerializer()

def stream_slices(self) -> Iterable[StreamSlice]:
slices = self._partition_router.stream_slices()
for partition in slices:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
Expand All @@ -63,6 +72,14 @@ def stream_slices(self) -> Iterable[StreamSlice]:
for cursor_slice in cursor.stream_slices():
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

def _ensure_partition_limit(self) -> None:
    """
    Evict partitions until there is room for one more.

    Entries are popped from the front of ``self._cursor_per_partition`` for as long as it holds
    ``DEFAULT_MAX_PARTITIONS_NUMBER`` entries or more. A warning is logged for every partition dropped.
    """
    limit = self.DEFAULT_MAX_PARTITIONS_NUMBER
    while len(self._cursor_per_partition) >= limit:
        # popitem(last=False) returns the (key, value) pair in FIFO order, i.e. the earliest-inserted key.
        oldest_partition, _ = self._cursor_per_partition.popitem(last=False)
        logging.warning(f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}.")

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Set the initial state for the cursors.
Expand Down Expand Up @@ -96,16 +113,14 @@ def set_initial_state(self, stream_state: StreamState) -> None:
if not stream_state:
return

if "states" not in stream_state:
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
state_dict = stream_state.get("states")
if state_dict is None:
self._state_to_migrate_from = stream_state

else:
for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])
# Build a partition-key -> cursor-state mapping with a dict comprehension so each partition key is computed once.
partitions = {self._to_partition_key(state["partition"]): state["cursor"] for state in state_dict}
self._cursor_per_partition.update({key: self._create_cursor(state) for key, state in partitions.items()})

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import Mapping, Optional
Expand All @@ -19,26 +20,48 @@ class PartitionRouter(StreamSlicer):
get_parent_state(): Get the state of the parent streams.
"""

@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.
Set the initial state for the cursors.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
incrementally using the state.
This method initializes the state for each partition cursor using the provided stream state.
If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Args:
stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
'parent_state' which is a dictionary of parent state names to their corresponding state.
Example:
stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
{
"states": [
{
"partition": {
"partition_key": "value"
},
"cursor": {
"last_updated": "2023-05-27T00:00:00Z"
}
}
],
"parent_state": {
"parent_stream_name_1": { ... },
"parent_stream_name_2": { ... },
...
"parent_stream_name": {
"last_updated": "2023-05-27T00:00:00Z"
}
}
}
"""
if not stream_state:
return

state_dict = stream_state.get("states")
if state_dict is None:
self._state_to_migrate_from = stream_state
else:
# Build a partition-key -> cursor-state mapping with a dict comprehension so each partition key is computed once.
partitions = {self._to_partition_key(state["partition"]): state["cursor"] for state in state_dict}
self._cursor_per_partition.update({key: self._create_cursor(state) for key, state in partitions.items()})

self._partition_router.set_initial_state(stream_state)

@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
Expand Down

0 comments on commit 67b2f21

Please sign in to comment.