diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 2d16f1fe..64eb1cfd 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -1419,36 +1419,19 @@ class Agent:
                 # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled
                 if self.tool_call_summary:
                     temp_llm = self.temp_llm_instance_for_tool_summary()
-                    final_summary_response = temp_llm.run(
-                        task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
-                    )
-                    # Handle streaming for final tool summary in real-time execution
-                    if self.streaming_on and hasattr(final_summary_response, "__iter__") and not isinstance(final_summary_response, str):
-                        # Collect chunks for conversation saving
-                        collected_chunks = []
-
-                        def on_chunk_received(chunk: str):
-                            """Callback to collect chunks as they arrive"""
-                            collected_chunks.append(chunk)
-                            if self.verbose:
-                                logger.debug(f"Real-time tool summary streaming chunk: {chunk[:50]}...")
-
-                        # Use the streaming panel to display and collect the response
-                        complete_response = formatter.print_streaming_panel(
-                            final_summary_response,
-                            title=f"🤖 Agent: {self.agent_name} - Tool Summary (Real-time)",
-                            style="green",
-                            collect_chunks=True,
-                            on_chunk_callback=on_chunk_received,
+                    # Use centralized streaming logic for real-time tool summary
+                    if self.streaming_on:
+                        final_summary_response = temp_llm.run_tool_summary_with_streaming(
+                            tool_results=format_data_structure(tool_results),
+                            agent_name=f"{self.agent_name} - Real-time",
+                            print_on=self.print_on,
+                            verbose=self.verbose,
+                        )
+                    else:
+                        final_summary_response = temp_llm.run(
+                            task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
                         )
-
-                        final_summary_response = complete_response
-
-                    elif self.streaming_on and isinstance(final_summary_response, str):
-                        # If streaming is on but we got a string response, display it streamed
-                        if self.print_on:
-                            self.stream_response(final_summary_response, delay=0.01)
 
                     response = self.parse_llm_output(final_summary_response)
                     self.short_memory.add(role=self.agent_name, content=response)
@@ -2467,11 +2450,11 @@ class Agent:
         self, response: str, delay: float = 0.001
     ) -> None:
         """
-        Streams the response token by token.
+        Streams the response token by token using centralized wrapper logic.
 
         Args:
             response (str): The response text to be streamed.
-            delay (float, optional): Delay in seconds between printing each token. Default is 0.1 seconds.
+            delay (float, optional): Delay in seconds between printing each token. Default is 0.001 seconds.
 
         Raises:
             ValueError: If the response is not provided.
@@ -2479,18 +2462,26 @@ class Agent:
         Example:
             response = "This is a sample response from the API."
-            stream_response(response)
+            agent.stream_response(response)
         """
         # Check for required inputs
         if not response:
            raise ValueError("Response is required.")
 
         try:
-            # Stream and print the response token by token
-            for token in response.split():
-                print(token, end=" ", flush=True)
-                time.sleep(delay)
-            print()  # Ensure a newline after streaming
+            # Use centralized string streaming from wrapper
+            if hasattr(self.llm, "handle_string_streaming"):
+                self.llm.handle_string_streaming(
+                    response=response,
+                    print_on=self.print_on,
+                    delay=delay,
+                )
+            else:
+                # Fallback to original implementation if wrapper doesn't support it
+                for token in response.split():
+                    print(token, end=" ", flush=True)
+                    time.sleep(delay)
+                print()  # Ensure a newline after streaming
         except Exception as e:
             print(f"An error occurred during streaming: {e}")
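
For context on the new fallback in `stream_response`: a minimal sketch (not part of the PR) of the duck-typing check, using a hypothetical LLM object that lacks `handle_string_streaming`.

```python
import time


class LegacyLLMStub:
    """Hypothetical LLM object that predates handle_string_streaming."""
    pass


def fallback_stream(response: str, delay: float = 0.001) -> None:
    # Mirrors the fallback branch kept in Agent.stream_response above:
    # token-by-token printing when the wrapper method is absent.
    for token in response.split():
        print(token, end=" ", flush=True)
        time.sleep(delay)
    print()


llm = LegacyLLMStub()
text = "This is a sample response from the API."
if hasattr(llm, "handle_string_streaming"):
    llm.handle_string_streaming(response=text, print_on=True, delay=0.001)
else:
    fallback_stream(text)
```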
@@ -2800,95 +2791,24 @@ class Agent:
             del kwargs["is_last"]
 
         try:
-            # Set streaming parameter in LLM if streaming is enabled
-            if self.streaming_on and hasattr(self.llm, "stream"):
-                original_stream = self.llm.stream
-                self.llm.stream = True
-
-                if img is not None:
-                    streaming_response = self.llm.run(
-                        task=task, img=img, *args, **kwargs
-                    )
-                else:
-                    streaming_response = self.llm.run(
-                        task=task, *args, **kwargs
-                    )
-
-                # If we get a streaming response, handle it with the new streaming panel
-                if hasattr(
-                    streaming_response, "__iter__"
-                ) and not isinstance(streaming_response, str):
-                    # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
-                    if streaming_callback is not None:
-                        # Real-time callback streaming for dashboard integration
-                        chunks = []
-                        for chunk in streaming_response:
-                            if (
-                                hasattr(chunk, "choices")
-                                and chunk.choices[0].delta.content
-                            ):
-                                content = chunk.choices[
-                                    0
-                                ].delta.content
-                                chunks.append(content)
-                                # Call the streaming callback with the new chunk
-                                streaming_callback(content)
-                        complete_response = "".join(chunks)
-                    # Check print_on parameter for different streaming behaviors
-                    elif self.print_on is False:
-                        # Silent streaming - no printing, just collect chunks
-                        chunks = []
-                        for chunk in streaming_response:
-                            if (
-                                hasattr(chunk, "choices")
-                                and chunk.choices[0].delta.content
-                            ):
-                                content = chunk.choices[
-                                    0
-                                ].delta.content
-                                chunks.append(content)
-                        complete_response = "".join(chunks)
-                    else:
-                        # Collect chunks for conversation saving
-                        collected_chunks = []
-
-                        def on_chunk_received(chunk: str):
-                            """Callback to collect chunks as they arrive"""
-                            collected_chunks.append(chunk)
-                            # Optional: Save each chunk to conversation in real-time
-                            # This creates a more detailed conversation history
-                            if self.verbose:
-                                logger.debug(
-                                    f"Streaming chunk received: {chunk[:50]}..."
-                                )
-
-                        # Use the streaming panel to display and collect the response
-                        complete_response = formatter.print_streaming_panel(
-                            streaming_response,
-                            title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
-                            style=None,  # Use random color like non-streaming approach
-                            collect_chunks=True,
-                            on_chunk_callback=on_chunk_received,
-                        )
-
-                    # Restore original stream setting
-                    self.llm.stream = original_stream
-
-                    # Return the complete response for further processing
-                    return complete_response
-                else:
-                    # Restore original stream setting
-                    self.llm.stream = original_stream
-                    return streaming_response
+            # Use centralized streaming logic from wrapper if streaming is enabled
+            if self.streaming_on and hasattr(self.llm, "run_with_streaming"):
+                return self.llm.run_with_streaming(
+                    task=task,
+                    img=img,
+                    streaming_callback=streaming_callback,
+                    title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
+                    print_on=self.print_on,
+                    verbose=self.verbose,
+                    *args,
+                    **kwargs,
+                )
             else:
                 # Non-streaming call
                 if img is not None:
-                    out = self.llm.run(
-                        task=task, img=img, *args, **kwargs
-                    )
+                    out = self.llm.run(task=task, img=img, *args, **kwargs)
                 else:
                     out = self.llm.run(task=task, *args, **kwargs)
-
                 return out
 
         except AgentLLMError as e:
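
A usage sketch of the delegated path, assuming a configured `LiteLLM` instance and valid provider credentials; the model name and the callback are illustrative, and the callback mimics what a ConcurrentWorkflow dashboard would do with each chunk.

```python
from swarms.utils.litellm_wrapper import LiteLLM

llm = LiteLLM(model_name="gpt-4o-mini")  # illustrative model name

received = []


def on_chunk(chunk: str) -> None:
    # Invoked once per streamed delta, in arrival order.
    received.append(chunk)


# Roughly what Agent.call_llm now delegates to when streaming_on is True.
full_text = llm.run_with_streaming(
    task="Summarize the plan in one sentence.",
    streaming_callback=on_chunk,
    title="🤖 Agent: example-agent Loops: 1",
    print_on=True,
    verbose=False,
)
# With a callback provided, the joined chunks equal the returned text.
print(full_text == "".join(received))
```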
@@ -3259,36 +3179,17 @@ class Agent:
         try:
             temp_llm = self.temp_llm_instance_for_tool_summary()
-            summary = temp_llm.run(
-                task=self.short_memory.get_str()
-            )
-
-            # Handle streaming MCP tool summary response
-            if self.streaming_on and hasattr(summary, "__iter__") and not isinstance(summary, str):
-                # Collect chunks for conversation saving
-                collected_chunks = []
-
-                def on_chunk_received(chunk: str):
-                    """Callback to collect chunks as they arrive"""
-                    collected_chunks.append(chunk)
-                    if self.verbose:
-                        logger.debug(f"MCP tool summary streaming chunk received: {chunk[:50]}...")
-
-                # Use the streaming panel to display and collect the response
-                complete_response = formatter.print_streaming_panel(
-                    summary,
-                    title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
+            # Use centralized streaming logic for MCP tool summary
+            if self.streaming_on:
+                summary = temp_llm.run_with_streaming(
+                    task=self.short_memory.get_str(),
+                    title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
                     style="cyan",
-                    collect_chunks=True,
-                    on_chunk_callback=on_chunk_received,
+                    print_on=self.print_on,
+                    verbose=self.verbose,
                 )
-
-                summary = complete_response
-
-            elif self.streaming_on and isinstance(summary, str):
-                # If streaming is on but we got a string response, display it streamed
-                if self.print_on:
-                    self.stream_response(summary, delay=0.01)
+            else:
+                summary = temp_llm.run(task=self.short_memory.get_str())
 
         except Exception as e:
             logger.error(
@@ -3310,6 +3211,7 @@ class Agent:
         raise e
 
     def temp_llm_instance_for_tool_summary(self):
+        from swarms.utils.litellm_wrapper import LiteLLM
         return LiteLLM(
             model_name=self.model_name,
             temperature=self.temperature,
@@ -3373,34 +3275,16 @@ class Agent:
             {output}
             """
-            tool_response = temp_llm.run(tool_summary_prompt)
-
-            # Handle streaming tool response
-            if self.streaming_on and hasattr(tool_response, "__iter__") and not isinstance(tool_response, str):
-                # Collect chunks for conversation saving
-                collected_chunks = []
-
-                def on_chunk_received(chunk: str):
-                    """Callback to collect chunks as they arrive"""
-                    collected_chunks.append(chunk)
-                    if self.verbose:
-                        logger.debug(f"Tool response streaming chunk received: {chunk[:50]}...")
-
-                # Use the streaming panel to display and collect the response
-                complete_response = formatter.print_streaming_panel(
-                    tool_response,
-                    title=f"🤖 Agent: {self.agent_name} - Tool Summary",
-                    style="blue",
-                    collect_chunks=True,
-                    on_chunk_callback=on_chunk_received,
+            # Use centralized streaming logic for tool summary
+            if self.streaming_on:
+                tool_response = temp_llm.run_tool_summary_with_streaming(
+                    tool_results=str(output),
+                    agent_name=self.agent_name,
+                    print_on=self.print_on,
+                    verbose=self.verbose,
                 )
-
-                tool_response = complete_response
-
-            elif self.streaming_on and isinstance(tool_response, str):
-                # If streaming is on but we got a string response, display it streamed
-                if self.print_on:
-                    self.stream_response(tool_response, delay=0.01)
+            else:
+                tool_response = temp_llm.run(tool_summary_prompt)
 
             # Add the tool response to memory
             self.short_memory.add(
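
One behavioral nuance in the hunk above: when streaming is on, the summary prompt is now the generic one that `run_tool_summary_with_streaming` builds from `str(output)`, while the non-streaming branch still sends the locally built `tool_summary_prompt`. A small illustration of the prompt the helper constructs (the payload below is made up):

```python
# Example tool output; the value is illustrative, not from the PR.
tool_results = '{"status": "ok", "rows": 3}'

# Prompt built by run_tool_summary_with_streaming (see litellm_wrapper.py below).
summary_task = (
    "Please analyze and summarize the following tool execution output:\n\n"
    f"{tool_results}"
)
print(summary_task)
```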
diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py
index 86328135..abc60a61 100644
--- a/swarms/utils/litellm_wrapper.py
+++ b/swarms/utils/litellm_wrapper.py
@@ -1,5 +1,5 @@
 import traceback
-from typing import Optional
+from typing import Optional, Callable
 import base64
 import requests
 from pathlib import Path
@@ -640,6 +640,185 @@ class LiteLLM:
                 f"Model {self.model_name} does not support vision"
             )
 
+    def _handle_streaming_response(
+        self,
+        streaming_response,
+        title: str = "🤖 LLM Response",
+        style: Optional[str] = None,
+        streaming_callback: Optional[Callable[[str], None]] = None,
+        print_on: bool = True,
+        verbose: bool = False,
+    ) -> str:
+        """
+        Centralized streaming response handler for all streaming scenarios.
+
+        Args:
+            streaming_response: The streaming response object
+            title: Title for the streaming panel
+            style: Style for the panel (optional)
+            streaming_callback: Callback for real-time streaming
+            print_on: Whether to print the streaming output
+            verbose: Whether to enable verbose logging
+
+        Returns:
+            str: The complete response string
+        """
+        from swarms.utils.formatter import formatter
+        import json
+        from loguru import logger
+
+        if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+            if streaming_callback is not None:
+                # Real-time callback streaming for dashboard integration
+                chunks = []
+                for chunk in streaming_response:
+                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                        content = chunk.choices[0].delta.content
+                        chunks.append(content)
+                        streaming_callback(content)
+                return "".join(chunks)
+            elif not print_on:
+                # Silent streaming - no printing, just collect chunks
+                chunks = []
+                for chunk in streaming_response:
+                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                        content = chunk.choices[0].delta.content
+                        chunks.append(content)
+                return "".join(chunks)
+            else:
+                # Collect chunks for conversation saving
+                collected_chunks = []
+
+                def on_chunk_received(chunk: str):
+                    """Callback to collect chunks as they arrive"""
+                    collected_chunks.append(chunk)
+                    if verbose:
+                        logger.debug(f"Streaming chunk received: {chunk[:50]}...")
+
+                # Use the streaming panel to display and collect the response
+                complete_response = formatter.print_streaming_panel(
+                    streaming_response,
+                    title=title,
+                    style=style,
+                    collect_chunks=True,
+                    on_chunk_callback=on_chunk_received,
+                )
+                return complete_response
+        else:
+            # Non-streaming response or string response
+            return streaming_response
+
+    def run_with_streaming(
+        self,
+        task: str,
+        img: Optional[str] = None,
+        audio: Optional[str] = None,
+        streaming_callback: Optional[Callable[[str], None]] = None,
+        title: str = "🤖 LLM Response",
+        style: Optional[str] = None,
+        print_on: bool = True,
+        verbose: bool = False,
+        *args,
+        **kwargs,
+    ) -> str:
+        """
+        Run LLM with centralized streaming handling.
+
+        Args:
+            task: The task/prompt to send to the LLM
+            img: Optional image input
+            audio: Optional audio input
+            streaming_callback: Callback for real-time streaming
+            title: Title for streaming panel
+            style: Style for streaming panel
+            print_on: Whether to print streaming output
+            verbose: Whether to enable verbose logging
+
+        Returns:
+            str: The complete response
+        """
+        # Enable streaming if not already set
+        original_stream = self.stream
+        self.stream = True
+
+        try:
+            # Call the LLM
+            if img is not None:
+                response = self.run(task=task, img=img, audio=audio, *args, **kwargs)
+            elif audio is not None:
+                response = self.run(task=task, audio=audio, *args, **kwargs)
+            else:
+                response = self.run(task=task, *args, **kwargs)
+
+            # Handle the streaming response
+            return self._handle_streaming_response(
+                response,
+                title=title,
+                style=style,
+                streaming_callback=streaming_callback,
+                print_on=print_on,
+                verbose=verbose,
+            )
+        finally:
+            # Restore original stream setting
+            self.stream = original_stream
+
+    def run_tool_summary_with_streaming(
+        self,
+        tool_results: str,
+        agent_name: str = "Agent",
+        print_on: bool = True,
+        verbose: bool = False,
+        *args,
+        **kwargs,
+    ) -> str:
+        """
+        Run tool summary with streaming support.
+
+        Args:
+            tool_results: The tool execution results to summarize
+            agent_name: Name of the agent for the panel title
+            print_on: Whether to print streaming output
+            verbose: Whether to enable verbose logging
+
+        Returns:
+            str: The complete summary response
+        """
+        summary_task = f"Please analyze and summarize the following tool execution output:\n\n{tool_results}"
+
+        return self.run_with_streaming(
+            task=summary_task,
+            title=f"🤖 Agent: {agent_name} - Tool Summary",
+            style="green",
+            print_on=print_on,
+            verbose=verbose,
+            *args,
+            **kwargs,
+        )
+
+    def handle_string_streaming(
+        self,
+        response: str,
+        print_on: bool = True,
+        delay: float = 0.01,
+    ) -> None:
+        """
+        Handle streaming for string responses by simulating streaming output.
+
+        Args:
+            response: The string response to stream
+            print_on: Whether to print the streaming output
+            delay: Delay between characters for streaming effect
+        """
+        if print_on and response:
+            # Simple character-by-character streaming for string responses
+            for char in response:
+                print(char, end="", flush=True)
+                if delay > 0:
+                    import time
+                    time.sleep(delay)
+            print()  # Newline at the end
+
     def run(
         self,
         task: str,
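
To see the branch selection in `_handle_streaming_response` in isolation, here is a small sketch that feeds it fake OpenAI-style chunks in silent mode; it assumes `LiteLLM()` can be constructed with its defaults without making network calls, and it pokes a private method purely for illustration.

```python
from types import SimpleNamespace

from swarms.utils.litellm_wrapper import LiteLLM


def fake_chunk(text: str) -> SimpleNamespace:
    # Shaped like a litellm/OpenAI streaming chunk: chunk.choices[0].delta.content
    return SimpleNamespace(
        choices=[SimpleNamespace(delta=SimpleNamespace(content=text))]
    )


llm = LiteLLM()

# print_on=False takes the silent-collection branch: chunks are joined, nothing is printed.
text = llm._handle_streaming_response(
    iter([fake_chunk("Hello, "), fake_chunk("world.")]),
    print_on=False,
)
print(text)  # -> "Hello, world."
```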
@@ -661,9 +840,7 @@ class LiteLLM:
                 parameters with highest priority (overrides init kwargs).
 
         Returns:
-            str or generator: When streaming is disabled, returns the complete response content.
-                             When streaming is enabled, returns a generator that yields content chunks.
-                             Use collect_all_chunks() to get complete response from the generator.
+            str: The content of the response from the model.
 
         Raises:
             Exception: If there is an error in processing the request.
@@ -738,7 +915,7 @@
 
             # Handle streaming response
             if self.stream:
-                return response
+                return response  # Return the streaming generator directly
 
             # Handle tool-based response
             elif self.tools_list_dictionary is not None:
@@ -787,9 +964,7 @@
             **kwargs: Additional keyword arguments.
 
         Returns:
-            str or async generator: When streaming is disabled, returns the complete response content.
-                                    When streaming is enabled, returns an async generator that yields content chunks.
-                                    Use collect_all_chunks_async() to get complete response from the generator.
+            str: The content of the response from the model.
         """
         try:
             messages = self._prepare_messages(task)
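
Since `run()` now hands the streaming generator straight back when `stream=True`, direct callers either drain it themselves or go through `run_with_streaming`. A sketch of manual consumption, assuming valid provider credentials and an illustrative model name:

```python
from swarms.utils.litellm_wrapper import LiteLLM

llm = LiteLLM(model_name="gpt-4o-mini", stream=True)  # illustrative configuration

pieces = []
for chunk in llm.run("Say hello in three words."):
    # Each chunk follows the litellm/OpenAI delta shape used elsewhere in this PR.
    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
        pieces.append(chunk.choices[0].delta.content)

print("".join(pieces))
```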
@@ -838,10 +1013,6 @@
                 # Standard completion
                 response = await acompletion(**completion_params)
 
-                # Handle streaming response for async
-                if self.stream:
-                    return self._collect_streaming_response_async(response)
-
                 print(response)
 
                 return response
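
With the async collection helper removed, the async entry point now returns whatever `acompletion` produces; when `stream=True` that is an async stream the caller must drain. A hedged sketch, assuming the async method is `arun`, that litellm's streaming object supports `async for`, and that credentials and the model name are valid:

```python
import asyncio

from swarms.utils.litellm_wrapper import LiteLLM


async def main() -> str:
    llm = LiteLLM(model_name="gpt-4o-mini", stream=True)  # illustrative configuration
    response = await llm.arun("Give me one fun fact.")
    pieces = []
    # The caller now drains the async stream itself.
    async for chunk in response:
        if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
            pieces.append(chunk.choices[0].delta.content)
    return "".join(pieces)


print(asyncio.run(main()))
```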