diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index ce57c65f..5b2a0e49 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1067,108 +1067,347 @@ class Agent: self.short_memory.return_history_as_string() ) - # Parameters - attempt = 0 - success = False - while attempt < self.retry_attempts and not success: - try: - - if img is not None: - response = self.call_llm( - task=task_prompt, - img=img, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - else: - response = self.call_llm( - task=task_prompt, - current_loop=loop_count, - streaming_callback=streaming_callback, - *args, - **kwargs, - ) - - # If streaming is enabled, then don't print the response - - # Parse the response from the agent with the output type - if exists(self.tools_list_dictionary): - if isinstance(response, BaseModel): - response = response.model_dump() - - # Parse the response from the agent with the output type - response = self.parse_llm_output(response) - - self.short_memory.add( - role=self.agent_name, - content=response, - ) - - # Print - if self.print_on is True: - if isinstance(response, list): - self.pretty_print( - f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", - loop_count, + # If streaming is OFF, use the simple, non-streaming path + if not self.streaming_on: + attempt = 0 + success = False + while attempt < self.retry_attempts and not success: + try: + if img is not None: + response = self.call_llm( + task=task_prompt, + img=img, + current_loop=loop_count, + streaming_callback=streaming_callback, + *args, + **kwargs, ) - elif self.streaming_on: - pass else: - self.pretty_print( - response, loop_count + response = self.call_llm( + task=task_prompt, + current_loop=loop_count, + streaming_callback=streaming_callback, + *args, + **kwargs, ) - # Check and execute callable tools - if exists(self.tools): - self.tool_execution_retry( - response, loop_count - ) - - # Handle MCP tools - if ( - exists(self.mcp_url) - or exists(self.mcp_config) - or exists(self.mcp_urls) - ): - # Only handle MCP tools if response is not None - if response is not None: - self.mcp_tool_handling( - response=response, - current_loop=loop_count, + # Parse the response from the agent with the output type + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() + + response = self.parse_llm_output(response) + self.short_memory.add(role=self.agent_name, content=response) + + if self.print_on is True: + if isinstance(response, list): + self.pretty_print( + f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", + loop_count, + ) + else: + self.pretty_print(response, loop_count) + + # Check and execute callable tools + if exists(self.tools): + self.tool_execution_retry(response, loop_count) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + else: + logger.warning( + f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" + ) + + success = True # Exit retry loop + except Exception as e: + logger.error(f"Attempt {attempt + 1} failed: {e}") + attempt += 1 + if attempt >= self.retry_attempts: + logger.error("All retry attempts failed.") + break # Exit main loop 
if retries fail + + # If streaming is ON, use the new real-time tool execution path + else: + try: + # Get the raw streaming response directly from the underlying LLM client + # Bypass the LiteLLM wrapper's post-processing for streaming + raw_messages = self.short_memory.to_dict() + + # Convert to proper OpenAI message format + messages = [] + for msg in raw_messages: + # Normalize role names to lowercase + role = msg.get('role', '').lower() + content = msg.get('content', '') + + # Map common role variants to standard OpenAI roles + role_mapping = { + 'system': 'system', + 'user': 'user', + 'assistant': 'assistant', + 'human': 'user', # Map Human to user + 'tool executor': 'assistant', # Map Tool Executor to assistant + 'database': 'assistant', # Map Database to assistant + } + normalized_role = role_mapping.get(role, 'assistant') + + # Ensure content is in string format (not complex objects) + if isinstance(content, (dict, list)): + content = str(content) + elif not isinstance(content, str): + content = str(content) + + # Create properly formatted OpenAI message + openai_msg = { + 'role': normalized_role, + 'content': content + } + messages.append(openai_msg) + + # Prepare completion parameters for direct streaming + completion_params = { + "model": self.llm.model_name, + "messages": messages, + "stream": True, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + } + + # Add tools if available + if self.tools_list_dictionary: + completion_params["tools"] = self.tools_list_dictionary + completion_params["tool_choice"] = "auto" + + # Add image if provided + if img is not None: + # Add image to the last message + if messages and len(messages) > 0: + last_message = messages[-1] + if isinstance(last_message.get("content"), str): + last_message["content"] = [ + {"type": "text", "text": last_message["content"]}, + {"type": "image_url", "image_url": {"url": img}} + ] + + # Get raw stream using the underlying client + from litellm import completion + stream = completion(**completion_params) + + full_text_response = "" + tool_calls_in_stream = [] + current_tool_call = None + tool_call_buffer = "" + + if self.print_on: + print(f"šŸ¤– {self.agent_name}: ", end="", flush=True) + + # Process streaming chunks in real-time + for chunk in stream: + # Debug: Log chunk processing if verbose + if self.verbose: + logger.debug(f"Processing streaming chunk: {type(chunk)}") + + chunk_type = getattr(chunk, 'type', None) + + # Anthropic-style stream parsing + if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use': + tool_name = chunk.content_block.name + if self.print_on: + print(f"\nšŸ”§ Tool Call: {tool_name}...", flush=True) + current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""} + tool_call_buffer = "" + + elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'input_json_delta': + tool_call_buffer += chunk.delta.partial_json + elif chunk.delta.type == 'text_delta': + text_chunk = chunk.delta.text + full_text_response += text_chunk + if self.print_on: + print(text_chunk, end="", flush=True) + + elif chunk_type == 'content_block_stop' and current_tool_call: + try: + tool_input = json.loads(tool_call_buffer) + current_tool_call["input"] = tool_input + tool_calls_in_stream.append(current_tool_call) + except json.JSONDecodeError: + logger.error(f"Failed to parse tool arguments: {tool_call_buffer}") + current_tool_call = None + tool_call_buffer = "" + + # OpenAI-style 
stream parsing + elif hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, 'delta') and choice.delta: + delta = choice.delta + + # Handle text content + if hasattr(delta, 'content') and delta.content: + text_chunk = delta.content + full_text_response += text_chunk + if self.print_on: + print(text_chunk, end="", flush=True) + + # Handle tool calls in streaming chunks + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + # Get tool call index + tool_index = getattr(tool_call, 'index', 0) + + # Ensure we have enough slots in the list + while len(tool_calls_in_stream) <= tool_index: + tool_calls_in_stream.append(None) + + if hasattr(tool_call, 'function') and tool_call.function: + func = tool_call.function + + # If this slot is empty and we have a function name, create new tool call + if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name: + if self.print_on: + print(f"\nšŸ”§ Tool Call: {func.name}...", flush=True) + tool_calls_in_stream[tool_index] = { + "id": getattr(tool_call, 'id', f"call_{tool_index}"), + "name": func.name, + "arguments": "" + } + + # Accumulate arguments (whether it's a new call or continuation) + if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None: + tool_calls_in_stream[tool_index]["arguments"] += func.arguments + + if self.verbose: + logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'") + + # Try to parse if we have complete JSON + try: + args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"]) + tool_calls_in_stream[tool_index]["input"] = args_dict + tool_calls_in_stream[tool_index]["arguments_complete"] = True + if self.verbose: + logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}") + except json.JSONDecodeError: + # Not complete yet, continue accumulating + pass + + if self.print_on: + print() # Newline after streaming text + + # After the stream is complete, check if any tools were called + if tool_calls_in_stream: + # Add the partial text response to memory + if full_text_response.strip(): + self.short_memory.add(role=self.agent_name, content=full_text_response) + + # Format tool calls for execution - only execute complete tool calls + formatted_tool_calls = [] + for tc in tool_calls_in_stream: + # Only execute tool calls that have complete arguments + if tc.get("input") or tc.get("arguments_complete"): + # Use input if available, otherwise parse arguments + if "input" in tc: + args_to_use = tc["input"] + else: + try: + args_to_use = json.loads(tc.get("arguments", "{}")) + except json.JSONDecodeError: + logger.warning(f"Could not parse arguments for tool {tc.get('name')}: {tc.get('arguments')}") + continue + + formatted_tool_calls.append({ + "type": "function", + "function": {"name": tc["name"], "arguments": json.dumps(args_to_use)}, + "id": tc["id"] + }) + + # Execute all buffered tool calls if we have any complete ones + if formatted_tool_calls: + if self.verbose: + logger.info(f"Executing {len(formatted_tool_calls)} tool calls: {formatted_tool_calls}") + + tool_results = self.tool_struct.execute_function_calls_from_api_response( + {"choices": [{"message": {"tool_calls": formatted_tool_calls}}]} ) + + if self.verbose: + logger.info(f"Tool execution results: {tool_results}") else: - logger.warning( - f"LLM returned None response in loop 
{loop_count}, skipping MCP tool handling" + if self.verbose: + logger.warning(f"No complete tool calls found. Tool calls in stream: {tool_calls_in_stream}") + tool_results = [] + + # Add tool results to memory + self.short_memory.add(role="Tool Executor", content=format_data_structure(tool_results)) + if self.print_on: + formatter.print_panel( + f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]\nResults:\n{format_data_structure(tool_results)}", + "Tool Execution Result" ) - # self.sentiment_and_evaluator(response) + # Make a final call to the LLM to summarize the tool results if tool_call_summary is enabled + if self.tool_call_summary: + temp_llm = self.temp_llm_instance_for_tool_summary() + final_summary_response = temp_llm.run( + task=f"Please analyze and summarize the following tool execution output:\n\n{format_data_structure(tool_results)}" + ) + + # Handle streaming for final tool summary in real-time execution + if self.streaming_on and hasattr(final_summary_response, "__iter__") and not isinstance(final_summary_response, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"Real-time tool summary streaming chunk: {chunk[:50]}...") + + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + final_summary_response, + title=f"šŸ¤– Agent: {self.agent_name} - Tool Summary (Real-time)", + style="green", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + final_summary_response = complete_response + + elif self.streaming_on and isinstance(final_summary_response, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(final_summary_response, delay=0.01) + + response = self.parse_llm_output(final_summary_response) + self.short_memory.add(role=self.agent_name, content=response) + + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: + self.pretty_print(response, loop_count) + else: + response = f"Tool execution completed: {format_data_structure(tool_results)}" - success = True # Mark as successful to exit the retry loop + else: + # If no tools were called, the streamed text is the final response + response = full_text_response + if response.strip(): + self.short_memory.add(role=self.agent_name, content=response) except Exception as e: - - if self.autosave is True: - log_agent_data(self.to_dict()) - self.save() - - logger.error( - f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " - ) - attempt += 1 - - if not success: - - if self.autosave is True: - log_agent_data(self.to_dict()) - self.save() - - logger.error( - "Failed to generate a valid response after" - " retry attempts." 
- ) - break # Exit the loop if all retry attempts fail + logger.error(f"Error during streaming execution: {e}") + import traceback + traceback.print_exc() + break # Check stopping conditions if ( @@ -1191,10 +1430,7 @@ class Agent: break if self.interactive: - # logger.info("Interactive mode enabled.") user_input = input("You: ") - - # User-defined exit command if ( user_input.lower() == self.custom_exit_command.lower() @@ -1204,7 +1440,6 @@ class Agent: loop_count=loop_count, ) break - self.short_memory.add( role=self.user_name, content=user_input ) @@ -2964,6 +3199,34 @@ class Agent: summary = temp_llm.run( task=self.short_memory.get_str() ) + + # Handle streaming MCP tool summary response + if self.streaming_on and hasattr(summary, "__iter__") and not isinstance(summary, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"MCP tool summary streaming chunk received: {chunk[:50]}...") + + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + summary, + title=f"šŸ¤– Agent: {self.agent_name} - MCP Tool Summary", + style="cyan", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + summary = complete_response + + elif self.streaming_on and isinstance(summary, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(summary, delay=0.01) + except Exception as e: logger.error( f"Error calling LLM after MCP tool execution: {e}" @@ -2971,7 +3234,8 @@ class Agent: # Fallback: provide a default summary summary = "I successfully executed the MCP tool and retrieved the information above." - if self.print_on is True: + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: self.pretty_print(summary, loop_count=current_loop) # Add to the memory @@ -2988,7 +3252,7 @@ class Agent: temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=False, # Always disable streaming for tool summaries + stream=self.streaming_on, tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3003,7 +3267,6 @@ class Agent: "This may indicate the LLM did not return a valid response." ) return - try: output = self.tool_struct.execute_function_calls_from_api_response( response @@ -3035,23 +3298,55 @@ class Agent: if self.tool_call_summary is True: temp_llm = self.temp_llm_instance_for_tool_summary() - tool_response = temp_llm.run( - f""" - Please analyze and summarize the following tool execution output in a clear and concise way. - Focus on the key information and insights that would be most relevant to the user's original request. - If there are any errors or issues, highlight them prominently. - - Tool Output: - {output} - """ - ) + tool_summary_prompt = f""" + Please analyze and summarize the following tool execution output in a clear and concise way. + Focus on the key information and insights that would be most relevant to the user's original request. + If there are any errors or issues, highlight them prominently. 
+ + The user's original request was: + {self.task} + + Tool Output: + {output} + """ + + tool_response = temp_llm.run(tool_summary_prompt) + + # Handle streaming tool response + if self.streaming_on and hasattr(tool_response, "__iter__") and not isinstance(tool_response, str): + # Collect chunks for conversation saving + collected_chunks = [] + + def on_chunk_received(chunk: str): + """Callback to collect chunks as they arrive""" + collected_chunks.append(chunk) + if self.verbose: + logger.debug(f"Tool response streaming chunk received: {chunk[:50]}...") + # Use the streaming panel to display and collect the response + complete_response = formatter.print_streaming_panel( + tool_response, + title=f"šŸ¤– Agent: {self.agent_name} - Tool Summary", + style="blue", + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + tool_response = complete_response + + elif self.streaming_on and isinstance(tool_response, str): + # If streaming is on but we got a string response, display it streamed + if self.print_on: + self.stream_response(tool_response, delay=0.01) + + # Add the tool response to memory self.short_memory.add( role=self.agent_name, content=tool_response, ) - if self.print_on is True: + # Only pretty_print if streaming is off (to avoid double printing) + if self.print_on and not self.streaming_on: self.pretty_print( tool_response, loop_count, diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index 0b546be5..a56c9f1f 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -27,6 +27,12 @@ dashboard_live = None # Create a spinner for loading animation spinner = Spinner("dots", style="yellow") +# Global lock to ensure only a single Rich Live context is active at any moment. +# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same +# console raise runtime errors. Using a module-level lock serialises access and +# prevents crashes when multiple agents stream simultaneously in different +# threads (e.g., in ConcurrentWorkflow). +live_render_lock = threading.Lock() def choose_random_color(): import random @@ -308,6 +314,7 @@ class Formatter: ): # Add ONLY the new chunk to the Text object with random color style chunk = part.choices[0].delta.content + streaming_text.append( chunk, style=text_style ) diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index aaa3f71e..e7a41671 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -153,6 +153,134 @@ class LiteLLM: litellm.drop_params = True + def _collect_streaming_response(self, streaming_response): + """ + Parse and yield individual content chunks from a streaming response. 
+ + Args: + streaming_response: The streaming response object from litellm + + Yields: + str: Individual content chunks as they arrive + """ + try: + for chunk in streaming_response: + content = None + + # Handle different chunk formats + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + + # OpenAI-style chunks + if hasattr(choice, 'delta') and choice.delta: + if hasattr(choice.delta, 'content') and choice.delta.content: + content = choice.delta.content + + # Alternative chunk format + elif hasattr(choice, 'message') and choice.message: + if hasattr(choice.message, 'content') and choice.message.content: + content = choice.message.content + + # Anthropic-style chunks + elif hasattr(chunk, 'type'): + if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'text_delta': + content = chunk.delta.text + + # Handle direct content chunks + elif hasattr(chunk, 'content'): + content = chunk.content + + # Yield content chunk if we found any + if content: + yield content + + except Exception as e: + logger.error(f"Error parsing streaming chunks: {e}") + return + + async def _collect_streaming_response_async(self, streaming_response): + """ + Parse and yield individual content chunks from an async streaming response. + + Args: + streaming_response: The async streaming response object from litellm + + Yields: + str: Individual content chunks as they arrive + """ + try: + async for chunk in streaming_response: + content = None + + # Handle different chunk formats + if hasattr(chunk, 'choices') and chunk.choices: + choice = chunk.choices[0] + + # OpenAI-style chunks + if hasattr(choice, 'delta') and choice.delta: + if hasattr(choice.delta, 'content') and choice.delta.content: + content = choice.delta.content + + # Alternative chunk format + elif hasattr(choice, 'message') and choice.message: + if hasattr(choice.message, 'content') and choice.message.content: + content = choice.message.content + + # Anthropic-style chunks + elif hasattr(chunk, 'type'): + if chunk.type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'text_delta': + content = chunk.delta.text + + # Handle direct content chunks + elif hasattr(chunk, 'content'): + content = chunk.content + + # Yield content chunk if we found any + if content: + yield content + + except Exception as e: + logger.error(f"Error parsing async streaming chunks: {e}") + return + + def collect_all_chunks(self, streaming_response): + """ + Helper method to collect all chunks from a streaming response into a complete text. + This provides backward compatibility for code that expects a complete response. + + Args: + streaming_response: The streaming response object from litellm + + Returns: + str: The complete response text collected from all chunks + """ + chunks = [] + for chunk in self._collect_streaming_response(streaming_response): + chunks.append(chunk) + complete_response = "".join(chunks) + logger.info(f"Collected complete streaming response: {len(complete_response)} characters") + return complete_response + + async def collect_all_chunks_async(self, streaming_response): + """ + Helper method to collect all chunks from an async streaming response into a complete text. + This provides backward compatibility for code that expects a complete response. 
+ + Args: + streaming_response: The async streaming response object from litellm + + Returns: + str: The complete response text collected from all chunks + """ + chunks = [] + async for chunk in self._collect_streaming_response_async(streaming_response): + chunks.append(chunk) + complete_response = "".join(chunks) + logger.info(f"Collected complete async streaming response: {len(complete_response)} characters") + return complete_response + def output_for_tools(self, response: any): if self.mcp_call is True: out = response.choices[0].message.tool_calls[0].function @@ -471,7 +599,9 @@ class LiteLLM: **kwargs: Additional keyword arguments. Returns: - str: The content of the response from the model. + str or generator: When streaming is disabled, returns the complete response content. + When streaming is enabled, returns a generator that yields content chunks. + Use collect_all_chunks() to get complete response from the generator. Raises: Exception: If there is an error in processing the request. @@ -525,7 +655,7 @@ class LiteLLM: # Handle streaming response if self.stream: - return response # Return the streaming generator directly + return response # Handle tool-based response elif self.tools_list_dictionary is not None: @@ -574,7 +704,9 @@ class LiteLLM: **kwargs: Additional keyword arguments. Returns: - str: The content of the response from the model. + str or async generator: When streaming is disabled, returns the complete response content. + When streaming is enabled, returns an async generator that yields content chunks. + Use collect_all_chunks_async() to get complete response from the generator. """ try: messages = self._prepare_messages(task) @@ -608,6 +740,10 @@ class LiteLLM: # Standard completion response = await acompletion(**completion_params) + # Handle streaming response for async + if self.stream: + return self._collect_streaming_response_async(response) + print(response) return response
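
The streaming branch added to the agent's run loop above converts `short_memory.to_dict()` into OpenAI-format chat messages, lower-casing roles and folding variants such as "Human", "Tool Executor", and "Database" onto the standard roles before calling litellm directly. Below is a minimal, self-contained sketch of that normalization step; the `history` list and the `to_openai_messages` helper are illustrative stand-ins, not code from the patch.

```python
# Sketch of the role-normalization step used by the new streaming path.
# `history` stands in for Agent.short_memory.to_dict(); only the mapping
# logic mirrors the diff, everything else here is illustrative.

ROLE_MAPPING = {
    "system": "system",
    "user": "user",
    "assistant": "assistant",
    "human": "user",            # conversation memory may label the user "Human"
    "tool executor": "assistant",
    "database": "assistant",
}


def to_openai_messages(history: list[dict]) -> list[dict]:
    messages = []
    for msg in history:
        role = str(msg.get("role", "")).lower()
        content = msg.get("content", "")
        # OpenAI chat messages expect string content here, so complex
        # structures are flattened to their string representation.
        if not isinstance(content, str):
            content = str(content)
        messages.append(
            {"role": ROLE_MAPPING.get(role, "assistant"), "content": content}
        )
    return messages


if __name__ == "__main__":
    history = [
        {"role": "System", "content": "You are a helpful agent."},
        {"role": "Human", "content": "What is 2 + 2?"},
        {"role": "Tool Executor", "content": {"result": 4}},
    ]
    print(to_openai_messages(history))
```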
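
When the model streams OpenAI-style deltas, tool-call fragments arrive keyed by index, and the patch buffers each call's argument string until it parses as complete JSON. The sketch below mirrors that accumulation pattern with stand-in dataclasses in place of litellm's chunk objects; it is not the patch's implementation, only the same buffering idea in isolation.

```python
# Sketch of the index-keyed tool-call accumulation used when parsing an
# OpenAI-style stream. The dataclasses stand in for the delta objects the
# stream yields; only the buffer-then-parse pattern reflects the diff.
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class FunctionDelta:
    name: Optional[str] = None
    arguments: Optional[str] = None


@dataclass
class ToolCallDelta:
    index: int
    id: Optional[str] = None
    function: Optional[FunctionDelta] = None


def accumulate_tool_calls(deltas: list[ToolCallDelta]) -> list[dict]:
    calls: list[Optional[dict]] = []
    for tc in deltas:
        # Grow the buffer so every tool-call index has a slot.
        while len(calls) <= tc.index:
            calls.append(None)
        if tc.function is None:
            continue
        # The first fragment for an index carries the function name.
        if calls[tc.index] is None and tc.function.name:
            calls[tc.index] = {
                "id": tc.id or f"call_{tc.index}",
                "name": tc.function.name,
                "arguments": "",
            }
        # Argument fragments are appended until the JSON parses cleanly.
        if calls[tc.index] is not None and tc.function.arguments:
            calls[tc.index]["arguments"] += tc.function.arguments
            try:
                calls[tc.index]["input"] = json.loads(calls[tc.index]["arguments"])
            except json.JSONDecodeError:
                pass  # still incomplete; keep accumulating
    # Only calls whose arguments parsed completely are ready to execute.
    return [c for c in calls if c and "input" in c]


if __name__ == "__main__":
    deltas = [
        ToolCallDelta(0, "call_0", FunctionDelta(name="get_weather", arguments='{"cit')),
        ToolCallDelta(0, None, FunctionDelta(arguments='y": "Paris"}')),
    ]
    print(accumulate_tool_calls(deltas))
```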
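
Anthropic-style streams instead signal tool use through content-block events, which the patch handles as a small state machine: open a call on `content_block_start`, accumulate `input_json_delta` fragments, then close and parse on `content_block_stop`. A hedged sketch of that event handling, using `SimpleNamespace` objects as stand-ins for the real stream events:

```python
# Sketch of the Anthropic-style event handling in the streaming path.
# The SimpleNamespace objects below are stand-ins for real stream events;
# only the start/delta/stop state machine reflects the diff.
import json
from types import SimpleNamespace


def parse_anthropic_events(events) -> tuple[str, list[dict]]:
    text, tool_calls = "", []
    current, buffer = None, ""
    for event in events:
        kind = getattr(event, "type", None)
        if kind == "content_block_start" and event.content_block.type == "tool_use":
            # A tool_use block opens: remember its id/name, start a fresh buffer.
            current = {"id": event.content_block.id, "name": event.content_block.name}
            buffer = ""
        elif kind == "content_block_delta":
            if event.delta.type == "input_json_delta":
                buffer += event.delta.partial_json
            elif event.delta.type == "text_delta":
                text += event.delta.text
        elif kind == "content_block_stop" and current is not None:
            try:
                current["input"] = json.loads(buffer)
                tool_calls.append(current)
            except json.JSONDecodeError:
                pass  # malformed arguments; drop this call
            current, buffer = None, ""
    return text, tool_calls


if __name__ == "__main__":
    events = [
        SimpleNamespace(
            type="content_block_start",
            content_block=SimpleNamespace(type="tool_use", id="tu_1", name="get_weather"),
        ),
        SimpleNamespace(
            type="content_block_delta",
            delta=SimpleNamespace(type="input_json_delta", partial_json='{"city": "Paris"}'),
        ),
        SimpleNamespace(type="content_block_stop"),
    ]
    print(parse_anthropic_events(events))
```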
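
On the wrapper side, `LiteLLM.run()` now hands back the streaming response when `stream=True`, and the new `collect_all_chunks()` helper rebuilds the complete text for callers that still expect a single string. The usage sketch below assumes a configured API key; the model name and task are placeholders, and the constructor arguments shown are the ones this patch itself uses elsewhere.

```python
# Hedged usage sketch of the new streaming contract in the LiteLLM wrapper:
# with stream=True, run() returns the streaming response instead of a final
# string, and collect_all_chunks() restores the old single-string behaviour.
from swarms.utils.litellm_wrapper import LiteLLM

# Placeholder model/task; requires valid provider credentials to actually run.
llm = LiteLLM(model_name="gpt-4o-mini", stream=True)

streaming_response = llm.run(task="Summarize the benefits of streaming output.")

# Collapse the stream back into one string for callers that expect the
# pre-patch return type. The agent's run loop consumes the same stream
# chunk-by-chunk instead, for real-time display and tool execution.
full_text = llm.collect_all_chunks(streaming_response)
print(full_text)
```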
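
Finally, `swarms/utils/formatter.py` gains a module-level `live_render_lock` because Rich's `Live` rendering is not thread-safe and concurrent agents (for example in ConcurrentWorkflow) may stream at the same time. The hunk above only defines the lock, so the sketch below shows the serialization pattern its comment describes, acquiring the lock around each `Live` context; treat this as an assumption about intended use rather than code from the patch.

```python
# Illustration of serialising Rich Live contexts with a module-level lock so
# threads never drive overlapping Live renders on the shared console.
# Wrapping the Live context in the lock is the assumed usage, not patch code.
import threading
import time

from rich.console import Console
from rich.live import Live
from rich.text import Text

console = Console()
live_render_lock = threading.Lock()


def stream_panel(agent_name: str, chunks: list[str]) -> None:
    # Only one thread at a time may hold an active Live render.
    with live_render_lock:
        text = Text(f"{agent_name}: ")
        with Live(text, console=console, refresh_per_second=10):
            for chunk in chunks:
                text.append(chunk)
                time.sleep(0.05)


if __name__ == "__main__":
    threads = [
        threading.Thread(target=stream_panel, args=(f"agent-{i}", list("hello world")))
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```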