enhance tool-call coordination and streaming behavior in Agent class

pull/938/head
harshalmore31 3 weeks ago
parent ab7781b100
commit 5029e7644b

@@ -574,6 +574,16 @@ class Agent:
         self.summarize_multiple_images = summarize_multiple_images
         self.tool_retry_attempts = tool_retry_attempts
+
+        # Streaming / tool-call coordination flags
+        # When a tool call is expected we temporarily disable streaming so the
+        # LLM returns a complete JSON payload that can be parsed reliably. After
+        # the first tool call has been executed we re-enable streaming for
+        # subsequent requests / summaries.
+        self.expecting_tool_call: bool = False
+        self.tool_call_completed: bool = False
+        self.original_streaming_state: bool = self.streaming_on
+        self.should_stream_after_tools: bool = False

         # self.short_memory = self.short_memory_init()

         # Initialize the feedback
@@ -1057,6 +1067,14 @@ class Agent:
                     self.short_memory.return_history_as_string()
                 )

+                # Determine if this request is primarily to obtain the first tool call
+                if self.streaming_on and exists(self.tools) and not self.tool_call_completed:
+                    # Disable streaming for this request so we can reliably parse JSON
+                    self.expecting_tool_call = True
+                    self.should_stream_after_tools = True
+                else:
+                    self.expecting_tool_call = False
+
                 # Parameters
                 attempt = 0
                 success = False
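
For readers skimming the diff, here is a self-contained sketch (not part of the commit) of the flag lifecycle the two hunks above introduce; AgentFlags and decide_streaming are hypothetical stand-ins for the new Agent attributes and the inline check in _run:

    from dataclasses import dataclass

    @dataclass
    class AgentFlags:
        # Simplified stand-ins for the coordination flags added in __init__
        streaming_on: bool = True
        has_tools: bool = True
        tool_call_completed: bool = False
        expecting_tool_call: bool = False
        should_stream_after_tools: bool = False

    def decide_streaming(flags: AgentFlags) -> None:
        # Mirrors the pre-request check: buffer the first tool call, stream afterwards
        if flags.streaming_on and flags.has_tools and not flags.tool_call_completed:
            flags.expecting_tool_call = True
            flags.should_stream_after_tools = True
        else:
            flags.expecting_tool_call = False

    flags = AgentFlags()
    decide_streaming(flags)
    print(flags.expecting_tool_call)  # True: first request returns a complete JSON payload
    flags.tool_call_completed = True  # set once the tool has actually run
    decide_streaming(flags)
    print(flags.expecting_tool_call)  # False: later requests may stream again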
@@ -1118,10 +1136,16 @@
                     # Check and execute callable tools
                     if exists(self.tools):
+                        # Use standard tool execution for both streaming and non-streaming
                         self.tool_execution_retry(
                             response, loop_count
                         )
+                        # Mark that at least one tool call has been processed
+                        self.tool_call_completed = True
+                        # Reset expecting_tool_call so subsequent requests can stream
+                        self.expecting_tool_call = False
+
                     # Handle MCP tools
                     if (
                         exists(self.mcp_url)
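
Combined with the previous hunk, the per-loop behaviour looks roughly like the following walkthrough (illustrative only; tool_call_modes is a hypothetical condensation of the real loop and assumes tools are present):

    def tool_call_modes(streaming_on: bool = True, loops: int = 3):
        # Yield the request mode chosen for each loop iteration
        tool_call_completed = False
        for i in range(loops):
            expecting_tool_call = streaming_on and not tool_call_completed
            yield i, "buffered JSON (tool call)" if expecting_tool_call else "streamed"
            # tool_execution_retry(...) would run here; afterwards the flag flips
            tool_call_completed = True

    for loop, mode in tool_call_modes():
        print(loop, mode)
    # 0 buffered JSON (tool call)
    # 1 streamed
    # 2 streamed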
@@ -2533,8 +2557,11 @@
             del kwargs["is_last"]

         try:
-            # Set streaming parameter in LLM if streaming is enabled
-            if self.streaming_on and hasattr(self.llm, "stream"):
+            # Decide whether streaming should be used for this call
+            streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False)
+
+            # Set streaming parameter in LLM if streaming is enabled for this call
+            if streaming_enabled and hasattr(self.llm, "stream"):
                 original_stream = self.llm.stream
                 self.llm.stream = True
@@ -2547,7 +2574,7 @@
                     task=task, *args, **kwargs
                 )

-                # If we get a streaming response, handle it with the new streaming panel
+                # If we get a streaming response, handle it with the streaming panel
                 if hasattr(
                     streaming_response, "__iter__"
                 ) and not isinstance(streaming_response, str):
@@ -2566,26 +2593,12 @@
                                 chunks.append(content)
                             complete_response = "".join(chunks)
                         else:
-                            # Collect chunks for conversation saving
-                            collected_chunks = []
-
-                            def on_chunk_received(chunk: str):
-                                """Callback to collect chunks as they arrive"""
-                                collected_chunks.append(chunk)
-                                # Optional: Save each chunk to conversation in real-time
-                                # This creates a more detailed conversation history
-                                if self.verbose:
-                                    logger.debug(
-                                        f"Streaming chunk received: {chunk[:50]}..."
-                                    )
-
-                            # Use the streaming panel to display and collect the response
+                            # Use the streaming panel to display the response
                             complete_response = formatter.print_streaming_panel(
                                 streaming_response,
                                 title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
-                                style=None,  # Use random color like non-streaming approach
+                                style=None,
                                 collect_chunks=True,
-                                on_chunk_callback=on_chunk_received,
                             )

                 # Restore original stream setting
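
A hedged sketch of the call_llm gating shown in the three hunks above; FakeLLM and call_llm_sketch are illustrative stand-ins, and the real code hands the chunk iterator to formatter.print_streaming_panel instead of joining it:

    class FakeLLM:
        # Minimal stand-in for a LiteLLM-style client with a toggleable stream flag
        def __init__(self):
            self.stream = False

        def run(self, task):
            if self.stream:
                return iter(["Hello ", "from ", "a ", "stream"])  # chunk iterator
            return "Hello from a single response"

    def call_llm_sketch(llm, task, streaming_on, expecting_tool_call):
        # Stream only when globally enabled AND no tool call is pending
        streaming_enabled = streaming_on and not expecting_tool_call
        if streaming_enabled and hasattr(llm, "stream"):
            original_stream = llm.stream
            llm.stream = True
            try:
                response = llm.run(task)
                if hasattr(response, "__iter__") and not isinstance(response, str):
                    return "".join(response)  # real code: formatter.print_streaming_panel(...)
                return response
            finally:
                llm.stream = original_stream  # restore original stream setting
        return llm.run(task)

    llm = FakeLLM()
    print(call_llm_sketch(llm, "hi", streaming_on=True, expecting_tool_call=True))   # buffered
    print(call_llm_sketch(llm, "hi", streaming_on=True, expecting_tool_call=False))  # streamed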
@@ -2990,12 +3003,15 @@
             raise e

     def temp_llm_instance_for_tool_summary(self):
+        # Enable streaming for tool summary if original streaming was enabled and we should stream after tools
+        should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False)
+
         return LiteLLM(
             model_name=self.model_name,
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=False,  # Always disable streaming for tool summaries
+            stream=should_stream,  # Enable streaming for tool summaries if conditions are met
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,
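
The stream flag handed to the summary LLM reduces to a simple conjunction; a minimal sketch (the _AgentLike class is hypothetical, and missing attributes default to False, mirroring the getattr calls):

    def summary_stream_flag(agent) -> bool:
        # True only if the agent originally streamed and the first tool call is out of the way
        return getattr(agent, "should_stream_after_tools", False) and getattr(
            agent, "original_streaming_state", False
        )

    class _AgentLike:
        should_stream_after_tools = True
        original_streaming_state = True

    print(summary_stream_flag(_AgentLike()))  # True  -> LiteLLM(..., stream=should_stream)
    print(summary_stream_flag(object()))      # False -> summary stays non-streaming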
@@ -3042,8 +3058,7 @@
         if self.tool_call_summary is True:
             temp_llm = self.temp_llm_instance_for_tool_summary()

-            tool_response = temp_llm.run(
-                f"""
+            tool_prompt = f"""
            Please analyze and summarize the following tool execution output in a clear and concise way.
            Focus on the key information and insights that would be most relevant to the user's original request.
            If there are any errors or issues, highlight them prominently.
@@ -3051,19 +3066,37 @@
            Tool Output:
            {output}

            """
-            )

+            # Check if we should stream the tool summary
+            should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False)
+
+            if should_stream and self.print_on:
+                # Handle streaming response with streaming panel
+                streaming_response = temp_llm.run(tool_prompt)
+
+                if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+                    # Use streaming panel directly
+                    tool_response = formatter.print_streaming_panel(
+                        streaming_response,
+                        title=f"🤖 Agent: {self.agent_name} Tool Summary",
+                        style=None,
+                        collect_chunks=True,
+                    )
+                else:
+                    # Fallback for non-streaming response
+                    tool_response = streaming_response
+                    self.pretty_print(tool_response, loop_count)
+            else:
+                # Non-streaming response
+                tool_response = temp_llm.run(tool_prompt)
+                if self.print_on:
+                    self.pretty_print(tool_response, loop_count)

             self.short_memory.add(
                 role=self.agent_name,
                 content=tool_response,
             )
-
-            if self.print_on is True:
-                self.pretty_print(
-                    tool_response,
-                    loop_count,
-                )

     def list_output_types(self):
         return OutputType
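
Finally, the new summary branch can be reduced to the routing below (a sketch under the same assumptions; join() stands in for formatter.print_streaming_panel and pretty_print is omitted):

    def handle_tool_summary(response, should_stream: bool, print_on: bool) -> str:
        # Route the tool-summary response the way the hunk above does
        if should_stream and print_on:
            if hasattr(response, "__iter__") and not isinstance(response, str):
                return "".join(response)  # real code: formatter.print_streaming_panel(...)
            return response               # fallback for a non-streaming response
        return response                   # non-streaming path; pretty_print handles display

    print(handle_tool_summary(iter(["Tool ", "ran ", "fine."]), True, True))
    print(handle_tool_summary("Tool ran fine.", False, True))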
