From ac921e8df2bc97bce84bc39f06696d71855f9382 Mon Sep 17 00:00:00 2001
From: Kye
Date: Sun, 28 Apr 2024 19:52:35 -0400
Subject: [PATCH] [SWARMS][Memory -> playground]

---
 {swarms => playground}/memory/sqlite.py |   0
 swarms/memory/cosine_similarity.py      |  79 ----------
 swarms/memory/lanchain_chroma.py        | 194 ------------------------
 swarms/memory/utils.py                  |  92 -----------
 tests/memory/test_sqlite.py             |   2 +-
 5 files changed, 1 insertion(+), 366 deletions(-)
 rename {swarms => playground}/memory/sqlite.py (100%)
 delete mode 100644 swarms/memory/cosine_similarity.py
 delete mode 100644 swarms/memory/lanchain_chroma.py
 delete mode 100644 swarms/memory/utils.py

diff --git a/swarms/memory/sqlite.py b/playground/memory/sqlite.py
similarity index 100%
rename from swarms/memory/sqlite.py
rename to playground/memory/sqlite.py
diff --git a/swarms/memory/cosine_similarity.py b/swarms/memory/cosine_similarity.py
deleted file mode 100644
index 11115720..00000000
--- a/swarms/memory/cosine_similarity.py
+++ /dev/null
@@ -1,79 +0,0 @@
-"""Math utils."""
-
-import logging
-from typing import List, Optional, Tuple, Union
-
-import numpy as np
-
-logger = logging.getLogger(__name__)
-
-Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]
-
-
-def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
-    """Row-wise cosine similarity between two equal-width matrices."""
-    if len(X) == 0 or len(Y) == 0:
-        return np.array([])
-
-    X = np.array(X)
-    Y = np.array(Y)
-    if X.shape[1] != Y.shape[1]:
-        raise ValueError(
-            "Number of columns in X and Y must be the same. X has"
-            f" shape {X.shape} and Y has shape {Y.shape}."
-        )
-    try:
-        import simsimd as simd
-
-        X = np.array(X, dtype=np.float32)
-        Y = np.array(Y, dtype=np.float32)
-        Z = 1 - simd.cdist(X, Y, metric="cosine")
-        if isinstance(Z, float):
-            return np.array([Z])
-        return Z
-    except ImportError:
-        logger.info(
-            "Unable to import simsimd, defaulting to NumPy"
-            " implementation. If you want to use simsimd please"
-            " install with `pip install simsimd`."
-        )
-        X_norm = np.linalg.norm(X, axis=1)
-        Y_norm = np.linalg.norm(Y, axis=1)
-        # Ignore divide by zero errors run time warnings as those are handled below.
-        with np.errstate(divide="ignore", invalid="ignore"):
-            similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm)
-        similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
-        return similarity
-
-
-def cosine_similarity_top_k(
-    X: Matrix,
-    Y: Matrix,
-    top_k: Optional[int] = 5,
-    score_threshold: Optional[float] = None,
-) -> Tuple[List[Tuple[int, int]], List[float]]:
-    """Row-wise cosine similarity with optional top-k and score threshold filtering.
-
-    Args:
-        X: Matrix.
-        Y: Matrix, same width as X.
-        top_k: Max number of results to return.
-        score_threshold: Minimum cosine similarity of results.
-
-    Returns:
-        Tuple of two lists. First contains two-tuples of indices (X_idx, Y_idx),
-            second contains corresponding cosine similarities.
-    """
-    if len(X) == 0 or len(Y) == 0:
-        return [], []
-    score_array = cosine_similarity(X, Y)
-    score_threshold = score_threshold or -1.0
-    score_array[score_array < score_threshold] = 0
-    top_k = min(top_k or len(score_array), np.count_nonzero(score_array))
-    top_k_idxs = np.argpartition(score_array, -top_k, axis=None)[-top_k:]
-    top_k_idxs = top_k_idxs[np.argsort(score_array.ravel()[top_k_idxs])][
-        ::-1
-    ]
-    ret_idxs = np.unravel_index(top_k_idxs, score_array.shape)
-    scores = score_array.ravel()[top_k_idxs].tolist()
-    return list(zip(*ret_idxs)), scores  # type: ignore
diff --git a/swarms/memory/lanchain_chroma.py b/swarms/memory/lanchain_chroma.py
deleted file mode 100644
index b2849e81..00000000
--- a/swarms/memory/lanchain_chroma.py
+++ /dev/null
@@ -1,194 +0,0 @@
-import threading
-from pathlib import Path
-
-from langchain.chains import RetrievalQA
-from langchain.chains.question_answering import load_qa_chain
-from langchain.embeddings.openai import OpenAIEmbeddings
-from langchain.text_splitter import CharacterTextSplitter
-from langchain.vectorstores import Chroma
-from swarms.models.popular_llms import OpenAIChat
-from swarms.memory.base_vectordb import BaseVectorDatabase
-
-
-def synchronized_mem(method):
-    """
-    Decorator that synchronizes access to a method using a lock.
-
-    Args:
-        method: The method to be decorated.
-
-    Returns:
-        The decorated method.
-    """
-
-    def wrapper(self, *args, **kwargs):
-        with self.lock:
-            try:
-                return method(self, *args, **kwargs)
-            except Exception as e:
-                print(f"Failed to execute {method.__name__}: {e}")
-
-    return wrapper
-
-
-class LangchainChromaVectorMemory(BaseVectorDatabase):
-    """
-    A class representing a vector memory for storing and retrieving text entries.
-
-    Attributes:
-        loc (str): The location of the vector memory.
-        chunk_size (int): The size of each text chunk.
-        chunk_overlap_frac (float): The fraction of overlap between text chunks.
-        embeddings (OpenAIEmbeddings): The embeddings used for text representation.
-        count (int): The current count of text entries in the vector memory.
-        lock (threading.Lock): A lock for thread safety.
-        db (Chroma): The Chroma database for storing text entries.
-        qa (RetrievalQA): The retrieval QA system for answering questions.
-
-    Methods:
-        __init__: Initializes the VectorMemory object.
-        _init_db: Initializes the Chroma database.
-        _init_retriever: Initializes the retrieval QA system.
-        add_entry: Adds an entry to the vector memory.
-        search_memory: Searches the vector memory for similar entries.
-        ask_question: Asks a question to the vector memory.
-    """
-
-    def __init__(
-        self,
-        loc=None,
-        chunk_size: int = 1000,
-        chunk_overlap_frac: float = 0.1,
-        *args,
-        **kwargs,
-    ):
-        """
-        Initializes the VectorMemory object.
-
-        Args:
-            loc (str): The location of the vector memory. If None, defaults to "./tmp/vector_memory".
-            chunk_size (int): The size of each text chunk.
-            chunk_overlap_frac (float): The fraction of overlap between text chunks.
-        """
-        if loc is None:
-            loc = "./tmp/vector_memory"
-        self.loc = Path(loc)
-        self.chunk_size = chunk_size
-        self.chunk_overlap = chunk_size * chunk_overlap_frac
-        self.embeddings = OpenAIEmbeddings()
-        self.count = 0
-        self.lock = threading.Lock()
-
-        self.db = self._init_db()
-        self.qa = self._init_retriever()
-
-    def _init_db(self):
-        """
-        Initializes the Chroma database.
-
-        Returns:
-            Chroma: The initialized Chroma database.
-        """
-        texts = [
-            "init"
-        ]  # TODO find how to initialize Chroma without any text
-        chroma_db = Chroma.from_texts(
-            texts=texts,
-            embedding=self.embeddings,
-            persist_directory=str(self.loc),
-        )
-        self.count = chroma_db._collection.count()
-        return chroma_db
-
-    def _init_retriever(self):
-        """
-        Initializes the retrieval QA system.
-
-        Returns:
-            RetrievalQA: The initialized retrieval QA system.
-        """
-        model = OpenAIChat(
-            model_name="gpt-3.5-turbo",
-        )
-        qa_chain = load_qa_chain(model, chain_type="stuff")
-        retriever = self.db.as_retriever(
-            search_type="mmr", search_kwargs={"k": 10}
-        )
-        qa = RetrievalQA(
-            combine_documents_chain=qa_chain, retriever=retriever
-        )
-        return qa
-
-    @synchronized_mem
-    def add(self, entry: str):
-        """
-        Add an entry to the internal memory.
-
-        Args:
-            entry (str): The entry to be added.
-
-        Returns:
-            bool: True if the entry was successfully added, False otherwise.
-        """
-        text_splitter = CharacterTextSplitter(
-            chunk_size=self.chunk_size,
-            chunk_overlap=self.chunk_overlap,
-            separator=" ",
-        )
-        texts = text_splitter.split_text(entry)
-
-        self.db.add_texts(texts)
-        self.count += self.db._collection.count()
-        self.db.persist()
-        return True
-
-    @synchronized_mem
-    def search_memory(
-        self, query: str, k=10, type="mmr", distance_threshold=0.5
-    ):
-        """
-        Searching the vector memory for similar entries.
-
-        Args:
-            query (str): The query to search for.
-            k (int): The number of results to return.
-            type (str): The type of search to perform: "cos" or "mmr".
-            distance_threshold (float): The similarity threshold to use for the search. Results with distance > similarity_threshold will be dropped.
-
-        Returns:
-            list[str]: A list of the top k results.
-        """
-        self.count = self.db._collection.count()
-        if k > self.count:
-            k = self.count - 1
-        if k <= 0:
-            return None
-
-        if type == "mmr":
-            texts = self.db.max_marginal_relevance_search(
-                query=query, k=k, fetch_k=min(20, self.count)
-            )
-            texts = [text.page_content for text in texts]
-        elif type == "cos":
-            texts = self.db.similarity_search_with_score(query=query, k=k)
-            texts = [
-                text[0].page_content
-                for text in texts
-                if text[-1] < distance_threshold
-            ]
-
-        return texts
-
-    @synchronized_mem
-    def query(self, question: str):
-        """
-        Ask a question to the vector memory.
-
-        Args:
-            question (str): The question to ask.
-
-        Returns:
-            str: The answer to the question.
-        """
-        answer = self.qa.run(question)
-        return answer
diff --git a/swarms/memory/utils.py b/swarms/memory/utils.py
deleted file mode 100644
index 4dbbff80..00000000
--- a/swarms/memory/utils.py
+++ /dev/null
@@ -1,92 +0,0 @@
-"""Utility functions for working with vectors and vectorstores."""
-
-from enum import Enum
-from typing import List, Tuple, Type
-
-import numpy as np
-
-from swarms.memory.cosine_similarity import cosine_similarity
-from swarms.structs.document import Document
-
-
-class DistanceStrategy(str, Enum):
-    """Enumerator of the Distance strategies for calculating distances
-    between vectors."""
-
-    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
-    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"
-    DOT_PRODUCT = "DOT_PRODUCT"
-    JACCARD = "JACCARD"
-    COSINE = "COSINE"
-
-
-def maximal_marginal_relevance(
-    query_embedding: np.ndarray,
-    embedding_list: list,
-    lambda_mult: float = 0.5,
-    k: int = 4,
-) -> List[int]:
-    """
-    Calculate maximal marginal relevance.
-
-    Args:
-        query_embedding (np.ndarray): The embedding of the query.
-        embedding_list (list): List of embeddings to select from.
-        lambda_mult (float, optional): The weight for query score. Defaults to 0.5.
-        k (int, optional): The number of embeddings to select. Defaults to 4.
-
-    Returns:
-        List[int]: List of indices of selected embeddings.
-    """
-    if min(k, len(embedding_list)) <= 0:
-        return []
-    if query_embedding.ndim == 1:
-        query_embedding = np.expand_dims(query_embedding, axis=0)
-    similarity_to_query = cosine_similarity(
-        query_embedding, embedding_list
-    )[0]
-    most_similar = int(np.argmax(similarity_to_query))
-    idxs = [most_similar]
-    selected = np.array([embedding_list[most_similar]])
-    while len(idxs) < min(k, len(embedding_list)):
-        best_score = -np.inf
-        idx_to_add = -1
-        similarity_to_selected = cosine_similarity(
-            embedding_list, selected
-        )
-        for i, query_score in enumerate(similarity_to_query):
-            if i in idxs:
-                continue
-            redundant_score = max(similarity_to_selected[i])
-            equation_score = (
-                lambda_mult * query_score
-                - (1 - lambda_mult) * redundant_score
-            )
-            if equation_score > best_score:
-                best_score = equation_score
-                idx_to_add = i
-        idxs.append(idx_to_add)
-        selected = np.append(
-            selected, [embedding_list[idx_to_add]], axis=0
-        )
-    return idxs
-
-
-def filter_complex_metadata(
-    documents: List[Document],
-    *,
-    allowed_types: Tuple[Type, ...] = (str, bool, int, float),
-) -> List[Document]:
-    """Filter out metadata types that are not supported for a vector store."""
-    updated_documents = []
-    for document in documents:
-        filtered_metadata = {}
-        for key, value in document.metadata.items():
-            if not isinstance(value, allowed_types):
-                continue
-            filtered_metadata[key] = value
-
-        document.metadata = filtered_metadata
-        updated_documents.append(document)
-
-    return updated_documents
diff --git a/tests/memory/test_sqlite.py b/tests/memory/test_sqlite.py
index 78900199..65fc4bf1 100644
--- a/tests/memory/test_sqlite.py
+++ b/tests/memory/test_sqlite.py
@@ -2,7 +2,7 @@ import sqlite3
 
 import pytest
 
-from swarms.memory.sqlite import SQLiteDB
+from playground.memory.sqlite import SQLiteDB
 
 
 @pytest.fixture