Implement REACT workflow tools and update execution

Added REACT workflow tools for automated task management, including functions for action execution, subtask completion, and objective completion. Updated the REACT workflow execution method to integrate these tools and handle responses accordingly.
pull/1182/head
CI-DEV 2 months ago committed by GitHub
parent 638e9e2ba2
commit 1cc1eb406b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -3,6 +3,7 @@ import json
import logging
import os
import random
import sys
import threading
import time
import traceback
@ -103,6 +104,70 @@ from swarms.utils.output_types import OutputType
from swarms.utils.pdf_to_text import pdf_to_text
# REACT workflow tools for auto mode
REACT_ACTION_TOOL = {
"type": "function",
"function": {
"name": "action",
"description": "Execute an action or use a tool to make progress on the current subtask. Use this when you need to perform a specific action to advance toward completing the subtask.",
"parameters": {
"type": "object",
"properties": {
"action_description": {
"type": "string",
"description": "A clear description of the action being taken",
},
"action_result": {
"type": "string",
"description": "The result or outcome of the action",
},
},
"required": ["action_description", "action_result"],
},
},
}
REACT_SUBTASK_COMPLETE_TOOL = {
"type": "function",
"function": {
"name": "subtask_complete",
"description": "Mark the current subtask as complete. Use this when you have successfully completed a subtask from your plan and are ready to move to the next one.",
"parameters": {
"type": "object",
"properties": {
"subtask_number": {
"type": "integer",
"description": "The number of the subtask that was completed (1-indexed)",
},
"subtask_summary": {
"type": "string",
"description": "A brief summary of what was accomplished in this subtask",
},
},
"required": ["subtask_number", "subtask_summary"],
},
},
}
REACT_OBJECTIVE_COMPLETE_TOOL = {
"type": "function",
"function": {
"name": "objective_complete",
"description": "Mark the entire objective as complete. Use this ONLY when all subtasks have been completed and the overall objective is fully achieved.",
"parameters": {
"type": "object",
"properties": {
"final_summary": {
"type": "string",
"description": "A comprehensive summary of what was accomplished overall",
},
},
"required": ["final_summary"],
},
},
}
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
return "stop" in response.lower()
@ -1204,21 +1269,28 @@ class Agent:
if self.plan_enabled is True:
self.plan(task)
# Autosave
if self.autosave:
log_agent_data(self.to_dict())
self.save()
# Use REACT workflow when max_loops is "auto"
if self.max_loops == "auto":
return self._run_react_workflow(
task=task,
img=img,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
# Set the loop count
loop_count = 0
# Clear the short memory
response = None
# Autosave
if self.autosave:
log_agent_data(self.to_dict())
self.save()
while (
self.max_loops == "auto"
or loop_count < self.max_loops
):
while loop_count < self.max_loops:
loop_count += 1
# Handle RAG query every loop
@ -1597,6 +1669,435 @@ class Agent:
)
raise error
def _create_react_plan(self, task: str) -> str:
"""
Create a detailed plan for REACT workflow execution.
Args:
task (str): The task to create a plan for
Returns:
str: The generated plan as a string
"""
try:
plan_prompt = f"""Create a detailed, step-by-step plan to complete the following task.
Break down the task into clear, actionable subtasks that can be completed sequentially.
Number each subtask starting from 1.
Task: {task}
Format your response as a numbered list of subtasks, where each subtask is a specific,
actionable step that moves toward completing the overall objective.
Example format:
1. [First subtask description]
2. [Second subtask description]
3. [Third subtask description]
..."""
plan = self.llm.run(task=plan_prompt)
self.short_memory.add(
role=self.agent_name,
content=f"Plan created:\n{plan}",
)
if self.print_on:
self.pretty_print(
f"Plan Created:\n{plan}", loop_count=0
)
return plan
except Exception as error:
logger.error(f"Failed to create REACT plan: {error}")
raise error
def _handle_react_tool_call(
self, response: Any, current_loop: int
) -> Tuple[bool, bool]:
"""
Handle REACT tool calls (action, subtask_complete, objective_complete).
Args:
response (Any): The response from the LLM that may contain tool calls
current_loop (int): Current loop count for logging
Returns:
Tuple[bool, bool]: (is_subtask_complete, is_objective_complete)
"""
is_subtask_complete = False
is_objective_complete = False
# Handle different response formats
tool_calls_list = []
# Case 1: Response is a list
if isinstance(response, list):
for item in response:
if isinstance(item, dict):
# Check for tool_calls in dict
if "tool_calls" in item:
tool_calls_list.extend(item["tool_calls"])
# Check if the item itself is a tool call
elif "function" in item:
tool_calls_list.append(item)
# Case 2: Response is a dict
elif isinstance(response, dict):
if "tool_calls" in response:
tool_calls_list = response["tool_calls"]
elif "choices" in response:
# OpenAI format
for choice in response.get("choices", []):
message = choice.get("message", {})
if "tool_calls" in message:
tool_calls_list.extend(message["tool_calls"])
elif "function" in response:
tool_calls_list.append(response)
# Case 3: Response has tool_calls attribute
elif hasattr(response, "tool_calls"):
tool_calls_list = response.tool_calls
# Process tool calls
for tool_call in tool_calls_list:
# Handle different tool call formats
if isinstance(tool_call, dict):
function_info = tool_call.get("function", {})
if not function_info:
function_info = tool_call
function_name = function_info.get("name", "")
arguments = function_info.get("arguments", "")
# Parse arguments if it's a string
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse tool arguments: {arguments}"
)
continue
elif not isinstance(arguments, dict):
continue
# Handle REACT tools
if function_name == "action":
action_desc = arguments.get("action_description", "")
action_result = arguments.get("action_result", "")
action_msg = (
f"Action: {action_desc}\nResult: {action_result}"
)
self.short_memory.add(
role="Tool Executor", content=action_msg
)
if self.print_on:
self.pretty_print(action_msg, current_loop)
elif function_name == "subtask_complete":
subtask_num = arguments.get("subtask_number", 0)
subtask_summary = arguments.get("subtask_summary", "")
subtask_msg = (
f"Subtask {subtask_num} Complete: {subtask_summary}"
)
self.short_memory.add(
role=self.agent_name, content=subtask_msg
)
if self.print_on:
self.pretty_print(subtask_msg, current_loop)
is_subtask_complete = True
elif function_name == "objective_complete":
final_summary = arguments.get("final_summary", "")
objective_msg = (
f"Objective Complete!\nFinal Summary: {final_summary}"
)
self.short_memory.add(
role=self.agent_name, content=objective_msg
)
if self.print_on:
self.pretty_print(objective_msg, current_loop)
is_objective_complete = True
return is_subtask_complete, is_objective_complete
def _run_react_workflow(
self,
task: Optional[Union[str, Any]] = None,
img: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> Any:
"""
Execute REACT workflow for auto mode: Plan -> Think -> Act -> Observe loop.
This method implements a REACT-style workflow where:
1. A plan is created first
2. For each subtask, the agent loops through: Think -> Act -> Observe
3. Subtasks are marked complete when done
4. Objective is marked complete when all subtasks are done
Args:
task (Optional[Union[str, Any]]): The task to execute
img (Optional[str]): Optional image input
streaming_callback (Optional[Callable[[str], None]]): Streaming callback
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Any: The final output from the agent
"""
try:
# Step 1: Create a plan
if self.print_on:
self.pretty_print(
"Starting REACT Workflow - Creating Plan...",
loop_count=0,
)
plan = self._create_react_plan(task)
# Add REACT system prompt if not already added
if not self.react_on:
react_prompt = REACT_SYS_PROMPT + """
You are now in REACT workflow mode. Follow this process:
1. Review the plan and current subtask
2. Think about what needs to be done
3. Use the 'action' tool to execute actions
4. Use 'subtask_complete' when a subtask is finished
5. Use 'objective_complete' when all subtasks are done
Available tools:
- action: Execute an action for the current subtask
- subtask_complete: Mark current subtask as complete
- objective_complete: Mark entire objective as complete (use only when ALL subtasks are done)
"""
self.short_memory.add(
role="system", content=react_prompt
)
# Prepare REACT tools list
react_tools = [
REACT_ACTION_TOOL,
REACT_SUBTASK_COMPLETE_TOOL,
REACT_OBJECTIVE_COMPLETE_TOOL,
]
# Add user's custom tools if they exist
if self.tools_list_dictionary:
react_tools.extend(self.tools_list_dictionary)
# Create a temporary LLM with REACT tools
original_tools = self.tools_list_dictionary
self.tools_list_dictionary = react_tools
temp_llm = self.llm_handling()
# Set context_length to infinite for auto mode
original_context_length = self.context_length
# Use a very large number instead of float('inf') to avoid type issues
infinite_context = sys.maxsize # Largest integer value
self.context_length = infinite_context
# Also update the conversation's context_length
self.short_memory.context_length = infinite_context
# Track completed subtasks
completed_subtasks = set()
loop_count = 0
max_iterations = 100 # Safety limit
objective_complete = False
while not objective_complete and loop_count < max_iterations:
loop_count += 1
# Build context for current iteration
context_prompt = f"""You are working on the following task: {task}
Plan:
{plan}
Completed subtasks: {', '.join(map(str, sorted(completed_subtasks))) if completed_subtasks else 'None'}
Current status: Working through the plan. Think about what needs to be done next, then take action using the available tools.
Remember to use 'subtask_complete' when you finish a subtask, and 'objective_complete' when all subtasks are done."""
self.short_memory.add(
role=self.user_name, content=context_prompt
)
# Get task prompt
if self.transforms is not None:
task_prompt = handle_transforms(
transforms=self.transforms,
short_memory=self.short_memory,
model_name=self.model_name,
)
else:
task_prompt = (
self.short_memory.return_history_as_string()
)
# Call LLM with REACT tools
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
try:
if img is not None:
response = temp_llm.run(
task=task_prompt,
img=img,
*args,
**kwargs,
)
else:
response = temp_llm.run(
task=task_prompt, *args, **kwargs
)
# Parse response
response = self.parse_llm_output(response)
# Add response to memory
self.short_memory.add(
role=self.agent_name, content=response
)
# Print response
if self.print_on:
if isinstance(response, list):
self.pretty_print(
f"[REACT Loop {loop_count}] [Time: {time.strftime('%H:%M:%S')}]\n\n{json.dumps(response, indent=4)}",
loop_count,
)
else:
self.pretty_print(
f"[REACT Loop {loop_count}]\n{response}",
loop_count,
)
# Handle REACT tool calls
is_subtask_complete, is_obj_complete = (
self._handle_react_tool_call(
response, loop_count
)
)
if is_subtask_complete:
# Extract subtask number from response if possible
# This is a fallback - ideally it comes from the tool call
for item in (
response if isinstance(response, list) else []
):
if isinstance(item, dict):
tool_calls = item.get("tool_calls", [])
for tool_call in tool_calls:
if (
tool_call.get("function", {}).get(
"name"
)
== "subtask_complete"
):
args = tool_call.get(
"function", {}
).get("arguments", {})
if isinstance(args, str):
try:
args = json.loads(args)
except:
pass
if isinstance(args, dict):
subtask_num = args.get(
"subtask_number"
)
if subtask_num:
completed_subtasks.add(
subtask_num
)
if is_obj_complete:
objective_complete = True
break
# Also handle regular tools if they exist
if exists(self.tools):
self.tool_execution_retry(
response, loop_count
)
# Handle MCP tools
if (
exists(self.mcp_url)
or exists(self.mcp_config)
or exists(self.mcp_urls)
):
if response is not None:
self.mcp_tool_handling(
response=response,
current_loop=loop_count,
)
success = True
except Exception as e:
logger.error(
f"Error in REACT workflow loop {loop_count}: {e}"
)
attempt += 1
if attempt >= self.retry_attempts:
raise
if not success:
logger.error(
"Failed to generate response in REACT workflow"
)
break
# Check stopping conditions
if (
self.stopping_condition is not None
and self._check_stopping_condition(response)
):
logger.info("Stopping condition met in REACT workflow")
break
# Restore original tools and context_length
self.tools_list_dictionary = original_tools
self.context_length = original_context_length
self.short_memory.context_length = original_context_length
self.llm = self.llm_handling()
if self.print_on and objective_complete:
self.pretty_print(
"REACT Workflow Completed Successfully!",
loop_count,
)
# Return formatted output
return history_output_formatter(
self.short_memory, type=self.output_type
)
except Exception as error:
logger.error(f"Error in REACT workflow: {error}")
# Restore original tools and context_length on error
if hasattr(self, "tools_list_dictionary"):
self.tools_list_dictionary = original_tools
if hasattr(self, "context_length"):
self.context_length = original_context_length
if hasattr(self, "short_memory"):
self.short_memory.context_length = original_context_length
raise error
async def run_concurrent(self, task: str, *args, **kwargs):
"""
Run a task concurrently.

Loading…
Cancel
Save