From f73cdc393428ac4f07ea6b2a52d37cbfa029eaf5 Mon Sep 17 00:00:00 2001
From: harshalmore31
Date: Wed, 13 Aug 2025 18:44:21 +0530
Subject: [PATCH] cleanup litellm_wrapper

---
 swarms/utils/litellm_wrapper.py | 312 +++++++++++++++++---------------
 1 file changed, 166 insertions(+), 146 deletions(-)

diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py
index 11dcd42d..16b08abd 100644
--- a/swarms/utils/litellm_wrapper.py
+++ b/swarms/utils/litellm_wrapper.py
@@ -512,10 +512,21 @@ class LiteLLM:
                 f"Model {self.model_name} does not support vision"
             )
 
+    def _collect_streaming_chunks(self, streaming_response, callback=None):
+        """Helper method to collect chunks from streaming response."""
+        chunks = []
+        for chunk in streaming_response:
+            if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                content = chunk.choices[0].delta.content
+                chunks.append(content)
+                if callback:
+                    callback(content)
+        return "".join(chunks)
+
     def _handle_streaming_response(
         self,
         streaming_response,
-        title: str = "šŸ¤– LLM Response",
+        title: str = "LLM Response",
         style: Optional[str] = None,
         streaming_callback: Optional[Callable[[str], None]] = None,
        print_on: bool = True,
@@ -535,50 +546,35 @@ class LiteLLM:
         Returns:
             str: The complete response string
         """
+        # Non-streaming response - return as is
+        if not (hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str)):
+            return streaming_response
+
+        # Handle callback streaming
+        if streaming_callback is not None:
+            return self._collect_streaming_chunks(streaming_response, streaming_callback)
+
+        # Handle silent streaming
+        if not print_on:
+            return self._collect_streaming_chunks(streaming_response)
+
+        # Handle formatted streaming with panel
         from swarms.utils.formatter import formatter
-        import json
         from loguru import logger
 
-        if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
-            if streaming_callback is not None:
-                # Real-time callback streaming for dashboard integration
-                chunks = []
-                for chunk in streaming_response:
-                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
-                        content = chunk.choices[0].delta.content
-                        chunks.append(content)
-                        streaming_callback(content)
-                return "".join(chunks)
-            elif not print_on:
-                # Silent streaming - no printing, just collect chunks
-                chunks = []
-                for chunk in streaming_response:
-                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
-                        content = chunk.choices[0].delta.content
-                        chunks.append(content)
-                return "".join(chunks)
-            else:
-                # Collect chunks for conversation saving
-                collected_chunks = []
-
-                def on_chunk_received(chunk: str):
-                    """Callback to collect chunks as they arrive"""
-                    collected_chunks.append(chunk)
-                    if verbose:
-                        logger.debug(f"Streaming chunk received: {chunk[:50]}...")
-
-                # Use the streaming panel to display and collect the response
-                complete_response = formatter.print_streaming_panel(
-                    streaming_response,
-                    title=title,
-                    style=style,
-                    collect_chunks=True,
-                    on_chunk_callback=on_chunk_received,
-                )
-                return complete_response
-        else:
-            # Non-streaming response or string response
-            return streaming_response
+        collected_chunks = []
+        def on_chunk_received(chunk: str):
+            collected_chunks.append(chunk)
+            if verbose:
+                logger.debug(f"Streaming chunk received: {chunk[:50]}...")
+
+        return formatter.print_streaming_panel(
+            streaming_response,
+            title=title,
+            style=style,
+            collect_chunks=True,
+            on_chunk_callback=on_chunk_received,
+        )
 
     def run_with_streaming(
         self,
@@ -586,7 +582,7 @@ class LiteLLM:
         img: Optional[str] = None,
         audio: Optional[str] = None,
         streaming_callback: Optional[Callable[[str], None]] = None,
-        title: str = "šŸ¤– LLM Response",
+        title: str = "LLM Response",
         style: Optional[str] = None,
         print_on: bool = True,
         verbose: bool = False,
@@ -609,20 +605,19 @@ class LiteLLM:
         Returns:
             str: The complete response
         """
-        # Enable streaming if not already set
         original_stream = self.stream
         self.stream = True
 
         try:
-            # Call the LLM
+            # Build kwargs for run method
+            run_kwargs = {"task": task, **kwargs}
             if img is not None:
-                response = self.run(task=task, img=img, audio=audio, *args, **kwargs)
-            elif audio is not None:
-                response = self.run(task=task, audio=audio, *args, **kwargs)
-            else:
-                response = self.run(task=task, *args, **kwargs)
+                run_kwargs["img"] = img
+            if audio is not None:
+                run_kwargs["audio"] = audio
+
+            response = self.run(*args, **run_kwargs)
 
-            # Handle the streaming response
             return self._handle_streaming_response(
                 response,
                 title=title,
@@ -632,7 +627,6 @@ class LiteLLM:
                 verbose=verbose,
             )
         finally:
-            # Restore original stream setting
            self.stream = original_stream
 
     def run_tool_summary_with_streaming(
@@ -656,11 +650,9 @@ class LiteLLM:
         Returns:
             str: The complete summary response
         """
-        summary_task = f"Please analyze and summarize the following tool execution output:\n\n{tool_results}"
-
         return self.run_with_streaming(
-            task=summary_task,
-            title=f"šŸ¤– Agent: {agent_name} - Tool Summary",
+            task=f"Please analyze and summarize the following tool execution output:\n\n{tool_results}",
+            title=f"Agent: {agent_name} - Tool Summary",
             style="green",
             print_on=print_on,
             verbose=verbose,
@@ -682,14 +674,111 @@ class LiteLLM:
             print_on: Whether to print the streaming output
             delay: Delay between characters for streaming effect
         """
-        if print_on and response:
-            # Simple character-by-character streaming for string responses
-            for char in response:
-                print(char, end="", flush=True)
-                if delay > 0:
-                    import time
-                    time.sleep(delay)
-            print()  # Newline at the end
+        if not (print_on and response):
+            return
+
+        import time
+        for char in response:
+            print(char, end="", flush=True)
+            if delay > 0:
+                time.sleep(delay)
+        print()  # Newline at the end
+
+    def _process_anthropic_chunk(self, chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose):
+        """Process Anthropic-style streaming chunks."""
+        import json
+        chunk_type = getattr(chunk, 'type', None)
+        full_text_response = ""
+
+        if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
+            tool_name = chunk.content_block.name
+            if print_on:
+                print(f"\nTool Call: {tool_name}...", flush=True)
+            current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
+            tool_call_buffer = ""
+
+        elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
+            if chunk.delta.type == 'input_json_delta':
+                tool_call_buffer += chunk.delta.partial_json
+            elif chunk.delta.type == 'text_delta':
+                text_chunk = chunk.delta.text
+                full_text_response += text_chunk
+                if print_on:
+                    print(text_chunk, end="", flush=True)
+
+        elif chunk_type == 'content_block_stop' and current_tool_call:
+            try:
+                tool_input = json.loads(tool_call_buffer)
+                current_tool_call["input"] = tool_input
+                tool_calls_in_stream.append(current_tool_call)
+            except json.JSONDecodeError:
+                logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
+            current_tool_call = None
+            tool_call_buffer = ""
+
+        return full_text_response, current_tool_call, tool_call_buffer
+
+    def _process_openai_chunk(self, chunk, tool_calls_in_stream, print_on, verbose):
+        """Process OpenAI-style streaming chunks."""
+        import json
+        full_text_response = ""
+
+        if not (hasattr(chunk, 'choices') and chunk.choices):
+            return full_text_response
+
+        choice = chunk.choices[0]
+        if not (hasattr(choice, 'delta') and choice.delta):
+            return full_text_response
+
+        delta = choice.delta
+
+        # Handle text content
+        if hasattr(delta, 'content') and delta.content:
+            text_chunk = delta.content
+            full_text_response += text_chunk
+            if print_on:
+                print(text_chunk, end="", flush=True)
+
+        # Handle tool calls in streaming chunks
+        if hasattr(delta, 'tool_calls') and delta.tool_calls:
+            for tool_call in delta.tool_calls:
+                tool_index = getattr(tool_call, 'index', 0)
+
+                # Ensure we have enough slots in the list
+                while len(tool_calls_in_stream) <= tool_index:
+                    tool_calls_in_stream.append(None)
+
+                if hasattr(tool_call, 'function') and tool_call.function:
+                    func = tool_call.function
+
+                    # Create new tool call if slot is empty and we have a function name
+                    if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
+                        if print_on:
+                            print(f"\nTool Call: {func.name}...", flush=True)
+                        tool_calls_in_stream[tool_index] = {
+                            "id": getattr(tool_call, 'id', f"call_{tool_index}"),
+                            "name": func.name,
+                            "arguments": ""
+                        }
+
+                    # Accumulate arguments
+                    if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
+                        tool_calls_in_stream[tool_index]["arguments"] += func.arguments
+
+                        if verbose:
+                            logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
+
+                        # Try to parse if we have complete JSON
+                        try:
+                            args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
+                            tool_calls_in_stream[tool_index]["input"] = args_dict
+                            tool_calls_in_stream[tool_index]["arguments_complete"] = True
+                            if verbose:
+                                logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
+                        except json.JSONDecodeError:
+                            pass
+
+        return full_text_response
 
     def parse_streaming_chunks_with_tools(
         self,
@@ -710,101 +799,32 @@ class LiteLLM:
         Returns:
             tuple: (full_text_response, tool_calls_list)
         """
-        import json
-
         full_text_response = ""
         tool_calls_in_stream = []
         current_tool_call = None
         tool_call_buffer = ""
 
         if print_on:
-            print(f"šŸ¤– {agent_name}: ", end="", flush=True)
+            print(f"{agent_name}: ", end="", flush=True)
 
         # Process streaming chunks in real-time
         for chunk in stream:
             if verbose:
                 logger.debug(f"Processing streaming chunk: {type(chunk)}")
 
-            chunk_type = getattr(chunk, 'type', None)
-
-            # Anthropic-style stream parsing
-            if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
-                tool_name = chunk.content_block.name
-                if print_on:
-                    print(f"\nšŸ”§ Tool Call: {tool_name}...", flush=True)
-                current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
-                tool_call_buffer = ""
+            # Anthropic-style chunks expose a `type` attribute; unpack the
+            # helper's result unconditionally so tool-call state is kept
+            if getattr(chunk, 'type', None) in ('content_block_start', 'content_block_delta', 'content_block_stop'):
+                text_chunk, current_tool_call, tool_call_buffer = self._process_anthropic_chunk(
+                    chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose
+                )
+                full_text_response += text_chunk
+                continue
 
-            elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
-                if chunk.delta.type == 'input_json_delta':
-                    tool_call_buffer += chunk.delta.partial_json
-                elif chunk.delta.type == 'text_delta':
-                    text_chunk = chunk.delta.text
-                    full_text_response += text_chunk
-                    if print_on:
-                        print(text_chunk, end="", flush=True)
-
-            elif chunk_type == 'content_block_stop' and current_tool_call:
-                try:
-                    tool_input = json.loads(tool_call_buffer)
-                    current_tool_call["input"] = tool_input
-                    tool_calls_in_stream.append(current_tool_call)
-                except json.JSONDecodeError:
-                    logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
-                current_tool_call = None
-                tool_call_buffer = ""
-
-            # OpenAI-style stream parsing
-            elif hasattr(chunk, 'choices') and chunk.choices:
-                choice = chunk.choices[0]
-                if hasattr(choice, 'delta') and choice.delta:
-                    delta = choice.delta
-
-                    # Handle text content
-                    if hasattr(delta, 'content') and delta.content:
-                        text_chunk = delta.content
-                        full_text_response += text_chunk
-                        if print_on:
-                            print(text_chunk, end="", flush=True)
-
-                    # Handle tool calls in streaming chunks
-                    if hasattr(delta, 'tool_calls') and delta.tool_calls:
-                        for tool_call in delta.tool_calls:
-                            tool_index = getattr(tool_call, 'index', 0)
-
-                            # Ensure we have enough slots in the list
-                            while len(tool_calls_in_stream) <= tool_index:
-                                tool_calls_in_stream.append(None)
-
-                            if hasattr(tool_call, 'function') and tool_call.function:
-                                func = tool_call.function
-
-                                # Create new tool call if slot is empty and we have a function name
-                                if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
-                                    if print_on:
-                                        print(f"\nšŸ”§ Tool Call: {func.name}...", flush=True)
-                                    tool_calls_in_stream[tool_index] = {
-                                        "id": getattr(tool_call, 'id', f"call_{tool_index}"),
-                                        "name": func.name,
-                                        "arguments": ""
-                                    }
-
-                                # Accumulate arguments
-                                if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
-                                    tool_calls_in_stream[tool_index]["arguments"] += func.arguments
-
-                                    if verbose:
-                                        logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
-
-                                    # Try to parse if we have complete JSON
-                                    try:
-                                        args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
-                                        tool_calls_in_stream[tool_index]["input"] = args_dict
-                                        tool_calls_in_stream[tool_index]["arguments_complete"] = True
-                                        if verbose:
-                                            logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
-                                    except json.JSONDecodeError:
-                                        pass
+            # Otherwise fall back to OpenAI-style processing
+            openai_text = self._process_openai_chunk(chunk, tool_calls_in_stream, print_on, verbose)
+            if openai_text:
+                full_text_response += openai_text
 
         if print_on:
             print()  # Newline after streaming text
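
A minimal usage sketch of the refactored callback-streaming path. Names here
are illustrative: the `model_name` constructor argument is assumed from the
wrapper's use of `self.model_name`, and the task string is a placeholder.

    from swarms.utils.litellm_wrapper import LiteLLM

    llm = LiteLLM(model_name="gpt-4o-mini")  # assumed constructor argument

    received = []

    def on_chunk(chunk: str) -> None:
        # Called once per text delta, forwarded by _collect_streaming_chunks.
        received.append(chunk)

    # With streaming_callback set, _handle_streaming_response takes the
    # callback branch: chunks stream into on_chunk as they arrive and the
    # joined string is returned without printing a panel.
    full = llm.run_with_streaming(
        task="Summarize the benefits of streaming responses.",
        streaming_callback=on_chunk,
    )
    assert full == "".join(received)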
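A sketch of the chunk-dispatch helpers on OpenAI-style deltas, reusing `llm`
from the sketch above and fabricating chunks with SimpleNamespace. The keyword
names passed to parse_streaming_chunks_with_tools and its (text, tool_calls)
return shape are assumed from the docstring and the names used in the method
body; the tool name and arguments are invented for illustration.

    from types import SimpleNamespace as NS

    def fake_stream():
        # One text delta, then a tool call whose JSON arguments arrive split
        # across two chunks, exercising the accumulation and parse-on-complete
        # logic in _process_openai_chunk.
        yield NS(choices=[NS(delta=NS(content="Checking the weather. ", tool_calls=None))])
        head = NS(name="get_weather", arguments='{"city": ')
        yield NS(choices=[NS(delta=NS(content=None, tool_calls=[NS(index=0, id="call_0", function=head)]))])
        tail = NS(name=None, arguments='"Pune"}')
        yield NS(choices=[NS(delta=NS(content=None, tool_calls=[NS(index=0, id=None, function=tail)]))])

    text, tool_calls = llm.parse_streaming_chunks_with_tools(
        fake_stream(), agent_name="demo-agent", print_on=False, verbose=False
    )
    # text == "Checking the weather. "
    # tool_calls[0]["input"] == {"city": "Pune"} once the buffered JSON parses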