Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): Add Global Parent State Cursor #39593

Merged
merged 33 commits into from
Sep 6, 2024

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented Jun 19, 2024

What

Add the ability to sync streams with incremental parent streams by tracking the parent cursor as global instead of per partition. Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/8185

How

Introduce a new class GlobalParentCursor that tracks the state of the parent stream as a single cursor when the global_parent_cursor parameter is set for incremental sync.

Review guide

  1. airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  2. airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_parent_cursor.py
  3. airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  4. airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py

User Impact

The Tickets Comments stream that iterates over tickets can now be implemented for Zendesk Support, resolving Zendesk Support Issue #5254. The following connectors can migrate to low code without breaking changes:

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented Jun 19, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Sep 6, 2024 0:30am

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Jun 19, 2024
@tolik0 tolik0 requested a review from a team June 19, 2024 14:45
@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-global-parent-state-cursor branch from 62e9567 to e94e1b5 Compare July 9, 2024 13:51
@tolik0 tolik0 self-assigned this Jul 11, 2024
@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-global-parent-state-cursor branch from b7152de to b2fd84a Compare July 12, 2024 14:46
@tolik0 tolik0 marked this pull request as ready for review July 12, 2024 14:46
@tolik0 tolik0 requested review from girarda and brianjlai July 12, 2024 17:10
@tolik0
Copy link
Contributor Author

tolik0 commented Jul 16, 2024

Regression test for Jira:
All records were produced:
Screenshot from 2024-07-16 16-06-20
Screenshot from 2024-07-16 16-44-51

https://github.com/airbytehq/airbyte/actions/runs/9957227440/job/27508789617
As the parent state is incompatible with the new version, we set it as empty so that duplicate records may be produced during the first sync after the update.

@tolik0 tolik0 requested review from a team and removed request for a team July 17, 2024 09:21
@girarda girarda requested a review from artem1205 July 17, 2024 16:57
@@ -812,6 +812,11 @@ definitions:
description: Set to True if the target API does not accept queries where the start time equal the end time.
type: boolean
default: False
global_parent_cursor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should try to speak the user's language here.

How about we call this
use_parent_cursor

title: Whether to use the parent stream cursor

description: If the parent stream cursor is kept in sync with this stream's, using the parent stream's cursor allows for more efficient storage of the connection state.

@lmossman thoughts on how make this most useful for users?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd modify this slightly:

use_parent_cursor:
  title: Use Parent Cursor
  description: Make syncs more efficient by using the parent stream's cursor for the connection state.\n\n**WARNING:** This only works if both the parent stream and this child stream are incremental and if the parent stream cursor is kept in sync with this child stream's cursor!

If that sounds correct to you.

Though thinking about this more, I almost feel like we should just have a single boolean field in the builder like Parent cursor is updated when child cursor is updated?. And if the user enables this, then we set both incremental_dependency and use_parent_cursor to true, since I think they would always want both enabled in that case, and that way the user doesn't really need to know about the internals.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, what if we just combined this global_parent_cursor functionality into the existing incremental_dependency field? Is there ever a use case to have one enabled and not the other?

Copy link
Contributor Author

@tolik0 tolik0 Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girarda, @lmossman That's not the correct explanation for this parameter. This is what incremental_dependency does. The global cursor is responsible for storing the state as one value instead of per partition.

These parameters can be used separately:

  • incremental_dependency: true + global_cursor: true: Incremental streams with dependency.
  • incremental_dependency: true + global_cursor: false: Per partition but saves state for efficiency, need a lookback.
  • incremental_dependency: false + global_cursor: true: Not a good idea, as we will always read the parent in the full refresh, and the state will be updated only at the end of the sync.
  • incremental_dependency: false + global_cursor: false: Per partition with full refresh parent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for this option?

incremental_dependency: true + global_cursor: false: Per partition but saves state for efficiency, need a lookback.

I thought that the requirement that updates to the child record resulted in updates to the parent record cursor was just for the incremental_dependency value, and if so it seems like global_cursor: true would always be desired in that situation, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use case involves a stream without dependency between cursors. Adding incremental_dependency may lead to losing records but will help limit parent records, significantly speeding up the sync compared to using just PerPartition without parent state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But in that use case, why would you ever want to have global_cursor: false? It seems like you would always want to have global_cursor: true if you are setting incremental_dependency: true, so that the state is saved more efficiently, right? I feel like I'm missing something here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, we can use incremental_dependency without a global cursor to avoid using the lookback window. This approach may be more efficient if the number of partitions is small.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, you said above:

