[Improve MOA] [Improve alt swarms] [Improvement][run_agents_concurrently] [Description: aadded output agent types and image handling] [delete meme agent personator agent]

pull/1138/head
Kye Gomez 3 days ago
parent 28aa8ef4f0
commit b8955fbf7e

@ -0,0 +1,187 @@
# MCP Tools Bug Fix Test Scripts
This directory contains test scripts to verify the fix for the MCP (Model Context Protocol) tools integration bug.
## Bug Description
**Issue**: `TypeError: object of type 'Function' has no len()`
**Location**: `swarms/utils/litellm_wrapper.py` in the `output_for_tools` method
**Root Cause**: The code was incorrectly trying to call `len()` on a `Function` object instead of checking the length of the `tool_calls` array.
## Test Scripts
### 1. `test_mcp_bug_fix.py` - Simple Bug Fix Test
A focused test script that specifically reproduces the exact scenario from the bug report.
**Features**:
- Tests the specific error scenario that was failing
- Verifies the fix handles both single and multiple tool calls
- Provides clear pass/fail results
**Usage**:
```bash
python test_mcp_bug_fix.py
```
### 2. `test_mcp_tools_example.py` - Comprehensive Test Suite
A comprehensive test suite that covers various aspects of MCP tools integration.
**Features**:
- Basic tool fetching test
- Multiple MCP servers test
- Agent execution with MCP tools
- Error handling scenarios
- Performance testing
- Detailed reporting
**Usage**:
```bash
python test_mcp_tools_example.py
```
## Prerequisites
Before running the tests, you need to start the MCP server:
1. **Start the OKX Crypto Server**:
```bash
python examples/mcp/multi_mcp_guide/okx_crypto_server.py
```
This will start the server on `http://0.0.0.0:8001/mcp`
2. **Install Required Dependencies**:
```bash
pip install swarms mcp fastmcp requests
```
## Expected Results
### Before the Fix
- ❌ `TypeError: object of type 'Function' has no len()`
- ❌ Agent execution fails when using MCP tools
- ❌ MCP tool calls cannot be processed
### After the Fix
- ✅ Tools are fetched successfully
- ✅ Agent can execute tasks using MCP tools
- ✅ Both single and multiple tool calls work correctly
- ✅ No TypeError occurs
## Test Scenarios
### Basic Functionality
1. **Tool Fetching**: Verify MCP tools can be retrieved from the server
2. **Agent Creation**: Verify agents can be created with MCP tool integration
3. **Tool Execution**: Verify agents can execute tasks that use MCP tools
### Error Handling
1. **Invalid Server URL**: Test behavior with non-existent server
2. **Invalid Authentication**: Test behavior with wrong credentials
3. **Network Timeouts**: Test behavior with connection timeouts
### Edge Cases
1. **Single Tool Call**: Verify single tool call processing
2. **Multiple Tool Calls**: Verify multiple tool call processing
3. **Empty Responses**: Test behavior with empty tool responses
## Sample Output
### Successful Test Run
```
🚀 MCP Bug Fix Test
This test verifies the fix for the TypeError in MCP tool usage.
Make sure the OKX crypto server is running on port 8001.
🐛 Testing MCP Bug Fix
========================================
1. Fetching MCP tools...
✅ Successfully fetched 2 tools
2. Creating agent with MCP tools...
✅ Agent created successfully
3. Running task with MCP tools...
Task: Get Bitcoin trading volume using get_okx_crypto_volume tool
✅ Task completed successfully!
Result: [Tool execution result with Bitcoin volume data]
========================================
📊 TEST SUMMARY
========================================
✅ Main bug fix: PASSED
The TypeError: object of type 'Function' has no len() is fixed!
✅ Multiple tool calls: PASSED
🎉 ALL TESTS PASSED!
The MCP tools integration is working correctly.
```
## Troubleshooting
### Common Issues
1. **Server Not Running**:
```
Error: Connection refused
Solution: Start the OKX crypto server first
```
2. **Port Already in Use**:
```
Error: Address already in use
Solution: Change the port in the server script or kill existing processes
```
3. **Authentication Error**:
```
Error: 401 Unauthorized
Solution: Check the Authorization header in the connection
```
### Debug Mode
To get more detailed output, you can modify the test scripts to enable verbose logging:
```python
# In the test scripts, add:
import logging
logging.basicConfig(level=logging.DEBUG)
```
## Code Changes Made
The fix involved modifying the `output_for_tools` method in `swarms/utils/litellm_wrapper.py`:
**Before** (buggy code):
```python
if self.mcp_call is True:
out = response.choices[0].message.tool_calls[0].function
if len(out) > 1: # ❌ Error: Function objects don't have len()
return out
else:
out = out[0]
```
**After** (fixed code):
```python
if self.mcp_call is True:
tool_calls = response.choices[0].message.tool_calls
if len(tool_calls) > 1: # ✅ Correct: Check tool_calls length
# Handle multiple tool calls
return [...]
else:
# Handle single tool call
out = tool_calls[0].function
```
## Contributing
If you find any issues with these test scripts or the MCP tools integration, please:
1. Run the test scripts to reproduce the issue
2. Check the server logs for additional error information
3. Report the issue with the test output and error details
4. Include your environment details (Python version, OS, etc.)

@ -1,17 +0,0 @@
from swarms.structs.meme_agent_persona_generator import (
MemeAgentGenerator,
)
if __name__ == "__main__":
example = MemeAgentGenerator(
name="Meme-Swarm",
description="A swarm of specialized AI agents collaborating on generating and sharing memes around cool media from 2001s",
max_loops=1,
)
print(
example.run(
"Generate funny meme agents around cool media from 2001s"
)
)

