diff --git a/examples/multi_agent/utils/uvloop_example.py b/examples/multi_agent/utils/uvloop_example.py deleted file mode 100644 index acc9f70e..00000000 --- a/examples/multi_agent/utils/uvloop_example.py +++ /dev/null @@ -1,122 +0,0 @@ -""" -Example demonstrating the use of uvloop for running multiple agents concurrently. - -This example shows how to use the new uvloop-based functions: -- run_agents_concurrently_uvloop: For running multiple agents with the same task -- run_agents_with_tasks_uvloop: For running agents with different tasks - -uvloop provides significant performance improvements over standard asyncio, -especially for I/O-bound operations and concurrent task execution. -""" - -import os -from swarms.structs.multi_agent_exec import ( - run_agents_concurrently_uvloop, - run_agents_with_tasks_uvloop, -) -from swarms.structs.agent import Agent - - -def create_example_agents(num_agents: int = 3): - """Create example agents for demonstration.""" - agents = [] - for i in range(num_agents): - agent = Agent( - agent_name=f"Agent_{i+1}", - system_prompt=f"You are Agent {i+1}, a helpful AI assistant.", - model_name="gpt-4o-mini", # Using a lightweight model for examples - max_loops=1, - autosave=False, - verbose=False, - ) - agents.append(agent) - return agents - - -def example_same_task(): - """Example: Running multiple agents with the same task using uvloop.""" - print("=== Example 1: Same Task for All Agents (uvloop) ===") - - agents = create_example_agents(3) - task = ( - "Write a one-sentence summary about artificial intelligence." - ) - - print(f"Running {len(agents)} agents with the same task...") - print(f"Task: {task}") - - try: - results = run_agents_concurrently_uvloop(agents, task) - - print("\nResults:") - for i, result in enumerate(results, 1): - print(f"Agent {i}: {result}") - - except Exception as e: - print(f"Error: {e}") - - -def example_different_tasks(): - """Example: Running agents with different tasks using uvloop.""" - print( - "\n=== Example 2: Different Tasks for Each Agent (uvloop) ===" - ) - - agents = create_example_agents(3) - tasks = [ - "Explain what machine learning is in simple terms.", - "Describe the benefits of cloud computing.", - "What are the main challenges in natural language processing?", - ] - - print(f"Running {len(agents)} agents with different tasks...") - - try: - results = run_agents_with_tasks_uvloop(agents, tasks) - - print("\nResults:") - for i, (result, task) in enumerate(zip(results, tasks), 1): - print(f"Agent {i} (Task: {task[:50]}...):") - print(f" Response: {result}") - print() - - except Exception as e: - print(f"Error: {e}") - - -def performance_comparison(): - """Demonstrate the performance benefit of uvloop vs standard asyncio.""" - print("\n=== Performance Comparison ===") - - # Note: This is a conceptual example. In practice, you'd need to measure actual performance - print("uvloop vs Standard asyncio:") - print("• uvloop: Cython-based event loop, ~2-4x faster") - print("• Better for I/O-bound operations") - print("• Lower latency and higher throughput") - print("• Especially beneficial for concurrent agent execution") - print("• Automatic fallback to asyncio if uvloop unavailable") - - -if __name__ == "__main__": - # Check if API key is available - if not os.getenv("OPENAI_API_KEY"): - print( - "Please set your OPENAI_API_KEY environment variable to run this example." - ) - print("Example: export OPENAI_API_KEY='your-api-key-here'") - exit(1) - - print("🚀 uvloop Multi-Agent Execution Examples") - print("=" * 50) - - # Run examples - example_same_task() - example_different_tasks() - performance_comparison() - - print("\n✅ Examples completed!") - print("\nTo use uvloop functions in your code:") - print( - "from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop" - ) - print("results = run_agents_concurrently_uvloop(agents, task)") diff --git a/requirements.txt b/requirements.txt index 763d2987..325b3574 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,4 @@ numpy orjson schedule uvloop +winloop diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 29b92dcd..145a736c 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -101,7 +101,6 @@ from swarms.structs.swarming_architectures import ( staircase_swarm, star_swarm, ) -from swarms.structs.aop import AOP __all__ = [ "Agent", diff --git a/swarms/structs/heavy_swarm.py b/swarms/structs/heavy_swarm.py index f82a5398..36a91536 100644 --- a/swarms/structs/heavy_swarm.py +++ b/swarms/structs/heavy_swarm.py @@ -213,62 +213,62 @@ schema = [schema] class HeavySwarm: """ - HeavySwarm is a sophisticated multi-agent orchestration system that - decomposes complex tasks into specialized questions and executes them - using four specialized agents: Research, Analysis, Alternatives, and - Verification. The results are then synthesized into a comprehensive - response. - - This swarm architecture provides robust task analysis through: - - Intelligent question generation for specialized agent roles - - Parallel execution of specialized agents for efficiency - - Comprehensive synthesis of multi-perspective results - - Real-time progress monitoring with rich dashboard displays - - Reliability checks and validation systems - - Multi-loop iterative refinement with context preservation - - The HeavySwarm follows a structured workflow: - 1. Task decomposition into specialized questions - 2. Parallel execution by specialized agents - 3. Result synthesis and integration - 4. Comprehensive final report generation - 5. Optional iterative refinement through multiple loops - - Key Features: - - **Multi-loop Execution**: The max_loops parameter enables iterative - refinement where each subsequent loop builds upon the context and - results from previous loops -S **Iterative Refinement**: Each loop can refine, improve, or complete - aspects of the analysis based on previous results - - Attributes: - name (str): Name identifier for the swarm instance - description (str): Description of the swarm's purpose - agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally) - timeout (int): Maximum execution time per agent in seconds - aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis') - loops_per_agent (int): Number of execution loops per agent - question_agent_model_name (str): Model name for question generation - worker_model_name (str): Model name for specialized worker agents - verbose (bool): Enable detailed logging output - max_workers (int): Maximum number of concurrent worker threads - show_dashboard (bool): Enable rich dashboard with progress visualization - agent_prints_on (bool): Enable individual agent output printing - max_loops (int): Maximum number of execution loops for iterative refinement - conversation (Conversation): Conversation history tracker - console (Console): Rich console for dashboard output - - Example: - >>> swarm = HeavySwarm( - ... name="AnalysisSwarm", - ... description="Market analysis swarm", - ... question_agent_model_name="gpt-4o-mini", - ... worker_model_name="gpt-4o-mini", - ... show_dashboard=True, - ... max_loops=3 - ... ) - >>> result = swarm.run("Analyze the current cryptocurrency market trends") - >>> # The swarm will run 3 iterations, each building upon the previous results + HeavySwarm is a sophisticated multi-agent orchestration system that + decomposes complex tasks into specialized questions and executes them + using four specialized agents: Research, Analysis, Alternatives, and + Verification. The results are then synthesized into a comprehensive + response. + + This swarm architecture provides robust task analysis through: + - Intelligent question generation for specialized agent roles + - Parallel execution of specialized agents for efficiency + - Comprehensive synthesis of multi-perspective results + - Real-time progress monitoring with rich dashboard displays + - Reliability checks and validation systems + - Multi-loop iterative refinement with context preservation + + The HeavySwarm follows a structured workflow: + 1. Task decomposition into specialized questions + 2. Parallel execution by specialized agents + 3. Result synthesis and integration + 4. Comprehensive final report generation + 5. Optional iterative refinement through multiple loops + + Key Features: + - **Multi-loop Execution**: The max_loops parameter enables iterative + refinement where each subsequent loop builds upon the context and + results from previous loops + S **Iterative Refinement**: Each loop can refine, improve, or complete + aspects of the analysis based on previous results + + Attributes: + name (str): Name identifier for the swarm instance + description (str): Description of the swarm's purpose + agents (Dict[str, Agent]): Dictionary of specialized agent instances (created internally) + timeout (int): Maximum execution time per agent in seconds + aggregation_strategy (str): Strategy for result aggregation (currently 'synthesis') + loops_per_agent (int): Number of execution loops per agent + question_agent_model_name (str): Model name for question generation + worker_model_name (str): Model name for specialized worker agents + verbose (bool): Enable detailed logging output + max_workers (int): Maximum number of concurrent worker threads + show_dashboard (bool): Enable rich dashboard with progress visualization + agent_prints_on (bool): Enable individual agent output printing + max_loops (int): Maximum number of execution loops for iterative refinement + conversation (Conversation): Conversation history tracker + console (Console): Rich console for dashboard output + + Example: + >>> swarm = HeavySwarm( + ... name="AnalysisSwarm", + ... description="Market analysis swarm", + ... question_agent_model_name="gpt-4o-mini", + ... worker_model_name="gpt-4o-mini", + ... show_dashboard=True, + ... max_loops=3 + ... ) + >>> result = swarm.run("Analyze the current cryptocurrency market trends") + >>> # The swarm will run 3 iterations, each building upon the previous results """ def __init__( diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 0b41478c..44918d03 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -1,13 +1,12 @@ import asyncio import concurrent.futures import os +import sys from concurrent.futures import ( ThreadPoolExecutor, ) from typing import Any, Callable, List, Optional, Union -import uvloop -import sys from loguru import logger from swarms.structs.agent import Agent @@ -17,20 +16,50 @@ from swarms.structs.omni_agent_types import AgentType def run_single_agent( agent: AgentType, task: str, *args, **kwargs ) -> Any: - """Run a single agent synchronously""" + """ + Run a single agent synchronously with the given task. + + This function provides a synchronous wrapper for executing a single agent + with a specific task. It passes through any additional arguments and + keyword arguments to the agent's run method. + + Args: + agent (AgentType): The agent instance to execute + task (str): The task string to be executed by the agent + *args: Variable length argument list passed to agent.run() + **kwargs: Arbitrary keyword arguments passed to agent.run() + + Returns: + Any: The result returned by the agent's run method + + Example: + >>> agent = SomeAgent() + >>> result = run_single_agent(agent, "Analyze this data") + >>> print(result) + """ return agent.run(task=task, *args, **kwargs) async def run_agent_async(agent: AgentType, task: str) -> Any: """ - Run an agent asynchronously. + Run an agent asynchronously using asyncio event loop. + + This function executes a single agent asynchronously by running it in a + thread executor to avoid blocking the event loop. It's designed to be + used within async contexts for concurrent execution. Args: - agent: Agent instance to run - task: Task string to execute + agent (AgentType): The agent instance to execute asynchronously + task (str): The task string to be executed by the agent Returns: - Agent execution result + Any: The result returned by the agent's run method + + Example: + >>> async def main(): + ... agent = SomeAgent() + ... result = await run_agent_async(agent, "Process data") + ... return result """ loop = asyncio.get_event_loop() return await loop.run_in_executor( @@ -42,14 +71,25 @@ async def run_agents_concurrently_async( agents: List[AgentType], task: str ) -> List[Any]: """ - Run multiple agents concurrently using asyncio. + Run multiple agents concurrently using asyncio gather. + + This function executes multiple agents concurrently using asyncio.gather(), + which runs all agents in parallel and waits for all to complete. Each agent + runs the same task asynchronously. Args: - agents: List of Agent instances to run concurrently - task: Task string to execute + agents (List[AgentType]): List of agent instances to run concurrently + task (str): The task string to be executed by all agents Returns: - List of outputs from each agent + List[Any]: List of results from each agent in the same order as input + + Example: + >>> async def main(): + ... agents = [Agent1(), Agent2(), Agent3()] + ... results = await run_agents_concurrently_async(agents, "Analyze data") + ... for i, result in enumerate(results): + ... print(f"Agent {i+1} result: {result}") """ results = await asyncio.gather( *(run_agent_async(agent, task) for agent in agents) @@ -63,15 +103,35 @@ def run_agents_concurrently( max_workers: Optional[int] = None, ) -> List[Any]: """ - Optimized concurrent agent runner using ThreadPoolExecutor. + Run multiple agents concurrently using ThreadPoolExecutor for optimal performance. + + This function executes multiple agents concurrently using a thread pool executor, + which provides better performance than asyncio for CPU-bound tasks. It automatically + determines the optimal number of worker threads based on available CPU cores. Args: - agents: List of Agent instances to run concurrently - task: Task string to execute - max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores) + agents (List[AgentType]): List of agent instances to run concurrently + task (str): The task string to be executed by all agents + max_workers (Optional[int]): Maximum number of threads in the executor. + Defaults to 95% of available CPU cores for optimal performance Returns: - List of outputs from each agent + List[Any]: List of results from each agent. If an agent fails, the exception + is included in the results list instead of the result. + + Note: + - Uses 95% of CPU cores by default for optimal resource utilization + - Handles exceptions gracefully by including them in the results + - Results may not be in the same order as input agents due to concurrent execution + + Example: + >>> agents = [Agent1(), Agent2(), Agent3()] + >>> results = run_agents_concurrently(agents, "Process data") + >>> for i, result in enumerate(results): + ... if isinstance(result, Exception): + ... print(f"Agent {i+1} failed: {result}") + ... else: + ... print(f"Agent {i+1} result: {result}") """ if max_workers is None: # 95% of the available CPU cores @@ -104,16 +164,30 @@ def run_agents_concurrently_multiprocess( agents: List[Agent], task: str, batch_size: int = os.cpu_count() ) -> List[Any]: """ - Manage and run multiple agents concurrently in batches, with optimized performance. + Run multiple agents concurrently in batches using asyncio for optimized performance. + + This function processes agents in batches to avoid overwhelming system resources + while still achieving high concurrency. It uses asyncio internally to manage + the concurrent execution of agent batches. Args: - agents (List[Agent]): List of Agent instances to run concurrently. - task (str): The task string to execute by all agents. + agents (List[Agent]): List of Agent instances to run concurrently + task (str): The task string to be executed by all agents batch_size (int, optional): Number of agents to run in parallel in each batch. - Defaults to the number of CPU cores. + Defaults to the number of CPU cores for optimal resource usage Returns: - List[Any]: A list of outputs from each agent. + List[Any]: List of results from each agent, maintaining the order of input agents + + Note: + - Processes agents in batches to prevent resource exhaustion + - Uses asyncio for efficient concurrent execution within batches + - Results are returned in the same order as input agents + + Example: + >>> agents = [Agent1(), Agent2(), Agent3(), Agent4(), Agent5()] + >>> results = run_agents_concurrently_multiprocess(agents, "Analyze data", batch_size=2) + >>> print(f"Processed {len(results)} agents") """ results = [] loop = asyncio.get_event_loop() @@ -135,15 +209,36 @@ def batched_grid_agent_execution( max_workers: int = None, ) -> List[Any]: """ - Run multiple agents with different tasks concurrently. + Run multiple agents with different tasks concurrently using ThreadPoolExecutor. + + This function pairs each agent with a specific task and executes them concurrently. + It's designed for scenarios where different agents need to work on different tasks + simultaneously, creating a grid-like execution pattern. Args: - agents (List[AgentType]): List of agent instances. - tasks (List[str]): List of tasks, one for each agent. - max_workers (int, optional): Maximum number of threads to use. Defaults to 90% of CPU cores. + agents (List[AgentType]): List of agent instances to execute + tasks (List[str]): List of task strings, one for each agent. Must match the number of agents + max_workers (int, optional): Maximum number of threads to use. + Defaults to 90% of available CPU cores for optimal performance Returns: - List[Any]: List of results from each agent. + List[Any]: List of results from each agent in the same order as input agents. + If an agent fails, the exception is included in the results. + + Raises: + ValueError: If the number of agents doesn't match the number of tasks + + Note: + - Uses 90% of CPU cores by default for optimal resource utilization + - Results maintain the same order as input agents + - Handles exceptions gracefully by including them in results + + Example: + >>> agents = [Agent1(), Agent2(), Agent3()] + >>> tasks = ["Task A", "Task B", "Task C"] + >>> results = batched_grid_agent_execution(agents, tasks) + >>> for i, result in enumerate(results): + ... print(f"Agent {i+1} with {tasks[i]}: {result}") """ logger.info( f"Batch Grid Execution with {len(agents)} agents and number of tasks: {len(tasks)}" @@ -185,16 +280,34 @@ def run_agents_with_different_tasks( """ Run multiple agents with different tasks concurrently, processing them in batches. - This function executes each agent on its corresponding task, processing the agent-task pairs in batches - of size `batch_size` for efficient resource utilization. + This function executes each agent on its corresponding task, processing the agent-task pairs + in batches for efficient resource utilization. It's designed for scenarios where you have + a large number of agent-task pairs that need to be processed efficiently. Args: - agent_task_pairs: List of (agent, task) tuples. - batch_size: Number of agents to run in parallel in each batch. - max_workers: Maximum number of threads. + agent_task_pairs (List[tuple[AgentType, str]]): List of (agent, task) tuples to execute. + Each tuple contains an agent instance and its task + batch_size (int, optional): Number of agent-task pairs to process in parallel in each batch. + Defaults to 10 for balanced resource usage + max_workers (int, optional): Maximum number of threads to use for each batch. + If None, uses the default from batched_grid_agent_execution Returns: - List of outputs from each agent, in the same order as the input pairs. + List[Any]: List of outputs from each agent-task pair, maintaining the same order as input pairs. + If an agent fails, the exception is included in the results. + + Note: + - Processes agent-task pairs in batches to prevent resource exhaustion + - Results maintain the same order as input pairs + - Handles exceptions gracefully by including them in results + - Uses batched_grid_agent_execution internally for each batch + + Example: + >>> pairs = [(agent1, "Task A"), (agent2, "Task B"), (agent3, "Task C")] + >>> results = run_agents_with_different_tasks(pairs, batch_size=5) + >>> for i, result in enumerate(results): + ... agent, task = pairs[i] + ... print(f"Agent {agent.agent_name} with {task}: {result}") """ if not agent_task_pairs: return [] @@ -217,30 +330,52 @@ def run_agents_concurrently_uvloop( max_workers: Optional[int] = None, ) -> List[Any]: """ - Run multiple agents concurrently using optimized async performance. + Run multiple agents concurrently using optimized async performance with uvloop/winloop. - Uses uvloop on Linux/macOS and winloop on Windows for enhanced performance. - Falls back to standard asyncio if optimized event loops are not available. + This function provides high-performance concurrent execution of multiple agents using + optimized event loop implementations. It automatically selects the best available + event loop for the platform (uvloop on Unix systems, winloop on Windows). Args: - agents: List of Agent instances to run concurrently - task: Task string to execute by all agents - max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores) + agents (List[AgentType]): List of agent instances to run concurrently + task (str): The task string to be executed by all agents + max_workers (Optional[int]): Maximum number of threads in the executor. + Defaults to 95% of available CPU cores for optimal performance Returns: - List of outputs from each agent + List[Any]: List of results from each agent. If an agent fails, the exception + is included in the results list instead of the result. Raises: - ImportError: If neither uvloop nor winloop is available - RuntimeError: If event loop policy cannot be set + ImportError: If neither uvloop nor winloop is available (falls back to standard asyncio) + RuntimeError: If event loop policy cannot be set (falls back to standard asyncio) + + Note: + - Automatically uses uvloop on Linux/macOS and winloop on Windows + - Falls back gracefully to standard asyncio if optimized loops are unavailable + - Uses 95% of CPU cores by default for optimal resource utilization + - Handles exceptions gracefully by including them in results + - Results may not be in the same order as input agents due to concurrent execution + + Example: + >>> agents = [Agent1(), Agent2(), Agent3()] + >>> results = run_agents_concurrently_uvloop(agents, "Process data") + >>> for i, result in enumerate(results): + ... if isinstance(result, Exception): + ... print(f"Agent {i+1} failed: {result}") + ... else: + ... print(f"Agent {i+1} result: {result}") """ # Platform-specific event loop policy setup - if sys.platform in ('win32', 'cygwin'): + if sys.platform in ("win32", "cygwin"): # Windows: Try to use winloop try: import winloop + asyncio.set_event_loop_policy(winloop.EventLoopPolicy()) - logger.info("Using winloop for enhanced Windows performance") + logger.info( + "Using winloop for enhanced Windows performance" + ) except ImportError: logger.warning( "winloop not available, falling back to standard asyncio. " @@ -254,6 +389,7 @@ def run_agents_concurrently_uvloop( # Linux/macOS: Try to use uvloop try: import uvloop + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) logger.info("Using uvloop for enhanced Unix performance") except ImportError: @@ -330,21 +466,42 @@ def run_agents_with_tasks_uvloop( max_workers: Optional[int] = None, ) -> List[Any]: """ - Run multiple agents with different tasks concurrently using optimized performance. + Run multiple agents with different tasks concurrently using optimized async performance. - This function pairs each agent with a specific task and runs them concurrently - using uvloop on Linux/macOS and winloop on Windows for optimized performance. + This function pairs each agent with a specific task and runs them concurrently using + optimized event loop implementations (uvloop on Unix systems, winloop on Windows). + It's designed for high-performance scenarios where different agents need to work + on different tasks simultaneously. Args: - agents: List of Agent instances to run - tasks: List of task strings (must match number of agents) - max_workers: Maximum number of threads (defaults to 95% of CPU cores) + agents (List[AgentType]): List of agent instances to run + tasks (List[str]): List of task strings, one for each agent. Must match the number of agents + max_workers (Optional[int]): Maximum number of threads in the executor. + Defaults to 95% of available CPU cores for optimal performance Returns: - List of outputs from each agent + List[Any]: List of results from each agent in the same order as input agents. + If an agent fails, the exception is included in the results. Raises: - ValueError: If number of agents doesn't match number of tasks + ValueError: If the number of agents doesn't match the number of tasks + + Note: + - Automatically uses uvloop on Linux/macOS and winloop on Windows + - Falls back gracefully to standard asyncio if optimized loops are unavailable + - Uses 95% of CPU cores by default for optimal resource utilization + - Results maintain the same order as input agents + - Handles exceptions gracefully by including them in results + + Example: + >>> agents = [Agent1(), Agent2(), Agent3()] + >>> tasks = ["Task A", "Task B", "Task C"] + >>> results = run_agents_with_tasks_uvloop(agents, tasks) + >>> for i, result in enumerate(results): + ... if isinstance(result, Exception): + ... print(f"Agent {i+1} with {tasks[i]} failed: {result}") + ... else: + ... print(f"Agent {i+1} with {tasks[i]}: {result}") """ if len(agents) != len(tasks): raise ValueError( @@ -352,12 +509,15 @@ def run_agents_with_tasks_uvloop( ) # Platform-specific event loop policy setup - if sys.platform in ('win32', 'cygwin'): + if sys.platform in ("win32", "cygwin"): # Windows: Try to use winloop try: import winloop + asyncio.set_event_loop_policy(winloop.EventLoopPolicy()) - logger.info("Using winloop for enhanced Windows performance") + logger.info( + "Using winloop for enhanced Windows performance" + ) except ImportError: logger.warning( "winloop not available, falling back to standard asyncio. " @@ -371,6 +531,7 @@ def run_agents_with_tasks_uvloop( # Linux/macOS: Try to use uvloop try: import uvloop + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) logger.info("Using uvloop for enhanced Unix performance") except ImportError: @@ -445,10 +606,40 @@ def run_agents_with_tasks_uvloop( def get_swarms_info(swarms: List[Callable]) -> str: """ - Fetches and formats information about all available swarms in the system. + Fetch and format information about all available swarms in the system. + + This function provides a comprehensive overview of all swarms currently + available in the system, including their names, descriptions, agent counts, + and swarm types. It's useful for debugging, monitoring, and system introspection. + + Args: + swarms (List[Callable]): List of swarm instances to get information about. + Each swarm should have name, description, agents, and swarm_type attributes Returns: - str: A formatted string containing names and descriptions of all swarms. + str: A formatted string containing detailed information about all swarms. + Returns "No swarms currently available in the system." if the list is empty. + + Note: + - Each swarm is expected to have the following attributes: + - name: The name of the swarm + - description: A description of the swarm's purpose + - agents: A list of agents in the swarm + - swarm_type: The type/category of the swarm + - The output is formatted for human readability with clear section headers + + Example: + >>> swarms = [swarm1, swarm2, swarm3] + >>> info = get_swarms_info(swarms) + >>> print(info) + Available Swarms: + + [Swarm 1] + Name: Data Processing Swarm + Description: Handles data analysis tasks + Length of Agents: 5 + Swarm Type: Analysis + ... """ if not swarms: return "No swarms currently available in the system." @@ -477,10 +668,47 @@ def get_agents_info( agents: List[Union[Agent, Callable]], team_name: str = None ) -> str: """ - Fetches and formats information about all available agents in the system. + Fetch and format information about all available agents in the system. + + This function provides a comprehensive overview of all agents currently + available in the system, including their names, descriptions, roles, + models, and configuration details. It's useful for debugging, monitoring, + and system introspection. + + Args: + agents (List[Union[Agent, Callable]]): List of agent instances to get information about. + Each agent should have agent_name, agent_description, + role, model_name, and max_loops attributes + team_name (str, optional): Optional team name to include in the output header. + If None, uses a generic header Returns: - str: A formatted string containing names and descriptions of all swarms. + str: A formatted string containing detailed information about all agents. + Returns "No agents currently available in the system." if the list is empty. + + Note: + - Each agent is expected to have the following attributes: + - agent_name: The name of the agent + - agent_description: A description of the agent's purpose + - role: The role or function of the agent + - model_name: The AI model used by the agent + - max_loops: The maximum number of loops the agent can execute + - The output is formatted for human readability with clear section headers + - Team name is included in the header if provided + + Example: + >>> agents = [agent1, agent2, agent3] + >>> info = get_agents_info(agents, team_name="Data Team") + >>> print(info) + Available Agents for Team: Data Team + + [Agent 1] + Name: Data Analyzer + Description: Analyzes data patterns + Role: Analyst + Model: gpt-4 + Max Loops: 10 + ... """ if not agents: return "No agents currently available in the system." diff --git a/uvloop_example.py b/uvloop_example.py new file mode 100644 index 00000000..6714f89c --- /dev/null +++ b/uvloop_example.py @@ -0,0 +1,30 @@ +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import ( + run_agents_concurrently_uvloop, +) + + +def create_example_agents(num_agents: int = 3): + """Create example agents for demonstration.""" + agents = [] + for i in range(num_agents): + agent = Agent( + agent_name=f"Agent_{i+1}", + system_prompt=f"You are Agent {i+1}, a helpful AI assistant.", + model_name="gpt-4o-mini", # Using a lightweight model for examples + max_loops=1, + autosave=False, + verbose=False, + ) + agents.append(agent) + return agents + + +agents = create_example_agents(3) + +task = "Write a one-sentence summary about artificial intelligence." + + +results = run_agents_concurrently_uvloop(agents, task) + +print(results)