From 599776736ba1642805fdc3637cb72d264a8fd819 Mon Sep 17 00:00:00 2001 From: Occupying-Mars Date: Thu, 5 Dec 2024 19:40:37 +0530 Subject: [PATCH] self healing structured output --- swarms/agents/self_healing_agent.py | 354 ++++++++++++++++++++-------- swarms/utils/self_healing.py | 175 +++++++++++++- 2 files changed, 436 insertions(+), 93 deletions(-) diff --git a/swarms/agents/self_healing_agent.py b/swarms/agents/self_healing_agent.py index 392995af..d782ca52 100644 --- a/swarms/agents/self_healing_agent.py +++ b/swarms/agents/self_healing_agent.py @@ -1,15 +1,19 @@ import os import sys +import ast +import json import traceback from typing import Optional, Dict, List, Any from dataclasses import dataclass from pathlib import Path +from datetime import datetime from swarms.utils.terminal_output import terminal from swarms.structs.agent import Agent @dataclass class ErrorContext: + """Context about an error that occurred""" error_type: str error_message: str traceback: str @@ -21,43 +25,106 @@ class CodeFixerAgent(Agent): """An agent specialized in analyzing and fixing code errors""" def __init__(self, *args, **kwargs): - system_prompt = """You are an expert code debugging and fixing agent. Your role is to: - 1. Analyze error messages and stack traces to understand the root cause - 2. Examine the code context where the error occurred - 3. Propose specific fixes with clear explanations - 4. Consider multiple potential solutions and their trade-offs - 5. Ensure fixes maintain code quality and follow best practices - - When proposing fixes: - - Explain why the error occurred - - Detail what changes need to be made and in which files - - Consider potential side effects of the changes - - Suggest any additional improvements or preventive measures - - Format your response as: - ERROR ANALYSIS: - - - PROPOSED FIX: - File: - Lines: - ``` - - ``` - - EXPLANATION: - + system_prompt = """You are an expert code debugging and fixing agent. 
Your role is to analyze errors and propose fixes. + + When analyzing errors, follow these steps: + 1. Examine the error message and stack trace carefully + 2. Look at the code context where the error occurred + 3. Consider multiple potential causes and solutions + 4. Choose the most appropriate fix + 5. Explain your reasoning clearly + + Your output must follow this exact format: + + { + "error_analysis": { + "root_cause": "Brief explanation of what caused the error", + "impact": "What effects this error has on the system", + "severity": "high|medium|low" + }, + "proposed_fix": { + "file": "Path to the file that needs changes", + "line_range": "start-end or single line number", + "code_changes": "The actual code changes to make", + "type": "syntax|import|permission|memory|other" + }, + "explanation": { + "why_it_works": "Why this fix will solve the problem", + "side_effects": "Any potential side effects to consider", + "alternatives": "Other possible solutions that were considered" + }, + "prevention": { + "recommendations": ["List of recommendations to prevent similar errors"], + "best_practices": ["Relevant best practices to follow"] + } + } + + Always ensure your response is valid JSON and includes all the required fields. + Be specific about file paths and line numbers. + Include complete code snippets that can be directly applied. """ kwargs["system_prompt"] = system_prompt kwargs["agent_name"] = kwargs.get("agent_name", "Code-Fixer-Agent") + kwargs["output_type"] = "json" # Ensure JSON output super().__init__(*args, **kwargs) self.error_history: List[ErrorContext] = [] class SelfHealingAgent(Agent): - """An agent that can diagnose and fix runtime errors using LLM-based analysis""" + """An agent that can diagnose and fix runtime errors using LLM-based analysis + + This agent uses a specialized CodeFixerAgent to analyze errors and propose fixes. 
+ It can handle various types of errors including: + - Syntax errors + - Import errors + - Permission errors + - Memory errors + - General runtime errors + + The agent maintains a history of errors and fixes, and can provide detailed reports + of its healing activities. + + Attributes: + error_history (List[ErrorContext]): History of errors encountered + fixer_agent (CodeFixerAgent): Specialized agent for analyzing and fixing errors + max_fix_attempts (int): Maximum number of fix attempts per error + """ def __init__(self, *args, **kwargs): + system_prompt = """You are a self-healing agent capable of detecting, analyzing, and fixing runtime errors. + Your responses should follow this format: + + { + "status": { + "state": "running|error|fixed|failed", + "message": "Current status message", + "timestamp": "ISO timestamp" + }, + "error_details": { + "type": "Error type if applicable", + "message": "Error message if applicable", + "location": "File and line number where error occurred" + }, + "healing_actions": { + "attempted_fixes": ["List of fixes attempted"], + "successful_fixes": ["List of successful fixes"], + "failed_fixes": ["List of failed fixes"] + }, + "system_health": { + "memory_usage": "Current memory usage", + "cpu_usage": "Current CPU usage", + "disk_usage": "Current disk usage" + }, + "recommendations": { + "immediate": ["Immediate actions needed"], + "long_term": ["Long-term improvements suggested"] + } + } + """ + + kwargs["system_prompt"] = system_prompt + kwargs["agent_name"] = kwargs.get("agent_name", "Self-Healing-Agent") + kwargs["output_type"] = "json" # Ensure JSON output super().__init__(*args, **kwargs) # Initialize the code fixer agent @@ -67,24 +134,32 @@ class SelfHealingAgent(Agent): verbose=True ) + self.error_history = [] + self.max_fix_attempts = 3 + def diagnose_error(self, error: Exception) -> ErrorContext: - """Gather context about an error""" + """Gather detailed context about an error + + Args: + error (Exception): The error that 
occurred + + Returns: + ErrorContext: Detailed context about the error + """ tb = traceback.extract_tb(sys.exc_info()[2]) file_path = None line_number = None code_snippet = "" - # Get the last frame from traceback which is usually where the error occurred if tb: last_frame = tb[-1] file_path = last_frame.filename line_number = last_frame.lineno - # Try to get code context if file_path and os.path.exists(file_path): with open(file_path, 'r') as f: lines = f.readlines() - start = max(0, line_number - 5) # Get more context + start = max(0, line_number - 5) end = min(len(lines), line_number + 5) code_snippet = ''.join(lines[start:end]) @@ -98,9 +173,16 @@ class SelfHealingAgent(Agent): ) def get_fix_prompt(self, error_context: ErrorContext) -> str: - """Create a detailed prompt for the fixer agent""" + """Create a detailed prompt for the fixer agent + + Args: + error_context (ErrorContext): Context about the error + + Returns: + str: Prompt for the fixer agent + """ return f""" - An error occurred in the code. Please analyze it and propose a fix. + Analyze this error and propose a fix following the required JSON format. ERROR TYPE: {error_context.error_type} ERROR MESSAGE: {error_context.error_message} @@ -115,80 +197,79 @@ class SelfHealingAgent(Agent): FULL TRACEBACK: {error_context.traceback} - - Please analyze this error and propose a specific fix. Include: - 1. What caused the error - 2. Exact changes needed (file paths and line numbers) - 3. The code that needs to be changed - 4. Why the fix will work - 5. 
Any potential side effects to consider """ - def apply_fix(self, fix_proposal: str) -> bool: - """Apply the fix proposed by the fixer agent""" - try: - # Parse the fix proposal to extract file and changes - import re - - # Extract file path - file_match = re.search(r"File: (.+)", fix_proposal) - if not file_match: - terminal.status_panel("Could not find file path in fix proposal", "error") - return False - - file_path = file_match.group(1).strip() + def apply_fix(self, fix_proposal: Dict[str, Any]) -> bool: + """Apply a fix proposed by the fixer agent + + Args: + fix_proposal (Dict[str, Any]): The fix proposal in JSON format - # Extract code changes - code_match = re.search(r"```(?:python)?\n(.*?)\n```", fix_proposal, re.DOTALL) - if not code_match: - terminal.status_panel("Could not find code changes in fix proposal", "error") - return False - - new_code = code_match.group(1).strip() + Returns: + bool: Whether the fix was successfully applied + """ + try: + # Extract fix details + file_path = fix_proposal["proposed_fix"]["file"] + line_range = fix_proposal["proposed_fix"]["line_range"] + new_code = fix_proposal["proposed_fix"]["code_changes"] - # Extract line numbers if specified - lines_match = re.search(r"Lines: (.+)", fix_proposal) - line_range = None - if lines_match: - try: - # Parse line range (e.g., "5-10" or "5") - line_spec = lines_match.group(1).strip() - if '-' in line_spec: - start, end = map(int, line_spec.split('-')) - line_range = (start, end) - else: - line_num = int(line_spec) - line_range = (line_num, line_num) - except: - terminal.status_panel("Could not parse line numbers", "warning") + # Parse line range + if '-' in line_range: + start, end = map(int, line_range.split('-')) + else: + start = end = int(line_range) # Apply the changes with open(file_path, 'r') as f: lines = f.readlines() - if line_range: - # Replace specific lines - start, end = line_range - start -= 1 # Convert to 0-based index - lines[start:end] = new_code.splitlines(True) - 
else: - # Replace the entire file - lines = new_code.splitlines(True) + # Convert to 0-based indexing + start -= 1 + lines[start:end] = new_code.splitlines(True) with open(file_path, 'w') as f: f.writelines(lines) - terminal.status_panel(f"Applied fix to {file_path}", "success") + terminal.status_panel( + f"Applied fix to {file_path} lines {start+1}-{end}", + "success" + ) return True except Exception as e: terminal.status_panel(f"Failed to apply fix: {str(e)}", "error") return False - def run(self, *args, **kwargs) -> Any: - """Run with self-healing capabilities using LLM-based analysis""" + def run(self, *args, **kwargs) -> Dict[str, Any]: + """Run with self-healing capabilities using LLM-based analysis + + Returns: + Dict[str, Any]: Structured output about the run and any healing actions + """ try: - return super().run(*args, **kwargs) + result = super().run(*args, **kwargs) + + # Return success status + return { + "status": { + "state": "running", + "message": "Operation completed successfully", + "timestamp": datetime.now().isoformat() + }, + "error_details": None, + "healing_actions": { + "attempted_fixes": [], + "successful_fixes": [], + "failed_fixes": [] + }, + "system_health": self.get_system_health(), + "recommendations": { + "immediate": [], + "long_term": ["Monitor system health regularly"] + } + } + except Exception as error: terminal.status_panel("Error detected, analyzing with LLM...", "warning") @@ -201,13 +282,102 @@ class SelfHealingAgent(Agent): fix_proposal = self.fixer_agent.run(fix_prompt) terminal.status_panel("Fix proposed by LLM:", "info") - terminal.status_panel(fix_proposal, "info") + terminal.status_panel(json.dumps(fix_proposal, indent=2), "info") + + # Track healing actions + attempted_fixes = [fix_proposal["proposed_fix"]["type"]] + successful_fixes = [] + failed_fixes = [] # Apply the fix if self.apply_fix(fix_proposal): terminal.status_panel("Fix applied, retrying operation...", "info") - # Retry the operation - return 
super().run(*args, **kwargs) + successful_fixes.append(fix_proposal["proposed_fix"]["type"]) + + try: + # Retry the operation + result = super().run(*args, **kwargs) + + return { + "status": { + "state": "fixed", + "message": "Error fixed and operation completed", + "timestamp": datetime.now().isoformat() + }, + "error_details": { + "type": error_context.error_type, + "message": error_context.error_message, + "location": f"{error_context.file_path}:{error_context.line_number}" + }, + "healing_actions": { + "attempted_fixes": attempted_fixes, + "successful_fixes": successful_fixes, + "failed_fixes": failed_fixes + }, + "system_health": self.get_system_health(), + "recommendations": fix_proposal["prevention"] + } + + except Exception as e: + failed_fixes.append(fix_proposal["proposed_fix"]["type"]) + + return { + "status": { + "state": "failed", + "message": "Fix applied but error persists", + "timestamp": datetime.now().isoformat() + }, + "error_details": { + "type": type(e).__name__, + "message": str(e), + "location": f"{error_context.file_path}:{error_context.line_number}" + }, + "healing_actions": { + "attempted_fixes": attempted_fixes, + "successful_fixes": successful_fixes, + "failed_fixes": failed_fixes + }, + "system_health": self.get_system_health(), + "recommendations": { + "immediate": ["Manual intervention required"], + "long_term": fix_proposal["prevention"]["recommendations"] + } + } else: - terminal.status_panel("Unable to apply fix automatically", "error") - raise \ No newline at end of file + failed_fixes.append(fix_proposal["proposed_fix"]["type"]) + return { + "status": { + "state": "error", + "message": "Unable to apply fix", + "timestamp": datetime.now().isoformat() + }, + "error_details": { + "type": error_context.error_type, + "message": error_context.error_message, + "location": f"{error_context.file_path}:{error_context.line_number}" + }, + "healing_actions": { + "attempted_fixes": attempted_fixes, + "successful_fixes": successful_fixes, + 
"failed_fixes": failed_fixes + }, + "system_health": self.get_system_health(), + "recommendations": { + "immediate": ["Manual intervention required"], + "long_term": fix_proposal["prevention"]["recommendations"] + } + } + + def get_system_health(self) -> Dict[str, str]: + """Get current system health metrics + + Returns: + Dict[str, str]: System health metrics + """ + import psutil + + return { + "memory_usage": f"{psutil.virtual_memory().percent}%", + "cpu_usage": f"{psutil.cpu_percent()}%", + "disk_usage": f"{psutil.disk_usage('/').percent}%" + } \ No newline at end of file diff --git a/swarms/utils/self_healing.py b/swarms/utils/self_healing.py index 0519ecba..08b75b99 100644 --- a/swarms/utils/self_healing.py +++ b/swarms/utils/self_healing.py @@ -1 +1,174 @@ - \ No newline at end of file +import os +import sys +import time +import psutil +import threading +import traceback +from typing import Callable, Dict, List, Optional +from dataclasses import dataclass +from datetime import datetime + +from swarms.utils.terminal_output import terminal + +@dataclass +class SystemMetrics: + cpu_percent: float + memory_percent: float + disk_usage_percent: float + timestamp: datetime + +class HealthCheck: + """System health monitoring and self-healing capabilities""" + + def __init__(self): + self.metrics_history: List[SystemMetrics] = [] + self.error_count: Dict[str, int] = {} + self.recovery_actions: Dict[str, Callable] = {} + self.monitoring_thread: Optional[threading.Thread] = None + self.stop_monitoring = threading.Event() + + # Default thresholds + self.thresholds = { + "cpu_percent": 90.0, + "memory_percent": 85.0, + "disk_usage_percent": 90.0, + "error_threshold": 3 + } + + def register_recovery_action(self, error_type: str, action: Callable): + """Register a recovery action for a specific error type""" + self.recovery_actions[error_type] = action + terminal.status_panel(f"Registered recovery action for {error_type}", "info") + + def collect_metrics(self) -> 
SystemMetrics: + """Collect current system metrics""" + try: + cpu = psutil.cpu_percent(interval=1) + memory = psutil.virtual_memory().percent + disk = psutil.disk_usage('/').percent + + metrics = SystemMetrics( + cpu_percent=cpu, + memory_percent=memory, + disk_usage_percent=disk, + timestamp=datetime.now() + ) + + self.metrics_history.append(metrics) + if len(self.metrics_history) > 100: # Keep last 100 readings + self.metrics_history.pop(0) + + return metrics + + except Exception as e: + terminal.status_panel(f"Error collecting metrics: {str(e)}", "error") + return None + + def check_system_health(self) -> bool: + """Check if system metrics are within acceptable thresholds""" + metrics = self.collect_metrics() + if not metrics: + return False + + issues = [] + + if metrics.cpu_percent > self.thresholds["cpu_percent"]: + issues.append(f"High CPU usage: {metrics.cpu_percent}%") + + if metrics.memory_percent > self.thresholds["memory_percent"]: + issues.append(f"High memory usage: {metrics.memory_percent}%") + + if metrics.disk_usage_percent > self.thresholds["disk_usage_percent"]: + issues.append(f"High disk usage: {metrics.disk_usage_percent}%") + + if issues: + terminal.status_panel("\n".join(issues), "warning") + return False + + return True + + def handle_error(self, error: Exception, context: str = ""): + """Handle errors and attempt recovery""" + error_type = type(error).__name__ + + # Increment error count + self.error_count[error_type] = self.error_count.get(error_type, 0) + 1 + + terminal.status_panel( + f"Error occurred in {context}: {str(error)}\n{traceback.format_exc()}", + "error" + ) + + # Check if we need to take recovery action + if self.error_count[error_type] >= self.thresholds["error_threshold"]: + self.attempt_recovery(error_type, error) + + def attempt_recovery(self, error_type: str, error: Exception): + """Attempt to recover from an error""" + terminal.status_panel(f"Attempting recovery for {error_type}", "info") + + if error_type in 
self.recovery_actions: + try: + self.recovery_actions[error_type](error) + terminal.status_panel(f"Recovery action completed for {error_type}", "success") + self.error_count[error_type] = 0 # Reset error count after successful recovery + except Exception as e: + terminal.status_panel( + f"Recovery action failed for {error_type}: {str(e)}", + "error" + ) + else: + terminal.status_panel( + f"No recovery action registered for {error_type}", + "warning" + ) + + def start_monitoring(self): + """Start continuous system monitoring""" + if self.monitoring_thread and self.monitoring_thread.is_alive(): + return + + def monitor(): + while not self.stop_monitoring.is_set(): + healthy = self.check_system_health() + if healthy: + terminal.status_panel("System health check passed", "success") + time.sleep(60) # Check every minute + + self.monitoring_thread = threading.Thread(target=monitor, daemon=True) + self.monitoring_thread.start() + terminal.status_panel("System monitoring started", "info") + + def stop_monitoring(self): + """Stop system monitoring""" + if self.monitoring_thread and self.monitoring_thread.is_alive(): + self.stop_monitoring.set() + self.monitoring_thread.join() + terminal.status_panel("System monitoring stopped", "info") + + def get_health_report(self) -> dict: + """Generate a health report""" + if not self.metrics_history: + return {"status": "No metrics collected yet"} + + latest = self.metrics_history[-1] + avg_metrics = { + "cpu_percent": sum(m.cpu_percent for m in self.metrics_history) / len(self.metrics_history), + "memory_percent": sum(m.memory_percent for m in self.metrics_history) / len(self.metrics_history), + "disk_usage_percent": sum(m.disk_usage_percent for m in self.metrics_history) / len(self.metrics_history) + } + + return { + "current_metrics": { + "cpu_percent": latest.cpu_percent, + "memory_percent": latest.memory_percent, + "disk_usage_percent": latest.disk_usage_percent, + "timestamp": latest.timestamp.isoformat() + }, + 
"average_metrics": avg_metrics, + "error_counts": self.error_count, + "status": "Healthy" if self.check_system_health() else "Issues Detected" + } + +# Create singleton instance +health_monitor = HealthCheck() \ No newline at end of file