[FIX][agent.py] self.no_print -> self.print_on [ENHC][Improve the concurrent workflow] [collaborative prompt] [FIX][Improve list_all_agents prompt]

pull/885/merge
Kye Gomez 5 days ago
parent b5694e26ae
commit adfdabba20

[Binary image files changed: before/after previews, 42 KiB and 232 KiB]

@ -18,7 +18,7 @@ quality_control_agent = Agent(
response = quality_control_agent.run(
task="what is in the image?",
task="Analyze our factories images and provide a detailed health report for each factory.",
imgs=[factory_image, "burning_image.jpg"],
)

@ -0,0 +1,81 @@
import json
from swarms import Agent, SwarmRouter
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = SwarmRouter(
agents=[
risk_metrics_agent,
portfolio_risk_agent,
],
max_loops=1,
swarm_type="MixtureOfAgents",
output_type="final",
)
# swarm.run(
# "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
# )
print(f"Swarm config: {json.dumps(swarm.to_dict(), indent=4)}")

@ -7,7 +7,6 @@ from swarms.utils.loguru_logger import initialize_logger
from swarms.telemetry.main import (
capture_system_data,
log_agent_data,
)
@ -34,7 +33,6 @@ class OnboardingProcess:
cache_save_path (str): The path where user data is cached for reliability.
"""
self.user_data: Dict[str, str] = {}
self.system_data: Dict[str, str] = capture_system_data()
self.auto_save_path = auto_save_path
self.cache_save_path = cache_save_path
self.load_existing_data()
@ -85,7 +83,7 @@ class OnboardingProcess:
while attempt < retry_attempts:
try:
combined_data = {**self.user_data, **self.system_data}
combined_data = {**self.user_data}
log_agent_data(combined_data)
return # Exit the function if saving was successful
except Exception as e:

@ -0,0 +1,177 @@
def get_multi_agent_collaboration_prompt_one(agents_in_swarm: str):
MULTI_AGENT_COLLABORATION_PROMPT_ONE = f"""
You are all operating within a multi-agent collaborative system. Your primary objectives are to work effectively with other agents to achieve shared goals while maintaining high reliability and avoiding common failure modes that plague multi-agent systems.
{agents_in_swarm}
## Fundamental Collaboration Principles
### 1. Role Adherence & Boundaries
- **STRICTLY adhere to your designated role and responsibilities** - never assume another agent's role or make decisions outside your scope
- If you encounter tasks outside your role, explicitly redirect to the appropriate agent
- Maintain clear hierarchical differentiation - respect the authority structure and escalation paths
- When uncertain about role boundaries, ask for clarification rather than assuming
### 2. Communication Excellence
- **Always ask for clarification** when instructions, data, or context are unclear, incomplete, or ambiguous
- Share ALL relevant information that could impact other agents' decision-making - never withhold critical details
- Use structured, explicit communication rather than assuming others understand implicit meanings
- Acknowledge and explicitly reference other agents' inputs before proceeding
- Use consistent terminology and avoid jargon that may cause misunderstanding
### 3. Task Specification Compliance
- **Rigorously adhere to task specifications** - review and confirm understanding of requirements before proceeding
- Flag any constraints or requirements that seem impossible or conflicting
- Document assumptions explicitly and seek validation
- Never modify requirements without explicit approval from appropriate authority
## Critical Failure Prevention Protocols
### Specification & Design Failures Prevention
- Before starting any task, restate your understanding of the requirements and constraints
- Maintain awareness of conversation history - reference previous exchanges when relevant
- Avoid unnecessary repetition of completed steps unless explicitly requested
- Clearly understand termination conditions for your tasks and the overall workflow
### Inter-Agent Misalignment Prevention
- **Never reset or restart conversations** without explicit instruction from a supervising agent
- When another agent provides input, explicitly acknowledge it and explain how it affects your approach
- Stay focused on the original task objective - if you notice drift, flag it immediately
- Match your reasoning process with your actions - explain discrepancies when they occur
### Verification & Termination Excellence
- **Implement robust verification** of your outputs before declaring tasks complete
- Never terminate prematurely - ensure all objectives are met and verified
- When reviewing others' work, provide thorough, accurate verification
- Use multiple verification approaches when possible (logical check, constraint validation, edge case testing)
## Operational Guidelines
### Communication Protocol
1. **State Check**: Begin interactions by confirming your understanding of the current state and context
2. **Role Confirmation**: Clearly identify your role and the roles of agents you're interacting with
3. **Objective Alignment**: Confirm shared understanding of immediate objectives
4. **Information Exchange**: Share relevant information completely and request missing information explicitly
5. **Action Coordination**: Coordinate actions to avoid conflicts and ensure complementary efforts
6. **Verification**: Verify outcomes and seek validation when appropriate
7. **Status Update**: Clearly communicate task status and next steps
### When Interacting with Other Agents
- **Listen actively**: Process and acknowledge their inputs completely
- **Seek clarification**: Ask specific questions when anything is unclear
- **Share context**: Provide relevant background information that informs your perspective
- **Coordinate actions**: Ensure your actions complement rather than conflict with others
- **Respect expertise**: Defer to agents with specialized knowledge in their domains
### Quality Assurance
- Before finalizing any output, perform self-verification using these checks:
- Does this meet all specified requirements?
- Are there any edge cases or constraints I haven't considered?
- Is this consistent with information provided by other agents?
- Have I clearly communicated my reasoning and any assumptions?
### Error Recovery
- If you detect an error or inconsistency, immediately flag it and propose correction
- When receiving feedback about errors, acknowledge the feedback and explain your correction approach
- Learn from failures by explicitly identifying what went wrong and how to prevent recurrence
## Interaction Patterns
### When Starting a New Task
```
1. Acknowledge the task assignment
2. Confirm role boundaries and responsibilities
3. Identify required inputs and information sources
4. State assumptions and seek validation
5. Outline approach and request feedback
6. Proceed with execution while maintaining communication
```
### When Collaborating with Peers
```
1. Establish communication channel and protocols
2. Share relevant context and constraints
3. Coordinate approaches to avoid duplication or conflicts
4. Maintain regular status updates
5. Verify integrated outputs collectively
```
### When Escalating Issues
```
1. Clearly describe the issue and its implications
2. Provide relevant context and attempted solutions
3. Specify what type of resolution or guidance is needed
4. Suggest next steps if appropriate
```
## Termination Criteria
Only consider a task complete when:
- All specified requirements have been met and verified
- Other agents have confirmed their portions are complete (if applicable)
- Quality checks have been performed and passed
- Appropriate verification has been conducted
- Clear communication of completion has been provided
## Meta-Awareness
Continuously monitor for these common failure patterns and actively work to prevent them:
- Role boundary violations
- Information withholding
- Premature termination
- Inadequate verification
- Communication breakdowns
- Task derailment
Remember: The goal is not just individual success, but collective success through reliable, high-quality collaboration that builds trust and produces superior outcomes.
"""
return MULTI_AGENT_COLLABORATION_PROMPT_ONE
MULTI_AGENT_COLLABORATION_PROMPT_TWO = """
# Compact Multi-Agent Collaboration Prompt
## Core Directives
You are an AI agent in a multi-agent system. Follow these essential collaboration protocols:
### Role & Boundaries
- **Stay in your designated role** - never assume another agent's responsibilities
- When tasks fall outside your scope, redirect to the appropriate agent
- Respect hierarchy and authority structures
### Communication Requirements
- **Always ask for clarification** when anything is unclear or incomplete
- **Share all relevant information** - never withhold details that could impact others
- **Acknowledge other agents' inputs** explicitly before proceeding
- Use clear, structured communication
### Task Execution
- **Confirm task requirements** before starting - restate your understanding
- **Adhere strictly to specifications** - flag conflicts or impossibilities
- **Maintain conversation context** - reference previous exchanges when relevant
- **Verify your work thoroughly** before declaring completion
### Collaboration Protocol
1. **State Check**: Confirm current context and your role
2. **Clarify**: Ask specific questions about unclear elements
3. **Coordinate**: Align actions with other agents to avoid conflicts
4. **Verify**: Check outputs meet requirements and constraints
5. **Communicate**: Clearly report status and next steps
### Termination Criteria
Only mark tasks complete when:
- All requirements verified as met
- Quality checks passed
- Other agents confirm their portions (if applicable)
- Clear completion communication provided
### Failure Prevention
Actively watch for and prevent:
- Role boundary violations
- Information withholding
- Premature task termination
- Inadequate verification
- Task objective drift
**Remember**: Success requires reliable collaboration, not just individual performance.
"""

@ -403,7 +403,7 @@ class Agent:
llm_args: dict = None,
load_state_path: str = None,
role: agent_roles = "worker",
no_print: bool = False,
print_on: bool = False,
tools_list_dictionary: Optional[List[Dict[str, Any]]] = None,
mcp_url: Optional[Union[str, MCPConnection]] = None,
mcp_urls: List[str] = None,
@ -540,7 +540,7 @@ class Agent:
self.llm_args = llm_args
self.load_state_path = load_state_path
self.role = role
self.no_print = no_print
self.print_on = print_on
self.tools_list_dictionary = tools_list_dictionary
self.mcp_url = mcp_url
self.mcp_urls = mcp_urls
@ -631,7 +631,7 @@ class Agent:
)
self.short_memory.add(
role=f"{self.agent_name}",
role=self.agent_name,
content=self.tools_list_dictionary,
)
@ -2691,14 +2691,14 @@ class Agent:
return self.role
def pretty_print(self, response: str, loop_count: int):
if self.no_print is False:
if self.print_on is False:
if self.streaming_on is True:
# self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
)
elif self.no_print is True:
elif self.print_on is True:
pass
else:
# logger.info(f"Response: {response}")
@ -2818,7 +2818,7 @@ class Agent:
# execute_tool_call_simple returns a string directly, not an object with content attribute
text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}"
if self.no_print is False:
if self.print_on is False:
formatter.print_panel(
text_content,
"MCP Tool Response: 🛠️",

@ -1,13 +1,10 @@
import concurrent.futures
import os
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
@ -35,9 +32,7 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False.
return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False.
cache_size (int): The size of the cache. Defaults to 100.
max_retries (int): The maximum number of retry attempts. Defaults to 3.
retry_delay (float): The delay between retry attempts in seconds. Defaults to 1.0.
Raises:
ValueError: If the list of agents is empty or if the description is empty.
@ -50,13 +45,7 @@ class ConcurrentWorkflow(BaseSwarm):
auto_save (bool): Flag indicating whether to automatically save the metadata.
output_type (str): The type of output format.
max_loops (int): The maximum number of loops for each agent.
return_str_on (bool): Flag indicating whether to return the output as a string.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents.
return_entire_history (bool): Flag indicating whether to return the entire conversation history.
cache_size (int): The size of the cache.
max_retries (int): The maximum number of retry attempts.
retry_delay (float): The delay between retry attempts in seconds.
_cache (dict): The cache for storing agent outputs.
"""
def __init__(
@ -68,12 +57,7 @@ class ConcurrentWorkflow(BaseSwarm):
auto_save: bool = True,
output_type: str = "dict-all-except-first",
max_loops: int = 1,
return_str_on: bool = False,
auto_generate_prompts: bool = False,
return_entire_history: bool = False,
cache_size: int = 100,
max_retries: int = 3,
retry_delay: float = 1.0,
*args,
**kwargs,
):
@ -90,63 +74,31 @@ class ConcurrentWorkflow(BaseSwarm):
self.metadata_output_path = metadata_output_path
self.auto_save = auto_save
self.max_loops = max_loops
self.return_str_on = return_str_on
self.auto_generate_prompts = auto_generate_prompts
self.max_workers = os.cpu_count()
self.output_type = output_type
self.return_entire_history = return_entire_history
self.tasks = [] # Initialize tasks list
self.cache_size = cache_size
self.max_retries = max_retries
self.retry_delay = retry_delay
self._cache = {}
self.reliability_check()
self.conversation = Conversation()
def reliability_check(self):
try:
formatter.print_panel(
content=f"\n 🏷️ Name: {self.name}\n 📝 Description: {self.description}\n 🤖 Agents: {len(self.agents)}\n 🔄 Max Loops: {self.max_loops}\n ",
title="⚙️ Concurrent Workflow Settings",
style="bold blue",
)
formatter.print_panel(
content="🔍 Starting reliability checks",
title="🔒 Reliability Checks",
style="bold blue",
)
if self.name is None:
logger.error("❌ A name is required for the swarm")
if self.agents is None:
raise ValueError(
"❌ A name is required for the swarm"
"ConcurrentWorkflow: No agents provided"
)
if not self.agents or len(self.agents) <= 1:
logger.error(
"❌ The list of agents must not be empty."
)
if len(self.agents) == 0:
raise ValueError(
"❌ The list of agents must not be empty."
"ConcurrentWorkflow: No agents provided"
)
if not self.description:
logger.error("❌ A description is required.")
raise ValueError("❌ A description is required.")
formatter.print_panel(
content="✅ Reliability checks completed successfully",
title="🎉 Reliability Checks",
style="bold green",
)
except ValueError as e:
logger.error(f"❌ Reliability check failed: {e}")
raise
if len(self.agents) == 1:
logger.warning(
"ConcurrentWorkflow: Only one agent provided. With ConcurrentWorkflow, you should use at least 2+ agents."
)
except Exception as e:
logger.error(
f"💥 An unexpected error occurred during reliability checks: {e}"
f"ConcurrentWorkflow: Reliability check failed: {e}"
)
raise
@ -163,162 +115,84 @@ class ConcurrentWorkflow(BaseSwarm):
for agent in self.agents:
agent.auto_generate_prompt = True
@lru_cache(maxsize=100)
def _cached_run(self, task: str, agent_id: int) -> Any:
"""Cached version of agent execution to avoid redundant computations"""
return self.agents[agent_id].run(task=task)
def _validate_input(self, task: str) -> bool:
"""Validate input task"""
if not isinstance(task, str):
raise ValueError("Task must be a string")
if not task.strip():
raise ValueError("Task cannot be empty")
return True
def _run_with_retry(
self, agent: Agent, task: str, img: str = None
) -> Any:
"""Run agent with retry mechanism"""
for attempt in range(self.max_retries):
try:
output = agent.run(task=task, img=img)
self.conversation.add(agent.agent_name, output)
return output
except Exception as e:
if attempt == self.max_retries - 1:
logger.error(
f"Error running agent {agent.agent_name} after {self.max_retries} attempts: {e}"
)
raise
logger.warning(
f"Attempt {attempt + 1} failed for agent {agent.agent_name}: {e}"
)
time.sleep(
self.retry_delay * (attempt + 1)
) # Exponential backoff
def _process_agent(
self, agent: Agent, task: str, img: str = None
) -> Any:
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
"""
Process a single agent with caching and error handling.
Executes all agents in the workflow concurrently on the given task.
Args:
agent: The agent to process
task: Task to execute
img: Optional image input
task (str): The task to be executed by all agents.
img (Optional[str]): Optional image path for agents that support image input.
imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs.
Returns:
The agent's output
"""
try:
# Fast path - check cache first
cache_key = f"{task}_{agent.agent_name}"
if cache_key in self._cache:
output = self._cache[cache_key]
else:
# Slow path - run agent and update cache
output = self._run_with_retry(agent, task, img)
if len(self._cache) >= self.cache_size:
self._cache.pop(next(iter(self._cache)))
The formatted output based on the configured output_type.
self._cache[cache_key] = output
return output
except Exception as e:
logger.error(
f"Error running agent {agent.agent_name}: {e}"
)
raise
def _run(
self, task: str, img: str = None, *args, **kwargs
) -> Union[Dict[str, Any], str]:
"""
Enhanced run method with parallel execution.
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2])
>>> result = workflow.run("Analyze this financial data")
>>> print(result)
"""
# Fast validation
self._validate_input(task)
self.conversation.add("User", task)
try:
# Parallel execution with optimized thread pool
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [
executor.submit(
self._process_agent, agent, task, img
)
for agent in self.agents
]
# Wait for all futures to complete
for future in futures:
future.result()
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e
self.conversation.add(role="User", content=task)
# Use 95% of available CPU cores for optimal performance
max_workers = int(os.cpu_count() * 0.95)
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks and store with their index
future_to_agent = {
executor.submit(
agent.run, task=task, img=img, imgs=imgs
): agent
for agent in self.agents
}
# Collect results and add to conversation in completion order
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
output = future.result()
self.conversation.add(role=agent.name, content=output)
return history_output_formatter(
self.conversation,
type=self.output_type,
conversation=self.conversation,
output_type=self.output_type,
)
def run(
def batch_run(
self,
task: Optional[str] = None,
tasks: List[str],
img: Optional[str] = None,
*args,
**kwargs,
) -> Any:
imgs: Optional[List[str]] = None,
):
"""
Executes the agent's run method with parallel execution.
Executes the workflow on multiple tasks sequentially.
Args:
task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
tasks (List[str]): List of tasks to be executed by all agents.
img (Optional[str]): Optional image path for agents that support image input.
imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs.
Returns:
Any: The result of the execution.
Raises:
ValueError: If task validation fails.
Exception: If any other error occurs during execution.
"""
if task is not None:
self.tasks.append(task)
try:
outputs = self._run(task, img, *args, **kwargs)
return outputs
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e
List of results, one for each task.
def run_batched(self, tasks: List[str]) -> Any:
"""
Enhanced batched execution
Example:
>>> workflow = ConcurrentWorkflow(agents=[agent1, agent2])
>>> tasks = ["Task 1", "Task 2", "Task 3"]
>>> results = workflow.batch_run(tasks)
>>> print(len(results)) # 3
"""
if not tasks:
raise ValueError("Tasks list cannot be empty")
return [self.run(task) for task in tasks]
def clear_cache(self):
"""Clear the task cache"""
self._cache.clear()
def get_cache_stats(self) -> Dict[str, int]:
"""Get cache statistics"""
return {
"cache_size": len(self._cache),
"max_cache_size": self.cache_size,
}
return [
self.run(task=task, img=img, imgs=imgs) for task in tasks
]
# if __name__ == "__main__":
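For orientation, a minimal end-to-end sketch of the slimmed-down workflow, assuming two already-configured Agent instances (agent names and prompts are illustrative; import paths follow the package layout shown in this diff):
```
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow

# Illustrative agents; any two configured Agent instances work here.
analyst = Agent(agent_name="Analyst", system_prompt="Analyze the data.", max_loops=1)
reviewer = Agent(agent_name="Reviewer", system_prompt="Review the analysis.", max_loops=1)

workflow = ConcurrentWorkflow(agents=[analyst, reviewer])

# Single task: all agents run concurrently on the same prompt.
result = workflow.run("Summarize Q2 revenue drivers")

# Multiple tasks: batch_run loops over tasks sequentially, running the
# agents concurrently for each one.
results = workflow.batch_run(["Task 1", "Task 2"])
```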

@ -1,5 +1,8 @@
from typing import List, Any, Optional, Union, Callable
import random
from swarms.prompts.collaborative_prompts import (
get_multi_agent_collaboration_prompt_one,
)
def list_all_agents(
@ -8,6 +11,7 @@ def list_all_agents(
name: Optional[str] = None,
description: Optional[str] = None,
add_to_conversation: Optional[bool] = False,
add_collaboration_prompt: Optional[bool] = True,
) -> str:
"""Lists all agents in a swarm and optionally adds them to a conversation.
@ -60,7 +64,12 @@ def list_all_agents(
content=all_agents,
)
return all_agents
if add_collaboration_prompt:
return get_multi_agent_collaboration_prompt_one(
agents_in_swarm=all_agents
)
else:
return all_agents
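A hedged usage sketch of the new default: with add_collaboration_prompt=True the roster is wrapped in the collaboration prompt, otherwise the plain roster string is returned. The agents and any parameters not shown in this diff are assumptions.
```
from swarms.structs.agent import Agent
from swarms.structs.ma_utils import list_all_agents

researcher = Agent(agent_name="Researcher", agent_description="Finds sources", max_loops=1)
writer = Agent(agent_name="Writer", agent_description="Drafts the report", max_loops=1)

# Wraps the roster in the multi-agent collaboration prompt (new default).
# conversation is unused here because add_to_conversation stays False.
prompt = list_all_agents(
    agents=[researcher, writer],
    conversation=None,
    name="report-swarm",
    description="Research and write a report",
    add_collaboration_prompt=True,
)

# Returns only the plain roster string.
roster = list_all_agents(
    agents=[researcher, writer],
    conversation=None,
    name="report-swarm",
    description="Research and write a report",
    add_collaboration_prompt=False,
)
```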
models = [

@ -1,6 +1,7 @@
import concurrent.futures
import json
import os
import uuid
from datetime import datetime
import traceback
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from pydantic import BaseModel, Field
@ -20,13 +21,15 @@ from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.telemetry.log_executions import log_execution
from swarms.utils.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.interactive_groupchat import InteractiveGroupChat
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.generate_keys import generate_api_key
logger = initialize_logger(log_folder="swarm_router")
@ -54,25 +57,6 @@ class Document(BaseModel):
data: str
class SwarmLog(BaseModel):
"""
A Pydantic model to capture log entries.
"""
id: Optional[str] = Field(
default_factory=lambda: str(uuid.uuid4())
)
timestamp: Optional[datetime] = Field(
default_factory=datetime.utcnow
)
level: Optional[str] = None
message: Optional[str] = None
swarm_type: Optional[SwarmType] = None
task: Optional[str] = ""
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
documents: List[Document] = []
class SwarmRouterConfig(BaseModel):
"""Configuration model for SwarmRouter."""
@ -172,12 +156,11 @@ class SwarmRouter:
concurrent_batch_run(tasks: List[str], *args, **kwargs) -> List[Any]:
Executes multiple tasks concurrently
get_logs() -> List[SwarmLog]:
Retrieves execution logs
"""
def __init__(
self,
id: str = generate_api_key(prefix="swarm-router"),
name: str = "swarm-router",
description: str = "Routes your task to the desired swarm",
max_loops: int = 1,
@ -191,15 +174,18 @@ class SwarmRouter:
rules: str = None,
documents: List[str] = [], # A list of docs file paths
output_type: OutputType = "dict-all-except-first",
no_cluster_ops: bool = False,
speaker_fn: callable = None,
load_agents_from_csv: bool = False,
csv_file_path: str = None,
return_entire_history: bool = True,
multi_agent_collab_prompt: bool = True,
list_all_agents: bool = False,
conversation: Any = None,
agents_config: Optional[Dict[Any, Any]] = None,
*args,
**kwargs,
):
self.id = id
self.name = name
self.description = description
self.max_loops = max_loops
@ -213,13 +199,15 @@ class SwarmRouter:
self.rules = rules
self.documents = documents
self.output_type = output_type
self.no_cluster_ops = no_cluster_ops
self.speaker_fn = speaker_fn
self.logs = []
self.load_agents_from_csv = load_agents_from_csv
self.csv_file_path = csv_file_path
self.return_entire_history = return_entire_history
self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.list_all_agents = list_all_agents
self.conversation = conversation
self.agents_config = agents_config
# Reliability check
self.reliability_check()
@ -230,6 +218,8 @@ class SwarmRouter:
csv_path=self.csv_file_path
).load_agents()
self.agent_config = self.agent_config()
def setup(self):
if self.auto_generate_prompts is True:
self.activate_ape()
@ -276,15 +266,12 @@ class SwarmRouter:
logger.info(
f"Successfully activated APE for {activated_count} agents"
)
self._log(
"info",
f"Activated automatic prompt engineering for {activated_count} agents",
)
except Exception as e:
error_msg = f"Error activating automatic prompt engineering: {str(e)}"
logger.error(error_msg)
self._log("error", error_msg)
logger.error(
f"Error activating automatic prompt engineering in SwarmRouter: {str(e)}"
)
raise RuntimeError(error_msg) from e
def reliability_check(self):
@ -293,48 +280,24 @@ class SwarmRouter:
Validates essential swarm parameters and configuration before execution.
Handles special case for CouncilAsAJudge which may not require agents.
"""
logger.info(
"🔍 [SYSTEM] Initializing advanced swarm reliability diagnostics..."
)
logger.info(
"⚡ [SYSTEM] Running pre-flight checks and system validation..."
)
# Check swarm type first since it affects other validations
if self.swarm_type is None:
logger.error(
"❌ [CRITICAL] Swarm type validation failed - type cannot be 'none'"
raise ValueError(
"SwarmRouter: Swarm type cannot be 'none'."
)
raise ValueError("Swarm type cannot be 'none'.")
# Special handling for CouncilAsAJudge
if self.swarm_type == "CouncilAsAJudge":
if self.agents is not None:
logger.warning(
"⚠️ [ADVISORY] CouncilAsAJudge detected with agents - this is atypical"
)
elif not self.agents:
logger.error(
"❌ [CRITICAL] Agent validation failed - no agents detected in swarm"
if self.agents is None:
raise ValueError(
"SwarmRouter: No agents provided for the swarm."
)
raise ValueError("No agents provided for the swarm.")
# Validate max_loops
if self.max_loops == 0:
logger.error(
"❌ [CRITICAL] Loop validation failed - max_loops cannot be 0"
)
raise ValueError("max_loops cannot be 0.")
raise ValueError("SwarmRouter: max_loops cannot be 0.")
# Setup other functionality
logger.info("🔄 [SYSTEM] Initializing swarm subsystems...")
self.setup()
logger.info(
"✅ [SYSTEM] All reliability checks passed successfully"
)
logger.info("🚀 [SYSTEM] Swarm is ready for deployment")
def _create_swarm(self, task: str = None, *args, **kwargs):
"""
Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task.
@ -509,37 +472,19 @@ class SwarmRouter:
for agent in self.agents
]
def _log(
self,
level: str,
message: str,
task: str = "",
metadata: Dict[str, Any] = None,
):
"""
Create a log entry and add it to the logs list.
def agent_config(self):
agent_config = {}
for agent in self.agents:
agent_config[agent.agent_name] = agent.to_dict()
Args:
level (str): The log level (e.g., "info", "error").
message (str): The log message.
task (str, optional): The task being performed. Defaults to "".
metadata (Dict[str, Any], optional): Additional metadata. Defaults to None.
"""
log_entry = SwarmLog(
level=level,
message=message,
swarm_type=self.swarm_type,
task=task,
metadata=metadata or {},
)
self.logs.append(log_entry)
logger.log(level.upper(), message)
return agent_config
def _run(
self,
task: str,
img: Optional[str] = None,
model_response: Optional[str] = None,
imgs: Optional[List[str]] = None,
*args,
**kwargs,
) -> Any:
@ -559,17 +504,34 @@ class SwarmRouter:
"""
self.swarm = self._create_swarm(task, *args, **kwargs)
self.conversation = self.swarm.conversation
if self.list_all_agents is True:
list_all_agents(
agents=self.agents,
conversation=self.swarm.conversation,
name=self.name,
description=self.description,
add_collaboration_prompt=True,
add_to_conversation=True,
)
if self.multi_agent_collab_prompt is True:
self.update_system_prompt_for_agent_in_swarm()
try:
logger.info(
f"Running task on {self.swarm_type} swarm with task: {task}"
)
log_execution(
swarm_id=self.id,
status="start",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
)
try:
if self.swarm_type == "CouncilAsAJudge":
result = self.swarm.run(
task=task,
img=img,
imgs=imgs,
model_response=model_response,
*args,
**kwargs,
@ -577,21 +539,24 @@ class SwarmRouter:
else:
result = self.swarm.run(task=task, *args, **kwargs)
logger.info("Swarm completed successfully")
log_execution(
swarm_id=self.id,
status="completion",
swarm_config=self.to_dict(),
swarm_architecture="swarm_router",
)
return result
except Exception as e:
self._log(
"error",
f"Error occurred while running task on {self.swarm_type} swarm: {str(e)}",
task=task,
metadata={"error": str(e)},
raise RuntimeError(
f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}"
)
raise
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
model_response: Optional[str] = None,
*args,
**kwargs,
@ -617,15 +582,24 @@ class SwarmRouter:
return self._run(
task=task,
img=img,
imgs=imgs,
model_response=model_response,
*args,
**kwargs,
)
except Exception as e:
logger.error(f"Error executing task on swarm: {str(e)}")
raise
raise RuntimeError(
f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}"
)
def __call__(self, task: str, *args, **kwargs) -> Any:
def __call__(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
*args,
**kwargs,
) -> Any:
"""
Make the SwarmRouter instance callable.
@ -637,10 +611,17 @@ class SwarmRouter:
Returns:
Any: The result of the swarm's execution.
"""
return self.run(task=task, *args, **kwargs)
return self.run(
task=task, img=img, imgs=imgs, *args, **kwargs
)
def batch_run(
self, tasks: List[str], *args, **kwargs
self,
tasks: List[str],
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
*args,
**kwargs,
) -> List[Any]:
"""
Execute a batch of tasks on the selected or matched swarm type.
@ -659,21 +640,26 @@ class SwarmRouter:
results = []
for task in tasks:
try:
result = self.run(task, *args, **kwargs)
result = self.run(
task, img=img, imgs=imgs, *args, **kwargs
)
results.append(result)
except Exception as e:
self._log(
"error",
f"Error occurred while running batch task on {self.swarm_type} swarm: {str(e)}",
task=task,
metadata={"error": str(e)},
raise RuntimeError(
f"SwarmRouter: Error executing batch task on swarm: {str(e)} Traceback: {traceback.format_exc()}"
)
raise
return results
def async_run(self, task: str, *args, **kwargs) -> Any:
def concurrent_run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
*args,
**kwargs,
) -> Any:
"""
Execute a task on the selected or matched swarm type asynchronously.
Execute a task on the selected or matched swarm type concurrently.
Args:
task (str): The task to be executed by the swarm.
@ -686,95 +672,70 @@ class SwarmRouter:
Raises:
Exception: If an error occurs during task execution.
"""
import asyncio
async def run_async():
try:
result = await asyncio.to_thread(
self.run, task, *args, **kwargs
)
return result
except Exception as e:
self._log(
"error",
f"Error occurred while running task asynchronously on {self.swarm_type} swarm: {str(e)}",
task=task,
metadata={"error": str(e)},
)
raise
return asyncio.run(run_async())
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
future = executor.submit(
self.run, task, img=img, imgs=imgs, *args, **kwargs
)
result = future.result()
return result
def get_logs(self) -> List[SwarmLog]:
def _serialize_callable(
self, attr_value: Callable
) -> Dict[str, Any]:
"""
Retrieve all logged entries.
Serializes callable attributes by extracting their name and docstring.
Args:
attr_value (Callable): The callable to serialize.
Returns:
List[SwarmLog]: A list of all log entries.
Dict[str, Any]: Dictionary with name and docstring of the callable.
"""
return self.logs
def concurrent_run(self, task: str, *args, **kwargs) -> Any:
return {
"name": getattr(
attr_value, "__name__", type(attr_value).__name__
),
"doc": getattr(attr_value, "__doc__", None),
}
def _serialize_attr(self, attr_name: str, attr_value: Any) -> Any:
"""
Execute a task on the selected or matched swarm type concurrently.
Serializes an individual attribute, handling non-serializable objects.
Args:
task (str): The task to be executed by the swarm.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
attr_name (str): The name of the attribute.
attr_value (Any): The value of the attribute.
Returns:
Any: The result of the swarm's execution.
Raises:
Exception: If an error occurs during task execution.
Any: The serialized value of the attribute.
"""
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
future = executor.submit(self.run, task, *args, **kwargs)
result = future.result()
return result
def concurrent_batch_run(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
try:
if callable(attr_value):
return self._serialize_callable(attr_value)
elif hasattr(attr_value, "to_dict"):
return (
attr_value.to_dict()
) # Recursive serialization for nested objects
else:
json.dumps(
attr_value
) # Attempt to serialize to catch non-serializable objects
return attr_value
except (TypeError, ValueError):
return f"<Non-serializable: {type(attr_value).__name__}>"
def to_dict(self) -> Dict[str, Any]:
"""
Execute a batch of tasks on the selected or matched swarm type concurrently.
Args:
tasks (List[str]): A list of tasks to be executed by the swarm.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Converts all attributes of the class, including callables, into a dictionary.
Handles non-serializable attributes by converting them or skipping them.
Returns:
List[Any]: A list of results from the swarm's execution.
Raises:
Exception: If an error occurs during task execution.
Dict[str, Any]: A dictionary representation of the class attributes.
"""
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
)
results = []
with ThreadPoolExecutor() as executor:
# Submit all tasks to executor
futures = [
executor.submit(self.run, task, *args, **kwargs)
for task in tasks
]
# Process results as they complete rather than waiting for all
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Task execution failed: {str(e)}")
results.append(None)
return results
return {
attr_name: self._serialize_attr(attr_name, attr_value)
for attr_name, attr_value in self.__dict__.items()
}
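A small sketch of exercising the new serialization path; this is the same to_dict() that log_execution records and that the risk-analysis example at the top of this diff prints. The agent and swarm_type are illustrative.
```
import json
from swarms import Agent, SwarmRouter

# Illustrative agent; any configured Agent works here.
agent = Agent(agent_name="Demo-Agent", system_prompt="Answer briefly.", max_loops=1)

router = SwarmRouter(agents=[agent], swarm_type="SequentialWorkflow")

# to_dict() walks router.__dict__, turning callables into {"name", "doc"} pairs
# and replacing anything non-JSON-serializable with a placeholder string.
print(json.dumps(router.to_dict(), indent=4))
```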

@ -1,27 +1,13 @@
from swarms.telemetry.main import (
generate_unique_identifier,
generate_user_id,
get_cpu_info,
get_machine_id,
get_os_version,
get_pip_version,
get_python_version,
get_ram_info,
get_system_info,
get_user_device_data,
system_info,
get_comprehensive_system_info,
log_agent_data,
)
__all__ = [
"generate_user_id",
"get_machine_id",
"get_system_info",
"generate_unique_identifier",
"get_python_version",
"get_pip_version",
"get_os_version",
"get_cpu_info",
"get_ram_info",
"system_info",
"get_user_device_data",
"get_comprehensive_system_info",
"log_agent_data",
]

@ -0,0 +1,43 @@
from typing import Optional
from swarms.telemetry.main import log_agent_data
def log_execution(
swarm_id: Optional[str] = None,
status: Optional[str] = None,
swarm_config: Optional[dict] = None,
swarm_architecture: Optional[str] = None,
):
"""
Log execution data for a swarm router instance.
This function logs telemetry data about swarm router executions, including
the swarm ID, execution status, and configuration details. It silently
handles any logging errors to prevent execution interruption.
Args:
swarm_id (str): Unique identifier for the swarm router instance
status (str): Current status of the execution (e.g., "start", "completion", "error")
swarm_config (dict): Configuration dictionary containing swarm router settings
swarm_architecture (str): Name of the swarm architecture used
Returns:
None
Example:
>>> log_execution(
... swarm_id="swarm-router-abc123",
... status="start",
... swarm_config={"name": "my-swarm", "swarm_type": "SequentialWorkflow"}
... )
"""
try:
log_agent_data(
data_dict={
"swarm_router_id": swarm_id,
"status": status,
"swarm_router_config": swarm_config,
"swarm_architecture": swarm_architecture,
}
)
except Exception:
pass

@ -1,16 +1,13 @@
import os
import datetime
import hashlib
import platform
import socket
import subprocess
import uuid
from typing import Dict
from typing import Any, Dict
import pkg_resources
import psutil
import requests
import toml
from functools import lru_cache
# Helper functions
@ -34,265 +31,104 @@ def get_machine_id():
return hashed_id
def get_system_info():
"""
Gathers basic system information.
Returns:
dict: A dictionary containing system-related information.
"""
info = {
@lru_cache(maxsize=1)
def get_comprehensive_system_info() -> Dict[str, Any]:
# Basic platform and hardware information
system_data = {
"platform": platform.system(),
"platform_release": platform.release(),
"platform_version": platform.version(),
"platform_full": platform.platform(),
"architecture": platform.machine(),
"architecture_details": platform.architecture()[0],
"processor": platform.processor(),
"hostname": socket.gethostname(),
"ip_address": socket.gethostbyname(socket.gethostname()),
"mac_address": ":".join(
}
# MAC address
try:
system_data["mac_address"] = ":".join(
[
f"{(uuid.getnode() >> elements) & 0xFF:02x}"
for elements in range(0, 2 * 6, 8)
][::-1]
),
"processor": platform.processor(),
"python_version": platform.python_version(),
"Misc": system_info(),
}
return info
def generate_unique_identifier():
"""Generate unique identifier
Returns:
str: unique id
"""
system_info = get_system_info()
unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info))
return str(unique_id)
def get_local_ip():
"""Get local ip
Returns:
str: local ip
"""
return socket.gethostbyname(socket.gethostname())
def get_user_device_data():
data = {
"ID": generate_user_id(),
"Machine ID": get_machine_id(),
"System Info": get_system_info(),
"UniqueID": generate_unique_identifier(),
}
return data
def get_python_version():
return platform.python_version()
def get_pip_version() -> str:
"""Get pip version
Returns:
str: The version of pip installed
"""
try:
pip_version = (
subprocess.check_output(["pip", "--version"])
.decode()
.split()[1]
)
except Exception as e:
pip_version = str(e)
return pip_version
def get_swarms_verison() -> tuple[str, str]:
"""Get swarms version from both command line and package
Returns:
tuple[str, str]: A tuple containing (command line version, package version)
"""
try:
swarms_verison_cmd = (
subprocess.check_output(["swarms", "--version"])
.decode()
.split()[1]
)
except Exception as e:
swarms_verison_cmd = str(e)
swarms_verison_pkg = pkg_resources.get_distribution(
"swarms"
).version
swarms_verison = swarms_verison_cmd, swarms_verison_pkg
return swarms_verison
def get_os_version() -> str:
"""Get operating system version
Returns:
str: The operating system version and platform details
"""
return platform.platform()
system_data["mac_address"] = f"Error: {str(e)}"
# CPU information
system_data["cpu_count_logical"] = psutil.cpu_count(logical=True)
system_data["cpu_count_physical"] = psutil.cpu_count(
logical=False
)
def get_cpu_info() -> str:
"""Get CPU information
Returns:
str: The processor information
"""
return platform.processor()
def get_ram_info() -> str:
"""Get RAM information
Returns:
str: A formatted string containing total, used and free RAM in GB
"""
# Memory information
vm = psutil.virtual_memory()
total_ram_gb = vm.total / (1024**3)
used_ram_gb = vm.used / (1024**3)
free_ram_gb = vm.free / (1024**3)
total_ram_gb = vm.total / (1024**3)
return (
f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:"
f" {free_ram_gb:.2f}"
available_ram_gb = vm.available / (1024**3)
system_data.update(
{
"memory_total_gb": f"{total_ram_gb:.2f}",
"memory_used_gb": f"{used_ram_gb:.2f}",
"memory_free_gb": f"{free_ram_gb:.2f}",
"memory_available_gb": f"{available_ram_gb:.2f}",
"memory_summary": f"Total: {total_ram_gb:.2f} GB, Used: {used_ram_gb:.2f} GB, Free: {free_ram_gb:.2f} GB, Available: {available_ram_gb:.2f} GB",
}
)
# Python version
system_data["python_version"] = platform.python_version()
def get_package_mismatches(file_path: str = "pyproject.toml") -> str:
"""Get package version mismatches between pyproject.toml and installed packages
Args:
file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml".
Returns:
str: A formatted string containing package version mismatches
"""
with open(file_path) as file:
pyproject = toml.load(file)
dependencies = pyproject["tool"]["poetry"]["dependencies"]
dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][
"dependencies"
]
dependencies.update(dev_dependencies)
installed_packages = {
pkg.key: pkg.version for pkg in pkg_resources.working_set
}
mismatches = []
for package, version_info in dependencies.items():
if isinstance(version_info, dict):
version_info = version_info["version"]
installed_version = installed_packages.get(package)
if installed_version and version_info.startswith("^"):
expected_version = version_info[1:]
if not installed_version.startswith(expected_version):
mismatches.append(
f"\t {package}: Mismatch,"
f" pyproject.toml={expected_version},"
f" pip={installed_version}"
)
else:
mismatches.append(f"\t {package}: Not found in pip list")
return "\n" + "\n".join(mismatches)
def system_info() -> dict[str, str]:
"""Get system information including Python, pip, OS, CPU and RAM details
Returns:
dict[str, str]: A dictionary containing system information
"""
return {
"Python Version": get_python_version(),
"Pip Version": get_pip_version(),
# "Swarms Version": swarms_verison,
"OS Version and Architecture": get_os_version(),
"CPU Info": get_cpu_info(),
"RAM Info": get_ram_info(),
}
def capture_system_data() -> Dict[str, str]:
"""
Captures extensive system data including platform information, user ID, IP address, CPU count,
memory information, and other system details.
Returns:
Dict[str, str]: A dictionary containing system data.
"""
# Generate unique identifier based on system info
try:
system_data = {
"platform": platform.system(),
"platform_version": platform.version(),
"platform_release": platform.release(),
"hostname": socket.gethostname(),
"ip_address": socket.gethostbyname(socket.gethostname()),
"cpu_count": psutil.cpu_count(logical=True),
"memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB",
"memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB",
"user_id": str(uuid.uuid4()), # Unique user identifier
"machine_type": platform.machine(),
"processor": platform.processor(),
"architecture": platform.architecture()[0],
}
return system_data
unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_data))
system_data["unique_identifier"] = str(unique_id)
except Exception as e:
# logger.error("Failed to capture system data: {}", e)
print(f"Failed to capture system data: {e}")
system_data["unique_identifier"] = f"Error: {str(e)}"
return system_data
def _log_agent_data(data_dict: dict):
"""Simple function to log agent data using requests library"""
if not data_dict:
return
url = "https://swarms.world/api/get-agents/log-agents"
payload = {
log = {
"data": data_dict,
"system_data": get_user_device_data(),
"system_data": get_comprehensive_system_info(),
"timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
}
key = (
os.getenv("SWARMS_API_KEY")
or "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24"
)
payload = {
"data": log,
}
key = "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24"
headers = {
"Content-Type": "application/json",
"Authorization": key,
}
try:
response = requests.post(
url, json=payload, headers=headers, timeout=10
)
if response.status_code == 200:
return
except Exception:
return
response = requests.post(
url, json=payload, headers=headers, timeout=10
)
print(response.json())
if response.status_code == 200:
return response.json()
return
print(response.json())
return response.json()
def log_agent_data(data_dict: dict):
try:
_log_agent_data(data_dict)
return _log_agent_data(data_dict)
except Exception:
pass
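As a usage note, the wrapper stays fire-and-forget: any exception raised by the underlying request is swallowed (the endpoint and key are hardcoded above). A minimal call looks like this; the payload keys are illustrative.
```
# Fire-and-forget telemetry call; network or serialization failures are
# silently ignored by the log_agent_data wrapper.
log_agent_data({"event": "example_run", "agent_name": "Demo-Agent"})
```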
