[FEAT][Memory]

pull/207/head^2
Kye 1 year ago
parent 9e0fcd95f6
commit cb31ab314d

@ -1,11 +1,3 @@
from swarms.memory.pinecone import PineconeVector
from swarms.memory.base import BaseVectorStore
from swarms.memory.pg import PgVectorVectorStore
from swarms.memory.ocean import OceanDB
__all__ = [
"BaseVectorStore",
"PineconeVector",
"PgVectorVectorStore",
"OceanDB",
]
# from swarms.memory.pinecone import PineconeVector
# from swarms.memory.base import BaseVectorStore
# from swarms.memory.pg import PgVectorVectorStore

@ -9,8 +9,6 @@ from griptape.artifacts import TextArtifact
@define
class BaseVectorStore(ABC):
""" """
DEFAULT_QUERY_COUNT = 5
@dataclass

@ -1,177 +0,0 @@
import uuid
from abc import ABC
from typing import Any, Dict, List, Optional
from swarms.memory.schemas import Artifact, Status
from swarms.memory.schemas import Step as APIStep
from swarms.memory.schemas import Task as APITask
class Step(APIStep):
additional_properties: Optional[Dict[str, str]] = None
class Task(APITask):
steps: List[Step] = []
class NotFoundException(Exception):
"""
Exception raised when a resource is not found.
"""
def __init__(self, item_name: str, item_id: str):
self.item_name = item_name
self.item_id = item_id
super().__init__(f"{item_name} with {item_id} not found.")
class TaskDB(ABC):
async def create_task(
self,
input: Optional[str],
additional_input: Any = None,
artifacts: Optional[List[Artifact]] = None,
steps: Optional[List[Step]] = None,
) -> Task:
raise NotImplementedError
async def create_step(
self,
task_id: str,
name: Optional[str] = None,
input: Optional[str] = None,
is_last: bool = False,
additional_properties: Optional[Dict[str, str]] = None,
) -> Step:
raise NotImplementedError
async def create_artifact(
self,
task_id: str,
file_name: str,
relative_path: Optional[str] = None,
step_id: Optional[str] = None,
) -> Artifact:
raise NotImplementedError
async def get_task(self, task_id: str) -> Task:
raise NotImplementedError
async def get_step(self, task_id: str, step_id: str) -> Step:
raise NotImplementedError
async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
raise NotImplementedError
async def list_tasks(self) -> List[Task]:
raise NotImplementedError
async def list_steps(
self, task_id: str, status: Optional[Status] = None
) -> List[Step]:
raise NotImplementedError
class InMemoryTaskDB(TaskDB):
_tasks: Dict[str, Task] = {}
async def create_task(
self,
input: Optional[str],
additional_input: Any = None,
artifacts: Optional[List[Artifact]] = None,
steps: Optional[List[Step]] = None,
) -> Task:
if not steps:
steps = []
if not artifacts:
artifacts = []
task_id = str(uuid.uuid4())
task = Task(
task_id=task_id,
input=input,
steps=steps,
artifacts=artifacts,
additional_input=additional_input,
)
self._tasks[task_id] = task
return task
async def create_step(
self,
task_id: str,
name: Optional[str] = None,
input: Optional[str] = None,
is_last=False,
additional_properties: Optional[Dict[str, Any]] = None,
) -> Step:
step_id = str(uuid.uuid4())
step = Step(
task_id=task_id,
step_id=step_id,
name=name,
input=input,
status=Status.created,
is_last=is_last,
additional_properties=additional_properties,
)
task = await self.get_task(task_id)
task.steps.append(step)
return step
async def get_task(self, task_id: str) -> Task:
task = self._tasks.get(task_id, None)
if not task:
raise NotFoundException("Task", task_id)
return task
async def get_step(self, task_id: str, step_id: str) -> Step:
task = await self.get_task(task_id)
step = next(filter(lambda s: s.task_id == task_id, task.steps), None)
if not step:
raise NotFoundException("Step", step_id)
return step
async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
task = await self.get_task(task_id)
artifact = next(
filter(lambda a: a.artifact_id == artifact_id, task.artifacts), None
)
if not artifact:
raise NotFoundException("Artifact", artifact_id)
return artifact
async def create_artifact(
self,
task_id: str,
file_name: str,
relative_path: Optional[str] = None,
step_id: Optional[str] = None,
) -> Artifact:
artifact_id = str(uuid.uuid4())
artifact = Artifact(
artifact_id=artifact_id,
file_name=file_name,
relative_path=relative_path,
)
task = await self.get_task(task_id)
task.artifacts.append(artifact)
if step_id:
step = await self.get_step(task_id, step_id)
step.artifacts.append(artifact)
return artifact
async def list_tasks(self) -> List[Task]:
return [task for task in self._tasks.values()]
async def list_steps(
self, task_id: str, status: Optional[Status] = None
) -> List[Step]:
task = await self.get_task(task_id)
steps = task.steps
if status:
steps = list(filter(lambda s: s.status == status, steps))
return steps