@ -45,7 +45,7 @@
# # print(aggregator_system_prompt.get_prompt()) # # print(aggregator_system_prompt.get_prompt())
aggregator_system_prompt_main = """ AGGREGATOR_SYSTEM_PROMPT_MAIN = """
# Multi-Agent Observer and Summarizer # Multi-Agent Observer and Summarizer
@ -82,4 +82,4 @@ aggregator_system_prompt_main = """
5. Potential improvements or areas for further exploration 5. Potential improvements or areas for further exploration
Remember: Your role is crucial in distilling complex mult-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries. Remember: Your role is crucial in distilling complex mult-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries.
""" """

@ -0,0 +1,60 @@
MOA_RANKER_PROMPT = """
You are a highly efficient assistant who evaluates and selects the best large language model (LLMs) based on the quality of their responses to a given instruction. This process will be used to create a leaderboard reflecting the most accurate and human-preferred answers.
I require a leaderboard for various large language models. I'll provide you with prompts given to these models and their corresponding outputs. Your task is to assess these responses and select the model that produces the best output from a human perspective.
## Instruction
{
"instruction": "{instruction}"
}
## Model Outputs
Here are the unordered outputs from the models. Each output is associated with a specific model, identified by a unique model identifier.
[
{
"model_identifier": "{identifier_1}",
"output": "{output_1}"
},
{
"model_identifier": "{identifier_2}",
"output": "{output_2}"
},
{
"model_identifier": "{identifier_3}",
"output": "{output_3}"
},
{
"model_identifier": "{identifier_4}",
"output": "{output_4}"
},
{
"model_identifier": "{identifier_5}",
"output": "{output_5}"
},
{
"model_identifier": "{identifier_6}",
"output": "{output_6}"
}
]
## Task
Evaluate the models based on the quality and relevance of their outputs and select the model that generated the best output. Answer by providing the model identifier of the best model. We will use your output as the name of the best model, so make sure your output only contains one of the following model identifiers and nothing else (no quotes, no spaces, no new lines, ...).
## Best Model Identifier
"""
MOA_AGGREGATOR_SYSTEM_PROMPT = """
You have been provided with a set of responses from various open-source models to the latest user query. Your
task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the
information provided in these responses, recognizing that some of it may be biased or incorrect. Your response
should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply
to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of
accuracy and reliability.
Responses from models:
1. [Model Response from Ai,1]
2. [Model Response from Ai,2]
...
n. [Model Response from Ai,n]
"""

@ -42,9 +42,6 @@ from swarms.structs.majority_voting import (
MajorityVoting, MajorityVoting,
) )
from swarms.structs.malt import MALT from swarms.structs.malt import MALT
from swarms.structs.meme_agent_persona_generator import (
MemeAgentGenerator,
)
from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.model_router import ModelRouter from swarms.structs.model_router import ModelRouter
from swarms.structs.multi_agent_exec import ( from swarms.structs.multi_agent_exec import (
@ -153,7 +150,6 @@ __all__ = [
"GroupChat", "GroupChat",
"expertise_based", "expertise_based",
"MultiAgentRouter", "MultiAgentRouter",
"MemeAgentGenerator",
"ModelRouter", "ModelRouter",
"MALT", "MALT",
"HybridHierarchicalClusterSwarm", "HybridHierarchicalClusterSwarm",

@ -2383,4 +2383,3 @@ class AOPCluster:
if tool.get("function", {}).get("name") == server_name: if tool.get("function", {}).get("name") == server_name:
return tool return tool
return None return None

@ -1,10 +1,12 @@
import concurrent.futures import concurrent.futures
from swarms.structs.agent import Agent
from typing import List, Union, Callable
import os import os
from swarms.utils.formatter import formatter
from loguru import logger
import traceback import traceback
from typing import Callable, List, Union
from loguru import logger
from swarms.structs.agent import Agent
from swarms.utils.formatter import formatter
class BatchAgentExecutionError(Exception): class BatchAgentExecutionError(Exception):

@ -1,291 +0,0 @@
import json
from typing import List
from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.litellm_wrapper import LiteLLM
load_dotenv()
class MemeAgentConfig(BaseModel):
"""Configuration for an individual meme agent in a swarm"""
name: str = Field(
description="The name of the meme agent",
example="Meme-Generator-Agent",
)
description: str = Field(
description="A description of the meme agent's purpose and capabilities",
example="Agent responsible for generating and sharing memes",
)
system_prompt: str = Field(
description="The system prompt that defines the meme agent's behavior. Make this prompt as detailed and as extensive as possible.",
example="You are a meme generator agent. Your role is to create and share funny memes...",
)
class MemeSwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative meme agents"""
name: str = Field(
description="The name of the meme swarm",
example="Meme-Creation-Swarm",
)
description: str = Field(
description="The description of the meme swarm's purpose and capabilities",
example="A swarm of agents that work together to generate and share memes",
)
agents: List[MemeAgentConfig] = Field(
description="The list of meme agents that make up the swarm",
example=[
MemeAgentConfig(
name="Meme-Generator-Agent",
description="Generates memes",
system_prompt="You are a meme generator agent...",
),
MemeAgentConfig(
name="Meme-Sharer-Agent",
description="Shares memes",
system_prompt="You are a meme sharer agent...",
),
],
)
max_loops: int = Field(
description="The maximum number of meme generation loops to run the swarm",
example=1,
)
BOSS_SYSTEM_PROMPT = """
You are the Meme Generator Boss, responsible for creating and managing a swarm of agents that generate funny, weird, and cool personas. Your goal is to ensure that each agent is uniquely suited to create hilarious and entertaining content.
### Instructions:
1. **Persona Generation**:
- Analyze the type of meme or content required.
- Assign tasks to existing agents with a fitting persona, ensuring they understand the tone and style needed.
- If no suitable agent exists, create a new agent with a persona tailored to the task, including a system prompt that outlines their role, objectives, and creative liberties.
2. **Agent Persona Creation**:
- Name agents based on their persona or the type of content they generate (e.g., "Dank Meme Lord" or "Surreal Humor Specialist").
- Provide each new agent with a system prompt that outlines their persona, including their tone, style, and any specific themes or topics they should focus on.
3. **Creativity and Originality**:
- Encourage agents to think outside the box and come up with unique, humorous, and entertaining content.
- Foster an environment where agents can experiment with different styles and formats to keep content fresh and engaging.
4. **Communication and Feedback**:
- Clearly communicate the requirements and expectations for each task to ensure agents understand what is needed.
- Encourage agents to provide feedback on their creative process and suggest new ideas or directions for future content.
5. **Transparency and Accountability**:
- Maintain transparency in the selection or creation of agents for specific tasks, ensuring that the reasoning behind each decision is clear.
- Hold agents accountable for the content they generate, ensuring it meets the required standards of humor and creativity.
# Output Format
Present your plan in a clear, bullet-point format or short concise paragraphs, outlining persona generation, agent creation, creativity strategies, and communication protocols.
# Notes
- Ensure that agents understand the importance of originality and creativity in their content.
- Foster a culture of experimentation and continuous improvement to keep the content generated by agents fresh and engaging.
"""
class MemeAgentGenerator:
"""A class that automatically builds and manages swarms of AI agents.
This class handles the creation, coordination and execution of multiple AI agents working
together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work
and create new specialized agents as needed.
Args:
name (str): The name of the swarm
description (str): A description of the swarm's purpose
verbose (bool, optional): Whether to output detailed logs. Defaults to True.
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
"""
def __init__(
self,
name: str = None,
description: str = None,
verbose: bool = True,
max_loops: int = 1,
):
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
)
def run(self, task: str, image_url: str = None, *args, **kwargs):
"""Run the swarm on a given task.
Args:
task (str): The task to be accomplished
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
agents = self._create_agents(task, image_url, *args, **kwargs)
logger.info(f"Agents created {len(agents)}")
logger.info("Routing task through swarm")
output = self.swarm_router(agents, task, image_url)
logger.info(f"Swarm execution complete with output: {output}")
return output
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
Args:
task (str): The task to create agents for
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
list: List of created agents
"""
logger.info("Creating agents for task")
model = LiteLLM(
model_name="gpt-4.1",
system_prompt=BOSS_SYSTEM_PROMPT,
temperature=0.1,
response_format=MemeSwarmConfig,
)
agents_dictionary = model.run(task)
print(agents_dictionary)
agents_dictionary = json.loads(agents_dictionary)
if isinstance(agents_dictionary, dict):
agents_dictionary = MemeSwarmConfig(**agents_dictionary)
else:
raise ValueError(
"Agents dictionary is not a valid dictionary"
)
# Set swarm config
self.name = agents_dictionary.name
self.description = agents_dictionary.description
logger.info(
f"Swarm config: {self.name}, {self.description}, {self.max_loops}"
)
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
# Convert dict to AgentConfig if needed
if isinstance(agent_config, dict):
agent_config = MemeAgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
)
agents.append(agent)
return agents
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
max_loops: int = 1,
):
"""Build a single agent with the given specifications.
Args:
agent_name (str): Name of the agent
agent_description (str): Description of the agent's purpose
agent_system_prompt (str): The system prompt for the agent
Returns:
Agent: The constructed agent instance
"""
logger.info(f"Building agent: {agent_name}")
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
model_name="gpt-4.1",
max_loops=max_loops,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
# auto_generate_prompt=True,
)
return agent
def swarm_router(
self,
agents: List[Agent],
task: str,
*args,
**kwargs,
):
"""Route tasks between agents in the swarm.
Args:
agents (List[Agent]): List of available agents
task (str): The task to route
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the routed task execution
"""
logger.info("Routing task through swarm")
swarm_router_instance = SwarmRouter(
name=self.name,
description=self.description,
agents=agents,
swarm_type="auto",
max_loops=1,
)
return swarm_router_instance.run(
self.name + " " + self.description + " " + task,
)
# if __name__ == "__main__":
# example = MemeAgentGenerator(
# name="Meme-Swarm",
# description="A swarm of specialized AI agents collaborating on generating and sharing memes around cool media from 2001s",
# max_loops=1,
# )
# print(
# example.run(
# "Generate funny meme agents around cool media from 2001s"
# )
# )

