[SWARMS][Memory -> playground]

pull/453/head
Kye 9 months ago
parent 53fc90d9ed
commit ac921e8df2

@ -1,79 +0,0 @@
"""Math utils."""
import logging
from typing import List, Optional, Tuple, Union
import numpy as np
logger = logging.getLogger(__name__)
Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]
def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
if len(X) == 0 or len(Y) == 0:
return np.array([])
X = np.array(X)
Y = np.array(Y)
if X.shape[1] != Y.shape[1]:
raise ValueError(
"Number of columns in X and Y must be the same. X has"
f" shape {X.shape} and Y has shape {Y.shape}."
)
try:
import simsimd as simd
X = np.array(X, dtype=np.float32)
Y = np.array(Y, dtype=np.float32)
Z = 1 - simd.cdist(X, Y, metric="cosine")
if isinstance(Z, float):
return np.array([Z])
return Z
except ImportError:
logger.info(
"Unable to import simsimd, defaulting to NumPy"
" implementation. If you want to use simsimd please"
" install with `pip install simsimd`."
)
X_norm = np.linalg.norm(X, axis=1)
Y_norm = np.linalg.norm(Y, axis=1)
# Ignore divide by zero errors run time warnings as those are handled below.
with np.errstate(divide="ignore", invalid="ignore"):
similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm)
similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
return similarity
def cosine_similarity_top_k(
X: Matrix,
Y: Matrix,
top_k: Optional[int] = 5,
score_threshold: Optional[float] = None,
) -> Tuple[List[Tuple[int, int]], List[float]]:
"""Row-wise cosine similarity with optional top-k and score threshold filtering.
Args:
X: Matrix.
Y: Matrix, same width as X.
top_k: Max number of results to return.
score_threshold: Minimum cosine similarity of results.
Returns:
Tuple of two lists. First contains two-tuples of indices (X_idx, Y_idx),
second contains corresponding cosine similarities.
"""
if len(X) == 0 or len(Y) == 0:
return [], []
score_array = cosine_similarity(X, Y)
score_threshold = score_threshold or -1.0
score_array[score_array < score_threshold] = 0
top_k = min(top_k or len(score_array), np.count_nonzero(score_array))
top_k_idxs = np.argpartition(score_array, -top_k, axis=None)[-top_k:]
top_k_idxs = top_k_idxs[np.argsort(score_array.ravel()[top_k_idxs])][
::-1
]
ret_idxs = np.unravel_index(top_k_idxs, score_array.shape)
scores = score_array.ravel()[top_k_idxs].tolist()
return list(zip(*ret_idxs)), scores # type: ignore

@ -1,194 +0,0 @@
import threading
from pathlib import Path
from langchain.chains import RetrievalQA
from langchain.chains.question_answering import load_qa_chain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
from swarms.models.popular_llms import OpenAIChat
from swarms.memory.base_vectordb import BaseVectorDatabase
def synchronized_mem(method):
"""
Decorator that synchronizes access to a method using a lock.
Args:
method: The method to be decorated.
Returns:
The decorated method.
"""
def wrapper(self, *args, **kwargs):
with self.lock:
try:
return method(self, *args, **kwargs)
except Exception as e:
print(f"Failed to execute {method.__name__}: {e}")
return wrapper
class LangchainChromaVectorMemory(BaseVectorDatabase):
"""
A class representing a vector memory for storing and retrieving text entries.
Attributes:
loc (str): The location of the vector memory.
chunk_size (int): The size of each text chunk.
chunk_overlap_frac (float): The fraction of overlap between text chunks.
embeddings (OpenAIEmbeddings): The embeddings used for text representation.
count (int): The current count of text entries in the vector memory.
lock (threading.Lock): A lock for thread safety.
db (Chroma): The Chroma database for storing text entries.
qa (RetrievalQA): The retrieval QA system for answering questions.
Methods:
__init__: Initializes the VectorMemory object.
_init_db: Initializes the Chroma database.
_init_retriever: Initializes the retrieval QA system.
add_entry: Adds an entry to the vector memory.
search_memory: Searches the vector memory for similar entries.
ask_question: Asks a question to the vector memory.
"""
def __init__(
self,
loc=None,
chunk_size: int = 1000,
chunk_overlap_frac: float = 0.1,
*args,
**kwargs,
):
"""
Initializes the VectorMemory object.
Args:
loc (str): The location of the vector memory. If None, defaults to "./tmp/vector_memory".
chunk_size (int): The size of each text chunk.
chunk_overlap_frac (float): The fraction of overlap between text chunks.
"""
if loc is None:
loc = "./tmp/vector_memory"
self.loc = Path(loc)
self.chunk_size = chunk_size
self.chunk_overlap = chunk_size * chunk_overlap_frac
self.embeddings = OpenAIEmbeddings()
self.count = 0
self.lock = threading.Lock()
self.db = self._init_db()
self.qa = self._init_retriever()
def _init_db(self):
"""
Initializes the Chroma database.
Returns:
Chroma: The initialized Chroma database.
"""
texts = [
"init"
] # TODO find how to initialize Chroma without any text
chroma_db = Chroma.from_texts(
texts=texts,
embedding=self.embeddings,
persist_directory=str(self.loc),
)
self.count = chroma_db._collection.count()
return chroma_db
def _init_retriever(self):
"""
Initializes the retrieval QA system.
Returns:
RetrievalQA: The initialized retrieval QA system.
"""
model = OpenAIChat(
model_name="gpt-3.5-turbo",
)
qa_chain = load_qa_chain(model, chain_type="stuff")
retriever = self.db.as_retriever(
search_type="mmr", search_kwargs={"k": 10}
)
qa = RetrievalQA(
combine_documents_chain=qa_chain, retriever=retriever
)
return qa
@synchronized_mem
def add(self, entry: str):
"""
Add an entry to the internal memory.
Args:
entry (str): The entry to be added.
Returns:
bool: True if the entry was successfully added, False otherwise.
"""
text_splitter = CharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
separator=" ",
)
texts = text_splitter.split_text(entry)
self.db.add_texts(texts)
self.count += self.db._collection.count()
self.db.persist()
return True
@synchronized_mem
def search_memory(
self, query: str, k=10, type="mmr", distance_threshold=0.5
):
"""
Searching the vector memory for similar entries.
Args:
query (str): The query to search for.
k (int): The number of results to return.
type (str): The type of search to perform: "cos" or "mmr".
distance_threshold (float): The similarity threshold to use for the search. Results with distance > similarity_threshold will be dropped.
Returns:
list[str]: A list of the top k results.
"""
self.count = self.db._collection.count()
if k > self.count:
k = self.count - 1
if k <= 0:
return None
if type == "mmr":
texts = self.db.max_marginal_relevance_search(
query=query, k=k, fetch_k=min(20, self.count)
)
texts = [text.page_content for text in texts]
elif type == "cos":
texts = self.db.similarity_search_with_score(query=query, k=k)
texts = [
text[0].page_content
for text in texts
if text[-1] < distance_threshold
]
return texts
@synchronized_mem
def query(self, question: str):
"""
Ask a question to the vector memory.
Args:
question (str): The question to ask.
Returns:
str: The answer to the question.
"""
answer = self.qa.run(question)
return answer

