diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md index 8f729c8f..063cf11f 100644 --- a/docs/swarms/structs/agent.md +++ b/docs/swarms/structs/agent.md @@ -91,6 +91,7 @@ The `Agent` class establishes a conversational loop with a language model, allow | `callback` | `Optional[Callable]` | Callable function to be called after each agent loop. | | `metadata` | `Optional[Dict[str, Any]]` | Dictionary containing metadata for the agent. | | `callbacks` | `Optional[List[Callable]]` | List of callable functions to be called during execution. | +| `handoffs` | `Optional[Union[Sequence[Callable], Any]]` | List of Agent instances that can be delegated tasks to. When provided, the agent will use a MultiAgentRouter to intelligently route tasks to the most appropriate specialized agent. | | `search_algorithm` | `Optional[Callable]` | Callable function for long-term memory retrieval. | | `logs_to_filename` | `Optional[str]` | File path for logging agent activities. | | `evaluator` | `Optional[Callable]` | Callable function for evaluating the agent's responses. | @@ -239,6 +240,7 @@ The `Agent` class establishes a conversational loop with a language model, allow | `model_dump_yaml()` | Saves the agent model to a YAML file in the workspace directory. | None | `agent.model_dump_yaml()` | | `log_agent_data()` | Logs the agent's data to an external API. | None | `agent.log_agent_data()` | | `handle_tool_schema_ops()` | Handles operations related to tool schemas. | None | `agent.handle_tool_schema_ops()` | +| `handle_handoffs(task)` | Handles task delegation to specialized agents when handoffs are configured. | `task` (str): Task to be delegated to appropriate specialized agent. | `response = agent.handle_handoffs("Analyze market data")` | | `call_llm(task, *args, **kwargs)` | Calls the appropriate method on the language model. | `task` (str): Task for the LLM.
`*args`, `**kwargs`: Additional arguments. | `response = agent.call_llm("Generate text")` | | `handle_sop_ops()` | Handles operations related to standard operating procedures. | None | `agent.handle_sop_ops()` | | `agent_output_type(responses)` | Processes and returns the agent's output based on the specified output type. | `responses` (list): List of responses. | `formatted_output = agent.agent_output_type(responses)` | @@ -427,6 +429,100 @@ response = agent.run("What are the components of a startup's stock incentive equ print(response) ``` +### Agent Handoffs and Task Delegation + +The `Agent` class supports intelligent task delegation through the `handoffs` parameter. When provided with a list of specialized agents, the main agent acts as a router that analyzes incoming tasks and delegates them to the most appropriate specialized agent based on their capabilities and descriptions. + +#### How Handoffs Work + +1. **Task Analysis**: When a task is received, the main agent uses a built-in "boss agent" to analyze the task requirements +2. **Agent Selection**: The boss agent evaluates all available specialized agents and selects the most suitable one(s) based on their descriptions and capabilities +3. **Task Delegation**: The selected agent(s) receive the task (potentially modified for better execution) and process it +4. **Response Aggregation**: Results from specialized agents are collected and returned + +#### Key Features + +| Feature | Description | +|---------------------------|-----------------------------------------------------------------------------------------------| +| **Intelligent Routing** | Uses AI to determine the best agent for each task | +| **Multiple Agent Support**| Can delegate to multiple agents for complex tasks requiring different expertise | +| **Task Modification** | Can modify tasks to better suit the selected agent's capabilities | +| **Transparent Reasoning** | Provides clear explanations for agent selection decisions | +| **Seamless Integration** | Works transparently with the existing `run()` method | + +#### Basic Handoff Example + +```python +from swarms import Agent + +# Create specialized agents +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + model_name="gpt-4o-mini", + dynamic_temperature_enabled=True, +) + +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + dynamic_temperature_enabled=True, +) + +# Create main agent with handoffs +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + dynamic_temperature_enabled=True, + handoffs=[ + risk_metrics_agent, + market_risk_agent, + ], +) + +# Run task - will be automatically delegated to appropriate agent +response = portfolio_risk_agent.run( + "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility" +) +print(response) +``` + + +#### Use Cases + +- **Financial Analysis**: Route different types of financial analysis to specialized agents (risk, valuation, market analysis) +- **Software Development**: Delegate coding, testing, documentation, and code review to different agents +- **Research Projects**: Route research tasks to domain-specific agents +- **Customer Support**: Delegate different types of inquiries to specialized support agents +- **Content Creation**: Route writing, editing, and fact-checking to different content specialists + ### Interactive Mode To enable interactive mode, set the `interactive` parameter to `True` when initializing the `Agent`: @@ -1014,5 +1110,6 @@ The `run` method now supports several new parameters for advanced functionality: | `mcp_url` or `mcp_urls` | Use mcp_url or mcp_urls to extend agent capabilities with external tools. | | `react_on` | Enable react_on for complex reasoning tasks requiring step-by-step analysis. | | `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. | +| `handoffs` | Use handoffs to create specialized agent teams that can intelligently route tasks based on complexity and expertise requirements. | By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications. \ No newline at end of file diff --git a/examples/multi_agent/mar/multi_agent_router_example.py b/examples/multi_agent/mar/multi_agent_router_example.py index 9113ada4..e6bdae29 100644 --- a/examples/multi_agent/mar/multi_agent_router_example.py +++ b/examples/multi_agent/mar/multi_agent_router_example.py @@ -1,42 +1,31 @@ -from swarms import Agent, MultiAgentRouter +from swarms.structs.agent import Agent +from swarms.structs.multi_agent_router import MultiAgentRouter # Example usage: -if __name__ == "__main__": - # Define some example agents - agents = [ - Agent( - agent_name="ResearchAgent", - description="Specializes in researching topics and providing detailed, factual information", - system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", - model_name="openai/gpt-4o", - ), - Agent( - agent_name="CodeExpertAgent", - description="Expert in writing, reviewing, and explaining code across multiple programming languages", - system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", - model_name="openai/gpt-4o", - ), - Agent( - agent_name="WritingAgent", - description="Skilled in creative and technical writing, content creation, and editing", - system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.", - model_name="openai/gpt-4o", - ), - ] +agents = [ + Agent( + agent_name="ResearchAgent", + agent_description="Specializes in researching topics and providing detailed, factual information", + system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", + ), + Agent( + agent_name="CodeExpertAgent", + agent_description="Expert in writing, reviewing, and explaining code across multiple programming languages", + system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", + ), + Agent( + agent_name="WritingAgent", + agent_description="Skilled in creative and technical writing, content creation, and editing", + system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.", + ), +] - # Initialize routers with different configurations - router_execute = MultiAgentRouter( - agents=agents, execute_task=True - ) +# Initialize routers with different configurations +router_execute = MultiAgentRouter( + agents=agents, temperature=0.5, model="claude-sonnet-4-20250514" +) - # Example task - task = "Write a Python function to calculate fibonacci numbers" - - try: - # Process the task with execution - print("\nWith task execution:") - result_execute = router_execute.route_task(task) - print(result_execute) - - except Exception as e: - print(f"Error occurred: {str(e)}") +# Example task: Remake the Fibonacci task +task = "Use all the agents available to you to remake the Fibonacci function in Python, providing both an explanation and code." +result_execute = router_execute.run(task) +print(result_execute) diff --git a/examples/multi_agent/mixture_of_agents_example.py b/examples/multi_agent/moa_examples/mixture_of_agents_example.py similarity index 100% rename from examples/multi_agent/mixture_of_agents_example.py rename to examples/multi_agent/moa_examples/mixture_of_agents_example.py diff --git a/examples/multi_agent/swarmarrange/rearrange_test.py b/examples/multi_agent/swarmarrange/rearrange_test.py deleted file mode 100644 index de020cc7..00000000 --- a/examples/multi_agent/swarmarrange/rearrange_test.py +++ /dev/null @@ -1,120 +0,0 @@ -import os - -from swarms import Agent, AgentRearrange - -from swarm_models import OpenAIChat - -# Get the OpenAI API key from the environment variable -api_key = os.getenv("OPENAI_API_KEY") - -# Create an instance of the OpenAIChat class -model = OpenAIChat( - api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 -) - - -# Initialize the boss agent (Director) -boss_agent = Agent( - agent_name="BossAgent", - system_prompt=""" - You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses. - Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently. - After receiving a report on the company's expenses, you will break down the work into smaller tasks, - assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures, - and identifying unnecessary transactions. Ensure the results are communicated back in a structured way - so the finance team can take actionable steps to cut off unproductive spending. You also monitor and - dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings - into a coherent report. - """, - llm=model, - max_loops=1, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - state_save_file_type="json", - saved_state_path="boss_agent.json", -) - -# Initialize worker 1: Expense Analyzer -worker1 = Agent( - agent_name="ExpenseAnalyzer", - system_prompt=""" - Your task is to carefully analyze the company's expense data provided to you. - You will focus on identifying high-cost recurring transactions, categorizing expenditures - (e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending. - You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting. - Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures. - """, - llm=model, - max_loops=1, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - state_save_file_type="json", - saved_state_path="worker1.json", -) - -# Initialize worker 2: Summary Generator -worker2 = Agent( - agent_name="SummaryGenerator", - system_prompt=""" - After receiving the detailed breakdown from the ExpenseAnalyzer, - your task is to create a concise summary of the findings. You will focus on the most actionable insights, - such as highlighting the specific transactions that can be immediately cut off and summarizing the areas - where the company is overspending. Your summary will be used by the BossAgent to generate the final report. - Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses. - """, - llm=model, - max_loops=1, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - state_save_file_type="json", - saved_state_path="worker2.json", -) - -# Swarm-Level Prompt (Collaboration Prompt) -swarm_prompt = """ - As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off. - You will work collaboratively to break down the entire process of expense analysis into manageable steps. - The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first - focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them, - and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then - consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses. - Together, your collaboration is essential to streamlining and improving the company’s financial health. -""" - -# Create a list of agents -agents = [boss_agent, worker1, worker2] - -# Define the flow pattern for the swarm -flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator" - -# Using AgentRearrange class to manage the swarm -agent_system = AgentRearrange( - name="pe-swarm", - description="ss", - agents=agents, - flow=flow, - return_json=False, - output_type="final", - max_loops=1, -) - -# Input task for the swarm -task = f""" - - {swarm_prompt} - - The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed - analysis of recent transactions to identify which expenses can be cut off to improve profitability. - Analyze the provided transaction data and create a detailed report on cost-cutting opportunities, - focusing on recurring transactions and non-essential expenditures. -""" - -# Run the swarm system with the task -output = agent_system.run(task) -print(output) diff --git a/examples/single_agent/tools/omni_modal_agent.py b/examples/single_agent/tools/omni_modal_agent.py index a075535d..242a1171 100644 --- a/examples/single_agent/tools/omni_modal_agent.py +++ b/examples/single_agent/tools/omni_modal_agent.py @@ -1,7 +1,8 @@ -from typing import Dict, Union, Any import os -import requests from enum import Enum +from typing import Any, Dict, Union + +import requests from dotenv import load_dotenv load_dotenv() diff --git a/examples/single_agent/tools/swarms_tools_example.py b/examples/single_agent/tools/swarms_tools_example.py index 9aec628f..c381d5ea 100644 --- a/examples/single_agent/tools/swarms_tools_example.py +++ b/examples/single_agent/tools/swarms_tools_example.py @@ -1,20 +1,14 @@ from swarms import Agent -from swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) -from swarms_tools import yahoo_finance_api +from swarms_tools.finance.okx_tool import okx_api_tool # Initialize the agent agent = Agent( agent_name="Financial-Analysis-Agent", agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, max_loops=1, model_name="gpt-4o-mini", - tools=[yahoo_finance_api], + tools=[okx_api_tool], dynamic_temperature_enabled=True, ) -agent.run( - "Fetch the data for nvidia and tesla both with the yahoo finance api" -) +agent.run("fetch the current price of bitcoin with okx") diff --git a/examples/single_agent/utils/handoffs_example.py b/examples/single_agent/utils/handoffs_example.py new file mode 100644 index 00000000..345d69fa --- /dev/null +++ b/examples/single_agent/utils/handoffs_example.py @@ -0,0 +1,63 @@ +from swarms.structs.agent import Agent + +# Agent 1: Risk Metrics Calculator +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + model_name="gpt-4o-mini", + dynamic_temperature_enabled=True, +) + + +# Agent 3: Market Risk Monitor +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + dynamic_temperature_enabled=True, +) + + +# Agent 2: Portfolio Risk Analyzer +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + dynamic_temperature_enabled=True, + handoffs=[ + risk_metrics_agent, + market_risk_agent, + ], +) + + +out = portfolio_risk_agent.run( + "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility using the risk metrics agent and market risk agent" +) + +print(out) diff --git a/pyproject.toml b/pyproject.toml index 66f08be6..67fb80b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "8.2.2" +version = "8.3.0" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/rearrange_test.py b/rearrange_test.py new file mode 100644 index 00000000..de8c9e52 --- /dev/null +++ b/rearrange_test.py @@ -0,0 +1,86 @@ +from swarms import Agent, AgentRearrange + +# Define all prompts as strings before agent initializations +boss_agent_prompt = """ +You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses. +Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently. +After receiving a report on the company's expenses, you will break down the work into smaller tasks, +assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures, +and identifying unnecessary transactions. Ensure the results are communicated back in a structured way +so the finance team can take actionable steps to cut off unproductive spending. You also monitor and +dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings +into a coherent report. +""" + +expense_analyzer_prompt = """ +Your task is to carefully analyze the company's expense data provided to you. +You will focus on identifying high-cost recurring transactions, categorizing expenditures +(e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending. +You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting. +Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures. +""" + +summary_generator_prompt = """ +After receiving the detailed breakdown from the ExpenseAnalyzer, +your task is to create a concise summary of the findings. You will focus on the most actionable insights, +such as highlighting the specific transactions that can be immediately cut off and summarizing the areas +where the company is overspending. Your summary will be used by the BossAgent to generate the final report. +Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses. +""" + +# Swarm-Level Prompt (Collaboration Prompt) +swarm_prompt = """ +As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off. +You will work collaboratively to break down the entire process of expense analysis into manageable steps. +The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first +focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them, +and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then +consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses. +Together, your collaboration is essential to streamlining and improving the company's financial health. +""" + +# Initialize the boss agent (Director) +boss_agent = Agent( + agent_name="BossAgent", + system_prompt=boss_agent_prompt, + max_loops=1, +) + +# Initialize worker 1: Expense Analyzer +worker1 = Agent( + agent_name="ExpenseAnalyzer", + system_prompt=expense_analyzer_prompt, + max_loops=1, + dashboard=False, +) + +# Initialize worker 2: Summary Generator +worker2 = Agent( + agent_name="SummaryGenerator", + system_prompt=summary_generator_prompt, + max_loops=1, +) + +# Create a list of agents +agents = [boss_agent, worker1, worker2] + +# Define the flow pattern for the swarm +flow = "BossAgent -> ExpenseAnalyzer, SummaryGenerator" + +# Using AgentRearrange class to manage the swarm +agent_system = AgentRearrange(agents=agents, flow=flow) + +# Input task for the swarm +task = f""" + + {swarm_prompt} + + The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed + analysis of recent transactions to identify which expenses can be cut off to improve profitability. + Analyze the provided transaction data and create a detailed report on cost-cutting opportunities, + focusing on recurring transactions and non-essential expenditures. +""" + +# Run the swarm system with the task +output = agent_system.run(task) +print(output) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 8155399f..945d55a4 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -17,6 +17,7 @@ from typing import ( Optional, Tuple, Union, + Sequence, ) import toml @@ -94,6 +95,7 @@ from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.output_types import OutputType from swarms.utils.pdf_to_text import pdf_to_text +from swarms.structs.multi_agent_router import MultiAgentRouter def stop_when_repeats(response: str) -> bool: @@ -456,6 +458,8 @@ class Agent: drop_params: bool = True, thinking_tokens: int = None, reasoning_enabled: bool = False, + handoffs: Optional[Union[Sequence[Callable], Any]] = None, + capabilities: Optional[List[str]] = None, *args, **kwargs, ): @@ -479,19 +483,6 @@ class Agent: self.user_name = user_name self.context_length = context_length - # Initialize transforms - if transforms is None: - self.transforms = None - elif isinstance(transforms, TransformConfig): - self.transforms = MessageTransforms(transforms) - elif isinstance(transforms, dict): - config = TransformConfig(**transforms) - self.transforms = MessageTransforms(config) - else: - raise ValueError( - "transforms must be a TransformConfig object or a dictionary" - ) - self.sop = sop self.sop_list = sop_list self.tools = tools @@ -617,6 +608,20 @@ class Agent: self.thinking_tokens = thinking_tokens self.reasoning_enabled = reasoning_enabled self.fallback_model_name = fallback_model_name + self.handoffs = handoffs + self.capabilities = capabilities + + # Initialize transforms + if transforms is None: + self.transforms = None + elif isinstance(transforms, TransformConfig): + self.transforms = MessageTransforms(transforms) + elif isinstance(transforms, dict): + config = TransformConfig(**transforms) + self.transforms = MessageTransforms(config) + else: + pass + self.fallback_models = fallback_models or [] self.current_model_index = 0 self.model_attempts = {} @@ -674,6 +679,18 @@ class Agent: self.reliability_check() + def handle_handoffs(self, task: Optional[str] = None): + router = MultiAgentRouter( + name=self.agent_name, + description=self.agent_description, + agents=self.handoffs, + model=self.model_name, + temperature=self.temperature, + output_type=self.output_type, + ) + + return router.run(task=task) + def setup_tools(self): return BaseTool( tools=self.tools, @@ -2647,6 +2664,8 @@ class Agent: *args, **kwargs, ) + elif exists(self.handoffs): + output = self.handle_handoffs(task=task) else: output = self._run( task=task, diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index a4f4944f..d3016de4 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -4,6 +4,8 @@ from typing import Any, Callable, Dict, List, Optional, Union from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation +from swarms.structs.multi_agent_exec import run_agents_concurrently +from swarms.structs.swarm_id import swarm_id from swarms.telemetry.main import log_agent_data from swarms.utils.any_to_str import any_to_str from swarms.utils.history_output_formatter import ( @@ -11,12 +13,75 @@ from swarms.utils.history_output_formatter import ( ) from swarms.utils.loguru_logger import initialize_logger from swarms.utils.output_types import OutputType -from swarms.structs.swarm_id import swarm_id logger = initialize_logger(log_folder="rearrange") class AgentRearrange: + """ + A sophisticated multi-agent system for task rearrangement and orchestration. + + The AgentRearrange class enables complex workflows where multiple agents can work + sequentially or concurrently based on a defined flow pattern. It supports both + sequential execution (using '->') and concurrent execution (using ',') within + the same workflow. + + Key Features: + - Sequential and concurrent agent execution + - Custom flow patterns with arrow (->) and comma (,) syntax + - Team awareness and sequential flow information + - Human-in-the-loop integration + - Memory system support + - Batch and concurrent processing capabilities + - Comprehensive error handling and logging + + Flow Syntax: + - Use '->' to define sequential execution: "agent1 -> agent2 -> agent3" + - Use ',' to define concurrent execution: "agent1, agent2 -> agent3" + - Combine both: "agent1 -> agent2, agent3 -> agent4" + - Use 'H' for human-in-the-loop: "agent1 -> H -> agent2" + + Attributes: + id (str): Unique identifier for the agent rearrange system + name (str): Human-readable name for the system + description (str): Description of the system's purpose + agents (Dict[str, Agent]): Dictionary mapping agent names to Agent objects + flow (str): Flow pattern defining agent execution order + max_loops (int): Maximum number of execution loops + verbose (bool): Whether to enable verbose logging + memory_system (Any): Optional memory system for persistence + human_in_the_loop (bool): Whether to enable human interaction + custom_human_in_the_loop (Callable): Custom human interaction handler + output_type (OutputType): Format for output results + autosave (bool): Whether to automatically save execution data + rules (str): System rules and constraints + team_awareness (bool): Whether agents are aware of team structure + time_enabled (bool): Whether to track timestamps + message_id_on (bool): Whether to include message IDs + conversation (Conversation): Conversation history management + + Example: + >>> from swarms import Agent, AgentRearrange + >>> + >>> # Create agents + >>> agent1 = Agent(name="researcher", ...) + >>> agent2 = Agent(name="writer", ...) + >>> agent3 = Agent(name="reviewer", ...) + >>> + >>> # Define flow: agent1 runs first, then agent2 and agent3 run concurrently + >>> flow = "researcher -> writer, reviewer" + >>> + >>> # Create rearrange system + >>> rearrange_system = AgentRearrange( + ... agents=[agent1, agent2, agent3], + ... flow=flow, + ... max_loops=1, + ... team_awareness=True + ... ) + >>> + >>> # Execute task + >>> result = rearrange_system.run("Research and write a report") + """ def __init__( self, @@ -38,9 +103,55 @@ class AgentRearrange: team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, - *args, - **kwargs, ): + """ + Initialize the AgentRearrange system. + + Args: + id (str): Unique identifier for the agent rearrange system. + Defaults to a generated swarm ID. + name (str): Human-readable name for the system. + Defaults to "AgentRearrange". + description (str): Description of the system's purpose. + Defaults to "A swarm of agents for rearranging tasks.". + agents (List[Union[Agent, Callable]], optional): List of agents to include + in the system. Can be Agent objects or callable functions. + Defaults to None. + flow (str, optional): Flow pattern defining agent execution order. + Uses '->' for sequential and ',' for concurrent execution. + Defaults to None. + max_loops (int): Maximum number of execution loops. Must be > 0. + Defaults to 1. + verbose (bool): Whether to enable verbose logging. + Defaults to True. + memory_system (Any, optional): Optional memory system for persistence. + Defaults to None. + human_in_the_loop (bool): Whether to enable human interaction points. + Defaults to False. + custom_human_in_the_loop (Callable[[str], str], optional): Custom function + for handling human interaction. Takes input string, returns response. + Defaults to None. + output_type (OutputType): Format for output results. Can be "all", "final", + "list", or "dict". Defaults to "all". + autosave (bool): Whether to automatically save execution data. + Defaults to True. + rules (str, optional): System rules and constraints to add to conversation. + Defaults to None. + team_awareness (bool): Whether agents should be aware of team structure + and sequential flow. Defaults to False. + time_enabled (bool): Whether to track timestamps in conversations. + Defaults to False. + message_id_on (bool): Whether to include message IDs in conversations. + Defaults to False. + + Raises: + ValueError: If agents list is None or empty, max_loops is 0, + flow is None or empty, or output_type is None or empty. + + Note: + The agents parameter is converted to a dictionary mapping agent names + to Agent objects for efficient lookup during execution. + """ self.name = name self.description = description self.id = id @@ -80,6 +191,23 @@ class AgentRearrange: self.reliability_check() def reliability_check(self): + """ + Validates the configuration parameters to ensure the system can run properly. + + Performs comprehensive validation checks on critical parameters including + agents list, max_loops, flow pattern, and output_type to prevent runtime errors. + + Raises: + ValueError: If any of the following conditions are met: + - agents list is None or empty + - max_loops is 0 + - flow is None or empty string + - output_type is None or empty string + + Note: + This method is called automatically during initialization to ensure + the system is properly configured before execution. + """ if self.agents is None or len(self.agents) == 0: raise ValueError("Agents list cannot be None or empty") @@ -93,6 +221,24 @@ class AgentRearrange: raise ValueError("output_type cannot be None or empty") def set_custom_flow(self, flow: str): + """ + Sets a custom flow pattern for agent execution. + + Allows dynamic modification of the execution flow after initialization. + The flow pattern defines how agents should be executed in sequence or + parallel using the standard syntax ('->' for sequential, ',' for concurrent). + + Args: + flow (str): The new flow pattern to use for agent execution. + Must follow the syntax: "agent1 -> agent2, agent3 -> agent4" + + Note: + The flow will be validated on the next execution. If invalid, + a ValueError will be raised during the run() method. + + Example: + >>> rearrange_system.set_custom_flow("researcher -> writer, editor") + """ self.flow = flow logger.info(f"Custom flow set: {flow}") @@ -111,6 +257,20 @@ class AgentRearrange: agent_name: str, result: str, ): + """ + Tracks the execution history for a specific agent. + + Records the result of an agent's execution in the swarm history + for later analysis or debugging purposes. + + Args: + agent_name (str): The name of the agent whose result to track. + result (str): The result/output from the agent's execution. + + Note: + This method is typically called internally during agent execution + to maintain a complete history of all agent activities. + """ self.swarm_history[agent_name].append(result) def remove_agent(self, agent_name: str): @@ -306,6 +466,120 @@ class AgentRearrange: """ return self._get_sequential_flow_info() + def _run_concurrent_workflow( + self, + agent_names: List[str], + img: str = None, + *args, + **kwargs, + ) -> Dict[str, str]: + """ + Executes agents concurrently when comma is detected in the flow. + + This method handles the parallel execution of multiple agents when they + are separated by commas in the flow pattern. All specified agents run + simultaneously and their results are collected and returned. + + Args: + agent_names (List[str]): List of agent names to run concurrently. + These agents will execute in parallel. + img (str, optional): Image input for agents that support it. + Defaults to None. + *args: Additional positional arguments passed to agent execution. + **kwargs: Additional keyword arguments passed to agent execution. + + Returns: + Dict[str, str]: Dictionary mapping agent names to their execution results. + Keys are agent names, values are their respective outputs. + + Note: + This method uses the run_agents_concurrently utility function + to handle the actual parallel execution and result collection. + """ + logger.info(f"Running agents in parallel: {agent_names}") + + # Get agent objects for concurrent execution + agents_to_run = [ + self.agents[agent_name] for agent_name in agent_names + ] + + # Run agents concurrently + results = run_agents_concurrently( + agents=agents_to_run, + task=self.conversation.get_str(), + ) + + # Process results and update conversation + response_dict = {} + for i, agent_name in enumerate(agent_names): + result = results[i] + + # print(f"Result: {result}") + + self.conversation.add(agent_name, result) + response_dict[agent_name] = result + logger.debug(f"Agent {agent_name} output: {result}") + + return response_dict + + def _run_sequential_workflow( + self, + agent_name: str, + tasks: List[str], + img: str = None, + *args, + **kwargs, + ) -> str: + """ + Executes a single agent sequentially. + + This method handles the sequential execution of a single agent in the flow. + It provides sequential awareness information to the agent if team_awareness + is enabled, allowing the agent to understand its position in the workflow. + + Args: + agent_name (str): Name of the agent to run sequentially. + tasks (List[str]): List of all tasks in the flow for awareness context. + Used to determine the agent's position and provide awareness info. + img (str, optional): Image input for agents that support it. + Defaults to None. + *args: Additional positional arguments passed to agent execution. + **kwargs: Additional keyword arguments passed to agent execution. + + Returns: + str: The result from the agent's execution, converted to string format. + + Note: + If team_awareness is enabled, this method will add sequential awareness + information to the conversation before executing the agent, informing + the agent about its position in the workflow sequence. + """ + logger.info(f"Running agent sequentially: {agent_name}") + + agent = self.agents[agent_name] + + # Add sequential awareness information for the agent + awareness_info = self._get_sequential_awareness( + agent_name, tasks + ) + if awareness_info: + self.conversation.add("system", awareness_info) + logger.info( + f"Added sequential awareness for {agent_name}: {awareness_info}" + ) + + current_task = agent.run( + task=self.conversation.get_str(), + img=img, + *args, + **kwargs, + ) + current_task = any_to_str(current_task) + + self.conversation.add(agent.agent_name, current_task) + + return current_task + def _run( self, task: str = None, @@ -315,27 +589,39 @@ class AgentRearrange: **kwargs, ): """ - Runs the swarm to rearrange the tasks. + Runs the swarm to rearrange the tasks according to the defined flow. + + This is the core execution method that orchestrates the entire workflow. + It processes the flow pattern, executes agents sequentially or concurrently + as specified, and returns the results in the requested format. Args: - task (str, optional): The initial task to be processed. Defaults to None. - img (str, optional): Image input for agents that support it. Defaults to None. - custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. Defaults to None. - output_type (str, optional): Format of the output. Can be: + task (str, optional): The initial task to be processed by the swarm. + This is added to the conversation history. Defaults to None. + img (str, optional): Image input for agents that support it. + Defaults to None. + custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. + Allows overriding the main task for specific agents in the flow. + Defaults to None. + *args: Additional positional arguments passed to agent execution. + **kwargs: Additional keyword arguments passed to agent execution. + + Returns: + Union[str, List[str], Dict[str, str]]: The processed output in the format + specified by output_type: - "all": String containing all agent responses concatenated - "final": Only the final agent's response - "list": List of all agent responses - "dict": Dict mapping agent names to their responses - Defaults to "final". - *args: Additional positional arguments - **kwargs: Additional keyword arguments - - Returns: - Union[str, List[str], Dict[str, str]]: The processed output in the specified format Raises: - ValueError: If flow validation fails - Exception: For any other errors during execution + ValueError: If flow validation fails or configuration is invalid. + Exception: For any other errors during execution. + + Note: + This method handles both sequential and concurrent execution patterns + based on the flow syntax. It also supports custom task injection + and multiple execution loops as configured. """ try: self.conversation.add("User", task) @@ -345,14 +631,13 @@ class AgentRearrange: return "Invalid flow configuration." tasks = self.flow.split("->") - current_task = task response_dict = {} logger.info( f"Starting task execution with {len(tasks)} steps" ) - # # Handle custom tasks + # Handle custom tasks if custom_tasks is not None: logger.info("Processing custom tasks") c_agent_name, c_task = next( @@ -377,68 +662,28 @@ class AgentRearrange: ] if len(agent_names) > 1: - # Parallel processing - logger.info( - f"Running agents in parallel: {agent_names}" - ) - - for agent_name in agent_names: - agent = self.agents[agent_name] - result = agent.run( - task=self.conversation.get_str(), + # Concurrent processing - comma detected + concurrent_results = ( + self._run_concurrent_workflow( + agent_names=agent_names, img=img, *args, **kwargs, ) - result = any_to_str(result) - - self.conversation.add( - agent.agent_name, result - ) - - response_dict[agent_name] = result - logger.debug( - f"Agent {agent_name} output: {result}" - ) - - ",".join(agent_names) + ) + response_dict.update(concurrent_results) else: # Sequential processing - logger.info( - f"Running agent sequentially: {agent_names[0]}" - ) agent_name = agent_names[0] - - agent = self.agents[agent_name] - - # Add sequential awareness information for the agent - awareness_info = ( - self._get_sequential_awareness( - agent_name, tasks - ) - ) - if awareness_info: - self.conversation.add( - "system", awareness_info - ) - logger.info( - f"Added sequential awareness for {agent_name}: {awareness_info}" - ) - - current_task = agent.run( - task=self.conversation.get_str(), + result = self._run_sequential_workflow( + agent_name=agent_name, + tasks=tasks, img=img, *args, **kwargs, ) - current_task = any_to_str(current_task) - - self.conversation.add( - agent.agent_name, current_task - ) - - response_dict[agent_name] = current_task + response_dict[agent_name] = result loop_count += 1 @@ -453,11 +698,28 @@ class AgentRearrange: self._catch_error(e) def _catch_error(self, e: Exception): + """ + Handles errors that occur during swarm execution. + + Provides comprehensive error handling including logging, data persistence, + and error reporting. This method is called whenever an exception occurs + during the execution of the swarm. + + Args: + e (Exception): The exception that occurred during execution. + + Returns: + Exception: The original exception for potential re-raising. + + Note: + If autosave is enabled, the current state of the swarm will be + automatically saved to the logging system before error reporting. + """ if self.autosave is True: log_agent_data(self.to_dict()) logger.error( - f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}" + f"AgentRearrange: Id: {self.id}, Name: {self.name}. An error occurred with your agent '{self.name}': Error: {e}. Traceback: {e.__traceback__}" ) return e @@ -470,16 +732,28 @@ class AgentRearrange: **kwargs, ): """ - Execute the agent rearrangement task with specified compute resources. + Execute the agent rearrangement task with comprehensive logging and error handling. + + This is the main public method for executing tasks through the agent rearrange + system. It provides telemetry logging, error handling, and delegates to the + internal _run method for actual execution. Args: - task (str, optional): The task to execute. Defaults to None. - img (str, optional): Path to input image if required. Defaults to None. - *args: Additional positional arguments passed to _run(). - **kwargs: Additional keyword arguments passed to _run(). + task (str, optional): The task to execute through the agent workflow. + Defaults to None. + img (str, optional): Path to input image if required by any agents. + Defaults to None. + *args: Additional positional arguments passed to the internal _run() method. + **kwargs: Additional keyword arguments passed to the internal _run() method. Returns: - The result from executing the task through the cluster operations wrapper. + The result from executing the task through the agent rearrange system. + The format depends on the configured output_type. + + Note: + This method automatically logs agent data before and after execution + for telemetry and debugging purposes. Any exceptions are caught and + handled by the _catch_error method. """ try: log_agent_data(self.to_dict()) @@ -502,13 +776,20 @@ class AgentRearrange: """ Make the class callable by executing the run() method. + Enables the AgentRearrange instance to be called directly as a function, + providing a convenient interface for task execution. + Args: - task (str): The task to execute. + task (str): The task to execute through the agent workflow. *args: Additional positional arguments passed to run(). **kwargs: Additional keyword arguments passed to run(). Returns: - The result from executing run(). + The result from executing the task through the agent rearrange system. + + Example: + >>> rearrange_system = AgentRearrange(agents=[agent1, agent2], flow="agent1 -> agent2") + >>> result = rearrange_system("Process this data") """ try: return self.run(task=task, *args, **kwargs) @@ -525,19 +806,29 @@ class AgentRearrange: **kwargs, ) -> List[str]: """ - Process multiple tasks in batches. + Process multiple tasks in batches for efficient execution. + + This method allows processing multiple tasks by dividing them into + smaller batches and processing each batch sequentially. This is useful + for managing memory usage and resource allocation when dealing with + large numbers of tasks. Args: - tasks: List of tasks to process - img: Optional list of images corresponding to tasks - batch_size: Number of tasks to process simultaneously - device: Computing device to use - device_id: Specific device ID if applicable - all_cores: Whether to use all CPU cores - all_gpus: Whether to use all available GPUs + tasks (List[str]): List of tasks to process through the agent workflow. + img (Optional[List[str]]): Optional list of images corresponding to tasks. + Must be the same length as tasks list. Defaults to None. + batch_size (int): Number of tasks to process simultaneously in each batch. + Defaults to 10. + *args: Additional positional arguments passed to individual task execution. + **kwargs: Additional keyword arguments passed to individual task execution. Returns: - List of results corresponding to input tasks + List[str]: List of results corresponding to input tasks in the same order. + + Note: + This method processes tasks in batches to manage resource usage. + Each batch is processed sequentially, but individual tasks within + a batch may run concurrently depending on the flow configuration. """ try: results = [] @@ -576,17 +867,27 @@ class AgentRearrange: """ Process multiple tasks concurrently using ThreadPoolExecutor. + This method enables true parallel processing of multiple tasks by using + Python's ThreadPoolExecutor to run tasks simultaneously across multiple + threads. This is ideal for I/O-bound tasks or when you want maximum + parallelization. + Args: - tasks: List of tasks to process - img: Optional list of images corresponding to tasks - max_workers: Maximum number of worker threads - device: Computing device to use - device_id: Specific device ID if applicable - all_cores: Whether to use all CPU cores - all_gpus: Whether to use all available GPUs + tasks (List[str]): List of tasks to process through the agent workflow. + img (Optional[List[str]]): Optional list of images corresponding to tasks. + Must be the same length as tasks list. Defaults to None. + max_workers (Optional[int]): Maximum number of worker threads to use. + If None, uses the default ThreadPoolExecutor behavior. Defaults to None. + *args: Additional positional arguments passed to individual task execution. + **kwargs: Additional keyword arguments passed to individual task execution. Returns: - List of results corresponding to input tasks + List[str]: List of results corresponding to input tasks in the same order. + + Note: + This method uses ThreadPoolExecutor for true parallel execution. + The number of concurrent executions is limited by max_workers parameter. + Each task runs independently through the full agent workflow. """ try: with ThreadPoolExecutor( @@ -613,11 +914,19 @@ class AgentRearrange: """ Serializes callable attributes by extracting their name and docstring. + This helper method handles the serialization of callable objects (functions, + methods, etc.) by extracting their metadata for storage or logging purposes. + Args: - attr_value (Callable): The callable to serialize. + attr_value (Callable): The callable object to serialize. Returns: - Dict[str, Any]: Dictionary with name and docstring of the callable. + Dict[str, Any]: Dictionary containing the callable's name and docstring. + Keys are "name" and "doc", values are the corresponding attributes. + + Note: + This method is used internally by to_dict() to handle non-serializable + callable attributes in a graceful manner. """ return { "name": getattr( @@ -630,12 +939,22 @@ class AgentRearrange: """ Serializes an individual attribute, handling non-serializable objects. + This helper method attempts to serialize individual attributes for storage + or logging. It handles different types of objects including callables, + objects with to_dict methods, and basic serializable types. + Args: - attr_name (str): The name of the attribute. - attr_value (Any): The value of the attribute. + attr_name (str): The name of the attribute being serialized. + attr_value (Any): The value of the attribute to serialize. Returns: - Any: The serialized value of the attribute. + Any: The serialized value of the attribute. For non-serializable objects, + returns a string representation indicating the object type. + + Note: + This method is used internally by to_dict() to handle various types + of attributes in a robust manner, ensuring the serialization process + doesn't fail on complex objects. """ try: if callable(attr_value): @@ -655,10 +974,21 @@ class AgentRearrange: def to_dict(self) -> Dict[str, Any]: """ Converts all attributes of the class, including callables, into a dictionary. - Handles non-serializable attributes by converting them or skipping them. + + This method provides a comprehensive serialization of the AgentRearrange + instance, converting all attributes into a dictionary format suitable for + storage, logging, or transmission. It handles complex objects gracefully + by using helper methods for serialization. Returns: - Dict[str, Any]: A dictionary representation of the class attributes. + Dict[str, Any]: A dictionary representation of all class attributes. + Non-serializable objects are converted to string representations + or serialized using their to_dict method if available. + + Note: + This method is used for telemetry logging and state persistence. + It recursively handles nested objects and provides fallback handling + for objects that cannot be directly serialized. """ return { attr_name: self._serialize_attr(attr_name, attr_value) @@ -677,23 +1007,45 @@ def rearrange( **kwargs, ): """ - Rearranges the given list of agents based on the specified flow. + Convenience function to create and execute an AgentRearrange system in one call. + + This function provides a simplified interface for creating an AgentRearrange + instance and immediately executing a task with it. It's useful for quick + prototyping or when you don't need to reuse the rearrange system. Parameters: - agents (List[Agent]): The list of agents to be rearranged. - flow (str): The flow used for rearranging the agents. - task (str, optional): The task to be performed during rearrangement. Defaults to None. - *args: Additional positional arguments. - **kwargs: Additional keyword arguments. + name (str, optional): Name for the agent rearrange system. + Defaults to None (uses AgentRearrange default). + description (str, optional): Description of the system. + Defaults to None (uses AgentRearrange default). + agents (List[Agent]): The list of agents to be included in the system. + flow (str): The flow pattern defining agent execution order. + Uses '->' for sequential and ',' for concurrent execution. + task (str, optional): The task to be performed during rearrangement. + Defaults to None. + img (str, optional): Image input for agents that support it. + Defaults to None. + *args: Additional positional arguments passed to AgentRearrange constructor. + **kwargs: Additional keyword arguments passed to AgentRearrange constructor. Returns: The result of running the agent system with the specified task. + The format depends on the output_type configuration. Example: - agents = [agent1, agent2, agent3] - flow = "agent1 -> agent2, agent3" - task = "Perform a task" - rearrange(agents, flow, task) + >>> from swarms import Agent, rearrange + >>> + >>> # Create agents + >>> agent1 = Agent(name="researcher", ...) + >>> agent2 = Agent(name="writer", ...) + >>> agent3 = Agent(name="reviewer", ...) + >>> + >>> # Execute task with flow + >>> result = rearrange( + ... agents=[agent1, agent2, agent3], + ... flow="researcher -> writer, reviewer", + ... task="Research and write a report" + ... ) """ agent_system = AgentRearrange( name=name, diff --git a/swarms/structs/multi_agent_router.py b/swarms/structs/multi_agent_router.py index ca719603..f724c53b 100644 --- a/swarms/structs/multi_agent_router.py +++ b/swarms/structs/multi_agent_router.py @@ -1,31 +1,92 @@ -import os -from typing import List, Optional +import concurrent.futures +import traceback +from concurrent.futures import ThreadPoolExecutor +from typing import Callable, List, Optional +import orjson from loguru import logger from pydantic import BaseModel, Field -from swarms.utils.function_caller_model import OpenAIFunctionCaller + from swarms.structs.conversation import Conversation -from swarms.utils.output_types import OutputType -from swarms.utils.any_to_str import any_to_str +from swarms.tools.base_tool import BaseTool +from swarms.utils.formatter import formatter +from swarms.utils.generate_keys import generate_api_key from swarms.utils.history_output_formatter import ( history_output_formatter, ) -from swarms.utils.formatter import formatter -from swarms.structs.omni_agent_types import AgentListType +from swarms.utils.litellm_wrapper import LiteLLM +from swarms.utils.output_types import OutputType -class AgentResponse(BaseModel): - """Response from the boss agent indicating which agent should handle the task""" +class HandOffsResponse(BaseModel): + """ + Response from the boss agent indicating which agent should handle the task. + + This model encapsulates the reasoning behind the agent selection, the name of the selected agent, and an optional modified version of the task. It is used to communicate the boss agent's decision and rationale to the rest of the system. + """ - selected_agent: str = Field( - description="Name of the agent selected to handle the task" - ) reasoning: str = Field( - description="Explanation for why this agent was selected" + description="A detailed explanation for why this agent was selected. This should include the logic or criteria used by the boss agent to make the selection, providing transparency and traceability for the routing decision." ) - modified_task: Optional[str] = Field( - None, description="Optional modified version of the task" + agent_name: str = Field( + description="The name of the agent selected to handle the task. This should match the identifier of one of the available agents in the system, ensuring the task is routed correctly." ) + task: Optional[str] = Field( + None, + description="An optional, modified version of the original task. If the boss agent determines that the task should be rephrased or clarified before execution, the new version is provided here. If no modification is needed, this field may be left as None.", + ) + + +class MultipleHandOffsResponse(BaseModel): + """ + Response from the boss agent indicating which agents should handle the task. + """ + + handoffs: List[HandOffsResponse] = Field( + description="A list of handoffs, each containing the reasoning, agent name, and task for each agent." + ) + + +def get_agent_response_schema(model_name: str = None): + return BaseTool().base_model_to_dict(MultipleHandOffsResponse) + + +def agent_boss_router_prompt(agent_descriptions: any): + return f""" + You are an intelligent boss agent responsible for routing tasks to the most appropriate specialized agents. + + Available agents: + {agent_descriptions} + + Your primary responsibilities: + 1. **Understand the user's intent and task requirements** - Carefully analyze what the user is asking for + 2. **Determine task complexity** - Identify if this is a single task or multiple related tasks + 3. **Match capabilities to requirements** - Select the most suitable agent(s) based on their descriptions and expertise + 4. **Provide clear reasoning** - Explain your selection logic transparently + 5. **Optimize task assignment** - Modify tasks if needed to better suit the selected agent's capabilities + + **Routing Logic:** + - **Single Task**: If the user presents one clear task, select the single best agent for that task + - **Multiple Tasks**: If the user presents multiple distinct tasks or a complex task that requires different expertise areas, select multiple agents and break down the work accordingly + - **Collaborative Tasks**: If a task benefits from multiple perspectives or requires different skill sets, assign it to multiple agents + + **Response Format:** + You must respond with JSON containing a "handoffs" array with one or more handoff objects. Each handoff object must contain: + - reasoning: Detailed explanation of why this agent was selected and how they fit the task requirements + - agent_name: Name of the chosen agent (must exactly match one of the available agents) + - task: A modified/optimized version of the task if needed, or the original task if no modification is required + + **Important Guidelines:** + - Always analyze the user's intent first before making routing decisions + - Consider task dependencies and whether agents need to work sequentially or in parallel + - If multiple agents are selected, ensure their tasks are complementary and don't conflict + - Provide specific, actionable reasoning for each agent selection + - Ensure all agent names exactly match the available agents list + - If a single agent can handle the entire request efficiently, use only one agent + - If the request requires different expertise areas, use multiple agents with clearly defined, non-overlapping tasks + + Remember: Your goal is to maximize task completion quality by matching the right agent(s) to the right work based on their capabilities and the user's actual needs. + """ class MultiAgentRouter: @@ -38,23 +99,32 @@ class MultiAgentRouter: name (str): The name of the router. description (str): A description of the router's purpose. agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object. - api_key (str): The API key for OpenAI. - output_type (str): The type of output expected from the agents. Can be either "json" or "string". - execute_task (bool): A flag indicating whether the task should be executed by the selected agent. - boss_system_prompt (str): A system prompt for the boss agent that includes information about all available agents. - function_caller (OpenAIFunctionCaller): An instance of OpenAIFunctionCaller for calling the boss agent. + model (str): The model to use for the boss agent. + temperature (float): The temperature for the boss agent's model. + shared_memory_system (callable): A shared memory system for agents to query. + output_type (OutputType): The type of output expected from the agents. + print_on (bool): Whether to print the boss agent's decision. + system_prompt (str): Custom system prompt for the router. + skip_null_tasks (bool): Whether to skip executing agents when their assigned task is null or None. + conversation (Conversation): The conversation history. + function_caller (LiteLLM): An instance of LiteLLM for calling the boss agent. """ def __init__( self, + id: str = generate_api_key(prefix="multi-agent-router"), name: str = "swarm-router", description: str = "Routes tasks to specialized agents based on their capabilities", - agents: AgentListType = None, + agents: List[Callable] = None, model: str = "gpt-4o-mini", temperature: float = 0.1, shared_memory_system: callable = None, output_type: OutputType = "dict", - if_print: bool = True, + print_on: bool = True, + system_prompt: str = None, + skip_null_tasks: bool = True, + *args, + **kwargs, ): """ Initializes the MultiAgentRouter with a list of agents and configuration options. @@ -63,10 +133,13 @@ class MultiAgentRouter: name (str, optional): The name of the router. Defaults to "swarm-router". description (str, optional): A description of the router's purpose. Defaults to "Routes tasks to specialized agents based on their capabilities". agents (List[Agent], optional): A list of agents to be managed by the router. Defaults to an empty list. - model (str, optional): The model to use for the boss agent. Defaults to "gpt-4-0125-preview". + model (str, optional): The model to use for the boss agent. Defaults to "gpt-4o-mini". temperature (float, optional): The temperature for the boss agent's model. Defaults to 0.1. - output_type (Literal["json", "string"], optional): The type of output expected from the agents. Defaults to "json". - execute_task (bool, optional): A flag indicating whether the task should be executed by the selected agent. Defaults to True. + shared_memory_system (callable, optional): A shared memory system for agents to query. Defaults to None. + output_type (OutputType, optional): The type of output expected from the agents. Defaults to "dict". + print_on (bool, optional): Whether to print the boss agent's decision. Defaults to True. + system_prompt (str, optional): Custom system prompt for the router. Defaults to None. + skip_null_tasks (bool, optional): Whether to skip executing agents when their assigned task is null or None. Defaults to True. """ self.name = name self.description = description @@ -74,23 +147,28 @@ class MultiAgentRouter: self.output_type = output_type self.model = model self.temperature = temperature - self.if_print = if_print - self.agents = {agent.name: agent for agent in agents} + self.print_on = print_on + self.system_prompt = system_prompt + self.skip_null_tasks = skip_null_tasks + self.agents = {agent.agent_name: agent for agent in agents} self.conversation = Conversation() - self.api_key = os.getenv("OPENAI_API_KEY") - - if not self.api_key: - raise ValueError("OpenAI API key must be provided") + router_system_prompt = "" - self.boss_system_prompt = self._create_boss_system_prompt() - - # Initialize the function caller - self.function_caller = OpenAIFunctionCaller( - system_prompt=self.boss_system_prompt, - api_key=self.api_key, - temperature=temperature, - base_model=AgentResponse, + router_system_prompt += ( + self.system_prompt if self.system_prompt else "" + ) + router_system_prompt += self._create_boss_system_prompt() + + self.function_caller = LiteLLM( + model_name=self.model, + system_prompt=router_system_prompt, + temperature=self.temperature, + tool_choice="auto", + parallel_tool_calls=True, + response_format=MultipleHandOffsResponse, + *args, + **kwargs, ) def __repr__(self): @@ -114,24 +192,110 @@ class MultiAgentRouter: ] ) - return f"""You are a boss agent responsible for routing tasks to the most appropriate specialized agent. - Available agents: - {agent_descriptions} + return agent_boss_router_prompt(agent_descriptions) + + def handle_single_handoff( + self, boss_response_str: dict, task: str + ) -> dict: + """ + Handles a single handoff to one agent. + + If skip_null_tasks is True and the assigned task is null or None, + the agent execution will be skipped. + """ - Your job is to: - 1. Analyze the incoming task - 2. Select the most appropriate agent based on their descriptions - 3. Provide clear reasoning for your selection - 4. Optionally modify the task to better suit the selected agent's capabilities + # Validate that the selected agent exists + if ( + boss_response_str["handoffs"][0]["agent_name"] + not in self.agents + ): + raise ValueError( + f"Boss selected unknown agent: {boss_response_str.agent_name}" + ) + + # Get the selected agent + selected_agent = self.agents[ + boss_response_str["handoffs"][0]["agent_name"] + ] + + # Use the modified task if provided, otherwise use original task + final_task = boss_response_str["handoffs"][0]["task"] or task + + # Skip execution if task is null/None and skip_null_tasks is True + if self.skip_null_tasks and ( + final_task is None or final_task == "" + ): + if self.print_on: + logger.info( + f"Skipping execution for agent {selected_agent.agent_name} - task is null/None" + ) - You must respond with JSON that contains: - - selected_agent: Name of the chosen agent (must be one of the available agents) - - reasoning: Brief explanation of why this agent was selected - - modified_task: (Optional) A modified version of the task if needed + # Use the agent's run method directly + agent_response = selected_agent.run(final_task) + + self.conversation.add( + role=selected_agent.agent_name, content=agent_response + ) - Always select exactly one agent that best matches the task requirements. + # return agent_response + + def handle_multiple_handoffs( + self, boss_response_str: dict, task: str + ) -> dict: + """ + Handles multiple handoffs to multiple agents. + + If skip_null_tasks is True and any assigned task is null or None, + those agents will be skipped and only agents with valid tasks will be executed. """ + # Validate that the selected agents exist + for handoff in boss_response_str["handoffs"]: + if handoff["agent_name"] not in self.agents: + raise ValueError( + f"Boss selected unknown agent: {handoff.agent_name}" + ) + + # Get the selected agents and their tasks + selected_agents = [] + final_tasks = [] + skipped_agents = [] + + for handoff in boss_response_str["handoffs"]: + agent = self.agents[handoff["agent_name"]] + final_task = handoff["task"] or task + + # Skip execution if task is null/None and skip_null_tasks is True + if self.skip_null_tasks and ( + final_task is None or final_task == "" + ): + if self.print_on: + logger.info( + f"Skipping execution for agent {agent.agent_name} - task is null/None" + ) + skipped_agents.append(agent.agent_name) + continue + + selected_agents.append(agent) + final_tasks.append(final_task) + + # Execute agents only if there are valid tasks + if selected_agents: + # Use the agents' run method directly + agent_responses = [ + agent.run(final_task) + for agent, final_task in zip( + selected_agents, final_tasks + ) + ] + + self.conversation.add( + role=selected_agents[0].agent_name, + content=agent_responses[0], + ) + + # return agent_responses + def route_task(self, task: str) -> dict: """ Routes a task to the appropriate agent and returns their response. @@ -146,46 +310,32 @@ class MultiAgentRouter: self.conversation.add(role="user", content=task) # Get boss decision using function calling - boss_response = self.function_caller.run(task) - boss_response_str = any_to_str(boss_response) + boss_response_str = self.function_caller.run(task) - if self.if_print: - formatter.print_panel( - boss_response_str, - title="Multi-Agent Router Decision", - ) - else: - pass + boss_response_str = orjson.loads(boss_response_str) - self.conversation.add( - role="Agent Router", content=boss_response_str - ) - - # Validate that the selected agent exists - if boss_response.selected_agent not in self.agents: - raise ValueError( - f"Boss selected unknown agent: {boss_response.selected_agent}" + if self.print_on: + formatter.print_panel( + # orjson.dumps(boss_response_str, indent=4), + orjson.dumps( + boss_response_str, option=orjson.OPT_INDENT_2 + ).decode("utf-8"), + title=self.name, ) - # Get the selected agent - selected_agent = self.agents[boss_response.selected_agent] - - # Use the modified task if provided, otherwise use original task - final_task = boss_response.modified_task or task - - # Use the agent's run method directly - agent_response = selected_agent.run(final_task) - - self.conversation.add( - role=selected_agent.name, content=agent_response - ) + if len(boss_response_str["handoffs"]) > 1: + self.handle_multiple_handoffs(boss_response_str, task) + else: + self.handle_single_handoff(boss_response_str, task) return history_output_formatter( conversation=self.conversation, type=self.output_type ) except Exception as e: - logger.error(f"Error routing task: {str(e)}") + logger.error( + f"Error routing task: {str(e)} Traceback: {traceback.format_exc()}" + ) raise def run(self, task: str): @@ -209,9 +359,6 @@ class MultiAgentRouter: def concurrent_batch_run(self, tasks: List[str] = []): """Concurrently route tasks to the appropriate agents""" - import concurrent.futures - from concurrent.futures import ThreadPoolExecutor - results = [] with ThreadPoolExecutor() as executor: futures = [ diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index d053f565..162ae698 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -9,7 +9,6 @@ import litellm import requests from litellm import completion, supports_vision from loguru import logger -from pydantic import BaseModel class LiteLLMException(Exception): @@ -235,6 +234,7 @@ class LiteLLM: drop_params: bool = True, thinking_tokens: int = None, reasoning_enabled: bool = False, + response_format: any = None, *args, **kwargs, ): @@ -292,6 +292,7 @@ class LiteLLM: self.thinking_tokens = thinking_tokens self.reasoning_enabled = reasoning_enabled self.verbose = verbose + self.response_format = response_format self.modalities = [] self.messages = [] # Initialize messages list @@ -394,46 +395,73 @@ class LiteLLM: completion_params["runtime_args"] = runtime_args def output_for_tools(self, response: any): - if self.mcp_call is True: + """ + Process tool calls from the LLM response and return formatted output. + + Args: + response: The response object from the LLM API call + + Returns: + dict or list: Formatted tool call data, or default response if no tool calls + """ + try: + # Convert response to dict if it's a Pydantic model + if hasattr(response, "model_dump"): + response_dict = response.model_dump() + else: + response_dict = response + # Check if tool_calls exists and is not None if ( - response.choices - and response.choices[0].message - and response.choices[0].message.tool_calls - and len(response.choices[0].message.tool_calls) > 0 - ): - out = ( - response.choices[0].message.tool_calls[0].function + response_dict.get("choices") + and response_dict["choices"][0].get("message") + and response_dict["choices"][0]["message"].get( + "tool_calls" ) - output = { - "function": { - "name": out.name, - "arguments": out.arguments, + and len( + response_dict["choices"][0]["message"][ + "tool_calls" + ] + ) + > 0 + ): + tool_call = response_dict["choices"][0]["message"][ + "tool_calls" + ][0] + if "function" in tool_call: + return { + "function": { + "name": tool_call["function"].get( + "name", "" + ), + "arguments": tool_call["function"].get( + "arguments", "{}" + ), + } } - } - return output + else: + # Handle case where tool_call structure is different + return tool_call else: # Return a default response when no tool calls are present + logger.warning( + "No tool calls found in response, returning default response" + ) return { "function": { "name": "no_tool_call", "arguments": "{}", } } - else: - # Check if tool_calls exists and is not None - if ( - response.choices - and response.choices[0].message - and response.choices[0].message.tool_calls - ): - out = response.choices[0].message.tool_calls - if isinstance(out, BaseModel): - out = out.model_dump() - return out - else: - # Return empty list when no tool calls are present - return [] + except Exception as e: + logger.error(f"Error processing tool calls: {str(e)}") + # Return a safe default response + return { + "function": { + "name": "error_processing_tool_calls", + "arguments": f'{{"error": "{str(e)}"}}', + } + } def output_for_reasoning(self, response: any): """ @@ -886,6 +914,11 @@ class LiteLLM: if self.base_url is not None: completion_params["base_url"] = self.base_url + if self.response_format is not None: + completion_params["response_format"] = ( + self.response_format + ) + # Add modalities if needed if self.modalities and len(self.modalities) >= 2: completion_params["modalities"] = self.modalities @@ -914,6 +947,14 @@ class LiteLLM: # Make the completion call response = completion(**completion_params) + # print(response) + + # Validate response + if not response: + logger.error( + "Received empty response from completion call" + ) + return None # Handle streaming response if self.stream: @@ -928,25 +969,19 @@ class LiteLLM: # Handle tool-based response elif self.tools_list_dictionary is not None: - return self.output_for_tools(response) + result = self.output_for_tools(response) + return result elif self.return_all is True: return response.model_dump() elif "gemini" in self.model_name.lower(): return gemini_output_img_handler(response) else: - # For non-Gemini models, return the content directly return response.choices[0].message.content except LiteLLMException as error: logger.error( f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}" ) - if "rate_limit" in str(error).lower(): - logger.warning( - "Rate limit hit, retrying with exponential backoff..." - ) - return self.run(task, audio, img, *args, **kwargs) - raise error def __call__(self, task: str, *args, **kwargs): """