From adfdabba20cb580493a7f1017fb6f4eb533816aa Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 26 Jun 2025 14:21:05 -0700 Subject: [PATCH] [FIX][agent.py] -- self.no_print -> self.print_on] [ENHC][Improve the concurrent workflow] [collaborative prompt] [fix][improve list_all_agents prompt] --- .../single_agent/vision/burning_image.jpg | Bin .../single_agent/vision/image.jpg | Bin .../vision/multiple_image_processing.py | 2 +- .../single_agent/vision/vision_tools.py | 0 swarm_router_test.py | 81 +++++ swarms/cli/onboarding_process.py | 4 +- swarms/prompts/collaborative_prompts.py | 177 +++++++++ swarms/structs/agent.py | 12 +- swarms/structs/concurrent_workflow.py | 266 ++++---------- swarms/structs/ma_utils.py | 11 +- swarms/structs/swarm_router.py | 335 ++++++++---------- swarms/telemetry/__init__.py | 22 +- swarms/telemetry/log_executions.py | 43 +++ swarms/telemetry/main.py | 278 +++------------ 14 files changed, 598 insertions(+), 633 deletions(-) rename burning_image.jpg => examples/single_agent/vision/burning_image.jpg (100%) rename image.jpg => examples/single_agent/vision/image.jpg (100%) rename multiple_image_processing.py => examples/single_agent/vision/multiple_image_processing.py (86%) rename vision_tools.py => examples/single_agent/vision/vision_tools.py (100%) create mode 100644 swarm_router_test.py create mode 100644 swarms/prompts/collaborative_prompts.py create mode 100644 swarms/telemetry/log_executions.py diff --git a/burning_image.jpg b/examples/single_agent/vision/burning_image.jpg similarity index 100% rename from burning_image.jpg rename to examples/single_agent/vision/burning_image.jpg diff --git a/image.jpg b/examples/single_agent/vision/image.jpg similarity index 100% rename from image.jpg rename to examples/single_agent/vision/image.jpg diff --git a/multiple_image_processing.py b/examples/single_agent/vision/multiple_image_processing.py similarity index 86% rename from multiple_image_processing.py rename to examples/single_agent/vision/multiple_image_processing.py index 3d90f612..da67bb94 100644 --- a/multiple_image_processing.py +++ b/examples/single_agent/vision/multiple_image_processing.py @@ -18,7 +18,7 @@ quality_control_agent = Agent( response = quality_control_agent.run( - task="what is in the image?", + task="Analyze our factories images and provide a detailed health report for each factory.", imgs=[factory_image, "burning_image.jpg"], ) diff --git a/vision_tools.py b/examples/single_agent/vision/vision_tools.py similarity index 100% rename from vision_tools.py rename to examples/single_agent/vision/vision_tools.py diff --git a/swarm_router_test.py b/swarm_router_test.py new file mode 100644 index 00000000..016953ff --- /dev/null +++ b/swarm_router_test.py @@ -0,0 +1,81 @@ +import json +from swarms import Agent, SwarmRouter + +# Agent 1: Risk Metrics Calculator +risk_metrics_agent = Agent( + agent_name="Risk-Metrics-Calculator", + agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility", + system_prompt="""You are a risk metrics specialist. Calculate and explain: + - Value at Risk (VaR) + - Sharpe ratio + - Volatility + - Maximum drawdown + - Beta coefficient + + Provide clear, numerical results with brief explanations.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 2: Portfolio Risk Analyzer +portfolio_risk_agent = Agent( + agent_name="Portfolio-Risk-Analyzer", + agent_description="Analyzes portfolio diversification and concentration risk", + system_prompt="""You are a portfolio risk analyst. Focus on: + - Portfolio diversification analysis + - Concentration risk assessment + - Correlation analysis + - Sector/asset allocation risk + - Liquidity risk evaluation + + Provide actionable insights for risk reduction.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + +# Agent 3: Market Risk Monitor +market_risk_agent = Agent( + agent_name="Market-Risk-Monitor", + agent_description="Monitors market conditions and identifies risk factors", + system_prompt="""You are a market risk monitor. Identify and assess: + - Market volatility trends + - Economic risk factors + - Geopolitical risks + - Interest rate risks + - Currency risks + + Provide current risk alerts and trends.""", + max_loops=1, + # model_name="gpt-4o-mini", + random_model_enabled=True, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_tokens=4096, +) + + +swarm = SwarmRouter( + agents=[ + risk_metrics_agent, + portfolio_risk_agent, + ], + max_loops=1, + swarm_type="MixtureOfAgents", + output_type="final", +) + + +# swarm.run( +# "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility" +# ) + + +print(f"Swarm config: {json.dumps(swarm.to_dict(), indent=4)}") diff --git a/swarms/cli/onboarding_process.py b/swarms/cli/onboarding_process.py index e279d9e3..8085c688 100644 --- a/swarms/cli/onboarding_process.py +++ b/swarms/cli/onboarding_process.py @@ -7,7 +7,6 @@ from swarms.utils.loguru_logger import initialize_logger from swarms.telemetry.main import ( - capture_system_data, log_agent_data, ) @@ -34,7 +33,6 @@ class OnboardingProcess: cache_save_path (str): The path where user data is cached for reliability. """ self.user_data: Dict[str, str] = {} - self.system_data: Dict[str, str] = capture_system_data() self.auto_save_path = auto_save_path self.cache_save_path = cache_save_path self.load_existing_data() @@ -85,7 +83,7 @@ class OnboardingProcess: while attempt < retry_attempts: try: - combined_data = {**self.user_data, **self.system_data} + combined_data = {**self.user_data} log_agent_data(combined_data) return # Exit the function if saving was successful except Exception as e: diff --git a/swarms/prompts/collaborative_prompts.py b/swarms/prompts/collaborative_prompts.py new file mode 100644 index 00000000..4a04245b --- /dev/null +++ b/swarms/prompts/collaborative_prompts.py @@ -0,0 +1,177 @@ +def get_multi_agent_collaboration_prompt_one(agents_in_swarm: str): + MULTI_AGENT_COLLABORATION_PROMPT_ONE = f""" + You are all operating within a multi-agent collaborative system. Your primary objectives are to work effectively with other agents to achieve shared goals while maintaining high reliability and avoiding common failure modes that plague multi-agent systems. + + {agents_in_swarm} + + ## Fundamental Collaboration Principles + + ### 1. Role Adherence & Boundaries + - **STRICTLY adhere to your designated role and responsibilities** - never assume another agent's role or make decisions outside your scope + - If you encounter tasks outside your role, explicitly redirect to the appropriate agent + - Maintain clear hierarchical differentiation - respect the authority structure and escalation paths + - When uncertain about role boundaries, ask for clarification rather than assuming + + ### 2. Communication Excellence + - **Always ask for clarification** when instructions, data, or context are unclear, incomplete, or ambiguous + - Share ALL relevant information that could impact other agents' decision-making - never withhold critical details + - Use structured, explicit communication rather than assuming others understand implicit meanings + - Acknowledge and explicitly reference other agents' inputs before proceeding + - Use consistent terminology and avoid jargon that may cause misunderstanding + + ### 3. Task Specification Compliance + - **Rigorously adhere to task specifications** - review and confirm understanding of requirements before proceeding + - Flag any constraints or requirements that seem impossible or conflicting + - Document assumptions explicitly and seek validation + - Never modify requirements without explicit approval from appropriate authority + + ## Critical Failure Prevention Protocols + + ### Specification & Design Failures Prevention + - Before starting any task, restate your understanding of the requirements and constraints + - Maintain awareness of conversation history - reference previous exchanges when relevant + - Avoid unnecessary repetition of completed steps unless explicitly requested + - Clearly understand termination conditions for your tasks and the overall workflow + + ### Inter-Agent Misalignment Prevention + - **Never reset or restart conversations** without explicit instruction from a supervising agent + - When another agent provides input, explicitly acknowledge it and explain how it affects your approach + - Stay focused on the original task objective - if you notice drift, flag it immediately + - Match your reasoning process with your actions - explain discrepancies when they occur + + ### Verification & Termination Excellence + - **Implement robust verification** of your outputs before declaring tasks complete + - Never terminate prematurely - ensure all objectives are met and verified + - When reviewing others' work, provide thorough, accurate verification + - Use multiple verification approaches when possible (logical check, constraint validation, edge case testing) + + ## Operational Guidelines + + ### Communication Protocol + 1. **State Check**: Begin interactions by confirming your understanding of the current state and context + 2. **Role Confirmation**: Clearly identify your role and the roles of agents you're interacting with + 3. **Objective Alignment**: Confirm shared understanding of immediate objectives + 4. **Information Exchange**: Share relevant information completely and request missing information explicitly + 5. **Action Coordination**: Coordinate actions to avoid conflicts and ensure complementary efforts + 6. **Verification**: Verify outcomes and seek validation when appropriate + 7. **Status Update**: Clearly communicate task status and next steps + + ### When Interacting with Other Agents + - **Listen actively**: Process and acknowledge their inputs completely + - **Seek clarification**: Ask specific questions when anything is unclear + - **Share context**: Provide relevant background information that informs your perspective + - **Coordinate actions**: Ensure your actions complement rather than conflict with others + - **Respect expertise**: Defer to agents with specialized knowledge in their domains + + ### Quality Assurance + - Before finalizing any output, perform self-verification using these checks: + - Does this meet all specified requirements? + - Are there any edge cases or constraints I haven't considered? + - Is this consistent with information provided by other agents? + - Have I clearly communicated my reasoning and any assumptions? + + ### Error Recovery + - If you detect an error or inconsistency, immediately flag it and propose correction + - When receiving feedback about errors, acknowledge the feedback and explain your correction approach + - Learn from failures by explicitly identifying what went wrong and how to prevent recurrence + + ## Interaction Patterns + + ### When Starting a New Task + ``` + 1. Acknowledge the task assignment + 2. Confirm role boundaries and responsibilities + 3. Identify required inputs and information sources + 4. State assumptions and seek validation + 5. Outline approach and request feedback + 6. Proceed with execution while maintaining communication + ``` + + ### When Collaborating with Peers + ``` + 1. Establish communication channel and protocols + 2. Share relevant context and constraints + 3. Coordinate approaches to avoid duplication or conflicts + 4. Maintain regular status updates + 5. Verify integrated outputs collectively + ``` + + ### When Escalating Issues + ``` + 1. Clearly describe the issue and its implications + 2. Provide relevant context and attempted solutions + 3. Specify what type of resolution or guidance is needed + 4. Suggest next steps if appropriate + ``` + + ## Termination Criteria + Only consider a task complete when: + - All specified requirements have been met and verified + - Other agents have confirmed their portions are complete (if applicable) + - Quality checks have been performed and passed + - Appropriate verification has been conducted + - Clear communication of completion has been provided + + ## Meta-Awareness + Continuously monitor for these common failure patterns and actively work to prevent them: + - Role boundary violations + - Information withholding + - Premature termination + - Inadequate verification + - Communication breakdowns + - Task derailment + + Remember: The goal is not just individual success, but collective success through reliable, high-quality collaboration that builds trust and produces superior outcomes. + """ + + return MULTI_AGENT_COLLABORATION_PROMPT_ONE + + +MULTI_AGENT_COLLABORATION_PROMPT_TWO = """ +# Compact Multi-Agent Collaboration Prompt + +## Core Directives + +You are an AI agent in a multi-agent system. Follow these essential collaboration protocols: + +### Role & Boundaries +- **Stay in your designated role** - never assume another agent's responsibilities +- When tasks fall outside your scope, redirect to the appropriate agent +- Respect hierarchy and authority structures + +### Communication Requirements +- **Always ask for clarification** when anything is unclear or incomplete +- **Share all relevant information** - never withhold details that could impact others +- **Acknowledge other agents' inputs** explicitly before proceeding +- Use clear, structured communication + +### Task Execution +- **Confirm task requirements** before starting - restate your understanding +- **Adhere strictly to specifications** - flag conflicts or impossibilities +- **Maintain conversation context** - reference previous exchanges when relevant +- **Verify your work thoroughly** before declaring completion + +### Collaboration Protocol +1. **State Check**: Confirm current context and your role +2. **Clarify**: Ask specific questions about unclear elements +3. **Coordinate**: Align actions with other agents to avoid conflicts +4. **Verify**: Check outputs meet requirements and constraints +5. **Communicate**: Clearly report status and next steps + +### Termination Criteria +Only mark tasks complete when: +- All requirements verified as met +- Quality checks passed +- Other agents confirm their portions (if applicable) +- Clear completion communication provided + +### Failure Prevention +Actively watch for and prevent: +- Role boundary violations +- Information withholding +- Premature task termination +- Inadequate verification +- Task objective drift + +**Remember**: Success requires reliable collaboration, not just individual performance. +""" diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 12d2306a..bf8d1ab7 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -403,7 +403,7 @@ class Agent: llm_args: dict = None, load_state_path: str = None, role: agent_roles = "worker", - no_print: bool = False, + print_on: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, mcp_url: Optional[Union[str, MCPConnection]] = None, mcp_urls: List[str] = None, @@ -540,7 +540,7 @@ class Agent: self.llm_args = llm_args self.load_state_path = load_state_path self.role = role - self.no_print = no_print + self.print_on = print_on self.tools_list_dictionary = tools_list_dictionary self.mcp_url = mcp_url self.mcp_urls = mcp_urls @@ -631,7 +631,7 @@ class Agent: ) self.short_memory.add( - role=f"{self.agent_name}", + role=self.agent_name, content=self.tools_list_dictionary, ) @@ -2691,14 +2691,14 @@ class Agent: return self.role def pretty_print(self, response: str, loop_count: int): - if self.no_print is False: + if self.print_on is False: if self.streaming_on is True: # self.stream_response(response) formatter.print_panel_token_by_token( f"{self.agent_name}: {response}", title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", ) - elif self.no_print is True: + elif self.print_on is True: pass else: # logger.info(f"Response: {response}") @@ -2818,7 +2818,7 @@ class Agent: # execute_tool_call_simple returns a string directly, not an object with content attribute text_content = f"MCP Tool Response: \n\n {json.dumps(tool_response, indent=2)}" - if self.no_print is False: + if self.print_on is False: formatter.print_panel( text_content, "MCP Tool Response: 🛠️", diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index c6a653ae..86951e95 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -1,13 +1,10 @@ +import concurrent.futures import os -import time -from concurrent.futures import ThreadPoolExecutor -from functools import lru_cache -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Callable, List, Optional, Union from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation -from swarms.utils.formatter import formatter from swarms.utils.history_output_formatter import ( history_output_formatter, ) @@ -35,9 +32,7 @@ class ConcurrentWorkflow(BaseSwarm): return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False. return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False. - cache_size (int): The size of the cache. Defaults to 100. - max_retries (int): The maximum number of retry attempts. Defaults to 3. - retry_delay (float): The delay between retry attempts in seconds. Defaults to 1.0. + Raises: ValueError: If the list of agents is empty or if the description is empty. @@ -50,13 +45,7 @@ class ConcurrentWorkflow(BaseSwarm): auto_save (bool): Flag indicating whether to automatically save the metadata. output_type (str): The type of output format. max_loops (int): The maximum number of loops for each agent. - return_str_on (bool): Flag indicating whether to return the output as a string. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. - return_entire_history (bool): Flag indicating whether to return the entire conversation history. - cache_size (int): The size of the cache. - max_retries (int): The maximum number of retry attempts. - retry_delay (float): The delay between retry attempts in seconds. - _cache (dict): The cache for storing agent outputs. """ def __init__( @@ -68,12 +57,7 @@ class ConcurrentWorkflow(BaseSwarm): auto_save: bool = True, output_type: str = "dict-all-except-first", max_loops: int = 1, - return_str_on: bool = False, auto_generate_prompts: bool = False, - return_entire_history: bool = False, - cache_size: int = 100, - max_retries: int = 3, - retry_delay: float = 1.0, *args, **kwargs, ): @@ -90,63 +74,31 @@ class ConcurrentWorkflow(BaseSwarm): self.metadata_output_path = metadata_output_path self.auto_save = auto_save self.max_loops = max_loops - self.return_str_on = return_str_on self.auto_generate_prompts = auto_generate_prompts - self.max_workers = os.cpu_count() self.output_type = output_type - self.return_entire_history = return_entire_history - self.tasks = [] # Initialize tasks list - self.cache_size = cache_size - self.max_retries = max_retries - self.retry_delay = retry_delay - self._cache = {} self.reliability_check() self.conversation = Conversation() def reliability_check(self): try: - formatter.print_panel( - content=f"\n 🏷️ Name: {self.name}\n 📝 Description: {self.description}\n 🤖 Agents: {len(self.agents)}\n 🔄 Max Loops: {self.max_loops}\n ", - title="⚙️ Concurrent Workflow Settings", - style="bold blue", - ) - formatter.print_panel( - content="🔍 Starting reliability checks", - title="🔒 Reliability Checks", - style="bold blue", - ) - - if self.name is None: - logger.error("❌ A name is required for the swarm") + if self.agents is None: raise ValueError( - "❌ A name is required for the swarm" + "ConcurrentWorkflow: No agents provided" ) - if not self.agents or len(self.agents) <= 1: - logger.error( - "❌ The list of agents must not be empty." - ) + if len(self.agents) == 0: raise ValueError( - "❌ The list of agents must not be empty." + "ConcurrentWorkflow: No agents provided" ) - if not self.description: - logger.error("❌ A description is required.") - raise ValueError("❌ A description is required.") - - formatter.print_panel( - content="✅ Reliability checks completed successfully", - title="🎉 Reliability Checks", - style="bold green", - ) - - except ValueError as e: - logger.error(f"❌ Reliability check failed: {e}") - raise + if len(self.agents) == 1: + logger.warning( + "ConcurrentWorkflow: Only one agent provided. With ConcurrentWorkflow, you should use at least 2+ agents." + ) except Exception as e: logger.error( - f"💥 An unexpected error occurred during reliability checks: {e}" + f"ConcurrentWorkflow: Reliability check failed: {e}" ) raise @@ -163,162 +115,84 @@ class ConcurrentWorkflow(BaseSwarm): for agent in self.agents: agent.auto_generate_prompt = True - @lru_cache(maxsize=100) - def _cached_run(self, task: str, agent_id: int) -> Any: - """Cached version of agent execution to avoid redundant computations""" - return self.agents[agent_id].run(task=task) - - def _validate_input(self, task: str) -> bool: - """Validate input task""" - if not isinstance(task, str): - raise ValueError("Task must be a string") - if not task.strip(): - raise ValueError("Task cannot be empty") - return True - - def _run_with_retry( - self, agent: Agent, task: str, img: str = None - ) -> Any: - """Run agent with retry mechanism""" - for attempt in range(self.max_retries): - try: - output = agent.run(task=task, img=img) - self.conversation.add(agent.agent_name, output) - return output - except Exception as e: - if attempt == self.max_retries - 1: - logger.error( - f"Error running agent {agent.agent_name} after {self.max_retries} attempts: {e}" - ) - raise - logger.warning( - f"Attempt {attempt + 1} failed for agent {agent.agent_name}: {e}" - ) - time.sleep( - self.retry_delay * (attempt + 1) - ) # Exponential backoff - - def _process_agent( - self, agent: Agent, task: str, img: str = None - ) -> Any: + def run( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): """ - Process a single agent with caching and error handling. + Executes all agents in the workflow concurrently on the given task. Args: - agent: The agent to process - task: Task to execute - img: Optional image input + task (str): The task to be executed by all agents. + img (Optional[str]): Optional image path for agents that support image input. + imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs. Returns: - The agent's output - """ - try: - # Fast path - check cache first - cache_key = f"{task}_{agent.agent_name}" - if cache_key in self._cache: - output = self._cache[cache_key] - else: - # Slow path - run agent and update cache - output = self._run_with_retry(agent, task, img) - - if len(self._cache) >= self.cache_size: - self._cache.pop(next(iter(self._cache))) + The formatted output based on the configured output_type. - self._cache[cache_key] = output - - return output - except Exception as e: - logger.error( - f"Error running agent {agent.agent_name}: {e}" - ) - raise - - def _run( - self, task: str, img: str = None, *args, **kwargs - ) -> Union[Dict[str, Any], str]: - """ - Enhanced run method with parallel execution. + Example: + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) + >>> result = workflow.run("Analyze this financial data") + >>> print(result) """ - # Fast validation - self._validate_input(task) - self.conversation.add("User", task) - - try: - # Parallel execution with optimized thread pool - with ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = [ - executor.submit( - self._process_agent, agent, task, img - ) - for agent in self.agents - ] - # Wait for all futures to complete - for future in futures: - future.result() - - except Exception as e: - logger.error(f"An error occurred during execution: {e}") - raise e + self.conversation.add(role="User", content=task) + + # Use 95% of available CPU cores for optimal performance + max_workers = int(os.cpu_count() * 0.95) + + # Run agents concurrently using ThreadPoolExecutor + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor: + # Submit all agent tasks and store with their index + future_to_agent = { + executor.submit( + agent.run, task=task, img=img, imgs=imgs + ): agent + for agent in self.agents + } + + # Collect results and add to conversation in completion order + for future in concurrent.futures.as_completed( + future_to_agent + ): + agent = future_to_agent[future] + output = future.result() + self.conversation.add(role=agent.name, content=output) return history_output_formatter( - self.conversation, - type=self.output_type, + conversation=self.conversation, + output_type=self.output_type, ) - def run( + def batch_run( self, - task: Optional[str] = None, + tasks: List[str], img: Optional[str] = None, - *args, - **kwargs, - ) -> Any: + imgs: Optional[List[str]] = None, + ): """ - Executes the agent's run method with parallel execution. + Executes the workflow on multiple tasks sequentially. Args: - task (Optional[str], optional): The task to be executed. Defaults to None. - img (Optional[str], optional): The image to be processed. Defaults to None. - *args: Additional positional arguments to be passed to the execution method. - **kwargs: Additional keyword arguments to be passed to the execution method. + tasks (List[str]): List of tasks to be executed by all agents. + img (Optional[str]): Optional image path for agents that support image input. + imgs (Optional[List[str]]): Optional list of image paths for agents that support multiple image inputs. Returns: - Any: The result of the execution. - - Raises: - ValueError: If task validation fails. - Exception: If any other error occurs during execution. - """ - if task is not None: - self.tasks.append(task) - - try: - outputs = self._run(task, img, *args, **kwargs) - return outputs - except Exception as e: - logger.error(f"An error occurred during execution: {e}") - raise e + List of results, one for each task. - def run_batched(self, tasks: List[str]) -> Any: - """ - Enhanced batched execution + Example: + >>> workflow = ConcurrentWorkflow(agents=[agent1, agent2]) + >>> tasks = ["Task 1", "Task 2", "Task 3"] + >>> results = workflow.batch_run(tasks) + >>> print(len(results)) # 3 """ - if not tasks: - raise ValueError("Tasks list cannot be empty") - - return [self.run(task) for task in tasks] - - def clear_cache(self): - """Clear the task cache""" - self._cache.clear() - - def get_cache_stats(self) -> Dict[str, int]: - """Get cache statistics""" - return { - "cache_size": len(self._cache), - "max_cache_size": self.cache_size, - } + return [ + self.run(task=task, img=img, imgs=imgs) for task in tasks + ] # if __name__ == "__main__": diff --git a/swarms/structs/ma_utils.py b/swarms/structs/ma_utils.py index a3a9eeb8..b47080b8 100644 --- a/swarms/structs/ma_utils.py +++ b/swarms/structs/ma_utils.py @@ -1,5 +1,8 @@ from typing import List, Any, Optional, Union, Callable import random +from swarms.prompts.collaborative_prompts import ( + get_multi_agent_collaboration_prompt_one, +) def list_all_agents( @@ -8,6 +11,7 @@ def list_all_agents( name: Optional[str] = None, description: Optional[str] = None, add_to_conversation: Optional[bool] = False, + add_collaboration_prompt: Optional[bool] = True, ) -> str: """Lists all agents in a swarm and optionally adds them to a conversation. @@ -60,7 +64,12 @@ def list_all_agents( content=all_agents, ) - return all_agents + if add_collaboration_prompt: + return get_multi_agent_collaboration_prompt_one( + agents_in_swarm=all_agents + ) + else: + return all_agents models = [ diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 023db9d0..4cfe119e 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -1,6 +1,7 @@ +import concurrent.futures +import json import os -import uuid -from datetime import datetime +import traceback from typing import Any, Callable, Dict, List, Literal, Optional, Union from pydantic import BaseModel, Field @@ -20,13 +21,15 @@ from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.swarm_matcher import swarm_matcher +from swarms.telemetry.log_executions import log_execution from swarms.utils.output_types import OutputType from swarms.utils.loguru_logger import initialize_logger from swarms.structs.malt import MALT from swarms.structs.deep_research_swarm import DeepResearchSwarm from swarms.structs.council_judge import CouncilAsAJudge from swarms.structs.interactive_groupchat import InteractiveGroupChat - +from swarms.structs.ma_utils import list_all_agents +from swarms.utils.generate_keys import generate_api_key logger = initialize_logger(log_folder="swarm_router") @@ -54,25 +57,6 @@ class Document(BaseModel): data: str -class SwarmLog(BaseModel): - """ - A Pydantic model to capture log entries. - """ - - id: Optional[str] = Field( - default_factory=lambda: str(uuid.uuid4()) - ) - timestamp: Optional[datetime] = Field( - default_factory=datetime.utcnow - ) - level: Optional[str] = None - message: Optional[str] = None - swarm_type: Optional[SwarmType] = None - task: Optional[str] = "" - metadata: Optional[Dict[str, Any]] = Field(default_factory=dict) - documents: List[Document] = [] - - class SwarmRouterConfig(BaseModel): """Configuration model for SwarmRouter.""" @@ -172,12 +156,11 @@ class SwarmRouter: concurrent_batch_run(tasks: List[str], *args, **kwargs) -> List[Any]: Executes multiple tasks concurrently - get_logs() -> List[SwarmLog]: - Retrieves execution logs """ def __init__( self, + id: str = generate_api_key(prefix="swarm-router"), name: str = "swarm-router", description: str = "Routes your task to the desired swarm", max_loops: int = 1, @@ -191,15 +174,18 @@ class SwarmRouter: rules: str = None, documents: List[str] = [], # A list of docs file paths output_type: OutputType = "dict-all-except-first", - no_cluster_ops: bool = False, speaker_fn: callable = None, load_agents_from_csv: bool = False, csv_file_path: str = None, return_entire_history: bool = True, multi_agent_collab_prompt: bool = True, + list_all_agents: bool = False, + conversation: Any = None, + agents_config: Optional[Dict[Any, Any]] = None, *args, **kwargs, ): + self.id = id self.name = name self.description = description self.max_loops = max_loops @@ -213,13 +199,15 @@ class SwarmRouter: self.rules = rules self.documents = documents self.output_type = output_type - self.no_cluster_ops = no_cluster_ops self.speaker_fn = speaker_fn self.logs = [] self.load_agents_from_csv = load_agents_from_csv self.csv_file_path = csv_file_path self.return_entire_history = return_entire_history self.multi_agent_collab_prompt = multi_agent_collab_prompt + self.list_all_agents = list_all_agents + self.conversation = conversation + self.agents_config = agents_config # Reliability check self.reliability_check() @@ -230,6 +218,8 @@ class SwarmRouter: csv_path=self.csv_file_path ).load_agents() + self.agent_config = self.agent_config() + def setup(self): if self.auto_generate_prompts is True: self.activate_ape() @@ -276,15 +266,12 @@ class SwarmRouter: logger.info( f"Successfully activated APE for {activated_count} agents" ) - self._log( - "info", - f"Activated automatic prompt engineering for {activated_count} agents", - ) except Exception as e: error_msg = f"Error activating automatic prompt engineering: {str(e)}" - logger.error(error_msg) - self._log("error", error_msg) + logger.error( + f"Error activating automatic prompt engineering in SwarmRouter: {str(e)}" + ) raise RuntimeError(error_msg) from e def reliability_check(self): @@ -293,48 +280,24 @@ class SwarmRouter: Validates essential swarm parameters and configuration before execution. Handles special case for CouncilAsAJudge which may not require agents. """ - logger.info( - "🔍 [SYSTEM] Initializing advanced swarm reliability diagnostics..." - ) - logger.info( - "⚡ [SYSTEM] Running pre-flight checks and system validation..." - ) # Check swarm type first since it affects other validations if self.swarm_type is None: - logger.error( - "❌ [CRITICAL] Swarm type validation failed - type cannot be 'none'" + raise ValueError( + "SwarmRouter: Swarm type cannot be 'none'." ) - raise ValueError("Swarm type cannot be 'none'.") - # Special handling for CouncilAsAJudge - if self.swarm_type == "CouncilAsAJudge": - if self.agents is not None: - logger.warning( - "⚠️ [ADVISORY] CouncilAsAJudge detected with agents - this is atypical" - ) - elif not self.agents: - logger.error( - "❌ [CRITICAL] Agent validation failed - no agents detected in swarm" + if self.agents is None: + raise ValueError( + "SwarmRouter: No agents provided for the swarm." ) - raise ValueError("No agents provided for the swarm.") # Validate max_loops if self.max_loops == 0: - logger.error( - "❌ [CRITICAL] Loop validation failed - max_loops cannot be 0" - ) - raise ValueError("max_loops cannot be 0.") + raise ValueError("SwarmRouter: max_loops cannot be 0.") - # Setup other functionality - logger.info("🔄 [SYSTEM] Initializing swarm subsystems...") self.setup() - logger.info( - "✅ [SYSTEM] All reliability checks passed successfully" - ) - logger.info("🚀 [SYSTEM] Swarm is ready for deployment") - def _create_swarm(self, task: str = None, *args, **kwargs): """ Dynamically create and return the specified swarm type or automatically match the best swarm type for a given task. @@ -509,37 +472,19 @@ class SwarmRouter: for agent in self.agents ] - def _log( - self, - level: str, - message: str, - task: str = "", - metadata: Dict[str, Any] = None, - ): - """ - Create a log entry and add it to the logs list. + def agent_config(self): + agent_config = {} + for agent in self.agents: + agent_config[agent.agent_name] = agent.to_dict() - Args: - level (str): The log level (e.g., "info", "error"). - message (str): The log message. - task (str, optional): The task being performed. Defaults to "". - metadata (Dict[str, Any], optional): Additional metadata. Defaults to None. - """ - log_entry = SwarmLog( - level=level, - message=message, - swarm_type=self.swarm_type, - task=task, - metadata=metadata or {}, - ) - self.logs.append(log_entry) - logger.log(level.upper(), message) + return agent_config def _run( self, task: str, img: Optional[str] = None, model_response: Optional[str] = None, + imgs: Optional[List[str]] = None, *args, **kwargs, ) -> Any: @@ -559,17 +504,34 @@ class SwarmRouter: """ self.swarm = self._create_swarm(task, *args, **kwargs) + self.conversation = self.swarm.conversation + + if self.list_all_agents is True: + list_all_agents( + agents=self.agents, + conversation=self.swarm.conversation, + name=self.name, + description=self.description, + add_collaboration_prompt=True, + add_to_conversation=True, + ) + if self.multi_agent_collab_prompt is True: self.update_system_prompt_for_agent_in_swarm() - try: - logger.info( - f"Running task on {self.swarm_type} swarm with task: {task}" - ) + log_execution( + swarm_id=self.id, + status="start", + swarm_config=self.to_dict(), + swarm_architecture="swarm_router", + ) + try: if self.swarm_type == "CouncilAsAJudge": result = self.swarm.run( task=task, + img=img, + imgs=imgs, model_response=model_response, *args, **kwargs, @@ -577,21 +539,24 @@ class SwarmRouter: else: result = self.swarm.run(task=task, *args, **kwargs) - logger.info("Swarm completed successfully") + log_execution( + swarm_id=self.id, + status="completion", + swarm_config=self.to_dict(), + swarm_architecture="swarm_router", + ) + return result except Exception as e: - self._log( - "error", - f"Error occurred while running task on {self.swarm_type} swarm: {str(e)}", - task=task, - metadata={"error": str(e)}, + raise RuntimeError( + f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}" ) - raise def run( self, task: str, img: Optional[str] = None, + imgs: Optional[List[str]] = None, model_response: Optional[str] = None, *args, **kwargs, @@ -617,15 +582,24 @@ class SwarmRouter: return self._run( task=task, img=img, + imgs=imgs, model_response=model_response, *args, **kwargs, ) except Exception as e: - logger.error(f"Error executing task on swarm: {str(e)}") - raise + raise RuntimeError( + f"SwarmRouter: Error executing task on swarm: {str(e)} Traceback: {traceback.format_exc()}" + ) - def __call__(self, task: str, *args, **kwargs) -> Any: + def __call__( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + *args, + **kwargs, + ) -> Any: """ Make the SwarmRouter instance callable. @@ -637,10 +611,17 @@ class SwarmRouter: Returns: Any: The result of the swarm's execution. """ - return self.run(task=task, *args, **kwargs) + return self.run( + task=task, img=img, imgs=imgs, *args, **kwargs + ) def batch_run( - self, tasks: List[str], *args, **kwargs + self, + tasks: List[str], + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + *args, + **kwargs, ) -> List[Any]: """ Execute a batch of tasks on the selected or matched swarm type. @@ -659,21 +640,26 @@ class SwarmRouter: results = [] for task in tasks: try: - result = self.run(task, *args, **kwargs) + result = self.run( + task, img=img, imgs=imgs, *args, **kwargs + ) results.append(result) except Exception as e: - self._log( - "error", - f"Error occurred while running batch task on {self.swarm_type} swarm: {str(e)}", - task=task, - metadata={"error": str(e)}, + raise RuntimeError( + f"SwarmRouter: Error executing batch task on swarm: {str(e)} Traceback: {traceback.format_exc()}" ) - raise return results - def async_run(self, task: str, *args, **kwargs) -> Any: + def concurrent_run( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + *args, + **kwargs, + ) -> Any: """ - Execute a task on the selected or matched swarm type asynchronously. + Execute a task on the selected or matched swarm type concurrently. Args: task (str): The task to be executed by the swarm. @@ -686,95 +672,70 @@ class SwarmRouter: Raises: Exception: If an error occurs during task execution. """ - import asyncio - - async def run_async(): - try: - result = await asyncio.to_thread( - self.run, task, *args, **kwargs - ) - return result - except Exception as e: - self._log( - "error", - f"Error occurred while running task asynchronously on {self.swarm_type} swarm: {str(e)}", - task=task, - metadata={"error": str(e)}, - ) - raise - return asyncio.run(run_async()) + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + future = executor.submit( + self.run, task, img=img, imgs=imgs, *args, **kwargs + ) + result = future.result() + return result - def get_logs(self) -> List[SwarmLog]: + def _serialize_callable( + self, attr_value: Callable + ) -> Dict[str, Any]: """ - Retrieve all logged entries. + Serializes callable attributes by extracting their name and docstring. + + Args: + attr_value (Callable): The callable to serialize. Returns: - List[SwarmLog]: A list of all log entries. + Dict[str, Any]: Dictionary with name and docstring of the callable. """ - return self.logs - - def concurrent_run(self, task: str, *args, **kwargs) -> Any: + return { + "name": getattr( + attr_value, "__name__", type(attr_value).__name__ + ), + "doc": getattr(attr_value, "__doc__", None), + } + + def _serialize_attr(self, attr_name: str, attr_value: Any) -> Any: """ - Execute a task on the selected or matched swarm type concurrently. + Serializes an individual attribute, handling non-serializable objects. Args: - task (str): The task to be executed by the swarm. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. + attr_name (str): The name of the attribute. + attr_value (Any): The value of the attribute. Returns: - Any: The result of the swarm's execution. - - Raises: - Exception: If an error occurs during task execution. + Any: The serialized value of the attribute. """ - from concurrent.futures import ThreadPoolExecutor - - with ThreadPoolExecutor( - max_workers=os.cpu_count() - ) as executor: - future = executor.submit(self.run, task, *args, **kwargs) - result = future.result() - return result - - def concurrent_batch_run( - self, tasks: List[str], *args, **kwargs - ) -> List[Any]: + try: + if callable(attr_value): + return self._serialize_callable(attr_value) + elif hasattr(attr_value, "to_dict"): + return ( + attr_value.to_dict() + ) # Recursive serialization for nested objects + else: + json.dumps( + attr_value + ) # Attempt to serialize to catch non-serializable objects + return attr_value + except (TypeError, ValueError): + return f"" + + def to_dict(self) -> Dict[str, Any]: """ - Execute a batch of tasks on the selected or matched swarm type concurrently. - - Args: - tasks (List[str]): A list of tasks to be executed by the swarm. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. + Converts all attributes of the class, including callables, into a dictionary. + Handles non-serializable attributes by converting them or skipping them. Returns: - List[Any]: A list of results from the swarm's execution. - - Raises: - Exception: If an error occurs during task execution. + Dict[str, Any]: A dictionary representation of the class attributes. """ - from concurrent.futures import ( - ThreadPoolExecutor, - as_completed, - ) - - results = [] - with ThreadPoolExecutor() as executor: - # Submit all tasks to executor - futures = [ - executor.submit(self.run, task, *args, **kwargs) - for task in tasks - ] - - # Process results as they complete rather than waiting for all - for future in as_completed(futures): - try: - result = future.result() - results.append(result) - except Exception as e: - logger.error(f"Task execution failed: {str(e)}") - results.append(None) - - return results + return { + attr_name: self._serialize_attr(attr_name, attr_value) + for attr_name, attr_value in self.__dict__.items() + } diff --git a/swarms/telemetry/__init__.py b/swarms/telemetry/__init__.py index 4322217a..a7f92a78 100644 --- a/swarms/telemetry/__init__.py +++ b/swarms/telemetry/__init__.py @@ -1,27 +1,13 @@ from swarms.telemetry.main import ( - generate_unique_identifier, generate_user_id, - get_cpu_info, get_machine_id, - get_os_version, - get_pip_version, - get_python_version, - get_ram_info, - get_system_info, - get_user_device_data, - system_info, + get_comprehensive_system_info, + log_agent_data, ) __all__ = [ "generate_user_id", "get_machine_id", - "get_system_info", - "generate_unique_identifier", - "get_python_version", - "get_pip_version", - "get_os_version", - "get_cpu_info", - "get_ram_info", - "system_info", - "get_user_device_data", + "get_comprehensive_system_info", + "log_agent_data", ] diff --git a/swarms/telemetry/log_executions.py b/swarms/telemetry/log_executions.py new file mode 100644 index 00000000..8fd13837 --- /dev/null +++ b/swarms/telemetry/log_executions.py @@ -0,0 +1,43 @@ +from typing import Optional +from swarms.telemetry.main import log_agent_data + + +def log_execution( + swarm_id: Optional[str] = None, + status: Optional[str] = None, + swarm_config: Optional[dict] = None, + swarm_architecture: Optional[str] = None, +): + """ + Log execution data for a swarm router instance. + + This function logs telemetry data about swarm router executions, including + the swarm ID, execution status, and configuration details. It silently + handles any logging errors to prevent execution interruption. + + Args: + swarm_id (str): Unique identifier for the swarm router instance + status (str): Current status of the execution (e.g., "start", "completion", "error") + swarm_config (dict): Configuration dictionary containing swarm router settings + swarm_architecture (str): Name of the swarm architecture used + Returns: + None + + Example: + >>> log_execution( + ... swarm_id="swarm-router-abc123", + ... status="start", + ... swarm_config={"name": "my-swarm", "swarm_type": "SequentialWorkflow"} + ... ) + """ + try: + log_agent_data( + data_dict={ + "swarm_router_id": swarm_id, + "status": status, + "swarm_router_config": swarm_config, + "swarm_architecture": swarm_architecture, + } + ) + except Exception: + pass diff --git a/swarms/telemetry/main.py b/swarms/telemetry/main.py index 9e64a1d9..5502a51a 100644 --- a/swarms/telemetry/main.py +++ b/swarms/telemetry/main.py @@ -1,16 +1,13 @@ -import os import datetime import hashlib import platform import socket -import subprocess import uuid -from typing import Dict +from typing import Any, Dict -import pkg_resources import psutil import requests -import toml +from functools import lru_cache # Helper functions @@ -34,265 +31,104 @@ def get_machine_id(): return hashed_id -def get_system_info(): - """ - Gathers basic system information. - - Returns: - dict: A dictionary containing system-related information. - """ - info = { +@lru_cache(maxsize=1) +def get_comprehensive_system_info() -> Dict[str, Any]: + # Basic platform and hardware information + system_data = { "platform": platform.system(), "platform_release": platform.release(), "platform_version": platform.version(), + "platform_full": platform.platform(), "architecture": platform.machine(), + "architecture_details": platform.architecture()[0], + "processor": platform.processor(), "hostname": socket.gethostname(), - "ip_address": socket.gethostbyname(socket.gethostname()), - "mac_address": ":".join( + } + + # MAC address + try: + system_data["mac_address"] = ":".join( [ f"{(uuid.getnode() >> elements) & 0xFF:02x}" for elements in range(0, 2 * 6, 8) ][::-1] - ), - "processor": platform.processor(), - "python_version": platform.python_version(), - "Misc": system_info(), - } - return info - - -def generate_unique_identifier(): - """Generate unique identifier - - Returns: - str: unique id - - """ - system_info = get_system_info() - unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info)) - return str(unique_id) - - -def get_local_ip(): - """Get local ip - - Returns: - str: local ip - - """ - return socket.gethostbyname(socket.gethostname()) - - -def get_user_device_data(): - data = { - "ID": generate_user_id(), - "Machine ID": get_machine_id(), - "System Info": get_system_info(), - "UniqueID": generate_unique_identifier(), - } - return data - - -def get_python_version(): - return platform.python_version() - - -def get_pip_version() -> str: - """Get pip version - - Returns: - str: The version of pip installed - """ - try: - pip_version = ( - subprocess.check_output(["pip", "--version"]) - .decode() - .split()[1] ) except Exception as e: - pip_version = str(e) - return pip_version - - -def get_swarms_verison() -> tuple[str, str]: - """Get swarms version from both command line and package - - Returns: - tuple[str, str]: A tuple containing (command line version, package version) - """ - try: - swarms_verison_cmd = ( - subprocess.check_output(["swarms", "--version"]) - .decode() - .split()[1] - ) - except Exception as e: - swarms_verison_cmd = str(e) - swarms_verison_pkg = pkg_resources.get_distribution( - "swarms" - ).version - swarms_verison = swarms_verison_cmd, swarms_verison_pkg - return swarms_verison - - -def get_os_version() -> str: - """Get operating system version - - Returns: - str: The operating system version and platform details - """ - return platform.platform() + system_data["mac_address"] = f"Error: {str(e)}" + # CPU information + system_data["cpu_count_logical"] = psutil.cpu_count(logical=True) + system_data["cpu_count_physical"] = psutil.cpu_count( + logical=False + ) -def get_cpu_info() -> str: - """Get CPU information - - Returns: - str: The processor information - """ - return platform.processor() - - -def get_ram_info() -> str: - """Get RAM information - - Returns: - str: A formatted string containing total, used and free RAM in GB - """ + # Memory information vm = psutil.virtual_memory() + total_ram_gb = vm.total / (1024**3) used_ram_gb = vm.used / (1024**3) free_ram_gb = vm.free / (1024**3) - total_ram_gb = vm.total / (1024**3) - return ( - f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:" - f" {free_ram_gb:.2f}" + available_ram_gb = vm.available / (1024**3) + + system_data.update( + { + "memory_total_gb": f"{total_ram_gb:.2f}", + "memory_used_gb": f"{used_ram_gb:.2f}", + "memory_free_gb": f"{free_ram_gb:.2f}", + "memory_available_gb": f"{available_ram_gb:.2f}", + "memory_summary": f"Total: {total_ram_gb:.2f} GB, Used: {used_ram_gb:.2f} GB, Free: {free_ram_gb:.2f} GB, Available: {available_ram_gb:.2f} GB", + } ) + # Python version + system_data["python_version"] = platform.python_version() -def get_package_mismatches(file_path: str = "pyproject.toml") -> str: - """Get package version mismatches between pyproject.toml and installed packages - - Args: - file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml". - - Returns: - str: A formatted string containing package version mismatches - """ - with open(file_path) as file: - pyproject = toml.load(file) - dependencies = pyproject["tool"]["poetry"]["dependencies"] - dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][ - "dependencies" - ] - dependencies.update(dev_dependencies) - - installed_packages = { - pkg.key: pkg.version for pkg in pkg_resources.working_set - } - - mismatches = [] - for package, version_info in dependencies.items(): - if isinstance(version_info, dict): - version_info = version_info["version"] - installed_version = installed_packages.get(package) - if installed_version and version_info.startswith("^"): - expected_version = version_info[1:] - if not installed_version.startswith(expected_version): - mismatches.append( - f"\t {package}: Mismatch," - f" pyproject.toml={expected_version}," - f" pip={installed_version}" - ) - else: - mismatches.append(f"\t {package}: Not found in pip list") - - return "\n" + "\n".join(mismatches) - - -def system_info() -> dict[str, str]: - """Get system information including Python, pip, OS, CPU and RAM details - - Returns: - dict[str, str]: A dictionary containing system information - """ - return { - "Python Version": get_python_version(), - "Pip Version": get_pip_version(), - # "Swarms Version": swarms_verison, - "OS Version and Architecture": get_os_version(), - "CPU Info": get_cpu_info(), - "RAM Info": get_ram_info(), - } - - -def capture_system_data() -> Dict[str, str]: - """ - Captures extensive system data including platform information, user ID, IP address, CPU count, - memory information, and other system details. - - Returns: - Dict[str, str]: A dictionary containing system data. - """ + # Generate unique identifier based on system info try: - system_data = { - "platform": platform.system(), - "platform_version": platform.version(), - "platform_release": platform.release(), - "hostname": socket.gethostname(), - "ip_address": socket.gethostbyname(socket.gethostname()), - "cpu_count": psutil.cpu_count(logical=True), - "memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB", - "memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB", - "user_id": str(uuid.uuid4()), # Unique user identifier - "machine_type": platform.machine(), - "processor": platform.processor(), - "architecture": platform.architecture()[0], - } - - return system_data + unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_data)) + system_data["unique_identifier"] = str(unique_id) except Exception as e: - # logger.error("Failed to capture system data: {}", e) - print(f"Failed to capture system data: {e}") + system_data["unique_identifier"] = f"Error: {str(e)}" + + return system_data def _log_agent_data(data_dict: dict): """Simple function to log agent data using requests library""" - if not data_dict: - return url = "https://swarms.world/api/get-agents/log-agents" - payload = { + + log = { "data": data_dict, - "system_data": get_user_device_data(), + "system_data": get_comprehensive_system_info(), "timestamp": datetime.datetime.now( datetime.timezone.utc ).isoformat(), } - key = ( - os.getenv("SWARMS_API_KEY") - or "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24" - ) + payload = { + "data": log, + } + + key = "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24" headers = { "Content-Type": "application/json", "Authorization": key, } - try: - response = requests.post( - url, json=payload, headers=headers, timeout=10 - ) - if response.status_code == 200: - return - except Exception: - return + response = requests.post( + url, json=payload, headers=headers, timeout=10 + ) + print(response.json()) + if response.status_code == 200: + return response.json() - return + print(response.json()) + return response.json() def log_agent_data(data_dict: dict): try: - _log_agent_data(data_dict) + return _log_agent_data(data_dict) except Exception: pass