[example][uvloop] [fix aop double init]

pull/1117/merge
Kye Gomez 5 days ago
parent 2bebf31f54
commit 458b4921d2

@ -1,122 +0,0 @@
"""
Example demonstrating the use of uvloop for running multiple agents concurrently.
This example shows how to use the new uvloop-based functions:
- run_agents_concurrently_uvloop: For running multiple agents with the same task
- run_agents_with_tasks_uvloop: For running agents with different tasks
uvloop provides significant performance improvements over standard asyncio,
especially for I/O-bound operations and concurrent task execution.
"""
import os
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
run_agents_with_tasks_uvloop,
)
from swarms.structs.agent import Agent
def create_example_agents(num_agents: int = 3):
"""Create example agents for demonstration."""
agents = []
for i in range(num_agents):
agent = Agent(
agent_name=f"Agent_{i+1}",
system_prompt=f"You are Agent {i+1}, a helpful AI assistant.",
model_name="gpt-4o-mini", # Using a lightweight model for examples
max_loops=1,
autosave=False,
verbose=False,
)
agents.append(agent)
return agents
def example_same_task():
"""Example: Running multiple agents with the same task using uvloop."""
print("=== Example 1: Same Task for All Agents (uvloop) ===")
agents = create_example_agents(3)
task = (
"Write a one-sentence summary about artificial intelligence."
)
print(f"Running {len(agents)} agents with the same task...")
print(f"Task: {task}")
try:
results = run_agents_concurrently_uvloop(agents, task)
print("\nResults:")
for i, result in enumerate(results, 1):
print(f"Agent {i}: {result}")
except Exception as e:
print(f"Error: {e}")
def example_different_tasks():
"""Example: Running agents with different tasks using uvloop."""
print(
"\n=== Example 2: Different Tasks for Each Agent (uvloop) ==="
)
agents = create_example_agents(3)
tasks = [
"Explain what machine learning is in simple terms.",
"Describe the benefits of cloud computing.",
"What are the main challenges in natural language processing?",
]
print(f"Running {len(agents)} agents with different tasks...")
try:
results = run_agents_with_tasks_uvloop(agents, tasks)
print("\nResults:")
for i, (result, task) in enumerate(zip(results, tasks), 1):
print(f"Agent {i} (Task: {task[:50]}...):")
print(f" Response: {result}")
print()
except Exception as e:
print(f"Error: {e}")
def performance_comparison():
"""Demonstrate the performance benefit of uvloop vs standard asyncio."""
print("\n=== Performance Comparison ===")
# Note: This is a conceptual example. In practice, you'd need to measure actual performance
print("uvloop vs Standard asyncio:")
print("• uvloop: Cython-based event loop, ~2-4x faster")
print("• Better for I/O-bound operations")
print("• Lower latency and higher throughput")
print("• Especially beneficial for concurrent agent execution")
print("• Automatic fallback to asyncio if uvloop unavailable")
if __name__ == "__main__":
# Check if API key is available
if not os.getenv("OPENAI_API_KEY"):
print(
"Please set your OPENAI_API_KEY environment variable to run this example."
)
print("Example: export OPENAI_API_KEY='your-api-key-here'")
exit(1)
print("🚀 uvloop Multi-Agent Execution Examples")
print("=" * 50)
# Run examples
example_same_task()
example_different_tasks()
performance_comparison()
print("\n✅ Examples completed!")
print("\nTo use uvloop functions in your code:")
print(
"from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop"
)
print("results = run_agents_concurrently_uvloop(agents, task)")

@ -26,3 +26,4 @@ numpy
orjson
schedule
uvloop
winloop

