Skip to content

Commit

Permalink
Merge branch 'async-job/cdk-release' into async-job/sendgrid-release
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Sep 6, 2024
2 parents f9b2b54 + 0be8bd6 commit 0319403
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ def update_status(self, status: AsyncJobStatus) -> None:
self._timer.stop()

self._status = status

def __repr__(self) -> str:
return f"AsyncJob(data={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"
Original file line number Diff line number Diff line change
Expand Up @@ -1243,8 +1243,6 @@ def create_async_retriever(
job_orchestrator_factory = lambda stream_slices: AsyncJobOrchestrator(job_repository, stream_slices)

return AsyncRetriever(
name=name,
primary_key=primary_key,
job_orchestrator_factory=job_orchestrator_factory,
record_selector=record_selector,
stream_slicer=stream_slicer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,12 @@ class AsyncRetriever(Retriever):
parameters: InitVar[Mapping[str, Any]]
job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
record_selector: RecordSelector
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
_primary_key: str = field(init=False, repr=False, default="")
stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={}))

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._job_orchestrator_factory = self.job_orchestrator_factory
self.__job_orchestrator: Optional[AsyncJobOrchestrator] = None
self._parameters = parameters
self._name = InterpolatedString(self._name, parameters=parameters) if isinstance(self._name, str) else self._name

@property
def name(self) -> str:
"""
:return: Stream name
"""
return str(self._name.eval(self.config)) if isinstance(self._name, InterpolatedString) else self._name

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
"""The stream's primary key"""
return self._primary_key

@property
def state(self) -> StreamState:
Expand Down

0 comments on commit 0319403

Please sign in to comment.