From b8b0a882abcbd979f00a8d12060acee6889480bc Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 14 Sep 2023 16:58:40 -0400 Subject: [PATCH] `Orchestator` --- swarms/swarms/autoscaler.py | 4 +- swarms/swarms/orchestrate.py | 90 +++++++++++++++++++++++------------- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/swarms/swarms/autoscaler.py b/swarms/swarms/autoscaler.py index dfbf8fb3..0a91b10b 100644 --- a/swarms/swarms/autoscaler.py +++ b/swarms/swarms/autoscaler.py @@ -1,9 +1,9 @@ -import threading import queue +import threading from time import sleep -from swarms.workers.worker import Worker from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator +from swarms.workers.worker import Worker # TODO Handle task assignment and task delegation # TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents. diff --git a/swarms/swarms/orchestrate.py b/swarms/swarms/orchestrate.py index f29f323b..18843daa 100644 --- a/swarms/swarms/orchestrate.py +++ b/swarms/swarms/orchestrate.py @@ -3,43 +3,56 @@ import threading from abc import ABC, abstractmethod from typing import Any, Dict, List -from swarms.memory.ocean import OceanDB +# from swarms.memory.ocean import OceanDB + +import chromadb ## =========> class Orchestrator(ABC): - def __init__(self, - agent, - agent_list: List[Any], - task_queue: List[Any], - vector_db: OceanDB + def __init__( + self, + agent, + agent_list: List[Any], + task_queue: List[Any], + collection_name: str = "swarm" ): self.agent = agent self.agents = [agent() for _ in range(agent_list)] self.task_queue = task_queue - self.vector_db = vector_db + + self.chroma_client = chromadb.Client() + self.collection = self.chroma_client.create_collection( + name=collection_name + ) + self.current_tasks = {} self.lock = threading.Lock() @abstractmethod - def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: + def assign_task( + self, + agent_id: int, + task: Dict[str, Any] + ) -> None: """Assign a task to a specific agent""" + with self.lock: if self.task_queue: #get and agent and a task agent = self.agents.pop(0) task = self.task_queue.popleft() - - #process the task and get result and vector representation result, vector_representation = agent.process_task() - #store the vector representation in the database - self.vector_db.add_documents([vector_representation],[str(id(task))]) + #use chromas's method to add data + self.collection.add( + embeddings=[vector_representation], + documents=[str(id(task))], + ids=[str(id(task))] + ) #put the agent back to agent slist self.agents.append(agent) - logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ") - return result else: logging.error("Task queue is empty") @@ -47,9 +60,14 @@ class Orchestrator(ABC): @abstractmethod def retrieve_results(self, agent_id: int) -> Any: """Retrieve results from a specific agent""" + try: #Query the vector database for documents created by the agents - results = self.vector_db.query(query_texts=[str(agent_id)], n_results=10) + results = self.collection.query( + query_texts=[str(agent_id)], + n_results=10 + ) + return results except Exception as e: logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}") @@ -58,8 +76,14 @@ class Orchestrator(ABC): @abstractmethod def update_vector_db(self, data) -> None: """Update the vector database""" + try: - self.vector_db.add_documents([data['vector']], [str(data['task_id'])]) + self.collection.add( + embeddings=[data["vector"]], + documents=[str(data["task_id"])], + ids=[str(data["task_id"])] + ) + except Exception as e: logging.error(f"Failed to update the vector database. Error: {e}") raise @@ -68,38 +92,42 @@ class Orchestrator(ABC): @abstractmethod def get_vector_db(self): """Retrieve the vector database""" - return self.vector_db + return self.collection - def append_to_db(self, collection, result: str): + def append_to_db(self, result: str): """append the result of the swarm to a specifici collection in the database""" + try: - self.vector_db.append_document(collection, result, id=str(id(result))) + self.collection.add( + documents=[result], + ids=[str(id(result))] + ) + except Exception as e: logging.error(f"Failed to append the agent output to database. Error: {e}") raise - def run( - self, - objective:str, - collection - ): + def run(self, objective:str): """Runs""" - if not objective or not isinstance(objective, str): logging.error("Invalid objective") raise ValueError("A valid objective is required") try: - #add objective to agent self.task_queue.append(objective) - - #assign tasks to agents - results = [self.assign_task(agent_id, task) for agent_id, task in zip(range(len(self.agents)), self.task_queue)] + results = [ + self.assign_task( + agent_id, task + ) for agent_id, task in zip( + range( + len(self.agents) + ), self.task_queue + ) + ] for result in results: - self.append_to_db(collection, result) + self.append_to_db(result) - logging.info(f"Successfully ran swarms with results: {results}") return results except Exception as e: