refactor streaming response handling in Formatter class; implement lock for concurrent Live panel management and improve error handling

pull/938/head
harshalmore31 3 weeks ago
parent 638d81c338
commit ccddb17ccc

@@ -1144,7 +1144,6 @@ class Agent:
         self.tool_call_completed = True
         # Reset expecting_tool_call so subsequent requests can stream
         self.expecting_tool_call = False
-
         # Handle MCP tools
         if (
             exists(self.mcp_url)
@ -2558,7 +2557,6 @@ class Agent:
try: try:
# Decide whether streaming should be used for this call # Decide whether streaming should be used for this call
streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False) streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False)
# Set streaming parameter in LLM if streaming is enabled for this call # Set streaming parameter in LLM if streaming is enabled for this call
if streaming_enabled and hasattr(self.llm, "stream"): if streaming_enabled and hasattr(self.llm, "stream"):
original_stream = self.llm.stream original_stream = self.llm.stream
@@ -2573,7 +2571,7 @@ class Agent:
                 task=task, *args, **kwargs
             )

-            # If we get a streaming response, handle it with the streaming panel
+            # If we get a streaming response, handle it with the new streaming panel
             if hasattr(
                 streaming_response, "__iter__"
             ) and not isinstance(streaming_response, str):
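
The hunks above hinge on one per-call decision: stream only when the agent is configured for streaming and is not expecting a structured tool call. A minimal sketch of that gating, using illustrative stub classes (`LLMStub`, `AgentSketch`) rather than the project's actual API:

```python
class LLMStub:
    """Hypothetical stand-in for the wrapped LLM client."""

    def __init__(self):
        self.stream = False  # provider-level streaming flag


class AgentSketch:
    """Illustrative subset of the Agent fields the diff touches."""

    def __init__(self, streaming_on: bool):
        self.streaming_on = streaming_on
        self.expecting_tool_call = False
        self.llm = LLMStub()

    def call_llm(self, task: str) -> None:
        # Decide whether streaming should be used for this call.
        streaming_enabled = self.streaming_on and not getattr(
            self, "expecting_tool_call", False
        )
        if streaming_enabled and hasattr(self.llm, "stream"):
            original_stream = self.llm.stream
            self.llm.stream = True  # enable streaming for this call only
            try:
                pass  # the real code calls the LLM and iterates the chunks here
            finally:
                # Restore the flag so later tool calls are not forced to stream.
                self.llm.stream = original_stream
```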
@@ -3079,11 +3077,9 @@ class Agent:
             Focus on the key information and insights that would be most relevant to the user's original request.
+            {self.run_task}
             If there are any errors or issues, highlight them prominently.
-
             Tool Output:
-
             {output}
-
             """
-
+            # Stream the tool summary only if the agent is configured for streaming
             if self.streaming_on and self.print_on:
                 # Handle streaming response with streaming panel
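
This hunk adds the task back into the summarisation prompt and gates streaming of the summary on both flags. A rough, runnable illustration, with stub names (`_AgentStub`, `stream_and_render`) that are assumptions rather than the repository's real API:

```python
def stream_and_render(agent: "_AgentStub", prompt: str) -> str:
    # Placeholder for routing the response through the Formatter's live panel.
    return agent.llm_run(prompt)


class _AgentStub:
    """Hypothetical stand-in; the real Agent supplies these fields."""

    run_task = "Summarise the search results for the user."
    streaming_on = True
    print_on = True

    def llm_run(self, prompt: str) -> str:
        return "summary: ..."  # placeholder for the real LLM call


def summarize_tool_output(agent: "_AgentStub", output: str) -> str:
    prompt = f"""
    Focus on the key information and insights that would be most relevant to the user's original request.
    {agent.run_task}
    If there are any errors or issues, highlight them prominently.
    Tool Output:
    {output}
    """
    # Stream the tool summary only if the agent is configured for streaming.
    if agent.streaming_on and agent.print_on:
        return stream_and_render(agent, prompt)
    return agent.llm_run(prompt)  # plain, non-streaming request
```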

@@ -9,6 +9,12 @@ from rich.progress import Progress, SpinnerColumn, TextColumn
 from rich.table import Table
 from rich.text import Text

+# Global lock to ensure only a single Rich Live context is active at any moment.
+# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
+# console raise runtime errors. Using a module-level lock serialises access and
+# prevents crashes when multiple agents stream simultaneously in different
+# threads (e.g., in ConcurrentWorkflow).
+live_render_lock = threading.Lock()

 def choose_random_color():
     import random
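
The comment above is the heart of the change, so a self-contained sketch of the pattern may help. It assumes only `rich` and the standard library, and `stream_fake_tokens` is an illustrative name: without the shared lock, the threads below could open overlapping Live contexts on one console and crash; with it, each panel streams to completion before the next begins.

```python
import threading
import time

from rich.console import Console
from rich.live import Live
from rich.panel import Panel

console = Console()
live_render_lock = threading.Lock()  # module-level, shared by all threads


def stream_fake_tokens(name: str) -> None:
    text = ""
    # Serialise access: only one Live panel may be active at a time.
    with live_render_lock:
        with Live(Panel(text, title=name), console=console, refresh_per_second=20) as live:
            for token in ["hello ", "from ", name]:
                text += token
                live.update(Panel(text, title=name))
                time.sleep(0.05)


threads = [
    threading.Thread(target=stream_fake_tokens, args=(f"agent-{i}",))
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```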
@@ -209,56 +215,60 @@ class Formatter:
         complete_response = ""
         chunks_collected = []

-        # TRUE streaming with Rich's automatic text wrapping
-        with Live(
-            create_streaming_panel(streaming_text),
-            console=self.console,
-            refresh_per_second=20,
-        ) as live:
-            try:
-                for part in streaming_response:
-                    if (
-                        hasattr(part, "choices")
-                        and part.choices
-                        and part.choices[0].delta.content
-                    ):
-                        # Add ONLY the new chunk to the Text object with random color style
-                        chunk = part.choices[0].delta.content
-                        streaming_text.append(chunk, style=text_style)
-                        complete_response += chunk
-
-                        # Collect chunks if requested
-                        if collect_chunks:
-                            chunks_collected.append(chunk)
-
-                        # Call chunk callback if provided
-                        if on_chunk_callback:
-                            on_chunk_callback(chunk)
-
-                        # Update display with new text - Rich handles all wrapping automatically
-                        live.update(
-                            create_streaming_panel(
-                                streaming_text, is_complete=False
-                            )
-                        )
-
-                # Final update to show completion
-                live.update(
-                    create_streaming_panel(
-                        streaming_text, is_complete=True
-                    )
-                )
-            except Exception as e:
-                # Handle any streaming errors gracefully
-                streaming_text.append(
-                    f"\n[Error: {str(e)}]", style="bold red"
-                )
-                live.update(
-                    create_streaming_panel(
-                        streaming_text, is_complete=True
-                    )
-                )
+        # Acquire the lock so that only one Live panel is active at a time.
+        # Other threads will wait here until the current streaming completes,
+        # avoiding Rich.Live concurrency errors.
+        with live_render_lock:
+            # TRUE streaming with Rich's automatic text wrapping
+            with Live(
+                create_streaming_panel(streaming_text),
+                console=self.console,
+                refresh_per_second=20,
+            ) as live:
+                try:
+                    for part in streaming_response:
+                        if (
+                            hasattr(part, "choices")
+                            and part.choices
+                            and part.choices[0].delta.content
+                        ):
+                            # Add ONLY the new chunk to the Text object with random color style
+                            chunk = part.choices[0].delta.content
+                            streaming_text.append(chunk, style=text_style)
+                            complete_response += chunk
+
+                            # Collect chunks if requested
+                            if collect_chunks:
+                                chunks_collected.append(chunk)
+
+                            # Call chunk callback if provided
+                            if on_chunk_callback:
+                                on_chunk_callback(chunk)
+
+                            # Update display with new text - Rich handles all wrapping automatically
+                            live.update(
+                                create_streaming_panel(
+                                    streaming_text, is_complete=False
+                                )
+                            )
+
+                    # Final update to show completion
+                    live.update(
+                        create_streaming_panel(
+                            streaming_text, is_complete=True
+                        )
+                    )
+                except Exception as e:
+                    # Handle any streaming errors gracefully
+                    streaming_text.append(
+                        f"\n[Error: {str(e)}]", style="bold red"
+                    )
+                    live.update(
+                        create_streaming_panel(
+                            streaming_text, is_complete=True
+                        )
+                    )

         return complete_response
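
Note the trade-off this makes: because every panel now streams under one module-level lock, concurrent agents render strictly one after another rather than interleaving output. The change gives up display parallelism to keep multi-threaded workflows such as ConcurrentWorkflow from crashing Rich's Live renderer.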
