From df83d9d144e5460d71a2ff5665d68cbee4fe2761 Mon Sep 17 00:00:00 2001
From: harshalmore31
Date: Wed, 13 Aug 2025 09:25:21 +0530
Subject: [PATCH] Centralize streaming handling in Agent
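
Route the Agent's streaming paths through the LiteLLM wrapper instead of
ad-hoc chunk loops at each call site: tool-call streams are parsed by
parse_streaming_chunks_with_tools(), and tool-summary streams go through
run_with_streaming(). Also pass the real agent_name to the stream parser,
guard short-memory writes so only non-empty string responses are stored,
and drop the emoji from streaming panel titles.

Rough sketch of the call pattern this standardizes on (the wrapper methods
and keyword arguments are taken from this diff; the model name, task, and
stream flag are illustrative placeholders):

    from swarms.utils.litellm_wrapper import LiteLLM

    # Placeholder configuration; only model_name/temperature kwargs appear
    # elsewhere in this patch, stream=True is assumed here.
    llm = LiteLLM(model_name="gpt-4o-mini", stream=True)

    chunks = llm.run(task="What files changed in the last release?")
    text, tool_calls = llm.parse_streaming_chunks_with_tools(
        stream=chunks,
        agent_name="ExampleAgent",
        print_on=True,
        verbose=False,
    )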
---
 swarms/structs/agent.py | 54 ++++++++++++++++++++++++------------------------------
 1 file changed, 24 insertions(+), 30 deletions(-)

diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index a9f5b434..3a7077ae 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -1154,34 +1154,23 @@ class Agent:
 
                 # Handle streaming response with tools
                 if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str):
-                    # Parse streaming chunks with tools using wrapper
                     if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
                         full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
                             stream=response,
-                            agent_name="Agent",
+                            agent_name=self.agent_name,
                             print_on=self.print_on,
                             verbose=self.verbose,
                         )
-                        # Handle tool calls if any were detected
                         if tool_calls_in_stream:
-                            # Add text response to memory first
                             if full_text_response.strip():
                                 self.short_memory.add(role=self.agent_name, content=full_text_response)
 
-                            # Format and execute tool calls
                             import json
 
                             formatted_tool_calls = []
                             for tc in tool_calls_in_stream:
                                 if tc and (tc.get("input") or tc.get("arguments_complete")):
-                                    if "input" in tc:
-                                        args_to_use = tc["input"]
-                                    else:
-                                        try:
-                                            args_to_use = json.loads(tc.get("arguments", "{}"))
-                                        except json.JSONDecodeError:
-                                            continue
-
+                                    args_to_use = tc.get("input") or json.loads(tc.get("arguments", "{}"))
                                     formatted_tool_calls.append({
                                         "type": "function",
                                         "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
@@ -1189,15 +1178,15 @@ class Agent:
                                     })
 
                             if formatted_tool_calls:
-                                # Execute tools using existing tool structure
                                 response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
+                            else:
+                                response = full_text_response
                         else:
-                            # No tools called, use streamed text as response
                             response = full_text_response
                             if response.strip():
                                 self.short_memory.add(role=self.agent_name, content=response)
                     else:
-                        # Fallback: collect text manually
+                        # Fallback for streaming without tool parsing
                         text_chunks = []
                         for chunk in response:
                             if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
@@ -1208,8 +1197,6 @@ class Agent:
                         if self.print_on:
                             print()
                         response = "".join(text_chunks)
-                        if response.strip():
-                            self.short_memory.add(role=self.agent_name, content=response)
             else:
                 # Parse the response from the agent with the output type
                 if exists(self.tools_list_dictionary):
@@ -1217,6 +1204,8 @@ class Agent:
                         response = response.model_dump()
 
                 response = self.parse_llm_output(response)
-                self.short_memory.add(role=self.agent_name, content=response)
+
+                if isinstance(response, str) and response.strip():
+                    self.short_memory.add(role=self.agent_name, content=response)
 
                 # Print
@@ -1263,14 +1252,16 @@ class Agent:
                     attempt += 1
 
             if not success:
+
                 if self.autosave is True:
                     log_agent_data(self.to_dict())
                     self.save()
 
                 logger.error(
-                    "Failed to generate a valid response after retry attempts."
+                    "Failed to generate a valid response after"
+                    " retry attempts."
                 )
-                break
+                break  # Exit the loop if all retry attempts fail
 
             # Check stopping conditions
             if (
@@ -1293,8 +1284,11 @@ class Agent:
 
                     break
 
             if self.interactive:
+                # logger.info("Interactive mode enabled.")
                 user_input = input("You: ")
+
+                # User-defined exit command
                 if (
                     user_input.lower()
                     == self.custom_exit_command.lower()
@@ -1304,6 +1298,7 @@ class Agent:
                         loop_count=loop_count,
                     )
                     break
+
                 self.short_memory.add(
                     role=self.user_name, content=user_input
                 )
@@ -2629,7 +2624,7 @@ class Agent:
                     task=task,
                     img=img,
                     streaming_callback=streaming_callback,
-                    title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}",
+                    title=f"Agent: {self.agent_name} Loops: {current_loop}",
                     print_on=self.print_on,
                     verbose=self.verbose,
                     *args,
@@ -2638,9 +2633,12 @@ class Agent:
             else:
                 # Non-streaming call
                 if img is not None:
-                    out = self.llm.run(task=task, img=img, *args, **kwargs)
+                    out = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
                 else:
                     out = self.llm.run(task=task, *args, **kwargs)
+
             return out
 
         except AgentLLMError as e:
@@ -3015,7 +3013,7 @@ class Agent:
             if self.streaming_on:
                 summary = temp_llm.run_with_streaming(
                     task=self.short_memory.get_str(),
-                    title=f"🤖 Agent: {self.agent_name} - MCP Tool Summary",
+                    title=f"Agent: {self.agent_name} - MCP Tool Summary",
                     style="cyan",
                     print_on=self.print_on,
                     verbose=self.verbose,
@@ -3030,7 +3028,6 @@ class Agent:
                 # Fallback: provide a default summary
                 summary = "I successfully executed the MCP tool and retrieved the information above."
 
-            # Only pretty_print if streaming is off (to avoid double printing)
             if self.print_on and not self.streaming_on:
                 self.pretty_print(summary, loop_count=current_loop)
 
@@ -3043,7 +3040,6 @@ class Agent:
             raise e
 
     def temp_llm_instance_for_tool_summary(self):
-        from swarms.utils.litellm_wrapper import LiteLLM
         return LiteLLM(
             model_name=self.model_name,
             temperature=self.temperature,
@@ -3064,6 +3060,7 @@ class Agent:
                 "This may indicate the LLM did not return a valid response."
             )
             return
+
         try:
             output = self.tool_struct.execute_function_calls_from_api_response(
                 response
@@ -3095,34 +3092,31 @@ class Agent:
         if self.tool_call_summary is True:
             temp_llm = self.temp_llm_instance_for_tool_summary()
 
             tool_summary_prompt = f"""
             Please analyze and summarize the following tool execution output in a clear and concise way.
             Focus on the key information and insights that would be most relevant to the user's original request.
             If there are any errors or issues, highlight them prominently.
 
-            The user's original request was:
-            {self.task}
-
             Tool Output:
             {output}
             """
 
             # Use centralized streaming logic for tool summary
             if self.streaming_on:
                 tool_response = temp_llm.run_with_streaming(
+                    task=tool_summary_prompt,
                     print_on=self.print_on,
                     verbose=self.verbose,
                 )
             else:
                 tool_response = temp_llm.run(tool_summary_prompt)
 
             # Add the tool response to memory
             self.short_memory.add(
                 role=self.agent_name,
                 content=tool_response,
             )
 
-            # Only pretty_print if streaming is off (to avoid double printing)
             if self.print_on and not self.streaming_on:
                 self.pretty_print(
                     tool_response,