@ -1,18 +1,18 @@
import concurrent.futures
import os import os
import uuid
from typing import List, Optional from typing import List, Optional
from swarms.prompts.ag_prompt import AGGREGATOR_SYSTEM_PROMPT_MAIN
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.prompts.ag_prompt import aggregator_system_prompt_main from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents from swarms.structs.ma_utils import list_all_agents
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.utils.history_output_formatter import ( from swarms.utils.history_output_formatter import (
history_output_formatter, history_output_formatter,
) )
from swarms.utils.loguru_logger import initialize_logger from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
from swarms.utils.output_types import OutputType from swarms.utils.output_types import OutputType
from swarms.structs.conversation import Conversation
logger = initialize_logger(log_folder="mixture_of_agents") logger = initialize_logger(log_folder="mixture_of_agents")
@ -24,15 +24,16 @@ class MixtureOfAgents:
def __init__( def __init__(
self, self,
id: str = str(uuid.uuid4()),
name: str = "MixtureOfAgents", name: str = "MixtureOfAgents",
description: str = "A class to run a mixture of agents and aggregate their responses.", description: str = "A class to run a mixture of agents and aggregate their responses.",
agents: List[Agent] = None, agents: List[Agent] = None,
aggregator_agent: Agent = None, aggregator_agent: Agent = None,
aggregator_system_prompt: str = aggregator_system_prompt_main, aggregator_system_prompt: str = AGGREGATOR_SYSTEM_PROMPT_MAIN,
layers: int = 3, layers: int = 3,
max_loops: int = 1, max_loops: int = 1,
output_type: OutputType = "final", output_type: OutputType = "final",
aggregator_model_name: str = "claude-3-5-sonnet-20240620", aggregator_model_name: str = "claude-sonnet-4-20250514",
) -> None: ) -> None:
""" """
Initialize the Mixture of Agents class with agents and configuration. Initialize the Mixture of Agents class with agents and configuration.
@ -54,7 +55,6 @@ class MixtureOfAgents:
self.max_loops = max_loops self.max_loops = max_loops
self.output_type = output_type self.output_type = output_type
self.aggregator_model_name = aggregator_model_name self.aggregator_model_name = aggregator_model_name
self.aggregator_agent = self.aggregator_agent_setup()
self.reliability_check() self.reliability_check()
@ -68,16 +68,8 @@ class MixtureOfAgents:
add_to_conversation=True, add_to_conversation=True,
) )
def aggregator_agent_setup(self): if self.aggregator_agent is None:
return Agent( self.aggregator_agent = self.aggregator_agent_setup()
agent_name="Aggregator Agent",
description="An agent that aggregates the responses of the other agents.",
system_prompt=aggregator_system_prompt_main,
model_name=self.aggregator_model_name,
temperature=0.5,
max_loops=1,
output_type="str-all-except-first",
)
def reliability_check(self) -> None: def reliability_check(self) -> None:
""" """
@ -90,9 +82,6 @@ class MixtureOfAgents:
if len(self.agents) == 0: if len(self.agents) == 0:
raise ValueError("No agents provided.") raise ValueError("No agents provided.")
if not self.aggregator_agent:
raise ValueError("No aggregator agent provided.")
if not self.aggregator_system_prompt: if not self.aggregator_system_prompt:
raise ValueError("No aggregator system prompt provided.") raise ValueError("No aggregator system prompt provided.")
@ -102,77 +91,116 @@ class MixtureOfAgents:
logger.info("Reliability check passed.") logger.info("Reliability check passed.")
logger.info("Mixture of Agents class is ready for use.") logger.info("Mixture of Agents class is ready for use.")
def save_to_markdown_file(self, file_path: str = "moa.md"): def aggregator_agent_setup(self):
with open(file_path, "w") as f: return Agent(
f.write(self.conversation.get_str()) agent_name="Aggregator Agent",
agent_description="An agent that aggregates the responses of the other agents.",
system_prompt=self.aggregator_system_prompt,
model_name=self.aggregator_model_name,
temperature=0.5,
max_loops=1,
output_type="str-all-except-first",
dynamic_context_window=True,
)
def step( def step(
self, self,
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None,
): ):
# self.conversation.add(role="User", content=task) # # Run agents concurrently
# with concurrent.futures.ThreadPoolExecutor(
# Run agents concurrently # max_workers=os.cpu_count()
with concurrent.futures.ThreadPoolExecutor( # ) as executor:
max_workers=os.cpu_count() # # Submit all agent tasks and store with their index
) as executor: # future_to_agent = {
# Submit all agent tasks and store with their index # executor.submit(
future_to_agent = { # agent.run, task=task, img=img, imgs=imgs
executor.submit( # ): agent
agent.run, task=task, img=img, imgs=imgs # for agent in self.agents
): agent # }
for agent in self.agents
} # # Collect results and add to conversation in completion order
# for future in concurrent.futures.as_completed(
# Collect results and add to conversation in completion order # future_to_agent
for future in concurrent.futures.as_completed( # ):
future_to_agent # agent = future_to_agent[future]
): # output = future.result()
agent = future_to_agent[future] # self.conversation.add(role=agent.name, content=output)
output = future.result() agent_outputs = run_agents_concurrently(
self.conversation.add(role=agent.name, content=output) agents=self.agents,
task=task,
img=img,
return_agent_output_dict=True,
)
return self.conversation.get_str() return agent_outputs
def _run( def _run(
self, self,
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None,
): ):
# self.conversation.add(role="User", content=task)
# for i in range(self.layers):
# out = self.step(
# task=self.conversation.get_str(), img=img, imgs=imgs
# )
# task = out
# out = self.aggregator_agent.run(
# task=self.conversation.get_str()
# )
# self.conversation.add(
# role=self.aggregator_agent.agent_name, content=out
# )
# out = history_output_formatter(
# conversation=self.conversation, type=self.output_type
# )
# return out
self.conversation.add(role="User", content=task) self.conversation.add(role="User", content=task)
full_context = self.conversation.get_str()
for i in range(self.layers): for i in range(self.layers):
out = self.step( # Pass the full context/history string to the step method
task=self.conversation.get_str(), img=img, imgs=imgs step_output = self.step(task=full_context, img=img)
# Log each agent's output with full context awareness
for agent_name, agent_output in step_output.items():
self.conversation.add(
role=agent_name, content=agent_output
) )
task = out
out = self.aggregator_agent.run( # Update the full_context with the latest conversation history
full_context = self.conversation.get_str()
aggregator_output = self.aggregator_agent.run(
task=self.conversation.get_str() task=self.conversation.get_str()
) )
self.conversation.add( self.conversation.add(
role=self.aggregator_agent.agent_name, content=out role=self.aggregator_agent.agent_name,
content=aggregator_output,
) )
out = history_output_formatter( return history_output_formatter(
conversation=self.conversation, type=self.output_type conversation=self.conversation, type=self.output_type
) )
return out
def run( def run(
self, self,
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None,
): ):
try: try:
return self._run(task=task, img=img, imgs=imgs) return self._run(task=task, img=img)
except Exception as e: except Exception as e:
logger.error(f"Error running Mixture of Agents: {e}") logger.error(f"Error running Mixture of Agents: {e}")
return f"Error: {e}" return f"Error: {e}"

