diff --git a/swarms/structs/agent_registry.py b/swarms/structs/agent_registry.py index 3bdaa42f..9477d3ea 100644 --- a/swarms/structs/agent_registry.py +++ b/swarms/structs/agent_registry.py @@ -8,49 +8,31 @@ from pydantic import BaseModel, Field, ValidationError from swarms import Agent from swarms.utils.loguru_logger import logger - class AgentConfigSchema(BaseModel): uuid: str = Field( - ..., - description="The unique identifier for the agent.", + ..., description="The unique identifier for the agent." ) name: str = None description: str = None time_added: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), - description="Time when the agent was added to the registry.", + description="Time when the agent was added to the registry." ) config: Dict[Any, Any] = None - class AgentRegistrySchema(BaseModel): name: str description: str agents: List[AgentConfigSchema] - time_registry_creatd: str = Field( + time_registry_created: str = Field( time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), - description="Time when the registry was created.", + description="Time when the registry was created." ) number_of_agents: int = Field( - 0, - description="The number of agents in the registry.", + 0, description="The number of agents in the registry." ) - class AgentRegistry: - """ - A class for managing a registry of agents. - - Attributes: - name (str): The name of the registry. - description (str): A description of the registry. - return_json (bool): Indicates whether to return data in JSON format. - auto_save (bool): Indicates whether to automatically save changes to the registry. - agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name. - lock (Lock): A lock for thread-safe operations on the registry. - agent_registry (AgentRegistrySchema): The schema for the agent registry. - """ - def __init__( self, name: str = "Agent Registry", @@ -61,275 +43,29 @@ class AgentRegistry: *args, **kwargs, ): - """ - Initializes the AgentRegistry. - - Args: - name (str, optional): The name of the registry. Defaults to "Agent Registry". - description (str, optional): A description of the registry. Defaults to "A registry for managing agents.". - agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None. - return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True. - auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False. - """ self.name = name self.description = description self.return_json = return_json self.auto_save = auto_save self.agents: Dict[str, Agent] = {} self.lock = Lock() - - # Initialize the agent registry + self.agent_registry = AgentRegistrySchema( name=self.name, description=self.description, agents=[], number_of_agents=len(agents) if agents else 0, ) - + if agents: self.add_many(agents) - def add(self, agent: Agent) -> None: - """ - Adds a new agent to the registry. - - Args: - agent (Agent): The agent to add. - - Raises: - ValueError: If the agent_name already exists in the registry or is invalid. - ValidationError: If the input data is invalid. - """ - name = agent.agent_name - - # Input validation for agent_name - if not name or not isinstance(name, str): - logger.error("Invalid agent name provided.") - raise ValueError("Invalid agent name provided.") - - self.agent_to_py_model(agent) - - with self.lock: - if name in self.agents: - logger.error( - f"Agent with name {name} already exists." - ) - raise ValueError( - f"Agent with name {name} already exists." - ) - try: - self.agents[name] = agent - logger.info(f"Agent {name} added successfully.") - except ValidationError as e: - logger.error(f"Validation error: {e}") - raise - - def add_many(self, agents: List[Agent]) -> None: - """ - Adds multiple agents to the registry. - - Args: - agents (List[Agent]): The list of agents to add. - - Raises: - ValueError: If any of the agent_names already exist in the registry. - ValidationError: If the input data is invalid. - """ - with ThreadPoolExecutor() as executor: - futures = { - executor.submit(self.add, agent): agent - for agent in agents - } - for future in as_completed(futures): - try: - future.result() - except Exception as e: - logger.error(f"Error adding agent: {e}") - raise - - def delete(self, agent_name: str) -> None: - """ - Deletes an agent from the registry. - - Args: - agent_name (str): The name of the agent to delete. - - Raises: - KeyError: If the agent_name does not exist in the registry. - """ - with self.lock: - try: - del self.agents[agent_name] - logger.info( - f"Agent {agent_name} deleted successfully." - ) - except KeyError as e: - logger.error(f"Error: {e}") - raise - - def update_agent(self, agent_name: str, new_agent: Agent) -> None: - """ - Updates an existing agent in the registry. - - Args: - agent_name (str): The name of the agent to update. - new_agent (Agent): The new agent to replace the existing one. - - Raises: - KeyError: If the agent_name does not exist in the registry. - ValidationError: If the input data is invalid. - """ - with self.lock: - if agent_name not in self.agents: - logger.error( - f"Agent with name {agent_name} does not exist." - ) - raise KeyError( - f"Agent with name {agent_name} does not exist." - ) - try: - self.agents[agent_name] = new_agent - logger.info( - f"Agent {agent_name} updated successfully." - ) - except ValidationError as e: - logger.error(f"Validation error: {e}") - raise - - def get(self, agent_name: str) -> Agent: - """ - Retrieves an agent from the registry. - - Args: - agent_name (str): The name of the agent to retrieve. - - Returns: - Agent: The agent associated with the given agent_name. - - Raises: - KeyError: If the agent_name does not exist in the registry. - """ - with self.lock: - try: - agent = self.agents[agent_name] - logger.info( - f"Agent {agent_name} retrieved successfully." - ) - return agent - except KeyError as e: - logger.error(f"Error: {e}") - raise - - def list_agents(self) -> List[str]: - """ - Lists all agent names in the registry. - - Returns: - List[str]: A list of all agent names. - """ - try: - with self.lock: - agent_names = list(self.agents.keys()) - logger.info("Listing all agents.") - return agent_names - except Exception as e: - logger.error(f"Error: {e}") - raise e - def return_all_agents(self) -> List[Agent]: - """ - Returns all agents from the registry. - - Returns: - List[Agent]: A list of all agents. - """ try: with self.lock: agents = list(self.agents.values()) logger.info("Returning all agents.") - returnagents - except Exception as e: - logger.error(f"Error: {e}") - raise e - - def query( - self, condition: Optional[Callable[[Agent], bool]] = None - ) -> List[Agent]: - """ - Queries agents based on a condition. - - Args: - condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating - whether the agent meets the condition. - - Returns: - List[Agent]: A list of agents that meet the condition. - """ - try: - with self.lock: - if condition is None: - agents = list(self.agents.values()) - logger.info("Querying all agents.") - return agents - - agents = [ - agent - for agent in self.agents.values() - if condition(agent) - ] - logger.info("Querying agents with condition.") return agents except Exception as e: logger.error(f"Error: {e}") raise e - - def find_agent_by_name(self, agent_name: str) -> Optional[Agent]: - """ - Find an agent by its name. - - Args: - agent_name (str): The name of the agent to find. - - Returns: - Agent: The agent with the given name. - """ - try: - with ThreadPoolExecutor() as executor: - futures = { - executor.submit(self.get, agent_name): agent_name - for agent_name in self.agents.keys() - } - for future in as_completed(futures): - agent = future.result() - if agent.agent_name == agent_name: - return agent - except Exception as e: - logger.error(f"Error: {e}") - raise e - - def agent_to_py_model(self, agent: Agent): - """ - Converts an agent to a Pydantic model. - - Args: - agent (Agent): The agent to convert. - """ - agent_name = agent.agent_name - agent_description = ( - agent.description - if agent.description - else "No description provided" - ) - - schema = AgentConfigSchema( - uuid=agent.id, - name=agent_name, - description=agent_description, - config=agent.to_dict(), - ) - - logger.info( - f"Agent {agent_name} converted to Pydantic model." - ) - - self.agent_registry.agents.append(schema)