From ccddb17cccb4559db007399103a9e64742c00199 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 5 Jul 2025 01:45:35 +0530 Subject: [PATCH] refactor streaming response handling in Formatter class; implement lock for concurrent Live panel management and improve error handling --- swarms/structs/agent.py | 8 +-- swarms/utils/formatter.py | 100 +++++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 081038ff..40eaa005 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1144,7 +1144,6 @@ class Agent: self.tool_call_completed = True # Reset expecting_tool_call so subsequent requests can stream self.expecting_tool_call = False - # Handle MCP tools if ( exists(self.mcp_url) @@ -2558,7 +2557,6 @@ class Agent: try: # Decide whether streaming should be used for this call streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False) - # Set streaming parameter in LLM if streaming is enabled for this call if streaming_enabled and hasattr(self.llm, "stream"): original_stream = self.llm.stream @@ -2573,7 +2571,7 @@ class Agent: task=task, *args, **kwargs ) - # If we get a streaming response, handle it with the streaming panel + # If we get a streaming response, handle it with the new streaming panel if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): @@ -2613,7 +2611,7 @@ class Agent: collect_chunks=True, on_chunk_callback=on_chunk_received, ) - + # Restore original stream setting self.llm.stream = original_stream @@ -3079,11 +3077,9 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. {self.run_task} If there are any errors or issues, highlight them prominently. - Tool Output: {output} """ - # Stream the tool summary only if the agent is configured for streaming if self.streaming_on and self.print_on: # Handle streaming response with streaming panel diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index 34aa5eb8..7954983e 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -9,6 +9,12 @@ from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table from rich.text import Text +# Global lock to ensure only a single Rich Live context is active at any moment. +# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same +# console raise runtime errors. Using a module-level lock serialises access and +# prevents crashes when multiple agents stream simultaneously in different +# threads (e.g., in ConcurrentWorkflow). +live_render_lock = threading.Lock() def choose_random_color(): import random @@ -209,56 +215,60 @@ class Formatter: complete_response = "" chunks_collected = [] - # TRUE streaming with Rich's automatic text wrapping - with Live( - create_streaming_panel(streaming_text), - console=self.console, - refresh_per_second=20, - ) as live: - try: - for part in streaming_response: - if ( - hasattr(part, "choices") - and part.choices - and part.choices[0].delta.content - ): - # Add ONLY the new chunk to the Text object with random color style - chunk = part.choices[0].delta.content - streaming_text.append(chunk, style=text_style) - complete_response += chunk - - # Collect chunks if requested - if collect_chunks: - chunks_collected.append(chunk) - - # Call chunk callback if provided - if on_chunk_callback: - on_chunk_callback(chunk) - - # Update display with new text - Rich handles all wrapping automatically - live.update( - create_streaming_panel( - streaming_text, is_complete=False + # Acquire the lock so that only one Live panel is active at a time. + # Other threads will wait here until the current streaming completes, + # avoiding Rich.Live concurrency errors. + with live_render_lock: + # TRUE streaming with Rich's automatic text wrapping + with Live( + create_streaming_panel(streaming_text), + console=self.console, + refresh_per_second=20, + ) as live: + try: + for part in streaming_response: + if ( + hasattr(part, "choices") + and part.choices + and part.choices[0].delta.content + ): + # Add ONLY the new chunk to the Text object with random color style + chunk = part.choices[0].delta.content + streaming_text.append(chunk, style=text_style) + complete_response += chunk + + # Collect chunks if requested + if collect_chunks: + chunks_collected.append(chunk) + + # Call chunk callback if provided + if on_chunk_callback: + on_chunk_callback(chunk) + + # Update display with new text - Rich handles all wrapping automatically + live.update( + create_streaming_panel( + streaming_text, is_complete=False + ) ) - ) - # Final update to show completion - live.update( - create_streaming_panel( - streaming_text, is_complete=True + # Final update to show completion + live.update( + create_streaming_panel( + streaming_text, is_complete=True + ) ) - ) - except Exception as e: - # Handle any streaming errors gracefully - streaming_text.append( - f"\n[Error: {str(e)}]", style="bold red" - ) - live.update( - create_streaming_panel( - streaming_text, is_complete=True + except Exception as e: + # Handle any streaming errors gracefully + streaming_text.append( + f"\n[Error: {str(e)}]", style="bold red" + ) + live.update( + create_streaming_panel( + streaming_text, is_complete=True + ) ) - ) return complete_response