From 4bcd090a06224a4dcc912b0e54dd732f6cd9243b Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 27 Jul 2023 16:47:37 -0400 Subject: [PATCH] orchestrator scaffold --- swarms/agents/agent.py | 4 ++-- swarms/orchestrate.py | 51 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/swarms/agents/agent.py b/swarms/agents/agent.py index e997c620..5ceb24dd 100644 --- a/swarms/agents/agent.py +++ b/swarms/agents/agent.py @@ -1,5 +1,7 @@ #base toolset from swarms.agents.tools.agent_tools import * +from swarms.utils.logger import logger + from langchain.tools import BaseTool from langchain.callbacks.manager import ( @@ -11,12 +13,10 @@ from langchain.memory.chat_message_histories import FileChatMessageHistory import logging from pydantic import BaseModel, Extra - from swarms.agents.models.hf import HuggingFaceLLM logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - class AgentNodeInitializer: """Useful for spawning autonomous agent instances to accomplish complex tasks.""" diff --git a/swarms/orchestrate.py b/swarms/orchestrate.py index d5f53fcc..912e0f39 100644 --- a/swarms/orchestrate.py +++ b/swarms/orchestrate.py @@ -44,35 +44,72 @@ import celery from typing import List, Dict, Any import numpy as np -from swarms.agents. +from swarms.agents.memory.ocean import OceanDB + class Orchestrator(ABC): - def __init__(self, agent, agent_list: List[Any], task_queue: celery.Celery, vector_db: np.ndarray): + def __init__(self, + agent, + agent_list: List[Any], + task_queue: celery.Celery, + vector_db: OceanDB + ): self.agent = agent - self.agents = agent_list + self.agents = [agent_class() for _ in range(agent_list)] self.task_queue = task_queue self.vector_db = vector_db self.current_tasks = {} + self.lock = Lock() @abstractmethod def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: """Assign a task to a specific agent""" - pass + with self.lock: + if self.task_queue: + #get and agent and a task + agent = self.agents.pop(0) + task = self.task_queue.popleft() + + #process the task and get result and vector representation + result, vector_representation = agent.process_task() + + #store the vector representation in the database + self.vector_db.add_documents([vector_representation],[str(id(task))]) + + #put the agent back to agent slist + self.agents.append(agent) + + logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ") + + return result + else: + logging.error("Task queue is empty") @abstractmethod def retrieve_results(self, agent_id: int) -> Any: """Retrieve results from a specific agent""" - pass + try: + #Query the vector database for documents created by the agents + results = self.vector_db.query(query_texts=[str(agent_id)], n_results=10) + return results + except Exception as e: + logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}") + raise @abstractmethod def update_vector_db(self, data: np.ndarray) -> None: """Update the vector database""" - pass + try: + self.vector_db.add_documents([data['vector']], [str(data['task_id'])]) + except Exception as e: + logging.error(f"Failed to update the vector database. Error: {e}") + raise + @abstractmethod def get_vector_db(self) -> np.ndarray: """Retrieve the vector database""" - pass + return self.vector_db