[DOCS][Update][ Multi agent exec utilites]

pull/1127/merge
Kye Gomez 3 days ago
parent e3767bb449
commit 7fe7ebdc46

@ -9,12 +9,12 @@ This comprehensive documentation covers all functions in the `multi_agent_exec.p
| `run_single_agent` | `run_single_agent(agent, task, *args, **kwargs) -> Any` | Single Agent | Runs a single agent synchronously |
| `run_agent_async` | `run_agent_async(agent, task) -> Any` | Single Agent | Runs a single agent asynchronously using asyncio |
| `run_agents_concurrently_async` | `run_agents_concurrently_async(agents, task) -> List[Any]` | Concurrent Execution | Runs multiple agents concurrently using asyncio |
| `run_agents_concurrently` | `run_agents_concurrently(agents, task, max_workers=None) -> List[Any]` | Concurrent Execution | Optimized concurrent agent runner using ThreadPoolExecutor |
| `run_agents_concurrently` | `run_agents_concurrently(agents, task, img=None, max_workers=None, return_agent_output_dict=False) -> Union[List[Any], Dict[str, Any]]` | Concurrent Execution | Optimized concurrent agent runner with image support and flexible output formats |
| `run_agents_concurrently_multiprocess` | `run_agents_concurrently_multiprocess(agents, task, batch_size=None) -> List[Any]` | Concurrent Execution | Manages agents concurrently in batches with optimized performance |
| `batched_grid_agent_execution` | `batched_grid_agent_execution(agents, tasks, max_workers=None) -> List[Any]` | Batched & Grid | Runs multiple agents with different tasks concurrently |
| `run_agents_with_different_tasks` | `run_agents_with_different_tasks(agent_task_pairs, batch_size=10, max_workers=None) -> List[Any]` | Batched & Grid | Runs agents with different tasks concurrently in batches |
| `run_agents_concurrently_uvloop` | `run_agents_concurrently_uvloop(agents, task, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents concurrently using uvloop for optimized performance |
| `run_agents_with_tasks_uvloop` | `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents with different tasks using uvloop optimization |
| `run_agents_concurrently_uvloop` | `run_agents_concurrently_uvloop(agents, task, max_workers=None) -> List[Any]` | Platform Optimized | Runs agents concurrently using uvloop (Unix) or winloop (Windows) for optimized performance |
| `run_agents_with_tasks_uvloop` | `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None) -> List[Any]` | Platform Optimized | Runs agents with different tasks using platform-specific optimizations |
| `get_swarms_info` | `get_swarms_info(swarms) -> str` | Utility | Fetches and formats information about available swarms |
| `get_agents_info` | `get_agents_info(agents, team_name=None) -> str` | Utility | Fetches and formats information about available agents |
@ -157,30 +157,36 @@ async def main():
asyncio.run(main())
```
### `run_agents_concurrently(agents, task, max_workers=None)`
### `run_agents_concurrently(agents, task, img=None, max_workers=None, return_agent_output_dict=False)`
Optimized concurrent agent runner using ThreadPoolExecutor.
Optimized concurrent agent runner using ThreadPoolExecutor with image support and flexible output formats.
#### Signature
```python
def run_agents_concurrently(
agents: List[AgentType],
task: str,
img: Optional[str] = None,
max_workers: Optional[int] = None,
) -> List[Any]
return_agent_output_dict: bool = False,
) -> Union[List[Any], Dict[str, Any]]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|-------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently |
| `task` | `str` | Yes | - | Task string to execute |
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor |
| Parameter | Type | Required | Default | Description |
|----------------------------|-------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently |
| `task` | `str` | Yes | - | Task string to execute |
| `img` | `Optional[str]` | No | None | Optional image data to pass to agent run() if supported |
| `max_workers` | `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor |
| `return_agent_output_dict` | `bool` | No | False | If True, returns a dict mapping agent names to outputs |
#### Returns
- `List[Any]`: List of outputs from each agent (exceptions included if agents fail)
- `Union[List[Any], Dict[str, Any]]`:
- If `return_agent_output_dict=False`: List of outputs from each agent in completion order (exceptions included if agents fail)
- If `return_agent_output_dict=True`: Dictionary mapping agent names to outputs, preserving agent input order
#### Example
```python
@ -210,10 +216,22 @@ agents = [
]
task = "Analyze the future of electric vehicles in 2025"
results = run_agents_concurrently(agents, task, max_workers=4)
# Example 1: Basic concurrent execution
results = run_agents_concurrently(agents, task, max_workers=4)
for i, result in enumerate(results):
print(f"Agent {i+1} ({agents[i].agent_name}): {result}")
# Example 2: With image support (if agents support it)
# image_data = "base64_encoded_image_string"
# results_with_img = run_agents_concurrently(agents, task, img=image_data)
# Example 3: Return results as dictionary with agent names as keys
results_dict = run_agents_concurrently(
agents, task, return_agent_output_dict=True
)
for agent_name, result in results_dict.items():
print(f"{agent_name}: {result}")
```
### `run_agents_concurrently_multiprocess(agents, task, batch_size=None)`
@ -398,11 +416,16 @@ for i, result in enumerate(results):
print(f"{agent.agent_name} - {task}: {result}")
```
## UVLoop Optimized Functions
## Platform Optimized Functions
### `run_agents_concurrently_uvloop(agents, task, max_workers=None)`
Runs multiple agents concurrently using uvloop for optimized async performance.
Runs multiple agents concurrently using platform-specific optimized event loops for enhanced performance.
This function automatically selects the best available event loop implementation for your platform:
- **Unix/Linux/macOS**: Uses `uvloop` for significantly improved async performance
- **Windows**: Uses `winloop` for optimized Windows performance
- **Fallback**: Gracefully falls back to standard asyncio if optimized loops are unavailable
#### Signature
```python
@ -422,18 +445,22 @@ def run_agents_concurrently_uvloop(
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor |
#### Returns
- `List[Any]`: List of outputs from each agent
- `List[Any]`: List of outputs from each agent. If an agent fails, the exception is included in the results.
#### Raises
- `ImportError`: If uvloop is not installed
- `RuntimeError`: If uvloop cannot be set as the event loop policy
- `ImportError`: If neither uvloop nor winloop is available (falls back to standard asyncio)
- `RuntimeError`: If event loop policy cannot be set (falls back to standard asyncio)
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop
# Note: uvloop must be installed (pip install uvloop)
# Note: Platform-specific optimizations are automatically selected
# - Unix/Linux/macOS: Install uvloop with 'pip install uvloop'
# - Windows: Install winloop with 'pip install winloop'
# - Falls back to standard asyncio if neither is available
agents = [
Agent(
agent_name="Performance-Analyst",
@ -447,12 +474,17 @@ agents = [
task = "Analyze system performance metrics"
results = run_agents_concurrently_uvloop(agents, task)
print(f"Processed {len(results)} agents with uvloop optimization")
print(f"Processed {len(results)} agents with platform-optimized event loop")
```
### `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None)`
Runs multiple agents with different tasks concurrently using uvloop.
Runs multiple agents with different tasks concurrently using platform-specific optimized event loops.
This function automatically selects the best available event loop implementation for your platform:
- **Unix/Linux/macOS**: Uses `uvloop` for significantly improved async performance
- **Windows**: Uses `winloop` for optimized Windows performance
- **Fallback**: Gracefully falls back to standard asyncio if optimized loops are unavailable
#### Signature
```python
@ -472,21 +504,23 @@ def run_agents_with_tasks_uvloop(
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads |
#### Returns
- `List[Any]`: List of outputs from each agent
- `List[Any]`: List of outputs from each agent in the same order as input agents. If an agent fails, the exception is included in the results.
#### Raises
- `ValueError`: If number of agents doesn't match number of tasks
- `ImportError`: If uvloop is not installed
- `RuntimeError`: If uvloop cannot be set as the event loop policy
- `ImportError`: If neither uvloop nor winloop is available (falls back to standard asyncio)
- `RuntimeError`: If event loop policy cannot be set (falls back to standard asyncio)
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_with_tasks_uvloop
# Note: Platform-specific optimizations are automatically selected
# - Unix/Linux/macOS: Install uvloop with 'pip install uvloop'
# - Windows: Install winloop with 'pip install winloop'
# - Falls back to standard asyncio if neither is available
agents = [
Agent(
agent_name="Data-Analyst-1",
@ -510,8 +544,11 @@ tasks = [
results = run_agents_with_tasks_uvloop(agents, tasks)
for i, result in enumerate(results):
print(f"Task: {tasks[i]}")
print(f"Result: {result}\n")
if isinstance(result, Exception):
print(f"Agent {i+1} with {tasks[i]} failed: {result}")
else:
print(f"Task: {tasks[i]}")
print(f"Result: {result}\n")
```
## Utility Functions
@ -673,7 +710,7 @@ print("=== Agent Information ===")
print(get_agents_info(agents, "Business Intelligence Team"))
print("\n" + "="*50 + "\n")
# Example 1: Same task for all agents
# Example 1: Same task for all agents (basic concurrent execution)
print("=== Example 1: Concurrent Execution with Same Task ===")
task = "Analyze the impact of remote work trends on commercial real estate market in 2024"
results = run_agents_concurrently(agents, task, max_workers=4)
@ -682,6 +719,20 @@ for i, result in enumerate(results):
print(f"\n{agents[i].agent_name} Analysis:")
print(f"Result: {result}")
# Example 1b: Same task with dictionary output format
print("\n=== Example 1b: Dictionary Output Format ===")
results_dict = run_agents_concurrently(
agents, task, return_agent_output_dict=True, max_workers=4
)
for agent_name, result in results_dict.items():
print(f"\n{agent_name} Analysis:")
print(f"Result: {result}")
# Example 1c: With image support (if agents support it)
print("\n=== Example 1c: With Image Support ===")
# image_data = "base64_encoded_image_string" # Uncomment if you have image data
# results_with_img = run_agents_concurrently(agents, task, img=image_data, max_workers=4)
print("\n" + "="*50 + "\n")
# Example 2: Different tasks for different agents
@ -721,6 +772,68 @@ for i, result in enumerate(grid_results):
print("\n=== Workflow Complete ===")
```
### Platform-Optimized Execution Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
run_agents_with_tasks_uvloop
)
# Create agents for high-performance execution
agents = [
Agent(
agent_name="High-Perf-Analyst-1",
system_prompt="You are a high-performance data analyst",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="High-Perf-Analyst-2",
system_prompt="You are a high-performance data analyst",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="High-Perf-Analyst-3",
system_prompt="You are a high-performance data analyst",
model_name="gpt-4o-mini",
max_loops=1
)
]
# Example 1: Platform-optimized concurrent execution
print("=== Platform-Optimized Concurrent Execution ===")
task = "Perform high-speed data analysis on market trends"
results = run_agents_concurrently_uvloop(agents, task)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Agent {i+1} failed: {result}")
else:
print(f"Agent {i+1} result: {result}")
# Example 2: Platform-optimized execution with different tasks
print("\n=== Platform-Optimized Different Tasks ===")
tasks = [
"Analyze Q1 financial performance",
"Evaluate market volatility patterns",
"Assess competitive landscape changes"
]
results = run_agents_with_tasks_uvloop(agents, tasks)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Agent {i+1} with {tasks[i]} failed: {result}")
else:
print(f"Task: {tasks[i]}")
print(f"Result: {result}\n")
print("=== Platform-Optimized Execution Complete ===")
```
### Error Handling and Best Practices
```python
@ -762,15 +875,21 @@ except Exception as e:
# 2. Use appropriate max_workers based on system resources
# 3. Monitor memory usage for large agent counts
# 4. Consider batch processing for very large numbers of agents
# 5. Use uvloop functions for I/O intensive tasks
# 5. Use platform-optimized functions (uvloop/winloop) for I/O intensive tasks
# 6. Use return_agent_output_dict=True for structured, named results
# 7. Pass image data to agents that support multimodal processing
# 8. Leverage platform-specific optimizations automatically
```
## Performance Considerations
| Technique | Best Use Case / Description |
|------------------------|------------------------------------------------------------------------------------|
| **ThreadPoolExecutor** | Best for CPU-bound tasks with moderate I/O |
| **uvloop** | Optimized for I/O-bound tasks, significantly faster than standard asyncio |
| **Batch Processing** | Prevents system overload with large numbers of agents |
| **Resource Monitoring**| Adjust worker counts based on system capabilities |
| **Async/Await** | Use async functions for better concurrency control |
| **ThreadPoolExecutor** | Best for CPU-bound tasks with moderate I/O, supports image processing and flexible output formats |
| **Platform-Specific Event Loops** | **uvloop** (Unix/Linux/macOS) and **winloop** (Windows) for significantly improved async performance |
| **Batch Processing** | Prevents system overload with large numbers of agents, maintains order with grid execution |
| **Resource Monitoring**| Adjust worker counts based on system capabilities (defaults to 95% of CPU cores) |
| **Async/Await** | Use async functions for better concurrency control and platform optimizations |
| **Image Support** | Pass image data to agents that support multimodal processing for enhanced capabilities |
| **Dictionary Output** | Use `return_agent_output_dict=True` for structured results with agent name mapping |
| **Error Handling** | All functions include comprehensive exception handling with graceful fallbacks |

@ -0,0 +1,34 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
)
# Initialize the equity analyst agents
equity_analyst_1 = Agent(
agent_name="Equity-Analyst-1",
agent_description="Equity research analyst focused on fundamental analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
equity_analyst_2 = Agent(
agent_name="Equity-Analyst-2",
agent_description="Equity research analyst focused on technical analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
outputs = run_agents_concurrently_uvloop(
agents=[equity_analyst_1, equity_analyst_2],
task="Analyze high growth tech stocks focusing on fundamentals like revenue growth, margins, and market position. Create a detailed analysis table in markdown.",
)
print(outputs)

@ -1,15 +1,4 @@
"""
Enhanced Collaborative InteractiveGroupChat Example
This example demonstrates the improved collaborative behavior where agents:
1. Read and understand all previous responses
2. Acknowledge what other agents have said
3. Build upon their insights rather than repeating information
4. Synthesize multiple perspectives
5. Delegate appropriately using @mentions
The enhanced prompts ensure agents work as a true collaborative team.
"""
from swarms import Agent
from swarms.structs.interactive_groupchat import (

@ -91,6 +91,4 @@ social_alg = SocialAlgorithms(
verbose=True,
)
result = social_alg.run(
"Innovative solutions for climate change"
)
result = social_alg.run("Innovative solutions for climate change")

@ -37,7 +37,7 @@ def competitive_evaluation_algorithm(
] = result
# Judge evaluates all solutions
evaluation_prompt = f"Evaluate these solutions and rank them:\n\n"
evaluation_prompt = "Evaluate these solutions and rank them:\n\n"
for name, result in competitor_results.items():
evaluation_prompt += f"{name}:\n{result}\n\n"
@ -84,4 +84,4 @@ social_alg = SocialAlgorithms(
result = social_alg.run(
"Design the most efficient algorithm for sorting large datasets"
)
)

@ -85,4 +85,4 @@ social_alg = SocialAlgorithms(
result = social_alg.run(
"Develop a comprehensive marketing strategy for a new product launch"
)
)

@ -78,6 +78,4 @@ social_alg = SocialAlgorithms(
verbose=True,
)
result = social_alg.run(
"Design a sustainable city planning strategy"
)
result = social_alg.run("Design a sustainable city planning strategy")

@ -78,4 +78,4 @@ social_alg = SocialAlgorithms(
result = social_alg.run(
"The future of artificial intelligence in healthcare"
)
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "8.4.2"
version = "8.5.0"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@swarms.world>"]

@ -475,13 +475,17 @@ class SocialAlgorithms:
"algorithm_name": self.name,
"task": task,
"agent_count": len(self.agents),
"agent_names": [agent.agent_name for agent in self.agents],
"agent_names": [
agent.agent_name for agent in self.agents
],
"max_execution_time": self.max_execution_time,
}
},
)
# Clear previous communication history
self._log_execution_step("Clearing previous communication history")
self._log_execution_step(
"Clearing previous communication history"
)
self.clear_communication_history()
# Prepare algorithm arguments
@ -490,18 +494,22 @@ class SocialAlgorithms:
{
"algorithm_args": algorithm_args,
"additional_kwargs": kwargs,
}
},
)
algorithm_kwargs = algorithm_args or {}
algorithm_kwargs.update(kwargs)
# Add communication logging wrapper if enabled
if self.enable_communication_logging:
self._log_execution_step("Wrapping algorithm with communication logging")
self._log_execution_step(
"Wrapping algorithm with communication logging"
)
wrapped_algorithm = self._wrap_algorithm_with_logging()
wrapped_algorithm.social_algorithms_instance = self
else:
self._log_execution_step("Using algorithm without communication logging")
self._log_execution_step(
"Using algorithm without communication logging"
)
wrapped_algorithm = self.social_algorithm
start_time = time.time()
@ -513,7 +521,7 @@ class SocialAlgorithms:
if self.max_execution_time > 0:
self._log_execution_step(
"Executing algorithm with timeout",
{"timeout_seconds": self.max_execution_time}
{"timeout_seconds": self.max_execution_time},
)
result = self._execute_with_timeout(
wrapped_algorithm,
@ -522,7 +530,9 @@ class SocialAlgorithms:
**algorithm_kwargs,
)
else:
self._log_execution_step("Executing algorithm without timeout")
self._log_execution_step(
"Executing algorithm without timeout"
)
result = wrapped_algorithm(
self.agents, task, **algorithm_kwargs
)
@ -532,22 +542,24 @@ class SocialAlgorithms:
"Algorithm execution completed successfully",
{
"successful_steps": successful_steps,
"communication_steps": len(self.communication_history),
}
"communication_steps": len(
self.communication_history
),
},
)
except TimeoutError:
self._log_execution_step(
"Algorithm execution timed out",
{"timeout_seconds": self.max_execution_time},
level="error"
level="error",
)
raise
except Exception as e:
self._log_execution_step(
"Algorithm execution failed",
{"error": str(e), "error_type": type(e).__name__},
level="error"
level="error",
)
failed_steps = 1
raise
@ -556,8 +568,7 @@ class SocialAlgorithms:
# Format the output
self._log_execution_step(
"Formatting output",
{"output_type": self.output_type}
"Formatting output", {"output_type": self.output_type}
)
formatted_result = self._format_output(result)
@ -578,10 +589,12 @@ class SocialAlgorithms:
"Algorithm execution completed",
{
"execution_time": f"{execution_time:.2f} seconds",
"total_communication_steps": len(self.communication_history),
"total_communication_steps": len(
self.communication_history
),
"successful_steps": successful_steps,
"failed_steps": failed_steps,
}
},
)
return algorithm_result

Loading…
Cancel
Save