pull/938/head
harshalmore31 2 months ago
parent c05acec895
commit 1b8d986a23

@ -1129,8 +1129,7 @@ class Agent:
self.short_memory.return_history_as_string() self.short_memory.return_history_as_string()
) )
# If streaming is OFF, use the simple, non-streaming path # Parameters
if not self.streaming_on:
attempt = 0 attempt = 0
success = False success = False
while attempt < self.retry_attempts and not success: while attempt < self.retry_attempts and not success:
@ -1153,6 +1152,65 @@ class Agent:
**kwargs, **kwargs,
) )
# Handle streaming response with tools
if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str):
# Parse streaming chunks with tools using wrapper
if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
stream=response,
agent_name="Agent",
print_on=self.print_on,
verbose=self.verbose,
)
# Handle tool calls if any were detected
if tool_calls_in_stream:
# Add text response to memory first
if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format and execute tool calls
import json
formatted_tool_calls = []
for tc in tool_calls_in_stream:
if tc and (tc.get("input") or tc.get("arguments_complete")):
if "input" in tc:
args_to_use = tc["input"]
else:
try:
args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError:
continue
formatted_tool_calls.append({
"type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
"id": tc["id"]
})
if formatted_tool_calls:
# Execute tools using existing tool structure
response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
else:
# No tools called, use streamed text as response
response = full_text_response
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
else:
# Fallback: collect text manually
text_chunks = []
for chunk in response:
if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
text_chunks.append(content)
if self.print_on:
print(content, end="", flush=True)
if self.print_on:
print()
response = "".join(text_chunks)
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
else:
# Parse the response from the agent with the output type # Parse the response from the agent with the output type
if exists(self.tools_list_dictionary): if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel): if isinstance(response, BaseModel):
@ -1161,12 +1219,15 @@ class Agent:
response = self.parse_llm_output(response) response = self.parse_llm_output(response)
self.short_memory.add(role=self.agent_name, content=response) self.short_memory.add(role=self.agent_name, content=response)
# Print
if self.print_on is True: if self.print_on is True:
if isinstance(response, list): if isinstance(response, list):
self.pretty_print( self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count, loop_count,
) )
elif self.streaming_on:
pass
else: else:
self.pretty_print(response, loop_count) self.pretty_print(response, loop_count)
@ -1190,122 +1251,25 @@ class Agent:
f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" f"LLM returned None response in loop {loop_count}, skipping MCP tool handling"
) )
success = True # Exit retry loop success = True
except Exception as e: except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}") if self.autosave is True:
attempt += 1 log_agent_data(self.to_dict())
if attempt >= self.retry_attempts: self.save()
logger.error("All retry attempts failed.")
break # Exit main loop if retries fail
# If streaming is ON with tools, use streaming tool execution path
else:
try:
# Get streaming response with tools
if img is not None:
stream_response = self.call_llm(
task=task_prompt,
img=img,
current_loop=loop_count,
*args,
**kwargs,
)
else:
stream_response = self.call_llm(
task=task_prompt,
current_loop=loop_count,
*args,
**kwargs,
)
# Use streaming chunk parser from litellm_wrapper
if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
stream=stream_response,
agent_name=self.agent_name,
print_on=self.print_on,
verbose=self.verbose,
)
else:
# Fallback to simple text collection if method not available
full_text_response = ""
tool_calls_in_stream = []
if hasattr(stream_response, "__iter__") and not isinstance(stream_response, str):
for chunk in stream_response:
if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_text_response += content
if self.print_on:
print(content, end="", flush=True)
if self.print_on:
print()
# Handle tool calls if any were detected
if tool_calls_in_stream:
# Add text response to memory
if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format and execute tool calls
formatted_tool_calls = []
for tc in tool_calls_in_stream:
if tc and (tc.get("input") or tc.get("arguments_complete")):
if "input" in tc:
args_to_use = tc["input"]
else:
try:
args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError:
continue
formatted_tool_calls.append({
"type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
"id": tc["id"]
})
if formatted_tool_calls: logger.error(
tool_results = self.tool_struct.execute_function_calls_from_api_response( f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | "
{"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
) )
attempt += 1
self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results)) if not success:
if self.print_on: if self.autosave is True:
formatter.print_panel( log_agent_data(self.to_dict())
f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}", self.save()
"Tool Execution Result"
)
# Tool summary if enabled logger.error(
if self.tool_call_summary: "Failed to generate a valid response after retry attempts."
temp_llm = self.temp_llm_instance_for_tool_summary()
if self.streaming_on and hasattr(temp_llm, 'run_tool_summary_with_streaming'):
final_summary_response = temp_llm.run_tool_summary_with_streaming(
tool_results=format_data_structure(tool_results),
agent_name=f"{self.agent_name} - Real-time",
print_on=self.print_on,
verbose=self.verbose,
)
else:
final_summary_response = temp_llm.run(
task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
) )
response = self.parse_llm_output(final_summary_response)
self.short_memory.add(role=self.agent_name, content=response)
if self.print_on and not self.streaming_on:
self.pretty_print(response, loop_count)
else:
response = f"Tool execution completed: {format_data_structure(tool_results)}"
else:
# No tools called, use streamed text as response
response = full_text_response
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
except Exception as e:
logger.error(f"Error during streaming execution: {e}")
break break
# Check stopping conditions # Check stopping conditions
@ -2645,9 +2609,8 @@ class Agent:
del kwargs["is_last"] del kwargs["is_last"]
try: try:
# For streaming with tools, return raw stream for custom parsing # Special handling for streaming with tools - need raw stream for parsing
if self.streaming_on and exists(self.tools_list_dictionary): if self.streaming_on and exists(self.tools_list_dictionary):
# Set stream mode and get raw streaming response
original_stream = getattr(self.llm, 'stream', False) original_stream = getattr(self.llm, 'stream', False)
self.llm.stream = True self.llm.stream = True
@ -2658,7 +2621,6 @@ class Agent:
stream_response = self.llm.run(task=task, *args, **kwargs) stream_response = self.llm.run(task=task, *args, **kwargs)
return stream_response return stream_response
finally: finally:
# Restore original stream setting
self.llm.stream = original_stream self.llm.stream = original_stream
# Use centralized streaming logic from wrapper if streaming is enabled (no tools) # Use centralized streaming logic from wrapper if streaming is enabled (no tools)

Loading…
Cancel
Save