From 1b8d986a235cf4d11f7c702ee1f84e3487890543 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Wed, 13 Aug 2025 09:02:01 +0530 Subject: [PATCH] cleanup ! --- swarms/structs/agent.py | 274 +++++++++++++++++----------------------- 1 file changed, 118 insertions(+), 156 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 006817cb..a9f5b434 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1129,184 +1129,148 @@ class Agent: self.short_memory.return_history_as_string() ) - # If streaming is OFF, use the simple, non-streaming path - if not self.streaming_on: - attempt = 0 - success = False - while attempt < self.retry_attempts and not success: - try: - if img is not None: - response = self.call_llm( - task=task_prompt, - img=img, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - else: - response = self.call_llm( - task=task_prompt, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - - # Parse the response from the agent with the output type - if exists(self.tools_list_dictionary): - if isinstance(response, BaseModel): - response = response.model_dump() - - response = self.parse_llm_output(response) - self.short_memory.add(role=self.agent_name, content=response) - - if self.print_on is True: - if isinstance(response, list): - self.pretty_print( - f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", - loop_count, - ) - else: - self.pretty_print(response, loop_count) - - # Check and execute callable tools - if exists(self.tools): - self.tool_execution_retry(response, loop_count) - - # Handle MCP tools - if ( - exists(self.mcp_url) - or exists(self.mcp_config) - or exists(self.mcp_urls) - ): - if response is not None: - self.mcp_tool_handling( - response=response, - current_loop=loop_count, - ) - else: - logger.warning( - f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" - ) - - success = True # Exit retry loop - except Exception as e: - logger.error(f"Attempt {attempt + 1} failed: {e}") - attempt += 1 - if attempt >= self.retry_attempts: - logger.error("All retry attempts failed.") - break # Exit main loop if retries fail - - # If streaming is ON with tools, use streaming tool execution path - else: + # Parameters + attempt = 0 + success = False + while attempt < self.retry_attempts and not success: try: - # Get streaming response with tools if img is not None: - stream_response = self.call_llm( + response = self.call_llm( task=task_prompt, img=img, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) else: - stream_response = self.call_llm( + response = self.call_llm( task=task_prompt, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) - # Use streaming chunk parser from litellm_wrapper - if hasattr(self.llm, 'parse_streaming_chunks_with_tools'): - full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools( - stream=stream_response, - agent_name=self.agent_name, - print_on=self.print_on, - verbose=self.verbose, - ) - else: - # Fallback to simple text collection if method not available - full_text_response = "" - tool_calls_in_stream = [] - if hasattr(stream_response, "__iter__") and not isinstance(stream_response, str): - for chunk in stream_response: + # Handle streaming response with tools + if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str): + # Parse streaming chunks with tools using wrapper + if hasattr(self.llm, 'parse_streaming_chunks_with_tools'): + full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools( + stream=response, + agent_name="Agent", + print_on=self.print_on, + verbose=self.verbose, + ) + + # Handle tool calls if any were detected + if tool_calls_in_stream: + # Add text response to memory first + if full_text_response.strip(): + self.short_memory.add(role=self.agent_name, content=full_text_response) + + # Format and execute tool calls + import json + formatted_tool_calls = [] + for tc in tool_calls_in_stream: + if tc and (tc.get("input") or tc.get("arguments_complete")): + if "input" in tc: + args_to_use = tc["input"] + else: + try: + args_to_use = json.loads(tc.get("arguments", "{}")) + except json.JSONDecodeError: + continue + + formatted_tool_calls.append({ + "type": "function", + "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)}, + "id": tc["id"] + }) + + if formatted_tool_calls: + # Execute tools using existing tool structure + response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]} + else: + # No tools called, use streamed text as response + response = full_text_response + if response.strip(): + self.short_memory.add(role=self.agent_name, content=response) + else: + # Fallback: collect text manually + text_chunks = [] + for chunk in response: if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content - full_text_response += content + text_chunks.append(content) if self.print_on: print(content, end="", flush=True) if self.print_on: print() - - # Handle tool calls if any were detected - if tool_calls_in_stream: - # Add text response to memory - if full_text_response.strip(): - self.short_memory.add(role=self.agent_name, content=full_text_response) - - # Format and execute tool calls - formatted_tool_calls = [] - for tc in tool_calls_in_stream: - if tc and (tc.get("input") or tc.get("arguments_complete")): - if "input" in tc: - args_to_use = tc["input"] - else: - try: - args_to_use = json.loads(tc.get("arguments", "{}")) - except json.JSONDecodeError: - continue - - formatted_tool_calls.append({ - "type": "function", - "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)}, - "id": tc["id"] - }) - - if formatted_tool_calls: - tool_results = self.tool_struct.execute_function_calls_from_api_response( - {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]} - ) - - self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results)) - if self.print_on: - formatter.print_panel( - f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}", - "Tool Execution Result" - ) - - # Tool summary if enabled - if self.tool_call_summary: - temp_llm = self.temp_llm_instance_for_tool_summary() - if self.streaming_on and hasattr(temp_llm, 'run_tool_summary_with_streaming'): - final_summary_response = temp_llm.run_tool_summary_with_streaming( - tool_results=format_data_structure(tool_results), - agent_name=f"{self.agent_name} - Real-time", - print_on=self.print_on, - verbose=self.verbose, - ) - else: - final_summary_response = temp_llm.run( - task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}" - ) - - response = self.parse_llm_output(final_summary_response) + response = "".join(text_chunks) + if response.strip(): self.short_memory.add(role=self.agent_name, content=response) - - if self.print_on and not self.streaming_on: - self.pretty_print(response, loop_count) - else: - response = f"Tool execution completed: {format_data_structure(tool_results)}" else: - # No tools called, use streamed text as response - response = full_text_response - if response.strip(): - self.short_memory.add(role=self.agent_name, content=response) + # Parse the response from the agent with the output type + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() + response = self.parse_llm_output(response) + self.short_memory.add(role=self.agent_name, content=response) + + # Print + if self.print_on is True: + if isinstance(response, list): + self.pretty_print( + f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", + loop_count, + ) + elif self.streaming_on: + pass + else: + self.pretty_print(response, loop_count) + + # Check and execute callable tools + if exists(self.tools): + self.tool_execution_retry(response, loop_count) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + else: + logger.warning( + f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" + ) + + success = True except Exception as e: - logger.error(f"Error during streaming execution: {e}") - break + if self.autosave is True: + log_agent_data(self.to_dict()) + self.save() + + logger.error( + f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " + ) + attempt += 1 + + if not success: + if self.autosave is True: + log_agent_data(self.to_dict()) + self.save() + + logger.error( + "Failed to generate a valid response after retry attempts." + ) + break # Check stopping conditions if ( @@ -2645,9 +2609,8 @@ class Agent: del kwargs["is_last"] try: - # For streaming with tools, return raw stream for custom parsing + # Special handling for streaming with tools - need raw stream for parsing if self.streaming_on and exists(self.tools_list_dictionary): - # Set stream mode and get raw streaming response original_stream = getattr(self.llm, 'stream', False) self.llm.stream = True @@ -2658,7 +2621,6 @@ class Agent: stream_response = self.llm.run(task=task, *args, **kwargs) return stream_response finally: - # Restore original stream setting self.llm.stream = original_stream # Use centralized streaming logic from wrapper if streaming is enabled (no tools)