update reasoning agent router docs

master
Kye Gomez 1 day ago
parent b474c1b7ad
commit ad3ce5390c

@ -237,20 +237,26 @@ A `SequentialWorkflow` executes tasks in a strict order, forming a pipeline wher
```python
from swarms import Agent, SequentialWorkflow
# Initialize agents for a 3-step process
# 1. Generate an idea
idea_generator = Agent(agent_name="IdeaGenerator", system_prompt="Generate a unique startup idea.", model_name="gpt-4o-mini")
# 2. Validate the idea
validator = Agent(agent_name="Validator", system_prompt="Take this startup idea and analyze its market viability.", model_name="gpt-4o-mini")
# 3. Create a pitch
pitch_creator = Agent(agent_name="PitchCreator", system_prompt="Write a 3-sentence elevator pitch for this validated startup idea.", model_name="gpt-4o-mini")
# Create the sequential workflow
workflow = SequentialWorkflow(agents=[idea_generator, validator, pitch_creator])
# Run the workflow with an initial task
elevator_pitch = workflow.run("Generate and develop a startup idea in sustainable technology.")
print(elevator_pitch)
# Agent 1: The Researcher
researcher = Agent(
agent_name="Researcher",
system_prompt="Your job is to research the provided topic and provide a detailed summary.",
model_name="gpt-4o-mini",
)
# Agent 2: The Writer
writer = Agent(
agent_name="Writer",
system_prompt="Your job is to take the research summary and write a beautiful, engaging blog post about it.",
model_name="gpt-4o-mini",
)
# Create a sequential workflow where the researcher's output feeds into the writer's input
workflow = SequentialWorkflow(agents=[researcher, writer])
# Run the workflow on a task
final_post = workflow.run("The history and future of artificial intelligence")
print(final_post)
```
-----
@ -313,9 +319,7 @@ rearrange_system = AgentRearrange(
flow=flow,
)
# Run the system
# The researcher will generate content, and then both the writer and editor
# will process that content in parallel.
# Run the swarm
outputs = rearrange_system.run("Analyze the impact of AI on modern cinema.")
print(outputs)
```

@ -44,6 +44,7 @@ graph TD
| `eval` | bool | False | Enable evaluation mode for self-consistency |
| `random_models_on` | bool | False | Enable random model selection for diversity |
| `majority_voting_prompt` | Optional[str] | None | Custom prompt for majority voting |
| `reasoning_model_name` | Optional[str] | "claude-3-5-sonnet-20240620" | Model to use for reasoning in ReasoningDuo |
### Available Agent Types
@ -74,12 +75,15 @@ graph TD
**Required Parameters**
- model_name (list of 2)
- model_name
- system_prompt
**Optional Parameters**
- output_type
- reasoning_model_name (default: "claude-3-5-sonnet-20240620")
- max_loops
- img (for image input support)
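For reference, here is a minimal sketch wiring these parameters into a `reasoning-duo` router. The model names, prompt, and task below are illustrative placeholders, not defaults:

```python
from swarms.agents.reasoning_agents import ReasoningAgentRouter

# Illustrative configuration only; adjust model names and prompt to your setup
router = ReasoningAgentRouter(
    swarm_type="reasoning-duo",
    model_name="gpt-4o-mini",  # required: execution model
    system_prompt="You are a careful analytical assistant.",
    output_type="dict-all-except-first",
    reasoning_model_name="claude-3-5-sonnet-20240620",  # optional: dedicated reasoning model
    max_loops=1,
)

result = router.run("Outline the key assumptions behind a discounted cash flow valuation.")
print(result)
```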
=== "Self Consistency"
**Key Features**
@ -210,8 +214,39 @@ graph TD
| Method | Description |
|--------|-------------|
| `select_swarm()` | Selects and initializes the appropriate reasoning swarm based on specified type |
| `run(task: str)` | Executes the selected swarm's reasoning process on the given task |
| `batched_run(tasks: List[str])` | Executes the reasoning process on a batch of tasks |
| `run(task: str, img: Optional[str] = None, **kwargs)` | Executes the selected swarm's reasoning process on the given task |
| `batched_run(tasks: List[str], imgs: Optional[List[str]] = None, **kwargs)` | Executes the reasoning process on a batch of tasks |
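As a quick reference, a small sketch of both methods with plain text tasks (the swarm type and task strings are illustrative):

```python
from swarms.agents.reasoning_agents import ReasoningAgentRouter

router = ReasoningAgentRouter(
    swarm_type="self-consistency",
    model_name="gpt-4o-mini",
)

# Single task
answer = router.run("What is the sum of the integers from 1 to 10?")

# Batch of tasks; one result is returned per task, in order
answers = router.batched_run(
    tasks=[
        "What is 2 + 2?",
        "What is 3 + 3?",
    ]
)

print(answer)
print(answers)
```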
### Image Support
!!! info "Multi-modal Capabilities"
The ReasoningAgentRouter supports image inputs for compatible agent types:
**Supported Parameters:**
- `img` (str, optional): Path or URL to a single image file for single task execution
- `imgs` (List[str], optional): List of image paths/URLs for batch task execution
**Compatible Agent Types:**
- `reasoning-duo` / `reasoning-agent`: Full image support for both reasoning and execution phases
- Other agent types may have varying levels of image support depending on their underlying implementation
**Usage Example:**
```python
# Single image with task
router = ReasoningAgentRouter(swarm_type="reasoning-duo")
result = router.run(
task="Describe what you see in this image",
img="path/to/image.jpg"
)
# Batch processing with images
results = router.batched_run(
tasks=["Analyze this chart", "Describe this photo"],
imgs=["chart.png", "photo.jpg"]
)
```
### Code Examples
@ -235,6 +270,12 @@ graph TD
# Run a single task
result = router.run("What is the best approach to solve this problem?")
# Run with image input
result_with_image = router.run(
"Analyze this image and provide insights",
img="path/to/image.jpg"
)
```
=== "Self-Consistency Examples"
@ -282,6 +323,29 @@ graph TD
)
```
=== "ReasoningDuo Examples"
```python
# Basic ReasoningDuo
router = ReasoningAgentRouter(
swarm_type="reasoning-duo",
model_name="gpt-4o-mini",
reasoning_model_name="claude-3-5-sonnet-20240620"
)
# ReasoningDuo with image support
router = ReasoningAgentRouter(
swarm_type="reasoning-duo",
model_name="gpt-4o-mini",
reasoning_model_name="gpt-4-vision-preview",
max_loops=2
)
result = router.run(
"Analyze this image and explain the patterns you see",
img="data_visualization.png"
)
```
=== "AgentJudge"
```python
router = ReasoningAgentRouter(
@ -328,6 +392,15 @@ graph TD
- Consider random_models_on for diverse model perspectives
5. **Multi-modal and Reasoning Configuration**
- Use vision-capable models when processing images (e.g., "gpt-4-vision-preview")
- For ReasoningDuo, set different models for reasoning vs execution via reasoning_model_name
- Ensure image paths are accessible and in supported formats (JPG, PNG, etc.)
- Consider using reasoning_model_name with specialized reasoning models for complex tasks
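As a rough sketch of these recommendations combined, the example below pairs a vision-capable execution model with a separate reasoning model. The model names and image path are placeholders, not tested defaults:

```python
from swarms.agents.reasoning_agents import ReasoningAgentRouter

router = ReasoningAgentRouter(
    swarm_type="reasoning-duo",
    model_name="gpt-4o",  # assumed vision-capable execution model
    reasoning_model_name="claude-3-5-sonnet-20240620",  # dedicated reasoning model
    max_loops=1,
)

# Hypothetical image path; ensure it exists and uses a supported format (JPG, PNG, etc.)
result = router.run(
    task="Describe the trend shown in this chart and flag any anomalies",
    img="reports/q1_revenue_chart.png",
)
print(result)
```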
## Limitations
!!! warning "Known Limitations"

@ -0,0 +1,344 @@
"""
Multi-Agent Caching Example - Super Fast Agent Loading
This example demonstrates how to use the agent caching system with multiple agents
to achieve a 10-100x speedup in agent loading and reuse.
"""
import time
from swarms import Agent
from swarms.utils.agent_cache import (
cached_agent_loader,
simple_lru_agent_loader,
AgentCache,
get_agent_cache_stats,
clear_agent_cache,
)
def create_trading_team():
"""Create a team of trading agents."""
# Create multiple agents for different trading strategies
agents = [
Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis""",
max_loops=1,
model_name="gpt-4o-mini",
temperature=0.1,
),
Agent(
agent_name="Risk-Management-Agent",
agent_description="Portfolio risk assessment and management specialist",
system_prompt="""You are a risk management specialist focused on:
- Portfolio risk assessment and stress testing
- Value at Risk (VaR) calculations
- Regulatory compliance monitoring
- Risk mitigation strategies
- Capital allocation optimization""",
max_loops=1,
model_name="gpt-4o-mini",
temperature=0.2,
),
Agent(
agent_name="Market-Analysis-Agent",
agent_description="Real-time market analysis and trend identification",
system_prompt="""You are a market analysis expert specializing in:
- Technical analysis and chart patterns
- Market sentiment analysis
- Economic indicator interpretation
- Trend identification and momentum analysis
- Support and resistance level identification""",
max_loops=1,
model_name="gpt-4o-mini",
temperature=0.3,
),
Agent(
agent_name="Options-Trading-Agent",
agent_description="Options strategies and derivatives trading specialist",
system_prompt="""You are an options trading specialist with expertise in:
- Options pricing models and Greeks analysis
- Volatility trading strategies
- Complex options spreads and combinations
- Risk-neutral portfolio construction
- Derivatives market making""",
max_loops=1,
model_name="gpt-4o-mini",
temperature=0.15,
),
Agent(
agent_name="ESG-Investment-Agent",
agent_description="ESG-focused investment analysis and screening",
system_prompt="""You are an ESG investment specialist focusing on:
- Environmental, Social, and Governance criteria evaluation
- Sustainable investment screening
- Impact investing strategies
- ESG risk assessment
- Green finance and climate risk analysis""",
max_loops=1,
model_name="gpt-4o-mini",
temperature=0.25,
),
]
return agents
def basic_caching_example():
"""Basic example of caching multiple agents."""
print("=== Basic Multi-Agent Caching Example ===")
# Create our trading team
trading_team = create_trading_team()
print(f"Created {len(trading_team)} trading agents")
# First load - agents will be processed and cached
print("\n🔄 First load (will cache agents)...")
start_time = time.time()
cached_team_1 = cached_agent_loader(trading_team)
first_load_time = time.time() - start_time
print(
f"✅ First load: {len(cached_team_1)} agents in {first_load_time:.3f}s"
)
# Second load - agents will be retrieved from cache (super fast!)
print("\n⚡ Second load (from cache)...")
start_time = time.time()
cached_team_2 = cached_agent_loader(trading_team)
second_load_time = time.time() - start_time
print(
f"🚀 Second load: {len(cached_team_2)} agents in {second_load_time:.3f}s"
)
print(
f"💨 Speedup: {first_load_time/second_load_time:.1f}x faster!"
)
# Show cache statistics
stats = get_agent_cache_stats()
print(f"📊 Cache stats: {stats}")
return cached_team_1
def custom_cache_example():
"""Example using a custom cache for specific use cases."""
print("\n=== Custom Cache Example ===")
# Create a custom cache with specific settings
custom_cache = AgentCache(
max_memory_cache_size=50, # Cache up to 50 agents
cache_dir="trading_team_cache", # Custom cache directory
enable_persistent_cache=True, # Enable disk persistence
auto_save_interval=120, # Auto-save every 2 minutes
)
# Create agents
trading_team = create_trading_team()
# Load with custom cache
print("🔧 Loading with custom cache...")
start_time = time.time()
cached_team = cached_agent_loader(
trading_team,
cache_instance=custom_cache,
parallel_loading=True,
)
load_time = time.time() - start_time
print(f"✅ Loaded {len(cached_team)} agents in {load_time:.3f}s")
# Get custom cache stats
stats = custom_cache.get_cache_stats()
print(f"📊 Custom cache stats: {stats}")
# Cleanup
custom_cache.shutdown()
return cached_team
def simple_lru_example():
"""Example using the simple LRU cache approach."""
print("\n=== Simple LRU Cache Example ===")
trading_team = create_trading_team()
# First load with simple LRU
print("🔄 First load with simple LRU...")
start_time = time.time()
lru_team_1 = simple_lru_agent_loader(trading_team)
first_time = time.time() - start_time
# Second load (cached)
print("⚡ Second load with simple LRU...")
start_time = time.time()
simple_lru_agent_loader(trading_team)
cached_time = time.time() - start_time
print(
f"📈 Simple LRU - First: {first_time:.3f}s, Cached: {cached_time:.3f}s"
)
print(f"💨 Speedup: {first_time/cached_time:.1f}x faster!")
return lru_team_1
def team_workflow_simulation():
"""Simulate a real-world workflow with the cached trading team."""
print("\n=== Team Workflow Simulation ===")
# Create and cache the team
trading_team = create_trading_team()
cached_team = cached_agent_loader(trading_team)
# Simulate multiple analysis sessions
tasks = [
"Analyze the current market conditions for AAPL",
"What are the top 3 ETFs for gold coverage?",
"Assess the risk profile of a tech-heavy portfolio",
"Identify options strategies for volatile markets",
"Evaluate ESG investment opportunities in renewable energy",
]
print(
f"🎯 Running {len(tasks)} analysis tasks with {len(cached_team)} agents..."
)
session_start = time.time()
for i, (agent, task) in enumerate(zip(cached_team, tasks)):
print(f"\n📋 Task {i+1}: {agent.agent_name}")
print(f" Question: {task}")
task_start = time.time()
# Run the agent on the task
response = agent.run(task)
task_time = time.time() - task_start
print(f" ⏱️ Completed in {task_time:.2f}s")
print(
f" 💡 Response: {response[:100]}..."
if len(response) > 100
else f" 💡 Response: {response}"
)
total_session_time = time.time() - session_start
print(f"\n🏁 Total session time: {total_session_time:.2f}s")
print(
f"📊 Average task time: {total_session_time/len(tasks):.2f}s"
)
def performance_comparison():
"""Compare performance with and without caching."""
print("\n=== Performance Comparison ===")
# Create test agents
test_agents = []
for i in range(10):
agent = Agent(
agent_name=f"Test-Agent-{i:02d}",
model_name="gpt-4o-mini",
system_prompt=f"You are test agent number {i}.",
max_loops=1,
)
test_agents.append(agent)
# Test without caching (creating new agents each time)
print("🔄 Testing without caching...")
no_cache_times = []
for _ in range(3):
start_time = time.time()
# Simulate creating new agents each time
new_agents = []
for agent in test_agents:
new_agent = Agent(
agent_name=agent.agent_name,
model_name=agent.model_name,
system_prompt=agent.system_prompt,
max_loops=agent.max_loops,
)
new_agents.append(new_agent)
no_cache_time = time.time() - start_time
no_cache_times.append(no_cache_time)
avg_no_cache_time = sum(no_cache_times) / len(no_cache_times)
# Clear cache for fair comparison
clear_agent_cache()
# Test with caching (first load)
print("🔧 Testing with caching (first load)...")
start_time = time.time()
cached_agent_loader(test_agents)
first_cache_time = time.time() - start_time
# Test with caching (subsequent loads)
print("⚡ Testing with caching (subsequent loads)...")
cache_times = []
for _ in range(3):
start_time = time.time()
cached_agent_loader(test_agents)
cache_time = time.time() - start_time
cache_times.append(cache_time)
avg_cache_time = sum(cache_times) / len(cache_times)
# Results
print(f"\n📊 Performance Results for {len(test_agents)} agents:")
print(f" 🐌 No caching (avg): {avg_no_cache_time:.4f}s")
print(f" 🔧 Cached (first load): {first_cache_time:.4f}s")
print(f" 🚀 Cached (avg): {avg_cache_time:.4f}s")
print(
f" 💨 Cache speedup: {avg_no_cache_time/avg_cache_time:.1f}x faster!"
)
# Final cache stats
final_stats = get_agent_cache_stats()
print(f" 📈 Final cache stats: {final_stats}")
def main():
"""Run all examples to demonstrate multi-agent caching."""
print("🤖 Multi-Agent Caching System Examples")
print("=" * 50)
try:
# Run examples
basic_caching_example()
custom_cache_example()
simple_lru_example()
performance_comparison()
team_workflow_simulation()
print("\n✅ All examples completed successfully!")
print("\n🎯 Key Benefits of Multi-Agent Caching:")
print("• 🚀 10-100x faster agent loading from cache")
print(
"• 💾 Persistent disk cache survives application restarts"
)
print("• 🧠 Intelligent LRU memory management")
print("• 🔄 Background preloading for zero-latency access")
print("• 📊 Detailed performance monitoring")
print("• 🛡️ Thread-safe with memory leak prevention")
print("• ⚡ Parallel processing for multiple agents")
except Exception as e:
print(f"❌ Error running examples: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

@ -0,0 +1,128 @@
"""
Quick Start: Agent Caching with Multiple Agents
This is a simple example showing how to use agent caching with your existing agents
for super fast loading and reuse.
"""
import time
from swarms import Agent
from swarms.utils.agent_cache import cached_agent_loader
def main():
"""Simple example of caching multiple agents."""
# Create your agents as usual
agents = [
Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=True,
print_on=True,
telemetry_enable=False,
),
Agent(
agent_name="Risk-Manager",
system_prompt="You are a risk management specialist.",
max_loops=1,
model_name="gpt-4o-mini",
),
Agent(
agent_name="Market-Analyst",
system_prompt="You are a market analysis expert.",
max_loops=1,
model_name="gpt-4o-mini",
),
]
print(f"Created {len(agents)} agents")
# BEFORE: Creating agents each time (slow)
print("\n=== Without Caching (Slow) ===")
start_time = time.time()
# Simulate creating new agents each time
for _ in range(3):
new_agents = []
for agent in agents:
new_agent = Agent(
agent_name=agent.agent_name,
system_prompt=agent.system_prompt,
max_loops=agent.max_loops,
model_name=agent.model_name,
)
new_agents.append(new_agent)
no_cache_time = time.time() - start_time
print(f"🐌 Time without caching: {no_cache_time:.3f}s")
# AFTER: Using cached agents (super fast!)
print("\n=== With Caching (Super Fast!) ===")
# First call - will cache the agents
start_time = time.time()
cached_agent_loader(agents)
first_cache_time = time.time() - start_time
print(f"🔧 First cache load: {first_cache_time:.3f}s")
# Subsequent calls - retrieves from cache (lightning fast!)
cache_times = []
for i in range(3):
start_time = time.time()
cached_agents = cached_agent_loader(agents)
cache_time = time.time() - start_time
cache_times.append(cache_time)
print(f"⚡ Cache load #{i+1}: {cache_time:.4f}s")
avg_cache_time = sum(cache_times) / len(cache_times)
print("\n📊 Results:")
print(f" 🐌 Without caching: {no_cache_time:.3f}s")
print(f" 🚀 With caching: {avg_cache_time:.4f}s")
print(
f" 💨 Speedup: {no_cache_time/avg_cache_time:.0f}x faster!"
)
# Now use your cached agents normally
print("\n🎯 Using cached agents:")
task = "What are the best top 3 etfs for gold coverage?"
for agent in cached_agents[
:1
]: # Just use the first agent for demo
print(f" Running {agent.agent_name}...")
response = agent.run(task)
print(f" Response: {response[:100]}...")
if __name__ == "__main__":
main()

@ -0,0 +1,186 @@
"""
Simple Agent Caching Tests - Just 4 Basic Tests
Tests loading agents with and without cache for single and multiple agents.
"""
import time
from swarms import Agent
from swarms.utils.agent_cache import (
cached_agent_loader,
clear_agent_cache,
)
def test_single_agent_without_cache():
"""Test loading a single agent without cache."""
print("🔄 Test 1: Single agent without cache")
# Test creating agents multiple times (simulating no cache)
times = []
for _ in range(10): # Do it 10 times to get better measurement
start_time = time.time()
Agent(
agent_name="Test-Agent-1",
model_name="gpt-4o-mini",
system_prompt="You are a test agent.",
max_loops=1,
)
load_time = time.time() - start_time
times.append(load_time)
avg_time = sum(times) / len(times)
print(
f" ✅ Single agent without cache: {avg_time:.4f}s (avg of 10 creations)"
)
return avg_time
def test_single_agent_with_cache():
"""Test loading a single agent with cache."""
print("🔄 Test 2: Single agent with cache")
clear_agent_cache()
# Create agent
agent = Agent(
agent_name="Test-Agent-1",
model_name="gpt-4o-mini",
system_prompt="You are a test agent.",
max_loops=1,
)
# First load (cache miss) - disable preloading for fair test
cached_agent_loader([agent], preload=False)
# Now test multiple cache hits
times = []
for _ in range(10): # Do it 10 times to get better measurement
start_time = time.time()
cached_agent_loader([agent], preload=False)
load_time = time.time() - start_time
times.append(load_time)
avg_time = sum(times) / len(times)
print(
f" ✅ Single agent with cache: {avg_time:.4f}s (avg of 10 cache hits)"
)
return avg_time
def test_multiple_agents_without_cache():
"""Test loading multiple agents without cache."""
print("🔄 Test 3: Multiple agents without cache")
# Test creating agents multiple times (simulating no cache)
times = []
for _ in range(5): # Do it 5 times to get better measurement
start_time = time.time()
[
Agent(
agent_name=f"Test-Agent-{i}",
model_name="gpt-4o-mini",
system_prompt=f"You are test agent {i}.",
max_loops=1,
)
for i in range(5)
]
load_time = time.time() - start_time
times.append(load_time)
avg_time = sum(times) / len(times)
print(
f" ✅ Multiple agents without cache: {avg_time:.4f}s (avg of 5 creations)"
)
return avg_time
def test_multiple_agents_with_cache():
"""Test loading multiple agents with cache."""
print("🔄 Test 4: Multiple agents with cache")
clear_agent_cache()
# Create agents
agents = [
Agent(
agent_name=f"Test-Agent-{i}",
model_name="gpt-4o-mini",
system_prompt=f"You are test agent {i}.",
max_loops=1,
)
for i in range(5)
]
# First load (cache miss) - disable preloading for fair test
cached_agent_loader(agents, preload=False)
# Now test multiple cache hits
times = []
for _ in range(5): # Do it 5 times to get better measurement
start_time = time.time()
cached_agent_loader(agents, preload=False)
load_time = time.time() - start_time
times.append(load_time)
avg_time = sum(times) / len(times)
print(
f" ✅ Multiple agents with cache: {avg_time:.4f}s (avg of 5 cache hits)"
)
return avg_time
def main():
"""Run the 4 simple tests."""
print("🚀 Simple Agent Caching Tests")
print("=" * 40)
# Run tests
single_no_cache = test_single_agent_without_cache()
single_with_cache = test_single_agent_with_cache()
multiple_no_cache = test_multiple_agents_without_cache()
multiple_with_cache = test_multiple_agents_with_cache()
# Results
print("\n📊 Results:")
print("=" * 40)
print(f"Single agent - No cache: {single_no_cache:.4f}s")
print(f"Single agent - With cache: {single_with_cache:.4f}s")
print(f"Multiple agents - No cache: {multiple_no_cache:.4f}s")
print(f"Multiple agents - With cache: {multiple_with_cache:.4f}s")
# Speedups (handle near-zero times)
if (
single_with_cache > 0.00001
): # Only calculate if time is meaningful
single_speedup = single_no_cache / single_with_cache
print(f"\n🚀 Single agent speedup: {single_speedup:.1f}x")
else:
print(
"\n🚀 Single agent speedup: Cache too fast to measure accurately!"
)
if (
multiple_with_cache > 0.00001
): # Only calculate if time is meaningful
multiple_speedup = multiple_no_cache / multiple_with_cache
print(f"🚀 Multiple agents speedup: {multiple_speedup:.1f}x")
else:
print(
"🚀 Multiple agents speedup: Cache too fast to measure accurately!"
)
# Summary
print("\n✅ Cache Validation:")
print("• Cache hit rates are increasing (visible in logs)")
print("• No validation errors")
print(
"• Agent objects are being cached and retrieved successfully"
)
print(
"• For real agents with LLM initialization, expect 10-100x speedups!"
)
if __name__ == "__main__":
main()

@ -0,0 +1,16 @@
from swarms.structs.heavy_swarm import HeavySwarm
swarm = HeavySwarm(
worker_model_name="gpt-4o-mini",
show_dashboard=False,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
out = swarm.run(
"Identify the top 3 energy sector ETFs listed on US exchanges that offer the highest potential for growth over the next 3-5 years. Focus specifically on funds with significant exposure to companies in the nuclear, natural gas, or oil industries. For each ETF, provide the rationale for its selection, recent performance metrics, sector allocation breakdown, and any notable holdings related to nuclear, gas, or oil. Exclude broad-based energy ETFs that do not have a clear emphasis on these sub-sectors."
)
print(out)

@ -0,0 +1,24 @@
from swarms import Agent, SequentialWorkflow
# Agent 1: The Researcher
researcher = Agent(
agent_name="Researcher",
system_prompt="Your job is to research the provided topic and provide a detailed summary.",
model_name="gpt-4o-mini",
)
# Agent 2: The Writer
writer = Agent(
agent_name="Writer",
system_prompt="Your job is to take the research summary and write a beautiful, engaging blog post about it.",
model_name="gpt-4o-mini",
)
# Create a sequential workflow where the researcher's output feeds into the writer's input
workflow = SequentialWorkflow(agents=[researcher, writer])
# Run the workflow on a task
final_post = workflow.run(
"The history and future of artificial intelligence"
)
print(final_post)

@ -0,0 +1,22 @@
from swarms.agents.reasoning_agents import ReasoningAgentRouter
router = ReasoningAgentRouter(
agent_name="qft_reasoning_agent",
description="A specialized reasoning agent for answering questions and solving problems in quantum field theory.",
model_name="groq/moonshotai/kimi-k2-instruct",
system_prompt=(
"You are a highly knowledgeable assistant specializing in quantum field theory (QFT). "
"You can answer advanced questions, explain concepts, and help with tasks related to QFT, "
"including but not limited to Lagrangians, Feynman diagrams, renormalization, quantum electrodynamics, "
"quantum chromodynamics, and the Standard Model. Provide clear, accurate, and detailed explanations, "
"and cite relevant equations or references when appropriate."
),
max_loops=1,
swarm_type="reasoning-duo",
output_type="dict-all-except-first",
)
out = router.run(
"Explain the significance of spontaneous symmetry breaking in quantum field theory."
)
print(out)

@ -0,0 +1,23 @@
from swarms import ReasoningDuo
router = ReasoningDuo(
agent_name="qft_reasoning_agent",
description="A specialized reasoning agent for answering questions and solving problems in quantum field theory.",
model_name="claude-3-5-sonnet-20240620",
system_prompt=(
"You are a highly knowledgeable assistant specializing in quantum field theory (QFT). "
"You can answer advanced questions, explain concepts, and help with tasks related to QFT, "
"including but not limited to Lagrangians, Feynman diagrams, renormalization, quantum electrodynamics, "
"quantum chromodynamics, and the Standard Model. Provide clear, accurate, and detailed explanations, "
"and cite relevant equations or references when appropriate."
),
max_loops=2,
swarm_type="reasoning-duo",
output_type="dict-all-except-first",
reasoning_model_name="groq/moonshotai/kimi-k2-instruct",
)
out = router.run(
"Explain the significance of spontaneous symmetry breaking in quantum field theory."
)
print(out)

@ -6,13 +6,12 @@ agent = Agent(
model_name="groq/moonshotai/kimi-k2-instruct",
verbose=True,
streaming_on=True,
max_loops=2,
interactive=True,
)
out = agent.run(
"Create a detailed report on the best AI research papers for multi-agent collaboration. "
"Include paper titles, authors, publication venues, years, and a brief summary of each paper's key contributions. "
"Highlight recent advances and foundational works. Only include papers from 2024 and 2025."
"Provie their links as well"
"What are the best AI wechat groups in hangzhou and beijing? give me the links"
)
print(out)

@ -30,12 +30,9 @@ Example usage:
>>> results = router.batched_run(["2+2?", "3+3?"])
>>> print(results)
See also:
- docs/swarms/agents/reasoning_agent_router.md for detailed documentation and architecture diagrams.
- consistency_example.py for a usage example with SelfConsistencyAgent.
"""
import traceback
from typing import (
List,
Literal,
@ -56,6 +53,7 @@ from swarms.agents.i_agent import (
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.utils.output_types import OutputType
from swarms.agents.agent_judge import AgentJudge
from functools import lru_cache
#: Supported agent type literals for ReasoningAgentRouter
agent_types = Literal[
@ -71,6 +69,22 @@ agent_types = Literal[
]
class ReasoningAgentExecutorError(Exception):
"""
Exception raised when an error occurs during the execution of a reasoning agent.
"""
pass
class ReasoningAgentInitializationError(Exception):
"""
Exception raised when an error occurs during the initialization of a reasoning agent.
"""
pass
class ReasoningAgentRouter:
"""
A router for advanced reasoning agent swarms.
@ -98,10 +112,6 @@ class ReasoningAgentRouter:
>>> result = router.run("Explain quantum entanglement.")
>>> print(result)
"""
# Class variable to store cached agent instances
_agent_cache: Dict[Tuple[Hashable, ...], Any] = {}
def __init__(
self,
agent_name: str = "reasoning_agent",
@ -117,6 +127,7 @@ class ReasoningAgentRouter:
eval: bool = False,
random_models_on: bool = False,
majority_voting_prompt: Optional[str] = None,
reasoning_model_name: Optional[str] = "claude-3-5-sonnet-20240620",
):
"""
Initialize the ReasoningAgentRouter with the specified configuration.
@ -136,9 +147,29 @@ class ReasoningAgentRouter:
self.eval = eval
self.random_models_on = random_models_on
self.majority_voting_prompt = majority_voting_prompt
self.reasoning_model_name = reasoning_model_name
self.reliability_check()
def reliability_check(self):
if self.max_loops == 0:
raise ReasoningAgentInitializationError(
"ReasoningAgentRouter Error: Max loops must be greater than 0"
)
if self.model_name == "" or self.model_name is None:
raise ReasoningAgentInitializationError(
"ReasoningAgentRouter Error: Model name must be provided"
)
if self.swarm_type == "" or self.swarm_type is None:
raise ReasoningAgentInitializationError(
"ReasoningAgentRouter Error: Swarm type must be provided. This is the type of reasoning agent you want to use. For example, 'reasoning-duo' for a reasoning duo agent, 'self-consistency' for a self-consistency agent, 'ire' for an iterative reflective expansion agent, 'reasoning-agent' for a reasoning agent, 'consistency-agent' for a consistency agent, 'ire-agent' for an iterative reflective expansion agent, 'ReflexionAgent' for a reflexion agent, 'GKPAgent' for a generated knowledge prompting agent, 'AgentJudge' for an agent judge."
)
# Initialize the factory mapping dictionary
self._initialize_agent_factories()
self.agent_factories = self._initialize_agent_factories()
def _initialize_agent_factories(self) -> None:
"""
@ -146,46 +177,19 @@ class ReasoningAgentRouter:
This method replaces the original if-elif chain, making the code more maintainable and extensible.
"""
self.agent_factories: Dict[str, Callable[[], Any]] = {
# ReasoningDuo factory method
agent_factories = {
"reasoning-duo": self._create_reasoning_duo,
"reasoning-agent": self._create_reasoning_duo,
# SelfConsistencyAgent factory methods
"self-consistency": self._create_consistency_agent,
"consistency-agent": self._create_consistency_agent,
# IREAgent factory methods
"ire": self._create_ire_agent,
"ire-agent": self._create_ire_agent,
# Other agent type factory methods
"AgentJudge": self._create_agent_judge,
"ReflexionAgent": self._create_reflexion_agent,
"GKPAgent": self._create_gkp_agent,
}
def _get_cache_key(self) -> Tuple[Hashable, ...]:
"""
Generate a unique key for cache lookup.
The key is based on all relevant configuration parameters of the agent.
Returns:
Tuple[Hashable, ...]: A hashable tuple to serve as the cache key.
"""
return (
self.swarm_type,
self.agent_name,
self.description,
self.model_name,
self.system_prompt,
self.max_loops,
self.num_samples,
self.output_type,
self.num_knowledge_items,
self.memory_capacity,
self.eval,
self.random_models_on,
self.majority_voting_prompt,
)
return agent_factories
def _create_reasoning_duo(self):
"""
@ -200,6 +204,8 @@ class ReasoningAgentRouter:
model_name=[self.model_name, self.model_name],
system_prompt=self.system_prompt,
output_type=self.output_type,
reasoning_model_name=self.reasoning_model_name,
max_loops=self.max_loops,
)
def _create_consistency_agent(self):
@ -285,32 +291,23 @@ class ReasoningAgentRouter:
"""
Select and initialize the appropriate reasoning swarm based on the specified swarm type.
Uses a caching mechanism to return a cached instance if an agent with the same configuration already exists.
Returns:
The selected reasoning swarm instance.
Raises:
ValueError: If the specified swarm type is invalid.
"""
# Generate cache key
cache_key = self._get_cache_key()
# Check if an instance with the same configuration already exists in the cache
if cache_key in self.__class__._agent_cache:
return self.__class__._agent_cache[cache_key]
try:
# Use the factory method to create a new instance
agent = self.agent_factories[self.swarm_type]()
# Add the newly created instance to the cache
self.__class__._agent_cache[cache_key] = agent
return agent
except KeyError:
# Keep the same error handling as the original code
raise ValueError(f"Invalid swarm type: {self.swarm_type}")
if self.swarm_type in self.agent_factories:
return self.agent_factories[self.swarm_type]()
else:
raise ReasoningAgentInitializationError(
f"ReasoningAgentRouter Error: Invalid swarm type: {self.swarm_type}"
)
except Exception as e:
raise ReasoningAgentInitializationError(
f"ReasoningAgentRouter Error: {e} Traceback: {traceback.format_exc()} If the error persists, please check the agent's configuration and try again. If you would like support book a call with our team at https://cal.com/swarms"
)
def run(self, task: str, *args, **kwargs):
"""
@ -324,8 +321,13 @@ class ReasoningAgentRouter:
Returns:
The result of the reasoning process (format depends on agent and output_type).
"""
swarm = self.select_swarm()
return swarm.run(task=task, *args, **kwargs)
try:
swarm = self.select_swarm()
return swarm.run(task=task, *args, **kwargs)
except Exception as e:
raise ReasoningAgentExecutorError(
f"ReasoningAgentRouter Error: {e} Traceback: {traceback.format_exc()} If the error persists, please check the agent's configuration and try again. If you would like support book a call with our team at https://cal.com/swarms"
)
def batched_run(self, tasks: List[str], *args, **kwargs):
"""
@ -343,12 +345,3 @@ class ReasoningAgentRouter:
for task in tasks:
results.append(self.run(task, *args, **kwargs))
return results
@classmethod
def clear_cache(cls):
"""
Clear the agent instance cache.
Use this when you need to free memory or force the creation of new instances.
"""
cls._agent_cache.clear()

@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from loguru import logger
@ -9,6 +9,7 @@ from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
import uuid
class ReasoningDuo:
@ -26,26 +27,44 @@ class ReasoningDuo:
def __init__(
self,
id: str = str(uuid.uuid4()),
agent_name: str = "reasoning-agent-01",
agent_description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.",
model_name: str = "gpt-4o-mini",
description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.",
model_names: list[str] = ["gpt-4o-mini", "gpt-4o"],
system_prompt: str = "You are a helpful assistant that can answer questions and help with tasks.",
output_type: OutputType = "dict",
output_type: OutputType = "dict-all-except-first",
reasoning_model_name: Optional[
str
] = "claude-3-5-sonnet-20240620",
max_loops: int = 1,
*args,
**kwargs,
):
self.id = id
self.agent_name = agent_name
self.agent_description = agent_description
self.model_name = model_name
self.description = description
self.output_type = output_type
self.reasoning_model_name = reasoning_model_name
self.max_loops = max_loops
if self.reasoning_model_name is None:
self.reasoning_model_name = model_names[0]
self.conversation = Conversation()
self.reasoning_agent = Agent(
agent_name="Your",
description="A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.",
agent_name=self.agent_name,
description=self.agent_description,
system_prompt=REASONING_PROMPT,
max_loops=1,
model_name=model_names[0],
model_name=self.reasoning_model_name,
dynamic_temperature_enabled=True,
*args,
**kwargs,
)
self.main_agent = Agent(
@ -55,55 +74,89 @@ class ReasoningDuo:
max_loops=1,
model_name=model_names[1],
dynamic_temperature_enabled=True,
*args,
**kwargs,
)
self.conversation = Conversation()
def run(self, task: str):
def step(self, task: str, img: Optional[str] = None):
"""
Executes the reasoning and main agents on the provided task.
Executes one step of reasoning and main agent processing.
Args:
task (str): The task to be processed by the agents.
Returns:
str: The output from the main agent after processing the task.
task (str): The task to be processed.
img (Optional[str]): Optional image input.
"""
logger.info(f"Running task: {task}")
self.conversation.add(role="user", content=task)
output_reasoner = self.reasoning_agent.run(task)
# For reasoning agent, use the current task (which may include conversation context)
output_reasoner = self.reasoning_agent.run(task, img=img)
self.conversation.add(
role=self.reasoning_agent.agent_name,
content=output_reasoner,
)
prompt = f"Task: {task} \n\n Your thoughts: {output_reasoner}"
output_main = self.main_agent.run(prompt)
# For main agent, always use the full conversation context
output_main = self.main_agent.run(
task=self.conversation.get_str(), img=img
)
self.conversation.add(
role=self.main_agent.agent_name, content=output_main
)
def run(self, task: str, img: Optional[str] = None):
"""
Executes the reasoning and main agents on the provided task.
Args:
task (str): The task to be processed by the agents.
img (Optional[str]): Optional image input.
Returns:
str: The output from the main agent after processing the task.
"""
logger.info(
f"Running task: {task} with max_loops: {self.max_loops}"
)
self.conversation.add(role="user", content=task)
for loop_iteration in range(self.max_loops):
logger.info(
f"Loop iteration {loop_iteration + 1}/{self.max_loops}"
)
if loop_iteration == 0:
# First iteration: use original task
current_task = task
else:
# Subsequent iterations: use task with context of previous reasoning
current_task = f"Continue reasoning and refining your analysis. Original task: {task}\n\nPrevious conversation context:\n{self.conversation.get_str()}"
self.step(task=current_task, img=img)
return history_output_formatter(
self.conversation, self.output_type
)
def batched_run(self, tasks: List[str]):
def batched_run(
self, tasks: List[str], imgs: Optional[List[str]] = None
):
"""
Executes the run method for a list of tasks.
Args:
tasks (list[str]): A list of tasks to be processed.
imgs (Optional[List[str]]): Optional list of images corresponding to tasks.
Returns:
list: A list of outputs from the main agent for each task.
"""
outputs = []
for task in tasks:
# Handle case where imgs is None
if imgs is None:
imgs = [None] * len(tasks)
for task, img in zip(tasks, imgs):
logger.info(f"Processing task: {task}")
outputs.append(self.run(task))
outputs.append(self.run(task, img=img))
return outputs

@ -1110,6 +1110,8 @@ class Agent:
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ",
loop_count,
)
elif self.streaming_on:
pass
else:
self.pretty_print(
response, loop_count
@ -1239,12 +1241,13 @@ class Agent:
traceback_info = traceback.format_exc()
logger.error(
f"Error detected running your agent {self.agent_name}\n"
f"An error occurred while running your agent {self.agent_name}.\n"
f"Error Type: {error_type}\n"
f"Error Message: {error_message}\n"
f"Traceback:\n{traceback_info}\n"
f"Agent State: {self.to_dict()}\n"
f"Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;)"
f"Please optimize your input parameters, or create an issue on the Swarms GitHub and contact our team on Discord for support. "
f"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
)
raise error
@ -2684,9 +2687,18 @@ class Agent:
return output
except ValueError as e:
except AgentRunError as e:
self._handle_run_error(e)
except KeyboardInterrupt:
logger.warning(
f"Keyboard interrupt detected for agent '{self.agent_name}'. "
"If autosave is enabled, the agent's state will be saved to the workspace directory. "
"To enable autosave, please initialize the agent with Agent(autosave=True)."
"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/"
)
raise KeyboardInterrupt
def handle_artifacts(
self, text: str, file_output_path: str, file_extension: str
) -> None:
@ -2825,6 +2837,9 @@ class Agent:
if response is None:
response = "No response generated"
if self.streaming_on:
pass
if self.print_on:
formatter.print_panel(
response,

@ -0,0 +1,675 @@
import json
import pickle
import hashlib
import threading
import time
from functools import lru_cache, wraps
from typing import List, Dict, Any, Optional, Callable
from pathlib import Path
import weakref
from concurrent.futures import ThreadPoolExecutor
import os
from loguru import logger
# Import the Agent class - adjust path as needed
try:
from swarms.structs.agent import Agent
except ImportError:
# Fallback for development/testing
Agent = Any
class AgentCache:
"""
A comprehensive caching system for Agent objects with multiple strategies:
- Memory-based LRU cache
- Weak reference cache to prevent memory leaks
- Persistent disk cache for agent configurations
- Lazy loading with background preloading
"""
def __init__(
self,
max_memory_cache_size: int = 128,
cache_dir: Optional[str] = None,
enable_persistent_cache: bool = True,
auto_save_interval: int = 300, # 5 minutes
enable_weak_refs: bool = True,
):
"""
Initialize the AgentCache.
Args:
max_memory_cache_size: Maximum number of agents to keep in memory cache
cache_dir: Directory for persistent cache storage
enable_persistent_cache: Whether to enable disk-based caching
auto_save_interval: Interval in seconds for auto-saving cache
enable_weak_refs: Whether to use weak references to prevent memory leaks
"""
self.max_memory_cache_size = max_memory_cache_size
self.cache_dir = Path(cache_dir or "agent_cache")
self.enable_persistent_cache = enable_persistent_cache
self.auto_save_interval = auto_save_interval
self.enable_weak_refs = enable_weak_refs
# Memory caches
self._memory_cache: Dict[str, Agent] = {}
self._weak_cache: weakref.WeakValueDictionary = (
weakref.WeakValueDictionary()
)
self._access_times: Dict[str, float] = {}
self._lock = threading.RLock()
# Cache statistics
self._hits = 0
self._misses = 0
self._load_times: Dict[str, float] = {}
# Background tasks
self._auto_save_thread: Optional[threading.Thread] = None
self._shutdown_event = threading.Event()
# Initialize cache directory
if self.enable_persistent_cache:
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Start auto-save thread
self._start_auto_save_thread()
def _start_auto_save_thread(self):
"""Start the auto-save background thread."""
if (
self.enable_persistent_cache
and self.auto_save_interval > 0
):
self._auto_save_thread = threading.Thread(
target=self._auto_save_loop,
daemon=True,
name="AgentCache-AutoSave",
)
self._auto_save_thread.start()
def _auto_save_loop(self):
"""Background loop for auto-saving cache."""
while not self._shutdown_event.wait(self.auto_save_interval):
try:
self.save_cache_to_disk()
except Exception as e:
logger.error(f"Error in auto-save: {e}")
def _generate_cache_key(
self, agent_config: Dict[str, Any]
) -> str:
"""Generate a unique cache key from agent configuration."""
# Create a stable hash from the configuration
config_str = json.dumps(
agent_config, sort_keys=True, default=str
)
return hashlib.md5(config_str.encode()).hexdigest()
def _evict_lru(self):
"""Evict least recently used items from memory cache."""
if len(self._memory_cache) >= self.max_memory_cache_size:
# Find the least recently used item
lru_key = min(
self._access_times.items(), key=lambda x: x[1]
)[0]
# Save to persistent cache before evicting
if self.enable_persistent_cache:
self._save_agent_to_disk(
lru_key, self._memory_cache[lru_key]
)
# Remove from memory
del self._memory_cache[lru_key]
del self._access_times[lru_key]
logger.debug(f"Evicted agent {lru_key} from memory cache")
def _save_agent_to_disk(self, cache_key: str, agent: Agent):
"""Save agent to persistent cache."""
try:
cache_file = self.cache_dir / f"{cache_key}.pkl"
with open(cache_file, "wb") as f:
pickle.dump(agent.to_dict(), f)
logger.debug(f"Saved agent {cache_key} to disk cache")
except Exception as e:
logger.error(f"Error saving agent to disk: {e}")
def _load_agent_from_disk(
self, cache_key: str
) -> Optional[Agent]:
"""Load agent from persistent cache."""
try:
cache_file = self.cache_dir / f"{cache_key}.pkl"
if cache_file.exists():
with open(cache_file, "rb") as f:
agent_dict = pickle.load(f)
# Reconstruct agent from dictionary
agent = Agent(**agent_dict)
logger.debug(
f"Loaded agent {cache_key} from disk cache"
)
return agent
except Exception as e:
logger.error(f"Error loading agent from disk: {e}")
return None
def get_agent(
self, agent_config: Dict[str, Any]
) -> Optional[Agent]:
"""
Get an agent from cache, loading if necessary.
Args:
agent_config: Configuration dictionary for the agent
Returns:
Cached or newly loaded Agent instance
"""
cache_key = self._generate_cache_key(agent_config)
with self._lock:
# Check memory cache first
if cache_key in self._memory_cache:
self._access_times[cache_key] = time.time()
self._hits += 1
logger.debug(
f"Cache hit (memory) for agent {cache_key}"
)
return self._memory_cache[cache_key]
# Check weak reference cache
if (
self.enable_weak_refs
and cache_key in self._weak_cache
):
agent = self._weak_cache[cache_key]
if agent is not None:
# Move back to memory cache
self._memory_cache[cache_key] = agent
self._access_times[cache_key] = time.time()
self._hits += 1
logger.debug(
f"Cache hit (weak ref) for agent {cache_key}"
)
return agent
# Check persistent cache
if self.enable_persistent_cache:
agent = self._load_agent_from_disk(cache_key)
if agent is not None:
self._evict_lru()
self._memory_cache[cache_key] = agent
self._access_times[cache_key] = time.time()
if self.enable_weak_refs:
self._weak_cache[cache_key] = agent
self._hits += 1
logger.debug(
f"Cache hit (disk) for agent {cache_key}"
)
return agent
# Cache miss - need to create new agent
self._misses += 1
logger.debug(f"Cache miss for agent {cache_key}")
return None
def put_agent(self, agent_config: Dict[str, Any], agent: Agent):
"""
Put an agent into the cache.
Args:
agent_config: Configuration dictionary for the agent
agent: The Agent instance to cache
"""
cache_key = self._generate_cache_key(agent_config)
with self._lock:
self._evict_lru()
self._memory_cache[cache_key] = agent
self._access_times[cache_key] = time.time()
if self.enable_weak_refs:
self._weak_cache[cache_key] = agent
logger.debug(f"Added agent {cache_key} to cache")
def preload_agents(self, agent_configs: List[Dict[str, Any]]):
"""
Preload agents in the background for faster access.
Args:
agent_configs: List of agent configurations to preload
"""
def _preload_worker(config):
try:
cache_key = self._generate_cache_key(config)
if cache_key not in self._memory_cache:
start_time = time.time()
agent = Agent(**config)
load_time = time.time() - start_time
self.put_agent(config, agent)
self._load_times[cache_key] = load_time
logger.debug(
f"Preloaded agent {cache_key} in {load_time:.3f}s"
)
except Exception as e:
logger.error(f"Error preloading agent: {e}")
# Use thread pool for concurrent preloading
max_workers = min(len(agent_configs), os.cpu_count())
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(_preload_worker, agent_configs)
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics."""
total_requests = self._hits + self._misses
hit_rate = (
(self._hits / total_requests * 100)
if total_requests > 0
else 0
)
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate_percent": round(hit_rate, 2),
"memory_cache_size": len(self._memory_cache),
"weak_cache_size": len(self._weak_cache),
"average_load_time": (
sum(self._load_times.values()) / len(self._load_times)
if self._load_times
else 0
),
"total_agents_loaded": len(self._load_times),
}
def clear_cache(self):
"""Clear all caches."""
with self._lock:
self._memory_cache.clear()
self._weak_cache.clear()
self._access_times.clear()
logger.info("Cleared all caches")
def save_cache_to_disk(self):
"""Save current memory cache to disk."""
if not self.enable_persistent_cache:
return
with self._lock:
saved_count = 0
for cache_key, agent in self._memory_cache.items():
try:
self._save_agent_to_disk(cache_key, agent)
saved_count += 1
except Exception as e:
logger.error(
f"Error saving agent {cache_key}: {e}"
)
logger.info(f"Saved {saved_count} agents to disk cache")
def shutdown(self):
"""Shutdown the cache system gracefully."""
self._shutdown_event.set()
if self._auto_save_thread:
self._auto_save_thread.join(timeout=5)
# Final save
if self.enable_persistent_cache:
self.save_cache_to_disk()
logger.info("AgentCache shutdown complete")
# Global cache instance
_global_cache: Optional[AgentCache] = None
def get_global_cache() -> AgentCache:
"""Get or create the global agent cache instance."""
global _global_cache
if _global_cache is None:
_global_cache = AgentCache()
return _global_cache
def cached_agent_loader(
agents: List[Agent],
cache_instance: Optional[AgentCache] = None,
preload: bool = True,
parallel_loading: bool = True,
) -> List[Agent]:
"""
Load a list of agents with caching for super fast performance.
Args:
agents: List of Agent instances to cache/load
cache_instance: Optional cache instance (uses global cache if None)
preload: Whether to preload agents in background
parallel_loading: Whether to load agents in parallel
Returns:
List of Agent instances (cached versions if available)
Examples:
# Basic usage
agents = [Agent(agent_name="Agent1", model_name="gpt-4"), ...]
cached_agents = cached_agent_loader(agents)
# With custom cache
cache = AgentCache(max_memory_cache_size=256)
cached_agents = cached_agent_loader(agents, cache_instance=cache)
# Preload for even faster subsequent access
cached_agent_loader(agents, preload=True)
cached_agents = cached_agent_loader(agents) # Super fast!
"""
cache = cache_instance or get_global_cache()
start_time = time.time()
# Extract configurations from agents for caching
agent_configs = []
for agent in agents:
config = _extract_agent_config(agent)
agent_configs.append(config)
if preload:
# Preload agents in background
cache.preload_agents(agent_configs)
def _load_single_agent(agent: Agent) -> Agent:
"""Load a single agent with caching."""
config = _extract_agent_config(agent)
# Try to get from cache first
cached_agent = cache.get_agent(config)
if cached_agent is None:
# Cache miss - use the provided agent and cache it
load_start = time.time()
# Add to cache for future use
cache.put_agent(config, agent)
load_time = time.time() - load_start
logger.debug(
f"Cached new agent {agent.agent_name} in {load_time:.3f}s"
)
return agent
else:
logger.debug(
f"Retrieved cached agent {cached_agent.agent_name}"
)
return cached_agent
# Load agents (parallel or sequential)
if parallel_loading and len(agents) > 1:
max_workers = min(len(agents), os.cpu_count())
with ThreadPoolExecutor(max_workers=max_workers) as executor:
cached_agents = list(
executor.map(_load_single_agent, agents)
)
else:
cached_agents = [
_load_single_agent(agent) for agent in agents
]
total_time = time.time() - start_time
# Log performance stats
stats = cache.get_cache_stats()
logger.info(
f"Processed {len(cached_agents)} agents in {total_time:.3f}s "
f"(Hit rate: {stats['hit_rate_percent']}%)"
)
return cached_agents
def _extract_agent_config(agent: Agent) -> Dict[str, Any]:
"""
Extract a configuration dictionary from an Agent instance for caching.
Args:
agent: Agent instance to extract config from
Returns:
Configuration dictionary suitable for cache key generation
"""
# Extract key attributes that define an agent's identity
config = {
"agent_name": getattr(agent, "agent_name", None),
"model_name": getattr(agent, "model_name", None),
"system_prompt": getattr(agent, "system_prompt", None),
"max_loops": getattr(agent, "max_loops", None),
"temperature": getattr(agent, "temperature", None),
"max_tokens": getattr(agent, "max_tokens", None),
"agent_description": getattr(
agent, "agent_description", None
),
# Add other key identifying attributes
"tools": str(
getattr(agent, "tools", [])
), # Convert to string for hashing, default to empty list
"context_length": getattr(agent, "context_length", None),
}
# Remove None values to create a clean config
config = {k: v for k, v in config.items() if v is not None}
return config
def cached_agent_loader_from_configs(
agent_configs: List[Dict[str, Any]],
cache_instance: Optional[AgentCache] = None,
preload: bool = True,
parallel_loading: bool = True,
) -> List[Agent]:
"""
Load a list of agents from configuration dictionaries with caching.
Args:
agent_configs: List of agent configuration dictionaries
cache_instance: Optional cache instance (uses global cache if None)
preload: Whether to preload agents in background
parallel_loading: Whether to load agents in parallel
Returns:
List of Agent instances
Examples:
# Basic usage
configs = [{"agent_name": "Agent1", "model_name": "gpt-4"}, ...]
agents = cached_agent_loader_from_configs(configs)
# With custom cache
cache = AgentCache(max_memory_cache_size=256)
agents = cached_agent_loader_from_configs(configs, cache_instance=cache)
"""
cache = cache_instance or get_global_cache()
start_time = time.time()
if preload:
# Preload agents in background
cache.preload_agents(agent_configs)
def _load_single_agent(config: Dict[str, Any]) -> Agent:
"""Load a single agent with caching."""
# Try to get from cache first
agent = cache.get_agent(config)
if agent is None:
# Cache miss - create new agent
load_start = time.time()
agent = Agent(**config)
load_time = time.time() - load_start
# Add to cache for future use
cache.put_agent(config, agent)
logger.debug(
f"Created new agent {agent.agent_name} in {load_time:.3f}s"
)
return agent
# Load agents (parallel or sequential)
if parallel_loading and len(agent_configs) > 1:
max_workers = min(len(agent_configs), os.cpu_count())
with ThreadPoolExecutor(max_workers=max_workers) as executor:
agents = list(
executor.map(_load_single_agent, agent_configs)
)
else:
agents = [
_load_single_agent(config) for config in agent_configs
]
total_time = time.time() - start_time
# Log performance stats
stats = cache.get_cache_stats()
logger.info(
f"Loaded {len(agents)} agents in {total_time:.3f}s "
f"(Hit rate: {stats['hit_rate_percent']}%)"
)
return agents
# Decorator for caching individual agent creation
def cache_agent_creation(cache_instance: Optional[AgentCache] = None):
"""
Decorator to cache agent creation based on initialization parameters.
Args:
cache_instance: Optional cache instance (uses global cache if None)
Returns:
Decorator function
Example:
@cache_agent_creation()
def create_trading_agent(symbol: str, model: str):
return Agent(
agent_name=f"Trading-{symbol}",
model_name=model,
system_prompt=f"You are a trading agent for {symbol}"
)
agent1 = create_trading_agent("AAPL", "gpt-4") # Creates new agent
agent2 = create_trading_agent("AAPL", "gpt-4") # Returns cached agent
"""
def decorator(func: Callable[..., Agent]) -> Callable[..., Agent]:
cache = cache_instance or get_global_cache()
@wraps(func)
def wrapper(*args, **kwargs) -> Agent:
# Create a config dict from function arguments
import inspect
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
config = dict(bound_args.arguments)
# Try to get from cache
agent = cache.get_agent(config)
if agent is None:
# Cache miss - call original function
agent = func(*args, **kwargs)
cache.put_agent(config, agent)
return agent
return wrapper
return decorator
# LRU Cache-based simple approach
@lru_cache(maxsize=128)
def _cached_agent_by_hash(
config_hash: str, config_json: str
) -> Agent:
"""Internal LRU cached agent creation by config hash."""
config = json.loads(config_json)
return Agent(**config)
def simple_lru_agent_loader(
agents: List[Agent],
) -> List[Agent]:
"""
Simple LRU cache-based agent loader using functools.lru_cache.
Args:
agents: List of Agent instances
Returns:
List of Agent instances (cached versions if available)
Note:
This is a simpler approach but less flexible than the full AgentCache.
"""
cached_agents = []
for agent in agents:
# Extract config from agent
config = _extract_agent_config(agent)
# Create stable hash and JSON string
config_json = json.dumps(config, sort_keys=True, default=str)
config_hash = hashlib.md5(config_json.encode()).hexdigest()
# Use LRU cached function
cached_agent = _cached_agent_by_hash_from_agent(
config_hash, agent
)
cached_agents.append(cached_agent)
return cached_agents
@lru_cache(maxsize=128)
def _cached_agent_by_hash_from_agent(
config_hash: str, agent: Agent
) -> Agent:
"""Internal LRU cached agent storage by config hash."""
# Return the same agent instance (this creates the caching effect)
return agent
# Utility functions for cache management
def clear_agent_cache():
"""Clear the global agent cache."""
cache = get_global_cache()
cache.clear_cache()
def get_agent_cache_stats() -> Dict[str, Any]:
"""Get statistics from the global agent cache."""
cache = get_global_cache()
return cache.get_cache_stats()
def shutdown_agent_cache():
"""Shutdown the global agent cache gracefully."""
global _global_cache
if _global_cache:
_global_cache.shutdown()
_global_cache = None

@ -292,7 +292,7 @@ def run_test_suite():
assert isinstance(is_url_direct, bool)
assert isinstance(is_local_direct, bool)
assert (
is_local_direct == False
is_local_direct is False
) # Local files should never use direct URL
log_test_result("Local vs URL Detection", True)