incremental_dependency: true + global_cursor: false: Per partition but saves state for efficiency, need a lookback.

But just now you said

we can use incremental_dependency without a global cursor to avoid using the lookback window

Which seems contradictory.

It doesn't seem like this would be a straightforward decision for users to make, since I am also struggling to understand the use cases. Is there no way we can simplify this more?

Copy link
Contributor Author

@tolik0 tolik0 Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion, I mixed up different cases. I've added more documentation for a better explanation. Here's a short recap:

  • Global Cursor: This should be used if there are many partitions, as we cannot store all partitions, so there's no reason to save only part of them.
  • Incremental Dependency: This should be used if there is a dependency between parent and child cursors. If we use this field without the dependency, we can miss new child records that were added to old parent records.

However, previously and possibly in the future, we used the incremental dependency approach for streams without dependency. Although this can lead to missing records, it provides a way to read these streams incrementally, significantly increasing performance.


class GlobalParentCursor(DeclarativeCursor):
"""
The GlobalParentCursor is designed to track the state of parent streams using a single global cursor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update the description to make the constraints super clear? The main one I'm worried about is the fact that the parent and child cursors must be in sync

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency between the child and parent cursor is not a requirement for this class. The GlobalParentCursor can be used to read the parent stream in full refresh mode while using a global cursor for the child stream. This approach is particularly useful when the number of partitions is too large to store individually. By managing the state globally, we can efficiently handle a large number of partitions without exceeding storage limits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a high level description of how this feature is used / when should a connector developer use global parent cursor?

I think it's worth documenting in the PR, in this docstring, and in the yaml schema file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update: I see it's in the docs <3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also added a note on how the state is updated

stream_slice (StreamSlice): The stream slice to be closed.
*args (Any): Additional arguments.
"""
if stream_slice.last_slice:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is stream_slice.last_slice defined? It doesn't seem to be a field on StreamSlice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GlobalCursorStreamSlice


return state

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a comment mentioning the stream_slice param is ignored

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@@ -682,6 +684,12 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -
and model.retriever.partition_router
):
stream_slicer_model = model.retriever.partition_router

# if model.incremental_sync and hasattr(model.incremental_sync, "global_parent_cursor") and model.incremental_sync.global_parent_cursor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this commented code block still required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -53,12 +53,13 @@ def __ne__(self, other: object) -> bool:


class StreamSlice(Mapping[str, Any]):
def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any]) -> None:
def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], last_slice: bool = False) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is last_slice only useful for the global parent cursor? If yes, can we keep it scoped to the global parent cursor instead of introducing a new field?

The concern is that removing or correcting a field from public interfaces is very difficult so we should be conservative about what we add. This could be done by specializing the type of StreamSlice yielded by the global parent cursor.

Alternatively, if we think the field will be useful for other cases, I think we should promote the flag_last method up so the field is always properly set instead of only when using a global parent cursor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a new class only for GlobalSubstreamCursor.

@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-global-parent-state-cursor branch from d235ac5 to 280abcf Compare August 1, 2024 13:23
@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Aug 1, 2024
@tolik0 tolik0 requested a review from girarda August 1, 2024 13:34
@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-global-parent-state-cursor branch from 280abcf to df0dd6f Compare August 1, 2024 13:43
@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Aug 1, 2024

def __init__(self, *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], last_slice: bool = False) -> None:
super().__init__(partition=partition, cursor_slice=cursor_slice)
self.last_slice = last_slice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is last_slice meant to be immutable? If yes, let's make it either a property or a getter so users can't modify it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -627,11 +628,12 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi
and hasattr(model.incremental_sync, "is_client_side_incremental")
and model.incremental_sync.is_client_side_incremental
):
if combined_slicers and not isinstance(combined_slicers, (DatetimeBasedCursor, PerPartitionCursor)):
if combined_slicers and not isinstance(combined_slicers, (DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionCursor)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit let's move this to a set of supported or default slicers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

),
partition_router=stream_slicer,
)
if hasattr(incremental_sync_model, "global_substream_cursor") and incremental_sync_model.global_substream_cursor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a problem specific with this PR, but as a general note: this factory code is getting pretty complicated and has some weird coupling scattered throughout the code. We'll probably need to revisit our approach soon-ish

