Add real-time streaming support and example usage for Agent class

pull/927/head
harshalmore31 5 days ago
parent d8958481cf
commit 2f5e7bdca7

@@ -0,0 +1,12 @@
+from swarms import Agent
+
+# Enable real-time streaming
+agent = Agent(
+    agent_name="StoryAgent",
+    model_name="gpt-4o-mini",
+    streaming_on=True,  # 🔥 This enables real streaming!
+    max_loops=1,
+)
+
+# This will now stream in real-time with beautiful UI!
+response = agent.run("Tell me a detailed story...")
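For comparison, a minimal sketch of the same agent with streaming disabled (the default, as the examples in this commit imply); run() returns the identical final string either way, so downstream code does not change:

from swarms import Agent

# Same agent with streaming off. run() still returns the complete
# response string; only the delivery differs.
agent = Agent(
    agent_name="StoryAgent",
    model_name="gpt-4o-mini",
    streaming_on=False,
    max_loops=1,
)

response = agent.run("Tell me a detailed story...")
print(response)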

@@ -286,6 +286,11 @@ class Agent:
     >>> response = agent.run("Generate a report on the financials.")
     >>> print(response)
     >>> # Generate a report on the financials.
+
+    >>> # Real-time streaming example
+    >>> agent = Agent(llm=llm, max_loops=1, streaming_on=True)
+    >>> response = agent.run("Tell me a long story.")  # Will stream in real-time
+    >>> print(response)  # Final complete response
     """
@@ -2469,14 +2474,45 @@ class Agent:
         """
         try:
-            if img is not None:
-                out = self.llm.run(
-                    task=task, img=img, *args, **kwargs
-                )
+            # Set streaming parameter in LLM if streaming is enabled
+            if self.streaming_on and hasattr(self.llm, 'stream'):
+                original_stream = self.llm.stream
+                self.llm.stream = True
+
+                if img is not None:
+                    streaming_response = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    streaming_response = self.llm.run(task=task, *args, **kwargs)
+
+                # If we get a streaming response, handle it with the new streaming panel
+                if hasattr(streaming_response, '__iter__') and not isinstance(streaming_response, str):
+                    # Use the new streaming panel to display and collect the response
+                    complete_response = formatter.print_streaming_panel(
+                        streaming_response,
+                        title=f"🤖 {self.agent_name} Streaming Response",
+                        style="bold cyan"
+                    )
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+                    return complete_response
+                else:
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+                    return streaming_response
             else:
-                out = self.llm.run(task=task, *args, **kwargs)
+                # Non-streaming call
+                if img is not None:
+                    out = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    out = self.llm.run(task=task, *args, **kwargs)
+                return out
-            return out
         except AgentLLMError as e:
             logger.error(
                 f"Error calling LLM: {e}. Task: {task}, Args: {args}, Kwargs: {kwargs}"
@@ -2861,7 +2897,7 @@ class Agent:
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=self.streaming_on,
+            stream=False,  # Always disable streaming for tool summaries
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,

@@ -145,5 +145,75 @@ class Formatter:
                 )
                 time.sleep(delay)

+    def print_streaming_panel(
+        self,
+        streaming_response,
+        title: str = "🤖 Agent Streaming Response",
+        style: str = "bold cyan",
+    ) -> str:
+        """
+        Display real-time streaming response using Rich Live and Panel.
+        Similar to the approach used in litellm_stream.py.
+
+        Args:
+            streaming_response: The streaming response generator from LiteLLM.
+            title (str): Title of the panel.
+            style (str): Style for the panel border.
+
+        Returns:
+            str: The complete accumulated response text.
+        """
+
+        def create_streaming_panel(text_obj, is_complete=False):
+            """Create panel with proper text wrapping using Rich's built-in capabilities"""
+            panel_title = f"[bold cyan]{title}[/bold cyan]"
+            if is_complete:
+                panel_title += " [bold green]✅[/bold green]"
+
+            display_text = Text.from_markup("")
+            display_text.append_text(text_obj)
+
+            # Add blinking cursor if still streaming
+            if not is_complete:
+                display_text.append("▊", style="bold green blink")
+
+            panel = Panel(
+                display_text,
+                title=panel_title,
+                border_style=style,
+                padding=(1, 2),
+                width=self.console.size.width,  # Rich handles wrapping automatically
+            )
+            return panel
+
+        # Create a Text object for streaming content
+        streaming_text = Text()
+        complete_response = ""
+
+        # TRUE streaming with Rich's automatic text wrapping
+        with Live(
+            create_streaming_panel(streaming_text),
+            console=self.console,
+            refresh_per_second=20,
+        ) as live:
+            try:
+                for part in streaming_response:
+                    if hasattr(part, 'choices') and part.choices and part.choices[0].delta.content:
+                        # Add ONLY the new chunk to the Text object
+                        chunk = part.choices[0].delta.content
+                        streaming_text.append(chunk, style="white")
+                        complete_response += chunk
+
+                        # Update display with new text - Rich handles all wrapping automatically
+                        live.update(create_streaming_panel(streaming_text, is_complete=False))
+
+                # Final update to show completion
+                live.update(create_streaming_panel(streaming_text, is_complete=True))
+            except Exception as e:
+                # Handle any streaming errors gracefully
+                streaming_text.append(f"\n[Error: {str(e)}]", style="bold red")
+                live.update(create_streaming_panel(streaming_text, is_complete=True))
+
+        return complete_response
+

 formatter = Formatter()
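A hypothetical smoke test for the panel: the chunk objects below merely mimic the LiteLLM delta shape (choices[0].delta.content) that the method reads; nothing beyond the module-level formatter instance shown above is assumed.

from types import SimpleNamespace

def fake_chunks(text):
    # Simulate LiteLLM-style streaming chunks: each exposes
    # .choices[0].delta.content, which is all print_streaming_panel reads.
    for word in text.split():
        delta = SimpleNamespace(content=word + " ")
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

full_text = formatter.print_streaming_panel(
    fake_chunks("Streaming text renders chunk by chunk inside a Rich panel."),
    title="🤖 Demo Streaming Response",
)
assert full_text.startswith("Streaming")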

@@ -449,8 +449,12 @@ class LiteLLM:
         # Make the completion call
         response = completion(**completion_params)

+        # Handle streaming response
+        if self.stream:
+            return response  # Return the streaming generator directly
+
         # Handle tool-based response
-        if self.tools_list_dictionary is not None:
+        elif self.tools_list_dictionary is not None:
             return self.output_for_tools(response)
         elif self.return_all is True:
             return response.model_dump()
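With stream=True, callers now receive the raw generator and can consume it directly. A minimal consumption sketch; the constructor and run() arguments are assumptions based on the parameters visible elsewhere in this diff (model_name, stream, task):

# Minimal sketch; LiteLLM arguments here are assumptions from this diff.
llm = LiteLLM(model_name="gpt-4o-mini", stream=True)

for part in llm.run(task="Tell me a short story."):
    # Chunks follow the OpenAI delta shape: choices[0].delta.content
    chunk = part.choices[0].delta.content
    if chunk:
        print(chunk, end="", flush=True)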
