diff --git a/README.md b/README.md index b5eae2b9..e6dea7ca 100644 --- a/README.md +++ b/README.md @@ -555,6 +555,67 @@ def interactive_conversation(llm): # Replace with your LLM instance interactive_conversation(llm) +``` + + +### `SwarmNetwork` +- Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion. + +- Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents. + +- Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure. + +```python +import os + +from dotenv import load_dotenv + +# Import the OpenAIChat model and the Agent struct +from swarms.models import OpenAIChat +from swarms.structs import Agent +from swarms.structs.swarm_net import SwarmNetwork + +# Load the environment variables +load_dotenv() + +# Get the API key from the environment +api_key = os.environ.get("OPENAI_API_KEY") + +# Initialize the language model +llm = OpenAIChat( + temperature=0.5, + openai_api_key=api_key, +) + +## Initialize the workflow +agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager") +agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager") +agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager") + + +# Load the swarmnet with the agents +swarmnet = SwarmNetwork( + agents=[agent, agent2, agent3], +) + +# List the agents in the swarm network +out = swarmnet.list_agents() +print(out) + +# Run the workflow on a task +out = swarmnet.run_single_agent( + agent2.id, "Generate a 10,000 word blog on health and wellness." +) +print(out) + + +# Run all the agents in the swarm network on a task +out = swarmnet.run_many_agents( + "Generate a 10,000 word blog on health and wellness." +) +print(out) + + ``` --- diff --git a/file_list.txt b/file_list.txt deleted file mode 100644 index 3fe9ba56..00000000 --- a/file_list.txt +++ /dev/null @@ -1,12 +0,0 @@ -- pdf_to_text: "swarms/utils/pdf_to_text.md" -- load_model_torch: "swarms/utils/load_model_torch.md" -- metrics_decorator: "swarms/utils/metrics_decorator.md" -- prep_torch_inference: "swarms/utils/prep_torch_inference.md" -- find_image_path: "swarms/utils/find_image_path.md" -- print_class_parameters: "swarms/utils/print_class_parameters.md" -- extract_code_from_markdown: "swarms/utils/extract_code_from_markdown.md" -- check_device: "swarms/utils/check_device.md" -- display_markdown_message: "swarms/utils/display_markdown_message.md" -- phoenix_tracer: "swarms/utils/phoenix_tracer.md" -- limit_tokens_from_string: "swarms/utils/limit_tokens_from_string.md" -- math_eval: "swarms/utils/math_eval.md" diff --git a/scripts/auto_tests_docs/mkdocs_handler.py b/scripts/auto_tests_docs/mkdocs_handler.py index 58718bf3..6cb0452b 100644 --- a/scripts/auto_tests_docs/mkdocs_handler.py +++ b/scripts/auto_tests_docs/mkdocs_handler.py @@ -22,7 +22,9 @@ def generate_file_list(directory, output_file): # Remove the file extension file_name, _ = os.path.splitext(file) # Write the file name and path to the output file - f.write(f'- {file_name}: "swarms/utils/{file_path}"\n') + f.write( + f'- {file_name}: "swarms/utils/{file_path}"\n' + ) # Use the function to generate the file list diff --git a/swarm_network.py b/swarm_network.py new file mode 100644 index 00000000..ba96a934 --- /dev/null +++ b/swarm_network.py @@ -0,0 +1,48 @@ +import os + +from dotenv import load_dotenv + +# Import the OpenAIChat model and the Agent struct +from swarms.models import OpenAIChat +from swarms.structs import Agent +from swarms.structs.swarm_net import SwarmNetwork + +# Load the environment variables +load_dotenv() + +# Get the API key from the environment +api_key = os.environ.get("OPENAI_API_KEY") + +# Initialize the language model +llm = OpenAIChat( + temperature=0.5, + openai_api_key=api_key, +) + +## Initialize the workflow +agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager") +agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager") +agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager") + + +# Load the swarmnet with the agents +swarmnet = SwarmNetwork( + agents=[agent, agent2, agent3], +) + +# List the agents in the swarm network +out = swarmnet.list_agents() +print(out) + +# Run the workflow on a task +out = swarmnet.run_single_agent( + agent2.id, "Generate a 10,000 word blog on health and wellness." +) +print(out) + + +# Run all the agents in the swarm network on a task +out = swarmnet.run_many_agents( + "Generate a 10,000 word blog on health and wellness." +) +print(out) diff --git a/swarms/__init__.py b/swarms/__init__.py index f6f04205..d555cf4d 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -1,6 +1,4 @@ -from swarms.utils.disable_logging import disable_logging - -disable_logging() +# disable_logging() from swarms.agents import * # noqa: E402, F403 from swarms.swarms import * # noqa: E402, F403 diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 4a58ea8d..5921fb52 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -8,6 +8,7 @@ from swarms.structs.schemas import ( ArtifactUpload, StepInput, ) +from swarms.structs.swarm_net import SwarmNetwork __all__ = [ "Agent", @@ -18,4 +19,5 @@ __all__ = [ "Artifact", "ArtifactUpload", "StepInput", + "SwarmNetwork", ] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index b1607783..3903d4ad 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -63,98 +63,95 @@ class Agent: * Ability to provide a loop interval Args: - id (str): The id of the agent llm (Any): The language model to use - template (Optional[str]): The template to use - max_loops (int): The maximum number of loops - stopping_condition (Optional[Callable[[str], bool]]): The stopping condition + template (str): The template to use + max_loops (int): The maximum number of loops to run + stopping_condition (Callable): The stopping condition to use loop_interval (int): The loop interval - retry_attempts (int): The retry attempts + retry_attempts (int): The number of retry attempts retry_interval (int): The retry interval return_history (bool): Return the history stopping_token (str): The stopping token - dynamic_loops (Optional[bool]): Dynamic loops - interactive (bool): Interactive mode - dashboard (bool): Dashboard mode + dynamic_loops (bool): Enable dynamic loops + interactive (bool): Enable interactive mode + dashboard (bool): Enable dashboard agent_name (str): The name of the agent agent_description (str): The description of the agent system_prompt (str): The system prompt - tools (List[BaseTool]): The tools - dynamic_temperature_enabled (Optional[bool]): Dynamic temperature enabled - sop (Optional[str]): The standard operating procedure - sop_list (Optional[List[str]]): The standard operating procedure list - saved_state_path (Optional[str]): The saved state path - autosave (Optional[bool]): Autosave - context_length (Optional[int]): The context length + tools (List[BaseTool]): The tools to use + dynamic_temperature_enabled (bool): Enable dynamic temperature + sop (str): The standard operating procedure + sop_list (List[str]): The standard operating procedure list + saved_state_path (str): The path to the saved state + autosave (bool): Autosave the state + context_length (int): The context length user_name (str): The user name - self_healing_enabled (Optional[bool]): Self healing enabled - code_interpreter (Optional[bool]): Code interpreter - multi_modal (Optional[bool]): Multi modal - pdf_path (Optional[str]): The pdf path - list_of_pdf (Optional[str]): The list of pdf - tokenizer (Optional[Any]): The tokenizer - memory (Optional[VectorDatabase]): The memory - preset_stopping_token (Optional[bool]): Preset stopping token - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. + self_healing_enabled (bool): Enable self healing + code_interpreter (bool): Enable code interpreter + multi_modal (bool): Enable multimodal + pdf_path (str): The path to the pdf + list_of_pdf (str): The list of pdf + tokenizer (Any): The tokenizer + memory (VectorDatabase): The memory + preset_stopping_token (bool): Enable preset stopping token + traceback (Any): The traceback + traceback_handlers (Any): The traceback handlers + streaming_on (bool): Enable streaming Methods: - run(task: str, **kwargs: Any): Run the agent on a task - run_concurrent(tasks: List[str], **kwargs: Any): Run the agent on a list of tasks concurrently - bulk_run(inputs: List[Dict[str, Any]]): Run the agent on a list of inputs - from_llm_and_template(llm: Any, template: str): Create AgentStream from LLM and a string template. - from_llm_and_template_file(llm: Any, template_file: str): Create AgentStream from LLM and a template file. - save(file_path): Save the agent history to a file - load(file_path): Load the agent history from a file - validate_response(response: str): Validate the response based on certain criteria - print_history_and_memory(): Print the entire history and memory of the agent - step(task: str, **kwargs): Executes a single step in the agent interaction, generating a response from the language model based on the given input text. - graceful_shutdown(): Gracefully shutdown the system saving the state - run_with_timeout(task: str, timeout: int): Run the loop but stop if it takes longer than the timeout - analyze_feedback(): Analyze the feedback for issues - undo_last(): Response the last response and return the previous state - add_response_filter(filter_word: str): Add a response filter to filter out certain words from the response - apply_reponse_filters(response: str): Apply the response filters to the response - filtered_run(task: str): Filtered run - interactive_run(max_loops: int): Interactive run mode - streamed_generation(prompt: str): Stream the generation of the response - get_llm_params(): Extracts and returns the parameters of the llm object for serialization. - save_state(file_path: str): Saves the current state of the agent to a JSON file, including the llm parameters. - load_state(file_path: str): Loads the state of the agent from a json file and restores the configuration and memory. - retry_on_failure(function, retries: int = 3, retry_delay: int = 1): Retry wrapper for LLM calls. - run_code(response: str): Run the code in the response - construct_dynamic_prompt(): Construct the dynamic prompt - extract_tool_commands(text: str): Extract the tool commands from the text - parse_and_execute_tools(response: str): Parse and execute the tools - execute_tools(tool_name, params): Execute the tool with the provided params - truncate_history(): Take the history and truncate it to fit into the model context length - add_task_to_memory(task: str): Add the task to the memory - add_message_to_memory(message: str): Add the message to the memory - add_message_to_memory_and_truncate(message: str): Add the message to the memory and truncate - print_dashboard(task: str): Print dashboard - activate_autonomous_agent(): Print the autonomous agent activation message - dynamic_temperature(): Dynamically change the temperature - _check_stopping_condition(response: str): Check if the stopping condition is met - format_prompt(template, **kwargs: Any): Format the template with the provided kwargs using f-string interpolation. - get_llm_init_params(): Get LLM init params - get_tool_description(): Get the tool description - find_tool_by_name(name: str): Find a tool by name - - - Example: + run: Run the agent + run_concurrent: Run the agent concurrently + bulk_run: Run the agent in bulk + save: Save the agent + load: Load the agent + validate_response: Validate the response + print_history_and_memory: Print the history and memory + step: Step through the agent + graceful_shutdown: Gracefully shutdown the agent + run_with_timeout: Run the agent with a timeout + analyze_feedback: Analyze the feedback + undo_last: Undo the last response + add_response_filter: Add a response filter + apply_response_filters: Apply the response filters + filtered_run: Run the agent with filtered responses + interactive_run: Run the agent in interactive mode + streamed_generation: Stream the generation of the response + get_llm_params: Get the llm parameters + save_state: Save the state + load_state: Load the state + get_llm_init_params: Get the llm init parameters + get_tool_description: Get the tool description + find_tool_by_name: Find a tool by name + extract_tool_commands: Extract the tool commands + execute_tools: Execute the tools + parse_and_execute_tools: Parse and execute the tools + truncate_history: Truncate the history + add_task_to_memory: Add the task to the memory + add_message_to_memory: Add the message to the memory + add_message_to_memory_and_truncate: Add the message to the memory and truncate + parse_tool_docs: Parse the tool docs + print_dashboard: Print the dashboard + loop_count_print: Print the loop count + streaming: Stream the content + _history: Generate the history + _dynamic_prompt_setup: Setup the dynamic prompt + agent_system_prompt_2: Agent system prompt 2 + run_async: Run the agent asynchronously + run_async_concurrent: Run the agent asynchronously and concurrently + run_async_concurrent: Run the agent asynchronously and concurrently + construct_dynamic_prompt: Construct the dynamic prompt + construct_dynamic_prompt: Construct the dynamic prompt + + + Examples: >>> from swarms.models import OpenAIChat >>> from swarms.structs import Agent - >>> llm = OpenAIChat( - ... openai_api_key=api_key, - ... temperature=0.5, - ... ) - >>> agent = Agent( - ... llm=llm, max_loops=5, - ... #system_prompt=SYSTEM_PROMPT, - ... #retry_interval=1, - ... ) - >>> agent.run("Generate a 10,000 word blog") - >>> agent.save("path/agent.yaml") + >>> llm = OpenAIChat() + >>> agent = Agent(llm=llm, max_loops=1) + >>> response = agent.run("Generate a report on the financials.") + >>> print(response) + >>> # Generate a report on the financials. + """ def __init__( @@ -172,7 +169,7 @@ class Agent: dynamic_loops: Optional[bool] = False, interactive: bool = False, dashboard: bool = False, - agent_name: str = "Autonomous-Agent-XYZ1B", + agent_name: str = None, agent_description: str = None, system_prompt: str = AGENT_SYSTEM_PROMPT_3, tools: List[BaseTool] = None, diff --git a/swarms/structs/swarm_net.py b/swarms/structs/swarm_net.py new file mode 100644 index 00000000..4a0ae0de --- /dev/null +++ b/swarms/structs/swarm_net.py @@ -0,0 +1,320 @@ +import asyncio +import logging +import queue +import threading +from typing import List, Optional + +from fastapi import FastAPI + +from swarms.structs.agent import Agent +from swarms.structs.base import BaseStructure + + +class SwarmNetwork(BaseStructure): + """ + SwarmNetwork class + + The SwarmNetwork class is responsible for managing the agents pool + and the task queue. It also monitors the health of the agents and + scales the pool up or down based on the number of pending tasks + and the current load of the agents. + + For example, if the number of pending tasks is greater than the + number of agents in the pool, the SwarmNetwork will scale up the + pool by adding new agents. If the number of pending tasks is less + than the number of agents in the pool, the SwarmNetwork will scale + down the pool by removing agents. + + The SwarmNetwork class also provides a simple API for interacting + with the agents pool. The API is implemented using the Flask + framework and is enabled by default. The API can be disabled by + setting the `api_enabled` parameter to False. + + Features: + - Agent pool management + - Task queue management + - Agent health monitoring + - Agent pool scaling + - Simple API for interacting with the agent pool + - Simple API for interacting with the task queue + - Simple API for interacting with the agent health monitor + - Simple API for interacting with the agent pool scaler + - Create APIs for each agent in the pool (optional) + - Run each agent on it's own thread + - Run each agent on it's own process + - Run each agent on it's own container + - Run each agent on it's own machine + - Run each agent on it's own cluster + + + """ + + def __init__( + self, + idle_threshold: float = 0.2, + busy_threshold: float = 0.7, + agents: List[Agent] = None, + api_enabled: Optional[bool] = False, + logging_enabled: Optional[bool] = False, + *args, + **kwargs, + ): + self.task_queue = queue.Queue() + self.idle_threshold = idle_threshold + self.busy_threshold = busy_threshold + self.lock = threading.Lock() + self.agents = agents + self.api_enabled = api_enabled + self.logging_enabled = logging_enabled + + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + + if api_enabled: + self.api = FastAPI() + + self.agent_pool = [] + + def add_task(self, task): + """Add task to the task queue + + Args: + task (_type_): _description_ + + Example: + >>> from swarms.structs.agent import Agent + >>> from swarms.structs.swarm_net import SwarmNetwork + >>> agent = Agent() + >>> swarm = SwarmNetwork(agents=[agent]) + >>> swarm.add_task("task") + + """ + self.logger.info(f"Adding task {task} to queue") + try: + self.task_queue.put(task) + self.logger.info(f"Task {task} added to queue") + except Exception as error: + print( + f"Error adding task to queue: {error} try again with" + " a new task" + ) + raise error + + async def async_add_task(self, task): + """Add task to the task queue + + Args: + task (_type_): _description_ + + Example: + >>> from swarms.structs.agent import Agent + >>> from swarms.structs.swarm_net import SwarmNetwork + >>> agent = Agent() + >>> swarm = SwarmNetwork(agents=[agent]) + >>> swarm.add_task("task") + + """ + self.logger.info( + f"Adding task {task} to queue asynchronously" + ) + try: + # Add task to queue asynchronously with asyncio + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, self.task_queue.put, task + ) + self.logger.info(f"Task {task} added to queue") + except Exception as error: + print( + f"Error adding task to queue: {error} try again with" + " a new task" + ) + raise error + + def run_single_agent( + self, agent_id, task: Optional[str] = None, *args, **kwargs + ): + """Run agent the task on the agent id + + Args: + agent_id (_type_): _description_ + task (str, optional): _description_. Defaults to None. + + Raises: + ValueError: _description_ + + Returns: + _type_: _description_ + """ + self.logger.info(f"Running task {task} on agent {agent_id}") + try: + for agent in self.agents_pool: + if agent.id == agent_id: + return agent.run(task, *args, **kwargs) + self.logger.info(f"No agent found with ID {agent_id}") + raise ValueError(f"No agent found with ID {agent_id}") + except Exception as error: + print(f"Error running task on agent: {error}") + raise error + + def run_many_agents( + self, task: Optional[str] = None, *args, **kwargs + ) -> List: + """Run the task on all agents + + Args: + task (str, optional): _description_. Defaults to None. + + Returns: + List: _description_ + """ + self.logger.info(f"Running task {task} on all agents") + try: + return [ + agent.run(task, *args, **kwargs) + for agent in self.agents_pool + ] + except Exception as error: + print(f"Error running task on agents: {error}") + raise error + + def list_agents(self): + """List all agents + + Returns: + List: _description_ + """ + self.logger.info("[Listing all active agents]") + try: + # return [agent.id for agent in self.agents_pool] + for agent in self.agents: + num_agents = len(self.agents) + self.logger.info( + f"[Number of active agents: {num_agents}]" + ) + return self.logger.info( + f"[Agent] [ID: {agent.id}] [Name:" + f" {agent.agent_name}] [Description:" + f" {agent.agent_description}] [Status] [Running]" + ) + except Exception as error: + print(f"Error listing agents: {error}") + raise error + + def get_agent(self, agent_id): + """Get agent by id + + Args: + agent_id (_type_): _description_ + + Returns: + _type_: _description_ + """ + self.logger.info(f"Getting agent {agent_id}") + + try: + for agent in self.agents_pool: + if agent.id == agent_id: + return agent + raise ValueError(f"No agent found with ID {agent_id}") + except Exception as error: + self.logger.error(f"Error getting agent: {error}") + raise error + + def add_agent(self, agent): + """Add agent to the agent pool + + Args: + agent (_type_): _description_ + """ + self.logger.info(f"Adding agent {agent} to pool") + try: + self.agents_pool.append(agent) + except Exception as error: + print(f"Error adding agent to pool: {error}") + raise error + + def remove_agent(self, agent_id): + """Remove agent from the agent pool + + Args: + agent_id (_type_): _description_ + """ + self.logger.info(f"Removing agent {agent_id} from pool") + try: + for agent in self.agents_pool: + if agent.id == agent_id: + self.agents_pool.remove(agent) + return + raise ValueError(f"No agent found with ID {agent_id}") + except Exception as error: + print(f"Error removing agent from pool: {error}") + raise error + + async def async_remove_agent(self, agent_id): + """Remove agent from the agent pool + + Args: + agent_id (_type_): _description_ + """ + self.logger.info(f"Removing agent {agent_id} from pool") + try: + # Remove agent from pool asynchronously with asyncio + loop = asyncio.get_running_loop() + await loop.run_in_executor( + None, self.remove_agent, agent_id + ) + except Exception as error: + print(f"Error removing agent from pool: {error}") + raise error + + def scale_up(self, num_agents: int = 1): + """Scale up the agent pool + + Args: + num_agents (int, optional): _description_. Defaults to 1. + """ + self.logger.info(f"Scaling up agent pool by {num_agents}") + try: + for _ in range(num_agents): + self.agents_pool.append(Agent()) + except Exception as error: + print(f"Error scaling up agent pool: {error}") + raise error + + def scale_down(self, num_agents: int = 1): + """Scale down the agent pool + + Args: + num_agents (int, optional): _description_. Defaults to 1. + """ + for _ in range(num_agents): + self.agents_pool.pop() + + # - Create APIs for each agent in the pool (optional) with fastapi + def create_apis_for_agents(self): + """Create APIs for each agent in the pool (optional) with fastapi + + Returns: + _type_: _description_ + """ + self.apis = [] + for agent in self.agents: + self.api.get(f"/{agent.id}") + + def run_agent(task: str, *args, **kwargs): + return agent.run(task, *args, **kwargs) + + self.apis.append(self.api) + + def run(self): + """run the swarm network""" + # Observe all agents in the pool + self.logger.info("Starting the SwarmNetwork") + + for agent in self.agents: + self.logger.info(f"Starting agent {agent.id}") + self.logger.info( + f"[Agent][{agent.id}] [Status] [Running] [Awaiting" + " Task]" + ) diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 81351b4f..b96390b8 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -1,4 +1,7 @@ +import sched +import time from dataclasses import dataclass, field +from datetime import datetime from typing import ( Any, Callable, @@ -10,23 +13,36 @@ from typing import ( from swarms.structs.agent import Agent -# Define a generic Task that can handle different types of callable objects @dataclass class Task: """ Task class for running a task in a sequential workflow. - - Args: - description (str): The description of the task. - agent (Union[Callable, Agent]): The model or agent to execute the task. - args (List[Any]): Additional arguments to pass to the task execution. - kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution. - result (Any): The result of the task execution. - history (List[Any]): The history of the task execution. + Attributes: + description (str): Description of the task. + agent (Union[Callable, Agent]): Agent or callable object to run the task. + args (List[Any]): Arguments to pass to the agent or callable object. + kwargs (Dict[str, Any]): Keyword arguments to pass to the agent or callable object. + result (Any): Result of the task. + history (List[Any]): History of the task. + schedule_time (datetime): Time to schedule the task. + scheduler (sched.scheduler): Scheduler to schedule the task. + trigger (Callable): Trigger to run the task. + action (Callable): Action to run the task. + condition (Callable): Condition to run the task. + priority (int): Priority of the task. + dependencies (List[Task]): List of tasks that need to be completed before this task can be executed. Methods: - execute: Execute the task. + execute: Execute the task by calling the agent or model with the arguments and keyword arguments. + handle_scheduled_task: Handles the execution of a scheduled task. + set_trigger: Sets the trigger for the task. + set_action: Sets the action for the task. + set_condition: Sets the condition for the task. + is_completed: Checks whether the task has been completed. + add_dependency: Adds a task to the list of dependencies. + set_priority: Sets the priority of the task. + check_dependency_completion: Checks whether all the dependencies have been completed. Examples: @@ -45,34 +61,134 @@ class Task: kwargs: Dict[str, Any] = field(default_factory=dict) result: Any = None history: List[Any] = field(default_factory=list) - # logger = logging.getLogger(__name__) + schedule_time: datetime = None + scheduler = sched.scheduler(time.time, time.sleep) + trigger: Callable = None + action: Callable = None + condition: Callable = None + priority: int = 0 + dependencies: List["Task"] = field(default_factory=list) def execute(self): """ - Execute the task. + Execute the task by calling the agent or model with the arguments and + keyword arguments. + + Examples: + >>> from swarms.structs import Task, Agent + >>> from swarms.models import OpenAIChat + >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False) + >>> task = Task(description="What's the weather in miami", agent=agent) + >>> task.execute() + >>> task.result - Raises: - ValueError: If a Agent instance is used as a task and the 'task' argument is not provided. """ - if isinstance(self.agent, Agent): - # Add a prompt to notify the Agent of the sequential workflow - if "prompt" in self.kwargs: - self.kwargs["prompt"] += ( - f"\n\nPrevious output: {self.result}" - if self.result - else "" - ) - else: - self.kwargs["prompt"] = ( - f"Main task: {self.description}" - + ( - f"\n\nPrevious output: {self.result}" - if self.result - else "" + + try: + if isinstance(self.agent, Agent): + if self.condition is None or self.condition(): + self.result = self.agent.run( + *self.args, **self.kwargs ) + self.history.append(self.result) + + if self.action is not None: + self.action() + else: + self.result = self.agent.run( + *self.args, **self.kwargs ) - self.result = self.agent.run(*self.args, **self.kwargs) - else: - self.result = self.agent(*self.args, **self.kwargs) - self.history.append(self.result) + self.history.append(self.result) + except Exception as error: + print(f"[ERROR][Task] {error}") + + def handle_scheduled_task(self): + """ + Handles the execution of a scheduled task. + + If the schedule time is not set or has already passed, the task is executed immediately. + Otherwise, the task is scheduled to be executed at the specified schedule time. + """ + try: + if ( + self.schedule_time is None + or self.schedule_time <= datetime.now() + ): + self.execute() + + else: + delay = ( + self.schedule_time - datetime.now() + ).total_seconds() + self.scheduler.enter(delay, 1, self.execute) + self.scheduler_run() + except Exception as error: + print(f"[ERROR][Task] {error}") + + def set_trigger(self, trigger: Callable): + """ + Sets the trigger for the task. + + Args: + trigger (Callable): The trigger to set. + """ + self.trigger = trigger + + def set_action(self, action: Callable): + """ + Sets the action for the task. + + Args: + action (Callable): The action to set. + """ + self.action = action + + def set_condition(self, condition: Callable): + """ + Sets the condition for the task. + + Args: + condition (Callable): The condition to set. + """ + self.condition = condition + + def is_completed(self): + """Is the task completed? + + Returns: + _type_: _description_ + """ + return self.result is not None + + def add_dependency(self, task): + """Adds a task to the list of dependencies. + + Args: + task (_type_): _description_ + """ + self.dependencies.append(task) + + def set_priority(self, priority: int): + """Sets the priority of the task. + + Args: + priority (int): _description_ + """ + self.priority = priority + + def check_dependency_completion(self): + """ + Checks whether all the dependencies have been completed. + + Returns: + bool: True if all the dependencies have been completed, False otherwise. + """ + try: + for task in self.dependencies: + if not task.is_completed(): + return False + except Exception as error: + print( + f"[ERROR][Task][check_dependency_completion] {error}" + ) diff --git a/swarms/swarms/team.py b/swarms/swarms/team.py new file mode 100644 index 00000000..d4482db9 --- /dev/null +++ b/swarms/swarms/team.py @@ -0,0 +1,95 @@ +import json +from typing import List, Optional + +from pydantic.v1 import BaseModel, Field, Json, root_validator + +from swarms.structs.agent import Agent +from swarms.structs.task import Task + + +class Team(BaseModel): + """ + Class that represents a group of agents, how they should work together and + their tasks. + + Attributes: + tasks (Optional[List[Task]]): List of tasks. + agents (Optional[List[Agent]]): List of agents in this Team. + architecture (str): Architecture that the Team will follow. Default is "sequential". + verbose (bool): Verbose mode for the Agent Execution. Default is False. + config (Optional[Json]): Configuration of the Team. Default is None. + """ + + tasks: Optional[List[Task]] = Field(description="List of tasks") + agents: Optional[List[Agent]] = Field(description="List of agents in this Team.") + architecture = Field( + description="architecture that the Team will follow.", default="sequential" + ) + verbose: bool = Field( + description="Verbose mode for the Agent Execution", default=False + ) + config: Optional[Json] = Field( + description="Configuration of the Team.", default=None + ) + + @root_validator(pre=True) + def check_config(_cls, values): + if not values.get("config") and ( + not values.get("agents") and not values.get("tasks") + ): + raise ValueError("Either agents and task need to be set or config.") + + if values.get("config"): + config = json.loads(values.get("config")) + if not config.get("agents") or not config.get("tasks"): + raise ValueError("Config should have agents and tasks.") + + values["agents"] = [Agent(**agent) for agent in config["agents"]] + + tasks = [] + for task in config["tasks"]: + task_agent = [ + agt for agt in values["agents"] if agt.role == task["agent"] + ][0] + del task["agent"] + tasks.append(Task(**task, agent=task_agent)) + + values["tasks"] = tasks + return values + + def run(self) -> str: + """ + Kickoff the Team to work on its tasks. + + Returns: + output (List[str]): Output of the Team for each task. + """ + if self.architecture == "sequential": + return self.__sequential_loop() + + def __sequential_loop(self) -> str: + """ + Loop that executes the sequential architecture. + + Returns: + output (str): Output of the Team. + """ + task_outcome = None + for task in self.tasks: + # Add delegation tools to the task if the agent allows it + # if task.agent.allow_delegation: + # tools = AgentTools(agents=self.agents).tools() + # task.tools += tools + + self.__log(f"\nWorking Agent: {task.agent.role}") + self.__log(f"Starting Task: {task.description} ...") + + task_outcome = task.execute(task_outcome) + + self.__log(f"Task output: {task_outcome}") + + return task_outcome + + def __log(self, message): + if self.verbose: + print(message) \ No newline at end of file diff --git a/tests/structs/test_swarmnetwork.py b/tests/structs/test_swarmnetwork.py new file mode 100644 index 00000000..9264ee8d --- /dev/null +++ b/tests/structs/test_swarmnetwork.py @@ -0,0 +1,50 @@ +import pytest +from unittest.mock import Mock, patch +from swarms.structs.swarm_net import SwarmNetwork +from swarms.structs.agent import Agent + + +@pytest.fixture +def swarm_network(): + agents = [Agent(id=f"Agent_{i}") for i in range(5)] + return SwarmNetwork(agents=agents) + + +def test_swarm_network_init(swarm_network): + assert isinstance(swarm_network.agents, list) + assert len(swarm_network.agents) == 5 + + +@patch("swarms.structs.swarm_net.SwarmNetwork.logger") +def test_run(mock_logger, swarm_network): + swarm_network.run() + assert ( + mock_logger.info.call_count == 10 + ) # 2 log messages per agent + + +def test_run_with_mocked_agents(mocker, swarm_network): + mock_agents = [Mock(spec=Agent) for _ in range(5)] + mocker.patch.object(swarm_network, "agents", mock_agents) + swarm_network.run() + for mock_agent in mock_agents: + assert mock_agent.run.called + + +def test_swarm_network_with_no_agents(): + swarm_network = SwarmNetwork(agents=[]) + assert swarm_network.agents == [] + + +def test_swarm_network_add_agent(swarm_network): + new_agent = Agent(id="Agent_5") + swarm_network.add_agent(new_agent) + assert len(swarm_network.agents) == 6 + assert swarm_network.agents[-1] == new_agent + + +def test_swarm_network_remove_agent(swarm_network): + agent_to_remove = swarm_network.agents[0] + swarm_network.remove_agent(agent_to_remove) + assert len(swarm_network.agents) == 4 + assert agent_to_remove not in swarm_network.agents diff --git a/tests/structs/test_task.py b/tests/structs/test_task.py index fada564a..8a76549c 100644 --- a/tests/structs/test_task.py +++ b/tests/structs/test_task.py @@ -9,6 +9,8 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( ) from swarms.structs.agent import Agent from swarms.structs.task import Task +import datetime +from datetime import timedelta load_dotenv() @@ -163,3 +165,119 @@ def test_execute(): task = Task(id="5", task="Task5", result=None, agents=[agent]) # Assuming execute method returns True on successful execution assert task.execute() is True + + +def test_task_execute_with_agent(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + task = Task(description="Test task", agent=mock_agent) + task.execute() + assert task.result == "result" + assert task.history == ["result"] + + +def test_task_execute_with_callable(mocker): + mock_callable = mocker.Mock() + mock_callable.run.return_value = "result" + task = Task(description="Test task", agent=mock_callable) + task.execute() + assert task.result == "result" + assert task.history == ["result"] + + +def test_task_execute_with_condition(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + condition = mocker.Mock(return_value=True) + task = Task( + description="Test task", agent=mock_agent, condition=condition + ) + task.execute() + assert task.result == "result" + assert task.history == ["result"] + + +def test_task_execute_with_condition_false(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + condition = mocker.Mock(return_value=False) + task = Task( + description="Test task", agent=mock_agent, condition=condition + ) + task.execute() + assert task.result is None + assert task.history == [] + + +def test_task_execute_with_action(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + action = mocker.Mock() + task = Task( + description="Test task", agent=mock_agent, action=action + ) + task.execute() + assert task.result == "result" + assert task.history == ["result"] + action.assert_called_once() + + +def test_task_handle_scheduled_task_now(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + task = Task( + description="Test task", + agent=mock_agent, + schedule_time=datetime.now(), + ) + task.handle_scheduled_task() + assert task.result == "result" + assert task.history == ["result"] + + +def test_task_handle_scheduled_task_future(mocker): + mock_agent = mocker.Mock(spec=Agent) + mock_agent.run.return_value = "result" + task = Task( + description="Test task", + agent=mock_agent, + schedule_time=datetime.now() + timedelta(days=1), + ) + with mocker.patch.object( + task.scheduler, "enter" + ) as mock_enter, mocker.patch.object( + task.scheduler, "run" + ) as mock_run: + task.handle_scheduled_task() + mock_enter.assert_called_once() + mock_run.assert_called_once() + + +def test_task_set_trigger(): + task = Task(description="Test task", agent=Agent()) + + def trigger(): + return True + + task.set_trigger(trigger) + assert task.trigger == trigger + + +def test_task_set_action(): + task = Task(description="Test task", agent=Agent()) + + def action(): + return True + + task.set_action(action) + assert task.action == action + + +def test_task_set_condition(): + task = Task(description="Test task", agent=Agent()) + + def condition(): + return True + + task.set_condition(condition) + assert task.condition == condition