diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 312b7e19..5b2a0e49 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -1360,9 +1360,39 @@ class Agent:
                     final_summary_response = temp_llm.run(
                         task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
                     )
+
+                    # Handle streaming for final tool summary in real-time execution
+                    if self.streaming_on and hasattr(final_summary_response, "__iter__") and not isinstance(final_summary_response, str):
+                        # Collect chunks for conversation saving
+                        collected_chunks = []
+
+                        def on_chunk_received(chunk: str):
+                            """Callback to collect chunks as they arrive"""
+                            collected_chunks.append(chunk)
+                            if self.verbose:
+                                logger.debug(f"Real-time tool summary streaming chunk: {chunk[:50]}...")
+
+                        # Use the streaming panel to display and collect the response
+                        complete_response = formatter.print_streaming_panel(
+                            final_summary_response,
+                            title=f"🤖 Agent: {self.agent_name} - Tool Summary (Real-time)",
+                            style="green",
+                            collect_chunks=True,
+                            on_chunk_callback=on_chunk_received,
+                        )
+
+                        final_summary_response = complete_response
+
+                    elif self.streaming_on and isinstance(final_summary_response, str):
+                        # If streaming is on but we got a string response, display it streamed
+                        if self.print_on:
+                            self.stream_response(final_summary_response, delay=0.01)
+
                     response = self.parse_llm_output(final_summary_response)
                     self.short_memory.add(role=self.agent_name, content=response)
-                    if self.print_on:
+
+                    # Only pretty_print if streaming is off (to avoid double printing)
+                    if self.print_on and not self.streaming_on:
                         self.pretty_print(response, loop_count)
                 else:
                     response = f"Tool execution completed: {format_data_structure(tool_results)}"
@@ -3169,6 +3199,34 @@ class Agent:
             summary = temp_llm.run(
                 task=self.short_memory.get_str()
             )
+
+            # Handle streaming MCP tool summary response
+            if self.streaming_on and hasattr(summary, "__iter__") and not isinstance(summary, str):
+                # Collect chunks for conversation saving
+                collected_chunks = []
+
+                def on_chunk_received(chunk: str):
+                    """Callback to collect chunks as they arrive"""
+                    collected_chunks.append(chunk)
+                    if self.verbose:
+                        logger.debug(f"MCP tool summary streaming chunk received: {chunk[:50]}...")
+
+                # Use the streaming panel to display and collect the response
+                complete_response = formatter.print_streaming_panel(
+                    summary,
+                    title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
+                    style="cyan",
+                    collect_chunks=True,
+                    on_chunk_callback=on_chunk_received,
+                )
+
+                summary = complete_response
+
+            elif self.streaming_on and isinstance(summary, str):
+                # If streaming is on but we got a string response, display it streamed
+                if self.print_on:
+                    self.stream_response(summary, delay=0.01)
+
         except Exception as e:
             logger.error(
                 f"Error calling LLM after MCP tool execution: {e}"
@@ -3176,7 +3234,8 @@ class Agent:
             # Fallback: provide a default summary
             summary = "I successfully executed the MCP tool and retrieved the information above."
 
-        if self.print_on is True:
+        # Only pretty_print if streaming is off (to avoid double printing)
+        if self.print_on and not self.streaming_on:
             self.pretty_print(summary, loop_count=current_loop)
 
         # Add to the memory
@@ -3193,7 +3252,7 @@ class Agent:
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=False,  # Always disable streaming for tool summaries
+            stream=self.streaming_on,
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,
@@ -3239,26 +3298,55 @@ class Agent:
         if self.tool_call_summary is True:
             temp_llm = self.temp_llm_instance_for_tool_summary()
 
-            tool_response = temp_llm.run(
-                f"""
-                Please analyze and summarize the following tool execution output in a clear and concise way.
-                Focus on the key information and insights that would be most relevant to the user's original request.
-                If there are any errors or issues, highlight them prominently.
+            tool_summary_prompt = f"""
+            Please analyze and summarize the following tool execution output in a clear and concise way.
+            Focus on the key information and insights that would be most relevant to the user's original request.
+            If there are any errors or issues, highlight them prominently.
+
+            The user's original request was:
+            {self.task}
+
+            Tool Output:
+            {output}
+            """
+
+            tool_response = temp_llm.run(tool_summary_prompt)
+
+            # Handle streaming tool response
+            if self.streaming_on and hasattr(tool_response, "__iter__") and not isinstance(tool_response, str):
+                # Collect chunks for conversation saving
+                collected_chunks = []
+
+                def on_chunk_received(chunk: str):
+                    """Callback to collect chunks as they arrive"""
+                    collected_chunks.append(chunk)
+                    if self.verbose:
+                        logger.debug(f"Tool response streaming chunk received: {chunk[:50]}...")
+
+                # Use the streaming panel to display and collect the response
+                complete_response = formatter.print_streaming_panel(
+                    tool_response,
+                    title=f"🤖 Agent: {self.agent_name} - Tool Summary",
+                    style="blue",
+                    collect_chunks=True,
+                    on_chunk_callback=on_chunk_received,
+                )
 
-                The user's original request was:
-                {self.task}
+                tool_response = complete_response
 
-                Tool Output:
-                {output}
-                """
-            )
-
+            elif self.streaming_on and isinstance(tool_response, str):
+                # If streaming is on but we got a string response, display it streamed
+                if self.print_on:
+                    self.stream_response(tool_response, delay=0.01)
+
+            # Add the tool response to memory
             self.short_memory.add(
                 role=self.agent_name,
                 content=tool_response,
            )
 
-            if self.print_on is True:
+            # Only pretty_print if streaming is off (to avoid double printing)
+            if self.print_on and not self.streaming_on:
                 self.pretty_print(
                     tool_response,
                     loop_count,
diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py
index aaa3f71e..e7a41671 100644
--- a/swarms/utils/litellm_wrapper.py
+++ b/swarms/utils/litellm_wrapper.py
@@ -153,6 +153,134 @@ class LiteLLM:
 
         litellm.drop_params = True
 
+    def _collect_streaming_response(self, streaming_response):
+        """
+        Parse and yield individual content chunks from a streaming response.
+
+        Args:
+            streaming_response: The streaming response object from litellm
+
+        Yields:
+            str: Individual content chunks as they arrive
+        """
+        try:
+            for chunk in streaming_response:
+                content = None
+
+                # Handle different chunk formats
+                if hasattr(chunk, 'choices') and chunk.choices:
+                    choice = chunk.choices[0]
+
+                    # OpenAI-style chunks
+                    if hasattr(choice, 'delta') and choice.delta:
+                        if hasattr(choice.delta, 'content') and choice.delta.content:
+                            content = choice.delta.content
+
+                    # Alternative chunk format
+                    elif hasattr(choice, 'message') and choice.message:
+                        if hasattr(choice.message, 'content') and choice.message.content:
+                            content = choice.message.content
+
+                # Anthropic-style chunks
+                elif hasattr(chunk, 'type'):
+                    if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'):
+                        if chunk.delta.type == 'text_delta':
+                            content = chunk.delta.text
+
+                # Handle direct content chunks
+                elif hasattr(chunk, 'content'):
+                    content = chunk.content
+
+                # Yield content chunk if we found any
+                if content:
+                    yield content
+
+        except Exception as e:
+            logger.error(f"Error parsing streaming chunks: {e}")
+            return
+
+    async def _collect_streaming_response_async(self, streaming_response):
+        """
+        Parse and yield individual content chunks from an async streaming response.
+
+        Args:
+            streaming_response: The async streaming response object from litellm
+
+        Yields:
+            str: Individual content chunks as they arrive
+        """
+        try:
+            async for chunk in streaming_response:
+                content = None
+
+                # Handle different chunk formats
+                if hasattr(chunk, 'choices') and chunk.choices:
+                    choice = chunk.choices[0]
+
+                    # OpenAI-style chunks
+                    if hasattr(choice, 'delta') and choice.delta:
+                        if hasattr(choice.delta, 'content') and choice.delta.content:
+                            content = choice.delta.content
+
+                    # Alternative chunk format
+                    elif hasattr(choice, 'message') and choice.message:
+                        if hasattr(choice.message, 'content') and choice.message.content:
+                            content = choice.message.content
+
+                # Anthropic-style chunks
+                elif hasattr(chunk, 'type'):
+                    if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'):
+                        if chunk.delta.type == 'text_delta':
+                            content = chunk.delta.text
+
+                # Handle direct content chunks
+                elif hasattr(chunk, 'content'):
+                    content = chunk.content
+
+                # Yield content chunk if we found any
+                if content:
+                    yield content
+
+        except Exception as e:
+            logger.error(f"Error parsing async streaming chunks: {e}")
+            return
+
+    def collect_all_chunks(self, streaming_response):
+        """
+        Helper method to collect all chunks from a streaming response into a complete text.
+        This provides backward compatibility for code that expects a complete response.
+
+        Args:
+            streaming_response: The streaming response object from litellm
+
+        Returns:
+            str: The complete response text collected from all chunks
+        """
+        chunks = []
+        for chunk in self._collect_streaming_response(streaming_response):
+            chunks.append(chunk)
+        complete_response = "".join(chunks)
+        logger.info(f"Collected complete streaming response: {len(complete_response)} characters")
+        return complete_response
+
+    async def collect_all_chunks_async(self, streaming_response):
+        """
+        Helper method to collect all chunks from an async streaming response into a complete text.
+        This provides backward compatibility for code that expects a complete response.
+
+        Args:
+            streaming_response: The async streaming response object from litellm
+
+        Returns:
+            str: The complete response text collected from all chunks
+        """
+        chunks = []
+        async for chunk in self._collect_streaming_response_async(streaming_response):
+            chunks.append(chunk)
+        complete_response = "".join(chunks)
+        logger.info(f"Collected complete async streaming response: {len(complete_response)} characters")
+        return complete_response
+
     def output_for_tools(self, response: any):
         if self.mcp_call is True:
             out = response.choices[0].message.tool_calls[0].function
@@ -471,7 +599,9 @@ class LiteLLM:
            **kwargs: Additional keyword arguments.
 
        Returns:
-            str: The content of the response from the model.
+            str or generator: When streaming is disabled, returns the complete response content.
+                When streaming is enabled, returns a generator that yields content chunks.
+                Use collect_all_chunks() to get complete response from the generator.
 
        Raises:
            Exception: If there is an error in processing the request.
@@ -525,7 +655,7 @@ class LiteLLM:
 
            # Handle streaming response
            if self.stream:
-                return response  # Return the streaming generator directly
+                return response
 
            # Handle tool-based response
            elif self.tools_list_dictionary is not None:
@@ -574,7 +704,9 @@ class LiteLLM:
            **kwargs: Additional keyword arguments.
 
        Returns:
-            str: The content of the response from the model.
+            str or async generator: When streaming is disabled, returns the complete response content.
+                When streaming is enabled, returns an async generator that yields content chunks.
+                Use collect_all_chunks_async() to get complete response from the generator.
        """
        try:
            messages = self._prepare_messages(task)
@@ -608,6 +740,10 @@ class LiteLLM:
            # Standard completion
            response = await acompletion(**completion_params)
 
+            # Handle streaming response for async
+            if self.stream:
+                return self._collect_streaming_response_async(response)
+
            print(response)
 
            return response
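
Reviewer note (not part of the patch): the sketch below illustrates how the new streaming path could be consumed from the caller's side. It is a minimal sketch, assuming the LiteLLM wrapper accepts a model_name constructor argument (the model and prompts are placeholders) and that run(), arun(), and collect_all_chunks() behave as described in the diff above.

# Illustrative usage sketch only -- not part of this patch.
# model_name and the prompts are placeholders; adjust to the wrapper's real signature.
import asyncio

from swarms.utils.litellm_wrapper import LiteLLM

llm = LiteLLM(model_name="gpt-4o-mini", stream=True)

# With stream=True, run() hands back the raw litellm streaming object
# instead of a complete string.
streaming_response = llm.run("Summarize the tool output in two sentences.")

if isinstance(streaming_response, str):
    # Some providers may still return a complete string.
    complete = streaming_response
else:
    # collect_all_chunks() parses the provider-specific chunks and joins them.
    complete = llm.collect_all_chunks(streaming_response)

print(complete)


# arun() goes one step further when streaming: it returns an async generator
# of already-parsed text chunks, which callers can consume as they arrive.
async def consume_async_stream() -> str:
    result = await llm.arun("Summarize the tool output in two sentences.")
    if isinstance(result, str):
        return result
    parts = []
    async for chunk in result:
        parts.append(chunk)  # each chunk is a plain text fragment
    return "".join(parts)


print(asyncio.run(consume_async_stream()))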