diff --git a/README.md b/README.md index 5a68c584..05b0cbdb 100644 --- a/README.md +++ b/README.md @@ -1178,35 +1178,10 @@ Coming soon... ----------------- -## `GraphSwarm` +## `GraphWorkflow` -GraphSwarm is a workflow management system using a directed acyclic graph (DAG) to orchestrate complex tasks. Nodes (agents or tasks) and edges define dependencies, with agents executing tasks concurrently. It features entry/end points, visualization for debugging, and scalability for dynamic task assignment. Benefits include concurrency, flexibility, scalability, and clear workflow visualization. [Learn more:](https://docs.swarms.world/en/latest/swarms/structs/graph_swarm/) - - -### Methods - -| Method | Description | Parameters | Return Value | -|--------|-------------|------------|--------------| -| `add_node` | Add a node to the graph | `node`: Node object | None | -| `add_edge` | Add an edge to the graph | `edge`: Edge object | None | -| `set_entry_points` | Set the entry points of the graph | `entry_points`: List of node IDs | None | -| `set_end_points` | Set the end points of the graph | `end_points`: List of node IDs | None | -| `visualize` | Generate a visual representation of the graph | None | String representation of the graph | -| `run` | Execute the workflow | None | Dictionary of execution results | - -### Inputs - -| Input | Type | Description | -|-------|------|-------------| -| `Node` | Object | Represents a node in the graph (agent or task) | -| `Edge` | Object | Represents an edge connecting two nodes | -| `entry_points` | List[str] | List of node IDs where the workflow starts | -| `end_points` | List[str] | List of node IDs where the workflow ends | - -### Output - -The `run` method returns a dictionary containing the execution results of all nodes in the graph. +GraphWorkflow is a workflow management system using a directed acyclic graph (DAG) to orchestrate complex tasks. Nodes (agents or tasks) and edges define dependencies, with agents executing tasks concurrently. It features entry/end points, visualization for debugging, and scalability for dynamic task assignment. Benefits include concurrency, flexibility, scalability, and clear workflow visualization. [Learn more:](https://docs.swarms.world/en/latest/swarms/structs/graph_swarm/) The `run` method returns a dictionary containing the execution results of all nodes in the graph. diff --git a/examples/graph_swarm_example.py b/examples/graph_swarm_example.py deleted file mode 100644 index 5173a1a5..00000000 --- a/examples/graph_swarm_example.py +++ /dev/null @@ -1,56 +0,0 @@ -from loguru import logger -from swarms.structs.agent import Agent -from swarms.structs.graph_swarm import GraphSwarm - - -if __name__ == "__main__": - try: - # Create agents - data_collector = Agent( - agent_name="Market-Data-Collector", - model_name="openai/gpt-4o", - max_loops=1, - streaming_on=True, - ) - - trend_analyzer = Agent( - agent_name="Market-Trend-Analyzer", - model_name="openai/gpt-4o", - max_loops=1, - streaming_on=True, - ) - - report_generator = Agent( - agent_name="Investment-Report-Generator", - model_name="openai/gpt-4o", - max_loops=1, - streaming_on=True, - ) - - # Create swarm - swarm = GraphSwarm( - agents=[ - (data_collector, []), - (trend_analyzer, ["Market-Data-Collector"]), - (report_generator, ["Market-Trend-Analyzer"]), - ], - swarm_name="Market Analysis Intelligence Network", - ) - - # Run the swarm - result = swarm.run( - "Analyze current market trends for tech stocks and provide investment recommendations" - ) - - # Print results - print(f"Execution success: {result.success}") - print(f"Total time: {result.execution_time:.2f} seconds") - - for agent_name, output in result.outputs.items(): - print(f"\nAgent: {agent_name}") - print(f"Output: {output.output}") - if output.error: - print(f"Error: {output.error}") - except Exception as error: - logger.error(error) - raise error diff --git a/swarms/structs/graph_swarm.py b/swarms/structs/graph_swarm.py deleted file mode 100644 index 1bbc1673..00000000 --- a/swarms/structs/graph_swarm.py +++ /dev/null @@ -1,612 +0,0 @@ -import asyncio -import json -import time -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Tuple, Union - -import networkx as nx -from loguru import logger -from pydantic import BaseModel, Field - -from swarms.structs.agent import Agent -from swarms.utils.auto_download_check_packages import ( - auto_check_and_download_package, -) - - -class AgentOutput(BaseModel): - """Structured output from an agent.""" - - agent_name: str - timestamp: float = Field(default_factory=time.time) - output: Any - execution_time: float - error: Optional[str] = None - metadata: Dict = Field(default_factory=dict) - - -class SwarmOutput(BaseModel): - """Structured output from the entire swarm.""" - - timestamp: float = Field(default_factory=time.time) - outputs: Dict[str, AgentOutput] - execution_time: float - success: bool - error: Optional[str] = None - metadata: Dict = Field(default_factory=dict) - - -class SwarmMemory: - """Vector-based memory system for GraphSwarm using ChromaDB.""" - - def __init__(self, collection_name: str = "swarm_memories"): - """Initialize SwarmMemory with ChromaDB.""" - - try: - import chromadb - except ImportError: - auto_check_and_download_package( - "chromadb", package_manager="pip", upgrade=True - ) - import chromadb - - self.client = chromadb.Client() - - # Get or create collection - self.collection = self.client.get_or_create_collection( - name=collection_name, - metadata={"description": "GraphSwarm execution memories"}, - ) - - def store_execution(self, task: str, result: SwarmOutput): - """Store execution results in vector memory.""" - try: - # Create metadata - metadata = { - "timestamp": datetime.now().isoformat(), - "success": result.success, - "execution_time": result.execution_time, - "agent_sequence": json.dumps( - [name for name in result.outputs.keys()] - ), - "error": result.error if result.error else "", - } - - # Create document from outputs - document = { - "task": task, - "outputs": json.dumps( - { - name: { - "output": str(output.output), - "execution_time": output.execution_time, - "error": output.error, - } - for name, output in result.outputs.items() - } - ), - } - - # Store in ChromaDB - self.collection.add( - documents=[json.dumps(document)], - metadatas=[metadata], - ids=[f"exec_{datetime.now().timestamp()}"], - ) - - print("added to database") - - logger.info(f"Stored execution in memory: {task}") - - except Exception as e: - logger.error( - f"Failed to store execution in memory: {str(e)}" - ) - - def get_similar_executions(self, task: str, limit: int = 5): - """Retrieve similar past executions.""" - try: - # Query ChromaDB for similar executions - results = self.collection.query( - query_texts=[task], - n_results=limit, - include=["documents", "metadatas"], - ) - - print(results) - - if not results["documents"]: - return [] - - # Process results - executions = [] - for doc, metadata in zip( - results["documents"][0], results["metadatas"][0] - ): - doc_dict = json.loads(doc) - executions.append( - { - "task": doc_dict["task"], - "outputs": json.loads(doc_dict["outputs"]), - "success": metadata["success"], - "execution_time": metadata["execution_time"], - "agent_sequence": json.loads( - metadata["agent_sequence"] - ), - "timestamp": metadata["timestamp"], - } - ) - - return executions - - except Exception as e: - logger.error( - f"Failed to retrieve similar executions: {str(e)}" - ) - return [] - - def get_optimal_sequence(self, task: str) -> Optional[List[str]]: - """Get the most successful agent sequence for similar tasks.""" - similar_executions = self.get_similar_executions(task) - print(f"similar_executions {similar_executions}") - - if not similar_executions: - return None - - # Sort by success and execution time - successful_execs = [ - ex for ex in similar_executions if ex["success"] - ] - - if not successful_execs: - return None - - # Return sequence from most successful execution - return successful_execs[0]["agent_sequence"] - - def clear_memory(self): - """Clear all memories.""" - self.client.delete_collection(self.collection.name) - self.collection = self.client.get_or_create_collection( - name=self.collection.name - ) - - -class GraphSwarm: - """ - Enhanced framework for creating and managing swarms of collaborative agents. - """ - - def __init__( - self, - name: str = "graph-swarm-01", - description: str = "Graph swarm : build your own graph of agents", - agents: Union[ - List[Agent], List[Tuple[Agent, List[str]]], List[Callable] - ] = None, - max_workers: Optional[int] = None, - swarm_name: str = "Collaborative Agent Swarm", - memory_collection: str = "swarm_memory", - *args, - **kwargs, - ): - """Initialize GraphSwarm.""" - self.name = name - self.description = description - self.graph = nx.DiGraph() - self.agents: Dict[str, Agent] = {} - self.dependencies: Dict[str, List[str]] = {} - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.swarm_name = swarm_name - self.memory_collection = memory_collection - self.memory = SwarmMemory(collection_name=memory_collection) - - if agents: - self.initialize_agents(agents) - - logger.info(f"Initialized GraphSwarm: {swarm_name}") - - def initialize_agents( - self, - agents: Union[List[Agent], List[Tuple[Agent, List[str]]]], - ): - """Initialize agents and their dependencies.""" - try: - # Handle list of Agents or (Agent, dependencies) tuples - for item in agents: - if isinstance(item, tuple): - agent, dependencies = item - else: - agent, dependencies = item, [] - - if not isinstance(agent, Agent): - raise ValueError( - f"Expected Agent object, got {type(agent)}" - ) - - self.agents[agent.agent_name] = agent - self.dependencies[agent.agent_name] = dependencies - self.graph.add_node(agent.agent_name, agent=agent) - - # Add dependencies - for dep in dependencies: - if dep not in self.agents: - raise ValueError( - f"Dependency {dep} not found for agent {agent.agent_name}" - ) - self.graph.add_edge(dep, agent.agent_name) - - self._validate_graph() - - except Exception as e: - logger.error(f"Failed to initialize agents: {str(e)}") - raise - - def _validate_graph(self): - """Validate the agent dependency graph.""" - if not self.graph.nodes(): - raise ValueError("No agents added to swarm") - - if not nx.is_directed_acyclic_graph(self.graph): - cycles = list(nx.simple_cycles(self.graph)) - raise ValueError( - f"Agent dependency graph contains cycles: {cycles}" - ) - - def _get_agent_role_description(self, agent_name: str) -> str: - """Generate a description of the agent's role in the swarm.""" - predecessors = list(self.graph.predecessors(agent_name)) - successors = list(self.graph.successors(agent_name)) - position = ( - "initial" - if not predecessors - else ("final" if not successors else "intermediate") - ) - - role = f"""You are {agent_name}, a specialized agent in the {self.swarm_name}. - Position: {position} agent in the workflow - - Your relationships:""" - - if predecessors: - role += ( - f"\nYou receive input from: {', '.join(predecessors)}" - ) - if successors: - role += f"\nYour output will be used by: {', '.join(successors)}" - - return role - - def _generate_workflow_context(self) -> str: - """Generate a description of the entire workflow.""" - execution_order = list(nx.topological_sort(self.graph)) - - workflow = f"""Workflow Overview of {self.swarm_name}: - - Processing Order: - {' -> '.join(execution_order)} - - Agent Roles: - """ - - for agent_name in execution_order: - predecessors = list(self.graph.predecessors(agent_name)) - successors = list(self.graph.successors(agent_name)) - - workflow += f"\n\n{agent_name}:" - if predecessors: - workflow += ( - f"\n- Receives from: {', '.join(predecessors)}" - ) - if successors: - workflow += f"\n- Sends to: {', '.join(successors)}" - if not predecessors and not successors: - workflow += "\n- Independent agent" - - return workflow - - def _build_agent_prompt( - self, agent_name: str, task: str, context: Dict = None - ) -> str: - """Build a comprehensive prompt for the agent including role and context.""" - prompt_parts = [ - self._get_agent_role_description(agent_name), - "\nWorkflow Context:", - self._generate_workflow_context(), - "\nYour Task:", - task, - ] - - if context: - prompt_parts.extend( - ["\nContext from Previous Agents:", str(context)] - ) - - prompt_parts.extend( - [ - "\nInstructions:", - "1. Process the task according to your role", - "2. Consider the input from previous agents when available", - "3. Provide clear, structured output", - "4. Remember that your output will be used by subsequent agents", - "\nResponse Guidelines:", - "- Provide clear, well-organized output", - "- Include relevant details and insights", - "- Highlight key findings", - "- Flag any uncertainties or issues", - ] - ) - - return "\n".join(prompt_parts) - - async def _execute_agent( - self, agent_name: str, task: str, context: Dict = None - ) -> AgentOutput: - """Execute a single agent.""" - start_time = time.time() - agent = self.agents[agent_name] - - try: - # Build comprehensive prompt - full_prompt = self._build_agent_prompt( - agent_name, task, context - ) - logger.debug(f"Prompt for {agent_name}:\n{full_prompt}") - - # Execute agent - output = await asyncio.to_thread(agent.run, full_prompt) - - return AgentOutput( - agent_name=agent_name, - output=output, - execution_time=time.time() - start_time, - metadata={ - "task": task, - "context": context, - "position_in_workflow": list( - nx.topological_sort(self.graph) - ).index(agent_name), - }, - ) - - except Exception as e: - logger.error( - f"Error executing agent {agent_name}: {str(e)}" - ) - return AgentOutput( - agent_name=agent_name, - output=None, - execution_time=time.time() - start_time, - error=str(e), - metadata={"task": task}, - ) - - async def execute(self, task: str) -> SwarmOutput: - """ - Execute the entire swarm of agents with memory integration. - - Args: - task: Initial task to execute - - Returns: - SwarmOutput: Structured output from all agents - """ - start_time = time.time() - outputs = {} - success = True - error = None - - try: - # Get similar past executions - similar_executions = self.memory.get_similar_executions( - task, limit=3 - ) - optimal_sequence = self.memory.get_optimal_sequence(task) - - # Get base execution order - base_execution_order = list( - nx.topological_sort(self.graph) - ) - - # Determine final execution order - if optimal_sequence and all( - agent in base_execution_order - for agent in optimal_sequence - ): - logger.info( - f"Using optimal sequence from memory: {optimal_sequence}" - ) - execution_order = optimal_sequence - else: - execution_order = base_execution_order - - # Get historical context if available - historical_context = {} - if similar_executions: - best_execution = similar_executions[0] - if best_execution["success"]: - historical_context = { - "similar_task": best_execution["task"], - "previous_outputs": best_execution["outputs"], - "execution_time": best_execution[ - "execution_time" - ], - "success_patterns": self._extract_success_patterns( - similar_executions - ), - } - - # Execute agents in order - for agent_name in execution_order: - try: - # Get context from dependencies and history - agent_context = { - "dependencies": { - dep: outputs[dep].output - for dep in self.graph.predecessors( - agent_name - ) - if dep in outputs - }, - "historical": historical_context, - "position": execution_order.index(agent_name), - "total_agents": len(execution_order), - } - - # Execute agent with enhanced context - output = await self._execute_agent( - agent_name, task, agent_context - ) - outputs[agent_name] = output - - # Update historical context with current execution - if output.output: - historical_context.update( - { - f"current_{agent_name}_output": output.output - } - ) - - # Check for errors - if output.error: - success = False - error = f"Agent {agent_name} failed: {output.error}" - - # Try to recover using memory - if similar_executions: - recovery_output = self._attempt_recovery( - agent_name, task, similar_executions - ) - if recovery_output: - outputs[agent_name] = recovery_output - success = True - error = None - continue - break - - except Exception as agent_error: - logger.error( - f"Error executing agent {agent_name}: {str(agent_error)}" - ) - success = False - error = f"Agent {agent_name} failed: {str(agent_error)}" - break - - # Create result - result = SwarmOutput( - outputs=outputs, - execution_time=time.time() - start_time, - success=success, - error=error, - metadata={ - "task": task, - "used_optimal_sequence": optimal_sequence - is not None, - "similar_executions_found": len( - similar_executions - ), - "execution_order": execution_order, - "historical_context_used": bool( - historical_context - ), - }, - ) - - # Store execution in memory - await self._store_execution_async(task, result) - - return result - - except Exception as e: - logger.error(f"Swarm execution failed: {str(e)}") - return SwarmOutput( - outputs=outputs, - execution_time=time.time() - start_time, - success=False, - error=str(e), - metadata={"task": task}, - ) - - def run(self, task: str) -> SwarmOutput: - """Synchronous interface to execute the swarm.""" - return asyncio.run(self.execute(task)) - - def _extract_success_patterns( - self, similar_executions: List[Dict] - ) -> Dict: - """Extract success patterns from similar executions.""" - patterns = {} - successful_execs = [ - ex for ex in similar_executions if ex["success"] - ] - - if successful_execs: - patterns = { - "common_sequences": self._find_common_sequences( - successful_execs - ), - "avg_execution_time": sum( - ex["execution_time"] for ex in successful_execs - ) - / len(successful_execs), - "successful_strategies": self._extract_strategies( - successful_execs - ), - } - - return patterns - - def _attempt_recovery( - self, - failed_agent: str, - task: str, - similar_executions: List[Dict], - ) -> Optional[AgentOutput]: - """Attempt to recover from failure using memory.""" - for execution in similar_executions: - if ( - execution["success"] - and failed_agent in execution["outputs"] - ): - historical_output = execution["outputs"][failed_agent] - - return AgentOutput( - agent_name=failed_agent, - output=historical_output["output"], - execution_time=historical_output[ - "execution_time" - ], - metadata={ - "recovered_from_memory": True, - "original_task": execution["task"], - }, - ) - return None - - async def _store_execution_async( - self, task: str, result: SwarmOutput - ): - """Asynchronously store execution in memory.""" - try: - await asyncio.to_thread( - self.memory.store_execution, task, result - ) - except Exception as e: - logger.error( - f"Failed to store execution in memory: {str(e)}" - ) - - def add_agent(self, agent: Agent, dependencies: List[str] = None): - """Add a new agent to the swarm.""" - dependencies = dependencies or [] - self.agents[agent.agent_name] = agent - self.dependencies[agent.agent_name] = dependencies - self.graph.add_node(agent.agent_name, agent=agent) - - for dep in dependencies: - if dep not in self.agents: - raise ValueError(f"Dependency {dep} not found") - self.graph.add_edge(dep, agent.agent_name) - - self._validate_graph()