Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-agents, master agent refactor #869

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 68 additions & 36 deletions multi_agents/agents/master.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import os
import time
from langgraph.graph import StateGraph, END
#from langgraph.checkpoint.memory import MemorySaver
import datetime
from langgraph.graph import StateGraph, END
# from langgraph.checkpoint.memory import MemorySaver
from .utils.views import print_agent_output
from ..memory.research import ResearchState
from .utils.utils import sanitize_filename


# Import agent classes
from . import \
WriterAgent, \
Expand All @@ -18,69 +17,102 @@


class ChiefEditorAgent:
"""Agent responsible for managing and coordinating editing tasks."""

def __init__(self, task: dict, websocket=None, stream_output=None, tone=None, headers=None):
self.task_id = int(time.time()) # Currently time based, but can be any unique identifier
self.output_dir = "./outputs/" + sanitize_filename(f"run_{self.task_id}_{task.get('query')[0:40]}")
self.task = task
self.websocket = websocket
self.stream_output = stream_output
self.headers = headers or {}
self.tone = tone
os.makedirs(self.output_dir, exist_ok=True)
self.task_id = self._generate_task_id()
self.output_dir = self._create_output_directory()

def init_research_team(self):
# Initialize agents
writer_agent = WriterAgent(self.websocket, self.stream_output, self.headers)
editor_agent = EditorAgent(self.websocket, self.stream_output, self.headers)
research_agent = ResearchAgent(self.websocket, self.stream_output, self.tone, self.headers)
publisher_agent = PublisherAgent(self.output_dir, self.websocket, self.stream_output, self.headers)
human_agent = HumanAgent(self.websocket, self.stream_output, self.headers)

# Define a Langchain StateGraph with the ResearchState
def _generate_task_id(self):
# Currently time based, but can be any unique identifier
return int(time.time())

def _create_output_directory(self):
output_dir = "./outputs/" + \
sanitize_filename(
f"run_{self.task_id}_{self.task.get('query')[0:40]}")

os.makedirs(output_dir, exist_ok=True)
return output_dir

def _initialize_agents(self):
return {
"writer": WriterAgent(self.websocket, self.stream_output, self.headers),
"editor": EditorAgent(self.websocket, self.stream_output, self.headers),
"research": ResearchAgent(self.websocket, self.stream_output, self.tone, self.headers),
"publisher": PublisherAgent(self.output_dir, self.websocket, self.stream_output, self.headers),
"human": HumanAgent(self.websocket, self.stream_output, self.headers)
}

def _create_workflow(self, agents):
workflow = StateGraph(ResearchState)

# Add nodes for each agent
workflow.add_node("browser", research_agent.run_initial_research)
workflow.add_node("planner", editor_agent.plan_research)
workflow.add_node("researcher", editor_agent.run_parallel_research)
workflow.add_node("writer", writer_agent.run)
workflow.add_node("publisher", publisher_agent.run)
workflow.add_node("human", human_agent.review_plan)
workflow.add_node("browser", agents["research"].run_initial_research)
workflow.add_node("planner", agents["editor"].plan_research)
workflow.add_node("researcher", agents["editor"].run_parallel_research)
workflow.add_node("writer", agents["writer"].run)
workflow.add_node("publisher", agents["publisher"].run)
workflow.add_node("human", agents["human"].review_plan)

# Add edges
self._add_workflow_edges(workflow)

return workflow

def _add_workflow_edges(self, workflow):
workflow.add_edge('browser', 'planner')
workflow.add_edge('planner', 'human')
workflow.add_edge('researcher', 'writer')
workflow.add_edge('writer', 'publisher')

# set up start and end nodes
workflow.set_entry_point("browser")
workflow.add_edge('publisher', END)

# Add human in the loop
workflow.add_conditional_edges('human',
(lambda review: "accept" if review['human_feedback'] is None else "revise"),
{"accept": "researcher", "revise": "planner"})
workflow.add_conditional_edges(
'human',
lambda review: "accept" if review['human_feedback'] is None else "revise",
{"accept": "researcher", "revise": "planner"}
)

return workflow
def init_research_team(self):
"""Initialize and create a workflow for the research team."""
agents = self._initialize_agents()
return self._create_workflow(agents)

async def _log_research_start(self):
message = f"Starting the research process for query '{self.task.get('query')}'..."
if self.websocket and self.stream_output:
await self.stream_output("logs", "starting_research", message, self.websocket)
else:
print_agent_output(message, "MASTER")

async def run_research_task(self, task_id=None):
research_team = self.init_research_team()
"""
Run a research task with the initialized research team.

Args:
task_id (optional): The ID of the task to run.

# compile the graph
#memory = MemorySaver()
Returns:
The result of the research task.
"""
research_team = self.init_research_team()
chain = research_team.compile()
if self.websocket and self.stream_output:
await self.stream_output("logs", "starting_research", f"Starting the research process for query '{self.task.get('query')}'...", self.websocket)
else:
print_agent_output(f"Starting the research process for query '{self.task.get('query')}'...", "MASTER")

await self._log_research_start()

config = {
"configurable": {
"thread_id": task_id,
"thread_ts": datetime.datetime.utcnow()
}
}

result = await chain.ainvoke({"task": self.task}, config=config)

result = await chain.ainvoke({"task": self.task}, config=config)
return result