Update agent_registry.py

pull/766/head
nathanogaga118 2 months ago committed by GitHub
parent 3c9391405b
commit 7ddd24e5ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -8,49 +8,31 @@ from pydantic import BaseModel, Field, ValidationError
from swarms import Agent from swarms import Agent
from swarms.utils.loguru_logger import logger from swarms.utils.loguru_logger import logger
class AgentConfigSchema(BaseModel): class AgentConfigSchema(BaseModel):
uuid: str = Field( uuid: str = Field(
..., ..., description="The unique identifier for the agent."
description="The unique identifier for the agent.",
) )
name: str = None name: str = None
description: str = None description: str = None
time_added: str = Field( time_added: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the agent was added to the registry.", description="Time when the agent was added to the registry."
) )
config: Dict[Any, Any] = None config: Dict[Any, Any] = None
class AgentRegistrySchema(BaseModel): class AgentRegistrySchema(BaseModel):
name: str name: str
description: str description: str
agents: List[AgentConfigSchema] agents: List[AgentConfigSchema]
time_registry_creatd: str = Field( time_registry_created: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the registry was created.", description="Time when the registry was created."
) )
number_of_agents: int = Field( number_of_agents: int = Field(
0, 0, description="The number of agents in the registry."
description="The number of agents in the registry.",
) )
class AgentRegistry: class AgentRegistry:
"""
A class for managing a registry of agents.
Attributes:
name (str): The name of the registry.
description (str): A description of the registry.
return_json (bool): Indicates whether to return data in JSON format.
auto_save (bool): Indicates whether to automatically save changes to the registry.
agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name.
lock (Lock): A lock for thread-safe operations on the registry.
agent_registry (AgentRegistrySchema): The schema for the agent registry.
"""
def __init__( def __init__(
self, self,
name: str = "Agent Registry", name: str = "Agent Registry",
@ -61,16 +43,6 @@ class AgentRegistry:
*args, *args,
**kwargs, **kwargs,
): ):
"""
Initializes the AgentRegistry.
Args:
name (str, optional): The name of the registry. Defaults to "Agent Registry".
description (str, optional): A description of the registry. Defaults to "A registry for managing agents.".
agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None.
return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True.
auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False.
"""
self.name = name self.name = name
self.description = description self.description = description
self.return_json = return_json self.return_json = return_json
@ -78,7 +50,6 @@ class AgentRegistry:
self.agents: Dict[str, Agent] = {} self.agents: Dict[str, Agent] = {}
self.lock = Lock() self.lock = Lock()
# Initialize the agent registry
self.agent_registry = AgentRegistrySchema( self.agent_registry = AgentRegistrySchema(
name=self.name, name=self.name,
description=self.description, description=self.description,
@ -89,247 +60,12 @@ class AgentRegistry:
if agents: if agents:
self.add_many(agents) self.add_many(agents)
def add(self, agent: Agent) -> None:
"""
Adds a new agent to the registry.
Args:
agent (Agent): The agent to add.
Raises:
ValueError: If the agent_name already exists in the registry or is invalid.
ValidationError: If the input data is invalid.
"""
name = agent.agent_name
# Input validation for agent_name
if not name or not isinstance(name, str):
logger.error("Invalid agent name provided.")
raise ValueError("Invalid agent name provided.")
self.agent_to_py_model(agent)
with self.lock:
if name in self.agents:
logger.error(
f"Agent with name {name} already exists."
)
raise ValueError(
f"Agent with name {name} already exists."
)
try:
self.agents[name] = agent
logger.info(f"Agent {name} added successfully.")
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
def add_many(self, agents: List[Agent]) -> None:
"""
Adds multiple agents to the registry.
Args:
agents (List[Agent]): The list of agents to add.
Raises:
ValueError: If any of the agent_names already exist in the registry.
ValidationError: If the input data is invalid.
"""
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.add, agent): agent
for agent in agents
}
for future in as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"Error adding agent: {e}")
raise
def delete(self, agent_name: str) -> None:
"""
Deletes an agent from the registry.
Args:
agent_name (str): The name of the agent to delete.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
del self.agents[agent_name]
logger.info(
f"Agent {agent_name} deleted successfully."
)
except KeyError as e:
logger.error(f"Error: {e}")
raise
def update_agent(self, agent_name: str, new_agent: Agent) -> None:
"""
Updates an existing agent in the registry.
Args:
agent_name (str): The name of the agent to update.
new_agent (Agent): The new agent to replace the existing one.
Raises:
KeyError: If the agent_name does not exist in the registry.
ValidationError: If the input data is invalid.
"""
with self.lock:
if agent_name not in self.agents:
logger.error(
f"Agent with name {agent_name} does not exist."
)
raise KeyError(
f"Agent with name {agent_name} does not exist."
)
try:
self.agents[agent_name] = new_agent
logger.info(
f"Agent {agent_name} updated successfully."
)
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
def get(self, agent_name: str) -> Agent:
"""
Retrieves an agent from the registry.
Args:
agent_name (str): The name of the agent to retrieve.
Returns:
Agent: The agent associated with the given agent_name.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
agent = self.agents[agent_name]
logger.info(
f"Agent {agent_name} retrieved successfully."
)
return agent
except KeyError as e:
logger.error(f"Error: {e}")
raise
def list_agents(self) -> List[str]:
"""
Lists all agent names in the registry.
Returns:
List[str]: A list of all agent names.
"""
try:
with self.lock:
agent_names = list(self.agents.keys())
logger.info("Listing all agents.")
return agent_names
except Exception as e:
logger.error(f"Error: {e}")
raise e
def return_all_agents(self) -> List[Agent]: def return_all_agents(self) -> List[Agent]:
"""
Returns all agents from the registry.
Returns:
List[Agent]: A list of all agents.
"""
try: try:
with self.lock: with self.lock:
agents = list(self.agents.values()) agents = list(self.agents.values())
logger.info("Returning all agents.") logger.info("Returning all agents.")
returnagents
except Exception as e:
logger.error(f"Error: {e}")
raise e
def query(
self, condition: Optional[Callable[[Agent], bool]] = None
) -> List[Agent]:
"""
Queries agents based on a condition.
Args:
condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating
whether the agent meets the condition.
Returns:
List[Agent]: A list of agents that meet the condition.
"""
try:
with self.lock:
if condition is None:
agents = list(self.agents.values())
logger.info("Querying all agents.")
return agents
agents = [
agent
for agent in self.agents.values()
if condition(agent)
]
logger.info("Querying agents with condition.")
return agents return agents
except Exception as e: except Exception as e:
logger.error(f"Error: {e}") logger.error(f"Error: {e}")
raise e raise e
def find_agent_by_name(self, agent_name: str) -> Optional[Agent]:
"""
Find an agent by its name.
Args:
agent_name (str): The name of the agent to find.
Returns:
Agent: The agent with the given name.
"""
try:
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.get, agent_name): agent_name
for agent_name in self.agents.keys()
}
for future in as_completed(futures):
agent = future.result()
if agent.agent_name == agent_name:
return agent
except Exception as e:
logger.error(f"Error: {e}")
raise e
def agent_to_py_model(self, agent: Agent):
"""
Converts an agent to a Pydantic model.
Args:
agent (Agent): The agent to convert.
"""
agent_name = agent.agent_name
agent_description = (
agent.description
if agent.description
else "No description provided"
)
schema = AgentConfigSchema(
uuid=agent.id,
name=agent_name,
description=agent_description,
config=agent.to_dict(),
)
logger.info(
f"Agent {agent_name} converted to Pydantic model."
)
self.agent_registry.agents.append(schema)

Loading…
Cancel
Save