Fixed IRE agent infinite spiral

pull/1265/head
Steve-Dusty 2 days ago
parent c317eb98e3
commit ff11ef7ac1

@@ -18,6 +18,7 @@ Workflow:
 """
+import re
 from typing import List, Tuple
 from loguru import logger
 from swarms.structs.agent import Agent
@@ -36,6 +37,12 @@ Do not include any finance-related content.
 """
 
+# Configuration constants
+MAX_PATHS_PER_ITERATION = 5
+SCORE_THRESHOLD = 0.7
+EARLY_TERMINATION_SCORE = 0.85
+DEFAULT_SCORE = 0.5
+
 class IterativeReflectiveExpansion:
     """
@@ -77,6 +84,66 @@ class IterativeReflectiveExpansion:
             dynamic_temperature_enabled=True,
         )
 
+    def _extract_score_robust(self, response: str) -> float:
+        """
+        Robustly extract a score from LLM response using multiple strategies.
+
+        :param response: The LLM response text.
+        :return: Extracted score between 0.0 and 1.0, or DEFAULT_SCORE if extraction fails.
+        """
+        # Strategy 1: Look for "Score: X.X" format (with or without markdown formatting)
+        for line in response.splitlines():
+            line_clean = line.strip().replace('*', '')  # Remove markdown formatting
+            if 'score:' in line_clean.lower():
+                try:
+                    # Extract everything after "score:"
+                    score_str = line_clean.lower().split('score:')[-1].strip()
+                    # Remove any non-numeric characters except decimal point
+                    score_str = re.sub(r'[^\d.]', '', score_str)
+                    if score_str:  # Make sure we have something to parse
+                        score = float(score_str)
+                        # Clamp to valid range
+                        return max(0.0, min(1.0, score))
+                except (ValueError, IndexError):
+                    pass
+
+        # Strategy 2: Look for any number between 0 and 1 with context
+        score_patterns = [
+            r'score[:\s]+(\d+\.?\d*)',
+            r'rating[:\s]+(\d+\.?\d*)',
+            r'effectiveness[:\s]+(\d+\.?\d*)',
+            r'(\d+\.?\d*)\s*(?:/|out of)\s*(?:10|1\.0|1)',
+        ]
+        for pattern in score_patterns:
+            matches = re.findall(pattern, response.lower())
+            if matches:
+                try:
+                    score = float(matches[0])
+                    # Normalize if score is out of 10
+                    if score > 1.0:
+                        score = score / 10.0
+                    return max(0.0, min(1.0, score))
+                except ValueError:
+                    continue
+
+        # Strategy 3: Sentiment analysis fallback
+        positive_keywords = ['excellent', 'good', 'promising', 'effective', 'successful', 'optimal']
+        negative_keywords = ['poor', 'bad', 'ineffective', 'failed', 'error', 'wrong', 'incorrect']
+        response_lower = response.lower()
+        positive_count = sum(1 for kw in positive_keywords if kw in response_lower)
+        negative_count = sum(1 for kw in negative_keywords if kw in response_lower)
+
+        if positive_count > negative_count and positive_count > 0:
+            return 0.75  # Likely good
+        elif negative_count > positive_count and negative_count > 0:
+            return 0.4  # Likely poor
+
+        # Default fallback
+        logger.warning(f"Could not extract score from response, using default: {DEFAULT_SCORE}")
+        return DEFAULT_SCORE
+
     def generate_initial_hypotheses(self, task: str) -> List[str]:
         """
         Generate an initial set of reasoning hypotheses based on the problem input.
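For a quick sanity check of the strategy cascade above, the calls below show what each fallback layer returns on representative responses. This is a hypothetical snippet: ire is assumed to be an instance of IterativeReflectiveExpansion, and the inputs are made up.

# Hypothetical sanity check for _extract_score_robust; ire is an assumed
# IterativeReflectiveExpansion instance and the inputs are illustrative.
assert ire._extract_score_robust("**Score:** 0.85") == 0.85              # Strategy 1: labeled line
assert ire._extract_score_robust("I'd rate this 8/10") == 0.8            # Strategy 2: regex match, normalized from /10
assert ire._extract_score_robust("A promising, effective plan") == 0.75  # Strategy 3: sentiment fallback
assert ire._extract_score_robust("No usable signal here") == 0.5         # DEFAULT_SCORE fallback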
@@ -110,35 +177,43 @@ class IterativeReflectiveExpansion:
         :param path: A candidate reasoning path.
         :return: A tuple containing the simulated outcome, a numerical score (0.0 to 1.0), and error information.
         """
-        logger.info(f"Simulating path: {path}")
+        logger.info(f"Simulating path: {path[:100]}...")
         prompt = (
             f"Simulate the following reasoning path step by step and provide:\n"
             f"1. Outcome: A brief summary of the resulting solution.\n"
-            f"2. Score: A numerical effectiveness score between 0.0 and 1.0.\n"
+            f"2. Score: A numerical effectiveness score between 0.0 and 1.0 (REQUIRED - provide a decimal number).\n"
             f"3. Errors: Any potential errors or shortcomings identified during the reasoning.\n\n"
+            f"IMPORTANT: You MUST provide a score as a decimal number (e.g., 0.8, 0.65, 0.9).\n\n"
             f"Reasoning Path: {path}"
         )
         response = self.agent.run(prompt)
         self.conversation.add(
             role=self.agent.agent_name, content=response
         )
         outcome = ""
-        score = 0.0
         error_info = ""
-        try:
-            # Expecting a response with lines starting with "Outcome:", "Score:", and "Errors:"
-            for line in response.splitlines():
-                if line.startswith("Outcome:"):
-                    outcome = line[len("Outcome:") :].strip()
-                elif line.startswith("Score:"):
-                    score = float(line[len("Score:") :].strip())
-                elif line.startswith("Errors:"):
-                    error_info = line[len("Errors:") :].strip()
-        except Exception as e:
-            logger.error(f"Error parsing simulation response: {e}")
-        logger.debug(
-            f"Simulated outcome: {outcome}, Score: {score}, Errors: {error_info}"
-        )
+
+        # Extract outcome and errors (handle markdown formatting)
+        for line in response.splitlines():
+            line_stripped = line.strip().replace('*', '')  # Remove markdown
+            line_lower = line_stripped.lower()
+
+            if 'outcome:' in line_lower:
+                outcome = line_stripped.split(':', 1)[-1].strip()
+            elif 'errors:' in line_lower or 'error:' in line_lower:
+                error_info = line_stripped.split(':', 1)[-1].strip()
+
+        # Use robust score extraction
+        score = self._extract_score_robust(response)
+
+        # If no explicit errors found, check for error indicators in outcome
+        if not error_info and outcome:
+            error_keywords = ['error', 'fail', 'incorrect', 'wrong', 'issue', 'problem']
+            if any(kw in outcome.lower() for kw in error_keywords):
+                error_info = "Potential issues identified in outcome"
+
+        logger.info(f"Path score: {score:.2f} | Outcome length: {len(outcome)} chars")
         return outcome, score, error_info
 
     def meta_reflect(self, error_info: str) -> str:
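The rewritten parser keys on "outcome:" and "error(s):" labels case-insensitively after stripping markdown asterisks. A standalone trace of that loop on a typical reply (this mirrors the parsing logic above; no agent call is involved, and the response text is invented for illustration):

# Standalone trace of the outcome/error extraction above.
response = (
    "**Outcome:** Route solved with dynamic programming.\n"
    "**Score:** 0.82\n"
    "**Errors:** None identified."
)
outcome, error_info = "", ""
for line in response.splitlines():
    line_stripped = line.strip().replace('*', '')
    line_lower = line_stripped.lower()
    if 'outcome:' in line_lower:
        outcome = line_stripped.split(':', 1)[-1].strip()
    elif 'errors:' in line_lower or 'error:' in line_lower:
        error_info = line_stripped.split(':', 1)[-1].strip()
print(outcome)     # Route solved with dynamic programming.
print(error_info)  # None identified.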
@@ -195,24 +270,48 @@ class IterativeReflectiveExpansion:
         Select the most promising reasoning paths from a list of candidates.
 
         :param paths: A list of candidate reasoning paths.
-        :return: A pruned list containing the most promising paths.
+        :return: A pruned list containing the most promising paths (max MAX_PATHS_PER_ITERATION).
         """
-        logger.info("Selecting promising reasoning paths.")
+        if not paths:
+            logger.warning("No paths provided for selection")
+            return []
+
+        # If already within limit, return as is
+        if len(paths) <= MAX_PATHS_PER_ITERATION:
+            logger.info(f"Path count ({len(paths)}) within limit, keeping all")
+            return paths
+
+        logger.info(f"Selecting top {MAX_PATHS_PER_ITERATION} from {len(paths)} paths")
+
+        # Truncate paths for display to avoid overwhelming the LLM
+        paths_display = [p[:200] + "..." if len(p) > 200 else p for p in paths]
+
         prompt = (
-            "Evaluate the following reasoning paths and select the ones that appear most promising for further exploration. "
-            "List each selected path on a new line:\n"
-            + "\n".join(paths)
+            f"Evaluate the following {len(paths)} reasoning paths and select ONLY the {MAX_PATHS_PER_ITERATION} most promising ones. "
+            f"Return EXACTLY {MAX_PATHS_PER_ITERATION} paths, each on a new line. Do not add commentary.\n\n"
+            "Paths:\n"
+            + "\n".join(f"{i+1}. {p}" for i, p in enumerate(paths_display))
        )
         response = self.agent.run(prompt)
         self.conversation.add(
             role=self.agent.agent_name, content=response
         )
         selected_paths = [
             line.strip()
             for line in response.split("\n")
-            if line.strip()
+            if line.strip() and not line.strip().startswith('#')
         ]
-        logger.debug(f"Selected paths: {selected_paths}")
+
+        # Hard limit enforcement - take first MAX_PATHS_PER_ITERATION
+        selected_paths = selected_paths[:MAX_PATHS_PER_ITERATION]
+
+        # If LLM failed to return paths, fall back to first N original paths
+        if len(selected_paths) < MAX_PATHS_PER_ITERATION:
+            logger.warning(f"LLM returned only {len(selected_paths)} paths, using first {MAX_PATHS_PER_ITERATION} original paths")
+            selected_paths = paths[:MAX_PATHS_PER_ITERATION]
+
+        logger.info(f"Selected {len(selected_paths)} paths for next iteration")
         return selected_paths
 
     def synthesize_solution(
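Two guards above bound the fan-out: the model's reply is hard-capped at MAX_PATHS_PER_ITERATION lines, and a reply with fewer usable lines than the cap is discarded entirely in favor of the first N original paths. A standalone sketch of that invariant with a stubbed, partially malformed LLM reply (names mirror the constants above):

# Standalone sketch of the selection cap and fallback; llm_lines stands in
# for a (possibly malformed) agent reply.
MAX_PATHS_PER_ITERATION = 5
paths = [f"path-{i}" for i in range(12)]
llm_lines = ["1. path-3", "# commentary", "2. path-7"]
selected = [ln.strip() for ln in llm_lines if ln.strip() and not ln.strip().startswith('#')]
selected = selected[:MAX_PATHS_PER_ITERATION]
if len(selected) < MAX_PATHS_PER_ITERATION:
    selected = paths[:MAX_PATHS_PER_ITERATION]  # fallback to the first N originals
assert len(selected) <= MAX_PATHS_PER_ITERATION  # never more than the cap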
@@ -250,37 +349,106 @@ class IterativeReflectiveExpansion:
         :return: The final solution generated after iterative reasoning.
         """
         logger.info(
-            f"Starting iterative reflective expansion for problem: {task}"
+            f"Starting IRE reasoning | Max iterations: {self.max_iterations} | Task: {task[:100]}..."
         )
         candidate_paths = self.generate_initial_hypotheses(task)
+        logger.info(f"Generated {len(candidate_paths)} initial hypotheses")
+
+        # Limit initial paths
+        if len(candidate_paths) > MAX_PATHS_PER_ITERATION:
+            logger.warning(f"Limiting initial paths from {len(candidate_paths)} to {MAX_PATHS_PER_ITERATION}")
+            candidate_paths = candidate_paths[:MAX_PATHS_PER_ITERATION]
+
         memory_pool: List[str] = []
+        best_score_overall = 0.0
+        early_termination = False
 
         for iteration in range(self.max_iterations):
             logger.info(
-                f"Iteration {iteration + 1}/{self.max_iterations}"
+                f"\n{'='*60}\nIteration {iteration + 1}/{self.max_iterations} | Processing {len(candidate_paths)} paths\n{'='*60}"
             )
             expanded_paths: List[str] = []
+            iteration_best_score = 0.0
+            high_quality_paths = 0
 
-            for path in candidate_paths:
+            for idx, path in enumerate(candidate_paths):
+                logger.info(f"[Path {idx + 1}/{len(candidate_paths)}] Simulating...")
                 outcome, score, error_info = self.simulate_path(path)
-                # Use a threshold score of 0.7 (this can be adjusted)
-                if score < 0.7:
-                    feedback = self.meta_reflect(error_info)
-                    revised_paths = self.revise_path(path, feedback)
-                    expanded_paths.extend(revised_paths)
+
+                # Track best score
+                iteration_best_score = max(iteration_best_score, score)
+                best_score_overall = max(best_score_overall, score)
+
+                # Check for early termination
+                if score >= EARLY_TERMINATION_SCORE:
+                    high_quality_paths += 1
+                    logger.info(f"High-quality path found (score: {score:.2f})")
+                    expanded_paths.append(path)
+
+                    # Early termination if we have excellent solution
+                    if score >= 0.9:
+                        logger.info(f"Excellent solution found (score: {score:.2f})! Triggering early termination.")
+                        expanded_paths = [path]  # Use only this path
+                        early_termination = True
+                        break
+                elif score < SCORE_THRESHOLD:
+                    # Only revise if score is below threshold
+                    logger.info(f"Path scored {score:.2f} (below {SCORE_THRESHOLD}), revising...")
+                    if error_info:
+                        feedback = self.meta_reflect(error_info)
+                        revised_paths = self.revise_path(path, feedback)
+                        # Limit number of revisions per path
+                        revised_paths = revised_paths[:3]
+                        expanded_paths.extend(revised_paths)
+                        logger.info(f"Generated {len(revised_paths)} revised paths")
+                    else:
+                        # No explicit errors, keep original path
+                        expanded_paths.append(path)
                 else:
+                    # Good enough, keep it
+                    logger.info(f"Path scored {score:.2f}, keeping as-is")
                     expanded_paths.append(path)
 
-            memory_pool.extend(candidate_paths)
-            candidate_paths = self.select_promising_paths(
-                expanded_paths
-            )
             logger.info(
-                f"Candidate paths for next iteration: {candidate_paths}"
+                f"\nIteration {iteration + 1} Summary:\n"
+                f" - Paths processed: {len(candidate_paths)}\n"
+                f" - Expanded to: {len(expanded_paths)} paths\n"
+                f" - Best score this iteration: {iteration_best_score:.2f}\n"
+                f" - Best score overall: {best_score_overall:.2f}\n"
+                f" - High-quality paths: {high_quality_paths}"
             )
+
+            # Check for early termination
+            if early_termination:
+                logger.info("Early termination triggered - excellent solution found")
+                memory_pool.extend(candidate_paths)
+                candidate_paths = expanded_paths
+                break
+
+            # If we have multiple high-quality paths, we can stop iterating
+            if high_quality_paths >= 2 and iteration >= 1:
+                logger.info(f"Found {high_quality_paths} high-quality paths, stopping iteration")
+                memory_pool.extend(candidate_paths)
+                candidate_paths = expanded_paths
+                break
+
+            memory_pool.extend(candidate_paths)
+
+            # Select promising paths for next iteration
+            candidate_paths = self.select_promising_paths(expanded_paths)
+
+            # Safety check: if no paths remain, break
+            if not candidate_paths:
+                logger.warning("No candidate paths remain, terminating early")
+                candidate_paths = expanded_paths[:MAX_PATHS_PER_ITERATION] if expanded_paths else []
+                break
 
+        logger.info(f"\n{'='*60}\nSynthesizing final solution from {len(candidate_paths)} paths\n{'='*60}")
         self.synthesize_solution(candidate_paths, memory_pool)
-        logger.info("Final solution generated.")
+        logger.info("IRE reasoning complete.")
         return history_output_formatter(
             self.conversation, self.output_type
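Taken together, the caps are what break the spiral: each iteration now simulates at most MAX_PATHS_PER_ITERATION paths, each low scorer spawns at most 3 revisions, and the selection step prunes back to the cap, so simulation calls are bounded by max_iterations * MAX_PATHS_PER_ITERATION rather than growing with every revision round. A hypothetical end-to-end usage; the import path and constructor arguments are assumptions, since only run(task) and max_iterations are visible in this diff:

# Hypothetical usage; import path and constructor args are assumed,
# check the class __init__ for the actual signature.
from swarms.agents.i_agent import IterativeReflectiveExpansion

ire = IterativeReflectiveExpansion(max_iterations=3)
result = ire.run("Plan a minimal-cost route visiting all four warehouses.")
print(result)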
