Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): Fix error pin output not being propagated into the next nodes #8408

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,19 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
# credentials is not supported.
credentials = creds_lock = None
majdyz marked this conversation as resolved.
Show resolved Hide resolved
creds_lock = None
if CREDENTIALS_FIELD_NAME in input_data:
credentials_meta = CredentialsMetaInput(**input_data[CREDENTIALS_FIELD_NAME])
credentials, creds_lock = creds_manager.acquire(user_id, credentials_meta.id)
ntindle marked this conversation as resolved.
Show resolved Hide resolved
extra_exec_kwargs["credentials"] = credentials

output_size = 0
try:
credit = db_client.get_or_refill_credit(user_id)
if credit < 0:
raise ValueError(f"Insufficient credit: {credit}")
end_status = ExecutionStatus.COMPLETED
credit = db_client.get_or_refill_credit(user_id)
if credit < 0:
raise ValueError(f"Insufficient credit: {credit}")

try:
for output_name, output_data in node_block.execute(
input_data, **extra_exec_kwargs
):
Expand All @@ -185,31 +186,44 @@ def update_execution(status: ExecutionStatus) -> ExecutionResult:
):
yield execution

# Release lock on credentials ASAP
majdyz marked this conversation as resolved.
Show resolved Hide resolved
if creds_lock:
creds_lock.release()

r = update_execution(ExecutionStatus.COMPLETED)
s = input_size + output_size
t = (
(r.end_time - r.start_time).total_seconds()
if r.end_time and r.start_time
else 0
)
db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t)

except Exception as e:
end_status = ExecutionStatus.FAILED
majdyz marked this conversation as resolved.
Show resolved Hide resolved
error_msg = str(e)
log_metadata.exception(f"Node execution failed with error {error_msg}")
db_client.upsert_execution_output(node_exec_id, "error", error_msg)
update_execution(ExecutionStatus.FAILED)

raise e
for execution in _enqueue_next_nodes(
majdyz marked this conversation as resolved.
Show resolved Hide resolved
db_client=db_client,
node=node,
output=("error", error_msg),
user_id=user_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
log_metadata=log_metadata,
):
yield execution

raise e
finally:
# Ensure credentials are released even if execution fails
if creds_lock:
creds_lock.release()
try:
creds_lock.release()
except Exception as e:
log_metadata.error(f"Failed to release credentials lock: {e}")

# Update execution status and spend credits
res = update_execution(end_status)
if end_status == ExecutionStatus.COMPLETED:
s = input_size + output_size
t = (
(res.end_time - res.start_time).total_seconds()
if res.end_time and res.start_time
else 0
)
db_client.spend_credits(user_id, credit, node_block.id, input_data, s, t)

# Update execution stats
if execution_stats is not None:
execution_stats.update(node_block.execution_stats)
execution_stats["input_size"] = input_size
Expand Down
Loading