self healing structured output

pull/663/head
Occupying-Mars 5 months ago
parent 77cb687263
commit 599776736b

@ -1,15 +1,19 @@
import os
import sys
import ast
import json
import traceback
from typing import Optional, Dict, List, Any
from dataclasses import dataclass
from pathlib import Path
from datetime import datetime
from swarms.utils.terminal_output import terminal
from swarms.structs.agent import Agent
@dataclass
class ErrorContext:
"""Context about an error that occurred"""
error_type: str
error_message: str
traceback: str
@ -21,43 +25,106 @@ class CodeFixerAgent(Agent):
"""An agent specialized in analyzing and fixing code errors"""
def __init__(self, *args, **kwargs):
system_prompt = """You are an expert code debugging and fixing agent. Your role is to:
1. Analyze error messages and stack traces to understand the root cause
2. Examine the code context where the error occurred
3. Propose specific fixes with clear explanations
4. Consider multiple potential solutions and their trade-offs
5. Ensure fixes maintain code quality and follow best practices
When proposing fixes:
- Explain why the error occurred
- Detail what changes need to be made and in which files
- Consider potential side effects of the changes
- Suggest any additional improvements or preventive measures
Format your response as:
ERROR ANALYSIS:
<explanation of what caused the error>
PROPOSED FIX:
File: <file_path>
Lines: <line_numbers>
```<language>
<code changes>
```
EXPLANATION:
<why this fix works and any considerations>
system_prompt = """You are an expert code debugging and fixing agent. Your role is to analyze errors and propose fixes.
When analyzing errors, follow these steps:
1. Examine the error message and stack trace carefully
2. Look at the code context where the error occurred
3. Consider multiple potential causes and solutions
4. Choose the most appropriate fix
5. Explain your reasoning clearly
Your output must follow this exact format:
{
"error_analysis": {
"root_cause": "Brief explanation of what caused the error",
"impact": "What effects this error has on the system",
"severity": "high|medium|low"
},
"proposed_fix": {
"file": "Path to the file that needs changes",
"line_range": "start-end or single line number",
"code_changes": "The actual code changes to make",
"type": "syntax|import|permission|memory|other"
},
"explanation": {
"why_it_works": "Why this fix will solve the problem",
"side_effects": "Any potential side effects to consider",
"alternatives": "Other possible solutions that were considered"
},
"prevention": {
"recommendations": ["List of recommendations to prevent similar errors"],
"best_practices": ["Relevant best practices to follow"]
}
}
Always ensure your response is valid JSON and includes all the required fields.
Be specific about file paths and line numbers.
Include complete code snippets that can be directly applied.
"""
kwargs["system_prompt"] = system_prompt
kwargs["agent_name"] = kwargs.get("agent_name", "Code-Fixer-Agent")
kwargs["output_type"] = "json" # Ensure JSON output
super().__init__(*args, **kwargs)
self.error_history: List[ErrorContext] = []
class SelfHealingAgent(Agent):
"""An agent that can diagnose and fix runtime errors using LLM-based analysis"""
"""An agent that can diagnose and fix runtime errors using LLM-based analysis
This agent uses a specialized CodeFixerAgent to analyze errors and propose fixes.
It can handle various types of errors including:
- Syntax errors
- Import errors
- Permission errors
- Memory errors
- General runtime errors
The agent maintains a history of errors and fixes, and can provide detailed reports
of its healing activities.
Attributes:
error_history (List[ErrorContext]): History of errors encountered
fixer_agent (CodeFixerAgent): Specialized agent for analyzing and fixing errors
max_fix_attempts (int): Maximum number of fix attempts per error
"""
def __init__(self, *args, **kwargs):
system_prompt = """You are a self-healing agent capable of detecting, analyzing, and fixing runtime errors.
Your responses should follow this format:
{
"status": {
"state": "running|error|fixed|failed",
"message": "Current status message",
"timestamp": "ISO timestamp"
},
"error_details": {
"type": "Error type if applicable",
"message": "Error message if applicable",
"location": "File and line number where error occurred"
},
"healing_actions": {
"attempted_fixes": ["List of fixes attempted"],
"successful_fixes": ["List of successful fixes"],
"failed_fixes": ["List of failed fixes"]
},
"system_health": {
"memory_usage": "Current memory usage",
"cpu_usage": "Current CPU usage",
"disk_usage": "Current disk usage"
},
"recommendations": {
"immediate": ["Immediate actions needed"],
"long_term": ["Long-term improvements suggested"]
}
}
"""
kwargs["system_prompt"] = system_prompt
kwargs["agent_name"] = kwargs.get("agent_name", "Self-Healing-Agent")
kwargs["output_type"] = "json" # Ensure JSON output
super().__init__(*args, **kwargs)
# Initialize the code fixer agent
@ -67,24 +134,32 @@ class SelfHealingAgent(Agent):
verbose=True
)
self.error_history = []
self.max_fix_attempts = 3
def diagnose_error(self, error: Exception) -> ErrorContext:
"""Gather context about an error"""
"""Gather detailed context about an error
Args:
error (Exception): The error that occurred
Returns:
ErrorContext: Detailed context about the error
"""
tb = traceback.extract_tb(sys.exc_info()[2])
file_path = None
line_number = None
code_snippet = ""
# Get the last frame from traceback which is usually where the error occurred
if tb:
last_frame = tb[-1]
file_path = last_frame.filename
line_number = last_frame.lineno
# Try to get code context
if file_path and os.path.exists(file_path):
with open(file_path, 'r') as f:
lines = f.readlines()
start = max(0, line_number - 5) # Get more context
start = max(0, line_number - 5)
end = min(len(lines), line_number + 5)
code_snippet = ''.join(lines[start:end])
@ -98,9 +173,16 @@ class SelfHealingAgent(Agent):
)
def get_fix_prompt(self, error_context: ErrorContext) -> str:
"""Create a detailed prompt for the fixer agent"""
"""Create a detailed prompt for the fixer agent
Args:
error_context (ErrorContext): Context about the error
Returns:
str: Prompt for the fixer agent
"""
return f"""
An error occurred in the code. Please analyze it and propose a fix.
Analyze this error and propose a fix following the required JSON format.
ERROR TYPE: {error_context.error_type}
ERROR MESSAGE: {error_context.error_message}
@ -115,80 +197,79 @@ class SelfHealingAgent(Agent):
FULL TRACEBACK:
{error_context.traceback}
Please analyze this error and propose a specific fix. Include:
1. What caused the error
2. Exact changes needed (file paths and line numbers)
3. The code that needs to be changed
4. Why the fix will work
5. Any potential side effects to consider
"""
def apply_fix(self, fix_proposal: Dict[str, Any]) -> bool:
    """Apply a fix proposed by the fixer agent.

    Replaces a 1-based, inclusive range of lines in the target file with
    the proposed code and writes the file back in place.

    Args:
        fix_proposal (Dict[str, Any]): The fix proposal in JSON format. Its
            "proposed_fix" entry must provide "file", "line_range"
            ("start-end" or a single line number) and "code_changes".

    Returns:
        bool: Whether the fix was successfully applied. Any failure
        (missing key, unparsable or out-of-bounds range, I/O error) is
        reported through terminal.status_panel and yields False.
    """
    try:
        # Extract fix details
        proposed_fix = fix_proposal["proposed_fix"]
        file_path = proposed_fix["file"]
        new_code = proposed_fix["code_changes"]
        # str() lets a bare integer line number (valid JSON) through
        # instead of failing on the '-' membership test below.
        line_spec = str(proposed_fix["line_range"]).strip()

        # Parse line range ("5-10" or "5")
        if '-' in line_spec:
            start, end = map(int, line_spec.split('-'))
        else:
            start = end = int(line_spec)

        with open(file_path, 'r') as f:
            lines = f.readlines()

        # Slice assignment accepts any indices, so a range like "0-5" or
        # "10-5" would silently rewrite the wrong lines. Refuse it instead.
        # start == len(lines) + 1 is allowed: it appends after the last line.
        if start < 1 or end < start or start > len(lines) + 1:
            raise ValueError(
                f"Invalid line range {line_spec!r} for a file with {len(lines)} lines"
            )

        replacement = new_code.splitlines(True)
        # Without a trailing newline the last replacement line would be
        # glued onto whatever line follows the replaced range.
        if replacement and not replacement[-1].endswith(("\n", "\r")):
            replacement[-1] += "\n"

        # start is 1-based and inclusive; end is inclusive, which matches
        # the exclusive upper bound of a 0-based slice.
        lines[start - 1:end] = replacement

        with open(file_path, 'w') as f:
            f.writelines(lines)

        terminal.status_panel(
            f"Applied fix to {file_path} lines {start}-{end}",
            "success"
        )
        return True

    except Exception as e:
        terminal.status_panel(f"Failed to apply fix: {str(e)}", "error")
        return False
def run(self, *args, **kwargs) -> Any:
"""Run with self-healing capabilities using LLM-based analysis"""
def run(self, *args, **kwargs) -> Dict[str, Any]:
"""Run with self-healing capabilities using LLM-based analysis
Returns:
Dict[str, Any]: Structured output about the run and any healing actions
"""
try:
return super().run(*args, **kwargs)
result = super().run(*args, **kwargs)
# Return success status
return {
"status": {
"state": "running",
"message": "Operation completed successfully",
"timestamp": datetime.now().isoformat()
},
"error_details": None,
"healing_actions": {
"attempted_fixes": [],
"successful_fixes": [],
"failed_fixes": []
},
"system_health": self.get_system_health(),
"recommendations": {
"immediate": [],
"long_term": ["Monitor system health regularly"]
}
}
except Exception as error:
terminal.status_panel("Error detected, analyzing with LLM...", "warning")
@ -201,13 +282,102 @@ class SelfHealingAgent(Agent):
fix_proposal = self.fixer_agent.run(fix_prompt)
terminal.status_panel("Fix proposed by LLM:", "info")
terminal.status_panel(fix_proposal, "info")
terminal.status_panel(json.dumps(fix_proposal, indent=2), "info")
# Track healing actions
attempted_fixes = [fix_proposal["proposed_fix"]["type"]]
successful_fixes = []
failed_fixes = []
# Apply the fix
if self.apply_fix(fix_proposal):
terminal.status_panel("Fix applied, retrying operation...", "info")
# Retry the operation
return super().run(*args, **kwargs)
successful_fixes.append(fix_proposal["proposed_fix"]["type"])
try:
# Retry the operation
result = super().run(*args, **kwargs)
return {
"status": {
"state": "fixed",
"message": "Error fixed and operation completed",
"timestamp": datetime.now().isoformat()
},
"error_details": {
"type": error_context.error_type,
"message": error_context.error_message,
"location": f"{error_context.file_path}:{error_context.line_number}"
},
"healing_actions": {
"attempted_fixes": attempted_fixes,
"successful_fixes": successful_fixes,
"failed_fixes": failed_fixes
},
"system_health": self.get_system_health(),
"recommendations": fix_proposal["prevention"]
}
except Exception as e:
failed_fixes.append(fix_proposal["proposed_fix"]["type"])
return {
"status": {
"state": "failed",
"message": "Fix applied but error persists",
"timestamp": datetime.now().isoformat()
},
"error_details": {
"type": type(e).__name__,
"message": str(e),
"location": f"{error_context.file_path}:{error_context.line_number}"
},
"healing_actions": {
"attempted_fixes": attempted_fixes,
"successful_fixes": successful_fixes,
"failed_fixes": failed_fixes
},
"system_health": self.get_system_health(),
"recommendations": {
"immediate": ["Manual intervention required"],
"long_term": fix_proposal["prevention"]["recommendations"]
}
}
else:
terminal.status_panel("Unable to apply fix automatically", "error")
raise
failed_fixes.append(fix_proposal["proposed_fix"]["type"])
return {
"status": {
"state": "error",
"message": "Unable to apply fix",
"timestamp": datetime.now().isoformat()
},
"error_details": {
"type": error_context.error_type,
"message": error_context.error_message,
"location": f"{error_context.file_path}:{error_context.line_number}"
},
"healing_actions": {
"attempted_fixes": attempted_fixes,
"successful_fixes": successful_fixes,
"failed_fixes": failed_fixes
},
"system_health": self.get_system_health(),
"recommendations": {
"immediate": ["Manual intervention required"],
"long_term": fix_proposal["prevention"]["recommendations"]
}
}
def get_system_health(self) -> Dict[str, str]:
    """Report current memory, CPU and root-disk utilisation.

    Returns:
        Dict[str, str]: The keys "memory_usage", "cpu_usage" and
        "disk_usage", each formatted as a percentage string.
    """
    # Imported lazily so psutil is only required when health is queried.
    import psutil

    memory_percent = psutil.virtual_memory().percent
    # NOTE(review): psutil documents that cpu_percent() called without an
    # interval measures since the previous call, so the very first reading
    # may be a meaningless 0.0 - confirm this is acceptable for reports.
    cpu_percent = psutil.cpu_percent()
    disk_percent = psutil.disk_usage('/').percent

    return {
        "memory_usage": f"{memory_percent}%",
        "cpu_usage": f"{cpu_percent}%",
        "disk_usage": f"{disk_percent}%"
    }

@ -1 +1,174 @@
import os
import sys
import time
import psutil
import threading
import traceback
from typing import Callable, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from swarms.utils.terminal_output import terminal
@dataclass
class SystemMetrics:
    """One sample of system resource utilisation."""

    # CPU utilisation, in percent.
    cpu_percent: float
    # Virtual memory utilisation, in percent.
    memory_percent: float
    # Disk utilisation, in percent.
    disk_usage_percent: float
    # When the sample was taken.
    timestamp: datetime
class HealthCheck:
    """System health monitoring and self-healing capabilities.

    Samples CPU, memory and disk utilisation through psutil, compares the
    readings against configurable thresholds, counts errors per exception
    type and runs registered recovery callbacks once an error type has
    occurred often enough. Monitoring can also run periodically on a
    background daemon thread.

    Attributes:
        metrics_history: Most recent samples, oldest first, capped at
            MAX_HISTORY entries.
        error_count: Number of errors seen per exception type name.
        recovery_actions: Recovery callback per exception type name.
        monitoring_thread: Background monitoring thread, if one was started.
        thresholds: Limits for the three metrics plus "error_threshold",
            the error count at which recovery is attempted.
    """

    # Maximum number of samples kept in metrics_history.
    MAX_HISTORY = 100
    # Seconds between two background health checks.
    CHECK_INTERVAL_SECONDS = 60

    def __init__(self):
        self.metrics_history: List[SystemMetrics] = []
        self.error_count: Dict[str, int] = {}
        self.recovery_actions: Dict[str, Callable] = {}
        self.monitoring_thread: Optional[threading.Thread] = None
        # Deliberately NOT named "stop_monitoring": an instance attribute
        # with that name shadows the stop_monitoring() method and makes it
        # uncallable.
        self._stop_event = threading.Event()

        # Default thresholds
        self.thresholds = {
            "cpu_percent": 90.0,
            "memory_percent": 85.0,
            "disk_usage_percent": 90.0,
            "error_threshold": 3
        }

    def register_recovery_action(self, error_type: str, action: Callable):
        """Register a recovery action for a specific error type.

        Args:
            error_type: Exception class name (as given by
                type(error).__name__) the action applies to.
            action: Callable invoked with the exception instance when
                recovery is attempted. Replaces any earlier registration.
        """
        self.recovery_actions[error_type] = action
        terminal.status_panel(f"Registered recovery action for {error_type}", "info")

    def collect_metrics(self) -> Optional["SystemMetrics"]:
        """Collect current system metrics and record them in the history.

        Blocks for about one second because the CPU reading is taken with
        interval=1.

        Returns:
            The new sample, or None if collection failed (the failure is
            reported through terminal.status_panel).
        """
        try:
            metrics = SystemMetrics(
                cpu_percent=psutil.cpu_percent(interval=1),
                memory_percent=psutil.virtual_memory().percent,
                disk_usage_percent=psutil.disk_usage('/').percent,
                timestamp=datetime.now()
            )
        except Exception as e:
            terminal.status_panel(f"Error collecting metrics: {str(e)}", "error")
            return None

        self.metrics_history.append(metrics)
        if len(self.metrics_history) > self.MAX_HISTORY:  # Keep last 100 readings
            self.metrics_history.pop(0)
        return metrics

    def check_system_health(self) -> bool:
        """Check if system metrics are within acceptable thresholds.

        Takes a fresh sample via collect_metrics().

        Returns:
            True when every metric is at or below its threshold; False
            when a metric exceeds its threshold (a warning listing the
            offenders is shown) or when no sample could be collected.
        """
        metrics = self.collect_metrics()
        if metrics is None:
            return False

        checks = (
            ("cpu_percent", "High CPU usage"),
            ("memory_percent", "High memory usage"),
            ("disk_usage_percent", "High disk usage"),
        )
        issues = [
            f"{label}: {getattr(metrics, name)}%"
            for name, label in checks
            if getattr(metrics, name) > self.thresholds[name]
        ]

        if issues:
            terminal.status_panel("\n".join(issues), "warning")
            return False
        return True

    def handle_error(self, error: Exception, context: str = ""):
        """Handle errors and attempt recovery.

        Counts the error under its exception type name, reports it with
        its traceback and, once the count reaches
        thresholds["error_threshold"], calls attempt_recovery().

        Args:
            error: The exception to record.
            context: Free-text description of where the error occurred.
        """
        error_type = type(error).__name__

        # Increment error count
        self.error_count[error_type] = self.error_count.get(error_type, 0) + 1

        # Format from the exception object itself: traceback.format_exc()
        # only works while an exception is being handled and otherwise
        # yields "NoneType: None".
        formatted_tb = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        terminal.status_panel(
            f"Error occurred in {context}: {str(error)}\n{formatted_tb}",
            "error"
        )

        # Check if we need to take recovery action
        if self.error_count[error_type] >= self.thresholds["error_threshold"]:
            self.attempt_recovery(error_type, error)

    def attempt_recovery(self, error_type: str, error: Exception):
        """Attempt to recover from an error.

        Runs the recovery action registered for error_type, if any. A
        successful run resets that type's error count; a failing action is
        reported and leaves the count untouched.

        Args:
            error_type: Exception type name used to look up the action.
            error: The exception instance passed to the action.
        """
        terminal.status_panel(f"Attempting recovery for {error_type}", "info")

        action = self.recovery_actions.get(error_type)
        if action is None:
            terminal.status_panel(
                f"No recovery action registered for {error_type}",
                "warning"
            )
            return

        try:
            action(error)
        except Exception as e:
            terminal.status_panel(
                f"Recovery action failed for {error_type}: {str(e)}",
                "error"
            )
            return

        terminal.status_panel(f"Recovery action completed for {error_type}", "success")
        self.error_count[error_type] = 0  # Reset error count after successful recovery

    def start_monitoring(self):
        """Start continuous system monitoring on a daemon thread.

        Does nothing if a monitoring thread is already alive.
        """
        if self.monitoring_thread and self.monitoring_thread.is_alive():
            return

        # A previous stop_monitoring() leaves the event set; clear it so
        # the new loop does not exit immediately.
        self._stop_event.clear()

        def monitor():
            while not self._stop_event.is_set():
                healthy = self.check_system_health()
                if healthy:
                    terminal.status_panel("System health check passed", "success")
                # Event.wait() rather than time.sleep(): the pause ends as
                # soon as stop_monitoring() sets the event.
                self._stop_event.wait(self.CHECK_INTERVAL_SECONDS)  # Check every minute

        self.monitoring_thread = threading.Thread(target=monitor, daemon=True)
        self.monitoring_thread.start()
        terminal.status_panel("System monitoring started", "info")

    def stop_monitoring(self):
        """Stop system monitoring.

        Signals the monitoring thread and waits for it to finish. Does
        nothing if no monitoring thread is alive.
        """
        if self.monitoring_thread and self.monitoring_thread.is_alive():
            self._stop_event.set()
            self.monitoring_thread.join()
            terminal.status_panel("System monitoring stopped", "info")

    def get_health_report(self) -> dict:
        """Generate a health report.

        Returns:
            {"status": "No metrics collected yet"} when the history is
            empty. Otherwise a dict with "current_metrics" (latest stored
            sample), "average_metrics" (means over the stored history),
            "error_counts" (the live error_count dict, not a copy) and
            "status". Computing "status" calls check_system_health(), which
            takes and stores a fresh sample that is not reflected in the
            other fields of the same report.
        """
        if not self.metrics_history:
            return {"status": "No metrics collected yet"}

        # Work on a snapshot so the sums and the divisor agree.
        history = list(self.metrics_history)
        latest = history[-1]
        sample_count = len(history)
        avg_metrics = {
            name: sum(getattr(m, name) for m in history) / sample_count
            for name in ("cpu_percent", "memory_percent", "disk_usage_percent")
        }

        return {
            "current_metrics": {
                "cpu_percent": latest.cpu_percent,
                "memory_percent": latest.memory_percent,
                "disk_usage_percent": latest.disk_usage_percent,
                "timestamp": latest.timestamp.isoformat()
            },
            "average_metrics": avg_metrics,
            "error_counts": self.error_count,
            "status": "Healthy" if self.check_system_health() else "Issues Detected"
        }
# Shared module-level HealthCheck instance, created when this module is imported.
# Nothing here prevents callers from constructing additional HealthCheck objects.
health_monitor = HealthCheck()
Loading…
Cancel
Save