diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md
index 8947374a..42b4f4dd 100644
--- a/docs/swarms/structs/agent.md
+++ b/docs/swarms/structs/agent.md
@@ -83,6 +83,7 @@ The `Agent` class establishes a conversational loop with a language model, allow
 | `traceback` | `Optional[Any]` | Object used for traceback handling. |
 | `traceback_handlers` | `Optional[Any]` | List of traceback handlers. |
 | `streaming_on` | `Optional[bool]` | Boolean indicating whether to stream responses. |
+| `stream` | `Optional[bool]` | Boolean indicating whether to enable detailed token-by-token streaming with metadata. |
 | `docs` | `List[str]` | List of document paths or contents to be ingested. |
 | `docs_folder` | `Optional[str]` | Path to a folder containing documents to be ingested. |
 | `verbose` | `Optional[bool]` | Boolean indicating whether to print verbose output. |
@@ -759,6 +760,22 @@ print(agent.system_prompt)
 ```
 
+### Token-by-Token Streaming
+
+```python
+from swarms import Agent
+
+# Initialize agent with detailed streaming
+agent = Agent(
+    model_name="gpt-4.1",
+    max_loops=1,
+    stream=True,  # Enable detailed token-by-token streaming
+)
+
+# Run with detailed streaming - each token shows metadata
+agent.run("Tell me a short story about a robot learning to paint.")
+```
+
 ## Agent Structured Outputs
 
 - Create a structured output schema for the agent [List[Dict]]
@@ -1112,4 +1129,4 @@ The `run` method now supports several new parameters for advanced functionality:
 | `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. |
 | `handoffs` | Use handoffs to create specialized agent teams that can intelligently route tasks based on complexity and expertise requirements. |
 
-By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.
\ No newline at end of file
+By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.
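The new docs section pairs naturally with a callback-based consumer. A minimal sketch follows; it assumes `run()` accepts and forwards a `streaming_callback` keyword down to the streaming loop, as the `streaming_callback(token_info)` hook in the implementation below suggests, and only the `token` and `token_index` keys of `token_info` are relied on here. The exact signature may differ in your version.

```python
from swarms import Agent

collected = []

def on_token(token_info: dict) -> None:
    # token_info is the per-token dict emitted by the detailed streaming
    # path in this patch; "token" holds the text delta.
    collected.append(token_info["token"])

agent = Agent(
    model_name="gpt-4.1",
    max_loops=1,
    stream=True,
)

# streaming_callback is assumed to be forwarded to the streaming loop.
agent.run(
    "Tell me a short story about a robot learning to paint.",
    streaming_callback=on_token,
)

print("".join(collected))  # full text reassembled from the deltas
```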
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index f8c84d73..29222e1f 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -218,7 +218,8 @@ class Agent:
         preset_stopping_token (bool): Enable preset stopping token
         traceback (Any): The traceback
         traceback_handlers (Any): The traceback handlers
-        streaming_on (bool): Enable streaming
+        streaming_on (bool): Enable basic streaming with formatted panels
+        stream (bool): Enable detailed token-by-token streaming with metadata (citations, tokens used, etc.)
         docs (List[str]): The list of documents
         docs_folder (str): The folder containing the documents
         verbose (bool): Enable verbose mode
@@ -310,9 +311,9 @@ class Agent:
         >>> print(response)
         >>> # Generate a report on the financials.
 
-        >>> # Real-time streaming example
-        >>> agent = Agent(model_name="gpt-4.1", max_loops=1, streaming_on=True)
-        >>> response = agent.run("Tell me a long story.")  # Will stream in real-time
+        >>> # Detailed token streaming example
+        >>> agent = Agent(model_name="gpt-4.1", max_loops=1, stream=True)
+        >>> response = agent.run("Tell me a story.")  # Will stream each token with detailed metadata
         >>> print(response)  # Final complete response
 
         >>> # Fallback model example
@@ -366,6 +367,7 @@ class Agent:
         traceback: Optional[Any] = None,
         traceback_handlers: Optional[Any] = None,
         streaming_on: Optional[bool] = False,
+        stream: Optional[bool] = False,
         docs: List[str] = None,
         docs_folder: Optional[str] = None,
         verbose: Optional[bool] = False,
@@ -516,6 +518,7 @@ class Agent:
         self.traceback = traceback
         self.traceback_handlers = traceback_handlers
         self.streaming_on = streaming_on
+        self.stream = stream
         self.docs = docs
         self.docs_folder = docs_folder
         self.verbose = verbose
@@ -1346,6 +1349,8 @@
                     )
                 elif self.streaming_on:
                     pass
+                elif self.stream:
+                    pass
                 else:
                     self.pretty_print(
                         response, loop_count
@@ -2566,8 +2571,105 @@
             del kwargs["is_last"]
 
         try:
-            # Set streaming parameter in LLM if streaming is enabled
-            if self.streaming_on and hasattr(self.llm, "stream"):
+            if self.stream and hasattr(self.llm, "stream"):
+                original_stream = self.llm.stream
+                self.llm.stream = True
+
+                if img is not None:
+                    streaming_response = self.llm.run(
+                        task=task, img=img, *args, **kwargs
+                    )
+                else:
+                    streaming_response = self.llm.run(
+                        task=task, *args, **kwargs
+                    )
+
+                if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str):
+                    complete_response = ""
+                    token_count = 0
+                    final_chunk = None
+                    first_chunk = None
+
+                    for chunk in streaming_response:
+                        if first_chunk is None:
+                            first_chunk = chunk
+
+                        if hasattr(chunk, "choices") and chunk.choices[0].delta.content:
+                            content = chunk.choices[0].delta.content
+                            complete_response += content
+                            token_count += 1
+
+                            # Schema emitted for each streamed token
+                            token_info = {
+                                "token_index": token_count,
+                                "model": getattr(chunk, 'model', self.get_current_model()),
+                                "id": getattr(chunk, 'id', ''),
+                                "created": getattr(chunk, 'created', int(time.time())),
+                                "object": getattr(chunk, 'object', 'chat.completion.chunk'),
+                                "token": content,
+                                "system_fingerprint": getattr(chunk, 'system_fingerprint', ''),
+                                "finish_reason": chunk.choices[0].finish_reason,
+                                "citations": getattr(chunk, 'citations', None),
+                                "provider_specific_fields": getattr(chunk, 'provider_specific_fields', None),
+                                "service_tier": getattr(chunk, 'service_tier', 'default'),
+                                "obfuscation": getattr(chunk, 'obfuscation', None),
+                                "usage": getattr(chunk, 'usage', None),
+                                "logprobs": chunk.choices[0].logprobs,
+                                "timestamp": time.time(),
+                            }
+
+                            print(f"ResponseStream {token_info}")
+
+                            if streaming_callback is not None:
+                                streaming_callback(token_info)
+
+                        final_chunk = chunk
+
+                    # Print the final ModelResponseStream summary
+                    if final_chunk and hasattr(final_chunk, 'usage') and final_chunk.usage:
+                        usage = final_chunk.usage
+                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
+                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+                              f"provider_specific_fields=None, "
+                              f"usage=Usage(completion_tokens={usage.completion_tokens}, "
+                              f"prompt_tokens={usage.prompt_tokens}, "
+                              f"total_tokens={usage.total_tokens}, "
+                              f"completion_tokens_details=CompletionTokensDetailsWrapper("
+                              f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, "
+                              f"audio_tokens={usage.completion_tokens_details.audio_tokens}, "
+                              f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, "
+                              f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, "
+                              f"text_tokens={usage.completion_tokens_details.text_tokens}), "
+                              f"prompt_tokens_details=PromptTokensDetailsWrapper("
+                              f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, "
+                              f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, "
+                              f"text_tokens={usage.prompt_tokens_details.text_tokens}, "
+                              f"image_tokens={usage.prompt_tokens_details.image_tokens})))")
+                    else:
+                        print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', "
+                              f"created={getattr(final_chunk, 'created', 'N/A')}, "
+                              f"model='{getattr(final_chunk, 'model', self.get_current_model())}', "
+                              f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', "
+                              f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', "
+                              f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', "
+                              f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, "
+                              f"function_call=None, tool_calls=None, audio=None), logprobs=None)], "
+                              f"provider_specific_fields=None)")
+
+
+                    self.llm.stream = original_stream
+                    return complete_response
+                else:
+                    self.llm.stream = original_stream
+                    return streaming_response
+
+            elif self.streaming_on and hasattr(self.llm, "stream"):
                 original_stream = self.llm.stream
                 self.llm.stream = True
@@ -3052,6 +3154,8 @@
 
         if self.streaming_on:
             pass
+        elif self.stream:
+            pass
 
         if self.print_on:
             formatter.print_panel(
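To see what the per-token loop above produces without a live model, here is a self-contained sketch that feeds hypothetical stand-in chunks (shaped like LiteLLM/OpenAI streaming chunks) through the same extraction logic. `make_chunk` and the trimmed-down schema are illustrative only, not part of the patch:

```python
import time
from types import SimpleNamespace

def make_chunk(text, finish_reason=None):
    # Stand-in for a streaming chunk: .choices[0].delta.content etc.
    delta = SimpleNamespace(content=text)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason, logprobs=None)
    return SimpleNamespace(choices=[choice])

chunks = [make_chunk("Hello"), make_chunk(", world"), make_chunk("", "stop")]

complete_response = ""
token_count = 0
for chunk in chunks:
    content = chunk.choices[0].delta.content
    if content:  # empty/None deltas (e.g., the final stop chunk) are skipped
        complete_response += content
        token_count += 1
        token_info = {
            "token_index": token_count,
            "token": content,
            "finish_reason": chunk.choices[0].finish_reason,
            "timestamp": time.time(),
        }
        print(f"ResponseStream {token_info}")

print(complete_response)  # -> Hello, world
```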
diff --git a/tests/structs/test_agent_stream_token.py b/tests/structs/test_agent_stream_token.py
new file mode 100644
index 00000000..5cd02207
--- /dev/null
+++ b/tests/structs/test_agent_stream_token.py
@@ -0,0 +1,9 @@
+from swarms.structs.agent import Agent
+
+agent = Agent(
+    model_name="gpt-4.1",
+    max_loops=1,
+    stream=True,
+)
+
+agent.run("Tell me a short story about a robot learning to paint.")
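One caveat on this file: as written it calls a live model at import time, so pytest will execute it during collection. A guarded variant is sketched below; it assumes an OpenAI-compatible key in `OPENAI_API_KEY` and that the detailed streaming path returns the accumulated text, as the `call_llm` change above indicates.

```python
import os

import pytest

from swarms.structs.agent import Agent


@pytest.mark.skipif(
    not os.getenv("OPENAI_API_KEY"),
    reason="requires a live model; set OPENAI_API_KEY to run",
)
def test_stream_returns_complete_response():
    agent = Agent(
        model_name="gpt-4.1",
        max_loops=1,
        stream=True,
    )
    response = agent.run(
        "Tell me a short story about a robot learning to paint."
    )
    # The streaming path accumulates token deltas and returns the full text.
    assert response
```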