diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 489e91f9..b6950162 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -215,7 +215,8 @@ class Agent:
         preset_stopping_token (bool): Enable preset stopping token
         traceback (Any): The traceback
         traceback_handlers (Any): The traceback handlers
-        streaming_on (bool): Enable streaming
+        streaming_on (bool): Enable basic streaming with formatted panels
+        stream (bool): Enable detailed token-by-token streaming with metadata (citations, tokens used, etc.)
         docs (List[str]): The list of documents
         docs_folder (str): The folder containing the documents
         verbose (bool): Enable verbose mode
@@ -307,9 +308,9 @@ class Agent:
     >>> print(response)
     >>> # Generate a report on the financials.
 
-    >>> # Real-time streaming example
-    >>> agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)
-    >>> response = agent.run("Tell me a long story.") # Will stream in real-time
+    >>> # Detailed token streaming example
+    >>> agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
+    >>> response = agent.run("Tell me a story.") # Will stream each token with detailed metadata
     >>> print(response) # Final complete response
 
     >>> # Fallback model example
@@ -363,6 +364,7 @@ class Agent:
         traceback: Optional[Any] = None,
         traceback_handlers: Optional[Any] = None,
         streaming_on: Optional[bool] = False,
+        stream: Optional[bool] = False,
         docs: List[str] = None,
         docs_folder: Optional[str] = None,
         verbose: Optional[bool] = False,
@@ -512,6 +514,7 @@ class Agent:
         self.traceback = traceback
         self.traceback_handlers = traceback_handlers
         self.streaming_on = streaming_on
+        self.stream = stream
         self.docs = docs
         self.docs_folder = docs_folder
         self.verbose = verbose
@@ -1317,6 +1320,8 @@ class Agent:
                 )
             elif self.streaming_on:
                 pass
+            elif self.stream:
+                pass
             else:
                 self.pretty_print(
                     response, loop_count
@@ -2537,8 +2542,105 @@ class Agent:
                 del kwargs["is_last"]
 
         try:
-            # Set streaming parameter in LLM if streaming is enabled
-            if self.streaming_on and hasattr(self.llm, "stream"):
+            if self.stream and hasattr(self.llm, "stream"):
+                original_stream = self.llm.stream
+                self.llm.stream = True
+
+                if img is not None:
+                    streaming_response = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    streaming_response = self.llm.run(
+                        task=task, *args, **kwargs
+                    )
+
+                if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+                    complete_response = ""
+                    token_count = 0
+                    final_chunk = None
+                    first_chunk = None
+
+                    for chunk in streaming_response:
+                        if first_chunk is None:
+                            first_chunk = chunk
+
+                        if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                            content = chunk.choices[0].delta.content
+                            complete_response += content
+                            token_count += 1
+
+                            # Schema emitted for each streamed token
+                            token_info = {
+                                "token_index": token_count,
+                                "model": getattr(chunk, 'model', self.get_current_model()),
+                                "id": getattr(chunk, 'id', ''),
+                                "created": getattr(chunk, 'created', int(time.time())),
+                                "object": getattr(chunk, 'object', 'chat.completion.chunk'),
+                                "token": content,
+                                "system_fingerprint": getattr(chunk, 'system_fingerprint', ''),
+                                "finish_reason": chunk.choices[0].finish_reason,
+                                "citations": getattr(chunk, 'citations', None),
+                                "provider_specific_fields": getattr(chunk, 'provider_specific_fields', None),
+                                "service_tier": getattr(chunk, 'service_tier', 'default'),
+                                "obfuscation": getattr(chunk, 'obfuscation', None),
+                                "usage": getattr(chunk, 'usage', None),
+                                "logprobs": chunk.choices[0].logprobs,
+                                "timestamp": time.time(),
+                            }
+
+                            print(f"ResponseStream {token_info}")
+
+                            if streaming_callback is not None:
+                                streaming_callback(token_info)
+
+                        final_chunk = chunk
+
+                    # Final ModelResponseStream summary
+                    if final_chunk and hasattr(final_chunk, 'usage') and final_chunk.usage:
+                        usage = final_chunk.usage
+                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
+                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+                              f"provider_specific_fields=None, "
+                              f"usage=Usage(completion_tokens={usage.completion_tokens}, "
+                              f"prompt_tokens={usage.prompt_tokens}, "
+                              f"total_tokens={usage.total_tokens}, "
+                              f"completion_tokens_details=CompletionTokensDetailsWrapper("
+                              f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
+                              f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
+                              f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
+                              f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
+                              f"text_tokens={usage.completion_tokens_details.text_tokens}), "
+                              f"prompt_tokens_details=PromptTokensDetailsWrapper("
+                              f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
+                              f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
+                              f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
+                              f"image_tokens={usage.prompt_tokens_details.image_tokens})))")
+                    else:
+                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
+                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+                              f"provider_specific_fields=None)")
+
+                    self.llm.stream = original_stream
+                    return complete_response
+                else:
+                    self.llm.stream = original_stream
+                    return streaming_response
+
+            elif self.streaming_on and hasattr(self.llm, "stream"):
                 original_stream = self.llm.stream
                 self.llm.stream = True
 
@@ -3023,6 +3125,8 @@ class Agent:
 
         if self.streaming_on:
             pass
+        elif self.stream:
+            pass
 
         if self.print_on:
             formatter.print_panel(
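
For reviewers, a minimal usage sketch of the new `stream=True` mode (not part of the diff). It assumes the `streaming_callback` keyword used inside the LLM-call hunk above is forwarded by `Agent.run`, and that credentials for the chosen model are configured:

```python
from swarms import Agent

def on_token(token_info: dict) -> None:
    # Each invocation receives the per-token schema built in this PR:
    # token_index, token, model, finish_reason, usage, timestamp, etc.
    print(token_info["token"], end="", flush=True)

agent = Agent(
    model_name="gpt-4.1",
    max_loops=1,
    stream=True,  # new flag: detailed token-by-token streaming
)

# run() still returns the complete concatenated response once the
# stream is exhausted; on_token fires once per streamed token.
response = agent.run("Tell me a story.", streaming_callback=on_token)
print("\n\nFinal response:", response)
```

When both flags are set, the new `stream` branch takes precedence in the LLM-call hunk because it is checked before `streaming_on`.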