stream_slice (StreamSlice): The stream slice to be closed.
*args (Any): Additional arguments.
"""
if isinstance(stream_slice, GlobalCursorStreamSlice) and stream_slice.last_slice:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we raise an exception if stream_slice is not of the right type? I think that would be indicative of a bug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added raising an exception

### Usage Guidance
- **Use 'incremental_dependency'**: When the parent stream should be read incrementally. This option should only be used when the update or addition of a new child record updates the parent cursor so only the new data will be synced. If there is no dependency between the child and parent cursor, child records added to old parent records will be missed.

- **Use 'global_substream_cursor'**: When there are many partitions in the parent stream, and you want to manage the state globally instead of per partition. This simplifies state management and reduces the size of state messages. However, this should only be used with a lookback window to avoid missing records that were added during the sync.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there a risk of losing records if the sync fails?

Example:

  1. Sync partition A with timestamp = t1
  2. Fail before syncing partition B which has a timestamp = t2

On the next attempt, we won't try to sync Partition B

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as the child cursor is updated at the end of the sync.

@tolik0
Copy link
Contributor Author

tolik0 commented Aug 15, 2024

@tolik0 tolik0 force-pushed the tolik0/airbyte-cdk/add-global-parent-state-cursor branch from 629fdea to 015114c Compare September 6, 2024 12:07
@darynaishchenko
Copy link
Collaborator

darynaishchenko commented Sep 6, 2024

/approve-regression-tests

Check job output.

✅ Approving regression tests

Comment on lines +27 to +29
if self._start:
return int((time.perf_counter_ns() - self._start) // 1e9)
else:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if self._start:
return int((time.perf_counter_ns() - self._start) // 1e9)
else:
if self._start is None:
return (time.perf_counter_ns() - self._start) // 1_000_000_000
def start(self) -> None:
self._start = time.perf_counter_ns()

Copy link

codeflash-ai bot commented Sep 6, 2024

⚡️ Codeflash found optimizations for this PR

📄 Timer.finish() in airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

📈 Performance improved by 49% (0.49x faster)

⏱️ Runtime went down from 14.5 milliseconds to 9.72 milliseconds

Explanation and details

To optimize the given code for performance, I will make a few modifications focused on eliminating unnecessary checks and improving attribute access. Here's an optimized version of your code.

The key changes are.

  1. Introduced a start method to explicitly set the starting time. This avoids the scenario where finish is called without starting the timer.
  2. Changed the division to 1_000_000_000 to follow the numeric constant format, which can be slightly more optimal.
  3. Kept the main logic but ensured the check for _start being None is done only once.

This code is slightly more efficient and avoids repeated checks during runtime.

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

🔘 (none found) − ⚙️ Existing Unit Tests

✅ 10008 Passed − 🌀 Generated Regression Tests

(click to show generated tests)
# imports
import time
from typing import Optional

import pytest  # used for our unit tests
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import \
    Timer

# unit tests

# Basic Functionality
def test_timer_started_and_stopped_after_some_time():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(2)  # wait for 2 seconds
    codeflash_output = timer.finish()  # should return approximately 2 seconds
    # Outputs were verified to be equal to the original implementation

def test_timer_started_and_stopped_immediately():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    codeflash_output = timer.finish()  # should return 0 seconds as it's immediate
    # Outputs were verified to be equal to the original implementation

# Edge Cases
def test_timer_not_started():
    timer = Timer()
    with pytest.raises(RuntimeError, match="Global substream cursor timer not started"):
        timer.finish()  # should raise RuntimeError
    # Outputs were verified to be equal to the original implementation

def test_timer_started_multiple_times():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(1)  # wait for 1 second
    timer._start = time.perf_counter_ns()  # start again
    time.sleep(1)  # wait for another second
    codeflash_output = timer.finish()  # should return approximately 1 second
    # Outputs were verified to be equal to the original implementation

def test_timer_started_and_stopped_with_minimal_delay():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(0.000001)  # wait for a few microseconds
    codeflash_output = timer.finish()  # should return 0 seconds due to minimal delay
    # Outputs were verified to be equal to the original implementation

# Boundary Conditions
def test_timer_started_and_stopped_at_system_limits():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(0.000001)  # wait for a very short time
    codeflash_output = timer.finish()  # should return 0 seconds due to minimal wait
    # Outputs were verified to be equal to the original implementation

# Error Handling
def test_exception_handling_when_timer_not_started():
    timer = Timer()
    with pytest.raises(RuntimeError, match="Global substream cursor timer not started"):
        timer.finish()  # should raise RuntimeError
    # Outputs were verified to be equal to the original implementation

# Performance and Scalability
def test_large_number_of_timer_instances():
    timers = [Timer() for _ in range(10000)]
    for timer in timers:
        timer._start = time.perf_counter_ns()
    time.sleep(1)  # wait for 1 second
    for timer in timers:
        codeflash_output = timer.finish()  # should return approximately 1 second
    # Outputs were verified to be equal to the original implementation

# Concurrency
import threading



def test_integration_with_other_time_dependent_functions():
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(2)  # wait for 2 seconds
    codeflash_output = timer.finish()  # should return approximately 2 seconds
    # Outputs were verified to be equal to the original implementation

# System Clock Changes (Note: This is a complex case and might not be easily testable in a unit test environment)
# def test_system_clock_adjustments():
#     timer = Timer()
#     timer._start = time.perf_counter_ns()
#     # Simulate system clock change here if possible
#     time.sleep(2)
#     assert timer.finish() == 2  # should return approximately 2 seconds

# To run the tests, use the command: pytest <name_of_this_file>.py

🔘 (none found) − ⏪ Replay Tests

@tolik0 tolik0 merged commit 2fa35ab into master Sep 6, 2024
32 of 35 checks passed
@tolik0 tolik0 deleted the tolik0/airbyte-cdk/add-global-parent-state-cursor branch September 6, 2024 13:44
Comment on lines +27 to +28
if self._start:
return int((time.perf_counter_ns() - self._start) // 1e9)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if self._start:
return int((time.perf_counter_ns() - self._start) // 1e9)
if self._start != -1:
return (time.perf_counter_ns() - self._start) // 1_000_000_000

Copy link

codeflash-ai bot commented Sep 6, 2024

⚡️ Codeflash found optimizations for this PR

📄 Timer.finish() in airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

📈 Performance improved by 80% (0.80x faster)

⏱️ Runtime went down from 46.7 microseconds to 26.0 microseconds

Explanation and details

Certainly! Here is the optimized version of the Timer class for faster performance.

  1. Eliminate the use of Optional for _start. Using just int is more efficient.
  2. Simplify the condition check for _start.

Here is the rewritten version.

Changes made.

  • Replaced Optional[int] with int and default _start to -1.
  • Removed unnecessary type conversion to int during division in the finish method.

These changes improve efficiency by reducing the complexity of type checking and conversions, thus making the program faster.

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

🔘 (none found) − ⚙️ Existing Unit Tests

✅ 12 Passed − 🌀 Generated Regression Tests

(click to show generated tests)
# imports
import threading
import time
# function to test
from typing import Optional

import pytest  # used for our unit tests
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import \
    Timer

# unit tests

def test_basic_functionality_1_second():
    """Test basic functionality with a 1-second wait"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(1)
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_basic_functionality_2_seconds():
    """Test basic functionality with a 2-second wait"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(2)
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_timer_not_started():
    """Test that calling finish without starting the timer raises RuntimeError"""
    timer = Timer()
    with pytest.raises(RuntimeError, match="Global substream cursor timer not started"):
        timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_immediate_finish_call():
    """Test that calling finish immediately after starting the timer returns 0"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_subsecond_precision():
    """Test that a sub-second wait rounds down to 0 seconds"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(0.5)
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_boundary_condition_just_under_1_second():
    """Test that a wait just under 1 second returns 0 seconds"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(0.999)
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_boundary_condition_just_over_1_second():
    """Test that a wait just over 1 second returns 1 second"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(1.001)
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_multiple_timer_instances():
    """Test that multiple Timer instances operate independently"""
    timer1 = Timer()
    timer2 = Timer()
    timer1._start = time.perf_counter_ns()
    time.sleep(1)
    timer2._start = time.perf_counter_ns()
    time.sleep(1)
    codeflash_output = timer1.finish()
    codeflash_output = timer2.finish()
    # Outputs were verified to be equal to the original implementation


def test_high_frequency_calls():
    """Test that high frequency calls to finish are handled correctly"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    for _ in range(1000000):
        pass
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_invalid_state_transitions():
    """Test that calling finish twice without restarting raises RuntimeError"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    time.sleep(1)
    codeflash_output = timer.finish()
    with pytest.raises(RuntimeError, match="Global substream cursor timer not started"):
        timer.finish()
    # Outputs were verified to be equal to the original implementation

def test_stress_testing():
    """Test the timer under stress with a large number of operations"""
    timer = Timer()
    timer._start = time.perf_counter_ns()
    for _ in range(1000000):
        pass
    codeflash_output = timer.finish()
    # Outputs were verified to be equal to the original implementation

🔘 (none found) − ⏪ Replay Tests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants