@ -3,43 +3,56 @@ import threading
from abc import ABC , abstractmethod
from abc import ABC , abstractmethod
from typing import Any , Dict , List
from typing import Any , Dict , List
from swarms . memory . ocean import OceanDB
# from swarms.memory.ocean import OceanDB
import chromadb
## =========>
## =========>
class Orchestrator ( ABC ) :
class Orchestrator ( ABC ) :
def __init__ ( self ,
def __init__ (
agent ,
self ,
agent_list : List [ Any ] ,
agent ,
task_queue : List [ Any ] ,
agent_list : List [ Any ] ,
vector_db : OceanDB
task_queue : List [ Any ] ,
collection_name : str = " swarm "
) :
) :
self . agent = agent
self . agent = agent
self . agents = [ agent ( ) for _ in range ( agent_list ) ]
self . agents = [ agent ( ) for _ in range ( agent_list ) ]
self . task_queue = task_queue
self . task_queue = task_queue
self . vector_db = vector_db
self . chroma_client = chromadb . Client ( )
self . collection = self . chroma_client . create_collection (
name = collection_name
)
self . current_tasks = { }
self . current_tasks = { }
self . lock = threading . Lock ( )
self . lock = threading . Lock ( )
@abstractmethod
@abstractmethod
def assign_task ( self , agent_id : int , task : Dict [ str , Any ] ) - > None :
def assign_task (
self ,
agent_id : int ,
task : Dict [ str , Any ]
) - > None :
""" Assign a task to a specific agent """
""" Assign a task to a specific agent """
with self . lock :
with self . lock :
if self . task_queue :
if self . task_queue :
#get and agent and a task
#get and agent and a task
agent = self . agents . pop ( 0 )
agent = self . agents . pop ( 0 )
task = self . task_queue . popleft ( )
task = self . task_queue . popleft ( )
#process the task and get result and vector representation
result , vector_representation = agent . process_task ( )
result , vector_representation = agent . process_task ( )
#store the vector representation in the database
#use chromas's method to add data
self . vector_db . add_documents ( [ vector_representation ] , [ str ( id ( task ) ) ] )
self . collection . add (
embeddings = [ vector_representation ] ,
documents = [ str ( id ( task ) ) ] ,
ids = [ str ( id ( task ) ) ]
)
#put the agent back to agent slist
#put the agent back to agent slist
self . agents . append ( agent )
self . agents . append ( agent )
logging . info ( f " Task { id ( str ) } has been processed by agent { id ( agent ) } " )
logging . info ( f " Task { id ( str ) } has been processed by agent { id ( agent ) } " )
return result
return result
else :
else :
logging . error ( " Task queue is empty " )
logging . error ( " Task queue is empty " )
@ -47,9 +60,14 @@ class Orchestrator(ABC):
@abstractmethod
@abstractmethod
def retrieve_results ( self , agent_id : int ) - > Any :
def retrieve_results ( self , agent_id : int ) - > Any :
""" Retrieve results from a specific agent """
""" Retrieve results from a specific agent """
try :
try :
#Query the vector database for documents created by the agents
#Query the vector database for documents created by the agents
results = self . vector_db . query ( query_texts = [ str ( agent_id ) ] , n_results = 10 )
results = self . collection . query (
query_texts = [ str ( agent_id ) ] ,
n_results = 10
)
return results
return results
except Exception as e :
except Exception as e :
logging . error ( f " Failed to retrieve results from agent { agent_id } . Error { e } " )
logging . error ( f " Failed to retrieve results from agent { agent_id } . Error { e } " )
@ -58,8 +76,14 @@ class Orchestrator(ABC):
@abstractmethod
@abstractmethod
def update_vector_db ( self , data ) - > None :
def update_vector_db ( self , data ) - > None :
""" Update the vector database """
""" Update the vector database """
try :
try :
self . vector_db . add_documents ( [ data [ ' vector ' ] ] , [ str ( data [ ' task_id ' ] ) ] )
self . collection . add (
embeddings = [ data [ " vector " ] ] ,
documents = [ str ( data [ " task_id " ] ) ] ,
ids = [ str ( data [ " task_id " ] ) ]
)
except Exception as e :
except Exception as e :
logging . error ( f " Failed to update the vector database. Error: { e } " )
logging . error ( f " Failed to update the vector database. Error: { e } " )
raise
raise
@ -68,38 +92,42 @@ class Orchestrator(ABC):
@abstractmethod
@abstractmethod
def get_vector_db ( self ) :
def get_vector_db ( self ) :
""" Retrieve the vector database """
""" Retrieve the vector database """
return self . vector_db
return self . collection
def append_to_db ( self , collection, result: str ) :
def append_to_db ( self , result: str ) :
""" append the result of the swarm to a specifici collection in the database """
""" append the result of the swarm to a specifici collection in the database """
try :
try :
self . vector_db . append_document ( collection , result , id = str ( id ( result ) ) )
self . collection . add (
documents = [ result ] ,
ids = [ str ( id ( result ) ) ]
)
except Exception as e :
except Exception as e :
logging . error ( f " Failed to append the agent output to database. Error: { e } " )
logging . error ( f " Failed to append the agent output to database. Error: { e } " )
raise
raise
def run (
def run ( self , objective : str ) :
self ,
objective : str ,
collection
) :
""" Runs """
""" Runs """
if not objective or not isinstance ( objective , str ) :
if not objective or not isinstance ( objective , str ) :
logging . error ( " Invalid objective " )
logging . error ( " Invalid objective " )
raise ValueError ( " A valid objective is required " )
raise ValueError ( " A valid objective is required " )
try :
try :
#add objective to agent
self . task_queue . append ( objective )
self . task_queue . append ( objective )
results = [
#assign tasks to agents
self . assign_task (
results = [ self . assign_task ( agent_id , task ) for agent_id , task in zip ( range ( len ( self . agents ) ) , self . task_queue ) ]
agent_id , task
) for agent_id , task in zip (
range (
len ( self . agents )
) , self . task_queue
)
]
for result in results :
for result in results :
self . append_to_db ( collection , result )
self . append_to_db ( result)
logging . info ( f " Successfully ran swarms with results: { results } " )
logging . info ( f " Successfully ran swarms with results: { results } " )
return results
return results
except Exception as e :
except Exception as e :