cleanup litellm_wrapper

pull/938/head
harshalmore31 2 months ago
parent 8cc1b67c85
commit f73cdc3934

@@ -512,10 +512,21 @@ class LiteLLM:
                 f"Model {self.model_name} does not support vision"
             )
 
+    def _collect_streaming_chunks(self, streaming_response, callback=None):
+        """Helper method to collect chunks from streaming response."""
+        chunks = []
+        for chunk in streaming_response:
+            if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                content = chunk.choices[0].delta.content
+                chunks.append(content)
+                if callback:
+                    callback(content)
+        return "".join(chunks)
+
     def _handle_streaming_response(
         self,
         streaming_response,
-        title: str = "🤖 LLM Response",
+        title: str = "LLM Response",
         style: Optional[str] = None,
         streaming_callback: Optional[Callable[[str], None]] = None,
         print_on: bool = True,
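For reference, a standalone mock of the chunk shape the new `_collect_streaming_chunks` helper iterates over (OpenAI-style `choices[0].delta.content`); `make_chunk` is a hypothetical stand-in, not part of the wrapper:

```python
from types import SimpleNamespace

def make_chunk(text):
    # Mimics an OpenAI-style streaming chunk: chunk.choices[0].delta.content
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

stream = [make_chunk("Hello"), make_chunk(", "), make_chunk("world")]

# Same collection logic as the helper above:
chunks = []
for chunk in stream:
    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
        chunks.append(chunk.choices[0].delta.content)
print("".join(chunks))  # Hello, world
```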
@@ -535,50 +546,35 @@ class LiteLLM:
         Returns:
             str: The complete response string
         """
+        # Non-streaming response - return as is
+        if not (hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str)):
+            return streaming_response
+
+        # Handle callback streaming
+        if streaming_callback is not None:
+            return self._collect_streaming_chunks(streaming_response, streaming_callback)
+
+        # Handle silent streaming
+        if not print_on:
+            return self._collect_streaming_chunks(streaming_response)
+
+        # Handle formatted streaming with panel
         from swarms.utils.formatter import formatter
         import json
         from loguru import logger
-        if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
-            if streaming_callback is not None:
-                # Real-time callback streaming for dashboard integration
-                chunks = []
-                for chunk in streaming_response:
-                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
-                        content = chunk.choices[0].delta.content
-                        chunks.append(content)
-                        streaming_callback(content)
-                return "".join(chunks)
-            elif not print_on:
-                # Silent streaming - no printing, just collect chunks
-                chunks = []
-                for chunk in streaming_response:
-                    if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
-                        content = chunk.choices[0].delta.content
-                        chunks.append(content)
-                return "".join(chunks)
-            else:
-                # Collect chunks for conversation saving
-                collected_chunks = []
-                def on_chunk_received(chunk: str):
-                    """Callback to collect chunks as they arrive"""
-                    collected_chunks.append(chunk)
-                    if verbose:
-                        logger.debug(f"Streaming chunk received: {chunk[:50]}...")
-                # Use the streaming panel to display and collect the response
-                complete_response = formatter.print_streaming_panel(
-                    streaming_response,
-                    title=title,
-                    style=style,
-                    collect_chunks=True,
-                    on_chunk_callback=on_chunk_received,
-                )
-                return complete_response
-        else:
-            # Non-streaming response or string response
-            return streaming_response
+        collected_chunks = []
+
+        def on_chunk_received(chunk: str):
+            collected_chunks.append(chunk)
+            if verbose:
+                logger.debug(f"Streaming chunk received: {chunk[:50]}...")
+
+        return formatter.print_streaming_panel(
+            streaming_response,
+            title=title,
+            style=style,
+            collect_chunks=True,
+            on_chunk_callback=on_chunk_received,
+        )
 
     def run_with_streaming(
         self,
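The refactor replaces the nested if/elif/else with early returns. A minimal runnable sketch of the same dispatch order, with `collect` and `render_panel` as hypothetical stand-ins for `_collect_streaming_chunks` and `formatter.print_streaming_panel`:

```python
def collect(stream, callback=None):
    parts = []
    for part in stream:
        parts.append(part)
        if callback:
            callback(part)
    return "".join(parts)

def render_panel(stream):
    return collect(stream, callback=lambda p: print(p, end="", flush=True))

def handle(response, callback=None, print_on=True):
    if not (hasattr(response, "__iter__") and not isinstance(response, str)):
        return response                      # non-streaming: pass through
    if callback is not None:
        return collect(response, callback)   # real-time callback streaming
    if not print_on:
        return collect(response)             # silent collection
    return render_panel(response)            # formatted display path

assert handle("plain string") == "plain string"
assert handle(iter(["a", "b"]), print_on=False) == "ab"
```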
@@ -586,7 +582,7 @@ class LiteLLM:
         img: Optional[str] = None,
         audio: Optional[str] = None,
         streaming_callback: Optional[Callable[[str], None]] = None,
-        title: str = "🤖 LLM Response",
+        title: str = "LLM Response",
         style: Optional[str] = None,
         print_on: bool = True,
         verbose: bool = False,
@@ -609,20 +605,19 @@ class LiteLLM:
         Returns:
             str: The complete response
         """
-        # Enable streaming if not already set
         original_stream = self.stream
         self.stream = True
 
         try:
-            # Call the LLM
+            # Build kwargs for run method
+            run_kwargs = {"task": task, **kwargs}
             if img is not None:
-                response = self.run(task=task, img=img, audio=audio, *args, **kwargs)
-            elif audio is not None:
-                response = self.run(task=task, audio=audio, *args, **kwargs)
-            else:
-                response = self.run(task=task, *args, **kwargs)
+                run_kwargs["img"] = img
+            if audio is not None:
+                run_kwargs["audio"] = audio
+            response = self.run(*args, **run_kwargs)
 
             # Handle the streaming response
             return self._handle_streaming_response(
                 response,
                 title=title,
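The kwargs-building refactor also changes behavior slightly: the old `img` branch always forwarded `audio` (even when `None`), while the new version only includes arguments that were actually provided. A standalone sketch of the new construction:

```python
def build_run_kwargs(task, img=None, audio=None, **kwargs):
    # Mirrors the run_kwargs construction in run_with_streaming
    run_kwargs = {"task": task, **kwargs}
    if img is not None:
        run_kwargs["img"] = img
    if audio is not None:
        run_kwargs["audio"] = audio
    return run_kwargs

print(build_run_kwargs("Summarize this image", img="photo.png"))
# {'task': 'Summarize this image', 'img': 'photo.png'}
```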
@@ -632,7 +627,6 @@ class LiteLLM:
                 verbose=verbose,
             )
         finally:
-            # Restore original stream setting
             self.stream = original_stream
 
     def run_tool_summary_with_streaming(
@@ -656,11 +650,9 @@ class LiteLLM:
         Returns:
             str: The complete summary response
         """
-        summary_task = f"Please analyze and summarize the following tool execution output:\n\n{tool_results}"
         return self.run_with_streaming(
-            task=summary_task,
-            title=f"🤖 Agent: {agent_name} - Tool Summary",
+            task=f"Please analyze and summarize the following tool execution output:\n\n{tool_results}",
+            title=f"Agent: {agent_name} - Tool Summary",
             style="green",
             print_on=print_on,
             verbose=verbose,
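For reference, the inlined prompt evaluates as below (the `tool_results` value is illustrative):

```python
tool_results = '{"status": "ok", "rows": 42}'  # illustrative tool output
summary_task = (
    "Please analyze and summarize the following tool execution output:"
    f"\n\n{tool_results}"
)
print(summary_task)
```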
@@ -682,14 +674,111 @@ class LiteLLM:
             print_on: Whether to print the streaming output
             delay: Delay between characters for streaming effect
         """
-        if print_on and response:
-            # Simple character-by-character streaming for string responses
-            for char in response:
-                print(char, end="", flush=True)
-                if delay > 0:
-                    import time
-                    time.sleep(delay)
-            print()  # Newline at the end
+        if not (print_on and response):
+            return
+
+        import time
+
+        for char in response:
+            print(char, end="", flush=True)
+            if delay > 0:
+                time.sleep(delay)
+        print()  # Newline at the end
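The same logic as a standalone function, runnable outside the class:

```python
import time

def simulate_streaming(response, print_on=True, delay=0.0):
    # Same guard clause and per-character loop as the method above
    if not (print_on and response):
        return
    for char in response:
        print(char, end="", flush=True)
        if delay > 0:
            time.sleep(delay)
    print()

simulate_streaming("Hello, world", delay=0.01)
```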
+
+    def _process_anthropic_chunk(self, chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose):
+        """Process Anthropic-style streaming chunks."""
+        import json
+
+        chunk_type = getattr(chunk, 'type', None)
+        full_text_response = ""
+        if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
+            tool_name = chunk.content_block.name
+            if print_on:
+                print(f"\nTool Call: {tool_name}...", flush=True)
+            current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
+            tool_call_buffer = ""
+        elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
+            if chunk.delta.type == 'input_json_delta':
+                tool_call_buffer += chunk.delta.partial_json
+            elif chunk.delta.type == 'text_delta':
+                text_chunk = chunk.delta.text
+                full_text_response += text_chunk
+                if print_on:
+                    print(text_chunk, end="", flush=True)
+        elif chunk_type == 'content_block_stop' and current_tool_call:
+            try:
+                tool_input = json.loads(tool_call_buffer)
+                current_tool_call["input"] = tool_input
+                tool_calls_in_stream.append(current_tool_call)
+            except json.JSONDecodeError:
+                logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
+            current_tool_call = None
+            tool_call_buffer = ""
+        return full_text_response, current_tool_call, tool_call_buffer
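The Anthropic event shapes this helper dispatches on, mocked with `SimpleNamespace` (attribute names mirror those the method accesses; values are illustrative):

```python
import json
from types import SimpleNamespace as NS

events = [
    NS(type="content_block_start",
       content_block=NS(type="tool_use", id="toolu_1", name="get_weather")),
    NS(type="content_block_delta",
       delta=NS(type="input_json_delta", partial_json='{"city": ')),
    NS(type="content_block_delta",
       delta=NS(type="input_json_delta", partial_json='"Paris"}')),
    NS(type="content_block_stop"),
]

# The buffered JSON fragments only parse once the stop event arrives:
buffer = "".join(e.delta.partial_json for e in events
                 if getattr(e, "type", None) == "content_block_delta")
print(json.loads(buffer))  # {'city': 'Paris'}
```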
+
+    def _process_openai_chunk(self, chunk, tool_calls_in_stream, print_on, verbose):
+        """Process OpenAI-style streaming chunks."""
+        import json
+
+        full_text_response = ""
+        if not (hasattr(chunk, 'choices') and chunk.choices):
+            return full_text_response
+        choice = chunk.choices[0]
+        if not (hasattr(choice, 'delta') and choice.delta):
+            return full_text_response
+        delta = choice.delta
+
+        # Handle text content
+        if hasattr(delta, 'content') and delta.content:
+            text_chunk = delta.content
+            full_text_response += text_chunk
+            if print_on:
+                print(text_chunk, end="", flush=True)
+
+        # Handle tool calls in streaming chunks
+        if hasattr(delta, 'tool_calls') and delta.tool_calls:
+            for tool_call in delta.tool_calls:
+                tool_index = getattr(tool_call, 'index', 0)
+
+                # Ensure we have enough slots in the list
+                while len(tool_calls_in_stream) <= tool_index:
+                    tool_calls_in_stream.append(None)
+
+                if hasattr(tool_call, 'function') and tool_call.function:
+                    func = tool_call.function
+
+                    # Create new tool call if slot is empty and we have a function name
+                    if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
+                        if print_on:
+                            print(f"\nTool Call: {func.name}...", flush=True)
+                        tool_calls_in_stream[tool_index] = {
+                            "id": getattr(tool_call, 'id', f"call_{tool_index}"),
+                            "name": func.name,
+                            "arguments": ""
+                        }
+
+                    # Accumulate arguments
+                    if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
+                        tool_calls_in_stream[tool_index]["arguments"] += func.arguments
+                        if verbose:
+                            logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
+
+                        # Try to parse if we have complete JSON
+                        try:
+                            args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
+                            tool_calls_in_stream[tool_index]["input"] = args_dict
+                            tool_calls_in_stream[tool_index]["arguments_complete"] = True
+                            if verbose:
+                                logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
+                        except json.JSONDecodeError:
+                            pass
+
+        return full_text_response
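Likewise for the OpenAI path: a mocked tool-call delta in the shape the helper expects. Arguments typically arrive fragmented across chunks and only parse as JSON once the fragments concatenate into a complete object:

```python
import json
from types import SimpleNamespace as NS

fragments = ['{"city"', ': "Paris"}']
chunks = [
    NS(choices=[NS(delta=NS(content=None, tool_calls=[
        NS(index=0, id="call_0",
           function=NS(name="get_weather" if i == 0 else None, arguments=frag))
    ]))])
    for i, frag in enumerate(fragments)
]

accumulated = "".join(c.choices[0].delta.tool_calls[0].function.arguments
                      for c in chunks)
print(json.loads(accumulated))  # {'city': 'Paris'}
```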
 
     def parse_streaming_chunks_with_tools(
         self,
@@ -710,101 +799,32 @@ class LiteLLM:
         Returns:
             tuple: (full_text_response, tool_calls_list)
         """
-        import json
         full_text_response = ""
         tool_calls_in_stream = []
         current_tool_call = None
         tool_call_buffer = ""
 
         if print_on:
-            print(f"🤖 {agent_name}: ", end="", flush=True)
+            print(f"{agent_name}: ", end="", flush=True)
 
         # Process streaming chunks in real-time
         for chunk in stream:
             if verbose:
                 logger.debug(f"Processing streaming chunk: {type(chunk)}")
-            chunk_type = getattr(chunk, 'type', None)
-            # Anthropic-style stream parsing
-            if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use':
-                tool_name = chunk.content_block.name
-                if print_on:
-                    print(f"\n🔧 Tool Call: {tool_name}...", flush=True)
-                current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""}
-                tool_call_buffer = ""
+            # Try Anthropic-style processing first; keep the updated tool-call
+            # state even when the chunk produced no text
+            text_chunk, current_tool_call, tool_call_buffer = self._process_anthropic_chunk(
+                chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose
+            )
+            if text_chunk:
+                full_text_response += text_chunk
+                continue
-            elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'):
-                if chunk.delta.type == 'input_json_delta':
-                    tool_call_buffer += chunk.delta.partial_json
-                elif chunk.delta.type == 'text_delta':
-                    text_chunk = chunk.delta.text
-                    full_text_response += text_chunk
-                    if print_on:
-                        print(text_chunk, end="", flush=True)
-            elif chunk_type == 'content_block_stop' and current_tool_call:
-                try:
-                    tool_input = json.loads(tool_call_buffer)
-                    current_tool_call["input"] = tool_input
-                    tool_calls_in_stream.append(current_tool_call)
-                except json.JSONDecodeError:
-                    logger.error(f"Failed to parse tool arguments: {tool_call_buffer}")
-                current_tool_call = None
-                tool_call_buffer = ""
-            # OpenAI-style stream parsing
-            elif hasattr(chunk, 'choices') and chunk.choices:
-                choice = chunk.choices[0]
-                if hasattr(choice, 'delta') and choice.delta:
-                    delta = choice.delta
-                    # Handle text content
-                    if hasattr(delta, 'content') and delta.content:
-                        text_chunk = delta.content
-                        full_text_response += text_chunk
-                        if print_on:
-                            print(text_chunk, end="", flush=True)
-                    # Handle tool calls in streaming chunks
-                    if hasattr(delta, 'tool_calls') and delta.tool_calls:
-                        for tool_call in delta.tool_calls:
-                            tool_index = getattr(tool_call, 'index', 0)
-                            # Ensure we have enough slots in the list
-                            while len(tool_calls_in_stream) <= tool_index:
-                                tool_calls_in_stream.append(None)
-                            if hasattr(tool_call, 'function') and tool_call.function:
-                                func = tool_call.function
-                                # Create new tool call if slot is empty and we have a function name
-                                if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name:
-                                    if print_on:
-                                        print(f"\n🔧 Tool Call: {func.name}...", flush=True)
-                                    tool_calls_in_stream[tool_index] = {
-                                        "id": getattr(tool_call, 'id', f"call_{tool_index}"),
-                                        "name": func.name,
-                                        "arguments": ""
-                                    }
-                                # Accumulate arguments
-                                if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None:
-                                    tool_calls_in_stream[tool_index]["arguments"] += func.arguments
-                                    if verbose:
-                                        logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'")
-                                    # Try to parse if we have complete JSON
-                                    try:
-                                        args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"])
-                                        tool_calls_in_stream[tool_index]["input"] = args_dict
-                                        tool_calls_in_stream[tool_index]["arguments_complete"] = True
-                                        if verbose:
-                                            logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}")
-                                    except json.JSONDecodeError:
-                                        pass
+            # If not Anthropic, try OpenAI-style processing
+            openai_text = self._process_openai_chunk(chunk, tool_calls_in_stream, print_on, verbose)
+            if openai_text:
+                full_text_response += openai_text
 
         if print_on:
             print()  # Newline after streaming text
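Putting it together, the refactored loop tries the Anthropic helper first and falls through to the OpenAI helper. A compact runnable sketch with hypothetical stand-ins for the two methods:

```python
from types import SimpleNamespace as NS

def process_anthropic(chunk, call, buf, calls):
    if getattr(chunk, "type", None) == "content_block_delta":
        return chunk.delta.text, call, buf   # text_delta only, for brevity
    return "", call, buf

def process_openai(chunk, calls):
    if hasattr(chunk, "choices") and chunk.choices:
        return chunk.choices[0].delta.content or ""
    return ""

stream = [
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hi ")),
    NS(choices=[NS(delta=NS(content="there", tool_calls=None))]),
]

text, calls, call, buf = "", [], None, ""
for chunk in stream:
    piece, call, buf = process_anthropic(chunk, call, buf, calls)
    if piece:
        text += piece
        continue
    text += process_openai(chunk, calls)
print(text)  # Hi there
```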
