diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 1b30644c..d25500fb 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -574,6 +574,16 @@ class Agent:
         self.summarize_multiple_images = summarize_multiple_images
         self.tool_retry_attempts = tool_retry_attempts

+        # Streaming / tool-call coordination flags
+        # When a tool call is expected we temporarily disable streaming so the
+        # LLM returns a complete JSON payload that can be parsed reliably. After
+        # the first tool call has been executed we re-enable streaming for
+        # subsequent requests / summaries.
+        self.expecting_tool_call: bool = False
+        self.tool_call_completed: bool = False
+        self.original_streaming_state: bool = self.streaming_on
+        self.should_stream_after_tools: bool = False
+
         # self.short_memory = self.short_memory_init()

         # Initialize the feedback
@@ -1057,6 +1067,14 @@
                 self.short_memory.return_history_as_string()
             )

+            # Determine if this request is primarily to obtain the first tool call
+            if self.streaming_on and exists(self.tools) and not self.tool_call_completed:
+                # Disable streaming for this request so we can reliably parse JSON
+                self.expecting_tool_call = True
+                self.should_stream_after_tools = True
+            else:
+                self.expecting_tool_call = False
+
             # Parameters
             attempt = 0
             success = False
@@ -1118,10 +1136,16 @@
                     # Check and execute callable tools
                     if exists(self.tools):
+                        # Use standard tool execution for both streaming and non-streaming
                         self.tool_execution_retry(
                             response, loop_count
                         )

+                        # Mark that at least one tool call has been processed
+                        self.tool_call_completed = True
+                        # Reset expecting_tool_call so subsequent requests can stream
+                        self.expecting_tool_call = False
+
                     # Handle MCP tools
                     if (
                         exists(self.mcp_url)
@@ -2533,8 +2557,11 @@ class Agent:
                 del kwargs["is_last"]

             try:
-                # Set streaming parameter in LLM if streaming is enabled
-                if self.streaming_on and hasattr(self.llm, "stream"):
+                # Decide whether streaming should be used for this call
+                streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False)
+
+                # Set streaming parameter in LLM if streaming is enabled for this call
+                if streaming_enabled and hasattr(self.llm, "stream"):
                     original_stream = self.llm.stream
                     self.llm.stream = True

@@ -2547,7 +2574,7 @@
                         task=task, *args, **kwargs
                     )

-                    # If we get a streaming response, handle it with the new streaming panel
+                    # If we get a streaming response, handle it with the streaming panel
                     if hasattr(
                         streaming_response, "__iter__"
                     ) and not isinstance(streaming_response, str):
@@ -2566,26 +2593,12 @@
                                     chunks.append(content)
                             complete_response = "".join(chunks)
                         else:
-                            # Collect chunks for conversation saving
-                            collected_chunks = []
-
-                            def on_chunk_received(chunk: str):
-                                """Callback to collect chunks as they arrive"""
-                                collected_chunks.append(chunk)
-                                # Optional: Save each chunk to conversation in real-time
-                                # This creates a more detailed conversation history
-                                if self.verbose:
-                                    logger.debug(
-                                        f"Streaming chunk received: {chunk[:50]}..."
-                                    )
-
-                            # Use the streaming panel to display and collect the response
+                            # Use the streaming panel to display the response
                             complete_response = formatter.print_streaming_panel(
                                 streaming_response,
                                 title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
-                                style=None,  # Use random color like non-streaming approach
+                                style=None,
                                 collect_chunks=True,
-                                on_chunk_callback=on_chunk_received,
                             )

                     # Restore original stream setting
@@ -2990,12 +3003,15 @@
             raise e

     def temp_llm_instance_for_tool_summary(self):
+        # Enable streaming for the tool summary if streaming was originally enabled and we should stream after tools
+        should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False)
+
         return LiteLLM(
             model_name=self.model_name,
             temperature=self.temperature,
             max_tokens=self.max_tokens,
             system_prompt=self.system_prompt,
-            stream=False,  # Always disable streaming for tool summaries
+            stream=should_stream,  # Enable streaming for tool summaries if the conditions above are met
             tools_list_dictionary=None,
             parallel_tool_calls=False,
             base_url=self.llm_base_url,
@@ -3042,8 +3058,7 @@
         if self.tool_call_summary is True:
             temp_llm = self.temp_llm_instance_for_tool_summary()

-            tool_response = temp_llm.run(
-                f"""
+            tool_prompt = f"""
                 Please analyze and summarize the following tool execution output in a clear and concise way.
                 Focus on the key information and insights that would be most relevant to the user's original request.
                 If there are any errors or issues, highlight them prominently.
@@ -3051,19 +3066,37 @@
                 Tool Output:
                 {output}
                 """
-            )
+
+            # Check if we should stream the tool summary
+            should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False)
+
+            if should_stream and self.print_on:
+                # Handle streaming response with the streaming panel
+                streaming_response = temp_llm.run(tool_prompt)
+
+                if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+                    # Use the streaming panel directly
+                    tool_response = formatter.print_streaming_panel(
+                        streaming_response,
+                        title=f"🤖 Agent: {self.agent_name} Tool Summary",
+                        style=None,
+                        collect_chunks=True,
+                    )
+                else:
+                    # Fallback for non-streaming response
+                    tool_response = streaming_response
+                    self.pretty_print(tool_response, loop_count)
+            else:
+                # Non-streaming response
+                tool_response = temp_llm.run(tool_prompt)
+                if self.print_on:
+                    self.pretty_print(tool_response, loop_count)

             self.short_memory.add(
                 role=self.agent_name,
                 content=tool_response,
             )

-            if self.print_on is True:
-                self.pretty_print(
-                    tool_response,
-                    loop_count,
-                )
-
     def list_output_types(self):
         return OutputType
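Note for reviewers: the request-level flow these changes implement can be summarized with the sketch below. It is illustrative only; StreamFlags, prepare_request, after_tool_execution, and summary_should_stream are hypothetical stand-ins for the corresponding Agent attributes and call sites, not part of swarms.

# Minimal, self-contained sketch of the streaming / tool-call coordination
# introduced by this patch. All names here are illustrative stand-ins.
from dataclasses import dataclass


@dataclass
class StreamFlags:
    streaming_on: bool = True          # user-requested streaming
    expecting_tool_call: bool = False  # force non-streaming while a tool call is expected
    tool_call_completed: bool = False  # set once the first tool call has executed
    original_streaming_state: bool = True
    should_stream_after_tools: bool = False


def prepare_request(flags: StreamFlags, has_tools: bool) -> bool:
    """Decide whether this LLM request should stream.

    Mirrors the check added before the LLM call: the first request that is
    expected to return a tool call is forced to non-streaming so the JSON
    payload can be parsed reliably.
    """
    if flags.streaming_on and has_tools and not flags.tool_call_completed:
        flags.expecting_tool_call = True
        flags.should_stream_after_tools = True
    else:
        flags.expecting_tool_call = False
    return flags.streaming_on and not flags.expecting_tool_call


def after_tool_execution(flags: StreamFlags) -> None:
    """Re-enable streaming once the first tool call has been handled."""
    flags.tool_call_completed = True
    flags.expecting_tool_call = False


def summary_should_stream(flags: StreamFlags) -> bool:
    """Tool summaries stream only if streaming was originally requested."""
    return flags.should_stream_after_tools and flags.original_streaming_state


if __name__ == "__main__":
    flags = StreamFlags()
    print(prepare_request(flags, has_tools=True))   # False: tool-call request stays non-streaming
    after_tool_execution(flags)
    print(summary_should_stream(flags))             # True: the tool summary may stream
    print(prepare_request(flags, has_tools=True))   # True: subsequent requests stream again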