Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blocks): Pinecone blocks #8535

Merged
merged 15 commits into from
Nov 8, 2024
Merged
123 changes: 110 additions & 13 deletions autogpt_platform/backend/backend/blocks/pinecone.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Literal
import uuid
from typing import Any, Literal

from autogpt_libs.supabase_integration_credentials_store import APIKeyCredentials
from pinecone import Pinecone, ServerlessSpec
Expand Down Expand Up @@ -98,10 +99,14 @@ class Input(BlockSchema):
include_metadata: bool = SchemaField(
description="Whether to include metadata in the response", default=True
)
host: str = SchemaField(description="Host for pinecone")
host: str = SchemaField(description="Host for pinecone", default="")
idx_name: str = SchemaField(description="Index name for pinecone")

class Output(BlockSchema):
results: dict = SchemaField(description="Query results from Pinecone")
results: Any = SchemaField(description="Query results from Pinecone")
combined_results: Any = SchemaField(
description="Combined results from Pinecone"
)

def __init__(self):
super().__init__(
Expand All @@ -119,13 +124,105 @@ def run(
credentials: APIKeyCredentials,
**kwargs,
) -> BlockOutput:
pc = Pinecone(api_key=credentials.api_key.get_secret_value())
idx = pc.Index(host=input_data.host)
results = idx.query(
namespace=input_data.namespace,
vector=input_data.query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata,
)
yield "results", results
try:
# Create a new client instance
pc = Pinecone(api_key=credentials.api_key.get_secret_value())

# Get the index
idx = pc.Index(input_data.idx_name)

# Ensure query_vector is in correct format
query_vector = input_data.query_vector
if isinstance(query_vector, list) and len(query_vector) > 0:
if isinstance(query_vector[0], list):
query_vector = query_vector[0]

results = idx.query(
namespace=input_data.namespace,
vector=query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata,
).to_dict()
combined_text = ""
if results["matches"]:
texts = [
match["metadata"]["text"]
for match in results["matches"]
if match.get("metadata", {}).get("text")
]
combined_text = "\n\n".join(texts)

# Return both the raw matches and combined text
yield "results", {
"matches": results["matches"],
"combined_text": combined_text,
}
yield "combined_results", combined_text

except Exception as e:
error_msg = f"Error querying Pinecone: {str(e)}"
raise RuntimeError(error_msg) from e


class PineconeInsertBlock(Block):
class Input(BlockSchema):
credentials: PineconeCredentialsInput = PineconeCredentialsField()
index: str = SchemaField(description="Initialized Pinecone index")
chunks: list = SchemaField(description="List of text chunks to ingest")
embeddings: list = SchemaField(
description="List of embeddings corresponding to the chunks"
)
namespace: str = SchemaField(
description="Namespace to use in Pinecone", default=""
)
metadata: dict = SchemaField(
description="Additional metadata to store with each vector", default={}
)

class Output(BlockSchema):
upsert_response: str = SchemaField(
description="Response from Pinecone upsert operation"
)

def __init__(self):
super().__init__(
id="477f2168-cd91-475a-8146-9499a5982434",
description="Upload data to a Pinecone index",
categories={BlockCategory.LOGIC},
input_schema=PineconeInsertBlock.Input,
output_schema=PineconeInsertBlock.Output,
)

def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
**kwargs,
) -> BlockOutput:
try:
# Create a new client instance
pc = Pinecone(api_key=credentials.api_key.get_secret_value())

# Get the index
idx = pc.Index(input_data.index)

vectors = []
for chunk, embedding in zip(input_data.chunks, input_data.embeddings):
vector_metadata = input_data.metadata.copy()
vector_metadata["text"] = chunk
vectors.append(
{
"id": str(uuid.uuid4()),
"values": embedding,
"metadata": vector_metadata,
}
)
idx.upsert(vectors=vectors, namespace=input_data.namespace)

yield "upsert_response", "successfully upserted"

except Exception as e:
error_msg = f"Error uploading to Pinecone: {str(e)}"
raise RuntimeError(error_msg) from e
Loading