From 5029e7644bb04a183f1ca9b7664be610d7df0488 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Fri, 4 Jul 2025 23:20:22 +0530 Subject: [PATCH 01/11] enhance tool-call coordination and streaming behavior in Agent class --- swarms/structs/agent.py | 91 ++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 1b30644c..d25500fb 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -574,6 +574,16 @@ class Agent: self.summarize_multiple_images = summarize_multiple_images self.tool_retry_attempts = tool_retry_attempts + # Streaming / tool-call coordination flags + # When a tool call is expected we temporarily disable streaming so the + # LLM returns a complete JSON payload that can be parsed reliably. After + # the first tool call has been executed we re-enable streaming for + # subsequent requests / summaries. + self.expecting_tool_call: bool = False + self.tool_call_completed: bool = False + self.original_streaming_state: bool = self.streaming_on + self.should_stream_after_tools: bool = False + # self.short_memory = self.short_memory_init() # Initialize the feedback @@ -1057,6 +1067,14 @@ class Agent: self.short_memory.return_history_as_string() ) + # Determine if this request is primarily to obtain the first tool call + if self.streaming_on and exists(self.tools) and not self.tool_call_completed: + # Disable streaming for this request so we can reliably parse JSON + self.expecting_tool_call = True + self.should_stream_after_tools = True + else: + self.expecting_tool_call = False + # Parameters attempt = 0 success = False @@ -1118,10 +1136,16 @@ class Agent: # Check and execute callable tools if exists(self.tools): + # Use standard tool execution for both streaming and non-streaming self.tool_execution_retry( response, loop_count ) + # Mark that at least one tool call has been processed + self.tool_call_completed = True + # Reset expecting_tool_call so subsequent requests can stream + self.expecting_tool_call = False + # Handle MCP tools if ( exists(self.mcp_url) @@ -2533,8 +2557,11 @@ class Agent: del kwargs["is_last"] try: - # Set streaming parameter in LLM if streaming is enabled - if self.streaming_on and hasattr(self.llm, "stream"): + # Decide whether streaming should be used for this call + streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False) + + # Set streaming parameter in LLM if streaming is enabled for this call + if streaming_enabled and hasattr(self.llm, "stream"): original_stream = self.llm.stream self.llm.stream = True @@ -2547,7 +2574,7 @@ class Agent: task=task, *args, **kwargs ) - # If we get a streaming response, handle it with the new streaming panel + # If we get a streaming response, handle it with the streaming panel if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): @@ -2566,26 +2593,12 @@ class Agent: chunks.append(content) complete_response = "".join(chunks) else: - # Collect chunks for conversation saving - collected_chunks = [] - - def on_chunk_received(chunk: str): - """Callback to collect chunks as they arrive""" - collected_chunks.append(chunk) - # Optional: Save each chunk to conversation in real-time - # This creates a more detailed conversation history - if self.verbose: - logger.debug( - f"Streaming chunk received: {chunk[:50]}..." 
- ) - - # Use the streaming panel to display and collect the response + # Use the streaming panel to display the response complete_response = formatter.print_streaming_panel( streaming_response, title=f"šŸ¤– Agent: {self.agent_name} Loops: {current_loop}", - style=None, # Use random color like non-streaming approach + style=None, collect_chunks=True, - on_chunk_callback=on_chunk_received, ) # Restore original stream setting @@ -2990,12 +3003,15 @@ class Agent: raise e def temp_llm_instance_for_tool_summary(self): + # Enable streaming for tool summary if original streaming was enabled and we should stream after tools + should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False) + return LiteLLM( model_name=self.model_name, temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=False, # Always disable streaming for tool summaries + stream=should_stream, # Enable streaming for tool summaries if conditions are met tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3042,8 +3058,7 @@ class Agent: if self.tool_call_summary is True: temp_llm = self.temp_llm_instance_for_tool_summary() - tool_response = temp_llm.run( - f""" + tool_prompt = f""" Please analyze and summarize the following tool execution output in a clear and concise way. Focus on the key information and insights that would be most relevant to the user's original request. If there are any errors or issues, highlight them prominently. @@ -3051,19 +3066,37 @@ class Agent: Tool Output: {output} """ - ) + + # Check if we should stream the tool summary + should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False) + + if should_stream and self.print_on: + # Handle streaming response with streaming panel + streaming_response = temp_llm.run(tool_prompt) + + if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str): + # Use streaming panel directly + tool_response = formatter.print_streaming_panel( + streaming_response, + title=f"šŸ¤– Agent: {self.agent_name} Tool Summary", + style=None, + collect_chunks=True, + ) + else: + # Fallback for non-streaming response + tool_response = streaming_response + self.pretty_print(tool_response, loop_count) + else: + # Non-streaming response + tool_response = temp_llm.run(tool_prompt) + if self.print_on: + self.pretty_print(tool_response, loop_count) self.short_memory.add( role=self.agent_name, content=tool_response, ) - if self.print_on is True: - self.pretty_print( - tool_response, - loop_count, - ) - def list_output_types(self): return OutputType From 840dce779b0cee7f1a5f4b7d936b059881901d06 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 5 Jul 2025 00:10:41 +0530 Subject: [PATCH 02/11] refactor agent class to streamline tool call handling and improve task reference management; enhance logging for API response parsing in base tool --- swarms/structs/agent.py | 56 +++++++++------------------------------ swarms/tools/base_tool.py | 9 ++++--- 2 files changed, 18 insertions(+), 47 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index d25500fb..c243d412 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -581,8 +581,6 @@ class Agent: # subsequent requests / summaries. 
self.expecting_tool_call: bool = False self.tool_call_completed: bool = False - self.original_streaming_state: bool = self.streaming_on - self.should_stream_after_tools: bool = False # self.short_memory = self.short_memory_init() @@ -1006,6 +1004,9 @@ class Agent: agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) """ try: + # Preserve the original user task so that tool summaries can reference it + if task is not None: + self.run_task = str(task) self.check_if_no_prompt_then_autogenerate(task) @@ -1071,7 +1072,6 @@ class Agent: if self.streaming_on and exists(self.tools) and not self.tool_call_completed: # Disable streaming for this request so we can reliably parse JSON self.expecting_tool_call = True - self.should_stream_after_tools = True else: self.expecting_tool_call = False @@ -1367,37 +1367,6 @@ class Agent: ) return self.run(task=improved_prompt, *args, **kwargs) - # def parse_and_execute_tools(self, response: str, *args, **kwargs): - # max_retries = 3 # Maximum number of retries - # retries = 0 - # while retries < max_retries: - # try: - # logger.info("Executing tool...") - - # # try to Execute the tool and return a string - # out = parse_and_execute_json( - # functions=self.tools, - # json_string=response, - # parse_md=True, - # *args, - # **kwargs, - # ) - # logger.info(f"Tool Output: {out}") - # # Add the output to the memory - # # self.short_memory.add( - # # role="Tool Executor", - # # content=out, - # # ) - # return out - # except Exception as error: - # retries += 1 - # logger.error( - # f"Attempt {retries}: Error executing tool: {error}" - # ) - # if retries == max_retries: - # raise error - # time.sleep(1) # Wait for a bit before retrying - def add_memory(self, message: str): """Add a memory to the agent @@ -3003,15 +2972,18 @@ class Agent: raise e def temp_llm_instance_for_tool_summary(self): - # Enable streaming for tool summary if original streaming was enabled and we should stream after tools - should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False) - + """Create a temporary LiteLLM instance for the post-tool summary. + + If the agent was configured with `streaming_on=True`, the summary + request will also stream; otherwise it will be a normal synchronous + call. No extra coordination flags are required. + """ return LiteLLM( model_name=self.model_name, temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=should_stream, # Enable streaming for tool summaries if conditions are met + stream=self.streaming_on, tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3061,16 +3033,14 @@ class Agent: tool_prompt = f""" Please analyze and summarize the following tool execution output in a clear and concise way. Focus on the key information and insights that would be most relevant to the user's original request. + {self.run_task} If there are any errors or issues, highlight them prominently. 
- Tool Output: {output} """ - # Check if we should stream the tool summary - should_stream = getattr(self, 'should_stream_after_tools', False) and getattr(self, 'original_streaming_state', False) - - if should_stream and self.print_on: + # Stream the tool summary only if the agent is configured for streaming + if self.streaming_on and self.print_on: # Handle streaming response with streaming panel streaming_response = temp_llm.run(tool_prompt) diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index af08f11e..538cf729 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -2262,15 +2262,16 @@ class BaseTool(BaseModel): api_response = json.loads(api_response) except json.JSONDecodeError as e: self._log_if_verbose( - "error", - f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", + "debug", + f"API response is not JSON format: {e}. This is normal for plain text responses. Response: '{api_response[:100]}...'", ) + # If it's not JSON, it might be plain text without function calls return [] if not isinstance(api_response, dict): self._log_if_verbose( - "warning", - f"API response is not a dictionary (type: {type(api_response)}), returning empty list", + "debug", + f"API response is not a dictionary (type: {type(api_response)}), no function calls detected", ) return [] From a69485c288f4a7368458812df844286dc21a2290 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 5 Jul 2025 00:33:19 +0530 Subject: [PATCH 03/11] enhance streaming response handling by collecting chunks for detailed conversation history; add callback for real-time chunk logging --- swarms/structs/agent.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index c243d412..030517f1 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -2562,14 +2562,28 @@ class Agent: chunks.append(content) complete_response = "".join(chunks) else: - # Use the streaming panel to display the response + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + # Optional: Save each chunk to conversation in real-time + # This creates a more detailed conversation history + if self.verbose: + logger.debug( + f"Streaming chunk received: {chunk[:50]}..." 
+ ) + + # Use the streaming panel to display and collect the response complete_response = formatter.print_streaming_panel( streaming_response, title=f"šŸ¤– Agent: {self.agent_name} Loops: {current_loop}", - style=None, + style=None, # Use random color like non-streaming approach collect_chunks=True, + on_chunk_callback=on_chunk_received, ) - + # Restore original stream setting self.llm.stream = original_stream From 638d81c338795842add49dffd2891667be714752 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 5 Jul 2025 00:38:37 +0530 Subject: [PATCH 04/11] refactor Agent class by removing commented-out tool execution method; for improved readability --- swarms/structs/agent.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 030517f1..081038ff 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -573,7 +573,6 @@ class Agent: ) self.summarize_multiple_images = summarize_multiple_images self.tool_retry_attempts = tool_retry_attempts - # Streaming / tool-call coordination flags # When a tool call is expected we temporarily disable streaming so the # LLM returns a complete JSON payload that can be parsed reliably. After @@ -1367,6 +1366,37 @@ class Agent: ) return self.run(task=improved_prompt, *args, **kwargs) + # def parse_and_execute_tools(self, response: str, *args, **kwargs): + # max_retries = 3 # Maximum number of retries + # retries = 0 + # while retries < max_retries: + # try: + # logger.info("Executing tool...") + + # # try to Execute the tool and return a string + # out = parse_and_execute_json( + # functions=self.tools, + # json_string=response, + # parse_md=True, + # *args, + # **kwargs, + # ) + # logger.info(f"Tool Output: {out}") + # # Add the output to the memory + # # self.short_memory.add( + # # role="Tool Executor", + # # content=out, + # # ) + # return out + # except Exception as error: + # retries += 1 + # logger.error( + # f"Attempt {retries}: Error executing tool: {error}" + # ) + # if retries == max_retries: + # raise error + # time.sleep(1) # Wait for a bit before retrying + def add_memory(self, message: str): """Add a memory to the agent @@ -2583,7 +2613,7 @@ class Agent: collect_chunks=True, on_chunk_callback=on_chunk_received, ) - + # Restore original stream setting self.llm.stream = original_stream @@ -3049,6 +3079,7 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. {self.run_task} If there are any errors or issues, highlight them prominently. 
+ Tool Output: {output} """ From ccddb17cccb4559db007399103a9e64742c00199 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 5 Jul 2025 01:45:35 +0530 Subject: [PATCH 05/11] refactor streaming response handling in Formatter class; implement lock for concurrent Live panel management and improve error handling --- swarms/structs/agent.py | 8 +-- swarms/utils/formatter.py | 100 +++++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 081038ff..40eaa005 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1144,7 +1144,6 @@ class Agent: self.tool_call_completed = True # Reset expecting_tool_call so subsequent requests can stream self.expecting_tool_call = False - # Handle MCP tools if ( exists(self.mcp_url) @@ -2558,7 +2557,6 @@ class Agent: try: # Decide whether streaming should be used for this call streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False) - # Set streaming parameter in LLM if streaming is enabled for this call if streaming_enabled and hasattr(self.llm, "stream"): original_stream = self.llm.stream @@ -2573,7 +2571,7 @@ class Agent: task=task, *args, **kwargs ) - # If we get a streaming response, handle it with the streaming panel + # If we get a streaming response, handle it with the new streaming panel if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): @@ -2613,7 +2611,7 @@ class Agent: collect_chunks=True, on_chunk_callback=on_chunk_received, ) - + # Restore original stream setting self.llm.stream = original_stream @@ -3079,11 +3077,9 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. {self.run_task} If there are any errors or issues, highlight them prominently. - Tool Output: {output} """ - # Stream the tool summary only if the agent is configured for streaming if self.streaming_on and self.print_on: # Handle streaming response with streaming panel diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index 34aa5eb8..7954983e 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -9,6 +9,12 @@ from rich.progress import Progress, SpinnerColumn, TextColumn from rich.table import Table from rich.text import Text +# Global lock to ensure only a single Rich Live context is active at any moment. +# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same +# console raise runtime errors. Using a module-level lock serialises access and +# prevents crashes when multiple agents stream simultaneously in different +# threads (e.g., in ConcurrentWorkflow). 
+live_render_lock = threading.Lock() def choose_random_color(): import random @@ -209,56 +215,60 @@ class Formatter: complete_response = "" chunks_collected = [] - # TRUE streaming with Rich's automatic text wrapping - with Live( - create_streaming_panel(streaming_text), - console=self.console, - refresh_per_second=20, - ) as live: - try: - for part in streaming_response: - if ( - hasattr(part, "choices") - and part.choices - and part.choices[0].delta.content - ): - # Add ONLY the new chunk to the Text object with random color style - chunk = part.choices[0].delta.content - streaming_text.append(chunk, style=text_style) - complete_response += chunk - - # Collect chunks if requested - if collect_chunks: - chunks_collected.append(chunk) - - # Call chunk callback if provided - if on_chunk_callback: - on_chunk_callback(chunk) - - # Update display with new text - Rich handles all wrapping automatically - live.update( - create_streaming_panel( - streaming_text, is_complete=False + # Acquire the lock so that only one Live panel is active at a time. + # Other threads will wait here until the current streaming completes, + # avoiding Rich.Live concurrency errors. + with live_render_lock: + # TRUE streaming with Rich's automatic text wrapping + with Live( + create_streaming_panel(streaming_text), + console=self.console, + refresh_per_second=20, + ) as live: + try: + for part in streaming_response: + if ( + hasattr(part, "choices") + and part.choices + and part.choices[0].delta.content + ): + # Add ONLY the new chunk to the Text object with random color style + chunk = part.choices[0].delta.content + streaming_text.append(chunk, style=text_style) + complete_response += chunk + + # Collect chunks if requested + if collect_chunks: + chunks_collected.append(chunk) + + # Call chunk callback if provided + if on_chunk_callback: + on_chunk_callback(chunk) + + # Update display with new text - Rich handles all wrapping automatically + live.update( + create_streaming_panel( + streaming_text, is_complete=False + ) ) - ) - # Final update to show completion - live.update( - create_streaming_panel( - streaming_text, is_complete=True + # Final update to show completion + live.update( + create_streaming_panel( + streaming_text, is_complete=True + ) ) - ) - except Exception as e: - # Handle any streaming errors gracefully - streaming_text.append( - f"\n[Error: {str(e)}]", style="bold red" - ) - live.update( - create_streaming_panel( - streaming_text, is_complete=True + except Exception as e: + # Handle any streaming errors gracefully + streaming_text.append( + f"\n[Error: {str(e)}]", style="bold red" + ) + live.update( + create_streaming_panel( + streaming_text, is_complete=True + ) ) - ) return complete_response From a528139cc426e986f3e8ae3a5907ae1cd38f897d Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Thu, 17 Jul 2025 21:59:41 +0530 Subject: [PATCH 06/11] Refactor Agent class to simplify tool call handling and remove unnecessary streaming coordination flags --- swarms/structs/agent.py | 70 ++++++++--------------------------------- 1 file changed, 13 insertions(+), 57 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index d6b503a1..5275f25d 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -573,13 +573,6 @@ class Agent: ) self.summarize_multiple_images = summarize_multiple_images self.tool_retry_attempts = tool_retry_attempts - # Streaming / tool-call coordination flags - # When a tool call is expected we temporarily disable streaming so the - # LLM returns a 
complete JSON payload that can be parsed reliably. After - # the first tool call has been executed we re-enable streaming for - # subsequent requests / summaries. - self.expecting_tool_call: bool = False - self.tool_call_completed: bool = False self.speed_mode = speed_mode # Initialize the feedback @@ -1024,9 +1017,6 @@ class Agent: agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) """ try: - # Preserve the original user task so that tool summaries can reference it - if task is not None: - self.run_task = str(task) self.check_if_no_prompt_then_autogenerate(task) @@ -1076,13 +1066,6 @@ class Agent: self.short_memory.return_history_as_string() ) - # Determine if this request is primarily to obtain the first tool call - if self.streaming_on and exists(self.tools) and not self.tool_call_completed: - # Disable streaming for this request so we can reliably parse JSON - self.expecting_tool_call = True - else: - self.expecting_tool_call = False - # Parameters attempt = 0 success = False @@ -1134,15 +1117,10 @@ class Agent: # Check and execute callable tools if exists(self.tools): - # Use standard tool execution for both streaming and non-streaming self.tool_execution_retry( response, loop_count ) - # Mark that at least one tool call has been processed - self.tool_call_completed = True - # Reset expecting_tool_call so subsequent requests can stream - self.expecting_tool_call = False # Handle MCP tools if ( exists(self.mcp_url) @@ -2552,10 +2530,8 @@ class Agent: del kwargs["is_last"] try: - # Decide whether streaming should be used for this call - streaming_enabled = self.streaming_on and not getattr(self, "expecting_tool_call", False) - # Set streaming parameter in LLM if streaming is enabled for this call - if streaming_enabled and hasattr(self.llm, "stream"): + # Set streaming parameter in LLM if streaming is enabled + if self.streaming_on and hasattr(self.llm, "stream"): original_stream = self.llm.stream self.llm.stream = True @@ -3004,18 +2980,12 @@ class Agent: raise e def temp_llm_instance_for_tool_summary(self): - """Create a temporary LiteLLM instance for the post-tool summary. - - If the agent was configured with `streaming_on=True`, the summary - request will also stream; otherwise it will be a normal synchronous - call. No extra coordination flags are required. - """ return LiteLLM( model_name=self.model_name, temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=self.streaming_on, + stream=False, # Always disable streaming for tool summaries tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3062,42 +3032,28 @@ class Agent: if self.tool_call_summary is True: temp_llm = self.temp_llm_instance_for_tool_summary() - tool_prompt = f""" + tool_response = temp_llm.run( + f""" Please analyze and summarize the following tool execution output in a clear and concise way. Focus on the key information and insights that would be most relevant to the user's original request. - {self.run_task} If there are any errors or issues, highlight them prominently. 
+ Tool Output: {output} """ - # Stream the tool summary only if the agent is configured for streaming - if self.streaming_on and self.print_on: - # Handle streaming response with streaming panel - streaming_response = temp_llm.run(tool_prompt) - - if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str): - # Use streaming panel directly - tool_response = formatter.print_streaming_panel( - streaming_response, - title=f"šŸ¤– Agent: {self.agent_name} Tool Summary", - style=None, - collect_chunks=True, - ) - else: - # Fallback for non-streaming response - tool_response = streaming_response - self.pretty_print(tool_response, loop_count) - else: - # Non-streaming response - tool_response = temp_llm.run(tool_prompt) - if self.print_on: - self.pretty_print(tool_response, loop_count) + ) self.short_memory.add( role=self.agent_name, content=tool_response, ) + if self.print_on is True: + self.pretty_print( + tool_response, + loop_count, + ) + def list_output_types(self): return OutputType From cebcd454c289a1c64b457e685dfa5ec1b494e9cc Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Thu, 17 Jul 2025 22:02:48 +0530 Subject: [PATCH 07/11] cleanup --- swarms/tools/base_tool.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index 538cf729..af08f11e 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -2262,16 +2262,15 @@ class BaseTool(BaseModel): api_response = json.loads(api_response) except json.JSONDecodeError as e: self._log_if_verbose( - "debug", - f"API response is not JSON format: {e}. This is normal for plain text responses. Response: '{api_response[:100]}...'", + "error", + f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", ) - # If it's not JSON, it might be plain text without function calls return [] if not isinstance(api_response, dict): self._log_if_verbose( - "debug", - f"API response is not a dictionary (type: {type(api_response)}), no function calls detected", + "warning", + f"API response is not a dictionary (type: {type(api_response)}), returning empty list", ) return [] From 5e35951e4ca09a50db133745a0715d795168229d Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Fri, 18 Jul 2025 23:48:43 +0530 Subject: [PATCH 08/11] Enhance tool execution handling for streaming responses, adding support for partial JSON and tool call detection --- swarms/structs/agent.py | 284 +++++++++++++++++++++++++++++++++++--- swarms/tools/base_tool.py | 103 +++++++++++++- 2 files changed, 365 insertions(+), 22 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 5275f25d..dbd5c952 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1116,7 +1116,7 @@ class Agent: ) # Check and execute callable tools - if exists(self.tools): + if exists(self.tools) and self._response_contains_tool_calls(response): self.tool_execution_retry( response, loop_count ) @@ -3001,35 +3001,202 @@ class Agent: ) return + # Check if this is a streaming response + if self.streaming_on and hasattr(response, '__iter__') and not isinstance(response, (str, dict)): + self._execute_tools_streaming(response, loop_count) + else: + self._execute_tools_non_streaming(response, loop_count) + + def _execute_tools_streaming(self, streaming_response, loop_count: int): + """Handle tool execution for streaming responses with real-time parsing""" + tool_call_accumulators = {} # Dictionary to track multiple tool calls by ID + executed_tools = 
set() # Track which tools have been executed + + try: + if self.print_on: + logger.info(f"šŸ”§ Starting streaming tool execution for agent {self.agent_name}") + + for chunk in streaming_response: + if hasattr(chunk, 'choices') and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + # Get tool call index to handle multiple parallel tool calls + tool_index = getattr(tool_call, 'index', 0) + tool_id = getattr(tool_call, 'id', f"tool_{tool_index}") + + # Initialize accumulator for new tool call + if tool_id not in tool_call_accumulators: + tool_call_accumulators[tool_id] = { + 'name': '', + 'arguments': '', + 'id': tool_id, + 'index': tool_index, + 'complete': False + } + + # Accumulate tool name + if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'): + if tool_call.function.name: + tool_call_accumulators[tool_id]['name'] = tool_call.function.name + if self.print_on and self.verbose: + logger.info(f"šŸ› ļø Tool call detected: {tool_call.function.name}") + + # Accumulate tool arguments + if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'arguments'): + if tool_call.function.arguments: + tool_call_accumulators[tool_id]['arguments'] += tool_call.function.arguments + + # Try to parse arguments to see if they're complete valid JSON + try: + parsed_args = json.loads(tool_call_accumulators[tool_id]['arguments']) + # If parsing succeeds and tool hasn't been executed yet, execute it + if (not tool_call_accumulators[tool_id]['complete'] and + tool_id not in executed_tools and + tool_call_accumulators[tool_id]['name']): + + tool_call_accumulators[tool_id]['complete'] = True + executed_tools.add(tool_id) + + # Execute tool immediately + self._execute_single_tool_streaming( + tool_call_accumulators[tool_id], + loop_count + ) + + except json.JSONDecodeError: + # Arguments not complete yet, continue accumulating + if self.verbose: + logger.debug(f"Accumulating args for {tool_call_accumulators[tool_id]['name']}: {tool_call_accumulators[tool_id]['arguments'][:50]}...") + continue + + # Handle any remaining tools that might not have been executed + for tool_id, tool_data in tool_call_accumulators.items(): + if not tool_data['complete'] and tool_data['arguments'] and tool_id not in executed_tools: + try: + json.loads(tool_data['arguments']) + self._execute_single_tool_streaming(tool_data, loop_count) + executed_tools.add(tool_id) + except json.JSONDecodeError: + logger.warning(f"Tool {tool_data['name']} had incomplete arguments: {tool_data['arguments'][:100]}...") + + except Exception as e: + logger.error(f"Error during streaming tool execution: {e}") + # Fallback to non-streaming execution if something goes wrong + logger.info("Falling back to non-streaming tool execution") + self._execute_tools_non_streaming(streaming_response, loop_count) + + def _execute_single_tool_streaming(self, tool_data: dict, loop_count: int): + """Execute a single tool with its accumulated data during streaming""" try: + if self.print_on: + formatter.print_panel( + f"šŸš€ Executing tool: {tool_data['name']}\nArguments: {tool_data['arguments']}", + f"Real-time Tool Execution [{time.strftime('%H:%M:%S')}]", + style="cyan" + ) + + # Create a mock response object that the existing tool_struct can handle + mock_response = { + 'choices': [{ + 'message': { + 'tool_calls': [{ + 'id': tool_data['id'], + 'type': 'function', + 'function': { + 'name': tool_data['name'], + 'arguments': tool_data['arguments'] + 
} + }] + } + }] + } + + # Execute the tool with streaming mode enabled output = self.tool_struct.execute_function_calls_from_api_response( - response + mock_response, + is_streaming=True ) + + if output: + # Add tool output to memory immediately + tool_output_content = f"Tool '{tool_data['name']}' executed successfully:\n{format_data_structure(output)}" + self.short_memory.add( + role="Tool Executor", + content=tool_output_content, + ) + + if self.print_on: + formatter.print_panel( + format_data_structure(output), + f"āœ… Tool '{tool_data['name']}' Output [{time.strftime('%H:%M:%S')}]", + style="green" + ) + + # Generate tool summary if enabled + if self.tool_call_summary: + self._generate_tool_summary(output, loop_count, tool_data['name']) + else: + logger.warning(f"Tool {tool_data['name']} returned no output") + except Exception as e: - # Retry the tool call + error_msg = f"Error executing streaming tool {tool_data['name']}: {str(e)}" + logger.error(error_msg) + + # Add error to memory + self.short_memory.add( + role="Tool Executor", + content=f"Tool execution failed: {error_msg}", + ) + + if self.print_on: + formatter.print_panel( + error_msg, + f"āŒ Tool Execution Error [{time.strftime('%H:%M:%S')}]", + style="red" + ) + + def _execute_tools_non_streaming(self, response: any, loop_count: int): + """Handle tool execution for non-streaming responses (existing logic)""" + try: output = self.tool_struct.execute_function_calls_from_api_response( - response + response, + is_streaming=False ) + except Exception as e: + # Retry the tool call + try: + output = self.tool_struct.execute_function_calls_from_api_response( + response, + is_streaming=False + ) + except Exception as retry_error: + logger.error(f"Error executing tools after retry: {retry_error}") + raise retry_error if output is None: logger.error(f"Error executing tools: {e}") raise e - self.short_memory.add( - role="Tool Executor", - content=format_data_structure(output), - ) - - if self.print_on is True: - self.pretty_print( - f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]", - loop_count, + if output: + self.short_memory.add( + role="Tool Executor", + content=format_data_structure(output), ) - # Now run the LLM again without tools - create a temporary LLM instance - # instead of modifying the cached one - # Create a temporary LLM instance without tools for the follow-up call - if self.tool_call_summary is True: + if self.print_on is True: + self.pretty_print( + f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]", + loop_count, + ) + + if self.tool_call_summary is True: + self._generate_tool_summary(output, loop_count) + + def _generate_tool_summary(self, output, loop_count: int, tool_name: str = ""): + """Generate tool execution summary""" + try: temp_llm = self.temp_llm_instance_for_tool_summary() tool_response = temp_llm.run( @@ -3038,6 +3205,7 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. If there are any errors or issues, highlight them prominently. 
+ Tool Name: {tool_name} Tool Output: {output} """ @@ -3053,6 +3221,8 @@ class Agent: tool_response, loop_count, ) + except Exception as e: + logger.error(f"Error generating tool summary: {e}") def list_output_types(self): return OutputType @@ -3188,6 +3358,86 @@ class Agent: f"Failed to find correct answer '{correct_answer}' after {max_attempts} attempts" ) + def _response_contains_tool_calls(self, response: any) -> bool: + """ + Check if a response contains tool calls that should be executed. + + Args: + response: The response from the LLM + + Returns: + bool: True if response contains tool calls, False otherwise + """ + if response is None: + return False + + try: + # Handle string responses + if isinstance(response, str): + # Check for empty or whitespace-only strings + if not response.strip(): + return False + + # Try to parse as JSON + try: + response_dict = json.loads(response) + except json.JSONDecodeError: + # If it's not JSON, it's likely just text without tool calls + return False + + response = response_dict + + # Handle BaseModel objects + if isinstance(response, BaseModel): + response = response.model_dump() + + # Check if it's a dictionary with tool call indicators + if isinstance(response, dict): + # Check for OpenAI format + if "choices" in response: + choices = response.get("choices", []) + for choice in choices: + message = choice.get("message", {}) + if "tool_calls" in message and message["tool_calls"]: + return True + + # Check for direct tool_calls + if "tool_calls" in response and response["tool_calls"]: + return True + + # Check for Anthropic format + if "content" in response: + content = response.get("content", []) + if isinstance(content, list): + for item in content: + if isinstance(item, dict) and item.get("type") == "tool_use": + return True + + # Check for single tool call format + if (response.get("type") == "function" and "function" in response) or \ + (response.get("type") == "tool_use" and "name" in response): + return True + + # Handle list of tool calls + if isinstance(response, list): + for item in response: + if isinstance(item, dict): + if (item.get("type") == "function" and "function" in item) or \ + (item.get("type") == "tool_use" and "name" in item): + return True + elif isinstance(item, BaseModel): + # Convert BaseModel to dict and check + item_dict = item.model_dump() + if (item_dict.get("type") == "function" and "function" in item_dict) or \ + (item_dict.get("type") == "tool_use" and "name" in item_dict): + return True + + return False + + except Exception as e: + logger.debug(f"Error checking for tool calls in response: {e}") + return False + def tool_execution_retry(self, response: any, loop_count: int): """ Execute tools with retry logic for handling failures. diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index af08f11e..7d6e9cb9 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -2185,6 +2185,7 @@ class BaseTool(BaseModel): sequential: bool = False, max_workers: int = 4, return_as_string: bool = True, + is_streaming: bool = False, ) -> Union[List[Any], List[str]]: """ Automatically detect and execute function calls from OpenAI or Anthropic API responses. 
@@ -2196,12 +2197,14 @@ class BaseTool(BaseModel): - Pydantic BaseModel objects from Anthropic responses - Parallel function execution with concurrent.futures or sequential execution - Multiple function calls in a single response + - Streaming responses with partial JSON chunks Args: api_response (Union[Dict[str, Any], str, List[Any]]): The API response containing function calls sequential (bool): If True, execute functions sequentially. If False, execute in parallel (default) max_workers (int): Maximum number of worker threads for parallel execution (default: 4) return_as_string (bool): If True, return results as formatted strings (default: True) + is_streaming (bool): If True, handle partial/incomplete streaming responses gracefully (default: False) Returns: Union[List[Any], List[str]]: List of results from executed functions @@ -2222,6 +2225,9 @@ class BaseTool(BaseModel): >>> # Direct tool calls list (including BaseModel objects) >>> tool_calls = [ChatCompletionMessageToolCall(...), ...] >>> results = tool.execute_function_calls_from_api_response(tool_calls) + + >>> # Streaming response handling + >>> results = tool.execute_function_calls_from_api_response(partial_response, is_streaming=True) """ # Handle None API response gracefully by returning empty results if api_response is None: @@ -2230,6 +2236,26 @@ class BaseTool(BaseModel): "API response is None, returning empty results. This may indicate the LLM did not return a valid response.", ) return [] if not return_as_string else [] + + # Handle streaming mode with empty or partial responses + if is_streaming: + # For streaming, we may get empty strings or partial JSON - handle gracefully + if isinstance(api_response, str) and api_response.strip() == "": + self._log_if_verbose( + "debug", + "Empty streaming response, returning empty results", + ) + return [] if not return_as_string else [] + + # If it's a string that looks like incomplete JSON, return empty results + if isinstance(api_response, str): + stripped_response = api_response.strip() + if stripped_response and not self._is_likely_complete_json(stripped_response): + self._log_if_verbose( + "debug", + f"Incomplete streaming JSON detected: '{stripped_response[:50]}...', returning empty results", + ) + return [] if not return_as_string else [] # Handle direct list of tool call objects (e.g., from OpenAI ChatCompletionMessageToolCall or Anthropic BaseModels) if isinstance(api_response, list): @@ -2261,11 +2287,19 @@ class BaseTool(BaseModel): try: api_response = json.loads(api_response) except json.JSONDecodeError as e: - self._log_if_verbose( - "error", - f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", - ) - return [] + # In streaming mode, this is expected for partial responses + if is_streaming: + self._log_if_verbose( + "debug", + f"JSON parsing failed in streaming mode (expected for partial responses): {e}. Response: '{api_response[:100]}...'", + ) + return [] if not return_as_string else [] + else: + self._log_if_verbose( + "error", + f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", + ) + return [] if not isinstance(api_response, dict): self._log_if_verbose( @@ -2966,6 +3000,65 @@ class BaseTool(BaseModel): return function_calls + def _is_likely_complete_json(self, json_str: str) -> bool: + """ + Check if a JSON string appears to be complete by examining its structure. + + This is a heuristic method for streaming responses to avoid parsing incomplete JSON. 
+ + Args: + json_str (str): JSON string to check + + Returns: + bool: True if the JSON appears complete, False otherwise + """ + if not json_str or not isinstance(json_str, str): + return False + + json_str = json_str.strip() + if not json_str: + return False + + try: + # Try to parse - if it succeeds, it's complete + json.loads(json_str) + return True + except json.JSONDecodeError: + # If parsing fails, use heuristics to check if it might be incomplete + + # Check for basic structural completeness + if json_str.startswith('{'): + # Count braces to see if they're balanced + open_braces = json_str.count('{') + close_braces = json_str.count('}') + if open_braces > close_braces: + return False # Likely incomplete + + elif json_str.startswith('['): + # Count brackets to see if they're balanced + open_brackets = json_str.count('[') + close_brackets = json_str.count(']') + if open_brackets > close_brackets: + return False # Likely incomplete + + # Check for incomplete strings (odd number of unescaped quotes) + quote_count = 0 + escaped = False + for char in json_str: + if char == '\\' and not escaped: + escaped = True + elif char == '"' and not escaped: + quote_count += 1 + else: + escaped = False + + if quote_count % 2 != 0: + return False # Incomplete string + + # If we get here, the JSON might be malformed but not necessarily incomplete + # Return False to be safe + return False + def _format_results_as_strings( self, results: List[Any], function_calls: List[Dict[str, Any]] ) -> List[str]: From 66e68b840552bb47517c426f6039981832b7d547 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Fri, 25 Jul 2025 19:59:16 +0530 Subject: [PATCH 09/11] fixes & clean up ! --- swarms/structs/agent.py | 284 +++----------------------------------- swarms/tools/base_tool.py | 103 +------------- 2 files changed, 22 insertions(+), 365 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 3726c43b..ce57c65f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1121,7 +1121,7 @@ class Agent: ) # Check and execute callable tools - if exists(self.tools) and self._response_contains_tool_calls(response): + if exists(self.tools): self.tool_execution_retry( response, loop_count ) @@ -3004,202 +3004,35 @@ class Agent: ) return - # Check if this is a streaming response - if self.streaming_on and hasattr(response, '__iter__') and not isinstance(response, (str, dict)): - self._execute_tools_streaming(response, loop_count) - else: - self._execute_tools_non_streaming(response, loop_count) - - def _execute_tools_streaming(self, streaming_response, loop_count: int): - """Handle tool execution for streaming responses with real-time parsing""" - tool_call_accumulators = {} # Dictionary to track multiple tool calls by ID - executed_tools = set() # Track which tools have been executed - - try: - if self.print_on: - logger.info(f"šŸ”§ Starting streaming tool execution for agent {self.agent_name}") - - for chunk in streaming_response: - if hasattr(chunk, 'choices') and len(chunk.choices) > 0: - delta = chunk.choices[0].delta - - if hasattr(delta, 'tool_calls') and delta.tool_calls: - for tool_call in delta.tool_calls: - # Get tool call index to handle multiple parallel tool calls - tool_index = getattr(tool_call, 'index', 0) - tool_id = getattr(tool_call, 'id', f"tool_{tool_index}") - - # Initialize accumulator for new tool call - if tool_id not in tool_call_accumulators: - tool_call_accumulators[tool_id] = { - 'name': '', - 'arguments': '', - 'id': tool_id, - 'index': tool_index, - 
'complete': False - } - - # Accumulate tool name - if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'): - if tool_call.function.name: - tool_call_accumulators[tool_id]['name'] = tool_call.function.name - if self.print_on and self.verbose: - logger.info(f"šŸ› ļø Tool call detected: {tool_call.function.name}") - - # Accumulate tool arguments - if hasattr(tool_call, 'function') and hasattr(tool_call.function, 'arguments'): - if tool_call.function.arguments: - tool_call_accumulators[tool_id]['arguments'] += tool_call.function.arguments - - # Try to parse arguments to see if they're complete valid JSON - try: - parsed_args = json.loads(tool_call_accumulators[tool_id]['arguments']) - # If parsing succeeds and tool hasn't been executed yet, execute it - if (not tool_call_accumulators[tool_id]['complete'] and - tool_id not in executed_tools and - tool_call_accumulators[tool_id]['name']): - - tool_call_accumulators[tool_id]['complete'] = True - executed_tools.add(tool_id) - - # Execute tool immediately - self._execute_single_tool_streaming( - tool_call_accumulators[tool_id], - loop_count - ) - - except json.JSONDecodeError: - # Arguments not complete yet, continue accumulating - if self.verbose: - logger.debug(f"Accumulating args for {tool_call_accumulators[tool_id]['name']}: {tool_call_accumulators[tool_id]['arguments'][:50]}...") - continue - - # Handle any remaining tools that might not have been executed - for tool_id, tool_data in tool_call_accumulators.items(): - if not tool_data['complete'] and tool_data['arguments'] and tool_id not in executed_tools: - try: - json.loads(tool_data['arguments']) - self._execute_single_tool_streaming(tool_data, loop_count) - executed_tools.add(tool_id) - except json.JSONDecodeError: - logger.warning(f"Tool {tool_data['name']} had incomplete arguments: {tool_data['arguments'][:100]}...") - - except Exception as e: - logger.error(f"Error during streaming tool execution: {e}") - # Fallback to non-streaming execution if something goes wrong - logger.info("Falling back to non-streaming tool execution") - self._execute_tools_non_streaming(streaming_response, loop_count) - - def _execute_single_tool_streaming(self, tool_data: dict, loop_count: int): - """Execute a single tool with its accumulated data during streaming""" try: - if self.print_on: - formatter.print_panel( - f"šŸš€ Executing tool: {tool_data['name']}\nArguments: {tool_data['arguments']}", - f"Real-time Tool Execution [{time.strftime('%H:%M:%S')}]", - style="cyan" - ) - - # Create a mock response object that the existing tool_struct can handle - mock_response = { - 'choices': [{ - 'message': { - 'tool_calls': [{ - 'id': tool_data['id'], - 'type': 'function', - 'function': { - 'name': tool_data['name'], - 'arguments': tool_data['arguments'] - } - }] - } - }] - } - - # Execute the tool with streaming mode enabled output = self.tool_struct.execute_function_calls_from_api_response( - mock_response, - is_streaming=True + response ) - - if output: - # Add tool output to memory immediately - tool_output_content = f"Tool '{tool_data['name']}' executed successfully:\n{format_data_structure(output)}" - self.short_memory.add( - role="Tool Executor", - content=tool_output_content, - ) - - if self.print_on: - formatter.print_panel( - format_data_structure(output), - f"āœ… Tool '{tool_data['name']}' Output [{time.strftime('%H:%M:%S')}]", - style="green" - ) - - # Generate tool summary if enabled - if self.tool_call_summary: - self._generate_tool_summary(output, loop_count, tool_data['name']) - 
else: - logger.warning(f"Tool {tool_data['name']} returned no output") - except Exception as e: - error_msg = f"Error executing streaming tool {tool_data['name']}: {str(e)}" - logger.error(error_msg) - - # Add error to memory - self.short_memory.add( - role="Tool Executor", - content=f"Tool execution failed: {error_msg}", - ) - - if self.print_on: - formatter.print_panel( - error_msg, - f"āŒ Tool Execution Error [{time.strftime('%H:%M:%S')}]", - style="red" - ) - - def _execute_tools_non_streaming(self, response: any, loop_count: int): - """Handle tool execution for non-streaming responses (existing logic)""" - try: + # Retry the tool call output = self.tool_struct.execute_function_calls_from_api_response( - response, - is_streaming=False + response ) - except Exception as e: - # Retry the tool call - try: - output = self.tool_struct.execute_function_calls_from_api_response( - response, - is_streaming=False - ) - except Exception as retry_error: - logger.error(f"Error executing tools after retry: {retry_error}") - raise retry_error if output is None: logger.error(f"Error executing tools: {e}") raise e - if output: - self.short_memory.add( - role="Tool Executor", - content=format_data_structure(output), - ) - - if self.print_on is True: - self.pretty_print( - f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]", - loop_count, - ) + self.short_memory.add( + role="Tool Executor", + content=format_data_structure(output), + ) - if self.tool_call_summary is True: - self._generate_tool_summary(output, loop_count) + if self.print_on is True: + self.pretty_print( + f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]", + loop_count, + ) - def _generate_tool_summary(self, output, loop_count: int, tool_name: str = ""): - """Generate tool execution summary""" - try: + # Now run the LLM again without tools - create a temporary LLM instance + # instead of modifying the cached one + # Create a temporary LLM instance without tools for the follow-up call + if self.tool_call_summary is True: temp_llm = self.temp_llm_instance_for_tool_summary() tool_response = temp_llm.run( @@ -3208,7 +3041,6 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. If there are any errors or issues, highlight them prominently. - Tool Name: {tool_name} Tool Output: {output} """ @@ -3224,8 +3056,6 @@ class Agent: tool_response, loop_count, ) - except Exception as e: - logger.error(f"Error generating tool summary: {e}") def list_output_types(self): return OutputType @@ -3361,86 +3191,6 @@ class Agent: f"Failed to find correct answer '{correct_answer}' after {max_attempts} attempts" ) - def _response_contains_tool_calls(self, response: any) -> bool: - """ - Check if a response contains tool calls that should be executed. 
- - Args: - response: The response from the LLM - - Returns: - bool: True if response contains tool calls, False otherwise - """ - if response is None: - return False - - try: - # Handle string responses - if isinstance(response, str): - # Check for empty or whitespace-only strings - if not response.strip(): - return False - - # Try to parse as JSON - try: - response_dict = json.loads(response) - except json.JSONDecodeError: - # If it's not JSON, it's likely just text without tool calls - return False - - response = response_dict - - # Handle BaseModel objects - if isinstance(response, BaseModel): - response = response.model_dump() - - # Check if it's a dictionary with tool call indicators - if isinstance(response, dict): - # Check for OpenAI format - if "choices" in response: - choices = response.get("choices", []) - for choice in choices: - message = choice.get("message", {}) - if "tool_calls" in message and message["tool_calls"]: - return True - - # Check for direct tool_calls - if "tool_calls" in response and response["tool_calls"]: - return True - - # Check for Anthropic format - if "content" in response: - content = response.get("content", []) - if isinstance(content, list): - for item in content: - if isinstance(item, dict) and item.get("type") == "tool_use": - return True - - # Check for single tool call format - if (response.get("type") == "function" and "function" in response) or \ - (response.get("type") == "tool_use" and "name" in response): - return True - - # Handle list of tool calls - if isinstance(response, list): - for item in response: - if isinstance(item, dict): - if (item.get("type") == "function" and "function" in item) or \ - (item.get("type") == "tool_use" and "name" in item): - return True - elif isinstance(item, BaseModel): - # Convert BaseModel to dict and check - item_dict = item.model_dump() - if (item_dict.get("type") == "function" and "function" in item_dict) or \ - (item_dict.get("type") == "tool_use" and "name" in item_dict): - return True - - return False - - except Exception as e: - logger.debug(f"Error checking for tool calls in response: {e}") - return False - def tool_execution_retry(self, response: any, loop_count: int): """ Execute tools with retry logic for handling failures. diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index 7d6e9cb9..af08f11e 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -2185,7 +2185,6 @@ class BaseTool(BaseModel): sequential: bool = False, max_workers: int = 4, return_as_string: bool = True, - is_streaming: bool = False, ) -> Union[List[Any], List[str]]: """ Automatically detect and execute function calls from OpenAI or Anthropic API responses. @@ -2197,14 +2196,12 @@ class BaseTool(BaseModel): - Pydantic BaseModel objects from Anthropic responses - Parallel function execution with concurrent.futures or sequential execution - Multiple function calls in a single response - - Streaming responses with partial JSON chunks Args: api_response (Union[Dict[str, Any], str, List[Any]]): The API response containing function calls sequential (bool): If True, execute functions sequentially. 
If False, execute in parallel (default) max_workers (int): Maximum number of worker threads for parallel execution (default: 4) return_as_string (bool): If True, return results as formatted strings (default: True) - is_streaming (bool): If True, handle partial/incomplete streaming responses gracefully (default: False) Returns: Union[List[Any], List[str]]: List of results from executed functions @@ -2225,9 +2222,6 @@ class BaseTool(BaseModel): >>> # Direct tool calls list (including BaseModel objects) >>> tool_calls = [ChatCompletionMessageToolCall(...), ...] >>> results = tool.execute_function_calls_from_api_response(tool_calls) - - >>> # Streaming response handling - >>> results = tool.execute_function_calls_from_api_response(partial_response, is_streaming=True) """ # Handle None API response gracefully by returning empty results if api_response is None: @@ -2236,26 +2230,6 @@ class BaseTool(BaseModel): "API response is None, returning empty results. This may indicate the LLM did not return a valid response.", ) return [] if not return_as_string else [] - - # Handle streaming mode with empty or partial responses - if is_streaming: - # For streaming, we may get empty strings or partial JSON - handle gracefully - if isinstance(api_response, str) and api_response.strip() == "": - self._log_if_verbose( - "debug", - "Empty streaming response, returning empty results", - ) - return [] if not return_as_string else [] - - # If it's a string that looks like incomplete JSON, return empty results - if isinstance(api_response, str): - stripped_response = api_response.strip() - if stripped_response and not self._is_likely_complete_json(stripped_response): - self._log_if_verbose( - "debug", - f"Incomplete streaming JSON detected: '{stripped_response[:50]}...', returning empty results", - ) - return [] if not return_as_string else [] # Handle direct list of tool call objects (e.g., from OpenAI ChatCompletionMessageToolCall or Anthropic BaseModels) if isinstance(api_response, list): @@ -2287,19 +2261,11 @@ class BaseTool(BaseModel): try: api_response = json.loads(api_response) except json.JSONDecodeError as e: - # In streaming mode, this is expected for partial responses - if is_streaming: - self._log_if_verbose( - "debug", - f"JSON parsing failed in streaming mode (expected for partial responses): {e}. Response: '{api_response[:100]}...'", - ) - return [] if not return_as_string else [] - else: - self._log_if_verbose( - "error", - f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", - ) - return [] + self._log_if_verbose( + "error", + f"Failed to parse JSON from API response: {e}. Response: '{api_response[:100]}...'", + ) + return [] if not isinstance(api_response, dict): self._log_if_verbose( @@ -3000,65 +2966,6 @@ class BaseTool(BaseModel): return function_calls - def _is_likely_complete_json(self, json_str: str) -> bool: - """ - Check if a JSON string appears to be complete by examining its structure. - - This is a heuristic method for streaming responses to avoid parsing incomplete JSON. 
- - Args: - json_str (str): JSON string to check - - Returns: - bool: True if the JSON appears complete, False otherwise - """ - if not json_str or not isinstance(json_str, str): - return False - - json_str = json_str.strip() - if not json_str: - return False - - try: - # Try to parse - if it succeeds, it's complete - json.loads(json_str) - return True - except json.JSONDecodeError: - # If parsing fails, use heuristics to check if it might be incomplete - - # Check for basic structural completeness - if json_str.startswith('{'): - # Count braces to see if they're balanced - open_braces = json_str.count('{') - close_braces = json_str.count('}') - if open_braces > close_braces: - return False # Likely incomplete - - elif json_str.startswith('['): - # Count brackets to see if they're balanced - open_brackets = json_str.count('[') - close_brackets = json_str.count(']') - if open_brackets > close_brackets: - return False # Likely incomplete - - # Check for incomplete strings (odd number of unescaped quotes) - quote_count = 0 - escaped = False - for char in json_str: - if char == '\\' and not escaped: - escaped = True - elif char == '"' and not escaped: - quote_count += 1 - else: - escaped = False - - if quote_count % 2 != 0: - return False # Incomplete string - - # If we get here, the JSON might be malformed but not necessarily incomplete - # Return False to be safe - return False - def _format_results_as_strings( self, results: List[Any], function_calls: List[Dict[str, Any]] ) -> List[str]: From 9ca24fd5453e9937d8d21a965a7a7ea09d0ec160 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Fri, 25 Jul 2025 22:16:53 +0530 Subject: [PATCH 10/11] new tools_streaming fixed !!! --- swarms/structs/agent.py | 399 ++++++++++++++++++++++++++++++---------- 1 file changed, 303 insertions(+), 96 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index ce57c65f..312b7e19 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1067,108 +1067,317 @@ class Agent: self.short_memory.return_history_as_string() ) - # Parameters - attempt = 0 - success = False - while attempt < self.retry_attempts and not success: - try: - - if img is not None: - response = self.call_llm( - task=task_prompt, - img=img, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - else: - response = self.call_llm( - task=task_prompt, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - - # If streaming is enabled, then don't print the response - - # Parse the response from the agent with the output type - if exists(self.tools_list_dictionary): - if isinstance(response, BaseModel): - response = response.model_dump() - - # Parse the response from the agent with the output type - response = self.parse_llm_output(response) - - self.short_memory.add( - role=self.agent_name, - content=response, - ) - - # Print - if self.print_on is True: - if isinstance(response, list): - self.pretty_print( - f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", - loop_count, + # If streaming is OFF, use the simple, non-streaming path + if not self.streaming_on: + attempt = 0 + success = False + while attempt < self.retry_attempts and not success: + try: + if img is not None: + response = self.call_llm( + task=task_prompt, + img=img, + current_loop=loop_count, + streaming_callback=streaming_callback, + *args, + **kwargs, ) - elif self.streaming_on: - pass else: - 
self.pretty_print( - response, loop_count + response = self.call_llm( + task=task_prompt, + current_loop=loop_count, + streaming_callback=streaming_callback, + *args, + **kwargs, ) - # Check and execute callable tools - if exists(self.tools): - self.tool_execution_retry( - response, loop_count - ) - - # Handle MCP tools - if ( - exists(self.mcp_url) - or exists(self.mcp_config) - or exists(self.mcp_urls) - ): - # Only handle MCP tools if response is not None - if response is not None: - self.mcp_tool_handling( - response=response, - current_loop=loop_count, + # Parse the response from the agent with the output type + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() + + response = self.parse_llm_output(response) + self.short_memory.add(role=self.agent_name, content=response) + + if self.print_on is True: + if isinstance(response, list): + self.pretty_print( + f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", + loop_count, + ) + else: + self.pretty_print(response, loop_count) + + # Check and execute callable tools + if exists(self.tools): + self.tool_execution_retry(response, loop_count) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + else: + logger.warning( + f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" + ) + + success = True # Exit retry loop + except Exception as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + attempt += 1 + if attempt >= self.retry_attempts: + logger.error("All retry attempts failed.") + break # Exit main loop if retries fail + + # If streaming is ON, use the new real-time tool execution path + else: + try: + # Get the raw streaming response directly from the underlying LLM client + # Bypass the LiteLLM wrapper's post-processing for streaming + raw_messages = self.short_memory.to_dict() + + # Convert to proper OpenAI message format + messages = [] + for msg in raw_messages: + # Normalize role names to lowercase + role = msg.get('role', '').lower() + content = msg.get('content', '') + + # Map common role variants to standard OpenAI roles + role_mapping = { + 'system': 'system', + 'user': 'user', + 'assistant': 'assistant', + 'human': 'user', # Map Human to user + 'tool executor': 'assistant', # Map Tool Executor to assistant + 'database': 'assistant', # Map Database to assistant + } + normalized_role = role_mapping.get(role, 'assistant') + + # Ensure content is in string format (not complex objects) + if isinstance(content, (dict, list)): + content = str(content) + elif not isinstance(content, str): + content = str(content) + + # Create properly formatted OpenAI message + openai_msg = { + 'role': normalized_role, + 'content': content + } + messages.append(openai_msg) + + # Prepare completion parameters for direct streaming + completion_params = { + "model": self.llm.model_name, + "messages": messages, + "stream": True, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + } + + # Add tools if available + if self.tools_list_dictionary: + completion_params["tools"] = self.tools_list_dictionary + completion_params["tool_choice"] = "auto" + + # Add image if provided + if img is not None: + # Add image to the last message + if messages and len(messages) > 0: + last_message = messages[-1] + if 
isinstance(last_message.get("content"), str): + last_message["content"] = [ + {"type": "text", "text": last_message["content"]}, + {"type": "image_url", "image_url": {"url": img}} + ] + + # Get raw stream using the underlying client + from litellm import completion + stream = completion(**completion_params) + + full_text_response = "" + tool_calls_in_stream = [] + current_tool_call = None + tool_call_buffer = "" + + if self.print_on: + print(f"šŸ¤– {self.agent_name}: ", end="", flush=True) + + # Process streaming chunks in real-time + for chunk in stream: + # Debug: Log chunk processing if verbose + if self.verbose: + logger.debug(f"Processing streaming chunk: {type(chunk)}") + + chunk_type = getattr(chunk, 'type', None) + + # Anthropic-style stream parsing + if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use': + tool_name = chunk.content_block.name + if self.print_on: + print(f"\nšŸ”§ Tool Call: {tool_name}...", flush=True) + current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""} + tool_call_buffer = "" + + elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'input_json_delta': + tool_call_buffer += chunk.delta.partial_json + elif chunk.delta.type == 'text_delta': + text_chunk = chunk.delta.text + full_text_response += text_chunk + if self.print_on: + print(text_chunk, end="", flush=True) + + elif chunk_type == 'content_block_stop' and current_tool_call: + try: + tool_input = json.loads(tool_call_buffer) + current_tool_call["input"] = tool_input + tool_calls_in_stream.append(current_tool_call) + except json.JSONDecodeError: + logger.error(f"Failed to parse tool arguments: {tool_call_buffer}") + current_tool_call = None + tool_call_buffer = "" + + # OpenAI-style stream parsing + elif hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta') and choice.delta: + delta = choice.delta + + # Handle text content + if hasattr(delta, 'content') and delta.content: + text_chunk = delta.content + full_text_response += text_chunk + if self.print_on: + print(text_chunk, end="", flush=True) + + # Handle tool calls in streaming chunks + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + # Get tool call index + tool_index = getattr(tool_call, 'index', 0) + + # Ensure we have enough slots in the list + while len(tool_calls_in_stream) <= tool_index: + tool_calls_in_stream.append(None) + + if hasattr(tool_call, 'function') and tool_call.function: + func = tool_call.function + + # If this slot is empty and we have a function name, create new tool call + if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name: + if self.print_on: + print(f"\nšŸ”§ Tool Call: {func.name}...", flush=True) + tool_calls_in_stream[tool_index] = { + "id": getattr(tool_call, 'id', f"call_{tool_index}"), + "name": func.name, + "arguments": "" + } + + # Accumulate arguments (whether it's a new call or continuation) + if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None: + tool_calls_in_stream[tool_index]["arguments"] += func.arguments + + if self.verbose: + logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'") + + # Try to parse if we have complete JSON + try: + args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"]) + 
tool_calls_in_stream[tool_index]["input"] = args_dict + tool_calls_in_stream[tool_index]["arguments_complete"] = True + if self.verbose: + logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}") + except json.JSONDecodeError: + # Not complete yet, continue accumulating + pass + + if self.print_on: + print() # Newline after streaming text + + # After the stream is complete, check if any tools were called + if tool_calls_in_stream: + # Add the partial text response to memory + if full_text_response.strip(): + self.short_memory.add(role=self.agent_name, content=full_text_response) + + # Format tool calls for execution - only execute complete tool calls + formatted_tool_calls = [] + for tc in tool_calls_in_stream: + # Only execute tool calls that have complete arguments + if tc.get("input") or tc.get("arguments_complete"): + # Use input if available, otherwise parse arguments + if "input" in tc: + args_to_use = tc["input"] + else: + try: + args_to_use = json.loads(tc.get("arguments", "{}")) + except json.JSONDecodeError: + logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}") + continue + + formatted_tool_calls.append({ + "type": "function", + "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)}, + "id": tc["id"] + }) + + # Execute all buffered tool calls if we have any complete ones + if formatted_tool_calls: + if self.verbose: + logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}") + + tool_results = self.tool_struct.execute_function_calls_from_api_response( + {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]} ) + + if self.verbose: + logger.info(f"Tool execution results: {tool_results}") else: - logger.warning( - f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" + if self.verbose: + logger.warning(f"No complete tool calls found. 
Tool calls in stream: {tool_calls_in_stream}") + tool_results = [] + + # Add tool results to memory + self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results)) + if self.print_on: + formatter.print_panel( + f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}", + "Tool Execution Result" ) - # self.sentiment_and_evaluator(response) + # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled + if self.tool_call_summary: + temp_llm = self.temp_llm_instance_for_tool_summary() + final_summary_response = temp_llm.run( + task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}" + ) + response = self.parse_llm_output(final_summary_response) + self.short_memory.add(role=self.agent_name, content=response) + if self.print_on: + self.pretty_print(response, loop_count) + else: + response = f"Tool execution completed: {format_data_structure(tool_results)}" - success = True # Mark as successful to exit the retry loop + else: + # If no tools were called, the streamed text is the final response + response = full_text_response + if response.strip(): + self.short_memory.add(role=self.agent_name, content=response) except Exception as e: - - if self.autosave is True: - log_agent_data(self.to_dict()) - self.save() - - logger.error( - f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " - ) - attempt += 1 - - if not success: - - if self.autosave is True: - log_agent_data(self.to_dict()) - self.save() - - logger.error( - "Failed to generate a valid response after" - " retry attempts." - ) - break # Exit the loop if all retry attempts fail + logger.error(f"Error during streaming execution: {e}") + import traceback + traceback.print_exc() + break # Check stopping conditions if ( @@ -1191,10 +1400,7 @@ class Agent: break if self.interactive: - # logger.info("Interactive mode enabled.") user_input = input("You: ") - - # User-defined exit command if ( user_input.lower() == self.custom_exit_command.lower() @@ -1204,7 +1410,6 @@ class Agent: loop_count=loop_count, ) break - self.short_memory.add( role=self.user_name, content=user_input ) @@ -3003,7 +3208,6 @@ class Agent: "This may indicate the LLM did not return a valid response." ) return - try: output = self.tool_struct.execute_function_calls_from_api_response( response @@ -3041,6 +3245,9 @@ class Agent: Focus on the key information and insights that would be most relevant to the user's original request. If there are any errors or issues, highlight them prominently. + The user's original request was: + {self.task} + Tool Output: {output} """ From beac03c67d2caff938372ec8971263214b2ac01e Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Sat, 26 Jul 2025 17:16:06 +0530 Subject: [PATCH 11/11] updates ! 
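
Stream tool-call and MCP summaries when streaming is enabled: the temporary
summary LLM now inherits the agent's streaming flag instead of forcing
stream=False, streamed summaries are rendered through
formatter.print_streaming_panel, pretty_print is skipped while streaming to
avoid double printing, and the tool-summary prompt now carries the user's
original task. The LiteLLM wrapper gains _collect_streaming_response,
_collect_streaming_response_async, collect_all_chunks, and
collect_all_chunks_async so callers can consume chunks as they arrive or
reassemble the complete text.

A minimal usage sketch of the intended behavior (the agent name, model id,
and tool below are hypothetical placeholders; the keyword arguments are the
ones already used elsewhere in agent.py):

    from swarms import Agent

    def get_stock_price(symbol: str) -> str:
        """Hypothetical tool: return a canned quote for the given symbol."""
        return f"{symbol}: 123.45 USD"

    agent = Agent(
        agent_name="quote-agent",      # hypothetical
        model_name="gpt-4o-mini",      # placeholder litellm model id
        streaming_on=True,             # stream text, tool and MCP summaries
        print_on=True,
        tool_call_summary=True,        # summary is now streamed as well
        tools=[get_stock_price],
        max_loops=1,
    )
    agent.run(task="What is the current price of AAPL?")
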
--- swarms/structs/agent.py | 120 +++++++++++++++++++++++---- swarms/utils/litellm_wrapper.py | 142 +++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 19 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 312b7e19..5b2a0e49 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1360,9 +1360,39 @@ class Agent: final_summary_response = temp_llm.run( task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}" ) + + # Handle streaming for final tool summary in real-time execution + if self.streaming_on and hasattr(final_summary_response, "__iter__") and not isinstance(final_summary_response, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"Real-time tool summary streaming chunk: {chunk[:50]}...") + + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + final_summary_response, + title=f"šŸ¤– Agent: {self.agent_name} - Tool Summary (Real-time)", + style="green", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + final_summary_response = complete_response + + elif self.streaming_on and isinstance(final_summary_response, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(final_summary_response, delay=0.01) + response = self.parse_llm_output(final_summary_response) self.short_memory.add(role=self.agent_name, content=response) - if self.print_on: + + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: self.pretty_print(response, loop_count) else: response = f"Tool execution completed: {format_data_structure(tool_results)}" @@ -3169,6 +3199,34 @@ class Agent: summary = temp_llm.run( task=self.short_memory.get_str() ) + + # Handle streaming MCP tool summary response + if self.streaming_on and hasattr(summary, "__iter__") and not isinstance(summary, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"MCP tool summary streaming chunk received: {chunk[:50]}...") + + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + summary, + title=f"šŸ¤– Agent: {self.agent_name} - MCP Tool Summary", + style="cyan", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + summary = complete_response + + elif self.streaming_on and isinstance(summary, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(summary, delay=0.01) + except Exception as e: logger.error( f"Error calling LLM after MCP tool execution: {e}" @@ -3176,7 +3234,8 @@ class Agent: # Fallback: provide a default summary summary = "I successfully executed the MCP tool and retrieved the information above." 
- if self.print_on is True: + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: self.pretty_print(summary, loop_count=current_loop) # Add to the memory @@ -3193,7 +3252,7 @@ class Agent: temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=False, # Always disable streaming for tool summaries + stream=self.streaming_on, tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3239,26 +3298,55 @@ class Agent: if self.tool_call_summary is True: temp_llm = self.temp_llm_instance_for_tool_summary() - tool_response = temp_llm.run( - f""" - Please analyze and summarize the following tool execution output in a clear and concise way. - Focus on the key information and insights that would be most relevant to the user's original request. - If there are any errors or issues, highlight them prominently. + tool_summary_prompt = f""" + Please analyze and summarize the following tool execution output in a clear and concise way. + Focus on the key information and insights that would be most relevant to the user's original request. + If there are any errors or issues, highlight them prominently. + + The user's original request was: + {self.task} + + Tool Output: + {output} + """ + + tool_response = temp_llm.run(tool_summary_prompt) + + # Handle streaming tool response + if self.streaming_on and hasattr(tool_response, "__iter__") and not isinstance(tool_response, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"Tool response streaming chunk received: {chunk[:50]}...") + + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + tool_response, + title=f"šŸ¤– Agent: {self.agent_name} - Tool Summary", + style="blue", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) - The user's original request was: - {self.task} + tool_response = complete_response - Tool Output: - {output} - """ - ) - + elif self.streaming_on and isinstance(tool_response, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(tool_response, delay=0.01) + + # Add the tool response to memory self.short_memory.add( role=self.agent_name, content=tool_response, ) - if self.print_on is True: + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: self.pretty_print( tool_response, loop_count, diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index aaa3f71e..e7a41671 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -153,6 +153,134 @@ class LiteLLM: litellm.drop_params = True + def _collect_streaming_response(self, streaming_response): + """ + Parse and yield individual content chunks from a streaming response. 
+ + Args: + streaming_response: The streaming response object from litellm + + Yields: + str: Individual content chunks as they arrive + """ + try: + for chunk in streaming_response: + content = None + + # Handle different chunk formats + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + + # OpenAI-style chunks + if hasattr(choice, 'delta') and choice.delta: + if hasattr(choice.delta, 'content') and choice.delta.content: + content = choice.delta.content + + # Alternative chunk format + elif hasattr(choice, 'message') and choice.message: + if hasattr(choice.message, 'content') and choice.message.content: + content = choice.message.content + + # Anthropic-style chunks + elif hasattr(chunk, 'type'): + if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'text_delta': + content = chunk.delta.text + + # Handle direct content chunks + elif hasattr(chunk, 'content'): + content = chunk.content + + # Yield content chunk if we found any + if content: + yield content + + except Exception as e: + logger.error(f"Error parsing streaming chunks: {e}") + return + + async def _collect_streaming_response_async(self, streaming_response): + """ + Parse and yield individual content chunks from an async streaming response. + + Args: + streaming_response: The async streaming response object from litellm + + Yields: + str: Individual content chunks as they arrive + """ + try: + async for chunk in streaming_response: + content = None + + # Handle different chunk formats + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + + # OpenAI-style chunks + if hasattr(choice, 'delta') and choice.delta: + if hasattr(choice.delta, 'content') and choice.delta.content: + content = choice.delta.content + + # Alternative chunk format + elif hasattr(choice, 'message') and choice.message: + if hasattr(choice.message, 'content') and choice.message.content: + content = choice.message.content + + # Anthropic-style chunks + elif hasattr(chunk, 'type'): + if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'text_delta': + content = chunk.delta.text + + # Handle direct content chunks + elif hasattr(chunk, 'content'): + content = chunk.content + + # Yield content chunk if we found any + if content: + yield content + + except Exception as e: + logger.error(f"Error parsing async streaming chunks: {e}") + return + + def collect_all_chunks(self, streaming_response): + """ + Helper method to collect all chunks from a streaming response into a complete text. + This provides backward compatibility for code that expects a complete response. + + Args: + streaming_response: The streaming response object from litellm + + Returns: + str: The complete response text collected from all chunks + """ + chunks = [] + for chunk in self._collect_streaming_response(streaming_response): + chunks.append(chunk) + complete_response = "".join(chunks) + logger.info(f"Collected complete streaming response: {len(complete_response)} characters") + return complete_response + + async def collect_all_chunks_async(self, streaming_response): + """ + Helper method to collect all chunks from an async streaming response into a complete text. + This provides backward compatibility for code that expects a complete response. 
+ + Args: + streaming_response: The async streaming response object from litellm + + Returns: + str: The complete response text collected from all chunks + """ + chunks = [] + async for chunk in self._collect_streaming_response_async(streaming_response): + chunks.append(chunk) + complete_response = "".join(chunks) + logger.info(f"Collected complete async streaming response: {len(complete_response)} characters") + return complete_response + def output_for_tools(self, response: any): if self.mcp_call is True: out = response.choices[0].message.tool_calls[0].function @@ -471,7 +599,9 @@ class LiteLLM: **kwargs: Additional keyword arguments. Returns: - str: The content of the response from the model. + str or generator: When streaming is disabled, returns the complete response content. + When streaming is enabled, returns a generator that yields content chunks. + Use collect_all_chunks() to get complete response from the generator. Raises: Exception: If there is an error in processing the request. @@ -525,7 +655,7 @@ class LiteLLM: # Handle streaming response if self.stream: - return response # Return the streaming generator directly + return response # Handle tool-based response elif self.tools_list_dictionary is not None: @@ -574,7 +704,9 @@ class LiteLLM: **kwargs: Additional keyword arguments. Returns: - str: The content of the response from the model. + str or async generator: When streaming is disabled, returns the complete response content. + When streaming is enabled, returns an async generator that yields content chunks. + Use collect_all_chunks_async() to get complete response from the generator. """ try: messages = self._prepare_messages(task) @@ -608,6 +740,10 @@ class LiteLLM: # Standard completion response = await acompletion(**completion_params) + # Handle streaming response for async + if self.stream: + return self._collect_streaming_response_async(response) + print(response) return response
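
A note on consuming the new streaming API: when stream=True, LiteLLM.run()
hands back the provider's streaming generator rather than a string, and
collect_all_chunks() (or collect_all_chunks_async() on the async path)
reassembles the full response text. Below is a minimal sketch, assuming the
wrapper's remaining constructor arguments keep their defaults and an API key
is configured in the environment; the model id and prompt are placeholders:

    from swarms.utils.litellm_wrapper import LiteLLM

    llm = LiteLLM(model_name="gpt-4o-mini", stream=True)

    # run() returns the streaming generator untouched when stream=True
    stream = llm.run(task="Summarize unified diffs in one sentence.")

    # Collect every chunk into the complete response text
    full_text = llm.collect_all_chunks(stream)
    print(full_text)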