diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 64eb1cfd..006817cb 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -1198,192 +1198,64 @@ class Agent:
                     logger.error("All retry attempts failed.")
                     break  # Exit main loop if retries fail
-            # If streaming is ON, use the new real-time tool execution path
+            # If streaming is ON with tools, use streaming tool execution path
             else:
                 try:
-                    # Get the raw streaming response directly from the underlying LLM client
-                    # Bypass the LiteLLM wrapper's post-processing for streaming
-                    raw_messages = self.short_memory.to_dict()
-
-                    # Convert to proper OpenAI message format
-                    messages = []
-                    for msg in raw_messages:
-                        # Normalize role names to lowercase
-                        role = msg.get('role', '').lower()
-                        content = msg.get('content', '')
-
-                        # Map common role variants to standard OpenAI roles
-                        role_mapping = {
-                            'system': 'system',
-                            'user': 'user',
-                            'assistant': 'assistant',
-                            'human': 'user',  # Map Human to user
-                            'tool executor': 'assistant',  # Map Tool Executor to assistant
-                            'database': 'assistant',  # Map Database to assistant
-                        }
-                        normalized_role = role_mapping.get(role, 'assistant')
-
-                        # Ensure content is in string format (not complex objects)
-                        if isinstance(content, (dict, list)):
-                            content = str(content)
-                        elif not isinstance(content, str):
-                            content = str(content)
-
-                        # Create properly formatted OpenAI message
-                        openai_msg = {
-                            'role': normalized_role,
-                            'content': content
-                        }
-                        messages.append(openai_msg)
-
-                    # Prepare completion parameters for direct streaming
-                    completion_params = {
-                        "model": self.llm.model_name,
-                        "messages": messages,
-                        "stream": True,
-                        "max_tokens": self.max_tokens,
-                        "temperature": self.temperature,
-                    }
-
-                    # Add tools if available
-                    if self.tools_list_dictionary:
-                        completion_params["tools"] = self.tools_list_dictionary
-                        completion_params["tool_choice"] = "auto"
-
-                    # Add image if provided
+                    # Get streaming response with tools
                     if img is not None:
-                        # Add image to the last message
-                        if messages and len(messages) > 0:
-                            last_message = messages[-1]
-                            if isinstance(last_message.get("content"), str):
-                                last_message["content"] = [
-                                    {"type": "text", "text": last_message["content"]},
-                                    {"type": "image_url", "image_url": {"url": img}}
-                                ]
-
-                    # Get raw stream using the underlying client
-                    from litellm import completion
-                    stream = completion(**completion_params)
-
-                    full_text_response = ""
-                    tool_calls_in_stream = []
-                    current_tool_call = None
-                    tool_call_buffer = ""
-
-                    if self.print_on:
-                        print(f"šŸ¤– {self.agent_name}: ", end="", flush=True)
-
-                    # Process streaming chunks in real-time
-                    for chunk in stream:
-                        # Debug: Log chunk processing if verbose
-                        if self.verbose:
-                            logger.debug(f"Processing streaming chunk: {type(chunk)}")
-
-                        chunk_type = getattr(chunk, 'type', None)
-
-                        # Anthropic-style stream parsing
-                        if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
-                            tool_name = chunk.content_block.name
-                            if self.print_on:
-                                print(f"\nšŸ”§ Tool Call: {tool_name}...", flush=True)
-                            current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
-                            tool_call_buffer = ""
-
-                        elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
-                            if chunk.delta.type == 'input_json_delta':
-                                tool_call_buffer += chunk.delta.partial_json
-                            elif chunk.delta.type == 'text_delta':
-                                text_chunk = chunk.delta.text
-                                full_text_response += text_chunk
-                                if self.print_on:
-                                    print(text_chunk, end="", flush=True)
-
-                        elif chunk_type == 'content_block_stop' and current_tool_call:
-                            try:
-                                tool_input = json.loads(tool_call_buffer)
-                                current_tool_call["input"] = tool_input
-                                tool_calls_in_stream.append(current_tool_call)
-                            except json.JSONDecodeError:
-                                logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
-                            current_tool_call = None
-                            tool_call_buffer = ""
-
-                        # OpenAI-style stream parsing
-                        elif hasattr(chunk, 'choices') and chunk.choices:
-                            choice = chunk.choices[0]
-                            if hasattr(choice, 'delta') and choice.delta:
-                                delta = choice.delta
-
-                                # Handle text content
-                                if hasattr(delta, 'content') and delta.content:
-                                    text_chunk = delta.content
-                                    full_text_response += text_chunk
+                        stream_response = self.call_llm(
+                            task=task_prompt,
+                            img=img,
+                            current_loop=loop_count,
+                            *args,
+                            **kwargs,
+                        )
+                    else:
+                        stream_response = self.call_llm(
+                            task=task_prompt,
+                            current_loop=loop_count,
+                            *args,
+                            **kwargs,
+                        )
+
+                    # Use streaming chunk parser from litellm_wrapper
+                    if hasattr(self.llm, 'parse_streaming_chunks_with_tools'):
+                        full_text_response, tool_calls_in_stream = self.llm.parse_streaming_chunks_with_tools(
+                            stream=stream_response,
+                            agent_name=self.agent_name,
+                            print_on=self.print_on,
+                            verbose=self.verbose,
+                        )
+                    else:
+                        # Fallback to simple text collection if method not available
+                        full_text_response = ""
+                        tool_calls_in_stream = []
+                        if hasattr(stream_response, "__iter__") and not isinstance(stream_response, str):
+                            for chunk in stream_response:
+                                if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content:
+                                    content = chunk.choices[0].delta.content
+                                    full_text_response += content
                                     if self.print_on:
-                                        print(text_chunk, end="", flush=True)
-
-                                # Handle tool calls in streaming chunks
-                                if hasattr(delta, 'tool_calls') and delta.tool_calls:
-                                    for tool_call in delta.tool_calls:
-                                        # Get tool call index
-                                        tool_index = getattr(tool_call, 'index', 0)
-
-                                        # Ensure we have enough slots in the list
-                                        while len(tool_calls_in_stream) <= tool_index:
-                                            tool_calls_in_stream.append(None)
-
-                                        if hasattr(tool_call, 'function') and tool_call.function:
-                                            func = tool_call.function
-
-                                            # If this slot is empty and we have a function name, create new tool call
-                                            if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
-                                                if self.print_on:
-                                                    print(f"\nšŸ”§ Tool Call: {func.name}...", flush=True)
-                                                tool_calls_in_stream[tool_index] = {
-                                                    "id": getattr(tool_call, 'id', f"call_{tool_index}"),
-                                                    "name": func.name,
-                                                    "arguments": ""
-                                                }
-
-                                            # Accumulate arguments (whether it's a new call or continuation)
-                                            if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
-                                                tool_calls_in_stream[tool_index]["arguments"] += func.arguments
-
-                                                if self.verbose:
-                                                    logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
-
-                                                # Try to parse if we have complete JSON
-                                                try:
-                                                    args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
-                                                    tool_calls_in_stream[tool_index]["input"] = args_dict
-                                                    tool_calls_in_stream[tool_index]["arguments_complete"] = True
-                                                    if self.verbose:
-                                                        logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
-                                                except json.JSONDecodeError:
-                                                    # Not complete yet, continue accumulating
-                                                    pass
-
-                    if self.print_on:
-                        print()  # Newline after streaming text
-
-                    # After the stream is complete, check if any tools were called
+                                        print(content, end="", flush=True)
+                            if self.print_on:
+                                print()
+
+                    # Handle tool calls if any were detected
                     if tool_calls_in_stream:
-                        # Add the partial text response to memory
+                        # Add text response to memory
                        if full_text_response.strip():
                             self.short_memory.add(role=self.agent_name, content=full_text_response)

-                        # Format tool calls for execution - only execute complete tool calls
+                        # Format and execute tool calls
                         formatted_tool_calls = []
                         for tc in tool_calls_in_stream:
-                            # Only execute tool calls that have complete arguments
-                            if tc.get("input") or tc.get("arguments_complete"):
-                                # Use input if available, otherwise parse arguments
+                            if tc and (tc.get("input") or tc.get("arguments_complete")):
                                 if "input" in tc:
                                     args_to_use = tc["input"]
                                 else:
                                     try:
                                         args_to_use = json.loads(tc.get("arguments", "{}"))
                                     except json.JSONDecodeError:
-                                        logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}")
                                         continue

                                 formatted_tool_calls.append({
@@ -1392,66 +1264,48 @@ class Agent:
                                     "id": tc["id"]
                                 })

-                        # Execute all buffered tool calls if we have any complete ones
                         if formatted_tool_calls:
-                            if self.verbose:
-                                logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}")
-
                             tool_results = self.tool_struct.execute_function_calls_from_api_response(
                                 {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
                             )
-                            if self.verbose:
-                                logger.info(f"Tool execution results: {tool_results}")
-                        else:
-                            if self.verbose:
-                                logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}")
-                            tool_results = []
-
-                        # Add tool results to memory
-                        self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))
-                        if self.print_on:
-                            formatter.print_panel(
-                                f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}",
-                                "Tool Execution Result"
-                            )
-
-                        # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled
-                        if self.tool_call_summary:
-                            temp_llm = self.temp_llm_instance_for_tool_summary()
-
-                            # Use centralized streaming logic for real-time tool summary
-                            if self.streaming_on:
-                                final_summary_response = temp_llm.run_tool_summary_with_streaming(
-                                    tool_results=format_data_structure(tool_results),
-                                    agent_name=f"{self.agent_name} - Real-time",
-                                    print_on=self.print_on,
-                                    verbose=self.verbose,
-                                )
-                            else:
-                                final_summary_response = temp_llm.run(
-                                    task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
+                            self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results))
+                            if self.print_on:
+                                formatter.print_panel(
+                                    f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}",
+                                    "Tool Execution Result"
                                 )
-
-                            response = self.parse_llm_output(final_summary_response)
-                            self.short_memory.add(role=self.agent_name, content=response)
-
-                            # Only pretty_print if streaming is off (to avoid double printing)
-                            if self.print_on and not self.streaming_on:
-                                self.pretty_print(response, loop_count)
-                        else:
-                            response = f"Tool execution completed: {format_data_structure(tool_results)}"
+                            # Tool summary if enabled
+                            if self.tool_call_summary:
+                                temp_llm = self.temp_llm_instance_for_tool_summary()
+                                if self.streaming_on and hasattr(temp_llm, 'run_tool_summary_with_streaming'):
+                                    final_summary_response = temp_llm.run_tool_summary_with_streaming(
+                                        tool_results=format_data_structure(tool_results),
+                                        agent_name=f"{self.agent_name} - Real-time",
+                                        print_on=self.print_on,
+                                        verbose=self.verbose,
+                                    )
+                                else:
+                                    final_summary_response = temp_llm.run(
+                                        task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}"
+                                    )
+
+                                response = self.parse_llm_output(final_summary_response)
+                                self.short_memory.add(role=self.agent_name, content=response)
+
+                                if self.print_on and not self.streaming_on:
+                                    self.pretty_print(response, loop_count)
+                            else:
+                                response = f"Tool execution completed: {format_data_structure(tool_results)}"
                         else:
-                            # If no tools were called, the streamed text is the final response
+                            # No tools called, use streamed text as response
                             response = full_text_response
                             if response.strip():
                                 self.short_memory.add(role=self.agent_name, content=response)

                 except Exception as e:
                     logger.error(f"Error during streaming execution: {e}")
-                    import traceback
-                    traceback.print_exc()
                     break

             # Check stopping conditions
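The hunk above reduces each completed streaming tool call to a plain dict before handing it to the tool executor. Below is a minimal, self-contained sketch of that hand-off. The `"type"` and `"function"` keys inside `append({...})` are assumptions, since the hunk boundary hides the middle of that call; the `{"choices": ...}` envelope matches the `execute_function_calls_from_api_response` invocation visible in the diff.

```python
import json

# One completed entry of tool_calls_in_stream, shaped as the parser builds it.
tc = {
    "id": "call_0",
    "name": "get_weather",
    "arguments": '{"city": "Paris"}',
    "input": {"city": "Paris"},
    "arguments_complete": True,
}

formatted_tool_calls = []
if tc and (tc.get("input") or tc.get("arguments_complete")):
    # Prefer the already-parsed dict; otherwise re-parse the raw JSON string.
    if "input" in tc:
        args_to_use = tc["input"]
    else:
        args_to_use = json.loads(tc.get("arguments", "{}"))
    formatted_tool_calls.append({
        "type": "function",  # assumed key: hidden by the hunk boundary above
        "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)},
        "id": tc["id"],
    })

# The executor receives a response-shaped envelope, exactly as in the diff.
api_response = {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]}
print(api_response)
```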
@@ -2791,8 +2645,24 @@ class Agent:
                 del kwargs["is_last"]

         try:
-            # Use centralized streaming logic from wrapper if streaming is enabled
-            if self.streaming_on and hasattr(self.llm, "run_with_streaming"):
+            # For streaming with tools, return raw stream for custom parsing
+            if self.streaming_on and exists(self.tools_list_dictionary):
+                # Set stream mode and get raw streaming response
+                original_stream = getattr(self.llm, 'stream', False)
+                self.llm.stream = True
+
+                try:
+                    if img is not None:
+                        stream_response = self.llm.run(task=task, img=img, *args, **kwargs)
+                    else:
+                        stream_response = self.llm.run(task=task, *args, **kwargs)
+                    return stream_response
+                finally:
+                    # Restore original stream setting
+                    self.llm.stream = original_stream
+
+            # Use centralized streaming logic from wrapper if streaming is enabled (no tools)
+            elif self.streaming_on and hasattr(self.llm, "run_with_streaming"):
                 return self.llm.run_with_streaming(
                     task=task,
                     img=img,
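With this change, `call_llm` flips the wrapper's `stream` flag to obtain a raw, unparsed chunk stream whenever tools are attached, and restores the flag in a `finally` so later non-streaming calls are unaffected. A minimal sketch of that toggle-and-restore pattern, with `ToyLLM` as a hypothetical stand-in for the wrapper (not swarms API):

```python
class ToyLLM:
    """Hypothetical stand-in for a wrapper whose behavior depends on a stream flag."""

    def __init__(self):
        self.stream = False

    def run(self, task):
        # A real wrapper would return a chunk generator when stream=True
        # and a post-processed string otherwise.
        return iter([task, "!"]) if self.stream else task


def get_raw_stream(llm, task):
    original_stream = getattr(llm, "stream", False)
    llm.stream = True
    try:
        return llm.run(task)  # the caller parses the raw chunks itself
    finally:
        # Runs even if run() raises, so later calls see the old setting.
        llm.stream = original_stream


llm = ToyLLM()
chunks = list(get_raw_stream(llm, "hello"))
assert chunks == ["hello", "!"]
assert llm.stream is False  # flag was restored
```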
diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py
index abc60a61..a03605b1 100644
--- a/swarms/utils/litellm_wrapper.py
+++ b/swarms/utils/litellm_wrapper.py
@@ -819,6 +819,126 @@ class LiteLLM:
                 time.sleep(delay)
         print()  # Newline at the end

+    def parse_streaming_chunks_with_tools(
+        self,
+        stream,
+        agent_name: str = "Agent",
+        print_on: bool = True,
+        verbose: bool = False,
+    ) -> tuple:
+        """
+        Parse streaming chunks and extract both text and tool calls.
+
+        Args:
+            stream: The streaming response object
+            agent_name: Name of the agent for printing
+            print_on: Whether to print streaming output
+            verbose: Whether to enable verbose logging
+
+        Returns:
+            tuple: (full_text_response, tool_calls_list)
+        """
+        import json
+
+        full_text_response = ""
+        tool_calls_in_stream = []
+        current_tool_call = None
+        tool_call_buffer = ""
+
+        if print_on:
+            print(f"šŸ¤– {agent_name}: ", end="", flush=True)
+
+        # Process streaming chunks in real-time
+        for chunk in stream:
+            if verbose:
+                logger.debug(f"Processing streaming chunk: {type(chunk)}")
+
+            chunk_type = getattr(chunk, 'type', None)
+
+            # Anthropic-style stream parsing
+            if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
+                tool_name = chunk.content_block.name
+                if print_on:
+                    print(f"\nšŸ”§ Tool Call: {tool_name}...", flush=True)
+                current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
+                tool_call_buffer = ""
+
+            elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
+                if chunk.delta.type == 'input_json_delta':
+                    tool_call_buffer += chunk.delta.partial_json
+                elif chunk.delta.type == 'text_delta':
+                    text_chunk = chunk.delta.text
+                    full_text_response += text_chunk
+                    if print_on:
+                        print(text_chunk, end="", flush=True)
+
+            elif chunk_type == 'content_block_stop' and current_tool_call:
+                try:
+                    tool_input = json.loads(tool_call_buffer)
+                    current_tool_call["input"] = tool_input
+                    tool_calls_in_stream.append(current_tool_call)
+                except json.JSONDecodeError:
+                    logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
+                current_tool_call = None
+                tool_call_buffer = ""
+
+            # OpenAI-style stream parsing
+            elif hasattr(chunk, 'choices') and chunk.choices:
+                choice = chunk.choices[0]
+                if hasattr(choice, 'delta') and choice.delta:
+                    delta = choice.delta
+
+                    # Handle text content
+                    if hasattr(delta, 'content') and delta.content:
+                        text_chunk = delta.content
+                        full_text_response += text_chunk
+                        if print_on:
+                            print(text_chunk, end="", flush=True)
+
+                    # Handle tool calls in streaming chunks
+                    if hasattr(delta, 'tool_calls') and delta.tool_calls:
+                        for tool_call in delta.tool_calls:
+                            tool_index = getattr(tool_call, 'index', 0)
+
+                            # Ensure we have enough slots in the list
+                            while len(tool_calls_in_stream) <= tool_index:
+                                tool_calls_in_stream.append(None)
+
+                            if hasattr(tool_call, 'function') and tool_call.function:
+                                func = tool_call.function
+
+                                # Create new tool call if slot is empty and we have a function name
+                                if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
+                                    if print_on:
+                                        print(f"\nšŸ”§ Tool Call: {func.name}...", flush=True)
+                                    tool_calls_in_stream[tool_index] = {
+                                        "id": getattr(tool_call, 'id', f"call_{tool_index}"),
+                                        "name": func.name,
+                                        "arguments": ""
+                                    }
+
+                                # Accumulate arguments
+                                if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
+                                    tool_calls_in_stream[tool_index]["arguments"] += func.arguments
+
+                                    if verbose:
+                                        logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
+
+                                    # Try to parse if we have complete JSON
+                                    try:
+                                        args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
+                                        tool_calls_in_stream[tool_index]["input"] = args_dict
+                                        tool_calls_in_stream[tool_index]["arguments_complete"] = True
+                                        if verbose:
+                                            logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
+                                    except json.JSONDecodeError:
+                                        pass
+
+        if print_on:
+            print()  # Newline after streaming text
+
+        return full_text_response, tool_calls_in_stream
+
     def run(
         self,
         task: str,