@ -98,67 +98,105 @@ async def run_agents_concurrently_async(
def run_agents_concurrently( def run_agents_concurrently(
agents: List[AgentType], agents: List["AgentType"],
task: str, task: str,
img: Optional[str] = None,
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
) -> List[Any]: return_agent_output_dict: bool = False,
) -> Any:
""" """
Run multiple agents concurrently using ThreadPoolExecutor for optimal performance. Execute multiple agents concurrently using a ThreadPoolExecutor.
This function executes multiple agents concurrently using a thread pool executor, This function runs agent tasks in parallel threads, benefitting I/O-bound or mixed-load scenarios.
which provides better performance than asyncio for CPU-bound tasks. It automatically Each agent receives the same 'task' (and optional 'img' argument) and runs its .run() method.
determines the optimal number of worker threads based on available CPU cores. The number of worker threads defaults to 95% of the available CPU cores, unless otherwise specified.
Args: Args:
agents (List[AgentType]): List of agent instances to run concurrently agents (List[AgentType]): List of agent instances to execute concurrently.
task (str): The task string to be executed by all agents task (str): Task string to pass to all agent run() methods.
max_workers (Optional[int]): Maximum number of threads in the executor. img (Optional[str]): Optional image data to pass to agent run() if supported.
Defaults to 95% of available CPU cores for optimal performance max_workers (Optional[int]): Maximum threads for the executor (default: 95% of CPU cores).
return_agent_output_dict (bool): If True, returns a dict mapping agent names to outputs.
Otherwise returns a list of results in completion order.
Returns: Returns:
List[Any]: List of results from each agent. If an agent fails, the exception List[Any] or Dict[str, Any]: List of results from each agent's run() method in completion order,
is included in the results list instead of the result. or a dict of agent names to results (preserving agent order)
if return_agent_output_dict is True.
Note: If an agent fails, the corresponding result is the Exception.
- Uses 95% of CPU cores by default for optimal resource utilization
- Handles exceptions gracefully by including them in the results Notes:
- Results may not be in the same order as input agents due to concurrent execution - ThreadPoolExecutor is used for efficient, parallel execution.
- By default, utilizes nearly all available CPU cores for optimal performance.
- Any Exception during agent execution is caught and included in the results.
- If return_agent_output_dict is True, the results dict preserves agent input order.
- Otherwise, the results list is in order of completion (not input order).
Example: Example:
>>> agents = [Agent1(), Agent2(), Agent3()] >>> agents = [Agent1(), Agent2()]
>>> results = run_agents_concurrently(agents, "Process data") >>> # As list
>>> for i, result in enumerate(results): >>> results = run_agents_concurrently(agents, task="Summarize", img=None)
... if isinstance(result, Exception): >>> # As dict
... print(f"Agent {i+1} failed: {result}") >>> results_dict = run_agents_concurrently(
... else: ... agents, task="Summarize", return_agent_output_dict=True)
... print(f"Agent {i+1} result: {result}") >>> for name, val in results_dict.items():
... print(f"Result from {name}: {val}")
""" """
try:
if max_workers is None: if max_workers is None:
# 95% of the available CPU cores
num_cores = os.cpu_count() num_cores = os.cpu_count()
max_workers = int(num_cores * 0.95) if num_cores else 1 max_workers = int(num_cores * 0.95) if num_cores else 1
results = [] futures = []
agent_id_map = {}
with concurrent.futures.ThreadPoolExecutor( with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers max_workers=max_workers
) as executor: ) as executor:
# Submit all tasks and get futures for agent in agents:
futures = [ agent_kwargs = {}
executor.submit(agent.run, task) for agent in agents if task is not None:
] agent_kwargs["task"] = task
if img is not None:
# Wait for all futures to complete and get results agent_kwargs["img"] = img
for future in concurrent.futures.as_completed(futures): future = executor.submit(agent.run, **agent_kwargs)
futures.append(future)
agent_id_map[future] = agent
if return_agent_output_dict:
# Use agent name as key, preserve input order
output_dict = {}
for agent, future in zip(agents, futures):
try:
result = future.result()
except Exception as e:
result = e
# Prefer .agent_name or .name, fallback to str(agent)
name = (
getattr(agent, "agent_name", None)
or getattr(agent, "name", None)
or str(agent)
)
output_dict[name] = result
return output_dict
else:
results = []
for future in concurrent.futures.as_completed(
futures
):
try: try:
result = future.result() result = future.result()
results.append(result) results.append(result)
except Exception as e: except Exception as e:
# Append the error if an agent fails
results.append(e) results.append(e)
return results return results
except Exception as e:
logger.error(
f"Error running_agents_concurrently: {e} Traceback: {e.__traceback__}"
)
raise e
def run_agents_concurrently_multiprocess( def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = os.cpu_count() agents: List[Agent], task: str, batch_size: int = os.cpu_count()

@ -1,33 +1,51 @@
import math import math
from typing import List, Union, Dict from typing import Dict, List, Union
from loguru import logger from loguru import logger
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentListType
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.structs.omni_agent_types import AgentListType
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
# Base Swarm class that all other swarm types will inherit from # Base Swarm class that all other swarm types will inherit from
class BaseSwarm: class BaseSwarm:
def __init__(self, agents: AgentListType): def __init__(
self,
agents: AgentListType,
name: str = "BaseSwarm",
description: str = "A base swarm implementation",
output_type: str = "dict",
):
"""
Initialize the BaseSwarm with agents, name, description, and output type.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
# Ensure agents is a flat list of Agent objects # Ensure agents is a flat list of Agent objects
self.agents = ( self.agents = (
[agent for sublist in agents for agent in sublist] [agent for sublist in agents for agent in sublist]
if isinstance(agents[0], list) if isinstance(agents[0], list)
else agents else agents
) )
self.name = name
self.description = description
self.output_type = output_type
self.conversation = Conversation() self.conversation = Conversation()
def run( def run(self, tasks: List[str]) -> Union[Dict, List, str]:
self, tasks: List[str], return_type: str = "dict"
) -> Union[Dict, List, str]:
""" """
Run the swarm with the given tasks Run the swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -42,19 +60,10 @@ class BaseSwarm:
"This method should be implemented by child classes" "This method should be implemented by child classes"
) )
def _format_return( def _format_return(self) -> Union[Dict, List, str]:
self, return_type: str """Format the return value based on the output_type using history_output_formatter"""
) -> Union[Dict, List, str]: return history_output_formatter(
"""Format the return value based on the return_type""" self.conversation, self.output_type
if return_type.lower() == "dict":
return self.conversation.return_messages_as_dictionary()
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
) )
@ -63,15 +72,30 @@ class CircularSwarm(BaseSwarm):
Implements a circular swarm where agents pass tasks in a circular manner. Implements a circular swarm where agents pass tasks in a circular manner.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "CircularSwarm",
description: str = "A circular swarm where agents pass tasks in a circular manner",
output_type: str = "dict",
):
"""
Initialize the CircularSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the circular swarm with the given tasks Run the circular swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -92,7 +116,7 @@ class CircularSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class LinearSwarm(BaseSwarm): class LinearSwarm(BaseSwarm):
@ -100,15 +124,30 @@ class LinearSwarm(BaseSwarm):
Implements a linear swarm where agents process tasks sequentially. Implements a linear swarm where agents process tasks sequentially.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "LinearSwarm",
description: str = "A linear swarm where agents process tasks sequentially",
output_type: str = "dict",
):
"""
Initialize the LinearSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the linear swarm with the given tasks Run the linear swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -131,7 +170,7 @@ class LinearSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class StarSwarm(BaseSwarm): class StarSwarm(BaseSwarm):
@ -139,15 +178,30 @@ class StarSwarm(BaseSwarm):
Implements a star swarm where a central agent processes all tasks, followed by others. Implements a star swarm where a central agent processes all tasks, followed by others.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "StarSwarm",
description: str = "A star swarm where a central agent processes all tasks, followed by others",
output_type: str = "dict",
):
"""
Initialize the StarSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the star swarm with the given tasks Run the star swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -178,7 +232,7 @@ class StarSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class MeshSwarm(BaseSwarm): class MeshSwarm(BaseSwarm):
@ -186,15 +240,30 @@ class MeshSwarm(BaseSwarm):
Implements a mesh swarm where agents work on tasks randomly from a task queue. Implements a mesh swarm where agents work on tasks randomly from a task queue.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "MeshSwarm",
description: str = "A mesh swarm where agents work on tasks randomly from a task queue",
output_type: str = "dict",
):
"""
Initialize the MeshSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the mesh swarm with the given tasks Run the mesh swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -218,7 +287,7 @@ class MeshSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class PyramidSwarm(BaseSwarm): class PyramidSwarm(BaseSwarm):
@ -226,15 +295,30 @@ class PyramidSwarm(BaseSwarm):
Implements a pyramid swarm where agents are arranged in a pyramid structure. Implements a pyramid swarm where agents are arranged in a pyramid structure.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "PyramidSwarm",
description: str = "A pyramid swarm where agents are arranged in a pyramid structure",
output_type: str = "dict",
):
"""
Initialize the PyramidSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the pyramid swarm with the given tasks Run the pyramid swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -264,7 +348,7 @@ class PyramidSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class FibonacciSwarm(BaseSwarm): class FibonacciSwarm(BaseSwarm):
@ -272,15 +356,30 @@ class FibonacciSwarm(BaseSwarm):
Implements a Fibonacci swarm where agents are arranged according to the Fibonacci sequence. Implements a Fibonacci swarm where agents are arranged according to the Fibonacci sequence.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "FibonacciSwarm",
description: str = "A Fibonacci swarm where agents are arranged according to the Fibonacci sequence",
output_type: str = "dict",
):
"""
Initialize the FibonacciSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Fibonacci swarm with the given tasks Run the Fibonacci swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -309,7 +408,7 @@ class FibonacciSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class PrimeSwarm(BaseSwarm): class PrimeSwarm(BaseSwarm):
@ -317,15 +416,30 @@ class PrimeSwarm(BaseSwarm):
Implements a Prime swarm where agents at prime indices process tasks. Implements a Prime swarm where agents at prime indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "PrimeSwarm",
description: str = "A Prime swarm where agents at prime indices process tasks",
output_type: str = "dict",
):
"""
Initialize the PrimeSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Prime swarm with the given tasks Run the Prime swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -376,7 +490,7 @@ class PrimeSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class PowerSwarm(BaseSwarm): class PowerSwarm(BaseSwarm):
@ -384,15 +498,30 @@ class PowerSwarm(BaseSwarm):
Implements a Power swarm where agents at power-of-2 indices process tasks. Implements a Power swarm where agents at power-of-2 indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "PowerSwarm",
description: str = "A Power swarm where agents at power-of-2 indices process tasks",
output_type: str = "dict",
):
"""
Initialize the PowerSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Power swarm with the given tasks Run the Power swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -417,7 +546,7 @@ class PowerSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class LogSwarm(BaseSwarm): class LogSwarm(BaseSwarm):
@ -425,15 +554,30 @@ class LogSwarm(BaseSwarm):
Implements a Log swarm where agents at logarithmic indices process tasks. Implements a Log swarm where agents at logarithmic indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "LogSwarm",
description: str = "A Log swarm where agents at logarithmic indices process tasks",
output_type: str = "dict",
):
"""
Initialize the LogSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Log swarm with the given tasks Run the Log swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -457,7 +601,7 @@ class LogSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class ExponentialSwarm(BaseSwarm): class ExponentialSwarm(BaseSwarm):
@ -465,15 +609,30 @@ class ExponentialSwarm(BaseSwarm):
Implements an Exponential swarm where agents at exponential indices process tasks. Implements an Exponential swarm where agents at exponential indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "ExponentialSwarm",
description: str = "An Exponential swarm where agents at exponential indices process tasks",
output_type: str = "dict",
):
"""
Initialize the ExponentialSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Exponential swarm with the given tasks Run the Exponential swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -497,7 +656,7 @@ class ExponentialSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class GeometricSwarm(BaseSwarm): class GeometricSwarm(BaseSwarm):
@ -505,15 +664,30 @@ class GeometricSwarm(BaseSwarm):
Implements a Geometric swarm where agents at geometrically increasing indices process tasks. Implements a Geometric swarm where agents at geometrically increasing indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "GeometricSwarm",
description: str = "A Geometric swarm where agents at geometrically increasing indices process tasks",
output_type: str = "dict",
):
"""
Initialize the GeometricSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Geometric swarm with the given tasks Run the Geometric swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -538,7 +712,7 @@ class GeometricSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class HarmonicSwarm(BaseSwarm): class HarmonicSwarm(BaseSwarm):
@ -546,15 +720,30 @@ class HarmonicSwarm(BaseSwarm):
Implements a Harmonic swarm where agents at harmonically spaced indices process tasks. Implements a Harmonic swarm where agents at harmonically spaced indices process tasks.
""" """
def run( def __init__(
self, tasks: List[str], return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "HarmonicSwarm",
description: str = "A Harmonic swarm where agents at harmonically spaced indices process tasks",
output_type: str = "dict",
):
"""
Initialize the HarmonicSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, tasks: List[str]) -> Union[Dict, List, str]:
""" """
Run the Harmonic swarm with the given tasks Run the Harmonic swarm with the given tasks
Args: Args:
tasks: List of tasks to be processed tasks: List of tasks to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -580,7 +769,7 @@ class HarmonicSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class StaircaseSwarm(BaseSwarm): class StaircaseSwarm(BaseSwarm):
@ -588,15 +777,30 @@ class StaircaseSwarm(BaseSwarm):
Implements a Staircase swarm where agents at staircase-patterned indices process a task. Implements a Staircase swarm where agents at staircase-patterned indices process a task.
""" """
def run( def __init__(
self, task: str, return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "StaircaseSwarm",
description: str = "A Staircase swarm where agents at staircase-patterned indices process a task",
output_type: str = "dict",
):
"""
Initialize the StaircaseSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, task: str) -> Union[Dict, List, str]:
""" """
Run the Staircase swarm with the given task Run the Staircase swarm with the given task
Args: Args:
task: Task to be processed task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -617,7 +821,7 @@ class StaircaseSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class SigmoidSwarm(BaseSwarm): class SigmoidSwarm(BaseSwarm):
@ -625,15 +829,30 @@ class SigmoidSwarm(BaseSwarm):
Implements a Sigmoid swarm where agents at sigmoid-distributed indices process a task. Implements a Sigmoid swarm where agents at sigmoid-distributed indices process a task.
""" """
def run( def __init__(
self, task: str, return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "SigmoidSwarm",
description: str = "A Sigmoid swarm where agents at sigmoid-distributed indices process a task",
output_type: str = "dict",
):
"""
Initialize the SigmoidSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, task: str) -> Union[Dict, List, str]:
""" """
Run the Sigmoid swarm with the given task Run the Sigmoid swarm with the given task
Args: Args:
task: Task to be processed task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -653,7 +872,7 @@ class SigmoidSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
class SinusoidalSwarm(BaseSwarm): class SinusoidalSwarm(BaseSwarm):
@ -661,15 +880,30 @@ class SinusoidalSwarm(BaseSwarm):
Implements a Sinusoidal swarm where agents at sinusoidally-distributed indices process a task. Implements a Sinusoidal swarm where agents at sinusoidally-distributed indices process a task.
""" """
def run( def __init__(
self, task: str, return_type: str = "dict" self,
) -> Union[Dict, List, str]: agents: AgentListType,
name: str = "SinusoidalSwarm",
description: str = "A Sinusoidal swarm where agents at sinusoidally-distributed indices process a task",
output_type: str = "dict",
):
"""
Initialize the SinusoidalSwarm.
Args:
agents: List of Agent objects or nested list of Agent objects
name: Name of the swarm
description: Description of the swarm's purpose
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
super().__init__(agents, name, description, output_type)
def run(self, task: str) -> Union[Dict, List, str]:
""" """
Run the Sinusoidal swarm with the given task Run the Sinusoidal swarm with the given task
Args: Args:
task: Task to be processed task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -689,7 +923,7 @@ class SinusoidalSwarm(BaseSwarm):
) )
responses.append(response) responses.append(response)
return self._format_return(return_type) return self._format_return()
# Communication classes # Communication classes
@ -698,13 +932,27 @@ class OneToOne:
Facilitates one-to-one communication between two agents. Facilitates one-to-one communication between two agents.
""" """
def __init__(self, sender: Agent, receiver: Agent): def __init__(
self,
sender: Agent,
receiver: Agent,
output_type: str = "dict",
):
"""
Initialize the OneToOne communication.
Args:
sender: The sender agent
receiver: The receiver agent
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
self.sender = sender self.sender = sender
self.receiver = receiver self.receiver = receiver
self.output_type = output_type
self.conversation = Conversation() self.conversation = Conversation()
def run( def run(
self, task: str, max_loops: int = 1, return_type: str = "dict" self, task: str, max_loops: int = 1
) -> Union[Dict, List, str]: ) -> Union[Dict, List, str]:
""" """
Run the one-to-one communication with the given task Run the one-to-one communication with the given task
@ -712,7 +960,6 @@ class OneToOne:
Args: Args:
task: Task to be processed task: Task to be processed
max_loops: Number of exchange iterations max_loops: Number of exchange iterations
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -752,15 +999,8 @@ class OneToOne:
) )
raise error raise error
if return_type.lower() == "dict": return history_output_formatter(
return self.conversation.return_messages_as_dictionary() self.conversation, self.output_type
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
) )
@ -769,24 +1009,35 @@ class Broadcast:
Facilitates broadcasting from one agent to many agents. Facilitates broadcasting from one agent to many agents.
""" """
def __init__(self, sender: Agent, receivers: AgentListType): def __init__(
self,
sender: Agent,
receivers: AgentListType,
output_type: str = "dict",
):
"""
Initialize the Broadcast communication.
Args:
sender: The sender agent
receivers: List of receiver agents
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
self.sender = sender self.sender = sender
self.receivers = ( self.receivers = (
[agent for sublist in receivers for agent in sublist] [agent for sublist in receivers for agent in sublist]
if isinstance(receivers[0], list) if isinstance(receivers[0], list)
else receivers else receivers
) )
self.output_type = output_type
self.conversation = Conversation() self.conversation = Conversation()
def run( def run(self, task: str) -> Union[Dict, List, str]:
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
""" """
Run the broadcast communication with the given task Run the broadcast communication with the given task
Args: Args:
task: Task to be processed task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -812,17 +1063,8 @@ class Broadcast:
content=response, content=response,
) )
if return_type.lower() == "dict": return history_output_formatter(
return ( self.conversation, self.output_type
self.conversation.return_messages_as_dictionary()
)
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
) )
except Exception as error: except Exception as error:
@ -835,7 +1077,20 @@ class OneToThree:
Facilitates one-to-three communication from one agent to exactly three agents. Facilitates one-to-three communication from one agent to exactly three agents.
""" """
def __init__(self, sender: Agent, receivers: AgentListType): def __init__(
self,
sender: Agent,
receivers: AgentListType,
output_type: str = "dict",
):
"""
Initialize the OneToThree communication.
Args:
sender: The sender agent
receivers: List of exactly three receiver agents
output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc.
"""
if len(receivers) != 3: if len(receivers) != 3:
raise ValueError( raise ValueError(
"The number of receivers must be exactly 3." "The number of receivers must be exactly 3."
@ -843,17 +1098,15 @@ class OneToThree:
self.sender = sender self.sender = sender
self.receivers = receivers self.receivers = receivers
self.output_type = output_type
self.conversation = Conversation() self.conversation = Conversation()
def run( def run(self, task: str) -> Union[Dict, List, str]:
self, task: str, return_type: str = "dict"
) -> Union[Dict, List, str]:
""" """
Run the one-to-three communication with the given task Run the one-to-three communication with the given task
Args: Args:
task: Task to be processed task: Task to be processed
return_type: Type of return value, one of 'dict', 'list', or 'string'
Returns: Returns:
Union[Dict, List, str]: The conversation history in the requested format Union[Dict, List, str]: The conversation history in the requested format
@ -877,17 +1130,8 @@ class OneToThree:
content=response, content=response,
) )
if return_type.lower() == "dict": return history_output_formatter(
return ( self.conversation, self.output_type
self.conversation.return_messages_as_dictionary()
)
elif return_type.lower() == "list":
return self.conversation.return_messages_as_list()
elif return_type.lower() == "string":
return self.conversation.return_history_as_string()
else:
raise ValueError(
"return_type must be one of 'dict', 'list', or 'string'"
) )
except Exception as error: except Exception as error:

