Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat(Destination PGVector): new connector #45428

Merged
merged 30 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ffab5f7
checkpoint: copy in files from destination-snowflake-cortex
aaronsteers Aug 16, 2024
e25b9a1
rename base python src folder
aaronsteers Aug 16, 2024
edcbe73
some global renames
aaronsteers Aug 16, 2024
90e482b
refactor/replace SQLConfig and SQLTypeConverter classes
aaronsteers Aug 16, 2024
53fd970
refactor of sqlprocessor class
aaronsteers Aug 16, 2024
54faae9
updated config.py for PGVector
aaronsteers Aug 16, 2024
cf071e7
updated Destination class
aaronsteers Aug 16, 2024
8d082be
update main.py
aaronsteers Aug 16, 2024
16bded5
update pyproject, poetry and metadata files
aldogonzalez8 Aug 20, 2024
8ca7b35
make common unit test destination_test.py pass
aldogonzalez8 Aug 20, 2024
f64279a
fix unit test class name
aldogonzalez8 Aug 21, 2024
d6bee0c
fix integration tests
aldogonzalez8 Aug 21, 2024
f648cb3
update acceptance-tests-config, and sample_config and spec in integra…
aldogonzalez8 Aug 21, 2024
afe701e
Update readme file
aldogonzalez8 Aug 21, 2024
c6f5614
Update bootstrap file
aldogonzalez8 Aug 21, 2024
207731c
Update icon to postgres one
aldogonzalez8 Aug 21, 2024
769f4e3
fix command in readme file
aldogonzalez8 Aug 22, 2024
04b09b6
Merge branch 'master' into destination-pgvector/new-start
aldogonzalez8 Sep 3, 2024
9adea7a
Merge branch 'master' into destination-pgvector/new-start
aldogonzalez8 Sep 11, 2024
47af773
remove todos from metadata.yaml
aldogonzalez8 Sep 11, 2024
1be7a53
remove todos from pgvector_processor.py
aldogonzalez8 Sep 11, 2024
a60967b
update definitionId in metadata.yaml
aldogonzalez8 Sep 12, 2024
d1b9fa7
chore: format code
aldogonzalez8 Sep 12, 2024
a09b84c
fix(destination-pgvector): fix cli entrypoint
aaronsteers Sep 12, 2024
1de95ed
chore: enable pypi publish
aaronsteers Sep 12, 2024
976f9ab
add pgvector doc
aldogonzalez8 Sep 12, 2024
aee08eb
fix: missing run() function
aaronsteers Sep 12, 2024
aa86c32
fix image tag version
aldogonzalez8 Sep 13, 2024
2e27767
fix import of models
aldogonzalez8 Sep 14, 2024
589102a
Fix release date
aldogonzalez8 Sep 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions airbyte-integrations/connectors/destination-pgvector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# PGVector Destination

This is the repository for the PGVector destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/pgvector).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.9.0`

### Installing the connector
From this connector directory, run:
```bash
poetry install --with dev.
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/pgvector)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_pgvector/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination pgvector test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
poetry run python main.py spec
poetry run python main.py check --config secrets/config.json
cat examples/messages.jsonl | poetry run python main.py write --config secrets/config.json --catalog examples/configured_catalog.json
```

### Locally running the connector docker image

#### Use `airbyte-ci` to build your connector
The Airbyte way of building this connector is to use our `airbyte-ci` tool.
You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1).
Then running the following command will build your connector:

```bash
airbyte-ci connectors --name destination-pgvector build
```
Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-pgvector:dev`.

##### Customizing our build process
When contributing on our connector you might need to customize the build process to add a system dependency or set an env var.
You can customize our build process by adding a `build_customization.py` module to your connector.
This module should contain a `pre_connector_install` and `post_connector_install` async function that will mutate the base image and the connector container respectively.
It will be imported at runtime by our build process and the functions will be called if they exist.

Here is an example of a `build_customization.py` module:
```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
# Feel free to check the dagger documentation for more information on the Container object and its methods.
# https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/
from dagger import Container


async def pre_connector_install(base_image_container: Container) -> Container:
return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value")

async def post_connector_install(connector_container: Container) -> Container:
return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value")
```

