From 5a2d28f7c2a5cbc2a1677859554e1617cc78d727 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 13 Sep 2025 13:30:56 -0700 Subject: [PATCH] [FEAT] SequentialWorkflow add Streaming Callback Option --- swarms/structs/agent_rearrange.py | 17 +++++++++++++++++ swarms/structs/sequential_workflow.py | 8 ++++++++ 2 files changed, 25 insertions(+) diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index a4f4944f..6480cb14 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -38,6 +38,7 @@ class AgentRearrange: team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): @@ -55,6 +56,7 @@ class AgentRearrange: self.autosave = autosave self.time_enabled = time_enabled self.message_id_on = message_id_on + self.streaming_callback = streaming_callback self.conversation = Conversation( name=f"{self.name}-Conversation", @@ -384,6 +386,9 @@ class AgentRearrange: for agent_name in agent_names: agent = self.agents[agent_name] + # Set agent.streaming_on if no streaming_callback + if self.streaming_callback is not None: + agent.streaming_on = True result = agent.run( task=self.conversation.get_str(), img=img, @@ -392,6 +397,11 @@ class AgentRearrange: ) result = any_to_str(result) + + # Call streaming callback with the result if provided + if self.streaming_callback: + self.streaming_callback(result) + self.conversation.add( agent.agent_name, result ) @@ -426,6 +436,9 @@ class AgentRearrange: f"Added sequential awareness for {agent_name}: {awareness_info}" ) + # Set agent.streaming_on if no streaming_callback + if self.streaming_callback is not None: + agent.streaming_on = True current_task = agent.run( task=self.conversation.get_str(), img=img, @@ -434,6 +447,10 @@ class AgentRearrange: ) current_task = any_to_str(current_task) + # Call streaming callback with the result if provided + if self.streaming_callback: + self.streaming_callback(current_task) + self.conversation.add( agent.agent_name, current_task ) diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index ef6f7268..c1de30a9 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -49,6 +49,7 @@ class SequentialWorkflow: shared_memory_system: callable = None, multi_agent_collab_prompt: bool = True, team_awareness: bool = False, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): @@ -64,6 +65,8 @@ class SequentialWorkflow: output_type (OutputType, optional): Output format for the workflow. Defaults to "dict". shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None. multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent. + team_awareness (bool, optional): Whether to enable team awareness. Defaults to False. + streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None. *args: Additional positional arguments. **kwargs: Additional keyword arguments. @@ -79,6 +82,7 @@ class SequentialWorkflow: self.shared_memory_system = shared_memory_system self.multi_agent_collab_prompt = multi_agent_collab_prompt self.team_awareness = team_awareness + self.streaming_callback = streaming_callback self.reliability_check() self.flow = self.sequential_flow() @@ -91,6 +95,7 @@ class SequentialWorkflow: max_loops=self.max_loops, output_type=self.output_type, team_awareness=self.team_awareness, + streaming_callback=self.streaming_callback, *args, **kwargs, ) @@ -183,6 +188,9 @@ class SequentialWorkflow: return self.agent_rearrange.run( task=task, img=img, + streaming_callback=self.streaming_callback, + *args, + **kwargs, ) except Exception as e: