Implement Agent Instance Caching Mechanism for ReasoningAgentRouter

pull/915/head
Wxysnx 3 weeks ago
parent d3a28edd77
commit eb2fb67a51

@ -1,4 +1,8 @@
from typing import List, Literal, Dict, Callable, Any
from typing import List, Literal, Dict, Callable, Any, Optional, Tuple, Hashable
from functools import lru_cache
from swarms.agents.consistency_agent import SelfConsistencyAgent
from swarms.agents.flexion_agent import ReflexionAgent
@ -10,6 +14,7 @@ from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.utils.output_types import OutputType
from swarms.agents.agent_judge import AgentJudge
agent_types = Literal[
"reasoning-duo",
"self-consistency",
@ -22,10 +27,12 @@ agent_types = Literal[
"AgentJudge",
]
class ReasoningAgentRouter:
"""
A Reasoning Agent that can answer questions and assist with various tasks using different reasoning strategies.
Attributes:
agent_name (str): The name of the agent.
description (str): A brief description of the agent's capabilities.
@ -37,6 +44,11 @@ class ReasoningAgentRouter:
output_type (OutputType): The format of the output (e.g., dict, list).
"""
# Class variable to store cached agent instances
_agent_cache: Dict[Tuple[Hashable, ...], Any] = {}
def __init__(
self,
agent_name: str = "reasoning_agent",
@ -61,25 +73,25 @@ class ReasoningAgentRouter:
self.num_knowledge_items = num_knowledge_items
self.memory_capacity = memory_capacity
# Added: Initialize the factory mapping dictionary
# Initialize agent factory mapping dictionary
self._initialize_agent_factories()
# Added: Factory method initialization function
def _initialize_agent_factories(self) -> None:
"""
Initialize the agent factory mapping dictionary, mapping various agent types to their respective creation functions.
This method replaces the original if-elif chain, making the code easier to maintain and extend.
This method replaces the original if-elif chain, making the code more maintainable and extensible.
"""
self.agent_factories: Dict[str, Callable[[], Any]] = {
# ReasoningDuo factory methods
# ReasoningDuo factory method
"reasoning-duo": self._create_reasoning_duo,
"reasoning-agent": self._create_reasoning_duo,
# SelfConsistencyAgent factory methods
# SelfConsistencyAgent factory method
"self-consistency": self._create_consistency_agent,
"consistency-agent": self._create_consistency_agent,
# IREAgent factory methods
# IREAgent factory method
"ire": self._create_ire_agent,
"ire-agent": self._create_ire_agent,
@ -89,9 +101,32 @@ class ReasoningAgentRouter:
"GKPAgent": self._create_gkp_agent
}
# Added: Concrete factory methods for various agent types
def _get_cache_key(self) -> Tuple[Hashable, ...]:
"""
Generate a unique key for cache lookup.
The key is based on all relevant configuration parameters of the agent.
Returns:
Tuple[Hashable, ...]: A hashable tuple to serve as the cache key
"""
return (
self.swarm_type,
self.agent_name,
self.description,
self.model_name,
self.system_prompt,
self.max_loops,
self.num_samples,
self.output_type,
self.num_knowledge_items,
self.memory_capacity
)
def _create_reasoning_duo(self):
"""Creates an agent instance for ReasoningDuo type"""
"""Create an agent instance for the ReasoningDuo type"""
return ReasoningDuo(
agent_name=self.agent_name,
agent_description=self.description,
@ -101,7 +136,7 @@ class ReasoningAgentRouter:
)
def _create_consistency_agent(self):
"""Creates an agent instance for SelfConsistencyAgent type"""
"""Create an agent instance for the SelfConsistencyAgent type"""
return SelfConsistencyAgent(
agent_name=self.agent_name,
description=self.description,
@ -113,7 +148,7 @@ class ReasoningAgentRouter:
)
def _create_ire_agent(self):
"""Creates an agent instance for IREAgent type"""
"""Create an agent instance for the IREAgent type"""
return IREAgent(
agent_name=self.agent_name,
description=self.description,
@ -125,7 +160,7 @@ class ReasoningAgentRouter:
)
def _create_agent_judge(self):
"""Creates an agent instance for AgentJudge type"""
"""Create an agent instance for the AgentJudge type"""
return AgentJudge(
agent_name=self.agent_name,
model_name=self.model_name,
@ -134,7 +169,7 @@ class ReasoningAgentRouter:
)
def _create_reflexion_agent(self):
"""Creates an agent instance for ReflexionAgent type"""
"""Create an agent instance for the ReflexionAgent type"""
return ReflexionAgent(
agent_name=self.agent_name,
system_prompt=self.system_prompt,
@ -143,118 +178,81 @@ class ReasoningAgentRouter:
)
def _create_gkp_agent(self):
"""Creates an agent instance for GKPAgent type"""
"""Create an agent instance for the GKPAgent type"""
return GKPAgent(
agent_name=self.agent_name,
model_name=self.model_name,
num_knowledge_items=self.num_knowledge_items,
)
def select_swarm(self):
"""
Selects and initializes the appropriate reasoning swarm based on the specified swarm type.
Select and initialize the appropriate reasoning swarm based on the specified swarm type.
Uses a caching mechanism to return a cached instance if an agent with the same configuration already exists.
Returns:
An instance of the selected reasoning swarm.
"""
# Commented out original if-elif chain implementation
"""
if (
self.swarm_type == "reasoning-duo"
or self.swarm_type == "reasoning-agent"
):
return ReasoningDuo(
agent_name=self.agent_name,
agent_description=self.description,
model_name=[self.model_name, self.model_name],
system_prompt=self.system_prompt,
output_type=self.output_type,
)
elif (
self.swarm_type == "self-consistency"
or self.swarm_type == "consistency-agent"
):
return SelfConsistencyAgent(
agent_name=self.agent_name,
description=self.description,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
num_samples=self.num_samples,
output_type=self.output_type,
)
elif (
self.swarm_type == "ire" or self.swarm_type == "ire-agent"
):
return IREAgent(
agent_name=self.agent_name,
description=self.description,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
max_iterations=self.num_samples,
output_type=self.output_type,
)
elif self.swarm_type == "AgentJudge":
return AgentJudge(
agent_name=self.agent_name,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
)
elif self.swarm_type == "ReflexionAgent":
return ReflexionAgent(
agent_name=self.agent_name,
system_prompt=self.system_prompt,
model_name=self.model_name,
max_loops=self.max_loops,
)
elif self.swarm_type == "GKPAgent":
return GKPAgent(
agent_name=self.agent_name,
model_name=self.model_name,
num_knowledge_items=self.num_knowledge_items,
)
else:
raise ValueError(f"Invalid swarm type: {self.swarm_type}")
The selected reasoning swarm instance.
"""
# Generate cache key
cache_key = self._get_cache_key()
# Check if an instance with the same configuration already exists in the cache
if cache_key in self.__class__._agent_cache:
return self.__class__._agent_cache[cache_key]
# Added: Implementation using factory pattern and dictionary mapping
try:
# Get the corresponding creation function from the factory dictionary and call it
return self.agent_factories[self.swarm_type]()
# Use the factory method to create a new instance
agent = self.agent_factories[self.swarm_type]()
# Add the newly created instance to the cache
self.__class__._agent_cache[cache_key] = agent
return agent
except KeyError:
# Maintain the same error handling as the original code
# Keep the same error handling as the original code
raise ValueError(f"Invalid swarm type: {self.swarm_type}")
def run(self, task: str, *args, **kwargs):
"""
Executes the selected swarm's reasoning process on the given task.
Execute the reasoning process of the selected swarm on a given task.
Args:
task (str): The task or question to be processed by the reasoning agent.
Returns:
The result of the reasoning process.
"""
swarm = self.select_swarm()
return swarm.run(task=task)
def batched_run(self, tasks: List[str], *args, **kwargs):
"""
Executes the reasoning process on a batch of tasks.
Execute the reasoning process on a batch of tasks.
Args:
tasks (List[str]): A list of tasks to be processed.
tasks (List[str]): The list of tasks to process.
Returns:
List of results from the reasoning process for each task.
A list of reasoning process results for each task.
"""
results = []
for task in tasks:
results.append(self.run(task, *args, **kwargs))
return results
@classmethod
def clear_cache(cls):
"""
Clear the agent instance cache.
Use this when you need to free memory or force the creation of new instances.
"""
cls._agent_cache.clear()
Loading…
Cancel
Save