|
|
|
@ -1,11 +1,13 @@
|
|
|
|
|
import logging
|
|
|
|
|
import queue
|
|
|
|
|
import threading
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
from typing import Any, Dict, List
|
|
|
|
|
|
|
|
|
|
# from swarms.memory.ocean import OceanDB
|
|
|
|
|
|
|
|
|
|
import chromadb
|
|
|
|
|
from chromadb.utils import embedding_functions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## =========>
|
|
|
|
|
class Orchestrator(ABC):
|
|
|
|
@ -17,16 +19,25 @@ class Orchestrator(ABC):
|
|
|
|
|
collection_name: str = "swarm"
|
|
|
|
|
):
|
|
|
|
|
self.agent = agent
|
|
|
|
|
self.agents = [agent() for _ in range(agent_list)]
|
|
|
|
|
self.task_queue = task_queue
|
|
|
|
|
self.agents = queue.Queue()
|
|
|
|
|
|
|
|
|
|
for _ in range(agent_list):
|
|
|
|
|
self.agents.put(agent())
|
|
|
|
|
|
|
|
|
|
self.task_queue = queue.Queue()
|
|
|
|
|
|
|
|
|
|
self.chroma_client = chromadb.Client()
|
|
|
|
|
|
|
|
|
|
self.collection = self.chroma_client.create_collection(
|
|
|
|
|
name = collection_name
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
self.current_tasks = {}
|
|
|
|
|
|
|
|
|
|
self.lock = threading.Lock()
|
|
|
|
|
self.condition = threading.Condition(self.lock)
|
|
|
|
|
self.executor = ThreadPoolExecutor(max_workers=len(agent_list))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def assign_task(
|
|
|
|
@ -36,26 +47,45 @@ class Orchestrator(ABC):
|
|
|
|
|
) -> None:
|
|
|
|
|
"""Assign a task to a specific agent"""
|
|
|
|
|
|
|
|
|
|
with self.lock:
|
|
|
|
|
if self.task_queue:
|
|
|
|
|
#get and agent and a task
|
|
|
|
|
agent = self.agents.pop(0)
|
|
|
|
|
task = self.task_queue.popleft()
|
|
|
|
|
result, vector_representation = agent.process_task()
|
|
|
|
|
while True:
|
|
|
|
|
with self.condition:
|
|
|
|
|
while not self.task_queue:
|
|
|
|
|
self.condition.wait()
|
|
|
|
|
agent = self.agents.get()
|
|
|
|
|
task = self.task_queue.get()
|
|
|
|
|
|
|
|
|
|
#use chromas's method to add data
|
|
|
|
|
try:
|
|
|
|
|
result, vector_representation = agent.process_task(
|
|
|
|
|
task
|
|
|
|
|
)
|
|
|
|
|
self.collection.add(
|
|
|
|
|
embeddings=[vector_representation],
|
|
|
|
|
documents=[str(id(task))],
|
|
|
|
|
ids=[str(id(task))]
|
|
|
|
|
)
|
|
|
|
|
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with")
|
|
|
|
|
|
|
|
|
|
except Exception as error:
|
|
|
|
|
logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}")
|
|
|
|
|
finally:
|
|
|
|
|
with self.condition:
|
|
|
|
|
self.agents.put(agent)
|
|
|
|
|
self.condition.notify()
|
|
|
|
|
|
|
|
|
|
def embed(self, input, api_key, model_name):
|
|
|
|
|
openai = embedding_functions.OpenAIEmbeddingFunction(
|
|
|
|
|
api_key=api_key,
|
|
|
|
|
model_name=model_name
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
embedding = openai(input)
|
|
|
|
|
# print(embedding)
|
|
|
|
|
|
|
|
|
|
embedding_metadata = {input: embedding}
|
|
|
|
|
print(embedding_metadata)
|
|
|
|
|
|
|
|
|
|
# return embedding
|
|
|
|
|
|
|
|
|
|
#put the agent back to agent slist
|
|
|
|
|
self.agents.append(agent)
|
|
|
|
|
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ")
|
|
|
|
|
return result
|
|
|
|
|
else:
|
|
|
|
|
logging.error("Task queue is empty")
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def retrieve_results(self, agent_id: int) -> Any:
|
|
|
|
@ -115,6 +145,7 @@ class Orchestrator(ABC):
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.task_queue.append(objective)
|
|
|
|
|
|
|
|
|
|
results = [
|
|
|
|
|
self.assign_task(
|
|
|
|
|
agent_id, task
|
|
|
|
|