diff --git a/docs/swarms/agents/omni_agent.md b/docs/swarms/agents/omni_agent.md index 046586d2..5e1400b2 100644 --- a/docs/swarms/agents/omni_agent.md +++ b/docs/swarms/agents/omni_agent.md @@ -10,7 +10,6 @@ The `OmniModalAgent` class is at the core of an architecture designed to facilit 4. **Tools**: A collection of tools and utilities used to process different types of tasks. They span across areas like image captioning, translation, and more. - ## Structure & Organization ### Table of Contents: diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index 467e67f2..19df6e15 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -8,8 +8,6 @@ from swarms.agents.omni_modal_agent import OmniModalAgent - - #utils from swarms.agents.message import Message from swarms.agents.stream_response import stream diff --git a/swarms/swarms/simple_swarm.py b/swarms/swarms/simple_swarm.py index 1ffe260f..c5003003 100644 --- a/swarms/swarms/simple_swarm.py +++ b/swarms/swarms/simple_swarm.py @@ -1,4 +1,5 @@ from swarms.workers.worker import Worker +from queue import Queue, PriorityQueue class SimpleSwarm: def __init__( @@ -8,21 +9,77 @@ class SimpleSwarm: ai_name ): """ - - # Usage - swarm = Swarm(num_workers=5, openai_api_key="", ai_name="Optimus Prime") - task = "What were the winning Boston Marathon times for the past 5 years (ending in 2022)? Generate a table of the year, name, country of origin, and times." - responses = swarm.distribute_task(task) + Usage: + # Initialize the swarm with 5 workers, an API key, and a name for the AI model + swarm = SimpleSwarm(num_workers=5, openai_api_key="YOUR_OPENAI_API_KEY", ai_name="Optimus Prime") + + # Normal task without priority + normal_task = "Describe the process of photosynthesis in simple terms." + swarm.distribute_task(normal_task) + + # Priority task; lower numbers indicate higher priority (e.g., 1 is higher priority than 2) + priority_task = "Translate the phrase 'Hello World' to French." + swarm.distribute_task(priority_task, priority=1) + + # Run the tasks and gather the responses + responses = swarm.run() + + # Print responses for response in responses: print(response) - + + # Providing feedback to the system (this is a stubbed method and won't produce a visible effect, but serves as an example) + swarm.provide_feedback("Improve translation accuracy.") + + # Perform a health check on the agents (this is also a stubbed method, illustrating potential usage) + swarm.health_check() + """ self.workers = [ Worker(openai_api_key, ai_name) for _ in range(num_workers) ] + self.task_queue = Queue() + self.priority_queue = PriorityQueue() + + def distribute( + self, + task, + priority=None + ): + """Distribute a task to the workers""" + if priority: + self.priority_queue.put((priority, task)) + else: + self.task_queue.put(task) + + def _process_task(self, task): + #TODO, Implement load balancing, fallback mechanism + for worker in self.workers: + response = worker.run(task) + if response: + return response + return "All Agents failed" + + def run(self): + """Run the simple swarm""" + + responses = [] + + #process high priority tasks first + while not self.priority_queue.empty(): + _, task = self.priority_queue.get() + responses.append(self._process_task(task)) + + #process normal tasks + while not self.task_queue.empty(): + task = self.task_queue.get() + responses.append(self._process_task(task)) + + return responses + - def run(self, task): + def run_old(self, task): responses = [] for worker in self.workers: diff --git a/swarms/workers/worker.py b/swarms/workers/worker.py index 692b3302..a2cacead 100644 --- a/swarms/workers/worker.py +++ b/swarms/workers/worker.py @@ -1,4 +1,4 @@ -# import concurrent.futures + import faiss from langchain.chat_models import ChatOpenAI from langchain.docstore import InMemoryDocstore @@ -7,6 +7,7 @@ from langchain.tools.human.tool import HumanInputRun from langchain.vectorstores import FAISS from langchain_experimental.autonomous_agents import AutoGPT +from swarms.agents.message import Message from swarms.tools.autogpt import ( ReadFileTool, VQAinference, @@ -17,6 +18,13 @@ from swarms.tools.autogpt import ( ) from swarms.utils.decorators import error_decorator, log_decorator, timing_decorator + # self.llm = ChatOpenAI( + # model_name=model_name, + # openai_api_key=self.openai_api_key, + # temperature=self.temperature + # ) + +#cache ROOT_DIR = "./data/" @@ -51,42 +59,20 @@ class Worker: ``` """ - # @log_decorator - # @error_decorator - # @timing_decorator def __init__( self, - model_name="gpt-4", - openai_api_key=None, - ai_name="Autobot Swarm Worker", - ai_role="Worker in a swarm", + openai_api_key = None, + ai_name = "Autobot Swarm Worker", + ai_role = "Worker in a swarm", external_tools = None, - human_in_the_loop=False, - temperature=0.5, - # llm=None, - # openai: bool = True, + human_in_the_loop = False, + temperature = 0.5, + llm = None, ): self.openai_api_key = openai_api_key self.temperature = temperature self.human_in_the_loop = human_in_the_loop - # self.openai = openai - - - # if self.openai is True: - # try: - self.llm = ChatOpenAI( - model_name=model_name, - openai_api_key=self.openai_api_key, - temperature=self.temperature - ) - # except Exception as error: - # raise RuntimeError(f"Error Initializing ChatOpenAI: {error}") - # else: - # self.llm = llm( - # model_name=model_name, - # temperature=self.temperature - # ) - + self.llm = llm self.ai_name = ai_name self.ai_role = ai_role self.setup_tools(external_tools) @@ -96,8 +82,6 @@ class Worker: # self.task_queue = [] # self.executor = concurrent.futures.ThreadPoolExecutor() - - def reset(self): """ Reset the message history. @@ -108,7 +92,6 @@ class Worker: def name(self): return self.ai_name - def receieve(self, name: str, message: str) -> None: """ Receive a message and update the message history. @@ -125,29 +108,8 @@ class Worker: def add(self, task, priority=0): self.task_queue.append((priority, task)) - # def process_task(self, task): - # try: - # result = self.agent.run([task]) - # return result - # except Exception as error: - # error_message = f"Error while running task: {str(error)}" - # return error_message - - # def process_tasks_parallel(self): - # futures = [ - # self.executor.submit( - # self.process_task, - # task - # ) for _, task in self.task_queue - # ] - # concurrent.futures.wait(futures) - # results = [future.result() for future in futures] - # return results - # @log_decorator - # @error_decorator - # @timing_decorator def setup_tools(self, external_tools): """ Set up tools for the worker. @@ -179,6 +141,7 @@ class Worker: if external_tools is not None: self.tools.extend(external_tools) + def setup_memory(self): """ Set up memory for the worker. @@ -210,9 +173,9 @@ class Worker: except Exception as error: raise RuntimeError(f"Error setting up agent: {error}") - # @log_decorator - # @error_decorator - # @timing_decorator + @log_decorator + @error_decorator + @timing_decorator def run(self, task): """ Run the autonomous agent on a given task. @@ -229,9 +192,9 @@ class Worker: except Exception as error: raise RuntimeError(f"Error while running agent: {error}") - # @log_decorator - # @error_decorator - # @timing_decorator + @log_decorator + @error_decorator + @timing_decorator def __call__(self, task): """ Make the worker callable to run the agent on a given task. @@ -248,4 +211,89 @@ class Worker: except Exception as error: raise RuntimeError(f"Error while running agent: {error}") + def health_check(self): + pass + + @log_decorator + @error_decorator + @timing_decorator + def chat( + self, + msg: str = None, + streaming: bool = False + ): + """ + Run chat + + Args: + msg (str, optional): Message to send to the agent. Defaults to None. + language (str, optional): Language to use. Defaults to None. + streaming (bool, optional): Whether to stream the response. Defaults to False. + + Returns: + str: Response from the agent + + Usage: + -------------- + agent = MultiModalAgent() + agent.chat("Hello") + + """ + + #add users message to the history + self.history.append( + Message( + "User", + msg + ) + ) + + #process msg + try: + response = self.agent.run(msg) + + #add agent's response to the history + self.history.append( + Message( + "Agent", + response + ) + ) + + #if streaming is = True + if streaming: + return self._stream_response(response) + else: + response + + except Exception as error: + error_message = f"Error processing message: {str(error)}" + + #add error to history + self.history.append( + Message( + "Agent", + error_message + ) + ) + + return error_message + + def _stream_response( + self, + response: str = None + ): + """ + Yield the response token by token (word by word) + + Usage: + -------------- + for token in _stream_response(response): + print(token) + + """ + for token in response.split(): + yield token + +