commit af44d86eaa
@ -0,0 +1,106 @@
# LLM Council Examples

This page provides examples demonstrating the LLM Council pattern, inspired by Andrej Karpathy's llm-council implementation. The LLM Council uses multiple specialized AI agents that:

1. Respond independently to queries
2. Review and rank each other's anonymized responses
3. Have a Chairman synthesize all responses into a final comprehensive answer

## Example Files

All LLM Council examples are located in the [`examples/multi_agent/llm_council_examples/`](https://github.com/kyegomez/swarms/tree/master/examples/multi_agent/llm_council_examples) directory.
### Marketing & Business

- **[marketing_strategy_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/marketing_strategy_council.py)** - Marketing strategy analysis and recommendations
- **[business_strategy_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/business_strategy_council.py)** - Comprehensive business strategy development

### Finance & Investment

- **[finance_analysis_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/finance_analysis_council.py)** - Financial analysis and investment recommendations
- **[etf_stock_analysis_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/etf_stock_analysis_council.py)** - ETF and stock analysis with portfolio recommendations

### Medical & Healthcare

- **[medical_treatment_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/medical_treatment_council.py)** - Medical treatment recommendations and care plans
- **[medical_diagnosis_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/medical_diagnosis_council.py)** - Diagnostic analysis based on symptoms

### Technology & Research

- **[technology_assessment_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/technology_assessment_council.py)** - Technology evaluation and implementation strategy
- **[research_analysis_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/research_analysis_council.py)** - Comprehensive research analysis on complex topics

### Legal

- **[legal_analysis_council.py](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/llm_council_examples/legal_analysis_council.py)** - Legal implications and compliance analysis
## Basic Usage Pattern

All examples follow the same pattern:

```python
from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Run a query
result = council.run("Your query here")

# Access results
print(result["final_response"])      # Chairman's synthesized answer
print(result["original_responses"])  # Individual member responses
print(result["evaluations"])         # How members ranked each other
```
## Running Examples

Run any example directly:

```bash
python examples/multi_agent/llm_council_examples/marketing_strategy_council.py
python examples/multi_agent/llm_council_examples/finance_analysis_council.py
python examples/multi_agent/llm_council_examples/medical_diagnosis_council.py
```
## Key Features

- **Multiple Perspectives**: Each council member (GPT-5.1, Gemini, Claude, Grok) provides unique insights
- **Peer Review**: Members evaluate and rank each other's responses anonymously
- **Synthesis**: Chairman combines the best elements from all responses
- **Transparency**: See both individual responses and evaluation rankings
## Council Members

The default council consists of:

- **GPT-5.1-Councilor**: Analytical and comprehensive
- **Gemini-3-Pro-Councilor**: Concise and well-structured
- **Claude-Sonnet-4.5-Councilor**: Thoughtful and balanced
- **Grok-4-Councilor**: Creative and innovative
## Customization

You can create custom council members:

```python
from swarms import Agent
from swarms.structs.llm_council import LLMCouncil, get_gpt_councilor_prompt

custom_agent = Agent(
    agent_name="Custom-Councilor",
    system_prompt=get_gpt_councilor_prompt(),
    model_name="gpt-4.1",
    max_loops=1,
)

council = LLMCouncil(
    council_members=[custom_agent, ...],
    chairman_model="gpt-5.1",
    verbose=True
)
```
## Documentation

For complete API reference and detailed documentation, see the [LLM Council Reference Documentation](../swarms/structs/llm_council.md).
File diff suppressed because it is too large
@ -0,0 +1,534 @@
# LLM Council Class Documentation

```mermaid
flowchart TD
    A[User Query] --> B[Council Members]

    subgraph "Council Members"
        C1[GPT-5.1-Councilor]
        C2[Gemini-3-Pro-Councilor]
        C3[Claude-Sonnet-4.5-Councilor]
        C4[Grok-4-Councilor]
    end

    B --> C1
    B --> C2
    B --> C3
    B --> C4

    C1 --> D[Responses]
    C2 --> D
    C3 --> D
    C4 --> D

    D --> E[Anonymize & Evaluate]
    E --> F[Chairman Synthesis]
    F --> G[Final Response]
```
The `LLMCouncil` class orchestrates multiple specialized LLM agents to collaboratively answer queries through a structured peer review and synthesis process. Inspired by Andrej Karpathy's llm-council implementation, this architecture demonstrates how different models evaluate and rank each other's work, often selecting responses from other models as superior to their own.

The class automatically tracks all agent messages in a `Conversation` object and formats output using `history_output_formatter`, providing flexible output formats including dictionaries, lists, strings, JSON, YAML, and more.

## Workflow Overview

The LLM Council follows a four-step process:

1. **Parallel Response Generation**: All council members independently respond to the user query
2. **Anonymization**: Responses are anonymized with random IDs (A, B, C, D, etc.) to ensure objective evaluation
3. **Peer Review**: Each member evaluates and ranks all responses (including potentially their own)
4. **Synthesis**: The Chairman agent synthesizes all responses and evaluations into a final comprehensive answer
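The anonymization step can be sketched in a few lines. The helper below is illustrative only (not the library's actual implementation), assuming responses arrive as a `{member_name: text}` dictionary:

```python
import random
import string


def anonymize_responses(responses: dict) -> tuple[dict, dict]:
    """Map member responses to shuffled anonymous IDs (A, B, C, ...)."""
    ids = list(string.ascii_uppercase[: len(responses)])
    random.shuffle(ids)  # shuffled so an ID's position reveals nothing about identity
    id_to_member = dict(zip(ids, responses))
    anonymized = {aid: responses[member] for aid, member in id_to_member.items()}
    return anonymized, id_to_member
```

Because evaluators only ever see the shuffled IDs, a model cannot systematically favor its own answer.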
## Class Definition

### LLMCouncil

```python
class LLMCouncil:
```

### Attributes

| Attribute | Type | Description | Default |
|-----------|------|-------------|---------|
| `council_members` | `List[Agent]` | List of Agent instances representing council members | `None` (creates default council) |
| `chairman` | `Agent` | The Chairman agent responsible for synthesizing responses | Created during initialization |
| `conversation` | `Conversation` | Conversation object tracking all messages throughout the workflow | Created during initialization |
| `output_type` | `HistoryOutputType` | Format for the output (e.g., "dict", "list", "string", "json", "yaml") | `"dict"` |
| `verbose` | `bool` | Whether to print progress and intermediate results | `True` |
## Methods

### `__init__`

Initializes the LLM Council with council members and a Chairman agent.

#### Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `id` | `str` | `swarm_id()` | Unique identifier for the council instance. |
| `name` | `str` | `"LLM Council"` | Name of the council instance. |
| `description` | `str` | `"A collaborative council..."` | Description of the council's purpose. |
| `council_members` | `Optional[List[Agent]]` | `None` | List of Agent instances representing council members. If `None`, creates a default council with GPT-5.1, Gemini 3 Pro, Claude Sonnet 4.5, and Grok-4. |
| `chairman_model` | `str` | `"gpt-5.1"` | Model name for the Chairman agent that synthesizes responses. |
| `verbose` | `bool` | `True` | Whether to print progress and intermediate results. |
| `output_type` | `HistoryOutputType` | `"dict"` | Format for the output. Options: "list", "dict", "string", "final", "json", "yaml", "xml", "dict-all-except-first", "str-all-except-first", "dict-final", "list-final". |

#### Returns

| Type | Description |
|------|-------------|
| `LLMCouncil` | Initialized LLM Council instance. |

#### Description

Creates an LLM Council instance with specialized council members. If no members are provided, it creates a default council consisting of:

| Council Member | Description |
|---------------------------------|------------------------------------------|
| **GPT-5.1-Councilor** | Analytical and comprehensive responses |
| **Gemini-3-Pro-Councilor** | Concise and well-structured responses |
| **Claude-Sonnet-4.5-Councilor** | Thoughtful and balanced responses |
| **Grok-4-Councilor** | Creative and innovative responses |

The Chairman agent is automatically created with a specialized prompt for synthesizing responses. A `Conversation` object is also initialized to track all messages throughout the workflow, including user queries, council member responses, evaluations, and the final synthesis.
#### Example Usage

```python
from swarms.structs.llm_council import LLMCouncil

# Create council with default members
council = LLMCouncil(verbose=True)

# Create council with custom members and output format
from swarms import Agent

custom_members = [
    Agent(agent_name="Expert-1", model_name="gpt-4", max_loops=1),
    Agent(agent_name="Expert-2", model_name="claude-3-opus", max_loops=1),
]
council = LLMCouncil(
    council_members=custom_members,
    chairman_model="gpt-4",
    verbose=True,
    output_type="json",  # Output as JSON string
)
```
---

### `run`

Executes the full LLM Council workflow: parallel responses, anonymization, peer review, and synthesis. All messages are tracked in the conversation object and formatted according to the `output_type` setting.

#### Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `query` | `str` | Required | The user's query to process through the council. |

#### Returns

| Type | Description |
|------|-------------|
| `Union[List, Dict, str]` | Formatted output based on `output_type`. The output contains the conversation history with all messages tracked throughout the workflow. |
#### Output Format

The return value depends on the `output_type` parameter set during initialization:

| `output_type` value | Description |
|---------------------------------|---------------------------------------------------------------------|
| **`"dict"`** (default) | Returns conversation as a dictionary/list of message dictionaries |
| **`"list"`** | Returns conversation as a list of formatted strings (`"role: content"`) |
| **`"string"`** or **`"str"`** | Returns conversation as a formatted string |
| **`"final"`** or **`"last"`** | Returns only the content of the final message (Chairman's response) |
| **`"json"`** | Returns conversation as a JSON string |
| **`"yaml"`** | Returns conversation as a YAML string |
| **`"xml"`** | Returns conversation as an XML string |
| **`"dict-all-except-first"`** | Returns all messages except the first as a dictionary |
| **`"str-all-except-first"`** | Returns all messages except the first as a string |
| **`"dict-final"`** | Returns the final message as a dictionary |
| **`"list-final"`** | Returns the final message as a list |
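As an illustrative sketch of how a few of these values map a list-of-dicts history to a return value (this mirrors the table above, not the real `history_output_formatter` implementation):

```python
import json


def format_history(history: list, output_type: str = "dict"):
    """Illustrative mapping from output_type to a return value."""
    if output_type == "dict":
        return history  # list of {"role": ..., "content": ...} dicts
    if output_type == "list":
        return [f"{m['role']}: {m['content']}" for m in history]
    if output_type in ("string", "str"):
        return "\n".join(f"{m['role']}: {m['content']}" for m in history)
    if output_type in ("final", "last"):
        return history[-1]["content"]  # Chairman's synthesized answer
    if output_type == "json":
        return json.dumps(history, indent=2)
    raise ValueError(f"unsupported output_type: {output_type}")
```

Lightweight formats such as `"final"` discard everything but the last message, which is why they are recommended later for production use.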
#### Conversation Tracking

All messages are automatically tracked in the conversation object with the following roles:

- **`"User"`**: The original user query
- **`"{member_name}"`**: Each council member's response (e.g., "GPT-5.1-Councilor")
- **`"{member_name}-Evaluation"`**: Each council member's evaluation (e.g., "GPT-5.1-Councilor-Evaluation")
- **`"Chairman"`**: The final synthesized response
#### Description

Executes the complete LLM Council workflow:

1. **User Query Tracking**: Adds the user query to the conversation as "User" role
2. **Dispatch Phase**: Sends the query to all council members in parallel using `run_agents_concurrently`
3. **Collection Phase**: Collects all responses, maps them to member names, and adds each to the conversation with the member's name as the role
4. **Anonymization Phase**: Creates anonymous IDs (A, B, C, D, etc.) and shuffles them to ensure anonymity
5. **Evaluation Phase**: Each member evaluates and ranks all anonymized responses using `batched_grid_agent_execution`, then adds evaluations to the conversation with "{member_name}-Evaluation" as the role
6. **Synthesis Phase**: The Chairman agent synthesizes all responses and evaluations into a final comprehensive answer, which is added to the conversation as "Chairman" role
7. **Output Formatting**: Returns the conversation formatted according to the `output_type` setting using `history_output_formatter`

The method provides verbose output by default, showing progress at each stage. All messages are tracked in the `conversation` attribute for later access or export.
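These phases can be condensed into a runnable sketch. Everything below is a mock: `MockAgent` stands in for `swarms.Agent`, and sequential loops stand in for `run_agents_concurrently` and `batched_grid_agent_execution`. It exists only to show the order of operations and the conversation roles:

```python
import random
import string


class MockAgent:
    """Hypothetical stand-in for swarms.Agent, used only to illustrate the flow."""

    def __init__(self, name: str):
        self.agent_name = name

    def run(self, task: str) -> str:
        return f"{self.agent_name} output for: {task[:40]}"


def council_run(members, chairman, query):
    conversation = [("User", query)]                           # 1. track query
    responses = {m.agent_name: m.run(query) for m in members}  # 2-3. collect responses
    conversation += list(responses.items())
    ids = list(string.ascii_uppercase[: len(members)])         # 4. anonymize
    random.shuffle(ids)
    anonymized = dict(zip(ids, responses.values()))
    for m in members:                                          # 5. peer review
        ranking = m.run(f"Rank responses {sorted(anonymized)} for: {query}")
        conversation.append((f"{m.agent_name}-Evaluation", ranking))
    final = chairman.run(f"Synthesize all responses for: {query}")  # 6. synthesis
    conversation.append(("Chairman", final))
    return conversation                                        # 7. formatting omitted
```

Running this with two members yields a conversation of six entries: the user query, two responses, two evaluations, and the Chairman's final message, in that order.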
#### Example Usage

```python
from swarms.structs.llm_council import LLMCouncil

# Create council with default output format (dict)
council = LLMCouncil(verbose=True)

query = "What are the top five best energy stocks across nuclear, solar, gas, and other energy sources?"

# Run the council - returns formatted conversation based on output_type
result = council.run(query)

# With default "dict" output_type, result is a list of message dictionaries
# Access conversation messages
for message in result:
    print(f"{message['role']}: {message['content'][:200]}...")

# Access the conversation object directly for more control
conversation = council.conversation
print("\nFinal message:", conversation.get_final_message_content())

# Get conversation as string
print("\nFull conversation:")
print(conversation.get_str())

# Example with different output types
council_json = LLMCouncil(output_type="json", verbose=False)
result_json = council_json.run(query)  # Returns JSON string

council_final = LLMCouncil(output_type="final", verbose=False)
result_final = council_final.run(query)  # Returns only final response string
```
---

### `_create_default_council`

Creates default council members with specialized prompts and models.

#### Parameters

None (internal method).

#### Returns

| Type | Description |
|------|-------------|
| `List[Agent]` | List of Agent instances configured as council members. |

#### Description

Internal method that creates the default council configuration with four specialized agents:

- **GPT-5.1-Councilor** (`model_name="gpt-5.1"`): Analytical and comprehensive, temperature=0.7
- **Gemini-3-Pro-Councilor** (`model_name="gemini-2.5-flash"`): Concise and structured, temperature=0.7
- **Claude-Sonnet-4.5-Councilor** (`model_name="anthropic/claude-sonnet-4-5"`): Thoughtful and balanced, temperature=0.0
- **Grok-4-Councilor** (`model_name="x-ai/grok-4"`): Creative and innovative, temperature=0.8

Each agent is configured with:

- Specialized system prompts matching their role
- `max_loops=1` for single-response generation
- `verbose=False` to reduce noise during parallel execution
- Appropriate temperature settings for their style
## Helper Functions

### `get_gpt_councilor_prompt()`

Returns the system prompt for the GPT-5.1 councilor agent.

#### Returns

| Type | Description |
|------|-------------|
| `str` | System prompt string emphasizing analytical thinking and comprehensive coverage. |

---

### `get_gemini_councilor_prompt()`

Returns the system prompt for the Gemini 3 Pro councilor agent.

#### Returns

| Type | Description |
|------|-------------|
| `str` | System prompt string emphasizing concise, well-structured responses. |

---

### `get_claude_councilor_prompt()`

Returns the system prompt for the Claude Sonnet 4.5 councilor agent.

#### Returns

| Type | Description |
|------|-------------|
| `str` | System prompt string emphasizing thoughtful, balanced, and nuanced responses. |

---

### `get_grok_councilor_prompt()`

Returns the system prompt for the Grok-4 councilor agent.

#### Returns

| Type | Description |
|------|-------------|
| `str` | System prompt string emphasizing creative, innovative, and unique perspectives. |

---

### `get_chairman_prompt()`

Returns the system prompt for the Chairman agent.

#### Returns

| Type | Description |
|------|-------------|
| `str` | System prompt string for synthesizing responses and evaluations into a final answer. |
### `get_evaluation_prompt(query, responses, evaluator_name)`

Creates the evaluation prompt for council members to review and rank responses.

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `query` | `str` | The original user query. |
| `responses` | `Dict[str, str]` | Dictionary mapping anonymous IDs to response texts. |
| `evaluator_name` | `str` | Name of the agent doing the evaluation. |

#### Returns

| Type | Description |
|------|-------------|
| `str` | Formatted evaluation prompt string with instructions for ranking responses. |

---
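A hypothetical rendering of such a prompt, mirroring the documented signature (the real wording lives in `swarms.structs.llm_council`; the phrasing below is an assumption):

```python
def build_evaluation_prompt(query: str, responses: dict, evaluator_name: str) -> str:
    """Hypothetical evaluation-prompt builder matching the documented parameters."""
    # List each anonymized response under its ID so the evaluator can rank them.
    listing = "\n\n".join(
        f"Response {rid}:\n{text}" for rid, text in sorted(responses.items())
    )
    return (
        f"You are {evaluator_name}. Rank the anonymized responses below "
        f"from best to worst for the query, and justify each ranking.\n\n"
        f"Query: {query}\n\n{listing}"
    )
```

Note that only anonymous IDs appear in the prompt, so the evaluator never learns which member wrote which response.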
### `get_synthesis_prompt(query, original_responses, evaluations, id_to_member)`

Creates the synthesis prompt for the Chairman.

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `query` | `str` | Original user query. |
| `original_responses` | `Dict[str, str]` | Dictionary mapping member names to their responses. |
| `evaluations` | `Dict[str, str]` | Dictionary mapping evaluator names to their evaluation texts. |
| `id_to_member` | `Dict[str, str]` | Mapping from anonymous IDs to member names. |

#### Returns

| Type | Description |
|------|-------------|
| `str` | Formatted synthesis prompt for the Chairman agent. |

---
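A hypothetical builder with this signature could assemble the Chairman's context as follows; the structure is an assumption based on the parameter table above, not the library's actual prompt text:

```python
def build_synthesis_prompt(query, original_responses, evaluations, id_to_member):
    """Hypothetical synthesis-prompt builder matching the documented parameters."""
    responses_block = "\n\n".join(
        f"{name}:\n{text}" for name, text in original_responses.items()
    )
    evaluations_block = "\n\n".join(
        f"{name}:\n{text}" for name, text in evaluations.items()
    )
    # The Chairman sees the de-anonymized mapping, unlike the peer evaluators.
    identities = ", ".join(f"{aid} = {name}" for aid, name in id_to_member.items())
    return (
        f"Original query: {query}\n\n"
        f"Council responses:\n{responses_block}\n\n"
        f"Peer evaluations:\n{evaluations_block}\n\n"
        f"Anonymous ID mapping: {identities}\n\n"
        "Synthesize the best elements of all responses into one final answer."
    )
```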
## Use Cases

The LLM Council is ideal for scenarios requiring:

- **Multi-perspective Analysis**: When you need diverse viewpoints on complex topics
- **Quality Assurance**: When peer review and ranking can improve response quality
- **Transparent Decision Making**: When you want to see how different models evaluate each other
- **Synthesis of Expertise**: When combining multiple specialized perspectives is valuable

### Common Applications

| Use Case | Description |
|-----------------------|--------------------------------------------------------------------------------------------------|
| **Medical Diagnosis** | Multiple medical AI agents provide diagnoses, evaluate each other, and synthesize recommendations |
| **Financial Analysis**| Different financial experts analyze investments and rank each other's assessments |
| **Legal Analysis** | Multiple legal perspectives evaluate compliance and risk |
| **Business Strategy** | Diverse strategic viewpoints are synthesized into comprehensive plans |
| **Research Analysis** | Multiple research perspectives are combined for thorough analysis |
## Examples

For comprehensive examples demonstrating various use cases, see the [LLM Council Examples](../../../examples/multi_agent/llm_council_examples/) directory:

- **Medical**: `medical_diagnosis_council.py`, `medical_treatment_council.py`
- **Finance**: `finance_analysis_council.py`, `etf_stock_analysis_council.py`
- **Business**: `business_strategy_council.py`, `marketing_strategy_council.py`
- **Technology**: `technology_assessment_council.py`, `research_analysis_council.py`
- **Legal**: `legal_analysis_council.py`

### Quick Start Example

```python
from swarms.structs.llm_council import LLMCouncil

# Create the council with default output format
council = LLMCouncil(verbose=True)

# Example query
query = "What are the top five best energy stocks across nuclear, solar, gas, and other energy sources?"

# Run the council - returns formatted conversation
result = council.run(query)

# With default "dict" output_type, result is a list of message dictionaries
# Print all messages
for message in result:
    role = message['role']
    content = message['content']
    print(f"\n{role}:")
    print(content[:500] + "..." if len(content) > 500 else content)

# Access conversation object directly for more options
conversation = council.conversation

# Get only the final response
print("\n" + "=" * 80)
print("FINAL RESPONSE")
print("=" * 80)
print(conversation.get_final_message_content())

# Get conversation as formatted string
print("\n" + "=" * 80)
print("FULL CONVERSATION")
print("=" * 80)
print(conversation.get_str())

# Export conversation to JSON
conversation.export()
```
## Customization

### Creating Custom Council Members

You can create custom council members with specialized roles:

```python
from swarms import Agent
from swarms.structs.llm_council import LLMCouncil, get_gpt_councilor_prompt

# Create custom councilor
custom_agent = Agent(
    agent_name="Domain-Expert-Councilor",
    agent_description="Specialized domain expert for specific analysis",
    system_prompt=get_gpt_councilor_prompt(),  # Or create custom prompt
    model_name="gpt-4",
    max_loops=1,
    verbose=False,
    temperature=0.7,
)

# Create council with custom members
council = LLMCouncil(
    council_members=[custom_agent, ...],  # Add your custom agents
    chairman_model="gpt-4",
    verbose=True
)
```
### Custom Chairman Model

You can specify a different model for the Chairman:

```python
council = LLMCouncil(
    chairman_model="claude-3-opus",  # Use Claude as Chairman
    verbose=True
)
```
### Custom Output Format

You can control the output format using the `output_type` parameter:

```python
# Get output as JSON string
council = LLMCouncil(output_type="json")
result = council.run(query)  # Returns JSON string

# Get only the final response
council = LLMCouncil(output_type="final")
result = council.run(query)  # Returns only final response string

# Get as YAML
council = LLMCouncil(output_type="yaml")
result = council.run(query)  # Returns YAML string

# Get as formatted string
council = LLMCouncil(output_type="string")
result = council.run(query)  # Returns formatted conversation string
```
### Accessing Conversation History

The conversation object is accessible for advanced usage:

```python
council = LLMCouncil()
council.run(query)

# Access conversation directly
conversation = council.conversation

# Get conversation history
history = conversation.conversation_history

# Export to file
conversation.export()  # Saves to default location

# Get specific formats
json_output = conversation.to_json()
messages = conversation.return_messages_as_dictionary()
```
## Architecture Benefits

1. **Diversity**: Multiple models provide varied perspectives and approaches
2. **Quality Control**: Peer review ensures responses are evaluated objectively
3. **Synthesis**: Chairman combines the best elements from all responses
4. **Transparency**: Full visibility into individual responses and evaluation rankings
5. **Scalability**: Easy to add or remove council members
6. **Flexibility**: Supports custom agents and models
7. **Conversation Tracking**: All messages are automatically tracked in a Conversation object for history and export
8. **Flexible Output**: Multiple output formats supported via `history_output_formatter` (dict, list, string, JSON, YAML, XML, etc.)
## Performance Considerations

| Feature | Description |
|-----------------------------|----------------------------------------------------------------------------------------------------------------|
| **Parallel Execution** | Both response generation and evaluation phases run in parallel for efficiency |
| **Anonymization** | Responses are anonymized to prevent bias in evaluation |
| **Model Selection** | Different models can be used for different roles based on their strengths |
| **Verbose Mode** | Can be disabled for production use to reduce output |
| **Conversation Management** | Conversation object efficiently tracks all messages in memory and supports export to JSON/YAML files |
| **Output Formatting** | Choose lightweight output formats (e.g., "final") for production to reduce memory usage |
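The parallel execution noted above can be approximated with the standard library. This generic thread-pool fan-out is a sketch of the idea, not the actual `run_agents_concurrently` implementation from swarms:

```python
from concurrent.futures import ThreadPoolExecutor


def run_concurrently(agents, task):
    """Thread-pool fan-out analogous to the parallel response phase."""
    with ThreadPoolExecutor(max_workers=max(1, len(agents))) as pool:
        # Submit every agent's run() call, then gather results in agent order.
        futures = [pool.submit(agent.run, task) for agent in agents]
        return [f.result() for f in futures]
```

Because LLM calls are I/O-bound, threads are enough to overlap the network latency of all members; total wall time approaches that of the slowest single member rather than the sum of all members.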
## Related Documentation

- [Multi-Agent Architectures Overview](overview.md)
- [Council of Judges](council_of_judges.md) - Similar peer review pattern
- [Agent Class Reference](agent.md) - Understanding individual agents
- [Conversation Class Reference](conversation.md) - Understanding conversation tracking and management
- [Multi-Agent Execution Utilities](various_execution_methods.md) - Underlying execution methods
- [History Output Formatter](../../../swarms/utils/history_output_formatter.py) - Output formatting utilities
@ -0,0 +1,46 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

research_agent = Agent(
    agent_name="Research-Analyst",
    agent_description="Specialized in comprehensive research and data gathering",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

analysis_agent = Agent(
    agent_name="Data-Analyst",
    agent_description="Expert in data analysis and pattern recognition",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

strategy_agent = Agent(
    agent_name="Strategy-Consultant",
    agent_description="Specialized in strategic planning and recommendations",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Rustworkx-Basic-Workflow",
    description="Basic workflow using rustworkx backend for faster graph operations",
    backend="rustworkx",
    verbose=False,
)

workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_node(strategy_agent)

workflow.add_edge(research_agent, analysis_agent)
workflow.add_edge(analysis_agent, strategy_agent)

task = "Conduct a research analysis on water stocks and ETFs"
results = workflow.run(task=task)

for agent_name, output in results.items():
    print(f"{agent_name}: {output}")
@ -0,0 +1,56 @@
import time

from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

agents = [
    Agent(
        agent_name=f"Agent-{i}",
        agent_description=f"Agent number {i}",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    for i in range(5)
]

nx_workflow = GraphWorkflow(
    name="NetworkX-Workflow",
    backend="networkx",
    verbose=False,
)

for agent in agents:
    nx_workflow.add_node(agent)

for i in range(len(agents) - 1):
    nx_workflow.add_edge(agents[i], agents[i + 1])

nx_start = time.time()
nx_workflow.compile()
nx_compile_time = time.time() - nx_start

rx_workflow = GraphWorkflow(
    name="Rustworkx-Workflow",
    backend="rustworkx",
    verbose=False,
)

for agent in agents:
    rx_workflow.add_node(agent)

for i in range(len(agents) - 1):
    rx_workflow.add_edge(agents[i], agents[i + 1])

rx_start = time.time()
rx_workflow.compile()
rx_compile_time = time.time() - rx_start

speedup = (
    nx_compile_time / rx_compile_time if rx_compile_time > 0 else 0
)
print(f"NetworkX compile time: {nx_compile_time:.4f}s")
print(f"Rustworkx compile time: {rx_compile_time:.4f}s")
print(f"Speedup: {speedup:.2f}x")
print(
    f"Identical layers: {nx_workflow._sorted_layers == rx_workflow._sorted_layers}"
)
@ -0,0 +1,73 @@
from swarms import Agent, GraphWorkflow

coordinator = Agent(
    agent_name="Coordinator",
    agent_description="Coordinates and distributes tasks",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

tech_analyst = Agent(
    agent_name="Tech-Analyst",
    agent_description="Technical analysis specialist",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

fundamental_analyst = Agent(
    agent_name="Fundamental-Analyst",
    agent_description="Fundamental analysis specialist",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

sentiment_analyst = Agent(
    agent_name="Sentiment-Analyst",
    agent_description="Sentiment analysis specialist",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

synthesis_agent = Agent(
    agent_name="Synthesis-Agent",
    agent_description="Synthesizes multiple analyses into final report",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Fan-Out-Fan-In-Workflow",
    description="Demonstrates parallel processing patterns with rustworkx",
    backend="rustworkx",
    verbose=False,
)

workflow.add_node(coordinator)
workflow.add_node(tech_analyst)
workflow.add_node(fundamental_analyst)
workflow.add_node(sentiment_analyst)
workflow.add_node(synthesis_agent)

workflow.add_edges_from_source(
    coordinator,
    [tech_analyst, fundamental_analyst, sentiment_analyst],
)

workflow.add_edges_to_target(
    [tech_analyst, fundamental_analyst, sentiment_analyst],
    synthesis_agent,
)

task = "Analyze Tesla stock from technical, fundamental, and sentiment perspectives"
results = workflow.run(task=task)

for agent_name, output in results.items():
    print(f"{agent_name}: {output}")


workflow.visualize(view=True)
@ -0,0 +1,101 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

data_collector_1 = Agent(
    agent_name="Data-Collector-1",
    agent_description="Collects market data",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

data_collector_2 = Agent(
    agent_name="Data-Collector-2",
    agent_description="Collects financial data",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

technical_analyst = Agent(
    agent_name="Technical-Analyst",
    agent_description="Performs technical analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

fundamental_analyst = Agent(
    agent_name="Fundamental-Analyst",
    agent_description="Performs fundamental analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

risk_analyst = Agent(
    agent_name="Risk-Analyst",
    agent_description="Performs risk analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

strategy_consultant = Agent(
    agent_name="Strategy-Consultant",
    agent_description="Develops strategic recommendations",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

report_writer = Agent(
    agent_name="Report-Writer",
    agent_description="Writes comprehensive reports",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Complex-Multi-Layer-Workflow",
    description="Complex workflow with multiple layers and parallel processing",
    backend="rustworkx",
    verbose=False,
)

all_agents = [
    data_collector_1,
    data_collector_2,
    technical_analyst,
    fundamental_analyst,
    risk_analyst,
    strategy_consultant,
    report_writer,
]

for agent in all_agents:
    workflow.add_node(agent)

workflow.add_parallel_chain(
    [data_collector_1, data_collector_2],
    [technical_analyst, fundamental_analyst, risk_analyst],
)

workflow.add_edges_to_target(
    [technical_analyst, fundamental_analyst, risk_analyst],
    strategy_consultant,
)

workflow.add_edges_to_target(
    [technical_analyst, fundamental_analyst, risk_analyst],
    report_writer,
)

workflow.add_edge(strategy_consultant, report_writer)

task = "Conduct a comprehensive analysis of the renewable energy sector including market trends, financial health, and risk assessment"
results = workflow.run(task=task)

for agent_name, output in results.items():
    print(f"{agent_name}: {output}")
@ -0,0 +1,104 @@
import time

from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

agents_small = [
    Agent(
        agent_name=f"Agent-{i}",
        agent_description=f"Agent number {i}",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    for i in range(5)
]

agents_medium = [
    Agent(
        agent_name=f"Agent-{i}",
        agent_description=f"Agent number {i}",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    for i in range(20)
]

nx_workflow_small = GraphWorkflow(
    name="NetworkX-Small",
    backend="networkx",
    verbose=False,
    auto_compile=False,
)

for agent in agents_small:
    nx_workflow_small.add_node(agent)

for i in range(len(agents_small) - 1):
    nx_workflow_small.add_edge(agents_small[i], agents_small[i + 1])

nx_start = time.time()
nx_workflow_small.compile()
nx_small_time = time.time() - nx_start

rx_workflow_small = GraphWorkflow(
    name="Rustworkx-Small",
    backend="rustworkx",
    verbose=False,
    auto_compile=False,
)

for agent in agents_small:
    rx_workflow_small.add_node(agent)

for i in range(len(agents_small) - 1):
    rx_workflow_small.add_edge(agents_small[i], agents_small[i + 1])

rx_start = time.time()
rx_workflow_small.compile()
rx_small_time = time.time() - rx_start

nx_workflow_medium = GraphWorkflow(
    name="NetworkX-Medium",
    backend="networkx",
    verbose=False,
    auto_compile=False,
)

for agent in agents_medium:
    nx_workflow_medium.add_node(agent)

for i in range(len(agents_medium) - 1):
    nx_workflow_medium.add_edge(
        agents_medium[i], agents_medium[i + 1]
    )

nx_start = time.time()
nx_workflow_medium.compile()
nx_medium_time = time.time() - nx_start

rx_workflow_medium = GraphWorkflow(
    name="Rustworkx-Medium",
    backend="rustworkx",
    verbose=False,
    auto_compile=False,
)

for agent in agents_medium:
    rx_workflow_medium.add_node(agent)

for i in range(len(agents_medium) - 1):
    rx_workflow_medium.add_edge(
        agents_medium[i], agents_medium[i + 1]
    )

rx_start = time.time()
rx_workflow_medium.compile()
rx_medium_time = time.time() - rx_start

print(
    f"Small (5 agents) - NetworkX: {nx_small_time:.4f}s, Rustworkx: {rx_small_time:.4f}s, Speedup: {nx_small_time/rx_small_time if rx_small_time > 0 else 0:.2f}x"
)
print(
    f"Medium (20 agents) - NetworkX: {nx_medium_time:.4f}s, Rustworkx: {rx_medium_time:.4f}s, Speedup: {nx_medium_time/rx_medium_time if rx_medium_time > 0 else 0:.2f}x"
)
@ -0,0 +1,55 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

test_agent = Agent(
    agent_name="Test-Agent",
    agent_description="Test agent for error handling",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow_rx = GraphWorkflow(
    name="Rustworkx-Workflow",
    backend="rustworkx",
    verbose=False,
)
workflow_rx.add_node(test_agent)

workflow_nx = GraphWorkflow(
    name="NetworkX-Workflow",
    backend="networkx",
    verbose=False,
)
workflow_nx.add_node(test_agent)

workflow_default = GraphWorkflow(
    name="Default-Workflow",
    verbose=False,
)
workflow_default.add_node(test_agent)

workflow_invalid = GraphWorkflow(
    name="Invalid-Workflow",
    backend="invalid_backend",
    verbose=False,
)
workflow_invalid.add_node(test_agent)

print(
    f"Rustworkx backend: {type(workflow_rx.graph_backend).__name__}"
)
print(f"NetworkX backend: {type(workflow_nx.graph_backend).__name__}")
print(
    f"Default backend: {type(workflow_default.graph_backend).__name__}"
)
print(
    f"Invalid backend fallback: {type(workflow_invalid.graph_backend).__name__}"
)

try:
    import rustworkx as rx

    print("Rustworkx available: True")
except ImportError:
    print("Rustworkx available: False")
@ -0,0 +1,61 @@
import time

from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

NUM_AGENTS = 30

agents = [
    Agent(
        agent_name=f"Agent-{i:02d}",
        agent_description=f"Agent number {i} in large-scale workflow",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    for i in range(NUM_AGENTS)
]

workflow = GraphWorkflow(
    name="Large-Scale-Workflow",
    description=f"Large-scale workflow with {NUM_AGENTS} agents using rustworkx",
    backend="rustworkx",
    verbose=False,
)

start_time = time.time()
for agent in agents:
    workflow.add_node(agent)
add_nodes_time = time.time() - start_time

start_time = time.time()
for i in range(9):
    workflow.add_edge(agents[i], agents[i + 1])

workflow.add_edges_from_source(
    agents[5],
    agents[10:20],
)

workflow.add_edges_to_target(
    agents[10:20],
    agents[20],
)

for i in range(20, 29):
    workflow.add_edge(agents[i], agents[i + 1])

add_edges_time = time.time() - start_time

start_time = time.time()
workflow.compile()
compile_time = time.time() - start_time

print(
    f"Agents: {len(workflow.nodes)}, Edges: {len(workflow.edges)}, Layers: {len(workflow._sorted_layers)}"
)
print(
    f"Node addition: {add_nodes_time:.4f}s, Edge addition: {add_edges_time:.4f}s, Compilation: {compile_time:.4f}s"
)
print(
    f"Total setup: {add_nodes_time + add_edges_time + compile_time:.4f}s"
)
@ -0,0 +1,73 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

data_collector_1 = Agent(
    agent_name="Data-Collector-1",
    agent_description="Collects market data",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

data_collector_2 = Agent(
    agent_name="Data-Collector-2",
    agent_description="Collects financial data",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

data_collector_3 = Agent(
    agent_name="Data-Collector-3",
    agent_description="Collects news data",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

technical_analyst = Agent(
    agent_name="Technical-Analyst",
    agent_description="Performs technical analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

fundamental_analyst = Agent(
    agent_name="Fundamental-Analyst",
    agent_description="Performs fundamental analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

sentiment_analyst = Agent(
    agent_name="Sentiment-Analyst",
    agent_description="Performs sentiment analysis",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Parallel-Chain-Workflow",
    description="Demonstrates parallel chain pattern with rustworkx",
    backend="rustworkx",
    verbose=False,
)

sources = [data_collector_1, data_collector_2, data_collector_3]
targets = [technical_analyst, fundamental_analyst, sentiment_analyst]

for agent in sources + targets:
    workflow.add_node(agent)

workflow.add_parallel_chain(sources, targets)

workflow.compile()

task = "Analyze the technology sector using multiple data sources and analysis methods"
results = workflow.run(task=task)

for agent_name, output in results.items():
    print(f"{agent_name}: {output}")
@ -0,0 +1,79 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

agent_a = Agent(
    agent_name="Agent-A",
    agent_description="Agent A",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

agent_b = Agent(
    agent_name="Agent-B",
    agent_description="Agent B",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

agent_c = Agent(
    agent_name="Agent-C",
    agent_description="Agent C",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

agent_isolated = Agent(
    agent_name="Agent-Isolated",
    agent_description="Isolated agent with no connections",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Validation-Workflow",
    description="Workflow for validation testing",
    backend="rustworkx",
    verbose=False,
)

workflow.add_node(agent_a)
workflow.add_node(agent_b)
workflow.add_node(agent_c)
workflow.add_node(agent_isolated)

workflow.add_edge(agent_a, agent_b)
workflow.add_edge(agent_b, agent_c)

validation_result = workflow.validate(auto_fix=False)
print(f"Valid: {validation_result['is_valid']}")
print(f"Warnings: {len(validation_result['warnings'])}")
print(f"Errors: {len(validation_result['errors'])}")

validation_result_fixed = workflow.validate(auto_fix=True)
print(
    f"After auto-fix - Valid: {validation_result_fixed['is_valid']}"
)
print(f"Fixed: {len(validation_result_fixed['fixed'])}")
print(f"Entry points: {workflow.entry_points}")
print(f"End points: {workflow.end_points}")

workflow_cycle = GraphWorkflow(
    name="Cycle-Test-Workflow",
    backend="rustworkx",
    verbose=False,
)

workflow_cycle.add_node(agent_a)
workflow_cycle.add_node(agent_b)
workflow_cycle.add_node(agent_c)

workflow_cycle.add_edge(agent_a, agent_b)
workflow_cycle.add_edge(agent_b, agent_c)
workflow_cycle.add_edge(agent_c, agent_a)

cycle_validation = workflow_cycle.validate(auto_fix=False)
print(f"Cycles detected: {len(cycle_validation.get('cycles', []))}")
@ -0,0 +1,122 @@
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

market_researcher = Agent(
    agent_name="Market-Researcher",
    agent_description="Conducts comprehensive market research and data collection",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

competitor_analyst = Agent(
    agent_name="Competitor-Analyst",
    agent_description="Analyzes competitor landscape and positioning",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

market_analyst = Agent(
    agent_name="Market-Analyst",
    agent_description="Analyzes market trends and opportunities",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

financial_analyst = Agent(
    agent_name="Financial-Analyst",
    agent_description="Analyzes financial metrics and projections",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

risk_analyst = Agent(
    agent_name="Risk-Analyst",
    agent_description="Assesses market risks and challenges",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

strategy_consultant = Agent(
    agent_name="Strategy-Consultant",
    agent_description="Develops strategic recommendations based on all analyses",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

report_writer = Agent(
    agent_name="Report-Writer",
    agent_description="Compiles comprehensive market research report",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

executive_summary_writer = Agent(
    agent_name="Executive-Summary-Writer",
    agent_description="Creates executive summary for leadership",
    model_name="gpt-4o-mini",
    max_loops=1,
    verbose=False,
)

workflow = GraphWorkflow(
    name="Market-Research-Workflow",
    description="Real-world market research workflow using rustworkx backend",
    backend="rustworkx",
    verbose=False,
)

all_agents = [
    market_researcher,
    competitor_analyst,
    market_analyst,
    financial_analyst,
    risk_analyst,
    strategy_consultant,
    report_writer,
    executive_summary_writer,
]

for agent in all_agents:
    workflow.add_node(agent)

workflow.add_parallel_chain(
    [market_researcher, competitor_analyst],
    [market_analyst, financial_analyst, risk_analyst],
)

workflow.add_edges_to_target(
    [market_analyst, financial_analyst, risk_analyst],
    strategy_consultant,
)

workflow.add_edges_from_source(
    strategy_consultant,
    [report_writer, executive_summary_writer],
)

workflow.add_edges_to_target(
    [market_analyst, financial_analyst, risk_analyst],
    report_writer,
)

task = """
Conduct a comprehensive market research analysis on the electric vehicle (EV) industry:
1. Research current market size, growth trends, and key players
2. Analyze competitor landscape and market positioning
3. Assess financial opportunities and investment potential
4. Evaluate risks and challenges in the EV market
5. Develop strategic recommendations
6. Create detailed report and executive summary
"""

results = workflow.run(task=task)

for agent_name, output in results.items():
    print(f"{agent_name}: {output}")
@ -0,0 +1,156 @@
# Rustworkx Backend Examples

This directory contains comprehensive examples demonstrating the use of the **rustworkx backend** in GraphWorkflow. Rustworkx provides faster graph operations than NetworkX, especially for large graphs and complex operations.

## Installation

Before running these examples, ensure rustworkx is installed:

```bash
pip install rustworkx
```

If rustworkx is not installed, GraphWorkflow automatically falls back to the NetworkX backend.

## Examples Overview

### 01_basic_usage.py
A basic example showing how to use the rustworkx backend with GraphWorkflow. Demonstrates simple linear workflow creation and execution.

**Key Concepts:**

- Initializing GraphWorkflow with the rustworkx backend
- Adding agents and creating edges
- Running a workflow

### 02_backend_comparison.py
Compares the NetworkX and rustworkx backends side by side, showing performance differences and functional equivalence.

**Key Concepts:**

- Backend comparison
- Performance metrics
- Functional equivalence verification

### 03_fan_out_fan_in_patterns.py
Demonstrates parallel processing patterns: fan-out (one-to-many) and fan-in (many-to-one) connections.

**Key Concepts:**

- Fan-out pattern: `add_edges_from_source()`
- Fan-in pattern: `add_edges_to_target()`
- Parallel execution optimization

### 04_complex_workflow.py
Shows a complex multi-layer workflow with multiple parallel branches and convergence points.

**Key Concepts:**

- Multi-layer workflows
- Parallel chains: `add_parallel_chain()`
- Complex graph structures

### 05_performance_benchmark.py
Benchmarks performance differences between NetworkX and rustworkx for various graph sizes and structures.

**Key Concepts:**

- Performance benchmarking
- Scalability testing
- Different graph topologies (chain, tree)

### 06_error_handling.py
Demonstrates error handling and graceful fallback behavior when rustworkx is unavailable.

**Key Concepts:**

- Error handling
- Automatic fallback to NetworkX
- Backend availability checking

### 07_large_scale_workflow.py
Demonstrates rustworkx's efficiency with large-scale workflows containing many agents.

**Key Concepts:**

- Large-scale workflows
- Performance with many nodes and edges
- Complex interconnections

### 08_parallel_chain_example.py
A detailed example of the parallel chain pattern creating a full mesh connection.

**Key Concepts:**

- Parallel chain pattern
- Full mesh connections
- Maximum parallelization

### 09_workflow_validation.py
Shows workflow validation features, including cycle detection, isolated-node detection, and auto-fixing.

**Key Concepts:**

- Workflow validation
- Cycle detection
- Auto-fixing capabilities
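The cycle check that validation performs can be sketched independently of swarms: a depth-first search flags any "back edge" to a node still on the current DFS path. This is a minimal illustration over a plain adjacency dict, not the backend's actual implementation:

```python
def has_cycle(adjacency):
    """Return True if the directed graph (node -> list of successors) contains a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current DFS path, finished
    color = {node: WHITE for node in adjacency}

    def visit(node):
        color[node] = GRAY
        for succ in adjacency.get(node, []):
            state = color.get(succ, WHITE)
            if state == GRAY:  # back edge: succ is on the current path
                return True
            if state == WHITE and visit(succ):
                return True
        color[node] = BLACK
        return False

    return any(
        visit(node) for node in adjacency if color[node] == WHITE
    )


# A -> B -> C -> A is cyclic; dropping the C -> A edge makes it acyclic.
print(has_cycle({"A": ["B"], "B": ["C"], "C": ["A"]}))  # True
print(has_cycle({"A": ["B"], "B": ["C"], "C": []}))  # False
```

A workflow containing such a cycle has no valid execution order, which is why `09_workflow_validation.py` reports cycles before any agents run.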
### 10_real_world_scenario.py
A realistic market research workflow demonstrating real-world agent coordination scenarios.

**Key Concepts:**

- Real-world use case
- Complex multi-phase workflow
- Practical application

## Quick Start

Run any example:

```bash
python 01_basic_usage.py
```

## Backend Selection

To use the rustworkx backend:

```python
workflow = GraphWorkflow(
    backend="rustworkx",  # Use rustworkx
    # ... other parameters
)
```

To use the NetworkX backend (the default):

```python
workflow = GraphWorkflow(
    backend="networkx",  # Or omit for the default
    # ... other parameters
)
```

## Performance Benefits

Rustworkx provides performance benefits especially for:

- **Large graphs** (100+ nodes)
- **Complex operations** (topological sorting, cycle detection)
- **Frequent graph modifications** (adding/removing nodes and edges)
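The topological sorting mentioned above is what compilation uses to group agents into layers that can run in parallel. A pure-Python sketch of that layering (Kahn's algorithm over an adjacency dict; illustrative only, not the rustworkx code):

```python
def topological_layers(adjacency):
    """Group the nodes of a DAG (node -> list of successors) into parallel layers.

    Each layer contains nodes whose predecessors all appear in earlier layers.
    The input must be acyclic; nodes trapped in a cycle are silently dropped.
    """
    in_degree = {node: 0 for node in adjacency}
    for successors in adjacency.values():
        for succ in successors:
            in_degree[succ] = in_degree.get(succ, 0) + 1

    layer = [node for node, degree in in_degree.items() if degree == 0]
    layers = []
    while layer:
        layers.append(sorted(layer))
        next_layer = []
        for node in layer:
            for succ in adjacency.get(node, []):
                in_degree[succ] -= 1
                if in_degree[succ] == 0:  # all predecessors scheduled
                    next_layer.append(succ)
        layer = next_layer
    return layers


# Fan-out/fan-in shape: Coordinator -> three analysts -> Synthesizer
graph = {
    "Coordinator": ["A1", "A2", "A3"],
    "A1": ["Synth"],
    "A2": ["Synth"],
    "A3": ["Synth"],
    "Synth": [],
}
print(topological_layers(graph))
# [['Coordinator'], ['A1', 'A2', 'A3'], ['Synth']]
```

The middle layer is exactly the set of agents GraphWorkflow can execute concurrently, which is why faster generation computation pays off on large graphs.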
## Key Differences

While both backends are functionally equivalent, rustworkx:

- Uses integer node indices internally (abstracted away from the user)
- Provides faster graph operations
- Offers better memory efficiency for large graphs
- Maintains full compatibility with the GraphWorkflow API
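For example, the name-to-index bookkeeping behind the first point might look like the following. This is a hypothetical sketch mirroring the `_node_id_to_index` / `_index_to_node_id` attributes that the test suite checks, not the actual backend code (in particular, real rustworkx assigns indices via `add_node`, and indices of removed nodes may be reused):

```python
class NodeIndexMap:
    """Bidirectional mapping between string node ids and integer node indices."""

    def __init__(self):
        self._node_id_to_index = {}
        self._index_to_node_id = {}

    def add(self, node_id):
        """Register a node id and return its integer index (idempotent)."""
        if node_id not in self._node_id_to_index:
            index = len(self._node_id_to_index)  # stand-in for the index add_node would return
            self._node_id_to_index[node_id] = index
            self._index_to_node_id[index] = node_id
        return self._node_id_to_index[node_id]

    def node_id(self, index):
        """Translate an integer index back to the original node id."""
        return self._index_to_node_id[index]


mapping = NodeIndexMap()
mapping.add("Coordinator")  # index 0
mapping.add("Tech-Analyst")  # index 1
print(mapping.node_id(1))  # Tech-Analyst
```

GraphWorkflow callers only ever see agent names; the translation happens inside the backend.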
## Notes

- Both backends produce identical results
- Rustworkx automatically falls back to NetworkX if not installed
- All GraphWorkflow features work with both backends
- Performance gains become more significant with larger graphs

## Requirements

- `swarms` package
- `rustworkx` (optional, for the rustworkx backend)
- `networkx` (always available, default backend)

## Contributing

Feel free to add more examples demonstrating rustworkx capabilities or specific use cases!
@ -0,0 +1,632 @@
import pytest

from swarms.structs.graph_workflow import (
    GraphWorkflow,
)
from swarms.structs.agent import Agent

try:
    import rustworkx as rx

    RUSTWORKX_AVAILABLE = True
except ImportError:
    RUSTWORKX_AVAILABLE = False


def create_test_agent(name: str, description: str = None) -> Agent:
    """Create a test agent"""
    if description is None:
        description = f"Test agent for {name} operations"

    return Agent(
        agent_name=name,
        agent_description=description,
        model_name="gpt-4o-mini",
        verbose=False,
        print_on=False,
        max_loops=1,
    )


@pytest.mark.skipif(
    not RUSTWORKX_AVAILABLE, reason="rustworkx not available"
)
class TestRustworkxBackend:
    """Test suite for rustworkx backend"""

    def test_rustworkx_backend_initialization(self):
        """Test that rustworkx backend is properly initialized"""
        workflow = GraphWorkflow(name="Test", backend="rustworkx")
        assert (
            workflow.graph_backend.__class__.__name__
            == "RustworkxBackend"
        )
        assert hasattr(workflow.graph_backend, "_node_id_to_index")
        assert hasattr(workflow.graph_backend, "_index_to_node_id")
        assert hasattr(workflow.graph_backend, "graph")

    def test_rustworkx_node_addition(self):
        """Test adding nodes to rustworkx backend"""
        workflow = GraphWorkflow(name="Test", backend="rustworkx")
        agent = create_test_agent("TestAgent", "Test agent")

        workflow.add_node(agent)

        assert "TestAgent" in workflow.nodes
        assert "TestAgent" in workflow.graph_backend._node_id_to_index
        assert (
            workflow.graph_backend._node_id_to_index["TestAgent"]
            in workflow.graph_backend._index_to_node_id
        )

    def test_rustworkx_edge_addition(self):
        """Test adding edges to rustworkx backend"""
        workflow = GraphWorkflow(name="Test", backend="rustworkx")
        agent1 = create_test_agent("Agent1", "First agent")
        agent2 = create_test_agent("Agent2", "Second agent")

        workflow.add_node(agent1)
        workflow.add_node(agent2)
        workflow.add_edge(agent1, agent2)

        assert len(workflow.edges) == 1
        assert workflow.edges[0].source == "Agent1"
        assert workflow.edges[0].target == "Agent2"

    def test_rustworkx_topological_generations_linear(self):
        """Test topological generations with linear chain"""
        workflow = GraphWorkflow(
            name="Linear-Test", backend="rustworkx"
        )
        agents = [
            create_test_agent(f"Agent{i}", f"Agent {i}")
            for i in range(5)
        ]

        for agent in agents:
            workflow.add_node(agent)

        for i in range(len(agents) - 1):
            workflow.add_edge(agents[i], agents[i + 1])

        workflow.compile()

        assert len(workflow._sorted_layers) == 5
        assert workflow._sorted_layers[0] == ["Agent0"]
        assert workflow._sorted_layers[1] == ["Agent1"]
        assert workflow._sorted_layers[2] == ["Agent2"]
        assert workflow._sorted_layers[3] == ["Agent3"]
        assert workflow._sorted_layers[4] == ["Agent4"]

    def test_rustworkx_topological_generations_fan_out(self):
        """Test topological generations with fan-out pattern"""
        workflow = GraphWorkflow(
            name="FanOut-Test", backend="rustworkx"
        )
        coordinator = create_test_agent("Coordinator", "Coordinates")
        analyst1 = create_test_agent("Analyst1", "First analyst")
        analyst2 = create_test_agent("Analyst2", "Second analyst")
        analyst3 = create_test_agent("Analyst3", "Third analyst")

        workflow.add_node(coordinator)
        workflow.add_node(analyst1)
        workflow.add_node(analyst2)
        workflow.add_node(analyst3)

        workflow.add_edges_from_source(
            coordinator, [analyst1, analyst2, analyst3]
        )

        workflow.compile()

        assert len(workflow._sorted_layers) == 2
        assert len(workflow._sorted_layers[0]) == 1
        assert "Coordinator" in workflow._sorted_layers[0]
        assert len(workflow._sorted_layers[1]) == 3
        assert "Analyst1" in workflow._sorted_layers[1]
        assert "Analyst2" in workflow._sorted_layers[1]
        assert "Analyst3" in workflow._sorted_layers[1]

    def test_rustworkx_topological_generations_fan_in(self):
        """Test topological generations with fan-in pattern"""
        workflow = GraphWorkflow(
            name="FanIn-Test", backend="rustworkx"
        )
        analyst1 = create_test_agent("Analyst1", "First analyst")
        analyst2 = create_test_agent("Analyst2", "Second analyst")
        analyst3 = create_test_agent("Analyst3", "Third analyst")
        synthesizer = create_test_agent("Synthesizer", "Synthesizes")

        workflow.add_node(analyst1)
        workflow.add_node(analyst2)
        workflow.add_node(analyst3)
        workflow.add_node(synthesizer)

        workflow.add_edges_to_target(
            [analyst1, analyst2, analyst3], synthesizer
        )

        workflow.compile()

        assert len(workflow._sorted_layers) == 2
        assert len(workflow._sorted_layers[0]) == 3
        assert "Analyst1" in workflow._sorted_layers[0]
        assert "Analyst2" in workflow._sorted_layers[0]
        assert "Analyst3" in workflow._sorted_layers[0]
        assert len(workflow._sorted_layers[1]) == 1
        assert "Synthesizer" in workflow._sorted_layers[1]

    def test_rustworkx_topological_generations_complex(self):
        """Test topological generations with complex topology"""
        workflow = GraphWorkflow(
            name="Complex-Test", backend="rustworkx"
        )
        agents = [
            create_test_agent(f"Agent{i}", f"Agent {i}")
            for i in range(6)
        ]

        for agent in agents:
            workflow.add_node(agent)

        # Create: Agent0 -> Agent1, Agent2
        #         Agent1, Agent2 -> Agent3
        #         Agent3 -> Agent4, Agent5
        workflow.add_edge(agents[0], agents[1])
        workflow.add_edge(agents[0], agents[2])
        workflow.add_edge(agents[1], agents[3])
        workflow.add_edge(agents[2], agents[3])
        workflow.add_edge(agents[3], agents[4])
        workflow.add_edge(agents[3], agents[5])

        workflow.compile()

        assert len(workflow._sorted_layers) == 4
        assert "Agent0" in workflow._sorted_layers[0]
        assert (
            "Agent1" in workflow._sorted_layers[1]
            or "Agent2" in workflow._sorted_layers[1]
        )
        assert "Agent3" in workflow._sorted_layers[2]
        assert (
            "Agent4" in workflow._sorted_layers[3]
            or "Agent5" in workflow._sorted_layers[3]
        )

    def test_rustworkx_predecessors(self):
        """Test predecessor retrieval"""
        workflow = GraphWorkflow(
            name="Predecessors-Test", backend="rustworkx"
        )
        agent1 = create_test_agent("Agent1", "First agent")
        agent2 = create_test_agent("Agent2", "Second agent")
        agent3 = create_test_agent("Agent3", "Third agent")

        workflow.add_node(agent1)
        workflow.add_node(agent2)
        workflow.add_node(agent3)

        workflow.add_edge(agent1, agent2)
        workflow.add_edge(agent2, agent3)

        predecessors = list(
            workflow.graph_backend.predecessors("Agent2")
        )
        assert "Agent1" in predecessors
        assert len(predecessors) == 1

        predecessors = list(
            workflow.graph_backend.predecessors("Agent3")
        )
        assert "Agent2" in predecessors
        assert len(predecessors) == 1

        predecessors = list(
            workflow.graph_backend.predecessors("Agent1")
        )
        assert len(predecessors) == 0

    def test_rustworkx_descendants(self):
        """Test descendant retrieval"""
        workflow = GraphWorkflow(
            name="Descendants-Test", backend="rustworkx"
        )
        agent1 = create_test_agent("Agent1", "First agent")
        agent2 = create_test_agent("Agent2", "Second agent")
        agent3 = create_test_agent("Agent3", "Third agent")

        workflow.add_node(agent1)
        workflow.add_node(agent2)
        workflow.add_node(agent3)

        workflow.add_edge(agent1, agent2)
        workflow.add_edge(agent2, agent3)

        descendants = workflow.graph_backend.descendants("Agent1")
        assert "Agent2" in descendants
        assert "Agent3" in descendants
        assert len(descendants) == 2

        descendants = workflow.graph_backend.descendants("Agent2")
        assert "Agent3" in descendants
        assert len(descendants) == 1

        descendants = workflow.graph_backend.descendants("Agent3")
        assert len(descendants) == 0

    def test_rustworkx_in_degree(self):
        """Test in-degree calculation"""
        workflow = GraphWorkflow(
            name="InDegree-Test", backend="rustworkx"
        )
        agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
agent3 = create_test_agent("Agent3", "Third agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_node(agent3)
|
||||
|
||||
workflow.add_edge(agent1, agent2)
|
||||
workflow.add_edge(agent3, agent2)
|
||||
|
||||
assert workflow.graph_backend.in_degree("Agent1") == 0
|
||||
assert workflow.graph_backend.in_degree("Agent2") == 2
|
||||
assert workflow.graph_backend.in_degree("Agent3") == 0
|
||||
|
||||
def test_rustworkx_out_degree(self):
|
||||
"""Test out-degree calculation"""
|
||||
workflow = GraphWorkflow(
|
||||
name="OutDegree-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
agent3 = create_test_agent("Agent3", "Third agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_node(agent3)
|
||||
|
||||
workflow.add_edge(agent1, agent2)
|
||||
workflow.add_edge(agent1, agent3)
|
||||
|
||||
assert workflow.graph_backend.out_degree("Agent1") == 2
|
||||
assert workflow.graph_backend.out_degree("Agent2") == 0
|
||||
assert workflow.graph_backend.out_degree("Agent3") == 0
|
||||
|
||||
def test_rustworkx_agent_objects_in_edges(self):
|
||||
"""Test using Agent objects directly in edge methods"""
|
||||
workflow = GraphWorkflow(
|
||||
name="AgentObjects-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
agent3 = create_test_agent("Agent3", "Third agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_node(agent3)
|
||||
|
||||
# Use Agent objects directly
|
||||
workflow.add_edges_from_source(agent1, [agent2, agent3])
|
||||
workflow.add_edges_to_target([agent2, agent3], agent1)
|
||||
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow.edges) == 4
|
||||
assert len(workflow._sorted_layers) >= 1
|
||||
|
||||
def test_rustworkx_parallel_chain(self):
|
||||
"""Test parallel chain pattern"""
|
||||
workflow = GraphWorkflow(
|
||||
name="ParallelChain-Test", backend="rustworkx"
|
||||
)
|
||||
sources = [
|
||||
create_test_agent(f"Source{i}", f"Source {i}")
|
||||
for i in range(3)
|
||||
]
|
||||
targets = [
|
||||
create_test_agent(f"Target{i}", f"Target {i}")
|
||||
for i in range(3)
|
||||
]
|
||||
|
||||
for agent in sources + targets:
|
||||
workflow.add_node(agent)
|
||||
|
||||
workflow.add_parallel_chain(sources, targets)
|
||||
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow.edges) == 9 # 3x3 = 9 edges
|
||||
assert len(workflow._sorted_layers) == 2
|
||||
|
||||
def test_rustworkx_large_scale(self):
|
||||
"""Test rustworkx with large workflow"""
|
||||
workflow = GraphWorkflow(
|
||||
name="LargeScale-Test", backend="rustworkx"
|
||||
)
|
||||
agents = [
|
||||
create_test_agent(f"Agent{i}", f"Agent {i}")
|
||||
for i in range(20)
|
||||
]
|
||||
|
||||
for agent in agents:
|
||||
workflow.add_node(agent)
|
||||
|
||||
# Create linear chain
|
||||
for i in range(len(agents) - 1):
|
||||
workflow.add_edge(agents[i], agents[i + 1])
|
||||
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow._sorted_layers) == 20
|
||||
assert len(workflow.nodes) == 20
|
||||
assert len(workflow.edges) == 19
|
||||
|
||||
def test_rustworkx_reverse(self):
|
||||
"""Test graph reversal"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Reverse-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_edge(agent1, agent2)
|
||||
|
||||
reversed_backend = workflow.graph_backend.reverse()
|
||||
|
||||
# In reversed graph, Agent2 should have Agent1 as predecessor
|
||||
preds = list(reversed_backend.predecessors("Agent1"))
|
||||
assert "Agent2" in preds
|
||||
|
||||
# Agent2 should have no predecessors in reversed graph
|
||||
preds = list(reversed_backend.predecessors("Agent2"))
|
||||
assert len(preds) == 0
|
||||
|
||||
def test_rustworkx_entry_end_points(self):
|
||||
"""Test entry and end point detection"""
|
||||
workflow = GraphWorkflow(
|
||||
name="EntryEnd-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "Entry agent")
|
||||
agent2 = create_test_agent("Agent2", "Middle agent")
|
||||
agent3 = create_test_agent("Agent3", "End agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_node(agent3)
|
||||
|
||||
workflow.add_edge(agent1, agent2)
|
||||
workflow.add_edge(agent2, agent3)
|
||||
|
||||
workflow.auto_set_entry_points()
|
||||
workflow.auto_set_end_points()
|
||||
|
||||
assert "Agent1" in workflow.entry_points
|
||||
assert "Agent3" in workflow.end_points
|
||||
assert workflow.graph_backend.in_degree("Agent1") == 0
|
||||
assert workflow.graph_backend.out_degree("Agent3") == 0
|
||||
|
||||
def test_rustworkx_isolated_nodes(self):
|
||||
"""Test handling of isolated nodes"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Isolated-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "Connected agent")
|
||||
agent2 = create_test_agent("Agent2", "Isolated agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_edge(agent1, agent1) # Self-loop
|
||||
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow.nodes) == 2
|
||||
assert "Agent2" in workflow.nodes
|
||||
|
||||
def test_rustworkx_workflow_execution(self):
|
||||
"""Test full workflow execution with rustworkx"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Execution-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_edge(agent1, agent2)
|
||||
|
||||
result = workflow.run("Test task")
|
||||
|
||||
assert result is not None
|
||||
assert "Agent1" in result
|
||||
assert "Agent2" in result
|
||||
|
||||
def test_rustworkx_compilation_caching(self):
|
||||
"""Test that compilation is cached correctly"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Cache-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_edge(agent1, agent2)
|
||||
|
||||
# First compilation
|
||||
workflow.compile()
|
||||
layers1 = workflow._sorted_layers.copy()
|
||||
compiled1 = workflow._compiled
|
||||
|
||||
# Second compilation should use cache
|
||||
workflow.compile()
|
||||
layers2 = workflow._sorted_layers.copy()
|
||||
compiled2 = workflow._compiled
|
||||
|
||||
assert compiled1 == compiled2 == True
|
||||
assert layers1 == layers2
|
||||
|
||||
def test_rustworkx_node_metadata(self):
|
||||
"""Test node metadata handling"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Metadata-Test", backend="rustworkx"
|
||||
)
|
||||
agent = create_test_agent("Agent", "Test agent")
|
||||
|
||||
workflow.add_node(
|
||||
agent, metadata={"priority": "high", "timeout": 60}
|
||||
)
|
||||
|
||||
node_index = workflow.graph_backend._node_id_to_index["Agent"]
|
||||
node_data = workflow.graph_backend.graph[node_index]
|
||||
|
||||
assert isinstance(node_data, dict)
|
||||
assert node_data.get("node_id") == "Agent"
|
||||
assert node_data.get("priority") == "high"
|
||||
assert node_data.get("timeout") == 60
|
||||
|
||||
def test_rustworkx_edge_metadata(self):
|
||||
"""Test edge metadata handling"""
|
||||
workflow = GraphWorkflow(
|
||||
name="EdgeMetadata-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_edge(agent1, agent2, weight=5, label="test")
|
||||
|
||||
assert len(workflow.edges) == 1
|
||||
assert workflow.edges[0].metadata.get("weight") == 5
|
||||
assert workflow.edges[0].metadata.get("label") == "test"
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not RUSTWORKX_AVAILABLE, reason="rustworkx not available"
|
||||
)
|
||||
class TestRustworkxPerformance:
|
||||
"""Performance tests for rustworkx backend"""
|
||||
|
||||
def test_rustworkx_large_graph_compilation(self):
|
||||
"""Test compilation performance with large graph"""
|
||||
workflow = GraphWorkflow(
|
||||
name="LargeGraph-Test", backend="rustworkx"
|
||||
)
|
||||
agents = [
|
||||
create_test_agent(f"Agent{i}", f"Agent {i}")
|
||||
for i in range(50)
|
||||
]
|
||||
|
||||
for agent in agents:
|
||||
workflow.add_node(agent)
|
||||
|
||||
# Create a complex topology
|
||||
for i in range(len(agents) - 1):
|
||||
workflow.add_edge(agents[i], agents[i + 1])
|
||||
|
||||
import time
|
||||
|
||||
start = time.time()
|
||||
workflow.compile()
|
||||
compile_time = time.time() - start
|
||||
|
||||
assert compile_time < 1.0 # Should compile quickly
|
||||
assert len(workflow._sorted_layers) == 50
|
||||
|
||||
def test_rustworkx_many_predecessors(self):
|
||||
"""Test performance with many predecessors"""
|
||||
workflow = GraphWorkflow(
|
||||
name="ManyPreds-Test", backend="rustworkx"
|
||||
)
|
||||
target = create_test_agent("Target", "Target agent")
|
||||
sources = [
|
||||
create_test_agent(f"Source{i}", f"Source {i}")
|
||||
for i in range(100)
|
||||
]
|
||||
|
||||
workflow.add_node(target)
|
||||
for source in sources:
|
||||
workflow.add_node(source)
|
||||
|
||||
workflow.add_edges_to_target(sources, target)
|
||||
|
||||
workflow.compile()
|
||||
|
||||
predecessors = list(
|
||||
workflow.graph_backend.predecessors("Target")
|
||||
)
|
||||
assert len(predecessors) == 100
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not RUSTWORKX_AVAILABLE, reason="rustworkx not available"
|
||||
)
|
||||
class TestRustworkxEdgeCases:
|
||||
"""Edge case tests for rustworkx backend"""
|
||||
|
||||
def test_rustworkx_empty_graph(self):
|
||||
"""Test empty graph handling"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Empty-Test", backend="rustworkx"
|
||||
)
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow._sorted_layers) == 0
|
||||
assert len(workflow.nodes) == 0
|
||||
|
||||
def test_rustworkx_single_node(self):
|
||||
"""Test single node graph"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Single-Test", backend="rustworkx"
|
||||
)
|
||||
agent = create_test_agent("Agent", "Single agent")
|
||||
|
||||
workflow.add_node(agent)
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow._sorted_layers) == 1
|
||||
assert workflow._sorted_layers[0] == ["Agent"]
|
||||
|
||||
def test_rustworkx_self_loop(self):
|
||||
"""Test self-loop handling"""
|
||||
workflow = GraphWorkflow(
|
||||
name="SelfLoop-Test", backend="rustworkx"
|
||||
)
|
||||
agent = create_test_agent("Agent", "Self-looping agent")
|
||||
|
||||
workflow.add_node(agent)
|
||||
workflow.add_edge(agent, agent)
|
||||
|
||||
workflow.compile()
|
||||
|
||||
assert len(workflow.edges) == 1
|
||||
assert workflow.graph_backend.in_degree("Agent") == 1
|
||||
assert workflow.graph_backend.out_degree("Agent") == 1
|
||||
|
||||
def test_rustworkx_duplicate_edge(self):
|
||||
"""Test duplicate edge handling"""
|
||||
workflow = GraphWorkflow(
|
||||
name="Duplicate-Test", backend="rustworkx"
|
||||
)
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
|
||||
# Add same edge twice
|
||||
workflow.add_edge(agent1, agent2)
|
||||
workflow.add_edge(agent1, agent2)
|
||||
|
||||
# rustworkx should handle duplicate edges
|
||||
assert (
|
||||
len(workflow.edges) == 2
|
||||
) # Both edges are stored in workflow
|
||||
workflow.compile() # Should not crash
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@ -0,0 +1,95 @@
# LLM Council Examples

This directory contains examples demonstrating the LLM Council pattern, inspired by Andrej Karpathy's llm-council implementation. The LLM Council uses multiple specialized AI agents that:

1. Each respond independently to queries
2. Review and rank each other's anonymized responses
3. Have a Chairman synthesize all responses into a final comprehensive answer

## Examples

### Marketing & Business

- **marketing_strategy_council.py** - Marketing strategy analysis and recommendations
- **business_strategy_council.py** - Comprehensive business strategy development

### Finance & Investment

- **finance_analysis_council.py** - Financial analysis and investment recommendations
- **etf_stock_analysis_council.py** - ETF and stock analysis with portfolio recommendations

### Medical & Healthcare

- **medical_treatment_council.py** - Medical treatment recommendations and care plans
- **medical_diagnosis_council.py** - Diagnostic analysis based on symptoms

### Technology & Research

- **technology_assessment_council.py** - Technology evaluation and implementation strategy
- **research_analysis_council.py** - Comprehensive research analysis on complex topics

### Legal

- **legal_analysis_council.py** - Legal implications and compliance analysis

## Usage

Each example follows the same pattern:

```python
from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Run a query
result = council.run("Your query here")

# Access results
print(result["final_response"])      # Chairman's synthesized answer
print(result["original_responses"])  # Individual member responses
print(result["evaluations"])         # How members ranked each other
```
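Step 2 of the pattern, the anonymized peer review, can be sketched in plain Python. This is a minimal illustration of the idea only, not the library's internal implementation; `anonymize_responses` is a hypothetical helper:

```python
import random


def anonymize_responses(responses, seed=None):
    """Shuffle member responses and relabel them "Response A", "Response B", ...

    Returns the anonymized mapping plus a label -> member key so the
    rankings can be de-anonymized after the peer review.
    """
    rng = random.Random(seed)
    members = list(responses)
    rng.shuffle(members)  # so label order reveals nothing about identity
    labels = [f"Response {chr(ord('A') + i)}" for i in range(len(members))]
    anonymized = {label: responses[m] for label, m in zip(labels, members)}
    key = dict(zip(labels, members))
    return anonymized, key


anon, key = anonymize_responses(
    {"GPT-5.1-Councilor": "draft 1", "Claude-Sonnet-4.5-Councilor": "draft 2"},
    seed=0,
)
print(sorted(anon))  # ['Response A', 'Response B']
```

Reviewers then see only the labels; the `key` mapping lets the Chairman attribute rankings back to members afterwards.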
## Running Examples

Run any example directly:

```bash
python examples/multi_agent/llm_council_examples/marketing_strategy_council.py
python examples/multi_agent/llm_council_examples/finance_analysis_council.py
python examples/multi_agent/llm_council_examples/medical_diagnosis_council.py
```

## Key Features

- **Multiple Perspectives**: Each council member (GPT-5.1, Gemini, Claude, Grok) provides unique insights
- **Peer Review**: Members evaluate and rank each other's responses anonymously
- **Synthesis**: The Chairman combines the best elements from all responses
- **Transparency**: See both individual responses and evaluation rankings

## Council Members

The default council consists of:

- **GPT-5.1-Councilor**: Analytical and comprehensive
- **Gemini-3-Pro-Councilor**: Concise and well-processed
- **Claude-Sonnet-4.5-Councilor**: Thoughtful and balanced
- **Grok-4-Councilor**: Creative and innovative

## Customization

You can create custom council members:

```python
from swarms import Agent
from swarms.structs.llm_council import LLMCouncil, get_gpt_councilor_prompt

custom_agent = Agent(
    agent_name="Custom-Councilor",
    system_prompt=get_gpt_councilor_prompt(),
    model_name="gpt-4.1",
    max_loops=1,
)

council = LLMCouncil(
    council_members=[custom_agent, ...],
    chairman_model="gpt-5.1",
    verbose=True,
)
```
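The peer-review rankings lend themselves to simple positional scoring. The sketch below merges per-reviewer rankings into one overall order with a Borda count; it illustrates the idea only and is not how `LLMCouncil` aggregates internally (`aggregate_rankings` is a hypothetical helper):

```python
def aggregate_rankings(rankings):
    """Merge each reviewer's ranked list of response labels into one order.

    Uses a Borda count: in a list of n labels, first place earns n points
    and last place earns 1. Highest total score comes first.
    """
    scores = {}
    for ranked in rankings.values():
        n = len(ranked)
        for position, label in enumerate(ranked):
            scores[label] = scores.get(label, 0) + (n - position)
    return sorted(scores, key=scores.get, reverse=True)


order = aggregate_rankings(
    {
        "GPT-5.1-Councilor": ["B", "A", "C"],
        "Gemini-3-Pro-Councilor": ["B", "C", "A"],
        "Claude-Sonnet-4.5-Councilor": ["A", "B", "C"],
    }
)
print(order)  # ['B', 'A', 'C']
```

A Chairman model can do this synthesis qualitatively instead, but a positional tally like this is a cheap, transparent baseline for comparing how the members voted.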
@ -0,0 +1,31 @@
"""
LLM Council Example: Business Strategy Development

This example demonstrates using the LLM Council to develop comprehensive
business strategies for new ventures.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Business strategy query
query = """
A tech startup wants to launch an AI-powered personal finance app targeting
millennials and Gen Z. Develop a comprehensive business strategy including:
1. Market opportunity and competitive landscape analysis
2. Product positioning and unique value proposition
3. Go-to-market strategy and customer acquisition plan
4. Revenue model and pricing strategy
5. Key partnerships and distribution channels
6. Resource requirements and funding needs
7. Risk assessment and mitigation strategies
8. Success metrics and KPIs for the first 12 months
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,29 @@
"""
LLM Council Example: ETF Stock Analysis

This example demonstrates using the LLM Council to analyze ETF holdings
and provide stock investment recommendations.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# ETF and stock analysis query
query = """
Analyze the top energy ETFs (including nuclear, solar, gas, and renewable energy)
and provide:
1. Top 5 best-performing energy stocks across all energy sectors
2. ETF recommendations for diversified energy exposure
3. Risk-return profiles for each recommendation
4. Current market conditions affecting energy investments
5. Allocation strategy for a $100,000 portfolio
6. Key metrics to track for each investment
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,29 @@
"""
LLM Council Example: Financial Analysis

This example demonstrates using the LLM Council to provide comprehensive
financial analysis and investment recommendations.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Financial analysis query
query = """
Provide a comprehensive financial analysis for investing in emerging markets
technology ETFs. Include:
1. Risk assessment and volatility analysis
2. Historical performance trends
3. Sector composition and diversification benefits
4. Comparison with developed market tech ETFs
5. Recommended allocation percentage for a moderate risk portfolio
6. Key factors to monitor going forward
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,31 @@
"""
LLM Council Example: Legal Analysis

This example demonstrates using the LLM Council to analyze legal scenarios
and provide comprehensive legal insights.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Legal analysis query
query = """
A startup is considering using AI-generated content for their marketing materials.
Analyze the legal implications including:
1. Intellectual property rights and ownership of AI-generated content
2. Copyright and trademark considerations
3. Liability for AI-generated content that may be inaccurate or misleading
4. Compliance with advertising regulations (FTC, FDA, etc.)
5. Data privacy implications if using customer data to train models
6. Contractual considerations with AI service providers
7. Risk mitigation strategies
8. Best practices for legal compliance
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,28 @@
"""
LLM Council Example: Marketing Strategy Analysis

This example demonstrates using the LLM Council to analyze and develop
comprehensive marketing strategies by leveraging multiple AI perspectives.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Marketing strategy query
query = """
Analyze the marketing strategy for a new sustainable energy startup launching
a solar panel subscription service. Provide recommendations on:
1. Target audience segmentation
2. Key messaging and value propositions
3. Marketing channels and budget allocation
4. Competitive positioning
5. Launch timeline and milestones
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,36 @@
"""
LLM Council Example: Medical Diagnosis Analysis

This example demonstrates using the LLM Council to analyze symptoms
and provide diagnostic insights.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Medical diagnosis query
query = """
A 35-year-old patient presents with:
- Persistent fatigue for 3 months
- Unexplained weight loss (15 lbs)
- Night sweats
- Intermittent low-grade fever
- Swollen lymph nodes in neck and armpits
- Recent blood work shows elevated ESR and CRP

Provide:
1. Differential diagnosis with most likely conditions ranked
2. Additional diagnostic tests needed to confirm
3. Red flag symptoms requiring immediate attention
4. Possible causes and risk factors
5. Recommended next steps for the patient
6. When to seek emergency care
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,30 @@
"""
LLM Council Example: Medical Treatment Analysis

This example demonstrates using the LLM Council to analyze medical treatments
and provide comprehensive treatment recommendations.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Medical treatment query
query = """
A 45-year-old patient with Type 2 diabetes, hypertension, and early-stage
kidney disease needs treatment recommendations. Provide:
1. Comprehensive treatment plan addressing all conditions
2. Medication options with pros/cons for each condition
3. Lifestyle modifications and their expected impact
4. Monitoring schedule and key metrics to track
5. Potential drug interactions and contraindications
6. Expected outcomes and timeline for improvement
7. When to consider specialist referrals
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,31 @@
"""
LLM Council Example: Research Analysis

This example demonstrates using the LLM Council to conduct comprehensive
research analysis on complex topics.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Research analysis query
query = """
Conduct a comprehensive analysis of the potential impact of climate change
on global food security over the next 20 years. Include:
1. Key climate factors affecting agriculture (temperature, precipitation, extreme weather)
2. Regional vulnerabilities and impacts on major food-producing regions
3. Crop yield projections and food availability scenarios
4. Economic implications and food price volatility
5. Adaptation strategies and technological solutions
6. Policy recommendations for governments and international organizations
7. Role of innovation in agriculture (precision farming, GMOs, vertical farming)
8. Social and geopolitical implications of food insecurity
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,31 @@
"""
LLM Council Example: Technology Assessment

This example demonstrates using the LLM Council to assess emerging technologies
and their business implications.
"""

from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Technology assessment query
query = """
Evaluate the business potential and implementation strategy for integrating
quantum computing capabilities into a financial services company. Consider:
1. Current state of quantum computing technology
2. Specific use cases in financial services (risk modeling, portfolio optimization, fraud detection)
3. Competitive advantages and potential ROI
4. Implementation timeline and resource requirements
5. Technical challenges and limitations
6. Risk factors and mitigation strategies
7. Partnership opportunities with quantum computing providers
8. Expected timeline for practical business value
"""

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])
@ -0,0 +1,22 @@
from swarms.structs.llm_council import LLMCouncil

# Create the council
council = LLMCouncil(verbose=True)

# Example query
query = "What are the top five best energy stocks across nuclear, solar, gas, and other energy sources?"

# Run the council
result = council.run(query)

# Print final response
print(result["final_response"])

# Optionally print evaluations
for name, evaluation in result["evaluations"].items():
    print(f"\n{name}:")
    print(
        evaluation[:500] + "..."
        if len(evaluation) > 500
        else evaluation
    )
@ -0,0 +1,516 @@
|
||||
"""
|
||||
LLM Council - A Swarms implementation inspired by Andrej Karpathy's llm-council.
|
||||
|
||||
This implementation creates a council of specialized LLM agents that:
|
||||
1. Each agent responds to the user query independently
|
||||
2. All agents review and rank each other's (anonymized) responses
|
||||
3. A Chairman LLM synthesizes all responses and rankings into a final answer
|
||||
|
||||
The council demonstrates how different models evaluate and rank each other's work,
|
||||
often selecting responses from other models as superior to their own.
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Optional
|
||||
import random
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.multi_agent_exec import (
|
||||
run_agents_concurrently,
|
||||
batched_grid_agent_execution,
|
||||
)
|
||||
from swarms.utils.history_output_formatter import HistoryOutputType, history_output_formatter
|
||||
from swarms.structs.conversation import Conversation
|
||||
from swarms.structs.swarm_id import swarm_id
|
||||
|
||||
def get_gpt_councilor_prompt() -> str:
|
||||
"""
|
||||
Get system prompt for GPT-5.1 councilor.
|
||||
|
||||
Returns:
|
||||
System prompt string for GPT-5.1 councilor agent.
|
||||
"""
|
||||
return """You are a member of the LLM Council, representing GPT-5.1. Your role is to provide comprehensive, analytical, and thorough responses to user queries.
|
||||
|
||||
Your strengths:
|
||||
- Deep analytical thinking and comprehensive coverage
|
||||
- Ability to break down complex topics into detailed components
|
||||
- Thorough exploration of multiple perspectives
|
||||
- Rich contextual understanding
|
||||
|
||||
Your approach:
|
||||
- Provide detailed, well-structured responses
|
||||
- Include relevant context and background information
|
||||
- Consider multiple angles and perspectives
|
||||
- Be thorough but clear in your explanations
|
||||
|
||||
Remember: You are part of a council where multiple AI models will respond to the same query, and then evaluate each other's responses. Focus on quality, depth, and clarity."""
|
||||
|
||||
|
||||
def get_gemini_councilor_prompt() -> str:
    """
    Get system prompt for Gemini 3 Pro councilor.

    Returns:
        System prompt string for Gemini 3 Pro councilor agent.
    """
    return """You are a member of the LLM Council, representing Gemini 3 Pro. Your role is to provide concise, well-processed, and structured responses to user queries.

Your strengths:
- Clear and structured communication
- Efficient information processing
- Condensed yet comprehensive responses
- Well-organized presentation

Your approach:
- Provide concise but complete answers
- Structure information clearly and logically
- Focus on key points without unnecessary verbosity
- Present information in an easily digestible format

Remember: You are part of a council where multiple AI models will respond to the same query, and then evaluate each other's responses. Focus on clarity, structure, and efficiency."""


def get_claude_councilor_prompt() -> str:
    """
    Get system prompt for Claude Sonnet 4.5 councilor.

    Returns:
        System prompt string for Claude Sonnet 4.5 councilor agent.
    """
    return """You are a member of the LLM Council, representing Claude Sonnet 4.5. Your role is to provide thoughtful, balanced, and nuanced responses to user queries.

Your strengths:
- Nuanced understanding and balanced perspectives
- Thoughtful consideration of trade-offs
- Clear reasoning and logical structure
- Ethical and responsible analysis

Your approach:
- Provide balanced, well-reasoned responses
- Consider multiple viewpoints and implications
- Be thoughtful about potential limitations or edge cases
- Maintain clarity while showing depth of thought

Remember: You are part of a council where multiple AI models will respond to the same query, and then evaluate each other's responses. Focus on thoughtfulness, balance, and nuanced reasoning."""


def get_grok_councilor_prompt() -> str:
    """
    Get system prompt for Grok-4 councilor.

    Returns:
        System prompt string for Grok-4 councilor agent.
    """
    return """You are a member of the LLM Council, representing Grok-4. Your role is to provide creative, innovative, and unique perspectives on user queries.

Your strengths:
- Creative problem-solving and innovative thinking
- Unique perspectives and out-of-the-box approaches
- Engaging and dynamic communication style
- Ability to connect seemingly unrelated concepts

Your approach:
- Provide creative and innovative responses
- Offer unique perspectives and fresh insights
- Be engaging and dynamic in your communication
- Think creatively while maintaining accuracy

Remember: You are part of a council where multiple AI models will respond to the same query, and then evaluate each other's responses. Focus on creativity, innovation, and unique insights."""


def get_chairman_prompt() -> str:
    """
    Get system prompt for the Chairman agent.

    Returns:
        System prompt string for the Chairman agent.
    """
    return """You are the Chairman of the LLM Council. Your role is to synthesize responses from all council members along with their evaluations and rankings into a final, comprehensive answer.

Your responsibilities:
1. Review all council member responses to the user's query
2. Consider the rankings and evaluations provided by each council member
3. Synthesize the best elements from all responses
4. Create a final, comprehensive answer that incorporates the strengths of different approaches
5. Provide transparency about which perspectives influenced the final answer

Your approach:
- Synthesize rather than simply aggregate
- Identify the strongest elements from each response
- Create a cohesive final answer that benefits from multiple perspectives
- Acknowledge the diversity of approaches taken by council members
- Provide a balanced, comprehensive response that serves the user's needs

Remember: You have access to all original responses and all evaluations. Use this rich context to create the best possible final answer."""


def get_evaluation_prompt(
    query: str, responses: Dict[str, str], evaluator_name: str
) -> str:
    """
    Create evaluation prompt for council members to review and rank responses.

    Args:
        query: The original user query
        responses: Dictionary mapping anonymous IDs to response texts
        evaluator_name: Name of the agent doing the evaluation

    Returns:
        Formatted evaluation prompt string
    """
    responses_text = "\n\n".join(
        f"Response {response_id}:\n{response_text}"
        for response_id, response_text in responses.items()
    )

    return f"""You are evaluating responses from your fellow LLM Council members to the following query:

QUERY: {query}

Below are the anonymized responses from all council members (including potentially your own):

{responses_text}

Your task:
1. Carefully read and analyze each response
2. Evaluate the quality, accuracy, completeness, and usefulness of each response
3. Rank the responses from best to worst (1 = best, {len(responses)} = worst)
4. Provide brief reasoning for your rankings
5. Be honest and objective - you may find another model's response superior to your own

Format your evaluation as follows:

RANKINGS:
1. Response [ID]: [Brief reason why this is the best]
2. Response [ID]: [Brief reason]
...
{len(responses)}. Response [ID]: [Brief reason why this ranks lowest]

ADDITIONAL OBSERVATIONS:
[Any additional insights about the responses, common themes, strengths/weaknesses, etc.]

Remember: The goal is honest, objective evaluation. If another model's response is genuinely better, acknowledge it."""


def get_synthesis_prompt(
    query: str,
    original_responses: Dict[str, str],
    evaluations: Dict[str, str],
    id_to_member: Dict[str, str],
) -> str:
    """
    Create synthesis prompt for the Chairman.

    Args:
        query: Original user query
        original_responses: Dict mapping member names to their responses
        evaluations: Dict mapping evaluator names to their evaluation texts
        id_to_member: Mapping from anonymous IDs to member names

    Returns:
        Formatted synthesis prompt
    """
    responses_section = "\n\n".join(
        f"=== {name} ===\n{response}"
        for name, response in original_responses.items()
    )

    evaluations_section = "\n\n".join(
        f"=== Evaluation by {name} ===\n{evaluation}"
        for name, evaluation in evaluations.items()
    )

    return f"""As the Chairman of the LLM Council, synthesize the following information into a final, comprehensive answer.

ORIGINAL QUERY:
{query}

COUNCIL MEMBER RESPONSES:
{responses_section}

COUNCIL MEMBER EVALUATIONS AND RANKINGS:
{evaluations_section}

ANONYMOUS ID MAPPING (for reference):
{chr(10).join([f"  {aid} = {name}" for aid, name in id_to_member.items()])}

Your task:
1. Review all council member responses
2. Consider the evaluations and rankings provided by each member
3. Identify the strongest elements from each response
4. Synthesize a final, comprehensive answer that:
   - Incorporates the best insights from multiple perspectives
   - Addresses the query thoroughly and accurately
   - Benefits from the diversity of approaches taken
   - Is clear, well-structured, and useful

Provide your final synthesized response below. You may reference which perspectives or approaches influenced different parts of your answer."""


class LLMCouncil:
    """
    An LLM Council that orchestrates multiple specialized agents to collaboratively
    answer queries through independent responses, peer review, and synthesis.

    The council follows this workflow:
    1. Dispatch the query to all council members in parallel
    2. Collect all responses (anonymized)
    3. Have each member review and rank all responses
    4. The Chairman synthesizes everything into a final response
    """

    def __init__(
        self,
        id: Optional[str] = None,
        name: str = "LLM Council",
        description: str = "A collaborative council of LLM agents where each member independently answers a query, reviews and ranks anonymized peer responses, and a chairman synthesizes the best elements into a final answer.",
        council_members: Optional[List[Agent]] = None,
        chairman_model: str = "gpt-5.1",
        verbose: bool = True,
        output_type: HistoryOutputType = "dict",
    ):
        """
        Initialize the LLM Council.

        Args:
            id: Unique identifier for this council. Generated if not provided.
            name: Human-readable name for the council.
            description: Short description of the council's purpose.
            council_members: List of Agent instances representing council members.
                If None, creates a default council with GPT-5.1, Gemini 3 Pro,
                Claude Sonnet 4.5, and Grok-4.
            chairman_model: Model name for the Chairman agent that synthesizes responses.
            verbose: Whether to print progress and intermediate results.
            output_type: Format for the output. Options: "list", "dict", "string", "final", "json", "yaml", etc.
        """
        # Generate the id at call time; a `swarm_id()` default argument would be
        # evaluated once at import and shared by every instance.
        self.id = id if id is not None else swarm_id()
        self.name = name
        self.description = description
        self.verbose = verbose
        self.output_type = output_type

        # Create default council members if none provided
        if council_members is None:
            self.council_members = self._create_default_council()
        else:
            self.council_members = council_members

        # Create Chairman agent
        self.chairman = Agent(
            agent_name="Chairman",
            agent_description="Chairman of the LLM Council, responsible for synthesizing all responses and rankings into a final answer",
            system_prompt=get_chairman_prompt(),
            model_name=chairman_model,
            max_loops=1,
            verbose=verbose,
            temperature=0.7,
        )

        self.conversation = Conversation(
            name=f"[LLM Council] [Conversation][{name}]"
        )

        if self.verbose:
            print(
                f"🏛️ LLM Council initialized with {len(self.council_members)} members"
            )
            for i, member in enumerate(self.council_members, 1):
                print(
                    f"  {i}. {member.agent_name} ({member.model_name})"
                )

    def _create_default_council(self) -> List[Agent]:
        """
        Create default council members with specialized prompts and models.

        Returns:
            List of Agent instances configured as council members.
        """
        # GPT-5.1 Agent - Analytical and comprehensive
        gpt_agent = Agent(
            agent_name="GPT-5.1-Councilor",
            agent_description="Analytical and comprehensive AI councilor specializing in deep analysis and thorough responses",
            system_prompt=get_gpt_councilor_prompt(),
            model_name="gpt-5.1",
            max_loops=1,
            verbose=False,
            temperature=0.7,
        )

        # Gemini 3 Pro Agent - Concise and processed
        gemini_agent = Agent(
            agent_name="Gemini-3-Pro-Councilor",
            agent_description="Concise and well-processed AI councilor specializing in clear, structured responses",
            system_prompt=get_gemini_councilor_prompt(),
            model_name="gemini-2.5-flash",  # Using available Gemini model
            max_loops=1,
            verbose=False,
            temperature=0.7,
        )

        # Claude Sonnet 4.5 Agent - Balanced and thoughtful
        claude_agent = Agent(
            agent_name="Claude-Sonnet-4.5-Councilor",
            agent_description="Thoughtful and balanced AI councilor specializing in nuanced and well-reasoned responses",
            system_prompt=get_claude_councilor_prompt(),
            model_name="anthropic/claude-sonnet-4-5",  # Using available Claude model
            max_loops=1,
            verbose=False,
            temperature=0.0,
            top_p=None,
        )

        # Grok-4 Agent - Creative and innovative
        grok_agent = Agent(
            agent_name="Grok-4-Councilor",
            agent_description="Creative and innovative AI councilor specializing in unique perspectives and creative solutions",
            system_prompt=get_grok_councilor_prompt(),
            model_name="x-ai/grok-4",  # Using available model as proxy for Grok-4
            max_loops=1,
            verbose=False,
            temperature=0.8,
        )

        return [gpt_agent, gemini_agent, claude_agent, grok_agent]

    def run(self, query: str):
        """
        Execute the full LLM Council workflow.

        Args:
            query: The user's query to process

        Returns:
            Formatted output based on output_type, containing conversation history
            with all council member responses, evaluations, and final synthesis.
        """
        if self.verbose:
            print(f"\n{'='*80}")
            print("🏛️ LLM COUNCIL SESSION")
            print("=" * 80)
            print(f"\n📝 Query: {query}\n")

        # Add user query to conversation
        self.conversation.add(role="User", content=query)

        # Step 1: Get responses from all council members in parallel
        if self.verbose:
            print("📤 Dispatching query to all council members...")

        results_dict = run_agents_concurrently(
            self.council_members,
            task=query,
            return_agent_output_dict=True,
        )

        # Map results to member names
        original_responses = {
            member.agent_name: results_dict.get(member.agent_name, "")
            for member in self.council_members
        }

        # Add each council member's response to conversation
        for member_name, response in original_responses.items():
            self.conversation.add(role=member_name, content=response)

        if self.verbose:
            print(f"✅ Received {len(original_responses)} responses\n")
            for name, response in original_responses.items():
                print(f"  {name}: {response[:100]}...")

        # Step 2: Anonymize responses for evaluation
        # Create anonymous IDs (A, B, C, D, etc.)
        anonymous_ids = [
            chr(65 + i) for i in range(len(self.council_members))
        ]
        random.shuffle(anonymous_ids)  # Shuffle to ensure anonymity

        anonymous_responses = {
            anonymous_ids[i]: original_responses[member.agent_name]
            for i, member in enumerate(self.council_members)
        }

        # Create mapping from anonymous ID to member name (for later reference)
        id_to_member = {
            anonymous_ids[i]: member.agent_name
            for i, member in enumerate(self.council_members)
        }

        if self.verbose:
            print(
                "\n🔍 Council members evaluating each other's responses..."
            )

        # Step 3: Have each member evaluate and rank all responses concurrently
        evaluation_tasks = [
            get_evaluation_prompt(
                query, anonymous_responses, member.agent_name
            )
            for member in self.council_members
        ]

        # Run evaluations concurrently using batched_grid_agent_execution
        evaluation_results = batched_grid_agent_execution(
            self.council_members, evaluation_tasks
        )

        # Map results to member names
        evaluations = {
            member.agent_name: evaluation_results[i]
            for i, member in enumerate(self.council_members)
        }

        # Add each council member's evaluation to conversation
        for member_name, evaluation in evaluations.items():
            self.conversation.add(
                role=f"{member_name}-Evaluation", content=evaluation
            )

        if self.verbose:
            print(f"✅ Received {len(evaluations)} evaluations\n")

        # Step 4: Chairman synthesizes everything
        if self.verbose:
            print("👔 Chairman synthesizing final response...\n")

        synthesis_prompt = get_synthesis_prompt(
            query, original_responses, evaluations, id_to_member
        )

        final_response = self.chairman.run(task=synthesis_prompt)

        # Add chairman's final response to conversation
        self.conversation.add(role="Chairman", content=final_response)

        if self.verbose:
            print(f"{'='*80}")
            print("✅ FINAL RESPONSE")
            print(f"{'='*80}\n")

        # Format and return output using history_output_formatter
        return history_output_formatter(
            conversation=self.conversation, type=self.output_type
        )

    def batched_run(self, tasks: List[str]):
        """
        Run the LLM Council workflow for a batch of tasks.

        Args:
            tasks: List of tasks to process

        Returns:
            List of formatted outputs based on output_type
        """
        return [self.run(task) for task in tasks]
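The anonymization in Step 2 of `LLMCouncil.run` can be sketched as a standalone helper. The function below (`anonymize_responses` is an illustrative name, not part of the swarms API) shows the same shuffled-ID scheme in isolation:

```python
import random
from typing import Dict, List, Tuple


def anonymize_responses(
    responses: Dict[str, str],
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Map member responses to shuffled anonymous IDs (A, B, C, ...).

    Returns (anonymous_responses, id_to_member). Evaluators see only the
    IDs, so rankings cannot be biased by model identity; the Chairman
    later uses id_to_member to de-anonymize the rankings.
    """
    members: List[str] = list(responses.keys())
    ids = [chr(65 + i) for i in range(len(members))]
    random.shuffle(ids)  # shuffle so list position does not reveal identity
    anonymous = {ids[i]: responses[m] for i, m in enumerate(members)}
    id_to_member = {ids[i]: m for i, m in enumerate(members)}
    return anonymous, id_to_member
```

Whatever the shuffle order, composing the two returned dicts recovers each member's original response.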
import pytest

from swarms.structs.agent import Agent
from swarms.structs.graph_workflow import (
    GraphWorkflow,
    Node,
    NodeType,
)
from typing import Optional

try:
    import rustworkx  # noqa: F401

    RUSTWORKX_AVAILABLE = True
except ImportError:
    RUSTWORKX_AVAILABLE = False


def create_test_agent(name: str, description: Optional[str] = None) -> Agent:
    """Create a real agent for testing."""
    if description is None:
        description = f"Test agent for {name} operations"

    return Agent(
        agent_name=name,
        agent_description=description,
        model_name="gpt-4o-mini",
        verbose=False,
        print_on=False,
        max_loops=1,
    )


def test_graph_workflow_basic_node_creation():
    """Test basic GraphWorkflow node creation with real agents"""
    # Test basic node creation
    agent = create_test_agent(
        "TestAgent", "Test agent for node creation"
    )
    node = Node.from_agent(agent)
    assert node.id == "TestAgent"
    assert node.type == NodeType.AGENT
    assert node.agent == agent

    # Test node with a custom id
    node2 = Node(id="CustomID", type=NodeType.AGENT, agent=agent)
    assert node2.id == "CustomID"


def test_graph_workflow_multi_agent_collaboration():
    """Test GraphWorkflow with multiple agents in a collaboration scenario"""
    # Create specialized agents for a business analysis workflow
    market_researcher = create_test_agent(
        "Market-Researcher",
        "Specialist in market analysis and trend identification",
    )
    data_analyst = create_test_agent(
        "Data-Analyst",
        "Expert in data processing and statistical analysis",
    )
    strategy_consultant = create_test_agent(
        "Strategy-Consultant",
        "Senior consultant for strategic planning and recommendations",
    )

    # Create workflow with a linear execution path
    workflow = GraphWorkflow(name="Business-Analysis-Workflow")
    workflow.add_node(market_researcher)
    workflow.add_node(data_analyst)
    workflow.add_node(strategy_consultant)

    # Add edges to define execution order
    workflow.add_edge("Market-Researcher", "Data-Analyst")
    workflow.add_edge("Data-Analyst", "Strategy-Consultant")

    # Test workflow execution
    result = workflow.run(
        "Analyze market opportunities for AI in healthcare"
    )
    assert result is not None


def test_graph_workflow_parallel_execution():
    """Test GraphWorkflow with parallel execution paths"""
    # Create agents for parallel analysis
    technical_analyst = create_test_agent(
        "Technical-Analyst",
        "Technical feasibility and implementation analysis",
    )
    market_analyst = create_test_agent(
        "Market-Analyst",
        "Market positioning and competitive analysis",
    )
    financial_analyst = create_test_agent(
        "Financial-Analyst", "Financial modeling and ROI analysis"
    )
    risk_assessor = create_test_agent(
        "Risk-Assessor", "Risk assessment and mitigation planning"
    )

    # Create workflow with parallel execution
    workflow = GraphWorkflow(name="Parallel-Analysis-Workflow")
    workflow.add_node(technical_analyst)
    workflow.add_node(market_analyst)
    workflow.add_node(financial_analyst)
    workflow.add_node(risk_assessor)

    # Add edges for fan-out execution (one to many)
    workflow.add_edges_from_source(
        "Technical-Analyst",
        ["Market-Analyst", "Financial-Analyst", "Risk-Assessor"],
    )

    # Test parallel execution
    result = workflow.run(
        "Evaluate feasibility of launching a new fintech platform"
    )
    assert result is not None


def test_graph_workflow_complex_topology():
    """Test GraphWorkflow with complex node topology"""
    # Create agents for a comprehensive product development workflow
    product_manager = create_test_agent(
        "Product-Manager", "Product strategy and roadmap management"
    )
    ux_designer = create_test_agent(
        "UX-Designer", "User experience design and research"
    )
    backend_developer = create_test_agent(
        "Backend-Developer",
        "Backend system architecture and development",
    )
    frontend_developer = create_test_agent(
        "Frontend-Developer",
        "Frontend interface and user interaction development",
    )
    qa_engineer = create_test_agent(
        "QA-Engineer", "Quality assurance and testing specialist"
    )
    devops_engineer = create_test_agent(
        "DevOps-Engineer", "Deployment and infrastructure management"
    )

    # Create workflow with complex dependencies
    workflow = GraphWorkflow(name="Product-Development-Workflow")
    workflow.add_node(product_manager)
    workflow.add_node(ux_designer)
    workflow.add_node(backend_developer)
    workflow.add_node(frontend_developer)
    workflow.add_node(qa_engineer)
    workflow.add_node(devops_engineer)

    # Define complex execution topology
    workflow.add_edge("Product-Manager", "UX-Designer")
    workflow.add_edge("UX-Designer", "Frontend-Developer")
    workflow.add_edge("Product-Manager", "Backend-Developer")
    workflow.add_edge("Backend-Developer", "QA-Engineer")
    workflow.add_edge("Frontend-Developer", "QA-Engineer")
    workflow.add_edge("QA-Engineer", "DevOps-Engineer")

    # Test complex workflow execution
    result = workflow.run(
        "Develop a comprehensive e-commerce platform with AI recommendations"
    )
    assert result is not None


def test_graph_workflow_error_handling():
    """Test GraphWorkflow error handling and validation"""
    # An empty workflow should handle execution gracefully
    workflow = GraphWorkflow()
    result = workflow.run("Test task")
    assert result is not None

    # Test workflow compilation and caching
    researcher = create_test_agent("Researcher", "Research specialist")
    workflow.add_node(researcher)

    # First run should compile
    result1 = workflow.run("Research task")
    assert result1 is not None

    # Second run should use the cached compilation
    result2 = workflow.run("Another research task")
    assert result2 is not None


def test_graph_workflow_node_metadata():
    """Test GraphWorkflow with node metadata"""
    # Create agents with different priorities and requirements
    high_priority_agent = create_test_agent(
        "High-Priority-Analyst", "High priority analysis specialist"
    )
    standard_agent = create_test_agent(
        "Standard-Analyst", "Standard analysis agent"
    )

    # Create workflow and add nodes with metadata
    workflow = GraphWorkflow(name="Metadata-Workflow")
    workflow.add_node(
        high_priority_agent,
        metadata={"priority": "high", "timeout": 60},
    )
    workflow.add_node(
        standard_agent, metadata={"priority": "normal", "timeout": 30}
    )

    # Add execution dependency
    workflow.add_edge("High-Priority-Analyst", "Standard-Analyst")

    # Test execution with metadata
    result = workflow.run(
        "Analyze business requirements with different priorities"
    )
    assert result is not None


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_basic(backend):
    """Test GraphWorkflow basic functionality with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    agent1 = create_test_agent("Agent1", "First agent")
    agent2 = create_test_agent("Agent2", "Second agent")

    workflow = GraphWorkflow(
        name=f"Backend-Test-{backend}", backend=backend
    )
    workflow.add_node(agent1)
    workflow.add_node(agent2)
    workflow.add_edge(agent1, agent2)

    assert len(workflow.nodes) == 2
    assert len(workflow.edges) == 1

    result = workflow.run("Test task")
    assert result is not None
    assert "Agent1" in result
    assert "Agent2" in result


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_parallel_execution(backend):
    """Test parallel execution with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    coordinator = create_test_agent("Coordinator", "Coordinates tasks")
    analyst1 = create_test_agent("Analyst1", "First analyst")
    analyst2 = create_test_agent("Analyst2", "Second analyst")
    analyst3 = create_test_agent("Analyst3", "Third analyst")

    workflow = GraphWorkflow(
        name=f"Parallel-Test-{backend}", backend=backend
    )
    workflow.add_node(coordinator)
    workflow.add_node(analyst1)
    workflow.add_node(analyst2)
    workflow.add_node(analyst3)

    workflow.add_edges_from_source(
        coordinator, [analyst1, analyst2, analyst3]
    )

    workflow.compile()
    assert len(workflow._sorted_layers) >= 1
    assert (
        len(workflow._sorted_layers[0]) == 1
    )  # Coordinator in first layer

    result = workflow.run("Analyze data in parallel")
    assert result is not None


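The `_sorted_layers` assertions above rely on a layered topological sort: nodes with no unfinished predecessors form the next layer, and each layer can execute concurrently. A minimal sketch of that grouping (Kahn's algorithm; `layered_topo_sort` is an illustrative helper, not the library's internal implementation):

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def layered_topo_sort(
    edges: List[Tuple[str, str]], nodes: List[str]
) -> List[List[str]]:
    """Group nodes into execution layers: a node enters a layer only
    once all of its predecessors have appeared in earlier layers."""
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    layers: List[List[str]] = []
    current = [n for n in nodes if indegree[n] == 0]  # entry points
    while current:
        layers.append(current)
        nxt = []
        for n in current:
            for m in successors[n]:
                indegree[m] -= 1
                if indegree[m] == 0:
                    nxt.append(m)
        current = nxt
    return layers
```

For the fan-out graph in the test above (one Coordinator feeding three analysts), this yields two layers: the coordinator alone, then all three analysts together.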
@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_fan_in_pattern(backend):
    """Test fan-in pattern with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    analyst1 = create_test_agent("Analyst1", "First analyst")
    analyst2 = create_test_agent("Analyst2", "Second analyst")
    analyst3 = create_test_agent("Analyst3", "Third analyst")
    synthesizer = create_test_agent("Synthesizer", "Synthesizes results")

    workflow = GraphWorkflow(
        name=f"FanIn-Test-{backend}", backend=backend
    )
    workflow.add_node(analyst1)
    workflow.add_node(analyst2)
    workflow.add_node(analyst3)
    workflow.add_node(synthesizer)

    workflow.add_edges_to_target(
        [analyst1, analyst2, analyst3], synthesizer
    )

    workflow.compile()
    assert len(workflow._sorted_layers) >= 2
    assert synthesizer.agent_name in workflow.end_points

    result = workflow.run("Synthesize multiple analyses")
    assert result is not None


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_parallel_chain(backend):
    """Test parallel chain pattern with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    collector1 = create_test_agent("Collector1", "First collector")
    collector2 = create_test_agent("Collector2", "Second collector")
    processor1 = create_test_agent("Processor1", "First processor")
    processor2 = create_test_agent("Processor2", "Second processor")

    workflow = GraphWorkflow(
        name=f"ParallelChain-Test-{backend}", backend=backend
    )
    workflow.add_node(collector1)
    workflow.add_node(collector2)
    workflow.add_node(processor1)
    workflow.add_node(processor2)

    workflow.add_parallel_chain(
        [collector1, collector2], [processor1, processor2]
    )

    workflow.compile()
    assert len(workflow.edges) == 4  # 2x2 = 4 edges

    result = workflow.run("Process data from multiple collectors")
    assert result is not None


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_complex_topology(backend):
    """Test complex topology with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    agents = [
        create_test_agent(f"Agent{i}", f"Agent {i}") for i in range(5)
    ]

    workflow = GraphWorkflow(
        name=f"Complex-Topology-{backend}", backend=backend
    )
    for agent in agents:
        workflow.add_node(agent)

    workflow.add_edge(agents[0], agents[1])
    workflow.add_edge(agents[0], agents[2])
    workflow.add_edge(agents[1], agents[3])
    workflow.add_edge(agents[2], agents[3])
    workflow.add_edge(agents[3], agents[4])

    workflow.compile()
    assert len(workflow._sorted_layers) >= 3

    result = workflow.run("Execute complex workflow")
    assert result is not None


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_validation(backend):
    """Test workflow validation with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    agent1 = create_test_agent("Agent1", "First agent")
    agent2 = create_test_agent("Agent2", "Second agent")
    isolated = create_test_agent("Isolated", "Isolated agent")

    workflow = GraphWorkflow(
        name=f"Validation-Test-{backend}", backend=backend
    )
    workflow.add_node(agent1)
    workflow.add_node(agent2)
    workflow.add_node(isolated)
    workflow.add_edge(agent1, agent2)

    validation = workflow.validate(auto_fix=False)
    assert isinstance(validation, dict)
    assert "is_valid" in validation

    validation_fixed = workflow.validate(auto_fix=True)
    assert isinstance(validation_fixed, dict)


@pytest.mark.parametrize("backend", ["networkx", "rustworkx"])
def test_graph_workflow_backend_entry_end_points(backend):
    """Test entry and end points with both backends"""
    if backend == "rustworkx" and not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    agent1 = create_test_agent("Agent1", "Entry agent")
    agent2 = create_test_agent("Agent2", "Middle agent")
    agent3 = create_test_agent("Agent3", "End agent")

    workflow = GraphWorkflow(
        name=f"EntryEnd-Test-{backend}", backend=backend
    )
    workflow.add_node(agent1)
    workflow.add_node(agent2)
    workflow.add_node(agent3)
    workflow.add_edge(agent1, agent2)
    workflow.add_edge(agent2, agent3)

    workflow.auto_set_entry_points()
    workflow.auto_set_end_points()

    assert agent1.agent_name in workflow.entry_points
    assert agent3.agent_name in workflow.end_points


def test_graph_workflow_rustworkx_specific():
|
||||
"""Test rustworkx-specific features"""
|
||||
if not RUSTWORKX_AVAILABLE:
|
||||
pytest.skip("rustworkx not available")
|
||||
|
||||
agent1 = create_test_agent("Agent1", "First agent")
|
||||
agent2 = create_test_agent("Agent2", "Second agent")
|
||||
agent3 = create_test_agent("Agent3", "Third agent")
|
||||
|
||||
workflow = GraphWorkflow(
|
||||
name="Rustworkx-Specific-Test", backend="rustworkx"
|
||||
)
|
||||
workflow.add_node(agent1)
|
||||
workflow.add_node(agent2)
|
||||
workflow.add_node(agent3)
|
||||
workflow.add_edge(agent1, agent2)
|
||||
workflow.add_edge(agent2, agent3)
|
||||
|
||||
assert (
|
||||
workflow.graph_backend.__class__.__name__
|
||||
== "RustworkxBackend"
|
||||
)
|
||||
assert hasattr(workflow.graph_backend, "_node_id_to_index")
|
||||
assert hasattr(workflow.graph_backend, "_index_to_node_id")
|
||||
|
||||
workflow.compile()
|
||||
assert len(workflow._sorted_layers) == 3
|
||||
|
||||
predecessors = list(
|
||||
workflow.graph_backend.predecessors(agent2.agent_name)
|
||||
)
|
||||
assert agent1.agent_name in predecessors
|
||||
|
||||
descendants = workflow.graph_backend.descendants(
|
||||
agent1.agent_name
|
||||
)
|
||||
assert agent2.agent_name in descendants
|
||||
assert agent3.agent_name in descendants
|
||||
|
||||
result = workflow.run("Test rustworkx backend")
|
||||
assert result is not None
|
||||
|
||||
|
||||
def test_graph_workflow_rustworkx_large_scale():
|
||||
"""Test rustworkx with larger workflow"""
|
||||
if not RUSTWORKX_AVAILABLE:
|
||||
pytest.skip("rustworkx not available")
|
||||
|
||||
agents = [
|
||||
create_test_agent(f"Agent{i}", f"Agent {i}")
|
||||
for i in range(10)
|
||||
]
|
||||
|
||||
workflow = GraphWorkflow(
|
||||
name="Rustworkx-Large-Scale", backend="rustworkx"
|
||||
)
|
||||
for agent in agents:
|
||||
workflow.add_node(agent)
|
||||
|
||||
for i in range(len(agents) - 1):
|
||||
workflow.add_edge(agents[i], agents[i + 1])
|
||||
|
||||
workflow.compile()
|
||||
assert len(workflow._sorted_layers) == 10
|
||||
|
||||
result = workflow.run("Test large scale workflow")
|
||||
assert result is not None
|
||||
assert len(result) == 10
|
||||
|
||||
|
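The large-scale assertion above (`len(workflow._sorted_layers) == 10`) follows from layered topological ordering: in a linear chain every agent depends on the previous one, so each layer holds exactly one node. A minimal standalone sketch of that layering (illustrative only, not the swarms implementation):

```python
from collections import defaultdict


def topological_layers(edges, nodes):
    """Group nodes into execution layers: a node enters a layer only
    after all of its predecessors have been placed in earlier layers."""
    indegree = {n: 0 for n in nodes}
    out = defaultdict(list)
    for src, dst in edges:
        out[src].append(dst)
        indegree[dst] += 1
    # Layer 0 is every node with no incoming edges (the entry points).
    layer = [n for n in nodes if indegree[n] == 0]
    layers = []
    while layer:
        layers.append(layer)
        nxt = []
        for n in layer:
            for m in out[n]:
                indegree[m] -= 1
                if indegree[m] == 0:
                    nxt.append(m)
        layer = nxt
    return layers


nodes = [f"Agent{i}" for i in range(10)]
edges = [(f"Agent{i}", f"Agent{i + 1}") for i in range(9)]
# A 10-node linear chain yields 10 layers of one node each.
```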

def test_graph_workflow_rustworkx_agent_objects():
    """Test rustworkx with Agent objects directly in edges"""
    if not RUSTWORKX_AVAILABLE:
        pytest.skip("rustworkx not available")

    agent1 = create_test_agent("Agent1", "First agent")
    agent2 = create_test_agent("Agent2", "Second agent")
    agent3 = create_test_agent("Agent3", "Third agent")

    workflow = GraphWorkflow(
        name="Rustworkx-Agent-Objects", backend="rustworkx"
    )
    workflow.add_node(agent1)
    workflow.add_node(agent2)
    workflow.add_node(agent3)

    workflow.add_edges_from_source(agent1, [agent2, agent3])
    workflow.add_edges_to_target([agent2, agent3], agent1)

    workflow.compile()
    assert len(workflow.edges) == 4

    result = workflow.run("Test agent objects in edges")
    assert result is not None


def test_graph_workflow_backend_fallback():
    """Test backend fallback when rustworkx unavailable"""
    workflow = GraphWorkflow(
        name="Fallback-Test", backend="rustworkx"
    )
    agent = create_test_agent("Agent", "Test agent")
    workflow.add_node(agent)

    if not RUSTWORKX_AVAILABLE:
        assert (
            workflow.graph_backend.__class__.__name__
            == "NetworkXBackend"
        )
    else:
        assert (
            workflow.graph_backend.__class__.__name__
            == "RustworkxBackend"
        )


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
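The fallback behavior exercised in `test_graph_workflow_backend_fallback` can be sketched independently of swarms: probe once for the optional dependency, then silently downgrade the requested backend. The names below are illustrative assumptions, not the library's internals.

```python
import importlib.util

# True when the optional rustworkx backend is importable in this environment.
RUSTWORKX_AVAILABLE = importlib.util.find_spec("rustworkx") is not None


def resolve_backend(requested: str) -> str:
    """Fall back to the pure-Python networkx backend when rustworkx
    was requested but is not installed."""
    if requested == "rustworkx" and not RUSTWORKX_AVAILABLE:
        return "networkx"
    return requested
```

Probing with `importlib.util.find_spec` avoids importing the package just to test for it, which keeps module load cheap when the dependency is absent.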
@ -1,225 +0,0 @@
import pytest

from swarms.structs.graph_workflow import (
    GraphWorkflow,
    Node,
    NodeType,
)
from swarms.structs.agent import Agent


def create_test_agent(name: str, description: str = None) -> Agent:
    """Create a real agent for testing"""
    if description is None:
        description = f"Test agent for {name} operations"

    return Agent(
        agent_name=name,
        agent_description=description,
        model_name="gpt-4o-mini",
        verbose=False,
        print_on=False,
        max_loops=1,
    )


def test_graph_workflow_basic_node_creation():
    """Test basic GraphWorkflow node creation with real agents"""
    # Test basic node creation
    agent = create_test_agent(
        "TestAgent", "Test agent for node creation"
    )
    node = Node.from_agent(agent)
    assert node.id == "TestAgent"
    assert node.type == NodeType.AGENT
    assert node.agent == agent

    # Test node with custom id
    node2 = Node(id="CustomID", type=NodeType.AGENT, agent=agent)
    assert node2.id == "CustomID"


def test_graph_workflow_multi_agent_collaboration():
    """Test GraphWorkflow with multiple agents in a collaboration scenario"""
    # Create specialized agents for a business analysis workflow
    market_researcher = create_test_agent(
        "Market-Researcher",
        "Specialist in market analysis and trend identification",
    )

    data_analyst = create_test_agent(
        "Data-Analyst",
        "Expert in data processing and statistical analysis",
    )

    strategy_consultant = create_test_agent(
        "Strategy-Consultant",
        "Senior consultant for strategic planning and recommendations",
    )

    # Create workflow with linear execution path
    workflow = GraphWorkflow(name="Business-Analysis-Workflow")
    workflow.add_node(market_researcher)
    workflow.add_node(data_analyst)
    workflow.add_node(strategy_consultant)

    # Add edges to define execution order
    workflow.add_edge("Market-Researcher", "Data-Analyst")
    workflow.add_edge("Data-Analyst", "Strategy-Consultant")

    # Test workflow execution
    result = workflow.run(
        "Analyze market opportunities for AI in healthcare"
    )
    assert result is not None


def test_graph_workflow_parallel_execution():
    """Test GraphWorkflow with parallel execution paths"""
    # Create agents for parallel analysis
    technical_analyst = create_test_agent(
        "Technical-Analyst",
        "Technical feasibility and implementation analysis",
    )

    market_analyst = create_test_agent(
        "Market-Analyst",
        "Market positioning and competitive analysis",
    )

    financial_analyst = create_test_agent(
        "Financial-Analyst", "Financial modeling and ROI analysis"
    )

    risk_assessor = create_test_agent(
        "Risk-Assessor", "Risk assessment and mitigation planning"
    )

    # Create workflow with parallel execution
    workflow = GraphWorkflow(name="Parallel-Analysis-Workflow")
    workflow.add_node(technical_analyst)
    workflow.add_node(market_analyst)
    workflow.add_node(financial_analyst)
    workflow.add_node(risk_assessor)

    # Add edges for fan-out execution (one to many)
    workflow.add_edges_from_source(
        "Technical-Analyst",
        ["Market-Analyst", "Financial-Analyst", "Risk-Assessor"],
    )

    # Test parallel execution
    result = workflow.run(
        "Evaluate feasibility of launching a new fintech platform"
    )
    assert result is not None


def test_graph_workflow_complex_topology():
    """Test GraphWorkflow with complex node topology"""
    # Create agents for a comprehensive product development workflow
    product_manager = create_test_agent(
        "Product-Manager", "Product strategy and roadmap management"
    )

    ux_designer = create_test_agent(
        "UX-Designer", "User experience design and research"
    )

    backend_developer = create_test_agent(
        "Backend-Developer",
        "Backend system architecture and development",
    )

    frontend_developer = create_test_agent(
        "Frontend-Developer",
        "Frontend interface and user interaction development",
    )

    qa_engineer = create_test_agent(
        "QA-Engineer", "Quality assurance and testing specialist"
    )

    devops_engineer = create_test_agent(
        "DevOps-Engineer", "Deployment and infrastructure management"
    )

    # Create workflow with complex dependencies
    workflow = GraphWorkflow(name="Product-Development-Workflow")
    workflow.add_node(product_manager)
    workflow.add_node(ux_designer)
    workflow.add_node(backend_developer)
    workflow.add_node(frontend_developer)
    workflow.add_node(qa_engineer)
    workflow.add_node(devops_engineer)

    # Define complex execution topology
    workflow.add_edge("Product-Manager", "UX-Designer")
    workflow.add_edge("UX-Designer", "Frontend-Developer")
    workflow.add_edge("Product-Manager", "Backend-Developer")
    workflow.add_edge("Backend-Developer", "QA-Engineer")
    workflow.add_edge("Frontend-Developer", "QA-Engineer")
    workflow.add_edge("QA-Engineer", "DevOps-Engineer")

    # Test complex workflow execution
    result = workflow.run(
        "Develop a comprehensive e-commerce platform with AI recommendations"
    )
    assert result is not None


def test_graph_workflow_error_handling():
    """Test GraphWorkflow error handling and validation"""
    # Test with empty workflow
    workflow = GraphWorkflow()
    result = workflow.run("Test task")
    # Empty workflow should handle gracefully
    assert result is not None

    # Test workflow compilation and caching
    researcher = create_test_agent(
        "Researcher", "Research specialist"
    )
    workflow.add_node(researcher)

    # First run should compile
    result1 = workflow.run("Research task")
    assert result1 is not None

    # Second run should use cached compilation
    result2 = workflow.run("Another research task")
    assert result2 is not None


def test_graph_workflow_node_metadata():
    """Test GraphWorkflow with node metadata"""
    # Create agents with different priorities and requirements
    high_priority_agent = create_test_agent(
        "High-Priority-Analyst", "High priority analysis specialist"
    )

    standard_agent = create_test_agent(
        "Standard-Analyst", "Standard analysis agent"
    )

    # Create workflow and add nodes with metadata
    workflow = GraphWorkflow(name="Metadata-Workflow")
    workflow.add_node(
        high_priority_agent,
        metadata={"priority": "high", "timeout": 60},
    )
    workflow.add_node(
        standard_agent, metadata={"priority": "normal", "timeout": 30}
    )

    # Add execution dependency
    workflow.add_edge("High-Priority-Analyst", "Standard-Analyst")

    # Test execution with metadata
    result = workflow.run(
        "Analyze business requirements with different priorities"
    )
    assert result is not None


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
@ -0,0 +1,84 @@
from swarms.agents.i_agent import IterativeReflectiveExpansion


def test_ire_agent_initialization():
    """Test IRE agent initialization with default parameters"""
    agent = IterativeReflectiveExpansion()

    assert agent is not None
    assert agent.agent_name == "General-Reasoning-Agent"
    assert agent.max_iterations == 5
    assert agent.output_type == "dict"
    assert agent.agent is not None


def test_ire_agent_custom_initialization():
    """Test IRE agent initialization with custom parameters"""
    agent = IterativeReflectiveExpansion(
        agent_name="Custom-IRE-Agent",
        description="A custom reasoning agent",
        max_iterations=3,
        model_name="gpt-4o",
        output_type="string",
    )

    assert agent.agent_name == "Custom-IRE-Agent"
    assert agent.description == "A custom reasoning agent"
    assert agent.max_iterations == 3
    assert agent.output_type == "string"


def test_ire_agent_execution():
    """Test IRE agent execution with a simple problem"""
    agent = IterativeReflectiveExpansion(
        agent_name="Test-IRE-Agent",
        model_name="gpt-4o",
        max_iterations=2,
        output_type="dict",
    )

    # Test with a simple reasoning task
    task = "What are three main benefits of renewable energy?"
    result = agent.run(task)

    # Result should not be None
    assert result is not None
    # Result should be dict or string based on output_type
    assert isinstance(result, (str, dict))


def test_ire_agent_generate_hypotheses():
    """Test IRE agent hypothesis generation"""
    agent = IterativeReflectiveExpansion(
        agent_name="Hypothesis-Test-Agent",
        max_iterations=1,
    )

    task = "How can we reduce carbon emissions?"
    hypotheses = agent.generate_initial_hypotheses(task)

    assert hypotheses is not None
    assert isinstance(hypotheses, list)
    assert len(hypotheses) > 0


def test_ire_agent_workflow():
    """Test complete IRE agent workflow with iterative refinement"""
    agent = IterativeReflectiveExpansion(
        agent_name="Workflow-Test-Agent",
        description="Agent for testing complete workflow",
        model_name="gpt-4o",
        max_iterations=2,
        output_type="dict",
    )

    # Test with a problem that requires iterative refinement
    task = "Design an efficient public transportation system for a small city"
    result = agent.run(task)

    # Verify the result is valid
    assert result is not None
    assert isinstance(result, (str, dict))

    # Check that conversation was populated during execution
    assert agent.conversation is not None
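The IRE tests above treat `run` as a black box. The underlying iterate-reflect-expand loop can be sketched generically; everything below (the function name, the pluggable callables) is a hypothetical illustration of the pattern, not the `IterativeReflectiveExpansion` internals.

```python
def refine(task, generate, score, expand, max_iterations=2):
    """Generate initial hypotheses, then for a bounded number of
    iterations pick the best-scoring one, expand it into new
    candidates, and finally return the top result."""
    hypotheses = generate(task)
    for _ in range(max_iterations):
        best = max(hypotheses, key=score)
        hypotheses = expand(best)
    return max(hypotheses, key=score)
```

With toy callables (`generate` returning `[1, 3, 2]`, identity scoring, and `expand` mapping `h` to `[h, h + 1]`), two iterations walk 3 → 4 → 5, mirroring how `max_iterations` bounds the refinement depth in the tests.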