From b8955fbf7e8d203fbd9c04db5566231a882e0a65 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sat, 11 Oct 2025 12:26:42 -0700 Subject: [PATCH] [Improve MOA] [Improve alt swarms] [Improvement][run_agents_concurrently] [Description: aadded output agent types and image handling] [delete meme agent personator agent] --- MCP_TEST_README.md | 187 ++++++ .../utils/meme_agents/meme_agent_generator.py | 17 - swarms/prompts/ag_prompt.py | 4 +- swarms/prompts/moa_prompt.py | 60 ++ swarms/structs/__init__.py | 4 - swarms/structs/aop.py | 1 - swarms/structs/batch_agent_execution.py | 10 +- .../structs/meme_agent_persona_generator.py | 291 ---------- swarms/structs/mixture_of_agents.py | 146 +++-- swarms/structs/multi_agent_exec.py | 136 +++-- swarms/structs/various_alt_swarms.py | 530 +++++++++++++----- swarms/utils/history_output_formatter.py | 31 +- swarms/utils/litellm_wrapper.py | 34 +- test_agent_concurrent.py | 33 ++ test_moa_new.py | 50 ++ 15 files changed, 950 insertions(+), 584 deletions(-) create mode 100644 MCP_TEST_README.md delete mode 100644 examples/multi_agent/utils/meme_agents/meme_agent_generator.py create mode 100644 swarms/prompts/moa_prompt.py delete mode 100644 swarms/structs/meme_agent_persona_generator.py create mode 100644 test_agent_concurrent.py create mode 100644 test_moa_new.py diff --git a/MCP_TEST_README.md b/MCP_TEST_README.md new file mode 100644 index 00000000..a9cb41c7 --- /dev/null +++ b/MCP_TEST_README.md @@ -0,0 +1,187 @@ +# MCP Tools Bug Fix Test Scripts + +This directory contains test scripts to verify the fix for the MCP (Model Context Protocol) tools integration bug. + +## Bug Description + +**Issue**: `TypeError: object of type 'Function' has no len()` + +**Location**: `swarms/utils/litellm_wrapper.py` in the `output_for_tools` method + +**Root Cause**: The code was incorrectly trying to call `len()` on a `Function` object instead of checking the length of the `tool_calls` array. + +## Test Scripts + +### 1. `test_mcp_bug_fix.py` - Simple Bug Fix Test + +A focused test script that specifically reproduces the exact scenario from the bug report. + +**Features**: +- Tests the specific error scenario that was failing +- Verifies the fix handles both single and multiple tool calls +- Provides clear pass/fail results + +**Usage**: +```bash +python test_mcp_bug_fix.py +``` + +### 2. `test_mcp_tools_example.py` - Comprehensive Test Suite + +A comprehensive test suite that covers various aspects of MCP tools integration. + +**Features**: +- Basic tool fetching test +- Multiple MCP servers test +- Agent execution with MCP tools +- Error handling scenarios +- Performance testing +- Detailed reporting + +**Usage**: +```bash +python test_mcp_tools_example.py +``` + +## Prerequisites + +Before running the tests, you need to start the MCP server: + +1. **Start the OKX Crypto Server**: + ```bash + python examples/mcp/multi_mcp_guide/okx_crypto_server.py + ``` + + This will start the server on `http://0.0.0.0:8001/mcp` + +2. **Install Required Dependencies**: + ```bash + pip install swarms mcp fastmcp requests + ``` + +## Expected Results + +### Before the Fix +- ❌ `TypeError: object of type 'Function' has no len()` +- ❌ Agent execution fails when using MCP tools +- ❌ MCP tool calls cannot be processed + +### After the Fix +- ✅ Tools are fetched successfully +- ✅ Agent can execute tasks using MCP tools +- ✅ Both single and multiple tool calls work correctly +- ✅ No TypeError occurs + +## Test Scenarios + +### Basic Functionality +1. **Tool Fetching**: Verify MCP tools can be retrieved from the server +2. **Agent Creation**: Verify agents can be created with MCP tool integration +3. **Tool Execution**: Verify agents can execute tasks that use MCP tools + +### Error Handling +1. **Invalid Server URL**: Test behavior with non-existent server +2. **Invalid Authentication**: Test behavior with wrong credentials +3. **Network Timeouts**: Test behavior with connection timeouts + +### Edge Cases +1. **Single Tool Call**: Verify single tool call processing +2. **Multiple Tool Calls**: Verify multiple tool call processing +3. **Empty Responses**: Test behavior with empty tool responses + +## Sample Output + +### Successful Test Run +``` +🚀 MCP Bug Fix Test +This test verifies the fix for the TypeError in MCP tool usage. +Make sure the OKX crypto server is running on port 8001. + +🐛 Testing MCP Bug Fix +======================================== +1. Fetching MCP tools... + ✅ Successfully fetched 2 tools +2. Creating agent with MCP tools... + ✅ Agent created successfully +3. Running task with MCP tools... + Task: Get Bitcoin trading volume using get_okx_crypto_volume tool + ✅ Task completed successfully! + Result: [Tool execution result with Bitcoin volume data] + +======================================== +📊 TEST SUMMARY +======================================== +✅ Main bug fix: PASSED + The TypeError: object of type 'Function' has no len() is fixed! +✅ Multiple tool calls: PASSED + +🎉 ALL TESTS PASSED! +The MCP tools integration is working correctly. +``` + +## Troubleshooting + +### Common Issues + +1. **Server Not Running**: + ``` + Error: Connection refused + Solution: Start the OKX crypto server first + ``` + +2. **Port Already in Use**: + ``` + Error: Address already in use + Solution: Change the port in the server script or kill existing processes + ``` + +3. **Authentication Error**: + ``` + Error: 401 Unauthorized + Solution: Check the Authorization header in the connection + ``` + +### Debug Mode + +To get more detailed output, you can modify the test scripts to enable verbose logging: + +```python +# In the test scripts, add: +import logging +logging.basicConfig(level=logging.DEBUG) +``` + +## Code Changes Made + +The fix involved modifying the `output_for_tools` method in `swarms/utils/litellm_wrapper.py`: + +**Before** (buggy code): +```python +if self.mcp_call is True: + out = response.choices[0].message.tool_calls[0].function + if len(out) > 1: # ❌ Error: Function objects don't have len() + return out + else: + out = out[0] +``` + +**After** (fixed code): +```python +if self.mcp_call is True: + tool_calls = response.choices[0].message.tool_calls + if len(tool_calls) > 1: # ✅ Correct: Check tool_calls length + # Handle multiple tool calls + return [...] + else: + # Handle single tool call + out = tool_calls[0].function +``` + +## Contributing + +If you find any issues with these test scripts or the MCP tools integration, please: + +1. Run the test scripts to reproduce the issue +2. Check the server logs for additional error information +3. Report the issue with the test output and error details +4. Include your environment details (Python version, OS, etc.) diff --git a/examples/multi_agent/utils/meme_agents/meme_agent_generator.py b/examples/multi_agent/utils/meme_agents/meme_agent_generator.py deleted file mode 100644 index 2ec86d9d..00000000 --- a/examples/multi_agent/utils/meme_agents/meme_agent_generator.py +++ /dev/null @@ -1,17 +0,0 @@ -from swarms.structs.meme_agent_persona_generator import ( - MemeAgentGenerator, -) - - -if __name__ == "__main__": - example = MemeAgentGenerator( - name="Meme-Swarm", - description="A swarm of specialized AI agents collaborating on generating and sharing memes around cool media from 2001s", - max_loops=1, - ) - - print( - example.run( - "Generate funny meme agents around cool media from 2001s" - ) - ) diff --git a/swarms/prompts/ag_prompt.py b/swarms/prompts/ag_prompt.py index c454cb9d..6707f0c0 100644 --- a/swarms/prompts/ag_prompt.py +++ b/swarms/prompts/ag_prompt.py @@ -45,7 +45,7 @@ # # print(aggregator_system_prompt.get_prompt()) -aggregator_system_prompt_main = """ +AGGREGATOR_SYSTEM_PROMPT_MAIN = """ # Multi-Agent Observer and Summarizer @@ -82,4 +82,4 @@ aggregator_system_prompt_main = """ 5. Potential improvements or areas for further exploration Remember: Your role is crucial in distilling complex mult-agent interactions into actionable insights. Strive for clarity, accuracy, and impartiality in all your summaries. - """ +""" diff --git a/swarms/prompts/moa_prompt.py b/swarms/prompts/moa_prompt.py new file mode 100644 index 00000000..c8da664e --- /dev/null +++ b/swarms/prompts/moa_prompt.py @@ -0,0 +1,60 @@ +MOA_RANKER_PROMPT = """ +You are a highly efficient assistant who evaluates and selects the best large language model (LLMs) based on the quality of their responses to a given instruction. This process will be used to create a leaderboard reflecting the most accurate and human-preferred answers. + +I require a leaderboard for various large language models. I'll provide you with prompts given to these models and their corresponding outputs. Your task is to assess these responses and select the model that produces the best output from a human perspective. + +## Instruction +{ + "instruction": "{instruction}" +} + +## Model Outputs +Here are the unordered outputs from the models. Each output is associated with a specific model, identified by a unique model identifier. +[ + { + "model_identifier": "{identifier_1}", + "output": "{output_1}" + }, + { + "model_identifier": "{identifier_2}", + "output": "{output_2}" + }, + { + "model_identifier": "{identifier_3}", + "output": "{output_3}" + }, + { + "model_identifier": "{identifier_4}", + "output": "{output_4}" + }, + { + "model_identifier": "{identifier_5}", + "output": "{output_5}" + }, + { + "model_identifier": "{identifier_6}", + "output": "{output_6}" + } +] + +## Task +Evaluate the models based on the quality and relevance of their outputs and select the model that generated the best output. Answer by providing the model identifier of the best model. We will use your output as the name of the best model, so make sure your output only contains one of the following model identifiers and nothing else (no quotes, no spaces, no new lines, ...). + +## Best Model Identifier +""" + +MOA_AGGREGATOR_SYSTEM_PROMPT = """ +You have been provided with a set of responses from various open-source models to the latest user query. Your +task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the +information provided in these responses, recognizing that some of it may be biased or incorrect. Your response +should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply +to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of +accuracy and reliability. + +Responses from models: +1. [Model Response from Ai,1] +2. [Model Response from Ai,2] +... +n. [Model Response from Ai,n] + +""" diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 145a736c..ec6fb11e 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -42,9 +42,6 @@ from swarms.structs.majority_voting import ( MajorityVoting, ) from swarms.structs.malt import MALT -from swarms.structs.meme_agent_persona_generator import ( - MemeAgentGenerator, -) from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.model_router import ModelRouter from swarms.structs.multi_agent_exec import ( @@ -153,7 +150,6 @@ __all__ = [ "GroupChat", "expertise_based", "MultiAgentRouter", - "MemeAgentGenerator", "ModelRouter", "MALT", "HybridHierarchicalClusterSwarm", diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index caab14cb..c03c4534 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -2383,4 +2383,3 @@ class AOPCluster: if tool.get("function", {}).get("name") == server_name: return tool return None - diff --git a/swarms/structs/batch_agent_execution.py b/swarms/structs/batch_agent_execution.py index 7be9e1f7..7f5b433a 100644 --- a/swarms/structs/batch_agent_execution.py +++ b/swarms/structs/batch_agent_execution.py @@ -1,10 +1,12 @@ import concurrent.futures -from swarms.structs.agent import Agent -from typing import List, Union, Callable import os -from swarms.utils.formatter import formatter -from loguru import logger import traceback +from typing import Callable, List, Union + +from loguru import logger + +from swarms.structs.agent import Agent +from swarms.utils.formatter import formatter class BatchAgentExecutionError(Exception): diff --git a/swarms/structs/meme_agent_persona_generator.py b/swarms/structs/meme_agent_persona_generator.py deleted file mode 100644 index bc231c5c..00000000 --- a/swarms/structs/meme_agent_persona_generator.py +++ /dev/null @@ -1,291 +0,0 @@ -import json -from typing import List - - -from dotenv import load_dotenv -from loguru import logger -from pydantic import BaseModel, Field - -from swarms.structs.agent import Agent -from swarms.structs.swarm_router import SwarmRouter -from swarms.utils.litellm_wrapper import LiteLLM - -load_dotenv() - - -class MemeAgentConfig(BaseModel): - """Configuration for an individual meme agent in a swarm""" - - name: str = Field( - description="The name of the meme agent", - example="Meme-Generator-Agent", - ) - description: str = Field( - description="A description of the meme agent's purpose and capabilities", - example="Agent responsible for generating and sharing memes", - ) - system_prompt: str = Field( - description="The system prompt that defines the meme agent's behavior. Make this prompt as detailed and as extensive as possible.", - example="You are a meme generator agent. Your role is to create and share funny memes...", - ) - - -class MemeSwarmConfig(BaseModel): - """Configuration for a swarm of cooperative meme agents""" - - name: str = Field( - description="The name of the meme swarm", - example="Meme-Creation-Swarm", - ) - description: str = Field( - description="The description of the meme swarm's purpose and capabilities", - example="A swarm of agents that work together to generate and share memes", - ) - agents: List[MemeAgentConfig] = Field( - description="The list of meme agents that make up the swarm", - example=[ - MemeAgentConfig( - name="Meme-Generator-Agent", - description="Generates memes", - system_prompt="You are a meme generator agent...", - ), - MemeAgentConfig( - name="Meme-Sharer-Agent", - description="Shares memes", - system_prompt="You are a meme sharer agent...", - ), - ], - ) - max_loops: int = Field( - description="The maximum number of meme generation loops to run the swarm", - example=1, - ) - - -BOSS_SYSTEM_PROMPT = """ -You are the Meme Generator Boss, responsible for creating and managing a swarm of agents that generate funny, weird, and cool personas. Your goal is to ensure that each agent is uniquely suited to create hilarious and entertaining content. - -### Instructions: - -1. **Persona Generation**: - - Analyze the type of meme or content required. - - Assign tasks to existing agents with a fitting persona, ensuring they understand the tone and style needed. - - If no suitable agent exists, create a new agent with a persona tailored to the task, including a system prompt that outlines their role, objectives, and creative liberties. - -2. **Agent Persona Creation**: - - Name agents based on their persona or the type of content they generate (e.g., "Dank Meme Lord" or "Surreal Humor Specialist"). - - Provide each new agent with a system prompt that outlines their persona, including their tone, style, and any specific themes or topics they should focus on. - -3. **Creativity and Originality**: - - Encourage agents to think outside the box and come up with unique, humorous, and entertaining content. - - Foster an environment where agents can experiment with different styles and formats to keep content fresh and engaging. - -4. **Communication and Feedback**: - - Clearly communicate the requirements and expectations for each task to ensure agents understand what is needed. - - Encourage agents to provide feedback on their creative process and suggest new ideas or directions for future content. - -5. **Transparency and Accountability**: - - Maintain transparency in the selection or creation of agents for specific tasks, ensuring that the reasoning behind each decision is clear. - - Hold agents accountable for the content they generate, ensuring it meets the required standards of humor and creativity. - -# Output Format - -Present your plan in a clear, bullet-point format or short concise paragraphs, outlining persona generation, agent creation, creativity strategies, and communication protocols. - -# Notes - -- Ensure that agents understand the importance of originality and creativity in their content. -- Foster a culture of experimentation and continuous improvement to keep the content generated by agents fresh and engaging. -""" - - -class MemeAgentGenerator: - """A class that automatically builds and manages swarms of AI agents. - - This class handles the creation, coordination and execution of multiple AI agents working - together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work - and create new specialized agents as needed. - - Args: - name (str): The name of the swarm - description (str): A description of the swarm's purpose - verbose (bool, optional): Whether to output detailed logs. Defaults to True. - max_loops (int, optional): Maximum number of execution loops. Defaults to 1. - """ - - def __init__( - self, - name: str = None, - description: str = None, - verbose: bool = True, - max_loops: int = 1, - ): - self.name = name - self.description = description - self.verbose = verbose - self.max_loops = max_loops - self.agents_pool = [] - logger.info( - f"Initialized AutoSwarmBuilder: {name} {description}" - ) - - def run(self, task: str, image_url: str = None, *args, **kwargs): - """Run the swarm on a given task. - - Args: - task (str): The task to be accomplished - image_url (str, optional): URL of an image input if needed. Defaults to None. - *args: Variable length argument list - **kwargs: Arbitrary keyword arguments - - Returns: - The output from the swarm's execution - """ - logger.info(f"Running swarm on task: {task}") - agents = self._create_agents(task, image_url, *args, **kwargs) - logger.info(f"Agents created {len(agents)}") - logger.info("Routing task through swarm") - output = self.swarm_router(agents, task, image_url) - logger.info(f"Swarm execution complete with output: {output}") - return output - - def _create_agents(self, task: str, *args, **kwargs): - """Create the necessary agents for a task. - - Args: - task (str): The task to create agents for - *args: Variable length argument list - **kwargs: Arbitrary keyword arguments - - Returns: - list: List of created agents - """ - logger.info("Creating agents for task") - model = LiteLLM( - model_name="gpt-4.1", - system_prompt=BOSS_SYSTEM_PROMPT, - temperature=0.1, - response_format=MemeSwarmConfig, - ) - - agents_dictionary = model.run(task) - print(agents_dictionary) - - agents_dictionary = json.loads(agents_dictionary) - - if isinstance(agents_dictionary, dict): - agents_dictionary = MemeSwarmConfig(**agents_dictionary) - else: - raise ValueError( - "Agents dictionary is not a valid dictionary" - ) - - # Set swarm config - self.name = agents_dictionary.name - self.description = agents_dictionary.description - - logger.info( - f"Swarm config: {self.name}, {self.description}, {self.max_loops}" - ) - - # Create agents from config - agents = [] - for agent_config in agents_dictionary.agents: - # Convert dict to AgentConfig if needed - if isinstance(agent_config, dict): - agent_config = MemeAgentConfig(**agent_config) - - agent = self.build_agent( - agent_name=agent_config.name, - agent_description=agent_config.description, - agent_system_prompt=agent_config.system_prompt, - ) - agents.append(agent) - - return agents - - def build_agent( - self, - agent_name: str, - agent_description: str, - agent_system_prompt: str, - max_loops: int = 1, - ): - """Build a single agent with the given specifications. - - Args: - agent_name (str): Name of the agent - agent_description (str): Description of the agent's purpose - agent_system_prompt (str): The system prompt for the agent - - Returns: - Agent: The constructed agent instance - """ - logger.info(f"Building agent: {agent_name}") - agent = Agent( - agent_name=agent_name, - description=agent_description, - system_prompt=agent_system_prompt, - model_name="gpt-4.1", - max_loops=max_loops, - autosave=True, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path=f"{agent_name}.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - return_step_meta=False, - output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and - streaming_on=False, - # auto_generate_prompt=True, - ) - - return agent - - def swarm_router( - self, - agents: List[Agent], - task: str, - *args, - **kwargs, - ): - """Route tasks between agents in the swarm. - - Args: - agents (List[Agent]): List of available agents - task (str): The task to route - image_url (str, optional): URL of an image input if needed. Defaults to None. - *args: Variable length argument list - **kwargs: Arbitrary keyword arguments - - Returns: - The output from the routed task execution - """ - logger.info("Routing task through swarm") - swarm_router_instance = SwarmRouter( - name=self.name, - description=self.description, - agents=agents, - swarm_type="auto", - max_loops=1, - ) - - return swarm_router_instance.run( - self.name + " " + self.description + " " + task, - ) - - -# if __name__ == "__main__": -# example = MemeAgentGenerator( -# name="Meme-Swarm", -# description="A swarm of specialized AI agents collaborating on generating and sharing memes around cool media from 2001s", -# max_loops=1, -# ) - -# print( -# example.run( -# "Generate funny meme agents around cool media from 2001s" -# ) -# ) diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index 3bab8211..58f0b77e 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -1,18 +1,18 @@ +import concurrent.futures import os +import uuid from typing import List, Optional - +from swarms.prompts.ag_prompt import AGGREGATOR_SYSTEM_PROMPT_MAIN from swarms.structs.agent import Agent -from swarms.prompts.ag_prompt import aggregator_system_prompt_main +from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import list_all_agents +from swarms.structs.multi_agent_exec import run_agents_concurrently from swarms.utils.history_output_formatter import ( history_output_formatter, ) from swarms.utils.loguru_logger import initialize_logger -import concurrent.futures from swarms.utils.output_types import OutputType -from swarms.structs.conversation import Conversation - logger = initialize_logger(log_folder="mixture_of_agents") @@ -24,15 +24,16 @@ class MixtureOfAgents: def __init__( self, + id: str = str(uuid.uuid4()), name: str = "MixtureOfAgents", description: str = "A class to run a mixture of agents and aggregate their responses.", agents: List[Agent] = None, aggregator_agent: Agent = None, - aggregator_system_prompt: str = aggregator_system_prompt_main, + aggregator_system_prompt: str = AGGREGATOR_SYSTEM_PROMPT_MAIN, layers: int = 3, max_loops: int = 1, output_type: OutputType = "final", - aggregator_model_name: str = "claude-3-5-sonnet-20240620", + aggregator_model_name: str = "claude-sonnet-4-20250514", ) -> None: """ Initialize the Mixture of Agents class with agents and configuration. @@ -54,7 +55,6 @@ class MixtureOfAgents: self.max_loops = max_loops self.output_type = output_type self.aggregator_model_name = aggregator_model_name - self.aggregator_agent = self.aggregator_agent_setup() self.reliability_check() @@ -68,16 +68,8 @@ class MixtureOfAgents: add_to_conversation=True, ) - def aggregator_agent_setup(self): - return Agent( - agent_name="Aggregator Agent", - description="An agent that aggregates the responses of the other agents.", - system_prompt=aggregator_system_prompt_main, - model_name=self.aggregator_model_name, - temperature=0.5, - max_loops=1, - output_type="str-all-except-first", - ) + if self.aggregator_agent is None: + self.aggregator_agent = self.aggregator_agent_setup() def reliability_check(self) -> None: """ @@ -90,9 +82,6 @@ class MixtureOfAgents: if len(self.agents) == 0: raise ValueError("No agents provided.") - if not self.aggregator_agent: - raise ValueError("No aggregator agent provided.") - if not self.aggregator_system_prompt: raise ValueError("No aggregator system prompt provided.") @@ -102,77 +91,116 @@ class MixtureOfAgents: logger.info("Reliability check passed.") logger.info("Mixture of Agents class is ready for use.") - def save_to_markdown_file(self, file_path: str = "moa.md"): - with open(file_path, "w") as f: - f.write(self.conversation.get_str()) + def aggregator_agent_setup(self): + return Agent( + agent_name="Aggregator Agent", + agent_description="An agent that aggregates the responses of the other agents.", + system_prompt=self.aggregator_system_prompt, + model_name=self.aggregator_model_name, + temperature=0.5, + max_loops=1, + output_type="str-all-except-first", + dynamic_context_window=True, + ) def step( self, task: str, img: Optional[str] = None, - imgs: Optional[List[str]] = None, ): - # self.conversation.add(role="User", content=task) + # # Run agents concurrently + # with concurrent.futures.ThreadPoolExecutor( + # max_workers=os.cpu_count() + # ) as executor: + # # Submit all agent tasks and store with their index + # future_to_agent = { + # executor.submit( + # agent.run, task=task, img=img, imgs=imgs + # ): agent + # for agent in self.agents + # } + + # # Collect results and add to conversation in completion order + # for future in concurrent.futures.as_completed( + # future_to_agent + # ): + # agent = future_to_agent[future] + # output = future.result() + # self.conversation.add(role=agent.name, content=output) + agent_outputs = run_agents_concurrently( + agents=self.agents, + task=task, + img=img, + return_agent_output_dict=True, + ) - # Run agents concurrently - with concurrent.futures.ThreadPoolExecutor( - max_workers=os.cpu_count() - ) as executor: - # Submit all agent tasks and store with their index - future_to_agent = { - executor.submit( - agent.run, task=task, img=img, imgs=imgs - ): agent - for agent in self.agents - } - - # Collect results and add to conversation in completion order - for future in concurrent.futures.as_completed( - future_to_agent - ): - agent = future_to_agent[future] - output = future.result() - self.conversation.add(role=agent.name, content=output) - - return self.conversation.get_str() + return agent_outputs def _run( self, task: str, img: Optional[str] = None, - imgs: Optional[List[str]] = None, ): + # self.conversation.add(role="User", content=task) + + # for i in range(self.layers): + # out = self.step( + # task=self.conversation.get_str(), img=img, imgs=imgs + # ) + # task = out + + # out = self.aggregator_agent.run( + # task=self.conversation.get_str() + # ) + + # self.conversation.add( + # role=self.aggregator_agent.agent_name, content=out + # ) + + # out = history_output_formatter( + # conversation=self.conversation, type=self.output_type + # ) + + # return out + self.conversation.add(role="User", content=task) + full_context = self.conversation.get_str() + for i in range(self.layers): - out = self.step( - task=self.conversation.get_str(), img=img, imgs=imgs - ) - task = out + # Pass the full context/history string to the step method + step_output = self.step(task=full_context, img=img) + + # Log each agent's output with full context awareness + for agent_name, agent_output in step_output.items(): + self.conversation.add( + role=agent_name, content=agent_output + ) - out = self.aggregator_agent.run( + # Update the full_context with the latest conversation history + full_context = self.conversation.get_str() + + aggregator_output = self.aggregator_agent.run( task=self.conversation.get_str() ) self.conversation.add( - role=self.aggregator_agent.agent_name, content=out + role=self.aggregator_agent.agent_name, + content=aggregator_output, ) - out = history_output_formatter( + return history_output_formatter( conversation=self.conversation, type=self.output_type ) - return out - def run( self, task: str, img: Optional[str] = None, - imgs: Optional[List[str]] = None, ): try: - return self._run(task=task, img=img, imgs=imgs) + return self._run(task=task, img=img) except Exception as e: logger.error(f"Error running Mixture of Agents: {e}") return f"Error: {e}" diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 44918d03..ef1cc6b3 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -98,66 +98,104 @@ async def run_agents_concurrently_async( def run_agents_concurrently( - agents: List[AgentType], + agents: List["AgentType"], task: str, + img: Optional[str] = None, max_workers: Optional[int] = None, -) -> List[Any]: + return_agent_output_dict: bool = False, +) -> Any: """ - Run multiple agents concurrently using ThreadPoolExecutor for optimal performance. + Execute multiple agents concurrently using a ThreadPoolExecutor. - This function executes multiple agents concurrently using a thread pool executor, - which provides better performance than asyncio for CPU-bound tasks. It automatically - determines the optimal number of worker threads based on available CPU cores. + This function runs agent tasks in parallel threads, benefitting I/O-bound or mixed-load scenarios. + Each agent receives the same 'task' (and optional 'img' argument) and runs its .run() method. + The number of worker threads defaults to 95% of the available CPU cores, unless otherwise specified. Args: - agents (List[AgentType]): List of agent instances to run concurrently - task (str): The task string to be executed by all agents - max_workers (Optional[int]): Maximum number of threads in the executor. - Defaults to 95% of available CPU cores for optimal performance + agents (List[AgentType]): List of agent instances to execute concurrently. + task (str): Task string to pass to all agent run() methods. + img (Optional[str]): Optional image data to pass to agent run() if supported. + max_workers (Optional[int]): Maximum threads for the executor (default: 95% of CPU cores). + return_agent_output_dict (bool): If True, returns a dict mapping agent names to outputs. + Otherwise returns a list of results in completion order. Returns: - List[Any]: List of results from each agent. If an agent fails, the exception - is included in the results list instead of the result. - - Note: - - Uses 95% of CPU cores by default for optimal resource utilization - - Handles exceptions gracefully by including them in the results - - Results may not be in the same order as input agents due to concurrent execution + List[Any] or Dict[str, Any]: List of results from each agent's run() method in completion order, + or a dict of agent names to results (preserving agent order) + if return_agent_output_dict is True. + If an agent fails, the corresponding result is the Exception. + + Notes: + - ThreadPoolExecutor is used for efficient, parallel execution. + - By default, utilizes nearly all available CPU cores for optimal performance. + - Any Exception during agent execution is caught and included in the results. + - If return_agent_output_dict is True, the results dict preserves agent input order. + - Otherwise, the results list is in order of completion (not input order). Example: - >>> agents = [Agent1(), Agent2(), Agent3()] - >>> results = run_agents_concurrently(agents, "Process data") - >>> for i, result in enumerate(results): - ... if isinstance(result, Exception): - ... print(f"Agent {i+1} failed: {result}") - ... else: - ... print(f"Agent {i+1} result: {result}") + >>> agents = [Agent1(), Agent2()] + >>> # As list + >>> results = run_agents_concurrently(agents, task="Summarize", img=None) + >>> # As dict + >>> results_dict = run_agents_concurrently( + ... agents, task="Summarize", return_agent_output_dict=True) + >>> for name, val in results_dict.items(): + ... print(f"Result from {name}: {val}") """ - if max_workers is None: - # 95% of the available CPU cores - num_cores = os.cpu_count() - max_workers = int(num_cores * 0.95) if num_cores else 1 - - results = [] - - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers - ) as executor: - # Submit all tasks and get futures - futures = [ - executor.submit(agent.run, task) for agent in agents - ] - - # Wait for all futures to complete and get results - for future in concurrent.futures.as_completed(futures): - try: - result = future.result() - results.append(result) - except Exception as e: - # Append the error if an agent fails - results.append(e) - - return results + try: + if max_workers is None: + num_cores = os.cpu_count() + max_workers = int(num_cores * 0.95) if num_cores else 1 + + futures = [] + agent_id_map = {} + + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor: + for agent in agents: + agent_kwargs = {} + if task is not None: + agent_kwargs["task"] = task + if img is not None: + agent_kwargs["img"] = img + future = executor.submit(agent.run, **agent_kwargs) + futures.append(future) + agent_id_map[future] = agent + + if return_agent_output_dict: + # Use agent name as key, preserve input order + output_dict = {} + for agent, future in zip(agents, futures): + try: + result = future.result() + except Exception as e: + result = e + # Prefer .agent_name or .name, fallback to str(agent) + name = ( + getattr(agent, "agent_name", None) + or getattr(agent, "name", None) + or str(agent) + ) + output_dict[name] = result + return output_dict + else: + results = [] + for future in concurrent.futures.as_completed( + futures + ): + try: + result = future.result() + results.append(result) + except Exception as e: + results.append(e) + return results + + except Exception as e: + logger.error( + f"Error running_agents_concurrently: {e} Traceback: {e.__traceback__}" + ) + raise e def run_agents_concurrently_multiprocess( diff --git a/swarms/structs/various_alt_swarms.py b/swarms/structs/various_alt_swarms.py index ab12b9ad..c4b34f9f 100644 --- a/swarms/structs/various_alt_swarms.py +++ b/swarms/structs/various_alt_swarms.py @@ -1,33 +1,51 @@ import math -from typing import List, Union, Dict +from typing import Dict, List, Union from loguru import logger from swarms.structs.agent import Agent -from swarms.structs.omni_agent_types import AgentListType from swarms.structs.conversation import Conversation +from swarms.structs.omni_agent_types import AgentListType +from swarms.utils.history_output_formatter import ( + history_output_formatter, +) # Base Swarm class that all other swarm types will inherit from class BaseSwarm: - def __init__(self, agents: AgentListType): + def __init__( + self, + agents: AgentListType, + name: str = "BaseSwarm", + description: str = "A base swarm implementation", + output_type: str = "dict", + ): + """ + Initialize the BaseSwarm with agents, name, description, and output type. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ # Ensure agents is a flat list of Agent objects self.agents = ( [agent for sublist in agents for agent in sublist] if isinstance(agents[0], list) else agents ) + self.name = name + self.description = description + self.output_type = output_type self.conversation = Conversation() - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -42,20 +60,11 @@ class BaseSwarm: "This method should be implemented by child classes" ) - def _format_return( - self, return_type: str - ) -> Union[Dict, List, str]: - """Format the return value based on the return_type""" - if return_type.lower() == "dict": - return self.conversation.return_messages_as_dictionary() - elif return_type.lower() == "list": - return self.conversation.return_messages_as_list() - elif return_type.lower() == "string": - return self.conversation.return_history_as_string() - else: - raise ValueError( - "return_type must be one of 'dict', 'list', or 'string'" - ) + def _format_return(self) -> Union[Dict, List, str]: + """Format the return value based on the output_type using history_output_formatter""" + return history_output_formatter( + self.conversation, self.output_type + ) class CircularSwarm(BaseSwarm): @@ -63,15 +72,30 @@ class CircularSwarm(BaseSwarm): Implements a circular swarm where agents pass tasks in a circular manner. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "CircularSwarm", + description: str = "A circular swarm where agents pass tasks in a circular manner", + output_type: str = "dict", + ): + """ + Initialize the CircularSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the circular swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -92,7 +116,7 @@ class CircularSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class LinearSwarm(BaseSwarm): @@ -100,15 +124,30 @@ class LinearSwarm(BaseSwarm): Implements a linear swarm where agents process tasks sequentially. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "LinearSwarm", + description: str = "A linear swarm where agents process tasks sequentially", + output_type: str = "dict", + ): + """ + Initialize the LinearSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the linear swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -131,7 +170,7 @@ class LinearSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class StarSwarm(BaseSwarm): @@ -139,15 +178,30 @@ class StarSwarm(BaseSwarm): Implements a star swarm where a central agent processes all tasks, followed by others. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "StarSwarm", + description: str = "A star swarm where a central agent processes all tasks, followed by others", + output_type: str = "dict", + ): + """ + Initialize the StarSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the star swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -178,7 +232,7 @@ class StarSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class MeshSwarm(BaseSwarm): @@ -186,15 +240,30 @@ class MeshSwarm(BaseSwarm): Implements a mesh swarm where agents work on tasks randomly from a task queue. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "MeshSwarm", + description: str = "A mesh swarm where agents work on tasks randomly from a task queue", + output_type: str = "dict", + ): + """ + Initialize the MeshSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the mesh swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -218,7 +287,7 @@ class MeshSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class PyramidSwarm(BaseSwarm): @@ -226,15 +295,30 @@ class PyramidSwarm(BaseSwarm): Implements a pyramid swarm where agents are arranged in a pyramid structure. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "PyramidSwarm", + description: str = "A pyramid swarm where agents are arranged in a pyramid structure", + output_type: str = "dict", + ): + """ + Initialize the PyramidSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the pyramid swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -264,7 +348,7 @@ class PyramidSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class FibonacciSwarm(BaseSwarm): @@ -272,15 +356,30 @@ class FibonacciSwarm(BaseSwarm): Implements a Fibonacci swarm where agents are arranged according to the Fibonacci sequence. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "FibonacciSwarm", + description: str = "A Fibonacci swarm where agents are arranged according to the Fibonacci sequence", + output_type: str = "dict", + ): + """ + Initialize the FibonacciSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Fibonacci swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -309,7 +408,7 @@ class FibonacciSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class PrimeSwarm(BaseSwarm): @@ -317,15 +416,30 @@ class PrimeSwarm(BaseSwarm): Implements a Prime swarm where agents at prime indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "PrimeSwarm", + description: str = "A Prime swarm where agents at prime indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the PrimeSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Prime swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -376,7 +490,7 @@ class PrimeSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class PowerSwarm(BaseSwarm): @@ -384,15 +498,30 @@ class PowerSwarm(BaseSwarm): Implements a Power swarm where agents at power-of-2 indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "PowerSwarm", + description: str = "A Power swarm where agents at power-of-2 indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the PowerSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Power swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -417,7 +546,7 @@ class PowerSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class LogSwarm(BaseSwarm): @@ -425,15 +554,30 @@ class LogSwarm(BaseSwarm): Implements a Log swarm where agents at logarithmic indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "LogSwarm", + description: str = "A Log swarm where agents at logarithmic indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the LogSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Log swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -457,7 +601,7 @@ class LogSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class ExponentialSwarm(BaseSwarm): @@ -465,15 +609,30 @@ class ExponentialSwarm(BaseSwarm): Implements an Exponential swarm where agents at exponential indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "ExponentialSwarm", + description: str = "An Exponential swarm where agents at exponential indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the ExponentialSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Exponential swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -497,7 +656,7 @@ class ExponentialSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class GeometricSwarm(BaseSwarm): @@ -505,15 +664,30 @@ class GeometricSwarm(BaseSwarm): Implements a Geometric swarm where agents at geometrically increasing indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "GeometricSwarm", + description: str = "A Geometric swarm where agents at geometrically increasing indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the GeometricSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Geometric swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -538,7 +712,7 @@ class GeometricSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class HarmonicSwarm(BaseSwarm): @@ -546,15 +720,30 @@ class HarmonicSwarm(BaseSwarm): Implements a Harmonic swarm where agents at harmonically spaced indices process tasks. """ - def run( - self, tasks: List[str], return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "HarmonicSwarm", + description: str = "A Harmonic swarm where agents at harmonically spaced indices process tasks", + output_type: str = "dict", + ): + """ + Initialize the HarmonicSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, tasks: List[str]) -> Union[Dict, List, str]: """ Run the Harmonic swarm with the given tasks Args: tasks: List of tasks to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -580,7 +769,7 @@ class HarmonicSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class StaircaseSwarm(BaseSwarm): @@ -588,15 +777,30 @@ class StaircaseSwarm(BaseSwarm): Implements a Staircase swarm where agents at staircase-patterned indices process a task. """ - def run( - self, task: str, return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "StaircaseSwarm", + description: str = "A Staircase swarm where agents at staircase-patterned indices process a task", + output_type: str = "dict", + ): + """ + Initialize the StaircaseSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, task: str) -> Union[Dict, List, str]: """ Run the Staircase swarm with the given task Args: task: Task to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -617,7 +821,7 @@ class StaircaseSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class SigmoidSwarm(BaseSwarm): @@ -625,15 +829,30 @@ class SigmoidSwarm(BaseSwarm): Implements a Sigmoid swarm where agents at sigmoid-distributed indices process a task. """ - def run( - self, task: str, return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "SigmoidSwarm", + description: str = "A Sigmoid swarm where agents at sigmoid-distributed indices process a task", + output_type: str = "dict", + ): + """ + Initialize the SigmoidSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, task: str) -> Union[Dict, List, str]: """ Run the Sigmoid swarm with the given task Args: task: Task to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -653,7 +872,7 @@ class SigmoidSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() class SinusoidalSwarm(BaseSwarm): @@ -661,15 +880,30 @@ class SinusoidalSwarm(BaseSwarm): Implements a Sinusoidal swarm where agents at sinusoidally-distributed indices process a task. """ - def run( - self, task: str, return_type: str = "dict" - ) -> Union[Dict, List, str]: + def __init__( + self, + agents: AgentListType, + name: str = "SinusoidalSwarm", + description: str = "A Sinusoidal swarm where agents at sinusoidally-distributed indices process a task", + output_type: str = "dict", + ): + """ + Initialize the SinusoidalSwarm. + + Args: + agents: List of Agent objects or nested list of Agent objects + name: Name of the swarm + description: Description of the swarm's purpose + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ + super().__init__(agents, name, description, output_type) + + def run(self, task: str) -> Union[Dict, List, str]: """ Run the Sinusoidal swarm with the given task Args: task: Task to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -689,7 +923,7 @@ class SinusoidalSwarm(BaseSwarm): ) responses.append(response) - return self._format_return(return_type) + return self._format_return() # Communication classes @@ -698,13 +932,27 @@ class OneToOne: Facilitates one-to-one communication between two agents. """ - def __init__(self, sender: Agent, receiver: Agent): + def __init__( + self, + sender: Agent, + receiver: Agent, + output_type: str = "dict", + ): + """ + Initialize the OneToOne communication. + + Args: + sender: The sender agent + receiver: The receiver agent + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ self.sender = sender self.receiver = receiver + self.output_type = output_type self.conversation = Conversation() def run( - self, task: str, max_loops: int = 1, return_type: str = "dict" + self, task: str, max_loops: int = 1 ) -> Union[Dict, List, str]: """ Run the one-to-one communication with the given task @@ -712,7 +960,6 @@ class OneToOne: Args: task: Task to be processed max_loops: Number of exchange iterations - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -752,16 +999,9 @@ class OneToOne: ) raise error - if return_type.lower() == "dict": - return self.conversation.return_messages_as_dictionary() - elif return_type.lower() == "list": - return self.conversation.return_messages_as_list() - elif return_type.lower() == "string": - return self.conversation.return_history_as_string() - else: - raise ValueError( - "return_type must be one of 'dict', 'list', or 'string'" - ) + return history_output_formatter( + self.conversation, self.output_type + ) class Broadcast: @@ -769,24 +1009,35 @@ class Broadcast: Facilitates broadcasting from one agent to many agents. """ - def __init__(self, sender: Agent, receivers: AgentListType): + def __init__( + self, + sender: Agent, + receivers: AgentListType, + output_type: str = "dict", + ): + """ + Initialize the Broadcast communication. + + Args: + sender: The sender agent + receivers: List of receiver agents + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ self.sender = sender self.receivers = ( [agent for sublist in receivers for agent in sublist] if isinstance(receivers[0], list) else receivers ) + self.output_type = output_type self.conversation = Conversation() - def run( - self, task: str, return_type: str = "dict" - ) -> Union[Dict, List, str]: + def run(self, task: str) -> Union[Dict, List, str]: """ Run the broadcast communication with the given task Args: task: Task to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -812,18 +1063,9 @@ class Broadcast: content=response, ) - if return_type.lower() == "dict": - return ( - self.conversation.return_messages_as_dictionary() - ) - elif return_type.lower() == "list": - return self.conversation.return_messages_as_list() - elif return_type.lower() == "string": - return self.conversation.return_history_as_string() - else: - raise ValueError( - "return_type must be one of 'dict', 'list', or 'string'" - ) + return history_output_formatter( + self.conversation, self.output_type + ) except Exception as error: logger.error(f"Error during broadcast: {error}") @@ -835,7 +1077,20 @@ class OneToThree: Facilitates one-to-three communication from one agent to exactly three agents. """ - def __init__(self, sender: Agent, receivers: AgentListType): + def __init__( + self, + sender: Agent, + receivers: AgentListType, + output_type: str = "dict", + ): + """ + Initialize the OneToThree communication. + + Args: + sender: The sender agent + receivers: List of exactly three receiver agents + output_type: Type of output format, one of 'dict', 'list', 'string', 'json', 'yaml', 'xml', etc. + """ if len(receivers) != 3: raise ValueError( "The number of receivers must be exactly 3." @@ -843,17 +1098,15 @@ class OneToThree: self.sender = sender self.receivers = receivers + self.output_type = output_type self.conversation = Conversation() - def run( - self, task: str, return_type: str = "dict" - ) -> Union[Dict, List, str]: + def run(self, task: str) -> Union[Dict, List, str]: """ Run the one-to-three communication with the given task Args: task: Task to be processed - return_type: Type of return value, one of 'dict', 'list', or 'string' Returns: Union[Dict, List, str]: The conversation history in the requested format @@ -877,18 +1130,9 @@ class OneToThree: content=response, ) - if return_type.lower() == "dict": - return ( - self.conversation.return_messages_as_dictionary() - ) - elif return_type.lower() == "list": - return self.conversation.return_messages_as_list() - elif return_type.lower() == "string": - return self.conversation.return_history_as_string() - else: - raise ValueError( - "return_type must be one of 'dict', 'list', or 'string'" - ) + return history_output_formatter( + self.conversation, self.output_type + ) except Exception as error: logger.error(f"Error in one_to_three: {error}") diff --git a/swarms/utils/history_output_formatter.py b/swarms/utils/history_output_formatter.py index f7b86e29..245666e5 100644 --- a/swarms/utils/history_output_formatter.py +++ b/swarms/utils/history_output_formatter.py @@ -1,12 +1,39 @@ import yaml -from typing import Union, List, Dict, Any +from typing import Any from swarms.utils.xml_utils import to_xml_string from swarms.utils.output_types import HistoryOutputType def history_output_formatter( conversation: callable, type: HistoryOutputType = "list" -) -> Union[List[Dict[str, Any]], Dict[str, Any], str]: +) -> Any: + """ + Formats the output of a conversation object into various formats. + + Args: + conversation (callable): The conversation object that provides various output methods. + type (HistoryOutputType, optional): The desired output format. + Supported values: + - "list": Returns the conversation as a list of message dicts. + - "dict" or "dictionary": Returns the conversation as a dictionary. + - "string" or "str": Returns the conversation as a string. + - "final" or "last": Returns the content of the final message. + - "json": Returns the conversation as a JSON string. + - "all": Returns the conversation as a string (same as "string"). + - "yaml": Returns the conversation as a YAML string. + - "dict-all-except-first": Returns all messages except the first as a dictionary. + - "list-final": Returns the final message as a list. + - "str-all-except-first": Returns all messages except the first as a string. + - "dict-final": Returns the final message as a dictionary. + - "xml": Returns the conversation as an XML string. + Defaults to "list". + + Returns: + Union[List[Dict[str, Any]], Dict[str, Any], str]: The formatted conversation output. + + Raises: + ValueError: If an invalid type is provided. + """ if type == "list": return conversation.return_messages_as_list() elif type in ["dict", "dictionary"]: diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index 45cb72c6..469f2f3d 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -477,20 +477,30 @@ class LiteLLM: for standard tool call responses. """ if self.mcp_call is True: - out = response.choices[0].message.tool_calls[0].function - - if len(out) > 1: - return out + tool_calls = response.choices[0].message.tool_calls + + # Check if there are multiple tool calls + if len(tool_calls) > 1: + # Return all tool calls if there are multiple + return [ + { + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments, + } + } + for tool_call in tool_calls + ] else: - out = out[0] - - output = { - "function": { - "name": out.name, - "arguments": out.arguments, + # Single tool call + out = tool_calls[0].function + output = { + "function": { + "name": out.name, + "arguments": out.arguments, + } } - } - return output + return output else: out = response.choices[0].message.tool_calls diff --git a/test_agent_concurrent.py b/test_agent_concurrent.py new file mode 100644 index 00000000..8479141f --- /dev/null +++ b/test_agent_concurrent.py @@ -0,0 +1,33 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.multi_agent_exec import run_agents_concurrently + +# Initialize the equity analyst agents +equity_analyst_1 = Agent( + agent_name="Equity-Analyst-1", + agent_description="Equity research analyst focused on fundamental analysis", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, +) + +equity_analyst_2 = Agent( + agent_name="Equity-Analyst-2", + agent_description="Equity research analyst focused on technical analysis", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, +) + + +outputs = run_agents_concurrently( + agents=[equity_analyst_1, equity_analyst_2], + task="Analyze high growth tech stocks focusing on fundamentals like revenue growth, margins, and market position. Create a detailed analysis table in markdown.", + return_agent_output_dict=True, +) + +print(outputs) diff --git a/test_moa_new.py b/test_moa_new.py new file mode 100644 index 00000000..dd6a7f6d --- /dev/null +++ b/test_moa_new.py @@ -0,0 +1,50 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms.structs.mixture_of_agents import MixtureOfAgents +from swarms.prompts.moa_prompt import MOA_AGGREGATOR_SYSTEM_PROMPT + +# Initialize the equity analyst agents +equity_analyst_1 = Agent( + agent_name="Equity-Analyst-1", + agent_description="Equity research analyst focused on fundamental analysis", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, +) + +equity_analyst_2 = Agent( + agent_name="Equity-Analyst-2", + agent_description="Equity research analyst focused on technical analysis", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, +) + +equity_analyst_3 = Agent( + agent_name="Equity-Analyst-3", + agent_description="Equity research analyst focused on quantitative analysis and risk modeling", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4.1", + dynamic_temperature_enabled=True, +) + + +swarm = MixtureOfAgents( + name="Equity-Research-Swarm", + agents=[equity_analyst_1, equity_analyst_2, equity_analyst_3], + output_type="dict", + layers=1, + aggregator_system_prompt=MOA_AGGREGATOR_SYSTEM_PROMPT, +) + + +out = swarm.run( + task="Analyze Exchange-Traded Funds (ETFs) and stocks related to copper. Focus on fundamentals including supply/demand factors, production costs, major market participants, and recent price trends. Create a detailed analysis table in markdown.", +) + +print(out)