basic funcitionality with notification

pull/666/head
Occupying-Mars 2 months ago
parent 82a2d8954b
commit 570f8a2020

@ -7,6 +7,9 @@ from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.orchestrator import Orchestrator
from swarms.structs.notification_manager import UpdateMetadata
from datetime import datetime
load_dotenv()
@ -43,6 +46,22 @@ agent = Agent(
return_history=True,
)
# Create orchestrator
orchestrator = Orchestrator()
# Register agents
orchestrator.register_agent(agent)
# Example vector DB update
update = UpdateMetadata(
topic="stock_market",
importance=0.8,
timestamp=datetime.now(),
affected_areas=["finance", "trading"]
)
# Handle update - only Financial-Analysis-Agent will be notified
orchestrator.handle_vector_db_update(update)
agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.",

@ -52,6 +52,7 @@ from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.utils.formatter import formatter
from .notification_manager import AgentProfile
logger = initialize_logger(log_folder="agents")
@ -179,6 +180,8 @@ class Agent:
artifacts_output_path (str): The artifacts output path
artifacts_file_extension (str): The artifacts file extension (.pdf, .md, .txt, )
scheduled_run_date (datetime): The date and time to schedule the task
expertise_areas (List[str]): The expertise areas of the agent
importance_threshold (float): The importance threshold of the agent
Methods:
run: Run the agent
@ -211,6 +214,8 @@ class Agent:
run_async_concurrent: Run the agent asynchronously and concurrently
construct_dynamic_prompt: Construct the dynamic prompt
handle_artifacts: Handle artifacts
update_notification_preferences: Update agent's notification preferences
handle_vector_db_update: Handle notification of vector DB update
Examples:
@ -340,6 +345,8 @@ class Agent:
all_gpus: bool = False,
model_name: str = None,
llm_args: dict = None,
expertise_areas: List[str] = None,
importance_threshold: float = 0.5,
*args,
**kwargs,
):
@ -457,6 +464,8 @@ class Agent:
self.all_gpus = all_gpus
self.model_name = model_name
self.llm_args = llm_args
self.expertise_areas = expertise_areas or []
self.importance_threshold = importance_threshold
# Initialize the short term memory
self.short_memory = Conversation(
@ -595,6 +604,13 @@ class Agent:
threading.Thread(target=self.llm_handling())
# Add notification preferences
self.notification_profile = AgentProfile(
agent_id=agent_name,
expertise_areas=expertise_areas or [],
importance_threshold=importance_threshold
)
def llm_handling(self):
if self.llm is None:
@ -2442,3 +2458,19 @@ class Agent:
return formatter.print_table(
f"Agent: {self.agent_name} Configuration", config_dict
)
def update_notification_preferences(
self,
expertise_areas: List[str] = None,
importance_threshold: float = None
):
"""Update agent's notification preferences"""
if expertise_areas is not None:
self.notification_profile.expertise_areas = expertise_areas
if importance_threshold is not None:
self.notification_profile.importance_threshold = importance_threshold
def handle_vector_db_update(self, update_metadata: UpdateMetadata):
"""Handle notification of vector DB update"""
# Process the update based on agent's specific needs
pass

@ -0,0 +1,92 @@
from typing import Dict, List, Optional
from pydantic import BaseModel
import numpy as np
from datetime import datetime
class UpdateMetadata(BaseModel):
"""Metadata for vector DB updates"""
topic: str
importance: float
timestamp: datetime
embedding: Optional[List[float]] = None
affected_areas: List[str] = []
class AgentProfile(BaseModel):
"""Profile for agent notification preferences"""
agent_id: str
expertise_areas: List[str]
importance_threshold: float = 0.5
current_task_context: Optional[str] = None
embedding: Optional[List[float]] = None
class NotificationManager:
"""Manages selective notifications for vector DB updates"""
def __init__(self):
self.agent_profiles: Dict[str, AgentProfile] = {}
def register_agent(self, profile: AgentProfile):
"""Register an agent's notification preferences"""
self.agent_profiles[profile.agent_id] = profile
def unregister_agent(self, agent_id: str):
"""Remove an agent's notification preferences"""
if agent_id in self.agent_profiles:
del self.agent_profiles[agent_id]
def calculate_relevance(
self,
update_metadata: UpdateMetadata,
agent_profile: AgentProfile
) -> float:
"""Calculate relevance score between update and agent"""
# Topic/expertise overlap score
topic_score = len(
set(agent_profile.expertise_areas) &
set(update_metadata.affected_areas)
) / max(
len(agent_profile.expertise_areas),
len(update_metadata.affected_areas)
)
# Embedding similarity if available
embedding_score = 0.0
if update_metadata.embedding and agent_profile.embedding:
embedding_score = np.dot(
update_metadata.embedding,
agent_profile.embedding
)
# Combine scores (can be tuned)
relevance = 0.7 * topic_score + 0.3 * embedding_score
return relevance
def should_notify_agent(
self,
update_metadata: UpdateMetadata,
agent_profile: AgentProfile
) -> bool:
"""Determine if an agent should be notified of an update"""
# Check importance threshold
if update_metadata.importance < agent_profile.importance_threshold:
return False
# Calculate relevance
relevance = self.calculate_relevance(update_metadata, agent_profile)
# Notification threshold (can be tuned)
return relevance > 0.5
def get_agents_to_notify(
self,
update_metadata: UpdateMetadata
) -> List[str]:
"""Get list of agent IDs that should be notified of an update"""
agents_to_notify = []
for agent_id, profile in self.agent_profiles.items():
if self.should_notify_agent(update_metadata, profile):
agents_to_notify.append(agent_id)
return agents_to_notify

@ -0,0 +1,32 @@
from typing import List
from .notification_manager import NotificationManager, UpdateMetadata
from .agent import Agent
class Orchestrator:
def __init__(self):
self.notification_manager = NotificationManager()
self.agents: List[Agent] = []
def register_agent(self, agent: Agent):
"""Register an agent with the orchestrator"""
self.agents.append(agent)
self.notification_manager.register_agent(agent.notification_profile)
def handle_vector_db_update(self, update_metadata: UpdateMetadata):
"""Handle a vector DB update and notify relevant agents"""
# Get list of agents to notify
agents_to_notify = self.notification_manager.get_agents_to_notify(
update_metadata
)
# Notify relevant agents
for agent in self.agents:
if agent.agent_name in agents_to_notify:
agent.handle_vector_db_update(update_metadata)
def update_agent_task_context(self, agent_name: str, task_context: str):
"""Update an agent's current task context"""
if agent_name in self.notification_manager.agent_profiles:
self.notification_manager.agent_profiles[
agent_name
].current_task_context = task_context
Loading…
Cancel
Save