diff --git a/swarms/agents/i_agent.py b/swarms/agents/i_agent.py
index 674c3f4a..7a257964 100644
--- a/swarms/agents/i_agent.py
+++ b/swarms/agents/i_agent.py
@@ -18,6 +18,7 @@ Workflow:
 """
 
+import re
 from typing import List, Tuple
 from loguru import logger
 from swarms.structs.agent import Agent
@@ -36,6 +37,12 @@
 Do not include any finance-related content.
 """
 
+# Configuration constants
+MAX_PATHS_PER_ITERATION = 5
+SCORE_THRESHOLD = 0.7
+EARLY_TERMINATION_SCORE = 0.85
+DEFAULT_SCORE = 0.5
+
 
 class IterativeReflectiveExpansion:
     """
@@ -77,6 +84,66 @@ class IterativeReflectiveExpansion:
             dynamic_temperature_enabled=True,
         )
 
+    def _extract_score_robust(self, response: str) -> float:
+        """
+        Robustly extract a score from an LLM response using multiple strategies.
+
+        :param response: The LLM response text.
+        :return: Extracted score between 0.0 and 1.0, or DEFAULT_SCORE if extraction fails.
+        """
+        # Strategy 1: Look for a "Score: X.X" line (with or without markdown formatting)
+        for line in response.splitlines():
+            line_clean = line.strip().replace('*', '')  # Remove markdown formatting
+            if 'score:' in line_clean.lower():
+                try:
+                    # Extract everything after "score:"
+                    score_str = line_clean.lower().split('score:')[-1].strip()
+                    # Remove any non-numeric characters except the decimal point
+                    score_str = re.sub(r'[^\d.]', '', score_str)
+                    if score_str:  # Make sure we have something to parse
+                        score = float(score_str)
+                        # Clamp to valid range
+                        return max(0.0, min(1.0, score))
+                except (ValueError, IndexError):
+                    pass
+
+        # Strategy 2: Look for a number in a scoring context (normalized to 0-1 below)
+        score_patterns = [
+            r'score[:\s]+(\d+\.?\d*)',
+            r'rating[:\s]+(\d+\.?\d*)',
+            r'effectiveness[:\s]+(\d+\.?\d*)',
+            r'(\d+\.?\d*)\s*(?:/|out of)\s*(?:10|1\.0|1)',
+        ]
+
+        for pattern in score_patterns:
+            matches = re.findall(pattern, response.lower())
+            if matches:
+                try:
+                    score = float(matches[0])
+                    # Normalize if score is out of 10
+                    if score > 1.0:
+                        score = score / 10.0
+                    return max(0.0, min(1.0, score))
+                except ValueError:
+                    continue
+
+        # Strategy 3: Keyword-based sentiment fallback
+        positive_keywords = ['excellent', 'good', 'promising', 'effective', 'successful', 'optimal']
+        negative_keywords = ['poor', 'bad', 'ineffective', 'failed', 'error', 'wrong', 'incorrect']
+
+        response_lower = response.lower()
+        positive_count = sum(1 for kw in positive_keywords if kw in response_lower)
+        negative_count = sum(1 for kw in negative_keywords if kw in response_lower)
+
+        if positive_count > negative_count and positive_count > 0:
+            return 0.75  # Likely good
+        elif negative_count > positive_count and negative_count > 0:
+            return 0.4  # Likely poor
+
+        # Default fallback
+        logger.warning(f"Could not extract score from response, using default: {DEFAULT_SCORE}")
+        return DEFAULT_SCORE
+
     def generate_initial_hypotheses(self, task: str) -> List[str]:
         """
         Generate an initial set of reasoning hypotheses based on the problem input.
