Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(platform): Add api generator functions and endpoints #8597

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import NamedTuple
import secrets
import hashlib

class APIKeyContainer(NamedTuple):
"""Container for API key parts."""
raw: str
prefix: str
postfix: str
hash: str

class APIKeyManager:
PREFIX: str = "agpt_"
PREFIX_LENGTH: int = 8
POSTFIX_LENGTH: int = 8

def generate_api_key(self) -> APIKeyContainer:
"""Generate a new API key with all its parts."""
raw_key = f"{self.PREFIX}{secrets.token_urlsafe(32)}"
return APIKeyContainer(
raw=raw_key,
prefix=raw_key[:self.PREFIX_LENGTH],
postfix=raw_key[-self.POSTFIX_LENGTH:],
hash=hashlib.sha256(raw_key.encode()).hexdigest()
)

def verify_api_key(self, provided_key: str, stored_hash: str) -> bool:
"""Verify if a provided API key matches the stored hash."""
if not provided_key.startswith(self.PREFIX):
return False
return hashlib.sha256(provided_key.encode()).hexdigest() == stored_hash
203 changes: 203 additions & 0 deletions autogpt_platform/backend/backend/data/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import uuid
Abhi1992002 marked this conversation as resolved.
Show resolved Hide resolved
import logging
from datetime import datetime, timezone
from typing import Optional, List, Union
from enum import Enum
from pydantic import BaseModel

from prisma.models import APIKey as PrismaAPIKey
from backend.data.db import BaseDbModel, transaction
from autogpt_libs.api_key.key_manager import APIKeyManager

logger = logging.getLogger(__name__)

class APIKeyPermission(str, Enum):
EXECUTE_GRAPH = "EXECUTE_GRAPH"
READ_GRAPH = "READ_GRAPH"
EXECUTE_BLOCK = "EXECUTE_BLOCK"
READ_BLOCK = "READ_BLOCK"

class APIKeyStatus(str, Enum):
ACTIVE = "ACTIVE"
REVOKED = "REVOKED"
SUSPENDED = "SUSPENDED"

class APIKey(BaseDbModel):
name: str
prefix: str
key: str
status: APIKeyStatus = APIKeyStatus.ACTIVE
permissions: List[APIKeyPermission]
postfix: str
created_at: datetime
last_used_at: Optional[datetime] = None
revoked_at: Optional[datetime] = None
description: Optional[str] = None
user_id: str

@staticmethod
def from_db(api_key: PrismaAPIKey):
return APIKey(
id=api_key.id,
name=api_key.name,
prefix=api_key.prefix,
postfix=api_key.postfix,
key=api_key.key,
status=APIKeyStatus(api_key.status),
permissions=[APIKeyPermission(p) for p in api_key.permissions],
created_at=api_key.createdAt,
last_used_at=api_key.lastUsedAt,
revoked_at=api_key.revokedAt,
description=api_key.description,
user_id=api_key.userId
)

class APIKeyWithoutHash(BaseModel):
id: str
name: str
prefix: str
postfix: str
status: APIKeyStatus
permissions: List[APIKeyPermission]
created_at: datetime
last_used_at: Optional[datetime]
revoked_at: Optional[datetime]
description: Optional[str]
user_id: str

@staticmethod
def from_db(api_key: PrismaAPIKey):
return APIKeyWithoutHash(
id=api_key.id,
name=api_key.name,
prefix=api_key.prefix,
postfix=api_key.postfix,
status=APIKeyStatus(api_key.status),
permissions=[APIKeyPermission(p) for p in api_key.permissions],
created_at=api_key.createdAt,
last_used_at=api_key.lastUsedAt,
revoked_at=api_key.revokedAt,
description=api_key.description,
user_id=api_key.userId
)

# --------------------- Model functions --------------------- #

async def generate_api_key(
name: str,
user_id: str,
permissions: List[APIKeyPermission],
description: Optional[str] = None
) -> tuple[APIKeyWithoutHash, str]:
"""
Generate a new API key and store it in the database.
Returns the API key object (without hash) and the plain text key.
"""
api_manager = APIKeyManager()
key = api_manager.generate_api_key()

api_key = await PrismaAPIKey.prisma().create(
data={
"id": str(uuid.uuid4()),
"name": name,
"prefix": key.prefix,
"postfix": key.postfix,
"key": key.hash,
"permissions": [p.value for p in permissions],
"description": description,
"userId": user_id
}
)

api_key_without_hash = APIKeyWithoutHash.from_db(api_key)
return api_key_without_hash, key.raw

async def validate_api_key(plain_text_key: str) -> Optional[APIKey]:
"""
Validate an API key and return the API key object if valid.
"""
if not plain_text_key.startswith(APIKeyManager.PREFIX):
return None

prefix = plain_text_key[:APIKeyManager.PREFIX_LENGTH]
api_manager = APIKeyManager()

api_key = await PrismaAPIKey.prisma().find_first(
where={
"prefix": prefix,
"status": APIKeyStatus.ACTIVE.value
}
)

if not api_key:
return None

is_valid = api_manager.verify_api_key(plain_text_key, api_key.key)
if not is_valid:
return None

return APIKey.from_db(api_key)

async def revoke_api_key(key_id: str, user_id: str) -> APIKeyWithoutHash:
api_key = await PrismaAPIKey.prisma().update(
where={
"id": key_id,
"userId": user_id
},
data={
"status": APIKeyStatus.REVOKED.value,
"revokedAt": datetime.now(timezone.utc)
}
)

return APIKeyWithoutHash.from_db(api_key)

