parent
acd84efe46
commit
cd569322d3
@ -0,0 +1,331 @@
|
||||
# MultiAgentRouter Documentation
|
||||
|
||||
The MultiAgentRouter is a sophisticated task routing system that efficiently delegates tasks to specialized AI agents. It uses a "boss" agent to analyze incoming tasks and route them to the most appropriate specialized agent based on their capabilities and expertise.
|
||||
|
||||
## Table of Contents
|
||||
- [Installation](#installation)
|
||||
- [Key Components](#key-components)
|
||||
- [Arguments](#arguments)
|
||||
- [Methods](#methods)
|
||||
- [Usage Examples](#usage-examples)
|
||||
- [Healthcare](#healthcare-example)
|
||||
- [Finance](#finance-example)
|
||||
- [Legal](#legal-example)
|
||||
- [Research](#research-example)
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install swarms
|
||||
```
|
||||
|
||||
## Key Components
|
||||
|
||||
### Arguments Table
|
||||
|
||||
| Argument | Type | Default | Description |
|
||||
|----------|------|---------|-------------|
|
||||
| name | str | "swarm-router" | Name identifier for the router instance |
|
||||
| description | str | "Routes tasks..." | Description of the router's purpose |
|
||||
| agents | List[Agent] | [] | List of available specialized agents |
|
||||
| model | str | "gpt-4o-mini" | Base language model for the boss agent |
|
||||
| temperature | float | 0.1 | Temperature parameter for model outputs |
|
||||
| shared_memory_system | callable | None | Optional shared memory system |
|
||||
| output_type | Literal["json", "string"] | "json" | Format of agent outputs |
|
||||
| execute_task | bool | True | Whether to execute routed tasks |
|
||||
|
||||
### Methods Table
|
||||
|
||||
| Method | Arguments | Returns | Description |
|
||||
|--------|-----------|---------|-------------|
|
||||
| route_task | task: str | dict | Routes a single task to appropriate agent |
|
||||
| batch_route | tasks: List[str] | List[dict] | Sequentially routes multiple tasks |
|
||||
| concurrent_batch_route | tasks: List[str] | List[dict] | Concurrently routes multiple tasks |
|
||||
| query_ragent | task: str | str | Queries the research agent |
|
||||
| find_agent_in_list | agent_name: str | Optional[Agent] | Finds agent by name |
|
||||
|
||||
## Production Examples
|
||||
|
||||
### Healthcare Example
|
||||
|
||||
```python
|
||||
from swarms import Agent, MultiAgentRouter
|
||||
|
||||
# Define specialized healthcare agents
|
||||
agents = [
|
||||
Agent(
|
||||
agent_name="DiagnosisAgent",
|
||||
description="Specializes in preliminary symptom analysis and diagnostic suggestions",
|
||||
system_prompt="""You are a medical diagnostic assistant. Analyze symptoms and provide
|
||||
evidence-based diagnostic suggestions, always noting this is for informational purposes
|
||||
only and recommending professional medical consultation.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="TreatmentPlanningAgent",
|
||||
description="Assists in creating treatment plans and medical documentation",
|
||||
system_prompt="""You are a treatment planning assistant. Help create structured
|
||||
treatment plans based on confirmed diagnoses, following medical best practices
|
||||
and guidelines.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="MedicalResearchAgent",
|
||||
description="Analyzes medical research papers and clinical studies",
|
||||
system_prompt="""You are a medical research analyst. Analyze and summarize medical
|
||||
research papers, clinical trials, and scientific studies, providing evidence-based
|
||||
insights.""",
|
||||
model_name="openai/gpt-4o"
|
||||
)
|
||||
]
|
||||
|
||||
# Initialize router
|
||||
healthcare_router = MultiAgentRouter(
|
||||
name="Healthcare-Router",
|
||||
description="Routes medical and healthcare-related tasks to specialized agents",
|
||||
agents=agents,
|
||||
model="gpt-4o",
|
||||
temperature=0.1
|
||||
)
|
||||
|
||||
# Example usage
|
||||
try:
|
||||
# Process medical case
|
||||
case_analysis = healthcare_router.route_task(
|
||||
"""Patient presents with:
|
||||
- Persistent dry cough for 3 weeks
|
||||
- Mild fever (38.1°C)
|
||||
- Fatigue
|
||||
Analyze symptoms and suggest potential diagnoses for healthcare provider review."""
|
||||
)
|
||||
|
||||
# Research treatment options
|
||||
treatment_research = healthcare_router.route_task(
|
||||
"""Find recent clinical studies on treatment efficacy for community-acquired
|
||||
pneumonia in adult patients, focusing on outpatient care."""
|
||||
)
|
||||
|
||||
# Process multiple cases concurrently
|
||||
cases = [
|
||||
"Case 1: Patient symptoms...",
|
||||
"Case 2: Patient symptoms...",
|
||||
"Case 3: Patient symptoms..."
|
||||
]
|
||||
concurrent_results = healthcare_router.concurrent_batch_route(cases)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in healthcare processing: {str(e)}")
|
||||
```
|
||||
|
||||
### Finance Example
|
||||
|
||||
```python
|
||||
# Define specialized finance agents
|
||||
finance_agents = [
|
||||
Agent(
|
||||
agent_name="MarketAnalysisAgent",
|
||||
description="Analyzes market trends and provides trading insights",
|
||||
system_prompt="""You are a financial market analyst. Analyze market data, trends,
|
||||
and indicators to provide evidence-based market insights and trading suggestions.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="RiskAssessmentAgent",
|
||||
description="Evaluates financial risks and compliance requirements",
|
||||
system_prompt="""You are a risk assessment specialist. Analyze financial data
|
||||
and operations for potential risks, ensuring regulatory compliance and suggesting
|
||||
risk mitigation strategies.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="InvestmentAgent",
|
||||
description="Provides investment strategies and portfolio management",
|
||||
system_prompt="""You are an investment strategy specialist. Develop and analyze
|
||||
investment strategies, portfolio allocations, and provide long-term financial
|
||||
planning guidance.""",
|
||||
model_name="openai/gpt-4o"
|
||||
)
|
||||
]
|
||||
|
||||
# Initialize finance router
|
||||
finance_router = MultiAgentRouter(
|
||||
name="Finance-Router",
|
||||
description="Routes financial analysis and investment tasks",
|
||||
agents=finance_agents
|
||||
)
|
||||
|
||||
# Example tasks
|
||||
tasks = [
|
||||
"""Analyze current market conditions for technology sector, focusing on:
|
||||
- AI/ML companies
|
||||
- Semiconductor manufacturers
|
||||
- Cloud service providers
|
||||
Provide risk assessment and investment opportunities.""",
|
||||
|
||||
"""Develop a diversified portfolio strategy for a conservative investor with:
|
||||
- Investment horizon: 10 years
|
||||
- Risk tolerance: Low to medium
|
||||
- Initial investment: $500,000
|
||||
- Monthly contribution: $5,000""",
|
||||
|
||||
"""Conduct risk assessment for a fintech startup's crypto trading platform:
|
||||
- Regulatory compliance requirements
|
||||
- Security measures
|
||||
- Operational risks
|
||||
- Market risks"""
|
||||
]
|
||||
|
||||
# Process tasks concurrently
|
||||
results = finance_router.concurrent_batch_route(tasks)
|
||||
```
|
||||
|
||||
### Legal Example
|
||||
|
||||
```python
|
||||
# Define specialized legal agents
|
||||
legal_agents = [
|
||||
Agent(
|
||||
agent_name="ContractAnalysisAgent",
|
||||
description="Analyzes legal contracts and documents",
|
||||
system_prompt="""You are a legal document analyst. Review contracts and legal
|
||||
documents for key terms, potential issues, and compliance requirements.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="ComplianceAgent",
|
||||
description="Ensures regulatory compliance and updates",
|
||||
system_prompt="""You are a legal compliance specialist. Monitor and analyze
|
||||
regulatory requirements, ensuring compliance and suggesting necessary updates
|
||||
to policies and procedures.""",
|
||||
model_name="openai/gpt-4o"
|
||||
),
|
||||
Agent(
|
||||
agent_name="LegalResearchAgent",
|
||||
description="Conducts legal research and case analysis",
|
||||
system_prompt="""You are a legal researcher. Research relevant cases, statutes,
|
||||
and regulations, providing comprehensive legal analysis and citations.""",
|
||||
model_name="openai/gpt-4o"
|
||||
)
|
||||
]
|
||||
|
||||
# Initialize legal router
|
||||
legal_router = MultiAgentRouter(
|
||||
name="Legal-Router",
|
||||
description="Routes legal analysis and compliance tasks",
|
||||
agents=legal_agents
|
||||
)
|
||||
|
||||
# Example usage for legal department
|
||||
contract_analysis = legal_router.route_task(
|
||||
"""Review the following software licensing agreement:
|
||||
[contract text]
|
||||
|
||||
Analyze for:
|
||||
1. Key terms and conditions
|
||||
2. Potential risks and liabilities
|
||||
3. Compliance with current regulations
|
||||
4. Suggested modifications"""
|
||||
)
|
||||
```
|
||||
|
||||
## Error Handling and Best Practices
|
||||
|
||||
1. Always use try-except blocks for task routing:
|
||||
```python
|
||||
try:
|
||||
result = router.route_task(task)
|
||||
except Exception as e:
|
||||
logger.error(f"Task routing failed: {str(e)}")
|
||||
```
|
||||
|
||||
2. Monitor agent performance:
|
||||
```python
|
||||
if result["execution"]["execution_time"] > 5.0:
|
||||
logger.warning(f"Long execution time for task: {result['task']['original']}")
|
||||
```
|
||||
|
||||
3. Implement rate limiting for concurrent tasks:
|
||||
```python
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||
results = router.concurrent_batch_route(tasks)
|
||||
```
|
||||
|
||||
4. Regular agent validation:
|
||||
```python
|
||||
for agent in router.agents.values():
|
||||
if not agent.validate():
|
||||
logger.error(f"Agent validation failed: {agent.name}")
|
||||
```
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
1. Task Batching
|
||||
|
||||
- Group similar tasks together
|
||||
|
||||
- Use concurrent_batch_route for independent tasks
|
||||
|
||||
- Monitor memory usage with large batches
|
||||
|
||||
2. Model Selection
|
||||
|
||||
- Choose appropriate models based on task complexity
|
||||
|
||||
- Balance speed vs. accuracy requirements
|
||||
|
||||
- Consider cost implications
|
||||
|
||||
3. Response Caching
|
||||
|
||||
- Implement caching for frequently requested analyses
|
||||
|
||||
- Use shared memory system for repeated queries
|
||||
|
||||
- Regular cache invalidation for time-sensitive data
|
||||
|
||||
## Security Considerations
|
||||
|
||||
1. Data Privacy
|
||||
|
||||
- Implement data encryption
|
||||
|
||||
- Handle sensitive information appropriately
|
||||
|
||||
- Regular security audits
|
||||
|
||||
2. Access Control
|
||||
|
||||
- Implement role-based access
|
||||
|
||||
- Audit logging
|
||||
|
||||
- Regular permission reviews
|
||||
|
||||
## Monitoring and Logging
|
||||
|
||||
1. Performance Metrics
|
||||
|
||||
- Response times
|
||||
|
||||
- Success rates
|
||||
|
||||
- Error rates
|
||||
|
||||
- Resource utilization
|
||||
|
||||
2. Logging
|
||||
|
||||
- Use structured logging
|
||||
|
||||
- Implement log rotation
|
||||
|
||||
- Regular log analysis
|
||||
|
||||
3. Alerts
|
||||
|
||||
- Set up alerting for critical errors
|
||||
|
||||
- Monitor resource usage
|
||||
|
||||
- Track API rate limits
|
@ -0,0 +1,41 @@
|
||||
from swarms import Agent
|
||||
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
|
||||
|
||||
# Example usage:
|
||||
if __name__ == "__main__":
|
||||
# Define some example agents
|
||||
agents = [
|
||||
Agent(
|
||||
agent_name="ResearchAgent",
|
||||
description="Specializes in researching topics and providing detailed, factual information",
|
||||
system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.",
|
||||
model_name="openai/gpt-4o",
|
||||
),
|
||||
Agent(
|
||||
agent_name="CodeExpertAgent",
|
||||
description="Expert in writing, reviewing, and explaining code across multiple programming languages",
|
||||
system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.",
|
||||
model_name="openai/gpt-4o",
|
||||
),
|
||||
Agent(
|
||||
agent_name="WritingAgent",
|
||||
description="Skilled in creative and technical writing, content creation, and editing",
|
||||
system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.",
|
||||
model_name="openai/gpt-4o",
|
||||
),
|
||||
]
|
||||
|
||||
# Initialize routers with different configurations
|
||||
router_execute = MultiAgentRouter(agents=agents, execute_task=True)
|
||||
|
||||
# Example task
|
||||
task = "Write a Python function to calculate fibonacci numbers"
|
||||
|
||||
try:
|
||||
# Process the task with execution
|
||||
print("\nWith task execution:")
|
||||
result_execute = router_execute.route_task(task)
|
||||
print(result_execute)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error occurred: {str(e)}")
|
@ -0,0 +1,401 @@
|
||||
"""
|
||||
Todo:
|
||||
|
||||
- Add multi-agent selection for a task and then run them automatically
|
||||
- Add shared memory for large instances of agents
|
||||
|
||||
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
from tenacity import retry, stop_after_attempt, wait_exponential
|
||||
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
class AgentResponse(BaseModel):
|
||||
"""Response from the boss agent indicating which agent should handle the task"""
|
||||
|
||||
selected_agent: str = Field(
|
||||
description="Name of the agent selected to handle the task"
|
||||
)
|
||||
reasoning: str = Field(
|
||||
description="Explanation for why this agent was selected"
|
||||
)
|
||||
modified_task: Optional[str] = Field(
|
||||
None, description="Optional modified version of the task"
|
||||
)
|
||||
|
||||
|
||||
class OpenAIFunctionCaller:
|
||||
"""
|
||||
A class to interact with the OpenAI API for generating text based on a system prompt and a task.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
system_prompt: str,
|
||||
api_key: str,
|
||||
temperature: float,
|
||||
max_tokens: int = 4000,
|
||||
model_name: str = "gpt-4-0125-preview",
|
||||
):
|
||||
self.system_prompt = system_prompt
|
||||
self.api_key = api_key
|
||||
self.temperature = temperature
|
||||
self.max_tokens = max_tokens
|
||||
self.model_name = model_name
|
||||
|
||||
try:
|
||||
from openai import OpenAI
|
||||
except ImportError:
|
||||
logger.error(
|
||||
"OpenAI library not found. Please install it using 'pip install openai'"
|
||||
)
|
||||
subprocess.run(["pip", "install", "openai"])
|
||||
raise
|
||||
|
||||
try:
|
||||
self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error initializing OpenAI client: {str(e)}"
|
||||
)
|
||||
raise
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||
)
|
||||
def get_completion(self, task: str) -> AgentResponse:
|
||||
"""Get completion from OpenAI with retries"""
|
||||
try:
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model_name,
|
||||
messages=[
|
||||
{"role": "system", "content": self.system_prompt},
|
||||
{"role": "user", "content": task},
|
||||
],
|
||||
response_format={"type": "json_object"},
|
||||
temperature=self.temperature,
|
||||
max_tokens=self.max_tokens,
|
||||
)
|
||||
|
||||
return AgentResponse.model_validate_json(
|
||||
response.choices[0].message.content
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting completion: {str(e)}")
|
||||
raise
|
||||
|
||||
def get_agent_response(
|
||||
self, system_prompt: str, task: str
|
||||
) -> str:
|
||||
"""Get agent response without function calling"""
|
||||
try:
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model_name,
|
||||
messages=[
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": task},
|
||||
],
|
||||
temperature=self.temperature,
|
||||
max_tokens=self.max_tokens,
|
||||
)
|
||||
|
||||
return response.choices[0].message.content
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting agent response: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
class MultiAgentRouter:
|
||||
"""
|
||||
Routes tasks to appropriate agents based on their capabilities.
|
||||
|
||||
This class is responsible for managing a pool of agents and routing incoming tasks to the most suitable agent. It uses a boss agent to analyze the task and select the best agent for the job. The boss agent's decision is based on the capabilities and descriptions of the available agents.
|
||||
|
||||
Attributes:
|
||||
name (str): The name of the router.
|
||||
description (str): A description of the router's purpose.
|
||||
agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object.
|
||||
api_key (str): The API key for OpenAI.
|
||||
output_type (str): The type of output expected from the agents. Can be either "json" or "string".
|
||||
execute_task (bool): A flag indicating whether the task should be executed by the selected agent.
|
||||
boss_system_prompt (str): A system prompt for the boss agent that includes information about all available agents.
|
||||
function_caller (OpenAIFunctionCaller): An instance of OpenAIFunctionCaller for calling the boss agent.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "swarm-router",
|
||||
description: str = "Routes tasks to specialized agents based on their capabilities",
|
||||
agents: List[Agent] = [],
|
||||
model: str = "gpt-4o-mini",
|
||||
temperature: float = 0.1,
|
||||
shared_memory_system: callable = None,
|
||||
output_type: Literal["json", "string"] = "json",
|
||||
execute_task: bool = True,
|
||||
):
|
||||
"""
|
||||
Initializes the MultiAgentRouter with a list of agents and configuration options.
|
||||
|
||||
Args:
|
||||
name (str, optional): The name of the router. Defaults to "swarm-router".
|
||||
description (str, optional): A description of the router's purpose. Defaults to "Routes tasks to specialized agents based on their capabilities".
|
||||
agents (List[Agent], optional): A list of agents to be managed by the router. Defaults to an empty list.
|
||||
model (str, optional): The model to use for the boss agent. Defaults to "gpt-4-0125-preview".
|
||||
temperature (float, optional): The temperature for the boss agent's model. Defaults to 0.1.
|
||||
output_type (Literal["json", "string"], optional): The type of output expected from the agents. Defaults to "json".
|
||||
execute_task (bool, optional): A flag indicating whether the task should be executed by the selected agent. Defaults to True.
|
||||
"""
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.shared_memory_system = shared_memory_system
|
||||
self.agents = {agent.name: agent for agent in agents}
|
||||
self.api_key = os.getenv("OPENAI_API_KEY")
|
||||
if not self.api_key:
|
||||
raise ValueError("OpenAI API key must be provided")
|
||||
|
||||
self.output_type = output_type
|
||||
self.execute_task = execute_task
|
||||
self.boss_system_prompt = self._create_boss_system_prompt()
|
||||
|
||||
# Initialize the function caller
|
||||
self.function_caller = OpenAIFunctionCaller(
|
||||
system_prompt=self.boss_system_prompt,
|
||||
api_key=self.api_key,
|
||||
temperature=temperature,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"MultiAgentRouter(name={self.name}, agents={list(self.agents.keys())})"
|
||||
|
||||
def query_ragent(self, task: str) -> str:
|
||||
"""Query the ResearchAgent"""
|
||||
return self.shared_memory_system.query(task)
|
||||
|
||||
def _create_boss_system_prompt(self) -> str:
|
||||
"""
|
||||
Creates a system prompt for the boss agent that includes information about all available agents.
|
||||
|
||||
Returns:
|
||||
str: The system prompt for the boss agent.
|
||||
"""
|
||||
agent_descriptions = "\n".join(
|
||||
[
|
||||
f"- {name}: {agent.description}"
|
||||
for name, agent in self.agents.items()
|
||||
]
|
||||
)
|
||||
|
||||
return f"""You are a boss agent responsible for routing tasks to the most appropriate specialized agent.
|
||||
Available agents:
|
||||
{agent_descriptions}
|
||||
|
||||
Your job is to:
|
||||
1. Analyze the incoming task
|
||||
2. Select the most appropriate agent based on their descriptions
|
||||
3. Provide clear reasoning for your selection
|
||||
4. Optionally modify the task to better suit the selected agent's capabilities
|
||||
|
||||
You must respond with JSON that contains:
|
||||
- selected_agent: Name of the chosen agent (must be one of the available agents)
|
||||
- reasoning: Brief explanation of why this agent was selected
|
||||
- modified_task: (Optional) A modified version of the task if needed
|
||||
|
||||
Always select exactly one agent that best matches the task requirements.
|
||||
"""
|
||||
|
||||
def find_agent_in_list(self, agent_name: str) -> Optional[Agent]:
|
||||
"""
|
||||
Find an agent by name in a list of agents.
|
||||
|
||||
Args:
|
||||
agent_name (str): The name of the agent to find.
|
||||
|
||||
Returns:
|
||||
Optional[Agent]: The agent object if found, otherwise None.
|
||||
"""
|
||||
for agent in self.agent_list:
|
||||
if agent.name == agent_name:
|
||||
return agent
|
||||
return None
|
||||
|
||||
def route_task(self, task: str) -> dict:
|
||||
"""
|
||||
Routes a task to the appropriate agent and returns their response.
|
||||
|
||||
Args:
|
||||
task (str): The task to be routed.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the routing result, including the selected agent, reasoning, and response.
|
||||
"""
|
||||
try:
|
||||
start_time = datetime.now(UTC)
|
||||
|
||||
# Get boss decision using function calling
|
||||
boss_response = self.function_caller.get_completion(task)
|
||||
|
||||
# Validate that the selected agent exists
|
||||
if boss_response.selected_agent not in self.agents:
|
||||
raise ValueError(
|
||||
f"Boss selected unknown agent: {boss_response.selected_agent}"
|
||||
)
|
||||
|
||||
# Get the selected agent
|
||||
selected_agent = self.agents[boss_response.selected_agent]
|
||||
|
||||
# Use the modified task if provided, otherwise use original task
|
||||
final_task = boss_response.modified_task or task
|
||||
|
||||
# Execute the task with the selected agent if enabled
|
||||
execution_start = datetime.now(UTC)
|
||||
agent_response = None
|
||||
execution_time = 0
|
||||
|
||||
if self.execute_task:
|
||||
# Use the agent's run method directly
|
||||
agent_response = selected_agent.run(final_task)
|
||||
execution_time = (
|
||||
datetime.now(UTC) - execution_start
|
||||
).total_seconds()
|
||||
else:
|
||||
logger.info(
|
||||
"Task execution skipped (execute_task=False)"
|
||||
)
|
||||
|
||||
total_time = (
|
||||
datetime.now(UTC) - start_time
|
||||
).total_seconds()
|
||||
|
||||
result = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"task": {
|
||||
"original": task,
|
||||
"modified": (
|
||||
final_task
|
||||
if boss_response.modified_task
|
||||
else None
|
||||
),
|
||||
},
|
||||
"boss_decision": {
|
||||
"selected_agent": boss_response.selected_agent,
|
||||
"reasoning": boss_response.reasoning,
|
||||
},
|
||||
"execution": {
|
||||
"agent_name": selected_agent.name,
|
||||
"agent_id": selected_agent.id,
|
||||
"was_executed": self.execute_task,
|
||||
"response": (
|
||||
agent_response if self.execute_task else None
|
||||
),
|
||||
"execution_time": (
|
||||
execution_time if self.execute_task else None
|
||||
),
|
||||
},
|
||||
"total_time": total_time,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
f"Successfully routed task to {selected_agent.name}"
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error routing task: {str(e)}")
|
||||
raise
|
||||
|
||||
def batch_route(self, tasks: List[str] = []):
|
||||
"""Batch route tasks to the appropriate agents"""
|
||||
results = []
|
||||
for task in tasks:
|
||||
try:
|
||||
result = self.route_task(task)
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
logger.error(f"Error routing task: {str(e)}")
|
||||
return results
|
||||
|
||||
def concurrent_batch_route(self, tasks: List[str] = []):
|
||||
"""Concurrently route tasks to the appropriate agents"""
|
||||
import concurrent.futures
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
results = []
|
||||
with ThreadPoolExecutor() as executor:
|
||||
futures = [
|
||||
executor.submit(self.route_task, task)
|
||||
for task in tasks
|
||||
]
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
try:
|
||||
result = future.result()
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
logger.error(f"Error routing task: {str(e)}")
|
||||
return results
|
||||
|
||||
|
||||
# # Example usage:
|
||||
# if __name__ == "__main__":
|
||||
# # Define some example agents
|
||||
# agents = [
|
||||
# Agent(
|
||||
# agent_name="ResearchAgent",
|
||||
# description="Specializes in researching topics and providing detailed, factual information",
|
||||
# system_prompt="You are a research specialist. Provide detailed, well-researched information about any topic, citing sources when possible.",
|
||||
# model_name="openai/gpt-4o",
|
||||
# ),
|
||||
# Agent(
|
||||
# agent_name="CodeExpertAgent",
|
||||
# description="Expert in writing, reviewing, and explaining code across multiple programming languages",
|
||||
# system_prompt="You are a coding expert. Write, review, and explain code with a focus on best practices and clean code principles.",
|
||||
# model_name="openai/gpt-4o",
|
||||
# ),
|
||||
# Agent(
|
||||
# agent_name="WritingAgent",
|
||||
# description="Skilled in creative and technical writing, content creation, and editing",
|
||||
# system_prompt="You are a writing specialist. Create, edit, and improve written content while maintaining appropriate tone and style.",
|
||||
# model_name="openai/gpt-4o",
|
||||
# ),
|
||||
# ]
|
||||
|
||||
# # Initialize routers with different configurations
|
||||
# router_execute = MultiAgentRouter(agents=agents, execute_task=True)
|
||||
# # router_no_execute = MultiAgentRouter(agents=agents, execute_task=False)
|
||||
|
||||
# # Example task
|
||||
# task = "Write a Python function to calculate fibonacci numbers"
|
||||
|
||||
# try:
|
||||
# # Process the task with execution
|
||||
# print("\nWith task execution:")
|
||||
# result_execute = router_execute.route_task(task)
|
||||
# print(
|
||||
# f"Selected Agent: {result_execute['boss_decision']['selected_agent']}"
|
||||
# )
|
||||
# print(
|
||||
# f"Reasoning: {result_execute['boss_decision']['reasoning']}"
|
||||
# )
|
||||
# if result_execute["execution"]["response"]:
|
||||
# print(
|
||||
# f"Response Preview: {result_execute['execution']['response'][:200]}..."
|
||||
# )
|
||||
# print(
|
||||
# f"Execution Time: {result_execute['execution']['execution_time']:.2f}s"
|
||||
# )
|
||||
# print(f"Total Time: {result_execute['total_time']:.2f}s")
|
||||
|
||||
# except Exception as e:
|
||||
# print(f"Error occurred: {str(e)}")
|
@ -0,0 +1,23 @@
|
||||
import time
|
||||
from typing import List
|
||||
import uuid
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class AgentResponde(BaseModel):
|
||||
id: str = Field(default=uuid.uuid4().hex)
|
||||
timestamp: str = Field(default=time.time())
|
||||
agent_position: int = Field(description="Agent in swarm position")
|
||||
agent_name: str
|
||||
agent_response: str = Field(description="Agent response")
|
||||
|
||||
|
||||
class SwarmOutput(BaseModel):
|
||||
id: str = Field(default=uuid.uuid4().hex)
|
||||
timestamp: str = Field(default=time.time())
|
||||
name: str = Field(description="Swarm name")
|
||||
description: str = Field(description="Swarm description")
|
||||
swarm_type: str = Field(description="Swarm type")
|
||||
agent_outputs: List[AgentResponde] = Field(
|
||||
description="List of agent responses"
|
||||
)
|
@ -0,0 +1,219 @@
|
||||
import os
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
|
||||
|
||||
|
||||
def create_test_agent(name: str) -> Agent:
|
||||
"""Helper function to create a test agent"""
|
||||
return Agent(
|
||||
agent_name=name,
|
||||
description=f"Test {name}",
|
||||
system_prompt=f"You are a {name}",
|
||||
model_name="openai/gpt-4o",
|
||||
)
|
||||
|
||||
|
||||
def test_boss_router_initialization():
|
||||
"""Test MultiAgentRouter initialization"""
|
||||
print("\nTesting MultiAgentRouter initialization...")
|
||||
|
||||
# Test successful initialization
|
||||
try:
|
||||
agents = [
|
||||
create_test_agent("TestAgent1"),
|
||||
create_test_agent("TestAgent2"),
|
||||
]
|
||||
router = MultiAgentRouter(agents=agents)
|
||||
assert (
|
||||
router.name == "swarm-router"
|
||||
), "Default name should be 'swarm-router'"
|
||||
assert len(router.agents) == 2, "Should have 2 agents"
|
||||
print("✓ Basic initialization successful")
|
||||
except Exception as e:
|
||||
print(f"✗ Basic initialization failed: {str(e)}")
|
||||
|
||||
# Test initialization without API key
|
||||
try:
|
||||
temp_key = os.getenv("OPENAI_API_KEY")
|
||||
os.environ["OPENAI_API_KEY"] = ""
|
||||
success = False
|
||||
try:
|
||||
router = MultiAgentRouter(agents=[])
|
||||
except ValueError as e:
|
||||
success = str(e) == "OpenAI API key must be provided"
|
||||
os.environ["OPENAI_API_KEY"] = temp_key
|
||||
assert (
|
||||
success
|
||||
), "Should raise ValueError when API key is missing"
|
||||
print("✓ API key validation successful")
|
||||
except Exception as e:
|
||||
print(f"✗ API key validation failed: {str(e)}")
|
||||
|
||||
|
||||
def test_boss_system_prompt():
|
||||
"""Test system prompt generation"""
|
||||
print("\nTesting system prompt generation...")
|
||||
|
||||
try:
|
||||
agents = [
|
||||
create_test_agent("Agent1"),
|
||||
create_test_agent("Agent2"),
|
||||
]
|
||||
router = MultiAgentRouter(agents=agents)
|
||||
prompt = router._create_boss_system_prompt()
|
||||
|
||||
# Check if prompt contains agent information
|
||||
assert (
|
||||
"Agent1" in prompt
|
||||
), "Prompt should contain first agent name"
|
||||
assert (
|
||||
"Agent2" in prompt
|
||||
), "Prompt should contain second agent name"
|
||||
assert (
|
||||
"You are a boss agent" in prompt
|
||||
), "Prompt should contain boss agent description"
|
||||
print("✓ System prompt generation successful")
|
||||
except Exception as e:
|
||||
print(f"✗ System prompt generation failed: {str(e)}")
|
||||
|
||||
|
||||
def test_find_agent_in_list():
|
||||
"""Test agent finding functionality"""
|
||||
print("\nTesting agent finding functionality...")
|
||||
|
||||
try:
|
||||
agent1 = create_test_agent("Agent1")
|
||||
agent2 = create_test_agent("Agent2")
|
||||
router = MultiAgentRouter(agents=[agent1, agent2])
|
||||
|
||||
# Test finding existing agent
|
||||
assert "Agent1" in router.agents, "Should find existing agent"
|
||||
assert (
|
||||
"NonexistentAgent" not in router.agents
|
||||
), "Should not find nonexistent agent"
|
||||
print("✓ Agent finding successful")
|
||||
except Exception as e:
|
||||
print(f"✗ Agent finding failed: {str(e)}")
|
||||
|
||||
|
||||
def test_task_routing():
|
||||
"""Test task routing functionality"""
|
||||
print("\nTesting task routing...")
|
||||
|
||||
try:
|
||||
# Create test agents
|
||||
agents = [
|
||||
create_test_agent("CodeAgent"),
|
||||
create_test_agent("WritingAgent"),
|
||||
]
|
||||
router = MultiAgentRouter(agents=agents)
|
||||
|
||||
# Test routing a coding task
|
||||
result = router.route_task(
|
||||
"Write a Python function to sort a list"
|
||||
)
|
||||
assert result["boss_decision"]["selected_agent"] in [
|
||||
"CodeAgent",
|
||||
"WritingAgent",
|
||||
], "Should select an appropriate agent"
|
||||
assert (
|
||||
"execution" in result
|
||||
), "Result should contain execution details"
|
||||
assert (
|
||||
"total_time" in result
|
||||
), "Result should contain timing information"
|
||||
print("✓ Task routing successful")
|
||||
except Exception as e:
|
||||
print(f"✗ Task routing failed: {str(e)}")
|
||||
|
||||
|
||||
def test_batch_routing():
|
||||
"""Test batch routing functionality"""
|
||||
print("\nTesting batch routing...")
|
||||
|
||||
try:
|
||||
agents = [create_test_agent("TestAgent")]
|
||||
router = MultiAgentRouter(agents=agents)
|
||||
|
||||
tasks = ["Task 1", "Task 2", "Task 3"]
|
||||
|
||||
# Test sequential batch routing
|
||||
results = router.batch_route(tasks)
|
||||
assert len(results) == len(
|
||||
tasks
|
||||
), "Should return result for each task"
|
||||
print("✓ Sequential batch routing successful")
|
||||
|
||||
# Test concurrent batch routing
|
||||
concurrent_results = router.concurrent_batch_route(tasks)
|
||||
assert len(concurrent_results) == len(
|
||||
tasks
|
||||
), "Should return result for each task"
|
||||
print("✓ Concurrent batch routing successful")
|
||||
except Exception as e:
|
||||
print(f"✗ Batch routing failed: {str(e)}")
|
||||
|
||||
|
||||
def test_error_handling():
|
||||
"""Test error handling in various scenarios"""
|
||||
print("\nTesting error handling...")
|
||||
|
||||
try:
|
||||
router = MultiAgentRouter(agents=[])
|
||||
|
||||
# Test routing with no agents
|
||||
success = False
|
||||
try:
|
||||
router.route_task("Test task")
|
||||
except Exception:
|
||||
success = True
|
||||
assert success, "Should handle routing with no agents"
|
||||
print("✓ Empty agent list handling successful")
|
||||
|
||||
# Test with invalid task
|
||||
success = False
|
||||
router = MultiAgentRouter(
|
||||
agents=[create_test_agent("TestAgent")]
|
||||
)
|
||||
try:
|
||||
router.route_task("")
|
||||
except ValueError:
|
||||
success = True
|
||||
assert success, "Should handle empty task"
|
||||
print("✓ Invalid task handling successful")
|
||||
except Exception as e:
|
||||
print(f"✗ Error handling failed: {str(e)}")
|
||||
|
||||
|
||||
def run_all_tests():
|
||||
"""Run all test functions"""
|
||||
print("Starting MultiAgentRouter tests...")
|
||||
|
||||
test_functions = [
|
||||
test_boss_router_initialization,
|
||||
test_boss_system_prompt,
|
||||
test_find_agent_in_list,
|
||||
test_task_routing,
|
||||
test_batch_routing,
|
||||
test_error_handling,
|
||||
]
|
||||
|
||||
total_tests = len(test_functions)
|
||||
passed_tests = 0
|
||||
|
||||
for test_func in test_functions:
|
||||
try:
|
||||
test_func()
|
||||
passed_tests += 1
|
||||
except Exception as e:
|
||||
print(
|
||||
f"Test {test_func.__name__} failed with error: {str(e)}"
|
||||
)
|
||||
|
||||
print(
|
||||
f"\nTest Results: {passed_tests}/{total_tests} tests passed"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_all_tests()
|
Loading…
Reference in new issue