Skip to content

Commit

Permalink
feat(platform): Support manually setting up webhooks (#8750)
Browse files Browse the repository at this point in the history
- Resolves #8748

The webhooks system as is works really well for full blown enterprise
webhooks managed via a UI. It does not work for more "chill guy" webhook
tools that just send notifications sometimes.

## Changes 🏗️

- feat(blocks): Add Compass transcription trigger block

- feat(backend): Amend webhooks system to support manual-set-up webhooks
   - Make event filter input optional on webhook-triggered nodes
   - Make credentials optional on webhook-triggered nodes
   - Add code path to re-use existing manual webhook on graph update
   - Add `ManualWebhookManagerBase`

- feat(frontend): Add UI to pass webhook URL to user on manual-set-up
webhook blocks

![image](https://github.com/user-attachments/assets/1c35f161-7fe4-4916-8506-5ca9a838f398)

- fix(backend): Strip webhook info from node objects for graph export

- refactor(backend): Rename `backend.integrations.webhooks.base` to
`._base`

---------

Co-authored-by: Reinier van der Leer <[email protected]>
Co-authored-by: Zamil Majdy <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 89a9354 commit 746f3d4
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 132 deletions.
59 changes: 59 additions & 0 deletions autogpt_platform/backend/backend/blocks/compass/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from pydantic import BaseModel

from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
)
from backend.data.model import SchemaField
from backend.integrations.webhooks.compass import CompassWebhookType


class Transcription(BaseModel):
text: str
speaker: str
end: float
start: float
duration: float


class TranscriptionDataModel(BaseModel):
date: str
transcription: str
transcriptions: list[Transcription]


class CompassAITriggerBlock(Block):
class Input(BlockSchema):
payload: TranscriptionDataModel = SchemaField(hidden=True)

class Output(BlockSchema):
transcription: str = SchemaField(
description="The contents of the compass transcription."
)

def __init__(self):
super().__init__(
id="9464a020-ed1d-49e1-990f-7f2ac924a2b7",
description="This block will output the contents of the compass transcription.",
categories={BlockCategory.HARDWARE},
input_schema=CompassAITriggerBlock.Input,
output_schema=CompassAITriggerBlock.Output,
webhook_config=BlockManualWebhookConfig(
provider="compass",
webhook_type=CompassWebhookType.TRANSCRIPTION,
),
test_input=[
{"input": "Hello, World!"},
{"input": "Hello, World!", "data": "Existing Data"},
],
# test_output=[
# ("output", "Hello, World!"), # No data provided, so trigger is returned
# ("output", "Existing Data"), # Data is provided, so data is returned.
# ],
)

def run(self, input_data: Input, **kwargs) -> BlockOutput:
yield "transcription", input_data.payload.transcription
86 changes: 57 additions & 29 deletions autogpt_platform/backend/backend/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BlockType(Enum):
OUTPUT = "Output"
NOTE = "Note"
WEBHOOK = "Webhook"
WEBHOOK_MANUAL = "Webhook (manual)"
AGENT = "Agent"


Expand All @@ -57,6 +58,7 @@ class BlockCategory(Enum):
COMMUNICATION = "Block that interacts with communication platforms."
DEVELOPER_TOOLS = "Developer tools such as GitHub blocks."
DATA = "Block that interacts with structured data."
HARDWARE = "Block that interacts with hardware."
AGENT = "Block that interacts with other agents."
CRM = "Block that interacts with CRM services."

Expand Down Expand Up @@ -197,7 +199,12 @@ class EmptySchema(BlockSchema):


# --8<-- [start:BlockWebhookConfig]
class BlockWebhookConfig(BaseModel):
class BlockManualWebhookConfig(BaseModel):
"""
Configuration model for webhook-triggered blocks on which
the user has to manually set up the webhook at the provider.
"""

provider: str
"""The service provider that the webhook connects to"""

Expand All @@ -208,26 +215,36 @@ class BlockWebhookConfig(BaseModel):
Only for use in the corresponding `WebhooksManager`.
"""

resource_format: str
event_filter_input: str = ""
"""
Template string for the resource that a block instance subscribes to.
Fields will be filled from the block's inputs (except `payload`).
Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
Only for use in the corresponding `WebhooksManager`.
Name of the block's event filter input.
Leave empty if the corresponding webhook doesn't have distinct event/payload types.
"""

event_filter_input: str
"""Name of the block's event filter input."""

event_format: str = "{event}"
"""
Template string for the event(s) that a block instance subscribes to.
Applied individually to each event selected in the event filter input.
Example: `"pull_request.{event}"` -> `"pull_request.opened"`
"""


class BlockWebhookConfig(BlockManualWebhookConfig):
"""
Configuration model for webhook-triggered blocks for which
the webhook can be automatically set up through the provider's API.
"""

resource_format: str
"""
Template string for the resource that a block instance subscribes to.
Fields will be filled from the block's inputs (except `payload`).
Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
Only for use in the corresponding `WebhooksManager`.
"""
# --8<-- [end:BlockWebhookConfig]


Expand All @@ -247,7 +264,7 @@ def __init__(
disabled: bool = False,
static_output: bool = False,
block_type: BlockType = BlockType.STANDARD,
webhook_config: Optional[BlockWebhookConfig] = None,
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
):
"""
Initialize the block with the given schema.
Expand Down Expand Up @@ -278,27 +295,38 @@ def __init__(
self.contributors = contributors or set()
self.disabled = disabled
self.static_output = static_output
self.block_type = block_type if not webhook_config else BlockType.WEBHOOK
self.block_type = block_type
self.webhook_config = webhook_config
self.execution_stats = {}

if self.webhook_config:
# Enforce shape of webhook event filter
event_filter_field = self.input_schema.model_fields[
self.webhook_config.event_filter_input
]
if not (
isinstance(event_filter_field.annotation, type)
and issubclass(event_filter_field.annotation, BaseModel)
and all(
field.annotation is bool
for field in event_filter_field.annotation.model_fields.values()
)
):
raise NotImplementedError(
f"{self.name} has an invalid webhook event selector: "
"field must be a BaseModel and all its fields must be boolean"
)
if isinstance(self.webhook_config, BlockWebhookConfig):
# Enforce presence of credentials field on auto-setup webhook blocks
if CREDENTIALS_FIELD_NAME not in self.input_schema.model_fields:
raise TypeError(
"credentials field is required on auto-setup webhook blocks"
)
self.block_type = BlockType.WEBHOOK
else:
self.block_type = BlockType.WEBHOOK_MANUAL

# Enforce shape of webhook event filter, if present
if self.webhook_config.event_filter_input:
event_filter_field = self.input_schema.model_fields[
self.webhook_config.event_filter_input
]
if not (
isinstance(event_filter_field.annotation, type)
and issubclass(event_filter_field.annotation, BaseModel)
and all(
field.annotation is bool
for field in event_filter_field.annotation.model_fields.values()
)
):
raise NotImplementedError(
f"{self.name} has an invalid webhook event selector: "
"field must be a BaseModel and all its fields must be boolean"
)

# Enforce presence of 'payload' input
if "payload" not in self.input_schema.model_fields:
Expand Down
57 changes: 36 additions & 21 deletions autogpt_platform/backend/backend/data/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def is_triggered_by_event_type(self, event_type: str) -> bool:
raise ValueError(f"Block #{self.block_id} not found for node #{self.id}")
if not block.webhook_config:
raise TypeError("This method can't be used on non-webhook blocks")
if not block.webhook_config.event_filter_input:
return True
event_filter = self.input_default.get(block.webhook_config.event_filter_input)
if not event_filter:
raise ValueError(f"Event filter is not configured on node #{self.id}")
Expand Down Expand Up @@ -268,11 +270,19 @@ def sanitize(name):
+ [sanitize(link.sink_name) for link in input_links.get(node.id, [])]
)
for name in block.input_schema.get_required_fields():
if name not in provided_inputs and (
for_run # Skip input completion validation, unless when executing.
or block.block_type == BlockType.INPUT
or block.block_type == BlockType.OUTPUT
or block.block_type == BlockType.AGENT
if (
name not in provided_inputs
and not (
name == "payload"
and block.block_type
in (BlockType.WEBHOOK, BlockType.WEBHOOK_MANUAL)
)
and (
for_run # Skip input completion validation, unless when executing.
or block.block_type == BlockType.INPUT
or block.block_type == BlockType.OUTPUT
or block.block_type == BlockType.AGENT
)
):
raise ValueError(
f"Node {block.name} #{node.id} required input missing: `{name}`"
Expand All @@ -292,7 +302,6 @@ def has_value(name):

# Validate dependencies between fields
for field_name, field_info in input_schema.items():

# Apply input dependency validation only on run & field with depends_on
json_schema_extra = field_info.json_schema_extra or {}
dependencies = json_schema_extra.get("depends_on", [])
Expand Down Expand Up @@ -359,7 +368,7 @@ def is_static_output_block(nid: str) -> bool:
link.is_static = True # Each value block output should be static.

@staticmethod
def from_db(graph: AgentGraph, hide_credentials: bool = False):
def from_db(graph: AgentGraph, for_export: bool = False):
return GraphModel(
id=graph.id,
user_id=graph.userId,
Expand All @@ -369,7 +378,7 @@ def from_db(graph: AgentGraph, hide_credentials: bool = False):
name=graph.name or "",
description=graph.description or "",
nodes=[
GraphModel._process_node(node, hide_credentials)
NodeModel.from_db(GraphModel._process_node(node, for_export))
for node in graph.AgentNodes or []
],
links=list(
Expand All @@ -382,23 +391,29 @@ def from_db(graph: AgentGraph, hide_credentials: bool = False):
)

@staticmethod
def _process_node(node: AgentNode, hide_credentials: bool) -> NodeModel:
node_dict = {field: getattr(node, field) for field in node.model_fields}
if hide_credentials and "constantInput" in node_dict:
constant_input = json.loads(
node_dict["constantInput"], target_type=dict[str, Any]
)
constant_input = GraphModel._hide_credentials_in_input(constant_input)
node_dict["constantInput"] = json.dumps(constant_input)
return NodeModel.from_db(AgentNode(**node_dict))
def _process_node(node: AgentNode, for_export: bool) -> AgentNode:
if for_export:
# Remove credentials from node input
if node.constantInput:
constant_input = json.loads(
node.constantInput, target_type=dict[str, Any]
)
constant_input = GraphModel._hide_node_input_credentials(constant_input)
node.constantInput = json.dumps(constant_input)

# Remove webhook info
node.webhookId = None
node.Webhook = None

return node

@staticmethod
def _hide_credentials_in_input(input_data: dict[str, Any]) -> dict[str, Any]:
def _hide_node_input_credentials(input_data: dict[str, Any]) -> dict[str, Any]:
sensitive_keys = ["credentials", "api_key", "password", "token", "secret"]
result = {}
for key, value in input_data.items():
if isinstance(value, dict):
result[key] = GraphModel._hide_credentials_in_input(value)
result[key] = GraphModel._hide_node_input_credentials(value)
elif isinstance(value, str) and any(
sensitive_key in key.lower() for sensitive_key in sensitive_keys
):
Expand Down Expand Up @@ -495,7 +510,7 @@ async def get_graph(
version: int | None = None,
template: bool = False,
user_id: str | None = None,
hide_credentials: bool = False,
for_export: bool = False,
) -> GraphModel | None:
"""
Retrieves a graph from the DB.
Expand All @@ -521,7 +536,7 @@ async def get_graph(
include=AGENT_GRAPH_INCLUDE,
order={"version": "desc"},
)
return GraphModel.from_db(graph, hide_credentials) if graph else None
return GraphModel.from_db(graph, for_export) if graph else None


async def set_graph_active_version(graph_id: str, version: int, user_id: str) -> None:
Expand Down
30 changes: 27 additions & 3 deletions autogpt_platform/backend/backend/data/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

from prisma import Json
from prisma.models import IntegrationWebhook
from pydantic import Field
from pydantic import Field, computed_field

from backend.data.includes import INTEGRATION_WEBHOOK_INCLUDE
from backend.data.queue import AsyncRedisEventBus
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.utils import webhook_ingress_url

from .db import BaseDbModel

Expand All @@ -31,6 +32,11 @@ class Webhook(BaseDbModel):

attached_nodes: Optional[list["NodeModel"]] = None

@computed_field
@property
def url(self) -> str:
return webhook_ingress_url(self.provider.value, self.id)

@staticmethod
def from_db(webhook: IntegrationWebhook):
from .graph import NodeModel
Expand Down Expand Up @@ -84,16 +90,18 @@ async def get_webhook(webhook_id: str) -> Webhook:
return Webhook.from_db(webhook)


async def get_all_webhooks(credentials_id: str) -> list[Webhook]:
async def get_all_webhooks_by_creds(credentials_id: str) -> list[Webhook]:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
if not credentials_id:
raise ValueError("credentials_id must not be empty")
webhooks = await IntegrationWebhook.prisma().find_many(
where={"credentialsId": credentials_id},
include=INTEGRATION_WEBHOOK_INCLUDE,
)
return [Webhook.from_db(webhook) for webhook in webhooks]


async def find_webhook(
async def find_webhook_by_credentials_and_props(
credentials_id: str, webhook_type: str, resource: str, events: list[str]
) -> Webhook | None:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
Expand All @@ -109,6 +117,22 @@ async def find_webhook(
return Webhook.from_db(webhook) if webhook else None


async def find_webhook_by_graph_and_props(
graph_id: str, provider: str, webhook_type: str, events: list[str]
) -> Webhook | None:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
webhook = await IntegrationWebhook.prisma().find_first(
where={
"provider": provider,
"webhookType": webhook_type,
"events": {"has_every": events},
"AgentNodes": {"some": {"agentGraphId": graph_id}},
},
include=INTEGRATION_WEBHOOK_INCLUDE,
)
return Webhook.from_db(webhook) if webhook else None


async def update_webhook_config(webhook_id: str, updated_config: dict) -> Webhook:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
_updated_webhook = await IntegrationWebhook.prisma().update(
Expand Down
Loading

0 comments on commit 746f3d4

Please sign in to comment.