diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index e3f0a73d..83f01d6f 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -13,6 +13,7 @@ from rich.progress import ( from rich.table import Table from rich.text import Text from rich.spinner import Spinner +from rich.tree import Tree from rich.markdown import Markdown @@ -302,12 +303,13 @@ class Formatter: A class for formatting and printing rich text to the console. """ - def __init__(self, md: bool = True): + def __init__(self, md: bool = True, show_swarm_structure: bool = False): """ Initializes the Formatter with a Rich Console instance. Args: md (bool): Enable markdown output rendering. Defaults to True. + show_swarm_structure (bool): Enable automatic swarm structure visualization. Defaults to False. """ self.console = Console() self._dashboard_live = None @@ -329,6 +331,9 @@ class Formatter: self.markdown_handler = ( MarkdownOutputHandler(self.console) if md else None ) + + # Swarm structure visualization setting + self.show_swarm_structure = show_swarm_structure def _get_status_with_loading(self, status: str) -> Text: """ @@ -720,6 +725,326 @@ class Formatter: self.console.print() # Add blank line after stopping self._dashboard_live = None + def _is_swarm(self, obj: Any) -> bool: + """ + Check if an object is a swarm by examining its attributes and class name. + + This method works globally with any object that has swarm-like characteristics, + making it work across all swarm types without needing individual implementations. + + Args: + obj (Any): The object to check. + + Returns: + bool: True if the object appears to be a swarm, False otherwise. + """ + if obj is None: + return False + + # Check if object has 'agents' attribute (common to swarms) + has_agents = hasattr(obj, "agents") + + # Check if object has 'name' or 'agent_name' attribute (common to swarms) + has_name = hasattr(obj, "name") or hasattr(obj, "agent_name") + + # Check class name for swarm indicators + class_name = type(obj).__name__.lower() + is_swarm_class = ( + "swarm" in class_name + or "board" in class_name + or "hierarchical" in class_name + or "heavy" in class_name + or "parliament" in class_name + ) + + # Object is likely a swarm if it has agents and (name or is a swarm class) + return has_agents and (has_name or is_swarm_class) + + def _get_swarm_name(self, swarm: Any) -> str: + """ + Get the name of a swarm object. + + Args: + swarm (Any): The swarm object. + + Returns: + str: The name of the swarm, or a default name if not available. + """ + # Prefer the agent-facing name when available (`agent_name`), then `name`, + # and finally fall back to the class name. This ties displayed labels to + # the Agent API (`agent_name`) where swarms are used as agents. + agent_name = getattr(swarm, "agent_name", None) + if agent_name: + return agent_name + + name = getattr(swarm, "name", None) + if name: + return name + + return type(swarm).__name__ + + def _get_swarm_type(self, swarm: Any) -> str: + """ + Get the type/class name of a swarm object. + + Args: + swarm (Any): The swarm object. + + Returns: + str: The class name of the swarm. + """ + # If this object is an adapter/wrapper with an inner `swarm` attribute, + # prefer the inner swarm's class name so tree labels show the real swarm type. + inner = getattr(swarm, "swarm", None) + if inner is not None: + return type(inner).__name__ + return type(swarm).__name__ + + def _get_agent_type_summary(self, agents: List[Any]) -> str: + """ + Get a summary of agent types in a list. + + Args: + agents (List[Any]): List of agents. + + Returns: + str: Summary string describing the agent types. + """ + if not agents: + return "No agents" + + # Count different agent types + agent_types = {} + for agent in agents: + agent_type = type(agent).__name__ + agent_types[agent_type] = agent_types.get(agent_type, 0) + 1 + + # Format summary + if len(agent_types) == 1: + agent_type_name = list(agent_types.keys())[0] + count = agent_types[agent_type_name] + return f"{count} {agent_type_name}{'s' if count > 1 else ''} (Leaf Level)" + else: + parts = [f"{count} {name}" for name, count in agent_types.items()] + return f"{', '.join(parts)} (Leaf Level)" + + def _build_rich_tree( + self, swarm: Any, tree_node: Optional[Tree] = None, visited: Optional[set] = None, root_tree: Optional[Tree] = None + ) -> Tree: + """ + Recursively build a Rich Tree structure representation of a swarm hierarchy. + + Args: + swarm (Any): The swarm object to visualize. + tree_node (Optional[Tree]): Parent Rich Tree node. If None, creates root. + visited (Optional[set]): Set of visited swarm IDs to prevent cycles. + root_tree (Optional[Tree]): Root tree node for tracking. Internal use. + + Returns: + Tree: Rich Tree object representing the swarm hierarchy (root tree). + """ + if visited is None: + visited = set() + + # Get swarm name and type + swarm_name = self._get_swarm_name(swarm) + swarm_type = self._get_swarm_type(swarm) + + # Create tree node label with styling + label = Text() + label.append(swarm_name, style="bold cyan") + label.append(" (", style="white") + label.append(swarm_type, style="yellow") + label.append(")", style="white") + + # Check if we've already visited this swarm (prevent cycles) + swarm_id = id(swarm) + if swarm_id in visited: + if tree_node is None: + tree = Tree(label) + root_tree = tree + else: + tree = tree_node.add(label) + tree.add("[dim italic]Circular Reference[/dim italic]", style="red") + return root_tree if root_tree else tree + + visited.add(swarm_id) + + # Create root tree or add to parent + if tree_node is None: + # Create root tree expanded so deep branches are visible by default + tree = Tree(label, expanded=True) + root_tree = tree + else: + # Add child node and expand it to show its children inline + tree = tree_node.add(label, expanded=True) + + # Get agents if available + agents = [] + if hasattr(swarm, "agents"): + agents = swarm.agents if isinstance(swarm.agents, list) else [] + elif hasattr(swarm, "create_agents"): + # For HeavySwarm and similar, agents might be in a dict + try: + agents_dict = swarm.create_agents() + if isinstance(agents_dict, dict): + agents = list(agents_dict.values()) + elif isinstance(agents_dict, list): + agents = agents_dict + except Exception: + pass + + if not agents: + visited.remove(swarm_id) + return root_tree if root_tree else tree + + # Separate swarms from leaf agents + swarm_agents = [] + leaf_agents = [] + + for agent in agents: + if self._is_swarm(agent): + swarm_agents.append(agent) + else: + leaf_agents.append(agent) + + # Process swarm agents first - recursively build subtrees + for agent in swarm_agents: + self._build_rich_tree(agent, tree, visited.copy(), root_tree) + + # Process leaf agents - show each agent individually with its name + for agent in leaf_agents: + # Use canonical name and type helpers so labels consistently reflect + # `agent_name`/`name` and the underlying class (unwrapping adapters). + agent_name = self._get_swarm_name(agent) + agent_type = self._get_swarm_type(agent) + + # Create label for individual agent + agent_label = Text() + agent_label.append(agent_name, style="green") + agent_label.append(" (", style="white") + agent_label.append(agent_type, style="dim green") + agent_label.append(")", style="white") + + tree.add(agent_label) + + visited.remove(swarm_id) + return root_tree if root_tree else tree + + def print_swarm_structure(self, swarm: Any, title: str = "Nested Structure:") -> None: + """ + Print a visual tree representation of a nested swarm structure using Rich Tree. + + Args: + swarm (Any): The root swarm object to visualize. + title (str): Title to display above the structure. Defaults to "Nested Structure:". + """ + if not swarm: + return + + # Always print title first using regular print to ensure visibility + print(f"\n{title}") + + try: + # Build the Rich Tree structure + tree = self._build_rich_tree(swarm) + + if not tree: + print("[yellow]No tree structure to display[/yellow]") + return + + # Try to print using Rich console + try: + self.console.print(tree) + except Exception: + # If Rich fails, fall back to simple text representation + self._print_simple_tree(swarm) + except Exception as e: + # Fallback to basic printing if Rich Tree fails + print(f"Warning: Could not render swarm structure: {e}") + import traceback + traceback.print_exc() + # Try simple fallback + self._print_simple_tree(swarm) + + def _print_simple_tree(self, swarm: Any, prefix: str = "", is_last: bool = True) -> None: + """ + Print a simple text-based tree representation as fallback. + + Args: + swarm (Any): The swarm object to print. + prefix (str): Current prefix for indentation. + is_last (bool): Whether this is the last item at its level. + """ + swarm_name = self._get_swarm_name(swarm) + swarm_type = self._get_swarm_type(swarm) + connector = "└─ " if is_last else "├─ " + print(f"{prefix}{connector}{swarm_name} ({swarm_type})") + + # Get agents + agents = [] + if hasattr(swarm, "agents"): + agents = swarm.agents if isinstance(swarm.agents, list) else [] + elif hasattr(swarm, "create_agents"): + try: + agents_dict = swarm.create_agents() + if isinstance(agents_dict, dict): + agents = list(agents_dict.values()) + elif isinstance(agents_dict, list): + agents = agents_dict + except Exception: + pass + + if not agents: + return + + # Separate swarms from leaf agents + swarm_agents = [a for a in agents if self._is_swarm(a)] + leaf_agents = [a for a in agents if not self._is_swarm(a)] + + # Print swarm agents + extension = " " if is_last else "│ " + new_prefix = prefix + extension + for i, agent in enumerate(swarm_agents): + is_last_agent = (i == len(swarm_agents) - 1) and len(leaf_agents) == 0 + self._print_simple_tree(agent, new_prefix, is_last_agent) + + # Print leaf agents individually + for i, agent in enumerate(leaf_agents): + is_last_agent = (i == len(leaf_agents) - 1) and len(swarm_agents) == 0 + agent_name = getattr(agent, "agent_name", getattr(agent, "name", "Unknown")) + # Prefer underlying swarm/agent type if wrapped by an adapter + agent_type = type(getattr(agent, "swarm")).__name__ if hasattr(agent, "swarm") else type(agent).__name__ + connector = "└─ " if is_last_agent else "├─ " + print(f"{new_prefix}{connector}{agent_name} ({agent_type})") + # Global formatter instance with markdown output enabled by default formatter = Formatter(md=False) + +# Internal helpers to avoid duplicate/child prints when nested BaseSwarm +# instances are created during a top-level swarm initialization. +# We accumulate requested-print instances and only render once when the +# outermost initializer finishes. +_swarm_init_depth = 0 +_pending_swarm_prints: List[Any] = [] + + +# Note: monkeypatch removed — buffering is handled directly in +# `swarms.structs.various_alt_swarms.BaseSwarm.__init__` for explicitness. + + +def enable_swarm_structure_visualization(obj: Any, show: bool = True) -> None: + """ + Backwards-compatible public helper to trigger swarm structure visualization. + + Keeps the API stable: callers can import this name from + `swarms.utils.formatter` (used in examples) and it will invoke the + formatter's visualization. + """ + if show and obj is not None: + try: + formatter.print_swarm_structure(obj) + except Exception: + # Best-effort only + pass