From 28995b6900127a4510dd3c3e926dddf41646ac58 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sun, 7 Sep 2025 22:38:13 -0700 Subject: [PATCH] [FEAT][New Multi-Agent Execution utilities][run_agents_concurrently_uvloop] [run_agents_with_tasks_uvloop] [Improvement][Improved the batched_grid_agent_execution to use concurrent futures] [Updated docs] [Agent][Fix batched run] [FEAT][CustomAgent] [New util examples] --- .gitignore | 1 + docs/llm.txt | 26 - docs/mkdocs.yml | 2 +- docs/swarms/structs/batched_grid_workflow.md | 3 +- docs/swarms/structs/swarm_network.md | 705 --------------- docs/swarms/structs/taskqueue_swarm.md | 90 -- .../structs/various_execution_methods.md | 833 +++++++++++++++--- .../{ => asb}/auto_swarm_builder_example.py | 0 .../concurrent_examples/concurrent_mix.py | 30 +- .../concurrent_examples/uvloop/README.md | 48 + .../uvloop/different_tasks_example.py | 66 ++ .../uvloop/run_all_examples.py | 38 + .../uvloop/same_task_example.py | 57 ++ .../agent_communication_examples.py | 0 .../{ => utils}/aggregate_example.py | 0 examples/multi_agent/utils/uvloop_example.py | 122 +++ .../external_agents/custom_agent_example.py | 40 + pyproject.toml | 1 + requirements.txt | 1 + swarms/structs/__init__.py | 18 +- swarms/structs/agent.py | 4 +- swarms/structs/custom_agent.py | 344 ++++++++ swarms/structs/multi_agent_exec.py | 453 +++++----- swarms/structs/swarm_router.py | 3 +- swarms/utils/best_models.py | 88 ++ 25 files changed, 1758 insertions(+), 1215 deletions(-) delete mode 100644 docs/swarms/structs/swarm_network.md delete mode 100644 docs/swarms/structs/taskqueue_swarm.md rename examples/multi_agent/{ => asb}/auto_swarm_builder_example.py (100%) create mode 100644 examples/multi_agent/concurrent_examples/uvloop/README.md create mode 100644 examples/multi_agent/concurrent_examples/uvloop/different_tasks_example.py create mode 100644 examples/multi_agent/concurrent_examples/uvloop/run_all_examples.py create mode 100644 examples/multi_agent/concurrent_examples/uvloop/same_task_example.py rename examples/multi_agent/{ => utils}/agent_communication_examples.py (100%) rename examples/multi_agent/{ => utils}/aggregate_example.py (100%) create mode 100644 examples/multi_agent/utils/uvloop_example.py create mode 100644 examples/single_agent/external_agents/custom_agent_example.py create mode 100644 swarms/structs/custom_agent.py create mode 100644 swarms/utils/best_models.py diff --git a/.gitignore b/.gitignore index 9df57269..0c1030a9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ databases static/generated conversations/ next_swarms_update.txt +infra.md runs Financial-Analysis-Agent_state.json conversations/ diff --git a/docs/llm.txt b/docs/llm.txt index a3f62996..c83f597f 100644 --- a/docs/llm.txt +++ b/docs/llm.txt @@ -49826,32 +49826,6 @@ A dataclass for system resource metrics. | memory_percent | float | Current memory usage percentage | | active_threads | int | Number of active threads | -### run_agents_with_resource_monitoring() - -Runs agents with system resource monitoring and adaptive batch sizing. 
- -#### Arguments - -| Parameter | Type | Required | Default | Description | -|------------------|----------------|----------|---------|-------------| -| agents | List[AgentType]| Yes | - | List of Agent instances | -| task | str | Yes | - | Task string to execute | -| cpu_threshold | float | No | 90.0 | Max CPU usage percentage | -| memory_threshold | float | No | 90.0 | Max memory usage percentage | -| check_interval | float | No | 1.0 | Resource check interval in seconds | - -## Performance Considerations - -- Default batch sizes and worker counts are optimized based on CPU cores -- Resource monitoring helps prevent system overload -- Using `uvloop` provides better performance than standard `asyncio` - -## Error Handling - -- Functions handle asyncio event loop creation/retrieval -- Timeout mechanism prevents infinite waiting -- Resource monitoring allows for adaptive performance adjustment - -------------------------------------------------- # File: swarms/structs/yaml_model.md diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 7e7bb0e8..1da89ac5 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -282,7 +282,7 @@ nav: - SpreadSheetSwarm: "swarms/structs/spreadsheet_swarm.md" - ForestSwarm: "swarms/structs/forest_swarm.md" - MALT: "swarms/structs/malt.md" - - Various Execution Methods: "swarms/structs/various_execution_methods.md" + - Multi-Agent Execution Utilities: "swarms/structs/various_execution_methods.md" - Deep Research Swarm: "swarms/structs/deep_research_swarm.md" - Council of Judges: "swarms/structs/council_of_judges.md" - Heavy Swarm: "swarms/structs/heavy_swarm.md" diff --git a/docs/swarms/structs/batched_grid_workflow.md b/docs/swarms/structs/batched_grid_workflow.md index e81b01e4..f5922446 100644 --- a/docs/swarms/structs/batched_grid_workflow.md +++ b/docs/swarms/structs/batched_grid_workflow.md @@ -113,8 +113,7 @@ Internal method that runs the workflow without error handling. ### Basic Usage ```python -from swarms import Agent -from swarms.structs.batched_grid_workflow import BatchedGridWorkflow +from swarms import Agent, BatchedGridWorkflow # Initialize the ETF-focused agent agent = Agent( diff --git a/docs/swarms/structs/swarm_network.md b/docs/swarms/structs/swarm_network.md deleted file mode 100644 index 1b74a85f..00000000 --- a/docs/swarms/structs/swarm_network.md +++ /dev/null @@ -1,705 +0,0 @@ -# SwarmNetwork [WIP] - -The `SwarmNetwork` class is a powerful tool for managing a pool of agents, orchestrating task distribution, and scaling resources based on workload. It is designed to handle tasks efficiently by dynamically adjusting the number of agents according to the current demand. This class also provides an optional API for interacting with the agent pool, making it accessible for integration with other systems. - -### Key Features -- **Agent Pool Management**: Dynamically manage a pool of agents. -- **Task Queue Management**: Handle tasks through a queue system. -- **Agent Health Monitoring**: Monitor the health of agents. -- **Agent Pool Scaling**: Scale the agent pool up or down based on workload. -- **API**: Interact with the agent pool and task queue through a simple API. -- **Agent Deployment Options**: Run agents on threads, processes, containers, machines, or clusters. - -### Parameters - -| Parameter | Type | Default Value | Description | -|-----------------|--------------------|---------------|-----------------------------------------------------------------------------| -| name | str | None | The name of the swarm network. 
| -| description | str | None | A description of the swarm network. | -| agents | List[Agent] | None | A list of agents in the pool. | -| idle_threshold | float | 0.2 | The idle threshold for the agents. | -| busy_threshold | float | 0.7 | The busy threshold for the agents. | -| api_enabled | Optional[bool] | False | A flag to enable/disable the API. | -| logging_enabled | Optional[bool] | False | A flag to enable/disable logging. | -| api_on | Optional[bool] | False | A flag to enable/disable the FastAPI instance. | -| host | str | "0.0.0.0" | The host address for the FastAPI instance. | -| port | int | 8000 | The port number for the FastAPI instance. | -| swarm_callable | Optional[callable] | None | A callable to be executed by the swarm network. | -| *args | tuple | | Additional positional arguments. | -| **kwargs | dict | | Additional keyword arguments. | - -### Attributes - -| Attribute | Type | Description | -|------------------|--------------------|----------------------------------------------------------------| -| task_queue | queue.Queue | A queue for storing tasks. | -| idle_threshold | float | The idle threshold for the agents. | -| busy_threshold | float | The busy threshold for the agents. | -| agents | List[Agent] | A list of agents in the pool. | -| api_enabled | bool | A flag to enable/disable the API. | -| logging_enabled | bool | A flag to enable/disable logging. | -| host | str | The host address for the FastAPI instance. | -| port | int | The port number for the FastAPI instance. | -| swarm_callable | Optional[callable] | A callable to be executed by the swarm network. | -| agent_dict | dict | A dictionary of agents for easy access. | -| lock | threading.Lock | A lock for synchronizing access to shared resources. | - -## Methods - -#### Description -Initializes a new instance of the `SwarmNetwork` class. - -#### Parameters -- `name` (str): The name of the swarm network. -- `description` (str): A description of the swarm network. -- `agents` (List[Agent]): A list of agents in the pool. -- `idle_threshold` (float): The idle threshold for the agents. -- `busy_threshold` (float): The busy threshold for the agents. -- `api_enabled` (Optional[bool]): A flag to enable/disable the API. -- `logging_enabled` (Optional[bool]): A flag to enable/disable logging. -- `api_on` (Optional[bool]): A flag to enable/disable the FastAPI instance. -- `host` (str): The host address for the FastAPI instance. -- `port` (int): The port number for the FastAPI instance. -- `swarm_callable` (Optional[callable]): A callable to be executed by the swarm network. -- `*args`: Additional positional arguments. -- `**kwargs`: Additional keyword arguments. - -### `add_task` - -```python -def add_task(self, task) -``` - -#### Description -Adds a task to the task queue. - -#### Parameters -- `task` (_type_): The task to be added to the queue. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -agent = Agent() -swarm = SwarmNetwork(agents=[agent]) -swarm.add_task("task") -``` - -### `async_add_task` - -```python -async def async_add_task(self, task) -``` - -#### Description -Adds a task to the task queue asynchronously. - -#### Parameters -- `task` (_type_): The task to be added to the queue. 
- -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -agent = Agent() -swarm = SwarmNetwork(agents=[agent]) -await swarm.async_add_task("task") -``` - -### `run_single_agent` - -```python -def run_single_agent(self, agent_id, task: Optional[str], *args, **kwargs) -``` - -#### Description -Runs a task on a specific agent by ID. - -#### Parameters -- `agent_id` (_type_): The ID of the agent. -- `task` (str, optional): The task to be executed by the agent. -- `*args`: Additional positional arguments. -- `**kwargs`: Additional keyword arguments. - -#### Returns -- `_type_`: The output of the agent running the task. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent = Agent( - agent_name="Financial-Analysis-Agent", - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -result = swarm.run_single_agent(agent.id, "task") -``` - -### `run_many_agents` - -```python -def run_many_agents(self, task: Optional[str] = None, *args, **kwargs) -> List -``` - -#### Description -Runs a task on all agents in the pool. - -#### Parameters -- `task` (str, optional): The task to be executed by the agents. -- `*args`: Additional positional arguments. -- `**kwargs`: Additional keyword arguments. - -#### Returns -- `List`: The output of all agents running the task. 
- -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent = Agent( - agent_name="Financial-Analysis-Agent", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - - -swarm = SwarmNetwork(agents=[agent1, agent2]) -results = swarm.run_many_agents("task") -``` - -### `list_agents` - -```python -def list_agents(self) -``` - -#### Description -Lists all agents in the pool. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -swarm.list_agents() -``` - -### `get_agent` - -```python -def get_agent(self, agent_id) -``` - -#### Description -Gets an agent by ID. - -#### Parameters -- `agent_id` (_type_): The ID of the agent to retrieve. - -#### Returns -- `_type_`: The agent with the specified ID. 
- -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -retrieved_agent = swarm.get_agent(agent.id) -``` - -### `add_agent` - -```python -def add_agent(self, agent: Agent) -``` - -#### Description -Adds an agent to the agent pool. - -#### Parameters -- `agent` (_type_): The agent to be added to the pool. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[]) -swarm.add_agent(agent) -``` - -### `remove_agent` - -```python -def remove_agent(self, agent_id) -``` - -#### Description -Removes an agent from the agent pool. - -#### Parameters -- `agent_id` (_type_): The ID of the agent to be removed. 
- -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -swarm.remove_agent(agent.id) -``` - -### ` - -async_remove_agent` - -```python -async def async_remove_agent(self, agent_id) -``` - -#### Description -Removes an agent from the agent pool asynchronously. - -#### Parameters -- `agent_id` (_type_): The ID of the agent to be removed. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -await swarm.async_remove_agent(agent.id) -``` - -### `scale_up` - -```python -def scale_up(self, num_agents: int = 1) -``` - -#### Description -Scales up the agent pool by adding new agents. - -#### Parameters -- `num_agents` (int, optional): The number of agents to add. Defaults to 1. 
- -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - -swarm = SwarmNetwork(agents=[agent]) -swarm.scale_up(2) -``` - -### `scale_down` - -```python -def scale_down(self, num_agents: int = 1) -``` - -#### Description -Scales down the agent pool by removing agents. - -#### Parameters -- `num_agents` (int, optional): The number of agents to remove. Defaults to 1. - -#### Example - -```python -from swarms.structs.agent import Agent -from swarms.structs.swarm_net import SwarmNetwork - -# Initialize the agent -agent2 = Agent( - agent_name="ROTH-IRA-AGENT", - system_prompt=ESTATE_PLANNING_AGENT_SYS_PROMPT, - llm=model, - max_loops="auto", - autosave=True, - dashboard=False, - verbose=True, - streaming_on=True, - interactive=True, - # interactive=True, # Set to False to disable interactive mode - saved_state_path="finance_agent.json", - # tools=[Add your functions here# ], - # stopping_token="Stop!", - # interactive=True, - # docs_folder="docs", # Enter your folder name - # pdf_path="docs/finance_agent.pdf", - # sop="Calculate the profit for a company.", - # sop_list=["Calculate the profit for a company."], - user_name="swarms_corp", - # # docs= - # # docs_folder="docs", - retry_attempts=3, - # context_length=1000, - # tool_schema = dict - context_length=200000, - # agent_ops_on=True, - # long_term_memory=ChromaDB(docs_folder="artifacts"), -) - - -swarm = SwarmNetwork(agents=[agent]) -swarm.scale_down(1) -``` - -### `run` - -#### Description -Runs the swarm network, starting the FastAPI application. - -#### Example - -```python - -import os - -from dotenv import load_dotenv - -# Import the OpenAIChat model and the Agent struct -from swarms import Agent, OpenAIChat, SwarmNetwork - -# Load the environment variables -load_dotenv() - -# Get the API key from the environment -api_key = os.environ.get("OPENAI_API_KEY") - -# Initialize the language model -llm = OpenAIChat( - temperature=0.5, - openai_api_key=api_key, -) - -## Initialize the workflow -agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager") -agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager") -agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager") - - -# Load the swarmnet with the agents -swarmnet = SwarmNetwork( - agents=[agent, agent2, agent3], -) - -# List the agents in the swarm network -out = swarmnet.list_agents() -print(out) - -# Run the workflow on a task -out = swarmnet.run_single_agent( - agent2.id, "Generate a 10,000 word blog on health and wellness." 
-) -print(out) - - -# Run all the agents in the swarm network on a task -out = swarmnet.run_many_agents("Generate a 10,000 word blog on health and wellness.") -print(out) -``` - -## Additional Information and Tips - -- **Error Handling**: Make use of try-except blocks to handle potential errors when adding tasks, running tasks, and managing agents. -- **Logging**: Enable logging to track the activity and status of the swarm network. -- **API**: The provided API allows for easy interaction with the swarm network and can be extended as needed. -- **Asynchronous Operations**: Utilize the asynchronous methods for non-blocking operations, especially in a production environment. -- **Scaling**: Adjust the scaling thresholds (`idle_threshold` and `busy_threshold`) based on the specific needs and workload patterns. - -## References and Resources - -- [Python Queue Documentation](https://docs.python.org/3/library/queue.html) -- [Threading in Python](https://docs.python.org/3/library/threading.html) -- [FastAPI Documentation](https://fastapi.tiangolo.com/) -- [Tenacity Documentation](https://tenacity.readthedocs.io/en/latest/) - -By following this documentation, users can effectively manage and utilize the `SwarmNetwork` class to handle dynamic workloads and maintain an efficient pool of agents. diff --git a/docs/swarms/structs/taskqueue_swarm.md b/docs/swarms/structs/taskqueue_swarm.md deleted file mode 100644 index 4145e69a..00000000 --- a/docs/swarms/structs/taskqueue_swarm.md +++ /dev/null @@ -1,90 +0,0 @@ -# TaskQueueSwarm Documentation - -The `TaskQueueSwarm` class is designed to manage and execute tasks using multiple agents concurrently. This class allows for the orchestration of multiple agents processing tasks from a shared queue, facilitating complex workflows where tasks can be distributed and processed in parallel by different agents. - -## Attributes - -| Attribute | Type | Description | -|-----------|------|-------------| -| `agents` | `List[Agent]` | The list of agents in the swarm. | -| `task_queue` | `queue.Queue` | A queue to store tasks for processing. | -| `lock` | `threading.Lock` | A lock for thread synchronization. | -| `autosave_on` | `bool` | Whether to automatically save the swarm metadata. | -| `save_file_path` | `str` | The file path for saving swarm metadata. | -| `workspace_dir` | `str` | The directory path of the workspace. | -| `return_metadata_on` | `bool` | Whether to return the swarm metadata after running. | -| `max_loops` | `int` | The maximum number of loops to run the swarm. | -| `metadata` | `SwarmRunMetadata` | Metadata about the swarm run. | - -## Methods - -### `__init__(self, agents: List[Agent], name: str = "Task-Queue-Swarm", description: str = "A swarm that processes tasks from a queue using multiple agents on different threads.", autosave_on: bool = True, save_file_path: str = "swarm_run_metadata.json", workspace_dir: str = os.getenv("WORKSPACE_DIR"), return_metadata_on: bool = False, max_loops: int = 1, *args, **kwargs)` - -The constructor initializes the `TaskQueueSwarm` object. - -- **Parameters:** - - `agents` (`List[Agent]`): The list of agents in the swarm. - - `name` (`str`, optional): The name of the swarm. Defaults to "Task-Queue-Swarm". - - `description` (`str`, optional): The description of the swarm. Defaults to "A swarm that processes tasks from a queue using multiple agents on different threads.". - - `autosave_on` (`bool`, optional): Whether to automatically save the swarm metadata. Defaults to True. 
- - `save_file_path` (`str`, optional): The file path to save the swarm metadata. Defaults to "swarm_run_metadata.json". - - `workspace_dir` (`str`, optional): The directory path of the workspace. Defaults to os.getenv("WORKSPACE_DIR"). - - `return_metadata_on` (`bool`, optional): Whether to return the swarm metadata after running. Defaults to False. - - `max_loops` (`int`, optional): The maximum number of loops to run the swarm. Defaults to 1. - - `*args`: Variable length argument list. - - `**kwargs`: Arbitrary keyword arguments. - -### `add_task(self, task: str)` - -Adds a task to the queue. - -- **Parameters:** - - `task` (`str`): The task to be added to the queue. - -### `run(self)` - -Runs the swarm by having agents pick up tasks from the queue. - -- **Returns:** - - `str`: JSON string of the swarm run metadata if `return_metadata_on` is True. - -- **Usage Example:** - ```python - from swarms import Agent, TaskQueueSwarm - - - # Initialize agents - agent1 = Agent(agent_name="Agent1", model_name="gpt-4o") - agent2 = Agent(agent_name="Agent2", model_name="gpt-4o") - - # Create the TaskQueueSwarm - swarm = TaskQueueSwarm(agents=[agent1, agent2], max_loops=5) - - # Add tasks to the swarm - swarm.add_task("Analyze the latest market trends") - swarm.add_task("Generate a summary report") - - # Run the swarm - result = swarm.run() - print(result) # Prints the swarm run metadata - ``` - - This example initializes a `TaskQueueSwarm` with two agents, adds tasks to the queue, and runs the swarm. - -### `save_json_to_file(self)` - -Saves the swarm run metadata to a JSON file. - -### `export_metadata(self)` - -Exports the swarm run metadata as a JSON string. - -- **Returns:** - - `str`: JSON string of the swarm run metadata. - -## Additional Notes - -- The `TaskQueueSwarm` uses threading to process tasks concurrently, which can significantly improve performance for I/O-bound tasks. -- The `reliability_checks` method ensures that the swarm is properly configured before running. -- The swarm automatically handles task distribution among agents and provides detailed metadata about the run. -- Error handling and logging are implemented to track the execution flow and capture any issues during task processing. diff --git a/docs/swarms/structs/various_execution_methods.md b/docs/swarms/structs/various_execution_methods.md index 04592a15..0ce985de 100644 --- a/docs/swarms/structs/various_execution_methods.md +++ b/docs/swarms/structs/various_execution_methods.md @@ -1,171 +1,776 @@ -# Concurrent Agents API Reference +# Multi-Agent Execution API Reference -This documentation covers the API for running multiple agents concurrently using various execution strategies. The implementation uses `asyncio` with `uvloop` for enhanced performance and `ThreadPoolExecutor` for handling CPU-bound operations. +This comprehensive documentation covers all functions in the `multi_agent_exec.py` module for running multiple agents using various execution strategies. The module provides synchronous and asynchronous execution methods, optimized performance with uvloop, and utility functions for information retrieval. 
-## Table of Contents -- [Core Functions](#core-functions) -- [Advanced Functions](#advanced-functions) -- [Utility Functions](#utility-functions) -- [Resource Monitoring](#resource-monitoring) -- [Usage Examples](#usage-examples) +## Function Overview -## Core Functions +| Function | Signature | Category | Description | +|----------|-----------|----------|-------------| +| `run_single_agent` | `run_single_agent(agent, task, *args, **kwargs) -> Any` | Single Agent | Runs a single agent synchronously | +| `run_agent_async` | `run_agent_async(agent, task) -> Any` | Single Agent | Runs a single agent asynchronously using asyncio | +| `run_agents_concurrently_async` | `run_agents_concurrently_async(agents, task) -> List[Any]` | Concurrent Execution | Runs multiple agents concurrently using asyncio | +| `run_agents_concurrently` | `run_agents_concurrently(agents, task, max_workers=None) -> List[Any]` | Concurrent Execution | Optimized concurrent agent runner using ThreadPoolExecutor | +| `run_agents_concurrently_multiprocess` | `run_agents_concurrently_multiprocess(agents, task, batch_size=None) -> List[Any]` | Concurrent Execution | Manages agents concurrently in batches with optimized performance | +| `batched_grid_agent_execution` | `batched_grid_agent_execution(agents, tasks, max_workers=None) -> List[Any]` | Batched & Grid | Runs multiple agents with different tasks concurrently | +| `run_agents_with_different_tasks` | `run_agents_with_different_tasks(agent_task_pairs, batch_size=10, max_workers=None) -> List[Any]` | Batched & Grid | Runs agents with different tasks concurrently in batches | +| `run_agents_concurrently_uvloop` | `run_agents_concurrently_uvloop(agents, task, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents concurrently using uvloop for optimized performance | +| `run_agents_with_tasks_uvloop` | `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None) -> List[Any]` | UVLoop Optimized | Runs agents with different tasks using uvloop optimization | +| `get_swarms_info` | `get_swarms_info(swarms) -> str` | Utility | Fetches and formats information about available swarms | +| `get_agents_info` | `get_agents_info(agents, team_name=None) -> str` | Utility | Fetches and formats information about available agents | -### run_agents_concurrently() +## Single Agent Functions -Primary function for running multiple agents concurrently with optimized performance using both uvloop and ThreadPoolExecutor. +### `run_single_agent(agent, task, *args, **kwargs)` -#### Arguments +Runs a single agent synchronously. 
-| Parameter | Type | Required | Default | Description | -|-------------|----------------|----------|----------------|-------------| -| agents | List[AgentType]| Yes | - | List of Agent instances to run concurrently | -| task | str | Yes | - | Task string to execute | -| batch_size | int | No | CPU count | Number of agents to run in parallel in each batch | -| max_workers | int | No | CPU count * 2 | Maximum number of threads in the executor | +#### Signature +```python +def run_single_agent( + agent: AgentType, + task: str, + *args, + **kwargs +) -> Any +``` + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|-----------|----------|-------------| +| `agent` | `AgentType` | Yes | Agent instance to run | +| `task` | `str` | Yes | Task string to execute | +| `*args` | `Any` | No | Additional positional arguments | +| `**kwargs`| `Any` | No | Additional keyword arguments | #### Returns -`List[Any]`: List of outputs from each agent +- `Any`: Agent execution result + +#### Example -#### Flow Diagram +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_single_agent + +agent = Agent( + agent_name="Financial-Analyst", + system_prompt="You are a financial analysis expert", + model_name="gpt-4o-mini", + max_loops=1 +) -```mermaid -graph TD - A[Start] --> B[Initialize ThreadPoolExecutor] - B --> C[Split Agents into Batches] - C --> D[Process Batch] - D --> E{More Batches?} - E -->|Yes| D - E -->|No| F[Combine Results] - F --> G[Return Results] - - subgraph "Batch Processing" - D --> H[Run Agents Async] - H --> I[Wait for Completion] - I --> J[Collect Batch Results] - end +result = run_single_agent(agent, "Analyze the current stock market trends") +print(result) ``` -### run_agents_sequentially() +### `run_agent_async(agent, task)` -Runs multiple agents sequentially for baseline comparison or simple use cases. +Runs a single agent asynchronously using asyncio. -#### Arguments +#### Signature +```python +async def run_agent_async(agent: AgentType, task: str) -> Any +``` -| Parameter | Type | Required | Default | Description | -|-----------|----------------|----------|---------|-------------| -| agents | List[AgentType]| Yes | - | List of Agent instances to run | -| task | str | Yes | - | Task string to execute | +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|-------------|----------|-------------| +| `agent` | `AgentType` | Yes | Agent instance to run | +| `task` | `str` | Yes | Task string to execute | #### Returns -`List[Any]`: List of outputs from each agent -## Advanced Functions +- `Any`: Agent execution result + +#### Example + +```python +import asyncio +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agent_async + +async def main(): + agent = Agent( + agent_name="Researcher", + system_prompt="You are a research assistant", + model_name="gpt-4o-mini", + max_loops=1 + ) -### run_agents_with_different_tasks() + result = await run_agent_async(agent, "Research AI advancements in 2024") + print(result) -Runs multiple agents with different tasks concurrently. 
+asyncio.run(main()) +``` -#### Arguments +## Concurrent Execution Functions -| Parameter | Type | Required | Default | Description | -|-----------------|-------------------------------|----------|----------------|-------------| -| agent_task_pairs| List[tuple[AgentType, str]] | Yes | - | List of (agent, task) tuples | -| batch_size | int | No | CPU count | Number of agents to run in parallel | -| max_workers | int | No | CPU count * 2 | Maximum number of threads | +### `run_agents_concurrently_async(agents, task)` -### run_agents_with_timeout() +Runs multiple agents concurrently using asyncio. -Runs multiple agents concurrently with timeout limits. +#### Signature -#### Arguments +```python +async def run_agents_concurrently_async( + agents: List[AgentType], + task: str +) -> List[Any] +``` + +#### Parameters -| Parameter | Type | Required | Default | Description | -|-------------|----------------|----------|----------------|-------------| -| agents | List[AgentType]| Yes | - | List of Agent instances | -| task | str | Yes | - | Task string to execute | -| timeout | float | Yes | - | Timeout in seconds for each agent | -| batch_size | int | No | CPU count | Number of agents to run in parallel | -| max_workers | int | No | CPU count * 2 | Maximum number of threads | +| Parameter | Type | Required | Description | +|-----------|-------------------|----------|-------------| +| `agents` | `List[AgentType]` | Yes | List of Agent instances to run concurrently | +| `task` | `str` | Yes | Task string to execute by all agents | + +#### Returns -## Usage Examples +- `List[Any]`: List of outputs from each agent + +#### Example ```python +import asyncio from swarms.structs.agent import Agent -from swarms.structs.multi_agent_exec import ( - run_agents_concurrently, - run_agents_with_timeout, - run_agents_with_different_tasks -) +from swarms.structs.multi_agent_exec import run_agents_concurrently_async + +async def main(): + agents = [ + Agent( + agent_name=f"Analyst-{i}", + system_prompt="You are a market analyst", + model_name="gpt-4o-mini", + max_loops=1 + ) + for i in range(3) + ] + + task = "Analyze the impact of AI on job markets" + results = await run_agents_concurrently_async(agents, task) + + for i, result in enumerate(results): + print(f"Agent {i+1} result: {result}") + +asyncio.run(main()) +``` + +### `run_agents_concurrently(agents, task, max_workers=None)` + +Optimized concurrent agent runner using ThreadPoolExecutor. 
+ +#### Signature +```python +def run_agents_concurrently( + agents: List[AgentType], + task: str, + max_workers: Optional[int] = None, +) -> List[Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|--------------|-------------------|----------|---------|-------------| +| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently | +| `task` | `str` | Yes | - | Task string to execute | +| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor | + +#### Returns + +- `List[Any]`: List of outputs from each agent (exceptions included if agents fail) + +#### Example +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently + +# Create multiple agents +agents = [ + Agent( + agent_name="Tech-Analyst", + system_prompt="You are a technology analyst", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="Finance-Analyst", + system_prompt="You are a financial analyst", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="Market-Strategist", + system_prompt="You are a market strategist", + model_name="gpt-4o-mini", + max_loops=1 + ) +] + +task = "Analyze the future of electric vehicles in 2025" +results = run_agents_concurrently(agents, task, max_workers=4) + +for i, result in enumerate(results): + print(f"Agent {i+1} ({agents[i].agent_name}): {result}") +``` + +### `run_agents_concurrently_multiprocess(agents, task, batch_size=None)` + +Manages and runs multiple agents concurrently in batches with optimized performance. + +#### Signature + +```python +def run_agents_concurrently_multiprocess( + agents: List[Agent], + task: str, + batch_size: int = os.cpu_count() +) -> List[Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|--------------|---------------|----------|--------------|-------------| +| `agents` | `List[Agent]` | Yes | - | List of Agent instances to run concurrently | +| `task` | `str` | Yes | - | Task string to execute by all agents | +| `batch_size` | `int` | No | CPU count | Number of agents to run in parallel in each batch | + +#### Returns + +- `List[Any]`: List of outputs from each agent + +#### Example +```python +import os +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently_multiprocess -# Initialize agents using only the built-in model_name parameter agents = [ Agent( - agent_name=f"Analysis-Agent-{i}", - system_prompt="You are a financial analysis expert", + agent_name=f"Research-Agent-{i}", + system_prompt="You are a research specialist", model_name="gpt-4o-mini", max_loops=1 ) for i in range(5) ] -# Basic concurrent execution -task = "Analyze the impact of rising interest rates on tech stocks" -outputs = run_agents_concurrently(agents, task) +task = "Research the benefits of renewable energy" +batch_size = os.cpu_count() # Use all CPU cores +results = run_agents_concurrently_multiprocess(agents, task, batch_size) -# Running with timeout -outputs_with_timeout = run_agents_with_timeout( - agents=agents, - task=task, - timeout=30.0, - batch_size=2 -) +print(f"Completed {len(results)} agent executions") +``` + +## Batched and Grid Execution + +### `batched_grid_agent_execution(agents, tasks, max_workers=None)` + +Runs multiple agents with different tasks concurrently using batched grid execution. 
+ +#### Signature +```python +def batched_grid_agent_execution( + agents: List["AgentType"], + tasks: List[str], + max_workers: int = None, +) -> List[Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|--------------|---------------------|----------|---------|-------------| +| `agents` | `List[AgentType]` | Yes | - | List of agent instances | +| `tasks` | `List[str]` | Yes | - | List of tasks, one for each agent | +| `max_workers`| `int` | No | 90% of CPU cores | Maximum number of threads to use | + +#### Returns +- `List[Any]`: List of results from each agent + +#### Raises +- `ValueError`: If number of agents doesn't match number of tasks + +#### Example +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import batched_grid_agent_execution + +agents = [ + Agent( + agent_name="Data-Scientist", + system_prompt="You are a data science expert", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="ML-Engineer", + system_prompt="You are a machine learning engineer", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="AI-Researcher", + system_prompt="You are an AI researcher", + model_name="gpt-4o-mini", + max_loops=1 + ) +] -# Running different tasks -task_pairs = [ - (agents[0], "Analyze tech stocks"), - (agents[1], "Analyze energy stocks"), - (agents[2], "Analyze retail stocks") +tasks = [ + "Analyze machine learning algorithms performance", + "Design a neural network architecture", + "Research latest AI breakthroughs" ] -different_outputs = run_agents_with_different_tasks(task_pairs) + +results = batched_grid_agent_execution(agents, tasks, max_workers=3) + +for i, result in enumerate(results): + print(f"Task {i+1}: {tasks[i]}") + print(f"Result: {result}\n") ``` -## Resource Monitoring +### `run_agents_with_different_tasks(agent_task_pairs, batch_size=10, max_workers=None)` + +Runs multiple agents with different tasks concurrently, processing them in batches. 
+ +#### Signature +```python +def run_agents_with_different_tasks( + agent_task_pairs: List[tuple["AgentType", str]], + batch_size: int = 10, + max_workers: int = None, +) -> List[Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|--------------------|-----------------------------------|----------|---------|-------------| +| `agent_task_pairs` | `List[tuple[AgentType, str]]` | Yes | - | List of (agent, task) tuples | +| `batch_size` | `int` | No | 10 | Number of agents to run in parallel in each batch | +| `max_workers` | `int` | No | None | Maximum number of threads | + +#### Returns +- `List[Any]`: List of outputs from each agent, in the same order as input pairs + +#### Example +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_with_different_tasks + +# Create agents +agents = [ + Agent( + agent_name="Content-Writer", + system_prompt="You are a content writer", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="Editor", + system_prompt="You are an editor", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="SEO-Specialist", + system_prompt="You are an SEO specialist", + model_name="gpt-4o-mini", + max_loops=1 + ) +] + +# Create agent-task pairs +agent_task_pairs = [ + (agents[0], "Write a blog post about sustainable living"), + (agents[1], "Edit and improve this article draft"), + (agents[2], "Optimize this content for SEO") +] -### ResourceMetrics +results = run_agents_with_different_tasks(agent_task_pairs, batch_size=2) -A dataclass for system resource metrics. +for i, result in enumerate(results): + agent, task = agent_task_pairs[i] + print(f"{agent.agent_name} - {task}: {result}") +``` -#### Properties +## UVLoop Optimized Functions -| Property | Type | Description | -|----------------|-------|-------------| -| cpu_percent | float | Current CPU usage percentage | -| memory_percent | float | Current memory usage percentage | -| active_threads | int | Number of active threads | +### `run_agents_concurrently_uvloop(agents, task, max_workers=None)` -### run_agents_with_resource_monitoring() +Runs multiple agents concurrently using uvloop for optimized async performance. -Runs agents with system resource monitoring and adaptive batch sizing. 
+#### Signature +```python +def run_agents_concurrently_uvloop( + agents: List[AgentType], + task: str, + max_workers: Optional[int] = None, +) -> List[Any] +``` -#### Arguments +#### Parameters -| Parameter | Type | Required | Default | Description | -|------------------|----------------|----------|---------|-------------| -| agents | List[AgentType]| Yes | - | List of Agent instances | -| task | str | Yes | - | Task string to execute | -| cpu_threshold | float | No | 90.0 | Max CPU usage percentage | -| memory_threshold | float | No | 90.0 | Max memory usage percentage | -| check_interval | float | No | 1.0 | Resource check interval in seconds | +| Parameter | Type | Required | Default | Description | +|--------------|---------------------|----------|---------|-------------| +| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run concurrently | +| `task` | `str` | Yes | - | Task string to execute by all agents | +| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads in the executor | -## Performance Considerations +#### Returns +- `List[Any]`: List of outputs from each agent -- Default batch sizes and worker counts are optimized based on CPU cores -- Resource monitoring helps prevent system overload -- Using `uvloop` provides better performance than standard `asyncio` +#### Raises +- `ImportError`: If uvloop is not installed +- `RuntimeError`: If uvloop cannot be set as the event loop policy -## Error Handling +#### Example +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop + +# Note: uvloop must be installed (pip install uvloop) +agents = [ + Agent( + agent_name="Performance-Analyst", + system_prompt="You are a performance analyst", + model_name="gpt-4o-mini", + max_loops=1 + ) + for _ in range(5) +] + +task = "Analyze system performance metrics" +results = run_agents_concurrently_uvloop(agents, task) + +print(f"Processed {len(results)} agents with uvloop optimization") +``` + +### `run_agents_with_tasks_uvloop(agents, tasks, max_workers=None)` + +Runs multiple agents with different tasks concurrently using uvloop. 
+ +#### Signature +```python +def run_agents_with_tasks_uvloop( + agents: List[AgentType], + tasks: List[str], + max_workers: Optional[int] = None, +) -> List[Any] +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|--------------|---------------------|----------|---------|-------------| +| `agents` | `List[AgentType]` | Yes | - | List of Agent instances to run | +| `tasks` | `List[str]` | Yes | - | List of task strings (must match number of agents) | +| `max_workers`| `Optional[int]` | No | 95% of CPU cores | Maximum number of threads | + +#### Returns +- `List[Any]`: List of outputs from each agent + +#### Raises + +- `ValueError`: If number of agents doesn't match number of tasks + +- `ImportError`: If uvloop is not installed + +- `RuntimeError`: If uvloop cannot be set as the event loop policy + +#### Example +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_with_tasks_uvloop + +agents = [ + Agent( + agent_name="Data-Analyst-1", + system_prompt="You are a data analyst", + model_name="gpt-4o-mini", + max_loops=1 + ), + Agent( + agent_name="Data-Analyst-2", + system_prompt="You are a data analyst", + model_name="gpt-4o-mini", + max_loops=1 + ) +] + +tasks = [ + "Analyze sales data from Q1 2024", + "Analyze customer satisfaction metrics" +] + +results = run_agents_with_tasks_uvloop(agents, tasks) + +for i, result in enumerate(results): + print(f"Task: {tasks[i]}") + print(f"Result: {result}\n") +``` + +## Utility Functions + +### `get_swarms_info(swarms)` + +Fetches and formats information about all available swarms in the system. + +#### Signature +```python +def get_swarms_info(swarms: List[Callable]) -> str +``` + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|-------------------|----------|-------------| +| `swarms` | `List[Callable]` | Yes | List of swarm objects to get information about | + +#### Returns + +- `str`: Formatted string containing names and descriptions of all swarms + +#### Example +```python +from swarms.structs.multi_agent_exec import get_swarms_info + +# Assuming you have swarm objects +swarms = [ + # Your swarm objects here +] + +info = get_swarms_info(swarms) +print(info) +# Output: +# Available Swarms: +# +# [Swarm 1] +# Name: ResearchSwarm +# Description: A swarm for research tasks +# Length of Agents: 3 +# Swarm Type: hierarchical +``` + +### `get_agents_info(agents, team_name=None)` + +Fetches and formats information about all available agents in the system. 
+ +#### Signature +```python +def get_agents_info( + agents: List[Union[Agent, Callable]], + team_name: str = None +) -> str +``` + +#### Parameters + +| Parameter | Type | Required | Default | Description | +|-------------|-----------------------------------|----------|---------|-------------| +| `agents` | `List[Union[Agent, Callable]]` | Yes | - | List of agent objects to get information about | +| `team_name` | `str` | No | None | Optional team name to display | + +#### Returns + +- `str`: Formatted string containing names and descriptions of all agents + +#### Example + +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import get_agents_info + +agents = [ + Agent( + agent_name="Research-Agent", + system_prompt="You are a research assistant", + model_name="gpt-4o-mini", + max_loops=2, + role="Researcher" + ), + Agent( + agent_name="Analysis-Agent", + system_prompt="You are a data analyst", + model_name="gpt-4o-mini", + max_loops=1, + role="Analyst" + ) +] + +info = get_agents_info(agents, team_name="Data Team") +print(info) +# Output: +# Available Agents for Team: Data Team +# +# [Agent 1] +# Name: Research-Agent +# Description: You are a research assistant +# Role: Researcher +# Model: gpt-4o-mini +# Max Loops: 2 +# +# [Agent 2] +# Name: Analysis-Agent +# Description: You are a data analyst +# Role: Analyst +# Model: gpt-4o-mini +# Max Loops: 1 +``` + +## Complete Usage Examples + +### Advanced Multi-Agent Workflow Example + +```python +import asyncio +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import ( + run_agents_concurrently, + run_agents_with_different_tasks, + batched_grid_agent_execution, + get_agents_info +) + +# Create specialized agents +agents = [ + Agent( + agent_name="Market-Researcher", + system_prompt="You are a market research expert specializing in consumer behavior", + model_name="gpt-4o-mini", + max_loops=1, + role="Researcher" + ), + Agent( + agent_name="Data-Analyst", + system_prompt="You are a data analyst expert in statistical analysis", + model_name="gpt-4o-mini", + max_loops=1, + role="Analyst" + ), + Agent( + agent_name="Strategy-Consultant", + system_prompt="You are a strategy consultant specializing in business development", + model_name="gpt-4o-mini", + max_loops=1, + role="Consultant" + ), + Agent( + agent_name="Financial-Advisor", + system_prompt="You are a financial advisor specializing in investment strategies", + model_name="gpt-4o-mini", + max_loops=1, + role="Advisor" + ) +] + +# Display agent information +print("=== Agent Information ===") +print(get_agents_info(agents, "Business Intelligence Team")) +print("\n" + "="*50 + "\n") + +# Example 1: Same task for all agents +print("=== Example 1: Concurrent Execution with Same Task ===") +task = "Analyze the impact of remote work trends on commercial real estate market in 2024" +results = run_agents_concurrently(agents, task, max_workers=4) + +for i, result in enumerate(results): + print(f"\n{agents[i].agent_name} Analysis:") + print(f"Result: {result}") + +print("\n" + "="*50 + "\n") + +# Example 2: Different tasks for different agents +print("=== Example 2: Different Tasks for Different Agents ===") +agent_task_pairs = [ + (agents[0], "Research consumer preferences for electric vehicles"), + (agents[1], "Analyze sales data for EV market penetration"), + (agents[2], "Develop marketing strategy for EV adoption"), + (agents[3], "Assess financial viability of EV charging infrastructure") +] + +results = 
run_agents_with_different_tasks(agent_task_pairs, batch_size=2) + +for i, result in enumerate(results): + agent, task = agent_task_pairs[i] + print(f"\n{agent.agent_name} - Task: {task}") + print(f"Result: {result}") + +print("\n" + "="*50 + "\n") + +# Example 3: Grid execution with matched agents and tasks +print("=== Example 3: Batched Grid Execution ===") +grid_agents = agents[:3] # Use first 3 agents +grid_tasks = [ + "Forecast market trends for renewable energy", + "Evaluate risk factors in green technology investments", + "Compare traditional vs sustainable investment portfolios" +] + +grid_results = batched_grid_agent_execution(grid_agents, grid_tasks, max_workers=3) + +for i, result in enumerate(grid_results): + print(f"\nTask {i+1}: {grid_tasks[i]}") + print(f"Agent: {grid_agents[i].agent_name}") + print(f"Result: {result}") + +print("\n=== Workflow Complete ===") +``` + +### Error Handling and Best Practices + +```python +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import run_agents_concurrently +import logging + +# Set up logging +logging.basicConfig(level=logging.INFO) + +# Create agents with error handling +agents = [ + Agent( + agent_name=f"Agent-{i}", + system_prompt="You are a helpful assistant", + model_name="gpt-4o-mini", + max_loops=1 + ) + for i in range(5) +] + +task = "Perform a complex analysis task" + +try: + results = run_agents_concurrently(agents, task, max_workers=4) + + # Handle results (some may be exceptions) + for i, result in enumerate(results): + if isinstance(result, Exception): + print(f"Agent {i+1} failed with error: {result}") + else: + print(f"Agent {i+1} succeeded: {result}") + +except Exception as e: + print(f"Execution failed: {e}") + +# Best practices: +# 1. Always handle exceptions in results +# 2. Use appropriate max_workers based on system resources +# 3. Monitor memory usage for large agent counts +# 4. Consider batch processing for very large numbers of agents +# 5. 
Use uvloop functions for I/O intensive tasks +``` + +## Performance Considerations -- Functions handle asyncio event loop creation/retrieval -- Timeout mechanism prevents infinite waiting -- Resource monitoring allows for adaptive performance adjustment \ No newline at end of file +| Technique | Best Use Case / Description | +|------------------------|------------------------------------------------------------------------------------| +| **ThreadPoolExecutor** | Best for CPU-bound tasks with moderate I/O | +| **uvloop** | Optimized for I/O-bound tasks, significantly faster than standard asyncio | +| **Batch Processing** | Prevents system overload with large numbers of agents | +| **Resource Monitoring**| Adjust worker counts based on system capabilities | +| **Async/Await** | Use async functions for better concurrency control | diff --git a/examples/multi_agent/auto_swarm_builder_example.py b/examples/multi_agent/asb/auto_swarm_builder_example.py similarity index 100% rename from examples/multi_agent/auto_swarm_builder_example.py rename to examples/multi_agent/asb/auto_swarm_builder_example.py diff --git a/examples/multi_agent/concurrent_examples/concurrent_mix.py b/examples/multi_agent/concurrent_examples/concurrent_mix.py index e072eccb..35f6f033 100644 --- a/examples/multi_agent/concurrent_examples/concurrent_mix.py +++ b/examples/multi_agent/concurrent_examples/concurrent_mix.py @@ -1,15 +1,6 @@ -import os - -from swarm_models import OpenAIChat - -from swarms import Agent, run_agents_with_tasks_concurrently - -# Fetch the OpenAI API key from the environment variable -api_key = os.getenv("OPENAI_API_KEY") - -# Create an instance of the OpenAIChat class -model = OpenAIChat( - openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 +from swarms import Agent +from swarms.structs.multi_agent_exec import ( + batched_grid_agent_execution, ) # Initialize agents for different roles @@ -27,7 +18,7 @@ delaware_ccorp_agent = Agent( corporate law and ensure that all hiring practices are in compliance with state and federal regulations. """, - llm=model, + model_name="claude-sonnet-4-20250514", max_loops=1, autosave=False, dashboard=False, @@ -54,7 +45,7 @@ indian_foreign_agent = Agent( implications of hiring foreign nationals and the requirements for obtaining necessary visas and work permits. """, - llm=model, + model_name="claude-sonnet-4-20250514", max_loops=1, autosave=False, dashboard=False, @@ -86,11 +77,10 @@ tasks = [ """, ] -# Run agents with tasks concurrently -results = run_agents_with_tasks_concurrently( - agents, tasks, all_cores=True, device="cpu", no_clusterops=True +results = batched_grid_agent_execution( + agents=agents, + tasks=tasks, ) -# Print the results -# for result in results: -# print(result) +for result in results: + print(result) diff --git a/examples/multi_agent/concurrent_examples/uvloop/README.md b/examples/multi_agent/concurrent_examples/uvloop/README.md new file mode 100644 index 00000000..10aa6eec --- /dev/null +++ b/examples/multi_agent/concurrent_examples/uvloop/README.md @@ -0,0 +1,48 @@ +# uvloop Examples + +This directory contains examples demonstrating the use of uvloop for running multiple agents concurrently with improved performance. 
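+
+A minimal quick-start sketch (the agent setup below is illustrative; the example files in this directory follow the same pattern):
+
+```python
+from swarms.structs.agent import Agent
+from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop
+
+# Build a few lightweight agents (any Agent configuration works here)
+agents = [
+    Agent(
+        agent_name=f"Agent_{i+1}",
+        system_prompt="You are a helpful AI assistant.",
+        model_name="gpt-4o-mini",
+        max_loops=1,
+    )
+    for i in range(3)
+]
+
+# Run the same task across all agents on a uvloop-backed event loop
+results = run_agents_concurrently_uvloop(
+    agents, "Summarize the benefits of uvloop in one sentence."
+)
+for result in results:
+    print(result)
+```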
+ +## Files + +- `utils.py`: Utility functions for creating example agents +- `same_task_example.py`: Example of running multiple agents with the same task +- `different_tasks_example.py`: Example of running agents with different tasks +- `performance_info.py`: Information about uvloop performance benefits +- `run_all_examples.py`: Script to run all examples + +## Prerequisites + +Set your OpenAI API key: +```bash +export OPENAI_API_KEY='your-api-key-here' +``` + +## Usage + +### Individual Examples + +Run a specific example: +```python +from same_task_example import run_same_task_example +results = run_same_task_example() +``` + +### Run All Examples + +```python +from run_all_examples import run_all_uvloop_examples +all_results = run_all_uvloop_examples() +``` + +## Performance Benefits + +uvloop provides: +- ~2-4x faster execution compared to standard asyncio +- Better performance for I/O-bound operations +- Lower latency and higher throughput +- Automatic fallback to asyncio if uvloop is unavailable + +## Functions Used + +- `run_agents_concurrently_uvloop`: For running multiple agents with the same task +- `run_agents_with_tasks_uvloop`: For running agents with different tasks diff --git a/examples/multi_agent/concurrent_examples/uvloop/different_tasks_example.py b/examples/multi_agent/concurrent_examples/uvloop/different_tasks_example.py new file mode 100644 index 00000000..d9369648 --- /dev/null +++ b/examples/multi_agent/concurrent_examples/uvloop/different_tasks_example.py @@ -0,0 +1,66 @@ +""" +Example demonstrating running agents with different tasks using uvloop. + +This example shows how to use run_agents_with_tasks_uvloop to execute +different tasks across multiple agents concurrently. +""" + +import os + +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import ( + run_agents_with_tasks_uvloop, +) + + +def create_example_agents(num_agents: int = 3): + """ + Create example agents for demonstration. + + Args: + num_agents: Number of agents to create + + Returns: + List of Agent instances + """ + agents = [] + for i in range(num_agents): + agent = Agent( + agent_name=f"Agent_{i+1}", + system_prompt=f"You are Agent {i+1}, a helpful AI assistant.", + model_name="gpt-4o-mini", # Using a lightweight model for examples + max_loops=1, + autosave=False, + verbose=False, + ) + agents.append(agent) + return agents + + +def run_different_tasks_example(): + """ + Run agents with different tasks using uvloop. + + Returns: + List of results from each agent + """ + # Check if API key is available + if not os.getenv("OPENAI_API_KEY"): + raise ValueError( + "OPENAI_API_KEY environment variable must be set" + ) + + agents = create_example_agents(3) + tasks = [ + "Explain what machine learning is in simple terms.", + "Describe the benefits of cloud computing.", + "What are the main challenges in natural language processing?", + ] + + results = run_agents_with_tasks_uvloop(agents, tasks) + return results + + +if __name__ == "__main__": + results = run_different_tasks_example() + # Results can be processed further as needed diff --git a/examples/multi_agent/concurrent_examples/uvloop/run_all_examples.py b/examples/multi_agent/concurrent_examples/uvloop/run_all_examples.py new file mode 100644 index 00000000..4ee9696a --- /dev/null +++ b/examples/multi_agent/concurrent_examples/uvloop/run_all_examples.py @@ -0,0 +1,38 @@ +""" +Runner script to execute all uvloop examples. + +This script demonstrates how to run multiple uvloop-based agent execution examples. 
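+
+Illustrative usage (assumes OPENAI_API_KEY is set; the dictionary keys match
+the example names used below):
+
+    from run_all_examples import run_all_uvloop_examples
+
+    all_results = run_all_uvloop_examples()
+    print(all_results["same_task"])
+    print(all_results["different_tasks"])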
+""" + +import os +from same_task_example import run_same_task_example +from different_tasks_example import run_different_tasks_example + + +def run_all_uvloop_examples(): + """ + Execute all uvloop examples. + + Returns: + Dictionary containing results from all examples + """ + # Check if API key is available + if not os.getenv("OPENAI_API_KEY"): + raise ValueError( + "OPENAI_API_KEY environment variable must be set" + ) + + results = {} + + # Run same task example + results["same_task"] = run_same_task_example() + + # Run different tasks example + results["different_tasks"] = run_different_tasks_example() + + return results + + +if __name__ == "__main__": + all_results = run_all_uvloop_examples() + # Process results as needed diff --git a/examples/multi_agent/concurrent_examples/uvloop/same_task_example.py b/examples/multi_agent/concurrent_examples/uvloop/same_task_example.py new file mode 100644 index 00000000..8191797c --- /dev/null +++ b/examples/multi_agent/concurrent_examples/uvloop/same_task_example.py @@ -0,0 +1,57 @@ +import os + +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_exec import ( + run_agents_concurrently_uvloop, +) + + +def create_example_agents(num_agents: int = 3): + """ + Create example agents for demonstration. + + Args: + num_agents: Number of agents to create + + Returns: + List of Agent instances + """ + agents = [] + for i in range(num_agents): + agent = Agent( + agent_name=f"Agent_{i+1}", + system_prompt=f"You are Agent {i+1}, a helpful AI assistant.", + model_name="gpt-4o-mini", # Using a lightweight model for examples + max_loops=1, + autosave=False, + verbose=False, + ) + agents.append(agent) + return agents + + +def run_same_task_example(): + """ + Run multiple agents with the same task using uvloop. + + Returns: + List of results from each agent + """ + # Check if API key is available + if not os.getenv("OPENAI_API_KEY"): + raise ValueError( + "OPENAI_API_KEY environment variable must be set" + ) + + agents = create_example_agents(3) + task = ( + "Write a one-sentence summary about artificial intelligence." + ) + + results = run_agents_concurrently_uvloop(agents, task) + return results + + +if __name__ == "__main__": + results = run_same_task_example() + # Results can be processed further as needed diff --git a/examples/multi_agent/agent_communication_examples.py b/examples/multi_agent/utils/agent_communication_examples.py similarity index 100% rename from examples/multi_agent/agent_communication_examples.py rename to examples/multi_agent/utils/agent_communication_examples.py diff --git a/examples/multi_agent/aggregate_example.py b/examples/multi_agent/utils/aggregate_example.py similarity index 100% rename from examples/multi_agent/aggregate_example.py rename to examples/multi_agent/utils/aggregate_example.py diff --git a/examples/multi_agent/utils/uvloop_example.py b/examples/multi_agent/utils/uvloop_example.py new file mode 100644 index 00000000..acc9f70e --- /dev/null +++ b/examples/multi_agent/utils/uvloop_example.py @@ -0,0 +1,122 @@ +""" +Example demonstrating the use of uvloop for running multiple agents concurrently. + +This example shows how to use the new uvloop-based functions: +- run_agents_concurrently_uvloop: For running multiple agents with the same task +- run_agents_with_tasks_uvloop: For running agents with different tasks + +uvloop provides significant performance improvements over standard asyncio, +especially for I/O-bound operations and concurrent task execution. 
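+
+Typical calls (illustrative; both are demonstrated in this module):
+
+    results = run_agents_concurrently_uvloop(agents, task)
+    results = run_agents_with_tasks_uvloop(agents, tasks)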
+""" + +import os +from swarms.structs.multi_agent_exec import ( + run_agents_concurrently_uvloop, + run_agents_with_tasks_uvloop, +) +from swarms.structs.agent import Agent + + +def create_example_agents(num_agents: int = 3): + """Create example agents for demonstration.""" + agents = [] + for i in range(num_agents): + agent = Agent( + agent_name=f"Agent_{i+1}", + system_prompt=f"You are Agent {i+1}, a helpful AI assistant.", + model_name="gpt-4o-mini", # Using a lightweight model for examples + max_loops=1, + autosave=False, + verbose=False, + ) + agents.append(agent) + return agents + + +def example_same_task(): + """Example: Running multiple agents with the same task using uvloop.""" + print("=== Example 1: Same Task for All Agents (uvloop) ===") + + agents = create_example_agents(3) + task = ( + "Write a one-sentence summary about artificial intelligence." + ) + + print(f"Running {len(agents)} agents with the same task...") + print(f"Task: {task}") + + try: + results = run_agents_concurrently_uvloop(agents, task) + + print("\nResults:") + for i, result in enumerate(results, 1): + print(f"Agent {i}: {result}") + + except Exception as e: + print(f"Error: {e}") + + +def example_different_tasks(): + """Example: Running agents with different tasks using uvloop.""" + print( + "\n=== Example 2: Different Tasks for Each Agent (uvloop) ===" + ) + + agents = create_example_agents(3) + tasks = [ + "Explain what machine learning is in simple terms.", + "Describe the benefits of cloud computing.", + "What are the main challenges in natural language processing?", + ] + + print(f"Running {len(agents)} agents with different tasks...") + + try: + results = run_agents_with_tasks_uvloop(agents, tasks) + + print("\nResults:") + for i, (result, task) in enumerate(zip(results, tasks), 1): + print(f"Agent {i} (Task: {task[:50]}...):") + print(f" Response: {result}") + print() + + except Exception as e: + print(f"Error: {e}") + + +def performance_comparison(): + """Demonstrate the performance benefit of uvloop vs standard asyncio.""" + print("\n=== Performance Comparison ===") + + # Note: This is a conceptual example. In practice, you'd need to measure actual performance + print("uvloop vs Standard asyncio:") + print("• uvloop: Cython-based event loop, ~2-4x faster") + print("• Better for I/O-bound operations") + print("• Lower latency and higher throughput") + print("• Especially beneficial for concurrent agent execution") + print("• Automatic fallback to asyncio if uvloop unavailable") + + +if __name__ == "__main__": + # Check if API key is available + if not os.getenv("OPENAI_API_KEY"): + print( + "Please set your OPENAI_API_KEY environment variable to run this example." 
+ ) + print("Example: export OPENAI_API_KEY='your-api-key-here'") + exit(1) + + print("🚀 uvloop Multi-Agent Execution Examples") + print("=" * 50) + + # Run examples + example_same_task() + example_different_tasks() + performance_comparison() + + print("\n✅ Examples completed!") + print("\nTo use uvloop functions in your code:") + print( + "from swarms.structs.multi_agent_exec import run_agents_concurrently_uvloop" + ) + print("results = run_agents_concurrently_uvloop(agents, task)") diff --git a/examples/single_agent/external_agents/custom_agent_example.py b/examples/single_agent/external_agents/custom_agent_example.py new file mode 100644 index 00000000..67f315e1 --- /dev/null +++ b/examples/single_agent/external_agents/custom_agent_example.py @@ -0,0 +1,40 @@ +import os + +from dotenv import load_dotenv + +from swarms.structs.custom_agent import CustomAgent + +load_dotenv() + +# Example usage with Anthropic API +if __name__ == "__main__": + # Initialize the agent for Anthropic API + anthropic_agent = CustomAgent( + base_url="https://api.anthropic.com", + endpoint="v1/messages", + headers={ + "x-api-key": os.getenv("ANTHROPIC_API_KEY"), + "anthropic-version": "2023-06-01", + }, + ) + + # Example payload for Anthropic API + payload = { + "model": "claude-3-5-sonnet-20241022", + "max_tokens": 1000, + "messages": [ + { + "role": "user", + "content": "Hello! Can you explain what artaddificial intelligence is?", + } + ], + } + + # Make the request + try: + response = anthropic_agent.run(payload) + print("Anthropic API Response:") + print(response) + print(type(response)) + except Exception as e: + print(f"Error: {e}") diff --git a/pyproject.toml b/pyproject.toml index 9f76b006..8a64472a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,7 @@ mcp = "*" openai = "*" aiohttp = "*" schedule = "*" +uvloop = "*" [tool.poetry.scripts] swarms = "swarms.cli.main:main" diff --git a/requirements.txt b/requirements.txt index 74380a53..5c59f3c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,3 +28,4 @@ mcp numpy openai schedule +uvloop diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 1d65e9f0..a546c070 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -47,16 +47,16 @@ from swarms.structs.meme_agent_persona_generator import ( from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.model_router import ModelRouter from swarms.structs.multi_agent_exec import ( + batched_grid_agent_execution, get_agents_info, get_swarms_info, - run_agent_with_timeout, + run_agent_async, run_agents_concurrently, run_agents_concurrently_async, run_agents_concurrently_multiprocess, - run_agents_sequentially, + run_agents_concurrently_uvloop, run_agents_with_different_tasks, - run_agents_with_resource_monitoring, - run_agents_with_tasks_concurrently, + run_agents_with_tasks_uvloop, run_single_agent, ) from swarms.structs.multi_agent_router import MultiAgentRouter @@ -141,15 +141,15 @@ __all__ = [ "SwarmRouter", "SwarmType", "SwarmRearrange", + "batched_grid_agent_execution", + "run_agent_async", "run_agents_concurrently", "run_agents_concurrently_async", - "run_single_agent", "run_agents_concurrently_multiprocess", - "run_agents_sequentially", + "run_agents_concurrently_uvloop", "run_agents_with_different_tasks", - "run_agent_with_timeout", - "run_agents_with_resource_monitoring", - "run_agents_with_tasks_concurrently", + "run_agents_with_tasks_uvloop", + "run_single_agent", "GroupChat", "expertise_based", "MultiAgentRouter", diff 
--git a/swarms/structs/agent.py b/swarms/structs/agent.py index 7ab073eb..00f8dc60 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -332,6 +332,7 @@ class Agent: list_of_pdf: Optional[str] = None, tokenizer: Optional[Any] = None, long_term_memory: Optional[Union[Callable, Any]] = None, + fallback_model_name: Optional[str] = None, preset_stopping_token: Optional[bool] = False, traceback: Optional[Any] = None, traceback_handlers: Optional[Any] = None, @@ -582,6 +583,7 @@ class Agent: self.drop_params = drop_params self.thinking_tokens = thinking_tokens self.reasoning_enabled = reasoning_enabled + self.fallback_model_name = fallback_model_name # self.init_handling() self.setup_config() @@ -2633,7 +2635,7 @@ class Agent: """ return [ self.run(task=task, imgs=imgs, *args, **kwargs) - for task in tasks + for task, imgs in zip(tasks, imgs) ] def handle_artifacts( diff --git a/swarms/structs/custom_agent.py b/swarms/structs/custom_agent.py new file mode 100644 index 00000000..b4c3617e --- /dev/null +++ b/swarms/structs/custom_agent.py @@ -0,0 +1,344 @@ +import json +from dataclasses import dataclass +from typing import Any, Dict, Optional, Union + +import httpx +from loguru import logger + + +@dataclass +class AgentResponse: + """Data class to hold agent response information""" + + status_code: int + content: str + headers: Dict[str, str] + json_data: Optional[Dict[str, Any]] = None + success: bool = False + error_message: Optional[str] = None + + +class CustomAgent: + """ + A custom HTTP agent class for making POST requests using httpx. + + Features: + - Configurable headers and payload + - Both sync and async execution + - Built-in error handling and logging + - Flexible response handling + - Name and description + """ + + def __init__( + self, + name: str, + description: str, + base_url: str, + endpoint: str, + headers: Optional[Dict[str, str]] = None, + timeout: float = 30.0, + verify_ssl: bool = True, + *args, + **kwargs, + ): + """ + Initialize the Custom Agent. 
+ + Args: + base_url: Base URL for the API endpoint + endpoint: API endpoint path + headers: Default headers to include in requests + timeout: Request timeout in seconds + verify_ssl: Whether to verify SSL certificates + """ + self.base_url = base_url.rstrip("/") + self.endpoint = endpoint.lstrip("/") + self.default_headers = headers or {} + self.timeout = timeout + self.verify_ssl = verify_ssl + + # Default headers + if "Content-Type" not in self.default_headers: + self.default_headers["Content-Type"] = "application/json" + + logger.info( + f"CustomAgent initialized for {self.base_url}/{self.endpoint}" + ) + + def _prepare_headers( + self, additional_headers: Optional[Dict[str, str]] = None + ) -> Dict[str, str]: + """Merge default headers with additional headers.""" + headers = self.default_headers.copy() + if additional_headers: + headers.update(additional_headers) + return headers + + def _prepare_payload( + self, payload: Union[Dict, str, bytes] + ) -> Union[str, bytes]: + """Prepare the payload for the request.""" + if isinstance(payload, dict): + return json.dumps(payload) + return payload + + def _parse_response( + self, response: httpx.Response + ) -> AgentResponse: + """Parse httpx response into AgentResponse object.""" + try: + # Try to parse JSON if possible + json_data = None + if response.headers.get("content-type", "").startswith( + "application/json" + ): + try: + json_data = response.json() + except json.JSONDecodeError: + pass + + return AgentResponse( + status_code=response.status_code, + content=response.text, + headers=dict(response.headers), + json_data=json_data, + success=200 <= response.status_code < 300, + error_message=( + None + if 200 <= response.status_code < 300 + else f"HTTP {response.status_code}" + ), + ) + except Exception as e: + logger.error(f"Error parsing response: {e}") + return AgentResponse( + status_code=response.status_code, + content=response.text, + headers=dict(response.headers), + success=False, + error_message=str(e), + ) + + def _extract_content(self, response_data: Dict[str, Any]) -> str: + """ + Extract message content from API response, supporting multiple formats. 
+ + Args: + response_data: Parsed JSON response from API + + Returns: + str: Extracted message content + """ + try: + # OpenAI format + if ( + "choices" in response_data + and response_data["choices"] + ): + choice = response_data["choices"][0] + if ( + "message" in choice + and "content" in choice["message"] + ): + return choice["message"]["content"] + elif "text" in choice: + return choice["text"] + + # Anthropic format + elif ( + "content" in response_data + and response_data["content"] + ): + if isinstance(response_data["content"], list): + # Extract text from content blocks + text_parts = [] + for content_block in response_data["content"]: + if ( + isinstance(content_block, dict) + and "text" in content_block + ): + text_parts.append(content_block["text"]) + elif isinstance(content_block, str): + text_parts.append(content_block) + return "".join(text_parts) + elif isinstance(response_data["content"], str): + return response_data["content"] + + # Generic fallback - look for common content fields + elif "text" in response_data: + return response_data["text"] + elif "message" in response_data: + return response_data["message"] + elif "response" in response_data: + return response_data["response"] + + # If no known format, return the entire response as JSON string + logger.warning( + "Unknown response format, returning full response" + ) + return json.dumps(response_data, indent=2) + + except Exception as e: + logger.error(f"Error extracting content: {e}") + return json.dumps(response_data, indent=2) + + def run( + self, + payload: Union[Dict[str, Any], str, bytes], + additional_headers: Optional[Dict[str, str]] = None, + **kwargs, + ) -> str: + """ + Execute a synchronous POST request. + + Args: + payload: Request body/payload + additional_headers: Additional headers for this request + **kwargs: Additional httpx client options + + Returns: + str: Extracted message content from response + """ + url = f"{self.base_url}/{self.endpoint}" + request_headers = self._prepare_headers(additional_headers) + request_payload = self._prepare_payload(payload) + + logger.info(f"Making POST request to: {url}") + + try: + with httpx.Client( + timeout=self.timeout, verify=self.verify_ssl, **kwargs + ) as client: + response = client.post( + url, + content=request_payload, + headers=request_headers, + ) + + if 200 <= response.status_code < 300: + logger.info( + f"Request successful: {response.status_code}" + ) + try: + response_data = response.json() + return self._extract_content(response_data) + except json.JSONDecodeError: + logger.warning( + "Response is not JSON, returning raw text" + ) + return response.text + else: + logger.warning( + f"Request failed: {response.status_code}" + ) + return f"Error: HTTP {response.status_code} - {response.text}" + + except httpx.RequestError as e: + logger.error(f"Request error: {e}") + return f"Request error: {str(e)}" + except Exception as e: + logger.error(f"Unexpected error: {e}") + return f"Unexpected error: {str(e)}" + + async def run_async( + self, + payload: Union[Dict[str, Any], str, bytes], + additional_headers: Optional[Dict[str, str]] = None, + **kwargs, + ) -> str: + """ + Execute an asynchronous POST request. 
+ + Args: + payload: Request body/payload + additional_headers: Additional headers for this request + **kwargs: Additional httpx client options + + Returns: + str: Extracted message content from response + """ + url = f"{self.base_url}/{self.endpoint}" + request_headers = self._prepare_headers(additional_headers) + request_payload = self._prepare_payload(payload) + + logger.info(f"Making async POST request to: {url}") + + try: + async with httpx.AsyncClient( + timeout=self.timeout, verify=self.verify_ssl, **kwargs + ) as client: + response = await client.post( + url, + content=request_payload, + headers=request_headers, + ) + + if 200 <= response.status_code < 300: + logger.info( + f"Async request successful: {response.status_code}" + ) + try: + response_data = response.json() + return self._extract_content(response_data) + except json.JSONDecodeError: + logger.warning( + "Async response is not JSON, returning raw text" + ) + return response.text + else: + logger.warning( + f"Async request failed: {response.status_code}" + ) + return f"Error: HTTP {response.status_code} - {response.text}" + + except httpx.RequestError as e: + logger.error(f"Async request error: {e}") + return f"Request error: {str(e)}" + except Exception as e: + logger.error(f"Unexpected async error: {e}") + return f"Unexpected error: {str(e)}" + + +# # Example usage with Anthropic API +# if __name__ == "__main__": +# # Initialize the agent for Anthropic API +# anthropic_agent = CustomAgent( +# base_url="https://api.anthropic.com", +# endpoint="v1/messages", +# headers={ +# "x-api-key": "your-anthropic-api-key-here", +# "anthropic-version": "2023-06-01" +# } +# ) + +# # Example payload for Anthropic API +# payload = { +# "model": "claude-3-sonnet-20240229", +# "max_tokens": 1000, +# "messages": [ +# { +# "role": "user", +# "content": "Hello! Can you explain what artificial intelligence is?" +# } +# ] +# } + +# # Make the request +# try: +# response = anthropic_agent.run(payload) +# print("Anthropic API Response:") +# print(response) +# except Exception as e: +# print(f"Error: {e}") + +# # Example with async usage +# # import asyncio +# # +# # async def async_example(): +# # response = await anthropic_agent.run_async(payload) +# # print("Async Anthropic API Response:") +# # print(response) +# # +# # Uncomment to run async example +# # asyncio.run(async_example()) diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index d4119a7a..437cd79c 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -1,25 +1,16 @@ -import concurrent.futures import asyncio +import concurrent.futures import os -import threading from concurrent.futures import ( ThreadPoolExecutor, ) -from dataclasses import dataclass from typing import Any, Callable, List, Optional, Union -import psutil +import uvloop +from loguru import logger from swarms.structs.agent import Agent from swarms.structs.omni_agent_types import AgentType -from loguru import logger - - -@dataclass -class ResourceMetrics: - cpu_percent: float - memory_percent: float - active_threads: int def run_single_agent( @@ -29,42 +20,38 @@ def run_single_agent( return agent.run(task=task, *args, **kwargs) -async def run_agent_async( - agent: AgentType, task: str, executor: ThreadPoolExecutor -) -> Any: +async def run_agent_async(agent: AgentType, task: str) -> Any: """ - Run an agent asynchronously using a thread executor. + Run an agent asynchronously. 
Args: agent: Agent instance to run task: Task string to execute - executor: ThreadPoolExecutor instance for handling CPU-bound operations Returns: Agent execution result """ loop = asyncio.get_event_loop() return await loop.run_in_executor( - executor, run_single_agent, agent, task + None, run_single_agent, agent, task ) async def run_agents_concurrently_async( - agents: List[AgentType], task: str, executor: ThreadPoolExecutor + agents: List[AgentType], task: str ) -> List[Any]: """ - Run multiple agents concurrently using asyncio and thread executor. + Run multiple agents concurrently using asyncio. Args: agents: List of Agent instances to run concurrently task: Task string to execute - executor: ThreadPoolExecutor for CPU-bound operations Returns: List of outputs from each agent """ results = await asyncio.gather( - *(run_agent_async(agent, task, executor) for agent in agents) + *(run_agent_async(agent, task) for agent in agents) ) return results @@ -142,14 +129,23 @@ def run_agents_concurrently_multiprocess( def batched_grid_agent_execution( - agents: List[AgentType], + agents: List["AgentType"], tasks: List[str], + max_workers: int = None, ) -> List[Any]: """ Run multiple agents with different tasks concurrently. + + Args: + agents (List[AgentType]): List of agent instances. + tasks (List[str]): List of tasks, one for each agent. + max_workers (int, optional): Maximum number of threads to use. Defaults to 90% of CPU cores. + + Returns: + List[Any]: List of results from each agent. """ logger.info( - f"Batch Grid Execution with {len(agents)} and number of tasks: {len(tasks)}" + f"Batch Grid Execution with {len(agents)} agents and number of tasks: {len(tasks)}" ) if len(agents) != len(tasks): @@ -157,289 +153,256 @@ def batched_grid_agent_execution( "The number of agents must match the number of tasks." ) - results = [] + # 90% of the available CPU cores + max_workers = max_workers or int(os.cpu_count() * 0.9) - for agent, task in zip(agents, tasks): - result = run_single_agent(agent, task) - results.append(result) + results = [None] * len(agents) + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor: + future_to_index = { + executor.submit(run_single_agent, agent, task): idx + for idx, (agent, task) in enumerate(zip(agents, tasks)) + } + for future in concurrent.futures.as_completed( + future_to_index + ): + idx = future_to_index[future] + try: + results[idx] = future.result() + except Exception as e: + results[idx] = e return results -def run_agents_sequentially( - agents: List[AgentType], task: str -) -> List[Any]: - """ - Run multiple agents sequentially for baseline comparison. - - Args: - agents: List of Agent instances to run - task: Task string to execute - - Returns: - List of outputs from each agent - """ - return [run_single_agent(agent, task) for agent in agents] - - def run_agents_with_different_tasks( - agent_task_pairs: List[tuple[AgentType, str]], - batch_size: int = None, + agent_task_pairs: List[tuple["AgentType", str]], + batch_size: int = 10, max_workers: int = None, ) -> List[Any]: """ - Run multiple agents with different tasks concurrently. + Run multiple agents with different tasks concurrently, processing them in batches. + + This function executes each agent on its corresponding task, processing the agent-task pairs in batches + of size `batch_size` for efficient resource utilization. 
Args: - agent_task_pairs: List of (agent, task) tuples - batch_size: Number of agents to run in parallel - max_workers: Maximum number of threads + agent_task_pairs: List of (agent, task) tuples. + batch_size: Number of agents to run in parallel in each batch. + max_workers: Maximum number of threads. Returns: - List of outputs from each agent + List of outputs from each agent, in the same order as the input pairs. """ + if not agent_task_pairs: + return [] - async def run_pair_async( - pair: tuple[AgentType, str], executor: ThreadPoolExecutor - ) -> Any: - agent, task = pair - return await run_agent_async(agent, task, executor) - - cpu_cores = os.cpu_count() - batch_size = batch_size or cpu_cores - max_workers = max_workers or cpu_cores * 2 results = [] - - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - for i in range(0, len(agent_task_pairs), batch_size): - batch = agent_task_pairs[i : i + batch_size] - batch_results = loop.run_until_complete( - asyncio.gather( - *( - run_pair_async(pair, executor) - for pair in batch - ) - ) - ) - results.extend(batch_results) - + total_pairs = len(agent_task_pairs) + for i in range(0, total_pairs, batch_size): + batch = agent_task_pairs[i : i + batch_size] + agents, tasks = zip(*batch) + batch_results = batched_grid_agent_execution( + list(agents), list(tasks), max_workers=max_workers + ) + results.extend(batch_results) return results -async def run_agent_with_timeout( - agent: AgentType, +def run_agents_concurrently_uvloop( + agents: List[AgentType], task: str, - timeout: float, - executor: ThreadPoolExecutor, -) -> Any: + max_workers: Optional[int] = None, +) -> List[Any]: """ - Run an agent with a timeout limit. + Run multiple agents concurrently using uvloop for optimized async performance. + + uvloop is a fast, drop-in replacement for asyncio's event loop, implemented in Cython. + It's designed to be significantly faster than the standard asyncio event loop, + especially beneficial for I/O-bound tasks and concurrent operations. Args: - agent: Agent instance to run - task: Task string to execute - timeout: Timeout in seconds - executor: ThreadPoolExecutor instance + agents: List of Agent instances to run concurrently + task: Task string to execute by all agents + max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores) Returns: - Agent execution result or None if timeout occurs + List of outputs from each agent + + Raises: + ImportError: If uvloop is not installed + RuntimeError: If uvloop cannot be set as the event loop policy """ try: - return await asyncio.wait_for( - run_agent_async(agent, task, executor), timeout=timeout + # Set uvloop as the default event loop policy for better performance + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + except ImportError: + logger.warning( + "uvloop not available, falling back to standard asyncio. " + "Install uvloop with: pip install uvloop" + ) + except RuntimeError as e: + logger.warning( + f"Could not set uvloop policy: {e}. Using default asyncio." 
) - except asyncio.TimeoutError: - return None + if max_workers is None: + # Use 95% of available CPU cores for optimal performance + num_cores = os.cpu_count() + max_workers = int(num_cores * 0.95) if num_cores else 1 -def run_agents_with_timeout( - agents: List[AgentType], - task: str, - timeout: float, - batch_size: int = None, - max_workers: int = None, -) -> List[Any]: - """ - Run multiple agents concurrently with a timeout for each agent. + logger.info( + f"Running {len(agents)} agents concurrently with uvloop (max_workers: {max_workers})" + ) - Args: - agents: List of Agent instances - task: Task string to execute - timeout: Timeout in seconds for each agent - batch_size: Number of agents to run in parallel - max_workers: Maximum number of threads + async def run_agents_async(): + """Inner async function to handle the concurrent execution.""" + results = [] - Returns: - List of outputs (None for timed out agents) - """ - cpu_cores = os.cpu_count() - batch_size = batch_size or cpu_cores - max_workers = max_workers or cpu_cores * 2 - results = [] + def run_agent_sync(agent: AgentType) -> Any: + """Synchronous wrapper for agent execution.""" + return agent.run(task=task) - try: loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - for i in range(0, len(agents), batch_size): - batch = agents[i : i + batch_size] - batch_results = loop.run_until_complete( - asyncio.gather( - *( - run_agent_with_timeout( - agent, task, timeout, executor - ) - for agent in batch - ) - ) - ) - results.extend(batch_results) - return results - - -def get_system_metrics() -> ResourceMetrics: - """Get current system resource usage""" - return ResourceMetrics( - cpu_percent=psutil.cpu_percent(), - memory_percent=psutil.virtual_memory().percent, - active_threads=threading.active_count(), - ) - - -def run_agents_with_resource_monitoring( - agents: List[AgentType], - task: str, - cpu_threshold: float = 90.0, - memory_threshold: float = 90.0, - check_interval: float = 1.0, -) -> List[Any]: - """ - Run agents with system resource monitoring and adaptive batch sizing. + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Create tasks for all agents + tasks = [ + loop.run_in_executor(executor, run_agent_sync, agent) + for agent in agents + ] - Args: - agents: List of Agent instances - task: Task string to execute - cpu_threshold: Max CPU usage percentage - memory_threshold: Max memory usage percentage - check_interval: Resource check interval in seconds + # Wait for all tasks to complete and collect results + completed_tasks = await asyncio.gather( + *tasks, return_exceptions=True + ) - Returns: - List of outputs from each agent - """ + # Handle results and exceptions + for i, result in enumerate(completed_tasks): + if isinstance(result, Exception): + logger.error( + f"Agent {i+1} failed with error: {result}" + ) + results.append(result) + else: + results.append(result) - async def monitor_resources(): - while True: - metrics = get_system_metrics() - if ( - metrics.cpu_percent > cpu_threshold - or metrics.memory_percent > memory_threshold - ): - # Reduce batch size or pause execution - pass - await asyncio.sleep(check_interval) + return results - # Implementation details... 
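+    # Note: asyncio.run() spins up a fresh event loop (uvloop-backed when the
+    # policy above was applied); the except branch below attempts to reuse an
+    # already-running loop, e.g. when called from a notebook.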
+    # Run the async function
+    try:
+        return asyncio.run(run_agents_async())
+    except RuntimeError as e:
+        if "already running" in str(e).lower():
+            # Handle case where event loop is already running
+            logger.warning(
+                "Event loop already running, using get_event_loop()"
+            )
+            loop = asyncio.get_event_loop()
+            return loop.run_until_complete(run_agents_async())
+        else:
+            raise
 
 
-def _run_agents_with_tasks_concurrently(
+def run_agents_with_tasks_uvloop(
     agents: List[AgentType],
-    tasks: List[str] = [],
-    batch_size: int = None,
-    max_workers: int = None,
+    tasks: List[str],
+    max_workers: Optional[int] = None,
 ) -> List[Any]:
     """
-    Run multiple agents with corresponding tasks concurrently.
+    Run multiple agents with different tasks concurrently using uvloop.
+
+    This function pairs each agent with a specific task and runs them concurrently
+    using uvloop for optimized performance.
 
     Args:
         agents: List of Agent instances to run
-        tasks: List of task strings to execute
-        batch_size: Number of agents to run in parallel
-        max_workers: Maximum number of threads
+        tasks: List of task strings (must match number of agents)
+        max_workers: Maximum number of threads (defaults to 95% of CPU cores)
 
     Returns:
         List of outputs from each agent
+
+    Raises:
+        ValueError: If number of agents doesn't match number of tasks
    """
     if len(agents) != len(tasks):
         raise ValueError(
-            "The number of agents must match the number of tasks."
+            f"Number of agents ({len(agents)}) must match number of tasks ({len(tasks)})"
         )
 
-    cpu_cores = os.cpu_count()
-    batch_size = batch_size or cpu_cores
-    max_workers = max_workers or cpu_cores * 2
-    results = []
-
     try:
-        loop = asyncio.get_event_loop()
-    except RuntimeError:
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-
-    async def run_agent_task_pair(
-        agent: AgentType, task: str, executor: ThreadPoolExecutor
-    ) -> Any:
-        return await run_agent_async(agent, task, executor)
-
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        for i in range(0, len(agents), batch_size):
-            batch_agents = agents[i : i + batch_size]
-            batch_tasks = tasks[i : i + batch_size]
-            batch_results = loop.run_until_complete(
-                asyncio.gather(
-                    *(
-                        run_agent_task_pair(agent, task, executor)
-                        for agent, task in zip(
-                            batch_agents, batch_tasks
-                        )
-                    )
-                )
-            )
-            results.extend(batch_results)
+        # Set uvloop as the default event loop policy
+        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+    except ImportError:
+        logger.warning(
+            "uvloop not available, falling back to standard asyncio. "
+            "Install uvloop with: pip install uvloop"
+        )
+    except RuntimeError as e:
+        logger.warning(
+            f"Could not set uvloop policy: {e}. Using default asyncio."
+        )
 
-    return results
+    if max_workers is None:
+        num_cores = os.cpu_count()
+        max_workers = int(num_cores * 0.95) if num_cores else 1
 
+    logger.info(
+        f"Running {len(agents)} agents with {len(tasks)} tasks using uvloop (max_workers: {max_workers})"
+    )
 
-def run_agents_with_tasks_concurrently(
-    agents: List[AgentType],
-    tasks: List[str] = [],
-    batch_size: int = None,
-    max_workers: int = None,
-    device: str = "cpu",
-    device_id: int = 1,
-    all_cores: bool = True,
-    no_clusterops: bool = False,
-) -> List[Any]:
-    """
-    Executes a list of agents with their corresponding tasks concurrently on a specified device.
+ async def run_agents_with_tasks_async(): + """Inner async function to handle concurrent execution with different tasks.""" + results = [] - This function orchestrates the concurrent execution of a list of agents with their respective tasks on a specified device, either CPU or GPU. It leverages the `exec_callable_with_clusterops` function to manage the execution on the specified device. + def run_agent_task_sync(agent: AgentType, task: str) -> Any: + """Synchronous wrapper for agent execution with specific task.""" + return agent.run(task=task) - Args: - agents (List[AgentType]): A list of Agent instances or callable functions to execute concurrently. - tasks (List[str], optional): A list of task strings to execute for each agent. Defaults to an empty list. - batch_size (int, optional): The number of agents to run in parallel. Defaults to None. - max_workers (int, optional): The maximum number of threads to use for execution. Defaults to None. - device (str, optional): The device to use for execution. Defaults to "cpu". - device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0. - all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True. + loop = asyncio.get_event_loop() - Returns: - List[Any]: A list of outputs from each agent execution. - """ - # Make the first agent not use the ifrs - return _run_agents_with_tasks_concurrently( - agents, tasks, batch_size, max_workers - ) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Create tasks for agent-task pairs + tasks_async = [ + loop.run_in_executor( + executor, run_agent_task_sync, agent, task + ) + for agent, task in zip(agents, tasks) + ] + + # Wait for all tasks to complete + completed_tasks = await asyncio.gather( + *tasks_async, return_exceptions=True + ) + + # Handle results and exceptions + for i, result in enumerate(completed_tasks): + if isinstance(result, Exception): + logger.error( + f"Agent {i+1} (task: {tasks[i][:50]}...) 
failed with error: {result}" + ) + results.append(result) + else: + results.append(result) + + return results + + # Run the async function + try: + return asyncio.run(run_agents_with_tasks_async()) + except RuntimeError as e: + if "already running" in str(e).lower(): + logger.warning( + "Event loop already running, using get_event_loop()" + ) + loop = asyncio.get_event_loop() + return loop.run_until_complete( + run_agents_with_tasks_async() + ) + else: + raise def get_swarms_info(swarms: List[Callable]) -> str: diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index c5b891be..d3229b0a 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -429,8 +429,7 @@ class SwarmRouter: *args, **kwargs, ) - - + def _create_batched_grid_workflow(self, *args, **kwargs): """Factory function for BatchedGridWorkflow.""" return BatchedGridWorkflow( diff --git a/swarms/utils/best_models.py b/swarms/utils/best_models.py new file mode 100644 index 00000000..49a77930 --- /dev/null +++ b/swarms/utils/best_models.py @@ -0,0 +1,88 @@ +# Best LLM Models by Task Type +# Simplified dictionary structure with model names and categories + +best_models = { + "Vision": [ + {"model": "gemini/gemini-2.5-pro", "category": "Vision"}, + ], + "text-generation": [ + { + "model": "claude-sonnet-4-20250514", + "category": "text-generation", + }, + {"model": "gpt-5-chat", "category": "text-generation"}, + ], +} + + +# Function to get all models for a task type +def get_models_by_task(task_type: str) -> list: + """ + Get all models for a specific task type. + + Args: + task_type (str): The task category (e.g., 'WebDev', 'Vision', 'text-generation') + + Returns: + list: List of all models for the task type + """ + if task_type not in best_models: + raise ValueError( + f"Task type '{task_type}' not found. Available types: {list(best_models.keys())}" + ) + + return best_models[task_type] + + +# Function to get the first model for a task type (simplified from get_top_model) +def get_first_model(task_type: str) -> dict: + """ + Get the first model for a specific task type. + + Args: + task_type (str): The task category (e.g., 'WebDev', 'Vision', 'text-generation') + + Returns: + dict: First model information with model name and category + """ + if task_type not in best_models: + raise ValueError( + f"Task type '{task_type}' not found. Available types: {list(best_models.keys())}" + ) + + models = best_models[task_type] + if not models: + raise ValueError( + f"No models found for task type '{task_type}'" + ) + + return models[0] + + +# Function to search for a specific model across all categories +def find_model_by_name(model_name: str) -> dict: + """ + Find a model by name across all task categories. + + Args: + model_name (str): The model name to search for + + Returns: + dict: Model information if found, None otherwise + """ + for task_type, models in best_models.items(): + for model in models: + if model["model"].lower() == model_name.lower(): + return model + return None + + +# Function to get all available task types +def get_available_task_types() -> list: + """ + Get all available task types/categories. + + Returns: + list: List of all task type names + """ + return list(best_models.keys())
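+
+
+# Minimal usage sketch for the helpers above (illustrative; outputs reflect the
+# `best_models` table defined at the top of this module):
+if __name__ == "__main__":
+    # First recommended model for a category
+    print(get_first_model("text-generation"))
+
+    # Look up a specific model across all categories
+    print(find_model_by_name("gemini/gemini-2.5-pro"))
+
+    # All supported categories
+    print(get_available_task_types())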