[FEAT][New Multi-Agent Execution utilities][run_agents_concurrently_uvloop] [run_agents_with_tasks_uvloop] [Improvement][Improved the batched_grid_agent_execution to use concurrent futures] [Updated docs] [Agent][Fix batched run] [FEAT][CustomAgent] [New util examples]

pull/1072/head
Kye Gomez 1 month ago
parent 5ceef024ea
commit 28995b6900

1
.gitignore vendored

@ -16,6 +16,7 @@ databases
static/generated
conversations/
next_swarms_update.txt
infra.md
runs
Financial-Analysis-Agent_state.json
conversations/

@ -49826,32 +49826,6 @@ A dataclass for system resource metrics.
| memory_percent | float | Current memory usage percentage |
| active_threads | int | Number of active threads |
### run_agents_with_resource_monitoring()
Runs agents with system resource monitoring and adaptive batch sizing.
#### Arguments
| Parameter | Type | Required | Default | Description |
|------------------|----------------|----------|---------|-------------|
| agents | List[AgentType]| Yes | - | List of Agent instances |
| task | str | Yes | - | Task string to execute |
| cpu_threshold | float | No | 90.0 | Max CPU usage percentage |
| memory_threshold | float | No | 90.0 | Max memory usage percentage |
| check_interval | float | No | 1.0 | Resource check interval in seconds |
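A minimal call sketch for the arguments above; the import path and return handling are assumptions, and the threshold values are illustrative only.
```python
from swarms.structs.agent import Agent
# Import path assumed; adjust to wherever run_agents_with_resource_monitoring lives in your version.
from swarms.structs.multi_agent_exec import run_agents_with_resource_monitoring

agents = [
    Agent(agent_name=f"Analysis-Agent-{i}", model_name="gpt-4o-mini", max_loops=1)
    for i in range(4)
]

outputs = run_agents_with_resource_monitoring(
    agents=agents,
    task="Summarize the latest quarterly earnings reports",
    cpu_threshold=85.0,      # hold back new batches above 85% CPU
    memory_threshold=80.0,   # hold back new batches above 80% memory
    check_interval=0.5,      # poll system metrics every 0.5 seconds
)
```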
## Performance Considerations
- Default batch sizes and worker counts are optimized based on CPU cores
- Resource monitoring helps prevent system overload
- Using `uvloop` provides better performance than standard `asyncio`
## Error Handling
- Functions handle asyncio event loop creation/retrieval
- Timeout mechanism prevents infinite waiting
- Resource monitoring allows for adaptive performance adjustment
--------------------------------------------------
# File: swarms/structs/yaml_model.md

@ -282,7 +282,7 @@ nav:
- SpreadSheetSwarm: "swarms/structs/spreadsheet_swarm.md"
- ForestSwarm: "swarms/structs/forest_swarm.md"
- MALT: "swarms/structs/malt.md"
- Multi-Agent Execution Utilities: "swarms/structs/various_execution_methods.md"
- Deep Research Swarm: "swarms/structs/deep_research_swarm.md"
- Council of Judges: "swarms/structs/council_of_judges.md"
- Heavy Swarm: "swarms/structs/heavy_swarm.md"

@ -113,8 +113,7 @@ Internal method that runs the workflow without error handling.
### Basic Usage
```python
from swarms import Agent, BatchedGridWorkflow
# Initialize the ETF-focused agent
agent = Agent(

@ -1,705 +0,0 @@
# SwarmNetwork [WIP]
The `SwarmNetwork` class is a powerful tool for managing a pool of agents, orchestrating task distribution, and scaling resources based on workload. It is designed to handle tasks efficiently by dynamically adjusting the number of agents according to the current demand. This class also provides an optional API for interacting with the agent pool, making it accessible for integration with other systems.
### Key Features
- **Agent Pool Management**: Dynamically manage a pool of agents.
- **Task Queue Management**: Handle tasks through a queue system.
- **Agent Health Monitoring**: Monitor the health of agents.
- **Agent Pool Scaling**: Scale the agent pool up or down based on workload.
- **API**: Interact with the agent pool and task queue through a simple API.
- **Agent Deployment Options**: Run agents on threads, processes, containers, machines, or clusters.
### Parameters
| Parameter | Type | Default Value | Description |
|-----------------|--------------------|---------------|-----------------------------------------------------------------------------|
| name | str | None | The name of the swarm network. |
| description | str | None | A description of the swarm network. |
| agents | List[Agent] | None | A list of agents in the pool. |
| idle_threshold | float | 0.2 | The idle threshold for the agents. |
| busy_threshold | float | 0.7 | The busy threshold for the agents. |
| api_enabled | Optional[bool] | False | A flag to enable/disable the API. |
| logging_enabled | Optional[bool] | False | A flag to enable/disable logging. |
| api_on | Optional[bool] | False | A flag to enable/disable the FastAPI instance. |
| host | str | "0.0.0.0" | The host address for the FastAPI instance. |
| port | int | 8000 | The port number for the FastAPI instance. |
| swarm_callable | Optional[callable] | None | A callable to be executed by the swarm network. |
| *args | tuple | | Additional positional arguments. |
| **kwargs | dict | | Additional keyword arguments. |
### Attributes
| Attribute | Type | Description |
|------------------|--------------------|----------------------------------------------------------------|
| task_queue | queue.Queue | A queue for storing tasks. |
| idle_threshold | float | The idle threshold for the agents. |
| busy_threshold | float | The busy threshold for the agents. |
| agents | List[Agent] | A list of agents in the pool. |
| api_enabled | bool | A flag to enable/disable the API. |
| logging_enabled | bool | A flag to enable/disable logging. |
| host | str | The host address for the FastAPI instance. |
| port | int | The port number for the FastAPI instance. |
| swarm_callable | Optional[callable] | A callable to be executed by the swarm network. |
| agent_dict | dict | A dictionary of agents for easy access. |
| lock | threading.Lock | A lock for synchronizing access to shared resources. |
## Methods
#### Description
Initializes a new instance of the `SwarmNetwork` class.
#### Parameters
- `name` (str): The name of the swarm network.
- `description` (str): A description of the swarm network.
- `agents` (List[Agent]): A list of agents in the pool.
- `idle_threshold` (float): The idle threshold for the agents.
- `busy_threshold` (float): The busy threshold for the agents.
- `api_enabled` (Optional[bool]): A flag to enable/disable the API.
- `logging_enabled` (Optional[bool]): A flag to enable/disable logging.
- `api_on` (Optional[bool]): A flag to enable/disable the FastAPI instance.
- `host` (str): The host address for the FastAPI instance.
- `port` (int): The port number for the FastAPI instance.
- `swarm_callable` (Optional[callable]): A callable to be executed by the swarm network.
- `*args`: Additional positional arguments.
- `**kwargs`: Additional keyword arguments.
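A minimal construction sketch based on the parameters above; the values are illustrative, and the agent is built with the `model_name` shorthand used elsewhere in these docs.
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork

swarm = SwarmNetwork(
    name="research-network",
    description="A small pool of research agents",
    agents=[Agent(agent_name="Researcher", model_name="gpt-4o-mini", max_loops=1)],
    idle_threshold=0.2,
    busy_threshold=0.7,
    api_enabled=False,
    logging_enabled=True,
)
```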
### `add_task`
```python
def add_task(self, task)
```
#### Description
Adds a task to the task queue.
#### Parameters
- `task` (_type_): The task to be added to the queue.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
agent = Agent()
swarm = SwarmNetwork(agents=[agent])
swarm.add_task("task")
```
### `async_add_task`
```python
async def async_add_task(self, task)
```
#### Description
Adds a task to the task queue asynchronously.
#### Parameters
- `task` (_type_): The task to be added to the queue.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
agent = Agent()
swarm = SwarmNetwork(agents=[agent])
await swarm.async_add_task("task")
```
### `run_single_agent`
```python
def run_single_agent(self, agent_id, task: Optional[str], *args, **kwargs)
```
#### Description
Runs a task on a specific agent by ID.
#### Parameters
- `agent_id` (_type_): The ID of the agent.
- `task` (str, optional): The task to be executed by the agent.
- `*args`: Additional positional arguments.
- `**kwargs`: Additional keyword arguments.
#### Returns
- `_type_`: The output of the agent running the task.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent])
result = swarm.run_single_agent(agent.id, "task")
```
### `run_many_agents`
```python
def run_many_agents(self, task: Optional[str] = None, *args, **kwargs) -> List
```
#### Description
Runs a task on all agents in the pool.
#### Parameters
- `task` (str, optional): The task to be executed by the agents.
- `*args`: Additional positional arguments.
- `**kwargs`: Additional keyword arguments.
#### Returns
- `List`: The output of all agents running the task.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent, agent2])
results = swarm.run_many_agents("task")
```
### `list_agents`
```python
def list_agents(self)
```
#### Description
Lists all agents in the pool.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
swarm.list_agents()
```
### `get_agent`
```python
def get_agent(self, agent_id)
```
#### Description
Gets an agent by ID.
#### Parameters
- `agent_id` (_type_): The ID of the agent to retrieve.
#### Returns
- `_type_`: The agent with the specified ID.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
retrieved_agent = swarm.get_agent(agent2.id)
```
### `add_agent`
```python
def add_agent(self, agent: Agent)
```
#### Description
Adds an agent to the agent pool.
#### Parameters
- `agent` (_type_): The agent to be added to the pool.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[])
swarm.add_agent(agent2)
```
### `remove_agent`
```python
def remove_agent(self, agent_id)
```
#### Description
Removes an agent from the agent pool.
#### Parameters
- `agent_id` (_type_): The ID of the agent to be removed.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
swarm.remove_agent(agent2.id)
```
### `async_remove_agent`
```python
async def async_remove_agent(self, agent_id)
```
#### Description
Removes an agent from the agent pool asynchronously.
#### Parameters
- `agent_id` (_type_): The ID of the agent to be removed.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
await swarm.async_remove_agent(agent2.id)
```
### `scale_up`
```python
def scale_up(self, num_agents: int = 1)
```
#### Description
Scales up the agent pool by adding new agents.
#### Parameters
- `num_agents` (int, optional): The number of agents to add. Defaults to 1.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
swarm.scale_up(2)
```
### `scale_down`
```python
def scale_down(self, num_agents: int = 1)
```
#### Description
Scales down the agent pool by removing agents.
#### Parameters
- `num_agents` (int, optional): The number of agents to remove. Defaults to 1.
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.swarm_net import SwarmNetwork
# Initialize the agent
agent2 = Agent(
agent_name="ROTH-IRA-AGENT",
system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT,
llm=model,
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
interactive=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="finance_agent.json",
# tools=[Add your functions here# ],
# stopping_token="Stop!",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
swarm = SwarmNetwork(agents=[agent2])
swarm.scale_down(1)
```
### `run`
#### Description
Runs the swarm network, starting the FastAPI application.
#### Example
```python
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat, SwarmNetwork
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
agent2 = Agent(llm=llm, max_loops=1, agent_name="Product Manager")
agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3],
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# Run all the agents in the swarm network on a task
out = swarmnet.run_many_agents("Generate a 10,000 word blog on health and wellness.")
print(out)
```
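The example above configures and queries the pool but never starts the server. A minimal sketch of calling `run()` is shown below, reusing the agents defined above and assuming the API flags from the parameter table (host/port default to 0.0.0.0:8000).
```python
# Start the FastAPI application for the pool (blocks while serving requests).
api_swarm = SwarmNetwork(
    agents=[agent, agent2, agent3],
    api_on=True,       # enable the FastAPI instance
    api_enabled=True,  # expose the API endpoints
)
api_swarm.run()
```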
## Additional Information and Tips
- **Error Handling**: Make use of try-except blocks to handle potential errors when adding tasks, running tasks, and managing agents.
- **Logging**: Enable logging to track the activity and status of the swarm network.
- **API**: The provided API allows for easy interaction with the swarm network and can be extended as needed.
- **Asynchronous Operations**: Utilize the asynchronous methods for non-blocking operations, especially in a production environment.
- **Scaling**: Adjust the scaling thresholds (`idle_threshold` and `busy_threshold`) based on the specific needs and workload patterns.
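A small defensive sketch following the error-handling and logging tips above, reusing `swarmnet` and `agent2` from the earlier example; the logging configuration is illustrative.
```python
import logging

logging.basicConfig(level=logging.INFO)

try:
    swarmnet.add_task("Draft the weekly newsletter")
    output = swarmnet.run_single_agent(agent2.id, "Summarize open tasks")
    logging.info("Agent output: %s", output)
except Exception as exc:
    logging.error("SwarmNetwork operation failed: %s", exc)
```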
## References and Resources
- [Python Queue Documentation](https://docs.python.org/3/library/queue.html)
- [Threading in Python](https://docs.python.org/3/library/threading.html)
- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [Tenacity Documentation](https://tenacity.readthedocs.io/en/latest/)
By following this documentation, users can effectively manage and utilize the `SwarmNetwork` class to handle dynamic workloads and maintain an efficient pool of agents.

@ -1,90 +0,0 @@
# TaskQueueSwarm Documentation
The `TaskQueueSwarm` class is designed to manage and execute tasks using multiple agents concurrently. This class allows for the orchestration of multiple agents processing tasks from a shared queue, facilitating complex workflows where tasks can be distributed and processed in parallel by different agents.
## Attributes
| Attribute | Type | Description |
|-----------|------|-------------|
| `agents` | `List[Agent]` | The list of agents in the swarm. |
| `task_queue` | `queue.Queue` | A queue to store tasks for processing. |
| `lock` | `threading.Lock` | A lock for thread synchronization. |
| `autosave_on` | `bool` | Whether to automatically save the swarm metadata. |
| `save_file_path` | `str` | The file path for saving swarm metadata. |
| `workspace_dir` | `str` | The directory path of the workspace. |
| `return_metadata_on` | `bool` | Whether to return the swarm metadata after running. |
| `max_loops` | `int` | The maximum number of loops to run the swarm. |
| `metadata` | `SwarmRunMetadata` | Metadata about the swarm run. |
## Methods
### `__init__(self, agents: List[Agent], name: str = "Task-Queue-Swarm", description: str = "A swarm that processes tasks from a queue using multiple agents on different threads.", autosave_on: bool = True, save_file_path: str = "swarm_run_metadata.json", workspace_dir: str = os.getenv("WORKSPACE_DIR"), return_metadata_on: bool = False, max_loops: int = 1, *args, **kwargs)`
The constructor initializes the `TaskQueueSwarm` object.
- **Parameters:**
- `agents` (`List[Agent]`): The list of agents in the swarm.
- `name` (`str`, optional): The name of the swarm. Defaults to "Task-Queue-Swarm".
- `description` (`str`, optional): The description of the swarm. Defaults to "A swarm that processes tasks from a queue using multiple agents on different threads.".
- `autosave_on` (`bool`, optional): Whether to automatically save the swarm metadata. Defaults to True.
- `save_file_path` (`str`, optional): The file path to save the swarm metadata. Defaults to "swarm_run_metadata.json".
- `workspace_dir` (`str`, optional): The directory path of the workspace. Defaults to os.getenv("WORKSPACE_DIR").
- `return_metadata_on` (`bool`, optional): Whether to return the swarm metadata after running. Defaults to False.
- `max_loops` (`int`, optional): The maximum number of loops to run the swarm. Defaults to 1.
- `*args`: Variable length argument list.
- `**kwargs`: Arbitrary keyword arguments.
### `add_task(self, task: str)`
Adds a task to the queue.
- **Parameters:**
- `task` (`str`): The task to be added to the queue.
### `run(self)`
Runs the swarm by having agents pick up tasks from the queue.
- **Returns:**
- `str`: JSON string of the swarm run metadata if `return_metadata_on` is True.
- **Usage Example:**
```python
from swarms import Agent, TaskQueueSwarm
# Initialize agents
agent1 = Agent(agent_name="Agent1", model_name="gpt-4o")
agent2 = Agent(agent_name="Agent2", model_name="gpt-4o")
# Create the TaskQueueSwarm
swarm = TaskQueueSwarm(agents=[agent1, agent2], max_loops=5)
# Add tasks to the swarm
swarm.add_task("Analyze the latest market trends")
swarm.add_task("Generate a summary report")
# Run the swarm
result = swarm.run()
print(result) # Prints the swarm run metadata
```
This example initializes a `TaskQueueSwarm` with two agents, adds tasks to the queue, and runs the swarm.
### `save_json_to_file(self)`
Saves the swarm run metadata to a JSON file.
### `export_metadata(self)`
Exports the swarm run metadata as a JSON string.
- **Returns:**
- `str`: JSON string of the swarm run metadata.
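A short sketch of persisting and inspecting run metadata; it follows the run example above, and the parsed structure of the JSON string is not specified here.
```python
import json

from swarms import Agent, TaskQueueSwarm

agent1 = Agent(agent_name="Agent1", model_name="gpt-4o-mini")
agent2 = Agent(agent_name="Agent2", model_name="gpt-4o-mini")

swarm = TaskQueueSwarm(agents=[agent1, agent2], autosave_on=False)
swarm.add_task("Draft a competitive analysis")
swarm.run()

metadata_json = swarm.export_metadata()  # JSON string of the run metadata
print(json.loads(metadata_json))

swarm.save_json_to_file()  # writes the same metadata to save_file_path
```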
## Additional Notes
- The `TaskQueueSwarm` uses threading to process tasks concurrently, which can significantly improve performance for I/O-bound tasks.
- The `reliability_checks` method ensures that the swarm is properly configured before running.
- The swarm automatically handles task distribution among agents and provides detailed metadata about the run.
- Error handling and logging are implemented to track the execution flow and capture any issues during task processing.

@ -1,171 +1,776 @@
# Multi-Agent Execution API Reference
This comprehensive documentation covers all functions in the `multi_agent_exec.py` module for running multiple agents using various execution strategies. The module provides synchronous and asynchronous execution methods, optimized performance with uvloop, and utility functions for information retrieval.
## Function Overview
| Function | Signature | Category | Description |
|----------|-----------|----------|-------------|
| `run_single_agent` | `run_single_agent(agent, task, *args, **kwargs) -> Any` | Single Agent | Runs a single agent synchronously |
| `run_agent_async` | `run_agent_async(agent, task) -> Any` | Single Agent | Runs a single agent asynchronously using asyncio |
| `run_agents_concurrently_async` | `run_agents_concurrently_async(agents, task) -> List[Any]` | Concurrent Execution | Runs multiple agents concurrently using asyncio |
| `run_agents_concurrently` | `run_agents_concurrently(agents, task, max_workers=None) -> List[Any]` | Concurrent Execution | Optimized concurrent agent runner using ThreadPoolExecutor |
| `run_agents_concurrently_multiprocess` | `run_agents_concurrently_multiprocess(agents, task, batch_size=None) -> List[Any]` | Concurrent Execution | Manages agents concurrently in batches with optimized performance |
| `batched_grid_agent_execution` | `batched_grid_agent_execution(agents, tasks, max_workers=None) -> List[Any]` | Batched & Grid | Runs multiple agents with different tasks concurrently |
| `run_agents_with_different_tasks` | `run_agents_with_different_tasks(agent_task_pairs, batch_size=10, max_workers=None) -> List[Any]` | Batched & Grid | Runs agents with different tasks concurrently in batches |
| `run_agents_concurrently_uvloop` | `run_agents_concurrently_uvloop(agents, task, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents concurrently using uvloop for optimized performance |
| `run_agents_with_tasks_uvloop` | `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents with different tasks using uvloop optimization |
| `get_swarms_info` | `get_swarms_info(swarms) -> str` | Utility | Fetches and formats information about available swarms |
| `get_agents_info` | `get_agents_info(agents, team_name=None) -> str` | Utility | Fetches and formats information about available agents |
## Single Agent Functions
### `run_single_agent(agent, task, *args, **kwargs)`
Runs a single agent synchronously.
#### Signature
```python
def run_single_agent(
    agent: AgentType,
    task: str,
    *args,
    **kwargs
) -> Any
```
#### Parameters
| Parameter | Type | Required | Description |
|-----------|-----------|----------|-------------|
| `agent` | `AgentType` | Yes | Agent instance to run |
| `task` | `str` | Yes | Task string to execute |
| `*args` | `Any` | No | Additional positional arguments |
| `**kwargs`| `Any` | No | Additional keyword arguments |
#### Returns
- `Any`: Agent execution result
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_single_agent
agent = Agent(
agent_name="Financial-Analyst",
system_prompt="You are a financial analysis expert",
model_name="gpt-4o-mini",
max_loops=1
)
result = run_single_agent(agent, "Analyze the current stock market trends")
print(result)
```
### `run_agent_async(agent, task)`
Runs a single agent asynchronously using asyncio.
#### Signature
```python
async def run_agent_async(agent: AgentType, task: str) -> Any
```
#### Parameters
| Parameter | Type | Required | Description |
|-----------|-------------|----------|-------------|
| `agent` | `AgentType` | Yes | Agent instance to run |
| `task` | `str` | Yes | Task string to execute |
#### Returns
- `Any`: Agent execution result
#### Example
```python
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agent_async
async def main():
    agent = Agent(
        agent_name="Researcher",
        system_prompt="You are a research assistant",
        model_name="gpt-4o-mini",
        max_loops=1
    )
    result = await run_agent_async(agent, "Research AI advancements in 2024")
    print(result)

asyncio.run(main())
```
## Concurrent Execution Functions
### `run_agents_concurrently_async(agents, task)`
Runs multiple agents concurrently using asyncio.
#### Signature
```python
async def run_agents_concurrently_async(
    agents: List[AgentType],
    task: str
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Description |
|-----------|-------------------|----------|-------------|
| `agents` | `List[AgentType]` | Yes | List of Agent instances to run concurrently |
| `task` | `str` | Yes | Task string to execute by all agents |
#### Returns
- `List[Any]`: List of outputs from each agent
#### Example
```python
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_async

async def main():
    agents = [
        Agent(
            agent_name=f"Analyst-{i}",
            system_prompt="You are a market analyst",
            model_name="gpt-4o-mini",
            max_loops=1
        )
        for i in range(3)
    ]
    task = "Analyze the impact of AI on job markets"
    results = await run_agents_concurrently_async(agents, task)
    for i, result in enumerate(results):
        print(f"Agent {i+1} result: {result}")

asyncio.run(main())
```
### `run_agents_concurrently(agents, task, max_workers=None)`
Optimized concurrent agent runner using ThreadPoolExecutor.
#### Signature
```python
def run_agents_concurrently(
agents: List[AgentType],
task: str,
max_workers: Optional[int] = None,
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|-------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently |
| `task` | `str` | Yes | - | Task string to execute |
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor |
#### Returns
- `List[Any]`: List of outputs from each agent (exceptions included if agents fail)
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
# Create multiple agents
agents = [
Agent(
agent_name="Tech-Analyst",
system_prompt="You are a technology analyst",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="Finance-Analyst",
system_prompt="You are a financial analyst",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="Market-Strategist",
system_prompt="You are a market strategist",
model_name="gpt-4o-mini",
max_loops=1
)
]
task = "Analyze the future of electric vehicles in 2025"
results = run_agents_concurrently(agents, task, max_workers=4)
for i, result in enumerate(results):
print(f"Agent {i+1} ({agents[i].agent_name}): {result}")
```
### `run_agents_concurrently_multiprocess(agents, task, batch_size=None)`
Manages and runs multiple agents concurrently in batches with optimized performance.
#### Signature
```python
def run_agents_concurrently_multiprocess(
agents: List[Agent],
task: str,
batch_size: int = os.cpu_count()
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|---------------|----------|--------------|-------------|
| `agents` | `List[Agent]` | Yes | - | List of Agent instances to run concurrently |
| `task` | `str` | Yes | - | Task string to execute by all agents |
| `batch_size` | `int` | No | CPU count | Number of agents to run in parallel in each batch |
#### Returns
- `List[Any]`: List of outputs from each agent
#### Example
```python
import os
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_multiprocess
# Initialize agents using only the built-in model_name parameter
agents = [
    Agent(
        agent_name=f"Research-Agent-{i}",
        system_prompt="You are a research specialist",
        model_name="gpt-4o-mini",
        max_loops=1
    )
    for i in range(5)
]
task = "Research the benefits of renewable energy"
batch_size = os.cpu_count()  # Use all CPU cores
results = run_agents_concurrently_multiprocess(agents, task, batch_size)
print(f"Completed {len(results)} agent executions")
```
## Batched and Grid Execution
### `batched_grid_agent_execution(agents, tasks, max_workers=None)`
Runs multiple agents with different tasks concurrently using batched grid execution.
#### Signature
```python
def batched_grid_agent_execution(
agents: List["AgentType"],
tasks: List[str],
max_workers: int = None,
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|---------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of agent instances |
| `tasks` | `List[str]` | Yes | - | List of tasks, one for each agent |
| `max_workers`| `int` | No | 90% of CPU cores | Maximum number of threads to use |
#### Returns
- `List[Any]`: List of results from each agent
#### Raises
- `ValueError`: If number of agents doesn't match number of tasks
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import batched_grid_agent_execution
agents = [
Agent(
agent_name="Data-Scientist",
system_prompt="You are a data science expert",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="ML-Engineer",
system_prompt="You are a machine learning engineer",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="AI-Researcher",
system_prompt="You are an AI researcher",
model_name="gpt-4o-mini",
max_loops=1
)
]
tasks = [
    "Analyze machine learning algorithms performance",
    "Design a neural network architecture",
    "Research latest AI breakthroughs"
]
results = batched_grid_agent_execution(agents, tasks, max_workers=3)
for i, result in enumerate(results):
    print(f"Task {i+1}: {tasks[i]}")
    print(f"Result: {result}\n")
```
### `run_agents_with_different_tasks(agent_task_pairs, batch_size=10, max_workers=None)`
Runs multiple agents with different tasks concurrently, processing them in batches.
#### Signature
```python
def run_agents_with_different_tasks(
agent_task_pairs: List[tuple["AgentType", str]],
batch_size: int = 10,
max_workers: int = None,
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------------|-----------------------------------|----------|---------|-------------|
| `agent_task_pairs` | `List[tuple[AgentType, str]]` | Yes | - | List of (agent, task) tuples |
| `batch_size` | `int` | No | 10 | Number of agents to run in parallel in each batch |
| `max_workers` | `int` | No | None | Maximum number of threads |
#### Returns
- `List[Any]`: List of outputs from each agent, in the same order as input pairs
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_with_different_tasks
# Create agents
agents = [
Agent(
agent_name="Content-Writer",
system_prompt="You are a content writer",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="Editor",
system_prompt="You are an editor",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="SEO-Specialist",
system_prompt="You are an SEO specialist",
model_name="gpt-4o-mini",
max_loops=1
)
]
# Create agent-task pairs
agent_task_pairs = [
(agents[0], "Write a blog post about sustainable living"),
(agents[1], "Edit and improve this article draft"),
(agents[2], "Optimize this content for SEO")
]
results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)
for i, result in enumerate(results):
    agent, task = agent_task_pairs[i]
    print(f"{agent.agent_name} - {task}: {result}")
```
## UVLoop Optimized Functions
### `run_agents_concurrently_uvloop(agents, task, max_workers=None)`
Runs multiple agents concurrently using uvloop for optimized async performance.
#### Signature
```python
def run_agents_concurrently_uvloop(
agents: List[AgentType],
task: str,
max_workers: Optional[int] = None,
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|---------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently |
| `task` | `str` | Yes | - | Task string to execute by all agents |
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor |
#### Returns
- `List[Any]`: List of outputs from each agent
#### Raises
- `ImportError`: If uvloop is not installed
- `RuntimeError`: If uvloop cannot be set as the event loop policy
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop
# Note: uvloop must be installed (pip install uvloop)
agents = [
Agent(
agent_name="Performance-Analyst",
system_prompt="You are a performance analyst",
model_name="gpt-4o-mini",
max_loops=1
)
for _ in range(5)
]
task = "Analyze system performance metrics"
results = run_agents_concurrently_uvloop(agents, task)
print(f"Processed {len(results)} agents with uvloop optimization")
```
### `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None)`
Runs multiple agents with different tasks concurrently using uvloop.
#### Signature
```python
def run_agents_with_tasks_uvloop(
agents: List[AgentType],
tasks: List[str],
max_workers: Optional[int] = None,
) -> List[Any]
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|--------------|---------------------|----------|---------|-------------|
| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run |
| `tasks` | `List[str]` | Yes | - | List of task strings (must match number of agents) |
| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads |
#### Returns
- `List[Any]`: List of outputs from each agent
#### Raises
- `ValueError`: If number of agents doesn't match number of tasks
- `ImportError`: If uvloop is not installed
- `RuntimeError`: If uvloop cannot be set as the event loop policy
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_with_tasks_uvloop
agents = [
Agent(
agent_name="Data-Analyst-1",
system_prompt="You are a data analyst",
model_name="gpt-4o-mini",
max_loops=1
),
Agent(
agent_name="Data-Analyst-2",
system_prompt="You are a data analyst",
model_name="gpt-4o-mini",
max_loops=1
)
]
tasks = [
"Analyze sales data from Q1 2024",
"Analyze customer satisfaction metrics"
]
results = run_agents_with_tasks_uvloop(agents, tasks)
for i, result in enumerate(results):
print(f"Task: {tasks[i]}")
print(f"Result: {result}\n")
```
## Utility Functions
### `get_swarms_info(swarms)`
Fetches and formats information about all available swarms in the system.
#### Signature
```python
def get_swarms_info(swarms: List[Callable]) -> str
```
#### Parameters
| Parameter | Type | Required | Description |
|-----------|-------------------|----------|-------------|
| `swarms` | `List[Callable]` | Yes | List of swarm objects to get information about |
#### Returns
- `str`: Formatted string containing names and descriptions of all swarms
#### Example
```python
from swarms.structs.multi_agent_exec import get_swarms_info
# Assuming you have swarm objects
swarms = [
# Your swarm objects here
]
info = get_swarms_info(swarms)
print(info)
# Output:
# Available Swarms:
#
# [Swarm 1]
# Name: ResearchSwarm
# Description: A swarm for research tasks
# Length of Agents: 3
# Swarm Type: hierarchical
```
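The example above leaves the swarm list as a placeholder. The sketch below is illustrative only: it builds lightweight stand-ins exposing the attributes the formatter appears to read (`name`, `description`, `agents`, `swarm_type`), inferred from the sample output rather than from a documented interface.
```python
from types import SimpleNamespace

from swarms.structs.multi_agent_exec import get_swarms_info

research_swarm = SimpleNamespace(
    name="ResearchSwarm",
    description="A swarm for research tasks",
    agents=[None, None, None],  # only the number of agents is reported
    swarm_type="hierarchical",
)

print(get_swarms_info([research_swarm]))
```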
### `get_agents_info(agents, team_name=None)`
Fetches and formats information about all available agents in the system.
#### Signature
```python
def get_agents_info(
agents: List[Union[Agent, Callable]],
team_name: str = None
) -> str
```
#### Parameters
| Parameter | Type | Required | Default | Description |
|-------------|-----------------------------------|----------|---------|-------------|
| `agents` | `List[Union[Agent, Callable]]` | Yes | - | List of agent objects to get information about |
| `team_name` | `str` | No | None | Optional team name to display |
#### Returns
- `str`: Formatted string containing names and descriptions of all agents
#### Example
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import get_agents_info
agents = [
Agent(
agent_name="Research-Agent",
system_prompt="You are a research assistant",
model_name="gpt-4o-mini",
max_loops=2,
role="Researcher"
),
Agent(
agent_name="Analysis-Agent",
system_prompt="You are a data analyst",
model_name="gpt-4o-mini",
max_loops=1,
role="Analyst"
)
]
info = get_agents_info(agents, team_name="Data Team")
print(info)
# Output:
# Available Agents for Team: Data Team
#
# [Agent 1]
# Name: Research-Agent
# Description: You are a research assistant
# Role: Researcher
# Model: gpt-4o-mini
# Max Loops: 2
#
# [Agent 2]
# Name: Analysis-Agent
# Description: You are a data analyst
# Role: Analyst
# Model: gpt-4o-mini
# Max Loops: 1
```
## Complete Usage Examples
### Advanced Multi-Agent Workflow Example
```python
import asyncio
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
run_agents_concurrently,
run_agents_with_different_tasks,
batched_grid_agent_execution,
get_agents_info
)
# Create specialized agents
agents = [
Agent(
agent_name="Market-Researcher",
system_prompt="You are a market research expert specializing in consumer behavior",
model_name="gpt-4o-mini",
max_loops=1,
role="Researcher"
),
Agent(
agent_name="Data-Analyst",
system_prompt="You are a data analyst expert in statistical analysis",
model_name="gpt-4o-mini",
max_loops=1,
role="Analyst"
),
Agent(
agent_name="Strategy-Consultant",
system_prompt="You are a strategy consultant specializing in business development",
model_name="gpt-4o-mini",
max_loops=1,
role="Consultant"
),
Agent(
agent_name="Financial-Advisor",
system_prompt="You are a financial advisor specializing in investment strategies",
model_name="gpt-4o-mini",
max_loops=1,
role="Advisor"
)
]
# Display agent information
print("=== Agent Information ===")
print(get_agents_info(agents, "Business Intelligence Team"))
print("\n" + "="*50 + "\n")
# Example 1: Same task for all agents
print("=== Example 1: Concurrent Execution with Same Task ===")
task = "Analyze the impact of remote work trends on commercial real estate market in 2024"
results = run_agents_concurrently(agents, task, max_workers=4)
for i, result in enumerate(results):
print(f"\n{agents[i].agent_name} Analysis:")
print(f"Result: {result}")
print("\n" + "="*50 + "\n")
# Example 2: Different tasks for different agents
print("=== Example 2: Different Tasks for Different Agents ===")
agent_task_pairs = [
(agents[0], "Research consumer preferences for electric vehicles"),
(agents[1], "Analyze sales data for EV market penetration"),
(agents[2], "Develop marketing strategy for EV adoption"),
(agents[3], "Assess financial viability of EV charging infrastructure")
]
results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2)
for i, result in enumerate(results):
agent, task = agent_task_pairs[i]
print(f"\n{agent.agent_name} - Task: {task}")
print(f"Result: {result}")
print("\n" + "="*50 + "\n")
# Example 3: Grid execution with matched agents and tasks
print("=== Example 3: Batched Grid Execution ===")
grid_agents = agents[:3] # Use first 3 agents
grid_tasks = [
"Forecast market trends for renewable energy",
"Evaluate risk factors in green technology investments",
"Compare traditional vs sustainable investment portfolios"
]
grid_results = batched_grid_agent_execution(grid_agents, grid_tasks, max_workers=3)
for i, result in enumerate(grid_results):
print(f"\nTask {i+1}: {grid_tasks[i]}")
print(f"Agent: {grid_agents[i].agent_name}")
print(f"Result: {result}")
print("\n=== Workflow Complete ===")
```
### Error Handling and Best Practices
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import run_agents_concurrently
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create agents with error handling
agents = [
Agent(
agent_name=f"Agent-{i}",
system_prompt="You are a helpful assistant",
model_name="gpt-4o-mini",
max_loops=1
)
for i in range(5)
]
task = "Perform a complex analysis task"
try:
results = run_agents_concurrently(agents, task, max_workers=4)
# Handle results (some may be exceptions)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Agent {i+1} failed with error: {result}")
else:
print(f"Agent {i+1} succeeded: {result}")
except Exception as e:
print(f"Execution failed: {e}")
# Best practices:
# 1. Always handle exceptions in results
# 2. Use appropriate max_workers based on system resources
# 3. Monitor memory usage for large agent counts
# 4. Consider batch processing for very large numbers of agents
# 5. Use uvloop functions for I/O intensive tasks
```
## Performance Considerations
| Technique | Best Use Case / Description |
|------------------------|------------------------------------------------------------------------------------|
| **ThreadPoolExecutor** | Best for CPU-bound tasks with moderate I/O |
| **uvloop** | Optimized for I/O-bound tasks, significantly faster than standard asyncio |
| **Batch Processing** | Prevents system overload with large numbers of agents |
| **Resource Monitoring**| Adjust worker counts based on system capabilities |
| **Async/Await** | Use async functions for better concurrency control |
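A small routing sketch that applies the table above; the `io_bound` flag and the fallback behavior are illustrative choices, not part of the module API.
```python
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently,
    run_agents_concurrently_uvloop,
)

def run_swarm(agents, task, io_bound: bool = True):
    """Route I/O-heavy workloads to the uvloop runner, otherwise use the
    ThreadPoolExecutor-based runner."""
    if io_bound:
        try:
            return run_agents_concurrently_uvloop(agents, task)
        except ImportError:
            # uvloop is not installed; fall back to the standard runner
            pass
    return run_agents_concurrently(agents, task)
```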

@ -1,15 +1,6 @@
from swarms import Agent
from swarms.structs.multi_agent_exec import (
    batched_grid_agent_execution,
)
# Initialize agents for different roles
@ -27,7 +18,7 @@ delaware_ccorp_agent = Agent(
    corporate law and ensure that all hiring practices are in compliance with
    state and federal regulations.
    """,
    model_name="claude-sonnet-4-20250514",
    max_loops=1,
    autosave=False,
    dashboard=False,
@ -54,7 +45,7 @@ indian_foreign_agent = Agent(
    implications of hiring foreign nationals and the requirements for obtaining
    necessary visas and work permits.
    """,
    model_name="claude-sonnet-4-20250514",
    max_loops=1,
    autosave=False,
    dashboard=False,
@ -86,11 +77,10 @@ tasks = [
""", """,
] ]
# Run agents with tasks concurrently results = batched_grid_agent_execution(
results = run_agents_with_tasks_concurrently( agents=agents,
agents, tasks, all_cores=True, device="cpu", no_clusterops=True tasks=tasks,
) )
# Print the results for result in results:
# for result in results: print(result)
# print(result)

@ -0,0 +1,48 @@
# uvloop Examples
This directory contains examples demonstrating the use of uvloop for running multiple agents concurrently with improved performance.
## Files
- `utils.py`: Utility functions for creating example agents
- `same_task_example.py`: Example of running multiple agents with the same task
- `different_tasks_example.py`: Example of running agents with different tasks
- `performance_info.py`: Information about uvloop performance benefits
- `run_all_examples.py`: Script to run all examples
## Prerequisites
Set your OpenAI API key:
```bash
export OPENAI_API_KEY='your-api-key-here'
```
## Usage
### Individual Examples
Run a specific example:
```python
from same_task_example import run_same_task_example
results = run_same_task_example()
```
### Run All Examples
```python
from run_all_examples import run_all_uvloop_examples
all_results = run_all_uvloop_examples()
```
## Performance Benefits
uvloop provides:
- ~2-4x faster execution compared to standard asyncio
- Better performance for I/O-bound operations
- Lower latency and higher throughput
- Automatic fallback to asyncio if uvloop is unavailable
## Functions Used
- `run_agents_concurrently_uvloop`: For running multiple agents with the same task
- `run_agents_with_tasks_uvloop`: For running agents with different tasks
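A minimal combined sketch of the two helpers; the agents follow the pattern used in `utils.py`, and the tasks here are placeholders.
```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
    run_agents_concurrently_uvloop,
    run_agents_with_tasks_uvloop,
)

agents = [
    Agent(agent_name=f"Agent_{i+1}", model_name="gpt-4o-mini", max_loops=1)
    for i in range(2)
]

# Same task for every agent
summaries = run_agents_concurrently_uvloop(agents, "Summarize today's AI news.")

# One task per agent (the two lists must be the same length)
reports = run_agents_with_tasks_uvloop(
    agents,
    ["Explain uvloop in one sentence.", "Explain asyncio in one sentence."],
)
```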

@ -0,0 +1,66 @@
"""
Example demonstrating running agents with different tasks using uvloop.
This example shows how to use run_agents_with_tasks_uvloop to execute
different tasks across multiple agents concurrently.
"""
import os
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
run_agents_with_tasks_uvloop,
)
def create_example_agents(num_agents: int = 3):
"""
Create example agents for demonstration.
Args:
num_agents: Number of agents to create
Returns:
List of Agent instances
"""
agents = []
for i in range(num_agents):
agent = Agent(
agent_name=f"Agent_{i+1}",
system_prompt=f"You are Agent {i+1}, a helpful AI assistant.",
model_name="gpt-4o-mini", # Using a lightweight model for examples
max_loops=1,
autosave=False,
verbose=False,
)
agents.append(agent)
return agents
def run_different_tasks_example():
"""
Run agents with different tasks using uvloop.
Returns:
List of results from each agent
"""
# Check if API key is available
if not os.getenv("OPENAI_API_KEY"):
raise ValueError(
"OPENAI_API_KEY environment variable must be set"
)
agents = create_example_agents(3)
tasks = [
"Explain what machine learning is in simple terms.",
"Describe the benefits of cloud computing.",
"What are the main challenges in natural language processing?",
]
results = run_agents_with_tasks_uvloop(agents, tasks)
return results
if __name__ == "__main__":
results = run_different_tasks_example()
# Results can be processed further as needed

@ -0,0 +1,38 @@
"""
Runner script to execute all uvloop examples.
This script demonstrates how to run multiple uvloop-based agent execution examples.
"""
import os
from same_task_example import run_same_task_example
from different_tasks_example import run_different_tasks_example
def run_all_uvloop_examples():
"""
Execute all uvloop examples.
Returns:
Dictionary containing results from all examples
"""
# Check if API key is available
if not os.getenv("OPENAI_API_KEY"):
raise ValueError(
"OPENAI_API_KEY environment variable must be set"
)
results = {}
# Run same task example
results["same_task"] = run_same_task_example()
# Run different tasks example
results["different_tasks"] = run_different_tasks_example()
return results
if __name__ == "__main__":
all_results = run_all_uvloop_examples()
# Process results as needed

@ -0,0 +1,57 @@
import os
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
)
def create_example_agents(num_agents: int = 3):
"""
Create example agents for demonstration.
Args:
num_agents: Number of agents to create
Returns:
List of Agent instances
"""
agents = []
for i in range(num_agents):
agent = Agent(
agent_name=f"Agent_{i+1}",
system_prompt=f"You are Agent {i+1}, a helpful AI assistant.",
model_name="gpt-4o-mini", # Using a lightweight model for examples
max_loops=1,
autosave=False,
verbose=False,
)
agents.append(agent)
return agents
def run_same_task_example():
"""
Run multiple agents with the same task using uvloop.
Returns:
List of results from each agent
"""
# Check if API key is available
if not os.getenv("OPENAI_API_KEY"):
raise ValueError(
"OPENAI_API_KEY environment variable must be set"
)
agents = create_example_agents(3)
task = (
"Write a one-sentence summary about artificial intelligence."
)
results = run_agents_concurrently_uvloop(agents, task)
return results
if __name__ == "__main__":
results = run_same_task_example()
# Results can be processed further as needed

@ -0,0 +1,122 @@
"""
Example demonstrating the use of uvloop for running multiple agents concurrently.
This example shows how to use the new uvloop-based functions:
- run_agents_concurrently_uvloop: For running multiple agents with the same task
- run_agents_with_tasks_uvloop: For running agents with different tasks
uvloop provides significant performance improvements over standard asyncio,
especially for I/O-bound operations and concurrent task execution.
"""
import os
from swarms.structs.multi_agent_exec import (
run_agents_concurrently_uvloop,
run_agents_with_tasks_uvloop,
)
from swarms.structs.agent import Agent
def create_example_agents(num_agents: int = 3):
"""Create example agents for demonstration."""
agents = []
for i in range(num_agents):
agent = Agent(
agent_name=f"Agent_{i+1}",
system_prompt=f"You are Agent {i+1}, a helpful AI assistant.",
model_name="gpt-4o-mini", # Using a lightweight model for examples
max_loops=1,
autosave=False,
verbose=False,
)
agents.append(agent)
return agents
def example_same_task():
"""Example: Running multiple agents with the same task using uvloop."""
print("=== Example 1: Same Task for All Agents (uvloop) ===")
agents = create_example_agents(3)
task = (
"Write a one-sentence summary about artificial intelligence."
)
print(f"Running {len(agents)} agents with the same task...")
print(f"Task: {task}")
try:
results = run_agents_concurrently_uvloop(agents, task)
print("\nResults:")
for i, result in enumerate(results, 1):
print(f"Agent {i}: {result}")
except Exception as e:
print(f"Error: {e}")
def example_different_tasks():
"""Example: Running agents with different tasks using uvloop."""
print(
"\n=== Example 2: Different Tasks for Each Agent (uvloop) ==="
)
agents = create_example_agents(3)
tasks = [
"Explain what machine learning is in simple terms.",
"Describe the benefits of cloud computing.",
"What are the main challenges in natural language processing?",
]
print(f"Running {len(agents)} agents with different tasks...")
try:
results = run_agents_with_tasks_uvloop(agents, tasks)
print("\nResults:")
for i, (result, task) in enumerate(zip(results, tasks), 1):
print(f"Agent {i} (Task: {task[:50]}...):")
print(f" Response: {result}")
print()
except Exception as e:
print(f"Error: {e}")
def performance_comparison():
"""Demonstrate the performance benefit of uvloop vs standard asyncio."""
print("\n=== Performance Comparison ===")
# Note: This is a conceptual example. In practice, you'd need to measure actual performance
print("uvloop vs Standard asyncio:")
print("• uvloop: Cython-based event loop, ~2-4x faster")
print("• Better for I/O-bound operations")
print("• Lower latency and higher throughput")
print("• Especially beneficial for concurrent agent execution")
print("• Automatic fallback to asyncio if uvloop unavailable")
if __name__ == "__main__":
# Check if API key is available
if not os.getenv("OPENAI_API_KEY"):
print(
"Please set your OPENAI_API_KEY environment variable to run this example."
)
print("Example: export OPENAI_API_KEY='your-api-key-here'")
exit(1)
print("🚀 uvloop Multi-Agent Execution Examples")
print("=" * 50)
# Run examples
example_same_task()
example_different_tasks()
performance_comparison()
print("\n✅ Examples completed!")
print("\nTo use uvloop functions in your code:")
print(
"from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop"
)
print("results = run_agents_concurrently_uvloop(agents, task)")

@ -0,0 +1,40 @@
import os
from dotenv import load_dotenv
from swarms.structs.custom_agent import CustomAgent
load_dotenv()
# Example usage with Anthropic API
if __name__ == "__main__":
# Initialize the agent for Anthropic API
    anthropic_agent = CustomAgent(
        name="Anthropic-Agent",  # required by CustomAgent; illustrative value
        description="Calls the Anthropic Messages API over HTTP",  # required; illustrative value
        base_url="https://api.anthropic.com",
        endpoint="v1/messages",
headers={
"x-api-key": os.getenv("ANTHROPIC_API_KEY"),
"anthropic-version": "2023-06-01",
},
)
# Example payload for Anthropic API
payload = {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1000,
"messages": [
{
"role": "user",
"content": "Hello! Can you explain what artaddificial intelligence is?",
}
],
}
# Make the request
try:
response = anthropic_agent.run(payload)
print("Anthropic API Response:")
print(response)
print(type(response))
except Exception as e:
print(f"Error: {e}")

@ -81,6 +81,7 @@ mcp = "*"
openai = "*" openai = "*"
aiohttp = "*" aiohttp = "*"
schedule = "*" schedule = "*"
uvloop = "*"
[tool.poetry.scripts] [tool.poetry.scripts]
swarms = "swarms.cli.main:main" swarms = "swarms.cli.main:main"

@ -28,3 +28,4 @@ mcp
numpy numpy
openai openai
schedule schedule
uvloop

@ -47,16 +47,16 @@ from swarms.structs.meme_agent_persona_generator import (
from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.model_router import ModelRouter from swarms.structs.model_router import ModelRouter
from swarms.structs.multi_agent_exec import ( from swarms.structs.multi_agent_exec import (
batched_grid_agent_execution,
get_agents_info, get_agents_info,
get_swarms_info, get_swarms_info,
run_agent_with_timeout, run_agent_async,
run_agents_concurrently, run_agents_concurrently,
run_agents_concurrently_async, run_agents_concurrently_async,
run_agents_concurrently_multiprocess, run_agents_concurrently_multiprocess,
run_agents_sequentially, run_agents_concurrently_uvloop,
run_agents_with_different_tasks, run_agents_with_different_tasks,
run_agents_with_resource_monitoring, run_agents_with_tasks_uvloop,
run_agents_with_tasks_concurrently,
run_single_agent, run_single_agent,
) )
from swarms.structs.multi_agent_router import MultiAgentRouter from swarms.structs.multi_agent_router import MultiAgentRouter
@ -141,15 +141,15 @@ __all__ = [
"SwarmRouter", "SwarmRouter",
"SwarmType", "SwarmType",
"SwarmRearrange", "SwarmRearrange",
"batched_grid_agent_execution",
"run_agent_async",
"run_agents_concurrently", "run_agents_concurrently",
"run_agents_concurrently_async", "run_agents_concurrently_async",
"run_single_agent",
"run_agents_concurrently_multiprocess", "run_agents_concurrently_multiprocess",
"run_agents_sequentially", "run_agents_concurrently_uvloop",
"run_agents_with_different_tasks", "run_agents_with_different_tasks",
"run_agent_with_timeout", "run_agents_with_tasks_uvloop",
"run_agents_with_resource_monitoring", "run_single_agent",
"run_agents_with_tasks_concurrently",
"GroupChat", "GroupChat",
"expertise_based", "expertise_based",
"MultiAgentRouter", "MultiAgentRouter",

@ -332,6 +332,7 @@ class Agent:
list_of_pdf: Optional[str] = None, list_of_pdf: Optional[str] = None,
tokenizer: Optional[Any] = None, tokenizer: Optional[Any] = None,
long_term_memory: Optional[Union[Callable, Any]] = None, long_term_memory: Optional[Union[Callable, Any]] = None,
fallback_model_name: Optional[str] = None,
preset_stopping_token: Optional[bool] = False, preset_stopping_token: Optional[bool] = False,
traceback: Optional[Any] = None, traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None, traceback_handlers: Optional[Any] = None,
@ -582,6 +583,7 @@ class Agent:
self.drop_params = drop_params self.drop_params = drop_params
self.thinking_tokens = thinking_tokens self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled self.reasoning_enabled = reasoning_enabled
self.fallback_model_name = fallback_model_name
# self.init_handling() # self.init_handling()
self.setup_config() self.setup_config()
@ -2633,7 +2635,7 @@ class Agent:
""" """
return [ return [
self.run(task=task, imgs=imgs, *args, **kwargs) self.run(task=task, imgs=imgs, *args, **kwargs)
for task in tasks for task, imgs in zip(tasks, imgs)
] ]
def handle_artifacts( def handle_artifacts(

@ -0,0 +1,344 @@
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union
import httpx
from loguru import logger
@dataclass
class AgentResponse:
"""Data class to hold agent response information"""
status_code: int
content: str
headers: Dict[str, str]
json_data: Optional[Dict[str, Any]] = None
success: bool = False
error_message: Optional[str] = None
class CustomAgent:
"""
A custom HTTP agent class for making POST requests using httpx.
Features:
- Configurable headers and payload
- Both sync and async execution
- Built-in error handling and logging
- Flexible response handling
- Name and description
"""
def __init__(
self,
name: str,
description: str,
base_url: str,
endpoint: str,
headers: Optional[Dict[str, str]] = None,
timeout: float = 30.0,
verify_ssl: bool = True,
*args,
**kwargs,
):
"""
Initialize the Custom Agent.
        Args:
            name: Human-readable name for the agent
            description: Short description of the agent's purpose
            base_url: Base URL for the API endpoint
endpoint: API endpoint path
headers: Default headers to include in requests
timeout: Request timeout in seconds
verify_ssl: Whether to verify SSL certificates
"""
self.base_url = base_url.rstrip("/")
self.endpoint = endpoint.lstrip("/")
self.default_headers = headers or {}
self.timeout = timeout
self.verify_ssl = verify_ssl
# Default headers
if "Content-Type" not in self.default_headers:
self.default_headers["Content-Type"] = "application/json"
logger.info(
f"CustomAgent initialized for {self.base_url}/{self.endpoint}"
)
def _prepare_headers(
self, additional_headers: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
"""Merge default headers with additional headers."""
headers = self.default_headers.copy()
if additional_headers:
headers.update(additional_headers)
return headers
def _prepare_payload(
self, payload: Union[Dict, str, bytes]
) -> Union[str, bytes]:
"""Prepare the payload for the request."""
if isinstance(payload, dict):
return json.dumps(payload)
return payload
def _parse_response(
self, response: httpx.Response
) -> AgentResponse:
"""Parse httpx response into AgentResponse object."""
try:
# Try to parse JSON if possible
json_data = None
if response.headers.get("content-type", "").startswith(
"application/json"
):
try:
json_data = response.json()
except json.JSONDecodeError:
pass
return AgentResponse(
status_code=response.status_code,
content=response.text,
headers=dict(response.headers),
json_data=json_data,
success=200 <= response.status_code < 300,
error_message=(
None
if 200 <= response.status_code < 300
else f"HTTP {response.status_code}"
),
)
except Exception as e:
logger.error(f"Error parsing response: {e}")
return AgentResponse(
status_code=response.status_code,
content=response.text,
headers=dict(response.headers),
success=False,
error_message=str(e),
)
def _extract_content(self, response_data: Dict[str, Any]) -> str:
"""
Extract message content from API response, supporting multiple formats.
Args:
response_data: Parsed JSON response from API
Returns:
str: Extracted message content
"""
try:
# OpenAI format
if (
"choices" in response_data
and response_data["choices"]
):
choice = response_data["choices"][0]
if (
"message" in choice
and "content" in choice["message"]
):
return choice["message"]["content"]
elif "text" in choice:
return choice["text"]
# Anthropic format
elif (
"content" in response_data
and response_data["content"]
):
if isinstance(response_data["content"], list):
# Extract text from content blocks
text_parts = []
for content_block in response_data["content"]:
if (
isinstance(content_block, dict)
and "text" in content_block
):
text_parts.append(content_block["text"])
elif isinstance(content_block, str):
text_parts.append(content_block)
return "".join(text_parts)
elif isinstance(response_data["content"], str):
return response_data["content"]
# Generic fallback - look for common content fields
elif "text" in response_data:
return response_data["text"]
elif "message" in response_data:
return response_data["message"]
elif "response" in response_data:
return response_data["response"]
# If no known format, return the entire response as JSON string
logger.warning(
"Unknown response format, returning full response"
)
return json.dumps(response_data, indent=2)
except Exception as e:
logger.error(f"Error extracting content: {e}")
return json.dumps(response_data, indent=2)
def run(
self,
payload: Union[Dict[str, Any], str, bytes],
additional_headers: Optional[Dict[str, str]] = None,
**kwargs,
) -> str:
"""
Execute a synchronous POST request.
Args:
payload: Request body/payload
additional_headers: Additional headers for this request
**kwargs: Additional httpx client options
Returns:
str: Extracted message content from response
"""
url = f"{self.base_url}/{self.endpoint}"
request_headers = self._prepare_headers(additional_headers)
request_payload = self._prepare_payload(payload)
logger.info(f"Making POST request to: {url}")
try:
with httpx.Client(
timeout=self.timeout, verify=self.verify_ssl, **kwargs
) as client:
response = client.post(
url,
content=request_payload,
headers=request_headers,
)
if 200 <= response.status_code < 300:
logger.info(
f"Request successful: {response.status_code}"
)
try:
response_data = response.json()
return self._extract_content(response_data)
except json.JSONDecodeError:
logger.warning(
"Response is not JSON, returning raw text"
)
return response.text
else:
logger.warning(
f"Request failed: {response.status_code}"
)
return f"Error: HTTP {response.status_code} - {response.text}"
except httpx.RequestError as e:
logger.error(f"Request error: {e}")
return f"Request error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error: {e}")
return f"Unexpected error: {str(e)}"
async def run_async(
self,
payload: Union[Dict[str, Any], str, bytes],
additional_headers: Optional[Dict[str, str]] = None,
**kwargs,
) -> str:
"""
Execute an asynchronous POST request.
Args:
payload: Request body/payload
additional_headers: Additional headers for this request
**kwargs: Additional httpx client options
Returns:
str: Extracted message content from response
"""
url = f"{self.base_url}/{self.endpoint}"
request_headers = self._prepare_headers(additional_headers)
request_payload = self._prepare_payload(payload)
logger.info(f"Making async POST request to: {url}")
try:
async with httpx.AsyncClient(
timeout=self.timeout, verify=self.verify_ssl, **kwargs
) as client:
response = await client.post(
url,
content=request_payload,
headers=request_headers,
)
if 200 <= response.status_code < 300:
logger.info(
f"Async request successful: {response.status_code}"
)
try:
response_data = response.json()
return self._extract_content(response_data)
except json.JSONDecodeError:
logger.warning(
"Async response is not JSON, returning raw text"
)
return response.text
else:
logger.warning(
f"Async request failed: {response.status_code}"
)
return f"Error: HTTP {response.status_code} - {response.text}"
except httpx.RequestError as e:
logger.error(f"Async request error: {e}")
return f"Request error: {str(e)}"
except Exception as e:
logger.error(f"Unexpected async error: {e}")
return f"Unexpected error: {str(e)}"
# # Example usage with Anthropic API
# if __name__ == "__main__":
# # Initialize the agent for Anthropic API
# anthropic_agent = CustomAgent(
# base_url="https://api.anthropic.com",
# endpoint="v1/messages",
# headers={
# "x-api-key": "your-anthropic-api-key-here",
# "anthropic-version": "2023-06-01"
# }
# )
# # Example payload for Anthropic API
# payload = {
# "model": "claude-3-sonnet-20240229",
# "max_tokens": 1000,
# "messages": [
# {
# "role": "user",
# "content": "Hello! Can you explain what artificial intelligence is?"
# }
# ]
# }
# # Make the request
# try:
# response = anthropic_agent.run(payload)
# print("Anthropic API Response:")
# print(response)
# except Exception as e:
# print(f"Error: {e}")
# # Example with async usage
# # import asyncio
# #
# # async def async_example():
# # response = await anthropic_agent.run_async(payload)
# # print("Async Anthropic API Response:")
# # print(response)
# #
# # Uncomment to run async example
# # asyncio.run(async_example())

@ -1,25 +1,16 @@
import concurrent.futures
import asyncio import asyncio
import concurrent.futures
import os import os
import threading
from concurrent.futures import ( from concurrent.futures import (
ThreadPoolExecutor, ThreadPoolExecutor,
) )
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Union from typing import Any, Callable, List, Optional, Union
import psutil import uvloop
from loguru import logger
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentType from swarms.structs.omni_agent_types import AgentType
from loguru import logger
@dataclass
class ResourceMetrics:
cpu_percent: float
memory_percent: float
active_threads: int
def run_single_agent( def run_single_agent(
@ -29,42 +20,38 @@ def run_single_agent(
return agent.run(task=task, *args, **kwargs) return agent.run(task=task, *args, **kwargs)
async def run_agent_async( async def run_agent_async(agent: AgentType, task: str) -> Any:
agent: AgentType, task: str, executor: ThreadPoolExecutor
) -> Any:
""" """
Run an agent asynchronously using a thread executor. Run an agent asynchronously.
Args: Args:
agent: Agent instance to run agent: Agent instance to run
task: Task string to execute task: Task string to execute
executor: ThreadPoolExecutor instance for handling CPU-bound operations
Returns: Returns:
Agent execution result Agent execution result
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
return await loop.run_in_executor( return await loop.run_in_executor(
executor, run_single_agent, agent, task None, run_single_agent, agent, task
) )
async def run_agents_concurrently_async( async def run_agents_concurrently_async(
agents: List[AgentType], task: str, executor: ThreadPoolExecutor agents: List[AgentType], task: str
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents concurrently using asyncio and thread executor. Run multiple agents concurrently using asyncio.
Args: Args:
agents: List of Agent instances to run concurrently agents: List of Agent instances to run concurrently
task: Task string to execute task: Task string to execute
executor: ThreadPoolExecutor for CPU-bound operations
Returns: Returns:
List of outputs from each agent List of outputs from each agent
""" """
results = await asyncio.gather( results = await asyncio.gather(
*(run_agent_async(agent, task, executor) for agent in agents) *(run_agent_async(agent, task) for agent in agents)
) )
return results return results
@ -142,14 +129,23 @@ def run_agents_concurrently_multiprocess(
def batched_grid_agent_execution( def batched_grid_agent_execution(
agents: List[AgentType], agents: List["AgentType"],
tasks: List[str], tasks: List[str],
max_workers: int = None,
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents with different tasks concurrently. Run multiple agents with different tasks concurrently.
Args:
agents (List[AgentType]): List of agent instances.
tasks (List[str]): List of tasks, one for each agent.
max_workers (int, optional): Maximum number of threads to use. Defaults to 90% of CPU cores.
Returns:
List[Any]: List of results from each agent.
""" """
logger.info( logger.info(
f"Batch Grid Execution with {len(agents)} and number of tasks: {len(tasks)}" f"Batch Grid Execution with {len(agents)} agents and number of tasks: {len(tasks)}"
) )
if len(agents) != len(tasks): if len(agents) != len(tasks):
@ -157,289 +153,256 @@ def batched_grid_agent_execution(
"The number of agents must match the number of tasks." "The number of agents must match the number of tasks."
) )
results = [] # 90% of the available CPU cores
max_workers = max_workers or int(os.cpu_count() * 0.9)
for agent, task in zip(agents, tasks): results = [None] * len(agents)
result = run_single_agent(agent, task) with concurrent.futures.ThreadPoolExecutor(
results.append(result) max_workers=max_workers
) as executor:
future_to_index = {
executor.submit(run_single_agent, agent, task): idx
for idx, (agent, task) in enumerate(zip(agents, tasks))
}
for future in concurrent.futures.as_completed(
future_to_index
):
idx = future_to_index[future]
try:
results[idx] = future.result()
except Exception as e:
results[idx] = e
return results return results
def run_agents_sequentially(
agents: List[AgentType], task: str
) -> List[Any]:
"""
Run multiple agents sequentially for baseline comparison.
Args:
agents: List of Agent instances to run
task: Task string to execute
Returns:
List of outputs from each agent
"""
return [run_single_agent(agent, task) for agent in agents]
def run_agents_with_different_tasks( def run_agents_with_different_tasks(
agent_task_pairs: List[tuple[AgentType, str]], agent_task_pairs: List[tuple["AgentType", str]],
batch_size: int = None, batch_size: int = 10,
max_workers: int = None, max_workers: int = None,
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents with different tasks concurrently. Run multiple agents with different tasks concurrently, processing them in batches.
This function executes each agent on its corresponding task, processing the agent-task pairs in batches
of size `batch_size` for efficient resource utilization.
Args: Args:
agent_task_pairs: List of (agent, task) tuples agent_task_pairs: List of (agent, task) tuples.
batch_size: Number of agents to run in parallel batch_size: Number of agents to run in parallel in each batch.
max_workers: Maximum number of threads max_workers: Maximum number of threads.
Returns: Returns:
List of outputs from each agent List of outputs from each agent, in the same order as the input pairs.
""" """
if not agent_task_pairs:
return []
async def run_pair_async(
pair: tuple[AgentType, str], executor: ThreadPoolExecutor
) -> Any:
agent, task = pair
return await run_agent_async(agent, task, executor)
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = [] results = []
total_pairs = len(agent_task_pairs)
try: for i in range(0, total_pairs, batch_size):
loop = asyncio.get_event_loop() batch = agent_task_pairs[i : i + batch_size]
except RuntimeError: agents, tasks = zip(*batch)
loop = asyncio.new_event_loop() batch_results = batched_grid_agent_execution(
asyncio.set_event_loop(loop) list(agents), list(tasks), max_workers=max_workers
)
with ThreadPoolExecutor(max_workers=max_workers) as executor: results.extend(batch_results)
for i in range(0, len(agent_task_pairs), batch_size):
batch = agent_task_pairs[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
*(
run_pair_async(pair, executor)
for pair in batch
)
)
)
results.extend(batch_results)
return results return results
async def run_agent_with_timeout( def run_agents_concurrently_uvloop(
agent: AgentType, agents: List[AgentType],
task: str, task: str,
timeout: float, max_workers: Optional[int] = None,
executor: ThreadPoolExecutor, ) -> List[Any]:
) -> Any:
""" """
Run an agent with a timeout limit. Run multiple agents concurrently using uvloop for optimized async performance.
uvloop is a fast, drop-in replacement for asyncio's event loop, implemented in Cython.
It's designed to be significantly faster than the standard asyncio event loop,
especially beneficial for I/O-bound tasks and concurrent operations.
Args: Args:
agent: Agent instance to run agents: List of Agent instances to run concurrently
task: Task string to execute task: Task string to execute by all agents
timeout: Timeout in seconds max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores)
executor: ThreadPoolExecutor instance
Returns: Returns:
Agent execution result or None if timeout occurs List of outputs from each agent
Raises:
ImportError: If uvloop is not installed
RuntimeError: If uvloop cannot be set as the event loop policy
""" """
try: try:
return await asyncio.wait_for( # Set uvloop as the default event loop policy for better performance
run_agent_async(agent, task, executor), timeout=timeout asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
logger.warning(
"uvloop not available, falling back to standard asyncio. "
"Install uvloop with: pip install uvloop"
)
except RuntimeError as e:
logger.warning(
f"Could not set uvloop policy: {e}. Using default asyncio."
) )
except asyncio.TimeoutError:
return None
if max_workers is None:
# Use 95% of available CPU cores for optimal performance
num_cores = os.cpu_count()
max_workers = int(num_cores * 0.95) if num_cores else 1
def run_agents_with_timeout( logger.info(
agents: List[AgentType], f"Running {len(agents)} agents concurrently with uvloop (max_workers: {max_workers})"
task: str, )
timeout: float,
batch_size: int = None,
max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents concurrently with a timeout for each agent.
Args: async def run_agents_async():
agents: List of Agent instances """Inner async function to handle the concurrent execution."""
task: Task string to execute results = []
timeout: Timeout in seconds for each agent
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
Returns: def run_agent_sync(agent: AgentType) -> Any:
List of outputs (None for timed out agents) """Synchronous wrapper for agent execution."""
""" return agent.run(task=task)
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
try:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
*(
run_agent_with_timeout(
agent, task, timeout, executor
)
for agent in batch
)
)
)
results.extend(batch_results)
return results with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Create tasks for all agents
tasks = [
def get_system_metrics() -> ResourceMetrics: loop.run_in_executor(executor, run_agent_sync, agent)
"""Get current system resource usage""" for agent in agents
return ResourceMetrics( ]
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
active_threads=threading.active_count(),
)
def run_agents_with_resource_monitoring(
agents: List[AgentType],
task: str,
cpu_threshold: float = 90.0,
memory_threshold: float = 90.0,
check_interval: float = 1.0,
) -> List[Any]:
"""
Run agents with system resource monitoring and adaptive batch sizing.
Args: # Wait for all tasks to complete and collect results
agents: List of Agent instances completed_tasks = await asyncio.gather(
task: Task string to execute *tasks, return_exceptions=True
cpu_threshold: Max CPU usage percentage )
memory_threshold: Max memory usage percentage
check_interval: Resource check interval in seconds
Returns: # Handle results and exceptions
List of outputs from each agent for i, result in enumerate(completed_tasks):
""" if isinstance(result, Exception):
logger.error(
f"Agent {i+1} failed with error: {result}"
)
results.append(result)
else:
results.append(result)
async def monitor_resources(): return results
while True:
metrics = get_system_metrics()
if (
metrics.cpu_percent > cpu_threshold
or metrics.memory_percent > memory_threshold
):
# Reduce batch size or pause execution
pass
await asyncio.sleep(check_interval)
# Implementation details... # Run the async function
try:
return asyncio.run(run_agents_async())
except RuntimeError as e:
if "already running" in str(e).lower():
# Handle case where event loop is already running
logger.warning(
"Event loop already running, using get_event_loop()"
)
loop = asyncio.get_event_loop()
return loop.run_until_complete(run_agents_async())
else:
raise
def _run_agents_with_tasks_concurrently( def run_agents_with_tasks_uvloop(
agents: List[AgentType], agents: List[AgentType],
tasks: List[str] = [], tasks: List[str],
batch_size: int = None, max_workers: Optional[int] = None,
max_workers: int = None,
) -> List[Any]: ) -> List[Any]:
""" """
Run multiple agents with corresponding tasks concurrently. Run multiple agents with different tasks concurrently using uvloop.
This function pairs each agent with a specific task and runs them concurrently
using uvloop for optimized performance.
Args: Args:
agents: List of Agent instances to run agents: List of Agent instances to run
tasks: List of task strings to execute tasks: List of task strings (must match number of agents)
batch_size: Number of agents to run in parallel max_workers: Maximum number of threads (defaults to 95% of CPU cores)
max_workers: Maximum number of threads
Returns: Returns:
List of outputs from each agent List of outputs from each agent
Raises:
ValueError: If number of agents doesn't match number of tasks
""" """
if len(agents) != len(tasks): if len(agents) != len(tasks):
raise ValueError( raise ValueError(
"The number of agents must match the number of tasks." f"Number of agents ({len(agents)}) must match number of tasks ({len(tasks)})"
) )
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
try: try:
loop = asyncio.get_event_loop() # Set uvloop as the default event loop policy
except RuntimeError: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop() except ImportError:
asyncio.set_event_loop(loop) logger.warning(
"uvloop not available, falling back to standard asyncio. "
async def run_agent_task_pair( "Install uvloop with: pip install uvloop"
agent: AgentType, task: str, executor: ThreadPoolExecutor )
) -> Any: except RuntimeError as e:
return await run_agent_async(agent, task, executor) logger.warning(
f"Could not set uvloop policy: {e}. Using default asyncio."
with ThreadPoolExecutor(max_workers=max_workers) as executor: )
for i in range(0, len(agents), batch_size):
batch_agents = agents[i : i + batch_size]
batch_tasks = tasks[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
*(
run_agent_task_pair(agent, task, executor)
for agent, task in zip(
batch_agents, batch_tasks
)
)
)
)
results.extend(batch_results)
return results if max_workers is None:
num_cores = os.cpu_count()
max_workers = int(num_cores * 0.95) if num_cores else 1
    logger.info(
f"Running {len(agents)} agents with {len(tasks)} tasks using uvloop (max_workers: {max_workers})"
)
def run_agents_with_tasks_concurrently( async def run_agents_with_tasks_async():
agents: List[AgentType], """Inner async function to handle concurrent execution with different tasks."""
tasks: List[str] = [], results = []
batch_size: int = None,
max_workers: int = None,
device: str = "cpu",
device_id: int = 1,
all_cores: bool = True,
no_clusterops: bool = False,
) -> List[Any]:
"""
Executes a list of agents with their corresponding tasks concurrently on a specified device.
This function orchestrates the concurrent execution of a list of agents with their respective tasks on a specified device, either CPU or GPU. It leverages the `exec_callable_with_clusterops` function to manage the execution on the specified device. def run_agent_task_sync(agent: AgentType, task: str) -> Any:
"""Synchronous wrapper for agent execution with specific task."""
return agent.run(task=task)
Args: loop = asyncio.get_event_loop()
agents (List[AgentType]): A list of Agent instances or callable functions to execute concurrently.
tasks (List[str], optional): A list of task strings to execute for each agent. Defaults to an empty list.
batch_size (int, optional): The number of agents to run in parallel. Defaults to None.
max_workers (int, optional): The maximum number of threads to use for execution. Defaults to None.
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
Returns: with ThreadPoolExecutor(max_workers=max_workers) as executor:
List[Any]: A list of outputs from each agent execution. # Create tasks for agent-task pairs
""" tasks_async = [
# Make the first agent not use the ifrs loop.run_in_executor(
return _run_agents_with_tasks_concurrently( executor, run_agent_task_sync, agent, task
agents, tasks, batch_size, max_workers )
) for agent, task in zip(agents, tasks)
]
# Wait for all tasks to complete
completed_tasks = await asyncio.gather(
*tasks_async, return_exceptions=True
)
# Handle results and exceptions
for i, result in enumerate(completed_tasks):
if isinstance(result, Exception):
logger.error(
f"Agent {i+1} (task: {tasks[i][:50]}...) failed with error: {result}"
)
results.append(result)
else:
results.append(result)
return results
# Run the async function
try:
return asyncio.run(run_agents_with_tasks_async())
except RuntimeError as e:
if "already running" in str(e).lower():
logger.warning(
"Event loop already running, using get_event_loop()"
)
loop = asyncio.get_event_loop()
return loop.run_until_complete(
run_agents_with_tasks_async()
)
else:
raise
def get_swarms_info(swarms: List[Callable]) -> str: def get_swarms_info(swarms: List[Callable]) -> str:
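For reference, a minimal usage sketch of the reworked helpers above (agent and model names are placeholders): `batched_grid_agent_execution` fans one task per agent across a thread pool and preserves input order, and `run_agents_with_different_tasks` feeds (agent, task) pairs through it in batches.

```python
from swarms.structs.agent import Agent
from swarms.structs.multi_agent_exec import (
    batched_grid_agent_execution,
    run_agents_with_different_tasks,
)

# Placeholder agents; any Agent configuration works here.
agents = [
    Agent(agent_name=f"Worker_{i}", model_name="gpt-4o-mini", max_loops=1)
    for i in range(4)
]
tasks = [f"Summarize topic {i} in one sentence." for i in range(4)]

# One task per agent; results come back in input order,
# with any per-agent exception captured in its slot.
grid_results = batched_grid_agent_execution(agents, tasks, max_workers=4)

# Same pairing, processed in batches of `batch_size` pairs at a time.
pair_results = run_agents_with_different_tasks(
    list(zip(agents, tasks)), batch_size=2, max_workers=4
)
```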

@ -429,8 +429,7 @@ class SwarmRouter:
*args, *args,
**kwargs, **kwargs,
) )
def _create_batched_grid_workflow(self, *args, **kwargs): def _create_batched_grid_workflow(self, *args, **kwargs):
"""Factory function for BatchedGridWorkflow.""" """Factory function for BatchedGridWorkflow."""
return BatchedGridWorkflow( return BatchedGridWorkflow(

@ -0,0 +1,88 @@
# Best LLM Models by Task Type
# Simplified dictionary structure with model names and categories
best_models = {
"Vision": [
{"model": "gemini/gemini-2.5-pro", "category": "Vision"},
],
"text-generation": [
{
"model": "claude-sonnet-4-20250514",
"category": "text-generation",
},
{"model": "gpt-5-chat", "category": "text-generation"},
],
}
# Function to get all models for a task type
def get_models_by_task(task_type: str) -> list:
"""
Get all models for a specific task type.
Args:
        task_type (str): The task category (e.g., 'Vision', 'text-generation')
Returns:
list: List of all models for the task type
"""
if task_type not in best_models:
raise ValueError(
f"Task type '{task_type}' not found. Available types: {list(best_models.keys())}"
)
return best_models[task_type]
# Function to get the first model for a task type (simplified from get_top_model)
def get_first_model(task_type: str) -> dict:
"""
Get the first model for a specific task type.
Args:
        task_type (str): The task category (e.g., 'Vision', 'text-generation')
Returns:
dict: First model information with model name and category
"""
if task_type not in best_models:
raise ValueError(
f"Task type '{task_type}' not found. Available types: {list(best_models.keys())}"
)
models = best_models[task_type]
if not models:
raise ValueError(
f"No models found for task type '{task_type}'"
)
return models[0]
# Function to search for a specific model across all categories
def find_model_by_name(model_name: str) -> dict:
"""
Find a model by name across all task categories.
Args:
model_name (str): The model name to search for
Returns:
dict: Model information if found, None otherwise
"""
for task_type, models in best_models.items():
for model in models:
if model["model"].lower() == model_name.lower():
return model
return None
# Function to get all available task types
def get_available_task_types() -> list:
"""
Get all available task types/categories.
Returns:
list: List of all task type names
"""
return list(best_models.keys())
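A brief usage sketch of these helpers, assuming they are imported or defined in the same module; the printed values follow from the `best_models` dictionary above.

```python
# Illustrative only: exercises the lookup helpers defined above.
print(get_available_task_types())          # ['Vision', 'text-generation']
print(get_first_model("text-generation"))  # {'model': 'claude-sonnet-4-20250514', 'category': 'text-generation'}
print(get_models_by_task("Vision"))        # [{'model': 'gemini/gemini-2.5-pro', 'category': 'Vision'}]
print(find_model_by_name("gpt-5-chat"))    # {'model': 'gpt-5-chat', 'category': 'text-generation'}
```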