From 5a2d28f7c2a5cbc2a1677859554e1617cc78d727 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 13 Sep 2025 13:30:56 -0700 Subject: [PATCH 1/5] [FEAT] SequentialWorkflow add Streaming Callback Option --- swarms/structs/agent_rearrange.py | 17 +++++++++++++++++ swarms/structs/sequential_workflow.py | 8 ++++++++ 2 files changed, 25 insertions(+) diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index a4f4944f..6480cb14 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -38,6 +38,7 @@ class AgentRearrange: team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): @@ -55,6 +56,7 @@ class AgentRearrange: self.autosave = autosave self.time_enabled = time_enabled self.message_id_on = message_id_on + self.streaming_callback = streaming_callback self.conversation = Conversation( name=f"{self.name}-Conversation", @@ -384,6 +386,9 @@ class AgentRearrange: for agent_name in agent_names: agent = self.agents[agent_name] + # Set agent.streaming_on if no streaming_callback + if self.streaming_callback is not None: + agent.streaming_on = True result = agent.run( task=self.conversation.get_str(), img=img, @@ -392,6 +397,11 @@ class AgentRearrange: ) result = any_to_str(result) + + # Call streaming callback with the result if provided + if self.streaming_callback: + self.streaming_callback(result) + self.conversation.add( agent.agent_name, result ) @@ -426,6 +436,9 @@ class AgentRearrange: f"Added sequential awareness for {agent_name}: {awareness_info}" ) + # Set agent.streaming_on if no streaming_callback + if self.streaming_callback is not None: + agent.streaming_on = True current_task = agent.run( task=self.conversation.get_str(), img=img, @@ -434,6 +447,10 @@ class AgentRearrange: ) current_task = any_to_str(current_task) + # Call streaming callback with the result if provided + if self.streaming_callback: + self.streaming_callback(current_task) + self.conversation.add( agent.agent_name, current_task ) diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index ef6f7268..c1de30a9 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -49,6 +49,7 @@ class SequentialWorkflow: shared_memory_system: callable = None, multi_agent_collab_prompt: bool = True, team_awareness: bool = False, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ): @@ -64,6 +65,8 @@ class SequentialWorkflow: output_type (OutputType, optional): Output format for the workflow. Defaults to "dict". shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None. multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent. + team_awareness (bool, optional): Whether to enable team awareness. Defaults to False. + streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None. *args: Additional positional arguments. **kwargs: Additional keyword arguments. 
@@ -79,6 +82,7 @@ class SequentialWorkflow: self.shared_memory_system = shared_memory_system self.multi_agent_collab_prompt = multi_agent_collab_prompt self.team_awareness = team_awareness + self.streaming_callback = streaming_callback self.reliability_check() self.flow = self.sequential_flow() @@ -91,6 +95,7 @@ class SequentialWorkflow: max_loops=self.max_loops, output_type=self.output_type, team_awareness=self.team_awareness, + streaming_callback=self.streaming_callback, *args, **kwargs, ) @@ -183,6 +188,9 @@ class SequentialWorkflow: return self.agent_rearrange.run( task=task, img=img, + streaming_callback=self.streaming_callback, + *args, + **kwargs, ) except Exception as e: From 83fae0070b90f27beec3715e93d6967282c29057 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sat, 13 Sep 2025 13:31:18 -0700 Subject: [PATCH 2/5] [TEST] Streaming Callback for Sequential Workflow --- .../test_sequentialworkflow_streaming.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 examples/multi_agent/test_sequentialworkflow_streaming.py diff --git a/examples/multi_agent/test_sequentialworkflow_streaming.py b/examples/multi_agent/test_sequentialworkflow_streaming.py new file mode 100644 index 00000000..310c9cc8 --- /dev/null +++ b/examples/multi_agent/test_sequentialworkflow_streaming.py @@ -0,0 +1,88 @@ +from swarms.structs.agent import Agent +from swarms.structs.sequential_workflow import SequentialWorkflow + +def run_workflow_with_streaming_callback(task, streaming_callback): + """ + Run a sequential workflow with two agents and a streaming callback. + + Args: + task (str): The task to process through the workflow. + streaming_callback (callable): Function to handle streaming output. + + Returns: + The final result from the workflow. + """ + + agent1 = Agent( + name="Research Agent", + description="A research agent that can answer questions", + model_name="gpt-4o", + system_prompt=( + "You are a ResearchAgent. Your task is to research and gather " + "information about the given topic. Provide comprehensive research " + "findings and key insights." + ), + max_loops=1, + interactive=True, + verbose=True, + ) + + agent2 = Agent( + name="Analysis Agent", + description="An analysis agent that draws conclusions from research", + model_name="gpt-4o-mini", + system_prompt=( + "You are an AnalysisAgent. Your task is to analyze the research " + "provided by the previous agent and draw meaningful conclusions. " + "Provide detailed analysis and actionable insights." 
+ ), + max_loops=1, + interactive=True, + verbose=True, + ) + + workflow = SequentialWorkflow( + id="research_analysis_workflow", + name="Research Analysis Workflow", + description="A sequential workflow that researches and analyzes topics", + agents=[agent1, agent2], + max_loops=1, + output_type="str", + streaming_callback=streaming_callback, + multi_agent_collab_prompt=True, + ) + return workflow.run(task) + +if __name__ == "__main__": + +# ## REGULAR STREAMING CALLBACK +# def streaming_callback(token): +# print(token, end="", flush=True) + +# run_workflow_with_streaming_callback( +# task="What are the latest advancements in AI?", +# streaming_callback=streaming_callback, +# ) + + +# ## CUSTOM BUFFERING STREAMING_CALLBACK BASED ON DEV PREFERED +# buffer = [] +# def streaming_callback(token): +# buffer.append(token) +# # Print in bigger chunks (e.g., every 20 tokens or on final flush) +# if len(buffer) >= 20 or token.endswith("\n"): +# print("".join(buffer), end="", flush=True) +# buffer.clear() +# # Optionally, you could add a flush at the end of the run if needed + +# run_workflow_with_streaming_callback( +# task="What are the latest advancements in AI?", +# streaming_callback=streaming_callback, +# ) + + +## NO ADDED STREAMING_CALLBACK + run_workflow_with_streaming_callback( + task="What are the latest advancements in AI?", + streaming_callback=None, + ) \ No newline at end of file From cd66613dff49bb7209efc2ce00a9d83cc3387aa7 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Sun, 14 Sep 2025 00:49:36 -0700 Subject: [PATCH 3/5] cleaned up comments --- .../test_sequentialworkflow_streaming.py | 45 ++++--------------- swarms/structs/agent_rearrange.py | 5 --- 2 files changed, 8 insertions(+), 42 deletions(-) diff --git a/examples/multi_agent/test_sequentialworkflow_streaming.py b/examples/multi_agent/test_sequentialworkflow_streaming.py index 310c9cc8..95fcf55d 100644 --- a/examples/multi_agent/test_sequentialworkflow_streaming.py +++ b/examples/multi_agent/test_sequentialworkflow_streaming.py @@ -1,17 +1,13 @@ from swarms.structs.agent import Agent from swarms.structs.sequential_workflow import SequentialWorkflow -def run_workflow_with_streaming_callback(task, streaming_callback): - """ - Run a sequential workflow with two agents and a streaming callback. - - Args: - task (str): The task to process through the workflow. - streaming_callback (callable): Function to handle streaming output. +def streaming_callback(token): + buffer.append(token) + if len(buffer) >= 20 or token.endswith("\n"): + print("".join(buffer), end="", flush=True) + buffer.clear() - Returns: - The final result from the workflow. 
- """ +def run_workflow_with_streaming_callback(task, streaming_callback): agent1 = Agent( name="Research Agent", @@ -55,34 +51,9 @@ def run_workflow_with_streaming_callback(task, streaming_callback): if __name__ == "__main__": -# ## REGULAR STREAMING CALLBACK -# def streaming_callback(token): -# print(token, end="", flush=True) - -# run_workflow_with_streaming_callback( -# task="What are the latest advancements in AI?", -# streaming_callback=streaming_callback, -# ) - + buffer = [] -# ## CUSTOM BUFFERING STREAMING_CALLBACK BASED ON DEV PREFERED -# buffer = [] -# def streaming_callback(token): -# buffer.append(token) -# # Print in bigger chunks (e.g., every 20 tokens or on final flush) -# if len(buffer) >= 20 or token.endswith("\n"): -# print("".join(buffer), end="", flush=True) -# buffer.clear() -# # Optionally, you could add a flush at the end of the run if needed - -# run_workflow_with_streaming_callback( -# task="What are the latest advancements in AI?", -# streaming_callback=streaming_callback, -# ) - - -## NO ADDED STREAMING_CALLBACK run_workflow_with_streaming_callback( task="What are the latest advancements in AI?", - streaming_callback=None, + streaming_callback=streaming_callback, ) \ No newline at end of file diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index 6480cb14..e691ef18 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -386,7 +386,6 @@ class AgentRearrange: for agent_name in agent_names: agent = self.agents[agent_name] - # Set agent.streaming_on if no streaming_callback if self.streaming_callback is not None: agent.streaming_on = True result = agent.run( @@ -398,7 +397,6 @@ class AgentRearrange: result = any_to_str(result) - # Call streaming callback with the result if provided if self.streaming_callback: self.streaming_callback(result) @@ -422,7 +420,6 @@ class AgentRearrange: agent = self.agents[agent_name] - # Add sequential awareness information for the agent awareness_info = ( self._get_sequential_awareness( agent_name, tasks @@ -436,7 +433,6 @@ class AgentRearrange: f"Added sequential awareness for {agent_name}: {awareness_info}" ) - # Set agent.streaming_on if no streaming_callback if self.streaming_callback is not None: agent.streaming_on = True current_task = agent.run( @@ -447,7 +443,6 @@ class AgentRearrange: ) current_task = any_to_str(current_task) - # Call streaming callback with the result if provided if self.streaming_callback: self.streaming_callback(current_task) From 93612a93078316e20a2a820b9c7bf6f8c44a93cc Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Mon, 15 Sep 2025 06:57:57 -0700 Subject: [PATCH 4/5] [DOCS] Streaming Callback in Sequential --- docs/swarms/structs/sequential_workflow.md | 210 ++++++++++++++++++++- 1 file changed, 209 insertions(+), 1 deletion(-) diff --git a/docs/swarms/structs/sequential_workflow.md b/docs/swarms/structs/sequential_workflow.md index 25b93a93..84907583 100644 --- a/docs/swarms/structs/sequential_workflow.md +++ b/docs/swarms/structs/sequential_workflow.md @@ -56,7 +56,7 @@ The SequentialWorkflow now includes a powerful **sequential awareness** feature ## Methods -### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)` +### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, streaming_callback: Optional[Callable[[str], None]] = None, time_enabled: bool = False, message_id_on: bool 
= False, *args, **kwargs)`
 
 The constructor initializes the `SequentialWorkflow` object with enhanced sequential awareness capabilities.
 
@@ -66,6 +66,7 @@ The constructor initializes the `SequentialWorkflow` object with enhanced sequen
 - `team_awareness` (`bool`, optional): **NEW**: Enables sequential awareness features. Defaults to `False`.
 - `time_enabled` (`bool`, optional): **NEW**: Enables timestamps in conversation. Defaults to `False`.
 - `message_id_on` (`bool`, optional): **NEW**: Enables message IDs in conversation. Defaults to `False`.
+- `streaming_callback` (`Callable[[str], None]`, optional): **NEW**: Callback function that receives streaming tokens in real time as agents produce output. Defaults to `None`.
 - `*args`: Variable length argument list.
 - `**kwargs`: Arbitrary keyword arguments.
 
@@ -281,3 +282,210 @@ The `run` method now includes enhanced logging to track the sequential awareness
 5. **Professional Workflows**: Mimics real-world team collaboration patterns
 
 The SequentialWorkflow with sequential awareness represents a significant advancement in multi-agent coordination, enabling more sophisticated and professional workflows that closely mirror human team collaboration patterns.
+
+# SequentialWorkflow Streaming Callback Documentation
+
+## **NEW: Streaming Callback Feature**
+
+The SequentialWorkflow now includes a powerful **streaming callback** feature that allows you to receive and process tokens in real-time as the workflow executes. This enables real-time streaming of agent responses, making it ideal for interactive applications and live monitoring of workflow progress.
+
+### What the Streaming Callback Does
+
+- **Real-time Token Streaming**: Receive individual tokens as they are generated by agents
+- **Live Progress Monitoring**: Track workflow execution progress in real-time
+- **Interactive Applications**: Enable streaming responses in chat applications or live demos
+- **Custom Processing**: Implement custom logic for handling streaming tokens (buffering, filtering, etc.)
+
+## The `streaming_callback` Constructor Parameter
+
+Passing `streaming_callback` to the `SequentialWorkflow` constructor integrates real-time token processing into the workflow.
+
+### Parameters:
+- `streaming_callback` (`Optional[Callable[[str], None]]`): A callback function that receives streaming tokens in real-time. The function should accept a single string parameter (the token) and return None. Defaults to `None`.
+
+### Callback Function Signature:
+```python
+def callback_function(token: str) -> None:
+    pass
+```
+
+## **Usage Example with Streaming Callback:**
+
+```python
+from swarms import Agent, SequentialWorkflow
+
+def streaming_callback(token: str) -> None:
+    """
+    Custom streaming callback function that buffers tokens and prints them
+    when a threshold is reached or a newline is encountered.
+ """ + buffer.append(token) + if len(buffer) >= 20 or token.endswith("\n"): + print("".join(buffer), end="", flush=True) + buffer.clear() + +# Initialize agents for research and analysis workflow +research_agent = Agent( + agent_name="Research Agent", + system_prompt="Conduct thorough research on the given topic and provide comprehensive findings.", + model_name="gpt-4o", + max_loops=1, +) + +analysis_agent = Agent( + agent_name="Analysis Agent", + system_prompt="Analyze the research findings and provide actionable insights and conclusions.", + model_name="gpt-4o-mini", + max_loops=1, +) + +# Create workflow with streaming callback +workflow = SequentialWorkflow( + agents=[research_agent, analysis_agent], + max_loops=1, + team_awareness=True, + streaming_callback=streaming_callback, # Enable real-time streaming + time_enabled=True, + message_id_on=True +) + +# Execute workflow with live streaming +buffer = [] # Initialize buffer for the callback +result = workflow.run( + "Research the latest advancements in quantum computing and analyze their potential impact on cryptography" +) + +print(f"\n\nFinal Result: {result}") +``` + +### Expected Output: +- Output appears in real time, streaming partial results as they are generated. +- Chunks of text are printed to the terminal as soon as they are available. +- Each agent's output is shown in sequence (e.g., research findings, then analysis). +- The final result is printed at the end after all agents have finished. +- There may be brief pauses between streamed outputs as each agent completes their step. + + +## **Advanced Streaming Examples** + +### **Example 1: File Logging with Streaming** +```python +def file_logging_callback(token: str) -> None: + """Stream tokens to a log file in real-time.""" + with open("workflow_stream.log", "a", encoding="utf-8") as f: + f.write(token) + +workflow = SequentialWorkflow( + agents=[research_agent, analysis_agent], + streaming_callback=file_logging_callback +) +``` + +### **Example 2: Progress Bar Integration** +```python +import sys + +def progress_callback(token: str) -> None: + """Update a progress bar as tokens stream in.""" + sys.stdout.write(token) + sys.stdout.flush() + +# Use in your workflow +workflow = SequentialWorkflow( + agents=[agent1, agent2, agent3], + streaming_callback=progress_callback +) +``` + +### **Example 3: Token Filtering and Processing** +```python +def smart_callback(token: str) -> None: + """Filter and process tokens based on custom logic.""" + # Skip whitespace-only tokens + if token.strip(): + # Highlight key terms + if any(keyword in token.lower() for keyword in ["error", "warning", "success"]): + print(f"\033[93m{token}\033[0m", end="", flush=True) # Yellow highlighting + else: + print(token, end="", flush=True) + +workflow = SequentialWorkflow( + agents=[agent1, agent2], + streaming_callback=smart_callback +) +``` + +## **How Streaming Callback Works** + +### 1. **Real-Time Token Reception** +As each agent in the workflow generates responses, tokens are immediately passed to your callback function: + +```python +# Tokens flow like this: +# Agent1: "Research" -> callback("Research") +# Agent1: " shows" -> callback(" shows") +# Agent1: " that" -> callback(" that") +# Agent2: "Analysis" -> callback("Analysis") +# ...and so on +``` + +### 2. **Non-Blocking Execution** +The streaming callback operates asynchronously and doesn't block the workflow execution. Your callback function receives tokens as soon as they're available. + +### 3. 
**Memory Efficient** +Tokens are processed individually, making it memory-efficient for long-running workflows. + +## **Benefits of Streaming Callback** + +1. **Real-Time User Experience**: Users see responses as they're generated, improving perceived performance +2. **Live Monitoring**: Track workflow progress and agent outputs in real-time +3. **Interactive Applications**: Perfect for chat interfaces, dashboards, and live demos +4. **Debugging**: Monitor agent outputs token-by-token for debugging purposes +5. **Custom Integration**: Easily integrate with logging systems, progress bars, or custom UI components + +The streaming callback feature transforms the SequentialWorkflow into a powerful tool for real-time AI applications, enabling seamless integration with modern streaming interfaces and live monitoring systems. + +## **Notes:** + +- **Backward Compatibility**: Existing workflows continue to work without changes when `streaming_callback=None` +- **Performance**: Streaming adds minimal overhead while providing significant real-time benefits +- **Error Handling**: Implement proper error handling in your callback function to prevent workflow interruption +- **Thread Safety**: Ensure your callback function is thread-safe if used in concurrent workflows + +## **Integration Examples** + +### **WebSocket Streaming** +```python +import asyncio +import websockets + +async def websocket_callback(token: str) -> None: + """Send tokens via WebSocket for real-time web updates.""" + if websocket_connection: + await websocket_connection.send(token) + +# In your async workflow +workflow = SequentialWorkflow( + agents=[agent1, agent2], + streaming_callback=lambda t: asyncio.create_task(websocket_callback(t)) +) +``` + +### **Database Streaming** +```python +def database_callback(token: str) -> None: + """Stream tokens to database for real-time analytics.""" + # Buffer tokens and batch insert to database + token_buffer.append(token) + if len(token_buffer) >= 100: + # Batch insert to database + db.insert_tokens(token_buffer.copy()) + token_buffer.clear() + +workflow = SequentialWorkflow( + agents=[agent1, agent2, agent3], + streaming_callback=database_callback +) +``` + +Using a streaming callback in SequentialWorkflow enables real-time visibility into agent outputs, making it ideal for interactive applications and live monitoring. This feature enhances user experience and debugging by allowing immediate feedback and seamless integration with modern interfaces. 
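+## **Error-Handled Callback Wrapper (Sketch)**
+
+The Notes above recommend error handling and thread safety inside your callback. The snippet below is a minimal sketch of one way to do that, assuming the token-based `Callable[[str], None]` signature used throughout this page; the `make_safe_callback` helper and the use of `logging` and `threading` are illustrative choices, not part of the SequentialWorkflow API.
+
+```python
+import logging
+import threading
+from typing import Callable
+
+def make_safe_callback(callback: Callable[[str], None]) -> Callable[[str], None]:
+    """Wrap a token callback so errors are logged and calls are serialized."""
+    lock = threading.Lock()
+
+    def safe_streaming_callback(token: str) -> None:
+        try:
+            # Serialize access in case the callback is ever invoked from multiple threads
+            with lock:
+                callback(token)
+        except Exception:
+            # Log and swallow the error so the workflow is not interrupted
+            logging.exception("Streaming callback failed")
+
+    return safe_streaming_callback
+
+workflow = SequentialWorkflow(
+    agents=[agent1, agent2],
+    streaming_callback=make_safe_callback(lambda t: print(t, end="", flush=True)),
+)
+```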
From bca8bbdf87019ef8b26ae80ff78567e1a025f4e9 Mon Sep 17 00:00:00 2001 From: Aksh Parekh Date: Fri, 19 Sep 2025 17:59:59 -0700 Subject: [PATCH 5/5] [FIX] Streaming Callback & Demo Reconfig --- .../demo_sequential_workflow_streaming.py | 119 ++++++++++++++++ swarms/structs/agent_rearrange.py | 131 ++++++++++++------ swarms/structs/sequential_workflow.py | 7 +- 3 files changed, 213 insertions(+), 44 deletions(-) create mode 100644 examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py diff --git a/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py new file mode 100644 index 00000000..7e792aaa --- /dev/null +++ b/examples/multi_agent/sequential_workflow/demo_sequential_workflow_streaming.py @@ -0,0 +1,119 @@ +import time +from typing import Callable +from swarms.structs.agent import Agent +from swarms.structs.sequential_workflow import SequentialWorkflow + +def create_streaming_callback() -> Callable[[str, str, bool], None]: + """ + Create a streaming callback that shows live paragraph formation. + """ + agent_buffers = {} + paragraph_count = {} + + def streaming_callback(agent_name: str, chunk: str, is_final: bool): + timestamp = time.strftime("%H:%M:%S") + + # Initialize buffers for new agents + if agent_name not in agent_buffers: + agent_buffers[agent_name] = "" + paragraph_count[agent_name] = 1 + print(f"\nšŸŽ¬ [{timestamp}] {agent_name} starting...") + print("=" * 60) + + if chunk.strip(): + # Split chunk into tokens (words/punctuation) + tokens = chunk.replace("\n", " \n ").split() + + for token in tokens: + # Handle paragraph breaks + if token == "\n": + if agent_buffers[agent_name].strip(): + print( + f"\nšŸ“„ [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:" + ) + print(f"{agent_buffers[agent_name].strip()}") + print("=" * 60) + paragraph_count[agent_name] += 1 + agent_buffers[agent_name] = "" + else: + # Add token to buffer and show live accumulation + agent_buffers[agent_name] += token + " " + print( + f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}", + end="", + flush=True, + ) + + if is_final: + print() # New line after live updates + # Print any remaining content as final paragraph + if agent_buffers[agent_name].strip(): + print( + f"\nāœ… [{timestamp}] {agent_name} COMPLETED - Final Paragraph:" + ) + print(f"{agent_buffers[agent_name].strip()}") + print() + print(f"šŸŽÆ [{timestamp}] {agent_name} finished processing") + print(f"šŸ“Š Total paragraphs processed: {paragraph_count[agent_name] - 1}") + print("=" * 60) + + return streaming_callback + +def create_agents(): + """Create specialized agents for the workflow.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. 
Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + ] + +if __name__ == "__main__": + print("šŸŽÆ SEQUENTIAL WORKFLOW STREAMING DEMO") + print("=" * 50) + + # Create agents and workflow + agents = create_agents() + workflow = SequentialWorkflow( + id="research_analysis_workflow", + name="Research Analysis Workflow", + description="A sequential workflow that researches and analyzes topics", + agents=agents, + max_loops=1, + output_type="str", + multi_agent_collab_prompt=True, + ) + + # Define task + task = "What are the latest advancements in AI?" + + print(f"Task: {task.strip()}") + + # Create streaming callback + streaming_callback = create_streaming_callback() + + print("\nšŸŽ¬ EXECUTING WITH STREAMING CALLBACKS...") + print("Watch real-time agent outputs below:\n") + + # Execute with streaming + result = workflow.run( + task=task, + streaming_callback=streaming_callback, + ) + + print("\nšŸŽ‰ EXECUTION COMPLETED!") + print("\nšŸ“Š FINAL RESULT:") + print("-" * 50) + print(result) \ No newline at end of file diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py index e691ef18..0398636c 100644 --- a/swarms/structs/agent_rearrange.py +++ b/swarms/structs/agent_rearrange.py @@ -5,7 +5,6 @@ from typing import Any, Callable, Dict, List, Optional, Union from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.telemetry.main import log_agent_data -from swarms.utils.any_to_str import any_to_str from swarms.utils.history_output_formatter import ( history_output_formatter, ) @@ -313,6 +312,9 @@ class AgentRearrange: task: str = None, img: str = None, custom_tasks: Dict[str, str] = None, + streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs, ): @@ -347,8 +349,6 @@ class AgentRearrange: return "Invalid flow configuration." 
         tasks = self.flow.split("->")
-        current_task = task
-        response_dict = {}
 
         logger.info(
             f"Starting task execution with {len(tasks)} steps"
         )
@@ -386,30 +386,51 @@ class AgentRearrange:
 
                 for agent_name in agent_names:
                     agent = self.agents[agent_name]
-                    if self.streaming_callback is not None:
-                        agent.streaming_on = True
-                    result = agent.run(
-                        task=self.conversation.get_str(),
-                        img=img,
-                        *args,
-                        **kwargs,
-                    )
-                    result = any_to_str(result)
-
-
-                    if self.streaming_callback:
-                        self.streaming_callback(result)
-
-                    self.conversation.add(
-                        agent.agent_name, result
-                    )
-
-                    response_dict[agent_name] = result
-                    logger.debug(
-                        f"Agent {agent_name} output: {result}"
-                    )
-
-                ",".join(agent_names)
+                    # Handle streaming callback if provided
+                    if streaming_callback is not None:
+
+                        def agent_streaming_callback(chunk: str):
+                            """Wrapper for agent streaming callback."""
+                            try:
+                                if chunk is not None and chunk.strip():
+                                    streaming_callback(
+                                        agent_name, chunk, False
+                                    )
+                            except Exception as callback_error:
+                                if self.verbose:
+                                    logger.warning(
+                                        f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
+                                    )
+
+                        output = agent.run(
+                            task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
+                            streaming_callback=agent_streaming_callback,
+                            *args,
+                            **kwargs,
+                        )
+
+                        # Call completion callback
+                        try:
+                            streaming_callback(agent_name, "", True)
+                        except Exception as callback_error:
+                            if self.verbose:
+                                logger.warning(
+                                    f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
+                                )
+                    else:
+                        output = agent.run(
+                            task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
+                            *args,
+                            **kwargs,
+                        )
+
+                    # Record the agent output once, for both the streaming and non-streaming paths
+                    self.conversation.add(role=agent_name, content=output)
+
+                    if self.verbose:
+                        logger.success(
+                            f"[SUCCESS] Agent {agent_name} completed task successfully"
+                        )
 
             else:
                 # Sequential processing
@@ -433,24 +454,50 @@ class AgentRearrange:
                         f"Added sequential awareness for {agent_name}: {awareness_info}"
                     )
 
-                if self.streaming_callback is not None:
-                    agent.streaming_on = True
-                current_task = agent.run(
-                    task=self.conversation.get_str(),
-                    img=img,
-                    *args,
-                    **kwargs,
-                )
-                current_task = any_to_str(current_task)
+                # Handle streaming callback if provided
+                if streaming_callback is not None:
+
+                    def agent_streaming_callback(chunk: str):
+                        """Wrapper for agent streaming callback."""
+                        try:
+                            if chunk is not None and chunk.strip():
+                                streaming_callback(
+                                    agent_name, chunk, False
+                                )
+                        except Exception as callback_error:
+                            if self.verbose:
+                                logger.warning(
+                                    f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
+                                )
+
+                    output = agent.run(
+                        task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
+                        streaming_callback=agent_streaming_callback,
+                        *args,
+                        **kwargs,
+                    )
 
-                if self.streaming_callback:
-                    self.streaming_callback(current_task)
+                    # Call completion callback
+                    try:
+                        streaming_callback(agent_name, "", True)
+                    except Exception as callback_error:
+                        if self.verbose:
+                            logger.warning(
+                                f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
+                            )
+                else:
+                    output = agent.run(
+                        task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
+                        *args,
+                        **kwargs,
+                    )
+                # Record the agent output once, for both the streaming and non-streaming paths
+                self.conversation.add(role=agent_name, content=output)
-                self.conversation.add(
-                    agent.agent_name, current_task
-                )
+                if self.verbose:
+                    logger.success(
+                        f"[SUCCESS] Agent {agent_name} completed task successfully"
+                    )
-                response_dict[agent_name] = current_task
 
             loop_count += 1
 
diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py
index c1de30a9..aa99fd13 100644
--- a/swarms/structs/sequential_workflow.py
+++ b/swarms/structs/sequential_workflow.py
@@ -49,7 +49,7 @@ class SequentialWorkflow:
         shared_memory_system: callable = None,
         multi_agent_collab_prompt: bool = True,
         team_awareness: bool = False,
-        streaming_callback: Optional[Callable[[str], None]] = None,
+        streaming_callback: Optional[Callable[[str, str, bool], None]] = None,
         *args,
         **kwargs,
     ):
@@ -163,6 +163,9 @@ class SequentialWorkflow:
         task: str,
         img: Optional[str] = None,
         imgs: Optional[List[str]] = None,
+        streaming_callback: Optional[
+            Callable[[str, str, bool], None]
+        ] = None,
         *args,
         **kwargs,
     ):
@@ -188,7 +191,7 @@ class SequentialWorkflow:
             return self.agent_rearrange.run(
                 task=task,
                 img=img,
-                streaming_callback=self.streaming_callback,
+                streaming_callback=streaming_callback or self.streaming_callback,
                 *args,
                 **kwargs,
             )