@ -1,12 +1,39 @@
import yaml import yaml
from typing import Union, List, Dict, Any from typing import Any
from swarms.utils.xml_utils import to_xml_string from swarms.utils.xml_utils import to_xml_string
from swarms.utils.output_types import HistoryOutputType from swarms.utils.output_types import HistoryOutputType
def history_output_formatter( def history_output_formatter(
conversation: callable, type: HistoryOutputType = "list" conversation: callable, type: HistoryOutputType = "list"
) -> Union[List[Dict[str, Any]], Dict[str, Any], str]: ) -> Any:
"""
Formats the output of a conversation object into various formats.
Args:
conversation (callable): The conversation object that provides various output methods.
type (HistoryOutputType, optional): The desired output format.
Supported values:
- "list": Returns the conversation as a list of message dicts.
- "dict" or "dictionary": Returns the conversation as a dictionary.
- "string" or "str": Returns the conversation as a string.
- "final" or "last": Returns the content of the final message.
- "json": Returns the conversation as a JSON string.
- "all": Returns the conversation as a string (same as "string").
- "yaml": Returns the conversation as a YAML string.
- "dict-all-except-first": Returns all messages except the first as a dictionary.
- "list-final": Returns the final message as a list.
- "str-all-except-first": Returns all messages except the first as a string.
- "dict-final": Returns the final message as a dictionary.
- "xml": Returns the conversation as an XML string.
Defaults to "list".
Returns:
Union[List[Dict[str, Any]], Dict[str, Any], str]: The formatted conversation output.
Raises:
ValueError: If an invalid type is provided.
"""
if type == "list": if type == "list":
return conversation.return_messages_as_list() return conversation.return_messages_as_list()
elif type in ["dict", "dictionary"]: elif type in ["dict", "dictionary"]:

