From e9b6001b3ac3a9ab683ccb31fdf41f959ae2c760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A5=A5=E5=AE=87?= <625024108@qq.com> Date: Tue, 22 Jul 2025 17:58:49 +0800 Subject: [PATCH] Reusing agent instances --- examples/agents/example_reasoningduo.py | 97 ++++++++ swarms/agents/reasoning_duo.py | 287 ++++++++++++++++++++---- 2 files changed, 346 insertions(+), 38 deletions(-) create mode 100644 examples/agents/example_reasoningduo.py diff --git a/examples/agents/example_reasoningduo.py b/examples/agents/example_reasoningduo.py new file mode 100644 index 00000000..1b6df588 --- /dev/null +++ b/examples/agents/example_reasoningduo.py @@ -0,0 +1,97 @@ +from swarms.agents.reasoning_duo import ReasoningDuo +from loguru import logger +import time +from dotenv import load_dotenv + + +# Load environment variables from .env file +load_dotenv() + +# Configure loguru to display more detailed information +logger.remove() +logger.add(lambda msg: print(msg), level="DEBUG") + +print("===== agent pool mechanism demonstration (Claude model) =====") + +# Example 1: When creating a agent for the first time, a new agent instance will be created and added to the pool +print("\n[example1]: Creating the first ReasoningDuo instance (Configuration A)") +duo_a1 = ReasoningDuo( + system_prompt="You are a helpful assistant that can answer questions and help with tasks.", + model_names=["claude-3-5-haiku-20241022", "claude-3-7-sonnet-20250219"], + reasoning_model_name="claude-3-5-haiku-20241022", + agent_name="finance-agent" +) + +# use first agent to run a task +print("\n[Example 1] Run a task using the first agent:") +result = duo_a1.run( + "What is the best possible financial strategy to maximize returns but minimize risk? Give a list of etfs to invest in and the percentage of the portfolio to allocate to each etf." +) + +# example2: Create a second instance of the same configuration while the first instance is still in use +print("\n[example2]: Creating the second ReasoningDuo instance (Configuration A)") +duo_a2 = ReasoningDuo( + system_prompt="You are a helpful assistant that can answer questions and help with tasks.", + model_names=["claude-3-5-haiku-20241022", "claude-3-7-sonnet-20250219"], + reasoning_model_name="claude-3-5-haiku-20241022", + agent_name="finance-agent" +) + +# example3: Create a third instance with a different configuration, which will create a new agent +print("\n[example3]: Creating a ReasoningDuo instance with a different configuration:") +duo_b = ReasoningDuo( + system_prompt="You are an expert financial advisor helping with investment strategies.", + model_names=["claude-3-5-haiku-20241022", "claude-3-5-sonnet-20241022"], # 不同的模型组合 + reasoning_model_name="claude-3-5-haiku-20241022", + agent_name="expert-finance-agent" +) + +# example4: Disable agent pool reuse +print("\n[example4]: Creating a ReasoningDuo instance with agent reuse disabled:") +duo_c = ReasoningDuo( + system_prompt="You are a helpful assistant that can answer questions and help with tasks.", + model_names=["claude-3-5-haiku-20241022", "claude-3-7-sonnet-20250219"], + reasoning_model_name="claude-3-5-haiku-20241022", + agent_name="finance-agent", + reuse_agents=False # Disable agent reuse +) + +# Release the first instance's agent back to the pool +print("\n[example5]: Releasing the first instance's agent back to the pool:") +del duo_a1 # This will trigger the __del__ method and release the agent back to the pool + +# Wait for a second to ensure cleanup is complete +time.sleep(1) + +# Example 6: Create a new instance with the same configuration as the first one, which should reuse the agent from the pool +print("\n[example6]: Creating a new instance with the same configuration as the first one, which should reuse the agent from the pool:") +duo_a3 = ReasoningDuo( + system_prompt="You are a helpful assistant that can answer questions and help with tasks.", + model_names=["claude-3-5-haiku-20241022", "claude-3-7-sonnet-20250219"], + reasoning_model_name="claude-3-5-haiku-20241022", + agent_name="finance-agent" +) + +# use batched_run to execute tasks +print("\n[example7]: Using the reused agent to run batched tasks:") +results = duo_a3.batched_run( + [ + "What is the best possible financial strategy for a conservative investor nearing retirement?", + "What is the best possible financial strategy for a young investor with high risk tolerance?" + ] +) + +# Configure pool parameters +print("\n[example8]: Configuring agent pool parameters:") +ReasoningDuo.configure_pool(cleanup_interval=180, max_idle_time=600) + +# Display current pool status +print("\n[example9]: Displaying current pool status:") +print(f"Reasoning agent pool size: {len(ReasoningDuo._reasoning_agent_pool)}") +print(f"Main agent pool size: {len(ReasoningDuo._main_agent_pool)}") + +# Clear all pools +print("\n[example10]: Clearing all agent pools:") +ReasoningDuo.clear_pools() +print(f"Reasoning agent pool size after clearing: {len(ReasoningDuo._reasoning_agent_pool)}") +print(f"Main agent pool size after clearing: {len(ReasoningDuo._main_agent_pool)}") \ No newline at end of file diff --git a/swarms/agents/reasoning_duo.py b/swarms/agents/reasoning_duo.py index 57ae9849..f511deb3 100644 --- a/swarms/agents/reasoning_duo.py +++ b/swarms/agents/reasoning_duo.py @@ -1,7 +1,5 @@ -from typing import List - +from typing import List, Optional, Dict from loguru import logger - from swarms.prompts.reasoning_prompt import REASONING_PROMPT from swarms.structs.agent import Agent from swarms.utils.output_types import OutputType @@ -9,11 +7,15 @@ from swarms.structs.conversation import Conversation from swarms.utils.history_output_formatter import ( history_output_formatter, ) - +import uuid +import time +import threading class ReasoningDuo: """ ReasoningDuo is a class that encapsulates the functionality of two agents: a reasoning agent and a main agent. + + Implements an agent pool mechanism to reuse agent instances between calls. Attributes: model_name (str): The name of the model used for the reasoning agent. @@ -23,87 +25,296 @@ class ReasoningDuo: reasoning_agent (Agent): An instance of the Agent class for reasoning tasks. main_agent (Agent): An instance of the Agent class for main tasks. """ + + # Class-level agent pools + _reasoning_agent_pool: Dict[str, Dict] = {} # key: config_key, value: {agent, last_used, in_use} + _main_agent_pool: Dict[str, Dict] = {} # key: config_key, value: {agent, last_used, in_use} + _pool_lock = threading.RLock() # Thread-safe lock for pool access + _pool_cleanup_interval = 300 # 5 minutes + _pool_max_idle_time = 1800 # 30 minutes + _last_cleanup_time = 0 + + @classmethod + def _generate_agent_config_key(cls, agent_type, model_name, system_prompt, **kwargs): + """Generate a unique key for an agent configuration""" + # Include essential parameters that affect agent behavior + key_parts = [ + agent_type, + model_name, + system_prompt + ] + # Add other important configuration parameters + for k in sorted(kwargs.keys()): + if k in ['max_loops', 'dynamic_temperature_enabled', 'streaming', 'output_type']: + key_parts.append(f"{k}={kwargs[k]}") + + return ":".join(str(part) for part in key_parts) + + @classmethod + def _get_agent_from_pool(cls, agent_type, config_key, create_func): + """Get an agent from the pool or create a new one if needed""" + with cls._pool_lock: + pool = cls._reasoning_agent_pool if agent_type == "reasoning" else cls._main_agent_pool + + # Periodic cleanup of idle agents + current_time = time.time() + if current_time - cls._last_cleanup_time > cls._pool_cleanup_interval: + cls._cleanup_idle_agents() + cls._last_cleanup_time = current_time + + # Try to find an available agent with matching configuration + if config_key in pool and not pool[config_key]["in_use"]: + pool[config_key]["in_use"] = True + pool[config_key]["last_used"] = time.time() + logger.debug(f"Reusing {agent_type} agent from pool with config: {config_key}") + return pool[config_key]["agent"] + + # Create a new agent if none available + logger.debug(f"Creating new {agent_type} agent with config: {config_key}") + new_agent = create_func() + pool[config_key] = { + "agent": new_agent, + "last_used": time.time(), + "in_use": True + } + return new_agent + + @classmethod + def _release_agent_to_pool(cls, agent_type, config_key): + """Release an agent back to the pool""" + with cls._pool_lock: + pool = cls._reasoning_agent_pool if agent_type == "reasoning" else cls._main_agent_pool + if config_key in pool: + pool[config_key]["in_use"] = False + pool[config_key]["last_used"] = time.time() + logger.debug(f"Released {agent_type} agent back to pool: {config_key}") + + @classmethod + def _cleanup_idle_agents(cls): + """Clean up agents that have been idle for too long""" + with cls._pool_lock: + current_time = time.time() + + for pool in [cls._reasoning_agent_pool, cls._main_agent_pool]: + keys_to_remove = [] + + for key, data in pool.items(): + # Only remove if not in use and idle for too long + if not data["in_use"] and (current_time - data["last_used"] > cls._pool_max_idle_time): + keys_to_remove.append(key) + + for key in keys_to_remove: + logger.debug(f"Removing idle agent from pool: {key}") + del pool[key] + + @classmethod + def configure_pool(cls, cleanup_interval=None, max_idle_time=None): + """Configure the agent pool parameters""" + with cls._pool_lock: + if cleanup_interval is not None: + cls._pool_cleanup_interval = max(60, cleanup_interval) # Minimum 1 minute + if max_idle_time is not None: + cls._pool_max_idle_time = max(300, max_idle_time) # Minimum 5 minutes + + @classmethod + def clear_pools(cls): + """Clear all agent pools (useful for testing or memory management)""" + with cls._pool_lock: + cls._reasoning_agent_pool.clear() + cls._main_agent_pool.clear() def __init__( self, + id: str = str(uuid.uuid4()), agent_name: str = "reasoning-agent-01", agent_description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", model_name: str = "gpt-4o-mini", description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", model_names: list[str] = ["gpt-4o-mini", "gpt-4o"], system_prompt: str = "You are a helpful assistant that can answer questions and help with tasks.", - output_type: OutputType = "dict", + output_type: OutputType = "dict-all-except-first", + reasoning_model_name: Optional[ + str + ] = "claude-3-5-sonnet-20240620", + max_loops: int = 1, + reuse_agents: bool = True, # New parameter to control agent reuse + *args, + **kwargs, ): + self.id = id self.agent_name = agent_name self.agent_description = agent_description self.model_name = model_name self.description = description self.output_type = output_type - self.reasoning_agent = Agent( - agent_name="Your", - description="A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.", - system_prompt=REASONING_PROMPT, - max_loops=1, - model_name=model_names[0], - dynamic_temperature_enabled=True, + self.reasoning_model_name = reasoning_model_name + self.max_loops = max_loops + self.reuse_agents = reuse_agents + self.args = args + self.kwargs = kwargs + + if self.reasoning_model_name is None: + self.reasoning_model_name = model_names[0] + + self.conversation = Conversation() + + # Create a complete configuration for the reasoning agent + reasoning_full_config = { + "agent_name": self.agent_name, + "description": self.agent_description, + "system_prompt": REASONING_PROMPT, + "max_loops": 1, + "model_name": self.reasoning_model_name, + "dynamic_temperature_enabled": True + } + + # Create a complete configuration for the main agent + main_full_config = { + "agent_name": self.agent_name, + "description": self.agent_description, + "system_prompt": system_prompt, + "max_loops": 1, + "model_name": model_names[1], + "dynamic_temperature_enabled": True + } + + + for k, v in kwargs.items(): + if k not in ["system_prompt", "model_name"]: + reasoning_full_config[k] = v + main_full_config[k] = v + + # To generate the configuration keys we need to extract the parameters (excluding those that have been explicitly passed) + reasoning_extra_params = { + k: v for k, v in reasoning_full_config.items() + if k not in ["system_prompt", "model_name"] + } + + main_extra_params = { + k: v for k, v in main_full_config.items() + if k not in ["system_prompt", "model_name"] + } + + # generate the configuration keys + self.reasoning_config_key = self._generate_agent_config_key( + "reasoning", + self.reasoning_model_name, + REASONING_PROMPT, + **reasoning_extra_params + ) + + self.main_config_key = self._generate_agent_config_key( + "main", + model_names[1], + system_prompt, + **main_extra_params ) + + # Get the agent instance + if self.reuse_agents: + self.reasoning_agent = self._get_agent_from_pool( + "reasoning", + self.reasoning_config_key, + lambda: Agent(**reasoning_full_config) + ) + + self.main_agent = self._get_agent_from_pool( + "main", + self.main_config_key, + lambda: Agent(**main_full_config) + ) + else: + # If reuse is disabled, create a new agent directly + self.reasoning_agent = Agent(**reasoning_full_config) + self.main_agent = Agent(**main_full_config) - self.main_agent = Agent( - agent_name=self.agent_name, - description=self.agent_description, - system_prompt=system_prompt, - max_loops=1, - model_name=model_names[1], - dynamic_temperature_enabled=True, + def __del__(self): + """Release agents back to the pool when instance is destroyed""" + if hasattr(self, 'reuse_agents') and self.reuse_agents: + if hasattr(self, 'reasoning_config_key'): + self._release_agent_to_pool("reasoning", self.reasoning_config_key) + if hasattr(self, 'main_config_key'): + self._release_agent_to_pool("main", self.main_config_key) + + def step(self, task: str, img: Optional[str] = None): + """ + Executes one step of reasoning and main agent processing. + + Args: + task (str): The task to be processed. + img (Optional[str]): Optional image input. + """ + # For reasoning agent, use the current task (which may include conversation context) + output_reasoner = self.reasoning_agent.run(task, img=img) + self.conversation.add( + role=self.reasoning_agent.agent_name, + content=output_reasoner, ) - self.conversation = Conversation() + # For main agent, always use the full conversation context + output_main = self.main_agent.run( + task=self.conversation.get_str(), img=img + ) + self.conversation.add( + role=self.main_agent.agent_name, content=output_main + ) - def run(self, task: str): + def run(self, task: str, img: Optional[str] = None): """ Executes the reasoning and main agents on the provided task. Args: task (str): The task to be processed by the agents. + img (Optional[str]): Optional image input. Returns: str: The output from the main agent after processing the task. """ - logger.info(f"Running task: {task}") + logger.info( + f"Running task: {task} with max_loops: {self.max_loops}" + ) self.conversation.add(role="user", content=task) - output_reasoner = self.reasoning_agent.run(task) - - self.conversation.add( - role=self.reasoning_agent.agent_name, - content=output_reasoner, - ) + for loop_iteration in range(self.max_loops): + logger.info( + f"Loop iteration {loop_iteration + 1}/{self.max_loops}" + ) - prompt = f"Task: {task} \n\n Your thoughts: {output_reasoner}" + if loop_iteration == 0: + # First iteration: use original task + current_task = task + else: + # Subsequent iterations: use task with context of previous reasoning + current_task = f"Continue reasoning and refining your analysis. Original task: {task}\n\nPrevious conversation context:\n{self.conversation.get_str()}" - output_main = self.main_agent.run(prompt) - - self.conversation.add( - role=self.main_agent.agent_name, content=output_main - ) + self.step(task=current_task, img=img) return history_output_formatter( self.conversation, self.output_type ) - def batched_run(self, tasks: List[str]): + def batched_run( + self, tasks: List[str], imgs: Optional[List[str]] = None + ): """ Executes the run method for a list of tasks. Args: tasks (list[str]): A list of tasks to be processed. + imgs (Optional[List[str]]): Optional list of images corresponding to tasks. Returns: list: A list of outputs from the main agent for each task. """ outputs = [] - for task in tasks: + + # Handle case where imgs is None + if imgs is None: + imgs = [None] * len(tasks) + + for task, img in zip(tasks, imgs): logger.info(f"Processing task: {task}") - outputs.append(self.run(task)) - return outputs + outputs.append(self.run(task, img=img)) + + return outputs \ No newline at end of file