pull/938/merge
harshalmore31 committed via GitHub, 1 day ago
commit d7c00703a6

@@ -1067,12 +1067,12 @@ class Agent:
self.short_memory.return_history_as_string()
)

# If streaming is OFF, use the simple, non-streaming path
if not self.streaming_on:
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
try:
if img is not None:
response = self.call_llm(
task=task_prompt,
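The non-streaming branch keeps the existing bounded retry loop. Below is a minimal, standalone sketch of that pattern, with a hypothetical call_llm callable and parameter names standing in for the agent's own API:

import logging

logger = logging.getLogger(__name__)

def run_with_retries(call_llm, task: str, retry_attempts: int = 3):
    """Retry a flaky LLM call up to retry_attempts times; return None on total failure."""
    attempt, success, response = 0, False, None
    while attempt < retry_attempts and not success:
        try:
            response = call_llm(task=task)
            success = True  # exit the loop on the first successful call
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {e}")
            attempt += 1
    return response if success else None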
@@ -1091,40 +1091,26 @@ class Agent:
**kwargs,
)

# Parse the response from the agent with the output type
if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel):
response = response.model_dump()

response = self.parse_llm_output(response)

self.short_memory.add(role=self.agent_name, content=response)

if self.print_on is True:
if isinstance(response, list):
self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count,
)
else:
self.pretty_print(response, loop_count)

# Check and execute callable tools
if exists(self.tools):
self.tool_execution_retry(response, loop_count)

# Handle MCP tools
if (
@@ -1132,7 +1118,6 @@ class Agent:
or exists(self.mcp_config)
or exists(self.mcp_urls)
):
if response is not None:
self.mcp_tool_handling(
response=response,
@@ -1143,32 +1128,286 @@ class Agent:
f"LLM returned None response in loop {loop_count}, skipping MCP tool handling"
)

success = True  # Exit retry loop
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
attempt += 1
if attempt >= self.retry_attempts:
logger.error("All retry attempts failed.")
break # Exit main loop if retries fail
# If streaming is ON, use the new real-time tool execution path
else:
try:
# Get the raw streaming response directly from the underlying LLM client
# Bypass the LiteLLM wrapper's post-processing for streaming
raw_messages = self.short_memory.to_dict()
# Convert to proper OpenAI message format
messages = []
for msg in raw_messages:
# Normalize role names to lowercase
role = msg.get('role', '').lower()
content = msg.get('content', '')
# Map common role variants to standard OpenAI roles
role_mapping = {
'system': 'system',
'user': 'user',
'assistant': 'assistant',
'human': 'user', # Map Human to user
'tool executor': 'assistant', # Map Tool Executor to assistant
'database': 'assistant', # Map Database to assistant
}
normalized_role = role_mapping.get(role, 'assistant')
# Ensure content is in string format (not complex objects)
if isinstance(content, (dict, list)):
content = str(content)
elif not isinstance(content, str):
content = str(content)
# Create properly formatted OpenAI message
openai_msg = {
'role': normalized_role,
'content': content
}
messages.append(openai_msg)
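The loop above normalizes loosely labelled conversation roles into the three OpenAI roles before streaming. A self-contained sketch of the same mapping, assuming memory entries are plain dicts with role/content keys (the helper name is illustrative, not part of the PR):

def to_openai_messages(raw_messages: list[dict]) -> list[dict]:
    """Map loosely labelled conversation roles onto standard OpenAI roles."""
    role_mapping = {
        "system": "system",
        "user": "user",
        "assistant": "assistant",
        "human": "user",            # "Human" speakers become user turns
        "tool executor": "assistant",
        "database": "assistant",
    }
    messages = []
    for msg in raw_messages:
        role = str(msg.get("role", "")).lower()
        content = msg.get("content", "")
        if not isinstance(content, str):
            content = str(content)  # flatten dicts/lists into plain text
        messages.append({"role": role_mapping.get(role, "assistant"), "content": content})
    return messages

# Example: to_openai_messages([{"role": "Human", "content": "hi"}])
# -> [{"role": "user", "content": "hi"}]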
# Prepare completion parameters for direct streaming
completion_params = {
"model": self.llm.model_name,
"messages": messages,
"stream": True,
"max_tokens": self.max_tokens,
"temperature": self.temperature,
}
# Add tools if available
if self.tools_list_dictionary:
completion_params["tools"] = self.tools_list_dictionary
completion_params["tool_choice"] = "auto"
# Add image if provided
if img is not None:
# Add image to the last message
if messages and len(messages) > 0:
last_message = messages[-1]
if isinstance(last_message.get("content"), str):
last_message["content"] = [
{"type": "text", "text": last_message["content"]},
{"type": "image_url", "image_url": {"url": img}}
]
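When an image is supplied, the last message's string content is rewritten into OpenAI's multimodal content-parts form. A small sketch of that conversion, assuming img is already a URL or data URI:

def attach_image(messages: list[dict], img: str) -> None:
    """Rewrite the last message's string content into text + image_url parts, in place."""
    if not messages:
        return
    last = messages[-1]
    if isinstance(last.get("content"), str):
        last["content"] = [
            {"type": "text", "text": last["content"]},
            {"type": "image_url", "image_url": {"url": img}},
        ]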
# Get raw stream using the underlying client
from litellm import completion
stream = completion(**completion_params)

full_text_response = ""
tool_calls_in_stream = []
current_tool_call = None
tool_call_buffer = ""
if self.print_on:
print(f"🤖 {self.agent_name}: ", end="", flush=True)
# Process streaming chunks in real-time
for chunk in stream:
# Debug: Log chunk processing if verbose
if self.verbose:
logger.debug(f"Processing streaming chunk: {type(chunk)}")
chunk_type = getattr(chunk, 'type', None)
# Anthropic-style stream parsing
if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
tool_name = chunk.content_block.name
if self.print_on:
print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
tool_call_buffer = ""
elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
if chunk.delta.type == 'input_json_delta':
tool_call_buffer += chunk.delta.partial_json
elif chunk.delta.type == 'text_delta':
text_chunk = chunk.delta.text
full_text_response += text_chunk
if self.print_on:
print(text_chunk, end="", flush=True)
elif chunk_type == 'content_block_stop' and current_tool_call:
try:
tool_input = json.loads(tool_call_buffer)
current_tool_call["input"] = tool_input
tool_calls_in_stream.append(current_tool_call)
except json.JSONDecodeError:
logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
current_tool_call = None
tool_call_buffer = ""
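The Anthropic-style branch is effectively a three-event state machine: content_block_start opens a tool call, input_json_delta fragments accumulate its arguments, and content_block_stop closes and parses it. A condensed sketch of that accumulation, using plain dicts in place of the real stream event objects:

import json

def accumulate_anthropic_tool_calls(events):
    """Collect tool_use blocks streamed as start / input_json_delta / stop events."""
    current, buffer, calls = None, "", []
    for ev in events:
        if ev["type"] == "content_block_start" and ev.get("block_type") == "tool_use":
            current, buffer = {"id": ev["id"], "name": ev["name"], "input": ""}, ""
        elif ev["type"] == "content_block_delta" and ev.get("delta_type") == "input_json_delta":
            buffer += ev["partial_json"]
        elif ev["type"] == "content_block_stop" and current:
            try:
                current["input"] = json.loads(buffer)
                calls.append(current)
            except json.JSONDecodeError:
                pass  # incomplete or invalid JSON arguments are skipped
            current, buffer = None, ""
    return calls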
# OpenAI-style stream parsing
elif hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
if hasattr(choice, 'delta') and choice.delta:
delta = choice.delta
# Handle text content
if hasattr(delta, 'content') and delta.content:
text_chunk = delta.content
full_text_response += text_chunk
if self.print_on:
print(text_chunk, end="", flush=True)
# Handle tool calls in streaming chunks
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tool_call in delta.tool_calls:
# Get tool call index
tool_index = getattr(tool_call, 'index', 0)
# Ensure we have enough slots in the list
while len(tool_calls_in_stream) <= tool_index:
tool_calls_in_stream.append(None)
if hasattr(tool_call, 'function') and tool_call.function:
func = tool_call.function
# If this slot is empty and we have a function name, create new tool call
if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
if self.print_on:
print(f"\n🔧 Tool Call: {func.name}...", flush=True)
tool_calls_in_stream[tool_index] = {
"id": getattr(tool_call, 'id', f"call_{tool_index}"),
"name": func.name,
"arguments": ""
}
# Accumulate arguments (whether it's a new call or continuation)
if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
tool_calls_in_stream[tool_index]["arguments"] += func.arguments
if self.verbose:
logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
# Try to parse if we have complete JSON
try:
args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
tool_calls_in_stream[tool_index]["input"] = args_dict
tool_calls_in_stream[tool_index]["arguments_complete"] = True
if self.verbose:
logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
except json.JSONDecodeError:
# Not complete yet, continue accumulating
pass
if self.print_on:
print() # Newline after streaming text
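OpenAI-style streams deliver tool-call arguments as fragments spread across many chunks, keyed by index. A minimal sketch of the same accumulation with plain dicts standing in for the delta objects; a call is treated as complete once its accumulated arguments parse as JSON:

import json

def accumulate_openai_tool_calls(deltas):
    """Merge streamed tool_call fragments; mark a call complete once its JSON parses."""
    calls = []
    for d in deltas:  # each d: {"index": int, "id": str | None, "name": str | None, "arguments": str}
        i = d["index"]
        while len(calls) <= i:
            calls.append(None)
        if calls[i] is None and d.get("name"):
            calls[i] = {"id": d.get("id") or f"call_{i}", "name": d["name"], "arguments": ""}
        if calls[i] is not None and d.get("arguments"):
            calls[i]["arguments"] += d["arguments"]
            try:
                calls[i]["input"] = json.loads(calls[i]["arguments"])
                calls[i]["arguments_complete"] = True
            except json.JSONDecodeError:
                pass  # arguments still partial; keep accumulating
    return [c for c in calls if c and c.get("arguments_complete")]

# accumulate_openai_tool_calls([
#     {"index": 0, "id": "call_0", "name": "get_weather", "arguments": '{"city": '},
#     {"index": 0, "id": None, "name": None, "arguments": '"Paris"}'},
# ])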
# After the stream is complete, check if any tools were called
if tool_calls_in_stream:
# Add the partial text response to memory
if full_text_response.strip():
self.short_memory.add(role=self.agent_name, content=full_text_response)
# Format tool calls for execution - only execute complete tool calls
formatted_tool_calls = []
for tc in tool_calls_in_stream:
# Only execute tool calls that have complete arguments
if tc.get("input") or tc.get("arguments_complete"):
# Use input if available, otherwise parse arguments
if "input" in tc:
args_to_use = tc["input"]
else:
try:
args_to_use = json.loads(tc.get("arguments", "{}"))
except json.JSONDecodeError:
logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}")
continue
formatted_tool_calls.append({
"type": "function",
"function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
"id": tc["id"]
})
# Execute all buffered tool calls if we have any complete ones
if formatted_tool_calls:
if self.verbose:
logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}")
tool_results = self.tool_struct.execute_function_calls_from_api_response(
{"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
)

if self.verbose:
logger.info(f"Tool execution results: {tool_results}")
else:
if self.verbose:
logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}")
tool_results = []
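The buffered calls are re-wrapped in an OpenAI-response-shaped dict before being handed to the tool executor. Purely for illustration, the payload built above looks roughly like this (the tool name and arguments are made up):

import json

api_shaped_payload = {
    "choices": [
        {
            "message": {
                "tool_calls": [
                    {
                        "type": "function",
                        "id": "call_0",
                        "function": {
                            "name": "get_weather",  # assumed tool name
                            "arguments": json.dumps({"city": "Paris"}),
                        },
                    }
                ]
            }
        }
    ]
}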
# Add tool results to memory
self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))

if self.print_on:
formatter.print_panel(
f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}",
"Tool Execution Result"
)
# Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled
if self.tool_call_summary:
temp_llm = self.temp_llm_instance_for_tool_summary()
final_summary_response = temp_llm.run(
task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
)
# Handle streaming for final tool summary in real-time execution
if self.streaming_on and hasattr(final_summary_response, "__iter__") and not isinstance(final_summary_response, str):
# Collect chunks for conversation saving
collected_chunks = []
def on_chunk_received(chunk: str):
"""Callback to collect chunks as they arrive"""
collected_chunks.append(chunk)
if self.verbose:
logger.debug(f"Real-time tool summary streaming chunk: {chunk[:50]}...")
# Use the streaming panel to display and collect the response
complete_response = formatter.print_streaming_panel(
final_summary_response,
title=f"🤖 Agent: {self.agent_name} - Tool Summary (Real-time)",
style="green",
collect_chunks=True,
on_chunk_callback=on_chunk_received,
)
final_summary_response = complete_response
elif self.streaming_on and isinstance(final_summary_response, str):
# If streaming is on but we got a string response, display it streamed
if self.print_on:
self.stream_response(final_summary_response, delay=0.01)
response = self.parse_llm_output(final_summary_response)
self.short_memory.add(role=self.agent_name, content=response)
# Only pretty_print if streaming is off (to avoid double printing)
if self.print_on and not self.streaming_on:
self.pretty_print(response, loop_count)
else:
response = f"Tool execution completed: {format_data_structure(tool_results)}"
else:
# If no tools were called, the streamed text is the final response
response = full_text_response
if response.strip():
self.short_memory.add(role=self.agent_name, content=response)
except Exception as e:
logger.error(f"Error during streaming execution: {e}")
import traceback
traceback.print_exc()
break
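The tool-summary display above hinges on consuming a chunk generator while a callback collects the pieces for conversation memory. A generic sketch of that consume-and-collect pattern, with plain print standing in for the streaming panel:

from typing import Callable, Iterable, Optional

def consume_stream(chunks: Iterable[str],
                   on_chunk: Optional[Callable[[str], None]] = None) -> str:
    """Print chunks as they arrive, notify an optional callback, and return the full text."""
    collected = []
    for chunk in chunks:
        print(chunk, end="", flush=True)  # live display, analogous to the streaming panel
        collected.append(chunk)
        if on_chunk:
            on_chunk(chunk)               # e.g. append to a buffer saved to memory
    print()
    return "".join(collected)

# Example:
# text = consume_stream(iter(["Tool ", "results ", "summarized."]))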
# Check stopping conditions
if (
@@ -1191,10 +1430,7 @@ class Agent:
break

if self.interactive:
user_input = input("You: ")

if (
user_input.lower()
== self.custom_exit_command.lower()
@@ -1204,7 +1440,6 @@ class Agent:
loop_count=loop_count,
)
break

self.short_memory.add(
role=self.user_name, content=user_input
)
@@ -2964,6 +3199,34 @@ class Agent:
summary = temp_llm.run(
task=self.short_memory.get_str()
)
# Handle streaming MCP tool summary response
if self.streaming_on and hasattr(summary, "__iter__") and not isinstance(summary, str):
# Collect chunks for conversation saving
collected_chunks = []
def on_chunk_received(chunk: str):
"""Callback to collect chunks as they arrive"""
collected_chunks.append(chunk)
if self.verbose:
logger.debug(f"MCP tool summary streaming chunk received: {chunk[:50]}...")
# Use the streaming panel to display and collect the response
complete_response = formatter.print_streaming_panel(
summary,
title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
style="cyan",
collect_chunks=True,
on_chunk_callback=on_chunk_received,
)
summary = complete_response
elif self.streaming_on and isinstance(summary, str):
# If streaming is on but we got a string response, display it streamed
if self.print_on:
self.stream_response(summary, delay=0.01)
except Exception as e:
logger.error(
f"Error calling LLM after MCP tool execution: {e}"
@@ -2971,7 +3234,8 @@ class Agent:
# Fallback: provide a default summary
summary = "I successfully executed the MCP tool and retrieved the information above."

# Only pretty_print if streaming is off (to avoid double printing)
if self.print_on and not self.streaming_on:
self.pretty_print(summary, loop_count=current_loop)

# Add to the memory
@@ -2988,7 +3252,7 @@ class Agent:
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
base_url=self.llm_base_url,
@@ -3003,7 +3267,6 @@ class Agent:
"This may indicate the LLM did not return a valid response."
)
return
try:
output = self.tool_struct.execute_function_calls_from_api_response(
response
@@ -3035,23 +3298,55 @@ class Agent:
if self.tool_call_summary is True:
temp_llm = self.temp_llm_instance_for_tool_summary()

tool_summary_prompt = f"""
Please analyze and summarize the following tool execution output in a clear and concise way.
Focus on the key information and insights that would be most relevant to the user's original request.
If there are any errors or issues, highlight them prominently.

The user's original request was:
{self.task}

Tool Output:
{output}
"""

tool_response = temp_llm.run(tool_summary_prompt)
# Handle streaming tool response
if self.streaming_on and hasattr(tool_response, "__iter__") and not isinstance(tool_response, str):
# Collect chunks for conversation saving
collected_chunks = []
def on_chunk_received(chunk: str):
"""Callback to collect chunks as they arrive"""
collected_chunks.append(chunk)
if self.verbose:
logger.debug(f"Tool response streaming chunk received: {chunk[:50]}...")
# Use the streaming panel to display and collect the response
complete_response = formatter.print_streaming_panel(
tool_response,
title=f"🤖 Agent: {self.agent_name} - Tool Summary",
style="blue",
collect_chunks=True,
on_chunk_callback=on_chunk_received,
)
tool_response = complete_response
elif self.streaming_on and isinstance(tool_response, str):
# If streaming is on but we got a string response, display it streamed
if self.print_on:
self.stream_response(tool_response, delay=0.01)
# Add the tool response to memory
self.short_memory.add(
role=self.agent_name,
content=tool_response,
)

# Only pretty_print if streaming is off (to avoid double printing)
if self.print_on and not self.streaming_on:
self.pretty_print(
tool_response,
loop_count,

@@ -27,6 +27,12 @@ dashboard_live = None
# Create a spinner for loading animation
spinner = Spinner("dots", style="yellow")
# Global lock to ensure only a single Rich Live context is active at any moment.
# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
# console raise runtime errors. Using a module-level lock serialises access and
# prevents crashes when multiple agents stream simultaneously in different
# threads (e.g., in ConcurrentWorkflow).
live_render_lock = threading.Lock()
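A self-contained sketch of how such a module-level lock serialises Rich Live rendering across threads (simplified; the real formatter wires the lock into its streaming panel, and rich must be installed):

import threading
import time

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

console = Console()
live_render_lock = threading.Lock()  # only one Live context at a time

def stream_panel(chunks, title: str) -> None:
    """Render streamed chunks inside a Live panel; the lock keeps threads from clashing."""
    with live_render_lock:
        text = Text()
        with Live(Panel(text, title=title), console=console, refresh_per_second=10):
            for chunk in chunks:
                text.append(chunk)
                time.sleep(0.01)  # simulate streaming latency

threads = [
    threading.Thread(target=stream_panel, args=(["hello ", "from ", f"agent-{i}"], f"agent-{i}"))
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()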
def choose_random_color():
import random
@@ -308,6 +314,7 @@ class Formatter:
):
# Add ONLY the new chunk to the Text object with random color style
chunk = part.choices[0].delta.content

streaming_text.append(
chunk, style=text_style
)

@@ -153,6 +153,134 @@ class LiteLLM:
litellm.drop_params = True
def _collect_streaming_response(self, streaming_response):
"""
Parse and yield individual content chunks from a streaming response.
Args:
streaming_response: The streaming response object from litellm
Yields:
str: Individual content chunks as they arrive
"""
try:
for chunk in streaming_response:
content = None
# Handle different chunk formats
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
# OpenAI-style chunks
if hasattr(choice, 'delta') and choice.delta:
if hasattr(choice.delta, 'content') and choice.delta.content:
content = choice.delta.content
# Alternative chunk format
elif hasattr(choice, 'message') and choice.message:
if hasattr(choice.message, 'content') and choice.message.content:
content = choice.message.content
# Anthropic-style chunks
elif hasattr(chunk, 'type'):
if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'):
if chunk.delta.type == 'text_delta':
content = chunk.delta.text
# Handle direct content chunks
elif hasattr(chunk, 'content'):
content = chunk.content
# Yield content chunk if we found any
if content:
yield content
except Exception as e:
logger.error(f"Error parsing streaming chunks: {e}")
return
async def _collect_streaming_response_async(self, streaming_response):
"""
Parse and yield individual content chunks from an async streaming response.
Args:
streaming_response: The async streaming response object from litellm
Yields:
str: Individual content chunks as they arrive
"""
try:
async for chunk in streaming_response:
content = None
# Handle different chunk formats
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
# OpenAI-style chunks
if hasattr(choice, 'delta') and choice.delta:
if hasattr(choice.delta, 'content') and choice.delta.content:
content = choice.delta.content
# Alternative chunk format
elif hasattr(choice, 'message') and choice.message:
if hasattr(choice.message, 'content') and choice.message.content:
content = choice.message.content
# Anthropic-style chunks
elif hasattr(chunk, 'type'):
if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'):
if chunk.delta.type == 'text_delta':
content = chunk.delta.text
# Handle direct content chunks
elif hasattr(chunk, 'content'):
content = chunk.content
# Yield content chunk if we found any
if content:
yield content
except Exception as e:
logger.error(f"Error parsing async streaming chunks: {e}")
return
def collect_all_chunks(self, streaming_response):
"""
Helper method to collect all chunks from a streaming response into a complete text.
This provides backward compatibility for code that expects a complete response.
Args:
streaming_response: The streaming response object from litellm
Returns:
str: The complete response text collected from all chunks
"""
chunks = []
for chunk in self._collect_streaming_response(streaming_response):
chunks.append(chunk)
complete_response = "".join(chunks)
logger.info(f"Collected complete streaming response: {len(complete_response)} characters")
return complete_response
async def collect_all_chunks_async(self, streaming_response):
"""
Helper method to collect all chunks from an async streaming response into a complete text.
This provides backward compatibility for code that expects a complete response.
Args:
streaming_response: The async streaming response object from litellm
Returns:
str: The complete response text collected from all chunks
"""
chunks = []
async for chunk in self._collect_streaming_response_async(streaming_response):
chunks.append(chunk)
complete_response = "".join(chunks)
logger.info(f"Collected complete async streaming response: {len(complete_response)} characters")
return complete_response
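Assuming a LiteLLM instance constructed with stream=True, the new helpers would be used roughly like this; the import path, model name, and constructor arguments below are assumptions for illustration, not verified against this PR:

# Assumed import path; adjust to wherever LiteLLM lives in your tree.
from swarms.utils.litellm_wrapper import LiteLLM

llm = LiteLLM(model_name="gpt-4o-mini", stream=True)  # placeholder model name

# run() now returns a generator of text chunks when stream=True
stream = llm.run(task="Summarize the latest tool output.")
for chunk in stream:
    print(chunk, end="", flush=True)
print()

# Or collect everything for callers that expect a single string
stream = llm.run(task="Summarize the latest tool output.")
text = llm.collect_all_chunks(stream)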
def output_for_tools(self, response: any):
if self.mcp_call is True:
out = response.choices[0].message.tool_calls[0].function
@@ -471,7 +599,9 @@ class LiteLLM:
**kwargs: Additional keyword arguments.

Returns:
str or generator: When streaming is disabled, returns the complete response content.
When streaming is enabled, returns a generator that yields content chunks.
Use collect_all_chunks() to get complete response from the generator.

Raises:
Exception: If there is an error in processing the request.
@@ -525,7 +655,7 @@ class LiteLLM:
# Handle streaming response
if self.stream:
return response

# Handle tool-based response
elif self.tools_list_dictionary is not None:
@@ -574,7 +704,9 @@ class LiteLLM:
**kwargs: Additional keyword arguments.

Returns:
str or async generator: When streaming is disabled, returns the complete response content.
When streaming is enabled, returns an async generator that yields content chunks.
Use collect_all_chunks_async() to get complete response from the generator.
"""
try:
messages = self._prepare_messages(task)
@@ -608,6 +740,10 @@ class LiteLLM:
# Standard completion
response = await acompletion(**completion_params)

# Handle streaming response for async
if self.stream:
return self._collect_streaming_response_async(response)

print(response)
return response
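With streaming enabled, the async path now hands back an async generator of text chunks. A stand-in generator shows how a caller would consume it, mirroring what collect_all_chunks_async does internally:

import asyncio

async def fake_llm_stream():
    """Stand-in for the async chunk generator the streaming path now returns."""
    for piece in ["Tool ", "results ", "look ", "good."]:
        await asyncio.sleep(0.01)  # simulate network latency between chunks
        yield piece

async def main() -> None:
    collected = []
    async for chunk in fake_llm_stream():  # same loop shape as collect_all_chunks_async
        print(chunk, end="", flush=True)
        collected.append(chunk)
    print()
    full_text = "".join(collected)         # what collect_all_chunks_async returns
    assert full_text == "Tool results look good."

asyncio.run(main())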
