diff --git a/swarms/swarms/README.md b/swarms/swarms/README.md index eb8213dc..1a417831 100644 --- a/swarms/swarms/README.md +++ b/swarms/swarms/README.md @@ -94,4 +94,269 @@ class Orchestrator(ABC): **Example**: Maintain a performance profile for each agent, categorizing them based on their strengths. Assign tasks to agents based on their specialization for optimal performance. -By implementing these ideas and constantly iterating based on real-world usage and performance metrics, it's possible to create a robust and scalable multi-agent collaboration framework. \ No newline at end of file +By implementing these ideas and constantly iterating based on real-world usage and performance metrics, it's possible to create a robust and scalable multi-agent collaboration framework. + + +# 10 improvements to the `Orchestrator` class to enable more flexibility and usability: + +1. Dynamic Agent Creation: Allow the number of agents to be specified at runtime, rather than being fixed at the time of instantiation. + +``` +def add_agents(self, num_agents: int): + for _ in range(num_agents): + self.agents.put(self.agent()) + self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) +``` + +1. Agent Removal: Allow agents to be removed from the pool. + +``` +def remove_agents(self, num_agents: int): + for _ in range(num_agents): + if not self.agents.empty(): + self.agents.get() + self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) +``` + +1. Task Prioritization: Allow tasks to be prioritized. + +``` +from queue import PriorityQueue + +def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None): + # ... + self.task_queue = PriorityQueue() + # ... + +def add_task(self, task: Dict[str, Any], priority: int = 0): + self.task_queue.put((priority, task)) +``` + +1. Task Status: Track the status of tasks. + +``` +from enum import Enum + +class TaskStatus(Enum): + QUEUED = 1 + RUNNING = 2 + COMPLETED = 3 + FAILED = 4 + +# In assign_task method +self.current_tasks[id(task)] = TaskStatus.RUNNING +# On successful completion +self.current_tasks[id(task)] = TaskStatus.COMPLETED +# On failure +self.current_tasks[id(task)] = TaskStatus.FAILED +``` + +1. Result Retrieval: Allow results to be retrieved by task ID. + +``` +def retrieve_result(self, task_id: int) -> Any: + return self.collection.query(query_texts=[str(task_id)], n_results=1) +``` + +1. Batch Task Assignment: Allow multiple tasks to be assigned at once. + +``` +def assign_tasks(self, tasks: List[Dict[str, Any]]): + for task in tasks: + self.task_queue.put(task) +``` + +1. Error Handling: Improve error handling by re-queuing failed tasks. + +``` +# In assign_task method +except Exception as error: + logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") + self.task_queue.put(task) +``` + +1. Agent Status: Track the status of agents (e.g., idle, working). + +``` +self.agent_status = {id(agent): "idle" for agent in self.agents.queue} + +# In assign_task method +self.agent_status[id(agent)] = "working" +# On task completion +self.agent_status[id(agent)] = "idle" +``` + +1. Custom Embedding Function: Allow a custom embedding function to be used. + +``` +def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None, embed_func=None): + # ... + self.embed_func = embed_func if embed_func else self.embed + # ... + +def embed(self, input, api_key, model_name): + # ... + embedding = self.embed_func(input) + # ... +``` + +1. Agent Communication: Allow agents to communicate with each other. + +``` +def communicate(self, sender_id: int, receiver_id: int, message: str): + message_vector = self.embed_func(message) + self.collection.add(embeddings=[message_vector], documents=[message], ids=[f"{sender_id}_to_{receiver_id}"]) +``` + + + +``` +import logging +import queue +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List +from enum import Enum + +import chromadb +from chromadb.utils import embedding_functions + +class TaskStatus(Enum): + QUEUED = 1 + RUNNING = 2 + COMPLETED = 3 + FAILED = 4 + +class Orchestrator: + def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None, embed_func=None): + self.agent = agent + self.agents = queue.Queue() + self.agent_status = {} + + self.add_agents(agent_list) + + self.task_queue = queue.PriorityQueue() + + self.chroma_client = chromadb.Client() + + self.collection = self.chroma_client.create_collection(name = collection_name) + + self.current_tasks = {} + + self.lock = threading.Lock() + self.condition = threading.Condition(self.lock) + + self.embed_func = embed_func if embed_func else self.embed + + def add_agents(self, num_agents: int): + for _ in range(num_agents): + agent = self.agent() + self.agents.put(agent) + self.agent_status[id(agent)] = "idle" + self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) + + def remove_agents(self, num_agents: int): + for _ in range(num_agents): + if not self.agents.empty(): + agent = self.agents.get() + del self.agent_status[id(agent)] + self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) + + def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: + while True: + with self.condition: + while not self.task_queue: + self.condition.wait() + agent = self.agents.get() + task = self.task_queue.get() + + try: + self.agent_status[id(agent)] = "working" + result = self.worker.run(task["content"]) + + vector_representation = self.embed_func(result) + + self.collection.add(embeddings=[vector_representation], documents=[str(id(task))], ids=[str(id(task))]) + + logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with") + self.current_tasks[id(task)] = TaskStatus.COMPLETED + + except Exception as error: + logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") + self.current_tasks[id(task)] = TaskStatus.FAILED + self.task_queue.put(task) + finally: + with self.condition: + self.agent_status[id(agent)] = "idle" + self.agents.put(agent) + self.condition.notify() + + def embed(self, input): + openai = embedding_functions.OpenAIEmbeddingFunction(api_key=self.api_key, model_name=self.model_name) + embedding = openai(input) + return embedding + + def retrieve_results(self, agent_id: int) -> Any: + try: + results = self.collection.query(query_texts=[str(agent_id)], n_results=10) + return results + except Exception as e: + logging.error(f"Failed to retrieve results from agent {id(agent_id)}. Error {e}") + raise + + def update_vector_db(self, data) -> None: + try: + self.collection.add(embeddings=[data["vector"]], documents=[str(data["task_id"])], ids=[str(data["task_id"])]) + except Exception as e: + logging.error(f"Failed to update the vector database. Error: {e}") + raise + + def get_vector_db(self): + return self.collection + + def append_to_db(self, result: str): + try: + self.collection.add(documents=[result], ids=[str(id(result))]) + except Exception as e: + logging.error(f"Failed to append the agent output to database. Error: {e}") + raise + + def run(self, objective:str): + if not objective or not isinstance(objective, str): + logging.error("Invalid objective") + raise ValueError("A valid objective is required") + + try: + self.task_queue.put((0, objective)) + + results = [self.assign_task(agent_id, task) for agent_id, task in zip(range(len(self.agents)), self.task_queue)] + + for result in results: + self.append_to_db(result) + + logging.info(f"Successfully ran swarms with results: {results}") + return results + except Exception as e: + logging.error(f"An error occured in swarm: {e}") + return None + + def chat(self, sender_id: int, receiver_id: int, message: str): + message_vector = self.embed_func(message) + + # Store the message in the vector database + self.collection.add(embeddings=[message_vector], documents=[message], ids=[f"{sender_id}_to_{receiver_id}"]) + + def assign_tasks(self, tasks: List[Dict[str, Any]], priority: int = 0): + for task in tasks: + self.task_queue.put((priority, task)) + + def retrieve_result(self, task_id: int) -> Any: + try: + result = self.collection.query(query_texts=[str(task_id)], n_results=1) + return result + except Exception as e: + logging.error(f"Failed to retrieve result for task {task_id}. Error: {e}") + raise +``` + +With these improvements, the `Orchestrator` class now supports dynamic agent creation and removal, task prioritization, task status tracking, result retrieval by task ID, batch task assignment, improved error handling, agent status tracking, custom embedding functions, and agent communication. This should make the class more flexible and easier to use when creating swarms of LLMs. \ No newline at end of file diff --git a/swarms/swarms/dialogue_simulator.py b/swarms/swarms/dialogue_simulator.py deleted file mode 100644 index b5a07d7b..00000000 --- a/swarms/swarms/dialogue_simulator.py +++ /dev/null @@ -1,97 +0,0 @@ -import os -from typing import Callable, List - - -class DialogueSimulator: - """ - Dialogue Simulator - ------------------ - - Args: - ------ - agents: List[Callable] - max_iters: int - name: str - - Usage: - ------ - >>> from swarms import DialogueSimulator - >>> from swarms.structs.agent import Agent - >>> agents = Agent() - >>> agents1 = Agent() - >>> model = DialogueSimulator([agents, agents1], max_iters=10, name="test") - >>> model.run("test") - """ - - def __init__( - self, - agents: List[Callable], - max_iters: int = 10, - name: str = None, - ): - self.agents = agents - self.max_iters = max_iters - self.name = name - - def run(self, message: str = None): - """Run the dialogue simulator""" - try: - step = 0 - if self.name and message: - prompt = f"Name {self.name} and message: {message}" - for agent in self.agents: - agent.run(prompt) - step += 1 - - while step < self.max_iters: - speaker_idx = step % len(self.agents) - speaker = self.agents[speaker_idx] - speaker_message = speaker.run(prompt) - - for receiver in self.agents: - message_history = ( - f"Speaker Name: {speaker.name} and message:" - f" {speaker_message}" - ) - receiver.run(message_history) - - print(f"({speaker.name}): {speaker_message}") - print("\n") - step += 1 - except Exception as error: - print(f"Error running dialogue simulator: {error}") - - def __repr__(self): - return ( - f"DialogueSimulator({self.agents}, {self.max_iters}," - f" {self.name})" - ) - - def save_state(self): - """Save the state of the dialogue simulator""" - try: - if self.name: - filename = f"{self.name}.txt" - with open(filename, "w") as file: - file.write(str(self)) - except Exception as error: - print(f"Error saving state: {error}") - - def load_state(self): - """Load the state of the dialogue simulator""" - try: - if self.name: - filename = f"{self.name}.txt" - with open(filename, "r") as file: - return file.read() - except Exception as error: - print(f"Error loading state: {error}") - - def delete_state(self): - """Delete the state of the dialogue simulator""" - try: - if self.name: - filename = f"{self.name}.txt" - os.remove(filename) - except Exception as error: - print(f"Error deleting state: {error}") diff --git a/swarms/swarms/notes.md b/swarms/swarms/notes.md deleted file mode 100644 index 8b367f58..00000000 --- a/swarms/swarms/notes.md +++ /dev/null @@ -1,263 +0,0 @@ -# 10 improvements to the `Orchestrator` class to enable more flexibility and usability: - -1. Dynamic Agent Creation: Allow the number of agents to be specified at runtime, rather than being fixed at the time of instantiation. - -``` -def add_agents(self, num_agents: int): - for _ in range(num_agents): - self.agents.put(self.agent()) - self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) -``` - -1. Agent Removal: Allow agents to be removed from the pool. - -``` -def remove_agents(self, num_agents: int): - for _ in range(num_agents): - if not self.agents.empty(): - self.agents.get() - self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) -``` - -1. Task Prioritization: Allow tasks to be prioritized. - -``` -from queue import PriorityQueue - -def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None): - # ... - self.task_queue = PriorityQueue() - # ... - -def add_task(self, task: Dict[str, Any], priority: int = 0): - self.task_queue.put((priority, task)) -``` - -1. Task Status: Track the status of tasks. - -``` -from enum import Enum - -class TaskStatus(Enum): - QUEUED = 1 - RUNNING = 2 - COMPLETED = 3 - FAILED = 4 - -# In assign_task method -self.current_tasks[id(task)] = TaskStatus.RUNNING -# On successful completion -self.current_tasks[id(task)] = TaskStatus.COMPLETED -# On failure -self.current_tasks[id(task)] = TaskStatus.FAILED -``` - -1. Result Retrieval: Allow results to be retrieved by task ID. - -``` -def retrieve_result(self, task_id: int) -> Any: - return self.collection.query(query_texts=[str(task_id)], n_results=1) -``` - -1. Batch Task Assignment: Allow multiple tasks to be assigned at once. - -``` -def assign_tasks(self, tasks: List[Dict[str, Any]]): - for task in tasks: - self.task_queue.put(task) -``` - -1. Error Handling: Improve error handling by re-queuing failed tasks. - -``` -# In assign_task method -except Exception as error: - logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") - self.task_queue.put(task) -``` - -1. Agent Status: Track the status of agents (e.g., idle, working). - -``` -self.agent_status = {id(agent): "idle" for agent in self.agents.queue} - -# In assign_task method -self.agent_status[id(agent)] = "working" -# On task completion -self.agent_status[id(agent)] = "idle" -``` - -1. Custom Embedding Function: Allow a custom embedding function to be used. - -``` -def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None, embed_func=None): - # ... - self.embed_func = embed_func if embed_func else self.embed - # ... - -def embed(self, input, api_key, model_name): - # ... - embedding = self.embed_func(input) - # ... -``` - -1. Agent Communication: Allow agents to communicate with each other. - -``` -def communicate(self, sender_id: int, receiver_id: int, message: str): - message_vector = self.embed_func(message) - self.collection.add(embeddings=[message_vector], documents=[message], ids=[f"{sender_id}_to_{receiver_id}"]) -``` - - - -``` -import logging -import queue -import threading -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Dict, List -from enum import Enum - -import chromadb -from chromadb.utils import embedding_functions - -class TaskStatus(Enum): - QUEUED = 1 - RUNNING = 2 - COMPLETED = 3 - FAILED = 4 - -class Orchestrator: - def __init__(self, agent, agent_list: List[Any], task_queue: List[Any], collection_name: str = "swarm", api_key: str = None, model_name: str = None, embed_func=None): - self.agent = agent - self.agents = queue.Queue() - self.agent_status = {} - - self.add_agents(agent_list) - - self.task_queue = queue.PriorityQueue() - - self.chroma_client = chromadb.Client() - - self.collection = self.chroma_client.create_collection(name = collection_name) - - self.current_tasks = {} - - self.lock = threading.Lock() - self.condition = threading.Condition(self.lock) - - self.embed_func = embed_func if embed_func else self.embed - - def add_agents(self, num_agents: int): - for _ in range(num_agents): - agent = self.agent() - self.agents.put(agent) - self.agent_status[id(agent)] = "idle" - self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) - - def remove_agents(self, num_agents: int): - for _ in range(num_agents): - if not self.agents.empty(): - agent = self.agents.get() - del self.agent_status[id(agent)] - self.executor = ThreadPoolExecutor(max_workers=self.agents.qsize()) - - def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: - while True: - with self.condition: - while not self.task_queue: - self.condition.wait() - agent = self.agents.get() - task = self.task_queue.get() - - try: - self.agent_status[id(agent)] = "working" - result = self.worker.run(task["content"]) - - vector_representation = self.embed_func(result) - - self.collection.add(embeddings=[vector_representation], documents=[str(id(task))], ids=[str(id(task))]) - - logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with") - self.current_tasks[id(task)] = TaskStatus.COMPLETED - - except Exception as error: - logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}") - self.current_tasks[id(task)] = TaskStatus.FAILED - self.task_queue.put(task) - finally: - with self.condition: - self.agent_status[id(agent)] = "idle" - self.agents.put(agent) - self.condition.notify() - - def embed(self, input): - openai = embedding_functions.OpenAIEmbeddingFunction(api_key=self.api_key, model_name=self.model_name) - embedding = openai(input) - return embedding - - def retrieve_results(self, agent_id: int) -> Any: - try: - results = self.collection.query(query_texts=[str(agent_id)], n_results=10) - return results - except Exception as e: - logging.error(f"Failed to retrieve results from agent {id(agent_id)}. Error {e}") - raise - - def update_vector_db(self, data) -> None: - try: - self.collection.add(embeddings=[data["vector"]], documents=[str(data["task_id"])], ids=[str(data["task_id"])]) - except Exception as e: - logging.error(f"Failed to update the vector database. Error: {e}") - raise - - def get_vector_db(self): - return self.collection - - def append_to_db(self, result: str): - try: - self.collection.add(documents=[result], ids=[str(id(result))]) - except Exception as e: - logging.error(f"Failed to append the agent output to database. Error: {e}") - raise - - def run(self, objective:str): - if not objective or not isinstance(objective, str): - logging.error("Invalid objective") - raise ValueError("A valid objective is required") - - try: - self.task_queue.put((0, objective)) - - results = [self.assign_task(agent_id, task) for agent_id, task in zip(range(len(self.agents)), self.task_queue)] - - for result in results: - self.append_to_db(result) - - logging.info(f"Successfully ran swarms with results: {results}") - return results - except Exception as e: - logging.error(f"An error occured in swarm: {e}") - return None - - def chat(self, sender_id: int, receiver_id: int, message: str): - message_vector = self.embed_func(message) - - # Store the message in the vector database - self.collection.add(embeddings=[message_vector], documents=[message], ids=[f"{sender_id}_to_{receiver_id}"]) - - def assign_tasks(self, tasks: List[Dict[str, Any]], priority: int = 0): - for task in tasks: - self.task_queue.put((priority, task)) - - def retrieve_result(self, task_id: int) -> Any: - try: - result = self.collection.query(query_texts=[str(task_id)], n_results=1) - return result - except Exception as e: - logging.error(f"Failed to retrieve result for task {task_id}. Error: {e}") - raise -``` - -With these improvements, the `Orchestrator` class now supports dynamic agent creation and removal, task prioritization, task status tracking, result retrieval by task ID, batch task assignment, improved error handling, agent status tracking, custom embedding functions, and agent communication. This should make the class more flexible and easier to use when creating swarms of LLMs. \ No newline at end of file diff --git a/swarms/swarms/orchestrate.py b/swarms/swarms/orchestrate.py deleted file mode 100644 index 387c32e4..00000000 --- a/swarms/swarms/orchestrate.py +++ /dev/null @@ -1,307 +0,0 @@ -import logging -import queue -import threading -from concurrent.futures import ThreadPoolExecutor -from enum import Enum -from typing import Any, Dict, List - -import chromadb -from chromadb.utils import embedding_functions - - -class TaskStatus(Enum): - QUEUED = 1 - RUNNING = 2 - COMPLETED = 3 - FAILED = 4 - - -class Orchestrator: - """ - The Orchestrator takes in an agent, worker, or boss as input - then handles all the logic for - - task creation, - - task assignment, - - and task compeletion. - - And, the communication for millions of agents to chat with eachother through - a vector database that each agent has access to chat with. - - Each LLM agent chats with the orchestrator through a dedicated - communication layer. The orchestrator assigns tasks to each LLM agent, - which the agents then complete and return. - - This setup allows for a high degree of flexibility, scalability, and robustness. - - In the context of swarm LLMs, one could consider an **Omni-Vector Embedding Database - for communication. This database could store and manage - the high-dimensional vectors produced by each LLM agent. - - Strengths: This approach would allow for similarity-based lookup and matching of - LLM-generated vectors, which can be particularly useful for tasks that involve finding similar outputs or recognizing patterns. - - Weaknesses: An Omni-Vector Embedding Database might add complexity to the system in terms of setup and maintenance. - It might also require significant computational resources, - depending on the volume of data being handled and the complexity of the vectors. - The handling and transmission of high-dimensional vectors could also pose challenges - in terms of network load. - - # Orchestrator - * Takes in an agent class with vector store, - then handles all the communication and scales - up a swarm with number of agents and handles task assignment and task completion - - from swarms import OpenAI, Orchestrator, Swarm - - orchestrated = Orchestrate(OpenAI, nodes=40) #handles all the task assignment and allocation and agent communication using a vectorstore as a universal communication layer and also handlles the task completion logic - - Objective = "Make a business website for a marketing consultancy" - - Swarms = Swarms(orchestrated, auto=True, Objective)) - ``` - - In terms of architecture, the swarm might look something like this: - - ``` - (Orchestrator) - / \ - Tools + Vector DB -- (LLM Agent)---(Communication Layer) (Communication Layer)---(LLM Agent)-- Tools + Vector DB - / | | \ - (Task Assignment) (Task Completion) (Task Assignment) (Task Completion) - - - ###Usage - ``` - from swarms import Orchestrator - - # Instantiate the Orchestrator with 10 agents - orchestrator = Orchestrator(llm, agent_list=[llm]*10, task_queue=[]) - - # Add tasks to the Orchestrator - tasks = [{"content": f"Write a short story about a {animal}."} for animal in ["cat", "dog", "bird", "fish", "lion", "tiger", "elephant", "giraffe", "monkey", "zebra"]] - orchestrator.assign_tasks(tasks) - - # Run the Orchestrator - orchestrator.run() - - # Retrieve the results - for task in tasks: - print(orchestrator.retrieve_result(id(task))) - ``` - """ - - def __init__( - self, - agent, - agent_list: List[Any], - task_queue: List[Any], - collection_name: str = "swarm", - api_key: str = None, - model_name: str = None, - embed_func=None, - worker=None, - ): - self.agent = agent - self.agents = queue.Queue() - - for _ in range(agent_list): - self.agents.put(agent()) - - self.task_queue = queue.Queue() - - self.chroma_client = chromadb.Client() - - self.collection = self.chroma_client.create_collection( - name=collection_name - ) - - self.current_tasks = {} - - self.lock = threading.Lock() - self.condition = threading.Condition(self.lock) - self.executor = ThreadPoolExecutor( - max_workers=len(agent_list) - ) - - self.embed_func = embed_func if embed_func else self.embed - - # @abstractmethod - - def assign_task( - self, agent_id: int, task: Dict[str, Any] - ) -> None: - """Assign a task to a specific agent""" - - while True: - with self.condition: - while not self.task_queue: - self.condition.wait() - agent = self.agents.get() - task = self.task_queue.get() - - try: - result = self.worker.run(task["content"]) - - # using the embed method to get the vector representation of the result - vector_representation = self.embed( - result, self.api_key, self.model_name - ) - - self.collection.add( - embeddings=[vector_representation], - documents=[str(id(task))], - ids=[str(id(task))], - ) - - logging.info( - f"Task {id(str)} has been processed by agent" - f" {id(agent)} with" - ) - - except Exception as error: - logging.error( - f"Failed to process task {id(task)} by agent" - f" {id(agent)}. Error: {error}" - ) - finally: - with self.condition: - self.agents.put(agent) - self.condition.notify() - - def embed(self, input, api_key, model_name): - openai = embedding_functions.OpenAIEmbeddingFunction( - api_key=api_key, model_name=model_name - ) - embedding = openai(input) - return embedding - - # @abstractmethod - - def retrieve_results(self, agent_id: int) -> Any: - """Retrieve results from a specific agent""" - - try: - # Query the vector database for documents created by the agents - results = self.collection.query( - query_texts=[str(agent_id)], n_results=10 - ) - - return results - except Exception as e: - logging.error( - f"Failed to retrieve results from agent {agent_id}." - f" Error {e}" - ) - raise - - # @abstractmethod - def update_vector_db(self, data) -> None: - """Update the vector database""" - - try: - self.collection.add( - embeddings=[data["vector"]], - documents=[str(data["task_id"])], - ids=[str(data["task_id"])], - ) - - except Exception as e: - logging.error( - f"Failed to update the vector database. Error: {e}" - ) - raise - - # @abstractmethod - - def get_vector_db(self): - """Retrieve the vector database""" - return self.collection - - def append_to_db(self, result: str): - """append the result of the swarm to a specifici collection in the database""" - - try: - self.collection.add( - documents=[result], ids=[str(id(result))] - ) - - except Exception as e: - logging.error( - "Failed to append the agent output to database." - f" Error: {e}" - ) - raise - - def run(self, objective: str): - """Runs""" - if not objective or not isinstance(objective, str): - logging.error("Invalid objective") - raise ValueError("A valid objective is required") - - try: - self.task_queue.append(objective) - - results = [ - self.assign_task(agent_id, task) - for agent_id, task in zip( - range(len(self.agents)), self.task_queue - ) - ] - - for result in results: - self.append_to_db(result) - - logging.info( - f"Successfully ran swarms with results: {results}" - ) - return results - except Exception as e: - logging.error(f"An error occured in swarm: {e}") - return None - - def chat(self, sender_id: int, receiver_id: int, message: str): - """ - - Allows the agents to chat with eachother thrught the vectordatabase - - # Instantiate the Orchestrator with 10 agents - orchestrator = Orchestrator( - llm, - agent_list=[llm]*10, - task_queue=[] - ) - - # Agent 1 sends a message to Agent 2 - orchestrator.chat(sender_id=1, receiver_id=2, message="Hello, Agent 2!") - - """ - - message_vector = self.embed( - message, self.api_key, self.model_name - ) - - # store the mesage in the vector database - self.collection.add( - embeddings=[message_vector], - documents=[message], - ids=[f"{sender_id}_to_{receiver_id}"], - ) - - self.run( - objective=f"chat with agent {receiver_id} about {message}" - ) - - def add_agents(self, num_agents: int): - for _ in range(num_agents): - self.agents.put(self.agent()) - self.executor = ThreadPoolExecutor( - max_workers=self.agents.qsize() - ) - - def remove_agents(self, num_agents): - for _ in range(num_agents): - if not self.agents.empty(): - self.agents.get() - self.executor = ThreadPoolExecutor( - max_workers=self.agents.qsize() - ) diff --git a/swarms/swarms/simple_swarm.py b/swarms/swarms/simple_swarm.py deleted file mode 100644 index 7e806215..00000000 --- a/swarms/swarms/simple_swarm.py +++ /dev/null @@ -1,90 +0,0 @@ -from queue import Queue, PriorityQueue - - -class SimpleSwarm: - def __init__( - self, - llm, - num_agents: int = None, - openai_api_key: str = None, - ai_name: str = None, - rounds: int = 1, - *args, - **kwargs, - ): - """ - - Usage: - # Initialize the swarm with 5 agents, an API key, and a name for the AI model - swarm = SimpleSwarm(num_agents=5, openai_api_key="YOUR_OPENAI_API_KEY", ai_name="Optimus Prime") - - # Normal task without priority - normal_task = "Describe the process of photosynthesis in simple terms." - swarm.distribute_task(normal_task) - - # Priority task; lower numbers indicate higher priority (e.g., 1 is higher priority than 2) - priority_task = "Translate the phrase 'Hello World' to French." - swarm.distribute_task(priority_task, priority=1) - - # Run the tasks and gather the responses - responses = swarm.run() - - # Print responses - for response in responses: - print(response) - - # Providing feedback to the system (this is a stubbed method and won't produce a visible effect, but serves as an example) - swarm.provide_feedback("Improve translation accuracy.") - - # Perform a health check on the agents (this is also a stubbed method, illustrating potential usage) - swarm.health_check() - - """ - self.llm = llm - self.agents = [self.llm for _ in range(num_agents)] - self.task_queue = Queue() - self.priority_queue = PriorityQueue() - - def distribute(self, task: str = None, priority=None): - """Distribute a task to the agents""" - if priority: - self.priority_queue.put((priority, task)) - else: - self.task_queue.put(task) - - def _process_task(self, task): - # TODO, Implement load balancing, fallback mechanism - for worker in self.agents: - response = worker.run(task) - if response: - return response - return "All Agents failed" - - def run(self): - """Run the simple swarm""" - - responses = [] - - # process high priority tasks first - while not self.priority_queue.empty(): - _, task = self.priority_queue.get() - responses.append(self._process_task(task)) - - # process normal tasks - while not self.task_queue.empty(): - task = self.task_queue.get() - responses.append(self._process_task(task)) - - return responses - - def run_old(self, task): - responses = [] - - for worker in self.agents: - response = worker.run(task) - responses.append(response) - - return responses - - def __call__(self, task): - return self.run(task)