diff --git a/playground/memory/chroma_usage.py b/playground/memory/chroma_usage.py
new file mode 100644
index 00000000..64aa347c
--- /dev/null
+++ b/playground/memory/chroma_usage.py
@@ -0,0 +1,11 @@
+from swarms.memory import chroma
+
+# Initialize the Chroma client.
+# See the chromadb documentation on how to run a local instance.
+chroma_client = chroma.ChromaClient()
+
+chroma_client.add_vectors(["This is a document", "BONSAIIIIIII", "the walking dead"])
+
+results = chroma_client.search_vectors("zombie", limit=1)
+
+print(results)
diff --git a/swarms/memory/chroma.py b/swarms/memory/chroma.py
index 79b92964..4fc05081 100644
--- a/swarms/memory/chroma.py
+++ b/swarms/memory/chroma.py
@@ -1,753 +1,112 @@
-from __future__ import annotations
+from typing import List
+from chromadb.utils import embedding_functions
+import chromadb
-
-import logging
-import uuid
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Callable,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Tuple,
-    Type,
-)
-
-import numpy as np
-
-from swarms.structs.document import Document
-from swarms.models.embeddings_base import Embeddings
-from langchain.schema.vectorstore import VectorStore
-from langchain.utils import xor_args
-from langchain.vectorstores.utils import maximal_marginal_relevance
-
-if TYPE_CHECKING:
-    import chromadb
-    import chromadb.config
-    from chromadb.api.types import ID, OneOrMany, Where, WhereDocument
-
-logger = logging.getLogger()
-DEFAULT_K = 4  # Number of Documents to return.
-
-
-def _results_to_docs(results: Any) -> List[Document]:
-    return [doc for doc, _ in _results_to_docs_and_scores(results)]
-
-
-def _results_to_docs_and_scores(
-    results: Any,
-) -> List[Tuple[Document, float]]:
-    return [
-        # TODO: Chroma can do batch querying,
-        # we shouldn't hard code to the 1st result
-        (
-            Document(
-                page_content=result[0], metadata=result[1] or {}
-            ),
-            result[2],
-        )
-        for result in zip(
-            results["documents"][0],
-            results["metadatas"][0],
-            results["distances"][0],
-        )
-    ]
-
-
-class Chroma(VectorStore):
-    """`ChromaDB` vector store.
-
-    To use, you should have the ``chromadb`` python package installed.
-
-    Example:
-        .. code-block:: python
-
-                from langchain.vectorstores import Chroma
-                from langchain.embeddings.openai import OpenAIEmbeddings
-
-                embeddings = OpenAIEmbeddings()
-                vectorstore = Chroma("langchain_store", embeddings)
-    """
-
-    _LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"
 
+class ChromaClient:
     def __init__(
         self,
-        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
-        embedding_function: Optional[Embeddings] = None,
-        persist_directory: Optional[str] = None,
-        client_settings: Optional[chromadb.config.Settings] = None,
-        collection_metadata: Optional[Dict] = None,
-        client: Optional[chromadb.Client] = None,
-        relevance_score_fn: Optional[Callable[[float], float]] = None,
-    ) -> None:
-        """Initialize with a Chroma client."""
-        try:
-            import chromadb
-            import chromadb.config
-        except ImportError:
-            raise ImportError(
-                "Could not import chromadb python package. "
-                "Please install it with `pip install chromadb`."
-            )
-
-        if client is not None:
-            self._client_settings = client_settings
-            self._client = client
-            self._persist_directory = persist_directory
-        else:
-            if client_settings:
-                # If client_settings is provided with persist_directory specified,
-                # then it is "in-memory and persisting to disk" mode.
-                client_settings.persist_directory = (
-                    persist_directory
-                    or client_settings.persist_directory
-                )
-                if client_settings.persist_directory is not None:
-                    # Maintain backwards compatibility with chromadb < 0.4.0
-                    major, minor, _ = chromadb.__version__.split(".")
-                    if int(major) == 0 and int(minor) < 4:
-                        client_settings.chroma_db_impl = (
-                            "duckdb+parquet"
-                        )
-
-                _client_settings = client_settings
-            elif persist_directory:
-                # Maintain backwards compatibility with chromadb < 0.4.0
-                major, minor, _ = chromadb.__version__.split(".")
-                if int(major) == 0 and int(minor) < 4:
-                    _client_settings = chromadb.config.Settings(
-                        chroma_db_impl="duckdb+parquet",
-                    )
-                else:
-                    _client_settings = chromadb.config.Settings(
-                        is_persistent=True
-                    )
-                _client_settings.persist_directory = persist_directory
-            else:
-                _client_settings = chromadb.config.Settings()
-            self._client_settings = _client_settings
-            self._client = chromadb.Client(_client_settings)
-            self._persist_directory = (
-                _client_settings.persist_directory
-                or persist_directory
-            )
-
-        self._embedding_function = embedding_function
-        self._collection = self._client.get_or_create_collection(
-            name=collection_name,
-            embedding_function=(
-                self._embedding_function.embed_documents
-                if self._embedding_function is not None
-                else None
-            ),
-            metadata=collection_metadata,
-        )
-        self.override_relevance_score_fn = relevance_score_fn
-
-    @property
-    def embeddings(self) -> Optional[Embeddings]:
-        return self._embedding_function
-
-    @xor_args(("query_texts", "query_embeddings"))
-    def __query_collection(
-        self,
-        query_texts: Optional[List[str]] = None,
-        query_embeddings: Optional[List[List[float]]] = None,
-        n_results: int = 4,
-        where: Optional[Dict[str, str]] = None,
-        where_document: Optional[Dict[str, str]] = None,
-        **kwargs: Any,
-    ) -> List[Document]:
-        """Query the chroma collection."""
+        collection_name: str = "chromadb-collection",
+        model_name: str = "BAAI/bge-small-en-v1.5",
+    ):
         try:
-            import chromadb  # noqa: F401
-        except ImportError:
-            raise ValueError(
-                "Could not import chromadb python package. "
-                "Please install it with `pip install chromadb`."
-            )
-        return self._collection.query(
-            query_texts=query_texts,
-            query_embeddings=query_embeddings,
-            n_results=n_results,
-            where=where,
-            where_document=where_document,
-            **kwargs,
-        )
+            self.client = chromadb.Client()
+            self.collection_name = collection_name
+            self.model = None
+            self.collection = None
+            self._load_embedding_model(model_name)
+            self._setup_collection()
+        except Exception as e:
+            print(f"Error setting up ChromaClient: {e}")
 
-    def add_texts(
-        self,
-        texts: Iterable[str],
-        metadatas: Optional[List[dict]] = None,
-        ids: Optional[List[str]] = None,
-        **kwargs: Any,
-    ) -> List[str]:
-        """Run more texts through the embeddings and add to the vectorstore.
-
-        Args:
-            texts (Iterable[str]): Texts to add to the vectorstore.
-            metadatas (Optional[List[dict]], optional): Optional list of metadatas.
-            ids (Optional[List[str]], optional): Optional list of IDs.
-
-        Returns:
-            List[str]: List of IDs of the added texts.
+    def _load_embedding_model(self, model_name: str):
         """
-        # TODO: Handle the case where the user doesn't provide ids on the Collection
-        if ids is None:
-            ids = [str(uuid.uuid1()) for _ in texts]
-        embeddings = None
-        texts = list(texts)
-        if self._embedding_function is not None:
-            embeddings = self._embedding_function.embed_documents(
-                texts
-            )
-        if metadatas:
-            # fill metadatas with empty dicts if somebody
-            # did not specify metadata for all texts
-            length_diff = len(texts) - len(metadatas)
-            if length_diff:
-                metadatas = metadatas + [{}] * length_diff
-            empty_ids = []
-            non_empty_ids = []
-            for idx, m in enumerate(metadatas):
-                if m:
-                    non_empty_ids.append(idx)
-                else:
-                    empty_ids.append(idx)
-            if non_empty_ids:
-                metadatas = [metadatas[idx] for idx in non_empty_ids]
-                texts_with_metadatas = [
-                    texts[idx] for idx in non_empty_ids
-                ]
-                embeddings_with_metadatas = (
-                    [embeddings[idx] for idx in non_empty_ids]
-                    if embeddings
-                    else None
-                )
-                ids_with_metadata = [
-                    ids[idx] for idx in non_empty_ids
-                ]
-                try:
-                    self._collection.upsert(
-                        metadatas=metadatas,
-                        embeddings=embeddings_with_metadatas,
-                        documents=texts_with_metadatas,
-                        ids=ids_with_metadata,
-                    )
-                except ValueError as e:
-                    if "Expected metadata value to be" in str(e):
-                        msg = (
-                            "Try filtering complex metadata from the"
-                            " document"
-                            " using "
-                            "langchain.vectorstores.utils.filter_complex_metadata."
-                        )
-                        raise ValueError(e.args[0] + "\n\n" + msg)
-                    else:
-                        raise e
-            if empty_ids:
-                texts_without_metadatas = [
-                    texts[j] for j in empty_ids
-                ]
-                embeddings_without_metadatas = (
-                    [embeddings[j] for j in empty_ids]
-                    if embeddings
-                    else None
-                )
-                ids_without_metadatas = [ids[j] for j in empty_ids]
-                self._collection.upsert(
-                    embeddings=embeddings_without_metadatas,
-                    documents=texts_without_metadatas,
-                    ids=ids_without_metadatas,
-                )
-        else:
-            self._collection.upsert(
-                embeddings=embeddings,
-                documents=texts,
-                ids=ids,
-            )
-        return ids
-
-    def similarity_search(
-        self,
-        query: str,
-        k: int = DEFAULT_K,
-        filter: Optional[Dict[str, str]] = None,
-        **kwargs: Any,
-    ) -> List[Document]:
-        """Run similarity search with Chroma.
+        Loads the sentence embedding model specified by the model name.
 
         Args:
-            query (str): Query text to search for.
-            k (int): Number of results to return. Defaults to 4.
-            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
-
-        Returns:
-            List[Document]: List of documents most similar to the query text.
+            model_name (str): The name of the model to load for generating embeddings.
         """
-        docs_and_scores = self.similarity_search_with_score(
-            query, k, filter=filter
-        )
-        return [doc for doc, _ in docs_and_scores]
+        try:
+            self.model = embedding_functions.SentenceTransformerEmbeddingFunction(
+                model_name=model_name
+            )
+        except Exception as e:
+            print(f"Error loading embedding model: {e}")
 
-    def similarity_search_by_vector(
-        self,
-        embedding: List[float],
-        k: int = DEFAULT_K,
-        filter: Optional[Dict[str, str]] = None,
-        where_document: Optional[Dict[str, str]] = None,
-        **kwargs: Any,
-    ) -> List[Document]:
-        """Return docs most similar to embedding vector.
-        Args:
-            embedding (List[float]): Embedding to look up documents similar to.
-            k (int): Number of Documents to return. Defaults to 4.
-            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
-        Returns:
-            List of Documents most similar to the query vector.
-        """
-        results = self.__query_collection(
-            query_embeddings=embedding,
-            n_results=k,
-            where=filter,
-            where_document=where_document,
-        )
-        return _results_to_docs(results)
 
+    def _setup_collection(self):
+        try:
+            self.collection = self.client.get_collection(
+                name=self.collection_name, embedding_function=self.model
+            )
+        except Exception as e:
+            # Only create the collection when fetching it fails.
+            print(f"{e}. Creating new collection: {self.collection_name}")
+            self.collection = self.client.create_collection(
+                name=self.collection_name, embedding_function=self.model
+            )
 
-    def similarity_search_by_vector_with_relevance_scores(
-        self,
-        embedding: List[float],
-        k: int = DEFAULT_K,
-        filter: Optional[Dict[str, str]] = None,
-        where_document: Optional[Dict[str, str]] = None,
-        **kwargs: Any,
-    ) -> List[Tuple[Document, float]]:
-        """
-        Return docs most similar to embedding vector and similarity score.
-
-        Args:
-            embedding (List[float]): Embedding to look up documents similar to.
-            k (int): Number of Documents to return. Defaults to 4.
-            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
-        Returns:
-            List[Tuple[Document, float]]: List of documents most similar to
-            the query text and cosine distance in float for each.
-            Lower score represents more similarity.
+    def add_vectors(self, docs: List[str]):
         """
-        results = self.__query_collection(
-            query_embeddings=embedding,
-            n_results=k,
-            where=filter,
-            where_document=where_document,
-        )
-        return _results_to_docs_and_scores(results)
-
-    def similarity_search_with_score(
-        self,
-        query: str,
-        k: int = DEFAULT_K,
-        filter: Optional[Dict[str, str]] = None,
-        where_document: Optional[Dict[str, str]] = None,
-        **kwargs: Any,
-    ) -> List[Tuple[Document, float]]:
-        """Run similarity search with Chroma with distance.
+        Embeds the given documents and adds them to the Chroma collection.
 
         Args:
-            query (str): Query text to search for.
-            k (int): Number of results to return. Defaults to 4.
-            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
+            docs (List[str]): A list of document strings to embed and store.
 
         Returns:
-            List[Tuple[Document, float]]: List of documents most similar to
-            the query text and cosine distance in float for each.
-            Lower score represents more similarity.
-        """
-        if self._embedding_function is None:
-            results = self.__query_collection(
-                query_texts=[query],
-                n_results=k,
-                where=filter,
-                where_document=where_document,
-            )
-        else:
-            query_embedding = self._embedding_function.embed_query(
-                query
-            )
-            results = self.__query_collection(
-                query_embeddings=[query_embedding],
-                n_results=k,
-                where=filter,
-                where_document=where_document,
-            )
-
-        return _results_to_docs_and_scores(results)
-
-    def _select_relevance_score_fn(self) -> Callable[[float], float]:
-        """
-        The 'correct' relevance function
-        may differ depending on a few things, including:
-        - the distance / similarity metric used by the VectorStore
-        - the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
-        - embedding dimensionality
-        - etc.
+            None: Errors are printed rather than raised.
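+
+        Example (a minimal sketch; assumes the default in-memory client):
+            >>> client = ChromaClient()
+            >>> client.add_vectors(["first doc", "second doc"])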
""" - if self.override_relevance_score_fn: - return self.override_relevance_score_fn + points = [] + ids = [] + for i, doc in enumerate(docs): + try: + points.append(doc) + ids.append("id"+str(i)) + except Exception as e: + print(f"Error processing document at index {i}: {e}") - distance = "l2" - distance_key = "hnsw:space" - metadata = self._collection.metadata - - if metadata and distance_key in metadata: - distance = metadata[distance_key] - - if distance == "cosine": - return self._cosine_relevance_score_fn - elif distance == "l2": - return self._euclidean_relevance_score_fn - elif distance == "ip": - return self._max_inner_product_relevance_score_fn - else: - raise ValueError( - "No supported normalization function for distance" - f" metric of type: {distance}.Consider providing" - " relevance_score_fn to Chroma constructor." + try: + self.collection.add( + documents=points, + ids=ids ) + except Exception as e: + print(f"Error adding vectors: {e}") + return None - def max_marginal_relevance_search_by_vector( - self, - embedding: List[float], - k: int = DEFAULT_K, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Dict[str, str]] = None, - where_document: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch to pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List of Documents selected by maximal marginal relevance. + def search_vectors(self, query: str, limit: int = 2): """ - - results = self.__query_collection( - query_embeddings=embedding, - n_results=fetch_k, - where=filter, - where_document=where_document, - include=[ - "metadatas", - "documents", - "distances", - "embeddings", - ], - ) - mmr_selected = maximal_marginal_relevance( - np.array(embedding, dtype=np.float32), - results["embeddings"][0], - k=k, - lambda_mult=lambda_mult, - ) - - candidates = _results_to_docs(results) - - selected_results = [ - r for i, r in enumerate(candidates) if i in mmr_selected - ] - return selected_results - - def max_marginal_relevance_search( - self, - query: str, - k: int = DEFAULT_K, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Dict[str, str]] = None, - where_document: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. + Searches the collection for vectors similar to the query vector. Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch to pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. 
+            query (str): The query string to embed and search for.
+            limit (int): The number of search results to return. Defaults to 2.
 
         Returns:
-            List of Documents selected by maximal marginal relevance.
-        """
-        if self._embedding_function is None:
-            raise ValueError(
-                "For MMR search, you must specify an embedding"
-                " function oncreation."
-            )
-
-        embedding = self._embedding_function.embed_query(query)
-        docs = self.max_marginal_relevance_search_by_vector(
-            embedding,
-            k,
-            fetch_k,
-            lambda_mult=lambda_mult,
-            filter=filter,
-            where_document=where_document,
-        )
-        return docs
-
-    def delete_collection(self) -> None:
-        """Delete the collection."""
-        self._client.delete_collection(self._collection.name)
-
-    def get(
-        self,
-        ids: Optional[OneOrMany[ID]] = None,
-        where: Optional[Where] = None,
-        limit: Optional[int] = None,
-        offset: Optional[int] = None,
-        where_document: Optional[WhereDocument] = None,
-        include: Optional[List[str]] = None,
-    ) -> Dict[str, Any]:
-        """Gets the collection.
-
-        Args:
-            ids: The ids of the embeddings to get. Optional.
-            where: A Where type dict used to filter results by.
-                E.g. `{"color" : "red", "price": 4.20}`. Optional.
-            limit: The number of documents to return. Optional.
-            offset: The offset to start returning results from.
-                Useful for paging results with limit. Optional.
-            where_document: A WhereDocument type dict used to filter by the documents.
-                E.g. `{$contains: "hello"}`. Optional.
-            include: A list of what to include in the results.
-                Can contain `"embeddings"`, `"metadatas"`, `"documents"`.
-                Ids are always included.
-                Defaults to `["metadatas", "documents"]`. Optional.
+            dict or None: The raw Chroma query result if successful, otherwise None.
         """
-        kwargs = {
-            "ids": ids,
-            "where": where,
-            "limit": limit,
-            "offset": offset,
-            "where_document": where_document,
-        }
-
-        if include is not None:
-            kwargs["include"] = include
-
-        return self._collection.get(**kwargs)
-
-    def persist(self) -> None:
-        """Persist the collection.
-
-        This can be used to explicitly persist the data to disk.
-        It will also be called automatically when the object is destroyed.
-        """
-        if self._persist_directory is None:
-            raise ValueError(
-                "You must specify a persist_directory on"
-                "creation to persist the collection."
-            )
-        import chromadb
-
-        # Maintain backwards compatibility with chromadb < 0.4.0
-        major, minor, _ = chromadb.__version__.split(".")
-        if int(major) == 0 and int(minor) < 4:
-            self._client.persist()
-
-    def update_document(
-        self, document_id: str, document: Document
-    ) -> None:
-        """Update a document in the collection.
-
-        Args:
-            document_id (str): ID of the document to update.
-            document (Document): Document to update.
-        """
-        return self.update_documents([document_id], [document])
-
-    def update_documents(
-        self, ids: List[str], documents: List[Document]
-    ) -> None:
-        """Update a document in the collection.
-
-        Args:
-            ids (List[str]): List of ids of the document to update.
-            documents (List[Document]): List of documents to update.
-        """
-        text = [document.page_content for document in documents]
-        metadata = [document.metadata for document in documents]
-        if self._embedding_function is None:
-            raise ValueError(
-                "For update, you must specify an embedding function"
-                " on creation."
- ) - embeddings = self._embedding_function.embed_documents(text) - - if hasattr( - self._collection._client, "max_batch_size" - ): # for Chroma 0.4.10 and above - from chromadb.utils.batch_utils import create_batches - - for batch in create_batches( - api=self._collection._client, - ids=ids, - metadatas=metadata, - documents=text, - embeddings=embeddings, - ): - self._collection.update( - ids=batch[0], - embeddings=batch[1], - documents=batch[3], - metadatas=batch[2], - ) - else: - self._collection.update( - ids=ids, - embeddings=embeddings, - documents=text, - metadatas=metadata, - ) - - @classmethod - def from_texts( - cls: Type[Chroma], - texts: List[str], - embedding: Optional[Embeddings] = None, - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, - persist_directory: Optional[str] = None, - client_settings: Optional[chromadb.config.Settings] = None, - client: Optional[chromadb.Client] = None, - collection_metadata: Optional[Dict] = None, - **kwargs: Any, - ) -> Chroma: - """Create a Chroma vectorstore from a raw documents. - - If a persist_directory is specified, the collection will be persisted there. - Otherwise, the data will be ephemeral in-memory. - - Args: - texts (List[str]): List of texts to add to the collection. - collection_name (str): Name of the collection to create. - persist_directory (Optional[str]): Directory to persist the collection. - embedding (Optional[Embeddings]): Embedding function. Defaults to None. - metadatas (Optional[List[dict]]): List of metadatas. Defaults to None. - ids (Optional[List[str]]): List of document IDs. Defaults to None. - client_settings (Optional[chromadb.config.Settings]): Chroma client settings - collection_metadata (Optional[Dict]): Collection configurations. - Defaults to None. + try: + search_result = self.collection.query( + query_texts=query, + n_results=limit, + ) + return search_result + except Exception as e: + print(f"Error searching vectors: {e}") + return None - Returns: - Chroma: Chroma vectorstore. + def search_vectors_formatted(self, query: str, limit: int = 2): """ - chroma_collection = cls( - collection_name=collection_name, - embedding_function=embedding, - persist_directory=persist_directory, - client_settings=client_settings, - client=client, - collection_metadata=collection_metadata, - **kwargs, - ) - if ids is None: - ids = [str(uuid.uuid1()) for _ in texts] - if hasattr( - chroma_collection._client, "max_batch_size" - ): # for Chroma 0.4.10 and above - from chromadb.utils.batch_utils import create_batches - - for batch in create_batches( - api=chroma_collection._client, - ids=ids, - metadatas=metadatas, - documents=texts, - ): - chroma_collection.add_texts( - texts=batch[3] if batch[3] else [], - metadatas=batch[2] if batch[2] else None, - ids=batch[0], - ) - else: - chroma_collection.add_texts( - texts=texts, metadatas=metadatas, ids=ids - ) - return chroma_collection - - @classmethod - def from_documents( - cls: Type[Chroma], - documents: List[Document], - embedding: Optional[Embeddings] = None, - ids: Optional[List[str]] = None, - collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, - persist_directory: Optional[str] = None, - client_settings: Optional[chromadb.config.Settings] = None, - client: Optional[chromadb.Client] = None, # Add this line - collection_metadata: Optional[Dict] = None, - **kwargs: Any, - ) -> Chroma: - """Create a Chroma vectorstore from a list of documents. 
-
-        If a persist_directory is specified, the collection will be persisted there.
-        Otherwise, the data will be ephemeral in-memory.
+        Searches the collection and returns the results as a flat list of
+        (document, distance) pairs rather than Chroma's nested mapping.
 
         Args:
-            collection_name (str): Name of the collection to create.
-            persist_directory (Optional[str]): Directory to persist the collection.
-            ids (Optional[List[str]]): List of document IDs. Defaults to None.
-            documents (List[Document]): List of documents to add to the vectorstore.
-            embedding (Optional[Embeddings]): Embedding function. Defaults to None.
-            client_settings (Optional[chromadb.config.Settings]): Chroma client settings
-            collection_metadata (Optional[Dict]): Collection configurations.
-                Defaults to None.
+            query (str): The query string to embed and search for.
+            limit (int): The number of search results to return. Defaults to 2.
 
         Returns:
-            Chroma: Chroma vectorstore.
-        """
-        texts = [doc.page_content for doc in documents]
-        metadatas = [doc.metadata for doc in documents]
-        return cls.from_texts(
-            texts=texts,
-            embedding=embedding,
-            metadatas=metadatas,
-            ids=ids,
-            collection_name=collection_name,
-            persist_directory=persist_directory,
-            client_settings=client_settings,
-            client=client,
-            collection_metadata=collection_metadata,
-            **kwargs,
-        )
-
-    def delete(
-        self, ids: Optional[List[str]] = None, **kwargs: Any
-    ) -> None:
-        """Delete by vector IDs.
-
-        Args:
-            ids: List of ids to delete.
+            list or None: A list of (document, distance) tuples if successful, otherwise None.
         """
-        self._collection.delete(ids=ids)
+        try:
+            search_result = self.collection.query(
+                query_texts=query,
+                n_results=limit,
+            )
+            # Flatten Chroma's nested result into (document, distance) pairs.
+            return list(
+                zip(
+                    search_result["documents"][0],
+                    search_result["distances"][0],
+                )
+            )
+        except Exception as e:
+            print(f"Error searching vectors: {e}")
+            return None
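+
+
+# Example usage (a minimal sketch; assumes chromadb and sentence-transformers
+# are installed and an ephemeral in-memory client is acceptable):
+#
+#     client = ChromaClient(collection_name="demo")
+#     client.add_vectors(["This is a document", "the walking dead"])
+#     client.search_vectors("zombie", limit=1)        # raw Chroma query result
+#     client.search_vectors_formatted("zombie")       # [(document, distance), ...]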