fixes and cleanups!

pull/938/head
harshalmore31 2 months ago
parent 4ca7546f0c
commit 584b4f900d

@@ -1198,192 +1198,64 @@ class Agent:
logger.error("All retry attempts failed.") logger.error("All retry attempts failed.")
break # Exit main loop if retries fail break # Exit main loop if retries fail
# If streaming is ON, use the new real-time tool execution path # If streaming is ON with tools, use streaming tool execution path
else: else:
try: try:
# Get the raw streaming response directly from the underlying LLM client # Get streaming response with tools
# Bypass the LiteLLM wrapper's post-processing for streaming
raw_messages = self.short_memory.to_dict()
# Convert to proper OpenAI message format
messages = []
for msg in raw_messages:
# Normalize role names to lowercase
role = msg.get('role', '').lower()
content = msg.get('content', '')
# Map common role variants to standard OpenAI roles
role_mapping = {
'system': 'system',
'user': 'user',
'assistant': 'assistant',
'human': 'user', # Map Human to user
'tool executor': 'assistant', # Map Tool Executor to assistant
'database': 'assistant', # Map Database to assistant
}
normalized_role = role_mapping.get(role, 'assistant')
# Ensure content is in string format (not complex objects)
if isinstance(content, (dict, list)):
content = str(content)
elif not isinstance(content, str):
content = str(content)
# Create properly formatted OpenAI message
openai_msg = {
'role': normalized_role,
'content': content
}
messages.append(openai_msg)
# Prepare completion parameters for direct streaming
completion_params = {
"model": self.llm.model_name,
"messages": messages,
"stream": True,
"max_tokens": self.max_tokens,
"temperature": self.temperature,
}
# Add tools if available
if self.tools_list_dictionary:
completion_params["tools"] = self.tools_list_dictionary
completion_params["tool_choice"] = "auto"
# Add image if provided
if img is not None: if img is not None:
# Add image to the last message stream_response = self.call_llm(
if messages and len(messages) > 0: task=task_prompt,
last_message = messages[-1] img=img,
if isinstance(last_message.get("content"), str): current_loop=loop_count,
last_message["content"] = [ *args,
{"type": "text", "text": last_message["content"]}, **kwargs,
{"type": "image_url", "image_url": {"url": img}} )
] else:
stream_response = self.call_llm(
# Get raw stream using the underlying client task=task_prompt,
from litellm import completion current_loop=loop_count,
stream = completion(**completion_params) *args,
**kwargs,
)
# Use streaming chunk parser from litellm_wrapper
if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
stream=stream_response,
agent_name=self.agent_name,
print_on=self.print_on,
verbose=self.verbose,
)
else:
# Fallback to simple text collection if method not available
full_text_response = "" full_text_response = ""
tool_calls_in_stream = [] tool_calls_in_stream = []
current_tool_call = None if hasattr(stream_response, "__iter__") and not isinstance(stream_response, str):
tool_call_buffer = "" for chunk in stream_response:
if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
if self.print_on: content = chunk.choices[0].delta.content
print(f"🤖 {self.agent_name}: ", end="", flush=True) full_text_response += content
# Process streaming chunks in real-time
for chunk in stream:
# Debug: Log chunk processing if verbose
if self.verbose:
logger.debug(f"Processing streaming chunk: {type(chunk)}")
chunk_type = getattr(chunk, 'type', None)
# Anthropic-style stream parsing
if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
tool_name = chunk.content_block.name
if self.print_on:
print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
tool_call_buffer = ""
elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
if chunk.delta.type == 'input_json_delta':
tool_call_buffer += chunk.delta.partial_json
elif chunk.delta.type == 'text_delta':
text_chunk = chunk.delta.text
full_text_response += text_chunk
if self.print_on: if self.print_on:
print(text_chunk, end="", flush=True) print(content, end="", flush=True)
elif chunk_type == 'content_block_stop' and current_tool_call:
try:
tool_input = json.loads(tool_call_buffer)
current_tool_call["input"] = tool_input
tool_calls_in_stream.append(current_tool_call)
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
current_tool_call = None
tool_call_buffer = ""
# OpenAI-style stream parsing
elif hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
if hasattr(choice, 'delta') and choice.delta:
delta = choice.delta
# Handle text content
if hasattr(delta, 'content') and delta.content:
text_chunk = delta.content
full_text_response += text_chunk
if self.print_on:
print(text_chunk, end="", flush=True)
# Handle tool calls in streaming chunks
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tool_call in delta.tool_calls:
# Get tool call index
tool_index = getattr(tool_call, 'index', 0)
# Ensure we have enough slots in the list
while len(tool_calls_in_stream) <= tool_index:
tool_calls_in_stream.append(None)
if hasattr(tool_call, 'function') and tool_call.function:
func = tool_call.function
# If this slot is empty and we have a function name, create new tool call
if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
if self.print_on:
print(f"\n🔧 Tool Call: {func.name}...", flush=True)
tool_calls_in_stream[tool_index] = {
"id": getattr(tool_call, 'id', f"call_{tool_index}"),
"name": func.name,
"arguments": ""
}
# Accumulate arguments (whether it's a new call or continuation)
if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
tool_calls_in_stream[tool_index]["arguments"] += func.arguments
if self.verbose:
logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
# Try to parse if we have complete JSON
try:
args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
tool_calls_in_stream[tool_index]["input"] = args_dict
tool_calls_in_stream[tool_index]["arguments_complete"] = True
if self.verbose:
logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
except json.JSONDecodeError:
# Not complete yet, continue accumulating
pass
if self.print_on: if self.print_on:
print() # Newline after streaming text print()
# After the stream is complete, check if any tools were called # Handle tool calls if any were detected
if tool_calls_in_stream: if tool_calls_in_stream:
# Add the partial text response to memory # Add text response to memory
if full_text_response.strip(): if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response) self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format tool calls for execution - only execute complete tool calls # Format and execute tool calls
formatted_tool_calls = [] formatted_tool_calls = []
for tc in tool_calls_in_stream: for tc in tool_calls_in_stream:
# Only execute tool calls that have complete arguments if tc and (tc.get("input") or tc.get("arguments_complete")):
if tc.get("input") or tc.get("arguments_complete"):
# Use input if available, otherwise parse arguments
if "input" in tc: if "input" in tc:
args_to_use = tc["input"] args_to_use = tc["input"]
else: else:
try: try:
args_to_use = json.loads(tc.get("arguments", "{}")) args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError: except json.JSONDecodeError:
logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}")
continue continue
formatted_tool_calls.append({ formatted_tool_calls.append({
@@ -1392,23 +1264,11 @@ class Agent:
"id": tc["id"] "id": tc["id"]
}) })
# Execute all buffered tool calls if we have any complete ones
if formatted_tool_calls: if formatted_tool_calls:
if self.verbose:
logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}")
tool_results = self.tool_struct.execute_function_calls_from_api_response( tool_results = self.tool_struct.execute_function_calls_from_api_response(
{"choices": [{"message": {"tool_calls": formatted_tool_calls}}]} {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
) )
if self.verbose:
logger.info(f"Tool execution results: {tool_results}")
else:
if self.verbose:
logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}")
tool_results = []
# Add tool results to memory
self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results)) self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))
if self.print_on: if self.print_on:
formatter.print_panel( formatter.print_panel(
@@ -1416,12 +1276,10 @@ class Agent:
"Tool Execution Result" "Tool Execution Result"
) )
# Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled # Tool summary if enabled
if self.tool_call_summary: if self.tool_call_summary:
temp_llm = self.temp_llm_instance_for_tool_summary() temp_llm = self.temp_llm_instance_for_tool_summary()
if self.streaming_on and hasattr(temp_llm, 'run_tool_summary_with_streaming'):
# Use centralized streaming logic for real-time tool summary
if self.streaming_on:
final_summary_response = temp_llm.run_tool_summary_with_streaming( final_summary_response = temp_llm.run_tool_summary_with_streaming(
tool_results=format_data_structure(tool_results), tool_results=format_data_structure(tool_results),
agent_name=f"{self.agent_name} - Real-time", agent_name=f"{self.agent_name} - Real-time",
@@ -1436,22 +1294,18 @@ class Agent:
                         response = self.parse_llm_output(final_summary_response)
                         self.short_memory.add(role=self.agent_name, content=response)
-                        # Only pretty_print if streaming is off (to avoid double printing)
                         if self.print_on and not self.streaming_on:
                             self.pretty_print(response, loop_count)
                     else:
                         response = f"Tool execution completed: {format_data_structure(tool_results)}"
                 else:
-                    # If no tools were called, the streamed text is the final response
+                    # No tools called, use streamed text as response
                     response = full_text_response
                     if response.strip():
                         self.short_memory.add(role=self.agent_name, content=response)
             except Exception as e:
                 logger.error(f"Error during streaming execution: {e}")
-                import traceback
-                traceback.print_exc()
                 break
         # Check stopping conditions
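
Review note: the hunks above replace the inline chunk-parsing loop with a call into the wrapper. Here is a condensed sketch of the resulting control flow — not code from this commit: `stream_step` and the `agent` object are hypothetical, and the per-call payload reshaping behind `formatted_tool_calls` is elided; `call_llm`, `parse_streaming_chunks_with_tools`, and `execute_function_calls_from_api_response` are the names used in the diff.

def stream_step(agent, task_prompt: str, loop_count: int):
    # With streaming + tools, call_llm now returns the raw, un-parsed
    # stream (see the call_llm hunk below).
    stream = agent.call_llm(task=task_prompt, current_loop=loop_count)

    # One shared parser extracts streamed text and any tool calls.
    text, tool_calls = agent.llm.parse_streaming_chunks_with_tools(
        stream=stream,
        agent_name=agent.agent_name,
        print_on=agent.print_on,
        verbose=agent.verbose,
    )

    if not tool_calls:
        return text  # no tools fired: the streamed text is the response

    # Only calls whose JSON arguments arrived complete get executed.
    complete = [
        tc for tc in tool_calls
        if tc and (tc.get("input") or tc.get("arguments_complete"))
    ]
    payload = {"choices": [{"message": {"tool_calls": complete}}]}
    return agent.tool_struct.execute_function_calls_from_api_response(payload)
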
@@ -2791,8 +2645,24 @@ class Agent:
del kwargs["is_last"] del kwargs["is_last"]
try: try:
# Use centralized streaming logic from wrapper if streaming is enabled # For streaming with tools, return raw stream for custom parsing
if self.streaming_on and hasattr(self.llm, "run_with_streaming"): if self.streaming_on and exists(self.tools_list_dictionary):
# Set stream mode and get raw streaming response
original_stream = getattr(self.llm, 'stream', False)
self.llm.stream = True
try:
if img is not None:
stream_response = self.llm.run(task=task, img=img, *args, **kwargs)
else:
stream_response = self.llm.run(task=task, *args, **kwargs)
return stream_response
finally:
# Restore original stream setting
self.llm.stream = original_stream
# Use centralized streaming logic from wrapper if streaming is enabled (no tools)
elif self.streaming_on and hasattr(self.llm, "run_with_streaming"):
return self.llm.run_with_streaming( return self.llm.run_with_streaming(
task=task, task=task,
img=img, img=img,
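
Review note: the save/flip/restore around `self.llm.stream` is the subtle part of this hunk — the `finally` guarantees the wrapper's flag is restored even when `run` raises or returns early. The same guarantee can be expressed as a context manager; a minimal standalone sketch (the `temporarily` helper is hypothetical, not part of this PR):

from contextlib import contextmanager

@contextmanager
def temporarily(obj, attr: str, value):
    """Set obj.<attr> to value for the duration of the block, then restore it."""
    original = getattr(obj, attr, False)
    setattr(obj, attr, value)
    try:
        yield obj
    finally:
        setattr(obj, attr, original)

# Equivalent to the try/finally above:
#     with temporarily(self.llm, "stream", True):
#         return self.llm.run(task=task, *args, **kwargs)
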

@@ -819,6 +819,126 @@ class LiteLLM:
             time.sleep(delay)
         print()  # Newline at the end
+
+    def parse_streaming_chunks_with_tools(
+        self,
+        stream,
+        agent_name: str = "Agent",
+        print_on: bool = True,
+        verbose: bool = False,
+    ) -> tuple:
+        """
+        Parse streaming chunks and extract both text and tool calls.
+
+        Args:
+            stream: The streaming response object
+            agent_name: Name of the agent for printing
+            print_on: Whether to print streaming output
+            verbose: Whether to enable verbose logging
+
+        Returns:
+            tuple: (full_text_response, tool_calls_list)
+        """
+        import json
+
+        full_text_response = ""
+        tool_calls_in_stream = []
+        current_tool_call = None
+        tool_call_buffer = ""
+
+        if print_on:
+            print(f"🤖 {agent_name}: ", end="", flush=True)
+
+        # Process streaming chunks in real-time
+        for chunk in stream:
+            if verbose:
+                logger.debug(f"Processing streaming chunk: {type(chunk)}")
+            chunk_type = getattr(chunk, 'type', None)
+            # Anthropic-style stream parsing
+            if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
+                tool_name = chunk.content_block.name
+                if print_on:
+                    print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
+                current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
+                tool_call_buffer = ""
+            elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
+                if chunk.delta.type == 'input_json_delta':
+                    tool_call_buffer += chunk.delta.partial_json
+                elif chunk.delta.type == 'text_delta':
+                    text_chunk = chunk.delta.text
+                    full_text_response += text_chunk
+                    if print_on:
+                        print(text_chunk, end="", flush=True)
+            elif chunk_type == 'content_block_stop' and current_tool_call:
+                try:
+                    tool_input = json.loads(tool_call_buffer)
+                    current_tool_call["input"] = tool_input
+                    tool_calls_in_stream.append(current_tool_call)
+                except json.JSONDecodeError:
+                    logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
+                current_tool_call = None
+                tool_call_buffer = ""
+            # OpenAI-style stream parsing
+            elif hasattr(chunk, 'choices') and chunk.choices:
+                choice = chunk.choices[0]
+                if hasattr(choice, 'delta') and choice.delta:
+                    delta = choice.delta
+                    # Handle text content
+                    if hasattr(delta, 'content') and delta.content:
+                        text_chunk = delta.content
+                        full_text_response += text_chunk
+                        if print_on:
+                            print(text_chunk, end="", flush=True)
+                    # Handle tool calls in streaming chunks
+                    if hasattr(delta, 'tool_calls') and delta.tool_calls:
+                        for tool_call in delta.tool_calls:
+                            tool_index = getattr(tool_call, 'index', 0)
+                            # Ensure we have enough slots in the list
+                            while len(tool_calls_in_stream) <= tool_index:
+                                tool_calls_in_stream.append(None)
+                            if hasattr(tool_call, 'function') and tool_call.function:
+                                func = tool_call.function
+                                # Create new tool call if slot is empty and we have a function name
+                                if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
+                                    if print_on:
+                                        print(f"\n🔧 Tool Call: {func.name}...", flush=True)
+                                    tool_calls_in_stream[tool_index] = {
+                                        "id": getattr(tool_call, 'id', f"call_{tool_index}"),
+                                        "name": func.name,
+                                        "arguments": ""
+                                    }
+                                # Accumulate arguments
+                                if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
+                                    tool_calls_in_stream[tool_index]["arguments"] += func.arguments
+                                    if verbose:
+                                        logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
+                                    # Try to parse if we have complete JSON
+                                    try:
+                                        args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
+                                        tool_calls_in_stream[tool_index]["input"] = args_dict
+                                        tool_calls_in_stream[tool_index]["arguments_complete"] = True
+                                        if verbose:
+                                            logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
+                                    except json.JSONDecodeError:
+                                        pass
+
+        if print_on:
+            print()  # Newline after streaming text
+
+        return full_text_response, tool_calls_in_stream
+
     def run(
         self,
         task: str,
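
Review note: the new parser is easy to smoke-test offline by feeding it hand-built OpenAI-style chunks; `SimpleNamespace` stands in for the real chunk objects and mocks only the attributes the parser reads. The import path and `LiteLLM(model_name=...)` constructor are assumed to match the rest of the wrapper:

from types import SimpleNamespace as NS
from swarms.utils.litellm_wrapper import LiteLLM  # import path assumed

def chunk(content=None, tool_calls=None):
    # Minimal OpenAI-style streaming chunk: .choices[0].delta.{content,tool_calls}
    return NS(choices=[NS(delta=NS(content=content, tool_calls=tool_calls))])

fake_stream = [
    chunk(content="Checking the weather"),
    chunk(tool_calls=[NS(index=0, id="call_0",
                         function=NS(name="get_weather", arguments='{"city": '))]),
    chunk(tool_calls=[NS(index=0, id=None,
                         function=NS(name=None, arguments='"Paris"}'))]),
]

llm = LiteLLM(model_name="gpt-4o-mini")
text, calls = llm.parse_streaming_chunks_with_tools(
    fake_stream, agent_name="Demo", print_on=False
)
assert text == "Checking the weather"
assert calls[0]["name"] == "get_weather"
assert calls[0]["input"] == {"city": "Paris"}  # arguments re-assembled across chunks
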
