diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 3ffb39df..f0bf8ae3 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -3,6 +3,7 @@ import json import logging import os import random +import sys import threading import time import traceback @@ -106,6 +107,70 @@ from swarms.utils.swarms_marketplace_utils import ( ) +# REACT workflow tools for auto mode +REACT_ACTION_TOOL = { + "type": "function", + "function": { + "name": "action", + "description": "Execute an action or use a tool to make progress on the current subtask. Use this when you need to perform a specific action to advance toward completing the subtask.", + "parameters": { + "type": "object", + "properties": { + "action_description": { + "type": "string", + "description": "A clear description of the action being taken", + }, + "action_result": { + "type": "string", + "description": "The result or outcome of the action", + }, + }, + "required": ["action_description", "action_result"], + }, + }, +} + +REACT_SUBTASK_COMPLETE_TOOL = { + "type": "function", + "function": { + "name": "subtask_complete", + "description": "Mark the current subtask as complete. Use this when you have successfully completed a subtask from your plan and are ready to move to the next one.", + "parameters": { + "type": "object", + "properties": { + "subtask_number": { + "type": "integer", + "description": "The number of the subtask that was completed (1-indexed)", + }, + "subtask_summary": { + "type": "string", + "description": "A brief summary of what was accomplished in this subtask", + }, + }, + "required": ["subtask_number", "subtask_summary"], + }, + }, +} + +REACT_OBJECTIVE_COMPLETE_TOOL = { + "type": "function", + "function": { + "name": "objective_complete", + "description": "Mark the entire objective as complete. Use this ONLY when all subtasks have been completed and the overall objective is fully achieved.", + "parameters": { + "type": "object", + "properties": { + "final_summary": { + "type": "string", + "description": "A comprehensive summary of what was accomplished overall", + }, + }, + "required": ["final_summary"], + }, + }, +} + + def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response return "stop" in response.lower() @@ -1236,21 +1301,28 @@ class Agent: if self.plan_enabled is True: self.plan(task) + # Autosave + if self.autosave: + log_agent_data(self.to_dict()) + self.save() + + # Use REACT workflow when max_loops is "auto" + if self.max_loops == "auto": + return self._run_react_workflow( + task=task, + img=img, + streaming_callback=streaming_callback, + *args, + **kwargs, + ) + # Set the loop count loop_count = 0 # Clear the short memory response = None - # Autosave - if self.autosave: - log_agent_data(self.to_dict()) - self.save() - - while ( - self.max_loops == "auto" - or loop_count < self.max_loops - ): + while loop_count < self.max_loops: loop_count += 1 # Handle RAG query every loop @@ -1631,6 +1703,448 @@ class Agent: ) raise error + def _create_react_plan(self, task: str) -> str: + """ + Create a detailed plan for REACT workflow execution. + + Args: + task (str): The task to create a plan for + + Returns: + str: The generated plan as a string + """ + try: + plan_prompt = f"""Create a detailed, step-by-step plan to complete the following task. + Break down the task into clear, actionable subtasks that can be completed sequentially. + Number each subtask starting from 1. + + Task: {task} + + Format your response as a numbered list of subtasks, where each subtask is a specific, + actionable step that moves toward completing the overall objective. + + Example format: + 1. [First subtask description] + 2. [Second subtask description] + 3. [Third subtask description] + ...""" + + plan = self.llm.run(task=plan_prompt) + self.short_memory.add( + role=self.agent_name, + content=f"Plan created:\n{plan}", + ) + + if self.print_on: + self.pretty_print( + f"Plan Created:\n{plan}", loop_count=0 + ) + + return plan + except Exception as error: + logger.error(f"Failed to create REACT plan: {error}") + raise error + + def _handle_react_tool_call( + self, response: Any, current_loop: int + ) -> Tuple[bool, bool]: + """ + Handle REACT tool calls (action, subtask_complete, objective_complete). + + Args: + response (Any): The response from the LLM that may contain tool calls + current_loop (int): Current loop count for logging + + Returns: + Tuple[bool, bool]: (is_subtask_complete, is_objective_complete) + """ + is_subtask_complete = False + is_objective_complete = False + + # Handle different response formats + tool_calls_list = [] + + # Case 1: Response is a list + if isinstance(response, list): + for item in response: + if isinstance(item, dict): + # Check for tool_calls in dict + if "tool_calls" in item: + tool_calls_list.extend(item["tool_calls"]) + # Check if the item itself is a tool call + elif "function" in item: + tool_calls_list.append(item) + + # Case 2: Response is a dict + elif isinstance(response, dict): + if "tool_calls" in response: + tool_calls_list = response["tool_calls"] + elif "choices" in response: + # OpenAI format + for choice in response.get("choices", []): + message = choice.get("message", {}) + if "tool_calls" in message: + tool_calls_list.extend(message["tool_calls"]) + elif "function" in response: + tool_calls_list.append(response) + + # Case 3: Response has tool_calls attribute + elif hasattr(response, "tool_calls"): + tool_calls_list = response.tool_calls + + # Process tool calls + for tool_call in tool_calls_list: + # Handle different tool call formats + if isinstance(tool_call, dict): + function_info = tool_call.get("function", {}) + if not function_info: + function_info = tool_call + + function_name = function_info.get("name", "") + arguments = function_info.get("arguments", "") + + # Parse arguments if it's a string + if isinstance(arguments, str): + try: + arguments = json.loads(arguments) + except json.JSONDecodeError: + logger.warning( + f"Failed to parse tool arguments: {arguments}" + ) + continue + elif not isinstance(arguments, dict): + continue + + # Handle REACT tools + if function_name == "action": + action_desc = arguments.get( + "action_description", "" + ) + action_result = arguments.get("action_result", "") + action_msg = f"Action: {action_desc}\nResult: {action_result}" + + self.short_memory.add( + role="Tool Executor", content=action_msg + ) + + if self.print_on: + self.pretty_print(action_msg, current_loop) + + elif function_name == "subtask_complete": + subtask_num = arguments.get("subtask_number", 0) + subtask_summary = arguments.get( + "subtask_summary", "" + ) + subtask_msg = f"Subtask {subtask_num} Complete: {subtask_summary}" + + self.short_memory.add( + role=self.agent_name, content=subtask_msg + ) + + if self.print_on: + self.pretty_print(subtask_msg, current_loop) + + is_subtask_complete = True + + elif function_name == "objective_complete": + final_summary = arguments.get("final_summary", "") + objective_msg = f"Objective Complete!\nFinal Summary: {final_summary}" + + self.short_memory.add( + role=self.agent_name, content=objective_msg + ) + + if self.print_on: + self.pretty_print(objective_msg, current_loop) + + is_objective_complete = True + + return is_subtask_complete, is_objective_complete + + def _run_react_workflow( + self, + task: Optional[Union[str, Any]] = None, + img: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, + *args, + **kwargs, + ) -> Any: + """ + Execute REACT workflow for auto mode: Plan -> Think -> Act -> Observe loop. + + This method implements a REACT-style workflow where: + 1. A plan is created first + 2. For each subtask, the agent loops through: Think -> Act -> Observe + 3. Subtasks are marked complete when done + 4. Objective is marked complete when all subtasks are done + + Args: + task (Optional[Union[str, Any]]): The task to execute + img (Optional[str]): Optional image input + streaming_callback (Optional[Callable[[str], None]]): Streaming callback + *args: Additional positional arguments + **kwargs: Additional keyword arguments + + Returns: + Any: The final output from the agent + """ + try: + # Step 1: Create a plan + if self.print_on: + self.pretty_print( + "Starting REACT Workflow - Creating Plan...", + loop_count=0, + ) + + plan = self._create_react_plan(task) + + # Add REACT system prompt if not already added + if not self.react_on: + react_prompt = ( + REACT_SYS_PROMPT + + """ + +You are now in REACT workflow mode. Follow this process: +1. Review the plan and current subtask +2. Think about what needs to be done +3. Use the 'action' tool to execute actions +4. Use 'subtask_complete' when a subtask is finished +5. Use 'objective_complete' when all subtasks are done + +Available tools: +- action: Execute an action for the current subtask +- subtask_complete: Mark current subtask as complete +- objective_complete: Mark entire objective as complete (use only when ALL subtasks are done) +""" + ) + self.short_memory.add( + role="system", content=react_prompt + ) + + # Prepare REACT tools list + react_tools = [ + REACT_ACTION_TOOL, + REACT_SUBTASK_COMPLETE_TOOL, + REACT_OBJECTIVE_COMPLETE_TOOL, + ] + + # Add user's custom tools if they exist + if self.tools_list_dictionary: + react_tools.extend(self.tools_list_dictionary) + + # Create a temporary LLM with REACT tools + original_tools = self.tools_list_dictionary + self.tools_list_dictionary = react_tools + temp_llm = self.llm_handling() + + # Set context_length to infinite for auto mode + original_context_length = self.context_length + # Use a very large number instead of float('inf') to avoid type issues + infinite_context = sys.maxsize # Largest integer value + self.context_length = infinite_context + # Also update the conversation's context_length + self.short_memory.context_length = infinite_context + + # Track completed subtasks + completed_subtasks = set() + loop_count = 0 + max_iterations = 100 # Safety limit + objective_complete = False + + while ( + not objective_complete and loop_count < max_iterations + ): + loop_count += 1 + + # Build context for current iteration + context_prompt = f"""You are working on the following task: {task} + +Plan: +{plan} + +Completed subtasks: {', '.join(map(str, sorted(completed_subtasks))) if completed_subtasks else 'None'} + +Current status: Working through the plan. Think about what needs to be done next, then take action using the available tools. +Remember to use 'subtask_complete' when you finish a subtask, and 'objective_complete' when all subtasks are done.""" + + self.short_memory.add( + role=self.user_name, content=context_prompt + ) + + # Get task prompt + if self.transforms is not None: + task_prompt = handle_transforms( + transforms=self.transforms, + short_memory=self.short_memory, + model_name=self.model_name, + ) + else: + task_prompt = ( + self.short_memory.return_history_as_string() + ) + + # Call LLM with REACT tools + attempt = 0 + success = False + + while attempt < self.retry_attempts and not success: + try: + if img is not None: + response = temp_llm.run( + task=task_prompt, + img=img, + *args, + **kwargs, + ) + else: + response = temp_llm.run( + task=task_prompt, *args, **kwargs + ) + + # Parse response + response = self.parse_llm_output(response) + + # Add response to memory + self.short_memory.add( + role=self.agent_name, content=response + ) + + # Print response + if self.print_on: + if isinstance(response, list): + self.pretty_print( + f"[REACT Loop {loop_count}] [Time: {time.strftime('%H:%M:%S')}]\n\n{json.dumps(response, indent=4)}", + loop_count, + ) + else: + self.pretty_print( + f"[REACT Loop {loop_count}]\n{response}", + loop_count, + ) + + # Handle REACT tool calls + is_subtask_complete, is_obj_complete = ( + self._handle_react_tool_call( + response, loop_count + ) + ) + + if is_subtask_complete: + # Extract subtask number from response if possible + # This is a fallback - ideally it comes from the tool call + for item in ( + response + if isinstance(response, list) + else [] + ): + if isinstance(item, dict): + tool_calls = item.get( + "tool_calls", [] + ) + for tool_call in tool_calls: + if ( + tool_call.get( + "function", {} + ).get("name") + == "subtask_complete" + ): + args = tool_call.get( + "function", {} + ).get("arguments", {}) + if isinstance(args, str): + try: + args = json.loads( + args + ) + except: + pass + if isinstance(args, dict): + subtask_num = args.get( + "subtask_number" + ) + if subtask_num: + completed_subtasks.add( + subtask_num + ) + + if is_obj_complete: + objective_complete = True + break + + # Also handle regular tools if they exist + if exists(self.tools): + self.tool_execution_retry( + response, loop_count + ) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + + success = True + + except Exception as e: + logger.error( + f"Error in REACT workflow loop {loop_count}: {e}" + ) + attempt += 1 + if attempt >= self.retry_attempts: + raise + + if not success: + logger.error( + "Failed to generate response in REACT workflow" + ) + break + + # Check stopping conditions + if ( + self.stopping_condition is not None + and self._check_stopping_condition(response) + ): + logger.info( + "Stopping condition met in REACT workflow" + ) + break + + # Restore original tools and context_length + self.tools_list_dictionary = original_tools + self.context_length = original_context_length + self.short_memory.context_length = original_context_length + self.llm = self.llm_handling() + + if self.print_on and objective_complete: + self.pretty_print( + "REACT Workflow Completed Successfully!", + loop_count, + ) + + # Return formatted output + return history_output_formatter( + self.short_memory, type=self.output_type + ) + + except Exception as error: + logger.error(f"Error in REACT workflow: {error}") + # Restore original tools and context_length on error + if hasattr(self, "tools_list_dictionary"): + self.tools_list_dictionary = original_tools + if hasattr(self, "context_length"): + self.context_length = original_context_length + if hasattr(self, "short_memory"): + self.short_memory.context_length = ( + original_context_length + ) + raise error + async def run_concurrent(self, task: str, *args, **kwargs): """ Run a task concurrently.