Update agent_registry.py

bb
pull/766/head
nathanogaga118 2 months ago committed by GitHub
parent 78332cd7d6
commit 67f8edcf03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -11,14 +11,13 @@ from swarms.utils.loguru_logger import logger
class AgentConfigSchema(BaseModel):
uuid: str = Field(
...,
description="The unique identifier for the agent.",
..., description="The unique identifier for the agent."
)
name: str = None
description: str = None
time_added: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the agent was added to the registry.",
description="Time when the agent was added to the registry."
)
config: Dict[Any, Any] = None
@ -29,28 +28,14 @@ class AgentRegistrySchema(BaseModel):
agents: List[AgentConfigSchema]
time_registry_creatd: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
description="Time when the registry was created.",
description="Time when the registry was created."
)
number_of_agents: int = Field(
0,
description="The number of agents in the registry.",
0, description="The number of agents in the registry."
)
class AgentRegistry:
"""
A class for managing a registry of agents.
Attributes:
name (str): The name of the registry.
description (str): A description of the registry.
return_json (bool): Indicates whether to return data in JSON format.
auto_save (bool): Indicates whether to automatically save changes to the registry.
agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name.
lock (Lock): A lock for thread-safe operations on the registry.
agent_registry (AgentRegistrySchema): The schema for the agent registry.
"""
def __init__(
self,
name: str = "Agent Registry",
@ -61,16 +46,6 @@ class AgentRegistry:
*args,
**kwargs,
):
"""
Initializes the AgentRegistry.
Args:
name (str, optional): The name of the registry. Defaults to "Agent Registry".
description (str, optional): A description of the registry. Defaults to "A registry for managing agents.".
agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None.
return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True.
auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False.
"""
self.name = name
self.description = description
self.return_json = return_json
@ -78,7 +53,6 @@ class AgentRegistry:
self.agents: Dict[str, Agent] = {}
self.lock = Lock()
# Initialize the agent registry
self.agent_registry = AgentRegistrySchema(
name=self.name,
description=self.description,
@ -90,33 +64,14 @@ class AgentRegistry:
self.add_many(agents)
def add(self, agent: Agent) -> None:
"""
Adds a new agent to the registry.
Args:
agent (Agent): The agent to add.
Raises:
ValueError: If the agent_name already exists in the registry or is invalid.
ValidationError: If the input data is invalid.
"""
name = agent.agent_name
# Input validation for agent_name
if not name or not isinstance(name, str):
logger.error("Invalid agent name provided.")
raise ValueError("Invalid agent name provided.")
self.agent_to_py_model(agent)
with self.lock:
if name in self.agents:
logger.error(
f"Agent with name {name} already exists."
)
raise ValueError(
f"Agent with name {name} already exists."
)
logger.error(f"Agent with name {name} already exists.")
raise ValueError(f"Agent with name {name} already exists.")
try:
self.agents[name] = agent
logger.info(f"Agent {name} added successfully.")
@ -125,21 +80,8 @@ class AgentRegistry:
raise
def add_many(self, agents: List[Agent]) -> None:
"""
Adds multiple agents to the registry.
Args:
agents (List[Agent]): The list of agents to add.
Raises:
ValueError: If any of the agent_names already exist in the registry.
ValidationError: If the input data is invalid.
"""
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.add, agent): agent
for agent in agents
}
futures = {executor.submit(self.add, agent): agent for agent in agents}
for future in as_completed(futures):
try:
future.result()
@ -148,85 +90,37 @@ class AgentRegistry:
raise
def delete(self, agent_name: str) -> None:
"""
Deletes an agent from the registry.
Args:
agent_name (str): The name of the agent to delete.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
del self.agents[agent_name]
logger.info(
f"Agent {agent_name} deleted successfully."
)
logger.info(f"Agent {agent_name} deleted successfully.")
except KeyError as e:
logger.error(f"Error: {e}")
raise
def update_agent(self, agent_name: str, new_agent: Agent) -> None:
"""
Updates an existing agent in the registry.
Args:
agent_name (str): The name of the agent to update.
new_agent (Agent): The new agent to replace the existing one.
Raises:
KeyError: If the agent_name does not exist in the registry.
ValidationError: If the input data is invalid.
"""
with self.lock:
if agent_name not in self.agents:
logger.error(
f"Agent with name {agent_name} does not exist."
)
raise KeyError(
f"Agent with name {agent_name} does not exist."
)
logger.error(f"Agent with name {agent_name} does not exist.")
raise KeyError(f"Agent with name {agent_name} does not exist.")
try:
self.agents[agent_name] = new_agent
logger.info(
f"Agent {agent_name} updated successfully."
)
logger.info(f"Agent {agent_name} updated successfully.")
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
def get(self, agent_name: str) -> Agent:
"""
Retrieves an agent from the registry.
Args:
agent_name (str): The name of the agent to retrieve.
Returns:
Agent: The agent associated with the given agent_name.
Raises:
KeyError: If the agent_name does not exist in the registry.
"""
with self.lock:
try:
agent = self.agents[agent_name]
logger.info(
f"Agent {agent_name} retrieved successfully."
)
logger.info(f"Agent {agent_name} retrieved successfully.")
return agent
except KeyError as e:
logger.error(f"Error: {e}")
raise
def list_agents(self) -> List[str]:
"""
Lists all agent names in the registry.
Returns:
List[str]: A list of all agent names.
"""
try:
with self.lock:
agent_names = list(self.agents.keys())
@ -237,12 +131,6 @@ class AgentRegistry:
raise e
def return_all_agents(self) -> List[Agent]:
"""
Returns all agents from the registry.
Returns:
List[Agent]: A list of all agents.
"""
try:
with self.lock:
agents = list(self.agents.values())
@ -252,19 +140,7 @@ class AgentRegistry:
logger.error(f"Error: {e}")
raise e
def query(
self, condition: Optional[Callable[[Agent], bool]] = None
) -> List[Agent]:
"""
Queries agents based on a condition.
Args:
condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating
whether the agent meets the condition.
Returns:
List[Agent]: A list of agents that meet the condition.
"""
def query(self, condition: Optional[Callable[[Agent], bool]] = None) -> List[Agent]:
try:
with self.lock:
if condition is None:
@ -272,11 +148,7 @@ class AgentRegistry:
logger.info("Querying all agents.")
return agents
agents = [
agent
for agent in self.agents.values()
if condition(agent)
]
agents = [agent for agent in self.agents.values() if condition(agent)]
logger.info("Querying agents with condition.")
return agents
except Exception as e:
@ -284,21 +156,9 @@ class AgentRegistry:
raise e
def find_agent_by_name(self, agent_name: str) -> Optional[Agent]:
"""
Find an agent by its name.
Args:
agent_name (str): The name of the agent to find.
Returns:
Agent: The agent with the given name.
"""
try:
with ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.get, agent_name): agent_name
for agent_name in self.agents.keys()
}
futures = {executor.submit(self.get, agent_name): agent_name for agent_name in self.agents.keys()}
for future in as_completed(futures):
agent = future.result()
if agent.agent_name == agent_name:
@ -308,18 +168,8 @@ class AgentRegistry:
raise e
def agent_to_py_model(self, agent: Agent):
"""
Converts an agent to a Pydantic model.
Args:
agent (Agent): The agent to convert.
"""
agent_name = agent.agent_name
agent_description = (
agent.description
if agent.description
else "No description provided"
)
agent_description = agent.description if agent.description else "No description provided"
schema = AgentConfigSchema(
uuid=agent.id,
@ -328,8 +178,8 @@ class AgentRegistry:
config=agent.to_dict(),
)
logger.info(
f"Agent {agent_name} converted to Pydantic model."
)
logger.info(f"Agent {agent_name} converted to Pydantic model.")
self.agent_registry.agents.append(schema)
if __name__ == "__main__":
print("Agent registry is running successfully!")

Loading…
Cancel
Save