[IMPROVEMENT][AgentRearrange][Integrate concurrent agent execution in agent rearrange through the comma type] [AgentRearrange][Docs]

pull/1088/head
Kye Gomez 3 weeks ago
parent 0813594074
commit db8437f52c

@ -91,6 +91,7 @@ The `Agent` class establishes a conversational loop with a language model, allow
| `callback` | `Optional[Callable]` | Callable function to be called after each agent loop. | | `callback` | `Optional[Callable]` | Callable function to be called after each agent loop. |
| `metadata` | `Optional[Dict[str, Any]]` | Dictionary containing metadata for the agent. | | `metadata` | `Optional[Dict[str, Any]]` | Dictionary containing metadata for the agent. |
| `callbacks` | `Optional[List[Callable]]` | List of callable functions to be called during execution. | | `callbacks` | `Optional[List[Callable]]` | List of callable functions to be called during execution. |
| `handoffs` | `Optional[Union[Sequence[Callable], Any]]` | List of Agent instances that can be delegated tasks to. When provided, the agent will use a MultiAgentRouter to intelligently route tasks to the most appropriate specialized agent. |
| `search_algorithm` | `Optional[Callable]` | Callable function for long-term memory retrieval. | | `search_algorithm` | `Optional[Callable]` | Callable function for long-term memory retrieval. |
| `logs_to_filename` | `Optional[str]` | File path for logging agent activities. | | `logs_to_filename` | `Optional[str]` | File path for logging agent activities. |
| `evaluator` | `Optional[Callable]` | Callable function for evaluating the agent's responses. | | `evaluator` | `Optional[Callable]` | Callable function for evaluating the agent's responses. |
@ -239,6 +240,7 @@ The `Agent` class establishes a conversational loop with a language model, allow
| `model_dump_yaml()` | Saves the agent model to a YAML file in the workspace directory. | None | `agent.model_dump_yaml()` | | `model_dump_yaml()` | Saves the agent model to a YAML file in the workspace directory. | None | `agent.model_dump_yaml()` |
| `log_agent_data()` | Logs the agent's data to an external API. | None | `agent.log_agent_data()` | | `log_agent_data()` | Logs the agent's data to an external API. | None | `agent.log_agent_data()` |
| `handle_tool_schema_ops()` | Handles operations related to tool schemas. | None | `agent.handle_tool_schema_ops()` | | `handle_tool_schema_ops()` | Handles operations related to tool schemas. | None | `agent.handle_tool_schema_ops()` |
| `handle_handoffs(task)` | Handles task delegation to specialized agents when handoffs are configured. | `task` (str): Task to be delegated to appropriate specialized agent. | `response = agent.handle_handoffs("Analyze market data")` |
| `call_llm(task, *args, **kwargs)` | Calls the appropriate method on the language model. | `task` (str): Task for the LLM.<br>`*args`, `**kwargs`: Additional arguments. | `response = agent.call_llm("Generate text")` | | `call_llm(task, *args, **kwargs)` | Calls the appropriate method on the language model. | `task` (str): Task for the LLM.<br>`*args`, `**kwargs`: Additional arguments. | `response = agent.call_llm("Generate text")` |
| `handle_sop_ops()` | Handles operations related to standard operating procedures. | None | `agent.handle_sop_ops()` | | `handle_sop_ops()` | Handles operations related to standard operating procedures. | None | `agent.handle_sop_ops()` |
| `agent_output_type(responses)` | Processes and returns the agent's output based on the specified output type. | `responses` (list): List of responses. | `formatted_output = agent.agent_output_type(responses)` | | `agent_output_type(responses)` | Processes and returns the agent's output based on the specified output type. | `responses` (list): List of responses. | `formatted_output = agent.agent_output_type(responses)` |
@ -427,6 +429,100 @@ response = agent.run("What are the components of a startup's stock incentive equ
print(response) print(response)
``` ```
### Agent Handoffs and Task Delegation
The `Agent` class supports intelligent task delegation through the `handoffs` parameter. When provided with a list of specialized agents, the main agent acts as a router that analyzes incoming tasks and delegates them to the most appropriate specialized agent based on their capabilities and descriptions.
#### How Handoffs Work
1. **Task Analysis**: When a task is received, the main agent uses a built-in "boss agent" to analyze the task requirements
2. **Agent Selection**: The boss agent evaluates all available specialized agents and selects the most suitable one(s) based on their descriptions and capabilities
3. **Task Delegation**: The selected agent(s) receive the task (potentially modified for better execution) and process it
4. **Response Aggregation**: Results from specialized agents are collected and returned
#### Key Features
| Feature | Description |
|---------------------------|-----------------------------------------------------------------------------------------------|
| **Intelligent Routing** | Uses AI to determine the best agent for each task |
| **Multiple Agent Support**| Can delegate to multiple agents for complex tasks requiring different expertise |
| **Task Modification** | Can modify tasks to better suit the selected agent's capabilities |
| **Transparent Reasoning** | Provides clear explanations for agent selection decisions |
| **Seamless Integration** | Works transparently with the existing `run()` method |
#### Basic Handoff Example
```python
from swarms import Agent
# Create specialized agents
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
)
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
dynamic_temperature_enabled=True,
)
# Create main agent with handoffs
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
dynamic_temperature_enabled=True,
handoffs=[
risk_metrics_agent,
market_risk_agent,
],
)
# Run task - will be automatically delegated to appropriate agent
response = portfolio_risk_agent.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
)
print(response)
```
#### Use Cases
- **Financial Analysis**: Route different types of financial analysis to specialized agents (risk, valuation, market analysis)
- **Software Development**: Delegate coding, testing, documentation, and code review to different agents
- **Research Projects**: Route research tasks to domain-specific agents
- **Customer Support**: Delegate different types of inquiries to specialized support agents
- **Content Creation**: Route writing, editing, and fact-checking to different content specialists
### Interactive Mode ### Interactive Mode
To enable interactive mode, set the `interactive` parameter to `True` when initializing the `Agent`: To enable interactive mode, set the `interactive` parameter to `True` when initializing the `Agent`:
@ -1014,5 +1110,6 @@ The `run` method now supports several new parameters for advanced functionality:
| `mcp_url` or `mcp_urls` | Use mcp_url or mcp_urls to extend agent capabilities with external tools. | | `mcp_url` or `mcp_urls` | Use mcp_url or mcp_urls to extend agent capabilities with external tools. |
| `react_on` | Enable react_on for complex reasoning tasks requiring step-by-step analysis. | | `react_on` | Enable react_on for complex reasoning tasks requiring step-by-step analysis. |
| `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. | | `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. |
| `handoffs` | Use handoffs to create specialized agent teams that can intelligently route tasks based on complexity and expertise requirements. |
By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications. By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.

@ -1,42 +1,31 @@
from swarms import Agent, MultiAgentRouter from swarms.structs.agent import Agent
from swarms.structs.multi_agent_router import MultiAgentRouter
# Example usage: # Example usage:
if __name__ == "__main__": agents = [
# Define some example agents Agent(
agents = [ agent_name="ResearchAgent",
Agent( agent_description="Specializes in researching topics and providing detailed, factual information",
agent_name="ResearchAgent", system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.",
description="Specializes in researching topics and providing detailed, factual information", ),
system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.", Agent(
model_name="openai/gpt-4o", agent_name="CodeExpertAgent",
), agent_description="Expert in writing, reviewing, and explaining code across multiple programming languages",
Agent( system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.",
agent_name="CodeExpertAgent", ),
description="Expert in writing, reviewing, and explaining code across multiple programming languages", Agent(
system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.", agent_name="WritingAgent",
model_name="openai/gpt-4o", agent_description="Skilled in creative and technical writing, content creation, and editing",
), system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.",
Agent( ),
agent_name="WritingAgent", ]
description="Skilled in creative and technical writing, content creation, and editing",
system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.",
model_name="openai/gpt-4o",
),
]
# Initialize routers with different configurations # Initialize routers with different configurations
router_execute = MultiAgentRouter( router_execute = MultiAgentRouter(
agents=agents, execute_task=True agents=agents, temperature=0.5, model="claude-sonnet-4-20250514"
) )
# Example task # Example task: Remake the Fibonacci task
task = "Write a Python function to calculate fibonacci numbers" task = "Use all the agents available to you to remake the Fibonacci function in Python, providing both an explanation and code."
result_execute = router_execute.run(task)
try: print(result_execute)
# Process the task with execution
print("\nWith task execution:")
result_execute = router_execute.route_task(task)
print(result_execute)
except Exception as e:
print(f"Error occurred: {str(e)}")

