commit 45baaa732e

@@ -0,0 +1,453 @@

# AgentLoader Documentation

The `AgentLoader` is a utility for creating Swarms agents from markdown files using the Claude Code sub-agent format. It supports both single and multiple markdown file loading, providing a flexible way to define and deploy agents using YAML frontmatter configuration.

## Overview

The AgentLoader enables you to:

- Load single agents from markdown files with YAML frontmatter
- Load multiple agents from directories or file lists with concurrent processing
- Parse Claude Code sub-agent YAML frontmatter configurations
- Extract system prompts from markdown content
- Use all available CPU cores for high-performance batch loading
- Provide comprehensive error handling and validation
## Installation

The AgentLoader is included with the Swarms framework:

```python
from swarms.utils import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
```

## Markdown Format

The AgentLoader uses the Claude Code sub-agent YAML frontmatter format:
```markdown
---
name: your-sub-agent-name
description: Description of when this subagent should be invoked
model_name: gpt-4
temperature: 0.3
max_loops: 2
mcp_url: http://example.com/mcp  # optional
---

Your subagent's system prompt goes here. It can span multiple paragraphs
and should clearly define the subagent's role, capabilities, and approach
to solving problems.

Include specific instructions, best practices, and any constraints
the subagent should follow.
```

**Schema Fields:**

- `name` (required): Your sub-agent's name
- `description` (required): Description of when this subagent should be invoked
- `model_name` (optional): Name of the model to use (defaults to `gpt-4` if not provided)
- `temperature` (optional): Float value for model temperature (0.0-2.0)
- `max_loops` (optional): Integer for maximum reasoning loops
- `mcp_url` (optional): MCP server URL, if needed
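For reference, frontmatter like this can be separated from the body with a few lines of Python. The sketch below is illustrative only — it is not the loader's actual implementation, and it handles only flat `key: value` pairs where a real loader would use a YAML parser:

```python
def split_frontmatter(text: str):
    """Split a markdown document into (frontmatter dict, body string).

    Illustrative sketch: handles only flat `key: value` pairs; a real
    loader would parse the frontmatter with a YAML library.
    """
    if text.startswith("---"):
        # Split off the frontmatter block delimited by the two "---" lines
        _, fm, body = text.split("---", 2)
        config = {}
        for line in fm.strip().splitlines():
            # Partition on the first colon so values may contain colons (e.g. URLs)
            key, _, value = line.partition(":")
            config[key.strip()] = value.strip()
        return config, body.strip()
    return {}, text.strip()

doc = """---
name: example-agent
description: Demo agent
temperature: 0.3
---
You are a helpful agent."""

config, prompt = split_frontmatter(doc)
print(config["name"])  # example-agent
print(prompt)          # You are a helpful agent.
```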
## Quick Start

### Loading a Single Agent

```python
from swarms.utils import load_agent_from_markdown

# Load agent from markdown file
agent = load_agent_from_markdown("finance_advisor.md")

# Use the agent
response = agent.run(
    "I have $10,000 to invest. What's a good strategy for a beginner?"
)
```

### Loading Multiple Agents (Concurrent)

```python
from swarms.utils import load_agents_from_markdown

# Load agents from a list of files with concurrent processing
agents = load_agents_from_markdown([
    "market_researcher.md",
    "financial_analyst.md",
    "risk_analyst.md"
], concurrent=True)  # Uses all CPU cores for faster loading

# Use agents in a workflow
from swarms.structs import SequentialWorkflow

workflow = SequentialWorkflow(
    agents=agents,
    max_loops=1
)

task = "Analyze the AI healthcare market for a $50M investment."
result = workflow.run(task)
```
## Class-Based Usage

### AgentLoader Class

For more advanced usage, use the `AgentLoader` class directly:

```python
from swarms.utils import AgentLoader

# Initialize loader
loader = AgentLoader()

# Load single agent
agent = loader.load_single_agent("path/to/agent.md")

# Load multiple agents with concurrent processing
agents = loader.load_multiple_agents(
    "./agents_directory/",
    concurrent=True,  # Enable concurrent processing
    max_workers=8     # Optional: limit worker threads
)

# Parse a markdown file without creating an agent
config = loader.parse_markdown_file("path/to/agent.md")
print(config.name, config.description)
```
## Configuration Options

You can override the default configuration when loading agents:

```python
agent = load_agent_from_markdown(
    file_path="agent.md",
    max_loops=5,
    verbose=True,
    dashboard=True,
    autosave=False,
    context_length=200000
)
```

### Available Configuration Parameters

- `max_loops` (int): Maximum number of reasoning loops (default: 1)
- `autosave` (bool): Enable automatic state saving (default: True)
- `dashboard` (bool): Enable dashboard monitoring (default: False)
- `verbose` (bool): Enable verbose logging (default: False)
- `dynamic_temperature_enabled` (bool): Enable dynamic temperature (default: False)
- `saved_state_path` (str): Path for saving agent state
- `user_name` (str): User identifier (default: "default_user")
- `retry_attempts` (int): Number of retry attempts (default: 3)
- `context_length` (int): Maximum context length (default: 100000)
- `return_step_meta` (bool): Return step metadata (default: False)
- `output_type` (str): Output format type (default: "str")
- `auto_generate_prompt` (bool): Auto-generate prompts (default: False)
- `artifacts_on` (bool): Enable artifacts (default: False)
## Complete Example

### Example: Finance Advisor Agent

Create a file `finance_advisor.md`:

```markdown
---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4
temperature: 0.7
max_loops: 1
---

You are an expert financial advisor with deep knowledge in:

- Investment strategies and portfolio management
- Personal budgeting and financial planning
- Risk assessment and diversification
- Tax optimization strategies
- Retirement planning

Your approach:

- Provide clear, actionable financial advice
- Consider individual risk tolerance and goals
- Explain complex concepts in simple terms
- Always emphasize the importance of diversification
- Include relevant disclaimers about financial advice

When analyzing financial situations:

1. Assess current financial position
2. Identify short-term and long-term goals
3. Evaluate risk tolerance
4. Recommend appropriate strategies
5. Suggest specific action steps
```

### Loading and Using the Agent

```python
from swarms.utils import load_agent_from_markdown

# Load the Finance Advisor agent
agent = load_agent_from_markdown("finance_advisor.md")

# Use the agent for financial advice
response = agent.run(
    "I have $10,000 to invest. What's a good strategy for a beginner?"
)
```
## Error Handling

The AgentLoader provides comprehensive error handling:

```python
from swarms.utils import AgentLoader

loader = AgentLoader()

try:
    # This will raise FileNotFoundError
    agent = loader.load_single_agent("nonexistent.md")
except FileNotFoundError as e:
    print(f"File not found: {e}")

try:
    # Parsing errors are caught and handled here
    agents = loader.load_multiple_agents("./invalid_directory/")
    print(f"Successfully loaded {len(agents)} agents")
except Exception as e:
    print(f"Error loading agents: {e}")
```
## Concurrent Processing Features

### Multi-Core Performance

The AgentLoader can use all available CPU cores for concurrent agent loading, providing significant performance improvements when processing multiple markdown files:

```python
from swarms.utils import load_agents_from_markdown

# Automatic concurrent processing for multiple files
agents = load_agents_from_markdown([
    "agent1.md", "agent2.md", "agent3.md", "agent4.md"
])  # concurrent=True by default

# Manual control over concurrency
agents = load_agents_from_markdown(
    "./agents_directory/",
    concurrent=True,  # Enable concurrent processing
    max_workers=8     # Limit to 8 worker threads
)

# Disable concurrency for debugging or single files
agents = load_agents_from_markdown(
    ["single_agent.md"],
    concurrent=False  # Sequential processing
)
```
### Resource Management

```python
# Default: uses all CPU cores
agents = load_agents_from_markdown(files, concurrent=True)

# Custom worker count for resource control
agents = load_agents_from_markdown(
    files,
    concurrent=True,
    max_workers=4  # Limit to 4 threads
)

# ThreadPoolExecutor automatically manages:
# - Thread lifecycle
# - Resource cleanup
# - Exception handling
# - Result collection
```
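The concurrent-loading pattern described above can be sketched with the standard library's `ThreadPoolExecutor`. This is an illustrative sketch of the general technique, not the loader's internals; `load_one` is a hypothetical stand-in for per-file parsing:

```python
import os
from concurrent.futures import ThreadPoolExecutor

def load_one(path: str) -> str:
    # Hypothetical stand-in for parsing one markdown file into an agent
    return f"agent:{path}"

def load_all(paths, max_workers=None):
    # Default to the CPU count, mirroring the documented behavior
    workers = max_workers or os.cpu_count()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves input order and re-raises worker exceptions
        return list(pool.map(load_one, paths))

agents = load_all(["a.md", "b.md", "c.md"], max_workers=2)
print(agents)  # ['agent:a.md', 'agent:b.md', 'agent:c.md']
```

Because `pool.map` returns results in input order, the loaded agents line up with the file list regardless of which thread finishes first.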
## Advanced Features

### Custom System Prompt Building

The AgentLoader automatically builds comprehensive system prompts from the markdown structure:

```python
loader = AgentLoader()
config = loader.parse_markdown_file("agent.md")

# The system prompt is assembled from the markdown body, including:
# - The role description
# - Focus areas as bullet points
# - Approach as numbered steps
# - Expected outputs as deliverables

print("Generated System Prompt:")
print(config.system_prompt)
```
## Integration with Swarms

The loaded agents are fully compatible with Swarms orchestration systems:

```python
from swarms.utils import load_agents_from_markdown
from swarms.structs import SequentialWorkflow

# Load multiple specialized agents
agents = load_agents_from_markdown("./specialist_agents/")

# Create a sequential workflow
workflow = SequentialWorkflow(
    agents=agents,
    max_loops=1
)

# Execute a complex task across multiple agents
result = workflow.run("Conduct a comprehensive system audit")
```
## Best Practices

1. **Consistent Naming**: Use clear, descriptive agent names
2. **Detailed Descriptions**: Provide comprehensive role descriptions
3. **Structured Sections**: Use the optional sections to define agent behavior
4. **Error Handling**: Always wrap agent loading in try/except blocks
5. **Model Selection**: Choose appropriate models based on agent complexity
6. **Configuration**: Override defaults when specific behavior is needed
## API Reference

### AgentLoader Class

```python
class AgentLoader:
    def __init__(self, model: Optional[LiteLLM] = None)
    def parse_markdown_file(self, file_path: str) -> MarkdownAgentConfig
    def load_single_agent(self, file_path: str, **kwargs) -> Agent
    def load_multiple_agents(self, file_paths: Union[str, List[str]], **kwargs) -> List[Agent]
```

### Convenience Functions

```python
def load_agent_from_markdown(file_path: str, **kwargs) -> Agent

def load_agents_from_markdown(
    file_paths: Union[str, List[str]],
    concurrent: bool = True,            # Enable concurrent processing
    max_workers: Optional[int] = None,  # Max worker threads (defaults to CPU count)
    **kwargs
) -> List[Agent]
```
### Configuration Model

```python
class MarkdownAgentConfig(BaseModel):
    name: str
    description: str
    model_name: Optional[str] = "gpt-4"
    temperature: Optional[float] = 0.1  # Model temperature (0.0-2.0)
    mcp_url: Optional[str] = None       # Optional MCP server URL
    system_prompt: str
    max_loops: int = 1
    autosave: bool = False
    dashboard: bool = False
    verbose: bool = False
    # ... additional configuration fields
```
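As an illustration of the kind of field validation such a config model enforces, here is a minimal sketch using a plain dataclass. The real model is a pydantic `BaseModel`; `AgentConfig` here is a hypothetical stand-in showing only the documented temperature bounds:

```python
from dataclasses import dataclass

@dataclass
class AgentConfig:
    """Hypothetical stand-in for a validated agent config model."""
    name: str
    description: str
    temperature: float = 0.1

    def __post_init__(self):
        # Enforce the documented temperature range (0.0-2.0)
        if not 0.0 <= self.temperature <= 2.0:
            raise ValueError(
                f"temperature must be in [0.0, 2.0], got {self.temperature}"
            )

cfg = AgentConfig(name="FinanceAdvisor", description="Financial advice", temperature=0.7)
print(cfg.temperature)  # 0.7

try:
    AgentConfig(name="Bad", description="x", temperature=3.5)
except ValueError as e:
    print(e)
```

With pydantic, the same constraint would typically be expressed declaratively (e.g. a bounded float field) rather than in `__post_init__`.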
## Examples Repository

Find complete working examples in the `examples/agent_loader/` directory:

### Single Agent Example (`agent_loader_demo.py`)

```python
from swarms.utils import load_agent_from_markdown

agent = load_agent_from_markdown("finance_advisor.md")

agent.run(
    task="Analyze the financial market trends for 2023."
)
```

### Multi-Agent Workflow Example (`multi_agents_loader_demo.py`)

```python
from swarms.utils import load_agents_from_markdown

agents = load_agents_from_markdown([
    "market_researcher.md",
    "financial_analyst.md",
    "risk_analyst.md"
])

# Use agents in a workflow
from swarms.structs.sequential_workflow import SequentialWorkflow

workflow = SequentialWorkflow(
    agents=agents,
    max_loops=1
)

task = """
Analyze the AI healthcare market for a $50M investment opportunity.
Focus on market size, competition, financials, and risks.
"""

result = workflow.run(task)
```
### Sample Agent Definition (`finance_advisor.md`)

```markdown
---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4o
temperature: 0.7
max_loops: 1
---

You are an expert financial advisor with deep knowledge in:

- Investment strategies and portfolio management
- Personal budgeting and financial planning
- Risk assessment and diversification
- Tax optimization strategies
- Retirement planning

Your approach:

- Provide clear, actionable financial advice
- Consider individual risk tolerance and goals
- Explain complex concepts in simple terms
- Always emphasize the importance of diversification
- Include relevant disclaimers about financial advice

When analyzing financial situations:

1. Assess current financial position
2. Identify short-term and long-term goals
3. Evaluate risk tolerance
4. Recommend appropriate strategies
5. Suggest specific action steps
```

## Support

For questions and support:

- GitHub Issues: [https://github.com/kyegomez/swarms/issues](https://github.com/kyegomez/swarms/issues)
- Documentation: [https://docs.swarms.world](https://docs.swarms.world)
- Community: Join our Discord for real-time support
@@ -0,0 +1,7 @@

from swarms.utils import load_agent_from_markdown

agent = load_agent_from_markdown("finance_advisor.md")

agent.run(
    task="Analyze the financial market trends for 2023."
)
@@ -0,0 +1,28 @@

---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4o
temperature: 0.7
max_loops: 1
---

You are an expert financial advisor with deep knowledge in:
- Investment strategies and portfolio management
- Personal budgeting and financial planning
- Risk assessment and diversification
- Tax optimization strategies
- Retirement planning

Your approach:
- Provide clear, actionable financial advice
- Consider individual risk tolerance and goals
- Explain complex concepts in simple terms
- Always emphasize the importance of diversification
- Include relevant disclaimers about financial advice

When analyzing financial situations:
1. Assess current financial position
2. Identify short-term and long-term goals
3. Evaluate risk tolerance
4. Recommend appropriate strategies
5. Suggest specific action steps
@@ -0,0 +1,22 @@

from swarms.utils import load_agents_from_markdown

agents = load_agents_from_markdown([
    "market_researcher.md",
    "financial_analyst.md",
    "risk_analyst.md"
])

# Use agents in a workflow
from swarms.structs.sequential_workflow import SequentialWorkflow

workflow = SequentialWorkflow(
    agents=agents,
    max_loops=1
)

task = """
Analyze the AI healthcare market for a $50M investment opportunity.
Focus on market size, competition, financials, and risks.
"""

result = workflow.run(task)
@@ -0,0 +1,51 @@

#!/usr/bin/env python3
"""
Basic Graph Workflow Example

A minimal example showing how to use GraphWorkflow with backend selection.
"""

from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent

# Create agents at module level
agent_one = Agent(agent_name="research_agent", model="gpt-4o-mini")
agent_two = Agent(
    agent_name="research_agent_two", model="gpt-4o-mini"
)
agent_three = Agent(
    agent_name="research_agent_three", model="gpt-4o-mini"
)


def main():
    """
    Run a basic graph workflow example.
    """
    # Create workflow with backend selection
    workflow = GraphWorkflow(
        name="Basic Example",
        verbose=True,
    )

    # Add agents to workflow
    workflow.add_node(agent_one)
    workflow.add_node(agent_two)
    workflow.add_node(agent_three)

    # Create a simple chain using the actual agent names
    workflow.add_edge("research_agent", "research_agent_two")
    workflow.add_edge("research_agent_two", "research_agent_three")

    # Compile the workflow
    workflow.compile()

    # Run the workflow
    task = "Complete a simple task"
    results = workflow.run(task)
    return results


if __name__ == "__main__":
    main()
@@ -1,19 +0,0 @@

from swarms.sims.senator_assembly import SenatorAssembly


def main():
    """
    Simulate a Senate vote on a bill proposing sweeping deregulation of the US IPO market.

    This function initializes the SenatorAssembly and runs a concurrent vote simulation
    on the specified bill.
    """
    senator_simulation = SenatorAssembly()
    senator_simulation.simulate_vote_concurrent(
        "A bill proposing to deregulate the IPO (Initial Public Offering) market in the United States as extensively as possible. The bill seeks to remove or significantly reduce existing regulatory requirements and oversight for companies seeking to go public, with the aim of increasing market efficiency and access to capital. Senators must consider the potential economic, legal, and ethical consequences of such broad deregulation, and cast their votes accordingly.",
        batch_size=10,
    )


if __name__ == "__main__":
    main()
File diff suppressed because it is too large
@@ -0,0 +1,39 @@

"""
Bell Labs Research Simulation Example

This example demonstrates how to use the BellLabsSwarm to simulate
collaborative research among famous physicists.
"""

from swarms.sims.bell_labs import (
    run_bell_labs_research,
)


def main():
    """
    Run the Bell Labs research simulation.

    This example asks the research question:
    "Why doesn't physics take a vacation? Why are the laws of physics consistent?"
    """

    research_question = """
    Why doesn't physics take a vacation? Why are the laws of physics consistent across time and space?
    Explore the philosophical and scientific foundations for the uniformity and invariance of physical laws.
    Consider both theoretical explanations and any empirical evidence or challenges to this consistency.
    """

    # Run the research simulation
    results = run_bell_labs_research(
        research_question=research_question,
        max_loops=1,
        model_name="claude-3-5-sonnet-20240620",
        verbose=True,
    )

    print(results)


if __name__ == "__main__":
    main()
@@ -0,0 +1,29 @@

from swarms import Agent


def main():
    """
    Run a quantitative trading agent to recommend top 3 gold ETFs.
    """
    agent = Agent(
        agent_name="Quantitative-Trading-Agent",
        agent_description="Advanced quantitative trading and algorithmic analysis agent",
        system_prompt=(
            "You are an expert quantitative trading agent. "
            "Recommend the best gold ETFs using your expertise in trading strategies, "
            "risk management, and financial analysis. Be concise and precise."
        ),
        model_name="claude-sonnet-4-20250514",
        dynamic_temperature_enabled=True,
        max_loops=1,
        dynamic_context_window=True,
    )

    out = agent.run(
        task="What are the best top 3 etfs for gold coverage?"
    )
    print(out)


if __name__ == "__main__":
    main()
@@ -0,0 +1,206 @@

"""
Claude Code Agent Tool - Setup Guide

This tool provides a Claude Code Agent that can:
- Generate code and applications from natural language descriptions
- Write files, execute shell commands, and manage Git repositories
- Perform web searches and file operations
- Handle complex development tasks with retry logic

SETUP GUIDE:
1. Install dependencies:
   pip install claude-code-sdk
   npm install -g @anthropic-ai/claude-code

2. Set environment variable:
   export ANTHROPIC_API_KEY="your-api-key-here"

3. Use the tool:
   from claude_as_a_tool import developer_worker_agent

   result = developer_worker_agent(
       task="Create a Python web scraper",
       system_prompt="You are a helpful coding assistant"
   )

REQUIRED: ANTHROPIC_API_KEY environment variable must be set
"""

import asyncio
from typing import Any, Dict

from claude_code_sdk import ClaudeCodeOptions, ClaudeSDKClient
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential
from loguru import logger

load_dotenv()

class ClaudeAppGenerator:
    """
    Generates applications using the Claude Code SDK based on task specifications.
    """

    def __init__(
        self,
        name: str = "Developer Worker Agent",
        description: str = "A developer worker agent that can generate code and write it to a file.",
        retries: int = 3,
        retry_delay: float = 2.0,
        system_prompt: str = None,
        debug_mode: bool = False,
        max_steps: int = 40,
        model: str = "claude-sonnet-4-20250514",
        max_thinking_tokens: int = 1000,
    ):
        """
        Initialize the app generator.

        Args:
            name: Name of the agent
            description: Description of the agent
            retries: Number of retries
            retry_delay: Delay between retries, in seconds
            system_prompt: System prompt
            debug_mode: Enable extra verbose logging for Claude outputs
            max_steps: Maximum number of steps
            model: Model to use
            max_thinking_tokens: Maximum number of thinking tokens
        """
        self.name = name
        self.description = description
        self.retries = retries
        self.retry_delay = retry_delay
        self.system_prompt = system_prompt
        self.model = model
        self.debug_mode = debug_mode
        self.max_steps = max_steps
        self.max_thinking_tokens = max_thinking_tokens

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=15),
    )
    async def generate_app_with_claude(
        self, task: str
    ) -> list[str]:
        """
        Generate an app using the Claude Code SDK, with robust error handling and retry logic.

        Args:
            task: Task to be completed

        Returns:
            List of response text segments collected from Claude
        """
        # Configure the Claude SDK client
        claude_options = ClaudeCodeOptions(
            system_prompt=self.system_prompt,
            max_turns=self.max_steps,  # Sufficient for local app development and GitHub setup
            allowed_tools=[
                "Read",
                "Write",
                "Bash",
                "GitHub",
                "Git",
                "Grep",
                "WebSearch",
            ],
            continue_conversation=True,  # Continue the same conversation across queries
            model=self.model,
            max_thinking_tokens=self.max_thinking_tokens,
        )

        async with ClaudeSDKClient(options=claude_options) as client:
            # Generate the application
            await client.query(task)

            response_text = []
            message_count = 0

            async for message in client.receive_response():
                message_count += 1

                if hasattr(message, "content"):
                    for block in message.content:
                        if hasattr(block, "text"):
                            text_content = block.text
                            response_text.append(text_content)
                            logger.info(text_content)

                        elif hasattr(block, "type"):
                            if self.debug_mode and hasattr(
                                block, "input"
                            ):
                                input_str = str(block.input)
                                if len(input_str) > 200:
                                    input_str = (
                                        input_str[:200]
                                        + "... (truncated)"
                                    )
                                logger.debug(f"Tool Input: {input_str}")

                elif type(message).__name__ == "ResultMessage":
                    result_text = str(message.result)
                    response_text.append(result_text)

            return response_text

    def run(self, task: str) -> list[str]:
        """
        Synchronous wrapper for app generation, so it can be used with ThreadPoolExecutor.

        Args:
            task: Task to be completed

        Returns:
            List of response text segments from the generation run
        """
        return asyncio.run(self.generate_app_with_claude(task))

def developer_worker_agent(task: str, system_prompt: str) -> list[str]:
    """
    Developer Worker Agent

    This function instantiates a ClaudeAppGenerator agent, a developer assistant designed to automate software development tasks.
    The agent leverages the Claude Code SDK to interpret natural-language instructions and generate code, scripts, or entire applications.
    It can interact with files, execute shell commands, perform web searches, and use version control systems such as Git and GitHub.
    The agent features retry logic, customizable system prompts, and a debug mode for verbose output.
    It is suited to automating repetitive coding tasks, prototyping, and integrating with developer workflows.

    Capabilities:
    - Generate code based on detailed task descriptions.
    - Write generated code to files.
    - Execute shell commands and scripts.
    - Interact with Git and GitHub for version control operations.
    - Perform web searches to gather information or code snippets.
    - Provide detailed logs and debugging information if enabled.
    - Handle errors gracefully with configurable retry logic.

    Args:
        task (str): The development task or instruction for the agent to complete.
        system_prompt (str): The system prompt to guide the agent's behavior and context.

    Returns:
        list[str]: The collected response segments from the agent's run.
    """
    claude_code_sdk = ClaudeAppGenerator(system_prompt=system_prompt)
    return claude_code_sdk.run(task)


# agent = Agent(
#     agent_name="Developer Worker Agent",
#     agent_description="A developer worker agent that can generate code and write it to a file.",
#     tools=[developer_worker_agent],
#     system_prompt="You are a developer worker agent. You are given a task and you need to complete it.",
# )

# agent.run(
#     task="Write a simple python script that prints 'Hello, World!'"
# )

# if __name__ == "__main__":
#     task = "Write a simple python script that prints 'Hello, World!'"
#     system_prompt = "You are a developer worker agent. You are given a task and you need to complete it."
#     print(developer_worker_agent(task, system_prompt))
@@ -1,170 +0,0 @@

from loguru import logger
from swarms.structs.swarm_eval import (
    SwarmEvaluator,
    PRESET_DATASETS,
)

import os
from swarms import Agent
from dotenv import load_dotenv

from swarm_models import OpenAIChat

load_dotenv()


model = OpenAIChat(
    model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
    openai_api_key=os.getenv("TOGETHER_API_KEY"),
    base_url="https://api.together.xyz/v1",
)

# Define system prompts for reasoning agents
THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design.

Your core capabilities include:
1. Comprehensive Problem Analysis
   - Break down complex problems into constituent elements
   - Map relationships and dependencies between components
   - Identify root causes and underlying patterns
   - Consider historical context and precedents

2. Multi-Perspective Evaluation
   - Examine issues from multiple stakeholder viewpoints
   - Consider short-term and long-term implications
   - Evaluate social, economic, technical, and ethical dimensions
   - Challenge assumptions and identify potential biases

3. Risk Assessment and Mitigation
   - Conduct thorough risk analysis across scenarios
   - Identify potential failure modes and edge cases
   - Develop contingency plans and mitigation strategies
   - Assess probability and impact of various outcomes

4. Strategic Solution Development
   - Generate multiple solution approaches
   - Evaluate trade-offs between different strategies
   - Consider resource constraints and limitations
   - Design scalable and sustainable solutions

5. Decision Framework Creation
   - Establish clear evaluation criteria
   - Weight competing priorities appropriately
   - Create structured decision matrices
   - Document reasoning and key decision factors

6. Systems Thinking
   - Map interconnections between system elements
   - Identify feedback loops and cascade effects
   - Consider emergent properties and behaviors
   - Account for dynamic system evolution

Your output should always include:
- Clear articulation of your analytical process
- Key assumptions and their justification
- Potential risks and mitigation strategies
- Multiple solution options with pros/cons
- Specific recommendations with supporting rationale
- Areas of uncertainty requiring further investigation

Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable."""

ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results.

Your core capabilities include:
1. Strategic Implementation Planning
   - Break down high-level strategies into specific actions
   - Create detailed project roadmaps and timelines
   - Identify critical path dependencies
   - Establish clear milestones and success metrics
   - Design feedback and monitoring mechanisms

2. Resource Optimization
   - Assess resource requirements and constraints
   - Optimize resource allocation and scheduling
   - Identify efficiency opportunities
   - Plan for scalability and flexibility
   - Manage competing priorities effectively

3. Execution Management
   - Develop detailed implementation procedures
   - Create clear operational guidelines
   - Establish quality control measures
   - Design progress tracking systems
   - Build in review and adjustment points

4. Risk Management
   - Implement specific risk mitigation measures
   - Create early warning systems
   - Develop contingency procedures
   - Establish fallback positions
   - Monitor risk indicators

5. Stakeholder Management
   - Identify key stakeholders and their needs
   - Create communication plans
   - Establish feedback mechanisms
   - Manage expectations effectively
   - Build support and buy-in

6. Continuous Improvement
   - Monitor implementation effectiveness
   - Gather and analyze performance data
   - Identify improvement opportunities
   - Implement iterative enhancements
   - Document lessons learned

Your output should always include:
- Detailed action plans with specific steps
|
||||
- Resource requirements and allocation plans
|
||||
- Timeline with key milestones
|
||||
- Success metrics and monitoring approach
|
||||
- Risk mitigation procedures
|
||||
- Communication and stakeholder management plans
|
||||
- Quality control measures
|
||||
- Feedback and adjustment mechanisms
|
||||
|
||||
Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes."""
|
||||
|
||||
# Initialize the thinking agent
|
||||
thinking_agent = Agent(
|
||||
agent_name="Strategic-Thinker",
|
||||
agent_description="Deep analysis and strategic planning agent",
|
||||
system_prompt=THINKING_AGENT_PROMPT,
|
||||
max_loops=1,
|
||||
llm=model,
|
||||
dynamic_temperature_enabled=True,
|
||||
)


class DeepSeekSwarm:
    def __init__(self):
        self.thinking_agent = thinking_agent

    def run(self, task: str):
        # First pass: deep analysis of the task
        first_one = self.thinking_agent.run(task)

        # Second pass: refine the analysis by feeding it back to the agent
        return self.thinking_agent.run(first_one)


if __name__ == "__main__":
    # Initialize the swarm (replace with your actual multi-agent system)
    swarm = DeepSeekSwarm()

    # Initialize the evaluator with the swarm instance
    evaluator = SwarmEvaluator(swarm)

    logger.info("Starting evaluation for dataset: gsm8k")

    # For demonstration, we use all available CPU cores as concurrent workers,
    # show progress, and save results.
    results = evaluator.evaluate(
        "gsm8k",
        split="train",
        config=PRESET_DATASETS["gsm8k"],
        max_workers=os.cpu_count(),
        max_retries=3,
        show_progress=True,
        output_file="gsm8k_results.txt",
    )

    logger.info(f"Results for gsm8k: {results}")
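The `evaluate` call above fans samples out across `os.cpu_count()` worker threads and retries each sample up to `max_retries` times. A minimal sketch of that concurrency pattern, assuming a generic `solve` callable and a hypothetical sample format — this is not the `SwarmEvaluator` internals:

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def evaluate_samples(samples, solve, max_workers=None, max_retries=3):
    """Score `solve` over `samples` concurrently, retrying transient failures."""
    max_workers = max_workers or os.cpu_count()

    def attempt(sample):
        last_error = None
        for _ in range(max_retries):
            try:
                # Compare the model's answer against the reference answer.
                return solve(sample["question"]) == sample["answer"]
            except Exception as error:  # retry on transient failures
                last_error = error
        raise last_error

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(attempt, s) for s in samples]
        for future in as_completed(futures):
            results.append(future.result())
    # Fraction of samples answered correctly.
    return sum(results) / len(results)
```

Because each sample is an independent model call, thread-based fan-out scales nearly linearly with worker count until the provider's rate limits become the bottleneck.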
@ -0,0 +1,23 @@
from swarms.sims.senator_assembly import SenatorAssembly


def main():
    senator_simulation = SenatorAssembly(
        model_name="claude-sonnet-4-20250514"
    )
    senator_simulation.simulate_vote_concurrent(
        (
            "A bill proposing a significant reduction in federal income tax rates for all American citizens. "
            "The legislation aims to lower tax brackets across the board, increase the standard deduction, "
            "and provide additional tax relief for middle- and lower-income families. Proponents argue that "
            "the bill will stimulate economic growth, increase disposable income, and enhance consumer spending. "
            "Opponents raise concerns about the potential impact on the federal deficit, funding for public services, "
            "and long-term fiscal responsibility. Senators must weigh the economic, social, and budgetary implications "
            "before casting their votes."
        ),
        batch_size=10,
    )


if __name__ == "__main__":
    main()
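`simulate_vote_concurrent` runs the senators' votes in batches of `batch_size`, so only that many model calls are in flight at once. A rough sketch of the batching pattern, where `agents` and `vote` are hypothetical stand-ins rather than the `SenatorAssembly` internals:

```python
from concurrent.futures import ThreadPoolExecutor


def vote_in_batches(agents, bill, vote, batch_size=10):
    """Collect votes batch by batch, preserving agent order."""
    votes = []
    for start in range(0, len(agents), batch_size):
        batch = agents[start:start + batch_size]
        # One worker per agent in the batch; the next batch starts
        # only after every vote in this one has returned.
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            votes.extend(pool.map(lambda agent: vote(agent, bill), batch))
    return votes
```

Batching like this caps concurrent API usage without serializing the whole assembly, which matters when a hundred agent calls would otherwise hit provider rate limits at once.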
@ -0,0 +1,816 @@
"""
Bell Labs Research Simulation with Physicist Agents

This simulation creates specialized AI agents representing famous physicists
from the Bell Labs era, including Oppenheimer, von Neumann, Feynman, Einstein,
and others. The agents work together in a collaborative research environment
following a structured workflow: task -> Oppenheimer (planning) -> physicist discussion
-> code implementation -> results analysis -> repeat for n loops.
"""

from functools import lru_cache
from typing import Any, Dict, List, Optional

from loguru import logger

from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
    history_output_formatter,
)

# from examples.tools.claude_as_a_tool import developer_worker_agent


@lru_cache(maxsize=1)
def _create_physicist_agents(
    model_name: str, random_model_name: bool = False
) -> List[Agent]:
    """
    Create specialized agents for each physicist.

    Args:
        model_name: Model to use for all agents
        random_model_name: Whether each agent should pick a random model instead

    Returns:
        List of configured physicist agents
    """
    physicists_data = {
        "J. Robert Oppenheimer": {
            "role": "Research Director & Theoretical Physicist",
            "expertise": [
                "Nuclear physics",
                "Quantum mechanics",
                "Research coordination",
                "Strategic planning",
                "Team leadership",
            ],
            "background": "Director of the Manhattan Project, expert in quantum mechanics and nuclear physics",
            "system_prompt": """You are J. Robert Oppenheimer, the brilliant theoretical physicist and research director.

Your role is to:
1. Analyze complex research questions and break them down into manageable components
2. Create comprehensive research plans with clear objectives and methodologies
3. Coordinate the research team and ensure effective collaboration
4. Synthesize findings from different physicists into coherent conclusions
5. Guide the research process with strategic insights and theoretical frameworks

You excel at:
- Identifying the core theoretical challenges in any research question
- Designing experimental approaches that test fundamental principles
- Balancing theoretical rigor with practical implementation
- Fostering interdisciplinary collaboration between specialists
- Maintaining focus on the most promising research directions

When creating research plans, be thorough, systematic, and consider multiple approaches.
Always emphasize the theoretical foundations and experimental validation of any proposed solution.""",
        },
        "John von Neumann": {
            "role": "Mathematical Physicist & Computer Scientist",
            "expertise": [
                "Mathematical physics",
                "Computer architecture",
                "Game theory",
                "Quantum mechanics",
                "Numerical methods",
            ],
            "background": "Pioneer of computer science, game theory, and mathematical physics",
            "system_prompt": """You are John von Neumann, the brilliant mathematical physicist and computer scientist.

Your approach to research questions involves:
1. Mathematical rigor and formal mathematical frameworks
2. Computational and algorithmic solutions to complex problems
3. Game theory and strategic analysis of research approaches
4. Numerical methods and computational physics
5. Bridging abstract theory with practical implementation

You excel at:
- Formulating problems in precise mathematical terms
- Developing computational algorithms and numerical methods
- Applying game theory to optimize research strategies
- Creating mathematical models that capture complex phenomena
- Designing efficient computational approaches to physical problems

When analyzing research questions, focus on mathematical foundations, computational feasibility,
and the development of rigorous theoretical frameworks that can be implemented and tested.""",
        },
        "Richard Feynman": {
            "role": "Theoretical Physicist & Problem Solver",
            "expertise": [
                "Quantum electrodynamics",
                "Particle physics",
                "Problem-solving methodology",
                "Intuitive physics",
                "Experimental design",
            ],
            "background": "Nobel laureate in physics, known for intuitive problem-solving and quantum electrodynamics",
            "system_prompt": """You are Richard Feynman, the brilliant theoretical physicist and master problem solver.

Your research methodology involves:
1. Intuitive understanding of complex physical phenomena
2. Creative problem-solving approaches that cut through complexity
3. Experimental design that tests fundamental principles
4. Clear communication of complex ideas through analogies and examples
5. Focus on the most essential aspects of any research question

You excel at:
- Finding elegant solutions to seemingly intractable problems
- Designing experiments that reveal fundamental truths
- Communicating complex physics in accessible terms
- Identifying the core physics behind any phenomenon
- Developing intuitive models that capture essential behavior

When approaching research questions, look for the simplest, most elegant solutions.
Focus on the fundamental physics and design experiments that test your understanding directly.""",
        },
        "Albert Einstein": {
            "role": "Theoretical Physicist & Conceptual Innovator",
            "expertise": [
                "Relativity theory",
                "Quantum mechanics",
                "Conceptual physics",
                "Thought experiments",
                "Fundamental principles",
            ],
            "background": "Revolutionary physicist who developed relativity theory and influenced quantum mechanics",
            "system_prompt": """You are Albert Einstein, the revolutionary theoretical physicist and conceptual innovator.

Your research approach involves:
1. Deep conceptual thinking about fundamental physical principles
2. Thought experiments that reveal the essence of physical phenomena
3. Questioning established assumptions and exploring new paradigms
4. Focus on the most fundamental and universal aspects of physics
5. Intuitive understanding of space, time, and the nature of reality

You excel at:
- Identifying the conceptual foundations of any physical theory
- Developing thought experiments that challenge conventional wisdom
- Finding elegant mathematical descriptions of physical reality
- Questioning fundamental assumptions and exploring alternatives
- Developing unified theories that explain diverse phenomena

When analyzing research questions, focus on the conceptual foundations and fundamental principles.
Look for elegant, unified explanations and be willing to challenge established paradigms.""",
        },
        "Enrico Fermi": {
            "role": "Experimental Physicist & Nuclear Scientist",
            "expertise": [
                "Nuclear physics",
                "Experimental physics",
                "Neutron physics",
                "Statistical physics",
                "Practical applications",
            ],
            "background": "Nobel laureate known for nuclear physics, experimental work, and the first nuclear reactor",
            "system_prompt": """You are Enrico Fermi, the brilliant experimental physicist and nuclear scientist.

Your research methodology involves:
1. Rigorous experimental design and execution
2. Practical application of theoretical principles
3. Statistical analysis and probability in physics
4. Nuclear physics and particle interactions
5. Bridging theory with experimental validation

You excel at:
- Designing experiments that test theoretical predictions
- Applying statistical methods to physical problems
- Developing practical applications of fundamental physics
- Nuclear physics and particle physics experiments
- Creating experimental setups that reveal new phenomena

When approaching research questions, focus on experimental design and practical implementation.
Emphasize the importance of experimental validation and statistical analysis in physics research.""",
        },
        "Code-Implementer": {
            "role": "Computational Physicist & Code Developer",
            "expertise": [
                "Scientific computing",
                "Physics simulations",
                "Data analysis",
                "Algorithm implementation",
                "Numerical methods",
            ],
            "background": "Specialized in implementing computational solutions to physics problems",
            "system_prompt": """You are a specialized computational physicist and code developer.

Your responsibilities include:
1. Implementing computational solutions to physics problems
2. Developing simulations and numerical methods
3. Analyzing data and presenting results clearly
4. Testing theoretical predictions through computation
5. Providing quantitative analysis of research findings

You excel at:
- Writing clear, efficient scientific code
- Implementing numerical algorithms for physics problems
- Data analysis and visualization
- Computational optimization and performance
- Bridging theoretical physics with computational implementation

When implementing solutions, focus on:
- Clear, well-documented code
- Efficient numerical algorithms
- Comprehensive testing and validation
- Clear presentation of results and analysis
- Quantitative assessment of theoretical predictions""",
        },
    }

    agents = []
    for name, data in physicists_data.items():
        agent = Agent(
            agent_name=name,
            system_prompt=data["system_prompt"],
            model_name=model_name,
            random_model_name=random_model_name,
            max_loops=1,
            dynamic_temperature_enabled=True,
            dynamic_context_window=True,
        )
        agents.append(agent)

    return agents


class BellLabsSwarm:
    """
    Bell Labs Research Simulation Swarm

    Simulates the collaborative research environment of Bell Labs with famous physicists
    working together on complex research questions. The workflow follows:

    1. Task is presented to the team
    2. Oppenheimer creates a research plan
    3. Physicists discuss and vote on approaches using majority voting
    4. Code implementation agent tests the theory
    5. Results are analyzed and fed back to the team
    6. Process repeats for n loops with iterative refinement
    """

    def __init__(
        self,
        name: str = "Bell Labs Research Team",
        description: str = "A collaborative research environment simulating Bell Labs physicists",
        max_loops: int = 1,
        verbose: bool = True,
        model_name: str = "gpt-4o-mini",
        random_model_name: bool = False,
        output_type: str = "str-all-except-first",
        dynamic_context_window: bool = True,
        **kwargs,
    ):
        """
        Initialize the Bell Labs Research Swarm.

        Args:
            name: Name of the swarm
            description: Description of the swarm's purpose
            max_loops: Number of research iteration loops
            verbose: Whether to enable verbose logging
            model_name: Model to use for all agents
            random_model_name: Whether each agent should pick a random model instead
            output_type: Format of the returned conversation history
            dynamic_context_window: Whether to enable dynamic context windows
            **kwargs: Additional arguments passed to BaseSwarm
        """
        self.name = name
        self.description = description
        self.max_loops = max_loops
        self.verbose = verbose
        self.model_name = model_name
        self.kwargs = kwargs
        self.random_model_name = random_model_name
        self.output_type = output_type
        self.dynamic_context_window = dynamic_context_window

        self.conversation = Conversation(
            dynamic_context_window=dynamic_context_window
        )

        # Create the physicist agents
        self.agents = _create_physicist_agents(
            model_name=model_name, random_model_name=random_model_name
        )

        # Set up specialized agents
        self.oppenheimer = self._get_agent_by_name(
            "J. Robert Oppenheimer"
        )
        self.code_implementer = self._get_agent_by_name(
            "Code-Implementer"
        )

        self.physicists = [
            agent
            for agent in self.agents
            if agent.agent_name != "J. Robert Oppenheimer"
            and agent.agent_name != "Code-Implementer"
        ]

        # # Find the code implementer agent
        # code_implementer = self._get_agent_by_name("Code-Implementer")
        # code_implementer.tools = [developer_worker_agent]

        logger.info(
            f"Bell Labs Research Team initialized with {len(self.agents)} agents"
        )

    def _get_agent_by_name(self, name: str) -> Optional[Agent]:
        """Get an agent by name."""
        for agent in self.agents:
            if agent.agent_name == name:
                return agent
        return None

    def run(
        self, task: str, img: Optional[str] = None
    ) -> str:
        """
        Run the Bell Labs research simulation.

        Args:
            task: The research question or task to investigate
            img: Optional image input passed to the planning agent

        Returns:
            Formatted conversation history covering the full research process
        """
        logger.info(f"Starting Bell Labs research on: {task}")

        # Add initial task to conversation history
        self.conversation.add(
            "Research Coordinator", f"Initial Research Task: {task}"
        )

        # Oppenheimer creates the research plan
        oppenheimer_plan = self.oppenheimer.run(
            task=self.conversation.get_str(), img=img
        )

        self.conversation.add(
            self.oppenheimer.agent_name,
            f"Research Plan: {oppenheimer_plan}",
        )

        # Physicists discuss the plan
        physicist_discussion = self._conduct_physicist_discussion(
            task, self.conversation.get_str()
        )

        # Add to conversation history
        self.conversation.add(
            "Group Discussion", physicist_discussion
        )

        # Now implement the solution
        implementation_results = self._implement_and_test_solution(
            history=self.conversation.get_str()
        )

        # Add to conversation history
        self.conversation.add(
            self.code_implementer.agent_name, implementation_results
        )

        return history_output_formatter(
            conversation=self.conversation, type="str"
        )

    def _create_research_plan(
        self, task: str, loop_number: int
    ) -> str:
        """
        Have Oppenheimer create a research plan.

        Args:
            task: Research task
            loop_number: Current loop number

        Returns:
            Research plan from Oppenheimer
        """
        prompt = f"""
        Research Task: {task}

        Loop Number: {loop_number + 1}

        As J. Robert Oppenheimer, create a comprehensive research plan for this task.

        Your plan should include:
        1. Clear research objectives and hypotheses
        2. Theoretical framework and approach
        3. Specific research questions to investigate
        4. Methodology for testing and validation
        5. Expected outcomes and success criteria
        6. Timeline and milestones
        7. Resource requirements and team coordination

        Provide a detailed, actionable plan that the research team can follow.
        """

        plan = self.oppenheimer.run(prompt)
        return plan

    def _conduct_physicist_discussion(
        self, task: str, history: str
    ) -> str:
        """
        Conduct a natural discussion among physicists where they build on each other's ideas.

        Args:
            task: Research task
            history: Conversation history including Oppenheimer's plan

        Returns:
            Results of the physicist discussion as a conversation transcript
        """
        import random

        # Shuffle the physicists to create random discussion order
        discussion_order = self.physicists.copy()
        random.shuffle(discussion_order)

        discussion_transcript = []
        current_context = (
            f"{history}\n\nCurrent Research Task: {task}\n\n"
        )

        # Each physicist contributes to the discussion, building on previous contributions
        for i, physicist in enumerate(discussion_order):
            if i == 0:
                # First physicist starts the discussion
                discussion_prompt = f"""
                {current_context}

                As {physicist.agent_name}, you are starting the group discussion about this research plan.

                Based on your expertise, provide your initial thoughts on:

                1. What aspects of Oppenheimer's research plan do you find most promising?
                2. What theoretical challenges or concerns do you see?
                3. What specific approaches would you recommend based on your expertise?
                4. What questions or clarifications do you have for the team?

                Be specific and draw from your unique perspective and expertise. This will set the tone for the group discussion.
                """
            else:
                # Subsequent physicists build on the discussion
                previous_contributions = "\n\n".join(
                    discussion_transcript
                )
                discussion_prompt = f"""
                {current_context}

                Previous Discussion:
                {previous_contributions}

                As {physicist.agent_name}, continue the group discussion by building on your colleagues' ideas.

                Consider:
                1. How do your colleagues' perspectives relate to your own area of expertise?
                2. What additional insights can you add to the discussion?
                3. How can you address any concerns or questions raised by others?
                4. What specific next steps would you recommend based on the discussion so far?

                Engage directly with your colleagues' ideas and contribute your unique perspective to move the research forward.
                """

            # Get the physicist's contribution
            contribution = physicist.run(discussion_prompt)

            # Add to transcript with clear attribution
            discussion_transcript.append(
                f"{physicist.agent_name}: {contribution}"
            )

            # Update context for next iteration
            current_context = (
                f"{history}\n\nCurrent Research Task: {task}\n\nGroup Discussion:\n"
                + "\n\n".join(discussion_transcript)
            )

        # Create a summary of the discussion
        summary_prompt = f"""
        Research Task: {task}

        Complete Discussion Transcript:
        {chr(10).join(discussion_transcript)}

        As a research coordinator, provide a concise summary of the key points from this group discussion:

        1. Main areas of agreement among the physicists
        2. Key concerns or challenges identified
        3. Specific recommendations made by the team
        4. Next steps for moving forward with the research

        Focus on actionable insights and clear next steps that the team can implement.
        """

        # Use Oppenheimer to summarize the discussion
        discussion_summary = self.oppenheimer.run(summary_prompt)

        # Return the full discussion transcript with summary
        full_discussion = f"Group Discussion Transcript:\n\n{chr(10).join(discussion_transcript)}\n\n---\nDiscussion Summary:\n{discussion_summary}"

        return full_discussion

    def _implement_and_test_solution(
        self,
        history: str,
    ) -> str:
        """
        Implement and test the proposed solution.

        Args:
            history: Conversation history including the research plan and discussion

        Returns:
            Implementation and testing results
        """
        implementation_prompt = f"""
        {history}

        As the Code Implementer, your task is to:

        1. Implement a computational solution based on the research plan
        2. Test the theoretical predictions through simulation or calculation
        3. Analyze the results and provide quantitative assessment
        4. Identify any discrepancies between theory and implementation
        5. Suggest improvements or next steps

        Provide:
        - Clear description of your implementation approach
        - Code or algorithm description
        - Test results and analysis
        - Comparison with theoretical predictions
        - Recommendations for further investigation

        Focus on practical implementation and quantitative results.
        """

        implementation_results = self.code_implementer.run(
            implementation_prompt
        )

        return implementation_results

    def _analyze_results(
        self, implementation_results: Dict[str, Any], loop_number: int
    ) -> str:
        """
        Analyze the results and provide team review.

        Args:
            implementation_results: Results from implementation phase
            loop_number: Current loop number

        Returns:
            Analysis and recommendations
        """
        analysis_prompt = f"""
        Implementation Results: {implementation_results}

        Loop Number: {loop_number + 1}

        As the research team, analyze these results and provide:

        1. Assessment of whether the implementation supports the theoretical predictions
        2. Identification of any unexpected findings or discrepancies
        3. Evaluation of the methodology and approach
        4. Recommendations for the next research iteration
        5. Insights gained from this round of investigation

        Consider:
        - What worked well in this approach?
        - What challenges or limitations were encountered?
        - How can the research be improved in the next iteration?
        - What new questions or directions have emerged?

        Provide a comprehensive analysis that will guide the next research phase.
        """

        # Use team discussion for results analysis
        analysis_results = self._conduct_team_analysis(
            analysis_prompt
        )
        return analysis_results

    def _conduct_team_analysis(self, analysis_prompt: str) -> str:
        """
        Conduct a team analysis discussion using the same approach as physicist discussion.

        Args:
            analysis_prompt: The prompt for the analysis

        Returns:
            Results of the team analysis discussion
        """
        import random

        # Shuffle the agents to create random discussion order
        discussion_order = self.agents.copy()
        random.shuffle(discussion_order)

        discussion_transcript = []
        current_context = analysis_prompt

        # Each agent contributes to the analysis, building on previous contributions
        for i, agent in enumerate(discussion_order):
            if i == 0:
                # First agent starts the analysis
                agent_prompt = f"""
                {current_context}

                As {agent.agent_name}, you are starting the team analysis discussion.

                Based on your expertise and role, provide your initial analysis of the implementation results.
                Focus on what you can contribute from your unique perspective.
                """
            else:
                # Subsequent agents build on the analysis
                previous_contributions = "\n\n".join(
                    discussion_transcript
                )
                agent_prompt = f"""
                {current_context}

                Previous Analysis:
                {previous_contributions}

                As {agent.agent_name}, continue the team analysis by building on your colleagues' insights.

                Consider:
                1. How do your colleagues' perspectives relate to your expertise?
                2. What additional insights can you add to the analysis?
                3. How can you address any concerns or questions raised by others?
                4. What specific recommendations would you make based on the analysis so far?

                Engage directly with your colleagues' ideas and contribute your unique perspective.
                """

            # Get the agent's contribution
            contribution = agent.run(agent_prompt)

            # Add to transcript with clear attribution
            discussion_transcript.append(
                f"{agent.agent_name}: {contribution}"
            )

            # Update context for next iteration
            current_context = (
                f"{analysis_prompt}\n\nTeam Analysis:\n"
                + "\n\n".join(discussion_transcript)
            )

        # Create a summary of the analysis
        summary_prompt = f"""
        Analysis Prompt: {analysis_prompt}

        Complete Analysis Transcript:
        {chr(10).join(discussion_transcript)}

        As a research coordinator, provide a concise summary of the key points from this team analysis:

        1. Main findings and insights from the team
        2. Key recommendations made
        3. Areas of agreement and disagreement
        4. Next steps for the research

        Focus on actionable insights and clear next steps.
        """

        # Use Oppenheimer to summarize the analysis
        analysis_summary = self.oppenheimer.run(summary_prompt)

        # Return the full analysis transcript with summary
        full_analysis = f"Team Analysis Transcript:\n\n{chr(10).join(discussion_transcript)}\n\n---\nAnalysis Summary:\n{analysis_summary}"

        return full_analysis

    def _refine_task_for_next_iteration(
        self, current_task: str, loop_results: Dict[str, Any]
    ) -> str:
        """
        Refine the task for the next research iteration.

        Args:
            current_task: Current research task
            loop_results: Results from the current loop

        Returns:
            Refined task for next iteration
        """
        refinement_prompt = f"""
        Current Research Task: {current_task}

        Results from Current Loop: {loop_results}

        Based on the findings and analysis from this research loop, refine the research task for the next iteration.

        Consider:
        - What new questions have emerged?
        - What aspects need deeper investigation?
        - What alternative approaches should be explored?
        - What specific hypotheses should be tested?

        Provide a refined, focused research question that builds upon the current findings
        and addresses the most important next steps identified by the team.
        """

        # Use Oppenheimer to refine the task
        refined_task = self.oppenheimer.run(refinement_prompt)

        # Add task refinement to conversation history
        self.conversation.add(
            "J. Robert Oppenheimer",
            f"Task Refined for Next Iteration: {refined_task}",
        )

        return refined_task

def _generate_final_conclusion(
|
||||
self, research_results: Dict[str, Any]
|
||||
) -> str:
|
||||
"""
|
||||
Generate a final conclusion summarizing all research findings.
|
||||
|
||||
Args:
|
||||
research_results: Complete research results from all loops
|
||||
|
||||
Returns:
|
||||
Final research conclusion
|
||||
"""
|
||||
conclusion_prompt = f"""
|
||||
Complete Research Results: {research_results}
|
||||
|
||||
As J. Robert Oppenheimer, provide a comprehensive final conclusion for this research project.
|
||||
|
||||
Your conclusion should:
|
||||
1. Summarize the key findings from all research loops
|
||||
2. Identify the most significant discoveries or insights
|
||||
3. Evaluate the success of the research approach
|
||||
4. Highlight any limitations or areas for future investigation
|
||||
5. Provide a clear statement of what was accomplished
|
||||
6. Suggest next steps for continued research
|
||||
|
||||
Synthesize the work of the entire team and provide a coherent narrative
|
||||
of the research journey and its outcomes.
|
||||
"""
|
||||
|
||||
final_conclusion = self.oppenheimer.run(conclusion_prompt)
|
||||
return final_conclusion
|
||||
|
||||
|
||||
# Example usage function
|
||||
def run_bell_labs_research(
|
||||
research_question: str,
|
||||
max_loops: int = 3,
|
||||
model_name: str = "gpt-4o-mini",
|
||||
verbose: bool = True,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run a Bell Labs research simulation.
|
||||
|
||||
Args:
|
||||
research_question: The research question to investigate
|
||||
max_loops: Number of research iteration loops
|
||||
model_name: Model to use for all agents
|
||||
verbose: Whether to enable verbose logging
|
||||
|
||||
Returns:
|
||||
Complete research results and findings
|
||||
"""
|
||||
bell_labs = BellLabsSwarm(
|
||||
max_loops=max_loops, verbose=verbose, model_name=model_name
|
||||
)
|
||||
|
||||
results = bell_labs.run(research_question)
|
||||
return results
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# # Example research question
|
||||
# research_question = """
|
||||
# Investigate the feasibility of quantum computing for solving complex optimization problems.
|
||||
# Consider both theoretical foundations and practical implementation challenges.
|
||||
# """
|
||||
|
||||
# print("Starting Bell Labs Research Simulation...")
|
||||
# print(f"Research Question: {research_question}")
|
||||
# print("-" * 80)
|
||||
|
||||
# results = run_bell_labs_research(
|
||||
# research_question=research_question,
|
||||
# max_loops=2,
|
||||
# verbose=True
|
||||
# )
|
||||
|
||||
# print("\n" + "=" * 80)
|
||||
# print("RESEARCH SIMULATION COMPLETED")
|
||||
# print("=" * 80)
|
||||
|
||||
# print(f"\nFinal Conclusion:\n{results['final_conclusion']}")
|
||||
|
||||
# print(f"\nResearch completed in {len(results['research_history'])} loops.")
|
||||
# print("Check the results dictionary for complete research details.")
|
||||
@ -1,306 +0,0 @@
import json
from typing import Any, List

from loguru import logger
from pydantic import BaseModel, Field

from swarms import Agent


class AgentOutput(BaseModel):
    """
    Schema for capturing metadata and results of an agent run.
    """

    agent_name: str = Field(..., description="Name of the agent.")
    input_query: str = Field(
        ..., description="Input query provided to the agent."
    )
    output_result: Any = Field(
        ..., description="Result produced by the agent."
    )
    metadata: dict = Field(
        ..., description="Additional metadata about the agent run."
    )


class MatrixSwarm:
    """
    A class to manage a matrix of agents and perform matrix operations similar to linear algebra.
    """

    def __init__(self, agents: List[List[Agent]]):
        """
        Initializes the MatrixSwarm with a 2D list of agents.

        Args:
            agents (List[List[Agent]]): 2D list of agents representing the matrix.
        """
        if not agents or not all(
            isinstance(row, list) for row in agents
        ):
            raise ValueError("Agents must be provided as a 2D list.")
        if not all(
            isinstance(agent, Agent)
            for row in agents
            for agent in row
        ):
            raise ValueError(
                "All elements of the matrix must be instances of `Agent`."
            )
        self.agents = agents
        self.outputs = []  # List to store outputs as AgentOutput

    def validate_dimensions(self, other: "MatrixSwarm") -> None:
        """
        Validates that two matrices have compatible dimensions for operations.

        Args:
            other (MatrixSwarm): Another MatrixSwarm.

        Raises:
            ValueError: If dimensions are incompatible.
        """
        if len(self.agents) != len(other.agents) or len(
            self.agents[0]
        ) != len(other.agents[0]):
            raise ValueError(
                "Matrix dimensions are incompatible for this operation."
            )

    def transpose(self) -> "MatrixSwarm":
        """
        Transposes the matrix of agents (swaps rows and columns).

        Returns:
            MatrixSwarm: A new transposed MatrixSwarm.
        """
        transposed_agents = [
            [self.agents[j][i] for j in range(len(self.agents))]
            for i in range(len(self.agents[0]))
        ]
        return MatrixSwarm(transposed_agents)
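The transpose comprehension above works on any rectangular 2D list; a small self-contained sketch with integers standing in for `Agent` objects:

```python
# Stand-in for a 2x3 agent matrix (plain ints instead of Agent instances)
matrix = [
    [1, 2, 3],
    [4, 5, 6],
]

# Same index swap as MatrixSwarm.transpose: element (i, j) moves to (j, i)
transposed = [
    [matrix[j][i] for j in range(len(matrix))]
    for i in range(len(matrix[0]))
]

assert transposed == [[1, 4], [2, 5], [3, 6]]
```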

    def add(self, other: "MatrixSwarm") -> "MatrixSwarm":
        """
        Adds two matrices element-wise.

        Note:
            `Agent` objects have no numeric sum, so this operation only
            validates dimensions and carries over this matrix's agents;
            the element-wise combination is a structural placeholder.

        Args:
            other (MatrixSwarm): Another MatrixSwarm to add.

        Returns:
            MatrixSwarm: A new MatrixSwarm resulting from the addition.
        """
        self.validate_dimensions(other)
        added_agents = [
            [self.agents[i][j] for j in range(len(self.agents[i]))]
            for i in range(len(self.agents))
        ]
        return MatrixSwarm(added_agents)

    def scalar_multiply(self, scalar: int) -> "MatrixSwarm":
        """
        Scales the agents by duplicating them scalar times along the row.

        Args:
            scalar (int): The scalar multiplier.

        Returns:
            MatrixSwarm: A new MatrixSwarm where each agent is repeated scalar times along the row.
        """
        scaled_agents = [
            [agent for _ in range(scalar) for agent in row]
            for row in self.agents
        ]
        return MatrixSwarm(scaled_agents)

    def multiply(
        self, other: "MatrixSwarm", inputs: List[str]
    ) -> List[List[AgentOutput]]:
        """
        Multiplies two matrices (dot product between rows and columns).

        Args:
            other (MatrixSwarm): Another MatrixSwarm for multiplication.
            inputs (List[str]): A list of input queries for the agents.

        Returns:
            List[List[AgentOutput]]: A resulting matrix of outputs after multiplication.
        """
        if len(self.agents[0]) != len(other.agents):
            raise ValueError(
                "Matrix dimensions are incompatible for multiplication."
            )

        results = []
        for i, row in enumerate(self.agents):
            row_results = []
            for col_idx in range(len(other.agents[0])):
                # Gather column col_idx of the other matrix
                col = [
                    other.agents[row_idx][col_idx]
                    for row_idx in range(len(other.agents))
                ]
                # Input query for the corresponding row
                query = inputs[i]
                intermediate_result = []

                for agent_r, agent_c in zip(row, col):
                    try:
                        result = agent_r.run(query)
                        intermediate_result.append(result)
                    except Exception as e:
                        intermediate_result.append(f"Error: {e}")

                # Aggregate outputs from the dot product (simple concatenation)
                combined_result = " ".join(intermediate_result)
                row_results.append(
                    AgentOutput(
                        agent_name=f"DotProduct-{i}-{col_idx}",
                        input_query=query,
                        output_result=combined_result,
                        metadata={"row": i, "col": col_idx},
                    )
                )
            results.append(row_results)
        return results
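In the dot product above, row `i` of the left matrix is zipped against column `col_idx` of the right one. How the column is gathered can be checked on plain lists:

```python
# Stand-in for a 3x2 right-hand matrix (ints instead of Agent instances)
other = [
    [1, 2],
    [3, 4],
    [5, 6],
]

# Same gather as in MatrixSwarm.multiply: one element per row, fixed column
col_idx = 1
col = [other[row_idx][col_idx] for row_idx in range(len(other))]

assert col == [2, 4, 6]
```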

    def subtract(self, other: "MatrixSwarm") -> "MatrixSwarm":
        """
        Subtracts two matrices element-wise.

        Note:
            As with `add`, this only validates dimensions and carries over
            this matrix's agents; the element-wise combination is a
            structural placeholder.

        Args:
            other (MatrixSwarm): Another MatrixSwarm to subtract.

        Returns:
            MatrixSwarm: A new MatrixSwarm resulting from the subtraction.
        """
        self.validate_dimensions(other)
        subtracted_agents = [
            [self.agents[i][j] for j in range(len(self.agents[i]))]
            for i in range(len(self.agents))
        ]
        return MatrixSwarm(subtracted_agents)

    def identity(self, size: int) -> "MatrixSwarm":
        """
        Creates an identity matrix of agents with size `size`.

        Args:
            size (int): Size of the identity matrix (NxN).

        Returns:
            MatrixSwarm: An identity MatrixSwarm with this matrix's agents on
            the diagonal and placeholder "zero" agents elsewhere.
        """
        identity_agents = [
            [
                (
                    self.agents[i][j]
                    if i == j
                    else Agent(
                        agent_name=f"Zero-Agent-{i}-{j}",
                        system_prompt="",
                    )
                )
                for j in range(size)
            ]
            for i in range(size)
        ]
        return MatrixSwarm(identity_agents)

    def determinant(self) -> Any:
        """
        Computes the determinant of a square MatrixSwarm.

        Returns:
            Any: Determinant of the matrix (as agent outputs).
        """
        if len(self.agents) != len(self.agents[0]):
            raise ValueError(
                "Determinant can only be computed for square matrices."
            )

        # Recursive determinant calculation (placeholder logic: agent outputs
        # stand in for numeric entries)
        if len(self.agents) == 1:
            return self.agents[0][0].run("Compute determinant")

        det_result = 0
        for i in range(len(self.agents)):
            submatrix = MatrixSwarm(
                [row[:i] + row[i + 1 :] for row in self.agents[1:]]
            )
            cofactor = ((-1) ** i) * self.agents[0][i].run(
                "Compute determinant"
            )
            det_result += cofactor * submatrix.determinant()
        return det_result

    def save_to_file(self, path: str) -> None:
        """
        Saves the agent matrix structure and metadata to a file.

        Args:
            path (str): File path to save the matrix.
        """
        try:
            matrix_data = {
                "agents": [
                    [agent.agent_name for agent in row]
                    for row in self.agents
                ],
                "outputs": [output.dict() for output in self.outputs],
            }
            with open(path, "w") as f:
                json.dump(matrix_data, f, indent=4)
            logger.info(f"MatrixSwarm saved to {path}")
        except Exception as e:
            logger.error(f"Error saving MatrixSwarm: {e}")


# # Example usage
# if __name__ == "__main__":
#     from swarms.prompts.finance_agent_sys_prompt import (
#         FINANCIAL_AGENT_SYS_PROMPT,
#     )
#
#     # Create a 3x3 matrix of agents
#     agents = [
#         [
#             Agent(
#                 agent_name=f"Agent-{i}-{j}",
#                 system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
#                 model_name="gpt-4o-mini",
#                 max_loops=1,
#                 autosave=True,
#                 dashboard=False,
#                 verbose=True,
#                 dynamic_temperature_enabled=True,
#                 saved_state_path=f"agent_{i}_{j}.json",
#                 user_name="swarms_corp",
#                 retry_attempts=1,
#                 context_length=200000,
#                 return_step_meta=False,
#                 output_type="string",
#                 streaming_on=False,
#             )
#             for j in range(3)
#         ]
#         for i in range(3)
#     ]
#
#     # Initialize the matrix
#     agent_matrix = MatrixSwarm(agents)
#
#     # Example queries
#     inputs = [
#         "Explain Roth IRA benefits",
#         "Differences between ETFs and mutual funds",
#         "How to create a diversified portfolio",
#     ]
#
#     # Run agents
#     outputs = agent_matrix.multiply(agent_matrix.transpose(), inputs)
#
#     # Save results
#     agent_matrix.save_to_file("agent_matrix_results.json")
@ -1,326 +0,0 @@
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Optional, Tuple

from datasets import Dataset, load_dataset
from loguru import logger
from tqdm import tqdm

# -----------------------------------------------------------------------------
# Logging configuration: log to console and file (rotating by size)
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# Swarm interface example
# -----------------------------------------------------------------------------


# -----------------------------------------------------------------------------
# Benchmark configuration
# -----------------------------------------------------------------------------
class BenchmarkConfig:
    """
    Configuration for a benchmark dataset.

    Attributes:
        input_column (str): The column containing the task prompt.
        answer_column (str): The column containing the expected answer.
        answer_extractor (Optional[Callable[[Any], str]]): Function to extract
            a string answer from the dataset's raw answer format.
        answer_matcher (Optional[Callable[[str, str], bool]]): Function to compare
            the expected answer and the swarm output. If None, a simple substring
            containment is used.
    """

    def __init__(
        self,
        input_column: str,
        answer_column: str,
        answer_extractor: Optional[Callable[[Any], str]] = None,
        answer_matcher: Optional[Callable[[str, str], bool]] = None,
    ):
        self.input_column = input_column
        self.answer_column = answer_column
        self.answer_extractor = answer_extractor
        self.answer_matcher = answer_matcher
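The extractor and matcher hooks are plain callables, so they can be exercised on their own before being handed to `BenchmarkConfig` (the values below are illustrative):

```python
# Unwrap SQuAD-style nested answers; fall back to str() for anything else
def answer_extractor(ans):
    if isinstance(ans, dict) and isinstance(ans.get("text"), list) and ans["text"]:
        return ans["text"][0]
    return str(ans)

# Case-insensitive containment as a custom matcher
def answer_matcher(expected, output):
    return expected.lower() in output.lower()

assert answer_extractor({"text": ["Paris"]}) == "Paris"
assert answer_extractor(42) == "42"
assert answer_matcher("Paris", "The capital is paris.")
```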


# -----------------------------------------------------------------------------
# Preset dataset configurations for popular benchmarks
# -----------------------------------------------------------------------------
PRESET_DATASETS: Dict[str, BenchmarkConfig] = {
    "gsm8k": BenchmarkConfig(
        input_column="question",
        answer_column="answer",
    ),
    "squad": BenchmarkConfig(
        input_column="question",
        answer_column="answers",
        answer_extractor=lambda ans: (
            ans["text"][0]
            if isinstance(ans, dict)
            and "text" in ans
            and isinstance(ans["text"], list)
            and ans["text"]
            else str(ans)
        ),
    ),
    "winogrande": BenchmarkConfig(
        input_column="sentence",
        answer_column="answer",
    ),
    "commonsense_qa": BenchmarkConfig(
        input_column="question",
        answer_column="answerKey",
    ),
    # Add additional presets here.
}


# -----------------------------------------------------------------------------
# SwarmEvaluator with extended features
# -----------------------------------------------------------------------------
class SwarmEvaluator:
    """
    Evaluator that uses a swarm of agents to process benchmark datasets
    from Hugging Face, with concurrency, retries, progress display,
    performance timing, and customizable answer matching.

    Example:
        swarm = Swarm()
        evaluator = SwarmEvaluator(swarm)
        results = evaluator.evaluate("gsm8k", split="test", max_workers=4)
        print(results)
    """

    def __init__(self, swarm: callable) -> None:
        """
        Initialize the evaluator with a given swarm.

        Args:
            swarm (Swarm): A swarm instance with a callable run(task: str) method.
        """
        self.swarm = swarm

    def evaluate(
        self,
        dataset_name: str,
        split: str = "test",
        config: Optional[BenchmarkConfig] = None,
        max_workers: int = 1,
        max_retries: int = 3,
        show_progress: bool = True,
        output_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Evaluate the specified benchmark dataset using the swarm.

        Args:
            dataset_name (str): The dataset name (from Hugging Face).
            split (str): The dataset split (e.g., "test", "validation").
            config (Optional[BenchmarkConfig]): Benchmark configuration. If None,
                a preset config is used.
            max_workers (int): Number of concurrent workers.
            max_retries (int): Number of retries for swarm tasks on failure.
            show_progress (bool): If True, display a progress bar.
            output_file (Optional[str]): Path to a file to write the results.

        Returns:
            Dict[str, Any]: Evaluation metrics including total examples, correct
                answers, accuracy, and total evaluation time.
        """
        if config is None:
            config = PRESET_DATASETS.get(dataset_name)
            if config is None:
                raise ValueError(
                    f"No preset config for dataset '{dataset_name}'. Provide a BenchmarkConfig."
                )

        logger.info(
            f"Loading dataset '{dataset_name}' (split: {split})..."
        )
        dataset: Dataset = load_dataset(dataset_name, split=split)
        total_examples = len(dataset)
        logger.info(f"Total examples to evaluate: {total_examples}")

        start_time = time.time()
        correct = 0

        # Function to process a single example.
        def _process_example(
            example: Dict[str, Any], idx: int
        ) -> Tuple[bool, float]:
            task_start = time.time()
            task_text = example.get(config.input_column)
            expected_answer = example.get(config.answer_column)

            if task_text is None or expected_answer is None:
                logger.warning(
                    f"Example {idx}: Missing '{config.input_column}' or '{config.answer_column}', skipping."
                )
                return (False, 0.0)

            # Use answer_extractor if provided.
            if config.answer_extractor:
                try:
                    expected_answer = config.answer_extractor(
                        expected_answer
                    )
                except Exception as e:
                    logger.error(
                        f"Example {idx}: Error extracting answer: {e}"
                    )
                    return (False, 0.0)

            logger.debug(f"Example {idx} - Task: {task_text}")
            logger.debug(
                f"Example {idx} - Expected Answer: {expected_answer}"
            )

            try:
                swarm_output = self._run_with_retry(
                    task_text, max_retries
                )
            except Exception as e:
                logger.error(
                    f"Example {idx}: Failed after retries. Error: {e}"
                )
                return (False, time.time() - task_start)

            logger.debug(
                f"Example {idx} - Swarm Output: {swarm_output}"
            )

            # Use the custom matcher if provided; otherwise, default matching.
            if config.answer_matcher:
                is_correct = config.answer_matcher(
                    expected_answer, swarm_output
                )
            else:
                is_correct = self._default_matcher(
                    expected_answer, swarm_output
                )

            task_time = time.time() - task_start
            logger.info(
                f"Example {idx}: {'Correct' if is_correct else 'Incorrect'} in {task_time:.2f}s"
            )
            return (is_correct, task_time)

        # Use a ThreadPoolExecutor for concurrency.
        futures = []
        total_time = 0.0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Optionally wrap the dataset with tqdm for a progress bar.
            examples_iter = enumerate(dataset, start=1)
            if show_progress:
                examples_iter = tqdm(
                    list(examples_iter),
                    total=total_examples,
                    desc="Evaluating",
                )

            for idx, example in examples_iter:
                futures.append(
                    executor.submit(_process_example, example, idx)
                )

            for future in as_completed(futures):
                try:
                    is_correct, elapsed = future.result()
                    total_time += elapsed
                    if is_correct:
                        correct += 1
                except Exception as e:
                    logger.error(f"Error processing an example: {e}")

        overall_time = time.time() - start_time
        accuracy = (
            correct / total_examples if total_examples > 0 else 0.0
        )

        logger.info(
            f"Evaluation complete. Total examples: {total_examples}, Correct: {correct}, "
            f"Accuracy: {accuracy:.2%}, Overall Time: {overall_time:.2f}s, "
            f"Average per-example time: {total_time / total_examples if total_examples else 0:.2f}s"
        )

        results = {
            "total": total_examples,
            "correct": correct,
            "accuracy": accuracy,
            "overall_time": overall_time,
            "average_example_time": (
                total_time / total_examples
                if total_examples
                else math.nan
            ),
        }

        # Optionally save the results to a file.
        if output_file:
            try:
                with open(output_file, "w") as f:
                    for key, value in results.items():
                        f.write(f"{key}: {value}\n")
                logger.info(f"Results saved to {output_file}")
            except Exception as e:
                logger.error(
                    f"Error saving results to {output_file}: {e}"
                )

        return results

    def _run_with_retry(self, task: str, max_retries: int) -> str:
        """
        Runs the swarm task with a retry mechanism.

        Args:
            task (str): The task string.
            max_retries (int): Maximum number of retries.

        Returns:
            str: Swarm output.

        Raises:
            Exception: If all retries fail.
        """
        attempt = 0
        while attempt <= max_retries:
            try:
                start = time.time()
                result = self.swarm.run(task)
                elapsed = time.time() - start
                logger.debug(
                    f"Task succeeded in {elapsed:.2f}s on attempt {attempt + 1}"
                )
                return result
            except Exception as e:
                logger.warning(
                    f"Task failed on attempt {attempt + 1}: {e}"
                )
                attempt += 1
                time.sleep(0.5 * attempt)  # Linear backoff (0.5s more per attempt)
        raise Exception("Max retries exceeded for task.")
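The sleep in the retry loop grows by 0.5 s per failed attempt, i.e. a linear (not exponential) backoff. The delay schedule can be computed up front:

```python
max_retries = 3

# After failed attempt n the loop sleeps 0.5 * n seconds
delays = [0.5 * attempt for attempt in range(1, max_retries + 1)]

assert delays == [0.5, 1.0, 1.5]
assert sum(delays) == 3.0  # total worst-case sleep before giving up
```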

    @staticmethod
    def _default_matcher(expected: str, output: str) -> bool:
        """
        Default answer matching using a normalized substring check.

        Args:
            expected (str): The expected answer.
            output (str): The swarm output.

        Returns:
            bool: True if expected is found in output; otherwise, False.
        """
        expected_norm = " ".join(expected.strip().split())
        output_norm = " ".join(output.strip().split())
        return expected_norm in output_norm


# -----------------------------------------------------------------------------
# Example usage
# -----------------------------------------------------------------------------
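A minimal sketch of the default matching behaviour, mirroring `_default_matcher` above as a standalone function:

```python
def default_matcher(expected: str, output: str) -> bool:
    # Collapse all runs of whitespace, then test substring containment
    expected_norm = " ".join(expected.strip().split())
    output_norm = " ".join(output.strip().split())
    return expected_norm in output_norm

assert default_matcher("42", "The answer is   42 ")
assert not default_matcher("43", "The answer is 42")
```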
@ -0,0 +1,447 @@
import os
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed,
    TimeoutError,
)
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import yaml
from loguru import logger
from pydantic import BaseModel, Field, field_validator

# The Agent class is imported lazily inside methods to avoid a circular dependency.

# Default model configuration
DEFAULT_MODEL = "gpt-4o"


class MarkdownAgentConfig(BaseModel):
    """Configuration model for agents loaded from Claude Code markdown files."""

    name: str
    description: str
    model_name: Optional[str] = DEFAULT_MODEL
    temperature: Optional[float] = Field(default=0.1, ge=0.0, le=2.0)
    mcp_url: Optional[str] = None  # URL of an optional MCP server
    system_prompt: str
    max_loops: int = Field(default=1, ge=1)
    autosave: bool = False
    dashboard: bool = False
    verbose: bool = False
    dynamic_temperature_enabled: bool = False
    saved_state_path: Optional[str] = None
    user_name: str = "default_user"
    retry_attempts: int = Field(default=3, ge=1)
    context_length: int = Field(default=100000, ge=1000)
    return_step_meta: bool = False
    output_type: str = "str"
    auto_generate_prompt: bool = False
    artifacts_on: bool = False
    artifacts_file_extension: str = ".md"
    artifacts_output_path: str = ""
    streaming_on: bool = False

    @field_validator("system_prompt")
    @classmethod
    def validate_system_prompt(cls, v):
        if not v or not isinstance(v, str) or len(v.strip()) == 0:
            raise ValueError(
                "System prompt must be a non-empty string"
            )
        return v


class AgentLoader:
    """
    Loader for creating agents from markdown files using the Claude Code
    sub-agent format.

    Supports both single and multiple markdown files and uses YAML
    frontmatter for agent configuration.

    Features:
    - Single markdown file loading
    - Multiple markdown files loading (batch processing)
    - YAML frontmatter parsing
    - Agent configuration extraction from YAML metadata
    - Error handling and validation
    """

    def __init__(self):
        """Initialize the AgentLoader."""
        pass

    def parse_yaml_frontmatter(self, content: str) -> Dict[str, Any]:
        """
        Parse YAML frontmatter from markdown content.

        Args:
            content: Markdown content with potential YAML frontmatter

        Returns:
            Dictionary with the parsed YAML data ("frontmatter") and the
            remaining content ("content")
        """
        lines = content.split("\n")

        # Check if the content starts with a YAML frontmatter fence
        if not lines[0].strip() == "---":
            return {"frontmatter": {}, "content": content}

        # Find the closing fence of the frontmatter
        end_marker = -1
        for i, line in enumerate(lines[1:], 1):
            if line.strip() == "---":
                end_marker = i
                break

        if end_marker == -1:
            return {"frontmatter": {}, "content": content}

        # Extract the frontmatter and the remaining content
        frontmatter_text = "\n".join(lines[1:end_marker])
        remaining_content = "\n".join(lines[end_marker + 1 :]).strip()

        try:
            frontmatter_data = yaml.safe_load(frontmatter_text) or {}
        except yaml.YAMLError as e:
            logger.warning(f"Failed to parse YAML frontmatter: {e}")
            return {"frontmatter": {}, "content": content}

        return {
            "frontmatter": frontmatter_data,
            "content": remaining_content,
        }
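The delimiter logic above (independent of the YAML parsing itself) can be exercised standalone: everything between the first two `---` lines is metadata, the rest becomes the system prompt. The agent name below is illustrative:

```python
content = """---
name: test-agent
description: demo
---

You are a test agent."""

lines = content.split("\n")

# Locate the closing "---" fence (searching from line 1 onward)
end_marker = next(
    i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"
)

frontmatter_text = "\n".join(lines[1:end_marker])
remaining = "\n".join(lines[end_marker + 1 :]).strip()

assert frontmatter_text == "name: test-agent\ndescription: demo"
assert remaining == "You are a test agent."
```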
def parse_markdown_file(
|
||||
self, file_path: str
|
||||
) -> MarkdownAgentConfig:
|
||||
"""
|
||||
Parse a single markdown file to extract agent configuration.
|
||||
Uses Claude Code sub-agent YAML frontmatter format.
|
||||
|
||||
Args:
|
||||
file_path: Path to markdown file
|
||||
|
||||
Returns:
|
||||
MarkdownAgentConfig object with parsed configuration
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
ValueError: If parsing fails or no YAML frontmatter found
|
||||
"""
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(
|
||||
f"Markdown file {file_path} not found."
|
||||
)
|
||||
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
content = file.read()
|
||||
|
||||
# Parse YAML frontmatter (Claude Code sub-agent format)
|
||||
yaml_result = self.parse_yaml_frontmatter(content)
|
||||
frontmatter = yaml_result["frontmatter"]
|
||||
remaining_content = yaml_result["content"]
|
||||
|
||||
if not frontmatter:
|
||||
raise ValueError(
|
||||
f"No YAML frontmatter found in {file_path}. File must use Claude Code sub-agent format with YAML frontmatter."
|
||||
)
|
||||
|
||||
# Use YAML frontmatter data
|
||||
config_data = {
|
||||
"name": frontmatter.get("name", Path(file_path).stem),
|
||||
"description": frontmatter.get(
|
||||
"description", "Agent loaded from markdown"
|
||||
),
|
||||
"model_name": frontmatter.get("model_name")
|
||||
or frontmatter.get("model", DEFAULT_MODEL),
|
||||
"temperature": frontmatter.get("temperature", 0.1),
|
||||
"max_loops": frontmatter.get("max_loops", 1),
|
||||
"mcp_url": frontmatter.get("mcp_url"),
|
||||
"system_prompt": remaining_content.strip(),
|
||||
"streaming_on": frontmatter.get(
|
||||
"streaming_on", False
|
||||
),
|
||||
}
|
||||
|
||||
# Use default model if not specified
|
||||
if not config_data["model_name"]:
|
||||
config_data["model_name"] = DEFAULT_MODEL
|
||||
|
||||
logger.info(
|
||||
f"Successfully parsed markdown file: {file_path}"
|
||||
)
|
||||
return MarkdownAgentConfig(**config_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error parsing markdown file {file_path}: {str(e)}"
|
||||
)
|
||||
raise ValueError(
|
||||
f"Error parsing markdown file {file_path}: {str(e)}"
|
||||
)
|
||||
|
||||
def load_agent_from_markdown(
|
||||
self, file_path: str, **kwargs
|
||||
) -> "Agent":
|
||||
"""
|
||||
Load a single agent from a markdown file.
|
||||
|
||||
Args:
|
||||
file_path: Path to markdown file
|
||||
**kwargs: Additional arguments to override default configuration
|
||||
|
||||
Returns:
|
||||
Configured Agent instance
|
||||
"""
|
||||
config = self.parse_markdown_file(file_path)
|
||||
|
||||
# Override with any provided kwargs
|
||||
config_dict = config.model_dump()
|
||||
config_dict.update(kwargs)
|
||||
|
||||
# Map config fields to Agent parameters, handling special cases
|
||||
field_mapping = {
|
||||
"name": "agent_name", # name -> agent_name
|
||||
"description": None, # not used by Agent
|
||||
"mcp_url": None, # not used by Agent
|
||||
}
|
||||
|
||||
agent_fields = {}
|
||||
for config_key, config_value in config_dict.items():
|
||||
# Handle special field mappings
|
||||
if config_key in field_mapping:
|
||||
agent_key = field_mapping[config_key]
|
||||
if agent_key: # Only include if mapped to something
|
||||
agent_fields[agent_key] = config_value
|
||||
else:
|
||||
# Direct mapping for most fields
|
||||
agent_fields[config_key] = config_value
|
||||
|
||||
try:
|
||||
# Lazy import to avoid circular dependency
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
logger.info(
|
||||
f"Creating agent '{config.name}' from {file_path}"
|
||||
)
|
||||
agent = Agent(**agent_fields)
|
||||
logger.info(
|
||||
f"Successfully created agent '{config.name}' from {file_path}"
|
||||
)
|
||||
return agent
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
logger.error(
|
||||
f"Error creating agent from {file_path}: {str(e)}"
|
||||
)
|
||||
logger.error(f"Traceback: {traceback.format_exc()}")
|
||||
raise ValueError(
|
||||
f"Error creating agent from {file_path}: {str(e)}"
|
||||
)
|
||||
|
||||
    def load_agents_from_markdown(
        self,
        file_paths: Union[str, List[str]],
        concurrent: bool = True,
        max_workers: Optional[int] = None,
        max_file_size_mb: float = 10.0,
        **kwargs,
    ) -> List["Agent"]:
        """
        Load multiple agents from markdown files with optional concurrent processing.

        Args:
            file_paths: Single file path, directory path, or list of file paths
            concurrent: Whether to use concurrent processing for multiple files
            max_workers: Maximum number of worker threads (defaults to CPU count)
            max_file_size_mb: Maximum file size in MB to prevent memory issues
            **kwargs: Additional arguments to override default configuration

        Returns:
            List of configured Agent instances
        """
        agents = []
        paths_to_process = []

        # Handle different input types
        if isinstance(file_paths, str):
            if os.path.isdir(file_paths):
                # Directory - find all .md files
                md_files = list(Path(file_paths).glob("*.md"))
                paths_to_process = [str(f) for f in md_files]
            elif os.path.isfile(file_paths):
                # Single file
                paths_to_process = [file_paths]
            else:
                raise FileNotFoundError(f"Path {file_paths} not found.")
        elif isinstance(file_paths, list):
            paths_to_process = file_paths
        else:
            raise ValueError("file_paths must be a string or list of strings")

        # Validate file sizes to prevent memory issues.
        # Build a new list instead of calling list.remove() inside the loop,
        # which would skip elements while iterating.
        validated_paths = []
        for file_path in paths_to_process:
            try:
                file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
                if file_size_mb > max_file_size_mb:
                    logger.warning(
                        f"Skipping {file_path}: size {file_size_mb:.2f}MB exceeds limit {max_file_size_mb}MB"
                    )
                    continue
            except OSError:
                logger.warning(
                    f"Could not check size of {file_path}, skipping validation"
                )
            validated_paths.append(file_path)
        paths_to_process = validated_paths

        # Adjust max_workers for I/O-bound operations
        if max_workers is None and concurrent:
            # For I/O-bound work: use more threads than CPU count, but cap it.
            # os.cpu_count() can return None, so fall back to 1.
            max_workers = min(
                20, len(paths_to_process), (os.cpu_count() or 1) * 2
            )

        # Use concurrent processing for multiple files if enabled
        if concurrent and len(paths_to_process) > 1:
            logger.info(
                f"Loading {len(paths_to_process)} agents concurrently with {max_workers} workers..."
            )

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tasks
                future_to_path = {
                    executor.submit(
                        self.load_agent_from_markdown, file_path, **kwargs
                    ): file_path
                    for file_path in paths_to_process
                }

                # Collect results as they complete, with a 5-minute overall timeout
                for future in as_completed(future_to_path, timeout=300):
                    file_path = future_to_path[future]
                    try:
                        agent = future.result(timeout=60)  # 1 minute per agent
                        agents.append(agent)
                        logger.info(
                            f"Successfully loaded agent from {file_path}"
                        )
                    except TimeoutError:
                        logger.error(f"Timeout loading {file_path}")
                        continue
                    except Exception as e:
                        logger.error(f"Failed to load {file_path}: {str(e)}")
                        continue
        else:
            # Sequential processing for single file or when concurrent is disabled
            logger.info(
                f"Loading {len(paths_to_process)} agents sequentially..."
            )
            for file_path in paths_to_process:
                try:
                    agent = self.load_agent_from_markdown(file_path, **kwargs)
                    agents.append(agent)
                except Exception as e:
                    logger.warning(
                        f"Skipping {file_path} due to error: {str(e)}"
                    )
                    continue

        logger.info(
            f"Successfully loaded {len(agents)} agents from markdown files"
        )
        return agents

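The concurrent branch above is the standard submit-and-collect pattern from `concurrent.futures`. A minimal self-contained sketch, with a hypothetical `fake_load` standing in for `load_agent_from_markdown`, shows the same skip-on-error policy:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_load(path: str) -> str:
    # Stand-in for load_agent_from_markdown: returns a label instead of an Agent.
    if path.endswith("bad.md"):
        raise ValueError(f"cannot parse {path}")
    return f"agent:{path}"


paths = ["a.md", "b.md", "bad.md"]
results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    future_to_path = {executor.submit(fake_load, p): p for p in paths}
    for future in as_completed(future_to_path):
        path = future_to_path[future]
        try:
            results.append(future.result(timeout=60))
        except Exception:
            # One bad file is skipped, not fatal -- same policy as the loader.
            continue

print(sorted(results))  # ['agent:a.md', 'agent:b.md']
```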
    def load_single_agent(self, file_path: str, **kwargs) -> "Agent":
        """
        Convenience method for loading a single agent.
        Uses Claude Code sub-agent YAML frontmatter format.

        Args:
            file_path: Path to markdown file with YAML frontmatter
            **kwargs: Additional configuration overrides

        Returns:
            Configured Agent instance
        """
        return self.load_agent_from_markdown(file_path, **kwargs)

    def load_multiple_agents(
        self, file_paths: Union[str, List[str]], **kwargs
    ) -> List["Agent"]:
        """
        Convenience method for loading multiple agents.
        Uses Claude Code sub-agent YAML frontmatter format.

        Args:
            file_paths: Directory path or list of file paths with YAML frontmatter
            **kwargs: Additional configuration overrides

        Returns:
            List of configured Agent instances
        """
        return self.load_agents_from_markdown(file_paths, **kwargs)


# Convenience functions
def load_agent_from_markdown(file_path: str, **kwargs) -> "Agent":
    """
    Load a single agent from a markdown file with Claude Code YAML frontmatter format.

    Args:
        file_path: Path to markdown file with YAML frontmatter
        **kwargs: Additional configuration overrides

    Returns:
        Configured Agent instance
    """
    loader = AgentLoader()
    return loader.load_single_agent(file_path, **kwargs)


def load_agents_from_markdown(
    file_paths: Union[str, List[str]],
    concurrent: bool = True,
    max_file_size_mb: float = 10.0,
    **kwargs,
) -> List["Agent"]:
    """
    Load multiple agents from markdown files with Claude Code YAML frontmatter format.

    Args:
        file_paths: Directory path or list of file paths with YAML frontmatter
        concurrent: Whether to use concurrent processing for multiple files
        max_file_size_mb: Maximum file size in MB to prevent memory issues
        **kwargs: Additional configuration overrides

    Returns:
        List of configured Agent instances
    """
    loader = AgentLoader()
    return loader.load_agents_from_markdown(
        file_paths,
        concurrent=concurrent,
        max_file_size_mb=max_file_size_mb,
        **kwargs,
    )
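The loaders above consume the `---`-delimited frontmatter format described in the documentation. A simplified standalone sketch of that split (the real `parse_markdown_file` uses a proper YAML parser; this naive `key: value` parser, and its name `parse_frontmatter`, are illustrative assumptions only):

```python
def parse_frontmatter(markdown: str):
    """Split '---'-delimited frontmatter from the body.

    Naive illustration only: values stay strings, no nesting, no YAML types.
    Returns (config dict, system prompt body).
    """
    parts = markdown.split("---", 2)
    if len(parts) < 3:
        return {}, markdown  # no frontmatter block
    config = {}
    for line in parts[1].strip().splitlines():
        key, _, value = line.partition(":")
        config[key.strip()] = value.strip()
    return config, parts[2].strip()


doc = """---
name: research-agent
model_name: gpt-4
---

You are a research assistant."""

config, system_prompt = parse_frontmatter(doc)
print(config["name"])  # research-agent
print(system_prompt)   # You are a research assistant.
```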
@@ -0,0 +1,85 @@
import traceback

from loguru import logger

from swarms.utils.litellm_tokenizer import count_tokens
from typing import Optional


def dynamic_auto_chunking_(
    content: str,
    context_length: Optional[int] = 8192,
    tokenizer_model_name: Optional[str] = "gpt-4.1",
):
    """
    Dynamically chunk the conversation history to fit within the context length.

    Args:
        content (str): The conversation history as a string.
        context_length (int): The maximum number of tokens allowed.
        tokenizer_model_name (str): The name of the tokenizer model to use.

    Returns:
        str: The chunked conversation history as a string that fits within context_length tokens.
    """
    total_tokens = count_tokens(text=content, model=tokenizer_model_name)

    if total_tokens <= context_length:
        return content

    # Remove characters from the beginning until the remaining suffix fits,
    # using binary search to find the right cutoff point.
    target_tokens = context_length
    current_string = content

    left, right = 0, len(content)

    while left < right:
        mid = (left + right) // 2
        test_string = content[mid:]

        if not test_string:
            break

        test_tokens = count_tokens(
            text=test_string, model=tokenizer_model_name
        )

        if test_tokens <= target_tokens:
            # This suffix fits; try cutting fewer characters to keep more content
            right = mid
            current_string = test_string
        else:
            # Still over the limit; cut more from the beginning
            left = mid + 1

    return current_string


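The binary search above works for any tokenizer whose count is monotone in string length. A self-contained toy version, using whitespace word count as a stand-in for `count_tokens` (both `word_count` and `chunk_to_fit` are illustrative names, not part of the source):

```python
def word_count(text: str) -> int:
    # Toy stand-in for count_tokens: one "token" per whitespace-separated word.
    return len(text.split())


def chunk_to_fit(content: str, limit: int) -> str:
    """Drop characters from the front until the suffix fits within `limit` tokens."""
    if word_count(content) <= limit:
        return content
    best = content
    left, right = 0, len(content)
    while left < right:
        mid = (left + right) // 2
        suffix = content[mid:]
        if word_count(suffix) <= limit:
            right = mid      # fits: try cutting fewer characters
            best = suffix
        else:
            left = mid + 1   # too long: cut more from the front
    return best


history = "one two three four five six"
print(chunk_to_fit(history, 3).strip())  # four five six
```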
def dynamic_auto_chunking(
    content: str,
    context_length: Optional[int] = 8192,
    tokenizer_model_name: Optional[str] = "gpt-4.1",
):
    """
    Dynamically chunk the conversation history to fit within the context length.

    Args:
        content (str): The conversation history as a string.
        context_length (int): The maximum number of tokens allowed.
        tokenizer_model_name (str): The name of the tokenizer model to use.
    """
    try:
        return dynamic_auto_chunking_(
            content=content,
            context_length=context_length,
            tokenizer_model_name=tokenizer_model_name,
        )
    except Exception as e:
        logger.error(
            f"Dynamic auto chunking failed: {e} Traceback: {traceback.format_exc()}"
        )
        return content
@@ -1,216 +0,0 @@
from swarms.structs.matrix_swarm import AgentMatrix, AgentOutput
from swarms import Agent


def create_test_matrix(rows: int, cols: int) -> AgentMatrix:
    """Helper function to create a test agent matrix"""
    agents = [
        [
            Agent(
                agent_name=f"TestAgent-{i}-{j}",
                system_prompt="Test prompt",
            )
            for j in range(cols)
        ]
        for i in range(rows)
    ]
    return AgentMatrix(agents)


def test_init():
    """Test AgentMatrix initialization"""
    # Test valid initialization
    matrix = create_test_matrix(2, 2)
    assert isinstance(matrix, AgentMatrix)
    assert len(matrix.agents) == 2
    assert len(matrix.agents[0]) == 2

    # Test invalid initialization
    try:
        AgentMatrix([[1, 2], [3, 4]])  # Non-agent elements
        assert False, "Should raise ValueError"
    except ValueError:
        pass

    try:
        AgentMatrix([])  # Empty matrix
        assert False, "Should raise ValueError"
    except ValueError:
        pass


def test_transpose():
    """Test matrix transpose operation"""
    matrix = create_test_matrix(2, 3)
    transposed = matrix.transpose()

    assert len(transposed.agents) == 3  # Original cols become rows
    assert len(transposed.agents[0]) == 2  # Original rows become cols

    # Verify agent positions
    for i in range(2):
        for j in range(3):
            assert (
                matrix.agents[i][j].agent_name
                == transposed.agents[j][i].agent_name
            )


def test_add():
    """Test matrix addition"""
    matrix1 = create_test_matrix(2, 2)
    matrix2 = create_test_matrix(2, 2)

    result = matrix1.add(matrix2)
    assert len(result.agents) == 2
    assert len(result.agents[0]) == 2

    # Test incompatible dimensions
    matrix3 = create_test_matrix(2, 3)
    try:
        matrix1.add(matrix3)
        assert False, "Should raise ValueError"
    except ValueError:
        pass


def test_scalar_multiply():
    """Test scalar multiplication"""
    matrix = create_test_matrix(2, 2)
    scalar = 3
    result = matrix.scalar_multiply(scalar)

    assert len(result.agents) == 2
    assert len(result.agents[0]) == 2 * scalar

    # Verify agent duplication
    for i in range(len(result.agents)):
        for j in range(0, len(result.agents[0]), scalar):
            original_agent = matrix.agents[i][j // scalar]
            for k in range(scalar):
                assert (
                    result.agents[i][j + k].agent_name
                    == original_agent.agent_name
                )


def test_multiply():
    """Test matrix multiplication"""
    matrix1 = create_test_matrix(2, 3)
    matrix2 = create_test_matrix(3, 2)
    inputs = ["test query 1", "test query 2"]

    result = matrix1.multiply(matrix2, inputs)
    assert len(result) == 2  # Number of rows in first matrix
    assert len(result[0]) == 2  # Number of columns in second matrix

    # Verify output structure
    for row in result:
        for output in row:
            assert isinstance(output, AgentOutput)
            assert isinstance(output.input_query, str)
            assert isinstance(output.metadata, dict)


def test_subtract():
    """Test matrix subtraction"""
    matrix1 = create_test_matrix(2, 2)
    matrix2 = create_test_matrix(2, 2)

    result = matrix1.subtract(matrix2)
    assert len(result.agents) == 2
    assert len(result.agents[0]) == 2


def test_identity():
    """Test identity matrix creation"""
    matrix = create_test_matrix(3, 3)
    identity = matrix.identity(3)

    assert len(identity.agents) == 3
    assert len(identity.agents[0]) == 3

    # Verify diagonal elements are from original matrix
    for i in range(3):
        assert (
            identity.agents[i][i].agent_name
            == matrix.agents[i][i].agent_name
        )

        # Verify non-diagonal elements are zero agents
        for j in range(3):
            if i != j:
                assert identity.agents[i][j].agent_name.startswith(
                    "Zero-Agent"
                )


def test_determinant():
    """Test determinant calculation"""
    # Test 1x1 matrix
    matrix1 = create_test_matrix(1, 1)
    det1 = matrix1.determinant()
    assert det1 is not None

    # Test 2x2 matrix
    matrix2 = create_test_matrix(2, 2)
    det2 = matrix2.determinant()
    assert det2 is not None

    # Test non-square matrix
    matrix3 = create_test_matrix(2, 3)
    try:
        matrix3.determinant()
        assert False, "Should raise ValueError"
    except ValueError:
        pass


def test_save_to_file(tmp_path):
    """Test saving matrix to file"""
    import os

    matrix = create_test_matrix(2, 2)
    file_path = os.path.join(tmp_path, "test_matrix.json")

    matrix.save_to_file(file_path)
    assert os.path.exists(file_path)

    # Verify file contents
    import json

    with open(file_path, "r") as f:
        data = json.load(f)
        assert "agents" in data
        assert "outputs" in data
        assert len(data["agents"]) == 2
        assert len(data["agents"][0]) == 2


def run_all_tests():
    """Run all test functions"""
    test_functions = [
        test_init,
        test_transpose,
        test_add,
        test_scalar_multiply,
        test_multiply,
        test_subtract,
        test_identity,
        test_determinant,
    ]

    for test_func in test_functions:
        try:
            test_func()
            print(f"✅ {test_func.__name__} passed")
        except AssertionError as e:
            print(f"❌ {test_func.__name__} failed: {str(e)}")
        except Exception as e:
            print(
                f"❌ {test_func.__name__} failed with exception: {str(e)}"
            )


if __name__ == "__main__":
    run_all_tests()