diff --git a/stream_example.py b/stream_example.py
new file mode 100644
index 00000000..a09a4260
--- /dev/null
+++ b/stream_example.py
@@ -0,0 +1,13 @@
+from swarms import Agent
+
+# Enable real-time streaming
+agent = Agent(
+    agent_name="StoryAgent",
+    model_name="gpt-4o-mini",
+    streaming_on=True,  # 🔥 This enables real streaming!
+    max_loops=1,
+    print_on=True,  # By default this is False, which prints raw streaming text
+)
+
+# This will now stream in real-time with beautiful UI!
+response = agent.run("Tell me a detailed story...")
\ No newline at end of file
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index bf8d1ab7..9c33ceea 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -286,6 +286,11 @@ class Agent:
     >>> response = agent.run("Generate a report on the financials.")
     >>> print(response)
     >>> # Generate a report on the financials.
+
+    >>> # Real-time streaming example
+    >>> agent = Agent(llm=llm, max_loops=1, streaming_on=True)
+    >>> response = agent.run("Tell me a long story.")  # Will stream in real-time
+    >>> print(response)  # Final complete response
     """

@@ -2469,14 +2474,73 @@
         """
         try:
-            if img is not None:
-                out = self.llm.run(
-                    task=task, img=img, *args, **kwargs
-                )
+            # Set streaming parameter in LLM if streaming is enabled
+            if self.streaming_on and hasattr(self.llm, 'stream'):
+                original_stream = self.llm.stream
+                self.llm.stream = True
+
+                if img is not None:
+                    streaming_response = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    streaming_response = self.llm.run(task=task, *args, **kwargs)
+
+                # If we get a streaming response, handle it with the new streaming panel
+                if hasattr(streaming_response, '__iter__') and not isinstance(streaming_response, str):
+                    # Check print_on parameter for different streaming behaviors
+                    if self.print_on is False:
+                        # Show raw streaming text without formatting panels
+                        chunks = []
+                        print(f"\n{self.agent_name}: ", end="", flush=True)
+                        for chunk in streaming_response:
+                            if hasattr(chunk, 'choices') and chunk.choices[0].delta.content:
+                                content = chunk.choices[0].delta.content
+                                print(content, end="", flush=True)  # Print raw streaming text
+                                chunks.append(content)
+                        print()  # New line after streaming completes
+                        complete_response = ''.join(chunks)
+                    else:
+                        # Collect chunks for conversation saving
+                        collected_chunks = []
+
+                        def on_chunk_received(chunk: str):
+                            """Callback to collect chunks as they arrive"""
+                            collected_chunks.append(chunk)
+                            # Optional: Save each chunk to conversation in real-time
+                            # This creates a more detailed conversation history
+                            if self.verbose:
+                                logger.debug(f"Streaming chunk received: {chunk[:50]}...")
+
+                        # Use the streaming panel to display and collect the response
+                        complete_response = formatter.print_streaming_panel(
+                            streaming_response,
+                            title=f"🤖 {self.agent_name} Streaming Response",
+                            style="bold cyan",
+                            collect_chunks=True,
+                            on_chunk_callback=on_chunk_received
+                        )
+
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+
+                    # Return the complete response for further processing
+                    return complete_response
+                else:
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+                    return streaming_response
             else:
-                out = self.llm.run(task=task, *args, **kwargs)
+                # Non-streaming call
+                if img is not None:
+                    out = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    out = self.llm.run(task=task, *args, **kwargs)
+
+                return out

