diff --git a/docs/swarms/structs/sequential_workflow.md b/docs/swarms/structs/sequential_workflow.md index 5f5e40ed..e0372f3c 100644 --- a/docs/swarms/structs/sequential_workflow.md +++ b/docs/swarms/structs/sequential_workflow.md @@ -283,78 +283,70 @@ The `run` method now includes enhanced logging to track the sequential awareness The SequentialWorkflow with sequential awareness represents a significant advancement in multi-agent coordination, enabling more sophisticated and professional workflows that closely mirror human team collaboration patterns. -## **NEW: SequentialWorkflow Streaming Callback Documentation** - - -The SequentialWorkflow now includes a powerful **streaming callback** feature that allows you to receive and process tokens in real-time as the workflow executes. This enables real-time streaming of agent responses, making it ideal for interactive applications and live monitoring of workflow progress. - -### What the Streaming Callback Does -- **Real-time Token Streaming**: Receive individual tokens as they are generated by agents -- **Live Progress Monitoring**: Track workflow execution progress in real-time -- **Interactive Applications**: Enable streaming responses in chat applications or live demos -- **Custom Processing**: Implement custom logic for handling streaming tokens (buffering, filtering, etc.) - -## `streaming_callback(self, streaming_callback: Optional[Callable[[str], None]] = None)` +## **NEW: SequentialWorkflow Streaming Callback Documentation** -Integrates streaming callback functionality into the SequentialWorkflow for real-time token processing. +The SequentialWorkflow now includes a powerful **streaming callback** feature that allows you to receive and process tokens in real-time as the workflow executes, helping you monitor execution progress from agents and create interactive chat applications/live demos. This enables real-time streaming of agent responses, making it ideal for interactive applications and live monitoring of workflow progress. -### Parameters: -- `streaming_callback` (`Optional[Callable[[str], None]]`): A callback function that receives streaming tokens in real-time. The function should accept a single string parameter (the token) and return None. Defaults to `None`. +###Parameter +## `streaming_callback: Optional[Callable[[str, str, bool], None]] = None` -### Callback Function Signature: -```python -def callback_function(token: str) -> None: - pass -``` +- The function should accept a single string parameter (the token) and return None. Defaults to `None`. ## **Usage Example with Streaming Callback:** ```python -from swarms import Agent, SequentialWorkflow - -def streaming_callback(token: str) -> None: - """ - Custom streaming callback function that buffers tokens and prints them - when a threshold is reached or a newline is encountered. - """ - buffer.append(token) - if len(buffer) >= 20 or token.endswith("\n"): - print("".join(buffer), end="", flush=True) - buffer.clear() - -# Initialize agents for research and analysis workflow -research_agent = Agent( - agent_name="Research Agent", - system_prompt="Conduct thorough research on the given topic and provide comprehensive findings.", - model_name="gpt-4o", - max_loops=1, -) - -analysis_agent = Agent( - agent_name="Analysis Agent", - system_prompt="Analyze the research findings and provide actionable insights and conclusions.", - model_name="gpt-4o-mini", - max_loops=1, -) - -# Create workflow with streaming callback -workflow = SequentialWorkflow( - agents=[research_agent, analysis_agent], - max_loops=1, - team_awareness=True, - streaming_callback=streaming_callback, # Enable real-time streaming - time_enabled=True, - message_id_on=True -) - -# Execute workflow with live streaming -buffer = [] # Initialize buffer for the callback -result = workflow.run( - "Research the latest advancements in quantum computing and analyze their potential impact on cryptography" -) - -print(f"\n\nFinal Result: {result}") +from swarms.structs.agent import Agent +from swarms.structs.sequential_workflow import SequentialWorkflow + +"""Callback Function to Be Used""" +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + if chunk: + print(chunk, end="", flush=True) + if is_final: + print() + +def create_agents(): + """Create specialized agents for the workflow.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + print_on=False, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + print_on=False, + ), + ] + +if __name__ == "__main__": + agents = create_agents() + workflow = SequentialWorkflow( + id="research_analysis_workflow", + name="Research Analysis Workflow", + description="A sequential workflow that researches and analyzes topics", + agents=agents, + max_loops=1, + output_type="str", + multi_agent_collab_prompt=True, + ) + + task = "What are the latest advancements in AI?" + + workflow.run( + task=task, + streaming_callback=streaming_callback, + ) ``` ### Expected Output: @@ -380,23 +372,7 @@ workflow = SequentialWorkflow( ) ``` -### **Example 2: Progress Bar Integration** -```python -import sys - -def progress_callback(token: str) -> None: - """Update a progress bar as tokens stream in.""" - sys.stdout.write(token) - sys.stdout.flush() - -# Use in your workflow -workflow = SequentialWorkflow( - agents=[agent1, agent2, agent3], - streaming_callback=progress_callback -) -``` - -### **Example 3: Token Filtering and Processing** +### **Example 2: Token Filtering and Processing** ```python def smart_callback(token: str) -> None: """Filter and process tokens based on custom logic.""" @@ -414,35 +390,6 @@ workflow = SequentialWorkflow( ) ``` -## **How Streaming Callback Works** - -### 1. **Real-Time Token Reception** -As each agent in the workflow generates responses, tokens are immediately passed to your callback function: - -```python -# Tokens flow like this: -# Agent1: "Research" -> callback("Research") -# Agent1: " shows" -> callback(" shows") -# Agent1: " that" -> callback(" that") -# Agent2: "Analysis" -> callback("Analysis") -# ...and so on -``` - -### 2. **Non-Blocking Execution** -The streaming callback operates asynchronously and doesn't block the workflow execution. Your callback function receives tokens as soon as they're available. - -### 3. **Memory Efficient** -Tokens are processed individually, making it memory-efficient for long-running workflows. - -## **Benefits of Streaming Callback** - -1. **Real-Time User Experience**: Users see responses as they're generated, improving perceived performance -2. **Live Monitoring**: Track workflow progress and agent outputs in real-time -3. **Interactive Applications**: Perfect for chat interfaces, dashboards, and live demos -4. **Debugging**: Monitor agent outputs token-by-token for debugging purposes -5. **Custom Integration**: Easily integrate with logging systems, progress bars, or custom UI components - -The streaming callback feature transforms the SequentialWorkflow into a powerful tool for real-time AI applications, enabling seamless integration with modern streaming interfaces and live monitoring systems. ## **Notes:** @@ -450,41 +397,3 @@ The streaming callback feature transforms the SequentialWorkflow into a powerful - **Performance**: Streaming adds minimal overhead while providing significant real-time benefits - **Error Handling**: Implement proper error handling in your callback function to prevent workflow interruption - **Thread Safety**: Ensure your callback function is thread-safe if used in concurrent workflows - -## **Integration Examples** - -### **WebSocket Streaming** -```python -import asyncio -import websockets - -async def websocket_callback(token: str) -> None: - """Send tokens via WebSocket for real-time web updates.""" - if websocket_connection: - await websocket_connection.send(token) - -# In your async workflow -workflow = SequentialWorkflow( - agents=[agent1, agent2], - streaming_callback=lambda t: asyncio.create_task(websocket_callback(t)) -) -``` - -### **Database Streaming** -```python -def database_callback(token: str) -> None: - """Stream tokens to database for real-time analytics.""" - # Buffer tokens and batch insert to database - token_buffer.append(token) - if len(token_buffer) >= 100: - # Batch insert to database - db.insert_tokens(token_buffer.copy()) - token_buffer.clear() - -workflow = SequentialWorkflow( - agents=[agent1, agent2, agent3], - streaming_callback=database_callback -) -``` - -Using a streaming callback in SequentialWorkflow enables real-time visibility into agent outputs, making it ideal for interactive applications and live monitoring. This feature enhances user experience and debugging by allowing immediate feedback and seamless integration with modern interfaces.