diff --git a/pyproject.toml b/pyproject.toml index cf1133f9..d9c62a4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ tenacity = "*" redis = "*" Pillow = "*" shapeless="*" +chromadb = "*" [tool.poetry.dev-dependencies] # Add development dependencies here diff --git a/swarms/swarms/orchestrate.py b/swarms/swarms/orchestrate.py index 18843daa..09786da6 100644 --- a/swarms/swarms/orchestrate.py +++ b/swarms/swarms/orchestrate.py @@ -1,11 +1,13 @@ import logging +import queue import threading from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List -# from swarms.memory.ocean import OceanDB - import chromadb +from chromadb.utils import embedding_functions + ## =========> class Orchestrator(ABC): @@ -17,16 +19,25 @@ class Orchestrator(ABC): collection_name: str = "swarm" ): self.agent = agent - self.agents = [agent() for _ in range(agent_list)] - self.task_queue = task_queue + self.agents = queue.Queue() + + for _ in range(agent_list): + self.agents.put(agent()) + + self.task_queue = queue.Queue() self.chroma_client = chromadb.Client() + self.collection = self.chroma_client.create_collection( - name=collection_name + name = collection_name ) self.current_tasks = {} + self.lock = threading.Lock() + self.condition = threading.Condition(self.lock) + self.executor = ThreadPoolExecutor(max_workers=len(agent_list)) + @abstractmethod def assign_task( @@ -36,26 +47,45 @@ class Orchestrator(ABC): ) -> None: """Assign a task to a specific agent""" - with self.lock: - if self.task_queue: - #get and agent and a task - agent = self.agents.pop(0) - task = self.task_queue.popleft() - result, vector_representation = agent.process_task() - - #use chromas's method to add data + while True: + with self.condition: + while not self.task_queue: + self.condition.wait() + agent = self.agents.get() + task = self.task_queue.get() + + try: + result, vector_representation = agent.process_task( + task + ) self.collection.add( embeddings=[vector_representation], documents=[str(id(task))], ids=[str(id(task))] ) + logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with") + + except Exception as error: + logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") + finally: + with self.condition: + self.agents.put(agent) + self.condition.notify() + + def embed(self, input, api_key, model_name): + openai = embedding_functions.OpenAIEmbeddingFunction( + api_key=api_key, + model_name=model_name + ) - #put the agent back to agent slist - self.agents.append(agent) - logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ") - return result - else: - logging.error("Task queue is empty") + embedding = openai(input) + # print(embedding) + + embedding_metadata = {input: embedding} + print(embedding_metadata) + + # return embedding + @abstractmethod def retrieve_results(self, agent_id: int) -> Any: @@ -115,6 +145,7 @@ class Orchestrator(ABC): try: self.task_queue.append(objective) + results = [ self.assign_task( agent_id, task