@ -1,120 +0,0 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses.
Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently.
After receiving a report on the company's expenses, you will break down the work into smaller tasks,
assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures,
and identifying unnecessary transactions. Ensure the results are communicated back in a structured way
so the finance team can take actionable steps to cut off unproductive spending. You also monitor and
dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings
into a coherent report.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="boss_agent.json",
)
# Initialize worker 1: Expense Analyzer
worker1 = Agent(
agent_name="ExpenseAnalyzer",
system_prompt="""
Your task is to carefully analyze the company's expense data provided to you.
You will focus on identifying high-cost recurring transactions, categorizing expenditures
(e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending.
You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting.
Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2: Summary Generator
worker2 = Agent(
agent_name="SummaryGenerator",
system_prompt="""
After receiving the detailed breakdown from the ExpenseAnalyzer,
your task is to create a concise summary of the findings. You will focus on the most actionable insights,
such as highlighting the specific transactions that can be immediately cut off and summarizing the areas
where the company is overspending. Your summary will be used by the BossAgent to generate the final report.
Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off.
You will work collaboratively to break down the entire process of expense analysis into manageable steps.
The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first
focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them,
and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then
consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses.
Together, your collaboration is essential to streamlining and improving the companys financial health.
"""
# Create a list of agents
agents = [boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
name="pe-swarm",
description="ss",
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
)
# Input task for the swarm
task = f"""
{swarm_prompt}
The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed
analysis of recent transactions to identify which expenses can be cut off to improve profitability.
Analyze the provided transaction data and create a detailed report on cost-cutting opportunities,
focusing on recurring transactions and non-essential expenditures.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -1,7 +1,8 @@
from typing import Dict, Union, Any
import os import os
import requests
from enum import Enum from enum import Enum
from typing import Any, Dict, Union
import requests
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()

@ -1,20 +1,14 @@
from swarms import Agent from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import ( from swarms_tools.finance.okx_tool import okx_api_tool
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms_tools import yahoo_finance_api
# Initialize the agent # Initialize the agent
agent = Agent( agent = Agent(
agent_name="Financial-Analysis-Agent", agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent", agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1, max_loops=1,
model_name="gpt-4o-mini", model_name="gpt-4o-mini",
tools=[yahoo_finance_api], tools=[okx_api_tool],
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
) )
agent.run( agent.run("fetch the current price of bitcoin with okx")
"Fetch the data for nvidia and tesla both with the yahoo finance api"
)

@ -0,0 +1,63 @@
from swarms.structs.agent import Agent
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
dynamic_temperature_enabled=True,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
dynamic_temperature_enabled=True,
handoffs=[
risk_metrics_agent,
market_risk_agent,
],
)
out = portfolio_risk_agent.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility using the risk metrics agent and market risk agent"
)
print(out)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "8.2.2" version = "8.3.0"
description = "Swarms - TGSC" description = "Swarms - TGSC"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,86 @@
from swarms import Agent, AgentRearrange
# Define all prompts as strings before agent initializations
boss_agent_prompt = """
You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses.
Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently.
After receiving a report on the company's expenses, you will break down the work into smaller tasks,
assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures,
and identifying unnecessary transactions. Ensure the results are communicated back in a structured way
so the finance team can take actionable steps to cut off unproductive spending. You also monitor and
dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings
into a coherent report.
"""
expense_analyzer_prompt = """
Your task is to carefully analyze the company's expense data provided to you.
You will focus on identifying high-cost recurring transactions, categorizing expenditures
(e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending.
You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting.
Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures.
"""
summary_generator_prompt = """
After receiving the detailed breakdown from the ExpenseAnalyzer,
your task is to create a concise summary of the findings. You will focus on the most actionable insights,
such as highlighting the specific transactions that can be immediately cut off and summarizing the areas
where the company is overspending. Your summary will be used by the BossAgent to generate the final report.
Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses.
"""
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off.
You will work collaboratively to break down the entire process of expense analysis into manageable steps.
The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first
focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them,
and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then
consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses.
Together, your collaboration is essential to streamlining and improving the company's financial health.
"""
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt=boss_agent_prompt,
max_loops=1,
)
# Initialize worker 1: Expense Analyzer
worker1 = Agent(
agent_name="ExpenseAnalyzer",
system_prompt=expense_analyzer_prompt,
max_loops=1,
dashboard=False,
)
# Initialize worker 2: Summary Generator
worker2 = Agent(
agent_name="SummaryGenerator",
system_prompt=summary_generator_prompt,
max_loops=1,
)
# Create a list of agents
agents = [boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "BossAgent -> ExpenseAnalyzer, SummaryGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(agents=agents, flow=flow)
# Input task for the swarm
task = f"""
{swarm_prompt}
The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed
analysis of recent transactions to identify which expenses can be cut off to improve profitability.
Analyze the provided transaction data and create a detailed report on cost-cutting opportunities,
focusing on recurring transactions and non-essential expenditures.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -17,6 +17,7 @@ from typing import (
Optional, Optional,
Tuple, Tuple,
Union, Union,
Sequence,
) )
import toml import toml
@ -94,6 +95,7 @@ from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.output_types import OutputType from swarms.utils.output_types import OutputType
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
from swarms.structs.multi_agent_router import MultiAgentRouter
def stop_when_repeats(response: str) -> bool: def stop_when_repeats(response: str) -> bool:
@ -456,6 +458,8 @@ class Agent:
drop_params: bool = True, drop_params: bool = True,
thinking_tokens: int = None, thinking_tokens: int = None,
reasoning_enabled: bool = False, reasoning_enabled: bool = False,
handoffs: Optional[Union[Sequence[Callable], Any]] = None,
capabilities: Optional[List[str]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -479,19 +483,6 @@ class Agent:
self.user_name = user_name self.user_name = user_name
self.context_length = context_length self.context_length = context_length
# Initialize transforms
if transforms is None:
self.transforms = None
elif isinstance(transforms, TransformConfig):
self.transforms = MessageTransforms(transforms)
elif isinstance(transforms, dict):
config = TransformConfig(**transforms)
self.transforms = MessageTransforms(config)
else:
raise ValueError(
"transforms must be a TransformConfig object or a dictionary"
)
self.sop = sop self.sop = sop
self.sop_list = sop_list self.sop_list = sop_list
self.tools = tools self.tools = tools
@ -617,6 +608,20 @@ class Agent:
self.thinking_tokens = thinking_tokens self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled self.reasoning_enabled = reasoning_enabled
self.fallback_model_name = fallback_model_name self.fallback_model_name = fallback_model_name
self.handoffs = handoffs
self.capabilities = capabilities
# Initialize transforms
if transforms is None:
self.transforms = None
elif isinstance(transforms, TransformConfig):
self.transforms = MessageTransforms(transforms)
elif isinstance(transforms, dict):
config = TransformConfig(**transforms)
self.transforms = MessageTransforms(config)
else:
pass
self.fallback_models = fallback_models or [] self.fallback_models = fallback_models or []
self.current_model_index = 0 self.current_model_index = 0
self.model_attempts = {} self.model_attempts = {}
@ -674,6 +679,18 @@ class Agent:
self.reliability_check() self.reliability_check()
def handle_handoffs(self, task: Optional[str] = None):
router = MultiAgentRouter(
name=self.agent_name,
description=self.agent_description,
agents=self.handoffs,
model=self.model_name,
temperature=self.temperature,
output_type=self.output_type,
)
return router.run(task=task)
def setup_tools(self): def setup_tools(self):
return BaseTool( return BaseTool(
tools=self.tools, tools=self.tools,
@ -2647,6 +2664,8 @@ class Agent:
*args, *args,
**kwargs, **kwargs,
) )
elif exists(self.handoffs):
output = self.handle_handoffs(task=task)
else: else:
output = self._run( output = self._run(
task=task, task=task,

@ -4,6 +4,8 @@ from typing import Any, Callable, Dict, List, Optional, Union
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.structs.swarm_id import swarm_id
from swarms.telemetry.main import log_agent_data from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import ( from swarms.utils.history_output_formatter import (
@ -11,12 +13,75 @@ from swarms.utils.history_output_formatter import (
) )
from swarms.utils.loguru_logger import initialize_logger from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType from swarms.utils.output_types import OutputType
from swarms.structs.swarm_id import swarm_id
logger = initialize_logger(log_folder="rearrange") logger = initialize_logger(log_folder="rearrange")
class AgentRearrange: class AgentRearrange:
"""
A sophisticated multi-agent system for task rearrangement and orchestration.
The AgentRearrange class enables complex workflows where multiple agents can work
sequentially or concurrently based on a defined flow pattern. It supports both
sequential execution (using '->') and concurrent execution (using ',') within
the same workflow.
Key Features:
- Sequential and concurrent agent execution
- Custom flow patterns with arrow (->) and comma (,) syntax
- Team awareness and sequential flow information
- Human-in-the-loop integration
- Memory system support
- Batch and concurrent processing capabilities
- Comprehensive error handling and logging
Flow Syntax:
- Use '->' to define sequential execution: "agent1 -> agent2 -> agent3"
- Use ',' to define concurrent execution: "agent1, agent2 -> agent3"
- Combine both: "agent1 -> agent2, agent3 -> agent4"
- Use 'H' for human-in-the-loop: "agent1 -> H -> agent2"
Attributes:
id (str): Unique identifier for the agent rearrange system
name (str): Human-readable name for the system
description (str): Description of the system's purpose
agents (Dict[str, Agent]): Dictionary mapping agent names to Agent objects
flow (str): Flow pattern defining agent execution order
max_loops (int): Maximum number of execution loops
verbose (bool): Whether to enable verbose logging
memory_system (Any): Optional memory system for persistence
human_in_the_loop (bool): Whether to enable human interaction
custom_human_in_the_loop (Callable): Custom human interaction handler
output_type (OutputType): Format for output results
autosave (bool): Whether to automatically save execution data
rules (str): System rules and constraints
team_awareness (bool): Whether agents are aware of team structure
time_enabled (bool): Whether to track timestamps
message_id_on (bool): Whether to include message IDs
conversation (Conversation): Conversation history management
Example:
>>> from swarms import Agent, AgentRearrange
>>>
>>> # Create agents
>>> agent1 = Agent(name="researcher", ...)
>>> agent2 = Agent(name="writer", ...)
>>> agent3 = Agent(name="reviewer", ...)
>>>
>>> # Define flow: agent1 runs first, then agent2 and agent3 run concurrently
>>> flow = "researcher -> writer, reviewer"
>>>
>>> # Create rearrange system
>>> rearrange_system = AgentRearrange(
... agents=[agent1, agent2, agent3],
... flow=flow,
... max_loops=1,
... team_awareness=True
... )
>>>
>>> # Execute task
>>> result = rearrange_system.run("Research and write a report")
"""
def __init__( def __init__(
self, self,
@ -38,9 +103,55 @@ class AgentRearrange:
team_awareness: bool = False, team_awareness: bool = False,
time_enabled: bool = False, time_enabled: bool = False,
message_id_on: bool = False, message_id_on: bool = False,
*args,
**kwargs,
): ):
"""
Initialize the AgentRearrange system.
Args:
id (str): Unique identifier for the agent rearrange system.
Defaults to a generated swarm ID.
name (str): Human-readable name for the system.
Defaults to "AgentRearrange".
description (str): Description of the system's purpose.
Defaults to "A swarm of agents for rearranging tasks.".
agents (List[Union[Agent, Callable]], optional): List of agents to include
in the system. Can be Agent objects or callable functions.
Defaults to None.
flow (str, optional): Flow pattern defining agent execution order.
Uses '->' for sequential and ',' for concurrent execution.
Defaults to None.
max_loops (int): Maximum number of execution loops. Must be > 0.
Defaults to 1.
verbose (bool): Whether to enable verbose logging.
Defaults to True.
memory_system (Any, optional): Optional memory system for persistence.
Defaults to None.
human_in_the_loop (bool): Whether to enable human interaction points.
Defaults to False.
custom_human_in_the_loop (Callable[[str], str], optional): Custom function
for handling human interaction. Takes input string, returns response.
Defaults to None.
output_type (OutputType): Format for output results. Can be "all", "final",
"list", or "dict". Defaults to "all".
autosave (bool): Whether to automatically save execution data.
Defaults to True.
rules (str, optional): System rules and constraints to add to conversation.
Defaults to None.
team_awareness (bool): Whether agents should be aware of team structure
and sequential flow. Defaults to False.
time_enabled (bool): Whether to track timestamps in conversations.
Defaults to False.
message_id_on (bool): Whether to include message IDs in conversations.
Defaults to False.
Raises:
ValueError: If agents list is None or empty, max_loops is 0,
flow is None or empty, or output_type is None or empty.
Note:
The agents parameter is converted to a dictionary mapping agent names
to Agent objects for efficient lookup during execution.
"""
self.name = name self.name = name
self.description = description self.description = description
self.id = id self.id = id
@ -80,6 +191,23 @@ class AgentRearrange:
self.reliability_check() self.reliability_check()
def reliability_check(self): def reliability_check(self):
"""
Validates the configuration parameters to ensure the system can run properly.
Performs comprehensive validation checks on critical parameters including
agents list, max_loops, flow pattern, and output_type to prevent runtime errors.
Raises:
ValueError: If any of the following conditions are met:
- agents list is None or empty
- max_loops is 0
- flow is None or empty string
- output_type is None or empty string
Note:
This method is called automatically during initialization to ensure
the system is properly configured before execution.
"""
if self.agents is None or len(self.agents) == 0: if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty") raise ValueError("Agents list cannot be None or empty")
@ -93,6 +221,24 @@ class AgentRearrange:
raise ValueError("output_type cannot be None or empty") raise ValueError("output_type cannot be None or empty")
def set_custom_flow(self, flow: str): def set_custom_flow(self, flow: str):
"""
Sets a custom flow pattern for agent execution.
Allows dynamic modification of the execution flow after initialization.
The flow pattern defines how agents should be executed in sequence or
parallel using the standard syntax ('->' for sequential, ',' for concurrent).
Args:
flow (str): The new flow pattern to use for agent execution.
Must follow the syntax: "agent1 -> agent2, agent3 -> agent4"
Note:
The flow will be validated on the next execution. If invalid,
a ValueError will be raised during the run() method.
Example:
>>> rearrange_system.set_custom_flow("researcher -> writer, editor")
"""
self.flow = flow self.flow = flow
logger.info(f"Custom flow set: {flow}") logger.info(f"Custom flow set: {flow}")
@ -111,6 +257,20 @@ class AgentRearrange:
agent_name: str, agent_name: str,
result: str, result: str,
): ):
"""
Tracks the execution history for a specific agent.
Records the result of an agent's execution in the swarm history
for later analysis or debugging purposes.
Args:
agent_name (str): The name of the agent whose result to track.
result (str): The result/output from the agent's execution.
Note:
This method is typically called internally during agent execution
to maintain a complete history of all agent activities.
"""
self.swarm_history[agent_name].append(result) self.swarm_history[agent_name].append(result)
def remove_agent(self, agent_name: str): def remove_agent(self, agent_name: str):
@ -306,6 +466,120 @@ class AgentRearrange:
""" """
return self._get_sequential_flow_info() return self._get_sequential_flow_info()
def _run_concurrent_workflow(
self,
agent_names: List[str],
img: str = None,
*args,
**kwargs,
) -> Dict[str, str]:
"""
Executes agents concurrently when comma is detected in the flow.
This method handles the parallel execution of multiple agents when they
are separated by commas in the flow pattern. All specified agents run
simultaneously and their results are collected and returned.
Args:
agent_names (List[str]): List of agent names to run concurrently.
These agents will execute in parallel.
img (str, optional): Image input for agents that support it.
Defaults to None.
*args: Additional positional arguments passed to agent execution.
**kwargs: Additional keyword arguments passed to agent execution.
Returns:
Dict[str, str]: Dictionary mapping agent names to their execution results.
Keys are agent names, values are their respective outputs.
Note:
This method uses the run_agents_concurrently utility function
to handle the actual parallel execution and result collection.
"""
logger.info(f"Running agents in parallel: {agent_names}")
# Get agent objects for concurrent execution
agents_to_run = [
self.agents[agent_name] for agent_name in agent_names
]
# Run agents concurrently
results = run_agents_concurrently(
agents=agents_to_run,
task=self.conversation.get_str(),
)
# Process results and update conversation
response_dict = {}
for i, agent_name in enumerate(agent_names):
result = results[i]
# print(f"Result: {result}")
self.conversation.add(agent_name, result)
response_dict[agent_name] = result
logger.debug(f"Agent {agent_name} output: {result}")
return response_dict
def _run_sequential_workflow(
self,
agent_name: str,
tasks: List[str],
img: str = None,
*args,
**kwargs,
) -> str:
"""
Executes a single agent sequentially.
This method handles the sequential execution of a single agent in the flow.
It provides sequential awareness information to the agent if team_awareness
is enabled, allowing the agent to understand its position in the workflow.
Args:
agent_name (str): Name of the agent to run sequentially.
tasks (List[str]): List of all tasks in the flow for awareness context.
Used to determine the agent's position and provide awareness info.
img (str, optional): Image input for agents that support it.
Defaults to None.
*args: Additional positional arguments passed to agent execution.
**kwargs: Additional keyword arguments passed to agent execution.
Returns:
str: The result from the agent's execution, converted to string format.
Note:
If team_awareness is enabled, this method will add sequential awareness
information to the conversation before executing the agent, informing
the agent about its position in the workflow sequence.
"""
logger.info(f"Running agent sequentially: {agent_name}")
agent = self.agents[agent_name]
# Add sequential awareness information for the agent
awareness_info = self._get_sequential_awareness(
agent_name, tasks
)
if awareness_info:
self.conversation.add("system", awareness_info)
logger.info(
f"Added sequential awareness for {agent_name}: {awareness_info}"
)
current_task = agent.run(
task=self.conversation.get_str(),
img=img,
*args,
**kwargs,
)
current_task = any_to_str(current_task)
self.conversation.add(agent.agent_name, current_task)
return current_task
def _run( def _run(
self, self,
task: str = None, task: str = None,
@ -315,27 +589,39 @@ class AgentRearrange:
**kwargs, **kwargs,
): ):
""" """
Runs the swarm to rearrange the tasks. Runs the swarm to rearrange the tasks according to the defined flow.
This is the core execution method that orchestrates the entire workflow.
It processes the flow pattern, executes agents sequentially or concurrently
as specified, and returns the results in the requested format.
Args: Args:
task (str, optional): The initial task to be processed. Defaults to None. task (str, optional): The initial task to be processed by the swarm.
img (str, optional): Image input for agents that support it. Defaults to None. This is added to the conversation history. Defaults to None.
custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. Defaults to None. img (str, optional): Image input for agents that support it.
output_type (str, optional): Format of the output. Can be: Defaults to None.
custom_tasks (Dict[str, str], optional): Custom tasks for specific agents.
Allows overriding the main task for specific agents in the flow.
Defaults to None.
*args: Additional positional arguments passed to agent execution.
**kwargs: Additional keyword arguments passed to agent execution.
Returns:
Union[str, List[str], Dict[str, str]]: The processed output in the format
specified by output_type:
- "all": String containing all agent responses concatenated - "all": String containing all agent responses concatenated
- "final": Only the final agent's response - "final": Only the final agent's response
- "list": List of all agent responses - "list": List of all agent responses
- "dict": Dict mapping agent names to their responses - "dict": Dict mapping agent names to their responses
Defaults to "final".
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Union[str, List[str], Dict[str, str]]: The processed output in the specified format
Raises: Raises:
ValueError: If flow validation fails ValueError: If flow validation fails or configuration is invalid.
Exception: For any other errors during execution Exception: For any other errors during execution.
Note:
This method handles both sequential and concurrent execution patterns
based on the flow syntax. It also supports custom task injection
and multiple execution loops as configured.
""" """
try: try:
self.conversation.add("User", task) self.conversation.add("User", task)
@ -345,14 +631,13 @@ class AgentRearrange:
return "Invalid flow configuration." return "Invalid flow configuration."
tasks = self.flow.split("->") tasks = self.flow.split("->")
current_task = task
response_dict = {} response_dict = {}
logger.info( logger.info(
f"Starting task execution with {len(tasks)} steps" f"Starting task execution with {len(tasks)} steps"
) )
# # Handle custom tasks # Handle custom tasks
if custom_tasks is not None: if custom_tasks is not None:
logger.info("Processing custom tasks") logger.info("Processing custom tasks")
c_agent_name, c_task = next( c_agent_name, c_task = next(
@ -377,68 +662,28 @@ class AgentRearrange:
] ]
if len(agent_names) > 1: if len(agent_names) > 1:
# Parallel processing # Concurrent processing - comma detected
logger.info( concurrent_results = (
f"Running agents in parallel: {agent_names}" self._run_concurrent_workflow(
) agent_names=agent_names,
for agent_name in agent_names:
agent = self.agents[agent_name]
result = agent.run(
task=self.conversation.get_str(),
img=img, img=img,
*args, *args,
**kwargs, **kwargs,
) )
result = any_to_str(result) )
response_dict.update(concurrent_results)
self.conversation.add(
agent.agent_name, result
)
response_dict[agent_name] = result
logger.debug(
f"Agent {agent_name} output: {result}"
)
",".join(agent_names)
else: else:
# Sequential processing # Sequential processing
logger.info(
f"Running agent sequentially: {agent_names[0]}"
)
agent_name = agent_names[0] agent_name = agent_names[0]
result = self._run_sequential_workflow(
agent = self.agents[agent_name] agent_name=agent_name,
tasks=tasks,
# Add sequential awareness information for the agent
awareness_info = (
self._get_sequential_awareness(
agent_name, tasks
)
)
if awareness_info:
self.conversation.add(
"system", awareness_info
)
logger.info(
f"Added sequential awareness for {agent_name}: {awareness_info}"
)
current_task = agent.run(
task=self.conversation.get_str(),
img=img, img=img,
*args, *args,
**kwargs, **kwargs,
) )
current_task = any_to_str(current_task) response_dict[agent_name] = result
self.conversation.add(
agent.agent_name, current_task
)
response_dict[agent_name] = current_task
loop_count += 1 loop_count += 1
@ -453,11 +698,28 @@ class AgentRearrange:
self._catch_error(e) self._catch_error(e)
def _catch_error(self, e: Exception): def _catch_error(self, e: Exception):
"""
Handles errors that occur during swarm execution.
Provides comprehensive error handling including logging, data persistence,
and error reporting. This method is called whenever an exception occurs
during the execution of the swarm.
Args:
e (Exception): The exception that occurred during execution.
Returns:
Exception: The original exception for potential re-raising.
Note:
If autosave is enabled, the current state of the swarm will be
automatically saved to the logging system before error reporting.
"""
if self.autosave is True: if self.autosave is True:
log_agent_data(self.to_dict()) log_agent_data(self.to_dict())
logger.error( logger.error(
f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}" f"AgentRearrange: Id: {self.id}, Name: {self.name}. An error occurred with your agent '{self.name}': Error: {e}. Traceback: {e.__traceback__}"
) )
return e return e
@ -470,16 +732,28 @@ class AgentRearrange:
**kwargs, **kwargs,
): ):
""" """
Execute the agent rearrangement task with specified compute resources. Execute the agent rearrangement task with comprehensive logging and error handling.
This is the main public method for executing tasks through the agent rearrange
system. It provides telemetry logging, error handling, and delegates to the
internal _run method for actual execution.
Args: Args:
task (str, optional): The task to execute. Defaults to None. task (str, optional): The task to execute through the agent workflow.
img (str, optional): Path to input image if required. Defaults to None. Defaults to None.
*args: Additional positional arguments passed to _run(). img (str, optional): Path to input image if required by any agents.
**kwargs: Additional keyword arguments passed to _run(). Defaults to None.
*args: Additional positional arguments passed to the internal _run() method.
**kwargs: Additional keyword arguments passed to the internal _run() method.
Returns: Returns:
The result from executing the task through the cluster operations wrapper. The result from executing the task through the agent rearrange system.
The format depends on the configured output_type.
Note:
This method automatically logs agent data before and after execution
for telemetry and debugging purposes. Any exceptions are caught and
handled by the _catch_error method.
""" """
try: try:
log_agent_data(self.to_dict()) log_agent_data(self.to_dict())
@ -502,13 +776,20 @@ class AgentRearrange:
""" """
Make the class callable by executing the run() method. Make the class callable by executing the run() method.
Enables the AgentRearrange instance to be called directly as a function,
providing a convenient interface for task execution.
Args: Args:
task (str): The task to execute. task (str): The task to execute through the agent workflow.
*args: Additional positional arguments passed to run(). *args: Additional positional arguments passed to run().
**kwargs: Additional keyword arguments passed to run(). **kwargs: Additional keyword arguments passed to run().
Returns: Returns:
The result from executing run(). The result from executing the task through the agent rearrange system.
Example:
>>> rearrange_system = AgentRearrange(agents=[agent1, agent2], flow="agent1 -> agent2")
>>> result = rearrange_system("Process this data")
""" """
try: try:
return self.run(task=task, *args, **kwargs) return self.run(task=task, *args, **kwargs)
@ -525,19 +806,29 @@ class AgentRearrange:
**kwargs, **kwargs,
) -> List[str]: ) -> List[str]:
""" """
Process multiple tasks in batches. Process multiple tasks in batches for efficient execution.
This method allows processing multiple tasks by dividing them into
smaller batches and processing each batch sequentially. This is useful
for managing memory usage and resource allocation when dealing with
large numbers of tasks.
Args: Args:
tasks: List of tasks to process tasks (List[str]): List of tasks to process through the agent workflow.
img: Optional list of images corresponding to tasks img (Optional[List[str]]): Optional list of images corresponding to tasks.
batch_size: Number of tasks to process simultaneously Must be the same length as tasks list. Defaults to None.
device: Computing device to use batch_size (int): Number of tasks to process simultaneously in each batch.
device_id: Specific device ID if applicable Defaults to 10.
all_cores: Whether to use all CPU cores *args: Additional positional arguments passed to individual task execution.
all_gpus: Whether to use all available GPUs **kwargs: Additional keyword arguments passed to individual task execution.
Returns: Returns:
List of results corresponding to input tasks List[str]: List of results corresponding to input tasks in the same order.
Note:
This method processes tasks in batches to manage resource usage.
Each batch is processed sequentially, but individual tasks within
a batch may run concurrently depending on the flow configuration.
""" """
try: try:
results = [] results = []
@ -576,17 +867,27 @@ class AgentRearrange:
""" """
Process multiple tasks concurrently using ThreadPoolExecutor. Process multiple tasks concurrently using ThreadPoolExecutor.
This method enables true parallel processing of multiple tasks by using
Python's ThreadPoolExecutor to run tasks simultaneously across multiple
threads. This is ideal for I/O-bound tasks or when you want maximum
parallelization.
Args: Args:
tasks: List of tasks to process tasks (List[str]): List of tasks to process through the agent workflow.
img: Optional list of images corresponding to tasks img (Optional[List[str]]): Optional list of images corresponding to tasks.
max_workers: Maximum number of worker threads Must be the same length as tasks list. Defaults to None.
device: Computing device to use max_workers (Optional[int]): Maximum number of worker threads to use.
device_id: Specific device ID if applicable If None, uses the default ThreadPoolExecutor behavior. Defaults to None.
all_cores: Whether to use all CPU cores *args: Additional positional arguments passed to individual task execution.
all_gpus: Whether to use all available GPUs **kwargs: Additional keyword arguments passed to individual task execution.
Returns: Returns:
List of results corresponding to input tasks List[str]: List of results corresponding to input tasks in the same order.
Note:
This method uses ThreadPoolExecutor for true parallel execution.
The number of concurrent executions is limited by max_workers parameter.
Each task runs independently through the full agent workflow.
""" """
try: try:
with ThreadPoolExecutor( with ThreadPoolExecutor(
@ -613,11 +914,19 @@ class AgentRearrange:
""" """
Serializes callable attributes by extracting their name and docstring. Serializes callable attributes by extracting their name and docstring.
This helper method handles the serialization of callable objects (functions,
methods, etc.) by extracting their metadata for storage or logging purposes.
Args: Args:
attr_value (Callable): The callable to serialize. attr_value (Callable): The callable object to serialize.
Returns: Returns:
Dict[str, Any]: Dictionary with name and docstring of the callable. Dict[str, Any]: Dictionary containing the callable's name and docstring.
Keys are "name" and "doc", values are the corresponding attributes.
Note:
This method is used internally by to_dict() to handle non-serializable
callable attributes in a graceful manner.
""" """
return { return {
"name": getattr( "name": getattr(
@ -630,12 +939,22 @@ class AgentRearrange:
""" """
Serializes an individual attribute, handling non-serializable objects. Serializes an individual attribute, handling non-serializable objects.
This helper method attempts to serialize individual attributes for storage
or logging. It handles different types of objects including callables,
objects with to_dict methods, and basic serializable types.
Args: Args:
attr_name (str): The name of the attribute. attr_name (str): The name of the attribute being serialized.
attr_value (Any): The value of the attribute. attr_value (Any): The value of the attribute to serialize.
Returns: Returns:
Any: The serialized value of the attribute. Any: The serialized value of the attribute. For non-serializable objects,
returns a string representation indicating the object type.
Note:
This method is used internally by to_dict() to handle various types
of attributes in a robust manner, ensuring the serialization process
doesn't fail on complex objects.
""" """
try: try:
if callable(attr_value): if callable(attr_value):
@ -655,10 +974,21 @@ class AgentRearrange:
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
""" """
Converts all attributes of the class, including callables, into a dictionary. Converts all attributes of the class, including callables, into a dictionary.
Handles non-serializable attributes by converting them or skipping them.
This method provides a comprehensive serialization of the AgentRearrange
instance, converting all attributes into a dictionary format suitable for
storage, logging, or transmission. It handles complex objects gracefully
by using helper methods for serialization.
Returns: Returns:
Dict[str, Any]: A dictionary representation of the class attributes. Dict[str, Any]: A dictionary representation of all class attributes.
Non-serializable objects are converted to string representations
or serialized using their to_dict method if available.
Note:
This method is used for telemetry logging and state persistence.
It recursively handles nested objects and provides fallback handling
for objects that cannot be directly serialized.
""" """
return { return {
attr_name: self._serialize_attr(attr_name, attr_value) attr_name: self._serialize_attr(attr_name, attr_value)
@ -677,23 +1007,45 @@ def rearrange(
**kwargs, **kwargs,
): ):
""" """
Rearranges the given list of agents based on the specified flow. Convenience function to create and execute an AgentRearrange system in one call.
This function provides a simplified interface for creating an AgentRearrange
instance and immediately executing a task with it. It's useful for quick
prototyping or when you don't need to reuse the rearrange system.
Parameters: Parameters:
agents (List[Agent]): The list of agents to be rearranged. name (str, optional): Name for the agent rearrange system.
flow (str): The flow used for rearranging the agents. Defaults to None (uses AgentRearrange default).
task (str, optional): The task to be performed during rearrangement. Defaults to None. description (str, optional): Description of the system.
*args: Additional positional arguments. Defaults to None (uses AgentRearrange default).
**kwargs: Additional keyword arguments. agents (List[Agent]): The list of agents to be included in the system.
flow (str): The flow pattern defining agent execution order.
Uses '->' for sequential and ',' for concurrent execution.
task (str, optional): The task to be performed during rearrangement.
Defaults to None.
img (str, optional): Image input for agents that support it.
Defaults to None.
*args: Additional positional arguments passed to AgentRearrange constructor.
**kwargs: Additional keyword arguments passed to AgentRearrange constructor.
Returns: Returns:
The result of running the agent system with the specified task. The result of running the agent system with the specified task.
The format depends on the output_type configuration.
Example: Example:
agents = [agent1, agent2, agent3] >>> from swarms import Agent, rearrange
flow = "agent1 -> agent2, agent3" >>>
task = "Perform a task" >>> # Create agents
rearrange(agents, flow, task) >>> agent1 = Agent(name="researcher", ...)
>>> agent2 = Agent(name="writer", ...)
>>> agent3 = Agent(name="reviewer", ...)
>>>
>>> # Execute task with flow
>>> result = rearrange(
... agents=[agent1, agent2, agent3],
... flow="researcher -> writer, reviewer",
... task="Research and write a report"
... )
""" """
agent_system = AgentRearrange( agent_system = AgentRearrange(
name=name, name=name,

@ -1,31 +1,92 @@
import os import concurrent.futures
from typing import List, Optional import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional
import orjson
from loguru import logger from loguru import logger
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.utils.output_types import OutputType from swarms.tools.base_tool import BaseTool
from swarms.utils.any_to_str import any_to_str from swarms.utils.formatter import formatter
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.history_output_formatter import ( from swarms.utils.history_output_formatter import (
history_output_formatter, history_output_formatter,
) )
from swarms.utils.formatter import formatter from swarms.utils.litellm_wrapper import LiteLLM
from swarms.structs.omni_agent_types import AgentListType from swarms.utils.output_types import OutputType
class AgentResponse(BaseModel): class HandOffsResponse(BaseModel):
"""Response from the boss agent indicating which agent should handle the task""" """
Response from the boss agent indicating which agent should handle the task.
This model encapsulates the reasoning behind the agent selection, the name of the selected agent, and an optional modified version of the task. It is used to communicate the boss agent's decision and rationale to the rest of the system.
"""
selected_agent: str = Field(
description="Name of the agent selected to handle the task"
)
reasoning: str = Field( reasoning: str = Field(
description="Explanation for why this agent was selected" description="A detailed explanation for why this agent was selected. This should include the logic or criteria used by the boss agent to make the selection, providing transparency and traceability for the routing decision."
) )
modified_task: Optional[str] = Field( agent_name: str = Field(
None, description="Optional modified version of the task" description="The name of the agent selected to handle the task. This should match the identifier of one of the available agents in the system, ensuring the task is routed correctly."
) )
task: Optional[str] = Field(
None,
description="An optional, modified version of the original task. If the boss agent determines that the task should be rephrased or clarified before execution, the new version is provided here. If no modification is needed, this field may be left as None.",
)
class MultipleHandOffsResponse(BaseModel):
"""
Response from the boss agent indicating which agents should handle the task.
"""
handoffs: List[HandOffsResponse] = Field(
description="A list of handoffs, each containing the reasoning, agent name, and task for each agent."
)
def get_agent_response_schema(model_name: str = None):
return BaseTool().base_model_to_dict(MultipleHandOffsResponse)
def agent_boss_router_prompt(agent_descriptions: any):
return f"""
You are an intelligent boss agent responsible for routing tasks to the most appropriate specialized agents.
Available agents:
{agent_descriptions}
Your primary responsibilities:
1. **Understand the user's intent and task requirements** - Carefully analyze what the user is asking for
2. **Determine task complexity** - Identify if this is a single task or multiple related tasks
3. **Match capabilities to requirements** - Select the most suitable agent(s) based on their descriptions and expertise
4. **Provide clear reasoning** - Explain your selection logic transparently
5. **Optimize task assignment** - Modify tasks if needed to better suit the selected agent's capabilities
**Routing Logic:**
- **Single Task**: If the user presents one clear task, select the single best agent for that task
- **Multiple Tasks**: If the user presents multiple distinct tasks or a complex task that requires different expertise areas, select multiple agents and break down the work accordingly
- **Collaborative Tasks**: If a task benefits from multiple perspectives or requires different skill sets, assign it to multiple agents
**Response Format:**
You must respond with JSON containing a "handoffs" array with one or more handoff objects. Each handoff object must contain:
- reasoning: Detailed explanation of why this agent was selected and how they fit the task requirements
- agent_name: Name of the chosen agent (must exactly match one of the available agents)
- task: A modified/optimized version of the task if needed, or the original task if no modification is required
**Important Guidelines:**
- Always analyze the user's intent first before making routing decisions
- Consider task dependencies and whether agents need to work sequentially or in parallel
- If multiple agents are selected, ensure their tasks are complementary and don't conflict
- Provide specific, actionable reasoning for each agent selection
- Ensure all agent names exactly match the available agents list
- If a single agent can handle the entire request efficiently, use only one agent
- If the request requires different expertise areas, use multiple agents with clearly defined, non-overlapping tasks
Remember: Your goal is to maximize task completion quality by matching the right agent(s) to the right work based on their capabilities and the user's actual needs.
"""
class MultiAgentRouter: class MultiAgentRouter:
@ -38,23 +99,32 @@ class MultiAgentRouter:
name (str): The name of the router. name (str): The name of the router.
description (str): A description of the router's purpose. description (str): A description of the router's purpose.
agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object. agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object.
api_key (str): The API key for OpenAI. model (str): The model to use for the boss agent.
output_type (str): The type of output expected from the agents. Can be either "json" or "string". temperature (float): The temperature for the boss agent's model.
execute_task (bool): A flag indicating whether the task should be executed by the selected agent. shared_memory_system (callable): A shared memory system for agents to query.
boss_system_prompt (str): A system prompt for the boss agent that includes information about all available agents. output_type (OutputType): The type of output expected from the agents.
function_caller (OpenAIFunctionCaller): An instance of OpenAIFunctionCaller for calling the boss agent. print_on (bool): Whether to print the boss agent's decision.
system_prompt (str): Custom system prompt for the router.
skip_null_tasks (bool): Whether to skip executing agents when their assigned task is null or None.
conversation (Conversation): The conversation history.
function_caller (LiteLLM): An instance of LiteLLM for calling the boss agent.
""" """
def __init__( def __init__(
self, self,
id: str = generate_api_key(prefix="multi-agent-router"),
name: str = "swarm-router", name: str = "swarm-router",
description: str = "Routes tasks to specialized agents based on their capabilities", description: str = "Routes tasks to specialized agents based on their capabilities",
agents: AgentListType = None, agents: List[Callable] = None,
model: str = "gpt-4o-mini", model: str = "gpt-4o-mini",
temperature: float = 0.1, temperature: float = 0.1,
shared_memory_system: callable = None, shared_memory_system: callable = None,
output_type: OutputType = "dict", output_type: OutputType = "dict",
if_print: bool = True, print_on: bool = True,
system_prompt: str = None,
skip_null_tasks: bool = True,
*args,
**kwargs,
): ):
""" """
Initializes the MultiAgentRouter with a list of agents and configuration options. Initializes the MultiAgentRouter with a list of agents and configuration options.
@ -63,10 +133,13 @@ class MultiAgentRouter:
name (str, optional): The name of the router. Defaults to "swarm-router". name (str, optional): The name of the router. Defaults to "swarm-router".
description (str, optional): A description of the router's purpose. Defaults to "Routes tasks to specialized agents based on their capabilities". description (str, optional): A description of the router's purpose. Defaults to "Routes tasks to specialized agents based on their capabilities".
agents (List[Agent], optional): A list of agents to be managed by the router. Defaults to an empty list. agents (List[Agent], optional): A list of agents to be managed by the router. Defaults to an empty list.
model (str, optional): The model to use for the boss agent. Defaults to "gpt-4-0125-preview". model (str, optional): The model to use for the boss agent. Defaults to "gpt-4o-mini".
temperature (float, optional): The temperature for the boss agent's model. Defaults to 0.1. temperature (float, optional): The temperature for the boss agent's model. Defaults to 0.1.
output_type (Literal["json", "string"], optional): The type of output expected from the agents. Defaults to "json". shared_memory_system (callable, optional): A shared memory system for agents to query. Defaults to None.
execute_task (bool, optional): A flag indicating whether the task should be executed by the selected agent. Defaults to True. output_type (OutputType, optional): The type of output expected from the agents. Defaults to "dict".
print_on (bool, optional): Whether to print the boss agent's decision. Defaults to True.
system_prompt (str, optional): Custom system prompt for the router. Defaults to None.
skip_null_tasks (bool, optional): Whether to skip executing agents when their assigned task is null or None. Defaults to True.
""" """
self.name = name self.name = name
self.description = description self.description = description
@ -74,23 +147,28 @@ class MultiAgentRouter:
self.output_type = output_type self.output_type = output_type
self.model = model self.model = model
self.temperature = temperature self.temperature = temperature
self.if_print = if_print self.print_on = print_on
self.agents = {agent.name: agent for agent in agents} self.system_prompt = system_prompt
self.skip_null_tasks = skip_null_tasks
self.agents = {agent.agent_name: agent for agent in agents}
self.conversation = Conversation() self.conversation = Conversation()
self.api_key = os.getenv("OPENAI_API_KEY") router_system_prompt = ""
if not self.api_key:
raise ValueError("OpenAI API key must be provided")
self.boss_system_prompt = self._create_boss_system_prompt() router_system_prompt += (
self.system_prompt if self.system_prompt else ""
# Initialize the function caller )
self.function_caller = OpenAIFunctionCaller( router_system_prompt += self._create_boss_system_prompt()
system_prompt=self.boss_system_prompt,
api_key=self.api_key, self.function_caller = LiteLLM(
temperature=temperature, model_name=self.model,
base_model=AgentResponse, system_prompt=router_system_prompt,
temperature=self.temperature,
tool_choice="auto",
parallel_tool_calls=True,
response_format=MultipleHandOffsResponse,
*args,
**kwargs,
) )
def __repr__(self): def __repr__(self):
@ -114,24 +192,110 @@ class MultiAgentRouter:
] ]
) )
return f"""You are a boss agent responsible for routing tasks to the most appropriate specialized agent. return agent_boss_router_prompt(agent_descriptions)
Available agents:
{agent_descriptions} def handle_single_handoff(
self, boss_response_str: dict, task: str
) -> dict:
"""
Handles a single handoff to one agent.
If skip_null_tasks is True and the assigned task is null or None,
the agent execution will be skipped.
"""
Your job is to: # Validate that the selected agent exists
1. Analyze the incoming task if (
2. Select the most appropriate agent based on their descriptions boss_response_str["handoffs"][0]["agent_name"]
3. Provide clear reasoning for your selection not in self.agents
4. Optionally modify the task to better suit the selected agent's capabilities ):
raise ValueError(
f"Boss selected unknown agent: {boss_response_str.agent_name}"
)
# Get the selected agent
selected_agent = self.agents[
boss_response_str["handoffs"][0]["agent_name"]
]
# Use the modified task if provided, otherwise use original task
final_task = boss_response_str["handoffs"][0]["task"] or task
# Skip execution if task is null/None and skip_null_tasks is True
if self.skip_null_tasks and (
final_task is None or final_task == ""
):
if self.print_on:
logger.info(
f"Skipping execution for agent {selected_agent.agent_name} - task is null/None"
)
You must respond with JSON that contains: # Use the agent's run method directly
- selected_agent: Name of the chosen agent (must be one of the available agents) agent_response = selected_agent.run(final_task)
- reasoning: Brief explanation of why this agent was selected
- modified_task: (Optional) A modified version of the task if needed self.conversation.add(
role=selected_agent.agent_name, content=agent_response
)
Always select exactly one agent that best matches the task requirements. # return agent_response
def handle_multiple_handoffs(
self, boss_response_str: dict, task: str
) -> dict:
"""
Handles multiple handoffs to multiple agents.
If skip_null_tasks is True and any assigned task is null or None,
those agents will be skipped and only agents with valid tasks will be executed.
""" """
# Validate that the selected agents exist
for handoff in boss_response_str["handoffs"]:
if handoff["agent_name"] not in self.agents:
raise ValueError(
f"Boss selected unknown agent: {handoff.agent_name}"
)
# Get the selected agents and their tasks
selected_agents = []
final_tasks = []
skipped_agents = []
for handoff in boss_response_str["handoffs"]:
agent = self.agents[handoff["agent_name"]]
final_task = handoff["task"] or task
# Skip execution if task is null/None and skip_null_tasks is True
if self.skip_null_tasks and (
final_task is None or final_task == ""
):
if self.print_on:
logger.info(
f"Skipping execution for agent {agent.agent_name} - task is null/None"
)
skipped_agents.append(agent.agent_name)
continue
selected_agents.append(agent)
final_tasks.append(final_task)
# Execute agents only if there are valid tasks
if selected_agents:
# Use the agents' run method directly
agent_responses = [
agent.run(final_task)
for agent, final_task in zip(
selected_agents, final_tasks
)
]
self.conversation.add(
role=selected_agents[0].agent_name,
content=agent_responses[0],
)
# return agent_responses
def route_task(self, task: str) -> dict: def route_task(self, task: str) -> dict:
""" """
Routes a task to the appropriate agent and returns their response. Routes a task to the appropriate agent and returns their response.
@ -146,46 +310,32 @@ class MultiAgentRouter:
self.conversation.add(role="user", content=task) self.conversation.add(role="user", content=task)
# Get boss decision using function calling # Get boss decision using function calling
boss_response = self.function_caller.run(task) boss_response_str = self.function_caller.run(task)
boss_response_str = any_to_str(boss_response)
if self.if_print: boss_response_str = orjson.loads(boss_response_str)
formatter.print_panel(
boss_response_str,
title="Multi-Agent Router Decision",
)
else:
pass
self.conversation.add( if self.print_on:
role="Agent Router", content=boss_response_str formatter.print_panel(
) # orjson.dumps(boss_response_str, indent=4),
orjson.dumps(
# Validate that the selected agent exists boss_response_str, option=orjson.OPT_INDENT_2
if boss_response.selected_agent not in self.agents: ).decode("utf-8"),
raise ValueError( title=self.name,
f"Boss selected unknown agent: {boss_response.selected_agent}"
) )
# Get the selected agent if len(boss_response_str["handoffs"]) > 1:
selected_agent = self.agents[boss_response.selected_agent] self.handle_multiple_handoffs(boss_response_str, task)
else:
# Use the modified task if provided, otherwise use original task self.handle_single_handoff(boss_response_str, task)
final_task = boss_response.modified_task or task
# Use the agent's run method directly
agent_response = selected_agent.run(final_task)
self.conversation.add(
role=selected_agent.name, content=agent_response
)
return history_output_formatter( return history_output_formatter(
conversation=self.conversation, type=self.output_type conversation=self.conversation, type=self.output_type
) )
except Exception as e: except Exception as e:
logger.error(f"Error routing task: {str(e)}") logger.error(
f"Error routing task: {str(e)} Traceback: {traceback.format_exc()}"
)
raise raise
def run(self, task: str): def run(self, task: str):
@ -209,9 +359,6 @@ class MultiAgentRouter:
def concurrent_batch_run(self, tasks: List[str] = []): def concurrent_batch_run(self, tasks: List[str] = []):
"""Concurrently route tasks to the appropriate agents""" """Concurrently route tasks to the appropriate agents"""
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
results = [] results = []
with ThreadPoolExecutor() as executor: with ThreadPoolExecutor() as executor:
futures = [ futures = [

@ -9,7 +9,6 @@ import litellm
import requests import requests
from litellm import completion, supports_vision from litellm import completion, supports_vision
from loguru import logger from loguru import logger
from pydantic import BaseModel
class LiteLLMException(Exception): class LiteLLMException(Exception):
@ -235,6 +234,7 @@ class LiteLLM:
drop_params: bool = True, drop_params: bool = True,
thinking_tokens: int = None, thinking_tokens: int = None,
reasoning_enabled: bool = False, reasoning_enabled: bool = False,
response_format: any = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -292,6 +292,7 @@ class LiteLLM:
self.thinking_tokens = thinking_tokens self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled self.reasoning_enabled = reasoning_enabled
self.verbose = verbose self.verbose = verbose
self.response_format = response_format
self.modalities = [] self.modalities = []
self.messages = [] # Initialize messages list self.messages = [] # Initialize messages list
@ -394,46 +395,73 @@ class LiteLLM:
completion_params["runtime_args"] = runtime_args completion_params["runtime_args"] = runtime_args
def output_for_tools(self, response: any): def output_for_tools(self, response: any):
if self.mcp_call is True: """
Process tool calls from the LLM response and return formatted output.
Args:
response: The response object from the LLM API call
Returns:
dict or list: Formatted tool call data, or default response if no tool calls
"""
try:
# Convert response to dict if it's a Pydantic model
if hasattr(response, "model_dump"):
response_dict = response.model_dump()
else:
response_dict = response
# Check if tool_calls exists and is not None # Check if tool_calls exists and is not None
if ( if (
response.choices response_dict.get("choices")
and response.choices[0].message and response_dict["choices"][0].get("message")
and response.choices[0].message.tool_calls and response_dict["choices"][0]["message"].get(
and len(response.choices[0].message.tool_calls) > 0 "tool_calls"
):
out = (
response.choices[0].message.tool_calls[0].function
) )
output = { and len(
"function": { response_dict["choices"][0]["message"][
"name": out.name, "tool_calls"
"arguments": out.arguments, ]
)
> 0
):
tool_call = response_dict["choices"][0]["message"][
"tool_calls"
][0]
if "function" in tool_call:
return {
"function": {
"name": tool_call["function"].get(
"name", ""
),
"arguments": tool_call["function"].get(
"arguments", "{}"
),
}
} }
} else:
return output # Handle case where tool_call structure is different
return tool_call
else: else:
# Return a default response when no tool calls are present # Return a default response when no tool calls are present
logger.warning(
"No tool calls found in response, returning default response"
)
return { return {
"function": { "function": {
"name": "no_tool_call", "name": "no_tool_call",
"arguments": "{}", "arguments": "{}",
} }
} }
else: except Exception as e:
# Check if tool_calls exists and is not None logger.error(f"Error processing tool calls: {str(e)}")
if ( # Return a safe default response
response.choices return {
and response.choices[0].message "function": {
and response.choices[0].message.tool_calls "name": "error_processing_tool_calls",
): "arguments": f'{{"error": "{str(e)}"}}',
out = response.choices[0].message.tool_calls }
if isinstance(out, BaseModel): }
out = out.model_dump()
return out
else:
# Return empty list when no tool calls are present
return []
def output_for_reasoning(self, response: any): def output_for_reasoning(self, response: any):
""" """
@ -886,6 +914,11 @@ class LiteLLM:
if self.base_url is not None: if self.base_url is not None:
completion_params["base_url"] = self.base_url completion_params["base_url"] = self.base_url
if self.response_format is not None:
completion_params["response_format"] = (
self.response_format
)
# Add modalities if needed # Add modalities if needed
if self.modalities and len(self.modalities) >= 2: if self.modalities and len(self.modalities) >= 2:
completion_params["modalities"] = self.modalities completion_params["modalities"] = self.modalities
@ -914,6 +947,14 @@ class LiteLLM:
# Make the completion call # Make the completion call
response = completion(**completion_params) response = completion(**completion_params)
# print(response)
# Validate response
if not response:
logger.error(
"Received empty response from completion call"
)
return None
# Handle streaming response # Handle streaming response
if self.stream: if self.stream:
@ -928,25 +969,19 @@ class LiteLLM:
# Handle tool-based response # Handle tool-based response
elif self.tools_list_dictionary is not None: elif self.tools_list_dictionary is not None:
return self.output_for_tools(response) result = self.output_for_tools(response)
return result
elif self.return_all is True: elif self.return_all is True:
return response.model_dump() return response.model_dump()
elif "gemini" in self.model_name.lower(): elif "gemini" in self.model_name.lower():
return gemini_output_img_handler(response) return gemini_output_img_handler(response)
else: else:
# For non-Gemini models, return the content directly
return response.choices[0].message.content return response.choices[0].message.content
except LiteLLMException as error: except LiteLLMException as error:
logger.error( logger.error(
f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}" f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}"
) )
if "rate_limit" in str(error).lower():
logger.warning(
"Rate limit hit, retrying with exponential backoff..."
)
return self.run(task, audio, img, *args, **kwargs)
raise error
def __call__(self, task: str, *args, **kwargs): def __call__(self, task: str, *args, **kwargs):
""" """

Loading…
Cancel
Save