@ -477,13 +477,23 @@ class LiteLLM:
for standard tool call responses. for standard tool call responses.
""" """
if self.mcp_call is True: if self.mcp_call is True:
out = response.choices[0].message.tool_calls[0].function tool_calls = response.choices[0].message.tool_calls
if len(out) > 1: # Check if there are multiple tool calls
return out if len(tool_calls) > 1:
# Return all tool calls if there are multiple
return [
{
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
}
}
for tool_call in tool_calls
]
else: else:
out = out[0] # Single tool call
out = tool_calls[0].function
output = { output = {
"function": { "function": {
"name": out.name, "name": out.name,

@ -0,0 +1,33 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.multi_agent_exec import run_agents_concurrently
# Initialize the equity analyst agents
equity_analyst_1 = Agent(
agent_name="Equity-Analyst-1",
agent_description="Equity research analyst focused on fundamental analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
equity_analyst_2 = Agent(
agent_name="Equity-Analyst-2",
agent_description="Equity research analyst focused on technical analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
outputs = run_agents_concurrently(
agents=[equity_analyst_1, equity_analyst_2],
task="Analyze high growth tech stocks focusing on fundamentals like revenue growth, margins, and market position. Create a detailed analysis table in markdown.",
return_agent_output_dict=True,
)
print(outputs)

@ -0,0 +1,50 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.prompts.moa_prompt import MOA_AGGREGATOR_SYSTEM_PROMPT
# Initialize the equity analyst agents
equity_analyst_1 = Agent(
agent_name="Equity-Analyst-1",
agent_description="Equity research analyst focused on fundamental analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
equity_analyst_2 = Agent(
agent_name="Equity-Analyst-2",
agent_description="Equity research analyst focused on technical analysis",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
equity_analyst_3 = Agent(
agent_name="Equity-Analyst-3",
agent_description="Equity research analyst focused on quantitative analysis and risk modeling",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4.1",
dynamic_temperature_enabled=True,
)
swarm = MixtureOfAgents(
name="Equity-Research-Swarm",
agents=[equity_analyst_1, equity_analyst_2, equity_analyst_3],
output_type="dict",
layers=1,
aggregator_system_prompt=MOA_AGGREGATOR_SYSTEM_PROMPT,
)
out = swarm.run(
task="Analyze Exchange-Traded Funds (ETFs) and stocks related to copper. Focus on fundamentals including supply/demand factors, production costs, major market participants, and recent price trends. Create a detailed analysis table in markdown.",
)
print(out)
Loading…
Cancel
Save