new tools_streaming fixed !!!

pull/938/head
harshalmore31 2 days ago
parent 66e68b8405
commit 9ca24fd545

@ -1067,108 +1067,317 @@ class Agent:
self.short_memory.return_history_as_string() self.short_memory.return_history_as_string()
) )
# Parameters # If streaming is OFF, use the simple, non-streaming path
attempt = 0 if not self.streaming_on:
success = False attempt = 0
while attempt < self.retry_attempts and not success: success = False
try: while attempt < self.retry_attempts and not success:
try:
if img is not None: if img is not None:
response = self.call_llm( response = self.call_llm(
task=task_prompt, task=task_prompt,
img=img, img=img,
current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
else:
response = self.call_llm(
task=task_prompt,
current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
)
# If streaming is enabled, then don't print the response
# Parse the response from the agent with the output type
if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel):
response = response.model_dump()
# Parse the response from the agent with the output type
response = self.parse_llm_output(response)
self.short_memory.add(
role=self.agent_name,
content=response,
)
# Print
if self.print_on is True:
if isinstance(response, list):
self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count,
)
elif self.streaming_on:
pass
else:
self.pretty_print(
response, loop_count
)
# Check and execute callable tools
if exists(self.tools):
self.tool_execution_retry(
response, loop_count
)
# Handle MCP tools
if (
exists(self.mcp_url)
or exists(self.mcp_config)
or exists(self.mcp_urls)
):
# Only handle MCP tools if response is not None
if response is not None:
self.mcp_tool_handling(
response=response,
current_loop=loop_count, current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
) )
else: else:
logger.warning( response = self.call_llm(
f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" task=task_prompt,
current_loop=loop_count,
streaming_callback=streaming_callback,
*args,
**kwargs,
) )
# self.sentiment_and_evaluator(response) # Parse the response from the agent with the output type
if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel):
response = response.model_dump()
success = True # Mark as successful to exit the retry loop response = self.parse_llm_output(response)
self.short_memory.add(role=self.agent_name, content=response)
except Exception as e: if self.print_on is True:
if isinstance(response, list):
self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count,
)
else:
self.pretty_print(response, loop_count)
if self.autosave is True: # Check and execute callable tools
log_agent_data(self.to_dict()) if exists(self.tools):
self.save() self.tool_execution_retry(response, loop_count)
logger.error( # Handle MCP tools
f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " if (
) exists(self.mcp_url)
attempt += 1 or exists(self.mcp_config)
or exists(self.mcp_urls)
):
if response is not None:
self.mcp_tool_handling(
response=response,
current_loop=loop_count,
)
else:
logger.warning(
f"LLM returned None response in loop {loop_count}, skipping MCP tool handling"
)
success = True # Exit retry loop
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
attempt += 1
if attempt >= self.retry_attempts:
logger.error("All retry attempts failed.")
break # Exit main loop if retries fail
# If streaming is ON, use the new real-time tool execution path
else:
try:
# Get the raw streaming response directly from the underlying LLM client
# Bypass the LiteLLM wrapper's post-processing for streaming
raw_messages = self.short_memory.to_dict()
# Convert to proper OpenAI message format
messages = []
for msg in raw_messages:
# Normalize role names to lowercase
role = msg.get('role', '').lower()
content = msg.get('content', '')
# Map common role variants to standard OpenAI roles
role_mapping = {
'system': 'system',
'user': 'user',
'assistant': 'assistant',
'human': 'user', # Map Human to user
'tool executor': 'assistant', # Map Tool Executor to assistant
'database': 'assistant', # Map Database to assistant
}
normalized_role = role_mapping.get(role, 'assistant')
# Ensure content is in string format (not complex objects)
if isinstance(content, (dict, list)):
content = str(content)
elif not isinstance(content, str):
content = str(content)
# Create properly formatted OpenAI message
openai_msg = {
'role': normalized_role,
'content': content
}
messages.append(openai_msg)
# Prepare completion parameters for direct streaming
completion_params = {
"model": self.llm.model_name,
"messages": messages,
"stream": True,
"max_tokens": self.max_tokens,
"temperature": self.temperature,
}
# Add tools if available
if self.tools_list_dictionary:
completion_params["tools"] = self.tools_list_dictionary
completion_params["tool_choice"] = "auto"
# Add image if provided
if img is not None:
# Add image to the last message
if messages and len(messages) > 0:
last_message = messages[-1]
if isinstance(last_message.get("content"), str):
last_message["content"] = [
{"type": "text", "text": last_message["content"]},
{"type": "image_url", "image_url": {"url": img}}
]
# Get raw stream using the underlying client
from litellm import completion
stream = completion(**completion_params)
full_text_response = ""
tool_calls_in_stream = []
current_tool_call = None
tool_call_buffer = ""
if self.print_on:
print(f"🤖 {self.agent_name}: ", end="", flush=True)
# Process streaming chunks in real-time
for chunk in stream:
# Debug: Log chunk processing if verbose
if self.verbose:
logger.debug(f"Processing streaming chunk: {type(chunk)}")
chunk_type = getattr(chunk, 'type', None)
# Anthropic-style stream parsing
if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
tool_name = chunk.content_block.name
if self.print_on:
print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
tool_call_buffer = ""
elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
if chunk.delta.type == 'input_json_delta':
tool_call_buffer += chunk.delta.partial_json
elif chunk.delta.type == 'text_delta':
text_chunk = chunk.delta.text
full_text_response += text_chunk
if self.print_on:
print(text_chunk, end="", flush=True)
elif chunk_type == 'content_block_stop' and current_tool_call:
try:
tool_input = json.loads(tool_call_buffer)
current_tool_call["input"] = tool_input
tool_calls_in_stream.append(current_tool_call)
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
current_tool_call = None
tool_call_buffer = ""
# OpenAI-style stream parsing
elif hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
if hasattr(choice, 'delta') and choice.delta:
delta = choice.delta
# Handle text content
if hasattr(delta, 'content') and delta.content:
text_chunk = delta.content
full_text_response += text_chunk
if self.print_on:
print(text_chunk, end="", flush=True)
# Handle tool calls in streaming chunks
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tool_call in delta.tool_calls:
# Get tool call index
tool_index = getattr(tool_call, 'index', 0)
# Ensure we have enough slots in the list
while len(tool_calls_in_stream) <= tool_index:
tool_calls_in_stream.append(None)
if hasattr(tool_call, 'function') and tool_call.function:
func = tool_call.function
# If this slot is empty and we have a function name, create new tool call
if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
if self.print_on:
print(f"\n🔧 Tool Call: {func.name}...", flush=True)
tool_calls_in_stream[tool_index] = {
"id": getattr(tool_call, 'id', f"call_{tool_index}"),
"name": func.name,
"arguments": ""
}
# Accumulate arguments (whether it's a new call or continuation)
if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
tool_calls_in_stream[tool_index]["arguments"] += func.arguments
if self.verbose:
logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
# Try to parse if we have complete JSON
try:
args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
tool_calls_in_stream[tool_index]["input"] = args_dict
tool_calls_in_stream[tool_index]["arguments_complete"] = True
if self.verbose:
logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
except json.JSONDecodeError:
# Not complete yet, continue accumulating
pass
if self.print_on:
print() # Newline after streaming text
# After the stream is complete, check if any tools were called
if tool_calls_in_stream:
# Add the partial text response to memory
if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format tool calls for execution - only execute complete tool calls
formatted_tool_calls = []
for tc in tool_calls_in_stream:
# Only execute tool calls that have complete arguments
if tc.get("input") or tc.get("arguments_complete"):
# Use input if available, otherwise parse arguments
if "input" in tc:
args_to_use = tc["input"]
else:
try:
args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError:
logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}")
continue
formatted_tool_calls.append({
"type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
"id": tc["id"]
})
# Execute all buffered tool calls if we have any complete ones
if formatted_tool_calls:
if self.verbose:
logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}")
tool_results = self.tool_struct.execute_function_calls_from_api_response(
{"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
)
if not success: if self.verbose:
logger.info(f"Tool execution results: {tool_results}")
else:
if self.verbose:
logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}")
tool_results = []
# Add tool results to memory
self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))
if self.print_on:
formatter.print_panel(
f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}",
"Tool Execution Result"
)
if self.autosave is True: # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled
log_agent_data(self.to_dict()) if self.tool_call_summary:
self.save() temp_llm = self.temp_llm_instance_for_tool_summary()
final_summary_response = temp_llm.run(
task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
)
response = self.parse_llm_output(final_summary_response)
self.short_memory.add(role=self.agent_name, content=response)
if self.print_on:
self.pretty_print(response, loop_count)
else:
response = f"Tool execution completed: {format_data_structure(tool_results)}"
logger.error( else:
"Failed to generate a valid response after" # If no tools were called, the streamed text is the final response
" retry attempts." response = full_text_response
) if response.strip():
break # Exit the loop if all retry attempts fail self.short_memory.add(role=self.agent_name, content=response)
except Exception as e:
logger.error(f"Error during streaming execution: {e}")
import traceback
traceback.print_exc()
break
# Check stopping conditions # Check stopping conditions
if ( if (
@ -1191,10 +1400,7 @@ class Agent:
break break
if self.interactive: if self.interactive:
# logger.info("Interactive mode enabled.")
user_input = input("You: ") user_input = input("You: ")
# User-defined exit command
if ( if (
user_input.lower() user_input.lower()
== self.custom_exit_command.lower() == self.custom_exit_command.lower()
@ -1204,7 +1410,6 @@ class Agent:
loop_count=loop_count, loop_count=loop_count,
) )
break break
self.short_memory.add( self.short_memory.add(
role=self.user_name, content=user_input role=self.user_name, content=user_input
) )
@ -3003,7 +3208,6 @@ class Agent:
"This may indicate the LLM did not return a valid response." "This may indicate the LLM did not return a valid response."
) )
return return
try: try:
output = self.tool_struct.execute_function_calls_from_api_response( output = self.tool_struct.execute_function_calls_from_api_response(
response response
@ -3041,6 +3245,9 @@ class Agent:
Focus on the key information and insights that would be most relevant to the user's original request. Focus on the key information and insights that would be most relevant to the user's original request.
If there are any errors or issues, highlight them prominently. If there are any errors or issues, highlight them prominently.
The user's original request was:
{self.task}
Tool Output: Tool Output:
{output} {output}
""" """

Loading…
Cancel
Save