#### Build your own connector image
This connector is built using our dynamic built process in `airbyte-ci`.
The base image used to build it is defined within the metadata.yaml file under the `connectorBuildOptions`.
The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py).
It does not rely on a Dockerfile.

If you would like to patch our connector and build your own a simple approach would be to:

1. Create your own Dockerfile based on the latest version of the connector image.
```Dockerfile
FROM airbyte/destination-pgvector:latest

COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```
Please use this as an example. This is not optimized.

2. Build your image:
```bash
docker build -t airbyte/destination-pgvector:dev .
# Running the spec command against your patched connector
docker run airbyte/destination-pgvector:dev spec
```
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-pgvector:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-pgvector:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-pgvector:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-pgvector test
```

### Unit Tests
To run unit tests locally, from the connector directory run:
```
poetry run pytest -s unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).

To run integration tests locally, make sure you have a secrets/config.json as explained above, and then run:
```
poetry run pytest -s integration_tests
```

### Customizing acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.

### Using `airbyte-ci` to run tests
See [airbyte-ci documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command)

## Dependency Management
All of your dependencies should go in `pyproject.toml`
* required for your connector to work need to go to `[tool.poetry.dependencies]` list.
* required for the testing need to go to `[tool.poetry.group.dev.dependencies]` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
acceptance_tests:
spec:
tests:
- spec_path: integration_tests/spec.json
connector_image: airbyte/destination-pgvector:dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# PGVector Destination Connector Bootstrap

This destination does three things:
* Split records into chunks and separates metadata from text data
* Embeds text data into an embedding vector
* Stores the metadata and embedding vector in Postgres DB with PGVector extension enabled

The record processing is using the text split components from https://python.langchain.com/docs/modules/data_connection/document_transformers/.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationPGVector

__all__ = ["DestinationPGVector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Catalog provider implementation.

A catalog provider wraps a configured catalog and configured streams. This class is responsible for
providing information about the catalog and streams. A catalog provider can also be updated with new
streams as they are discovered, providing a thin layer of abstraction over the configured catalog.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, final

from airbyte import exceptions as exc
from airbyte_cdk.models import DestinationSyncMode

if TYPE_CHECKING:
from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream


class CatalogProvider:
"""A catalog provider wraps a configured catalog and configured streams.

This class is responsible for providing information about the catalog and streams.

Note:
- The catalog provider is not responsible for managing the catalog or streams but it may
be updated with new streams as they are discovered.
"""

def __init__(
self,
configured_catalog: ConfiguredAirbyteCatalog,
) -> None:
"""Initialize the catalog manager with a catalog object reference.

Since the catalog is passed by reference, the catalog manager may be updated with new
streams as they are discovered.
"""
self._catalog: ConfiguredAirbyteCatalog = configured_catalog

@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
return self._catalog

@property
def stream_names(self) -> list[str]:
return list({stream.stream.name for stream in self.configured_catalog.streams})

def get_configured_stream_info(
self,
stream_name: str,
) -> ConfiguredAirbyteStream:
"""Return the column definitions for the given stream."""
if not self.configured_catalog:
raise exc.PyAirbyteInternalError(
message="Cannot get stream JSON schema without a catalog.",
)

matching_streams: list[ConfiguredAirbyteStream] = [
stream for stream in self.configured_catalog.streams if stream.stream.name == stream_name
]
if not matching_streams:
raise exc.AirbyteStreamNotFoundError(
stream_name=stream_name,
context={
"available_streams": [stream.stream.name for stream in self.configured_catalog.streams],
},
)

if len(matching_streams) > 1:
raise exc.PyAirbyteInternalError(
message="Multiple streams found with same name.",
context={
"stream_name": stream_name,
},
)

return matching_streams[0]

@final
def get_stream_json_schema(
self,
stream_name: str,
) -> dict[str, Any]:
"""Return the column definitions for the given stream."""
return self.get_configured_stream_info(stream_name).stream.json_schema

def get_stream_properties(
self,
stream_name: str,
) -> dict[str, dict]:
"""Return the names of the top-level properties for the given stream."""
return self.get_stream_json_schema(stream_name)["properties"]

def get_destination_sync_mode(
self,
stream_name: str,
) -> DestinationSyncMode:
"""Return the destination sync mode for the given stream."""
return self.get_configured_stream_info(stream_name).destination_sync_mode
Loading
Loading