@ -101,7 +101,6 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.aop import AOP
__all__ = [
"Agent",

@ -238,7 +238,7 @@ class HeavySwarm:
- **Multi-loop Execution**: The max_loops parameter enables iterative
refinement where each subsequent loop builds upon the context and
results from previous loops
S **Iterative Refinement**: Each loop can refine, improve, or complete
S **Iterative Refinement**: Each loop can refine, improve, or complete
aspects of the analysis based on previous results
Attributes:

@ -1,13 +1,12 @@
import asyncio
import concurrent.futures
import os
import sys
from concurrent.futures import (
ThreadPoolExecutor,
)
from typing import Any, Callable, List, Optional, Union
import uvloop
import sys
from loguru import logger
from swarms.structs.agent import Agent
@ -17,20 +16,50 @@ from swarms.structs.omni_agent_types import AgentType
def run_single_agent(
agent: AgentType, task: str, *args, **kwargs
) -> Any:
"""Run a single agent synchronously"""
"""
Run a single agent synchronously with the given task.
This function provides a synchronous wrapper for executing a single agent
with a specific task. It passes through any additional arguments and
keyword arguments to the agent's run method.
Args:
agent (AgentType): The agent instance to execute
task (str): The task string to be executed by the agent
*args: Variable length argument list passed to agent.run()
**kwargs: Arbitrary keyword arguments passed to agent.run()
Returns:
Any: The result returned by the agent's run method
Example:
>>> agent = SomeAgent()
>>> result = run_single_agent(agent, "Analyze this data")
>>> print(result)
"""
return agent.run(task=task, *args, **kwargs)
async def run_agent_async(agent: AgentType, task: str) -> Any:
"""
Run an agent asynchronously.
Run an agent asynchronously using asyncio event loop.
This function executes a single agent asynchronously by running it in a
thread executor to avoid blocking the event loop. It's designed to be
used within async contexts for concurrent execution.
Args:
agent: Agent instance to run
task: Task string to execute
agent (AgentType): The agent instance to execute asynchronously
task (str): The task string to be executed by the agent
Returns:
Agent execution result
Any: The result returned by the agent's run method
Example:
>>> async def main():
... agent = SomeAgent()
... result = await run_agent_async(agent, "Process data")
... return result
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
@ -42,14 +71,25 @@ async def run_agents_concurrently_async(
agents: List[AgentType], task: str
) -> List[Any]:
"""
Run multiple agents concurrently using asyncio.
Run multiple agents concurrently using asyncio gather.
This function executes multiple agents concurrently using asyncio.gather(),
which runs all agents in parallel and waits for all to complete. Each agent
runs the same task asynchronously.
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute
agents (List[AgentType]): List of agent instances to run concurrently
task (str): The task string to be executed by all agents
Returns:
List of outputs from each agent
List[Any]: List of results from each agent in the same order as input
Example:
>>> async def main():
... agents = [Agent1(), Agent2(), Agent3()]
... results = await run_agents_concurrently_async(agents, "Analyze data")
... for i, result in enumerate(results):
... print(f"Agent {i+1} result: {result}")
"""
results = await asyncio.gather(
*(run_agent_async(agent, task) for agent in agents)
@ -63,15 +103,35 @@ def run_agents_concurrently(
max_workers: Optional[int] = None,
) -> List[Any]:
"""
Optimized concurrent agent runner using ThreadPoolExecutor.
Run multiple agents concurrently using ThreadPoolExecutor for optimal performance.
This function executes multiple agents concurrently using a thread pool executor,
which provides better performance than asyncio for CPU-bound tasks. It automatically
determines the optimal number of worker threads based on available CPU cores.
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute
max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores)
agents (List[AgentType]): List of agent instances to run concurrently
task (str): The task string to be executed by all agents
max_workers (Optional[int]): Maximum number of threads in the executor.
Defaults to 95% of available CPU cores for optimal performance
Returns:
List of outputs from each agent
List[Any]: List of results from each agent. If an agent fails, the exception
is included in the results list instead of the result.
Note:
- Uses 95% of CPU cores by default for optimal resource utilization
- Handles exceptions gracefully by including them in the results
- Results may not be in the same order as input agents due to concurrent execution
Example:
>>> agents = [Agent1(), Agent2(), Agent3()]
>>> results = run_agents_concurrently(agents, "Process data")
>>> for i, result in enumerate(results):
... if isinstance(result, Exception):
... print(f"Agent {i+1} failed: {result}")
... else:
... print(f"Agent {i+1} result: {result}")
"""
if max_workers is None:
# 95% of the available CPU cores
@ -104,16 +164,30 @@ def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = os.cpu_count()
) -> List[Any]:
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
Run multiple agents concurrently in batches using asyncio for optimized performance.
This function processes agents in batches to avoid overwhelming system resources
while still achieving high concurrency. It uses asyncio internally to manage
the concurrent execution of agent batches.
Args:
agents (List[Agent]): List of Agent instances to run concurrently.
task (str): The task string to execute by all agents.
agents (List[Agent]): List of Agent instances to run concurrently
task (str): The task string to be executed by all agents
batch_size (int, optional): Number of agents to run in parallel in each batch.
Defaults to the number of CPU cores.
Defaults to the number of CPU cores for optimal resource usage
Returns:
List[Any]: A list of outputs from each agent.
List[Any]: List of results from each agent, maintaining the order of input agents
Note:
- Processes agents in batches to prevent resource exhaustion
- Uses asyncio for efficient concurrent execution within batches
- Results are returned in the same order as input agents
Example:
>>> agents = [Agent1(), Agent2(), Agent3(), Agent4(), Agent5()]
>>> results = run_agents_concurrently_multiprocess(agents, "Analyze data", batch_size=2)
>>> print(f"Processed {len(results)} agents")
"""
results = []
loop = asyncio.get_event_loop()
@ -135,15 +209,36 @@ def batched_grid_agent_execution(
max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents with different tasks concurrently.
Run multiple agents with different tasks concurrently using ThreadPoolExecutor.
This function pairs each agent with a specific task and executes them concurrently.
It's designed for scenarios where different agents need to work on different tasks
simultaneously, creating a grid-like execution pattern.
Args:
agents (List[AgentType]): List of agent instances.
tasks (List[str]): List of tasks, one for each agent.
max_workers (int, optional): Maximum number of threads to use. Defaults to 90% of CPU cores.
agents (List[AgentType]): List of agent instances to execute
tasks (List[str]): List of task strings, one for each agent. Must match the number of agents
max_workers (int, optional): Maximum number of threads to use.
Defaults to 90% of available CPU cores for optimal performance
Returns:
List[Any]: List of results from each agent.
List[Any]: List of results from each agent in the same order as input agents.
If an agent fails, the exception is included in the results.
Raises:
ValueError: If the number of agents doesn't match the number of tasks
Note:
- Uses 90% of CPU cores by default for optimal resource utilization
- Results maintain the same order as input agents
- Handles exceptions gracefully by including them in results
Example:
>>> agents = [Agent1(), Agent2(), Agent3()]
>>> tasks = ["Task A", "Task B", "Task C"]
>>> results = batched_grid_agent_execution(agents, tasks)
>>> for i, result in enumerate(results):
... print(f"Agent {i+1} with {tasks[i]}: {result}")
"""
logger.info(
f"Batch Grid Execution with {len(agents)} agents and number of tasks: {len(tasks)}"
@ -185,16 +280,34 @@ def run_agents_with_different_tasks(
"""
Run multiple agents with different tasks concurrently, processing them in batches.
This function executes each agent on its corresponding task, processing the agent-task pairs in batches
of size `batch_size` for efficient resource utilization.
This function executes each agent on its corresponding task, processing the agent-task pairs
in batches for efficient resource utilization. It's designed for scenarios where you have
a large number of agent-task pairs that need to be processed efficiently.
Args:
agent_task_pairs: List of (agent, task) tuples.
batch_size: Number of agents to run in parallel in each batch.
max_workers: Maximum number of threads.
agent_task_pairs (List[tuple[AgentType, str]]): List of (agent, task) tuples to execute.
Each tuple contains an agent instance and its task
batch_size (int, optional): Number of agent-task pairs to process in parallel in each batch.
Defaults to 10 for balanced resource usage
max_workers (int, optional): Maximum number of threads to use for each batch.
If None, uses the default from batched_grid_agent_execution
Returns:
List of outputs from each agent, in the same order as the input pairs.
List[Any]: List of outputs from each agent-task pair, maintaining the same order as input pairs.
If an agent fails, the exception is included in the results.
Note:
- Processes agent-task pairs in batches to prevent resource exhaustion
- Results maintain the same order as input pairs
- Handles exceptions gracefully by including them in results
- Uses batched_grid_agent_execution internally for each batch
Example:
>>> pairs = [(agent1, "Task A"), (agent2, "Task B"), (agent3, "Task C")]
>>> results = run_agents_with_different_tasks(pairs, batch_size=5)
>>> for i, result in enumerate(results):
... agent, task = pairs[i]
... print(f"Agent {agent.agent_name} with {task}: {result}")
"""
if not agent_task_pairs:
return []
@ -217,30 +330,52 @@ def run_agents_concurrently_uvloop(
max_workers: Optional[int] = None,
) -> List[Any]:
"""
Run multiple agents concurrently using optimized async performance.
Run multiple agents concurrently using optimized async performance with uvloop/winloop.
Uses uvloop on Linux/macOS and winloop on Windows for enhanced performance.
Falls back to standard asyncio if optimized event loops are not available.
This function provides high-performance concurrent execution of multiple agents using
optimized event loop implementations. It automatically selects the best available
event loop for the platform (uvloop on Unix systems, winloop on Windows).
Args:
agents: List of Agent instances to run concurrently
task: Task string to execute by all agents
max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores)
agents (List[AgentType]): List of agent instances to run concurrently
task (str): The task string to be executed by all agents
max_workers (Optional[int]): Maximum number of threads in the executor.
Defaults to 95% of available CPU cores for optimal performance
Returns:
List of outputs from each agent
List[Any]: List of results from each agent. If an agent fails, the exception
is included in the results list instead of the result.
Raises:
ImportError: If neither uvloop nor winloop is available
RuntimeError: If event loop policy cannot be set
ImportError: If neither uvloop nor winloop is available (falls back to standard asyncio)
RuntimeError: If event loop policy cannot be set (falls back to standard asyncio)
Note:
- Automatically uses uvloop on Linux/macOS and winloop on Windows
- Falls back gracefully to standard asyncio if optimized loops are unavailable
- Uses 95% of CPU cores by default for optimal resource utilization
- Handles exceptions gracefully by including them in results
- Results may not be in the same order as input agents due to concurrent execution
Example:
>>> agents = [Agent1(), Agent2(), Agent3()]
>>> results = run_agents_concurrently_uvloop(agents, "Process data")
>>> for i, result in enumerate(results):
... if isinstance(result, Exception):
... print(f"Agent {i+1} failed: {result}")
... else:
... print(f"Agent {i+1} result: {result}")
"""
# Platform-specific event loop policy setup
if sys.platform in ('win32', 'cygwin'):
if sys.platform in ("win32", "cygwin"):
# Windows: Try to use winloop
try:
import winloop
asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
logger.info("Using winloop for enhanced Windows performance")
logger.info(
"Using winloop for enhanced Windows performance"
)
except ImportError:
logger.warning(
"winloop not available, falling back to standard asyncio. "
@ -254,6 +389,7 @@ def run_agents_concurrently_uvloop(
# Linux/macOS: Try to use uvloop
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for enhanced Unix performance")
except ImportError:
@ -330,21 +466,42 @@ def run_agents_with_tasks_uvloop(
max_workers: Optional[int] = None,
) -> List[Any]:
"""
Run multiple agents with different tasks concurrently using optimized performance.
Run multiple agents with different tasks concurrently using optimized async performance.
This function pairs each agent with a specific task and runs them concurrently
using uvloop on Linux/macOS and winloop on Windows for optimized performance.
This function pairs each agent with a specific task and runs them concurrently using
optimized event loop implementations (uvloop on Unix systems, winloop on Windows).
It's designed for high-performance scenarios where different agents need to work
on different tasks simultaneously.
Args:
agents: List of Agent instances to run
tasks: List of task strings (must match number of agents)
max_workers: Maximum number of threads (defaults to 95% of CPU cores)
agents (List[AgentType]): List of agent instances to run
tasks (List[str]): List of task strings, one for each agent. Must match the number of agents
max_workers (Optional[int]): Maximum number of threads in the executor.
Defaults to 95% of available CPU cores for optimal performance
Returns:
List of outputs from each agent
List[Any]: List of results from each agent in the same order as input agents.
If an agent fails, the exception is included in the results.
Raises:
ValueError: If number of agents doesn't match number of tasks
ValueError: If the number of agents doesn't match the number of tasks
Note:
- Automatically uses uvloop on Linux/macOS and winloop on Windows
- Falls back gracefully to standard asyncio if optimized loops are unavailable
- Uses 95% of CPU cores by default for optimal resource utilization
- Results maintain the same order as input agents
- Handles exceptions gracefully by including them in results
Example:
>>> agents = [Agent1(), Agent2(), Agent3()]
>>> tasks = ["Task A", "Task B", "Task C"]
>>> results = run_agents_with_tasks_uvloop(agents, tasks)
>>> for i, result in enumerate(results):
... if isinstance(result, Exception):
... print(f"Agent {i+1} with {tasks[i]} failed: {result}")
... else:
... print(f"Agent {i+1} with {tasks[i]}: {result}")
"""
if len(agents) != len(tasks):
raise ValueError(
@ -352,12 +509,15 @@ def run_agents_with_tasks_uvloop(
)
# Platform-specific event loop policy setup
if sys.platform in ('win32', 'cygwin'):
if sys.platform in ("win32", "cygwin"):
# Windows: Try to use winloop
try:
import winloop
asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
logger.info("Using winloop for enhanced Windows performance")
logger.info(
"Using winloop for enhanced Windows performance"
)
except ImportError:
logger.warning(
"winloop not available, falling back to standard asyncio. "
@ -371,6 +531,7 @@ def run_agents_with_tasks_uvloop(
# Linux/macOS: Try to use uvloop
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for enhanced Unix performance")
except ImportError:
@ -445,10 +606,40 @@ def run_agents_with_tasks_uvloop(
def get_swarms_info(swarms: List[Callable]) -> str:
"""
Fetches and formats information about all available swarms in the system.
Fetch and format information about all available swarms in the system.
This function provides a comprehensive overview of all swarms currently
available in the system, including their names, descriptions, agent counts,
and swarm types. It's useful for debugging, monitoring, and system introspection.
Args:
swarms (List[Callable]): List of swarm instances to get information about.
Each swarm should have name, description, agents, and swarm_type attributes
Returns:
str: A formatted string containing names and descriptions of all swarms.
str: A formatted string containing detailed information about all swarms.
Returns "No swarms currently available in the system." if the list is empty.
Note:
- Each swarm is expected to have the following attributes:
- name: The name of the swarm
- description: A description of the swarm's purpose
- agents: A list of agents in the swarm
- swarm_type: The type/category of the swarm
- The output is formatted for human readability with clear section headers
Example:
>>> swarms = [swarm1, swarm2, swarm3]
>>> info = get_swarms_info(swarms)
>>> print(info)
Available Swarms:
[Swarm 1]
Name: Data Processing Swarm
Description: Handles data analysis tasks
Length of Agents: 5
Swarm Type: Analysis
...
"""
if not swarms:
return "No swarms currently available in the system."
@ -477,10 +668,47 @@ def get_agents_info(
agents: List[Union[Agent, Callable]], team_name: str = None
) -> str:
"""
Fetches and formats information about all available agents in the system.
Fetch and format information about all available agents in the system.
This function provides a comprehensive overview of all agents currently
available in the system, including their names, descriptions, roles,
models, and configuration details. It's useful for debugging, monitoring,
and system introspection.
Args:
agents (List[Union[Agent, Callable]]): List of agent instances to get information about.
Each agent should have agent_name, agent_description,
role, model_name, and max_loops attributes
team_name (str, optional): Optional team name to include in the output header.
If None, uses a generic header
Returns:
str: A formatted string containing names and descriptions of all swarms.
str: A formatted string containing detailed information about all agents.
Returns "No agents currently available in the system." if the list is empty.
Note:
- Each agent is expected to have the following attributes:
- agent_name: The name of the agent
- agent_description: A description of the agent's purpose
- role: The role or function of the agent
- model_name: The AI model used by the agent
- max_loops: The maximum number of loops the agent can execute
- The output is formatted for human readability with clear section headers
- Team name is included in the header if provided
Example:
>>> agents = [agent1, agent2, agent3]
>>> info = get_agents_info(agents, team_name="Data Team")
>>> print(info)
Available Agents for Team: Data Team
[Agent 1]
Name: Data Analyzer
Description: Analyzes data patterns
Role: Analyst
Model: gpt-4
Max Loops: 10
...
"""
if not agents:
return "No agents currently available in the system."

@ -0,0 +1,30 @@
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
)
def create_example_agents(num_agents: int = 3):
"""Create example agents for demonstration."""
agents = []
for i in range(num_agents):
agent = Agent(
agent_name=f"Agent_{i+1}",
system_prompt=f"You are Agent {i+1}, a helpful AI assistant.",
model_name="gpt-4o-mini", # Using a lightweight model for examples
max_loops=1,
autosave=False,
verbose=False,
)
agents.append(agent)
return agents
agents = create_example_agents(3)
task = "Write a one-sentence summary about artificial intelligence."
results = run_agents_concurrently_uvloop(agents, task)
print(results)
Loading…
Cancel
Save