Skip to content

Commit

Permalink
[source-us-census] fix empty fields after sync (#45331)
Browse files Browse the repository at this point in the history
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
topefolorunso and octavia-squidington-iii authored Sep 11, 2024
1 parent 7056428 commit 19335d6
Show file tree
Hide file tree
Showing 7 changed files with 595 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c4cfaeda-c757-489a-8aba-859fb08b6970
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
dockerRepository: airbyte/source-us-census
githubIssueLabel: source-us-census
icon: uscensus.svg
Expand Down
649 changes: 453 additions & 196 deletions airbyte-integrations/connectors/source-us-census/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.2.0"
version = "0.2.1"
name = "source-us-census"
description = "Source implementation for Us Census."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@
#


from typing import List, Optional, Union
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Union

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
DEFAULT_ERROR_RESOLUTION,
SUCCESS_RESOLUTION,
ErrorResolution,
ResponseAction,
create_fallback_error_resolution,
)
from airbyte_cdk.sources.types import Config


class USCensusRecordExtractor(RecordExtractor):
Expand Down Expand Up @@ -132,4 +135,46 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)

return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
return (
default_response_filter_resolution
if default_response_filter_resolution
else create_fallback_error_resolution(response_or_exception)
)


@dataclass
class USCensusSchema(SchemaLoader):
"""
The US Census website hosts many APIs: https://www.census.gov/data/developers/data-sets.html
These APIs return data in a non standard format.
We create the JSON schemas dynamically by reading the first "row" of data we get.
In this implementation all records are of type "string", but this function could
be changed to try and infer the data type based on the values it finds.
"""

config: Config

def get_json_schema(self) -> Mapping[str, Any]:
query_params = self.config.get("query_params")
if query_params:
parts = query_params.split("&")
parameters = []
for part in parts:
key, value = part.split("=", 1)
if key == "get":
parameters += value.split(",")
elif key == "for":
parameters.append(value.split(":")[0])
else:
parameters.append(key)
json_schema = {k: {"type": "string"} for k in parameters}
else:
json_schema = {"{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld": {"type": "string"}}
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": json_schema,
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ definitions:
type: CustomRecordExtractor
class_name: source_us_census.components.USCensusRecordExtractor
schema_loader:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/us_census_stream"
type: CustomSchemaLoader
class_name: "source_us_census.components.USCensusSchema"
base_requester:
type: HttpRequester
url_base: https://api.census.gov/
Expand Down Expand Up @@ -84,14 +83,3 @@ spec:
order: 2
airbyte_secret: true
additionalProperties: true

metadata:
autoImportSchema:
us_census_stream: true

schemas:
us_census_stream:
type: object
$schema: http://json-schema.org/draft-07/schema#
additionalProperties: true
properties: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import Any, Mapping
from unittest.mock import Mock

import pytest
from source_us_census.components import USCensusSchema


@dataclass
class MockConfig:
query_params: str = None

def get(self, key):
if key == "query_params":
return self.query_params


@pytest.fixture
def census_schema():
def _create_schema(query_params=None):
config = MockConfig(query_params=query_params)
return USCensusSchema(config=config)
return _create_schema


def test_get_json_schema_basic_case(census_schema):
schema_instance = census_schema(query_params="get=NAME,POP&for=state:*")
schema = schema_instance.get_json_schema()

expected_properties = {
"NAME": {"type": "string"},
"POP": {"type": "string"},
"state": {"type": "string"}
}

assert schema["properties"] == expected_properties
assert schema["$schema"] == "http://json-schema.org/draft-07/schema#"
assert schema["type"] == "object"
assert schema["additionalProperties"] is True


def test_get_json_schema_with_get_param(census_schema):
schema_instance = census_schema(query_params="get=NAME,AGE,EMPLOYMENT")
schema = schema_instance.get_json_schema()

expected_properties = {
"NAME": {"type": "string"},
"AGE": {"type": "string"},
"EMPLOYMENT": {"type": "string"}
}

assert schema["properties"] == expected_properties


def test_get_json_schema_with_for_param(census_schema):
schema_instance = census_schema(query_params="for=county:1234")
schema = schema_instance.get_json_schema()

expected_properties = {
"county": {"type": "string"}
}

assert schema["properties"] == expected_properties


def test_get_json_schema_with_additional_params(census_schema):
schema_instance = census_schema(query_params="get=NAME&year=2020&for=us:*")
schema = schema_instance.get_json_schema()

expected_properties = {
"NAME": {"type": "string"},
"year": {"type": "string"},
"us": {"type": "string"}
}

assert schema["properties"] == expected_properties


def test_get_json_schema_no_query_params(census_schema):
schema_instance = census_schema(query_params=None)
schema = schema_instance.get_json_schema()

expected_properties = {
"{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld": {"type": "string"}
}

assert schema["properties"] == expected_properties
1 change: 1 addition & 0 deletions docs/integrations/sources/us-census.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ In addition, to understand how to configure the dataset path and query parameter

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ |
| 0.2.1 | 2024-09-07 | [45331](https://github.com/airbytehq/airbyte/pull/45331) | Fix schema |
| 0.2.0 | 2024-08-10 | [43521](https://github.com/airbytehq/airbyte/pull/43521) | Migrate to Low Code |
| 0.1.16 | 2024-08-10 | [43566](https://github.com/airbytehq/airbyte/pull/43566) | Update dependencies |
| 0.1.15 | 2024-08-03 | [43214](https://github.com/airbytehq/airbyte/pull/43214) | Update dependencies |
Expand Down

0 comments on commit 19335d6

Please sign in to comment.