pull/1182/merge
CI-DEV 5 days ago committed by GitHub
commit d02546ec7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -3,6 +3,7 @@ import json
import logging import logging
import os import os
import random import random
import sys
import threading import threading
import time import time
import traceback import traceback
@ -106,6 +107,70 @@ from swarms.utils.swarms_marketplace_utils import (
) )
# REACT workflow tools for auto mode
REACT_ACTION_TOOL = {
"type": "function",
"function": {
"name": "action",
"description": "Execute an action or use a tool to make progress on the current subtask. Use this when you need to perform a specific action to advance toward completing the subtask.",
"parameters": {
"type": "object",
"properties": {
"action_description": {
"type": "string",
"description": "A clear description of the action being taken",
},
"action_result": {
"type": "string",
"description": "The result or outcome of the action",
},
},
"required": ["action_description", "action_result"],
},
},
}
REACT_SUBTASK_COMPLETE_TOOL = {
"type": "function",
"function": {
"name": "subtask_complete",
"description": "Mark the current subtask as complete. Use this when you have successfully completed a subtask from your plan and are ready to move to the next one.",
"parameters": {
"type": "object",
"properties": {
"subtask_number": {
"type": "integer",
"description": "The number of the subtask that was completed (1-indexed)",
},
"subtask_summary": {
"type": "string",
"description": "A brief summary of what was accomplished in this subtask",
},
},
"required": ["subtask_number", "subtask_summary"],
},
},
}
REACT_OBJECTIVE_COMPLETE_TOOL = {
"type": "function",
"function": {
"name": "objective_complete",
"description": "Mark the entire objective as complete. Use this ONLY when all subtasks have been completed and the overall objective is fully achieved.",
"parameters": {
"type": "object",
"properties": {
"final_summary": {
"type": "string",
"description": "A comprehensive summary of what was accomplished overall",
},
},
"required": ["final_summary"],
},
},
}
def stop_when_repeats(response: str) -> bool: def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response # Stop if the word stop appears in the response
return "stop" in response.lower() return "stop" in response.lower()
@ -1236,21 +1301,28 @@ class Agent:
if self.plan_enabled is True: if self.plan_enabled is True:
self.plan(task) self.plan(task)
# Autosave
if self.autosave:
log_agent_data(self.to_dict())
self.save()
# Use REACT workflow when max_loops is "auto"
if self.max_loops == "auto":
return self._run_react_workflow(
task=task,
img=img,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
# Set the loop count # Set the loop count
loop_count = 0 loop_count = 0
# Clear the short memory # Clear the short memory
response = None response = None
# Autosave while loop_count < self.max_loops:
if self.autosave:
log_agent_data(self.to_dict())
self.save()
while (
self.max_loops == "auto"
or loop_count < self.max_loops
):
loop_count += 1 loop_count += 1
# Handle RAG query every loop # Handle RAG query every loop
@ -1631,6 +1703,448 @@ class Agent:
) )
raise error raise error
def _create_react_plan(self, task: str) -> str:
"""
Create a detailed plan for REACT workflow execution.
Args:
task (str): The task to create a plan for
Returns:
str: The generated plan as a string
"""
try:
plan_prompt = f"""Create a detailed, step-by-step plan to complete the following task.
Break down the task into clear, actionable subtasks that can be completed sequentially.
Number each subtask starting from 1.
Task: {task}
Format your response as a numbered list of subtasks, where each subtask is a specific,
actionable step that moves toward completing the overall objective.
Example format:
1. [First subtask description]
2. [Second subtask description]
3. [Third subtask description]
..."""
plan = self.llm.run(task=plan_prompt)
self.short_memory.add(
role=self.agent_name,
content=f"Plan created:\n{plan}",
)
if self.print_on:
self.pretty_print(
f"Plan Created:\n{plan}", loop_count=0
)
return plan
except Exception as error:
logger.error(f"Failed to create REACT plan: {error}")
raise error
def _handle_react_tool_call(
self, response: Any, current_loop: int
) -> Tuple[bool, bool]:
"""
Handle REACT tool calls (action, subtask_complete, objective_complete).
Args:
response (Any): The response from the LLM that may contain tool calls
current_loop (int): Current loop count for logging
Returns:
Tuple[bool, bool]: (is_subtask_complete, is_objective_complete)
"""
is_subtask_complete = False
is_objective_complete = False
# Handle different response formats
tool_calls_list = []
# Case 1: Response is a list
if isinstance(response, list):
for item in response:
if isinstance(item, dict):
# Check for tool_calls in dict
if "tool_calls" in item:
tool_calls_list.extend(item["tool_calls"])
# Check if the item itself is a tool call
elif "function" in item:
tool_calls_list.append(item)
# Case 2: Response is a dict
elif isinstance(response, dict):
if "tool_calls" in response:
tool_calls_list = response["tool_calls"]
elif "choices" in response:
# OpenAI format
for choice in response.get("choices", []):
message = choice.get("message", {})
if "tool_calls" in message:
tool_calls_list.extend(message["tool_calls"])
elif "function" in response:
tool_calls_list.append(response)
# Case 3: Response has tool_calls attribute
elif hasattr(response, "tool_calls"):
tool_calls_list = response.tool_calls
# Process tool calls
for tool_call in tool_calls_list:
# Handle different tool call formats
if isinstance(tool_call, dict):
function_info = tool_call.get("function", {})
if not function_info:
function_info = tool_call
function_name = function_info.get("name", "")
arguments = function_info.get("arguments", "")
# Parse arguments if it's a string
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
logger.warning(
f"Failed to parse tool arguments: {arguments}"
)
continue
elif not isinstance(arguments, dict):
continue
# Handle REACT tools
if function_name == "action":
action_desc = arguments.get(
"action_description", ""
)
action_result = arguments.get("action_result", "")
action_msg = f"Action: {action_desc}\nResult: {action_result}"
self.short_memory.add(
role="Tool Executor", content=action_msg
)
if self.print_on:
self.pretty_print(action_msg, current_loop)
elif function_name == "subtask_complete":
subtask_num = arguments.get("subtask_number", 0)
subtask_summary = arguments.get(
"subtask_summary", ""
)
subtask_msg = f"Subtask {subtask_num} Complete: {subtask_summary}"
self.short_memory.add(
role=self.agent_name, content=subtask_msg
)
if self.print_on:
self.pretty_print(subtask_msg, current_loop)
is_subtask_complete = True
elif function_name == "objective_complete":
final_summary = arguments.get("final_summary", "")
objective_msg = f"Objective Complete!\nFinal Summary: {final_summary}"
self.short_memory.add(
role=self.agent_name, content=objective_msg
)
if self.print_on:
self.pretty_print(objective_msg, current_loop)
is_objective_complete = True
return is_subtask_complete, is_objective_complete
def _run_react_workflow(
self,
task: Optional[Union[str, Any]] = None,
img: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
) -> Any:
"""
Execute REACT workflow for auto mode: Plan -> Think -> Act -> Observe loop.
This method implements a REACT-style workflow where:
1. A plan is created first
2. For each subtask, the agent loops through: Think -> Act -> Observe
3. Subtasks are marked complete when done
4. Objective is marked complete when all subtasks are done
Args:
task (Optional[Union[str, Any]]): The task to execute
img (Optional[str]): Optional image input
streaming_callback (Optional[Callable[[str], None]]): Streaming callback
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Any: The final output from the agent
"""
try:
# Step 1: Create a plan
if self.print_on:
self.pretty_print(
"Starting REACT Workflow - Creating Plan...",
loop_count=0,
)
plan = self._create_react_plan(task)
# Add REACT system prompt if not already added
if not self.react_on:
react_prompt = (
REACT_SYS_PROMPT
+ """
You are now in REACT workflow mode. Follow this process:
1. Review the plan and current subtask
2. Think about what needs to be done
3. Use the 'action' tool to execute actions
4. Use 'subtask_complete' when a subtask is finished
5. Use 'objective_complete' when all subtasks are done
Available tools:
- action: Execute an action for the current subtask
- subtask_complete: Mark current subtask as complete
- objective_complete: Mark entire objective as complete (use only when ALL subtasks are done)
"""
)
self.short_memory.add(
role="system", content=react_prompt
)
# Prepare REACT tools list
react_tools = [
REACT_ACTION_TOOL,
REACT_SUBTASK_COMPLETE_TOOL,
REACT_OBJECTIVE_COMPLETE_TOOL,
]
# Add user's custom tools if they exist
if self.tools_list_dictionary:
react_tools.extend(self.tools_list_dictionary)
# Create a temporary LLM with REACT tools
original_tools = self.tools_list_dictionary
self.tools_list_dictionary = react_tools
temp_llm = self.llm_handling()
# Set context_length to infinite for auto mode
original_context_length = self.context_length
# Use a very large number instead of float('inf') to avoid type issues
infinite_context = sys.maxsize # Largest integer value
self.context_length = infinite_context
# Also update the conversation's context_length
self.short_memory.context_length = infinite_context
# Track completed subtasks
completed_subtasks = set()
loop_count = 0
max_iterations = 100 # Safety limit
objective_complete = False
while (
not objective_complete and loop_count < max_iterations
):
loop_count += 1
# Build context for current iteration
context_prompt = f"""You are working on the following task: {task}
Plan:
{plan}
Completed subtasks: {', '.join(map(str, sorted(completed_subtasks))) if completed_subtasks else 'None'}
Current status: Working through the plan. Think about what needs to be done next, then take action using the available tools.
Remember to use 'subtask_complete' when you finish a subtask, and 'objective_complete' when all subtasks are done."""
self.short_memory.add(
role=self.user_name, content=context_prompt
)
# Get task prompt
if self.transforms is not None:
task_prompt = handle_transforms(
transforms=self.transforms,
short_memory=self.short_memory,
model_name=self.model_name,
)
else:
task_prompt = (
self.short_memory.return_history_as_string()
)
# Call LLM with REACT tools
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
try:
if img is not None:
response = temp_llm.run(
task=task_prompt,
img=img,
*args,
**kwargs,
)
else:
response = temp_llm.run(
task=task_prompt, *args, **kwargs
)
# Parse response
response = self.parse_llm_output(response)
# Add response to memory
self.short_memory.add(
role=self.agent_name, content=response
)
# Print response
if self.print_on:
if isinstance(response, list):
self.pretty_print(
f"[REACT Loop {loop_count}] [Time: {time.strftime('%H:%M:%S')}]\n\n{json.dumps(response, indent=4)}",
loop_count,
)
else:
self.pretty_print(
f"[REACT Loop {loop_count}]\n{response}",
loop_count,
)
# Handle REACT tool calls
is_subtask_complete, is_obj_complete = (
self._handle_react_tool_call(
response, loop_count
)
)
if is_subtask_complete:
# Extract subtask number from response if possible
# This is a fallback - ideally it comes from the tool call
for item in (
response
if isinstance(response, list)
else []
):
if isinstance(item, dict):
tool_calls = item.get(
"tool_calls", []
)
for tool_call in tool_calls:
if (
tool_call.get(
"function", {}
).get("name")
== "subtask_complete"
):
args = tool_call.get(
"function", {}
).get("arguments", {})
if isinstance(args, str):
try:
args = json.loads(
args
)
except:
pass
if isinstance(args, dict):
subtask_num = args.get(
"subtask_number"
)
if subtask_num:
completed_subtasks.add(
subtask_num
)
if is_obj_complete:
objective_complete = True
break
# Also handle regular tools if they exist
if exists(self.tools):
self.tool_execution_retry(
response, loop_count
)
# Handle MCP tools
if (
exists(self.mcp_url)
or exists(self.mcp_config)
or exists(self.mcp_urls)
):
if response is not None:
self.mcp_tool_handling(
response=response,
current_loop=loop_count,
)
success = True
except Exception as e:
logger.error(
f"Error in REACT workflow loop {loop_count}: {e}"
)
attempt += 1
if attempt >= self.retry_attempts:
raise
if not success:
logger.error(
"Failed to generate response in REACT workflow"
)
break
# Check stopping conditions
if (
self.stopping_condition is not None
and self._check_stopping_condition(response)
):
logger.info(
"Stopping condition met in REACT workflow"
)
break
# Restore original tools and context_length
self.tools_list_dictionary = original_tools
self.context_length = original_context_length
self.short_memory.context_length = original_context_length
self.llm = self.llm_handling()
if self.print_on and objective_complete:
self.pretty_print(
"REACT Workflow Completed Successfully!",
loop_count,
)
# Return formatted output
return history_output_formatter(
self.short_memory, type=self.output_type
)
except Exception as error:
logger.error(f"Error in REACT workflow: {error}")
# Restore original tools and context_length on error
if hasattr(self, "tools_list_dictionary"):
self.tools_list_dictionary = original_tools
if hasattr(self, "context_length"):
self.context_length = original_context_length
if hasattr(self, "short_memory"):
self.short_memory.context_length = (
original_context_length
)
raise error
async def run_concurrent(self, task: str, *args, **kwargs): async def run_concurrent(self, task: str, *args, **kwargs):
""" """
Run a task concurrently. Run a task concurrently.

Loading…
Cancel
Save