@ -1,157 +0,0 @@
import logging
from typing import List
import oceandb
from oceandb.utils.embedding_function import MultiModalEmbeddingFunction
class OceanDB:
"""
A class to interact with OceanDB.
...
Attributes
----------
client : oceandb.Client
a client to interact with OceanDB
Methods
-------
create_collection(collection_name: str, modality: str):
Creates a new collection in OceanDB.
append_document(collection, document: str, id: str):
Appends a document to a collection in OceanDB.
add_documents(collection, documents: List[str], ids: List[str]):
Adds multiple documents to a collection in OceanDB.
query(collection, query_texts: list[str], n_results: int):
Queries a collection in OceanDB.
"""
def __init__(self, client: oceandb.Client = None):
"""
Constructs all the necessary attributes for the OceanDB object.
Parameters
----------
client : oceandb.Client, optional
a client to interact with OceanDB (default is None, which creates a new client)
"""
try:
self.client = client if client else oceandb.Client()
print(self.client.heartbeat())
except Exception as e:
logging.error(f"Failed to initialize OceanDB client. Error: {e}")
raise
def create_collection(self, collection_name: str, modality: str):
"""
Creates a new collection in OceanDB.
Parameters
----------
collection_name : str
the name of the new collection
modality : str
the modality of the new collection
Returns
-------
collection
the created collection
"""
try:
embedding_function = MultiModalEmbeddingFunction(modality=modality)
collection = self.client.create_collection(
collection_name, embedding_function=embedding_function
)
return collection
except Exception as e:
logging.error(f"Failed to create collection. Error {e}")
raise
def append_document(self, collection, document: str, id: str):
"""
Appends a document to a collection in OceanDB.
Parameters
----------
collection
the collection to append the document to
document : str
the document to append
id : str
the id of the document
Returns
-------
result
the result of the append operation
"""
try:
return collection.add(documents=[document], ids=[id])
except Exception as e:
logging.error(
f"Failed to append document to the collection. Error {e}"
)
raise
def add_documents(self, collection, documents: List[str], ids: List[str]):
"""
Adds multiple documents to a collection in OceanDB.
Parameters
----------
collection
the collection to add the documents to
documents : List[str]
the documents to add
ids : List[str]
the ids of the documents
Returns
-------
result
the result of the add operation
"""
try:
return collection.add(documents=documents, ids=ids)
except Exception as e:
logging.error(f"Failed to add documents to collection. Error: {e}")
raise
def query(self, collection, query_texts: list[str], n_results: int):
"""
Queries a collection in OceanDB.
Parameters
----------
collection
the collection to query
query_texts : list[str]
the texts to query
n_results : int
the number of results to return
Returns
-------
results
the results of the query
"""
try:
results = collection.query(
query_texts=query_texts, n_results=n_results
)
return results
except Exception as e:
logging.error(f"Failed to query the collection. Error {e}")
raise
# Example
# ocean = OceanDB()
# collection = ocean.create_collection("test", "text")
# ocean.append_document(collection, "hello world", "1")
# ocean.add_documents(collection, ["hello world", "hello world"], ["2", "3"])
# results = ocean.query(collection, ["hello world"], 3)
# print(results)
Loading…
Cancel
Save