@@ -110,35 +177,43 @@ class IterativeReflectiveExpansion:
         :param path: A candidate reasoning path.
         :return: A tuple containing the simulated outcome, a numerical score (0.0 to 1.0), and error information.
         """
-        logger.info(f"Simulating path: {path}")
+        logger.info(f"Simulating path: {path[:100]}...")
         prompt = (
             f"Simulate the following reasoning path step by step and provide:\n"
             f"1. Outcome: A brief summary of the resulting solution.\n"
-            f"2. Score: A numerical effectiveness score between 0.0 and 1.0.\n"
+            f"2. Score: A numerical effectiveness score between 0.0 and 1.0 (REQUIRED - provide a decimal number).\n"
             f"3. Errors: Any potential errors or shortcomings identified during the reasoning.\n\n"
+            f"IMPORTANT: You MUST provide a score as a decimal number (e.g., 0.8, 0.65, 0.9).\n\n"
             f"Reasoning Path: {path}"
         )
         response = self.agent.run(prompt)
         self.conversation.add(
             role=self.agent.agent_name, content=response
         )
+
         outcome = ""
-        score = 0.0
         error_info = ""
-        try:
-            # Expecting a response with lines starting with "Outcome:", "Score:", and "Errors:"
-            for line in response.splitlines():
-                if line.startswith("Outcome:"):
-                    outcome = line[len("Outcome:") :].strip()
-                elif line.startswith("Score:"):
-                    score = float(line[len("Score:") :].strip())
-                elif line.startswith("Errors:"):
-                    error_info = line[len("Errors:") :].strip()
-        except Exception as e:
-            logger.error(f"Error parsing simulation response: {e}")
-        logger.debug(
-            f"Simulated outcome: {outcome}, Score: {score}, Errors: {error_info}"
-        )
+
+        # Extract outcome and errors (handle markdown formatting)
+        for line in response.splitlines():
+            line_stripped = line.strip().replace('*', '')  # Remove markdown
+            line_lower = line_stripped.lower()
+
+            if 'outcome:' in line_lower:
+                outcome = line_stripped.split(':', 1)[-1].strip()
+            elif 'errors:' in line_lower or 'error:' in line_lower:
+                error_info = line_stripped.split(':', 1)[-1].strip()
+
+        # Use robust score extraction
+        score = self._extract_score_robust(response)
+
+        # If no explicit errors found, check for error indicators in outcome
+        if not error_info and outcome:
+            error_keywords = ['error', 'fail', 'incorrect', 'wrong', 'issue', 'problem']
+            if any(kw in outcome.lower() for kw in error_keywords):
+                error_info = "Potential issues identified in outcome"
+
+        logger.info(f"Path score: {score:.2f} | Outcome length: {len(outcome)} chars")
         return outcome, score, error_info
 
     def meta_reflect(self, error_info: str) -> str:
@@ -195,24 +270,48 @@
         Select the most promising reasoning paths from a list of candidates.
 
         :param paths: A list of candidate reasoning paths.
-        :return: A pruned list containing the most promising paths.
+        :return: A pruned list containing the most promising paths (max MAX_PATHS_PER_ITERATION).
         """
