[FEAT] Token Stream in Agent class

pull/1167/head
Aksh Parekh 3 weeks ago committed by GitHub
parent 9faf30f102
commit 540a61ab78

@@ -215,7 +215,8 @@ class Agent:
preset_stopping_token (bool): Enable preset stopping token
traceback (Any): The traceback
traceback_handlers (Any): The traceback handlers
streaming_on (bool): Enable streaming
streaming_on (bool): Enable basic streaming with formatted panels
stream (bool): Enable detailed token-by-token streaming with per-token metadata (citations, usage, logprobs, finish reason, etc.)
docs (List[str]): The list of documents
docs_folder (str): The folder containing the documents
verbose (bool): Enable verbose mode
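The two flags select different output paths: streaming_on renders chunks into the existing formatted panel, while stream emits every token together with its metadata dict. A minimal sketch of choosing between them; the top-level `from swarms import Agent` import follows the docstring examples and should be treated as an assumption:

from swarms import Agent  # assumed top-level export, matching the docstring examples

# Panel-style streaming: chunks are rendered into a formatted panel as they arrive.
panel_agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)

# Token-level streaming: each token is printed with its metadata dict
# (id, model, finish_reason, usage, citations, ...).
token_agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)

response = token_agent.run("Tell me a story.")  # streams per token, then returns the full text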
@@ -307,9 +308,9 @@ class Agent:
>>> print(response)
>>> # Generate a report on the financials.
>>> # Real-time streaming example
>>> agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)
>>> response = agent.run("Tell me a long story.") # Will stream in real-time
>>> # Detailed token streaming example
>>> agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
>>> response = agent.run("Tell me a story.") # Will stream each token with detailed metadata
>>> print(response) # Final complete response
>>> # Fallback model example
@@ -363,6 +364,7 @@ class Agent:
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
streaming_on: Optional[bool] = False,
stream: Optional[bool] = False,
docs: List[str] = None,
docs_folder: Optional[str] = None,
verbose: Optional[bool] = False,
@@ -512,6 +514,7 @@ class Agent:
self.traceback = traceback
self.traceback_handlers = traceback_handlers
self.streaming_on = streaming_on
self.stream = stream
self.docs = docs
self.docs_folder = docs_folder
self.verbose = verbose
@@ -1317,6 +1320,8 @@ class Agent:
            )
        elif self.streaming_on:
            pass
        elif self.stream:
            pass
        else:
            self.pretty_print(
                response, loop_count
@@ -2537,8 +2542,105 @@ class Agent:
        del kwargs["is_last"]
        try:
            # Set streaming parameter in LLM if streaming is enabled
            if self.streaming_on and hasattr(self.llm, "stream"):
            if self.stream and hasattr(self.llm, "stream"):
                original_stream = self.llm.stream
                self.llm.stream = True
                if img is not None:
                    streaming_response = self.llm.run(
                        task=task, img=img, *args, **kwargs
                    )
                else:
                    streaming_response = self.llm.run(
                        task=task, *args, **kwargs
                    )

                if hasattr(streaming_response, "__iter__") and not isinstance(
                    streaming_response, str
                ):
                    complete_response = ""
                    token_count = 0
                    final_chunk = None
                    first_chunk = None

                    for chunk in streaming_response:
                        if first_chunk is None:
                            first_chunk = chunk
                        # Guard against chunks with an empty choices list
                        if (
                            hasattr(chunk, "choices")
                            and chunk.choices
                            and chunk.choices[0].delta.content
                        ):
                            content = chunk.choices[0].delta.content
                            complete_response += content
                            token_count += 1

                            # Metadata schema emitted for each streamed token
                            token_info = {
                                "token_index": token_count,
                                "model": getattr(chunk, 'model', self.get_current_model()),
                                "id": getattr(chunk, 'id', ''),
                                "created": getattr(chunk, 'created', int(time.time())),
                                "object": getattr(chunk, 'object', 'chat.completion.chunk'),
                                "token": content,
                                "system_fingerprint": getattr(chunk, 'system_fingerprint', ''),
                                "finish_reason": chunk.choices[0].finish_reason,
                                "citations": getattr(chunk, 'citations', None),
                                "provider_specific_fields": getattr(chunk, 'provider_specific_fields', None),
                                "service_tier": getattr(chunk, 'service_tier', 'default'),
                                "obfuscation": getattr(chunk, 'obfuscation', None),
                                "usage": getattr(chunk, 'usage', None),
                                "logprobs": chunk.choices[0].logprobs,
                                "timestamp": time.time(),
                            }
                            print(f"ResponseStream {token_info}")
                            if streaming_callback is not None:
                                streaming_callback(token_info)
                        final_chunk = chunk

                    # Emit a final ModelResponseStream summary once the stream ends
                    if (
                        final_chunk
                        and getattr(final_chunk, "usage", None)
                        and final_chunk.usage.completion_tokens_details
                        and final_chunk.usage.prompt_tokens_details
                    ):
                        usage = final_chunk.usage
                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
                              f"provider_specific_fields=None, "
                              f"usage=Usage(completion_tokens={usage.completion_tokens}, "
                              f"prompt_tokens={usage.prompt_tokens}, "
                              f"total_tokens={usage.total_tokens}, "
                              f"completion_tokens_details=CompletionTokensDetailsWrapper("
                              f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
                              f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
                              f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
                              f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
                              f"text_tokens={usage.completion_tokens_details.text_tokens}), "
                              f"prompt_tokens_details=PromptTokensDetailsWrapper("
                              f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
                              f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
                              f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
                              f"image_tokens={usage.prompt_tokens_details.image_tokens})))")
                    elif final_chunk:
                        # No usage payload on the final chunk; print the summary without it
                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
                              f"provider_specific_fields=None)")

                    self.llm.stream = original_stream
                    return complete_response
                else:
                    self.llm.stream = original_stream
                    return streaming_response
            elif self.streaming_on and hasattr(self.llm, "stream"):
                original_stream = self.llm.stream
                self.llm.stream = True
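Each entry handed to streaming_callback is the token_info dict built above. A sketch of a consumer, assuming streaming_callback is threaded from run() into this method (the diff only shows it being invoked here):

def print_tokens(token_info: dict) -> None:
    # token_info follows the schema assembled in the loop above;
    # only keys present there are used. Print the raw token text as it arrives.
    print(token_info["token"], end="", flush=True)
    # finish_reason stays None until the provider marks the last content chunk.
    if token_info["finish_reason"] is not None:
        print()

agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
response = agent.run("Tell me a story.", streaming_callback=print_tokens)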
@@ -3023,6 +3125,8 @@ class Agent:
        if self.streaming_on:
            pass
        elif self.stream:
            pass
        if self.print_on:
            formatter.print_panel(

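Because every token_info carries token_index and timestamp, a callback can also derive rough throughput numbers. A sketch under the same schema assumptions as above:

from typing import Optional

class ThroughputMeter:
    """Accumulates timing from the token_info dicts emitted per token (schema as in the diff)."""

    def __init__(self) -> None:
        self.first_ts: Optional[float] = None
        self.last_ts: Optional[float] = None
        self.count = 0

    def __call__(self, token_info: dict) -> None:
        ts = token_info["timestamp"]
        if self.first_ts is None:
            self.first_ts = ts
        self.last_ts = ts
        self.count = token_info["token_index"]  # running 1-based token counter

    def tokens_per_second(self) -> float:
        if self.first_ts is None or self.last_ts == self.first_ts:
            return 0.0
        return self.count / (self.last_ts - self.first_ts)

meter = ThroughputMeter()
agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
agent.run("Tell me a long story.", streaming_callback=meter)
print(f"~{meter.tokens_per_second():.1f} tokens/sec")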