diff --git a/airbyte-cdk/python/airbyte_cdk/logger.py b/airbyte-cdk/python/airbyte_cdk/logger.py index da3f869f2f32..59d4d7dd68d3 100644 --- a/airbyte-cdk/python/airbyte_cdk/logger.py +++ b/airbyte-cdk/python/airbyte_cdk/logger.py @@ -5,7 +5,7 @@ import json import logging import logging.config -from typing import Any, Mapping, Optional, Tuple +from typing import Any, Callable, Mapping, Optional, Tuple from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteMessageSerializer, Level, Type from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets @@ -38,6 +38,14 @@ def init_logger(name: Optional[str] = None) -> logging.Logger: return logger +def lazy_log(logger: logging.Logger, level: int, lazy_log_provider: Callable[[], str]) -> None: + """ + This method ensure that the processing of the log message is only done if the logger is enabled for the log level. + """ + if logger.isEnabledFor(level): + logger.log(level, lazy_log_provider()) + + class AirbyteLogFormatter(logging.Formatter): """Output log records using AirbyteMessage""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py new file mode 100644 index 000000000000..09a527b0bcb0 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py @@ -0,0 +1,50 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + + +from datetime import timedelta +from typing import Optional + +from airbyte_cdk import StreamSlice +from airbyte_cdk.sources.declarative.async_job.timer import Timer + +from .status import AsyncJobStatus + + +class AsyncJob: + """ + Description of an API job. + + Note that the timer will only stop once `update_status` is called so the job might be completed on the API side but until we query for + it and call `ApiJob.update_status`, `ApiJob.status` will not reflect the actual API side status. 
+ """ + + def __init__(self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None) -> None: + self._api_job_id = api_job_id + self._job_parameters = job_parameters + self._status = AsyncJobStatus.RUNNING + + timeout = timeout if timeout else timedelta(minutes=60) + self._timer = Timer(timeout) + self._timer.start() + + def api_job_id(self) -> str: + return self._api_job_id + + def status(self) -> AsyncJobStatus: + if self._timer.has_timed_out(): + return AsyncJobStatus.TIMED_OUT + return self._status + + def job_parameters(self) -> StreamSlice: + return self._job_parameters + + def update_status(self, status: AsyncJobStatus) -> None: + if self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING: + self._timer.start() + elif status.is_terminal(): + self._timer.stop() + + self._status = status + + def __repr__(self) -> str: + return f"AsyncJob(data={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py new file mode 100644 index 000000000000..ec2bafdde2f1 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -0,0 +1,241 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +import logging +import time +from typing import Any, Generator, Iterable, List, Mapping, Optional, Set + +from airbyte_cdk import StreamSlice +from airbyte_cdk.logger import lazy_log +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob +from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +LOGGER = logging.getLogger("airbyte") + + +class AsyncPartition: + """ + This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs + """ + + _MAX_NUMBER_OF_ATTEMPTS = 3 + + def __init__(self, jobs: List[AsyncJob], stream_slice: StreamSlice) -> None: + self._attempts_per_job = {job: 0 for job in jobs} + self._stream_slice = stream_slice + + def has_reached_max_attempt(self) -> bool: + return any(map(lambda attempt_count: attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS, self._attempts_per_job.values())) + + def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None: + current_attempt_count = self._attempts_per_job.pop(job_to_replace, None) + if current_attempt_count is None: + raise ValueError("Could not find job to replace") + elif current_attempt_count >= self._MAX_NUMBER_OF_ATTEMPTS: + raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}") + + new_attempt_count = current_attempt_count + 1 + for job in new_jobs: + self._attempts_per_job[job] = new_attempt_count + + def should_split(self, job: AsyncJob) -> bool: + """ + Not used right now but once we support job split, we should split based on the number of attempts + """ + return False + + @property + def jobs(self) -> Iterable[AsyncJob]: + return self._attempts_per_job.keys() + + @property + def stream_slice(self) -> StreamSlice: + return self._stream_slice + + @property + def status(self) -> AsyncJobStatus: + """ + Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. 
Else, it means everything is completed. + """ + statuses = set(map(lambda job: job.status(), self.jobs)) + if statuses == {AsyncJobStatus.COMPLETED}: + return AsyncJobStatus.COMPLETED + elif AsyncJobStatus.FAILED in statuses: + return AsyncJobStatus.FAILED + elif AsyncJobStatus.TIMED_OUT in statuses: + return AsyncJobStatus.TIMED_OUT + else: + return AsyncJobStatus.RUNNING + + def __repr__(self) -> str: + return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})" + + +class AsyncJobOrchestrator: + _WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS = 5 + + def __init__( + self, + job_repository: AsyncJobRepository, + slices: Iterable[StreamSlice], + number_of_retries: Optional[int] = None, + ) -> None: + self._job_repository: AsyncJobRepository = job_repository + self._slice_iterator = iter(slices) + self._running_partitions: List[AsyncPartition] = [] + + def _replace_failed_jobs(self, partition: AsyncPartition) -> None: + failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT) + jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs] + for job in jobs_to_replace: + new_job = self._job_repository.start(job.job_parameters()) + partition.replace_job(job, [new_job]) + + def _start_jobs(self) -> None: + """ + Retry failed jobs and start jobs for each slice in the slice iterator. + This method iterates over the running jobs and slice iterator and starts a job for each slice. + The started jobs are added to the running partitions. + Returns: + None + + TODO Eventually, we need to cap the number of concurrent jobs. + However, the first iteration is for sendgrid which only has one job. + """ + for partition in self._running_partitions: + self._replace_failed_jobs(partition) + + for _slice in self._slice_iterator: + job = self._job_repository.start(_slice) + self._running_partitions.append(AsyncPartition([job], _slice)) + + def _get_running_jobs(self) -> Set[AsyncJob]: + """ + Returns a set of running AsyncJob objects. + + Returns: + Set[AsyncJob]: A set of AsyncJob objects that are currently running. + """ + return {job for partition in self._running_partitions for job in partition.jobs if job.status() == AsyncJobStatus.RUNNING} + + def _update_jobs_status(self) -> None: + """ + Update the status of all running jobs in the repository. + """ + running_jobs = self._get_running_jobs() + if running_jobs: + # update the status only if there are RUNNING jobs + self._job_repository.update_jobs_status(running_jobs) + + def _wait_on_status_update(self) -> None: + """ + Waits for a specified amount of time between status updates. + + + This method is used to introduce a delay between status updates in order to avoid excessive polling. + The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`. + + Returns: + None + """ + lazy_log( + LOGGER, + logging.DEBUG, + lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.", + ) + + # wait only when there are running partitions + if self._running_partitions: + lazy_log( + LOGGER, + logging.DEBUG, + lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...", + ) + time.sleep(self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS) + + def _process_completed_partition(self, partition: AsyncPartition) -> None: + """ + Process a completed partition. + Args: + partition (AsyncPartition): The completed partition to process. 
+ """ + job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs})) + LOGGER.info(f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}.") + + def _process_running_partitions_and_yield_completed_ones(self) -> Generator[AsyncPartition, Any, None]: + """ + Process the running partitions. + + Yields: + AsyncPartition: The processed partition. + + Raises: + Any: Any exception raised during processing. + """ + current_running_partitions: List[AsyncPartition] = [] + for partition in self._running_partitions: + match partition.status: + case AsyncJobStatus.COMPLETED: + self._process_completed_partition(partition) + yield partition + case AsyncJobStatus.RUNNING: + current_running_partitions.append(partition) + case _ if partition.has_reached_max_attempt(): + self._process_partitions_with_errors(partition) + case _: + # job will be restarted in `_start_job` + current_running_partitions.insert(0, partition) + # update the referenced list with running partitions + self._running_partitions = current_running_partitions + + def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: + """ + Process a partition with status errors (FAILED and TIMEOUT). + + Args: + partition (AsyncPartition): The partition to process. + Returns: + AirbyteTracedException: An exception indicating that at least one job could not be completed. + Raises: + AirbyteTracedException: If at least one job could not be completed. + """ + status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs} + raise AirbyteTracedException( + message=f"At least one job could not be completed. Job statuses were: {status_by_job_id}", + failure_type=FailureType.system_error, + ) + + def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]: + """ + Creates and retrieves completed partitions. + This method continuously starts jobs, updates job status, processes running partitions, + logs polling partitions, and waits for status updates. It yields completed partitions + as they become available. + + Returns: + An iterable of completed partitions, represented as AsyncPartition objects. + Each partition is wrapped in an Optional, allowing for None values. + """ + while True: + self._start_jobs() + if not self._running_partitions: + break + + self._update_jobs_status() + yield from self._process_running_partitions_and_yield_completed_ones() + self._wait_on_status_update() + + def fetch_records(self, partition: AsyncPartition) -> Iterable[Mapping[str, Any]]: + """ + Fetches records from the given partition's jobs. + + Args: + partition (AsyncPartition): The partition containing the jobs. + + Yields: + Iterable[Mapping[str, Any]]: The fetched records from the jobs. + """ + for job in partition.jobs: + yield from self._job_repository.fetch_records(job) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py new file mode 100644 index 000000000000..2880fea16333 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py @@ -0,0 +1,21 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ +from abc import abstractmethod +from typing import Any, Iterable, Mapping, Set + +from airbyte_cdk import StreamSlice +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob + + +class AsyncJobRepository: + @abstractmethod + def start(self, stream_slice: StreamSlice) -> AsyncJob: + pass + + @abstractmethod + def update_jobs_status(self, jobs: Set[AsyncJob]) -> None: + pass + + @abstractmethod + def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/status.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/status.py new file mode 100644 index 000000000000..586e79889ca1 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/status.py @@ -0,0 +1,24 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + + +from enum import Enum + +_TERMINAL = True + + +class AsyncJobStatus(Enum): + RUNNING = ("RUNNING", not _TERMINAL) + COMPLETED = ("COMPLETED", _TERMINAL) + FAILED = ("FAILED", _TERMINAL) + TIMED_OUT = ("TIMED_OUT", _TERMINAL) + + def __init__(self, value: str, is_terminal: bool) -> None: + self._value = value + self._is_terminal = is_terminal + + def is_terminal(self) -> bool: + """ + A status is terminal when a job status can't be updated anymore. For example if a job is completed, it will stay completed but a + running job might because completed, failed or timed out. + """ + return self._is_terminal diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/timer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/timer.py new file mode 100644 index 000000000000..c4e5a9a1d85a --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/timer.py @@ -0,0 +1,39 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +from datetime import datetime, timedelta, timezone +from typing import Optional + + +class Timer: + def __init__(self, timeout: timedelta) -> None: + self._start_datetime: Optional[datetime] = None + self._end_datetime: Optional[datetime] = None + self._timeout = timeout + + def start(self) -> None: + self._start_datetime = self._now() + self._end_datetime = None + + def stop(self) -> None: + if self._end_datetime is None: + self._end_datetime = self._now() + + def is_started(self) -> bool: + return self._start_datetime is not None + + @property + def elapsed_time(self) -> Optional[timedelta]: + if not self._start_datetime: + return None + + end_time = self._end_datetime or self._now() + elapsed_period = end_time - self._start_datetime + return elapsed_period + + def has_timed_out(self) -> bool: + if not self.is_started(): + return False + return self.elapsed_time > self._timeout # type: ignore # given the job timer is started, we assume there is an elapsed_period + + @staticmethod + def _now() -> datetime: + return datetime.now(tz=timezone.utc) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f05c342fa68c..9c0ea7876a93 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1156,6 +1156,7 @@ definitions: title: Retriever description: Component used to coordinate how records are extracted across stream slices and request pages. 
anyOf: + - "$ref": "#/definitions/AsyncRetriever" - "$ref": "#/definitions/CustomRetriever" - "$ref": "#/definitions/SimpleRetriever" incremental_sync: @@ -1194,6 +1195,7 @@ definitions: - "$ref": "#/definitions/AddFields" - "$ref": "#/definitions/CustomTransformation" - "$ref": "#/definitions/RemoveFields" + - "$ref": "#/definitions/KeysToLower" state_migrations: title: State Migrations description: Array of state migrations to be applied on the input state @@ -1690,6 +1692,19 @@ definitions: type: type: string enum: [JsonlDecoder] + KeysToLower: + title: Keys to Lower Case + description: A transformation that renames all keys to lower case. + type: object + required: + - type + properties: + type: + type: string + enum: [KeysToLower] + $parameters: + type: object + additionalProperties: true IterableDecoder: title: Iterable Decoder description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key. @@ -2346,6 +2361,104 @@ definitions: $parameters: type: object additionalProperties: true + AsyncJobStatusMap: + description: Matches the api job status to Async Job Status. + type: object + required: + - running + - completed + - failed + - timeout + properties: + type: + type: string + enum: [AsyncJobStatusMap] + running: + type: array + items: + type: string + completed: + type: array + items: + type: string + failed: + type: array + items: + type: string + timeout: + type: array + items: + type: string + AsyncRetriever: + description: Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router. + type: object + required: + - type + - record_selector + - status_mapping + - creation_requester + - polling_requester + - download_requester + properties: + type: + type: string + enum: [AsyncRetriever] + record_selector: + description: Component that describes how to extract records from a HTTP response. + "$ref": "#/definitions/RecordSelector" + status_mapping: + description: Async Job Status to Airbyte CDK Async Job Status mapping. + anyOf: + - "$ref": "#/definitions/AsyncJobStatusMap" + status_extractor: + description: Responsible for fetching the actual status of the async job. + anyOf: + - "$ref": "#/definitions/CustomRecordExtractor" + - "$ref": "#/definitions/DpathExtractor" + urls_extractor: + description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job. + anyOf: + - "$ref": "#/definitions/CustomRecordExtractor" + - "$ref": "#/definitions/DpathExtractor" + creation_requester: + description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job. + anyOf: + - "$ref": "#/definitions/CustomRequester" + - "$ref": "#/definitions/HttpRequester" + polling_requester: + description: Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job. + anyOf: + - "$ref": "#/definitions/CustomRequester" + - "$ref": "#/definitions/HttpRequester" + download_requester: + description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job. 
+ anyOf: + - "$ref": "#/definitions/CustomRequester" + - "$ref": "#/definitions/HttpRequester" + partition_router: + title: Partition Router + description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. + default: [] + anyOf: + - "$ref": "#/definitions/CustomPartitionRouter" + - "$ref": "#/definitions/ListPartitionRouter" + - "$ref": "#/definitions/SubstreamPartitionRouter" + - type: array + items: + anyOf: + - "$ref": "#/definitions/CustomPartitionRouter" + - "$ref": "#/definitions/ListPartitionRouter" + - "$ref": "#/definitions/SubstreamPartitionRouter" + decoder: + title: Decoder + description: Component decoding the response so records can be extracted. + anyOf: + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + - "$ref": "#/definitions/IterableDecoder" + $parameters: + type: object + additionalProperties: true Spec: title: Spec description: A source specification made up of connector metadata and how it can be configured. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/__init__.py index 5c361598d351..76304b467f43 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -6,5 +6,6 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ResponseToFileExtractor -__all__ = ["HttpSelector", "DpathExtractor", "RecordFilter", "RecordSelector"] +__all__ = ["HttpSelector", "DpathExtractor", "RecordFilter", "RecordSelector", "ResponseToFileExtractor"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py index e70ac150564c..905477a6c6d9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py @@ -3,14 +3,12 @@ # from abc import abstractmethod -from dataclasses import dataclass from typing import Any, Iterable, Mapping, Optional import requests from airbyte_cdk.sources.types import Record, StreamSlice, StreamState -@dataclass class HttpSelector: """ Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py index 6f9cc4047838..eed33d858228 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -61,6 +61,24 @@ def select_records( :return: List of Records selected from the response """ all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) + yield from self.filter_and_transform(all_data, stream_state, records_schema, stream_slice, next_page_token) + + def filter_and_transform( + self, + all_data: Iterable[Mapping[str, Any]], + stream_state: StreamState, + records_schema: 
Mapping[str, Any], + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Iterable[Record]: + """ + There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and + normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests + library). + + Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could + share the logic of doing transformations on a set of records. + """ filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token) transformed_data = self._transform(filtered_data, stream_state, stream_slice) normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema) @@ -101,6 +119,5 @@ def _transform( ) -> Iterable[Mapping[str, Any]]: for record in records: for transformation in self.transformations: - # record has type Mapping[str, Any], but Record expected - transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore + transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected yield record diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py new file mode 100644 index 000000000000..a177ee65240f --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -0,0 +1,162 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import logging +import os +import uuid +import zlib +from contextlib import closing +from typing import Any, Dict, Iterable, Mapping, Optional, Tuple + +import pandas as pd +import requests +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor +from numpy import nan + +EMPTY_STR: str = "" +DEFAULT_ENCODING: str = "utf-8" +DOWNLOAD_CHUNK_SIZE: int = 1024 * 1024 * 10 + + +class ResponseToFileExtractor(RecordExtractor): + """ + This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as + a tradeoff. + + Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for + a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. + """ + + def __init__(self) -> None: + self.logger = logging.getLogger("airbyte") + + def _get_response_encoding(self, headers: Dict[str, Any]) -> str: + """ + Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library + implementation. + + Args: + headers (Dict[str, Any]): The headers of the response. + Returns: + str: The encoding of the response. + """ + + content_type = headers.get("content-type") + + if not content_type: + return DEFAULT_ENCODING + + content_type, params = requests.utils.parse_header_links(content_type) + + if "charset" in params: + return params["charset"].strip("'\"") # type: ignore # we assume headers are returned as str + + return DEFAULT_ENCODING + + def _filter_null_bytes(self, b: bytes) -> bytes: + """ + Filter out null bytes from a bytes object. 
+ + Args: + b (bytes): The input bytes object. + Returns: + bytes: The filtered bytes object with null bytes removed. + + Referenced Issue: + https://github.com/airbytehq/airbyte/issues/8300 + """ + + res = b.replace(b"\x00", b"") + if len(res) < len(b): + self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)) + return res + + def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: + """ + Saves the binary data from the given response to a temporary file and returns the filepath and response encoding. + + Args: + response (Optional[requests.Response]): The response object containing the binary data. Defaults to None. + + Returns: + Tuple[str, str]: A tuple containing the filepath of the temporary file and the response encoding. + + Raises: + ValueError: If the temporary file does not exist after saving the binary data. + """ + # set filepath for binary data from response + decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32) + needs_decompression = True # we will assume at first that the response is compressed and change the flag if not + + tmp_file = str(uuid.uuid4()) + with closing(response) as response, open(tmp_file, "wb") as data_file: + response_encoding = self._get_response_encoding(dict(response.headers or {})) + for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): + try: + if needs_decompression: + data_file.write(decompressor.decompress(chunk)) + needs_decompression = True + else: + data_file.write(self._filter_null_bytes(chunk)) + except zlib.error: + data_file.write(self._filter_null_bytes(chunk)) + needs_decompression = False + + # check the file exists + if os.path.isfile(tmp_file): + return tmp_file, response_encoding + else: + raise ValueError(f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist.") + + def _read_with_chunks(self, path: str, file_encoding: str, chunk_size: int = 100) -> Iterable[Mapping[str, Any]]: + """ + Reads data from a file in chunks and yields each row as a dictionary. + + Args: + path (str): The path to the file to be read. + file_encoding (str): The encoding of the file. + chunk_size (int, optional): The size of each chunk to be read. Defaults to 100. + + Yields: + Mapping[str, Any]: A dictionary representing each row of data. + + Raises: + ValueError: If an IO/Error occurs while reading the temporary data. + """ + + try: + with open(path, "r", encoding=file_encoding) as data: + chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object) + for chunk in chunks: + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for row in chunk: + yield row + except pd.errors.EmptyDataError as e: + self.logger.info(f"Empty data received. {e}") + yield from [] + except IOError as ioe: + raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe) + finally: + # remove binary tmp file, after data is read + os.remove(path) + + def extract_records(self, response: Optional[requests.Response] = None) -> Iterable[Mapping[str, Any]]: + """ + Extracts records from the given response by: + 1) Saving the result to a tmp file + 2) Reading from saved file by chunks to avoid OOM + + Args: + response (Optional[requests.Response]): The response object containing the data. Defaults to None. + + Yields: + Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
+ + Returns: + None + """ + if response: + file_path, encoding = self._save_to_file(response) + yield from self._read_with_chunks(file_path, encoding) + else: + yield from [] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2af4b5a0ac26..fb42fb59a7c2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -613,6 +613,11 @@ class JsonlDecoder(BaseModel): type: Literal['JsonlDecoder'] +class KeysToLower(BaseModel): + type: Literal['KeysToLower'] + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') + + class IterableDecoder(BaseModel): type: Literal['IterableDecoder'] @@ -881,6 +886,14 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') +class AsyncJobStatusMap(BaseModel): + type: Optional[Literal['AsyncJobStatusMap']] = None + running: List[str] + completed: List[str] + failed: List[str] + timeout: List[str] + + class ValueType(Enum): string = 'string' number = 'number' @@ -1360,7 +1373,7 @@ class Config: extra = Extra.allow type: Literal['DeclarativeStream'] - retriever: Union[CustomRetriever, SimpleRetriever] = Field( + retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( ..., description='Component used to coordinate how records are extracted across stream slices and request pages.', title='Retriever', @@ -1386,7 +1399,7 @@ class Config: title='Schema Loader', ) transformations: Optional[ - List[Union[AddFields, CustomTransformation, RemoveFields]] + List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] ] = Field( None, description='A list of transformations to be applied to each output record.', @@ -1612,6 +1625,58 @@ class SimpleRetriever(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') +class AsyncRetriever(BaseModel): + type: Literal['AsyncRetriever'] + record_selector: RecordSelector = Field( + ..., + description='Component that describes how to extract records from a HTTP response.', + ) + status_mapping: AsyncJobStatusMap = Field( + ..., description='Async Job Status to Airbyte CDK Async Job Status mapping.' + ) + status_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field( + None, description='Responsible for fetching the actual status of the async job.' 
+ ) + urls_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field( + None, + description='Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.', + ) + creation_requester: Union[CustomRequester, HttpRequester] = Field( + ..., + description='Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.', + ) + polling_requester: Union[CustomRequester, HttpRequester] = Field( + ..., + description='Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.', + ) + download_requester: Union[CustomRequester, HttpRequester] = Field( + ..., + description='Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.', + ) + partition_router: Optional[ + Union[ + CustomPartitionRouter, + ListPartitionRouter, + SubstreamPartitionRouter, + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], + ] + ] = Field( + [], + description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.', + title='Partition Router', + ) + decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder]] = Field( + None, + description='Component decoding the response so records can be extracted.', + title='Decoder', + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') + + class SubstreamPartitionRouter(BaseModel): type: Literal['SubstreamPartitionRouter'] parent_stream_configs: List[ParentStreamConfig] = Field( @@ -1628,3 +1693,4 @@ class SubstreamPartitionRouter(BaseModel): DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() SimpleRetriever.update_forward_refs() +AsyncRetriever.update_forward_refs() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdef2551153b..45ef5388c65d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -11,6 +11,9 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Type, Union, get_args, get_origin, get_type_hints from airbyte_cdk.models import FailureType, Level +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator +from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator, NoAuth from airbyte_cdk.sources.declarative.auth.jwt import JwtAlgorithm @@ -46,6 +49,8 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import AddedFieldDefinition as AddedFieldDefinitionModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import AddFields as AddFieldsModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import ApiKeyAuthenticator as ApiKeyAuthenticatorModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema 
import AsyncJobStatusMap as AsyncJobStatusMapModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import AsyncRetriever as AsyncRetrieverModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import BasicHttpAuthenticator as BasicHttpAuthenticatorModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import BearerAuthenticator as BearerAuthenticatorModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CheckStream as CheckStreamModel @@ -82,6 +87,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import JwtAuthenticator as JwtAuthenticatorModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import JwtHeaders as JwtHeadersModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import JwtPayload as JwtPayloadModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import KeysToLower as KeysToLowerModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel, ) @@ -124,6 +130,7 @@ WaitTimeFromHeaderBackoffStrategy, WaitUntilTimeFromHeaderBackoffStrategy, ) +from airbyte_cdk.sources.declarative.requesters.http_job_repository import AsyncHttpJobRepository from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, NoPagination, PaginatorTestReadDecorator from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( CursorPaginationStrategy, @@ -136,12 +143,13 @@ from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod -from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator +from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader from airbyte_cdk.sources.declarative.spec import Spec from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import KeysToLowerTransformation from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.types import Config @@ -208,6 +216,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, JwtAuthenticatorModel: self.create_jwt_authenticator, @@ -232,6 +241,7 @@ def _init_mappings(self) -> None: SubstreamPartitionRouterModel: self.create_substream_partition_router, WaitTimeFromHeaderModel: self.create_wait_time_from_header, 
WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header, + AsyncRetrieverModel: self.create_async_retriever, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -291,6 +301,9 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any ] return AddFields(fields=added_field_definitions, parameters=model.parameters or {}) + def create_keys_to_lower_transformation(self, model: KeysToLowerModel, config: Config, **kwargs: Any) -> KeysToLowerTransformation: + return KeysToLowerTransformation() + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: @@ -1174,6 +1187,86 @@ def create_simple_retriever( parameters=model.parameters or {}, ) + def _create_async_job_status_mapping( + self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any + ) -> Mapping[str, AsyncJobStatus]: + api_status_to_cdk_status = {} + for cdk_status, api_statuses in model.dict().items(): + if cdk_status == "type": + # This is an element of the dict because of the typing of the CDK but it is not a CDK status + continue + + for status in api_statuses: + if status in api_status_to_cdk_status: + raise ValueError( + f"API status {status} is already set for CDK status {cdk_status}. Please ensure API statuses are only provided once" + ) + api_status_to_cdk_status[status] = self._get_async_job_status(cdk_status) + return api_status_to_cdk_status + + def _get_async_job_status(self, status: str) -> AsyncJobStatus: + match status: + case "running": + return AsyncJobStatus.RUNNING + case "completed": + return AsyncJobStatus.COMPLETED + case "failed": + return AsyncJobStatus.FAILED + case "timeout": + return AsyncJobStatus.TIMED_OUT + case _: + raise ValueError(f"Unsupported CDK status {status}") + + def create_async_retriever( + self, + model: AsyncRetrieverModel, + config: Config, + *, + name: str, + primary_key: Optional[Union[str, List[str], List[List[str]]]], # this seems to be needed to match create_simple_retriever + stream_slicer: Optional[StreamSlicer], + client_side_incremental_sync: Optional[Dict[str, Any]] = None, + transformations: List[RecordTransformation], + **kwargs: Any, + ) -> AsyncRetriever: + + decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) + record_selector = self._create_component_from_model( + model=model.record_selector, + config=config, + decoder=decoder, + transformations=transformations, + client_side_incremental_sync=client_side_incremental_sync, + ) + stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) + creation_requester = self._create_component_from_model( + model=model.creation_requester, decoder=decoder, config=config, name=f"job creation - {name}" + ) + polling_requester = self._create_component_from_model( + model=model.polling_requester, decoder=decoder, config=config, name=f"job polling - {name}" + ) + download_requester = self._create_component_from_model( + model=model.download_requester, decoder=decoder, config=config, name=f"job download - {name}" + ) + status_extractor = self._create_component_from_model(model=model.status_extractor, decoder=decoder, config=config, name=name) + urls_extractor = self._create_component_from_model(model=model.urls_extractor, decoder=decoder, config=config, name=name) + job_repository: AsyncJobRepository = AsyncHttpJobRepository( + creation_requester=creation_requester, + 
polling_requester=polling_requester, + download_requester=download_requester, + status_extractor=status_extractor, + status_mapping=self._create_async_job_status_mapping(model.status_mapping, config), + urls_extractor=urls_extractor, + ) + + return AsyncRetriever( + job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(job_repository, stream_slices), + record_selector=record_selector, + stream_slicer=stream_slicer, + config=config, + parameters=model.parameters or {}, + ) + @staticmethod def create_spec(model: SpecModel, config: Config, **kwargs: Any) -> Spec: return Spec( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_job_repository.py new file mode 100644 index 000000000000..ada35e9d5571 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -0,0 +1,177 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +import logging +import uuid +from dataclasses import dataclass, field +from typing import Any, Dict, Iterable, Mapping, Optional + +import requests +from airbyte_cdk.logger import lazy_log +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob +from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus +from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor, RecordExtractor +from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ResponseToFileExtractor +from airbyte_cdk.sources.declarative.requesters.requester import Requester +from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.utils import AirbyteTracedException +from requests import Response + +LOGGER = logging.getLogger("airbyte") + + +@dataclass +class AsyncHttpJobRepository(AsyncJobRepository): + creation_requester: Requester + polling_requester: Requester + download_requester: Requester + status_extractor: DpathExtractor + status_mapping: Mapping[str, AsyncJobStatus] + urls_extractor: DpathExtractor + + record_extractor: RecordExtractor = field(init=False, repr=False, default_factory=lambda: ResponseToFileExtractor()) + + def __post_init__(self) -> None: + self._create_job_response_by_id: Dict[str, Response] = {} + self._polling_job_response_by_id: Dict[str, Response] = {} + + def _get_validated_polling_response(self, stream_slice: StreamSlice) -> requests.Response: + """ + Validates and retrieves the pooling response for a given stream slice. + + Args: + stream_slice (StreamSlice): The stream slice to send the pooling request for. + + Returns: + requests.Response: The validated pooling response. + + Raises: + AirbyteTracedException: If the polling request returns an empty response. + """ + + polling_response: Optional[requests.Response] = self.polling_requester.send_request(stream_slice=stream_slice) + if polling_response is None: + raise AirbyteTracedException( + internal_message="Polling Requester received an empty Response.", + failure_type=FailureType.system_error, + ) + return polling_response + + def _get_validated_job_status(self, response: requests.Response) -> AsyncJobStatus: + """ + Validates the job status extracted from the API response. + + Args: + response (requests.Response): The API response. + + Returns: + AsyncJobStatus: The validated job status. 
+ + Raises: + ValueError: If the API status is unknown. + """ + + api_status = next(iter(self.status_extractor.extract_records(response)), None) + job_status = self.status_mapping.get(str(api_status), None) + if job_status is None: + raise ValueError( + f"API status `{api_status}` is unknown. Contact the connector developer to make sure this status is supported." + ) + + return job_status + + def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> requests.Response: + """ + Starts a job and validates the response. + + Args: + stream_slice (StreamSlice): The stream slice to be used for the job. + + Returns: + requests.Response: The response from the job creation requester. + + Raises: + AirbyteTracedException: If no response is received from the creation requester. + """ + + response: Optional[requests.Response] = self.creation_requester.send_request(stream_slice=stream_slice) + if not response: + raise AirbyteTracedException( + internal_message="Always expect a response or an exception from creation_requester", + failure_type=FailureType.system_error, + ) + + return response + + def start(self, stream_slice: StreamSlice) -> AsyncJob: + """ + Starts a job for the given stream slice. + + Args: + stream_slice (StreamSlice): The stream slice to start the job for. + + Returns: + AsyncJob: The asynchronous job object representing the started job. + """ + + response: requests.Response = self._start_job_and_validate_response(stream_slice) + job_id: str = str(uuid.uuid4()) + self._create_job_response_by_id[job_id] = response + + return AsyncJob(api_job_id=job_id, job_parameters=stream_slice) + + def update_jobs_status(self, jobs: Iterable[AsyncJob]) -> None: + """ + Updates the status of multiple jobs. + + Because we don't have interpolation on random fields, we have this hack which consist on using the stream_slice to allow for + interpolation. We are looking at enabling interpolation on more field which would require a change to those three layers: + HttpRequester, RequestOptionProvider, RequestInputProvider. + + Args: + jobs (Iterable[AsyncJob]): An iterable of AsyncJob objects representing the jobs to update. + + Returns: + None + """ + for job in jobs: + stream_slice = StreamSlice( + partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]}, + cursor_slice={}, + ) + polling_response: requests.Response = self._get_validated_polling_response(stream_slice) + job_status: AsyncJobStatus = self._get_validated_job_status(polling_response) + + if job_status != job.status(): + lazy_log(LOGGER, logging.DEBUG, lambda: f"Status of job {job.api_job_id()} changed from {job.status()} to {job_status}") + + job.update_status(job_status) + if job_status == AsyncJobStatus.COMPLETED: + self._polling_job_response_by_id[job.api_job_id()] = polling_response + + def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: + """ + Fetches records from the given job. + + Args: + job (AsyncJob): The job to fetch records from. + + Yields: + Iterable[Mapping[str, Any]]: A generator that yields records as dictionaries. 
+ + """ + + for url in self.urls_extractor.extract_records(self._polling_job_response_by_id[job.api_job_id()]): + stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={}) + # FIXME salesforce will require pagination here + response = self.download_requester.send_request(stream_slice=stream_slice) + if response: + yield from self.record_extractor.extract_records(response) + + yield from [] + + self._clean_up_job(job.api_job_id()) + + def _clean_up_job(self, job_id: str) -> None: + del self._create_job_response_by_id[job_id] + del self._polling_job_response_by_id[job_id] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py index fcbe40d95414..9ec5017fb38c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py @@ -4,5 +4,6 @@ from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever, SimpleRetrieverTestReadDecorator +from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever -__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator"] +__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/async_retriever.py new file mode 100644 index 000000000000..4bb9421e44dc --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -0,0 +1,112 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ + +from dataclasses import InitVar, dataclass, field +from typing import Any, Callable, Iterable, Mapping, Optional + +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator, AsyncPartition +from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter +from airbyte_cdk.sources.declarative.retrievers import Retriever +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer +from airbyte_cdk.sources.streams.core import StreamData +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +@dataclass +class AsyncRetriever(Retriever): + config: Config + parameters: InitVar[Mapping[str, Any]] + job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator] + record_selector: RecordSelector + stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={})) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._job_orchestrator_factory = self.job_orchestrator_factory + self.__job_orchestrator: Optional[AsyncJobOrchestrator] = None + self._parameters = parameters + + @property + def state(self) -> StreamState: + """ + As a first iteration for sendgrid, there is no state to be managed + """ + return {} + + @state.setter + def state(self, value: StreamState) -> None: + """ + As a first iteration for sendgrid, there is no state to be managed + """ + pass + + @property + def _job_orchestrator(self) -> AsyncJobOrchestrator: + if not self.__job_orchestrator: + raise AirbyteTracedException( + message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support", + internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`", + failure_type=FailureType.system_error, + ) + + return self.__job_orchestrator + + def _get_stream_state(self) -> StreamState: + """ + Gets the current state of the stream. + + Returns: + StreamState: Mapping[str, Any] + """ + + return self.state + + def _validate_and_get_stream_slice_partition(self, stream_slice: Optional[StreamSlice] = None) -> AsyncPartition: + """ + Validates the stream_slice argument and returns the partition from it. + + Args: + stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from. + + Returns: + AsyncPartition: The partition extracted from the stream_slice. + + Raises: + AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice. + + """ + if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition: + raise AirbyteTracedException( + message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. 
Please contact Airbyte Support", + failure_type=FailureType.system_error, + ) + return stream_slice["partition"] # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices + + def stream_slices(self) -> Iterable[Optional[StreamSlice]]: + slices = self.stream_slicer.stream_slices() + self.__job_orchestrator = self._job_orchestrator_factory(slices) + + for completed_partition in self._job_orchestrator.create_and_get_completed_partitions(): + yield StreamSlice( + partition=dict(completed_partition.stream_slice.partition) | {"partition": completed_partition}, + cursor_slice=completed_partition.stream_slice.cursor_slice, + ) + + def read_records( + self, + records_schema: Mapping[str, Any], + stream_slice: Optional[StreamSlice] = None, + ) -> Iterable[StreamData]: + + stream_state: StreamState = self._get_stream_state() + partition: AsyncPartition = self._validate_and_get_stream_slice_partition(stream_slice) + records: Iterable[Mapping[str, Any]] = self._job_orchestrator.fetch_records(partition) + + yield from self.record_selector.filter_and_transform( + all_data=records, + stream_state=stream_state, + records_schema=records_schema, + stream_slice=stream_slice, + ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py index ddab62222694..155de5782aa0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py @@ -3,7 +3,6 @@ # from abc import abstractmethod -from dataclasses import dataclass from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import StreamSlice @@ -11,7 +10,6 @@ from airbyte_cdk.sources.types import StreamState -@dataclass class Retriever: """ Responsible for fetching a stream's records from an HTTP API source. 
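
A minimal usage sketch for reviewers, showing how the new `async_job` pieces introduced in this PR (repository, job, orchestrator, completed partitions) are meant to fit together. This is not part of the diff: `InMemoryJobRepository`, the job id and the sample slice values are hypothetical stand-ins for a real API-backed repository such as the `AsyncHttpJobRepository` added above.

```python
# Reviewer sketch only, not part of this PR. "InMemoryJobRepository" is a hypothetical
# stand-in for a real API-backed repository (e.g. the AsyncHttpJobRepository added here).
from typing import Any, Iterable, Mapping, Set

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.types import StreamSlice


class InMemoryJobRepository(AsyncJobRepository):
    """Hypothetical repository: every job completes on the first status poll."""

    def start(self, stream_slice: StreamSlice) -> AsyncJob:
        # a real repository would call the API here and keep the creation response
        return AsyncJob(api_job_id="job-1", job_parameters=stream_slice)

    def update_jobs_status(self, jobs: Set[AsyncJob]) -> None:
        for job in jobs:
            job.update_status(AsyncJobStatus.COMPLETED)

    def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
        yield {"id": 1, "slice": dict(job.job_parameters().partition)}


slices = [StreamSlice(partition={"parent_id": "42"}, cursor_slice={})]
orchestrator = AsyncJobOrchestrator(InMemoryJobRepository(), slices)
for partition in orchestrator.create_and_get_completed_partitions():
    for record in orchestrator.fetch_records(partition):
        print(record)  # {'id': 1, 'slice': {'parent_id': '42'}}
```

In the declarative path, `create_async_retriever` builds the equivalent wiring from the manifest: it assembles an `AsyncHttpJobRepository` from the creation/polling/download requesters and hands a `job_orchestrator_factory` to `AsyncRetriever`, whose `stream_slices` yields one slice per completed partition.
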
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py index 67290d2a5d95..2a69b78218fd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/add_fields.py @@ -3,12 +3,12 @@ # from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, Optional, Type, Union +from typing import Any, Dict, List, Mapping, Optional, Type, Union import dpath from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.transformations import RecordTransformation -from airbyte_cdk.sources.types import Config, FieldPointer, Record, StreamSlice, StreamState +from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState @dataclass(frozen=True) @@ -111,11 +111,11 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def transform( self, - record: Record, + record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - ) -> Record: + ) -> None: if config is None: config = {} kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice} @@ -124,7 +124,5 @@ def transform( value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs) dpath.new(record, parsed_field.path, value) - return record - def __eq__(self, other: Any) -> bool: return bool(self.__dict__ == other.__dict__) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py new file mode 100644 index 000000000000..53db3d49abd4 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/keys_to_lower_transformation.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+# + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class KeysToLowerTransformation(RecordTransformation): + def transform( + self, + record: Dict[str, Any], + config: Optional[Config] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + ) -> None: + for key in set(record.keys()): + record[key.lower()] = record.pop(key) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py index 1d4edfc39367..658d5dd2ccdb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Optional +from typing import Any, Dict, List, Mapping, Optional import dpath import dpath.exceptions @@ -48,11 +48,11 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def transform( self, - record: Mapping[str, Any], + record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - ) -> Mapping[str, Any]: + ) -> None: """ :param record: The record to be transformed :return: the input record with the requested fields removed @@ -68,5 +68,3 @@ def transform( except dpath.exceptions.PathNotFound: # if the (potentially nested) property does not exist, silently skip pass - - return record diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py index bd66f5fae119..f5b22642964b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/transformation.py @@ -4,9 +4,9 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Any, Mapping, Optional +from typing import Any, Dict, Optional -from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @dataclass @@ -18,13 +18,13 @@ class RecordTransformation: @abstractmethod def transform( self, - record: Record, + record: Dict[str, Any], config: Optional[Config] = None, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, - ) -> Mapping[str, Any]: + ) -> None: """ - Transform a record by adding, deleting, or mutating fields. + Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument. :param record: The input record to be transformed :param config: The user-provided configuration as specified by the source's spec diff --git a/airbyte-cdk/python/poetry.lock b/airbyte-cdk/python/poetry.lock index 4564f7891812..4617334a85d1 100644 --- a/airbyte-cdk/python/poetry.lock +++ b/airbyte-cdk/python/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -1961,8 +1961,8 @@ files = [ httpx = ">=0.23.0,<1" orjson = ">=3.9.14,<4.0.0" pydantic = [ - {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""}, {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""}, + {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""}, ] requests = ">=2,<3" @@ -2668,7 +2668,7 @@ twitter = ["twython"] name = "numpy" version = "1.26.4" description = "Fundamental package for array computing in Python" -optional = true +optional = false python-versions = ">=3.9" files = [ {file = "numpy-1.26.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:9ff0f4f29c51e2803569d7a51c2304de5554655a60c5d776e35b4a41413830d0"}, @@ -2835,7 +2835,7 @@ files = [ name = "pandas" version = "2.2.0" description = "Powerful data structures for data analysis, time series, and statistics" -optional = true +optional = false python-versions = ">=3.9" files = [ {file = "pandas-2.2.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:8108ee1712bb4fa2c16981fba7e68b3f6ea330277f5ca34fa8d557e986a11670"}, @@ -2871,9 +2871,9 @@ files = [ [package.dependencies] numpy = [ + {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, {version = ">=1.22.4,<2", markers = "python_version < \"3.11\""}, {version = ">=1.23.2,<2", markers = "python_version == \"3.11\""}, - {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3306,8 +3306,8 @@ files = [ annotated-types = ">=0.4.0" pydantic-core = "2.20.1" typing-extensions = [ - {version = ">=4.6.1", markers = "python_version < \"3.13\""}, {version = ">=4.12.2", markers = "python_version >= \"3.13\""}, + {version = ">=4.6.1", markers = "python_version < \"3.13\""}, ] [package.extras] @@ -4959,7 +4959,7 @@ typing-extensions = ">=3.7.4" name = "tzdata" version = "2024.1" description = "Provider of IANA time zone data" -optional = true +optional = false python-versions = ">=2" files = [ {file = "tzdata-2024.1-py2.py3-none-any.whl", hash = "sha256:9068bc196136463f5245e51efda838afa15aaeca9903f49050dfa2679db4d252"}, @@ -5404,4 +5404,4 @@ vector-db-based = ["cohere", "langchain", "openai", "tiktoken"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "1759d8574c392cf39fccff997263873168087159c5f741314ceff6db4e5a32af" +content-hash = "1df63da7ed8c114e2732cbe566cf45b102edfbcf4aba88aa1be5fd505addd54b" diff --git a/airbyte-cdk/python/pyproject.toml b/airbyte-cdk/python/pyproject.toml index 0b69640a7d0a..e72e79099fc6 100644 --- a/airbyte-cdk/python/pyproject.toml +++ b/airbyte-cdk/python/pyproject.toml @@ -35,6 +35,7 @@ isodate = "~0.6.1" Jinja2 = "~3.1.2" jsonref = "~0.2" jsonschema = "~3.2.0" +pandas = "2.2.0" pendulum = "<3.0.0" pydantic = "^2.7" pyrate-limiter = "~3.1.0" @@ -51,7 +52,6 @@ langchain = { version = "0.1.16", optional = true } langchain_core = { version = "0.1.42", optional = true } markdown = { version = "*", optional = true } openai = { version = "0.27.9", extras = ["embeddings"], optional = true } -pandas = { version = "2.2.0", optional = true } pdf2image = { version = "1.16.3", optional = true } "pdfminer.six" = { version = "20221105", optional = true } pyarrow = { version = "~15.0.0", optional = true } diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/async_job/__init__.py b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git 
a/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_integration.py b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_integration.py new file mode 100644 index 000000000000..fa608bbd16fd --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_integration.py @@ -0,0 +1,111 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + + +import logging +from typing import Any, Iterable, List, Mapping, Optional, Set, Tuple +from unittest import TestCase, mock + +from airbyte_cdk import AbstractSource, DeclarativeStream, SinglePartitionRouter, Stream, StreamSlice +from airbyte_cdk.models import ConnectorSpecification +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator +from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor +from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever +from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader +from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder +from airbyte_cdk.test.entrypoint_wrapper import read + +_A_STREAM_NAME = "a_stream_name" +_EXTRACTOR_NOT_USED: RecordExtractor = None # type: ignore # the extractor should not be used. If it is the case, there is an issue that needs fixing + + +class MockAsyncJobRepository(AsyncJobRepository): + + def start(self, stream_slice: StreamSlice) -> AsyncJob: + return AsyncJob("a_job_id", StreamSlice(partition={}, cursor_slice={})) + + def update_jobs_status(self, jobs: Set[AsyncJob]) -> None: + for job in jobs: + job.update_status(AsyncJobStatus.COMPLETED) + + def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: + yield from [{"record_field": 10}] + + +class MockSource(AbstractSource): + + def __init__(self, stream_slicer: Optional[StreamSlicer] = None) -> None: + self._stream_slicer = SinglePartitionRouter({}) if stream_slicer is None else stream_slicer + + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: + return True, None + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: + return ConnectorSpecification(connectionSpecification={}) + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + noop_record_selector = RecordSelector( + extractor=_EXTRACTOR_NOT_USED, + config={}, + parameters={}, + schema_normalization=TypeTransformer(TransformConfig.NoTransform), + record_filter=None, + transformations=[] + ) + return [ + DeclarativeStream( + retriever=AsyncRetriever( + config={}, + parameters={}, + record_selector=noop_record_selector, + stream_slicer=self._stream_slicer, + job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( + MockAsyncJobRepository(), stream_slices, + ), + ), + config={}, + parameters={}, + name=_A_STREAM_NAME, + primary_key=["id"], + schema_loader=InlineSchemaLoader({}, {}), + # the interface mentions that this is Optional, + # but I get `'NoneType' object has no attribute 'eval'` by passing None + 
stream_cursor_field="", + ) + ] + + +class JobDeclarativeStreamTest(TestCase): + _CONFIG: Mapping[str, Any] = {} + + def setUp(self) -> None: + self._stream_slicer = mock.Mock(wraps=SinglePartitionRouter({})) + self._source = MockSource(self._stream_slicer) + self._source.streams({}) + + def test_when_read_then_return_records_from_repository(self) -> None: + output = read( + self._source, + self._CONFIG, + CatalogBuilder().with_stream(ConfiguredAirbyteStreamBuilder().with_name(_A_STREAM_NAME)).build() + ) + + assert len(output.records) == 1 + + def test_when_read_then_call_stream_slices_only_once(self) -> None: + """ + As generating stream slices is very expensive, we want to ensure that during a read, it is only called once. + """ + output = read( + self._source, + self._CONFIG, + CatalogBuilder().with_stream(ConfiguredAirbyteStreamBuilder().with_name(_A_STREAM_NAME)).build() + ) + + assert not output.errors + assert self._stream_slicer.stream_slices.call_count == 1 diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job.py b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job.py new file mode 100644 index 000000000000..6399433e4413 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job.py @@ -0,0 +1,32 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +import time +from datetime import timedelta +from unittest import TestCase + +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus +from airbyte_cdk.sources.declarative.types import StreamSlice + +_AN_API_JOB_ID = "an api job id" +_ANY_STREAM_SLICE = StreamSlice(partition={}, cursor_slice={}) +_A_VERY_BIG_TIMEOUT = timedelta(days=999999999) +_IMMEDIATELY_TIMED_OUT = timedelta(microseconds=1) + + +class AsyncJobTest(TestCase): + def test_given_timer_is_not_out_when_status_then_return_actual_status(self) -> None: + job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _A_VERY_BIG_TIMEOUT) + assert job.status() == AsyncJobStatus.RUNNING + + def test_given_timer_is_out_when_status_then_return_timed_out(self) -> None: + job = AsyncJob(_AN_API_JOB_ID, _ANY_STREAM_SLICE, _IMMEDIATELY_TIMED_OUT) + time.sleep(0.001) + assert job.status() == AsyncJobStatus.TIMED_OUT + + def test_given_status_is_terminal_when_update_status_then_stop_timer(self) -> None: + """ + This test will become important once we print stats associated with jobs. For now, we stop the timer but do not return any + metrics regarding the timer, so this test is not useful yet. + """ + pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job_orchestrator.py new file mode 100644 index 000000000000..5e2c3a51a5ab --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -0,0 +1,144 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ +import logging +from typing import Callable, List, Mapping, Set +from unittest import TestCase, mock +from unittest.mock import MagicMock, Mock, call + +import pytest +from airbyte_cdk import AirbyteTracedException, StreamSlice +from airbyte_cdk.sources.declarative.async_job.job import AsyncJob, AsyncJobStatus +from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator, AsyncPartition +from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository + +_ANY_STREAM_SLICE = Mock() +_A_STREAM_SLICE = Mock() +_ANOTHER_STREAM_SLICE = Mock() +_ANY_RECORD = {"a record field": "a record value"} + + +def _create_job(status: AsyncJobStatus = AsyncJobStatus.FAILED) -> AsyncJob: + job = Mock(spec=AsyncJob) + job.status.return_value = status + return job + + +class AsyncPartitionTest(TestCase): + def test_given_one_failed_job_when_status_then_return_failed(self) -> None: + partition = AsyncPartition([_create_job(status) for status in AsyncJobStatus], _ANY_STREAM_SLICE) + assert partition.status == AsyncJobStatus.FAILED + + def test_given_all_status_except_failed_when_status_then_return_timed_out(self) -> None: + statuses = [status for status in AsyncJobStatus if status != AsyncJobStatus.FAILED] + partition = AsyncPartition([_create_job(status) for status in statuses], _ANY_STREAM_SLICE) + assert partition.status == AsyncJobStatus.TIMED_OUT + + def test_given_running_and_completed_jobs_when_status_then_return_running(self) -> None: + partition = AsyncPartition([_create_job(AsyncJobStatus.RUNNING), _create_job(AsyncJobStatus.COMPLETED)], _ANY_STREAM_SLICE) + assert partition.status == AsyncJobStatus.RUNNING + + def test_given_only_completed_jobs_when_status_then_return_completed(self) -> None: + partition = AsyncPartition([_create_job(AsyncJobStatus.COMPLETED) for _ in range(10)], _ANY_STREAM_SLICE) + assert partition.status == AsyncJobStatus.COMPLETED + + +def _status_update_per_jobs(status_update_per_jobs: Mapping[AsyncJob, List[AsyncJobStatus]]) -> Callable[[Set[AsyncJob]], None]: + status_index_by_job = {job: 0 for job in status_update_per_jobs.keys()} + + def _update_status(jobs: Set[AsyncJob]) -> None: + for job in jobs: + status_index = status_index_by_job[job] + job.update_status(status_update_per_jobs[job][status_index]) + status_index_by_job[job] += 1 + + return _update_status + + +sleep_mock_target = "airbyte_cdk.sources.declarative.async_job.job_orchestrator.time.sleep" + + +class AsyncJobOrchestratorTest(TestCase): + def setUp(self) -> None: + self._job_repository = Mock(spec=AsyncJobRepository) + self._logger = Mock(spec=logging.Logger) + + self._job_for_a_slice = mock.Mock(wraps=AsyncJob("an api job id", _A_STREAM_SLICE)) + self._job_for_another_slice = mock.Mock(wraps=AsyncJob("another api job id", _ANOTHER_STREAM_SLICE)) + + @mock.patch(sleep_mock_target) + def test_when_create_and_get_completed_partitions_then_create_job_and_update_status_until_completed(self, mock_sleep: MagicMock) -> None: + self._job_repository.start.return_value = self._job_for_a_slice + status_updates = [AsyncJobStatus.RUNNING, AsyncJobStatus.RUNNING, AsyncJobStatus.COMPLETED] + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + { + self._job_for_a_slice: status_updates + } + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + partitions = list(orchestrator.create_and_get_completed_partitions()) + + assert len(partitions) == 1 + assert partitions[0].status == AsyncJobStatus.COMPLETED + assert 
self._job_for_a_slice.update_status.mock_calls == [call(status) for status in status_updates] + + @mock.patch(sleep_mock_target) + def test_given_one_job_still_running_when_create_and_get_completed_partitions_then_only_update_running_job_status(self, mock_sleep: MagicMock) -> None: + self._job_repository.start.side_effect = [self._job_for_a_slice, self._job_for_another_slice] + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + { + self._job_for_a_slice: [AsyncJobStatus.COMPLETED], + self._job_for_another_slice: [AsyncJobStatus.RUNNING, AsyncJobStatus.COMPLETED], + } + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE, _ANOTHER_STREAM_SLICE]) + + list(orchestrator.create_and_get_completed_partitions()) + + assert self._job_repository.update_jobs_status.mock_calls == [ + call({self._job_for_a_slice, self._job_for_another_slice}), + call({self._job_for_another_slice}), + ] + + @mock.patch(sleep_mock_target) + def test_given_timeout_when_create_and_get_completed_partitions_then_raise_exception(self, mock_sleep: MagicMock) -> None: + self._job_repository.start.return_value = self._job_for_a_slice + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + { + self._job_for_a_slice: [AsyncJobStatus.TIMED_OUT] + } + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + with pytest.raises(AirbyteTracedException): + list(orchestrator.create_and_get_completed_partitions()) + assert self._job_repository.start.call_args_list == [call(_A_STREAM_SLICE)] * 4 + + @mock.patch(sleep_mock_target) + def test_given_failure_when_create_and_get_completed_partitions_then_raise_exception(self, mock_sleep: MagicMock) -> None: + self._job_repository.start.return_value = self._job_for_a_slice + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + { + self._job_for_a_slice: [AsyncJobStatus.FAILED] + } + ) + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + with pytest.raises(AirbyteTracedException): + list(orchestrator.create_and_get_completed_partitions()) + assert self._job_repository.start.call_args_list == [call(_A_STREAM_SLICE)] * 4 + + def test_when_fetch_records_then_yield_records_from_each_job(self) -> None: + self._job_repository.fetch_records.return_value = [_ANY_RECORD] + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + first_job = _create_job() + second_job = _create_job() + partition = AsyncPartition([first_job, second_job], _A_STREAM_SLICE) + + records = list(orchestrator.fetch_records(partition)) + + assert len(records) == 2 + assert self._job_repository.fetch_records.mock_calls == [call(first_job), call(second_job)] + + def _orchestrator(self, slices: List[StreamSlice]) -> AsyncJobOrchestrator: + return AsyncJobOrchestrator(self._job_repository, slices) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/compressed_response b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/compressed_response new file mode 100644 index 000000000000..da79e347f053 Binary files /dev/null and b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/compressed_response differ diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/decompressed_response.csv b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/decompressed_response.csv new file mode 100644 index 000000000000..ebef74b8ec70 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/decompressed_response.csv @@ -0,0 +1,25 @@ 
+"EMAIL","FIRST_NAME","LAST_NAME","ADDRESS_LINE_1","ADDRESS_LINE_2","CITY","STATE_PROVINCE_REGION","POSTAL_CODE","COUNTRY","ALTERNATE_EMAILS","PHONE_NUMBER","WHATSAPP","LINE","FACEBOOK","UNIQUE_NAME","CREATED_AT","UPDATED_AT","CONTACT_ID" +"fake_email_10@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:22Z","2021-02-01T12:35:51Z","eae8c5c8-f97e-40a8-8945-72acca457f5a" +"fake_email_1@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:08Z","2021-02-01T12:35:38Z","198f959f-f441-4d15-a280-9e8f65a90ba5" +"fake_email_12@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:24Z","2021-02-01T12:35:53Z","6975b74c-bb1e-4d54-a251-b934c4193ed4" +"fake_email_8@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:19Z","2021-02-01T12:35:49Z","36ef1a2d-3cc4-4515-9c00-1615c5f860d0" +"fake_email_18@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:30Z","2021-02-01T12:36:00Z","19163421-bb29-495d-950f-edede6218081" +"fake_email_3@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:14Z","2021-02-01T12:35:43Z","d1211b88-e116-4a0b-a823-0361bf059a06" +"fake_email_9@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:21Z","2021-02-01T12:35:50Z","ef4225b0-dff9-4756-af87-c4228d836d53" +"fake_email_4@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:15Z","2021-02-01T12:35:44Z","9adef36c-fe51-421a-9653-6bd010962e98" +"fake_email_2@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:13Z","2021-02-01T12:35:42Z","210d8004-d12a-4f01-815a-f90cfa9e4360" +"fake_email_6@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:17Z","2021-02-01T12:35:46Z","76330f89-5645-4432-b3bb-9e33a9195273" +"fake_email_14@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:26Z","2021-02-01T12:35:55Z","77200269-0b69-462c-bed1-9e6f912d4b83" +"fake_email_13@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:25Z","2021-02-01T12:35:54Z","c91c993b-1dfa-4686-bcf0-31e4aeb2a1a9" +"joepogbm@ggma.co",,,,,,,,,,,,,,,"2021-02-03T19:26:52Z","2021-02-03T19:27:21Z","a2a1f3f4-0170-4fbd-9152-ffe8cbcdb93d" +"fake_email_17@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:29Z","2021-02-01T12:35:59Z","e45af829-de4e-44d6-9c89-bb0c7ce47925" +"fake_email_15@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:27Z","2021-02-01T12:35:56Z","50b36a31-daf8-45c4-bc48-13e150f6746e" +"fake_email_7@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:18Z","2021-02-01T12:35:47Z","353113b9-b41e-480a-bf98-72213350194c" +"y.kurochkin@zazmic.com",,,,,,,,,,,,,,,"2021-02-03T19:34:41Z","2021-02-03T19:35:47Z","0b62947e-de93-419e-8c96-83572bf15ed1" +"fake_email_19@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:31Z","2021-02-01T12:36:01Z","9932d677-1128-47e4-9d97-667c6155bfee" +"joepogbum@ggma.co",,,,,,,,,,,,,,,"2021-02-03T19:22:41Z","2021-02-03T19:23:10Z","ba3c48d5-b63b-48e6-8687-c5034ed0a8dd" +"fake_email_0@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:34:49Z","2021-02-01T12:35:19Z","44ec451f-d401-40d2-831d-3e3ce8a94f66" +"avida.d3@gmail.com","dima","dima",,,,,,,,,,,,,"2021-09-08T09:02:22Z","2021-09-08T09:04:58Z","2f7b13f2-60d2-462a-bfb0-d30bb8eabed8" +"fake_email_16@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:28Z","2021-02-01T12:35:57Z","c6cfd936-e327-48da-aa76-824076461d80" +"fake_email_11@lmail.c","Fake 
contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:23Z","2021-02-01T12:35:52Z","4101feb2-2b07-4aef-8eb5-62878b612fcd" +"fake_email_5@lmail.c","Fake contact","Lastname",,,,,"22341",,,,,,,,"2021-02-01T12:35:16Z","2021-02-01T12:35:45Z","32deb20d-9f8f-44b4-aed2-dc15d5bf45ba" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py new file mode 100644 index 000000000000..33ed74d395fd --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py @@ -0,0 +1,54 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +from io import BytesIO +from pathlib import Path +from unittest import TestCase + +import requests +import requests_mock +from airbyte_cdk.sources.declarative.extractors import ResponseToFileExtractor + + +class ResponseToFileExtractorTest(TestCase): + def setUp(self) -> None: + self._extractor = ResponseToFileExtractor() + self._http_mocker = requests_mock.Mocker() + self._http_mocker.__enter__() + + def tearDown(self) -> None: + self._http_mocker.__exit__(None, None, None) + + def test_compressed_response(self) -> None: + response = self._mock_streamed_response_from_file(self._compressed_response_path()) + extracted_records = list(self._extractor.extract_records(response)) + assert len(extracted_records) == 24 + + def test_text_response(self) -> None: + response = self._mock_streamed_response_from_file(self._decompressed_response_path()) + extracted_records = list(self._extractor.extract_records(response)) + assert len(extracted_records) == 24 + + def test_text_response_with_null_bytes(self) -> None: + csv_with_null_bytes = '"FIRST_\x00NAME","LAST_NAME"\n"a first n\x00ame","a last na\x00me"\n' + response = self._mock_streamed_response(BytesIO(csv_with_null_bytes.encode("utf-8"))) + + extracted_records = list(self._extractor.extract_records(response)) + + assert extracted_records == [{"FIRST_NAME": "a first name", "LAST_NAME": "a last name"}] + + def _test_folder_path(self) -> Path: + return Path(__file__).parent.resolve() + + def _compressed_response_path(self) -> Path: + return self._test_folder_path() / "compressed_response" + + def _decompressed_response_path(self) -> Path: + return self._test_folder_path() / "decompressed_response.csv" + + def _mock_streamed_response_from_file(self, path: Path) -> requests.Response: + with path.open("rb") as f: + return self._mock_streamed_response(f) # type: ignore # Could not find the right typing for file io + + def _mock_streamed_response(self, io: BytesIO) -> requests.Response: + any_url = "https://anyurl.com" + self._http_mocker.register_uri("GET", any_url, [{"body": io, "status_code": 200}]) + return requests.get(any_url) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py new file mode 100644 index 000000000000..17966e963eac --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -0,0 +1,180 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ + +import json +from unittest import TestCase +from unittest.mock import Mock + +import pytest +from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.extractors import DpathExtractor +from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler +from airbyte_cdk.sources.declarative.requesters.http_job_repository import AsyncHttpJobRepository +from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester +from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod +from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_ANY_CONFIG = {} +_ANY_SLICE = StreamSlice(partition={}, cursor_slice={}) +_URL_BASE = "https://api.sendgrid.com/v3/" +_EXPORT_PATH = "marketing/contacts/exports" +_EXPORT_URL = f"{_URL_BASE}{_EXPORT_PATH}" +_A_JOB_ID = "a-job-id" +_ANOTHER_JOB_ID = "another-job-id" +_JOB_FIRST_URL = "https://job.result.api.com/1" +_JOB_SECOND_URL = "https://job.result.api.com/2" +_A_CSV_WITH_ONE_RECORD = """id,value +a_record_id,a_value +""" + + +class HttpJobRepositoryTest(TestCase): + def setUp(self) -> None: + message_repository = Mock() + error_handler = DefaultErrorHandler(config=_ANY_CONFIG, parameters={}) + + self._create_job_requester = HttpRequester( + name="stream : create_job", + url_base=_URL_BASE, + path=_EXPORT_PATH, + error_handler=error_handler, + http_method=HttpMethod.POST, + config=_ANY_CONFIG, + disable_retries=False, + parameters={}, + message_repository=message_repository, + use_cache=False, + stream_response=False, + ) + + self._polling_job_requester = HttpRequester( + name="stream : polling", + url_base=_URL_BASE, + path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}", + error_handler=error_handler, + http_method=HttpMethod.GET, + config=_ANY_CONFIG, + disable_retries=False, + parameters={}, + message_repository=message_repository, + use_cache=False, + stream_response=False, + ) + + self._download_job_requester = HttpRequester( + name="stream : fetch_result", + url_base="", + path="{{stream_slice['url']}}", + error_handler=error_handler, + http_method=HttpMethod.GET, + config=_ANY_CONFIG, + disable_retries=False, + parameters={}, + message_repository=message_repository, + use_cache=False, + stream_response=True, + ) + + self._repository = AsyncHttpJobRepository( + creation_requester=self._create_job_requester, + polling_requester=self._polling_job_requester, + download_requester=self._download_job_requester, + status_extractor=DpathExtractor(decoder=JsonDecoder(parameters={}), field_path=["status"], config={}, parameters={} or {}), + status_mapping={ + "ready": AsyncJobStatus.COMPLETED, + "failure": AsyncJobStatus.FAILED, + "pending": AsyncJobStatus.RUNNING, + }, + urls_extractor=DpathExtractor(decoder=JsonDecoder(parameters={}), field_path=["urls"], config={}, parameters={} or {}), + ) + + self._http_mocker = HttpMocker() + self._http_mocker.__enter__() + + def tearDown(self) -> None: + self._http_mocker.__exit__(None, None, None) + + def test_given_different_statuses_when_update_jobs_status_then_update_status_properly(self) -> None: + self._mock_create_response(_A_JOB_ID) + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), + [ + HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "pending"})), + HttpResponse(body=json.dumps({"id": 
_A_JOB_ID, "status": "failure"})), + HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), + ] + ) + job = self._repository.start(_ANY_SLICE) + + self._repository.update_jobs_status([job]) + assert job.status() == AsyncJobStatus.RUNNING + self._repository.update_jobs_status([job]) + assert job.status() == AsyncJobStatus.FAILED + self._repository.update_jobs_status([job]) + assert job.status() == AsyncJobStatus.COMPLETED + + def test_given_unknown_status_when_update_jobs_status_then_raise_error(self) -> None: + self._mock_create_response(_A_JOB_ID) + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), + HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "invalid_status"})), + ) + job = self._repository.start(_ANY_SLICE) + + with pytest.raises(ValueError): + self._repository.update_jobs_status([job]) + + def test_given_multiple_jobs_when_update_jobs_status_then_all_the_jobs_are_updated(self) -> None: + self._mock_create_response(_A_JOB_ID) + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), + HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), + ) + self._mock_create_response(_ANOTHER_JOB_ID) + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_ANOTHER_JOB_ID}"), + HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), + ) + a_job = self._repository.start(_ANY_SLICE) + another_job = self._repository.start(_ANY_SLICE) + + self._repository.update_jobs_status([a_job, another_job]) + + assert a_job.status() == AsyncJobStatus.COMPLETED + assert another_job.status() == AsyncJobStatus.COMPLETED + + def test_given_multiple_urls_when_fetch_records_then_fetch_from_multiple_urls(self) -> None: + self._mock_create_response(_A_JOB_ID) + self._http_mocker.get( + HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), + HttpResponse(body=json.dumps({ + "id": _A_JOB_ID, + "status": "ready", + "urls": [ + _JOB_FIRST_URL, + _JOB_SECOND_URL, + ] + })) + ) + self._http_mocker.get( + HttpRequest(url=_JOB_FIRST_URL), + HttpResponse(body=_A_CSV_WITH_ONE_RECORD), + ) + self._http_mocker.get( + HttpRequest(url=_JOB_SECOND_URL), + HttpResponse(body=_A_CSV_WITH_ONE_RECORD), + ) + + job = self._repository.start(_ANY_SLICE) + self._repository.update_jobs_status([job]) + records = list(self._repository.fetch_records(job)) + + assert len(records) == 2 + + def _mock_create_response(self, job_id: str) -> None: + self._http_mocker.post( + HttpRequest(url=_EXPORT_URL), + HttpResponse(body=json.dumps({"id": job_id})), + ) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_add_fields.py b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_add_fields.py index 83d5f19e8585..9b46cf49b99b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_add_fields.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_add_fields.py @@ -132,4 +132,5 @@ def test_add_fields( expected: Mapping[str, Any], ): inputs = [AddedFieldDefinition(path=v[0], value=v[1], value_type=field_type, parameters={}) for v in field] - assert AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs) == expected + AddFields(fields=inputs, parameters={"alas": "i live"}).transform(input_record, **kwargs) + assert input_record == expected diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_keys_to_lower_transformation.py 
b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_keys_to_lower_transformation.py new file mode 100644 index 000000000000..7464b9f04fd2 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_keys_to_lower_transformation.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import KeysToLowerTransformation + +_ANY_VALUE = -1 + + +def test_transform() -> None: + record = {"wIth_CapITal": _ANY_VALUE, "anOThEr_witH_Caps": _ANY_VALUE} + KeysToLowerTransformation().transform(record) + assert record == {"with_capital": _ANY_VALUE, "another_with_caps": _ANY_VALUE} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_remove_fields.py b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_remove_fields.py index 0b9d6da5b56d..89b17e8d0f75 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_remove_fields.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_remove_fields.py @@ -85,4 +85,5 @@ ) def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], condition: str, expected: Mapping[str, Any]): transformation = RemoveFields(field_pointers=field_pointers, condition=condition, parameters={}) - assert transformation.transform(input_record) == expected + transformation.transform(input_record) + assert input_record == expected
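The transformation tests above exercise the new in-place contract: transform() mutates the record it receives and returns None, so callers assert on the input record afterwards. Below is a hedged sketch of a custom transformation written against that contract; StripStringValuesTransformation and its test are invented for illustration and are not part of this change, though the imports and signature mirror the updated RecordTransformation interface.

from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class StripStringValuesTransformation(RecordTransformation):
    """Hypothetical example: trims surrounding whitespace from every string value, in place."""

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        for key, value in record.items():
            if isinstance(value, str):
                record[key] = value.strip()  # mutate the record in place; nothing is returned


def test_strip_string_values() -> None:
    record = {"name": "  Ada  ", "age": 42}
    StripStringValuesTransformation().transform(record)
    assert record == {"name": "Ada", "age": 42}  # assert on the mutated input, as the tests above do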