From 42ddace77b770935ca14c0861d8b58e79e6486e2 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Fri, 19 Sep 2025 19:51:39 -0700 Subject: [PATCH] [FIX] Streaming Example Function --- .../demo_sequential_workflow_streaming.py | 60 ++----------------- swarms/structs/sequential_workflow.py | 2 - 2 files changed, 6 insertions(+), 56 deletions(-) diff --git a/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py index 7e792aaa..6b6d104e 100644 --- a/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py +++ b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py @@ -1,62 +1,16 @@ -import time from typing import Callable from swarms.structs.agent import Agent from swarms.structs.sequential_workflow import SequentialWorkflow def create_streaming_callback() -> Callable[[str, str, bool], None]: """ - Create a streaming callback that shows live paragraph formation. + Create a streaming callback that prints each chunk as it arrives, with no duplication or timestamps. """ - agent_buffers = {} - paragraph_count = {} - def streaming_callback(agent_name: str, chunk: str, is_final: bool): - timestamp = time.strftime("%H:%M:%S") - - # Initialize buffers for new agents - if agent_name not in agent_buffers: - agent_buffers[agent_name] = "" - paragraph_count[agent_name] = 1 - print(f"\nšŸŽ¬ [{timestamp}] {agent_name} starting...") - print("=" * 60) - - if chunk.strip(): - # Split chunk into tokens (words/punctuation) - tokens = chunk.replace("\n", " \n ").split() - - for token in tokens: - # Handle paragraph breaks - if token == "\n": - if agent_buffers[agent_name].strip(): - print( - f"\nšŸ“„ [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:" - ) - print(f"{agent_buffers[agent_name].strip()}") - print("=" * 60) - paragraph_count[agent_name] += 1 - agent_buffers[agent_name] = "" - else: - # Add token to buffer and show live accumulation - agent_buffers[agent_name] += token + " " - print( - f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}", - end="", - flush=True, - ) - + if chunk: + print(chunk, end="", flush=True) if is_final: - print() # New line after live updates - # Print any remaining content as final paragraph - if agent_buffers[agent_name].strip(): - print( - f"\nāœ… [{timestamp}] {agent_name} COMPLETED - Final Paragraph:" - ) - print(f"{agent_buffers[agent_name].strip()}") - print() - print(f"šŸŽÆ [{timestamp}] {agent_name} finished processing") - print(f"šŸ“Š Total paragraphs processed: {paragraph_count[agent_name] - 1}") - print("=" * 60) - + print() return streaming_callback def create_agents(): @@ -69,6 +23,7 @@ def create_agents(): model_name="gpt-4o-mini", max_loops=1, streaming_on=True, + print_on=False, ), Agent( agent_name="Analysis_Agent", @@ -77,6 +32,7 @@ def create_agents(): model_name="gpt-4o-mini", max_loops=1, streaming_on=True, + print_on=False, ), ] @@ -84,7 +40,6 @@ if __name__ == "__main__": print("šŸŽÆ SEQUENTIAL WORKFLOW STREAMING DEMO") print("=" * 50) - # Create agents and workflow agents = create_agents() workflow = SequentialWorkflow( id="research_analysis_workflow", @@ -96,18 +51,15 @@ if __name__ == "__main__": multi_agent_collab_prompt=True, ) - # Define task task = "What are the latest advancements in AI?" print(f"Task: {task.strip()}") - # Create streaming callback streaming_callback = create_streaming_callback() print("\nšŸŽ¬ EXECUTING WITH STREAMING CALLBACKS...") print("Watch real-time agent outputs below:\n") - # Execute with streaming result = workflow.run( task=task, streaming_callback=streaming_callback, diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index aa99fd13..c18aa7e0 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -133,7 +133,6 @@ class SequentialWorkflow: agent_names = [] for agent in self.agents: try: - # Try to get agent_name, fallback to name if not available agent_name = ( getattr(agent, "agent_name", None) or agent.name @@ -187,7 +186,6 @@ class SequentialWorkflow: Exception: If any error occurs during task execution. """ try: - # prompt = f"{MULTI_AGENT_COLLAB_PROMPT}\n\n{task}" return self.agent_rearrange.run( task=task, img=img,