Skip to content

Commit

Permalink
feat(platform, blocks): Webhook-triggered blocks (#8358)
Browse files Browse the repository at this point in the history
- feat(blocks): Add GitHub Pull Request Trigger block

## feat(platform): Add support for Webhook-triggered blocks
- ⚠️ Add `PLATFORM_BASE_URL` setting

- Add webhook config option and `BlockType.WEBHOOK` to `Block`
  - Add check to `Block.__init__` to enforce type and shape of webhook event filter
  - Add check to `Block.__init__` to enforce `payload` input on webhook blocks
  - Add check to `Block.__init__` to disable webhook blocks if `PLATFORM_BASE_URL` is not set

- Add `Webhook` model + CRUD functions in `backend.data.integrations` to represent webhooks created by our system
  - Add `IntegrationWebhook` to DB schema + reference `AgentGraphNode.webhook_id`
    - Add `set_node_webhook(..)` in `backend.data.graph`

- Add webhook-related endpoints:
  - `POST /integrations/{provider}/webhooks/{webhook_id}/ingress` endpoint, to receive webhook payloads, and for all associated nodes create graph executions
    - Add `Node.is_triggered_by_event_type(..)` helper method
  - `POST /integrations/{provider}/webhooks/{webhook_id}/ping` endpoint, to allow testing a webhook
  - Add `WebhookEvent` + pub/sub functions in `backend.data.integrations`

- Add `backend.integrations.webhooks` module, including:
  - `graph_lifecycle_hooks`, e.g. `on_graph_activate(..)`, to handle corresponding webhook creation etc.
    - Add calls to these hooks in the graph create/update endpoints
  - `BaseWebhooksManager` + `GithubWebhooksManager` to handle creating + registering, removing + deregistering, and retrieving existing webhooks, and validating incoming payloads

## Other improvements
- fix(blocks): Allow having an input and output pin with the same name
- fix(blocks): Add tooltip with description in places where block inputs are rendered without `NodeHandle`
- feat(blocks): Allow hiding inputs (e.g. `payload`) with `SchemaField(hidden=True)`
- fix(frontend): Fix `MultiSelector` component styling
- feat(frontend): Add `AlertDialog` UI component
- feat(frontend): Add `NodeMultiSelectInput` component
- feat(backend/data): Add `NodeModel` with `graph_id`, `graph_version`; `GraphModel` with `user_id`
  - Add `make_graph_model(..)` helper function in `backend.data.graph`
- refactor(backend/data): Make `RedisEventQueue` generic and move to `backend.data.execution`
- refactor(frontend): Deduplicate & clean up code for different block types in `generateInputHandles(..)` in `CustomNode`
- dx(backend): Add `MissingConfigError`, `NeedConfirmation` exception

---------

Co-authored-by: Zamil Majdy <[email protected]>
  • Loading branch information
Pwuts and majdyz authored Nov 25, 2024
1 parent 464b530 commit eef9bbe
Show file tree
Hide file tree
Showing 44 changed files with 2,787 additions and 301 deletions.
11 changes: 9 additions & 2 deletions autogpt_platform/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ SUPABASE_URL=http://localhost:8000
SUPABASE_SERVICE_ROLE_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q
SUPABASE_JWT_SECRET=your-super-secret-jwt-token-with-at-least-32-characters-long

# For local development, you may need to set FRONTEND_BASE_URL for the OAuth flow for integrations to work.
FRONTEND_BASE_URL=http://localhost:3000
## For local development, you may need to set FRONTEND_BASE_URL for the OAuth flow
## for integrations to work. Defaults to the value of PLATFORM_BASE_URL if not set.
# FRONTEND_BASE_URL=http://localhost:3000

## PLATFORM_BASE_URL must be set to a *publicly accessible* URL pointing to your backend
## to use the platform's webhook-related functionality.
## If you are developing locally, you can use something like ngrok to get a publc URL
## and tunnel it to your locally running backend.
PLATFORM_BASE_URL=https://your-public-url-here

## == INTEGRATION CREDENTIALS == ##
# Each set of server side credentials is required for the corresponding 3rd party
Expand Down
7 changes: 0 additions & 7 deletions autogpt_platform/backend/backend/blocks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ def all_subclasses(cls: Type[T]) -> list[Type[T]]:
input_schema = block.input_schema.model_fields
output_schema = block.output_schema.model_fields

# Prevent duplicate field name in input_schema and output_schema
duplicate_field_names = set(input_schema.keys()) & set(output_schema.keys())
if duplicate_field_names:
raise ValueError(
f"{block.name} has duplicate field names in input_schema and output_schema: {duplicate_field_names}"
)

# Make sure `error` field is a string in the output schema
if "error" in output_schema and output_schema["error"].annotation is not str:
raise ValueError(
Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/backend/blocks/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_executor_manager_client():

@thread_cached
def get_event_bus():
from backend.data.queue import RedisExecutionEventBus
from backend.data.execution import RedisExecutionEventBus

return RedisExecutionEventBus()

Expand Down

Large diffs are not rendered by default.

156 changes: 156 additions & 0 deletions autogpt_platform/backend/backend/blocks/github/triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import json
import logging
from pathlib import Path

from pydantic import BaseModel

from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
BlockWebhookConfig,
)
from backend.data.model import SchemaField

from ._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
GithubCredentialsField,
GithubCredentialsInput,
)

