From fc723044919da52cbe5c5bed20112fe30b17dc99 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 8 Sep 2025 16:25:33 -0700 Subject: [PATCH] [IMPROVEMENT][Improve HiearchicalSwarm with streaming callbacks][Add docs as well][New examples] --- .env.example | 3 + docs/swarms/structs/hierarchical_swarm.md | 189 ++++++++++++-- .../batched_grid_simple_example.py | 13 +- .../batched_grid_swarm_router.py | 41 ++++ .../hierarchical_swarm_basic_demo.py | 56 +++++ .../hierarchical_swarm_batch_demo.py | 97 ++++++++ .../hierarchical_swarm_comparison_demo.py | 124 ++++++++++ .../hierarchical_swarm_streaming_example.py | 232 ++++++++++++++++++ hierarchical_swarm_streaming_demo.py | 135 ++++++++++ swarms/structs/__init__.py | 2 +- swarms/structs/hiearchical_swarm.py | 88 +++++-- swarms/structs/swarm_router.py | 28 ++- swarms/utils/formatter.py | 3 +- 13 files changed, 957 insertions(+), 54 deletions(-) rename batched_grid_simple_example.py => examples/multi_agent/batched_grid_workflow/batched_grid_simple_example.py (58%) create mode 100644 examples/multi_agent/batched_grid_workflow/batched_grid_swarm_router.py create mode 100644 examples/multi_agent/hiearchical_swarm/hierarchical_swarm_basic_demo.py create mode 100644 examples/multi_agent/hiearchical_swarm/hierarchical_swarm_batch_demo.py create mode 100644 examples/multi_agent/hiearchical_swarm/hierarchical_swarm_comparison_demo.py create mode 100644 examples/multi_agent/hiearchical_swarm/hierarchical_swarm_streaming_example.py create mode 100644 hierarchical_swarm_streaming_demo.py diff --git a/.env.example b/.env.example index a7bd6b36..5d0a88ae 100644 --- a/.env.example +++ b/.env.example @@ -46,3 +46,6 @@ AZURE_OPENAI_DEPLOYMENT="" OPENAI_API_VERSION="" AZURE_OPENAI_API_KEY="" AZURE_OPENAI_AD_TOKEN="" + + +SWARMS_OUTPUT_FORMATTING_MARKDOWN_ENABLED=False \ No newline at end of file diff --git a/docs/swarms/structs/hierarchical_swarm.md b/docs/swarms/structs/hierarchical_swarm.md index 3ff7605f..00967404 100644 --- a/docs/swarms/structs/hierarchical_swarm.md +++ b/docs/swarms/structs/hierarchical_swarm.md @@ -8,7 +8,7 @@ The Hierarchical Swarm follows a clear workflow pattern: 1. **Task Reception**: User provides a task to the swarm 2. **Planning**: Director creates a comprehensive plan and distributes orders to agents -3. **Execution**: Individual agents execute their assigned tasks +3. **Execution**: Individual agents execute their assigned tasks (with optional real-time streaming) 4. **Feedback Loop**: Director evaluates results and issues new orders if needed (up to `max_loops`) 5. **Context Preservation**: All conversation history and context is maintained throughout the process @@ -34,12 +34,16 @@ graph TD ## Key Features -- **Hierarchical Coordination**: Director agent orchestrates all operations -- **Specialized Agents**: Each agent has specific expertise and responsibilities -- **Iterative Refinement**: Multiple feedback loops for improved results -- **Context Preservation**: Full conversation history maintained -- **Flexible Output Formats**: Support for various output types (dict, str, list) -- **Comprehensive Logging**: Detailed logging for debugging and monitoring +| Feature | Description | +|------------------------------|-----------------------------------------------------------------------------------------------| +| **Hierarchical Coordination**| Director agent orchestrates all operations | +| **Specialized Agents** | Each agent has specific expertise and responsibilities | +| **Iterative Refinement** | Multiple feedback loops for improved results | +| **Context Preservation** | Full conversation history maintained | +| **Flexible Output Formats** | Support for various output types (dict, str, list) | +| **Comprehensive Logging** | Detailed logging for debugging and monitoring | +| **Live Streaming** | Real-time streaming callbacks for monitoring agent outputs | +| **Token-by-Token Updates** | Watch text formation in real-time as agents generate responses | ## `HierarchicalSwarm` Constructor @@ -60,7 +64,7 @@ graph TD ## Core Methods -### `run(task, img=None, *args, **kwargs)` +### `run(task, img=None, streaming_callback=None, *args, **kwargs)` Executes the hierarchical swarm for a specified number of feedback loops, processing the task through multiple iterations for refinement and improvement. @@ -70,6 +74,7 @@ Executes the hierarchical swarm for a specified number of feedback loops, proces |-----------|------|---------|-------------| | `task` | `str` | **Required** | The initial task to be processed by the swarm | | `img` | `str` | `None` | Optional image input for the agents | +| `streaming_callback` | `Callable[[str, str, bool], None]` | `None` | Optional callback for real-time streaming of agent outputs | | `*args` | `Any` | - | Additional positional arguments | | `**kwargs` | `Any` | - | Additional keyword arguments | @@ -113,26 +118,75 @@ result = swarm.run(task=task) print(result) ``` -### `step(task, img=None, *args, **kwargs)` +#### Streaming Callback Example -Runs a single step of the hierarchical swarm, executing one complete cycle of planning, distribution, execution, and feedback. +```python +from swarms import Agent +from swarms.structs.hiearchical_swarm import HierarchicalSwarm -#### Parameters +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + """Callback function for real-time streaming of agent outputs.""" + if not hasattr(streaming_callback, 'buffers'): + streaming_callback.buffers = {} + streaming_callback.paragraph_count = {} + + if agent_name not in streaming_callback.buffers: + streaming_callback.buffers[agent_name] = "" + streaming_callback.paragraph_count[agent_name] = 1 + print(f"\nšŸŽ¬ {agent_name} starting...") + + if chunk.strip(): + tokens = chunk.replace('\n', ' \n ').split() + for token in tokens: + if token == '\n': + if streaming_callback.buffers[agent_name].strip(): + print(f"\nšŸ“„ {agent_name} - Paragraph {streaming_callback.paragraph_count[agent_name]} Complete:") + print(f"{streaming_callback.buffers[agent_name].strip()}") + streaming_callback.paragraph_count[agent_name] += 1 + streaming_callback.buffers[agent_name] = "" + else: + streaming_callback.buffers[agent_name] += token + " " + print(f"\r{agent_name} | {streaming_callback.buffers[agent_name].strip()}", end="", flush=True) + + if is_final: + print(f"\nāœ… {agent_name} completed!") + +# Create agents +agents = [ + Agent(agent_name="Researcher", model_name="gpt-4o-mini"), + Agent(agent_name="Analyst", model_name="gpt-4o-mini"), +] + +# Initialize swarm +swarm = HierarchicalSwarm( + name="Streaming-Analysis-Swarm", + agents=agents, + max_loops=1, + verbose=True, +) + +# Execute with streaming +task = "Analyze the impact of AI on the job market" +result = swarm.run(task=task, streaming_callback=streaming_callback) +``` + +#### Parameters (step method) | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `task` | `str` | **Required** | The task to be executed in this step | | `img` | `str` | `None` | Optional image input for the agents | +| `streaming_callback` | `Callable[[str, str, bool], None]` | `None` | Optional callback for real-time streaming of agent outputs | | `*args` | `Any` | - | Additional positional arguments | | `**kwargs` | `Any` | - | Additional keyword arguments | -#### Returns +#### Returns (step method) | Type | Description | |------|-------------| | `str` | Feedback from the director based on agent outputs | -#### Example +#### Example (step method) ```python from swarms import Agent @@ -166,26 +220,23 @@ feedback = swarm.step(task=task) print("Director Feedback:", feedback) ``` -### `batched_run(tasks, img=None, *args, **kwargs)` - -Executes the hierarchical swarm for a list of tasks, processing each task through the complete workflow. - -#### Parameters +#### Parameters (batched_run method) | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `tasks` | `List[str]` | **Required** | List of tasks to be processed | | `img` | `str` | `None` | Optional image input for the agents | +| `streaming_callback` | `Callable[[str, str, bool], None]` | `None` | Optional callback for real-time streaming of agent outputs | | `*args` | `Any` | - | Additional positional arguments | | `**kwargs` | `Any` | - | Additional keyword arguments | -#### Returns +#### Returns (batched_run method) | Type | Description | |------|-------------| | `List[Any]` | List of results for each task | -#### Example +#### Example (batched_run method) ```python from swarms import Agent @@ -334,14 +385,98 @@ The `HierarchicalSwarm` supports various output formats through the `output_type | `"str"` | Returns conversation history as a string | For simple text output | | `"list"` | Returns conversation history as a list | For sequential processing | +## Streaming Callbacks + +The `HierarchicalSwarm` supports real-time streaming of agent outputs through optional callback functions. This feature allows you to monitor the text generation process as it happens, token by token. + +### Streaming Callback Function Signature + +```python +def streaming_callback(agent_name: str, chunk: str, is_final: bool) -> None: + """ + Callback function for real-time streaming of agent outputs. + + Args: + agent_name (str): The name of the agent producing the output + chunk (str): The chunk of text generated (empty if is_final=True) + is_final (bool): True when the agent has completed its task + """ + pass +``` + +### Streaming Callback Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `agent_name` | `str` | The name of the agent currently generating output | +| `chunk` | `str` | The text chunk generated by the agent | +| `is_final` | `bool` | Indicates if this is the final chunk (agent completed) | + +### Live Paragraph Formation + +The streaming callback can accumulate tokens to show live paragraph formation: + +```python +def live_paragraph_callback(agent_name: str, chunk: str, is_final: bool): + """Shows live paragraph formation as text is generated.""" + if not hasattr(live_paragraph_callback, 'buffers'): + live_paragraph_callback.buffers = {} + + if agent_name not in live_paragraph_callback.buffers: + live_paragraph_callback.buffers[agent_name] = "" + print(f"\nšŸŽ¬ {agent_name} starting...") + + if chunk.strip(): + tokens = chunk.replace('\n', ' \n ').split() + for token in tokens: + if token == '\n': + if live_paragraph_callback.buffers[agent_name].strip(): + print(f"\nšŸ“„ {agent_name} - Paragraph Complete:") + print(f"{live_paragraph_callback.buffers[agent_name].strip()}") + live_paragraph_callback.buffers[agent_name] = "" + else: + live_paragraph_callback.buffers[agent_name] += token + " " + print(f"\r{agent_name} | {live_paragraph_callback.buffers[agent_name].strip()}", end="", flush=True) + + if is_final: + print(f"\nāœ… {agent_name} completed!") +``` + +### Streaming Use Cases + +- **Real-time Monitoring**: Watch agents work simultaneously +- **Progress Tracking**: See text formation token by token +- **Live Debugging**: Monitor agent performance in real-time +- **User Experience**: Provide live feedback to users +- **Logging**: Capture detailed execution traces + +### Streaming in Different Methods + +Streaming callbacks work with all execution methods: + +```python +# Single task with streaming +result = swarm.run(task=task, streaming_callback=my_callback) + +# Single step with streaming +result = swarm.step(task=task, streaming_callback=my_callback) + +# Batch processing with streaming +results = swarm.batched_run(tasks=tasks, streaming_callback=my_callback) +``` + ## Best Practices -1. **Agent Specialization**: Create agents with specific, well-defined expertise areas -2. **Clear Task Descriptions**: Provide detailed, actionable task descriptions -3. **Appropriate Loop Count**: Set `max_loops` based on task complexity (1-3 for most tasks) -4. **Verbose Logging**: Enable verbose mode during development for debugging -5. **Context Preservation**: Leverage the built-in conversation history for continuity -6. **Error Handling**: Implement proper error handling for production use +| Best Practice | Description | +|------------------------------|--------------------------------------------------------------------------------------------------| +| **Agent Specialization** | Create agents with specific, well-defined expertise areas | +| **Clear Task Descriptions** | Provide detailed, actionable task descriptions | +| **Appropriate Loop Count** | Set `max_loops` based on task complexity (1-3 for most tasks) | +| **Verbose Logging** | Enable verbose mode during development for debugging | +| **Context Preservation** | Leverage the built-in conversation history for continuity | +| **Error Handling** | Implement proper error handling for production use | +| **Streaming Callbacks** | Use streaming callbacks for real-time monitoring and user feedback | +| **Callback Performance** | Keep streaming callbacks lightweight to avoid blocking the main execution thread | ## Error Handling @@ -357,4 +492,4 @@ The `HierarchicalSwarm` includes comprehensive error handling with detailed logg - **Loop Optimization**: Balance between thoroughness and performance with `max_loops` - **Agent Count**: More agents increase coordination overhead - **Model Selection**: Choose appropriate models for your use case and budget -- **Verbose Mode**: Disable verbose logging in production for better performance \ No newline at end of file +- **Verbose Mode**: Disable verbose logging in production for better performance diff --git a/batched_grid_simple_example.py b/examples/multi_agent/batched_grid_workflow/batched_grid_simple_example.py similarity index 58% rename from batched_grid_simple_example.py rename to examples/multi_agent/batched_grid_workflow/batched_grid_simple_example.py index 2dfc8801..685aedcd 100644 --- a/batched_grid_simple_example.py +++ b/examples/multi_agent/batched_grid_workflow/batched_grid_simple_example.py @@ -5,7 +5,16 @@ from swarms.structs.batched_grid_workflow import BatchedGridWorkflow agent = Agent( agent_name="ETF-Research-Agent", agent_description="Specialized agent for researching, analyzing, and recommending Exchange-Traded Funds (ETFs) across various sectors and markets.", - model_name="claude-sonnet-4-20250514", + model_name="groq/moonshotai/kimi-k2-instruct", + dynamic_temperature_enabled=True, + max_loops=1, + dynamic_context_window=True, +) + +agent_two = Agent( + agent_name="ETF-Research-Agent-2", + agent_description="Specialized agent for researching, analyzing, and recommending Exchange-Traded Funds (ETFs) across various sectors and markets.", + model_name="groq/moonshotai/kimi-k2-instruct", dynamic_temperature_enabled=True, max_loops=1, dynamic_context_window=True, @@ -13,7 +22,7 @@ agent = Agent( # Create workflow with default settings -workflow = BatchedGridWorkflow(agents=[agent, agent]) +workflow = BatchedGridWorkflow(agents=[agent, agent_two]) # Define simple tasks tasks = [ diff --git a/examples/multi_agent/batched_grid_workflow/batched_grid_swarm_router.py b/examples/multi_agent/batched_grid_workflow/batched_grid_swarm_router.py new file mode 100644 index 00000000..8c1766ff --- /dev/null +++ b/examples/multi_agent/batched_grid_workflow/batched_grid_swarm_router.py @@ -0,0 +1,41 @@ +from swarms import Agent +from swarms.structs.swarm_router import SwarmRouter + +# Initialize the ETF-focused agent +agent = Agent( + agent_name="ETF-Research-Agent", + agent_description="Specialized agent for researching, analyzing, and recommending Exchange-Traded Funds (ETFs) across various sectors and markets.", + model_name="groq/moonshotai/kimi-k2-instruct", + dynamic_temperature_enabled=True, + max_loops=1, + dynamic_context_window=True, +) + +agent_two = Agent( + agent_name="ETF-Research-Agent-2", + agent_description="Specialized agent for researching, analyzing, and recommending Exchange-Traded Funds (ETFs) across various sectors and markets.", + model_name="groq/moonshotai/kimi-k2-instruct", + dynamic_temperature_enabled=True, + max_loops=1, + dynamic_context_window=True, +) + + +# Create workflow with default settings +workflow = SwarmRouter( + agents=[agent, agent_two], + swarm_type="BatchedGridWorkflow", + output_type="dict", +) + +# Define simple tasks +tasks = [ + "What are the best GOLD ETFs?", + "What are the best american energy ETFs?", +] + +# Run the workflow +result = workflow.run(tasks=tasks) + + +print(result) diff --git a/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_basic_demo.py b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_basic_demo.py new file mode 100644 index 00000000..bf188a8e --- /dev/null +++ b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_basic_demo.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +""" +Basic Hierarchical Swarm Streaming Demo + +Minimal example showing the core streaming callback functionality. +""" + +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.agents import Agent + + +def simple_callback(agent_name: str, chunk: str, is_final: bool): + """Simple callback that shows agent progress.""" + if chunk.strip(): + if is_final: + print(f"āœ… {agent_name} finished") + else: + print(f"šŸ”„ {agent_name}: {chunk}") + + +if __name__ == "__main__": + print("šŸŽÆ BASIC HIERARCHICAL SWARM STREAMING") + + # Create a simple agent + agent = Agent( + agent_name="Simple_Agent", + agent_description="A simple agent for demonstration", + system_prompt="You are a helpful assistant.", + model_name="gpt-4o-mini", + max_loops=1, + ) + + # Create swarm + swarm = HierarchicalSwarm( + name="Basic_Swarm", + description="Basic streaming demo", + agents=[agent], + max_loops=1, + director_model_name="gpt-4o-mini", + ) + + # Simple task + task = "Explain what artificial intelligence is in simple terms." + + print(f"Task: {task}") + print("\nExecuting with streaming callback:\n") + + # Run with streaming + result = swarm.run( + task=task, + streaming_callback=simple_callback + ) + + print("\n" + "="*30) + print("Final result:") + print(result) diff --git a/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_batch_demo.py b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_batch_demo.py new file mode 100644 index 00000000..0e4c71d4 --- /dev/null +++ b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_batch_demo.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +""" +Hierarchical Swarm Batch Processing Demo + +This demo shows how to use streaming callbacks with batch processing +to handle multiple tasks sequentially with real-time feedback. +""" + +import time +from typing import Callable +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.agents import Agent + + +def create_batch_callback() -> Callable[[str, str, bool], None]: + """Create a callback optimized for batch processing.""" + + def batch_callback(agent_name: str, chunk: str, is_final: bool): + timestamp = time.strftime("%H:%M:%S") + + if chunk.strip(): + if is_final: + print(f"\nāœ… [{timestamp}] {agent_name} COMPLETED") + else: + # Shorter output for batch processing + print(f"šŸ”„ {agent_name}: {chunk[:30]}..." if len(chunk) > 30 else f"šŸ”„ {agent_name}: {chunk}") + + return batch_callback + + +def create_agents(): + """Create specialized agents for the swarm.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + ), + ] + + +if __name__ == "__main__": + print("šŸ“¦ HIERARCHICAL SWARM BATCH PROCESSING DEMO") + print("="*50) + + # Create agents and swarm + agents = create_agents() + swarm = HierarchicalSwarm( + name="Batch_Processing_Swarm", + description="Swarm for batch processing multiple tasks", + agents=agents, + max_loops=1, + verbose=False, # Reduce verbosity for cleaner batch output + director_model_name="gpt-4o-mini", + ) + + # Define multiple tasks + tasks = [ + "What are the environmental benefits of solar energy?", + "How does wind power contribute to sustainable development?", + "What are the economic advantages of hydroelectric power?" + ] + + print(f"Processing {len(tasks)} tasks:") + for i, task in enumerate(tasks, 1): + print(f" {i}. {task}") + print() + + # Create streaming callback + streaming_callback = create_batch_callback() + + print("šŸŽ¬ EXECUTING BATCH WITH STREAMING CALLBACKS...") + print("Each task will show real-time progress:\n") + + # Execute batch with streaming + results = swarm.batched_run( + tasks=tasks, + streaming_callback=streaming_callback, + ) + + print("\n" + "="*50) + print("šŸŽ‰ BATCH PROCESSING COMPLETED!") + print(f"Processed {len(results)} tasks") + + # Show summary + print("\nšŸ“Š SUMMARY:") + for i, result in enumerate(results, 1): + print(f" Task {i}: {'Completed' if result else 'Failed'}") diff --git a/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_comparison_demo.py b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_comparison_demo.py new file mode 100644 index 00000000..1a5980f7 --- /dev/null +++ b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_comparison_demo.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +""" +Hierarchical Swarm Comparison Demo + +This demo compares traditional swarm execution (without streaming) +versus streaming execution to show the difference in behavior. +""" + +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.agents import Agent + + +def create_agents(): + """Create specialized agents for the swarm.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + ), + Agent( + agent_name="Summary_Agent", + agent_description="Skilled at creating concise summaries", + system_prompt="You are a summarization expert. Create clear, concise summaries of complex topics.", + model_name="gpt-4o-mini", + max_loops=1, + ), + ] + + +def run_traditional_swarm(): + """Run swarm without streaming callbacks.""" + print("šŸ”‡ TRADITIONAL SWARM EXECUTION (No Streaming)") + print("-" * 50) + + agents = create_agents() + swarm = HierarchicalSwarm( + name="Traditional_Swarm", + description="Traditional swarm execution", + agents=agents, + max_loops=1, + verbose=False, + director_model_name="gpt-4o-mini", + ) + + task = "What are the main benefits of renewable energy sources?" + + print(f"Task: {task}") + + result = swarm.run(task=task) + + print("\nResult:") + if isinstance(result, dict): + for key, value in result.items(): + print(f"{key}: {value[:200]}..." if len(str(value)) > 200 else f"{key}: {value}") + else: + print(result[:500] + "..." if len(str(result)) > 500 else result) + + +def run_streaming_swarm(): + """Run swarm with streaming callbacks.""" + import time + from typing import Callable + + def simple_callback(agent_name: str, chunk: str, is_final: bool): + if chunk.strip(): + if is_final: + print(f"\nāœ… {agent_name} completed") + else: + print(f"šŸ”„ {agent_name}: {chunk[:50]}..." if len(chunk) > 50 else f"šŸ”„ {agent_name}: {chunk}") + + print("\nšŸŽÆ STREAMING SWARM EXECUTION") + print("-" * 50) + + agents = create_agents() + swarm = HierarchicalSwarm( + name="Streaming_Swarm", + description="Swarm with streaming callbacks", + agents=agents, + max_loops=1, + verbose=False, + director_model_name="gpt-4o-mini", + ) + + task = "What are the main benefits of renewable energy sources?" + + print(f"Task: {task}") + + result = swarm.run( + task=task, + streaming_callback=simple_callback + ) + + print("\nResult:") + if isinstance(result, dict): + for key, value in result.items(): + print(f"{key}: {value[:200]}..." if len(str(value)) > 200 else f"{key}: {value}") + else: + print(result[:500] + "..." if len(str(result)) > 500 else result) + + +if __name__ == "__main__": + print("šŸ”„ HIERARCHICAL SWARM COMPARISON DEMO") + print("="*50) + print("Comparing traditional vs streaming execution\n") + + # Run traditional first + run_traditional_swarm() + + # Run streaming second + run_streaming_swarm() + + print("\n" + "="*50) + print("✨ Comparison complete!") + print("Notice how streaming shows progress in real-time") diff --git a/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_streaming_example.py b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_streaming_example.py new file mode 100644 index 00000000..41c23da0 --- /dev/null +++ b/examples/multi_agent/hiearchical_swarm/hierarchical_swarm_streaming_example.py @@ -0,0 +1,232 @@ +""" +Hierarchical Swarm Live Paragraph Streaming Example + +This example demonstrates how to use the streaming callback feature +in the HierarchicalSwarm to see live paragraph formation during agent execution. + +The streaming callback allows you to: +- Watch paragraphs build in real-time as tokens accumulate +- See the complete text forming word by word +- Track multiple agents working simultaneously +- View completed paragraphs with timestamps +- Monitor the entire generation process live +""" + +import time + +from swarms.structs.agent import Agent +from swarms.structs.hiearchical_swarm import HierarchicalSwarm + + +def streaming_callback(agent_name: str, chunk: str, is_final: bool): + """ + Example streaming callback function that shows live paragraph formation. + + This function is called whenever an agent produces output during streaming. + It shows tokens accumulating in real-time to form complete paragraphs. + + Args: + agent_name (str): The name of the agent producing the output + chunk (str): The chunk of output (empty string if is_final=True) + is_final (bool): True when the agent has completed its task + """ + timestamp = time.strftime("%H:%M:%S") + + # Store accumulated text for each agent to track paragraph formation + if not hasattr(streaming_callback, 'agent_buffers'): + streaming_callback.agent_buffers = {} + streaming_callback.paragraph_count = {} + + # Initialize buffers for new agents + if agent_name not in streaming_callback.agent_buffers: + streaming_callback.agent_buffers[agent_name] = "" + streaming_callback.paragraph_count[agent_name] = 1 + print(f"\nšŸŽ¬ [{timestamp}] {agent_name} starting...") + print("="*60) + + if chunk.strip(): + # Split chunk into tokens (words/punctuation) + tokens = chunk.replace('\n', ' \n ').split() + + for token in tokens: + # Handle paragraph breaks + if token == '\n': + if streaming_callback.agent_buffers[agent_name].strip(): + print(f"\nšŸ“„ [{timestamp}] {agent_name} - Paragraph {streaming_callback.paragraph_count[agent_name]} Complete:") + print(f"{streaming_callback.agent_buffers[agent_name].strip()}") + print("="*60) + streaming_callback.paragraph_count[agent_name] += 1 + streaming_callback.agent_buffers[agent_name] = "" + else: + # Add token to buffer and show live accumulation + streaming_callback.agent_buffers[agent_name] += token + " " + + # Clear line and show current paragraph + print(f"\r[{timestamp}] {agent_name} | {streaming_callback.agent_buffers[agent_name].strip()}", end="", flush=True) + + if is_final: + print() # New line after live updates + # Print any remaining content as final paragraph + if streaming_callback.agent_buffers[agent_name].strip(): + print(f"\nāœ… [{timestamp}] {agent_name} COMPLETED - Final Paragraph:") + print(f"{streaming_callback.agent_buffers[agent_name].strip()}") + print() + + print(f"šŸŽÆ [{timestamp}] {agent_name} finished processing") + print(f"šŸ“Š Total paragraphs processed: {streaming_callback.paragraph_count[agent_name] - 1}") + print("="*60) + + +def create_sample_agents(): + """Create sample agents for the hierarchical swarm.""" + # Marketing Strategist Agent + marketing_agent = Agent( + agent_name="MarketingStrategist", + agent_description="Expert in marketing strategy and campaign planning", + system_prompt="You are a marketing strategist. Provide creative and effective marketing strategies.", + model_name="gpt-4o-mini", + max_loops=1, + ) + + # Content Creator Agent + content_agent = Agent( + agent_name="ContentCreator", + agent_description="Expert in creating engaging content", + system_prompt="You are a content creator. Create engaging, well-written content for various platforms.", + model_name="gpt-4o-mini", + max_loops=1, + ) + + # Data Analyst Agent + analyst_agent = Agent( + agent_name="DataAnalyst", + agent_description="Expert in data analysis and insights", + system_prompt="You are a data analyst. Provide detailed analysis and insights from data.", + model_name="gpt-4o-mini", + max_loops=1, + ) + + return [marketing_agent, content_agent, analyst_agent] + + +def main(): + """Main function demonstrating hierarchical swarm with streaming.""" + print("šŸš€ Hierarchical Swarm Streaming Example") + print("=" * 60) + + # Create agents + agents = create_sample_agents() + + # Create hierarchical swarm + swarm = HierarchicalSwarm( + name="MarketingCampaignSwarm", + description="A swarm for planning and executing marketing campaigns", + agents=agents, + director_model_name="gpt-4o-mini", + max_loops=1, + verbose=True, + ) + + # Define the task + task = """ + Plan and execute a comprehensive marketing campaign for a new tech startup called 'CodeFlow' + that develops AI-powered code generation tools. The campaign should include: + + 1. Target audience analysis + 2. Content strategy development + 3. Social media campaign plan + 4. Performance metrics and KPIs + + Create a detailed campaign plan with specific tactics, timelines, and budget considerations. + """ + + print(f"šŸ“‹ Task: {task.strip()}") + print("\nšŸŽÆ Starting hierarchical swarm with live paragraph streaming...") + print("Watch as agents build complete paragraphs in real-time!\n") + print("Each token accumulates to form readable text, showing the full paragraph as it builds.\n") + + # Run the swarm with streaming callback + try: + result = swarm.run( + task=task, + streaming_callback=streaming_callback + ) + + print("\nšŸŽ‰ Swarm execution completed!") + print("\nšŸ“Š Final Results:") + print("-" * 30) + print(result) + + except Exception as e: + print(f"āŒ Error running swarm: {str(e)}") + + +def simple_callback_example(): + """Simpler example with token-by-token streaming.""" + print("\nšŸ”§ Simple Token-by-Token Callback Example") + print("=" * 50) + + def simple_callback(agent_name: str, chunk: str, is_final: bool): + """Simple callback that shows live paragraph formation.""" + if not hasattr(simple_callback, 'buffer'): + simple_callback.buffer = {} + simple_callback.token_count = {} + + if agent_name not in simple_callback.buffer: + simple_callback.buffer[agent_name] = "" + simple_callback.token_count[agent_name] = 0 + + if chunk.strip(): + tokens = chunk.replace('\n', ' \n ').split() + for token in tokens: + if token.strip(): + simple_callback.token_count[agent_name] += 1 + simple_callback.buffer[agent_name] += token + " " + # Show live accumulation + print(f"\r{agent_name} | {simple_callback.buffer[agent_name].strip()}", end="", flush=True) + + if is_final: + print() # New line after live updates + print(f"āœ“ {agent_name} finished! Total tokens: {simple_callback.token_count[agent_name]}") + print(f"Final text: {simple_callback.buffer[agent_name].strip()}") + print("-" * 40) + + # Create simple agents + agents = [ + Agent( + agent_name="Researcher", + agent_description="Research specialist", + system_prompt="You are a researcher. Provide thorough research on given topics.", + model_name="gpt-4o-mini", + max_loops=1, + ), + Agent( + agent_name="Writer", + agent_description="Content writer", + system_prompt="You are a writer. Create clear, concise content.", + model_name="gpt-4o-mini", + max_loops=1, + ), + ] + + swarm = HierarchicalSwarm( + name="SimpleSwarm", + description="Simple swarm example", + agents=agents, + director_model_name="gpt-4o-mini", + max_loops=1, + ) + + task = "Research the benefits of renewable energy and write a summary article." + + print(f"Task: {task}") + result = swarm.run(task=task, streaming_callback=simple_callback) + print(f"\nResult: {result}") + + +if __name__ == "__main__": + # Run the main streaming example + main() + + # Uncomment to run the simple example + # simple_callback_example() diff --git a/hierarchical_swarm_streaming_demo.py b/hierarchical_swarm_streaming_demo.py new file mode 100644 index 00000000..78b0b6cb --- /dev/null +++ b/hierarchical_swarm_streaming_demo.py @@ -0,0 +1,135 @@ +import time +from typing import Callable +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms import Agent + + +def create_streaming_callback() -> Callable[[str, str, bool], None]: + """Create a streaming callback that shows live paragraph formation.""" + + # Store accumulated text for each agent to track paragraph formation + agent_buffers = {} + paragraph_count = {} + + def streaming_callback(agent_name: str, chunk: str, is_final: bool): + timestamp = time.strftime("%H:%M:%S") + + # Initialize buffers for new agents + if agent_name not in agent_buffers: + agent_buffers[agent_name] = "" + paragraph_count[agent_name] = 1 + print(f"\nšŸŽ¬ [{timestamp}] {agent_name} starting...") + print("="*60) + + if chunk.strip(): + # Split chunk into tokens (words/punctuation) + tokens = chunk.replace('\n', ' \n ').split() + + for token in tokens: + # Handle paragraph breaks + if token == '\n': + if agent_buffers[agent_name].strip(): + print(f"\nšŸ“„ [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:") + print(f"{agent_buffers[agent_name].strip()}") + print("="*60) + paragraph_count[agent_name] += 1 + agent_buffers[agent_name] = "" + else: + # Add token to buffer and show live accumulation + agent_buffers[agent_name] += token + " " + + # Clear line and show current paragraph + print(f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}", end="", flush=True) + + if is_final: + print() # New line after live updates + # Print any remaining content as final paragraph + if agent_buffers[agent_name].strip(): + print(f"\nāœ… [{timestamp}] {agent_name} COMPLETED - Final Paragraph:") + print(f"{agent_buffers[agent_name].strip()}") + print() + + print(f"šŸŽÆ [{timestamp}] {agent_name} finished processing") + print(f"šŸ“Š Total paragraphs processed: {paragraph_count[agent_name] - 1}") + print("="*60) + + return streaming_callback + + +def create_agents(): + """Create specialized agents for the swarm.""" + return [ + Agent( + agent_name="Research_Agent", + agent_description="Specialized in gathering and analyzing information", + system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + Agent( + agent_name="Analysis_Agent", + agent_description="Expert at analyzing data and drawing insights", + system_prompt="You are an analysis expert. Break down complex information and provide clear insights.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + Agent( + agent_name="Summary_Agent", + agent_description="Skilled at creating concise summaries", + system_prompt="You are a summarization expert. Create clear, concise summaries of complex topics.", + model_name="gpt-4o-mini", + max_loops=1, + streaming_on=True, + ), + ] + + +if __name__ == "__main__": + print("šŸŽÆ HIERARCHICAL SWARM STREAMING DEMO") + print("="*50) + + # Create agents and swarm + agents = create_agents() + swarm = HierarchicalSwarm( + name="Research_and_Analysis_Swarm", + description="A swarm that researches topics, analyzes information, and creates summaries", + agents=agents, + max_loops=1, + verbose=True, + director_model_name="gpt-4o-mini", + ) + + # Define task + task = """ + Research the impact of artificial intelligence on the job market in 2024. + Analyze how different industries are being affected and provide insights + on future trends. Create a comprehensive summary of your findings. + """ + + print(f"Task: {task.strip()}") + + # Create streaming callback + streaming_callback = create_streaming_callback() + + print("\nšŸŽ¬ EXECUTING WITH STREAMING CALLBACKS...") + print("Watch real-time agent outputs below:\n") + + # Execute with streaming + result = swarm.run( + task=task, + streaming_callback=streaming_callback, + ) + + print("\nšŸŽ‰ EXECUTION COMPLETED!") + print("\nšŸ“Š FINAL RESULT:") + print("-" * 50) + + # Display final result + if isinstance(result, dict): + for key, value in result.items(): + print(f"\n{key}:") + print(f"{value}") + else: + print(result) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index a546c070..fc05d99c 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -6,6 +6,7 @@ from swarms.structs.auto_swarm_builder import AutoSwarmBuilder from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.batch_agent_execution import batch_agent_execution +from swarms.structs.batched_grid_workflow import BatchedGridWorkflow from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.conversation import Conversation from swarms.structs.council_as_judge import CouncilAsAJudge @@ -100,7 +101,6 @@ from swarms.structs.swarming_architectures import ( staircase_swarm, star_swarm, ) -from swarms.structs.batched_grid_workflow import BatchedGridWorkflow __all__ = [ "Agent", diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index 5e840d03..abf5cfdf 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -743,7 +743,11 @@ class HierarchicalSwarm: self.multi_agent_prompt_improvements = ( multi_agent_prompt_improvements ) + + self.reliability_checks() + + def reliability_checks(self): if self.interactive: self.agents_no_print() @@ -763,7 +767,7 @@ class HierarchicalSwarm: ) self.init_swarm() - + def list_worker_agents(self) -> str: return list_all_agents( agents=self.agents, @@ -794,6 +798,7 @@ class HierarchicalSwarm: Returns: str: The reasoning output from the agent """ + agent = Agent( agent_name=self.director_name, agent_description=f"You're the {self.director_name} agent that is responsible for reasoning about the task and creating a plan for the swarm to accomplish the task.", @@ -1039,7 +1044,9 @@ class HierarchicalSwarm: logger.error(error_msg) raise e - def step(self, task: str, img: str = None, *args, **kwargs): + def step(self, task: str, img: str = None, streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs): """ Execute a single step of the hierarchical swarm workflow. @@ -1052,6 +1059,9 @@ class HierarchicalSwarm: Args: task (str): The task to be processed in this step. img (str, optional): Optional image input for the task. + streaming_callback (Callable[[str, str, bool], None], optional): + Callback function for streaming agent outputs. Parameters are + (agent_name, chunk, is_final) where is_final indicates completion. *args: Additional positional arguments. **kwargs: Additional keyword arguments. @@ -1096,7 +1106,7 @@ class HierarchicalSwarm: self.dashboard.update_director_status("EXECUTING") # Execute the orders - outputs = self.execute_orders(orders) + outputs = self.execute_orders(orders, streaming_callback=streaming_callback) if self.verbose: logger.info(f"[EXEC] Executed {len(outputs)} orders") @@ -1121,6 +1131,9 @@ class HierarchicalSwarm: self, task: Optional[str] = None, img: Optional[str] = None, + streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs, ): @@ -1138,6 +1151,9 @@ class HierarchicalSwarm: task (str, optional): The initial task to be processed by the swarm. If None and interactive mode is enabled, will prompt for input. img (str, optional): Optional image input for the agents. + streaming_callback (Callable[[str, str, bool], None], optional): + Callback function for streaming agent outputs. Parameters are + (agent_name, chunk, is_final) where is_final indicates completion. *args: Additional positional arguments. **kwargs: Additional keyword arguments. @@ -1197,7 +1213,7 @@ class HierarchicalSwarm: # Execute one step of the swarm try: last_output = self.step( - task=loop_task, img=img, *args, **kwargs + task=loop_task, img=img, streaming_callback=streaming_callback, *args, **kwargs ) if self.verbose: @@ -1318,7 +1334,9 @@ class HierarchicalSwarm: logger.error(error_msg) def call_single_agent( - self, agent_name: str, task: str, *args, **kwargs + self, agent_name: str, task: str, streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs ): """ Call a single agent by name to execute a specific task. @@ -1330,6 +1348,9 @@ class HierarchicalSwarm: Args: agent_name (str): The name of the agent to call. task (str): The task to be executed by the agent. + streaming_callback (Callable[[str, str, bool], None], optional): + Callback function for streaming agent outputs. Parameters are + (agent_name, chunk, is_final) where is_final indicates completion. *args: Additional positional arguments for the agent. **kwargs: Additional keyword arguments for the agent. @@ -1370,11 +1391,40 @@ class HierarchicalSwarm: agent_name, "RUNNING", task, "Executing task..." ) - output = agent.run( - task=f"History: {self.conversation.get_str()} \n\n Task: {task}", - *args, - **kwargs, - ) + # Handle streaming callback if provided + if streaming_callback is not None: + def agent_streaming_callback(chunk: str): + """Wrapper for agent streaming callback.""" + try: + if chunk is not None and chunk.strip(): + streaming_callback(agent_name, chunk, False) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}" + ) + + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + streaming_callback=agent_streaming_callback, + *args, + **kwargs, + ) + + # Call completion callback + try: + streaming_callback(agent_name, "", True) + except Exception as callback_error: + if self.verbose: + logger.warning( + f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}" + ) + else: + output = agent.run( + task=f"History: {self.conversation.get_str()} \n\n Task: {task}", + *args, + **kwargs, + ) self.conversation.add(role=agent_name, content=output) if self.verbose: @@ -1539,7 +1589,9 @@ class HierarchicalSwarm: logger.error(error_msg) raise e - def execute_orders(self, orders: list): + def execute_orders(self, orders: list, streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None): """ Execute all orders from the director's output. @@ -1549,6 +1601,9 @@ class HierarchicalSwarm: Args: orders (list): List of HierarchicalOrder objects to execute. + streaming_callback (Callable[[str, str, bool], None], optional): + Callback function for streaming agent outputs. Parameters are + (agent_name, chunk, is_final) where is_final indicates completion. Returns: list: List of outputs from all executed orders. @@ -1577,7 +1632,7 @@ class HierarchicalSwarm: ) output = self.call_single_agent( - order.agent_name, order.task + order.agent_name, order.task, streaming_callback=streaming_callback ) # Update dashboard with completed status @@ -1606,7 +1661,9 @@ class HierarchicalSwarm: logger.error(error_msg) def batched_run( - self, tasks: List[str], img: str = None, *args, **kwargs + self, tasks: List[str], img: str = None, streaming_callback: Optional[ + Callable[[str, str, bool], None] + ] = None, *args, **kwargs ): """ Execute the hierarchical swarm for multiple tasks in sequence. @@ -1618,6 +1675,9 @@ class HierarchicalSwarm: Args: tasks (List[str]): List of tasks to be processed by the swarm. img (str, optional): Optional image input for the tasks. + streaming_callback (Callable[[str, str, bool], None], optional): + Callback function for streaming agent outputs. Parameters are + (agent_name, chunk, is_final) where is_final indicates completion. *args: Additional positional arguments. **kwargs: Additional keyword arguments. @@ -1641,7 +1701,7 @@ class HierarchicalSwarm: # Process each task in parallel for task in tasks: - result = self.run(task, img, *args, **kwargs) + result = self.run(task, img, streaming_callback=streaming_callback, *args, **kwargs) results.append(result) if self.verbose: diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index d3229b0a..e57a5f32 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -659,10 +659,9 @@ class SwarmRouter: def _run( self, - task: str, + task: Optional[str] = None, + tasks: Optional[List[str]] = None, img: Optional[str] = None, - model_response: Optional[str] = None, - imgs: Optional[List[str]] = None, *args, **kwargs, ) -> Any: @@ -690,8 +689,21 @@ class SwarmRouter: enabled_on=self.telemetry_enabled, ) + args = {} + + if tasks is not None: + args["tasks"] = tasks + else: + args["task"] = task + + if img is not None: + args["img"] = img + try: - result = self.swarm.run(task=task, *args, **kwargs) + if self.swarm_type == "BatchedGridWorkflow": + result = self.swarm.run(**args, **kwargs) + else: + result = self.swarm.run(**args, **kwargs) log_execution( swarm_id=self.id, @@ -718,10 +730,9 @@ class SwarmRouter: def run( self, - task: str, + task: Optional[str] = None, img: Optional[str] = None, - imgs: Optional[List[str]] = None, - model_response: Optional[str] = None, + tasks: Optional[List[str]] = None, *args, **kwargs, ) -> Any: @@ -746,8 +757,7 @@ class SwarmRouter: return self._run( task=task, img=img, - imgs=imgs, - model_response=model_response, + tasks=tasks, *args, **kwargs, ) diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index 16e8a719..e3f0a73d 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -16,6 +16,7 @@ from rich.spinner import Spinner from rich.markdown import Markdown + # Global Live display for the dashboard dashboard_live = None @@ -721,4 +722,4 @@ class Formatter: # Global formatter instance with markdown output enabled by default -formatter = Formatter(md=True) +formatter = Formatter(md=False)