Update multi_agent_exec.py

pull/1116/head
CI-DEV 5 days ago committed by GitHub
parent 569aa2dce4
commit 8194b0fb63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,6 +7,7 @@ from concurrent.futures import (
from typing import Any, Callable, List, Optional, Union from typing import Any, Callable, List, Optional, Union
import uvloop import uvloop
import sys
from loguru import logger from loguru import logger
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
@ -210,17 +211,16 @@ def run_agents_with_different_tasks(
return results return results
def run_agents_concurrently_uvloop( def run_agents_concurrently_optimized(
agents: List[AgentType], agents: List[AgentType],
task: str, task: str,
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents concurrently using uvloop for optimized async performance. Run multiple agents concurrently using optimized async performance.
uvloop is a fast, drop-in replacement for asyncio's event loop, implemented in Cython. Uses uvloop on Linux/macOS and winloop on Windows for enhanced performance.
It's designed to be significantly faster than the standard asyncio event loop, Falls back to standard asyncio if optimized event loops are not available.
especially beneficial for I/O-bound tasks and concurrent operations.
Args: Args:
agents: List of Agent instances to run concurrently agents: List of Agent instances to run concurrently
@ -231,21 +231,40 @@ def run_agents_concurrently_uvloop(
List of outputs from each agent List of outputs from each agent
Raises: Raises:
ImportError: If uvloop is not installed ImportError: If neither uvloop nor winloop is available
RuntimeError: If uvloop cannot be set as the event loop policy RuntimeError: If event loop policy cannot be set
""" """
try: # Platform-specific event loop policy setup
# Set uvloop as the default event loop policy for better performance if sys.platform in ('win32', 'cygwin'):
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # Windows: Try to use winloop
except ImportError: try:
logger.warning( import winloop
"uvloop not available, falling back to standard asyncio. " asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
"Install uvloop with: pip install uvloop" logger.info("Using winloop for enhanced Windows performance")
) except ImportError:
except RuntimeError as e: logger.warning(
logger.warning( "winloop not available, falling back to standard asyncio. "
f"Could not set uvloop policy: {e}. Using default asyncio." "Install winloop with: pip install winloop"
) )
except RuntimeError as e:
logger.warning(
f"Could not set winloop policy: {e}. Using default asyncio."
)
else:
# Linux/macOS: Try to use uvloop
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for enhanced Unix performance")
except ImportError:
logger.warning(
"uvloop not available, falling back to standard asyncio. "
"Install uvloop with: pip install uvloop"
)
except RuntimeError as e:
logger.warning(
f"Could not set uvloop policy: {e}. Using default asyncio."
)
if max_workers is None: if max_workers is None:
# Use 95% of available CPU cores for optimal performance # Use 95% of available CPU cores for optimal performance
@ -305,16 +324,16 @@ def run_agents_concurrently_uvloop(
raise raise
def run_agents_with_tasks_uvloop( def run_agents_with_tasks_optimized(
agents: List[AgentType], agents: List[AgentType],
tasks: List[str], tasks: List[str],
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents with different tasks concurrently using uvloop. Run multiple agents with different tasks concurrently using optimized performance.
This function pairs each agent with a specific task and runs them concurrently This function pairs each agent with a specific task and runs them concurrently
using uvloop for optimized performance. using uvloop on Linux/macOS and winloop on Windows for optimized performance.
Args: Args:
agents: List of Agent instances to run agents: List of Agent instances to run
@ -332,25 +351,44 @@ def run_agents_with_tasks_uvloop(
f"Number of agents ({len(agents)}) must match number of tasks ({len(tasks)})" f"Number of agents ({len(agents)}) must match number of tasks ({len(tasks)})"
) )
try: # Platform-specific event loop policy setup
# Set uvloop as the default event loop policy if sys.platform in ('win32', 'cygwin'):
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # Windows: Try to use winloop
except ImportError: try:
logger.warning( import winloop
"uvloop not available, falling back to standard asyncio. " asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
"Install uvloop with: pip install uvloop" logger.info("Using winloop for enhanced Windows performance")
) except ImportError:
except RuntimeError as e: logger.warning(
logger.warning( "winloop not available, falling back to standard asyncio. "
f"Could not set uvloop policy: {e}. Using default asyncio." "Install winloop with: pip install winloop"
) )
except RuntimeError as e:
logger.warning(
f"Could not set winloop policy: {e}. Using default asyncio."
)
else:
# Linux/macOS: Try to use uvloop
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for enhanced Unix performance")
except ImportError:
logger.warning(
"uvloop not available, falling back to standard asyncio. "
"Install uvloop with: pip install uvloop"
)
except RuntimeError as e:
logger.warning(
f"Could not set uvloop policy: {e}. Using default asyncio."
)
if max_workers is None: if max_workers is None:
num_cores = os.cpu_count() num_cores = os.cpu_count()
max_workers = int(num_cores * 0.95) if num_cores else 1 max_workers = int(num_cores * 0.95) if num_cores else 1
logger.inufo( logger.info(
f"Running {len(agents)} agents with {len(tasks)} tasks using uvloop (max_workers: {max_workers})" f"Running {len(agents)} agents with {len(tasks)} tasks using optimized event loop (max_workers: {max_workers})"
) )
async def run_agents_with_tasks_async(): async def run_agents_with_tasks_async():

Loading…
Cancel
Save