orchestrator scaffold

pull/10/head
Kye 1 year ago
parent 77e919a7da
commit 4bcd090a06

@ -1,5 +1,7 @@
#base toolset #base toolset
from swarms.agents.tools.agent_tools import * from swarms.agents.tools.agent_tools import *
from swarms.utils.logger import logger
from langchain.tools import BaseTool from langchain.tools import BaseTool
from langchain.callbacks.manager import ( from langchain.callbacks.manager import (
@ -11,12 +13,10 @@ from langchain.memory.chat_message_histories import FileChatMessageHistory
import logging import logging
from pydantic import BaseModel, Extra from pydantic import BaseModel, Extra
from swarms.agents.models.hf import HuggingFaceLLM from swarms.agents.models.hf import HuggingFaceLLM
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class AgentNodeInitializer: class AgentNodeInitializer:
"""Useful for spawning autonomous agent instances to accomplish complex tasks.""" """Useful for spawning autonomous agent instances to accomplish complex tasks."""

@ -44,35 +44,72 @@ import celery
from typing import List, Dict, Any from typing import List, Dict, Any
import numpy as np import numpy as np
from swarms.agents. from swarms.agents.memory.ocean import OceanDB
class Orchestrator(ABC): class Orchestrator(ABC):
def __init__(self, agent, agent_list: List[Any], task_queue: celery.Celery, vector_db: np.ndarray): def __init__(self,
agent,
agent_list: List[Any],
task_queue: celery.Celery,
vector_db: OceanDB
):
self.agent = agent self.agent = agent
self.agents = agent_list self.agents = [agent_class() for _ in range(agent_list)]
self.task_queue = task_queue self.task_queue = task_queue
self.vector_db = vector_db self.vector_db = vector_db
self.current_tasks = {} self.current_tasks = {}
self.lock = Lock()
@abstractmethod @abstractmethod
def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None: def assign_task(self, agent_id: int, task: Dict[str, Any]) -> None:
"""Assign a task to a specific agent""" """Assign a task to a specific agent"""
pass with self.lock:
if self.task_queue:
#get and agent and a task
agent = self.agents.pop(0)
task = self.task_queue.popleft()
#process the task and get result and vector representation
result, vector_representation = agent.process_task()
#store the vector representation in the database
self.vector_db.add_documents([vector_representation],[str(id(task))])
#put the agent back to agent slist
self.agents.append(agent)
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ")
return result
else:
logging.error("Task queue is empty")
@abstractmethod @abstractmethod
def retrieve_results(self, agent_id: int) -> Any: def retrieve_results(self, agent_id: int) -> Any:
"""Retrieve results from a specific agent""" """Retrieve results from a specific agent"""
pass try:
#Query the vector database for documents created by the agents
results = self.vector_db.query(query_texts=[str(agent_id)], n_results=10)
return results
except Exception as e:
logging.error(f"Failed to retrieve results from agent {agent_id}. Error {e}")
raise
@abstractmethod @abstractmethod
def update_vector_db(self, data: np.ndarray) -> None: def update_vector_db(self, data: np.ndarray) -> None:
"""Update the vector database""" """Update the vector database"""
pass try:
self.vector_db.add_documents([data['vector']], [str(data['task_id'])])
except Exception as e:
logging.error(f"Failed to update the vector database. Error: {e}")
raise
@abstractmethod @abstractmethod
def get_vector_db(self) -> np.ndarray: def get_vector_db(self) -> np.ndarray:
"""Retrieve the vector database""" """Retrieve the vector database"""
pass return self.vector_db

Loading…
Cancel
Save