From bca8bbdf87019ef8b26ae80ff78567e1a025f4e9 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Fri, 19 Sep 2025 17:59:59 -0700 Subject: [PATCH] [FIX] Streaming Callback & Demo Reconfig --- .../demo_sequential_workflow_streaming.py | 119 ++++++++++++++++ swarms/structs/agent_rearrange.py | 131 ++++++++++++------ swarms/structs/sequential_workflow.py | 7 +- 3 files changed, 213 insertions(+), 44 deletions(-) create mode 100644 examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py diff --git a/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py new file mode 100644 index 00000000..7e792aaa --- /dev/null +++ b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py @@ -0,0 +1,119 @@ +import time +from typing import Callable +from swarms.structs.agent import Agent +from swarms.structs.sequential_workflow import SequentialWorkflow + +def create_streaming_callback() -> Callable[[str, str, bool], None]: + """ + Create a streaming callback that shows live paragraph formation. + """ + agent_buffers = {} + paragraph_count = {} + + def streaming_callback(agent_name: str, chunk: str, is_final: bool): + timestamp = time.strftime("%H:%M:%S") + + # Initialize buffers for new agents + if agent_name not in agent_buffers: + agent_buffers[agent_name] = "" + paragraph_count[agent_name] = 1 + print(f"\nšŸŽ¬ [{timestamp}] {agent_name} starting...") + print("=" * 60) + + if chunk.strip(): + # Split chunk into tokens (words/punctuation) + tokens = chunk.replace("\n", " \n ").split() + + for token in tokens: + # Handle paragraph breaks + if token == "\n": + if agent_buffers[agent_name].strip(): + print( + f"\nšŸ“„ [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:" + ) + print(f"{agent_buffers[agent_name].strip()}") + print("=" * 60) + paragraph_count[agent_name] += 1 + agent_buffers[agent_name] = "" + else: + # Add token to buffer and show live accumulation + agent_buffers[agent_name] += token + " " + print( + f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}", + end="", + flush=True, + ) + + if is_final: + print() # New line after live updates + # Print any remaining content as final paragraph + if agent_buffers[agent_name].strip(): + print( + f"\nāœ… [{timestamp}] {agent_name} COMPLETED - Final Paragraph:" + ) + print(f"{agent_buffers[agent_name].strip()}") + print() + print(f"šŸŽÆ [{timestamp}] {agent_name} finished processing") + print(f"šŸ“Š Total paragraphs processed: {paragraph_count[agent_name] - 1}") + print("=" * 60) + + return streaming_callback + +def create_agents(): + """Create specialized agents for the workflow.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + ] + +if __name__ == "__main__": + print("šŸŽÆ SEQUENTIAL WORKFLOW STREAMING DEMO") + print("=" * 50) + + # Create agents and workflow + agents = create_agents() + workflow = SequentialWorkflow( + id="research_analysis_workflow", + name="Research Analysis Workflow", + description="A sequential workflow that researches and analyzes topics", + agents=agents, + max_loops=1, + output_type="str", + multi_agent_collab_prompt=True, + ) + + # Define task + task = "What are the latest advancements in AI?" + + print(f"Task: {task.strip()}") + + # Create streaming callback + streaming_callback = create_streaming_callback() + + print("\nšŸŽ¬ EXECUTING WITH STREAMING CALLBACKS...") + print("Watch real-time agent outputs below:\n") + + # Execute with streaming + result = workflow.run( + task=task, + streaming_callback=streaming_callback, + ) + + print("\nšŸŽ‰ EXECUTION COMPLETED!") + print("\nšŸ“Š FINAL RESULT:") + print("-" * 50) + print(result) \ No newline at end of file diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index e691ef18..0398636c 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -5,7 +5,6 @@ from typing import Any, Callable, Dict, List, Optional, Union from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.telemetry.main import log_agent_data -from swarms.utils.any_to_str import any_to_str from swarms.utils.history_output_formatter import ( history_output_formatter, ) @@ -313,6 +312,9 @@ class AgentRearrange: task: str = None, img: str = None, custom_tasks: Dict[str, str] = None, + streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs, ): @@ -347,8 +349,6 @@ class AgentRearrange: return "Invalid flow configuration." tasks = self.flow.split("->") - current_task = task - response_dict = {} logger.info( f"Starting task execution with {len(tasks)} steps" @@ -386,30 +386,51 @@ class AgentRearrange: for agent_name in agent_names: agent = self.agents[agent_name] - if self.streaming_callback is not None: - agent.streaming_on = True - result = agent.run( - task=self.conversation.get_str(), - img=img, - *args, - **kwargs, - ) - result = any_to_str(result) - - - if self.streaming_callback: - self.streaming_callback(result) - - self.conversation.add( - agent.agent_name, result - ) - - response_dict[agent_name] = result - logger.debug( - f"Agent {agent_name} output: {result}" - ) - - ",".join(agent_names) + # Handle streaming callback if provided + if streaming_callback is not None: + + def agent_streaming_callback(chunk: str): + """Wrapper for agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback( + agent_name, chunk, False + ) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}" + ) + + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + streaming_callback=agent_streaming_callback, + *args, + **kwargs, + ) + + # Call completion callback + try: + streaming_callback(agent_name, "", True) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}" + ) + else: + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + *args, + **kwargs, + ) + self.conversation.add(role=agent_name, content=output) + + if self.verbose: + logger.success( + f"[SUCCESS] Agent {agent_name} completed task successfully" + ) + + self.conversation.add(role=agent_name, content=output) else: # Sequential processing @@ -433,24 +454,50 @@ class AgentRearrange: f"Added sequential awareness for {agent_name}: {awareness_info}" ) - if self.streaming_callback is not None: - agent.streaming_on = True - current_task = agent.run( - task=self.conversation.get_str(), - img=img, - *args, - **kwargs, - ) - current_task = any_to_str(current_task) + # Handle streaming callback if provided + if streaming_callback is not None: + + def agent_streaming_callback(chunk: str): + """Wrapper for agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback( + agent_name, chunk, False + ) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}" + ) + + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + streaming_callback=agent_streaming_callback, + *args, + **kwargs, + ) - if self.streaming_callback: - self.streaming_callback(current_task) + # Call completion callback + try: + streaming_callback(agent_name, "", True) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}" + ) + else: + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + *args, + **kwargs, + ) + self.conversation.add(role=agent_name, content=output) - self.conversation.add( - agent.agent_name, current_task - ) + if self.verbose: + logger.success( + f"[SUCCESS] Agent {agent_name} completed task successfully" + ) - response_dict[agent_name] = current_task loop_count += 1 diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index c1de30a9..aa99fd13 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -49,7 +49,7 @@ class SequentialWorkflow: shared_memory_system: callable = None, multi_agent_collab_prompt: bool = True, team_awareness: bool = False, - streaming_callback: Optional[Callable[[str], None]] = None, + streaming_callback: Optional[Callable[[str, str, bool], None]] = None, *args, **kwargs, ): @@ -163,6 +163,9 @@ class SequentialWorkflow: task: str, img: Optional[str] = None, imgs: Optional[List[str]] = None, + streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs, ): @@ -188,7 +191,7 @@ class SequentialWorkflow: return self.agent_rearrange.run( task=task, img=img, - streaming_callback=self.streaming_callback, + streaming_callback=streaming_callback, *args, **kwargs, )