-        logger.info("Selecting promising reasoning paths.")
+        if not paths:
+            logger.warning("No paths provided for selection")
+            return []
+
+        # If already within limit, return as is
+        if len(paths) <= MAX_PATHS_PER_ITERATION:
+            logger.info(f"Path count ({len(paths)}) within limit, keeping all")
+            return paths
+
+        logger.info(f"Selecting top {MAX_PATHS_PER_ITERATION} from {len(paths)} paths")
+
+        # Truncate paths for display to avoid overwhelming the LLM
+        paths_display = [p[:200] + "..." if len(p) > 200 else p for p in paths]
+
         prompt = (
-            "Evaluate the following reasoning paths and select the ones that appear most promising for further exploration. "
-            "List each selected path on a new line:\n"
-            + "\n".join(paths)
+            f"Evaluate the following {len(paths)} reasoning paths and select ONLY the {MAX_PATHS_PER_ITERATION} most promising ones. "
+            f"Return EXACTLY {MAX_PATHS_PER_ITERATION} paths, each on a new line. Do not add commentary.\n\n"
+            "Paths:\n"
+            + "\n".join(f"{i+1}. {p}" for i, p in enumerate(paths_display))
{p}" for i, p in enumerate(paths_display)) ) response = self.agent.run(prompt) self.conversation.add( role=self.agent.agent_name, content=response ) + selected_paths = [ line.strip() for line in response.split("\n") - if line.strip() + if line.strip() and not line.strip().startswith('#') ] - logger.debug(f"Selected paths: {selected_paths}") + + # Hard limit enforcement - take first MAX_PATHS_PER_ITERATION + selected_paths = selected_paths[:MAX_PATHS_PER_ITERATION] + + # If LLM failed to return paths, fall back to first N original paths + if len(selected_paths) < MAX_PATHS_PER_ITERATION: + logger.warning(f"LLM returned only {len(selected_paths)} paths, using first {MAX_PATHS_PER_ITERATION} original paths") + selected_paths = paths[:MAX_PATHS_PER_ITERATION] + + logger.info(f"Selected {len(selected_paths)} paths for next iteration") return selected_paths def synthesize_solution( @@ -250,37 +349,106 @@ class IterativeReflectiveExpansion: :return: The final solution generated after iterative reasoning. """ logger.info( - f"Starting iterative reflective expansion for problem: {task}" + f"Starting IRE reasoning | Max iterations: {self.max_iterations} | Task: {task[:100]}..." ) + candidate_paths = self.generate_initial_hypotheses(task) + logger.info(f"Generated {len(candidate_paths)} initial hypotheses") + + # Limit initial paths + if len(candidate_paths) > MAX_PATHS_PER_ITERATION: + logger.warning(f"Limiting initial paths from {len(candidate_paths)} to {MAX_PATHS_PER_ITERATION}") + candidate_paths = candidate_paths[:MAX_PATHS_PER_ITERATION] + memory_pool: List[str] = [] + best_score_overall = 0.0 + early_termination = False for iteration in range(self.max_iterations): logger.info( - f"Iteration {iteration + 1}/{self.max_iterations}" + f"\n{'='*60}\nIteration {iteration + 1}/{self.max_iterations} | Processing {len(candidate_paths)} paths\n{'='*60}" ) + expanded_paths: List[str] = [] + iteration_best_score = 0.0 + high_quality_paths = 0 - for path in candidate_paths: + for idx, path in enumerate(candidate_paths): + logger.info(f"[Path {idx + 1}/{len(candidate_paths)}] Simulating...") outcome, score, error_info = self.simulate_path(path) - # Use a threshold score of 0.7 (this can be adjusted) - if score < 0.7: - feedback = self.meta_reflect(error_info) - revised_paths = self.revise_path(path, feedback) - expanded_paths.extend(revised_paths) + + # Track best score + iteration_best_score = max(iteration_best_score, score) + best_score_overall = max(best_score_overall, score) + + # Check for early termination + if score >= EARLY_TERMINATION_SCORE: + high_quality_paths += 1 + logger.info(f"High-quality path found (score: {score:.2f})") + expanded_paths.append(path) + + # Early termination if we have excellent solution + if score >= 0.9: + logger.info(f"Excellent solution found (score: {score:.2f})! 
Triggering early termination.") + expanded_paths = [path] # Use only this path + early_termination = True + break + + elif score < SCORE_THRESHOLD: + # Only revise if score is below threshold + logger.info(f"Path scored {score:.2f} (below {SCORE_THRESHOLD}), revising...") + if error_info: + feedback = self.meta_reflect(error_info) + revised_paths = self.revise_path(path, feedback) + # Limit number of revisions per path + revised_paths = revised_paths[:3] + expanded_paths.extend(revised_paths) + logger.info(f"Generated {len(revised_paths)} revised paths") + else: + # No explicit errors, keep original path + expanded_paths.append(path) else: + # Good enough, keep it + logger.info(f"Path scored {score:.2f}, keeping as-is") expanded_paths.append(path) - memory_pool.extend(candidate_paths) - candidate_paths = self.select_promising_paths( - expanded_paths - ) logger.info( - f"Candidate paths for next iteration: {candidate_paths}" + f"\nIteration {iteration + 1} Summary:\n" + f" - Paths processed: {len(candidate_paths)}\n" + f" - Expanded to: {len(expanded_paths)} paths\n" + f" - Best score this iteration: {iteration_best_score:.2f}\n" + f" - Best score overall: {best_score_overall:.2f}\n" + f" - High-quality paths: {high_quality_paths}" ) + # Check for early termination + if early_termination: + logger.info("Early termination triggered - excellent solution found") + memory_pool.extend(candidate_paths) + candidate_paths = expanded_paths + break + + # If we have multiple high-quality paths, we can stop iterating + if high_quality_paths >= 2 and iteration >= 1: + logger.info(f"Found {high_quality_paths} high-quality paths, stopping iteration") + memory_pool.extend(candidate_paths) + candidate_paths = expanded_paths + break + + memory_pool.extend(candidate_paths) + + # Select promising paths for next iteration + candidate_paths = self.select_promising_paths(expanded_paths) + + # Safety check: if no paths remain, break + if not candidate_paths: + logger.warning("No candidate paths remain, terminating early") + candidate_paths = expanded_paths[:MAX_PATHS_PER_ITERATION] if expanded_paths else [] + break + + logger.info(f"\n{'='*60}\nSynthesizing final solution from {len(candidate_paths)} paths\n{'='*60}") self.synthesize_solution(candidate_paths, memory_pool) - logger.info("Final solution generated.") + logger.info("IRE reasoning complete.") return history_output_formatter( self.conversation, self.output_type