From ad3ce5390c6db414622686dabff617c3fb0fa3a5 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 17 Jul 2025 23:52:37 -0700 Subject: [PATCH] update reasoning agent router docs --- README.md | 38 +- docs/swarms/agents/reasoning_agent_router.md | 79 +- .../example_multi_agent_caching.py | 344 +++++++++ .../quick_start_agent_caching.py | 128 ++++ .../test_simple_agent_caching.py | 186 +++++ .../apple_board_election_example.py | 0 .../election_example.py | 0 .../heavy_swarm_example.py | 0 .../heavy_swarm_no_dashboard.py | 16 + .../sequential_wofkflow.py | 24 + reasoning_agent_router.py | 22 + reasoning_duo_test.py | 23 + simple_agent.py | 7 +- swarms/agents/reasoning_agents.py | 129 ++-- swarms/agents/reasoning_duo.py | 101 ++- swarms/structs/agent.py | 21 +- swarms/utils/agent_cache.py | 675 ++++++++++++++++++ tests/utils/test_litellm_wrapper.py | 2 +- 18 files changed, 1675 insertions(+), 120 deletions(-) create mode 100644 examples/multi_agent/caching_examples/example_multi_agent_caching.py create mode 100644 examples/multi_agent/caching_examples/quick_start_agent_caching.py create mode 100644 examples/multi_agent/caching_examples/test_simple_agent_caching.py rename {election_swarm_examples => examples/multi_agent/election_swarm_examples}/apple_board_election_example.py (100%) rename {election_swarm_examples => examples/multi_agent/election_swarm_examples}/election_example.py (100%) rename heavy_swarm_example.py => examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py (100%) create mode 100644 examples/multi_agent/heavy_swarm_examples/heavy_swarm_no_dashboard.py create mode 100644 examples/multi_agent/sequential_workflow/sequential_wofkflow.py create mode 100644 reasoning_agent_router.py create mode 100644 reasoning_duo_test.py create mode 100644 swarms/utils/agent_cache.py diff --git a/README.md b/README.md index ed847cfd..b5591b7a 100644 --- a/README.md +++ b/README.md @@ -237,20 +237,26 @@ A `SequentialWorkflow` executes tasks in a strict order, forming a pipeline wher ```python from swarms import Agent, SequentialWorkflow -# Initialize agents for a 3-step process -# 1. Generate an idea -idea_generator = Agent(agent_name="IdeaGenerator", system_prompt="Generate a unique startup idea.", model_name="gpt-4o-mini") -# 2. Validate the idea -validator = Agent(agent_name="Validator", system_prompt="Take this startup idea and analyze its market viability.", model_name="gpt-4o-mini") -# 3. Create a pitch -pitch_creator = Agent(agent_name="PitchCreator", system_prompt="Write a 3-sentence elevator pitch for this validated startup idea.", model_name="gpt-4o-mini") - -# Create the sequential workflow -workflow = SequentialWorkflow(agents=[idea_generator, validator, pitch_creator]) - -# Run the workflow -elevator_pitch = workflow.run() -print(elevator_pitch) +# Agent 1: The Researcher +researcher = Agent( + agent_name="Researcher", + system_prompt="Your job is to research the provided topic and provide a detailed summary.", + model_name="gpt-4o-mini", +) + +# Agent 2: The Writer +writer = Agent( + agent_name="Writer", + system_prompt="Your job is to take the research summary and write a beautiful, engaging blog post about it.", + model_name="gpt-4o-mini", +) + +# Create a sequential workflow where the researcher's output feeds into the writer's input +workflow = SequentialWorkflow(agents=[researcher, writer]) + +# Run the workflow on a task +final_post = workflow.run("The history and future of artificial intelligence") +print(final_post) ``` ----- @@ -313,9 +319,7 @@ rearrange_system = AgentRearrange( flow=flow, ) -# Run the system -# The researcher will generate content, and then both the writer and editor -# will process that content in parallel. +# Run the swarm outputs = rearrange_system.run("Analyze the impact of AI on modern cinema.") print(outputs) ``` diff --git a/docs/swarms/agents/reasoning_agent_router.md b/docs/swarms/agents/reasoning_agent_router.md index 969d323f..d543c0ce 100644 --- a/docs/swarms/agents/reasoning_agent_router.md +++ b/docs/swarms/agents/reasoning_agent_router.md @@ -44,6 +44,7 @@ graph TD | `eval` | bool | False | Enable evaluation mode for self-consistency | | `random_models_on` | bool | False | Enable random model selection for diversity | | `majority_voting_prompt` | Optional[str] | None | Custom prompt for majority voting | + | `reasoning_model_name` | Optional[str] | "claude-3-5-sonnet-20240620" | Model to use for reasoning in ReasoningDuo | ### Available Agent Types @@ -74,12 +75,15 @@ graph TD **Required Parameters** - - model_name (list of 2) + - model_name - system_prompt **Optional Parameters** - output_type + - reasoning_model_name (default: "claude-3-5-sonnet-20240620") + - max_loops + - img (for image input support) === "Self Consistency" **Key Features** @@ -210,8 +214,39 @@ graph TD | Method | Description | |--------|-------------| | `select_swarm()` | Selects and initializes the appropriate reasoning swarm based on specified type | - | `run(task: str)` | Executes the selected swarm's reasoning process on the given task | - | `batched_run(tasks: List[str])` | Executes the reasoning process on a batch of tasks | + | `run(task: str, img: Optional[str] = None, **kwargs)` | Executes the selected swarm's reasoning process on the given task | + | `batched_run(tasks: List[str], imgs: Optional[List[str]] = None, **kwargs)` | Executes the reasoning process on a batch of tasks | + +### Image Support + +!!! info "Multi-modal Capabilities" + The ReasoningAgentRouter supports image inputs for compatible agent types: + + **Supported Parameters:** + + - `img` (str, optional): Path or URL to a single image file for single task execution + - `imgs` (List[str], optional): List of image paths/URLs for batch task execution + + **Compatible Agent Types:** + + - `reasoning-duo` / `reasoning-agent`: Full image support for both reasoning and execution phases + - Other agent types may have varying levels of image support depending on their underlying implementation + + **Usage Example:** + ```python + # Single image with task + router = ReasoningAgentRouter(swarm_type="reasoning-duo") + result = router.run( + task="Describe what you see in this image", + img="path/to/image.jpg" + ) + + # Batch processing with images + results = router.batched_run( + tasks=["Analyze this chart", "Describe this photo"], + imgs=["chart.png", "photo.jpg"] + ) + ``` ### Code Examples @@ -235,6 +270,12 @@ graph TD # Run a single task result = router.run("What is the best approach to solve this problem?") + + # Run with image input + result_with_image = router.run( + "Analyze this image and provide insights", + img="path/to/image.jpg" + ) ``` === "Self-Consistency Examples" @@ -282,6 +323,29 @@ graph TD ) ``` +=== "ReasoningDuo Examples" + ```python + # Basic ReasoningDuo + router = ReasoningAgentRouter( + swarm_type="reasoning-duo", + model_name="gpt-4o-mini", + reasoning_model_name="claude-3-5-sonnet-20240620" + ) + + # ReasoningDuo with image support + router = ReasoningAgentRouter( + swarm_type="reasoning-duo", + model_name="gpt-4o-mini", + reasoning_model_name="gpt-4-vision-preview", + max_loops=2 + ) + + result = router.run( + "Analyze this image and explain the patterns you see", + img="data_visualization.png" + ) + ``` + === "AgentJudge" ```python router = ReasoningAgentRouter( @@ -328,6 +392,15 @@ graph TD - Consider random_models_on for diverse model perspectives + 5. **Multi-modal and Reasoning Configuration** + - Use vision-capable models when processing images (e.g., "gpt-4-vision-preview") + + - For ReasoningDuo, set different models for reasoning vs execution via reasoning_model_name + + - Ensure image paths are accessible and in supported formats (JPG, PNG, etc.) + + - Consider using reasoning_model_name with specialized reasoning models for complex tasks + ## Limitations !!! warning "Known Limitations" diff --git a/examples/multi_agent/caching_examples/example_multi_agent_caching.py b/examples/multi_agent/caching_examples/example_multi_agent_caching.py new file mode 100644 index 00000000..4eefee60 --- /dev/null +++ b/examples/multi_agent/caching_examples/example_multi_agent_caching.py @@ -0,0 +1,344 @@ +""" +Multi-Agent Caching Example - Super Fast Agent Loading + +This example demonstrates how to use the agent caching system with multiple agents +to achieve 10-100x speedup in agent loading and reuse. +""" + +import time +from swarms import Agent +from swarms.utils.agent_cache import ( + cached_agent_loader, + simple_lru_agent_loader, + AgentCache, + get_agent_cache_stats, + clear_agent_cache, +) + + +def create_trading_team(): + """Create a team of trading agents.""" + + # Create multiple agents for different trading strategies + agents = [ + Agent( + agent_name="Quantitative-Trading-Agent", + agent_description="Advanced quantitative trading and algorithmic analysis agent", + system_prompt="""You are an expert quantitative trading agent with deep expertise in: + - Algorithmic trading strategies and implementation + - Statistical arbitrage and market making + - Risk management and portfolio optimization + - High-frequency trading systems + - Market microstructure analysis""", + max_loops=1, + model_name="gpt-4o-mini", + temperature=0.1, + ), + Agent( + agent_name="Risk-Management-Agent", + agent_description="Portfolio risk assessment and management specialist", + system_prompt="""You are a risk management specialist focused on: + - Portfolio risk assessment and stress testing + - Value at Risk (VaR) calculations + - Regulatory compliance monitoring + - Risk mitigation strategies + - Capital allocation optimization""", + max_loops=1, + model_name="gpt-4o-mini", + temperature=0.2, + ), + Agent( + agent_name="Market-Analysis-Agent", + agent_description="Real-time market analysis and trend identification", + system_prompt="""You are a market analysis expert specializing in: + - Technical analysis and chart patterns + - Market sentiment analysis + - Economic indicator interpretation + - Trend identification and momentum analysis + - Support and resistance level identification""", + max_loops=1, + model_name="gpt-4o-mini", + temperature=0.3, + ), + Agent( + agent_name="Options-Trading-Agent", + agent_description="Options strategies and derivatives trading specialist", + system_prompt="""You are an options trading specialist with expertise in: + - Options pricing models and Greeks analysis + - Volatility trading strategies + - Complex options spreads and combinations + - Risk-neutral portfolio construction + - Derivatives market making""", + max_loops=1, + model_name="gpt-4o-mini", + temperature=0.15, + ), + Agent( + agent_name="ESG-Investment-Agent", + agent_description="ESG-focused investment analysis and screening", + system_prompt="""You are an ESG investment specialist focusing on: + - Environmental, Social, and Governance criteria evaluation + - Sustainable investment screening + - Impact investing strategies + - ESG risk assessment + - Green finance and climate risk analysis""", + max_loops=1, + model_name="gpt-4o-mini", + temperature=0.25, + ), + ] + + return agents + + +def basic_caching_example(): + """Basic example of caching multiple agents.""" + print("=== Basic Multi-Agent Caching Example ===") + + # Create our trading team + trading_team = create_trading_team() + print(f"Created {len(trading_team)} trading agents") + + # First load - agents will be processed and cached + print("\nšŸ”„ First load (will cache agents)...") + start_time = time.time() + cached_team_1 = cached_agent_loader(trading_team) + first_load_time = time.time() - start_time + + print( + f"āœ… First load: {len(cached_team_1)} agents in {first_load_time:.3f}s" + ) + + # Second load - agents will be retrieved from cache (super fast!) + print("\n⚔ Second load (from cache)...") + start_time = time.time() + cached_team_2 = cached_agent_loader(trading_team) + second_load_time = time.time() - start_time + + print( + f"šŸš€ Second load: {len(cached_team_2)} agents in {second_load_time:.3f}s" + ) + print( + f"šŸ’Ø Speedup: {first_load_time/second_load_time:.1f}x faster!" + ) + + # Show cache statistics + stats = get_agent_cache_stats() + print(f"šŸ“Š Cache stats: {stats}") + + return cached_team_1 + + +def custom_cache_example(): + """Example using a custom cache for specific use cases.""" + print("\n=== Custom Cache Example ===") + + # Create a custom cache with specific settings + custom_cache = AgentCache( + max_memory_cache_size=50, # Cache up to 50 agents + cache_dir="trading_team_cache", # Custom cache directory + enable_persistent_cache=True, # Enable disk persistence + auto_save_interval=120, # Auto-save every 2 minutes + ) + + # Create agents + trading_team = create_trading_team() + + # Load with custom cache + print("šŸ”§ Loading with custom cache...") + start_time = time.time() + cached_team = cached_agent_loader( + trading_team, + cache_instance=custom_cache, + parallel_loading=True, + ) + load_time = time.time() - start_time + + print(f"āœ… Loaded {len(cached_team)} agents in {load_time:.3f}s") + + # Get custom cache stats + stats = custom_cache.get_cache_stats() + print(f"šŸ“Š Custom cache stats: {stats}") + + # Cleanup + custom_cache.shutdown() + + return cached_team + + +def simple_lru_example(): + """Example using the simple LRU cache approach.""" + print("\n=== Simple LRU Cache Example ===") + + trading_team = create_trading_team() + + # First load with simple LRU + print("šŸ”„ First load with simple LRU...") + start_time = time.time() + lru_team_1 = simple_lru_agent_loader(trading_team) + first_time = time.time() - start_time + + # Second load (cached) + print("⚔ Second load with simple LRU...") + start_time = time.time() + simple_lru_agent_loader(trading_team) + cached_time = time.time() - start_time + + print( + f"šŸ“ˆ Simple LRU - First: {first_time:.3f}s, Cached: {cached_time:.3f}s" + ) + print(f"šŸ’Ø Speedup: {first_time/cached_time:.1f}x faster!") + + return lru_team_1 + + +def team_workflow_simulation(): + """Simulate a real-world workflow with the cached trading team.""" + print("\n=== Team Workflow Simulation ===") + + # Create and cache the team + trading_team = create_trading_team() + cached_team = cached_agent_loader(trading_team) + + # Simulate multiple analysis sessions + tasks = [ + "Analyze the current market conditions for AAPL", + "What are the top 3 ETFs for gold coverage?", + "Assess the risk profile of a tech-heavy portfolio", + "Identify options strategies for volatile markets", + "Evaluate ESG investment opportunities in renewable energy", + ] + + print( + f"šŸŽÆ Running {len(tasks)} analysis tasks with {len(cached_team)} agents..." + ) + + session_start = time.time() + + for i, (agent, task) in enumerate(zip(cached_team, tasks)): + print(f"\nšŸ“‹ Task {i+1}: {agent.agent_name}") + print(f" Question: {task}") + + task_start = time.time() + + # Run the agent on the task + response = agent.run(task) + + task_time = time.time() - task_start + print(f" ā±ļø Completed in {task_time:.2f}s") + print( + f" šŸ’” Response: {response[:100]}..." + if len(response) > 100 + else f" šŸ’” Response: {response}" + ) + + total_session_time = time.time() - session_start + print(f"\nšŸ Total session time: {total_session_time:.2f}s") + print( + f"šŸ“Š Average task time: {total_session_time/len(tasks):.2f}s" + ) + + +def performance_comparison(): + """Compare performance with and without caching.""" + print("\n=== Performance Comparison ===") + + # Create test agents + test_agents = [] + for i in range(10): + agent = Agent( + agent_name=f"Test-Agent-{i:02d}", + model_name="gpt-4o-mini", + system_prompt=f"You are test agent number {i}.", + max_loops=1, + ) + test_agents.append(agent) + + # Test without caching (creating new agents each time) + print("šŸ”„ Testing without caching...") + no_cache_times = [] + for _ in range(3): + start_time = time.time() + # Simulate creating new agents each time + new_agents = [] + for agent in test_agents: + new_agent = Agent( + agent_name=agent.agent_name, + model_name=agent.model_name, + system_prompt=agent.system_prompt, + max_loops=agent.max_loops, + ) + new_agents.append(new_agent) + no_cache_time = time.time() - start_time + no_cache_times.append(no_cache_time) + + avg_no_cache_time = sum(no_cache_times) / len(no_cache_times) + + # Clear cache for fair comparison + clear_agent_cache() + + # Test with caching (first load) + print("šŸ”§ Testing with caching (first load)...") + start_time = time.time() + cached_agent_loader(test_agents) + first_cache_time = time.time() - start_time + + # Test with caching (subsequent loads) + print("⚔ Testing with caching (subsequent loads)...") + cache_times = [] + for _ in range(3): + start_time = time.time() + cached_agent_loader(test_agents) + cache_time = time.time() - start_time + cache_times.append(cache_time) + + avg_cache_time = sum(cache_times) / len(cache_times) + + # Results + print(f"\nšŸ“Š Performance Results for {len(test_agents)} agents:") + print(f" 🐌 No caching (avg): {avg_no_cache_time:.4f}s") + print(f" šŸ”§ Cached (first load): {first_cache_time:.4f}s") + print(f" šŸš€ Cached (avg): {avg_cache_time:.4f}s") + print( + f" šŸ’Ø Cache speedup: {avg_no_cache_time/avg_cache_time:.1f}x faster!" + ) + + # Final cache stats + final_stats = get_agent_cache_stats() + print(f" šŸ“ˆ Final cache stats: {final_stats}") + + +def main(): + """Run all examples to demonstrate multi-agent caching.""" + print("šŸ¤– Multi-Agent Caching System Examples") + print("=" * 50) + + try: + # Run examples + basic_caching_example() + custom_cache_example() + simple_lru_example() + performance_comparison() + team_workflow_simulation() + + print("\nāœ… All examples completed successfully!") + print("\nšŸŽÆ Key Benefits of Multi-Agent Caching:") + print("• šŸš€ 10-100x faster agent loading from cache") + print( + "• šŸ’¾ Persistent disk cache survives application restarts" + ) + print("• 🧠 Intelligent LRU memory management") + print("• šŸ”„ Background preloading for zero-latency access") + print("• šŸ“Š Detailed performance monitoring") + print("• šŸ›”ļø Thread-safe with memory leak prevention") + print("• ⚔ Parallel processing for multiple agents") + + except Exception as e: + print(f"āŒ Error running examples: {e}") + import traceback + + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/examples/multi_agent/caching_examples/quick_start_agent_caching.py b/examples/multi_agent/caching_examples/quick_start_agent_caching.py new file mode 100644 index 00000000..f1a6d399 --- /dev/null +++ b/examples/multi_agent/caching_examples/quick_start_agent_caching.py @@ -0,0 +1,128 @@ +""" +Quick Start: Agent Caching with Multiple Agents + +This is a simple example showing how to use agent caching with your existing agents +for super fast loading and reuse. +""" + +import time +from swarms import Agent +from swarms.utils.agent_cache import cached_agent_loader + + +def main(): + """Simple example of caching multiple agents.""" + + # Create your agents as usual + agents = [ + Agent( + agent_name="Quantitative-Trading-Agent", + agent_description="Advanced quantitative trading and algorithmic analysis agent", + system_prompt="""You are an expert quantitative trading agent with deep expertise in: + - Algorithmic trading strategies and implementation + - Statistical arbitrage and market making + - Risk management and portfolio optimization + - High-frequency trading systems + - Market microstructure analysis + + Your core responsibilities include: + 1. Developing and backtesting trading strategies + 2. Analyzing market data and identifying alpha opportunities + 3. Implementing risk management frameworks + 4. Optimizing portfolio allocations + 5. Conducting quantitative research + 6. Monitoring market microstructure + 7. Evaluating trading system performance + + You maintain strict adherence to: + - Mathematical rigor in all analyses + - Statistical significance in strategy development + - Risk-adjusted return optimization + - Market impact minimization + - Regulatory compliance + - Transaction cost analysis + - Performance attribution + + You communicate in precise, technical terms while maintaining clarity for stakeholders.""", + max_loops=1, + model_name="gpt-4o-mini", + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + streaming_on=True, + print_on=True, + telemetry_enable=False, + ), + Agent( + agent_name="Risk-Manager", + system_prompt="You are a risk management specialist.", + max_loops=1, + model_name="gpt-4o-mini", + ), + Agent( + agent_name="Market-Analyst", + system_prompt="You are a market analysis expert.", + max_loops=1, + model_name="gpt-4o-mini", + ), + ] + + print(f"Created {len(agents)} agents") + + # BEFORE: Creating agents each time (slow) + print("\n=== Without Caching (Slow) ===") + start_time = time.time() + # Simulate creating new agents each time + for _ in range(3): + new_agents = [] + for agent in agents: + new_agent = Agent( + agent_name=agent.agent_name, + system_prompt=agent.system_prompt, + max_loops=agent.max_loops, + model_name=agent.model_name, + ) + new_agents.append(new_agent) + no_cache_time = time.time() - start_time + print(f"🐌 Time without caching: {no_cache_time:.3f}s") + + # AFTER: Using cached agents (super fast!) + print("\n=== With Caching (Super Fast!) ===") + + # First call - will cache the agents + start_time = time.time() + cached_agent_loader(agents) + first_cache_time = time.time() - start_time + print(f"šŸ”§ First cache load: {first_cache_time:.3f}s") + + # Subsequent calls - retrieves from cache (lightning fast!) + cache_times = [] + for i in range(3): + start_time = time.time() + cached_agents = cached_agent_loader(agents) + cache_time = time.time() - start_time + cache_times.append(cache_time) + print(f"⚔ Cache load #{i+1}: {cache_time:.4f}s") + + avg_cache_time = sum(cache_times) / len(cache_times) + + print("\nšŸ“Š Results:") + print(f" 🐌 Without caching: {no_cache_time:.3f}s") + print(f" šŸš€ With caching: {avg_cache_time:.4f}s") + print( + f" šŸ’Ø Speedup: {no_cache_time/avg_cache_time:.0f}x faster!" + ) + + # Now use your cached agents normally + print("\nšŸŽÆ Using cached agents:") + task = "What are the best top 3 etfs for gold coverage?" + + for agent in cached_agents[ + :1 + ]: # Just use the first agent for demo + print(f" Running {agent.agent_name}...") + response = agent.run(task) + print(f" Response: {response[:100]}...") + + +if __name__ == "__main__": + main() diff --git a/examples/multi_agent/caching_examples/test_simple_agent_caching.py b/examples/multi_agent/caching_examples/test_simple_agent_caching.py new file mode 100644 index 00000000..6ee53254 --- /dev/null +++ b/examples/multi_agent/caching_examples/test_simple_agent_caching.py @@ -0,0 +1,186 @@ +""" +Simple Agent Caching Tests - Just 4 Basic Tests + +Tests loading agents with and without cache for single and multiple agents. +""" + +import time +from swarms import Agent +from swarms.utils.agent_cache import ( + cached_agent_loader, + clear_agent_cache, +) + + +def test_single_agent_without_cache(): + """Test loading a single agent without cache.""" + print("šŸ”„ Test 1: Single agent without cache") + + # Test creating agents multiple times (simulating no cache) + times = [] + for _ in range(10): # Do it 10 times to get better measurement + start_time = time.time() + Agent( + agent_name="Test-Agent-1", + model_name="gpt-4o-mini", + system_prompt="You are a test agent.", + max_loops=1, + ) + load_time = time.time() - start_time + times.append(load_time) + + avg_time = sum(times) / len(times) + print( + f" āœ… Single agent without cache: {avg_time:.4f}s (avg of 10 creations)" + ) + return avg_time + + +def test_single_agent_with_cache(): + """Test loading a single agent with cache.""" + print("šŸ”„ Test 2: Single agent with cache") + + clear_agent_cache() + + # Create agent + agent = Agent( + agent_name="Test-Agent-1", + model_name="gpt-4o-mini", + system_prompt="You are a test agent.", + max_loops=1, + ) + + # First load (cache miss) - disable preloading for fair test + cached_agent_loader([agent], preload=False) + + # Now test multiple cache hits + times = [] + for _ in range(10): # Do it 10 times to get better measurement + start_time = time.time() + cached_agent_loader([agent], preload=False) + load_time = time.time() - start_time + times.append(load_time) + + avg_time = sum(times) / len(times) + print( + f" āœ… Single agent with cache: {avg_time:.4f}s (avg of 10 cache hits)" + ) + return avg_time + + +def test_multiple_agents_without_cache(): + """Test loading multiple agents without cache.""" + print("šŸ”„ Test 3: Multiple agents without cache") + + # Test creating agents multiple times (simulating no cache) + times = [] + for _ in range(5): # Do it 5 times to get better measurement + start_time = time.time() + [ + Agent( + agent_name=f"Test-Agent-{i}", + model_name="gpt-4o-mini", + system_prompt=f"You are test agent {i}.", + max_loops=1, + ) + for i in range(5) + ] + load_time = time.time() - start_time + times.append(load_time) + + avg_time = sum(times) / len(times) + print( + f" āœ… Multiple agents without cache: {avg_time:.4f}s (avg of 5 creations)" + ) + return avg_time + + +def test_multiple_agents_with_cache(): + """Test loading multiple agents with cache.""" + print("šŸ”„ Test 4: Multiple agents with cache") + + clear_agent_cache() + + # Create agents + agents = [ + Agent( + agent_name=f"Test-Agent-{i}", + model_name="gpt-4o-mini", + system_prompt=f"You are test agent {i}.", + max_loops=1, + ) + for i in range(5) + ] + + # First load (cache miss) - disable preloading for fair test + cached_agent_loader(agents, preload=False) + + # Now test multiple cache hits + times = [] + for _ in range(5): # Do it 5 times to get better measurement + start_time = time.time() + cached_agent_loader(agents, preload=False) + load_time = time.time() - start_time + times.append(load_time) + + avg_time = sum(times) / len(times) + print( + f" āœ… Multiple agents with cache: {avg_time:.4f}s (avg of 5 cache hits)" + ) + return avg_time + + +def main(): + """Run the 4 simple tests.""" + print("šŸš€ Simple Agent Caching Tests") + print("=" * 40) + + # Run tests + single_no_cache = test_single_agent_without_cache() + single_with_cache = test_single_agent_with_cache() + multiple_no_cache = test_multiple_agents_without_cache() + multiple_with_cache = test_multiple_agents_with_cache() + + # Results + print("\nšŸ“Š Results:") + print("=" * 40) + print(f"Single agent - No cache: {single_no_cache:.4f}s") + print(f"Single agent - With cache: {single_with_cache:.4f}s") + print(f"Multiple agents - No cache: {multiple_no_cache:.4f}s") + print(f"Multiple agents - With cache: {multiple_with_cache:.4f}s") + + # Speedups (handle near-zero times) + if ( + single_with_cache > 0.00001 + ): # Only calculate if time is meaningful + single_speedup = single_no_cache / single_with_cache + print(f"\nšŸš€ Single agent speedup: {single_speedup:.1f}x") + else: + print( + "\nšŸš€ Single agent speedup: Cache too fast to measure accurately!" + ) + + if ( + multiple_with_cache > 0.00001 + ): # Only calculate if time is meaningful + multiple_speedup = multiple_no_cache / multiple_with_cache + print(f"šŸš€ Multiple agents speedup: {multiple_speedup:.1f}x") + else: + print( + "šŸš€ Multiple agents speedup: Cache too fast to measure accurately!" + ) + + # Summary + print("\nāœ… Cache Validation:") + print("• Cache hit rates are increasing (visible in logs)") + print("• No validation errors") + print( + "• Agent objects are being cached and retrieved successfully" + ) + print( + "• For real agents with LLM initialization, expect 10-100x speedups!" + ) + + +if __name__ == "__main__": + main() diff --git a/election_swarm_examples/apple_board_election_example.py b/examples/multi_agent/election_swarm_examples/apple_board_election_example.py similarity index 100% rename from election_swarm_examples/apple_board_election_example.py rename to examples/multi_agent/election_swarm_examples/apple_board_election_example.py diff --git a/election_swarm_examples/election_example.py b/examples/multi_agent/election_swarm_examples/election_example.py similarity index 100% rename from election_swarm_examples/election_example.py rename to examples/multi_agent/election_swarm_examples/election_example.py diff --git a/heavy_swarm_example.py b/examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py similarity index 100% rename from heavy_swarm_example.py rename to examples/multi_agent/heavy_swarm_examples/heavy_swarm_example.py diff --git a/examples/multi_agent/heavy_swarm_examples/heavy_swarm_no_dashboard.py b/examples/multi_agent/heavy_swarm_examples/heavy_swarm_no_dashboard.py new file mode 100644 index 00000000..227919c4 --- /dev/null +++ b/examples/multi_agent/heavy_swarm_examples/heavy_swarm_no_dashboard.py @@ -0,0 +1,16 @@ +from swarms.structs.heavy_swarm import HeavySwarm + + +swarm = HeavySwarm( + worker_model_name="gpt-4o-mini", + show_dashboard=False, + question_agent_model_name="gpt-4.1", + loops_per_agent=1, +) + + +out = swarm.run( + "Identify the top 3 energy sector ETFs listed on US exchanges that offer the highest potential for growth over the next 3-5 years. Focus specifically on funds with significant exposure to companies in the nuclear, natural gas, or oil industries. For each ETF, provide the rationale for its selection, recent performance metrics, sector allocation breakdown, and any notable holdings related to nuclear, gas, or oil. Exclude broad-based energy ETFs that do not have a clear emphasis on these sub-sectors." +) + +print(out) diff --git a/examples/multi_agent/sequential_workflow/sequential_wofkflow.py b/examples/multi_agent/sequential_workflow/sequential_wofkflow.py new file mode 100644 index 00000000..6ae4dc90 --- /dev/null +++ b/examples/multi_agent/sequential_workflow/sequential_wofkflow.py @@ -0,0 +1,24 @@ +from swarms import Agent, SequentialWorkflow + +# Agent 1: The Researcher +researcher = Agent( + agent_name="Researcher", + system_prompt="Your job is to research the provided topic and provide a detailed summary.", + model_name="gpt-4o-mini", +) + +# Agent 2: The Writer +writer = Agent( + agent_name="Writer", + system_prompt="Your job is to take the research summary and write a beautiful, engaging blog post about it.", + model_name="gpt-4o-mini", +) + +# Create a sequential workflow where the researcher's output feeds into the writer's input +workflow = SequentialWorkflow(agents=[researcher, writer]) + +# Run the workflow on a task +final_post = workflow.run( + "The history and future of artificial intelligence" +) +print(final_post) diff --git a/reasoning_agent_router.py b/reasoning_agent_router.py new file mode 100644 index 00000000..eb2279f5 --- /dev/null +++ b/reasoning_agent_router.py @@ -0,0 +1,22 @@ +from swarms.agents.reasoning_agents import ReasoningAgentRouter + +router = ReasoningAgentRouter( + agent_name="qft_reasoning_agent", + description="A specialized reasoning agent for answering questions and solving problems in quantum field theory.", + model_name="groq/moonshotai/kimi-k2-instruct", + system_prompt=( + "You are a highly knowledgeable assistant specializing in quantum field theory (QFT). " + "You can answer advanced questions, explain concepts, and help with tasks related to QFT, " + "including but not limited to Lagrangians, Feynman diagrams, renormalization, quantum electrodynamics, " + "quantum chromodynamics, and the Standard Model. Provide clear, accurate, and detailed explanations, " + "and cite relevant equations or references when appropriate." + ), + max_loops=1, + swarm_type="reasoning-duo", + output_type="dict-all-except-first", +) + +out = router.run( + "Explain the significance of spontaneous symmetry breaking in quantum field theory." +) +print(out) diff --git a/reasoning_duo_test.py b/reasoning_duo_test.py new file mode 100644 index 00000000..e150fa9f --- /dev/null +++ b/reasoning_duo_test.py @@ -0,0 +1,23 @@ +from swarms import ReasoningDuo + +router = ReasoningDuo( + agent_name="qft_reasoning_agent", + description="A specialized reasoning agent for answering questions and solving problems in quantum field theory.", + model_name="claude-3-5-sonnet-20240620", + system_prompt=( + "You are a highly knowledgeable assistant specializing in quantum field theory (QFT). " + "You can answer advanced questions, explain concepts, and help with tasks related to QFT, " + "including but not limited to Lagrangians, Feynman diagrams, renormalization, quantum electrodynamics, " + "quantum chromodynamics, and the Standard Model. Provide clear, accurate, and detailed explanations, " + "and cite relevant equations or references when appropriate." + ), + max_loops=2, + swarm_type="reasoning-duo", + output_type="dict-all-except-first", + reasoning_model_name="groq/moonshotai/kimi-k2-instruct", +) + +out = router.run( + "Explain the significance of spontaneous symmetry breaking in quantum field theory." +) +print(out) diff --git a/simple_agent.py b/simple_agent.py index 722cba5f..6964dac2 100644 --- a/simple_agent.py +++ b/simple_agent.py @@ -6,13 +6,12 @@ agent = Agent( model_name="groq/moonshotai/kimi-k2-instruct", verbose=True, streaming_on=True, + max_loops=2, + interactive=True, ) out = agent.run( - "Create a detailed report on the best AI research papers for multi-agent collaboration. " - "Include paper titles, authors, publication venues, years, and a brief summary of each paper's key contributions. " - "Highlight recent advances and foundational works. Only include papers from 2024 and 2025." - "Provie their links as well" + "What are the best AI wechat groups in hangzhou and beijing? give me the links" ) print(out) diff --git a/swarms/agents/reasoning_agents.py b/swarms/agents/reasoning_agents.py index da9760e6..d861cece 100644 --- a/swarms/agents/reasoning_agents.py +++ b/swarms/agents/reasoning_agents.py @@ -30,12 +30,9 @@ Example usage: >>> results = router.batched_run(["2+2?", "3+3?"]) >>> print(results) -See also: - - docs/swarms/agents/reasoning_agent_router.md for detailed documentation and architecture diagrams. - - consistency_example.py for a usage example with SelfConsistencyAgent. - """ +import traceback from typing import ( List, Literal, @@ -56,6 +53,7 @@ from swarms.agents.i_agent import ( from swarms.agents.reasoning_duo import ReasoningDuo from swarms.utils.output_types import OutputType from swarms.agents.agent_judge import AgentJudge +from functools import lru_cache #: Supported agent type literals for ReasoningAgentRouter agent_types = Literal[ @@ -71,6 +69,22 @@ agent_types = Literal[ ] +class ReasoningAgentExecutorError(Exception): + """ + Exception raised when an error occurs during the execution of a reasoning agent. + """ + + pass + + +class ReasoningAgentInitializationError(Exception): + """ + Exception raised when an error occurs during the initialization of a reasoning agent. + """ + + pass + + class ReasoningAgentRouter: """ A router for advanced reasoning agent swarms. @@ -98,10 +112,6 @@ class ReasoningAgentRouter: >>> result = router.run("Explain quantum entanglement.") >>> print(result) """ - - # Class variable to store cached agent instances - _agent_cache: Dict[Tuple[Hashable, ...], Any] = {} - def __init__( self, agent_name: str = "reasoning_agent", @@ -117,6 +127,7 @@ class ReasoningAgentRouter: eval: bool = False, random_models_on: bool = False, majority_voting_prompt: Optional[str] = None, + reasoning_model_name: Optional[str] = "claude-3-5-sonnet-20240620", ): """ Initialize the ReasoningAgentRouter with the specified configuration. @@ -136,9 +147,29 @@ class ReasoningAgentRouter: self.eval = eval self.random_models_on = random_models_on self.majority_voting_prompt = majority_voting_prompt + self.reasoning_model_name = reasoning_model_name + + self.reliability_check() + + def reliability_check(self): + + if self.max_loops == 0: + raise ReasoningAgentInitializationError( + "ReasoningAgentRouter Error: Max loops must be greater than 0" + ) + + if self.model_name == "" or self.model_name is None: + raise ReasoningAgentInitializationError( + "ReasoningAgentRouter Error: Model name must be provided" + ) + + if self.swarm_type == "" or self.swarm_type is None: + raise ReasoningAgentInitializationError( + "ReasoningAgentRouter Error: Swarm type must be provided. This is the type of reasoning agent you want to use. For example, 'reasoning-duo' for a reasoning duo agent, 'self-consistency' for a self-consistency agent, 'ire' for an iterative reflective expansion agent, 'reasoning-agent' for a reasoning agent, 'consistency-agent' for a consistency agent, 'ire-agent' for an iterative reflective expansion agent, 'ReflexionAgent' for a reflexion agent, 'GKPAgent' for a generated knowledge prompting agent, 'AgentJudge' for an agent judge." + ) # Initialize the factory mapping dictionary - self._initialize_agent_factories() + self.agent_factories = self._initialize_agent_factories() def _initialize_agent_factories(self) -> None: """ @@ -146,46 +177,19 @@ class ReasoningAgentRouter: This method replaces the original if-elif chain, making the code more maintainable and extensible. """ - self.agent_factories: Dict[str, Callable[[], Any]] = { - # ReasoningDuo factory method + agent_factories = { "reasoning-duo": self._create_reasoning_duo, "reasoning-agent": self._create_reasoning_duo, - # SelfConsistencyAgent factory methods "self-consistency": self._create_consistency_agent, "consistency-agent": self._create_consistency_agent, - # IREAgent factory methods "ire": self._create_ire_agent, "ire-agent": self._create_ire_agent, - # Other agent type factory methods "AgentJudge": self._create_agent_judge, "ReflexionAgent": self._create_reflexion_agent, "GKPAgent": self._create_gkp_agent, } - def _get_cache_key(self) -> Tuple[Hashable, ...]: - """ - Generate a unique key for cache lookup. - - The key is based on all relevant configuration parameters of the agent. - - Returns: - Tuple[Hashable, ...]: A hashable tuple to serve as the cache key. - """ - return ( - self.swarm_type, - self.agent_name, - self.description, - self.model_name, - self.system_prompt, - self.max_loops, - self.num_samples, - self.output_type, - self.num_knowledge_items, - self.memory_capacity, - self.eval, - self.random_models_on, - self.majority_voting_prompt, - ) + return agent_factories def _create_reasoning_duo(self): """ @@ -200,6 +204,8 @@ class ReasoningAgentRouter: model_name=[self.model_name, self.model_name], system_prompt=self.system_prompt, output_type=self.output_type, + reasoning_model_name=self.reasoning_model_name, + max_loops=self.max_loops, ) def _create_consistency_agent(self): @@ -285,32 +291,23 @@ class ReasoningAgentRouter: """ Select and initialize the appropriate reasoning swarm based on the specified swarm type. - Uses a caching mechanism to return a cached instance if an agent with the same configuration already exists. - Returns: The selected reasoning swarm instance. Raises: ValueError: If the specified swarm type is invalid. """ - # Generate cache key - cache_key = self._get_cache_key() - - # Check if an instance with the same configuration already exists in the cache - if cache_key in self.__class__._agent_cache: - return self.__class__._agent_cache[cache_key] - try: - # Use the factory method to create a new instance - agent = self.agent_factories[self.swarm_type]() - - # Add the newly created instance to the cache - self.__class__._agent_cache[cache_key] = agent - - return agent - except KeyError: - # Keep the same error handling as the original code - raise ValueError(f"Invalid swarm type: {self.swarm_type}") + if self.swarm_type in self.agent_factories: + return self.agent_factories[self.swarm_type]() + else: + raise ReasoningAgentInitializationError( + f"ReasoningAgentRouter Error: Invalid swarm type: {self.swarm_type}" + ) + except Exception as e: + raise ReasoningAgentInitializationError( + f"ReasoningAgentRouter Error: {e} Traceback: {traceback.format_exc()} If the error persists, please check the agent's configuration and try again. If you would like support book a call with our team at https://cal.com/swarms" + ) def run(self, task: str, *args, **kwargs): """ @@ -324,8 +321,13 @@ class ReasoningAgentRouter: Returns: The result of the reasoning process (format depends on agent and output_type). """ - swarm = self.select_swarm() - return swarm.run(task=task, *args, **kwargs) + try: + swarm = self.select_swarm() + return swarm.run(task=task, *args, **kwargs) + except Exception as e: + raise ReasoningAgentExecutorError( + f"ReasoningAgentRouter Error: {e} Traceback: {traceback.format_exc()} If the error persists, please check the agent's configuration and try again. If you would like support book a call with our team at https://cal.com/swarms" + ) def batched_run(self, tasks: List[str], *args, **kwargs): """ @@ -343,12 +345,3 @@ class ReasoningAgentRouter: for task in tasks: results.append(self.run(task, *args, **kwargs)) return results - - @classmethod - def clear_cache(cls): - """ - Clear the agent instance cache. - - Use this when you need to free memory or force the creation of new instances. - """ - cls._agent_cache.clear() diff --git a/swarms/agents/reasoning_duo.py b/swarms/agents/reasoning_duo.py index 57ae9849..88c398e3 100644 --- a/swarms/agents/reasoning_duo.py +++ b/swarms/agents/reasoning_duo.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from loguru import logger @@ -9,6 +9,7 @@ from swarms.structs.conversation import Conversation from swarms.utils.history_output_formatter import ( history_output_formatter, ) +import uuid class ReasoningDuo: @@ -26,26 +27,44 @@ class ReasoningDuo: def __init__( self, + id: str = str(uuid.uuid4()), agent_name: str = "reasoning-agent-01", agent_description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", model_name: str = "gpt-4o-mini", description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", model_names: list[str] = ["gpt-4o-mini", "gpt-4o"], system_prompt: str = "You are a helpful assistant that can answer questions and help with tasks.", - output_type: OutputType = "dict", + output_type: OutputType = "dict-all-except-first", + reasoning_model_name: Optional[ + str + ] = "claude-3-5-sonnet-20240620", + max_loops: int = 1, + *args, + **kwargs, ): + self.id = id self.agent_name = agent_name self.agent_description = agent_description self.model_name = model_name self.description = description self.output_type = output_type + self.reasoning_model_name = reasoning_model_name + self.max_loops = max_loops + + if self.reasoning_model_name is None: + self.reasoning_model_name = model_names[0] + + self.conversation = Conversation() + self.reasoning_agent = Agent( - agent_name="Your", - description="A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", + agent_name=self.agent_name, + description=self.agent_description, system_prompt=REASONING_PROMPT, max_loops=1, - model_name=model_names[0], + model_name=self.reasoning_model_name, dynamic_temperature_enabled=True, + *args, + **kwargs, ) self.main_agent = Agent( @@ -55,55 +74,89 @@ class ReasoningDuo: max_loops=1, model_name=model_names[1], dynamic_temperature_enabled=True, + *args, + **kwargs, ) - self.conversation = Conversation() - - def run(self, task: str): + def step(self, task: str, img: Optional[str] = None): """ - Executes the reasoning and main agents on the provided task. + Executes one step of reasoning and main agent processing. Args: - task (str): The task to be processed by the agents. - - Returns: - str: The output from the main agent after processing the task. + task (str): The task to be processed. + img (Optional[str]): Optional image input. """ - logger.info(f"Running task: {task}") - - self.conversation.add(role="user", content=task) - - output_reasoner = self.reasoning_agent.run(task) + # For reasoning agent, use the current task (which may include conversation context) + output_reasoner = self.reasoning_agent.run(task, img=img) self.conversation.add( role=self.reasoning_agent.agent_name, content=output_reasoner, ) - prompt = f"Task: {task} \n\n Your thoughts: {output_reasoner}" - - output_main = self.main_agent.run(prompt) + # For main agent, always use the full conversation context + output_main = self.main_agent.run( + task=self.conversation.get_str(), img=img + ) self.conversation.add( role=self.main_agent.agent_name, content=output_main ) + def run(self, task: str, img: Optional[str] = None): + """ + Executes the reasoning and main agents on the provided task. + + Args: + task (str): The task to be processed by the agents. + img (Optional[str]): Optional image input. + + Returns: + str: The output from the main agent after processing the task. + """ + logger.info( + f"Running task: {task} with max_loops: {self.max_loops}" + ) + self.conversation.add(role="user", content=task) + + for loop_iteration in range(self.max_loops): + logger.info( + f"Loop iteration {loop_iteration + 1}/{self.max_loops}" + ) + + if loop_iteration == 0: + # First iteration: use original task + current_task = task + else: + # Subsequent iterations: use task with context of previous reasoning + current_task = f"Continue reasoning and refining your analysis. Original task: {task}\n\nPrevious conversation context:\n{self.conversation.get_str()}" + + self.step(task=current_task, img=img) + return history_output_formatter( self.conversation, self.output_type ) - def batched_run(self, tasks: List[str]): + def batched_run( + self, tasks: List[str], imgs: Optional[List[str]] = None + ): """ Executes the run method for a list of tasks. Args: tasks (list[str]): A list of tasks to be processed. + imgs (Optional[List[str]]): Optional list of images corresponding to tasks. Returns: list: A list of outputs from the main agent for each task. """ outputs = [] - for task in tasks: + + # Handle case where imgs is None + if imgs is None: + imgs = [None] * len(tasks) + + for task, img in zip(tasks, imgs): logger.info(f"Processing task: {task}") - outputs.append(self.run(task)) + outputs.append(self.run(task, img=img)) return outputs diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 6d096bad..224258c5 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1110,6 +1110,8 @@ class Agent: f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n Output: {format_data_structure(response)} ", loop_count, ) + elif self.streaming_on: + pass else: self.pretty_print( response, loop_count @@ -1239,12 +1241,13 @@ class Agent: traceback_info = traceback.format_exc() logger.error( - f"Error detected running your agent {self.agent_name}\n" + f"An error occurred while running your agent {self.agent_name}.\n" f"Error Type: {error_type}\n" f"Error Message: {error_message}\n" f"Traceback:\n{traceback_info}\n" f"Agent State: {self.to_dict()}\n" - f"Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;)" + f"Please optimize your input parameters, or create an issue on the Swarms GitHub and contact our team on Discord for support. " + f"For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/" ) raise error @@ -2684,9 +2687,18 @@ class Agent: return output - except ValueError as e: + except AgentRunError as e: self._handle_run_error(e) + except KeyboardInterrupt: + logger.warning( + f"Keyboard interrupt detected for agent '{self.agent_name}'. " + "If autosave is enabled, the agent's state will be saved to the workspace directory. " + "To enable autosave, please initialize the agent with Agent(autosave=True)." + "For technical support, refer to this document: https://docs.swarms.world/en/latest/swarms/support/" + ) + raise KeyboardInterrupt + def handle_artifacts( self, text: str, file_output_path: str, file_extension: str ) -> None: @@ -2825,6 +2837,9 @@ class Agent: if response is None: response = "No response generated" + if self.streaming_on: + pass + if self.print_on: formatter.print_panel( response, diff --git a/swarms/utils/agent_cache.py b/swarms/utils/agent_cache.py new file mode 100644 index 00000000..2212dd71 --- /dev/null +++ b/swarms/utils/agent_cache.py @@ -0,0 +1,675 @@ +import json +import pickle +import hashlib +import threading +import time +from functools import lru_cache, wraps +from typing import List, Dict, Any, Optional, Callable +from pathlib import Path +import weakref +from concurrent.futures import ThreadPoolExecutor +import os + +from loguru import logger + +# Import the Agent class - adjust path as needed +try: + from swarms.structs.agent import Agent +except ImportError: + # Fallback for development/testing + Agent = Any + + +class AgentCache: + """ + A comprehensive caching system for Agent objects with multiple strategies: + - Memory-based LRU cache + - Weak reference cache to prevent memory leaks + - Persistent disk cache for agent configurations + - Lazy loading with background preloading + """ + + def __init__( + self, + max_memory_cache_size: int = 128, + cache_dir: Optional[str] = None, + enable_persistent_cache: bool = True, + auto_save_interval: int = 300, # 5 minutes + enable_weak_refs: bool = True, + ): + """ + Initialize the AgentCache. + + Args: + max_memory_cache_size: Maximum number of agents to keep in memory cache + cache_dir: Directory for persistent cache storage + enable_persistent_cache: Whether to enable disk-based caching + auto_save_interval: Interval in seconds for auto-saving cache + enable_weak_refs: Whether to use weak references to prevent memory leaks + """ + self.max_memory_cache_size = max_memory_cache_size + self.cache_dir = Path(cache_dir or "agent_cache") + self.enable_persistent_cache = enable_persistent_cache + self.auto_save_interval = auto_save_interval + self.enable_weak_refs = enable_weak_refs + + # Memory caches + self._memory_cache: Dict[str, Agent] = {} + self._weak_cache: weakref.WeakValueDictionary = ( + weakref.WeakValueDictionary() + ) + self._access_times: Dict[str, float] = {} + self._lock = threading.RLock() + + # Cache statistics + self._hits = 0 + self._misses = 0 + self._load_times: Dict[str, float] = {} + + # Background tasks + self._auto_save_thread: Optional[threading.Thread] = None + self._shutdown_event = threading.Event() + + # Initialize cache directory + if self.enable_persistent_cache: + self.cache_dir.mkdir(parents=True, exist_ok=True) + + # Start auto-save thread + self._start_auto_save_thread() + + def _start_auto_save_thread(self): + """Start the auto-save background thread.""" + if ( + self.enable_persistent_cache + and self.auto_save_interval > 0 + ): + self._auto_save_thread = threading.Thread( + target=self._auto_save_loop, + daemon=True, + name="AgentCache-AutoSave", + ) + self._auto_save_thread.start() + + def _auto_save_loop(self): + """Background loop for auto-saving cache.""" + while not self._shutdown_event.wait(self.auto_save_interval): + try: + self.save_cache_to_disk() + except Exception as e: + logger.error(f"Error in auto-save: {e}") + + def _generate_cache_key( + self, agent_config: Dict[str, Any] + ) -> str: + """Generate a unique cache key from agent configuration.""" + # Create a stable hash from the configuration + config_str = json.dumps( + agent_config, sort_keys=True, default=str + ) + return hashlib.md5(config_str.encode()).hexdigest() + + def _evict_lru(self): + """Evict least recently used items from memory cache.""" + if len(self._memory_cache) >= self.max_memory_cache_size: + # Find the least recently used item + lru_key = min( + self._access_times.items(), key=lambda x: x[1] + )[0] + + # Save to persistent cache before evicting + if self.enable_persistent_cache: + self._save_agent_to_disk( + lru_key, self._memory_cache[lru_key] + ) + + # Remove from memory + del self._memory_cache[lru_key] + del self._access_times[lru_key] + + logger.debug(f"Evicted agent {lru_key} from memory cache") + + def _save_agent_to_disk(self, cache_key: str, agent: Agent): + """Save agent to persistent cache.""" + try: + cache_file = self.cache_dir / f"{cache_key}.pkl" + with open(cache_file, "wb") as f: + pickle.dump(agent.to_dict(), f) + logger.debug(f"Saved agent {cache_key} to disk cache") + except Exception as e: + logger.error(f"Error saving agent to disk: {e}") + + def _load_agent_from_disk( + self, cache_key: str + ) -> Optional[Agent]: + """Load agent from persistent cache.""" + try: + cache_file = self.cache_dir / f"{cache_key}.pkl" + if cache_file.exists(): + with open(cache_file, "rb") as f: + agent_dict = pickle.load(f) + + # Reconstruct agent from dictionary + agent = Agent(**agent_dict) + logger.debug( + f"Loaded agent {cache_key} from disk cache" + ) + return agent + except Exception as e: + logger.error(f"Error loading agent from disk: {e}") + return None + + def get_agent( + self, agent_config: Dict[str, Any] + ) -> Optional[Agent]: + """ + Get an agent from cache, loading if necessary. + + Args: + agent_config: Configuration dictionary for the agent + + Returns: + Cached or newly loaded Agent instance + """ + cache_key = self._generate_cache_key(agent_config) + + with self._lock: + # Check memory cache first + if cache_key in self._memory_cache: + self._access_times[cache_key] = time.time() + self._hits += 1 + logger.debug( + f"Cache hit (memory) for agent {cache_key}" + ) + return self._memory_cache[cache_key] + + # Check weak reference cache + if ( + self.enable_weak_refs + and cache_key in self._weak_cache + ): + agent = self._weak_cache[cache_key] + if agent is not None: + # Move back to memory cache + self._memory_cache[cache_key] = agent + self._access_times[cache_key] = time.time() + self._hits += 1 + logger.debug( + f"Cache hit (weak ref) for agent {cache_key}" + ) + return agent + + # Check persistent cache + if self.enable_persistent_cache: + agent = self._load_agent_from_disk(cache_key) + if agent is not None: + self._evict_lru() + self._memory_cache[cache_key] = agent + self._access_times[cache_key] = time.time() + if self.enable_weak_refs: + self._weak_cache[cache_key] = agent + self._hits += 1 + logger.debug( + f"Cache hit (disk) for agent {cache_key}" + ) + return agent + + # Cache miss - need to create new agent + self._misses += 1 + logger.debug(f"Cache miss for agent {cache_key}") + return None + + def put_agent(self, agent_config: Dict[str, Any], agent: Agent): + """ + Put an agent into the cache. + + Args: + agent_config: Configuration dictionary for the agent + agent: The Agent instance to cache + """ + cache_key = self._generate_cache_key(agent_config) + + with self._lock: + self._evict_lru() + self._memory_cache[cache_key] = agent + self._access_times[cache_key] = time.time() + + if self.enable_weak_refs: + self._weak_cache[cache_key] = agent + + logger.debug(f"Added agent {cache_key} to cache") + + def preload_agents(self, agent_configs: List[Dict[str, Any]]): + """ + Preload agents in the background for faster access. + + Args: + agent_configs: List of agent configurations to preload + """ + + def _preload_worker(config): + try: + cache_key = self._generate_cache_key(config) + if cache_key not in self._memory_cache: + start_time = time.time() + agent = Agent(**config) + load_time = time.time() - start_time + + self.put_agent(config, agent) + self._load_times[cache_key] = load_time + logger.debug( + f"Preloaded agent {cache_key} in {load_time:.3f}s" + ) + except Exception as e: + logger.error(f"Error preloading agent: {e}") + + # Use thread pool for concurrent preloading + max_workers = min(len(agent_configs), os.cpu_count()) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + executor.map(_preload_worker, agent_configs) + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache performance statistics.""" + total_requests = self._hits + self._misses + hit_rate = ( + (self._hits / total_requests * 100) + if total_requests > 0 + else 0 + ) + + return { + "hits": self._hits, + "misses": self._misses, + "hit_rate_percent": round(hit_rate, 2), + "memory_cache_size": len(self._memory_cache), + "weak_cache_size": len(self._weak_cache), + "average_load_time": ( + sum(self._load_times.values()) / len(self._load_times) + if self._load_times + else 0 + ), + "total_agents_loaded": len(self._load_times), + } + + def clear_cache(self): + """Clear all caches.""" + with self._lock: + self._memory_cache.clear() + self._weak_cache.clear() + self._access_times.clear() + logger.info("Cleared all caches") + + def save_cache_to_disk(self): + """Save current memory cache to disk.""" + if not self.enable_persistent_cache: + return + + with self._lock: + saved_count = 0 + for cache_key, agent in self._memory_cache.items(): + try: + self._save_agent_to_disk(cache_key, agent) + saved_count += 1 + except Exception as e: + logger.error( + f"Error saving agent {cache_key}: {e}" + ) + + logger.info(f"Saved {saved_count} agents to disk cache") + + def shutdown(self): + """Shutdown the cache system gracefully.""" + self._shutdown_event.set() + if self._auto_save_thread: + self._auto_save_thread.join(timeout=5) + + # Final save + if self.enable_persistent_cache: + self.save_cache_to_disk() + + logger.info("AgentCache shutdown complete") + + +# Global cache instance +_global_cache: Optional[AgentCache] = None + + +def get_global_cache() -> AgentCache: + """Get or create the global agent cache instance.""" + global _global_cache + if _global_cache is None: + _global_cache = AgentCache() + return _global_cache + + +def cached_agent_loader( + agents: List[Agent], + cache_instance: Optional[AgentCache] = None, + preload: bool = True, + parallel_loading: bool = True, +) -> List[Agent]: + """ + Load a list of agents with caching for super fast performance. + + Args: + agents: List of Agent instances to cache/load + cache_instance: Optional cache instance (uses global cache if None) + preload: Whether to preload agents in background + parallel_loading: Whether to load agents in parallel + + Returns: + List of Agent instances (cached versions if available) + + Examples: + # Basic usage + agents = [Agent(agent_name="Agent1", model_name="gpt-4"), ...] + cached_agents = cached_agent_loader(agents) + + # With custom cache + cache = AgentCache(max_memory_cache_size=256) + cached_agents = cached_agent_loader(agents, cache_instance=cache) + + # Preload for even faster subsequent access + cached_agent_loader(agents, preload=True) + cached_agents = cached_agent_loader(agents) # Super fast! + """ + cache = cache_instance or get_global_cache() + + start_time = time.time() + + # Extract configurations from agents for caching + agent_configs = [] + for agent in agents: + config = _extract_agent_config(agent) + agent_configs.append(config) + + if preload: + # Preload agents in background + cache.preload_agents(agent_configs) + + def _load_single_agent(agent: Agent) -> Agent: + """Load a single agent with caching.""" + config = _extract_agent_config(agent) + + # Try to get from cache first + cached_agent = cache.get_agent(config) + + if cached_agent is None: + # Cache miss - use the provided agent and cache it + load_start = time.time() + + # Add to cache for future use + cache.put_agent(config, agent) + load_time = time.time() - load_start + + logger.debug( + f"Cached new agent {agent.agent_name} in {load_time:.3f}s" + ) + return agent + else: + logger.debug( + f"Retrieved cached agent {cached_agent.agent_name}" + ) + return cached_agent + + # Load agents (parallel or sequential) + if parallel_loading and len(agents) > 1: + max_workers = min(len(agents), os.cpu_count()) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + cached_agents = list( + executor.map(_load_single_agent, agents) + ) + else: + cached_agents = [ + _load_single_agent(agent) for agent in agents + ] + + total_time = time.time() - start_time + + # Log performance stats + stats = cache.get_cache_stats() + logger.info( + f"Processed {len(cached_agents)} agents in {total_time:.3f}s " + f"(Hit rate: {stats['hit_rate_percent']}%)" + ) + + return cached_agents + + +def _extract_agent_config(agent: Agent) -> Dict[str, Any]: + """ + Extract a configuration dictionary from an Agent instance for caching. + + Args: + agent: Agent instance to extract config from + + Returns: + Configuration dictionary suitable for cache key generation + """ + # Extract key attributes that define an agent's identity + config = { + "agent_name": getattr(agent, "agent_name", None), + "model_name": getattr(agent, "model_name", None), + "system_prompt": getattr(agent, "system_prompt", None), + "max_loops": getattr(agent, "max_loops", None), + "temperature": getattr(agent, "temperature", None), + "max_tokens": getattr(agent, "max_tokens", None), + "agent_description": getattr( + agent, "agent_description", None + ), + # Add other key identifying attributes + "tools": str( + getattr(agent, "tools", []) + ), # Convert to string for hashing, default to empty list + "context_length": getattr(agent, "context_length", None), + } + + # Remove None values to create a clean config + config = {k: v for k, v in config.items() if v is not None} + + return config + + +def cached_agent_loader_from_configs( + agent_configs: List[Dict[str, Any]], + cache_instance: Optional[AgentCache] = None, + preload: bool = True, + parallel_loading: bool = True, +) -> List[Agent]: + """ + Load a list of agents from configuration dictionaries with caching. + + Args: + agent_configs: List of agent configuration dictionaries + cache_instance: Optional cache instance (uses global cache if None) + preload: Whether to preload agents in background + parallel_loading: Whether to load agents in parallel + + Returns: + List of Agent instances + + Examples: + # Basic usage + configs = [{"agent_name": "Agent1", "model_name": "gpt-4"}, ...] + agents = cached_agent_loader_from_configs(configs) + + # With custom cache + cache = AgentCache(max_memory_cache_size=256) + agents = cached_agent_loader_from_configs(configs, cache_instance=cache) + """ + cache = cache_instance or get_global_cache() + + start_time = time.time() + + if preload: + # Preload agents in background + cache.preload_agents(agent_configs) + + def _load_single_agent(config: Dict[str, Any]) -> Agent: + """Load a single agent with caching.""" + # Try to get from cache first + agent = cache.get_agent(config) + + if agent is None: + # Cache miss - create new agent + load_start = time.time() + agent = Agent(**config) + load_time = time.time() - load_start + + # Add to cache for future use + cache.put_agent(config, agent) + + logger.debug( + f"Created new agent {agent.agent_name} in {load_time:.3f}s" + ) + + return agent + + # Load agents (parallel or sequential) + if parallel_loading and len(agent_configs) > 1: + max_workers = min(len(agent_configs), os.cpu_count()) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + agents = list( + executor.map(_load_single_agent, agent_configs) + ) + else: + agents = [ + _load_single_agent(config) for config in agent_configs + ] + + total_time = time.time() - start_time + + # Log performance stats + stats = cache.get_cache_stats() + logger.info( + f"Loaded {len(agents)} agents in {total_time:.3f}s " + f"(Hit rate: {stats['hit_rate_percent']}%)" + ) + + return agents + + +# Decorator for caching individual agent creation +def cache_agent_creation(cache_instance: Optional[AgentCache] = None): + """ + Decorator to cache agent creation based on initialization parameters. + + Args: + cache_instance: Optional cache instance (uses global cache if None) + + Returns: + Decorator function + + Example: + @cache_agent_creation() + def create_trading_agent(symbol: str, model: str): + return Agent( + agent_name=f"Trading-{symbol}", + model_name=model, + system_prompt=f"You are a trading agent for {symbol}" + ) + + agent1 = create_trading_agent("AAPL", "gpt-4") # Creates new agent + agent2 = create_trading_agent("AAPL", "gpt-4") # Returns cached agent + """ + + def decorator(func: Callable[..., Agent]) -> Callable[..., Agent]: + cache = cache_instance or get_global_cache() + + @wraps(func) + def wrapper(*args, **kwargs) -> Agent: + # Create a config dict from function arguments + import inspect + + sig = inspect.signature(func) + bound_args = sig.bind(*args, **kwargs) + bound_args.apply_defaults() + + config = dict(bound_args.arguments) + + # Try to get from cache + agent = cache.get_agent(config) + + if agent is None: + # Cache miss - call original function + agent = func(*args, **kwargs) + cache.put_agent(config, agent) + + return agent + + return wrapper + + return decorator + + +# LRU Cache-based simple approach +@lru_cache(maxsize=128) +def _cached_agent_by_hash( + config_hash: str, config_json: str +) -> Agent: + """Internal LRU cached agent creation by config hash.""" + config = json.loads(config_json) + return Agent(**config) + + +def simple_lru_agent_loader( + agents: List[Agent], +) -> List[Agent]: + """ + Simple LRU cache-based agent loader using functools.lru_cache. + + Args: + agents: List of Agent instances + + Returns: + List of Agent instances (cached versions if available) + + Note: + This is a simpler approach but less flexible than the full AgentCache. + """ + cached_agents = [] + + for agent in agents: + # Extract config from agent + config = _extract_agent_config(agent) + + # Create stable hash and JSON string + config_json = json.dumps(config, sort_keys=True, default=str) + config_hash = hashlib.md5(config_json.encode()).hexdigest() + + # Use LRU cached function + cached_agent = _cached_agent_by_hash_from_agent( + config_hash, agent + ) + cached_agents.append(cached_agent) + + return cached_agents + + +@lru_cache(maxsize=128) +def _cached_agent_by_hash_from_agent( + config_hash: str, agent: Agent +) -> Agent: + """Internal LRU cached agent storage by config hash.""" + # Return the same agent instance (this creates the caching effect) + return agent + + +# Utility functions for cache management +def clear_agent_cache(): + """Clear the global agent cache.""" + cache = get_global_cache() + cache.clear_cache() + + +def get_agent_cache_stats() -> Dict[str, Any]: + """Get statistics from the global agent cache.""" + cache = get_global_cache() + return cache.get_cache_stats() + + +def shutdown_agent_cache(): + """Shutdown the global agent cache gracefully.""" + global _global_cache + if _global_cache: + _global_cache.shutdown() + _global_cache = None diff --git a/tests/utils/test_litellm_wrapper.py b/tests/utils/test_litellm_wrapper.py index a0a740f2..23d2a10b 100644 --- a/tests/utils/test_litellm_wrapper.py +++ b/tests/utils/test_litellm_wrapper.py @@ -292,7 +292,7 @@ def run_test_suite(): assert isinstance(is_url_direct, bool) assert isinstance(is_local_direct, bool) assert ( - is_local_direct == False + is_local_direct is False ) # Local files should never use direct URL log_test_result("Local vs URL Detection", True)