@ -1,92 +0,0 @@
"""Utility functions for working with vectors and vectorstores."""
from enum import Enum
from typing import List, Tuple, Type
import numpy as np
from swarms.memory.cosine_similarity import cosine_similarity
from swarms.structs.document import Document
class DistanceStrategy(str, Enum):
"""Enumerator of the Distance strategies for calculating distances
between vectors."""
EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"
DOT_PRODUCT = "DOT_PRODUCT"
JACCARD = "JACCARD"
COSINE = "COSINE"
def maximal_marginal_relevance(
query_embedding: np.ndarray,
embedding_list: list,
lambda_mult: float = 0.5,
k: int = 4,
) -> List[int]:
"""
Calculate maximal marginal relevance.
Args:
query_embedding (np.ndarray): The embedding of the query.
embedding_list (list): List of embeddings to select from.
lambda_mult (float, optional): The weight for query score. Defaults to 0.5.
k (int, optional): The number of embeddings to select. Defaults to 4.
Returns:
List[int]: List of indices of selected embeddings.
"""
if min(k, len(embedding_list)) <= 0:
return []
if query_embedding.ndim == 1:
query_embedding = np.expand_dims(query_embedding, axis=0)
similarity_to_query = cosine_similarity(
query_embedding, embedding_list
)[0]
most_similar = int(np.argmax(similarity_to_query))
idxs = [most_similar]
selected = np.array([embedding_list[most_similar]])
while len(idxs) < min(k, len(embedding_list)):
best_score = -np.inf
idx_to_add = -1
similarity_to_selected = cosine_similarity(
embedding_list, selected
)
for i, query_score in enumerate(similarity_to_query):
if i in idxs:
continue
redundant_score = max(similarity_to_selected[i])
equation_score = (
lambda_mult * query_score
- (1 - lambda_mult) * redundant_score
)
if equation_score > best_score:
best_score = equation_score
idx_to_add = i
idxs.append(idx_to_add)
selected = np.append(
selected, [embedding_list[idx_to_add]], axis=0
)
return idxs
def filter_complex_metadata(
documents: List[Document],
*,
allowed_types: Tuple[Type, ...] = (str, bool, int, float),
) -> List[Document]:
"""Filter out metadata types that are not supported for a vector store."""
updated_documents = []
for document in documents:
filtered_metadata = {}
for key, value in document.metadata.items():
if not isinstance(value, allowed_types):
continue
filtered_metadata[key] = value
document.metadata = filtered_metadata
updated_documents.append(document)
return updated_documents

@ -2,7 +2,7 @@ import sqlite3
import pytest import pytest
from swarms.memory.sqlite import SQLiteDB from playground.memory.sqlite import SQLiteDB
@pytest.fixture @pytest.fixture

Loading…
Cancel
Save