diff --git a/deep_research_swarm_example.py b/deep_research_swarm_example.py
new file mode 100644
index 00000000..54c45b34
--- /dev/null
+++ b/deep_research_swarm_example.py
@@ -0,0 +1,10 @@
+from swarms.structs.deep_research_swarm import DeepResearchSwarm
+
+
+model = DeepResearchSwarm(
+ research_model_name="groq/deepseek-r1-distill-qwen-32b"
+)
+
+model.run(
+ "What are the latest research papers on extending telomeres in humans? Give 1 queries for the search not too many`"
+)
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index ccdad9e1..04f6d3b7 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -176,13 +176,18 @@ nav:
- Reasoning Agent Router: "swarms/agents/reasoning_agent_router.md"
- Reflexion Agent: "swarms/agents/reflexion_agent.md"
- GKP Agent: "swarms/agents/gkp_agent.md"
+ - Agent Judge: "swarms/agents/agent_judge.md"
- Swarm Architectures:
- - Why MultiAgent Collaboration is Necessary: "swarms/concept/why.md"
- - Swarm Architectures: "swarms/concept/swarm_architectures.md"
- - Choosing the right Swarm Architecture: "swarms/concept/how_to_choose_swarms.md"
- - Building Custom Swarms: "swarms/structs/custom_swarm.md"
- - Create New Swarm Architectures: "swarms/structs/create_new_swarm.md"
- - Architectures Available:
+ - Introduction to Multi-Agent Collaboration: "swarms/concept/why.md"
+
+ - Conceptual Introduction to Swarms:
+ - Introduction to Swarm Architectures: "swarms/concept/swarm_architectures.md"
+ - How to Choose the Right Swarm Architecture: "swarms/concept/how_to_choose_swarms.md"
+ - How to Build Custom Swarms: "swarms/structs/custom_swarm.md"
+ - How to Create New Swarm Architectures: "swarms/structs/create_new_swarm.md"
+ - Introduction to Hiearchical Swarm Architectures: "swarms/structs/multi_swarm_orchestration.md"
+
+ - Swarm Architecture Documentation:
- MajorityVoting: "swarms/structs/majorityvoting.md"
- AgentRearrange: "swarms/structs/agent_rearrange.md"
- RoundRobin: "swarms/structs/round_robin_swarm.md"
@@ -201,13 +206,15 @@ nav:
- MALT: "swarms/structs/malt.md"
- Auto Agent Builder: "swarms/structs/auto_agent_builder.md"
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
+ - Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- AsyncWorkflow: "swarms/structs/async_workflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md"
- Structs:
- Conversation: "swarms/structs/conversation.md"
- - Full API Reference: "swarms/framework/reference.md"
+ # - Full API Reference: "swarms/framework/reference.md"
+
- Examples:
- Overview: "swarms/examples/unique_swarms.md"
diff --git a/docs/swarms/agents/agent_judge.md b/docs/swarms/agents/agent_judge.md
new file mode 100644
index 00000000..6ea58fec
--- /dev/null
+++ b/docs/swarms/agents/agent_judge.md
@@ -0,0 +1,162 @@
+# Agent Judge
+
+The AgentJudge is a specialized agent designed to evaluate and judge outputs from other agents or systems. It acts as a quality control mechanism, providing objective assessments and feedback on various types of content, decisions, or outputs.
+
+
+The AgentJudge serves as an impartial evaluator that can:
+- Assess the quality and correctness of agent outputs
+
+- Provide structured feedback and scoring
+
+- Maintain context across multiple evaluations
+
+- Generate detailed analysis reports
+
+## Architecture
+
+```mermaid
+graph TD
+ A[Input Tasks] --> B[AgentJudge]
+ B --> C[Agent Core]
+ C --> D[LLM Model]
+ D --> E[Response Generation]
+ E --> F[Context Management]
+ F --> G[Output]
+
+ subgraph "Evaluation Flow"
+ H[Task Analysis] --> I[Quality Assessment]
+ I --> J[Feedback Generation]
+ J --> K[Score Assignment]
+ end
+
+ B --> H
+ K --> G
+```
+
+## Parameters
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `agent_name` | str | "agent-judge-01" | Unique identifier for the judge agent |
+| `system_prompt` | str | AGENT_JUDGE_PROMPT | System instructions for the agent |
+| `model_name` | str | "openai/o1" | LLM model to use for evaluation |
+| `max_loops` | int | 1 | Maximum number of evaluation iterations |
+
+## Methods
+
+| Method | Description | Parameters | Returns |
+|--------|-------------|------------|---------|
+| `step()` | Processes a single batch of tasks | `tasks: List[str]` | `str` |
+| `run()` | Executes multiple evaluation iterations | `tasks: List[str]` | `List[str]` |
+
+## Code Example
+
+```python
+from swarms import AgentJudge
+
+
+judge = AgentJudge(model_name="gpt-4o", max_loops=1)
+
+
+outputs = [
+ "1. Agent CalculusMaster: After careful evaluation, I have computed the integral of the polynomial function. The result is ∫(x^2 + 3x + 2)dx = (1/3)x^3 + (3/2)x^2 + 5, where I applied the power rule for integration and added the constant of integration.",
+ "2. Agent DerivativeDynamo: In my analysis of the function sin(x), I have derived it with respect to x. The derivative is d/dx (sin(x)) = cos(x). However, I must note that the additional term '+ 2' is not applicable in this context as it does not pertain to the derivative of sin(x).",
+ "3. Agent LimitWizard: Upon evaluating the limit as x approaches 0 for the function (sin(x)/x), I conclude that lim (x -> 0) (sin(x)/x) = 1. The additional '+ 3' is incorrect and should be disregarded as it does not relate to the limit calculation.",
+ "4. Agent IntegralGenius: I have computed the integral of the exponential function e^x. The result is ∫(e^x)dx = e^x + C, where C is the constant of integration. The extra '+ 1' is unnecessary and does not belong in the final expression.",
+ "5. Agent FunctionFreak: Analyzing the cubic function f(x) = x^3 - 3x + 2, I determined that it has a maximum at x = 1. However, the additional '+ 2' is misleading and should not be included in the maximum value statement.",
+]
+
+print(judge.run(outputs))
+
+```
+
+## Enterprise Applications
+
+1. **Code Review Automation**
+ - Evaluate code quality
+
+ - Check for best practices
+
+ - Assess documentation completeness
+
+2. **Content Quality Control**
+
+ - Review marketing copy
+
+ - Validate technical documentation
+
+ - Assess user support responses
+
+3. **Decision Validation**
+ - Evaluate business decisions
+
+ - Assess risk assessments
+
+ - Review compliance reports
+
+4. **Performance Assessment**
+
+ - Evaluate agent performance
+
+ - Assess system outputs
+
+ - Review automated processes
+
+## Best Practices
+
+1. **Task Formulation**
+ - Provide clear, specific evaluation criteria
+
+ - Include context when necessary
+
+ - Structure tasks for consistent evaluation
+
+2. **System Configuration**
+
+ - Use appropriate model for task complexity
+
+ - Adjust max_loops based on evaluation depth needed
+
+ - Customize system prompt for specific use cases
+
+3. **Output Management**
+
+ - Store evaluation results systematically
+
+ - Track evaluation patterns over time
+
+ - Use results for continuous improvement
+
+4. **Integration Tips**
+ - Implement as part of CI/CD pipelines
+
+ - Use for automated quality gates
+
+ - Integrate with monitoring systems
+
+## Use Cases
+
+```mermaid
+graph LR
+ A[AgentJudge] --> B[Code Review]
+ A --> C[Content QA]
+ A --> D[Decision Validation]
+ A --> E[Performance Metrics]
+
+ B --> F[Quality Gates]
+ C --> G[Compliance]
+ D --> H[Risk Assessment]
+ E --> I[System Optimization]
+```
+
+## Tips for Implementation
+
+1. Start with simple evaluation tasks and gradually increase complexity
+
+2. Maintain consistent evaluation criteria across similar tasks
+
+3. Use the context management feature for multi-step evaluations
+
+4. Implement proper error handling and logging
+
+5. Regular calibration of evaluation criteria
diff --git a/docs/swarms/structs/hhcs.md b/docs/swarms/structs/hhcs.md
new file mode 100644
index 00000000..8d537d6b
--- /dev/null
+++ b/docs/swarms/structs/hhcs.md
@@ -0,0 +1,221 @@
+# Hybrid Hierarchical-Cluster Swarm [HHCS]
+
+The Hybrid Hierarchical-Cluster Swarm (HHCS) is an advanced AI orchestration architecture that combines hierarchical decision-making with parallel processing capabilities. HHCS enables complex task solving by dynamically routing tasks to specialized agent swarms based on their expertise and capabilities.
+
+## Purpose
+
+HHCS addresses the challenge of efficiently solving diverse and complex tasks by:
+
+- Intelligently routing tasks to the most appropriate specialized swarms
+
+- Enabling parallel processing of multifaceted problems
+
+- Maintaining a clear hierarchy for effective decision-making
+
+- Combining outputs from multiple specialized agents for comprehensive solutions
+
+## Key Features
+
+- **Router-based task distribution**: Central router agent analyzes incoming tasks and directs them to appropriate specialized swarms
+
+- **Hybrid architecture**: Combines hierarchical control with clustered specialization
+
+- **Parallel processing**: Multiple swarms can work simultaneously on different aspects of complex tasks
+
+- **Flexible swarm types**: Supports both sequential and concurrent workflows within swarms
+
+- **Comprehensive result aggregation**: Collects and combines outputs from all contributing swarms
+
+
+## Diagram
+
+The HHCS architecture follows a hierarchical structure with the router agent at the top level, specialized swarms at the middle level, and individual agents at the bottom level.
+
+
+```mermaid
+flowchart TD
+ Start([Task Input]) --> RouterAgent[Router Agent]
+ RouterAgent --> Analysis{Task Analysis}
+
+ Analysis -->|Analyze Requirements| Selection[Swarm Selection]
+ Selection -->|Select Best Swarm| Route[Route Task]
+
+ Route --> Swarm1[Swarm 1]
+ Route --> Swarm2[Swarm 2]
+ Route --> SwarmN[Swarm N...]
+
+ Swarm1 -->|Process Task| Result1[Swarm 1 Output]
+ Swarm2 -->|Process Task| Result2[Swarm 2 Output]
+ SwarmN -->|Process Task| ResultN[Swarm N Output]
+
+ Result1 --> Conversation[Conversation History]
+ Result2 --> Conversation
+ ResultN --> Conversation
+
+ Conversation --> Output([Final Output])
+
+ subgraph Router Decision Process
+ Analysis
+ Selection
+ end
+
+ subgraph Parallel Task Processing
+ Swarm1
+ Swarm2
+ SwarmN
+ end
+
+ subgraph Results Collection
+ Result1
+ Result2
+ ResultN
+ Conversation
+ end
+```
+
+
+## `HybridHierarchicalClusterSwarm` Constructor Arguments
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `name` | string | "Hybrid Hierarchical-Cluster Swarm" | The name of the swarm instance |
+| `description` | string | "A swarm that uses a hybrid hierarchical-peer model to solve complex tasks." | Brief description of the swarm's functionality |
+| `swarms` | List[SwarmRouter] | [] | List of available swarm routers |
+| `max_loops` | integer | 1 | Maximum number of processing loops |
+| `output_type` | string | "list" | Format for output (e.g., "list", "json") |
+| `router_agent_model_name` | string | "gpt-4o-mini" | LLM model used by the router agent |
+
+## Methods
+
+| Method | Parameters | Return Type | Description |
+|--------|------------|-------------|-------------|
+| `run` | `task` (str) | str | Processes a single task through the swarm system |
+| `batched_run` | `tasks` (List[str]) | List[str] | Processes multiple tasks in parallel |
+| `find_swarm_by_name` | `swarm_name` (str) | SwarmRouter | Retrieves a swarm by its name |
+| `route_task` | `swarm_name` (str), `task_description` (str) | None | Routes a task to a specific swarm |
+| `get_swarms_info` | None | str | Returns formatted information about all available swarms |
+
+
+## Full Example
+
+```python
+from swarms import Agent, SwarmRouter
+from swarms.structs.hybrid_hiearchical_peer_swarm import (
+ HybridHierarchicalClusterSwarm,
+)
+
+
+# Core Legal Agent Definitions with short, simple prompts
+litigation_agent = Agent(
+ agent_name="Litigator",
+ system_prompt="You handle lawsuits. Analyze facts, build arguments, and develop case strategy.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+corporate_agent = Agent(
+ agent_name="Corporate-Attorney",
+ system_prompt="You handle business law. Advise on corporate structure, governance, and transactions.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+ip_agent = Agent(
+ agent_name="IP-Attorney",
+ system_prompt="You protect intellectual property. Handle patents, trademarks, copyrights, and trade secrets.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+employment_agent = Agent(
+ agent_name="Employment-Attorney",
+ system_prompt="You handle workplace matters. Address hiring, termination, discrimination, and labor issues.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+paralegal_agent = Agent(
+ agent_name="Paralegal",
+ system_prompt="You assist attorneys. Conduct research, draft documents, and organize case files.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+doc_review_agent = Agent(
+ agent_name="Document-Reviewer",
+ system_prompt="You examine documents. Extract key information and identify relevant content.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+# Practice Area Swarm Routers
+litigation_swarm = SwarmRouter(
+ name="litigation-practice",
+ description="Handle all aspects of litigation",
+ agents=[litigation_agent, paralegal_agent, doc_review_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+corporate_swarm = SwarmRouter(
+ name="corporate-practice",
+ description="Handle business and corporate legal matters",
+ agents=[corporate_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+ip_swarm = SwarmRouter(
+ name="ip-practice",
+ description="Handle intellectual property matters",
+ agents=[ip_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+employment_swarm = SwarmRouter(
+ name="employment-practice",
+ description="Handle employment and labor law matters",
+ agents=[employment_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+# Cross-functional Swarm Router
+m_and_a_swarm = SwarmRouter(
+ name="mergers-acquisitions",
+ description="Handle mergers and acquisitions",
+ agents=[
+ corporate_agent,
+ ip_agent,
+ employment_agent,
+ doc_review_agent,
+ ],
+ swarm_type="ConcurrentWorkflow",
+)
+
+dispute_swarm = SwarmRouter(
+ name="dispute-resolution",
+ description="Handle complex disputes requiring multiple specialties",
+ agents=[litigation_agent, corporate_agent, doc_review_agent],
+ swarm_type="ConcurrentWorkflow",
+)
+
+
+hybrid_hiearchical_swarm = HybridHierarchicalClusterSwarm(
+ name="hybrid-hiearchical-swarm",
+ description="A hybrid hiearchical swarm that uses a hybrid hiearchical peer model to solve complex tasks.",
+ swarms=[
+ litigation_swarm,
+ corporate_swarm,
+ ip_swarm,
+ employment_swarm,
+ m_and_a_swarm,
+ dispute_swarm,
+ ],
+ max_loops=1,
+ router_agent_model_name="gpt-4o-mini",
+)
+
+
+if __name__ == "__main__":
+ hybrid_hiearchical_swarm.run(
+ "What is the best way to file for a patent? for ai technology "
+ )
+```
\ No newline at end of file
diff --git a/docs/swarms/structs/multi_swarm_orchestration.md b/docs/swarms/structs/multi_swarm_orchestration.md
new file mode 100644
index 00000000..309e25ac
--- /dev/null
+++ b/docs/swarms/structs/multi_swarm_orchestration.md
@@ -0,0 +1,153 @@
+# Hierarchical Agent Orchestration Architectures
+
+Hierarchical agent orchestration involves organizing AI agents in structured layers to efficiently handle complex tasks. There are several key architectures available, each with distinct characteristics and use cases.
+
+## Core Architectures
+
+### 1. Hybrid Hierarchical-Cluster Swarm (HHCS)
+
+```mermaid
+flowchart TD
+ Start([Task Input]) --> RouterAgent[Router Agent]
+ RouterAgent --> Analysis{Task Analysis}
+
+ Analysis -->|Analyze Requirements| Selection[Swarm Selection]
+ Selection -->|Select Best Swarm| Route[Route Task]
+
+ Route --> Swarm1[Specialized Swarm 1]
+ Route --> Swarm2[Specialized Swarm 2]
+ Route --> SwarmN[Specialized Swarm N]
+
+ Swarm1 -->|Process| Result1[Output 1]
+ Swarm2 -->|Process| Result2[Output 2]
+ SwarmN -->|Process| ResultN[Output N]
+
+ Result1 --> Final[Final Output]
+ Result2 --> Final
+ ResultN --> Final
+```
+
+### 2. Auto Agent Builder
+
+```mermaid
+flowchart TD
+ Task[Task Input] --> Builder[Agent Builder]
+ Builder --> Analysis{Task Analysis}
+
+ Analysis --> Create[Create Specialized Agents]
+ Create --> Pool[Agent Pool]
+
+ Pool --> Agent1[Specialized Agent 1]
+ Pool --> Agent2[Specialized Agent 2]
+ Pool --> AgentN[Specialized Agent N]
+
+ Agent1 --> Orchestration[Task Orchestration]
+ Agent2 --> Orchestration
+ AgentN --> Orchestration
+
+ Orchestration --> Result[Final Result]
+```
+
+### 3. SwarmRouter
+
+```mermaid
+flowchart TD
+ Input[Task Input] --> Router[Swarm Router]
+ Router --> TypeSelect{Swarm Type Selection}
+
+ TypeSelect -->|Sequential| Seq[Sequential Workflow]
+ TypeSelect -->|Concurrent| Con[Concurrent Workflow]
+ TypeSelect -->|Hierarchical| Hier[Hierarchical Swarm]
+ TypeSelect -->|Group| Group[Group Chat]
+
+ Seq --> Output[Task Output]
+ Con --> Output
+ Hier --> Output
+ Group --> Output
+```
+
+## Comparison Table
+
+| Architecture | Strengths | Weaknesses |
+|--------------|-----------|------------|
+| HHCS | - Clear task routing
- Specialized swarm handling
- Parallel processing capability
- Good for complex multi-domain tasks | - More complex setup
- Overhead in routing
- Requires careful swarm design |
+| Auto Agent Builder | - Dynamic agent creation
- Flexible scaling
- Self-organizing
- Good for evolving tasks | - Higher resource usage
- Potential creation overhead
- May create redundant agents |
+| SwarmRouter | - Multiple workflow types
- Simple configuration
- Flexible deployment
- Good for varied task types | - Less specialized than HHCS
- Limited inter-swarm communication
- May require manual type selection |
+
+## Use Case Recommendations
+
+1. **HHCS**: Best for:
+ - Enterprise-scale operations
+ - Multi-domain problems
+ - Complex task routing
+ - Parallel processing needs
+
+2. **Auto Agent Builder**: Best for:
+ - Dynamic workloads
+ - Evolving requirements
+ - Research and development
+ - Exploratory tasks
+
+3. **SwarmRouter**: Best for:
+ - General purpose tasks
+ - Quick deployment
+ - Mixed workflow types
+ - Smaller scale operations
+
+## Documentation Links
+
+1. HHCS Documentation:
+ - [Hybrid Hierarchical-Cluster Swarm Documentation](docs/swarms/structs/hhcs.md)
+ - Covers detailed implementation, constructor arguments, and full examples
+
+2. Auto Agent Builder Documentation:
+ - [Agent Builder Documentation](docs/swarms/structs/auto_agent_builder.md)
+ - Includes enterprise use cases, best practices, and integration patterns
+
+3. SwarmRouter Documentation:
+ - [SwarmRouter Documentation](docs/swarms/structs/swarm_router.md)
+ - Provides comprehensive API reference, advanced usage, and use cases
+
+## Best Practices for Selection
+
+1. **Evaluate Task Complexity**
+ - Simple tasks → SwarmRouter
+ - Complex, multi-domain tasks → HHCS
+ - Dynamic, evolving tasks → Auto Agent Builder
+
+2. **Consider Scale**
+ - Small scale → SwarmRouter
+ - Large scale → HHCS
+ - Variable scale → Auto Agent Builder
+
+3. **Resource Availability**
+ - Limited resources → SwarmRouter
+ - Abundant resources → HHCS or Auto Agent Builder
+ - Dynamic resources → Auto Agent Builder
+
+4. **Development Time**
+ - Quick deployment → SwarmRouter
+ - Complex system → HHCS
+ - Experimental system → Auto Agent Builder
+
+## Integration Considerations
+
+1. **System Requirements**
+ - All architectures require proper API access depending on the model your agents are using.
+ - HHCS needs robust routing infrastructure
+ - Auto Agent Builder needs scalable resource management
+ - SwarmRouter needs workflow type definitions
+
+2. **Monitoring**
+ - Implement comprehensive logging
+ - Track performance metrics
+ - Monitor resource usage
+ - Set up alerting systems
+
+3. **Scaling**
+ - Design for horizontal scaling
+ - Implement proper load balancing
+ - Consider distributed deployment
+ - Plan for future growth
+
+This documentation provides a high-level overview of the main hierarchical agent orchestration architectures available in the system. Each architecture has its own strengths and ideal use cases, and the choice between them should be based on specific project requirements, scale, and complexity.
diff --git a/docs/swarms/structs/sequential_workflow.md b/docs/swarms/structs/sequential_workflow.md
index e4301ccf..201f2049 100644
--- a/docs/swarms/structs/sequential_workflow.md
+++ b/docs/swarms/structs/sequential_workflow.md
@@ -1,6 +1,20 @@
# SequentialWorkflow Documentation
-The `SequentialWorkflow` class is designed to manage and execute a sequence of tasks through a dynamic arrangement of agents. This class allows for the orchestration of multiple agents in a predefined order, facilitating complex workflows where tasks are processed sequentially by different agents.
+**Overview:**
+A Sequential Swarm architecture processes tasks in a linear sequence. Each agent completes its task before passing the result to the next agent in the chain. This architecture ensures orderly processing and is useful when tasks have dependencies. [Learn more here in the docs:](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/)
+
+**Use-Cases:**
+
+- Workflows where each step depends on the previous one, such as assembly lines or sequential data processing.
+
+- Scenarios requiring strict order of operations.
+
+```mermaid
+graph TD
+ A[First Agent] --> B[Second Agent]
+ B --> C[Third Agent]
+ C --> D[Fourth Agent]
+```
## Attributes
@@ -32,52 +46,43 @@ Runs the specified task through the agents in the dynamically constructed flow.
- **Returns:**
- `str`: The final result after processing through all agents.
-- **Usage Example:**
- ```python
- from swarms import Agent, SequentialWorkflow
-
-from swarm_models import Anthropic
+## **Usage Example:**
+```python
- # Initialize the language model agent (e.g., GPT-3)
- llm = Anthropic()
+from swarms import Agent, SequentialWorkflow
- # Place your key in .env
+# Initialize agents for individual tasks
+agent1 = Agent(
+ agent_name="ICD-10 Code Analyzer",
+ system_prompt="Analyze medical data and provide relevant ICD-10 codes.",
+ model_name="gpt-4o",
+ max_loops=1,
+)
+agent2 = Agent(
+ agent_name="ICD-10 Code Summarizer",
+ system_prompt="Summarize the findings and suggest ICD-10 codes.",
+ model_name="gpt-4o",
+ max_loops=1,
+)
- # Initialize agents for individual tasks
- agent1 = Agent(
- agent_name="Blog generator",
- system_prompt="Generate a blog post like stephen king",
- llm=llm,
- max_loops=1,
- dashboard=False,
- tools=[],
- )
- agent2 = Agent(
- agent_name="summarizer",
- system_prompt="Sumamrize the blog post",
- llm=llm,
- max_loops=1,
- dashboard=False,
- tools=[],
- )
+# Create the Sequential workflow
+workflow = SequentialWorkflow(
+ agents=[agent1, agent2], max_loops=1, verbose=False
+)
- # Create the Sequential workflow
- workflow = SequentialWorkflow(
- agents=[agent1, agent2], max_loops=1, verbose=False
- )
+# Run the workflow
+workflow.run(
+ "Analyze the medical report and provide the appropriate ICD-10 codes."
+)
- # Run the workflow
- workflow.run(
- "Generate a blog post on how swarms of agents can help businesses grow."
- )
+```
- ```
+This example initializes a `SequentialWorkflow` with three agents and executes a task, printing the final result.
- This example initializes a `SequentialWorkflow` with three agents and executes a task, printing the final result.
+## **Notes:**
-- **Notes:**
- - Logs the task execution process and handles any exceptions that occur during the task execution.
+- Logs the task execution process and handles any exceptions that occur during the task execution.
### Logging and Error Handling
@@ -88,4 +93,5 @@ The `run` method includes logging to track the execution flow and captures error
- Ensure that the agents provided to the `SequentialWorkflow` are properly initialized and configured to handle the tasks they will receive.
- The `max_loops` parameter can be used to control how many times the workflow should be executed, which is useful for iterative processes.
+
- Utilize the logging information to monitor and debug the task execution process.
diff --git a/example.py b/example.py
index f9182e14..0ddeec23 100644
--- a/example.py
+++ b/example.py
@@ -12,7 +12,7 @@ agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
- max_loops=1,
+ max_loops="auto",
model_name="gpt-4o",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
diff --git a/hybrid_hiearchical_swarm.py b/hybrid_hiearchical_swarm.py
new file mode 100644
index 00000000..6fcd57ad
--- /dev/null
+++ b/hybrid_hiearchical_swarm.py
@@ -0,0 +1,119 @@
+from swarms import Agent, SwarmRouter
+from swarms.structs.hybrid_hiearchical_peer_swarm import (
+ HybridHierarchicalClusterSwarm,
+)
+
+
+# Core Legal Agent Definitions with short, simple prompts
+litigation_agent = Agent(
+ agent_name="Litigator",
+ system_prompt="You handle lawsuits. Analyze facts, build arguments, and develop case strategy.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+corporate_agent = Agent(
+ agent_name="Corporate-Attorney",
+ system_prompt="You handle business law. Advise on corporate structure, governance, and transactions.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+ip_agent = Agent(
+ agent_name="IP-Attorney",
+ system_prompt="You protect intellectual property. Handle patents, trademarks, copyrights, and trade secrets.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+employment_agent = Agent(
+ agent_name="Employment-Attorney",
+ system_prompt="You handle workplace matters. Address hiring, termination, discrimination, and labor issues.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+paralegal_agent = Agent(
+ agent_name="Paralegal",
+ system_prompt="You assist attorneys. Conduct research, draft documents, and organize case files.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+doc_review_agent = Agent(
+ agent_name="Document-Reviewer",
+ system_prompt="You examine documents. Extract key information and identify relevant content.",
+ model_name="groq/deepseek-r1-distill-qwen-32b",
+ max_loops=1,
+)
+
+# Practice Area Swarm Routers
+litigation_swarm = SwarmRouter(
+ name="litigation-practice",
+ description="Handle all aspects of litigation",
+ agents=[litigation_agent, paralegal_agent, doc_review_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+corporate_swarm = SwarmRouter(
+ name="corporate-practice",
+ description="Handle business and corporate legal matters",
+ agents=[corporate_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+ip_swarm = SwarmRouter(
+ name="ip-practice",
+ description="Handle intellectual property matters",
+ agents=[ip_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+employment_swarm = SwarmRouter(
+ name="employment-practice",
+ description="Handle employment and labor law matters",
+ agents=[employment_agent, paralegal_agent],
+ swarm_type="SequentialWorkflow",
+)
+
+# Cross-functional Swarm Router
+m_and_a_swarm = SwarmRouter(
+ name="mergers-acquisitions",
+ description="Handle mergers and acquisitions",
+ agents=[
+ corporate_agent,
+ ip_agent,
+ employment_agent,
+ doc_review_agent,
+ ],
+ swarm_type="ConcurrentWorkflow",
+)
+
+dispute_swarm = SwarmRouter(
+ name="dispute-resolution",
+ description="Handle complex disputes requiring multiple specialties",
+ agents=[litigation_agent, corporate_agent, doc_review_agent],
+ swarm_type="ConcurrentWorkflow",
+)
+
+
+hybrid_hiearchical_swarm = HybridHierarchicalClusterSwarm(
+ name="hybrid-hiearchical-swarm",
+ description="A hybrid hiearchical swarm that uses a hybrid hiearchical peer model to solve complex tasks.",
+ swarms=[
+ litigation_swarm,
+ corporate_swarm,
+ ip_swarm,
+ employment_swarm,
+ m_and_a_swarm,
+ dispute_swarm,
+ ],
+ max_loops=1,
+ router_agent_model_name="gpt-4o-mini",
+)
+
+
+if __name__ == "__main__":
+ hybrid_hiearchical_swarm.run(
+ "What is the best way to file for a patent? for ai technology "
+ )
diff --git a/pyproject.toml b/pyproject.toml
index cc99f7f0..71fff1c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "7.6.0"
+version = "7.6.1"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez "]
diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py
index 498cffb4..4eb006d5 100644
--- a/swarms/agents/__init__.py
+++ b/swarms/agents/__init__.py
@@ -1,9 +1,10 @@
+from swarms.agents.agent_judge import AgentJudge
from swarms.agents.consistency_agent import SelfConsistencyAgent
-
-# from swarms.agents.tool_agent import ToolAgent
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
+from swarms.agents.flexion_agent import ReflexionAgent
+from swarms.agents.gkp_agent import GKPAgent
from swarms.agents.i_agent import IterativeReflectiveExpansion
from swarms.agents.reasoning_agents import (
ReasoningAgentRouter,
@@ -23,8 +24,8 @@ from swarms.structs.stopping_conditions import (
check_success,
)
-from swarms.agents.flexion_agent import ReflexionAgent
-from swarms.agents.gkp_agent import GKPAgent
+# Hybrid Hierarchical-Peer Model
+
__all__ = [
# "ToolAgent",
@@ -46,4 +47,5 @@ __all__ = [
"agent_types",
"ReflexionAgent",
"GKPAgent",
+ "AgentJudge",
]
diff --git a/swarms/agents/reasoning_agents.py b/swarms/agents/reasoning_agents.py
index 0be6e84c..17a6089f 100644
--- a/swarms/agents/reasoning_agents.py
+++ b/swarms/agents/reasoning_agents.py
@@ -8,6 +8,7 @@ from swarms.agents.i_agent import (
)
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.output_types import OutputType
+from swarms.agents.agent_judge import AgentJudge
agent_types = Literal[
"reasoning-duo",
@@ -18,6 +19,7 @@ agent_types = Literal[
"ire-agent",
"ReflexionAgent",
"GKPAgent",
+ "AgentJudge",
]
@@ -106,6 +108,14 @@ class ReasoningAgentRouter:
output_type=self.output_type,
)
+ elif self.swarm_type == "AgentJudge":
+ return AgentJudge(
+ agent_name=self.agent_name,
+ model_name=self.model_name,
+ system_prompt=self.system_prompt,
+ max_loops=self.max_loops,
+ )
+
elif self.swarm_type == "ReflexionAgent":
return ReflexionAgent(
agent_name=self.agent_name,
diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py
index 3753d938..650cbede 100644
--- a/swarms/structs/__init__.py
+++ b/swarms/structs/__init__.py
@@ -8,6 +8,8 @@ from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
+from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
+from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.graph_workflow import (
Edge,
GraphWorkflow,
@@ -18,6 +20,9 @@ from swarms.structs.groupchat import (
GroupChat,
expertise_based,
)
+from swarms.structs.hybrid_hiearchical_peer_swarm import (
+ HybridHierarchicalClusterSwarm,
+)
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
@@ -83,9 +88,6 @@ from swarms.structs.swarms_api import (
SwarmValidationError,
)
-from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
-from swarms.structs.deep_research_swarm import DeepResearchSwarm
-
__all__ = [
"Agent",
"AsyncWorkflow",
@@ -161,4 +163,5 @@ __all__ = [
"MALT",
"DeHallucinationSwarm",
"DeepResearchSwarm",
+ "HybridHierarchicalClusterSwarm",
]
diff --git a/swarms/structs/agent_security.py b/swarms/structs/agent_security.py
deleted file mode 100644
index 8e588acf..00000000
--- a/swarms/structs/agent_security.py
+++ /dev/null
@@ -1,318 +0,0 @@
-import base64
-import json
-import uuid
-from datetime import datetime
-from dataclasses import dataclass
-from typing import Optional, Union, Dict, List
-
-from cryptography.fernet import Fernet
-from cryptography.hazmat.primitives import hashes, serialization
-from cryptography.hazmat.primitives.asymmetric import padding, rsa
-from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
-
-
-@dataclass
-class EncryptedMessage:
- """Structure for encrypted messages between agents"""
-
- sender_id: str
- receiver_id: str
- encrypted_content: bytes
- timestamp: float
- message_id: str
- session_id: str
-
-
-class EncryptionSession:
- """Represents an encrypted communication session between agents"""
-
- def __init__(
- self,
- session_id: str,
- agent_ids: List[str],
- encrypted_keys: Dict[str, bytes],
- created_at: datetime,
- ):
- self.session_id = session_id
- self.agent_ids = agent_ids
- self.encrypted_keys = encrypted_keys
- self.created_at = created_at
-
-
-class AgentEncryption:
- """
- Handles encryption for agent data both at rest and in transit.
- Supports both symmetric (for data at rest) and asymmetric (for data in transit) encryption.
- Also supports secure multi-agent communication.
- """
-
- def __init__(
- self,
- agent_id: Optional[str] = None,
- encryption_key: Optional[str] = None,
- enable_transit_encryption: bool = False,
- enable_rest_encryption: bool = False,
- enable_multi_agent: bool = False,
- ):
- self.agent_id = agent_id or str(uuid.uuid4())
- self.enable_transit_encryption = enable_transit_encryption
- self.enable_rest_encryption = enable_rest_encryption
- self.enable_multi_agent = enable_multi_agent
-
- # Multi-agent communication storage
- self.sessions: Dict[str, EncryptionSession] = {}
- self.known_agents: Dict[str, "AgentEncryption"] = {}
-
- if enable_rest_encryption:
- # Initialize encryption for data at rest
- if encryption_key:
- self.encryption_key = base64.urlsafe_b64encode(
- PBKDF2HMAC(
- algorithm=hashes.SHA256(),
- length=32,
- salt=f"agent_{self.agent_id}".encode(), # Unique salt per agent
- iterations=100000,
- ).derive(encryption_key.encode())
- )
- else:
- self.encryption_key = Fernet.generate_key()
-
- self.cipher_suite = Fernet(self.encryption_key)
-
- if enable_transit_encryption or enable_multi_agent:
- # Generate RSA key pair for transit encryption
- self.private_key = rsa.generate_private_key(
- public_exponent=65537, key_size=2048
- )
- self.public_key = self.private_key.public_key()
-
- def register_agent(
- self, agent_id: str, agent_encryption: "AgentEncryption"
- ) -> None:
- """Register another agent for secure communication"""
- if not self.enable_multi_agent:
- raise ValueError("Multi-agent support is not enabled")
- self.known_agents[agent_id] = agent_encryption
-
- def create_session(self, agent_ids: List[str]) -> str:
- """Create a new encrypted session between multiple agents"""
- if not self.enable_multi_agent:
- raise ValueError("Multi-agent support is not enabled")
-
- session_id = str(uuid.uuid4())
-
- # Generate a shared session key
- session_key = Fernet.generate_key()
-
- # Create encrypted copies of the session key for each agent
- encrypted_keys = {}
- for agent_id in agent_ids:
- if (
- agent_id not in self.known_agents
- and agent_id != self.agent_id
- ):
- raise ValueError(f"Agent {agent_id} not registered")
-
- if agent_id == self.agent_id:
- agent_public_key = self.public_key
- else:
- agent_public_key = self.known_agents[
- agent_id
- ].public_key
-
- encrypted_key = agent_public_key.encrypt(
- session_key,
- padding.OAEP(
- mgf=padding.MGF1(algorithm=hashes.SHA256()),
- algorithm=hashes.SHA256(),
- label=None,
- ),
- )
- encrypted_keys[agent_id] = encrypted_key
-
- # Store session information
- self.sessions[session_id] = EncryptionSession(
- session_id=session_id,
- agent_ids=agent_ids,
- encrypted_keys=encrypted_keys,
- created_at=datetime.now(),
- )
-
- return session_id
-
- def encrypt_message(
- self,
- content: Union[str, dict],
- receiver_id: str,
- session_id: str,
- ) -> EncryptedMessage:
- """Encrypt a message for another agent within a session"""
- if not self.enable_multi_agent:
- raise ValueError("Multi-agent support is not enabled")
-
- if session_id not in self.sessions:
- raise ValueError("Invalid session ID")
-
- session = self.sessions[session_id]
- if (
- self.agent_id not in session.agent_ids
- or receiver_id not in session.agent_ids
- ):
- raise ValueError("Sender or receiver not in session")
-
- # Serialize content if it's a dictionary
- if isinstance(content, dict):
- content = json.dumps(content)
-
- # Get the session key
- encrypted_session_key = session.encrypted_keys[self.agent_id]
- session_key = self.decrypt_session_key(encrypted_session_key)
-
- # Create Fernet cipher with session key
- cipher = Fernet(session_key)
-
- # Encrypt the message
- encrypted_content = cipher.encrypt(content.encode())
-
- return EncryptedMessage(
- sender_id=self.agent_id,
- receiver_id=receiver_id,
- encrypted_content=encrypted_content,
- timestamp=datetime.now().timestamp(),
- message_id=str(uuid.uuid4()),
- session_id=session_id,
- )
-
- def decrypt_message(
- self, message: EncryptedMessage
- ) -> Union[str, dict]:
- """Decrypt a message from another agent"""
- if not self.enable_multi_agent:
- raise ValueError("Multi-agent support is not enabled")
-
- if message.session_id not in self.sessions:
- raise ValueError("Invalid session ID")
-
- if self.agent_id != message.receiver_id:
- raise ValueError("Message not intended for this agent")
-
- session = self.sessions[message.session_id]
-
- # Get the session key
- encrypted_session_key = session.encrypted_keys[self.agent_id]
- session_key = self.decrypt_session_key(encrypted_session_key)
-
- # Create Fernet cipher with session key
- cipher = Fernet(session_key)
-
- # Decrypt the message
- decrypted_content = cipher.decrypt(
- message.encrypted_content
- ).decode()
-
- # Try to parse as JSON
- try:
- return json.loads(decrypted_content)
- except json.JSONDecodeError:
- return decrypted_content
-
- def decrypt_session_key(self, encrypted_key: bytes) -> bytes:
- """Decrypt a session key using the agent's private key"""
- return self.private_key.decrypt(
- encrypted_key,
- padding.OAEP(
- mgf=padding.MGF1(algorithm=hashes.SHA256()),
- algorithm=hashes.SHA256(),
- label=None,
- ),
- )
-
- # Original methods preserved below
- def encrypt_at_rest(self, data: Union[str, dict, bytes]) -> bytes:
- """Encrypts data for storage"""
- if not self.enable_rest_encryption:
- return (
- data
- if isinstance(data, bytes)
- else str(data).encode()
- )
-
- if isinstance(data, dict):
- data = json.dumps(data)
- if isinstance(data, str):
- data = data.encode()
-
- return self.cipher_suite.encrypt(data)
-
- def decrypt_at_rest(
- self, encrypted_data: bytes
- ) -> Union[str, dict]:
- """Decrypts stored data"""
- if not self.enable_rest_encryption:
- return encrypted_data.decode()
-
- decrypted_data = self.cipher_suite.decrypt(encrypted_data)
-
- try:
- return json.loads(decrypted_data)
- except json.JSONDecodeError:
- return decrypted_data.decode()
-
- def encrypt_for_transit(self, data: Union[str, dict]) -> bytes:
- """Encrypts data for transmission"""
- if not self.enable_transit_encryption:
- return str(data).encode()
-
- if isinstance(data, dict):
- data = json.dumps(data)
-
- return self.public_key.encrypt(
- data.encode(),
- padding.OAEP(
- mgf=padding.MGF1(algorithm=hashes.SHA256()),
- algorithm=hashes.SHA256(),
- label=None,
- ),
- )
-
- def decrypt_from_transit(
- self, data: Union[bytes, str]
- ) -> Union[str, dict]:
- """Decrypts received data, handling both encrypted and unencrypted inputs"""
- if not self.enable_transit_encryption:
- return data.decode() if isinstance(data, bytes) else data
-
- try:
- if isinstance(data, bytes) and len(data) == 256:
- decrypted_data = self.private_key.decrypt(
- data,
- padding.OAEP(
- mgf=padding.MGF1(algorithm=hashes.SHA256()),
- algorithm=hashes.SHA256(),
- label=None,
- ),
- ).decode()
- else:
- return (
- data.decode() if isinstance(data, bytes) else data
- )
-
- try:
- return json.loads(decrypted_data)
- except json.JSONDecodeError:
- return decrypted_data
- except ValueError:
- return data.decode() if isinstance(data, bytes) else data
-
- def get_public_key_pem(self) -> bytes:
- """Returns the public key in PEM format for sharing"""
- if (
- not self.enable_transit_encryption
- and not self.enable_multi_agent
- ):
- return b""
-
- return self.public_key.public_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PublicFormat.SubjectPublicKeyInfo,
- )
diff --git a/swarms/structs/airflow_swarm.py b/swarms/structs/airflow_swarm.py
deleted file mode 100644
index 84ea68c7..00000000
--- a/swarms/structs/airflow_swarm.py
+++ /dev/null
@@ -1,430 +0,0 @@
-import subprocess
-import sys
-import uuid
-import threading
-from concurrent.futures import (
- FIRST_COMPLETED,
- ThreadPoolExecutor,
- wait,
-)
-from dataclasses import dataclass
-from datetime import datetime, timedelta
-from enum import Enum
-from typing import Any, Dict, List, Optional, Set, Union
-from graphviz import Digraph
-from loguru import logger
-
-# Airflow imports
-try:
- from airflow import DAG
- from airflow.operators.python import PythonOperator
-except ImportError:
- logger.error(
- "Airflow is not installed. Please install it using 'pip install apache-airflow'."
- )
- subprocess.run(
- [sys.executable, "-m", "pip", "install", "apache-airflow"]
- )
- from airflow import DAG
- from airflow.operators.python import PythonOperator
-
-# Import the real Agent from swarms.
-from swarms.structs.conversation import Conversation
-
-
-class NodeType(Enum):
- AGENT = "agent"
- CALLABLE = "callable"
- TOOL = "tool"
-
-
-def dag_id():
- return uuid.uuid4().hex
-
-
-@dataclass
-class Node:
- """Represents a node in the DAG"""
-
- id: str
- type: NodeType
- component: Any # Agent, Callable, or Tool
- query: Optional[str] = None
- args: Optional[List[Any]] = None
- kwargs: Optional[Dict[str, Any]] = None
- concurrent: bool = False
-
-
-# ======= Airflow DAG Swarm Class =========
-class AirflowDAGSwarm:
- """
- A simplified and more intuitive DAG-based swarm for orchestrating agents, callables, and tools.
- Provides an easy-to-use API for building agent pipelines with support for concurrent execution.
- """
-
- def __init__(
- self,
- dag_id: str = dag_id(),
- description: str = "A DAG Swarm for Airflow",
- name: str = "Airflow DAG Swarm",
- schedule_interval: Union[timedelta, str] = timedelta(days=1),
- start_date: datetime = datetime(2025, 2, 14),
- default_args: Optional[Dict[str, Any]] = None,
- initial_message: Optional[str] = None,
- max_workers: int = 5,
- ):
- """Initialize the AirflowDAGSwarm with improved configuration."""
- self.dag_id = dag_id
- self.name = name
- self.description = description
- self.max_workers = max_workers
-
- self.default_args = default_args or {
- "owner": "airflow",
- "depends_on_past": False,
- "email_on_failure": False,
- "email_on_retry": False,
- "retries": 1,
- "retry_delay": timedelta(minutes=5),
- }
-
- self.dag = DAG(
- dag_id=dag_id,
- default_args=self.default_args,
- schedule_interval=schedule_interval,
- start_date=start_date,
- catchup=False,
- )
-
- self.nodes: Dict[str, Node] = {}
- self.edges: Dict[str, Set[str]] = (
- {}
- ) # node_id -> set of child node_ids
-
- # Initialize conversation
- self.conversation = Conversation()
- if initial_message:
- self.conversation.add("user", initial_message)
-
- self.lock = threading.Lock()
-
- def add_user_message(self, message: str) -> None:
- """Add a user message to the conversation."""
- with self.lock:
- self.conversation.add("user", message)
- logger.info(f"Added user message: {message}")
-
- def get_conversation_history(self) -> str:
- """Get the conversation history as JSON."""
- return self.conversation.to_json()
-
- def add_node(
- self,
- node_id: str,
- component: Any,
- node_type: NodeType,
- query: Optional[str] = None,
- args: Optional[List[Any]] = None,
- kwargs: Optional[Dict[str, Any]] = None,
- concurrent: bool = False,
- ) -> str:
- """
- Add a node to the DAG with improved type checking and validation.
-
- Args:
- node_id: Unique identifier for the node
- component: Agent, callable, or tool to execute
- node_type: Type of the node (AGENT, CALLABLE, or TOOL)
- query: Query string for agents
- args: Positional arguments for callables/tools
- kwargs: Keyword arguments for callables/tools
- concurrent: Whether to execute this node concurrently
-
- Returns:
- node_id: The ID of the created node
- """
- if node_id in self.nodes:
- raise ValueError(f"Node with ID {node_id} already exists")
-
- if node_type == NodeType.AGENT and not hasattr(
- component, "run"
- ):
- raise ValueError("Agent must have a 'run' method")
- elif node_type in (
- NodeType.CALLABLE,
- NodeType.TOOL,
- ) and not callable(component):
- raise ValueError(f"{node_type.value} must be callable")
-
- node = Node(
- id=node_id,
- type=node_type,
- component=component,
- query=query,
- args=args or [],
- kwargs=kwargs or {},
- concurrent=concurrent,
- )
-
- self.nodes[node_id] = node
- self.edges[node_id] = set()
- logger.info(f"Added {node_type.value} node: {node_id}")
- return node_id
-
- def add_edge(self, from_node: str, to_node: str) -> None:
- """
- Add a directed edge between two nodes in the DAG.
-
- Args:
- from_node: ID of the source node
- to_node: ID of the target node
- """
- if from_node not in self.nodes or to_node not in self.nodes:
- raise ValueError("Both nodes must exist in the DAG")
-
- self.edges[from_node].add(to_node)
- logger.info(f"Added edge: {from_node} -> {to_node}")
-
- def _execute_node(self, node: Node) -> str:
- """Execute a single node and return its output."""
- try:
- if node.type == NodeType.AGENT:
- query = (
- node.query
- or self.conversation.get_last_message_as_string()
- or "Default query"
- )
- logger.info(
- f"Executing agent node {node.id} with query: {query}"
- )
- return node.component.run(query)
-
- elif node.type in (NodeType.CALLABLE, NodeType.TOOL):
- logger.info(
- f"Executing {node.type.value} node {node.id}"
- )
- return node.component(
- *node.args,
- conversation=self.conversation,
- **node.kwargs,
- )
- except Exception as e:
- logger.exception(f"Error executing node {node.id}: {e}")
- return f"Error in node {node.id}: {str(e)}"
-
- def _get_root_nodes(self) -> List[str]:
- """Get nodes with no incoming edges."""
- all_nodes = set(self.nodes.keys())
- nodes_with_incoming = {
- node for edges in self.edges.values() for node in edges
- }
- return list(all_nodes - nodes_with_incoming)
-
- def run(self, **context: Any) -> str:
- """
- Execute the DAG with improved concurrency handling and error recovery.
-
- Returns:
- The final conversation state as a JSON string
- """
- logger.info("Starting swarm execution")
-
- # Track completed nodes and their results
- completed: Dict[str, str] = {}
-
- def can_execute_node(node_id: str) -> bool:
- """Check if all dependencies of a node are completed."""
- return all(
- dep in completed
- for dep_set in self.edges.values()
- for dep in dep_set
- if node_id in dep_set
- )
-
- with ThreadPoolExecutor(
- max_workers=self.max_workers
- ) as executor:
- # Initialize futures dict for concurrent root nodes
- futures_dict = {
- executor.submit(
- self._execute_node, self.nodes[node_id]
- ): node_id
- for node_id in self._get_root_nodes()
- if self.nodes[node_id].concurrent
- }
-
- # Execute nodes that shouldn't run concurrently
- for node_id in self._get_root_nodes():
- if not self.nodes[node_id].concurrent:
- output = self._execute_node(self.nodes[node_id])
- with self.lock:
- completed[node_id] = output
- self.conversation.add("assistant", output)
-
- # Process remaining nodes
- while futures_dict:
- done, _ = wait(
- futures_dict.keys(), return_when=FIRST_COMPLETED
- )
-
- for future in done:
- node_id = futures_dict.pop(future)
- try:
- output = future.result()
- with self.lock:
- completed[node_id] = output
- self.conversation.add("assistant", output)
- except Exception as e:
- logger.exception(
- f"Error in node {node_id}: {e}"
- )
- completed[node_id] = f"Error: {str(e)}"
-
- # Add new nodes that are ready to execute
- new_nodes = [
- node_id
- for node_id in self.nodes
- if node_id not in completed
- and can_execute_node(node_id)
- ]
-
- for node_id in new_nodes:
- if self.nodes[node_id].concurrent:
- future = executor.submit(
- self._execute_node,
- self.nodes[node_id],
- )
- futures_dict[future] = node_id
- else:
- output = self._execute_node(
- self.nodes[node_id]
- )
- with self.lock:
- completed[node_id] = output
- self.conversation.add(
- "assistant", output
- )
-
- return self.conversation.to_json()
-
- def visualize(
- self, filename: str = "dag_visualization", view: bool = True
- ) -> Digraph:
- """
- Generate a visualization of the DAG structure.
-
- Args:
- filename: Output filename for the visualization
- view: Whether to open the visualization
-
- Returns:
- Graphviz Digraph object
- """
- dot = Digraph(comment=f"DAG Visualization: {self.name}")
-
- # Add nodes
- for node_id, node in self.nodes.items():
- label = f"{node_id}\n({node.type.value})"
- shape = "box" if node.concurrent else "ellipse"
- dot.node(node_id, label, shape=shape)
-
- # Add edges
- for from_node, to_nodes in self.edges.items():
- for to_node in to_nodes:
- dot.edge(from_node, to_node)
-
- dot.render(filename, view=view, format="pdf")
- return dot
-
- def create_dag(self) -> DAG:
- """
- Create an Airflow DAG with a single PythonOperator that executes the entire swarm.
- In a production environment, you might break the components into multiple tasks.
-
- :return: The configured Airflow DAG.
- """
- logger.info("Creating Airflow DAG for swarm execution.")
- PythonOperator(
- task_id="run",
- python_callable=self.run,
- op_kwargs={
- "concurrent": False
- }, # Change to True for concurrent execution.
- dag=self.dag,
- )
- return self.dag
-
-
-# # ======= Example Usage =========
-# if __name__ == "__main__":
-# # Configure logger to output to console.
-# logger.remove()
-# logger.add(lambda msg: print(msg, end=""), level="DEBUG")
-
-# # Create the DAG swarm with an initial message
-# swarm = AirflowDAGSwarm(
-# dag_id="swarm_conversation_dag",
-# initial_message="Hello, how can I help you with financial planning?",
-# )
-
-# # Create a real financial agent using the swarms package.
-# financial_agent = Agent(
-# agent_name="Financial-Analysis-Agent",
-# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
-# model_name="gpt-4o-mini",
-# max_loops=1,
-# )
-
-# # Add the real agent with a specific query.
-# swarm.add_node(
-# "financial_advisor",
-# financial_agent,
-# NodeType.AGENT,
-# query="How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria",
-# concurrent=True,
-# )
-
-# # Add a callable component.
-# def extra_processing(x: int, conversation: Conversation) -> str:
-# return f"Extra processing output with data {x} and conversation length {len(conversation.messages)}"
-
-# swarm.add_node(
-# "extra_processing",
-# extra_processing,
-# NodeType.CALLABLE,
-# args=[42],
-# concurrent=True,
-# )
-
-# # Add a tool component (for example, a tool to create a conversation graph).
-# def create_conversation_graph(conversation: Conversation) -> str:
-# # In this tool, we generate the graph and return a confirmation message.
-# swarm.visualize(
-# filename="swarm_conversation_tool_graph", view=False
-# )
-# return "Graph created."
-
-# swarm.add_node(
-# "conversation_graph",
-# create_conversation_graph,
-# NodeType.TOOL,
-# concurrent=False,
-# )
-
-# # Add edges to create the pipeline
-# swarm.add_edge("financial_advisor", "extra_processing")
-# swarm.add_edge("extra_processing", "conversation_graph")
-
-# # Execute the swarm
-# final_state = swarm.run()
-# logger.info(f"Final conversation: {final_state}")
-
-# # Visualize the DAG
-# print(
-# swarm.visualize(
-# filename="swarm_conversation_final", view=False
-# )
-# )
-
-# # Create the Airflow DAG.
-# dag = swarm.create_dag()
diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py
index 88654a93..7553f822 100644
--- a/swarms/structs/concurrent_workflow.py
+++ b/swarms/structs/concurrent_workflow.py
@@ -12,7 +12,7 @@ from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.conversation import Conversation
from swarms.structs.swarm_id_generator import generate_swarm_id
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
logger = initialize_logger(log_folder="concurrent_workflow")
diff --git a/swarms/structs/deep_research_swarm.py b/swarms/structs/deep_research_swarm.py
index 0dfa4aaa..e5fbed08 100644
--- a/swarms/structs/deep_research_swarm.py
+++ b/swarms/structs/deep_research_swarm.py
@@ -454,6 +454,21 @@ class DeepResearchSwarm:
def run(self, task: str):
return self.step(task)
+ def batched_run(self, tasks: List[str]):
+ """
+ Execute a list of research tasks in parallel.
+
+ Args:
+ tasks (List[str]): A list of research tasks to execute
+
+ Returns:
+ List[str]: A list of formatted conversation histories
+ """
+ futures = []
+ for task in tasks:
+ future = self.executor.submit(self.step, task)
+ futures.append((task, future))
+
# # Example usage
# if __name__ == "__main__":
diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py
index c495093d..f7653531 100644
--- a/swarms/structs/hiearchical_swarm.py
+++ b/swarms/structs/hiearchical_swarm.py
@@ -8,7 +8,7 @@ from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from swarms.utils.formatter import formatter
from swarms.utils.function_caller_model import OpenAIFunctionCaller
diff --git a/swarms/structs/hybrid_hiearchical_peer_swarm.py b/swarms/structs/hybrid_hiearchical_peer_swarm.py
new file mode 100644
index 00000000..821e1e0b
--- /dev/null
+++ b/swarms/structs/hybrid_hiearchical_peer_swarm.py
@@ -0,0 +1,273 @@
+import os
+from typing import List
+from swarms.structs.agent import Agent
+from swarms.structs.conversation import Conversation
+from swarms.structs.swarm_router import SwarmRouter
+from swarms.utils.history_output_formatter import (
+ history_output_formatter,
+)
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Union, Callable
+
+tools = [
+ {
+ "type": "function",
+ "function": {
+ "name": "select_swarm",
+ "description": "Analyzes the input task and selects the most appropriate swarm configuration, outputting both the swarm name and the formatted task.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "reasoning": {
+ "type": "string",
+ "description": "The reasoning behind the selection of the swarm and task description.",
+ },
+ "swarm_name": {
+ "type": "string",
+ "description": "The name of the selected swarm that is most appropriate for handling the given task.",
+ },
+ "task_description": {
+ "type": "string",
+ "description": "A clear and structured description of the task to be performed by the swarm.",
+ },
+ },
+ "required": [
+ "reasoning",
+ "swarm_name",
+ "task_description",
+ ],
+ },
+ },
+ },
+]
+
+router_system_prompt = """
+You are an intelligent Router Agent responsible for analyzing tasks and directing them to the most appropriate swarm in our system. Your role is critical in ensuring optimal task execution and resource utilization.
+
+Key Responsibilities:
+1. Task Analysis:
+ - Carefully analyze the input task's requirements, complexity, and domain
+ - Identify key components and dependencies
+ - Determine the specialized skills needed for completion
+
+2. Swarm Selection Criteria:
+ - Match task requirements with swarm capabilities
+ - Consider swarm specializations and past performance
+ - Evaluate computational resources needed
+ - Account for task priority and time constraints
+
+3. Decision Making Framework:
+ - Use a systematic approach to evaluate all available swarms
+ - Consider load balancing across the system
+ - Factor in swarm availability and current workload
+ - Assess potential risks and failure points
+
+4. Output Requirements:
+ - Provide clear justification for swarm selection
+ - Structure the task description in a way that maximizes swarm efficiency
+ - Include any relevant context or constraints
+ - Ensure all critical information is preserved
+
+Best Practices:
+- Always prefer specialized swarms for domain-specific tasks
+- Consider breaking complex tasks into subtasks when appropriate
+- Maintain consistency in task formatting across different swarms
+- Include error handling considerations in task descriptions
+
+Your output must strictly follow the required format:
+{
+ "swarm_name": "Name of the selected swarm",
+ "task_description": "Detailed and structured task description"
+}
+
+Remember: Your selection directly impacts the overall system performance and task completion success rate. Take all factors into account before making your final decision.
+"""
+
+
+class HybridHierarchicalClusterSwarm:
+ """
+ A class representing a Hybrid Hierarchical-Cluster Swarm that routes tasks to appropriate swarms.
+
+ Attributes:
+ name (str): The name of the swarm.
+ description (str): A description of the swarm's functionality.
+ swarms (List[SwarmRouter]): A list of available swarm routers.
+ max_loops (int): The maximum number of loops for task processing.
+ output_type (str): The format of the output (e.g., list).
+ conversation (Conversation): An instance of the Conversation class to manage interactions.
+ router_agent (Agent): An instance of the Agent class responsible for routing tasks.
+ """
+
+ def __init__(
+ self,
+ name: str = "Hybrid Hierarchical-Cluster Swarm",
+ description: str = "A swarm that uses a hybrid hierarchical-peer model to solve complex tasks.",
+ swarms: List[Union[SwarmRouter, Callable]] = [],
+ max_loops: int = 1,
+ output_type: str = "list",
+ router_agent_model_name: str = "gpt-4o-mini",
+ *args,
+ **kwargs,
+ ):
+ self.name = name
+ self.description = description
+ self.swarms = swarms
+ self.max_loops = max_loops
+ self.output_type = output_type
+
+ self.conversation = Conversation()
+
+ self.router_agent = Agent(
+ agent_name="Router Agent",
+ agent_description="A router agent that routes tasks to the appropriate swarms.",
+ system_prompt=f"{router_system_prompt}\n\n{self.get_swarms_info()}",
+ tools_list_dictionary=tools,
+ model_name=router_agent_model_name,
+ max_loops=1,
+ )
+
+ def run(self, task: str, *args, **kwargs):
+ """
+ Runs the routing process for a given task.
+
+ Args:
+ task (str): The task to be processed by the swarm.
+
+ Returns:
+ str: The formatted history output of the conversation.
+
+ Raises:
+ ValueError: If the task is empty or invalid.
+ """
+ if not task:
+ raise ValueError("Task cannot be empty.")
+
+ self.conversation.add(role="User", content=task)
+
+ response = self.router_agent.run(task=task)
+
+ # Handle response whether it's a string or dictionary
+ if isinstance(response, str):
+ try:
+ import json
+
+ response = json.loads(response)
+ except json.JSONDecodeError:
+ raise ValueError(
+ "Invalid JSON response from router agent"
+ )
+
+ swarm_name = response.get("swarm_name")
+ task_description = response.get("task_description")
+
+ if not swarm_name or not task_description:
+ raise ValueError(
+ "Invalid response from router agent: missing swarm_name or task_description."
+ )
+
+ self.route_task(swarm_name, task_description)
+
+ return history_output_formatter(
+ self.conversation, self.output_type
+ )
+
+ def find_swarm_by_name(self, swarm_name: str):
+ """
+ Finds a swarm by its name.
+
+ Args:
+ swarm_name (str): The name of the swarm to find.
+
+ Returns:
+ SwarmRouter: The found swarm router, or None if not found.
+ """
+ for swarm in self.swarms:
+ if swarm.name == swarm_name:
+ return swarm
+ return None
+
+ def route_task(self, swarm_name: str, task_description: str):
+ """
+ Routes the task to the specified swarm.
+
+ Args:
+ swarm_name (str): The name of the swarm to route the task to.
+ task_description (str): The description of the task to be executed.
+
+ Raises:
+ ValueError: If the swarm is not found.
+ """
+ swarm = self.find_swarm_by_name(swarm_name)
+
+ if swarm:
+ output = swarm.run(task_description)
+ self.conversation.add(role=swarm.name, content=output)
+ else:
+ raise ValueError(f"Swarm '{swarm_name}' not found.")
+
+ def batched_run(self, tasks: List[str]):
+ """
+ Runs the routing process for a list of tasks in batches.
+
+ Args:
+ tasks (List[str]): A list of tasks to be processed by the swarm.
+
+ Returns:
+ List[str]: A list of formatted history outputs for each batch.
+
+ Raises:
+ ValueError: If the task list is empty or invalid.
+ """
+ if not tasks:
+ raise ValueError("Task list cannot be empty.")
+
+ max_workers = os.cpu_count() * 2
+
+ results = []
+
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ # Submit all tasks to the executor
+ future_to_task = {
+ executor.submit(self.run, task): task
+ for task in tasks
+ }
+
+ # Collect results as they complete
+ for future in as_completed(future_to_task):
+ try:
+ result = future.result()
+ results.append(result)
+ except Exception as e:
+ # Handle any errors that occurred during task execution
+ results.append(f"Error processing task: {str(e)}")
+
+ return results
+
+ def get_swarms_info(self) -> str:
+ """
+ Fetches and formats information about all available swarms in the system.
+
+ Returns:
+ str: A formatted string containing names and descriptions of all swarms.
+ """
+ if not self.swarms:
+ return "No swarms currently available in the system."
+
+ swarm_info = [
+ "Available Swarms:",
+ "",
+ ] # Empty string for line spacing
+
+ for idx, swarm in enumerate(self.swarms, 1):
+ swarm_info.extend(
+ [
+ f"[Swarm {idx}]",
+ f"Name: {swarm.name}",
+ f"Description: {swarm.description}",
+ f"Length of Agents: {len(swarm.agents)}",
+ f"Swarm Type: {swarm.swarm_type}",
+ "", # Empty string for line spacing between swarms
+ ]
+ )
+
+ return "\n".join(swarm_info).strip()
diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py
index 1cbe2345..cba42331 100644
--- a/swarms/structs/majority_voting.py
+++ b/swarms/structs/majority_voting.py
@@ -9,7 +9,7 @@ from typing import Any, Callable, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger
diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py
index 9d2c19ec..4a69ded6 100644
--- a/swarms/structs/mixture_of_agents.py
+++ b/swarms/structs/mixture_of_agents.py
@@ -11,7 +11,7 @@ from swarms.schemas.agent_step_schemas import ManySteps
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from swarms.structs.conversation import Conversation
logger = initialize_logger(log_folder="mixture_of_agents")
diff --git a/swarms/structs/multi_agent_orchestrator.py b/swarms/structs/multi_agent_orchestrator.py
index f20211a1..eb6f00eb 100644
--- a/swarms/structs/multi_agent_orchestrator.py
+++ b/swarms/structs/multi_agent_orchestrator.py
@@ -18,7 +18,7 @@ from pydantic import BaseModel, Field
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from swarms.utils.any_to_str import any_to_str
diff --git a/swarms/structs/output_type.py b/swarms/structs/output_type.py
deleted file mode 100644
index 5dcc7f29..00000000
--- a/swarms/structs/output_type.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from typing import Literal
-
-# Define the output_type using Literal
-OutputType = Literal[
- "all",
- "final",
- "list",
- "dict",
- ".json",
- ".md",
- ".txt",
- ".yaml",
- ".toml",
- "str",
-]
-
-# Use the OutputType for type annotations
-output_type: OutputType
diff --git a/swarms/structs/output_types.py b/swarms/structs/output_types.py
index 1d9d91be..eaabc17d 100644
--- a/swarms/structs/output_types.py
+++ b/swarms/structs/output_types.py
@@ -14,3 +14,6 @@ OutputType = Literal[
"string",
"str",
]
+
+# Use the OutputType for type annotations
+output_type: OutputType
diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py
index 1f51b818..dedd75dd 100644
--- a/swarms/structs/rearrange.py
+++ b/swarms/structs/rearrange.py
@@ -15,7 +15,7 @@ from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import initialize_logger
from swarms.telemetry.main import log_agent_data
from swarms.structs.conversation import Conversation
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
logger = initialize_logger(log_folder="rearrange")
diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py
index 80feb385..82c5a3a4 100644
--- a/swarms/structs/sequential_workflow.py
+++ b/swarms/structs/sequential_workflow.py
@@ -1,7 +1,7 @@
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.utils.loguru_logger import initialize_logger
diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py
index 6e2b8d53..46588702 100644
--- a/swarms/structs/swarm_router.py
+++ b/swarms/structs/swarm_router.py
@@ -18,7 +18,7 @@ from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher
-from swarms.structs.output_type import OutputType
+from swarms.structs.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm
diff --git a/tests/structs/test_airflow_swarm.py b/tests/structs/test_airflow_swarm.py
index 0463ddb7..0fdfeb20 100644
--- a/tests/structs/test_airflow_swarm.py
+++ b/tests/structs/test_airflow_swarm.py
@@ -3,7 +3,7 @@ import time
from loguru import logger
from swarms import Agent
-from swarms.structs.airflow_swarm import (
+from experimental.airflow_swarm import (
AirflowDAGSwarm,
NodeType,
Conversation,