Merge pull request #927 from harshalmore31/feat/921-stream

feat: Implement real-time streaming with Rich UI for Agent responses
Kye Gomez authored 4 days ago, committed by GitHub
commit a5f61de89c

@@ -0,0 +1,13 @@
+from swarms import Agent
+
+# Enable real-time streaming
+agent = Agent(
+    agent_name="StoryAgent",
+    model_name="gpt-4o-mini",
+    streaming_on=True,  # 🔥 This enables real streaming!
+    max_loops=1,
+    print_on=True,  # Defaults to False; False gives raw, unformatted streaming output
+)
+
+# This will now stream in real-time with beautiful UI!
+response = agent.run("Tell me a detailed story...")
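
For comparison, a minimal sketch of the other mode referenced in the comment above: with print_on=False the same agent streams raw, unformatted text instead of the Rich panel (parameter values here are illustrative, not prescribed by the PR).

from swarms import Agent

# Raw streaming mode: chunks are printed as plain text, no Rich panel
raw_agent = Agent(
    agent_name="StoryAgent",
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,  # raw token-by-token output
    max_loops=1,
)

raw_agent.run("Tell me a detailed story...")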

@@ -287,6 +287,11 @@ class Agent:
     >>> print(response)
     >>> # Generate a report on the financials.
+    >>> # Real-time streaming example
+    >>> agent = Agent(llm=llm, max_loops=1, streaming_on=True)
+    >>> response = agent.run("Tell me a long story.")  # Will stream in real-time
+    >>> print(response)  # Final complete response
     """

     def __init__(
@@ -2469,14 +2474,73 @@ class Agent:
         """
         try:
-            if img is not None:
-                out = self.llm.run(
-                    task=task, img=img, *args, **kwargs
-                )
+            # Set streaming parameter in LLM if streaming is enabled
+            if self.streaming_on and hasattr(self.llm, 'stream'):
+                original_stream = self.llm.stream
+                self.llm.stream = True
+
+                if img is not None:
+                    streaming_response = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    streaming_response = self.llm.run(task=task, *args, **kwargs)
+
+                # If we get a streaming response, handle it with the new streaming panel
+                if hasattr(streaming_response, '__iter__') and not isinstance(streaming_response, str):
+                    # Check print_on parameter for different streaming behaviors
+                    if self.print_on is False:
+                        # Show raw streaming text without formatting panels
+                        chunks = []
+                        print(f"\n{self.agent_name}: ", end="", flush=True)
+                        for chunk in streaming_response:
+                            if hasattr(chunk, 'choices') and chunk.choices[0].delta.content:
+                                content = chunk.choices[0].delta.content
+                                print(content, end="", flush=True)  # Print raw streaming text
+                                chunks.append(content)
+                        print()  # New line after streaming completes
+                        complete_response = ''.join(chunks)
+                    else:
+                        # Collect chunks for conversation saving
+                        collected_chunks = []
+
+                        def on_chunk_received(chunk: str):
+                            """Callback to collect chunks as they arrive"""
+                            collected_chunks.append(chunk)
+                            # Optional: Save each chunk to conversation in real-time
+                            # This creates a more detailed conversation history
+                            if self.verbose:
+                                logger.debug(f"Streaming chunk received: {chunk[:50]}...")
+
+                        # Use the streaming panel to display and collect the response
+                        complete_response = formatter.print_streaming_panel(
+                            streaming_response,
+                            title=f"🤖 {self.agent_name} Streaming Response",
+                            style="bold cyan",
+                            collect_chunks=True,
+                            on_chunk_callback=on_chunk_received,
+                        )
+
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+
+                    # Return the complete response for further processing
+                    return complete_response
+                else:
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+                    return streaming_response
             else:
-                out = self.llm.run(task=task, *args, **kwargs)
-
-            return out
+                # Non-streaming call
+                if img is not None:
+                    out = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    out = self.llm.run(task=task, *args, **kwargs)
+
+                return out
         except AgentLLMError as e:
             logger.error(
                 f"Error calling LLM: {e}. Task: {task}, Args: {args}, Kwargs: {kwargs}"
@@ -2693,12 +2757,10 @@ class Agent:
     def pretty_print(self, response: str, loop_count: int):
         if self.print_on is False:
             if self.streaming_on is True:
-                # self.stream_response(response)
-                formatter.print_panel_token_by_token(
-                    f"{self.agent_name}: {response}",
-                    title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
-                )
-            elif self.print_on is True:
+                # Skip printing here since real streaming is handled in call_llm
+                # This avoids double printing when streaming_on=True
+                pass
+            elif self.no_print is True:
                 pass
             else:
                 # logger.info(f"Response: {response}")
@@ -2861,7 +2923,7 @@ class Agent:
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=self.streaming_on,
+            stream=False,  # Always disable streaming for tool summaries
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,

@@ -1,6 +1,6 @@
 import threading
 import time
-from typing import Any, Callable, Dict, List
+from typing import Any, Callable, Dict, List, Optional

 from rich.console import Console
 from rich.live import Live
@@ -145,5 +145,88 @@ class Formatter:
             )
             time.sleep(delay)

+    def print_streaming_panel(
+        self,
+        streaming_response,
+        title: str = "🤖 Agent Streaming Response",
+        style: str = "bold cyan",
+        collect_chunks: bool = False,
+        on_chunk_callback: Optional[Callable] = None,
+    ) -> str:
+        """
+        Display real-time streaming response using Rich Live and Panel.
+        Similar to the approach used in litellm_stream.py.
+
+        Args:
+            streaming_response: The streaming response generator from LiteLLM.
+            title (str): Title of the panel.
+            style (str): Style for the panel border.
+            collect_chunks (bool): Whether to collect individual chunks for conversation saving.
+            on_chunk_callback (Optional[Callable]): Callback function to call for each chunk.
+
+        Returns:
+            str: The complete accumulated response text.
+        """
+
+        def create_streaming_panel(text_obj, is_complete=False):
+            """Create panel with proper text wrapping using Rich's built-in capabilities"""
+            panel_title = f"[bold cyan]{title}[/bold cyan]"
+            if is_complete:
+                panel_title += " [bold green]✅[/bold green]"
+
+            # Add blinking cursor if still streaming
+            display_text = Text.from_markup("")
+            display_text.append_text(text_obj)
+            if not is_complete:
+                display_text.append("▊", style="bold green blink")
+
+            panel = Panel(
+                display_text,
+                title=panel_title,
+                border_style=style,
+                padding=(1, 2),
+                width=self.console.size.width,  # Rich handles wrapping automatically
+            )
+            return panel
+
+        # Create a Text object for streaming content
+        streaming_text = Text()
+        complete_response = ""
+        chunks_collected = []
+
+        # TRUE streaming with Rich's automatic text wrapping
+        with Live(
+            create_streaming_panel(streaming_text),
+            console=self.console,
+            refresh_per_second=20,
+        ) as live:
+            try:
+                for part in streaming_response:
+                    if hasattr(part, 'choices') and part.choices and part.choices[0].delta.content:
+                        # Add ONLY the new chunk to the Text object
+                        chunk = part.choices[0].delta.content
+                        streaming_text.append(chunk, style="white")
+                        complete_response += chunk
+
+                        # Collect chunks if requested
+                        if collect_chunks:
+                            chunks_collected.append(chunk)
+
+                        # Call chunk callback if provided
+                        if on_chunk_callback:
+                            on_chunk_callback(chunk)
+
+                        # Update display with new text - Rich handles all wrapping automatically
+                        live.update(create_streaming_panel(streaming_text, is_complete=False))
+
+                # Final update to show completion
+                live.update(create_streaming_panel(streaming_text, is_complete=True))
+            except Exception as e:
+                # Handle any streaming errors gracefully
+                streaming_text.append(f"\n[Error: {str(e)}]", style="bold red")
+                live.update(create_streaming_panel(streaming_text, is_complete=True))
+
+        return complete_response

 formatter = Formatter()
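
The panel can be exercised without a live model call by feeding it chunk objects of the same shape; a sketch for local testing, assuming the formatter instance is importable from swarms.utils.formatter (import path assumed):

from types import SimpleNamespace
from swarms.utils.formatter import formatter  # import path assumed

def mock_chunks(text):
    # Hypothetical chunks mimicking LiteLLM's delta format
    for token in text.split(" "):
        yield SimpleNamespace(
            choices=[SimpleNamespace(delta=SimpleNamespace(content=token + " "))]
        )

collected = []
full_text = formatter.print_streaming_panel(
    mock_chunks("Streaming text rendered inside a Rich panel"),
    title="🤖 Demo Streaming Response",
    collect_chunks=True,
    on_chunk_callback=collected.append,
)
assert full_text == "".join(collected)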

@@ -449,8 +449,12 @@ class LiteLLM:
             # Make the completion call
             response = completion(**completion_params)

+            # Handle streaming response
+            if self.stream:
+                return response  # Return the streaming generator directly
+
             # Handle tool-based response
-            if self.tools_list_dictionary is not None:
+            elif self.tools_list_dictionary is not None:
                 return self.output_for_tools(response)
             elif self.return_all is True:
                 return response.model_dump()
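
With stream=True the wrapper now returns the LiteLLM generator untouched, so a caller iterates it directly; a rough usage sketch (import path and constructor arguments assumed, not taken from the diff):

from swarms.utils.litellm_wrapper import LiteLLM  # import path assumed

llm = LiteLLM(model_name="gpt-4o-mini", stream=True)  # constructor args assumed
response = llm.run("Tell me a short story.")

# response is the raw streaming generator; print deltas as they arrive
for chunk in response:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
print()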
