Enhance streaming response handling by collecting chunks and adding callback support

pull/927/head
harshalmore31 5 days ago
parent 2f5e7bdca7
commit 5e5819fc48

@ -2488,15 +2488,30 @@ class Agent:
# If we get a streaming response, handle it with the new streaming panel
if hasattr(streaming_response, '__iter__') and not isinstance(streaming_response, str):
# Collect chunks for conversation saving
collected_chunks = []
def on_chunk_received(chunk: str):
"""Callback to collect chunks as they arrive"""
collected_chunks.append(chunk)
# Optional: Save each chunk to conversation in real-time
# This creates a more detailed conversation history
if self.verbose:
logger.debug(f"Streaming chunk received: {chunk[:50]}...")
# Use the streaming panel to display and collect the response
complete_response = formatter.print_streaming_panel(
streaming_response,
title=f"🤖 {self.agent_name} Streaming Response",
style="bold cyan",
collect_chunks=True,
on_chunk_callback=on_chunk_received
)
# Restore original stream setting
self.llm.stream = original_stream
# Return the complete response for further processing
return complete_response
else:
# Restore original stream setting

@ -1,6 +1,6 @@
import threading
import time
from typing import Any, Callable, Dict, List, Optional
from rich.console import Console
from rich.live import Live
@ -150,6 +150,8 @@ class Formatter:
streaming_response,
title: str = "🤖 Agent Streaming Response",
style: str = "bold cyan",
collect_chunks: bool = False,
on_chunk_callback: Optional[Callable] = None,
) -> str:
""" """
Display real-time streaming response using Rich Live and Panel. Display real-time streaming response using Rich Live and Panel.
@ -159,6 +161,8 @@ class Formatter:
streaming_response: The streaming response generator from LiteLLM.
title (str): Title of the panel.
style (str): Style for the panel border.
collect_chunks (bool): Whether to collect individual chunks for conversation saving.
on_chunk_callback (Optional[Callable]): Callback function to call for each chunk.
Returns:
str: The complete accumulated response text.
@ -187,6 +191,7 @@ class Formatter:
# Create a Text object for streaming content
streaming_text = Text()
complete_response = ""
chunks_collected = []
# TRUE streaming with Rich's automatic text wrapping
with Live(
@ -202,6 +207,14 @@ class Formatter:
streaming_text.append(chunk, style="white")
complete_response += chunk
# Collect chunks if requested
if collect_chunks:
chunks_collected.append(chunk)
# Call chunk callback if provided
if on_chunk_callback:
on_chunk_callback(chunk)
# Update display with new text - Rich handles all wrapping automatically
live.update(create_streaming_panel(streaming_text, is_complete=False))

Loading…
Cancel
Save