diff --git a/swarms/structs/agent_registry.py b/swarms/structs/agent_registry.py index 9477d3ea..cbede884 100644 --- a/swarms/structs/agent_registry.py +++ b/swarms/structs/agent_registry.py @@ -1,4 +1,4 @@ -import time + import time from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock from typing import Any, Callable, Dict, List, Optional @@ -8,31 +8,49 @@ from pydantic import BaseModel, Field, ValidationError from swarms import Agent from swarms.utils.loguru_logger import logger + class AgentConfigSchema(BaseModel): uuid: str = Field( - ..., description="The unique identifier for the agent." + ..., + description="The unique identifier for the agent.", ) name: str = None description: str = None time_added: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), - description="Time when the agent was added to the registry." + description="Time when the agent was added to the registry.", ) config: Dict[Any, Any] = None + class AgentRegistrySchema(BaseModel): name: str description: str agents: List[AgentConfigSchema] - time_registry_created: str = Field( + time_registry_creatd: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), - description="Time when the registry was created." + description="Time when the registry was created.", ) number_of_agents: int = Field( - 0, description="The number of agents in the registry." + 0, + description="The number of agents in the registry.", ) + class AgentRegistry: + """ + A class for managing a registry of agents. + + Attributes: + name (str): The name of the registry. + description (str): A description of the registry. + return_json (bool): Indicates whether to return data in JSON format. + auto_save (bool): Indicates whether to automatically save changes to the registry. + agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name. + lock (Lock): A lock for thread-safe operations on the registry. + agent_registry (AgentRegistrySchema): The schema for the agent registry. + """ + def __init__( self, name: str = "Agent Registry", @@ -43,24 +61,188 @@ class AgentRegistry: *args, **kwargs, ): + """ + Initializes the AgentRegistry. + + Args: + name (str, optional): The name of the registry. Defaults to "Agent Registry". + description (str, optional): A description of the registry. Defaults to "A registry for managing agents.". + agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None. + return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True. + auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False. + """ self.name = name self.description = description self.return_json = return_json self.auto_save = auto_save self.agents: Dict[str, Agent] = {} self.lock = Lock() - + + # Initialize the agent registry self.agent_registry = AgentRegistrySchema( name=self.name, description=self.description, agents=[], number_of_agents=len(agents) if agents else 0, ) - + if agents: self.add_many(agents) + def add(self, agent: Agent) -> None: + """ + Adds a new agent to the registry. + + Args: + agent (Agent): The agent to add. + + Raises: + ValueError: If the agent_name already exists in the registry or is invalid. + ValidationError: If the input data is invalid. + """ + name = agent.agent_name + + # Input validation for agent_name + if not name or not isinstance(name, str): + logger.error("Invalid agent name provided.") + raise ValueError("Invalid agent name provided.") + + self.agent_to_py_model(agent) + + with self.lock: + if name in self.agents: + logger.error( + f"Agent with name {name} already exists." + ) + raise ValueError( + f"Agent with name {name} already exists." + ) + try: + self.agents[name] = agent + logger.info(f"Agent {name} added successfully.") + except ValidationError as e: + logger.error(f"Validation error: {e}") + raise + + def add_many(self, agents: List[Agent]) -> None: + """ + Adds multiple agents to the registry. + + Args: + agents (List[Agent]): The list of agents to add. + + Raises: + ValueError: If any of the agent_names already exist in the registry. + ValidationError: If the input data is invalid. + """ + with ThreadPoolExecutor() as executor: + futures = { + executor.submit(self.add, agent): agent + for agent in agents + } + for future in as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Error adding agent: {e}") + raise + + def delete(self, agent_name: str) -> None: + """ + Deletes an agent from the registry. + + Args: + agent_name (str): The name of the agent to delete. + + Raises: + KeyError: If the agent_name does not exist in the registry. + """ + with self.lock: + try: + del self.agents[agent_name] + logger.info( + f"Agent {agent_name} deleted successfully." + ) + except KeyError as e: + logger.error(f"Error: {e}") + raise + + def update_agent(self, agent_name: str, new_agent: Agent) -> None: + """ + Updates an existing agent in the registry. + + Args: + agent_name (str): The name of the agent to update. + new_agent (Agent): The new agent to replace the existing one. + + Raises: + KeyError: If the agent_name does not exist in the registry. + ValidationError: If the input data is invalid. + """ + with self.lock: + if agent_name not in self.agents: + logger.error( + f"Agent with name {agent_name} does not exist." + ) + raise KeyError( + f"Agent with name {agent_name} does not exist." + ) + try: + self.agents[agent_name] = new_agent + logger.info( + f"Agent {agent_name} updated successfully." + ) + except ValidationError as e: + logger.error(f"Validation error: {e}") + raise + + def get(self, agent_name: str) -> Agent: + """ + Retrieves an agent from the registry. + + Args: + agent_name (str): The name of the agent to retrieve. + + Returns: + Agent: The agent associated with the given agent_name. + + Raises: + KeyError: If the agent_name does not exist in the registry. + """ + with self.lock: + try: + agent = self.agents[agent_name] + logger.info( + f"Agent {agent_name} retrieved successfully." + ) + return agent + except KeyError as e: + logger.error(f"Error: {e}") + raise + + def list_agents(self) -> List[str]: + """ + Lists all agent names in the registry. + + Returns: + List[str]: A list of all agent names. + """ + try: + with self.lock: + agent_names = list(self.agents.keys()) + logger.info("Listing all agents.") + return agent_names + except Exception as e: + logger.error(f"Error: {e}") + raise e + def return_all_agents(self) -> List[Agent]: + """ + Returns all agents from the registry. + + Returns: + List[Agent]: A list of all agents. + """ try: with self.lock: agents = list(self.agents.values()) @@ -69,3 +251,85 @@ class AgentRegistry: except Exception as e: logger.error(f"Error: {e}") raise e + + def query( + self, condition: Optional[Callable[[Agent], bool]] = None + ) -> List[Agent]: + """ + Queries agents based on a condition. + + Args: + condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating + whether the agent meets the condition. + + Returns: + List[Agent]: A list of agents that meet the condition. + """ + try: + with self.lock: + if condition is None: + agents = list(self.agents.values()) + logger.info("Querying all agents.") + return agents + + agents = [ + agent + for agent in self.agents.values() + if condition(agent) + ] + logger.info("Querying agents with condition.") + return agents + except Exception as e: + logger.error(f"Error: {e}") + raise e + + def find_agent_by_name(self, agent_name: str) -> Optional[Agent]: + """ + Find an agent by its name. + + Args: + agent_name (str): The name of the agent to find. + + Returns: + Agent: The agent with the given name. + """ + try: + with ThreadPoolExecutor() as executor: + futures = { + executor.submit(self.get, agent_name): agent_name + for agent_name in self.agents.keys() + } + for future in as_completed(futures): + agent = future.result() + if agent.agent_name == agent_name: + return agent + except Exception as e: + logger.error(f"Error: {e}") + raise e + + def agent_to_py_model(self, agent: Agent): + """ + Converts an agent to a Pydantic model. + + Args: + agent (Agent): The agent to convert. + """ + agent_name = agent.agent_name + agent_description = ( + agent.description + if agent.description + else "No description provided" + ) + + schema = AgentConfigSchema( + uuid=agent.id, + name=agent_name, + description=agent_description, + config=agent.to_dict(), + ) + + logger.info( + f"Agent {agent_name} converted to Pydantic model." + ) + + self.agent_registry.agents.append(schema)