harshalmore31 committed via GitHub, 2 days ago
commit 672befd71a

@@ -1067,12 +1067,12 @@ class Agent:
                 self.short_memory.return_history_as_string()
             )
-            # Parameters
+            # If streaming is OFF, use the simple, non-streaming path
+            if not self.streaming_on:
                 attempt = 0
                 success = False
                 while attempt < self.retry_attempts and not success:
                     try:
                         if img is not None:
                             response = self.call_llm(
                                 task=task_prompt,
@@ -1091,40 +1091,26 @@ class Agent:
                                 **kwargs,
                             )
-                        # If streaming is enabled, then don't print the response
                         # Parse the response from the agent with the output type
                         if exists(self.tools_list_dictionary):
                             if isinstance(response, BaseModel):
                                 response = response.model_dump()
-                        # Parse the response from the agent with the output type
                         response = self.parse_llm_output(response)
-                        self.short_memory.add(
-                            role=self.agent_name,
-                            content=response,
-                        )
+                        self.short_memory.add(role=self.agent_name, content=response)
-                        # Print
                         if self.print_on is True:
                             if isinstance(response, list):
                                 self.pretty_print(
                                     f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
                                     loop_count,
                                 )
-                            elif self.streaming_on:
-                                pass
                             else:
-                                self.pretty_print(
-                                    response, loop_count
-                                )
+                                self.pretty_print(response, loop_count)
                         # Check and execute callable tools
                         if exists(self.tools):
-                            self.tool_execution_retry(
-                                response, loop_count
-                            )
+                            self.tool_execution_retry(response, loop_count)
                         # Handle MCP tools
                         if (
@@ -1132,7 +1118,6 @@ class Agent:
                             or exists(self.mcp_config)
                             or exists(self.mcp_urls)
                         ):
-                            # Only handle MCP tools if response is not None
                             if response is not None:
                                 self.mcp_tool_handling(
                                     response=response,
@@ -1143,32 +1128,256 @@ class Agent:
                                     f"LLM returned None response in loop {loop_count}, skipping MCP tool handling"
                                 )
-                    # self.sentiment_and_evaluator(response)
-                    success = True  # Mark as successful to exit the retry loop
-                except Exception as e:
-                    if self.autosave is True:
-                        log_agent_data(self.to_dict())
-                        self.save()
-                    logger.error(
-                        f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | "
-                    )
-                    attempt += 1
-                if not success:
-                    if self.autosave is True:
-                        log_agent_data(self.to_dict())
-                        self.save()
-                    logger.error(
-                        "Failed to generate a valid response after"
-                        " retry attempts."
-                    )
-                    break  # Exit the loop if all retry attempts fail
+                        success = True  # Exit retry loop
+                    except Exception as e:
+                        logger.error(f"Attempt {attempt + 1} failed: {e}")
+                        attempt += 1
+                        if attempt >= self.retry_attempts:
+                            logger.error("All retry attempts failed.")
+                            break  # Exit main loop if retries fail
+            # If streaming is ON, use the new real-time tool execution path
+            else:
+                try:
+                    # Get the raw streaming response directly from the underlying LLM client
+                    # Bypass the LiteLLM wrapper's post-processing for streaming
+                    raw_messages = self.short_memory.to_dict()
+                    # Convert to proper OpenAI message format
+                    messages = []
+                    for msg in raw_messages:
+                        # Normalize role names to lowercase
+                        role = msg.get('role', '').lower()
+                        content = msg.get('content', '')
+                        # Map common role variants to standard OpenAI roles
+                        role_mapping = {
+                            'system': 'system',
+                            'user': 'user',
+                            'assistant': 'assistant',
+                            'human': 'user',  # Map Human to user
+                            'tool executor': 'assistant',  # Map Tool Executor to assistant
+                            'database': 'assistant',  # Map Database to assistant
+                        }
+                        normalized_role = role_mapping.get(role, 'assistant')
+                        # Ensure content is in string format (not complex objects)
+                        if isinstance(content, (dict, list)):
+                            content = str(content)
+                        elif not isinstance(content, str):
+                            content = str(content)
+                        # Create properly formatted OpenAI message
+                        openai_msg = {
+                            'role': normalized_role,
+                            'content': content
+                        }
+                        messages.append(openai_msg)
+                    # Prepare completion parameters for direct streaming
+                    completion_params = {
+                        "model": self.llm.model_name,
+                        "messages": messages,
+                        "stream": True,
+                        "max_tokens": self.max_tokens,
+                        "temperature": self.temperature,
+                    }
+                    # Add tools if available
+                    if self.tools_list_dictionary:
+                        completion_params["tools"] = self.tools_list_dictionary
+                        completion_params["tool_choice"] = "auto"
+                    # Add image if provided
+                    if img is not None:
+                        # Add image to the last message
+                        if messages and len(messages) > 0:
+                            last_message = messages[-1]
+                            if isinstance(last_message.get("content"), str):
+                                last_message["content"] = [
+                                    {"type": "text", "text": last_message["content"]},
+                                    {"type": "image_url", "image_url": {"url": img}}
+                                ]
+                    # Get raw stream using the underlying client
+                    from litellm import completion
+                    stream = completion(**completion_params)
+                    full_text_response = ""
+                    tool_calls_in_stream = []
+                    current_tool_call = None
+                    tool_call_buffer = ""
+                    if self.print_on:
+                        print(f"🤖 {self.agent_name}: ", end="", flush=True)
+                    # Process streaming chunks in real-time
+                    for chunk in stream:
+                        # Debug: Log chunk processing if verbose
+                        if self.verbose:
+                            logger.debug(f"Processing streaming chunk: {type(chunk)}")
+                        chunk_type = getattr(chunk, 'type', None)
+                        # Anthropic-style stream parsing
+                        if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
+                            tool_name = chunk.content_block.name
+                            if self.print_on:
+                                print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
+                            current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
+                            tool_call_buffer = ""
+                        elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
+                            if chunk.delta.type == 'input_json_delta':
+                                tool_call_buffer += chunk.delta.partial_json
+                            elif chunk.delta.type == 'text_delta':
+                                text_chunk = chunk.delta.text
+                                full_text_response += text_chunk
+                                if self.print_on:
+                                    print(text_chunk, end="", flush=True)
+                        elif chunk_type == 'content_block_stop' and current_tool_call:
+                            try:
+                                tool_input = json.loads(tool_call_buffer)
+                                current_tool_call["input"] = tool_input
+                                tool_calls_in_stream.append(current_tool_call)
+                            except json.JSONDecodeError:
+                                logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
+                            current_tool_call = None
+                            tool_call_buffer = ""
+                        # OpenAI-style stream parsing
+                        elif hasattr(chunk, 'choices') and chunk.choices:
+                            choice = chunk.choices[0]
+                            if hasattr(choice, 'delta') and choice.delta:
+                                delta = choice.delta
+                                # Handle text content
+                                if hasattr(delta, 'content') and delta.content:
+                                    text_chunk = delta.content
+                                    full_text_response += text_chunk
+                                    if self.print_on:
+                                        print(text_chunk, end="", flush=True)
+                                # Handle tool calls in streaming chunks
+                                if hasattr(delta, 'tool_calls') and delta.tool_calls:
+                                    for tool_call in delta.tool_calls:
+                                        # Get tool call index
+                                        tool_index = getattr(tool_call, 'index', 0)
+                                        # Ensure we have enough slots in the list
+                                        while len(tool_calls_in_stream) <= tool_index:
+                                            tool_calls_in_stream.append(None)
+                                        if hasattr(tool_call, 'function') and tool_call.function:
+                                            func = tool_call.function
+                                            # If this slot is empty and we have a function name, create new tool call
+                                            if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
+                                                if self.print_on:
+                                                    print(f"\n🔧 Tool Call: {func.name}...", flush=True)
+                                                tool_calls_in_stream[tool_index] = {
+                                                    "id": getattr(tool_call, 'id', f"call_{tool_index}"),
+                                                    "name": func.name,
+                                                    "arguments": ""
+                                                }
+                                            # Accumulate arguments (whether it's a new call or continuation)
+                                            if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
+                                                tool_calls_in_stream[tool_index]["arguments"] += func.arguments
+                                                if self.verbose:
+                                                    logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
+                                                # Try to parse if we have complete JSON
+                                                try:
+                                                    args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
+                                                    tool_calls_in_stream[tool_index]["input"] = args_dict
+                                                    tool_calls_in_stream[tool_index]["arguments_complete"] = True
+                                                    if self.verbose:
+                                                        logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
+                                                except json.JSONDecodeError:
+                                                    # Not complete yet, continue accumulating
+                                                    pass
+                    if self.print_on:
+                        print()  # Newline after streaming text
+                    # After the stream is complete, check if any tools were called
+                    if tool_calls_in_stream:
+                        # Add the partial text response to memory
+                        if full_text_response.strip():
+                            self.short_memory.add(role=self.agent_name, content=full_text_response)
+                        # Format tool calls for execution - only execute complete tool calls
+                        formatted_tool_calls = []
+                        for tc in tool_calls_in_stream:
+                            # Only execute tool calls that have complete arguments
+                            if tc.get("input") or tc.get("arguments_complete"):
+                                # Use input if available, otherwise parse arguments
+                                if "input" in tc:
+                                    args_to_use = tc["input"]
+                                else:
+                                    try:
+                                        args_to_use = json.loads(tc.get("arguments", "{}"))
+                                    except json.JSONDecodeError:
+                                        logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}")
+                                        continue
+                                formatted_tool_calls.append({
+                                    "type": "function",
+                                    "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
+                                    "id": tc["id"]
+                                })
+                        # Execute all buffered tool calls if we have any complete ones
+                        if formatted_tool_calls:
+                            if self.verbose:
+                                logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}")
+                            tool_results = self.tool_struct.execute_function_calls_from_api_response(
+                                {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
+                            )
+                            if self.verbose:
+                                logger.info(f"Tool execution results: {tool_results}")
+                        else:
+                            if self.verbose:
+                                logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}")
+                            tool_results = []
+                        # Add tool results to memory
+                        self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))
+                        if self.print_on:
+                            formatter.print_panel(
+                                f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}",
+                                "Tool Execution Result"
+                            )
+                        # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled
+                        if self.tool_call_summary:
+                            temp_llm = self.temp_llm_instance_for_tool_summary()
+                            final_summary_response = temp_llm.run(
+                                task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
+                            )
+                            response = self.parse_llm_output(final_summary_response)
+                            self.short_memory.add(role=self.agent_name, content=response)
+                            if self.print_on:
+                                self.pretty_print(response, loop_count)
+                        else:
+                            response = f"Tool execution completed: {format_data_structure(tool_results)}"
+                    else:
+                        # If no tools were called, the streamed text is the final response
+                        response = full_text_response
+                        if response.strip():
+                            self.short_memory.add(role=self.agent_name, content=response)
+                except Exception as e:
+                    logger.error(f"Error during streaming execution: {e}")
+                    import traceback
+                    traceback.print_exc()
+                    break
             # Check stopping conditions
             if (
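Review note on the streaming hunk above: OpenAI-compatible streams deliver a tool call's JSON arguments as fragments spread across many chunks, keyed by the tool call's index, and a call is only executable once the concatenated fragments parse as valid JSON. A minimal, self-contained sketch of that accumulation pattern, for reference only (plain dicts stand in for the stream chunks; accumulate_tool_calls and fake_stream are illustrative names, not part of this PR):

import json

def accumulate_tool_calls(stream):
    """Accumulate streamed tool-call argument fragments, keyed by tool-call index."""
    calls = {}  # index -> {"name": str, "arguments": str, "input": dict | None}
    for chunk in stream:
        for tc in chunk.get("tool_calls", []):
            idx = tc.get("index", 0)
            slot = calls.setdefault(idx, {"name": None, "arguments": "", "input": None})
            fn = tc.get("function", {})
            if fn.get("name"):
                slot["name"] = fn["name"]
            slot["arguments"] += fn.get("arguments", "") or ""
            try:
                # The call is complete only once the concatenated fragments form valid JSON
                slot["input"] = json.loads(slot["arguments"])
            except json.JSONDecodeError:
                pass  # still partial; keep accumulating
    # Keep only calls whose arguments finished parsing
    return [c for c in calls.values() if c["input"] is not None]

# Illustrative stream: one call whose arguments arrive split across three chunks
fake_stream = [
    {"tool_calls": [{"index": 0, "function": {"name": "get_weather", "arguments": ""}}]},
    {"tool_calls": [{"index": 0, "function": {"arguments": '{"city": "Par'}}]},
    {"tool_calls": [{"index": 0, "function": {"arguments": 'is"}'}}]},
]
print(accumulate_tool_calls(fake_stream))
# [{'name': 'get_weather', 'arguments': '{"city": "Paris"}', 'input': {'city': 'Paris'}}]

The diff applies the same idea per chunk inside its for-chunk loop, marking a slot complete via "arguments_complete" as soon as the buffered string parses.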
@@ -1191,10 +1400,7 @@ class Agent:
                     break
             if self.interactive:
-                # logger.info("Interactive mode enabled.")
                 user_input = input("You: ")
-                # User-defined exit command
                 if (
                     user_input.lower()
                     == self.custom_exit_command.lower()
@@ -1204,7 +1410,6 @@ class Agent:
                         loop_count=loop_count,
                     )
                     break
             self.short_memory.add(
                 role=self.user_name, content=user_input
             )
@@ -3003,7 +3208,6 @@ class Agent:
                 "This may indicate the LLM did not return a valid response."
             )
             return
         try:
             output = self.tool_struct.execute_function_calls_from_api_response(
                 response
@@ -3041,6 +3245,9 @@ class Agent:
         Focus on the key information and insights that would be most relevant to the user's original request.
         If there are any errors or issues, highlight them prominently.
+        The user's original request was:
+        {self.task}
         Tool Output:
         {output}
         """

@@ -27,6 +27,12 @@ dashboard_live = None
 # Create a spinner for loading animation
 spinner = Spinner("dots", style="yellow")
+# Global lock to ensure only a single Rich Live context is active at any moment.
+# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
+# console raise runtime errors. Using a module-level lock serialises access and
+# prevents crashes when multiple agents stream simultaneously in different
+# threads (e.g., in ConcurrentWorkflow).
+live_render_lock = threading.Lock()
 def choose_random_color():
     import random
@@ -308,6 +314,7 @@ class Formatter:
                         ):
                             # Add ONLY the new chunk to the Text object with random color style
+                            chunk = part.choices[0].delta.content
                             streaming_text.append(
                                 chunk, style=text_style
                             )
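Review note on the formatter change: the new live_render_lock is meant to be held for the full lifetime of any Rich Live context, so concurrent agents never drive two Live renders on the same console. A rough usage sketch under that assumption (the stream_to_panel helper and its arguments are illustrative, not the PR's actual call sites):

import threading

from rich.console import Console
from rich.live import Live
from rich.text import Text

console = Console()
live_render_lock = threading.Lock()  # mirrors the module-level lock added above

def stream_to_panel(chunks):
    """Render streamed text chunks inside one Live context, serialised by the lock."""
    text = Text()
    # Only one thread may hold an active Live on this console at a time.
    with live_render_lock:
        with Live(text, console=console, refresh_per_second=10):
            for chunk in chunks:
                text.append(chunk)

stream_to_panel(["Hello, ", "world", "!\n"])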
