From 1cc1eb406bb55e72e2bccf7e1fb608fa62d31fd2 Mon Sep 17 00:00:00 2001 From: CI-DEV <154627941+IlumCI@users.noreply.github.com> Date: Tue, 4 Nov 2025 18:00:43 +0200 Subject: [PATCH] Implement REACT workflow tools and update execution Added REACT workflow tools for automated task management, including functions for action execution, subtask completion, and objective completion. Updated the REACT workflow execution method to integrate these tools and handle responses accordingly. --- swarms/structs/agent.py | 519 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 510 insertions(+), 9 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 489e91f9..6617066a 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -3,6 +3,7 @@ import json import logging import os import random +import sys import threading import time import traceback @@ -103,6 +104,70 @@ from swarms.utils.output_types import OutputType from swarms.utils.pdf_to_text import pdf_to_text +# REACT workflow tools for auto mode +REACT_ACTION_TOOL = { + "type": "function", + "function": { + "name": "action", + "description": "Execute an action or use a tool to make progress on the current subtask. Use this when you need to perform a specific action to advance toward completing the subtask.", + "parameters": { + "type": "object", + "properties": { + "action_description": { + "type": "string", + "description": "A clear description of the action being taken", + }, + "action_result": { + "type": "string", + "description": "The result or outcome of the action", + }, + }, + "required": ["action_description", "action_result"], + }, + }, +} + +REACT_SUBTASK_COMPLETE_TOOL = { + "type": "function", + "function": { + "name": "subtask_complete", + "description": "Mark the current subtask as complete. Use this when you have successfully completed a subtask from your plan and are ready to move to the next one.", + "parameters": { + "type": "object", + "properties": { + "subtask_number": { + "type": "integer", + "description": "The number of the subtask that was completed (1-indexed)", + }, + "subtask_summary": { + "type": "string", + "description": "A brief summary of what was accomplished in this subtask", + }, + }, + "required": ["subtask_number", "subtask_summary"], + }, + }, +} + +REACT_OBJECTIVE_COMPLETE_TOOL = { + "type": "function", + "function": { + "name": "objective_complete", + "description": "Mark the entire objective as complete. Use this ONLY when all subtasks have been completed and the overall objective is fully achieved.", + "parameters": { + "type": "object", + "properties": { + "final_summary": { + "type": "string", + "description": "A comprehensive summary of what was accomplished overall", + }, + }, + "required": ["final_summary"], + }, + }, +} + + def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response return "stop" in response.lower() @@ -1204,21 +1269,28 @@ class Agent: if self.plan_enabled is True: self.plan(task) + # Autosave + if self.autosave: + log_agent_data(self.to_dict()) + self.save() + + # Use REACT workflow when max_loops is "auto" + if self.max_loops == "auto": + return self._run_react_workflow( + task=task, + img=img, + streaming_callback=streaming_callback, + *args, + **kwargs, + ) + # Set the loop count loop_count = 0 # Clear the short memory response = None - # Autosave - if self.autosave: - log_agent_data(self.to_dict()) - self.save() - - while ( - self.max_loops == "auto" - or loop_count < self.max_loops - ): + while loop_count < self.max_loops: loop_count += 1 # Handle RAG query every loop @@ -1597,6 +1669,435 @@ class Agent: ) raise error + def _create_react_plan(self, task: str) -> str: + """ + Create a detailed plan for REACT workflow execution. + + Args: + task (str): The task to create a plan for + + Returns: + str: The generated plan as a string + """ + try: + plan_prompt = f"""Create a detailed, step-by-step plan to complete the following task. + Break down the task into clear, actionable subtasks that can be completed sequentially. + Number each subtask starting from 1. + + Task: {task} + + Format your response as a numbered list of subtasks, where each subtask is a specific, + actionable step that moves toward completing the overall objective. + + Example format: + 1. [First subtask description] + 2. [Second subtask description] + 3. [Third subtask description] + ...""" + + plan = self.llm.run(task=plan_prompt) + self.short_memory.add( + role=self.agent_name, + content=f"Plan created:\n{plan}", + ) + + if self.print_on: + self.pretty_print( + f"Plan Created:\n{plan}", loop_count=0 + ) + + return plan + except Exception as error: + logger.error(f"Failed to create REACT plan: {error}") + raise error + + def _handle_react_tool_call( + self, response: Any, current_loop: int + ) -> Tuple[bool, bool]: + """ + Handle REACT tool calls (action, subtask_complete, objective_complete). + + Args: + response (Any): The response from the LLM that may contain tool calls + current_loop (int): Current loop count for logging + + Returns: + Tuple[bool, bool]: (is_subtask_complete, is_objective_complete) + """ + is_subtask_complete = False + is_objective_complete = False + + # Handle different response formats + tool_calls_list = [] + + # Case 1: Response is a list + if isinstance(response, list): + for item in response: + if isinstance(item, dict): + # Check for tool_calls in dict + if "tool_calls" in item: + tool_calls_list.extend(item["tool_calls"]) + # Check if the item itself is a tool call + elif "function" in item: + tool_calls_list.append(item) + + # Case 2: Response is a dict + elif isinstance(response, dict): + if "tool_calls" in response: + tool_calls_list = response["tool_calls"] + elif "choices" in response: + # OpenAI format + for choice in response.get("choices", []): + message = choice.get("message", {}) + if "tool_calls" in message: + tool_calls_list.extend(message["tool_calls"]) + elif "function" in response: + tool_calls_list.append(response) + + # Case 3: Response has tool_calls attribute + elif hasattr(response, "tool_calls"): + tool_calls_list = response.tool_calls + + # Process tool calls + for tool_call in tool_calls_list: + # Handle different tool call formats + if isinstance(tool_call, dict): + function_info = tool_call.get("function", {}) + if not function_info: + function_info = tool_call + + function_name = function_info.get("name", "") + arguments = function_info.get("arguments", "") + + # Parse arguments if it's a string + if isinstance(arguments, str): + try: + arguments = json.loads(arguments) + except json.JSONDecodeError: + logger.warning( + f"Failed to parse tool arguments: {arguments}" + ) + continue + elif not isinstance(arguments, dict): + continue + + # Handle REACT tools + if function_name == "action": + action_desc = arguments.get("action_description", "") + action_result = arguments.get("action_result", "") + action_msg = ( + f"Action: {action_desc}\nResult: {action_result}" + ) + + self.short_memory.add( + role="Tool Executor", content=action_msg + ) + + if self.print_on: + self.pretty_print(action_msg, current_loop) + + elif function_name == "subtask_complete": + subtask_num = arguments.get("subtask_number", 0) + subtask_summary = arguments.get("subtask_summary", "") + subtask_msg = ( + f"Subtask {subtask_num} Complete: {subtask_summary}" + ) + + self.short_memory.add( + role=self.agent_name, content=subtask_msg + ) + + if self.print_on: + self.pretty_print(subtask_msg, current_loop) + + is_subtask_complete = True + + elif function_name == "objective_complete": + final_summary = arguments.get("final_summary", "") + objective_msg = ( + f"Objective Complete!\nFinal Summary: {final_summary}" + ) + + self.short_memory.add( + role=self.agent_name, content=objective_msg + ) + + if self.print_on: + self.pretty_print(objective_msg, current_loop) + + is_objective_complete = True + + return is_subtask_complete, is_objective_complete + + def _run_react_workflow( + self, + task: Optional[Union[str, Any]] = None, + img: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, + *args, + **kwargs, + ) -> Any: + """ + Execute REACT workflow for auto mode: Plan -> Think -> Act -> Observe loop. + + This method implements a REACT-style workflow where: + 1. A plan is created first + 2. For each subtask, the agent loops through: Think -> Act -> Observe + 3. Subtasks are marked complete when done + 4. Objective is marked complete when all subtasks are done + + Args: + task (Optional[Union[str, Any]]): The task to execute + img (Optional[str]): Optional image input + streaming_callback (Optional[Callable[[str], None]]): Streaming callback + *args: Additional positional arguments + **kwargs: Additional keyword arguments + + Returns: + Any: The final output from the agent + """ + try: + # Step 1: Create a plan + if self.print_on: + self.pretty_print( + "Starting REACT Workflow - Creating Plan...", + loop_count=0, + ) + + plan = self._create_react_plan(task) + + # Add REACT system prompt if not already added + if not self.react_on: + react_prompt = REACT_SYS_PROMPT + """ + +You are now in REACT workflow mode. Follow this process: +1. Review the plan and current subtask +2. Think about what needs to be done +3. Use the 'action' tool to execute actions +4. Use 'subtask_complete' when a subtask is finished +5. Use 'objective_complete' when all subtasks are done + +Available tools: +- action: Execute an action for the current subtask +- subtask_complete: Mark current subtask as complete +- objective_complete: Mark entire objective as complete (use only when ALL subtasks are done) +""" + self.short_memory.add( + role="system", content=react_prompt + ) + + # Prepare REACT tools list + react_tools = [ + REACT_ACTION_TOOL, + REACT_SUBTASK_COMPLETE_TOOL, + REACT_OBJECTIVE_COMPLETE_TOOL, + ] + + # Add user's custom tools if they exist + if self.tools_list_dictionary: + react_tools.extend(self.tools_list_dictionary) + + # Create a temporary LLM with REACT tools + original_tools = self.tools_list_dictionary + self.tools_list_dictionary = react_tools + temp_llm = self.llm_handling() + + # Set context_length to infinite for auto mode + original_context_length = self.context_length + # Use a very large number instead of float('inf') to avoid type issues + infinite_context = sys.maxsize # Largest integer value + self.context_length = infinite_context + # Also update the conversation's context_length + self.short_memory.context_length = infinite_context + + # Track completed subtasks + completed_subtasks = set() + loop_count = 0 + max_iterations = 100 # Safety limit + objective_complete = False + + while not objective_complete and loop_count < max_iterations: + loop_count += 1 + + # Build context for current iteration + context_prompt = f"""You are working on the following task: {task} + +Plan: +{plan} + +Completed subtasks: {', '.join(map(str, sorted(completed_subtasks))) if completed_subtasks else 'None'} + +Current status: Working through the plan. Think about what needs to be done next, then take action using the available tools. +Remember to use 'subtask_complete' when you finish a subtask, and 'objective_complete' when all subtasks are done.""" + + self.short_memory.add( + role=self.user_name, content=context_prompt + ) + + # Get task prompt + if self.transforms is not None: + task_prompt = handle_transforms( + transforms=self.transforms, + short_memory=self.short_memory, + model_name=self.model_name, + ) + else: + task_prompt = ( + self.short_memory.return_history_as_string() + ) + + # Call LLM with REACT tools + attempt = 0 + success = False + + while attempt < self.retry_attempts and not success: + try: + if img is not None: + response = temp_llm.run( + task=task_prompt, + img=img, + *args, + **kwargs, + ) + else: + response = temp_llm.run( + task=task_prompt, *args, **kwargs + ) + + # Parse response + response = self.parse_llm_output(response) + + # Add response to memory + self.short_memory.add( + role=self.agent_name, content=response + ) + + # Print response + if self.print_on: + if isinstance(response, list): + self.pretty_print( + f"[REACT Loop {loop_count}] [Time: {time.strftime('%H:%M:%S')}]\n\n{json.dumps(response, indent=4)}", + loop_count, + ) + else: + self.pretty_print( + f"[REACT Loop {loop_count}]\n{response}", + loop_count, + ) + + # Handle REACT tool calls + is_subtask_complete, is_obj_complete = ( + self._handle_react_tool_call( + response, loop_count + ) + ) + + if is_subtask_complete: + # Extract subtask number from response if possible + # This is a fallback - ideally it comes from the tool call + for item in ( + response if isinstance(response, list) else [] + ): + if isinstance(item, dict): + tool_calls = item.get("tool_calls", []) + for tool_call in tool_calls: + if ( + tool_call.get("function", {}).get( + "name" + ) + == "subtask_complete" + ): + args = tool_call.get( + "function", {} + ).get("arguments", {}) + if isinstance(args, str): + try: + args = json.loads(args) + except: + pass + if isinstance(args, dict): + subtask_num = args.get( + "subtask_number" + ) + if subtask_num: + completed_subtasks.add( + subtask_num + ) + + if is_obj_complete: + objective_complete = True + break + + # Also handle regular tools if they exist + if exists(self.tools): + self.tool_execution_retry( + response, loop_count + ) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + + success = True + + except Exception as e: + logger.error( + f"Error in REACT workflow loop {loop_count}: {e}" + ) + attempt += 1 + if attempt >= self.retry_attempts: + raise + + if not success: + logger.error( + "Failed to generate response in REACT workflow" + ) + break + + # Check stopping conditions + if ( + self.stopping_condition is not None + and self._check_stopping_condition(response) + ): + logger.info("Stopping condition met in REACT workflow") + break + + # Restore original tools and context_length + self.tools_list_dictionary = original_tools + self.context_length = original_context_length + self.short_memory.context_length = original_context_length + self.llm = self.llm_handling() + + if self.print_on and objective_complete: + self.pretty_print( + "REACT Workflow Completed Successfully!", + loop_count, + ) + + # Return formatted output + return history_output_formatter( + self.short_memory, type=self.output_type + ) + + except Exception as error: + logger.error(f"Error in REACT workflow: {error}") + # Restore original tools and context_length on error + if hasattr(self, "tools_list_dictionary"): + self.tools_list_dictionary = original_tools + if hasattr(self, "context_length"): + self.context_length = original_context_length + if hasattr(self, "short_memory"): + self.short_memory.context_length = original_context_length + raise error + async def run_concurrent(self, task: str, *args, **kwargs): """ Run a task concurrently.