async def list_user_api_keys(user_id: str) -> List[APIKeyWithoutHash]:
api_keys = await PrismaAPIKey.prisma().find_many(
where={"userId": user_id},
order={"createdAt": "desc"}
)

return [APIKeyWithoutHash.from_db(key) for key in api_keys]

async def suspend_api_key(key_id: str, user_id: str) -> APIKeyWithoutHash:
api_key = await PrismaAPIKey.prisma().update(
where={
"id": key_id,
"userId": user_id
},
data={"status": APIKeyStatus.SUSPENDED.value}
)

return APIKeyWithoutHash.from_db(api_key)

def has_permission(api_key: APIKey, required_permission: APIKeyPermission) -> bool:
return required_permission in api_key.permissions

async def get_api_key_by_id(key_id: str, user_id: str) -> Optional[APIKeyWithoutHash]:
api_key = await PrismaAPIKey.prisma().find_first(
where={
"id": key_id,
"userId": user_id
}
)

return APIKeyWithoutHash.from_db(api_key) if api_key else None

async def update_api_key_permissions(
key_id: str,
user_id: str,
permissions: List[APIKeyPermission]
) -> APIKeyWithoutHash:
"""
Update the permissions of an API key.
"""
api_key = await PrismaAPIKey.prisma().update(
where={
"id": key_id,
"userId": user_id
},
data={"permissions": [p.value for p in permissions]}
)

return APIKeyWithoutHash.from_db(api_key)
111 changes: 110 additions & 1 deletion autogpt_platform/backend/backend/server/routers/v1.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
import logging
from collections import defaultdict
from typing import Annotated, Any, Dict
from typing import Annotated, Any, Dict, List, Optional

from autogpt_libs.auth.middleware import auth_middleware
from autogpt_libs.utils.cache import thread_cached
from fastapi import APIRouter, Body, Depends, HTTPException
from typing_extensions import TypedDict
from pydantic import BaseModel

import backend.data.block
import backend.server.integrations.router
Expand All @@ -16,6 +17,7 @@
from backend.data.block import BlockInput, CompletedBlockOutput
from backend.data.credit import get_block_costs, get_user_credit_model
from backend.data.user import get_or_create_user
from backend.data.api import APIKeyPermission, APIKeyWithoutHash, generate_api_key, get_api_key_by_id, list_user_api_keys, revoke_api_key, suspend_api_key, update_api_key_permissions
from backend.executor import ExecutionManager, ExecutionScheduler
from backend.server.model import CreateGraph, SetGraphActiveVersion
from backend.server.utils import get_user_id
Expand Down Expand Up @@ -521,3 +523,110 @@ async def update_configuration(
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))


########################################################
##################### API ##############################
Abhi1992002 marked this conversation as resolved.
Show resolved Hide resolved
########################################################

class CreateAPIKeyRequest(BaseModel):
Abhi1992002 marked this conversation as resolved.
Show resolved Hide resolved
name: str
permissions: List[APIKeyPermission]
description: Optional[str] = None

class CreateAPIKeyResponse(BaseModel):
api_key: APIKeyWithoutHash
plain_text_key: str


@v1_router.post(
"/api-keys",
response_model=CreateAPIKeyResponse,
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)
async def create_api_key(
request: CreateAPIKeyRequest,
user_id: Annotated[str, Depends(get_user_id)]
) -> CreateAPIKeyResponse:
"""Create a new API key"""
api_key, plain_text = await generate_api_key(
name=request.name,
user_id=user_id,
permissions=request.permissions,
description=request.description
)

return CreateAPIKeyResponse(api_key=api_key, plain_text_key=plain_text)

@v1_router.get(
"/api-keys",
response_model=List[APIKeyWithoutHash],
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)
async def get_api_keys(
user_id: Annotated[str, Depends(get_user_id)]
) -> List[APIKeyWithoutHash]:
"""List all API keys for the user"""
return await list_user_api_keys(user_id)

@v1_router.get(
"/api-keys/{key_id}",
response_model=APIKeyWithoutHash,
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)
async def get_api_key(
key_id: str,
user_id: Annotated[str, Depends(get_user_id)]
) -> APIKeyWithoutHash:
"""Get a specific API key"""
api_key = await get_api_key_by_id(key_id, user_id)
if not api_key:
raise HTTPException(status_code=404, detail="API key not found")
return api_key

@v1_router.delete(
"/api-keys/{key_id}",
response_model=APIKeyWithoutHash,
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)
async def delete_api_key(
key_id: str,
user_id: Annotated[str, Depends(get_user_id)]
) -> APIKeyWithoutHash:
"""Revoke an API key"""
return await revoke_api_key(key_id, user_id)

@v1_router.post(
"/api-keys/{key_id}/suspend",
response_model=APIKeyWithoutHash,
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)
async def suspend_key(
key_id: str,
user_id: Annotated[str, Depends(get_user_id)]
) -> APIKeyWithoutHash:
"""Suspend an API key"""
return await suspend_api_key(key_id, user_id)

@v1_router.put(
"/api-keys/{key_id}/permissions",
response_model=APIKeyWithoutHash,
tags=["api-keys"],
dependencies=[Depends(auth_middleware)],
)

class UpdatePermissionsRequest(BaseModel):
permissions: List[APIKeyPermission]

async def update_permissions(
key_id: str,
request: UpdatePermissionsRequest,
user_id: Annotated[str, Depends(get_user_id)]
) -> APIKeyWithoutHash:
"""Update API key permissions"""
return await update_api_key_permissions(key_id, user_id, request.permissions)
Abhi1992002 marked this conversation as resolved.
Show resolved Hide resolved
Loading