embed with chroma

Former-commit-id: e2dfdf638c
group-chat
Kye 1 year ago
parent af66efe9ed
commit 5c98686ecf

@ -45,6 +45,7 @@ tenacity = "*"
redis = "*" redis = "*"
Pillow = "*" Pillow = "*"
shapeless="*" shapeless="*"
chromadb = "*"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
# Add development dependencies here # Add development dependencies here

@ -1,11 +1,13 @@
import logging import logging
import queue
import threading import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List from typing import Any, Dict, List
# from swarms.memory.ocean import OceanDB
import chromadb import chromadb
from chromadb.utils import embedding_functions
## =========> ## =========>
class Orchestrator(ABC): class Orchestrator(ABC):
@ -17,16 +19,25 @@ class Orchestrator(ABC):
collection_name: str = "swarm" collection_name: str = "swarm"
): ):
self.agent = agent self.agent = agent
self.agents = [agent() for _ in range(agent_list)] self.agents = queue.Queue()
self.task_queue = task_queue
for _ in range(agent_list):
self.agents.put(agent())
self.task_queue = queue.Queue()
self.chroma_client = chromadb.Client() self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.create_collection( self.collection = self.chroma_client.create_collection(
name=collection_name name = collection_name
) )
self.current_tasks = {} self.current_tasks = {}
self.lock = threading.Lock() self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.executor = ThreadPoolExecutor(max_workers=len(agent_list))
@abstractmethod @abstractmethod
def assign_task( def assign_task(
@ -36,26 +47,45 @@ class Orchestrator(ABC):
) -> None: ) -> None:
"""Assign a task to a specific agent""" """Assign a task to a specific agent"""
with self.lock: while True:
if self.task_queue: with self.condition:
#get and agent and a task while not self.task_queue:
agent = self.agents.pop(0) self.condition.wait()
task = self.task_queue.popleft() agent = self.agents.get()
result, vector_representation = agent.process_task() task = self.task_queue.get()
#use chromas's method to add data try:
result, vector_representation = agent.process_task(
task
)
self.collection.add( self.collection.add(
embeddings=[vector_representation], embeddings=[vector_representation],
documents=[str(id(task))], documents=[str(id(task))],
ids=[str(id(task))] ids=[str(id(task))]
) )
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} with")
except Exception as error:
logging.error(f"Failed to process task {id(task)} by agent {id(agent)}. Error: {error}")
finally:
with self.condition:
self.agents.put(agent)
self.condition.notify()
def embed(self, input, api_key, model_name):
openai = embedding_functions.OpenAIEmbeddingFunction(
api_key=api_key,
model_name=model_name
)
#put the agent back to agent slist embedding = openai(input)
self.agents.append(agent) # print(embedding)
logging.info(f"Task {id(str)} has been processed by agent {id(agent)} ")
return result embedding_metadata = {input: embedding}
else: print(embedding_metadata)
logging.error("Task queue is empty")
# return embedding
@abstractmethod @abstractmethod
def retrieve_results(self, agent_id: int) -> Any: def retrieve_results(self, agent_id: int) -> Any:
@ -115,6 +145,7 @@ class Orchestrator(ABC):
try: try:
self.task_queue.append(objective) self.task_queue.append(objective)
results = [ results = [
self.assign_task( self.assign_task(
agent_id, task agent_id, task

Loading…
Cancel
Save