centralized streaming!

pull/938/head
harshalmore31 2 months ago
parent 1b8d986a23
commit df83d9d144

@@ -1154,34 +1154,23 @@ class Agent:
# Handle streaming response with tools
if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str):
# Parse streaming chunks with tools using wrapper
if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
stream=response,
agent_name="Agent",
agent_name=self.agent_name,
print_on=self.print_on,
verbose=self.verbose,
)
# Handle tool calls if any were detected
if tool_calls_in_stream:
# Add text response to memory first
if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format and execute tool calls
import json
formatted_tool_calls = []
for tc in tool_calls_in_stream:
if tc and (tc.get("input") or tc.get("arguments_complete")):
if "input" in tc:
args_to_use = tc["input"]
else:
try:
args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError:
continue
args_to_use = tc.get("input") or json.loads(tc.get("arguments", "{}"))
formatted_tool_calls.append({
"type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
@@ -1189,15 +1178,15 @@ class Agent:
})
if formatted_tool_calls:
# Execute tools using existing tool structure
response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
else:
response = full_text_response
else:
# No tools called, use streamed text as response
response = full_text_response
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
else:
# Fallback: collect text manually
# Fallback for streaming without tool parsing
text_chunks = []
for chunk in response:
if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
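The fallback branch drains the stream and concatenates the text deltas, echoing tokens as they arrive. A self-contained sketch of that loop, assuming OpenAI-style chunk objects as implied by chunk.choices[0].delta.content (drain_text_stream is a hypothetical name):

def drain_text_stream(stream, print_on=True):
    # Collect text deltas from an OpenAI-style streaming response.
    text_chunks = []
    for chunk in stream:
        if hasattr(chunk, "choices") and chunk.choices:
            content = chunk.choices[0].delta.content
            if content:  # delta.content is None on role/finish chunks
                text_chunks.append(content)
                if print_on:
                    print(content, end="", flush=True)
    if print_on:
        print()  # trailing newline, matching the diff
    return "".join(text_chunks)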
@@ -1208,8 +1197,6 @@ class Agent:
if self.print_on:
print()
response = "".join(text_chunks)
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
else:
# Parse the response from the agent with the output type
if exists(self.tools_list_dictionary):
@@ -1217,6 +1204,8 @@ class Agent:
response = response.model_dump()
response = self.parse_llm_output(response)
if isinstance(response, str) and response.strip():
self.short_memory.add(role=self.agent_name, content=response)
# Print
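In the non-streaming branch the raw response may be a Pydantic model, so it is flattened with model_dump() before parse_llm_output runs, and only non-empty strings reach short-term memory. A sketch of that normalization step (parse_llm_output itself is defined outside this diff):

from pydantic import BaseModel

def normalize_llm_response(response):
    # Pydantic v2: model_dump() converts a model instance to a plain dict.
    if isinstance(response, BaseModel):
        response = response.model_dump()
    return response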
@@ -1263,14 +1252,16 @@ class Agent:
attempt += 1
if not success:
if self.autosave is True:
log_agent_data(self.to_dict())
self.save()
logger.error(
"Failed to generate a valid response after retry attempts."
"Failed to generate a valid response after"
" retry attempts."
)
break
break # Exit the loop if all retry attempts fail
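The surrounding retry loop (only partially visible in this hunk) counts attempts, autosaves on final failure, and then exits. A condensed sketch of that control flow; run_once, max_retries, autosave, and save_state are placeholders, not the Agent's real attribute names:

import logging

logger = logging.getLogger(__name__)

def run_with_retries(run_once, max_retries=3, autosave=False, save_state=lambda: None):
    attempt, success = 0, False
    while attempt < max_retries and not success:
        try:
            success = run_once() is not None  # one full LLM call + parse
        except Exception:
            pass  # a raised error counts as a failed attempt
        attempt += 1
    if not success:
        if autosave:
            save_state()  # mirrors log_agent_data(self.to_dict()) / self.save()
        logger.error("Failed to generate a valid response after retry attempts.")
    return success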
# Check stopping conditions
if (
@@ -1293,8 +1284,11 @@ class Agent:
break
if self.interactive:
# logger.info("Interactive mode enabled.")
user_input = input("You: ")
# User-defined exit command
if (
user_input.lower()
== self.custom_exit_command.lower()
@@ -1304,6 +1298,7 @@ class Agent:
loop_count=loop_count,
)
break
self.short_memory.add(
role=self.user_name, content=user_input
)
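Interactive mode reads one line per loop and compares it case-insensitively against a configurable exit command before committing the input to memory. A minimal sketch of that turn; the "exit" default is an assumption, since the real default of custom_exit_command is not shown in this diff:

def interactive_turn(short_memory, user_name="User", custom_exit_command="exit"):
    user_input = input("You: ")
    if user_input.lower() == custom_exit_command.lower():
        return False  # caller breaks out of the loop, as the diff does
    short_memory.add(role=user_name, content=user_input)
    return True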
@@ -2629,7 +2624,7 @@ class Agent:
task=task,
img=img,
streaming_callback=streaming_callback,
title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
title=f"Agent: {self.agent_name} Loops: {current_loop}",
print_on=self.print_on,
verbose=self.verbose,
*args,
@@ -2638,9 +2633,12 @@ class Agent:
else:
# Non-streaming call
if img is not None:
out = self.llm.run(task=task, img=img, *args, **kwargs)
out = self.llm.run(
task=task, img=img, *args, **kwargs
)
else:
out = self.llm.run(task=task, *args, **kwargs)
return out
except AgentLLMError as e:
@@ -3015,7 +3013,7 @@ class Agent:
if self.streaming_on:
summary = temp_llm.run_with_streaming(
task=self.short_memory.get_str(),
title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
title=f"Agent: {self.agent_name} - MCP Tool Summary",
style="cyan",
print_on=self.print_on,
verbose=self.verbose,
@@ -3030,7 +3028,6 @@ class Agent:
# Fallback: provide a default summary
summary = "I successfully executed the MCP tool and retrieved the information above."
# Only pretty_print if streaming is off (to avoid double printing)
if self.print_on and not self.streaming_on:
self.pretty_print(summary, loop_count=current_loop)
@@ -3043,7 +3040,6 @@ class Agent:
raise e
def temp_llm_instance_for_tool_summary(self):
from swarms.utils.litellm_wrapper import LiteLLM
return LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
@@ -3064,6 +3060,7 @@ class Agent:
"This may indicate the LLM did not return a valid response."
)
return
try:
output = self.tool_struct.execute_function_calls_from_api_response(
response
@@ -3095,17 +3092,16 @@ class Agent:
if self.tool_call_summary is True:
temp_llm = self.temp_llm_instance_for_tool_summary()
tool_summary_prompt = f"""
Please analyze and summarize the following tool execution output in a clear and concise way.
Focus on the key information and insights that would be most relevant to the user's original request.
If there are any errors or issues, highlight them prominently.
The user's original request was:
{self.task}
Tool Output:
{output}
"""
tool_response = temp_llm.run(
f"""
Please analyze and summarize the following tool execution output in a clear and concise way.
Focus on the key information and insights that would be most relevant to the user's original request.
If there are any errors or issues, highlight them prominently.
Tool Output:
{output}
"""
)
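The lines below apply the commit's central pattern: build tool_summary_prompt once, then route it through run_with_streaming when streaming is on and through a plain run call otherwise. Note that if the changed pair in the next hunk reads old-then-new like the rest of this view, the replacement temp_llm.run(tool_response) would reference a name not yet bound in that branch; the intended call is presumably temp_llm.run(tool_summary_prompt). A sketch of the intended shape, assuming task, output, and temp_llm are in scope as in the method above:

tool_summary_prompt = (
    "Please analyze and summarize the following tool execution output.\n"
    f"The user's original request was:\n{task}\n\n"
    f"Tool Output:\n{output}"
)

if streaming_on:
    tool_response = temp_llm.run_with_streaming(
        task=tool_summary_prompt,  # kwarg mirrored from the calls above
        print_on=print_on,
        verbose=verbose,
    )
else:
    tool_response = temp_llm.run(tool_summary_prompt)  # not run(tool_response)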
# Use centralized streaming logic for tool summary
if self.streaming_on:
@@ -3116,7 +3112,7 @@ class Agent:
verbose=self.verbose,
)
else:
tool_response = temp_llm.run(tool_summary_prompt)
tool_response = temp_llm.run(tool_response)
# Add the tool response to memory
self.short_memory.add(
@@ -3124,7 +3120,6 @@ class Agent:
content=tool_response,
)
# Only pretty_print if streaming is off (to avoid double printing)
if self.print_on and not self.streaming_on:
self.pretty_print(
tool_response,

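Taken together, the hunks converge on one abstraction: every streaming call site now goes through run_with_streaming (plus parse_streaming_chunks_with_tools when tools are in play) instead of a hand-rolled chunk loop. A signature-level sketch of the wrapper surface implied by the call sites in this diff; the parameter lists are inferred from usage, not taken from the wrapper's actual definition:

class LiteLLM:
    def run_with_streaming(self, task, title="", style=None,
                           streaming_callback=None, print_on=True,
                           verbose=False, *args, **kwargs):
        # Stream a completion, optionally pretty-printing under `title`,
        # and return the fully accumulated text.
        ...

    def parse_streaming_chunks_with_tools(self, stream, agent_name,
                                          print_on=True, verbose=False):
        # Drain `stream`; return (full_text_response, tool_calls_in_stream).
        ...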