docs agent streaming with formatting name fix

pull/925/merge
Kye Gomez 4 days ago
parent 77bdaac350
commit 325199f4c2

@ -0,0 +1,62 @@
# Agent with Streaming
The Swarms framework provides powerful real-time streaming capabilities for agents, letting you watch responses appear token by token as the language model produces them. This creates a more engaging, interactive experience, and is especially useful for long-form content generation, debugging, or providing immediate feedback to users.
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set your environment variables in a `.env` file:
```bash
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY=""
```
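One way to load these values at runtime is with the `python-dotenv` package (a separate install, not bundled with swarms); a minimal sketch:
```python
# pip install python-dotenv
from dotenv import load_dotenv

# Reads OPENAI_API_KEY and WORKSPACE_DIR from .env into the process environment
load_dotenv()
```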
## Step by Step
- Install and put your keys in `.env`
- Turn on streaming in `Agent()` with `streaming_on=True`
- Optional: set `print_on=True` to pretty-print the stream inside a formatted panel; leave it at the default `False` to print the raw tokens as plain text
## Code
```python
from swarms import Agent

# Enable real-time streaming
agent = Agent(
    agent_name="StoryAgent",
    model_name="gpt-4o-mini",
    streaming_on=True,  # 🔥 This enables real streaming!
    max_loops=1,
    print_on=True,  # By default, it's False for raw streaming!
)

# This will now stream in real-time with a beautiful UI!
response = agent.run("Tell me a detailed story about humanity colonizing the stars")
print(response)
```
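If you'd rather skip the formatted panel, leave `print_on` at its default of `False` and the tokens stream as plain text. A minimal sketch, assuming the same environment setup as above:
```python
from swarms import Agent

raw_agent = Agent(
    agent_name="StoryAgent",
    model_name="gpt-4o-mini",
    streaming_on=True,  # stream tokens as they arrive
    print_on=False,     # default: raw text output, no panel UI
    max_loops=1,
)

# Tokens print as plain text; the complete response is still returned
response = raw_agent.run("Explain how token streaming works in one paragraph")
print(response)
```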
## Connect With Us
For technical support, join our Discord below, and follow us on Twitter for the latest updates!
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |

@ -311,6 +311,7 @@ nav:
        # - PreBuilt Templates: "examples/templates_index.md"
        - Customizing Agents:
          - Basic Agent: "swarms/examples/basic_agent.md"
          - Agent with Streaming: "examples/agent_stream.md"
          - Agents with Callable Tools: "swarms/examples/agent_with_tools.md"
          # - Agent With MCP Integration: "swarms/examples/agent_with_mcp.md"
          - Agent Output Types: "swarms/examples/agent_output_types.md"

@ -3,11 +3,12 @@ from swarms import Agent

# Enable real-time streaming
agent = Agent(
    agent_name="StoryAgent",
    model_name="gpt-4o-mini",
    streaming_on=True,  # 🔥 This enables real streaming!
    max_loops=1,
    print_on=True,  # By default it's False (raw streaming)!
)

# This will now stream in real-time with a beautiful UI!
response = agent.run("Tell me a detailed story about humanity colonizing the stars")
print(response)

@ -1,81 +0,0 @@
import json

from swarms import Agent, SwarmRouter

# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
    agent_name="Risk-Metrics-Calculator",
    agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
    system_prompt="""You are a risk metrics specialist. Calculate and explain:
    - Value at Risk (VaR)
    - Sharpe ratio
    - Volatility
    - Maximum drawdown
    - Beta coefficient
    Provide clear, numerical results with brief explanations.""",
    max_loops=1,
    # model_name="gpt-4o-mini",
    random_model_enabled=True,
    dynamic_temperature_enabled=True,
    output_type="str-all-except-first",
    max_tokens=4096,
)

# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
    agent_name="Portfolio-Risk-Analyzer",
    agent_description="Analyzes portfolio diversification and concentration risk",
    system_prompt="""You are a portfolio risk analyst. Focus on:
    - Portfolio diversification analysis
    - Concentration risk assessment
    - Correlation analysis
    - Sector/asset allocation risk
    - Liquidity risk evaluation
    Provide actionable insights for risk reduction.""",
    max_loops=1,
    # model_name="gpt-4o-mini",
    random_model_enabled=True,
    dynamic_temperature_enabled=True,
    output_type="str-all-except-first",
    max_tokens=4096,
)

# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
    agent_name="Market-Risk-Monitor",
    agent_description="Monitors market conditions and identifies risk factors",
    system_prompt="""You are a market risk monitor. Identify and assess:
    - Market volatility trends
    - Economic risk factors
    - Geopolitical risks
    - Interest rate risks
    - Currency risks
    Provide current risk alerts and trends.""",
    max_loops=1,
    # model_name="gpt-4o-mini",
    random_model_enabled=True,
    dynamic_temperature_enabled=True,
    output_type="str-all-except-first",
    max_tokens=4096,
)

swarm = SwarmRouter(
    agents=[
        risk_metrics_agent,
        portfolio_risk_agent,
    ],
    max_loops=1,
    swarm_type="MixtureOfAgents",
    output_type="final",
)

# swarm.run(
#     "Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
# )

print(f"Swarm config: {json.dumps(swarm.to_dict(), indent=4)}")

@ -286,7 +286,7 @@ class Agent:
    >>> response = agent.run("Generate a report on the financials.")
    >>> print(response)
    >>> # Generate a report on the financials.
    >>> # Real-time streaming example
    >>> agent = Agent(llm=llm, max_loops=1, streaming_on=True)
    >>> response = agent.run("Tell me a long story.")  # Will stream in real-time
@ -1061,12 +1061,16 @@ class Agent:
                    response = self.call_llm(
                        task=task_prompt,
                        img=img,
                        current_loop=loop_count,
                        *args,
                        **kwargs,
                    )
                else:
                    response = self.call_llm(
                        task=task_prompt,
                        current_loop=loop_count,
                        *args,
                        **kwargs,
                    )

                # Parse the response from the agent with the output type
@ -2463,7 +2467,12 @@ class Agent:
        return None

    def call_llm(
        self,
        task: str,
        img: Optional[str] = None,
        current_loop: int = 0,
        *args,
        **kwargs,
    ) -> str:
        """
        Calls the appropriate method on the `llm` object based on the given task.
@ -2486,55 +2495,72 @@
        try:
            # Set streaming parameter in LLM if streaming is enabled
            if self.streaming_on and hasattr(self.llm, "stream"):
                original_stream = self.llm.stream
                self.llm.stream = True

                if img is not None:
                    streaming_response = self.llm.run(
                        task=task, img=img, *args, **kwargs
                    )
                else:
                    streaming_response = self.llm.run(
                        task=task, *args, **kwargs
                    )

                # If we get a streaming response, handle it with the new streaming panel
                if hasattr(
                    streaming_response, "__iter__"
                ) and not isinstance(streaming_response, str):
                    # Check print_on parameter for different streaming behaviors
                    if self.print_on is False:
                        # Show raw streaming text without formatting panels
                        chunks = []
                        print(
                            f"\n{self.agent_name}: ",
                            end="",
                            flush=True,
                        )
                        for chunk in streaming_response:
                            if (
                                hasattr(chunk, "choices")
                                and chunk.choices[0].delta.content
                            ):
                                content = chunk.choices[
                                    0
                                ].delta.content
                                print(
                                    content, end="", flush=True
                                )  # Print raw streaming text
                                chunks.append(content)
                        print()  # New line after streaming completes
                        complete_response = "".join(chunks)
                    else:
                        # Collect chunks for conversation saving
                        collected_chunks = []

                        def on_chunk_received(chunk: str):
                            """Callback to collect chunks as they arrive"""
                            collected_chunks.append(chunk)
                            # Optional: Save each chunk to conversation in real-time
                            # This creates a more detailed conversation history
                            if self.verbose:
                                logger.debug(
                                    f"Streaming chunk received: {chunk[:50]}..."
                                )

                        # Use the streaming panel to display and collect the response
                        complete_response = formatter.print_streaming_panel(
                            streaming_response,
                            title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
                            style="bold cyan",
                            collect_chunks=True,
                            on_chunk_callback=on_chunk_received,
                        )

                    # Restore original stream setting
                    self.llm.stream = original_stream

                    # Return the complete response for further processing
                    return complete_response
                else:

@ -10,6 +10,23 @@ from rich.table import Table
from rich.text import Text


def choose_random_color():
    import random

    colors = [
        "red",
        "green",
        "blue",
        "yellow",
        "magenta",
        "cyan",
        "white",
    ]
    random_color = random.choice(colors)
    return random_color


class Formatter:
    """
    A class for formatting and printing rich text to the console.
@ -32,18 +49,8 @@ class Formatter:
            title (str, optional): The title of the panel. Defaults to "".
            style (str, optional): The style of the panel. Defaults to "bold blue".
        """
        random_color = choose_random_color()

        panel = Panel(
            content, title=title, style=f"bold {random_color}"
        )
@ -149,7 +156,7 @@ class Formatter:
        self,
        streaming_response,
        title: str = "🤖 Agent Streaming Response",
        style: str = choose_random_color(),
        collect_chunks: bool = False,
        on_chunk_callback: Optional[Callable] = None,
    ) -> str:
@ -167,18 +174,19 @@ class Formatter:
        Returns:
            str: The complete accumulated response text.
        """

        def create_streaming_panel(text_obj, is_complete=False):
            """Create panel with proper text wrapping using Rich's built-in capabilities"""
            panel_title = f"[bold cyan]{title}[/bold cyan]"
            if is_complete:
                panel_title += " [bold green]✅[/bold green]"

            # Add blinking cursor if still streaming
            display_text = Text.from_markup("")
            display_text.append_text(text_obj)
            if not is_complete:
                display_text.append("", style="bold green blink")

            panel = Panel(
                display_text,
                title=panel_title,
@ -195,36 +203,54 @@ class Formatter:
        # TRUE streaming with Rich's automatic text wrapping
        with Live(
            create_streaming_panel(streaming_text),
            console=self.console,
            refresh_per_second=20,
        ) as live:
            try:
                for part in streaming_response:
                    if (
                        hasattr(part, "choices")
                        and part.choices
                        and part.choices[0].delta.content
                    ):
                        # Add ONLY the new chunk to the Text object
                        chunk = part.choices[0].delta.content
                        streaming_text.append(chunk, style="white")
                        complete_response += chunk

                        # Collect chunks if requested
                        if collect_chunks:
                            chunks_collected.append(chunk)

                        # Call chunk callback if provided
                        if on_chunk_callback:
                            on_chunk_callback(chunk)

                        # Update display with new text - Rich handles all wrapping automatically
                        live.update(
                            create_streaming_panel(
                                streaming_text, is_complete=False
                            )
                        )

                # Final update to show completion
                live.update(
                    create_streaming_panel(
                        streaming_text, is_complete=True
                    )
                )
            except Exception as e:
                # Handle any streaming errors gracefully
                streaming_text.append(
                    f"\n[Error: {str(e)}]", style="bold red"
                )
                live.update(
                    create_streaming_panel(
                        streaming_text, is_complete=True
                    )
                )

        return complete_response

@ -452,7 +452,7 @@ class LiteLLM:
        # Handle streaming response
        if self.stream:
            return response  # Return the streaming generator directly

        # Handle tool-based response
        elif self.tools_list_dictionary is not None:
            return self.output_for_tools(response)