-            return out
         except AgentLLMError as e:
             logger.error(
                 f"Error calling LLM: {e}. Task: {task}, Args: {args}, Kwargs: {kwargs}"
@@ -2693,12 +2757,10 @@ class Agent:
     def pretty_print(self, response: str, loop_count: int):
         if self.print_on is False:
             if self.streaming_on is True:
-                # self.stream_response(response)
-                formatter.print_panel_token_by_token(
-                    f"{self.agent_name}: {response}",
-                    title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
-                )
-            elif self.print_on is True:
+                # Skip printing here since real streaming is handled in call_llm
+                # This avoids double printing when streaming_on=True
+                pass
+            elif self.no_print is True:
                 pass
             else:
                 # logger.info(f"Response: {response}")
@@ -2861,7 +2923,7 @@
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=self.streaming_on,
+            stream=False,  # Always disable streaming for tool summaries
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,
diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py
index 3f418647..0d608f6f 100644
--- a/swarms/utils/formatter.py
+++ b/swarms/utils/formatter.py
@@ -1,6 +1,6 @@
 import threading
 import time
-from typing import Any, Callable, Dict, List
+from typing import Any, Callable, Dict, List, Optional

 from rich.console import Console
 from rich.live import Live
@@ -145,5 +145,88 @@ class Formatter:
             )
             time.sleep(delay)

+    def print_streaming_panel(
+        self,
+        streaming_response,
+        title: str = "🤖 Agent Streaming Response",
+        style: str = "bold cyan",
+        collect_chunks: bool = False,
+        on_chunk_callback: Optional[Callable] = None,
+    ) -> str:
+        """
+        Display real-time streaming response using Rich Live and Panel.
+        Similar to the approach used in litellm_stream.py.
+
+        Args:
+            streaming_response: The streaming response generator from LiteLLM.
+            title (str): Title of the panel.
+            style (str): Style for the panel border.
+            collect_chunks (bool): Whether to collect individual chunks for conversation saving.
+            on_chunk_callback (Optional[Callable]): Callback function to call for each chunk.
+
+        Returns:
+            str: The complete accumulated response text.
+ """ + def create_streaming_panel(text_obj, is_complete=False): + """Create panel with proper text wrapping using Rich's built-in capabilities""" + panel_title = f"[bold cyan]{title}[/bold cyan]" + if is_complete: + panel_title += " [bold green]✅[/bold green]" + + # Add blinking cursor if still streaming + display_text = Text.from_markup("") + display_text.append_text(text_obj) + if not is_complete: + display_text.append("▊", style="bold green blink") + + panel = Panel( + display_text, + title=panel_title, + border_style=style, + padding=(1, 2), + width=self.console.size.width, # Rich handles wrapping automatically + ) + return panel + + # Create a Text object for streaming content + streaming_text = Text() + complete_response = "" + chunks_collected = [] + + # TRUE streaming with Rich's automatic text wrapping + with Live( + create_streaming_panel(streaming_text), + console=self.console, + refresh_per_second=20 + ) as live: + try: + for part in streaming_response: + if hasattr(part, 'choices') and part.choices and part.choices[0].delta.content: + # Add ONLY the new chunk to the Text object + chunk = part.choices[0].delta.content + streaming_text.append(chunk, style="white") + complete_response += chunk + + # Collect chunks if requested + if collect_chunks: + chunks_collected.append(chunk) + + # Call chunk callback if provided + if on_chunk_callback: + on_chunk_callback(chunk) + + # Update display with new text - Rich handles all wrapping automatically + live.update(create_streaming_panel(streaming_text, is_complete=False)) + + # Final update to show completion + live.update(create_streaming_panel(streaming_text, is_complete=True)) + + except Exception as e: + # Handle any streaming errors gracefully + streaming_text.append(f"\n[Error: {str(e)}]", style="bold red") + live.update(create_streaming_panel(streaming_text, is_complete=True)) + + return complete_response + formatter = Formatter() diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index 6aa5c7d3..840ec073 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -449,8 +449,12 @@ class LiteLLM: # Make the completion call response = completion(**completion_params) + # Handle streaming response + if self.stream: + return response # Return the streaming generator directly + # Handle tool-based response - if self.tools_list_dictionary is not None: + elif self.tools_list_dictionary is not None: return self.output_for_tools(response) elif self.return_all is True: return response.model_dump()