pull/1022/head
harshalmore31 2 weeks ago
parent 0a929e702b
commit aa1be4c065

@ -6,9 +6,10 @@ The `AgentLoader` is a powerful utility for creating Swarms agents from markdown
The AgentLoader enables you to:
- Load single agents from markdown files with YAML frontmatter
- Load multiple agents from directories or file lists
- Load multiple agents from directories or file lists with concurrent processing
- Parse Claude Code sub-agent YAML frontmatter configurations
- Extract system prompts from markdown content
- Load batches of files concurrently with a thread pool for faster batch loading
- Provide comprehensive error handling and validation
## Installation
@ -56,42 +57,37 @@ the subagent should follow.
```python
from swarms.utils import load_agent_from_markdown
# Load Claude Code format agent (YAML frontmatter)
agent = load_agent_from_markdown(
file_path="performance-engineer.md" # Uses YAML frontmatter format
)
# Load agent from markdown file
agent = load_agent_from_markdown("finance_advisor.md")
# The agent automatically gets configured with:
# - Name, description from frontmatter
# - Temperature, max_loops, model settings
# - System prompt from content after frontmatter
response = agent.run("Analyze application performance issues")
print(response)
# Use the agent
response = agent.run(
"I have $10,000 to invest. What's a good strategy for a beginner?"
)
```
### Loading Multiple Agents
### Loading Multiple Agents (Concurrent)
```python
from swarms.utils import load_agents_from_markdown
# Load all agents from directory (YAML frontmatter format)
agents = load_agents_from_markdown(
file_paths="./agents_directory/" # Directory with Claude Code format files
)
# Load agents from list of files with concurrent processing
agents = load_agents_from_markdown([
"market_researcher.md",
"financial_analyst.md",
"risk_analyst.md"
], concurrent=True) # Loads files in parallel worker threads
# Load agents from specific files
agents = load_agents_from_markdown(
file_paths=[
"performance-engineer.md", # Claude Code YAML format
"financial-analyst.md", # Claude Code YAML format
"security-analyst.md" # Claude Code YAML format
]
# Use agents in a workflow
from swarms.structs import SequentialWorkflow
workflow = SequentialWorkflow(
agents=agents,
max_loops=1
)
print(f"Loaded {len(agents)} agents")
for agent in agents:
print(f"- {agent.agent_name}: {getattr(agent, 'temperature', 'default temp')}")
task = "Analyze the AI healthcare market for a $50M investment."
result = workflow.run(task)
```
## Class-Based Usage
@ -109,8 +105,12 @@ loader = AgentLoader()
# Load single agent
agent = loader.load_single_agent("path/to/agent.md")
# Load multiple agents
agents = loader.load_multiple_agents("./agents_directory/")
# Load multiple agents with concurrent processing
agents = loader.load_multiple_agents(
"./agents_directory/",
concurrent=True, # Enable concurrent processing
max_workers=8 # Optional: limit worker threads
)
# Parse markdown file without creating agent
config = loader.parse_markdown_file("path/to/agent.md")
@ -150,34 +150,39 @@ agent = load_agent_from_markdown(
## Complete Example
### Example: Claude Code Sub-Agent Format
### Example: Finance Advisor Agent
Create a file `performance-engineer.md`:
Create a file `finance_advisor.md`:
```markdown
---
name: performance-engineer
description: Optimize application performance and identify bottlenecks
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4
temperature: 0.3
max_loops: 2
mcp_url: http://example.com/mcp
temperature: 0.7
max_loops: 1
---
You are a Performance Engineer specializing in application optimization and scalability.
Your role involves analyzing system performance, identifying bottlenecks, and implementing
solutions to improve efficiency and user experience.
Key responsibilities:
- Profile applications to identify performance issues
- Optimize database queries and caching strategies
- Implement load testing and monitoring solutions
- Recommend infrastructure improvements
- Provide actionable optimization recommendations
Always provide specific, measurable recommendations with implementation details.
Focus on both immediate wins and long-term architectural improvements.
You are an expert financial advisor with deep knowledge in:
- Investment strategies and portfolio management
- Personal budgeting and financial planning
- Risk assessment and diversification
- Tax optimization strategies
- Retirement planning
Your approach:
- Provide clear, actionable financial advice
- Consider individual risk tolerance and goals
- Explain complex concepts in simple terms
- Always emphasize the importance of diversification
- Include relevant disclaimers about financial advice
When analyzing financial situations:
1. Assess current financial position
2. Identify short-term and long-term goals
3. Evaluate risk tolerance
4. Recommend appropriate strategies
5. Suggest specific action steps
```
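The split between the frontmatter block and the prompt body can be illustrated with a minimal, standard-library-only sketch. It assumes flat `key: value` frontmatter and parses it by hand, so it is a simplification of the loader, which parses the block with `yaml.safe_load`. The helper name `split_frontmatter` is hypothetical and not part of Swarms.

```python
from typing import Dict, Tuple


def split_frontmatter(content: str) -> Tuple[Dict[str, str], str]:
    """Split markdown into (frontmatter dict, body).

    Hypothetical helper for illustration only: it handles flat
    `key: value` pairs and keeps every value as a string.
    """
    lines = content.split("\n")
    # No opening marker: treat the whole file as body.
    if lines[0].strip() != "---":
        return {}, content

    # Find the closing marker; -1 means it is missing.
    end = next(
        (i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"),
        -1,
    )
    if end == -1:
        return {}, content

    frontmatter: Dict[str, str] = {}
    for line in lines[1:end]:
        if ":" in line:
            # Split on the first colon only, so URLs stay intact.
            key, _, value = line.partition(":")
            frontmatter[key.strip()] = value.strip()

    body = "\n".join(lines[end + 1:]).strip()
    return frontmatter, body


sample = """---
name: FinanceAdvisor
model_name: gpt-4
temperature: 0.7
---
You are an expert financial advisor."""

meta, prompt = split_frontmatter(sample)
print(meta["name"], "|", prompt)
```

Everything after the closing `---` becomes the system prompt, which is why the body of the file is written as direct instructions to the agent.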
### Loading and Using the Agent
@ -185,25 +190,13 @@ Focus on both immediate wins and long-term architectural improvements.
```python
from swarms.utils import load_agent_from_markdown
# Load Claude Code format agent (YAML frontmatter)
performance_agent = load_agent_from_markdown(
file_path="performance-engineer.md"
)
print(f"Agent: {performance_agent.agent_name}")
print(f"Temperature: {getattr(performance_agent, 'temperature', 'default')}")
print(f"Max loops: {performance_agent.max_loops}")
print(f"System prompt preview: {performance_agent.system_prompt[:100]}...")
# Load the Finance Advisor agent
agent = load_agent_from_markdown("finance_advisor.md")
# Use the performance agent
task = """
Analyze the performance of a web application that handles 10,000 concurrent users
but is experiencing slow response times averaging 3 seconds. The application uses
a PostgreSQL database and is deployed on AWS with 4 EC2 instances behind a load balancer.
"""
# Note: Actual agent.run() would make API calls
print(f"\nTask for {performance_agent.agent_name}: {task[:100]}...")
# Use the agent for financial advice
response = agent.run(
"I have $10,000 to invest. What's a good strategy for a beginner?"
)
```
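Judging by the loader code in this change, the parsed configuration is merged with any keyword overrides and then mapped onto `Agent` arguments: `name` becomes `agent_name`, while `description` and `mcp_url` are not forwarded. The sketch below mirrors that mapping with plain dictionaries; `to_agent_kwargs` and `FIELD_MAPPING` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, Optional

# Config keys that are renamed or dropped before being passed on.
# A value of None means "do not forward this key".
FIELD_MAPPING: Dict[str, Optional[str]] = {
    "name": "agent_name",
    "description": None,
    "mcp_url": None,
}


def to_agent_kwargs(config: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
    """Merge overrides into the config, then rename or drop mapped keys."""
    merged = {**config, **overrides}
    agent_kwargs: Dict[str, Any] = {}
    for key, value in merged.items():
        if key in FIELD_MAPPING:
            target = FIELD_MAPPING[key]
            if target is not None:
                agent_kwargs[target] = value
        else:
            # Most keys are passed through unchanged.
            agent_kwargs[key] = value
    return agent_kwargs


config = {
    "name": "FinanceAdvisor",
    "description": "Expert financial advisor",
    "model_name": "gpt-4",
    "temperature": 0.7,
    "max_loops": 1,
}
agent_kwargs = to_agent_kwargs(config, temperature=0.2)
print(agent_kwargs)
```

Because overrides are merged before the mapping, a keyword argument such as `temperature=0.2` takes precedence over the value in the frontmatter.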
## Error Handling
@ -229,6 +222,54 @@ except Exception as e:
print(f"Error loading agents: {e}")
```
## Concurrent Processing Features
### Multi-Core Performance
The AgentLoader loads multiple markdown files concurrently with a thread pool (`ThreadPoolExecutor`). When `max_workers` is not set, the worker count is the smallest of 20, the number of files, and twice the CPU count, which can noticeably speed up batch loading:
```python
from swarms.utils import load_agents_from_markdown
# Automatic concurrent processing for multiple files
agents = load_agents_from_markdown([
"agent1.md", "agent2.md", "agent3.md", "agent4.md"
]) # concurrent=True by default
# Manual control over concurrency
agents = load_agents_from_markdown(
"./agents_directory/",
concurrent=True, # Enable concurrent processing
max_workers=8 # Limit to 8 worker threads
)
# Disable concurrency for debugging or single files
agents = load_agents_from_markdown(
["single_agent.md"],
concurrent=False # Sequential processing
)
```
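How the default worker count is derived can be sketched as follows. The formula follows the loader code in this change; the fallback for an unknown CPU count and the lower bound of one worker are defensive additions of this sketch, and `default_worker_count` is a hypothetical name.

```python
import os
from typing import Optional


def default_worker_count(num_files: int, cpu_count: Optional[int] = None) -> int:
    """Pick a thread count for I/O-bound loading.

    Uses the smallest of 20, the number of files, and twice the CPU count.
    `cpu_count` can be passed explicitly to make the result reproducible.
    """
    # os.cpu_count() may return None, so fall back to 1 (sketch assumption).
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    # Never return fewer than one worker (sketch assumption).
    return max(1, min(20, num_files, cpus * 2))


print(default_worker_count(4, cpu_count=8))
print(default_worker_count(100, cpu_count=8))
```

With few files the file count is the limit, and with many files on a large machine the cap of 20 applies.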
### Resource Management
```python
# Default: worker count is chosen automatically (at most 20 threads)
agents = load_agents_from_markdown(files, concurrent=True)
# Custom worker count for resource control
agents = load_agents_from_markdown(
files,
concurrent=True,
max_workers=4 # Limit to 4 threads
)
# Internally, the loader uses a ThreadPoolExecutor and takes care of:
# - Thread lifecycle
# - Resource cleanup
# - Exception handling
# - Result collection
```
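The submit-and-collect pattern behind this can be sketched with the standard library alone. This is an illustration under stated assumptions rather than the loader's actual code: `load_all` and `fake_load` are hypothetical stand-ins, and `fake_load` replaces real agent creation so the example runs without any API calls.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, TypeVar

T = TypeVar("T")


def load_all(
    paths: List[str],
    load_one: Callable[[str], T],
    max_workers: int = 4,
) -> List[T]:
    """Run load_one on every path in a thread pool and skip failures."""
    results: List[T] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its path so errors can be reported.
        future_to_path = {executor.submit(load_one, p): p for p in paths}
        for future in as_completed(future_to_path):
            path = future_to_path[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # One bad file does not abort the whole batch.
                print(f"Skipping {path}: {exc}")
    return results


def fake_load(path: str) -> str:
    """Stand-in for agent loading: accepts only .md paths."""
    if not path.endswith(".md"):
        raise ValueError("not a markdown file")
    return path[:-3]


loaded = load_all(["a.md", "b.txt", "c.md"], fake_load)
print(sorted(loaded))
```

Results are appended in completion order, so the returned list does not necessarily match the order of the input paths. Sort or re-key the results if ordering matters.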
## Advanced Features
### Custom System Prompt Building
@ -249,33 +290,6 @@ print("Generated System Prompt:")
print(config.system_prompt)
```
### Batch Processing
Process multiple agent files efficiently:
```python
import os
from pathlib import Path
from swarms.utils import AgentLoader
loader = AgentLoader()
# Find all markdown files in a directory
agent_dir = Path("./agents")
md_files = list(agent_dir.glob("*.md"))
# Load all agents
agents = []
for file_path in md_files:
try:
agent = loader.load_single_agent(str(file_path))
agents.append(agent)
print(f"✓ Loaded: {agent.agent_name}")
except Exception as e:
print(f"✗ Failed to load {file_path}: {e}")
print(f"\nSuccessfully loaded {len(agents)} agents")
```
## Integration with Swarms
@ -323,8 +337,13 @@ class AgentLoader:
### Convenience Functions
```python
def load_agent_from_markdown(file_path: str, model: Optional[LiteLLM] = None, **kwargs) -> Agent
def load_agents_from_markdown(file_paths: Union[str, List[str]], model: Optional[LiteLLM] = None, **kwargs) -> List[Agent]
def load_agent_from_markdown(file_path: str, **kwargs) -> Agent
def load_agents_from_markdown(
file_paths: Union[str, List[str]],
concurrent: bool = True, # Enable concurrent processing
max_workers: Optional[int] = None, # Max worker threads (default: min(20, file count, 2 x CPU count))
**kwargs
) -> List[Agent]
```
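In this change the multi-file loader also accepts `max_file_size_mb` (default `10.0`) and skips files above that limit before loading. A minimal sketch of such a pre-filter is shown below. It builds a new list instead of removing items from the list being iterated, and `filter_by_size` is a hypothetical helper name.

```python
import os
import tempfile
from typing import List


def filter_by_size(paths: List[str], max_file_size_mb: float = 10.0) -> List[str]:
    """Return the paths whose size does not exceed the limit."""
    kept: List[str] = []
    for path in paths:
        try:
            size_mb = os.path.getsize(path) / (1024 * 1024)
        except OSError:
            # Size could not be checked: keep the path and let the
            # actual load step report any real problem.
            kept.append(path)
            continue
        if size_mb <= max_file_size_mb:
            kept.append(path)
    return kept


with tempfile.TemporaryDirectory() as tmp:
    small = os.path.join(tmp, "small.md")
    large = os.path.join(tmp, "large.md")
    with open(small, "w", encoding="utf-8") as f:
        f.write("x" * 10)
    with open(large, "w", encoding="utf-8") as f:
        f.write("x" * 2048)
    # 0.001 MB is roughly 1 KB, so only the 10-byte file passes.
    kept = filter_by_size([small, large], max_file_size_mb=0.001)
    kept_names = [os.path.basename(p) for p in kept]
    print(kept_names)
```

Returning a new list also leaves the caller's original list of paths untouched.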
### Configuration Model
@ -347,9 +366,8 @@ class MarkdownAgentConfig(BaseModel):
## Examples Repository
Find more examples in the Swarms repository:
- `examples/agents_loader_example.py` - Complete usage demonstration
- `test_agent_loader.py` - Test suite with validation examples
- `examples/single_agent/utils/markdown_agent.py` - Markdown agent utilities
- `agents_loader_example.py` - Simple usage example
- `examples/agent_loader_demo.py` - Multi-agent workflow example
## Support

@ -22,6 +22,12 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.check_all_model_max_tokens import (
check_all_model_max_tokens,
)
from swarms.utils.agent_loader import (
AgentLoader,
MarkdownAgentConfig,
load_agent_from_markdown,
load_agents_from_markdown,
)
__all__ = [
@ -41,4 +47,8 @@ __all__ = [
"HistoryOutputType",
"history_output_formatter",
"check_all_model_max_tokens",
"AgentLoader",
"MarkdownAgentConfig",
"load_agent_from_markdown",
"load_agents_from_markdown",
]

@ -1,22 +1,29 @@
import os
import re
import random
import yaml
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
TimeoutError,
)
from pydantic import BaseModel, Field, field_validator
from loguru import logger
from swarms.structs.agent import Agent
# Default model configuration
DEFAULT_MODEL = "gpt-4o"
class MarkdownAgentConfig(BaseModel):
"""Configuration model for agents loaded from Claude Code markdown files."""
name: str
description: str
model_name: Optional[str] = "gpt-4"
model_name: Optional[str] = "gpt-4o"
temperature: Optional[float] = Field(default=0.1, ge=0.0, le=2.0)
mcp_url: Optional[str] = None
system_prompt: str
max_loops: int = Field(default=1, ge=1)
autosave: bool = False
@ -33,22 +40,25 @@ class MarkdownAgentConfig(BaseModel):
artifacts_on: bool = False
artifacts_file_extension: str = ".md"
artifacts_output_path: str = ""
streaming_on: bool = False
@field_validator("system_prompt")
@classmethod
def validate_system_prompt(cls, v):
if not v or not isinstance(v, str) or len(v.strip()) == 0:
raise ValueError("System prompt must be a non-empty string")
raise ValueError(
"System prompt must be a non-empty string"
)
return v
class AgentLoader:
"""
Loader for creating agents from markdown files using Claude Code sub-agent format.
Supports both single markdown file and multiple markdown files.
Uses YAML frontmatter format for agent configuration.
Features:
- Single markdown file loading
- Multiple markdown files loading (batch processing)
@ -56,223 +66,335 @@ class AgentLoader:
- Agent configuration extraction from YAML metadata
- Error handling and validation
"""
def __init__(self):
"""
Initialize the AgentLoader.
"""
pass
def parse_yaml_frontmatter(self, content: str) -> Dict[str, Any]:
"""
Parse YAML frontmatter from markdown content.
Args:
content: Markdown content with potential YAML frontmatter
Returns:
Dictionary with parsed YAML data and remaining content
"""
lines = content.split('\n')
lines = content.split("\n")
# Check if content starts with YAML frontmatter
if not lines[0].strip() == '---':
if not lines[0].strip() == "---":
return {"frontmatter": {}, "content": content}
# Find end of frontmatter
end_marker = -1
for i, line in enumerate(lines[1:], 1):
if line.strip() == '---':
if line.strip() == "---":
end_marker = i
break
if end_marker == -1:
return {"frontmatter": {}, "content": content}
# Extract frontmatter and content
frontmatter_text = '\n'.join(lines[1:end_marker])
remaining_content = '\n'.join(lines[end_marker + 1:]).strip()
frontmatter_text = "\n".join(lines[1:end_marker])
remaining_content = "\n".join(lines[end_marker + 1 :]).strip()
try:
frontmatter_data = yaml.safe_load(frontmatter_text) or {}
except yaml.YAMLError as e:
logger.warning(f"Failed to parse YAML frontmatter: {e}")
return {"frontmatter": {}, "content": content}
return {"frontmatter": frontmatter_data, "content": remaining_content}
def parse_markdown_file(self, file_path: str) -> MarkdownAgentConfig:
return {
"frontmatter": frontmatter_data,
"content": remaining_content,
}
def parse_markdown_file(
self, file_path: str
) -> MarkdownAgentConfig:
"""
Parse a single markdown file to extract agent configuration.
Uses Claude Code sub-agent YAML frontmatter format.
Args:
file_path: Path to markdown file
Returns:
MarkdownAgentConfig object with parsed configuration
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If parsing fails or no YAML frontmatter found
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"Markdown file {file_path} not found.")
raise FileNotFoundError(
f"Markdown file {file_path} not found."
)
try:
with open(file_path, 'r', encoding='utf-8') as file:
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
# Parse YAML frontmatter (Claude Code sub-agent format)
yaml_result = self.parse_yaml_frontmatter(content)
frontmatter = yaml_result["frontmatter"]
remaining_content = yaml_result["content"]
if not frontmatter:
raise ValueError(f"No YAML frontmatter found in {file_path}. File must use Claude Code sub-agent format with YAML frontmatter.")
raise ValueError(
f"No YAML frontmatter found in {file_path}. File must use Claude Code sub-agent format with YAML frontmatter."
)
# Use YAML frontmatter data
config_data = {
'name': frontmatter.get('name', Path(file_path).stem),
'description': frontmatter.get('description', 'Agent loaded from markdown'),
'model_name': frontmatter.get('model_name') or frontmatter.get('model', 'gpt-4'),
'temperature': frontmatter.get('temperature', 0.1),
'max_loops': frontmatter.get('max_loops', 1),
'mcp_url': frontmatter.get('mcp_url'),
'system_prompt': remaining_content.strip(),
"name": frontmatter.get("name", Path(file_path).stem),
"description": frontmatter.get(
"description", "Agent loaded from markdown"
),
"model_name": frontmatter.get("model_name")
or frontmatter.get("model", DEFAULT_MODEL),
"temperature": frontmatter.get("temperature", 0.1),
"max_loops": frontmatter.get("max_loops", 1),
"mcp_url": frontmatter.get("mcp_url"),
"system_prompt": remaining_content.strip(),
"streaming_on": frontmatter.get(
"streaming_on", False
),
}
# Generate random model if not specified
if not config_data['model_name'] or config_data['model_name'] == 'random':
models = ['gpt-4', 'gpt-4-turbo', 'claude-3-sonnet', 'claude-3-haiku']
config_data['model_name'] = random.choice(models)
logger.info(f"Successfully parsed markdown file: {file_path}")
# Use default model if not specified
if not config_data["model_name"]:
config_data["model_name"] = DEFAULT_MODEL
logger.info(
f"Successfully parsed markdown file: {file_path}"
)
return MarkdownAgentConfig(**config_data)
except Exception as e:
logger.error(f"Error parsing markdown file {file_path}: {str(e)}")
raise ValueError(f"Error parsing markdown file {file_path}: {str(e)}")
def load_agent_from_markdown(self, file_path: str, **kwargs) -> Agent:
logger.error(
f"Error parsing markdown file {file_path}: {str(e)}"
)
raise ValueError(
f"Error parsing markdown file {file_path}: {str(e)}"
)
def load_agent_from_markdown(
self, file_path: str, **kwargs
) -> Agent:
"""
Load a single agent from a markdown file.
Args:
file_path: Path to markdown file
**kwargs: Additional arguments to override default configuration
Returns:
Configured Agent instance
"""
config = self.parse_markdown_file(file_path)
# Override with any provided kwargs
config_dict = config.model_dump()
config_dict.update(kwargs)
# Remove fields not needed for Agent creation
agent_fields = {
'agent_name': config_dict['name'],
'system_prompt': config_dict['system_prompt'],
'model_name': config_dict.get('model_name', 'gpt-4'),
'temperature': config_dict.get('temperature', 0.1),
'max_loops': config_dict['max_loops'],
'autosave': config_dict['autosave'],
'dashboard': config_dict['dashboard'],
'verbose': config_dict['verbose'],
'dynamic_temperature_enabled': config_dict['dynamic_temperature_enabled'],
'saved_state_path': config_dict['saved_state_path'],
'user_name': config_dict['user_name'],
'retry_attempts': config_dict['retry_attempts'],
'context_length': config_dict['context_length'],
'return_step_meta': config_dict['return_step_meta'],
'output_type': config_dict['output_type'],
'auto_generate_prompt': config_dict['auto_generate_prompt'],
'artifacts_on': config_dict['artifacts_on'],
'artifacts_file_extension': config_dict['artifacts_file_extension'],
'artifacts_output_path': config_dict['artifacts_output_path'],
# Map config fields to Agent parameters, handling special cases
field_mapping = {
"name": "agent_name", # name -> agent_name
"description": None, # not used by Agent
"mcp_url": None, # not used by Agent
}
agent_fields = {}
for config_key, config_value in config_dict.items():
# Handle special field mappings
if config_key in field_mapping:
agent_key = field_mapping[config_key]
if agent_key: # Only include if mapped to something
agent_fields[agent_key] = config_value
else:
# Direct mapping for most fields
agent_fields[config_key] = config_value
try:
logger.info(f"Creating agent '{config.name}' from {file_path}")
logger.info(
f"Creating agent '{config.name}' from {file_path}"
)
agent = Agent(**agent_fields)
logger.info(f"Successfully created agent '{config.name}' from {file_path}")
logger.info(
f"Successfully created agent '{config.name}' from {file_path}"
)
return agent
except Exception as e:
import traceback
logger.error(f"Error creating agent from {file_path}: {str(e)}")
logger.error(
f"Error creating agent from {file_path}: {str(e)}"
)
logger.error(f"Traceback: {traceback.format_exc()}")
raise ValueError(f"Error creating agent from {file_path}: {str(e)}")
def load_agents_from_markdown(self, file_paths: Union[str, List[str]], **kwargs) -> List[Agent]:
raise ValueError(
f"Error creating agent from {file_path}: {str(e)}"
)
def load_agents_from_markdown(
self,
file_paths: Union[str, List[str]],
concurrent: bool = True,
max_workers: Optional[int] = None,
max_file_size_mb: float = 10.0,
**kwargs,
) -> List[Agent]:
"""
Load multiple agents from markdown files.
Load multiple agents from markdown files with optional concurrent processing.
Args:
file_paths: Single file path, directory path, or list of file paths
concurrent: Whether to use concurrent processing for multiple files
max_workers: Maximum number of worker threads (defaults to min(20, number of files, 2 x CPU count))
max_file_size_mb: Maximum file size in MB to prevent memory issues
**kwargs: Additional arguments to override default configuration
Returns:
List of configured Agent instances
"""
agents = []
paths_to_process = []
# Handle different input types
if isinstance(file_paths, str):
if os.path.isdir(file_paths):
# Directory - find all .md files
md_files = list(Path(file_paths).glob('*.md'))
md_files = list(Path(file_paths).glob("*.md"))
paths_to_process = [str(f) for f in md_files]
elif os.path.isfile(file_paths):
# Single file
paths_to_process = [file_paths]
else:
raise FileNotFoundError(f"Path {file_paths} not found.")
raise FileNotFoundError(
f"Path {file_paths} not found."
)
elif isinstance(file_paths, list):
paths_to_process = file_paths
else:
raise ValueError("file_paths must be a string or list of strings")
# Process each file
raise ValueError(
"file_paths must be a string or list of strings"
)
# Validate file sizes to prevent memory issues
for file_path in list(paths_to_process):  # iterate over a copy; oversized files are removed inside the loop
try:
agent = self.load_agent_from_markdown(file_path, **kwargs)
agents.append(agent)
except Exception as e:
logger.warning(f"Skipping {file_path} due to error: {str(e)}")
continue
logger.info(f"Successfully loaded {len(agents)} agents from markdown files")
file_size_mb = os.path.getsize(file_path) / (
1024 * 1024
)
if file_size_mb > max_file_size_mb:
logger.warning(
f"Skipping {file_path}: size {file_size_mb:.2f}MB exceeds limit {max_file_size_mb}MB"
)
paths_to_process.remove(file_path)
except OSError:
logger.warning(
f"Could not check size of {file_path}, skipping validation"
)
# Adjust max_workers for I/O-bound operations
if max_workers is None and concurrent:
# For I/O-bound: use more threads than CPU count, but cap it
max_workers = min(
20, len(paths_to_process), (os.cpu_count() or 1) * 2
)
# Use concurrent processing for multiple files if enabled
if concurrent and len(paths_to_process) > 1:
logger.info(
f"Loading {len(paths_to_process)} agents concurrently with {max_workers} workers..."
)
with ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all tasks
future_to_path = {
executor.submit(
self.load_agent_from_markdown,
file_path,
**kwargs,
): file_path
for file_path in paths_to_process
}
# Collect results as they complete with timeout
for future in as_completed(
future_to_path, timeout=300
): # 5 minute timeout
file_path = future_to_path[future]
try:
agent = future.result(
timeout=60
) # 1 minute per agent
agents.append(agent)
logger.info(
f"Successfully loaded agent from {file_path}"
)
except TimeoutError:
logger.error(f"Timeout loading {file_path}")
continue
except Exception as e:
logger.error(
f"Failed to load {file_path}: {str(e)}"
)
continue
else:
# Sequential processing for single file or when concurrent is disabled
logger.info(
f"Loading {len(paths_to_process)} agents sequentially..."
)
for file_path in paths_to_process:
try:
agent = self.load_agent_from_markdown(
file_path, **kwargs
)
agents.append(agent)
except Exception as e:
logger.warning(
f"Skipping {file_path} due to error: {str(e)}"
)
continue
logger.info(
f"Successfully loaded {len(agents)} agents from markdown files"
)
return agents
def load_single_agent(self, file_path: str, **kwargs) -> Agent:
"""
Convenience method for loading a single agent.
Uses Claude Code sub-agent YAML frontmatter format.
Args:
file_path: Path to markdown file with YAML frontmatter
**kwargs: Additional configuration overrides
Returns:
Configured Agent instance
"""
return self.load_agent_from_markdown(file_path, **kwargs)
def load_multiple_agents(self, file_paths: Union[str, List[str]], **kwargs) -> List[Agent]:
def load_multiple_agents(
self, file_paths: Union[str, List[str]], **kwargs
) -> List[Agent]:
"""
Convenience method for loading multiple agents.
Uses Claude Code sub-agent YAML frontmatter format.
Args:
file_paths: Directory path or list of file paths with YAML frontmatter
**kwargs: Additional configuration overrides
Returns:
List of configured Agent instances
"""
@ -283,11 +405,11 @@ class AgentLoader:
def load_agent_from_markdown(file_path: str, **kwargs) -> Agent:
"""
Load a single agent from a markdown file with Claude Code YAML frontmatter format.
Args:
file_path: Path to markdown file with YAML frontmatter
**kwargs: Additional configuration overrides
Returns:
Configured Agent instance
"""
@ -295,16 +417,28 @@ def load_agent_from_markdown(file_path: str, **kwargs) -> Agent:
return loader.load_single_agent(file_path, **kwargs)
def load_agents_from_markdown(file_paths: Union[str, List[str]], **kwargs) -> List[Agent]:
def load_agents_from_markdown(
file_paths: Union[str, List[str]],
concurrent: bool = True,
max_file_size_mb: float = 10.0,
**kwargs,
) -> List[Agent]:
"""
Load multiple agents from markdown files with Claude Code YAML frontmatter format.
Args:
file_paths: Directory path or list of file paths with YAML frontmatter
concurrent: Whether to use concurrent processing for multiple files
max_file_size_mb: Maximum file size in MB to prevent memory issues
**kwargs: Additional configuration overrides
Returns:
List of configured Agent instances
"""
loader = AgentLoader()
return loader.load_multiple_agents(file_paths, **kwargs)
return loader.load_agents_from_markdown(
file_paths,
concurrent=concurrent,
max_file_size_mb=max_file_size_mb,
**kwargs,
)
