`Orchestator`

Former-commit-id: b8b0a882ab
group-chat
Kye 1 year ago
parent 3fc7fceacc
commit af66efe9ed

@ -1,9 +1,9 @@
import threading
import queue import queue
import threading
from time import sleep from time import sleep
from swarms.workers.worker import Worker
from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator
from swarms.workers.worker import Worker
# TODO Handle task assignment and task delegation # TODO Handle task assignment and task delegation
# TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents. # TODO: User task => decomposed into very small sub tasks => sub tasks assigned to workers => workers complete and update the swarm, can ask for help from other agents.

@ -3,43 +3,56 @@ import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Dict, List from typing import Any, Dict, List
from swarms.memory.ocean import OceanDB # from swarms.memory.ocean import OceanDB
import chromadb
## =========> ## =========>
class Orchestrator(ABC): class Orchestrator(ABC):
def __init__(self, def __init__(
agent, self,
agent_list: List[Any], agent,
task_queue: List[Any], agent_list: List[Any],
vector_db: OceanDB task_queue: List[Any],
collection_name: str = "swarm"
): ):
self.agent = agent self.agent = agent
self.agents = [agent() for _ in range(agent_list)] self.agents = [agent() for _ in range(agent_list)]
self.task_queue = task_queue self.task_queue = task_queue
self.vector_db = vector_db
self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.create_collection(
name=collection_name
)
self.current_tasks = {} self.current_tasks = {}
self.lock = threading.Lock() self.lock = threading.Lock()
@abstractmethod @abstractmethod
def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: def assign_task(
self,
agent_id: int,
task: Dict[str, Any]
) -> None:
"""Assign a task to a specific agent""" """Assign a task to a specific agent"""
with self.lock: with self.lock:
if self.task_queue: if self.task_queue:
#get and agent and a task #get and agent and a task
agent = self.agents.pop(0) agent = self.agents.pop(0)
task = self.task_queue.popleft() task = self.task_queue.popleft()
#process the task and get result and vector representation
result, vector_representation = agent.process_task() result, vector_representation = agent.process_task()
#store the vector representation in the database #use chromas's method to add data
self.vector_db.add_documents([vector_representation],[str(id(task))]) self.collection.add(
embeddings=[vector_representation],
documents=[str(id(task))],
ids=[str(id(task))]
)
#put the agent back to agent slist #put the agent back to agent slist
self.agents.append(agent) self.agents.append(agent)
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ") logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ")
return result return result
else: else:
logging.error("Task queue is empty") logging.error("Task queue is empty")
@ -47,9 +60,14 @@ class Orchestrator(ABC):
@abstractmethod @abstractmethod
def retrieve_results(self, agent_id: int) -> Any: def retrieve_results(self, agent_id: int) -> Any:
"""Retrieve results from a specific agent""" """Retrieve results from a specific agent"""
try: try:
#Query the vector database for documents created by the agents #Query the vector database for documents created by the agents
results = self.vector_db.query(query_texts=[str(agent_id)], n_results=10) results = self.collection.query(
query_texts=[str(agent_id)],
n_results=10
)
return results return results
except Exception as e: except Exception as e:
logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}") logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}")
@ -58,8 +76,14 @@ class Orchestrator(ABC):
@abstractmethod @abstractmethod
def update_vector_db(self, data) -> None: def update_vector_db(self, data) -> None:
"""Update the vector database""" """Update the vector database"""
try: try:
self.vector_db.add_documents([data['vector']], [str(data['task_id'])]) self.collection.add(
embeddings=[data["vector"]],
documents=[str(data["task_id"])],
ids=[str(data["task_id"])]
)
except Exception as e: except Exception as e:
logging.error(f"Failed to update the vector database. Error: {e}") logging.error(f"Failed to update the vector database. Error: {e}")
raise raise
@ -68,38 +92,42 @@ class Orchestrator(ABC):
@abstractmethod @abstractmethod
def get_vector_db(self): def get_vector_db(self):
"""Retrieve the vector database""" """Retrieve the vector database"""
return self.vector_db return self.collection
def append_to_db(self, collection, result: str): def append_to_db(self, result: str):
"""append the result of the swarm to a specifici collection in the database""" """append the result of the swarm to a specifici collection in the database"""
try: try:
self.vector_db.append_document(collection, result, id=str(id(result))) self.collection.add(
documents=[result],
ids=[str(id(result))]
)
except Exception as e: except Exception as e:
logging.error(f"Failed to append the agent output to database. Error: {e}") logging.error(f"Failed to append the agent output to database. Error: {e}")
raise raise
def run( def run(self, objective:str):
self,
objective:str,
collection
):
"""Runs""" """Runs"""
if not objective or not isinstance(objective, str): if not objective or not isinstance(objective, str):
logging.error("Invalid objective") logging.error("Invalid objective")
raise ValueError("A valid objective is required") raise ValueError("A valid objective is required")
try: try:
#add objective to agent
self.task_queue.append(objective) self.task_queue.append(objective)
results = [
#assign tasks to agents self.assign_task(
results = [self.assign_task(agent_id, task) for agent_id, task in zip(range(len(self.agents)), self.task_queue)] agent_id, task
) for agent_id, task in zip(
range(
len(self.agents)
), self.task_queue
)
]
for result in results: for result in results:
self.append_to_db(collection, result) self.append_to_db(result)
logging.info(f"Successfully ran swarms with results: {results}") logging.info(f"Successfully ran swarms with results: {results}")
return results return results
except Exception as e: except Exception as e:

Loading…
Cancel
Save