From 570f8a2020bf9a04138b8adb8bc5a6ff8a149e73 Mon Sep 17 00:00:00 2001 From: Occupying-Mars Date: Fri, 6 Dec 2024 17:42:43 +0530 Subject: [PATCH] basic funcitionality with notification --- example.py | 19 ++++++ swarms/structs/agent.py | 32 +++++++++ swarms/structs/notification_manager.py | 92 ++++++++++++++++++++++++++ swarms/structs/orchestrator.py | 32 +++++++++ 4 files changed, 175 insertions(+) create mode 100644 swarms/structs/notification_manager.py create mode 100644 swarms/structs/orchestrator.py diff --git a/example.py b/example.py index 7647d1cd..c981f8c0 100644 --- a/example.py +++ b/example.py @@ -7,6 +7,9 @@ from swarms import Agent from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) +from swarms.structs.orchestrator import Orchestrator +from swarms.structs.notification_manager import UpdateMetadata +from datetime import datetime load_dotenv() @@ -43,6 +46,22 @@ agent = Agent( return_history=True, ) +# Create orchestrator +orchestrator = Orchestrator() + +# Register agents +orchestrator.register_agent(agent) + +# Example vector DB update +update = UpdateMetadata( + topic="stock_market", + importance=0.8, + timestamp=datetime.now(), + affected_areas=["finance", "trading"] +) + +# Handle update - only Financial-Analysis-Agent will be notified +orchestrator.handle_vector_db_update(update) agent.run( "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.", diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index c9160b1b..56830c7a 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -52,6 +52,7 @@ from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) from swarms.utils.formatter import formatter +from .notification_manager import AgentProfile logger = initialize_logger(log_folder="agents") @@ -179,6 +180,8 @@ class Agent: artifacts_output_path (str): The artifacts output path artifacts_file_extension (str): The artifacts file extension (.pdf, .md, .txt, ) scheduled_run_date (datetime): The date and time to schedule the task + expertise_areas (List[str]): The expertise areas of the agent + importance_threshold (float): The importance threshold of the agent Methods: run: Run the agent @@ -211,6 +214,8 @@ class Agent: run_async_concurrent: Run the agent asynchronously and concurrently construct_dynamic_prompt: Construct the dynamic prompt handle_artifacts: Handle artifacts + update_notification_preferences: Update agent's notification preferences + handle_vector_db_update: Handle notification of vector DB update Examples: @@ -340,6 +345,8 @@ class Agent: all_gpus: bool = False, model_name: str = None, llm_args: dict = None, + expertise_areas: List[str] = None, + importance_threshold: float = 0.5, *args, **kwargs, ): @@ -457,6 +464,8 @@ class Agent: self.all_gpus = all_gpus self.model_name = model_name self.llm_args = llm_args + self.expertise_areas = expertise_areas or [] + self.importance_threshold = importance_threshold # Initialize the short term memory self.short_memory = Conversation( @@ -595,6 +604,13 @@ class Agent: threading.Thread(target=self.llm_handling()) + # Add notification preferences + self.notification_profile = AgentProfile( + agent_id=agent_name, + expertise_areas=expertise_areas or [], + importance_threshold=importance_threshold + ) + def llm_handling(self): if self.llm is None: @@ -2442,3 +2458,19 @@ class Agent: return formatter.print_table( f"Agent: {self.agent_name} Configuration", config_dict ) + + def update_notification_preferences( + self, + expertise_areas: List[str] = None, + importance_threshold: float = None + ): + """Update agent's notification preferences""" + if expertise_areas is not None: + self.notification_profile.expertise_areas = expertise_areas + if importance_threshold is not None: + self.notification_profile.importance_threshold = importance_threshold + + def handle_vector_db_update(self, update_metadata: UpdateMetadata): + """Handle notification of vector DB update""" + # Process the update based on agent's specific needs + pass diff --git a/swarms/structs/notification_manager.py b/swarms/structs/notification_manager.py new file mode 100644 index 00000000..e3632cfc --- /dev/null +++ b/swarms/structs/notification_manager.py @@ -0,0 +1,92 @@ +from typing import Dict, List, Optional +from pydantic import BaseModel +import numpy as np +from datetime import datetime + +class UpdateMetadata(BaseModel): + """Metadata for vector DB updates""" + topic: str + importance: float + timestamp: datetime + embedding: Optional[List[float]] = None + affected_areas: List[str] = [] + +class AgentProfile(BaseModel): + """Profile for agent notification preferences""" + agent_id: str + expertise_areas: List[str] + importance_threshold: float = 0.5 + current_task_context: Optional[str] = None + embedding: Optional[List[float]] = None + +class NotificationManager: + """Manages selective notifications for vector DB updates""" + + def __init__(self): + self.agent_profiles: Dict[str, AgentProfile] = {} + + def register_agent(self, profile: AgentProfile): + """Register an agent's notification preferences""" + self.agent_profiles[profile.agent_id] = profile + + def unregister_agent(self, agent_id: str): + """Remove an agent's notification preferences""" + if agent_id in self.agent_profiles: + del self.agent_profiles[agent_id] + + def calculate_relevance( + self, + update_metadata: UpdateMetadata, + agent_profile: AgentProfile + ) -> float: + """Calculate relevance score between update and agent""" + # Topic/expertise overlap score + topic_score = len( + set(agent_profile.expertise_areas) & + set(update_metadata.affected_areas) + ) / max( + len(agent_profile.expertise_areas), + len(update_metadata.affected_areas) + ) + + # Embedding similarity if available + embedding_score = 0.0 + if update_metadata.embedding and agent_profile.embedding: + embedding_score = np.dot( + update_metadata.embedding, + agent_profile.embedding + ) + + # Combine scores (can be tuned) + relevance = 0.7 * topic_score + 0.3 * embedding_score + + return relevance + + def should_notify_agent( + self, + update_metadata: UpdateMetadata, + agent_profile: AgentProfile + ) -> bool: + """Determine if an agent should be notified of an update""" + # Check importance threshold + if update_metadata.importance < agent_profile.importance_threshold: + return False + + # Calculate relevance + relevance = self.calculate_relevance(update_metadata, agent_profile) + + # Notification threshold (can be tuned) + return relevance > 0.5 + + def get_agents_to_notify( + self, + update_metadata: UpdateMetadata + ) -> List[str]: + """Get list of agent IDs that should be notified of an update""" + agents_to_notify = [] + + for agent_id, profile in self.agent_profiles.items(): + if self.should_notify_agent(update_metadata, profile): + agents_to_notify.append(agent_id) + + return agents_to_notify \ No newline at end of file diff --git a/swarms/structs/orchestrator.py b/swarms/structs/orchestrator.py new file mode 100644 index 00000000..229be45a --- /dev/null +++ b/swarms/structs/orchestrator.py @@ -0,0 +1,32 @@ +from typing import List +from .notification_manager import NotificationManager, UpdateMetadata +from .agent import Agent + +class Orchestrator: + def __init__(self): + self.notification_manager = NotificationManager() + self.agents: List[Agent] = [] + + def register_agent(self, agent: Agent): + """Register an agent with the orchestrator""" + self.agents.append(agent) + self.notification_manager.register_agent(agent.notification_profile) + + def handle_vector_db_update(self, update_metadata: UpdateMetadata): + """Handle a vector DB update and notify relevant agents""" + # Get list of agents to notify + agents_to_notify = self.notification_manager.get_agents_to_notify( + update_metadata + ) + + # Notify relevant agents + for agent in self.agents: + if agent.agent_name in agents_to_notify: + agent.handle_vector_db_update(update_metadata) + + def update_agent_task_context(self, agent_name: str, task_context: str): + """Update an agent's current task context""" + if agent_name in self.notification_manager.agent_profiles: + self.notification_manager.agent_profiles[ + agent_name + ].current_task_context = task_context \ No newline at end of file