simplified streaming logic!

pull/938/head
harshalmore31 2 months ago
parent df83d9d144
commit f3ed5b9055

@@ -1154,49 +1154,12 @@ class Agent:
         # Handle streaming response with tools
         if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str):
-            if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
-                full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
-                    stream=response,
-                    agent_name=self.agent_name,
-                    print_on=self.print_on,
-                    verbose=self.verbose,
-                )
-                if tool_calls_in_stream:
-                    if full_text_response.strip():
-                        self.short_memory.add(role=self.agent_name, content=full_text_response)
-                    import json
-                    formatted_tool_calls = []
-                    for tc in tool_calls_in_stream:
-                        if tc and (tc.get("input") or tc.get("arguments_complete")):
-                            args_to_use = tc.get("input") or json.loads(tc.get("arguments", "{}"))
-                            formatted_tool_calls.append({
-                                "type": "function",
-                                "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
-                                "id": tc["id"]
-                            })
-                    if formatted_tool_calls:
-                        response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
-                    else:
-                        response = full_text_response
-                else:
-                    response = full_text_response
-                    if response.strip():
-                        self.short_memory.add(role=self.agent_name, content=response)
-            else:
-                # Fallback for streaming without tool parsing
-                text_chunks = []
-                for chunk in response:
-                    if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
-                        content = chunk.choices[0].delta.content
-                        text_chunks.append(content)
-                        if self.print_on:
-                            print(content, end="", flush=True)
-                if self.print_on:
-                    print()
-                response = "".join(text_chunks)
+            response = self.tool_struct.handle_streaming_with_tools(
+                response=response,
+                llm=self.llm,
+                agent_name=self.agent_name,
+                print_on=self.print_on
+            )
         else:
             # Parse the response from the agent with the output type
             if exists(self.tools_list_dictionary):

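The Agent hunk above now delegates all streaming handling to a single call, whose contract (per the new method's docstring below) is to return either plain accumulated text or an OpenAI-style tool-call envelope. A minimal sketch of how a caller might branch on those two return shapes; the payloads and the consume_stream_result name are illustrative, not part of the change:

from typing import Any, Dict, Union

def consume_stream_result(result: Union[str, Dict[str, Any]]) -> None:
    # Illustrative only: dispatch on the two shapes handle_streaming_with_tools can return.
    if isinstance(result, dict):
        # Tool calls were detected: {"choices": [{"message": {"tool_calls": [...]}}]}
        for call in result["choices"][0]["message"]["tool_calls"]:
            print("tool call:", call["function"]["name"], call["function"]["arguments"])
    else:
        # Plain text, already echoed chunk-by-chunk when print_on was set
        print("text:", result)

consume_stream_result("All done.")
consume_stream_result({"choices": [{"message": {"tool_calls": [
    {"type": "function", "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}, "id": "call_1"},
]}}]})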
@@ -3063,3 +3063,63 @@ class BaseTool(BaseModel):
         )
         return function_calls
+
+    def handle_streaming_with_tools(
+        self,
+        response: Any,
+        llm: Any,
+        agent_name: str = "agent",
+        print_on: bool = True
+    ) -> Union[str, Dict[str, Any]]:
+        """
+        Simplified streaming response handler with tool support.
+
+        Args:
+            response: Streaming response object
+            llm: Language model instance
+            agent_name: Name of the agent
+            print_on: Whether to print streaming output
+
+        Returns:
+            Union[str, Dict[str, Any]]: Processed response (text or tool calls)
+        """
+        if hasattr(llm, 'parse_streaming_chunks_with_tools'):
+            # Delegate chunk parsing to the LLM wrapper when it supports tool-aware streaming
+            text_response, tool_calls = llm.parse_streaming_chunks_with_tools(
+                stream=response,
+                agent_name=agent_name,
+                print_on=print_on,
+                verbose=self.verbose
+            )
+            if tool_calls:
+                formatted_calls = []
+                for tc in tool_calls:
+                    if tc and tc.get("name"):
+                        # Arguments may arrive as a parsed dict ("input") or a JSON string ("arguments")
+                        args = tc.get("input") or tc.get("arguments", {})
+                        if isinstance(args, str):
+                            try:
+                                args = json.loads(args)
+                            except json.JSONDecodeError:
+                                args = {}
+                        formatted_calls.append({
+                            "type": "function",
+                            "function": {"name": tc["name"], "arguments": json.dumps(args)},
+                            "id": tc.get("id")
+                        })
+                return {"choices": [{"message": {"tool_calls": formatted_calls}}]} if formatted_calls else text_response
+            return text_response
+        else:
+            # Simple fallback streaming: accumulate and echo text deltas
+            chunks = []
+            for chunk in response:
+                if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
+                    content = chunk.choices[0].delta.content
+                    chunks.append(content)
+                    if print_on:
+                        print(content, end="", flush=True)
+            if print_on and chunks:
+                print()
+            return "".join(chunks)
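For reference, a hedged usage sketch of the fallback path (an LLM without parse_streaming_chunks_with_tools), assuming BaseTool is importable from swarms.tools.base_tool and can be instantiated with defaults; the make_chunk helper and PlainLLM stub are hypothetical stand-ins that mimic OpenAI-style streaming deltas:

from types import SimpleNamespace
from swarms.tools.base_tool import BaseTool  # assumed import path

def make_chunk(text):
    # Mimic an OpenAI-style streaming chunk: chunk.choices[0].delta.content
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=text))])

class PlainLLM:
    # Deliberately lacks parse_streaming_chunks_with_tools, so the fallback branch runs
    pass

tool = BaseTool(verbose=False)
stream = iter([make_chunk("Hello, "), make_chunk("world!")])
result = tool.handle_streaming_with_tools(
    response=stream,
    llm=PlainLLM(),
    agent_name="demo-agent",
    print_on=False,
)
assert result == "Hello, world!"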