logger = logging.getLogger(__name__)


# --8<-- [start:GithubTriggerExample]
class GitHubTriggerBase:
class Input(BlockSchema):
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
repo: str = SchemaField(
description=(
"Repository to subscribe to.\n\n"
"**Note:** Make sure your GitHub credentials have permissions "
"to create webhooks on this repo."
),
placeholder="{owner}/{repo}",
)
# --8<-- [start:example-payload-field]
payload: dict = SchemaField(hidden=True, default={})
# --8<-- [end:example-payload-field]

class Output(BlockSchema):
payload: dict = SchemaField(
description="The complete webhook payload that was received from GitHub. "
"Includes information about the affected resource (e.g. pull request), "
"the event, and the user who triggered the event."
)
triggered_by_user: dict = SchemaField(
description="Object representing the GitHub user who triggered the event"
)
error: str = SchemaField(
description="Error message if the payload could not be processed"
)

def run(self, input_data: Input, **kwargs) -> BlockOutput:
yield "payload", input_data.payload
yield "triggered_by_user", input_data.payload["sender"]


class GithubPullRequestTriggerBlock(GitHubTriggerBase, Block):
EXAMPLE_PAYLOAD_FILE = (
Path(__file__).parent / "example_payloads" / "pull_request.synchronize.json"
)

# --8<-- [start:example-event-filter]
class Input(GitHubTriggerBase.Input):
class EventsFilter(BaseModel):
"""
https://docs.github.com/en/webhooks/webhook-events-and-payloads#pull_request
"""

opened: bool = False
edited: bool = False
closed: bool = False
reopened: bool = False
synchronize: bool = False
assigned: bool = False
unassigned: bool = False
labeled: bool = False
unlabeled: bool = False
converted_to_draft: bool = False
locked: bool = False
unlocked: bool = False
enqueued: bool = False
dequeued: bool = False
milestoned: bool = False
demilestoned: bool = False
ready_for_review: bool = False
review_requested: bool = False
review_request_removed: bool = False
auto_merge_enabled: bool = False
auto_merge_disabled: bool = False

events: EventsFilter = SchemaField(
title="Events", description="The events to subscribe to"
)
# --8<-- [end:example-event-filter]

class Output(GitHubTriggerBase.Output):
event: str = SchemaField(
description="The PR event that triggered the webhook (e.g. 'opened')"
)
number: int = SchemaField(description="The number of the affected pull request")
pull_request: dict = SchemaField(
description="Object representing the affected pull request"
)
pull_request_url: str = SchemaField(
description="The URL of the affected pull request"
)

def __init__(self):
from backend.integrations.webhooks.github import GithubWebhookType

example_payload = json.loads(self.EXAMPLE_PAYLOAD_FILE.read_text())

super().__init__(
id="6c60ec01-8128-419e-988f-96a063ee2fea",
description="This block triggers on pull request events and outputs the event type and payload.",
categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT},
input_schema=GithubPullRequestTriggerBlock.Input,
output_schema=GithubPullRequestTriggerBlock.Output,
# --8<-- [start:example-webhook_config]
webhook_config=BlockWebhookConfig(
provider="github",
webhook_type=GithubWebhookType.REPO,
resource_format="{repo}",
event_filter_input="events",
event_format="pull_request.{event}",
),
# --8<-- [end:example-webhook_config]
test_input={
"repo": "Significant-Gravitas/AutoGPT",
"events": {"opened": True, "synchronize": True},
"credentials": TEST_CREDENTIALS_INPUT,
"payload": example_payload,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("payload", example_payload),
("triggered_by_user", example_payload["sender"]),
("event", example_payload["action"]),
("number", example_payload["number"]),
("pull_request", example_payload["pull_request"]),
("pull_request_url", example_payload["pull_request"]["html_url"]),
],
)

def run(self, input_data: Input, **kwargs) -> BlockOutput: # type: ignore
yield from super().run(input_data, **kwargs)
yield "event", input_data.payload["action"]
yield "number", input_data.payload["number"]
yield "pull_request", input_data.payload["pull_request"]
yield "pull_request_url", input_data.payload["pull_request"]["html_url"]


# --8<-- [end:GithubTriggerExample]
71 changes: 70 additions & 1 deletion autogpt_platform/backend/backend/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
from pydantic import BaseModel

from backend.util import json
from backend.util.settings import Config

from .model import CREDENTIALS_FIELD_NAME, ContributorDetails, CredentialsMetaInput

app_config = Config()

BlockData = tuple[str, Any] # Input & Output data should be a tuple of (name, data).
BlockInput = dict[str, Any] # Input: 1 input pin consumes 1 data.
BlockOutput = Generator[BlockData, None, None] # Output: 1 output pin produces n data.
Expand All @@ -34,6 +37,7 @@ class BlockType(Enum):
INPUT = "Input"
OUTPUT = "Output"
NOTE = "Note"
WEBHOOK = "Webhook"
AGENT = "Agent"


Expand Down Expand Up @@ -177,6 +181,41 @@ class EmptySchema(BlockSchema):
pass


# --8<-- [start:BlockWebhookConfig]
class BlockWebhookConfig(BaseModel):
provider: str
"""The service provider that the webhook connects to"""

webhook_type: str
"""
Identifier for the webhook type. E.g. GitHub has repo and organization level hooks.
Only for use in the corresponding `WebhooksManager`.
"""

resource_format: str
"""
Template string for the resource that a block instance subscribes to.
Fields will be filled from the block's inputs (except `payload`).
Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
Only for use in the corresponding `WebhooksManager`.
"""

event_filter_input: str
"""Name of the block's event filter input."""

event_format: str = "{event}"
"""
Template string for the event(s) that a block instance subscribes to.
Applied individually to each event selected in the event filter input.
Example: `"pull_request.{event}"` -> `"pull_request.opened"`
"""
# --8<-- [end:BlockWebhookConfig]


class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
def __init__(
self,
Expand All @@ -193,6 +232,7 @@ def __init__(
disabled: bool = False,
static_output: bool = False,
block_type: BlockType = BlockType.STANDARD,
webhook_config: Optional[BlockWebhookConfig] = None,
):
"""
Initialize the block with the given schema.
Expand Down Expand Up @@ -223,9 +263,38 @@ def __init__(
self.contributors = contributors or set()
self.disabled = disabled
self.static_output = static_output
self.block_type = block_type
self.block_type = block_type if not webhook_config else BlockType.WEBHOOK
self.webhook_config = webhook_config
self.execution_stats = {}

if self.webhook_config:
# Enforce shape of webhook event filter
event_filter_field = self.input_schema.model_fields[
self.webhook_config.event_filter_input
]
if not (
isinstance(event_filter_field.annotation, type)
and issubclass(event_filter_field.annotation, BaseModel)
and all(
field.annotation is bool
for field in event_filter_field.annotation.model_fields.values()
)
):
raise NotImplementedError(
f"{self.name} has an invalid webhook event selector: "
"field must be a BaseModel and all its fields must be boolean"
)

# Enforce presence of 'payload' input
if "payload" not in self.input_schema.model_fields:
raise TypeError(
f"{self.name} is webhook-triggered but has no 'payload' input"
)

# Disable webhook-triggered block if webhook functionality not available
if not app_config.platform_base_url:
self.disabled = True

@classmethod
def create(cls: Type["Block"]) -> "Block":
return cls()
Expand Down
44 changes: 42 additions & 2 deletions autogpt_platform/backend/backend/data/execution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from datetime import datetime, timezone
from multiprocessing import Manager
from typing import Any, Generic, TypeVar
from typing import Any, AsyncGenerator, Generator, Generic, TypeVar

from prisma.enums import AgentExecutionStatus
from prisma.models import (
Expand All @@ -14,7 +14,9 @@

from backend.data.block import BlockData, BlockInput, CompletedBlockOutput
from backend.data.includes import EXECUTION_RESULT_INCLUDE, GRAPH_EXECUTION_INCLUDE
from backend.data.queue import AsyncRedisEventBus, RedisEventBus
from backend.util import json, mock
from backend.util.settings import Config


class GraphExecution(BaseModel):
Expand Down Expand Up @@ -271,7 +273,6 @@ async def update_graph_execution_stats(
graph_exec_id: str,
stats: dict[str, Any],
) -> ExecutionResult:

status = ExecutionStatus.FAILED if stats.get("error") else ExecutionStatus.COMPLETED
res = await AgentGraphExecution.prisma().update(
where={"id": graph_exec_id},
Expand Down Expand Up @@ -471,3 +472,42 @@ async def get_incomplete_executions(
include=EXECUTION_RESULT_INCLUDE,
)
return [ExecutionResult.from_db(execution) for execution in executions]


# --------------------- Event Bus --------------------- #

config = Config()


class RedisExecutionEventBus(RedisEventBus[ExecutionResult]):
Model = ExecutionResult

@property
def event_bus_name(self) -> str:
return config.execution_event_bus_name

def publish(self, res: ExecutionResult):
self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}")

def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
) -> Generator[ExecutionResult, None, None]:
for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"):
yield execution_result


class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionResult]):
Model = ExecutionResult

@property
def event_bus_name(self) -> str:
return config.execution_event_bus_name

async def publish(self, res: ExecutionResult):
await self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}")

async def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
) -> AsyncGenerator[ExecutionResult, None]:
async for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"):
yield execution_result
Loading

0 comments on commit eef9bbe

Please sign in to comment.