diff --git a/docs/swarms/swarms/abstractswarm.md b/docs/swarms/swarms/abstractswarm.md new file mode 100644 index 00000000..78e28493 --- /dev/null +++ b/docs/swarms/swarms/abstractswarm.md @@ -0,0 +1,514 @@ +# `AbstractSwarm` Documentation + +## Table of Contents + +1. [Introduction](#introduction) +2. [Class Definition](#class-definition) +3. [Methods](#methods) + - [communicate()](#communicate) + - [run()](#run) + - [arun()](#arun) + - [add_worker(worker)](#add_worker) + - [remove_worker(worker)](#remove_worker) + - [broadcast(message, sender)](#broadcast) + - [reset()](#reset) + - [plan(task)](#plan) + - [direct_message(message, sender, recipient)](#direct_message) + - [autoscaler(num_workers, worker)](#autoscaler) + - [get_worker_by_id(id)](#get_worker_by_id) + - [get_worker_by_name(name)](#get_worker_by_name) + - [assign_task(worker, task)](#assign_task) + - [get_all_tasks(worker, task)](#get_all_tasks) + - [get_finished_tasks()](#get_finished_tasks) + - [get_pending_tasks()](#get_pending_tasks) + - [pause_worker(worker, worker_id)](#pause_worker) + - [resume_worker(worker, worker_id)](#resume_worker) + - [stop_worker(worker, worker_id)](#stop_worker) + - [restart_worker(worker)](#restart_worker) + - [scale_up(num_worker)](#scale_up) + - [scale_down(num_worker)](#scale_down) + - [scale_to(num_worker)](#scale_to) + - [get_all_workers()](#get_all_workers) + - [get_swarm_size()](#get_swarm_size) + - [get_swarm_status()](#get_swarm_status) + - [save_swarm_state()](#save_swarm_state) + +--- + +## 1. Introduction + +The Swarms library is designed to provide a framework for swarm simulation architectures. Swarms are collections of autonomous agents or workers that collaborate to perform tasks and achieve common goals. This documentation will guide you through the functionality and usage of the Swarms library, explaining the purpose and implementation details of the provided classes and methods. + +## 2. 
Class Definition + +### `AbstractSwarm` Class + +The `AbstractSwarm` class is an abstract base class that serves as the foundation for swarm simulation architectures. It defines the core functionality and methods required to manage and interact with a swarm of workers. + +```python +from abc import ABC, abstractmethod +from typing import Optional, List, Dict, Any +from swarms.swarms.base import AbstractWorker + +class AbstractSwarm(ABC): + """ + Abstract class for swarm simulation architectures + + Methods: + --------- + ... + """ + # The class definition and constructor are provided here. + + @abstractmethod + def __init__(self, workers: List["AbstractWorker"]): + """Initialize the swarm with workers""" + pass + + # Other abstract methods are listed here. +``` + +## 3. Methods + +### `communicate()` + +The `communicate()` method allows the swarm to exchange information through the orchestrator, protocols, and the universal communication layer. + +**Usage Example 1:** + +```python +swarm = YourSwarmClass(workers) +swarm.communicate() +``` + +**Usage Example 2:** + +```python +# Another example of using the communicate method +swarm = YourSwarmClass(workers) +swarm.communicate() +``` + +### `run()` + +The `run()` method executes the swarm, initiating its activities. + +**Usage Example 1:** + +```python +swarm = YourSwarmClass(workers) +swarm.run() +``` + +**Usage Example 2:** + +```python +# Another example of running the swarm +swarm = YourSwarmClass(workers) +swarm.run() +``` + +### `arun()` + +The `arun()` method runs the swarm asynchronously, allowing for parallel execution of tasks. + +**Usage Example 1:** + +```python +swarm = YourSwarmClass(workers) +swarm.arun() +``` + +**Usage Example 2:** + +```python +# Another example of running the swarm asynchronously +swarm = YourSwarmClass(workers) +swarm.arun() +``` + +### `add_worker(worker: "AbstractWorker")` + +The `add_worker()` method adds a worker to the swarm. 
+ +**Parameters:** +- `worker` (AbstractWorker): The worker to be added to the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass([]) +worker = YourWorkerClass() +swarm.add_worker(worker) +``` + +### `remove_worker(worker: "AbstractWorker")` + +The `remove_worker()` method removes a worker from the swarm. + +**Parameters:** +- `worker` (AbstractWorker): The worker to be removed from the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_id("worker_id") +swarm.remove_worker(worker) +``` + +### `broadcast(message: str, sender: Optional["AbstractWorker"] = None)` + +The `broadcast()` method sends a message to all workers in the swarm. + +**Parameters:** +- `message` (str): The message to be broadcasted. +- `sender` (Optional[AbstractWorker]): The sender of the message (optional). + +**Usage Example 1:** + +```python +swarm = YourSwarmClass(workers) +message = "Hello, everyone!" +swarm.broadcast(message) +``` + +**Usage Example 2:** + +```python +# Another example of broadcasting a message +swarm = YourSwarmClass(workers) +message = "Important announcement!" +sender = swarm.get_worker_by_name("Supervisor") +swarm.broadcast(message, sender) +``` + +### `reset()` + +The `reset()` method resets the swarm to its initial state. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm.reset() +``` + +### `plan(task: str)` + +The `plan()` method instructs workers to individually plan using a workflow or pipeline for a specified task. + +**Parameters:** +- `task` (str): The task for which workers should plan. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +task = "Perform data analysis" +swarm.plan(task) +``` + +### `direct_message(message: str, sender: "AbstractWorker", recipient: "AbstractWorker")` + +The `direct_message()` method sends a direct message from one worker to another. + +**Parameters:** +- `message` (str): The message to be sent. 
+- `sender` (AbstractWorker): The sender of the message. +- `recipient` (AbstractWorker): The recipient of the message. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +sender = swarm.get_worker_by_name("Worker1") +recipient = swarm.get_worker_by_name("Worker2") +message = "Hello, Worker2!" +swarm.direct_message(message, sender, recipient) +``` + +### `autoscaler(num_workers: int, worker: List["AbstractWorker"])` + +The `autoscaler()` method acts as an autoscaler, dynamically adjusting the number of workers based on system load or other criteria. + +**Parameters:** +- `num_workers` (int): The desired number of workers. +- `worker` (List[AbstractWorker]): A list of workers to be managed by the autoscaler. + +**Usage Example:** + +```python +swarm = YourSwarmClass([]) +workers = [YourWorkerClass() for _ in range(10)] +swarm.autoscaler(5, workers) +``` + +### `get_worker_by_id(id: str) -> "AbstractWorker"` + +The `get_worker_by_id()` method locates a worker in the swarm by their ID. + +**Parameters:** +- `id` (str): The ID of the worker to locate. + +**Returns:** +- `AbstractWorker`: The worker with the specified ID. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker_id = "worker_123" +worker = swarm.get_worker_by_id(worker_id) +``` + +### `get_worker_by_name(name: str) -> "AbstractWorker"` + +The `get_worker_by_name()` method locates a worker in the swarm by their name. + +**Parameters:** +- `name` (str): The name of the worker to locate. + +**Returns:** +- `AbstractWorker`: The worker with the specified name. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker_name = "Alice" +worker = swarm.get_worker_by_name(worker_name) +``` + +### `assign_task(worker: "AbstractWorker", task: Any) -> Dict` + +The `assign_task()` method assigns a task to a specific worker. + +**Parameters:** +- `worker` (AbstractWorker): The worker to whom the task should be assigned. +- `task` (Any): The task to be assigned. 
+ +**Returns:** +- `Dict`: A dictionary indicating the status of the task assignment. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +task = "Perform data analysis" +result = swarm.assign_task(worker, task) +``` + +### `get_all_tasks(worker: "AbstractWorker", task: Any)` + +The `get_all_tasks()` method retrieves all tasks assigned to a specific worker. + +**Parameters:** +- `worker` (AbstractWorker): The worker for whom tasks should be retrieved. +- `task` (Any): The task to be retrieved. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +tasks = swarm.get_all_tasks(worker, "data analysis") +``` + +### `get_finished_tasks() -> List[Dict]` + +The `get_finished_tasks()` method retrieves all tasks that have been completed by the workers in the swarm. + +**Returns:** +- `List[Dict]`: A list of dictionaries representing finished tasks. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +finished_tasks = swarm.get_finished_tasks() +``` + +### `get_pending_tasks() -> List[Dict]` + +The `get_pending_tasks()` method retrieves all tasks that are pending or yet to be completed by the workers in the swarm. + +**Returns:** +- `List[Dict]`: A list of dictionaries representing pending tasks. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +pending_tasks = swarm.get_pending_tasks() +``` + +### `pause_worker(worker: "AbstractWorker", worker_id: str)` + +The `pause_worker()` method pauses a specific worker, temporarily suspending their activities. + +**Parameters:** +- `worker` (AbstractWorker): The worker to be paused. +- `worker_id` (str): The ID of the worker to be paused. 
+ +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +worker_id = "worker_123" +swarm.pause_worker(worker, worker_id) +``` + +### `resume_worker(worker: "AbstractWorker", worker_id: str)` + +The `resume_worker()` method resumes a paused worker, allowing them to continue their activities. + +**Parameters:** +- `worker` (AbstractWorker): The worker to be resumed. +- `worker_id` (str): The ID of the worker to be resumed. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +worker_id = "worker_123" +swarm.resume_worker(worker, worker_id) +``` + +### `stop_worker(worker: "AbstractWorker", worker_id: str)` + +The `stop_worker()` method stops a specific worker, terminating their activities. + +**Parameters:** +- `worker` (AbstractWorker): The worker to be stopped. +- `worker_id` (str): The ID of the worker to be stopped. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +worker_id = "worker_123" +swarm.stop_worker(worker, worker_id) +``` + +### `restart_worker(worker: "AbstractWorker")` + +The `restart_worker()` method restarts a worker, resetting them to their initial state. + +**Parameters:** +- `worker` (AbstractWorker): The worker to be restarted. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +worker = swarm.get_worker_by_name("Worker1") +swarm.restart_worker(worker) +``` + +### `scale_up(num_worker: int)` + +The `scale_up()` method increases the number of workers in the swarm. + +**Parameters:** +- `num_worker` (int): The number of workers to add to the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm.scale_up(5) +``` + +### `scale_down(num_worker: int)` + +The `scale_down()` method decreases the number of workers in the swarm. + +**Parameters:** +- `num_worker` (int): The number of workers to remove from the swarm. 
+ +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm.scale_down(3) +``` + +### `scale_to(num_worker: int)` + +The `scale_to()` method scales the swarm to a specific number of workers. + +**Parameters:** +- `num_worker` (int): The desired number of workers. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm.scale_to(10) +``` + +### `get_all_workers() -> List["AbstractWorker"]` + +The `get_all_workers()` method retrieves a list of all workers in the swarm. + +**Returns:** +- `List[AbstractWorker]`: A list of all workers in the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +all_workers = swarm.get_all_workers() +``` + +### `get_swarm_size() -> int` + +The `get_swarm_size()` method returns the size of the swarm, which is the total number of workers. + +**Returns:** +- `int`: The size of the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm_size = swarm.get_swarm_size() +``` + +### `get_swarm_status() -> Dict` + +The `get_swarm_status()` method provides information about the current status of the swarm. + +**Returns:** +- `Dict`: A dictionary containing various status indicators for the swarm. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm_status = swarm.get_swarm_status() +``` + +### `save_swarm_state()` + +The `save_swarm_state()` method allows you to save the current state of the swarm, including worker configurations and task assignments. + +**Usage Example:** + +```python +swarm = YourSwarmClass(workers) +swarm.save_swarm_state() +``` + +--- + +This comprehensive documentation covers the Swarms library, including the `AbstractSwarm` class and its methods. You can use this documentation as a guide to understanding and effectively utilizing the Swarms framework for swarm simulation architectures. Feel free to explore further and adapt the library to your specific use cases. 
\ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index c27f86a1..1a52cae8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -78,6 +78,7 @@ nav: - Swarms: - Overview: "swarms/index.md" - swarms.swarms: + - AbstractSwarm: "swarms/swarms/abstractswarm.md" - AutoScaler: "swarms/swarms/autoscaler.md" - swarms.workers: - Overview: "swarms/workers/index.md" diff --git a/swarms/memory/chroma.py b/swarms/memory/chroma.py deleted file mode 100644 index 810d3cc4..00000000 --- a/swarms/memory/chroma.py +++ /dev/null @@ -1,593 +0,0 @@ -"""Wrapper around ChromaDB embeddings platform.""" -from __future__ import annotations - -import logging -import uuid -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Optional, - Tuple, - Type, -) - -import numpy as np - -from langchain.docstore.document import Document -from langchain.embeddings.base import Embeddings -from langchain.utils import xor_args -from langchain.vectorstores.base import VectorStore -from langchain.vectorstores.utils import maximal_marginal_relevance - -if TYPE_CHECKING: - import chromadb - import chromadb.config - from chromadb.api.types import ID, OneOrMany, Where, WhereDocument - -logger = logging.getLogger() -DEFAULT_K = 4 # Number of Documents to return. - - -def _results_to_docs(results: Any) -> List[Document]: - return [doc for doc, _ in _results_to_docs_and_scores(results)] - - -def _results_to_docs_and_scores(results: Any) -> List[Tuple[Document, float]]: - return [ - # TODO: Chroma can do batch querying, - # we shouldn't hard code to the 1st result - (Document(page_content=result[0], metadata=result[1] or {}), result[2]) - for result in zip( - results["documents"][0], - results["metadatas"][0], - results["distances"][0], - ) - ] - - -class Chroma(VectorStore): - """Wrapper around ChromaDB embeddings platform. - - To use, you should have the ``chromadb`` python package installed. - - Example: - .. 
code-block:: python - - from langchain.vectorstores import Chroma - from langchain.embeddings.openai import OpenAIEmbeddings - - embeddings = OpenAIEmbeddings() - vectorstore = Chroma("langchain_store", embeddings) - """ - - _LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain" - - def __init__( - self, - collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, - embedding_function: Optional[Embeddings] = None, - persist_directory: Optional[str] = None, - client_settings: Optional[chromadb.config.Settings] = None, - collection_metadata: Optional[Dict] = None, - client: Optional[chromadb.Client] = None, - relevance_score_fn: Optional[Callable[[float], float]] = None, - ) -> None: - """Initialize with Chroma client.""" - try: - import chromadb - import chromadb.config - except ImportError: - raise ValueError( - "Could not import chromadb python package. " - "Please install it with `pip install chromadb`." - ) - - if client is not None: - self._client_settings = client_settings - self._client = client - self._persist_directory = persist_directory - else: - if client_settings: - _client_settings = client_settings - elif persist_directory: - # Maintain backwards compatibility with chromadb < 0.4.0 - major, minor, _ = chromadb.__version__.split(".") - if int(major) == 0 and int(minor) < 4: - _client_settings = chromadb.config.Settings( - chroma_db_impl="duckdb+parquet", - ) - else: - _client_settings = chromadb.config.Settings(is_persistent=True) - _client_settings.persist_directory = persist_directory - else: - _client_settings = chromadb.config.Settings() - self._client_settings = _client_settings - self._client = chromadb.Client(_client_settings) - self._persist_directory = ( - _client_settings.persist_directory or persist_directory - ) - - self._embedding_function = embedding_function - self._collection = self._client.get_or_create_collection( - name=collection_name, - embedding_function=self._embedding_function.embed_documents - if self._embedding_function is not None 
- else None, - metadata=collection_metadata, - ) - self.override_relevance_score_fn = relevance_score_fn - - @xor_args(("query_texts", "query_embeddings")) - def __query_collection( - self, - query_texts: Optional[List[str]] = None, - query_embeddings: Optional[List[List[float]]] = None, - n_results: int = 4, - where: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Query the chroma collection.""" - try: - import chromadb # noqa: F401 - except ImportError: - raise ValueError( - "Could not import chromadb python package. " - "Please install it with `pip install chromadb`." - ) - return self._collection.query( - query_texts=query_texts, - query_embeddings=query_embeddings, - n_results=n_results, - where=where, - **kwargs, - ) - - def add_texts( - self, - texts: Iterable[str], - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - **kwargs: Any, - ) -> List[str]: - """Run more texts through the embeddings and add to the vectorstore. - - Args: - texts (Iterable[str]): Texts to add to the vectorstore. - metadatas (Optional[List[dict]], optional): Optional list of metadatas. - ids (Optional[List[str]], optional): Optional list of IDs. - - Returns: - List[str]: List of IDs of the added texts. 
- """ - # TODO: Handle the case where the user doesn't provide ids on the Collection - if ids is None: - ids = [str(uuid.uuid1()) for _ in texts] - embeddings = None - if self._embedding_function is not None: - embeddings = self._embedding_function.embed_documents(list(texts)) - - if metadatas: - texts = list(texts) - empty = [] - non_empty = [] - for i, m in enumerate(metadatas): - if m: - non_empty.append(i) - else: - empty.append(i) - if non_empty: - metadatas = [metadatas[i] for i in non_empty] - texts_with_metadatas = [texts[i] for i in non_empty] - embeddings_with_metadatas = ( - [embeddings[i] for i in non_empty] if embeddings else None - ) - ids_with_metadata = [ids[i] for i in non_empty] - self._collection.upsert( - metadatas=metadatas, - embeddings=embeddings_with_metadatas, - documents=texts_with_metadatas, - ids=ids_with_metadata, - ) - - texts = [texts[j] for j in empty] - embeddings = [embeddings[j] for j in empty] if embeddings else None - ids = [ids[j] for j in empty] - - if texts: - self._collection.upsert(embeddings=embeddings, documents=texts, ids=ids) - return ids - - def similarity_search( - self, - query: str, - k: int = DEFAULT_K, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Run similarity search with Chroma. - - Args: - query (str): Query text to search for. - k (int): Number of results to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List[Document]: List of documents most similar to the query text. - """ - docs_and_scores = self.similarity_search_with_score(query, k, filter=filter) - return [doc for doc, _ in docs_and_scores] - - def similarity_search_by_vector( - self, - embedding: List[float], - k: int = DEFAULT_K, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs most similar to embedding vector. - Args: - embedding (List[float]): Embedding to look up documents similar to. 
- k (int): Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - Returns: - List of Documents most similar to the query vector. - """ - results = self.__query_collection( - query_embeddings=embedding, n_results=k, where=filter - ) - return _results_to_docs(results) - - def similarity_search_by_vector_with_relevance_scores( - self, - embedding: List[float], - k: int = DEFAULT_K, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """ - Return docs most similar to embedding vector and similarity score. - - Args: - embedding (List[float]): Embedding to look up documents similar to. - k (int): Number of Documents to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List[Tuple[Document, float]]: List of documents most similar to - the query text and cosine distance in float for each. - Lower score represents more similarity. - """ - results = self.__query_collection( - query_embeddings=embedding, n_results=k, where=filter - ) - return _results_to_docs_and_scores(results) - - def similarity_search_with_score( - self, - query: str, - k: int = DEFAULT_K, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Tuple[Document, float]]: - """Run similarity search with Chroma with distance. - - Args: - query (str): Query text to search for. - k (int): Number of results to return. Defaults to 4. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List[Tuple[Document, float]]: List of documents most similar to - the query text and cosine distance in float for each. - Lower score represents more similarity. 
- """ - if self._embedding_function is None: - results = self.__query_collection( - query_texts=[query], n_results=k, where=filter - ) - else: - query_embedding = self._embedding_function.embed_query(query) - results = self.__query_collection( - query_embeddings=[query_embedding], n_results=k, where=filter - ) - - return _results_to_docs_and_scores(results) - - def _select_relevance_score_fn(self) -> Callable[[float], float]: - """ - The 'correct' relevance function - may differ depending on a few things, including: - - the distance / similarity metric used by the VectorStore - - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - - embedding dimensionality - - etc. - """ - if self.override_relevance_score_fn: - return self.override_relevance_score_fn - - distance = "l2" - distance_key = "hnsw:space" - metadata = self._collection.metadata - - if metadata and distance_key in metadata: - distance = metadata[distance_key] - - if distance == "cosine": - return self._cosine_relevance_score_fn - elif distance == "l2": - return self._euclidean_relevance_score_fn - elif distance == "ip": - return self._max_inner_product_relevance_score_fn - else: - raise ValueError( - "No supported normalization function" - f" for distance metric of type: {distance}." - "Consider providing relevance_score_fn to Chroma constructor." - ) - - def max_marginal_relevance_search_by_vector( - self, - embedding: List[float], - k: int = DEFAULT_K, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - embedding: Embedding to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch to pass to MMR algorithm. 
- lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List of Documents selected by maximal marginal relevance. - """ - - results = self.__query_collection( - query_embeddings=embedding, - n_results=fetch_k, - where=filter, - include=["metadatas", "documents", "distances", "embeddings"], - ) - mmr_selected = maximal_marginal_relevance( - np.array(embedding, dtype=np.float32), - results["embeddings"][0], - k=k, - lambda_mult=lambda_mult, - ) - - candidates = _results_to_docs(results) - - selected_results = [r for i, r in enumerate(candidates) if i in mmr_selected] - return selected_results - - def max_marginal_relevance_search( - self, - query: str, - k: int = DEFAULT_K, - fetch_k: int = 20, - lambda_mult: float = 0.5, - filter: Optional[Dict[str, str]] = None, - **kwargs: Any, - ) -> List[Document]: - """Return docs selected using the maximal marginal relevance. - Maximal marginal relevance optimizes for similarity to query AND diversity - among selected documents. - - Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. - fetch_k: Number of Documents to fetch to pass to MMR algorithm. - lambda_mult: Number between 0 and 1 that determines the degree - of diversity among the results with 0 corresponding - to maximum diversity and 1 to minimum diversity. - Defaults to 0.5. - filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None. - - Returns: - List of Documents selected by maximal marginal relevance. - """ - if self._embedding_function is None: - raise ValueError( - "For MMR search, you must specify an embedding function on" "creation." 
- ) - - embedding = self._embedding_function.embed_query(query) - docs = self.max_marginal_relevance_search_by_vector( - embedding, k, fetch_k, lambda_mult=lambda_mult, filter=filter - ) - return docs - - def delete_collection(self) -> None: - """Delete the collection.""" - self._client.delete_collection(self._collection.name) - - def get( - self, - ids: Optional[OneOrMany[ID]] = None, - where: Optional[Where] = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - where_document: Optional[WhereDocument] = None, - include: Optional[List[str]] = None, - ) -> Dict[str, Any]: - """Gets the collection. - - Args: - ids: The ids of the embeddings to get. Optional. - where: A Where type dict used to filter results by. - E.g. `{"color" : "red", "price": 4.20}`. Optional. - limit: The number of documents to return. Optional. - offset: The offset to start returning results from. - Useful for paging results with limit. Optional. - where_document: A WhereDocument type dict used to filter by the documents. - E.g. `{$contains: {"text": "hello"}}`. Optional. - include: A list of what to include in the results. - Can contain `"embeddings"`, `"metadatas"`, `"documents"`. - Ids are always included. - Defaults to `["metadatas", "documents"]`. Optional. - """ - kwargs = { - "ids": ids, - "where": where, - "limit": limit, - "offset": offset, - "where_document": where_document, - } - - if include is not None: - kwargs["include"] = include - - return self._collection.get(**kwargs) - - def persist(self) -> None: - """Persist the collection. - - This can be used to explicitly persist the data to disk. - It will also be called automatically when the object is destroyed. - """ - if self._persist_directory is None: - raise ValueError( - "You must specify a persist_directory on" - "creation to persist the collection." 
- ) - import chromadb - - # Maintain backwards compatibility with chromadb < 0.4.0 - major, minor, _ = chromadb.__version__.split(".") - if int(major) == 0 and int(minor) < 4: - self._client.persist() - - def update_document(self, document_id: str, document: Document) -> None: - """Update a document in the collection. - - Args: - document_id (str): ID of the document to update. - document (Document): Document to update. - """ - text = document.page_content - metadata = document.metadata - if self._embedding_function is None: - raise ValueError( - "For update, you must specify an embedding function on creation." - ) - embeddings = self._embedding_function.embed_documents([text]) - - self._collection.update( - ids=[document_id], - embeddings=embeddings, - documents=[text], - metadatas=[metadata], - ) - - @classmethod - def from_texts( - cls: Type[Chroma], - texts: List[str], - embedding: Optional[Embeddings] = None, - metadatas: Optional[List[dict]] = None, - ids: Optional[List[str]] = None, - collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, - persist_directory: Optional[str] = None, - client_settings: Optional[chromadb.config.Settings] = None, - client: Optional[chromadb.Client] = None, - collection_metadata: Optional[Dict] = None, - **kwargs: Any, - ) -> Chroma: - """Create a Chroma vectorstore from a raw documents. - - If a persist_directory is specified, the collection will be persisted there. - Otherwise, the data will be ephemeral in-memory. - - Args: - texts (List[str]): List of texts to add to the collection. - collection_name (str): Name of the collection to create. - persist_directory (Optional[str]): Directory to persist the collection. - embedding (Optional[Embeddings]): Embedding function. Defaults to None. - metadatas (Optional[List[dict]]): List of metadatas. Defaults to None. - ids (Optional[List[str]]): List of document IDs. Defaults to None. 
- client_settings (Optional[chromadb.config.Settings]): Chroma client settings - collection_metadata (Optional[Dict]): Collection configurations. - Defaults to None. - - Returns: - Chroma: Chroma vectorstore. - """ - chroma_collection = cls( - collection_name=collection_name, - embedding_function=embedding, - persist_directory=persist_directory, - client_settings=client_settings, - client=client, - collection_metadata=collection_metadata, - **kwargs, - ) - chroma_collection.add_texts(texts=texts, metadatas=metadatas, ids=ids) - return chroma_collection - - @classmethod - def from_documents( - cls: Type[Chroma], - documents: List[Document], - embedding: Optional[Embeddings] = None, - ids: Optional[List[str]] = None, - collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, - persist_directory: Optional[str] = None, - client_settings: Optional[chromadb.config.Settings] = None, - client: Optional[chromadb.Client] = None, # Add this line - collection_metadata: Optional[Dict] = None, - **kwargs: Any, - ) -> Chroma: - """Create a Chroma vectorstore from a list of documents. - - If a persist_directory is specified, the collection will be persisted there. - Otherwise, the data will be ephemeral in-memory. - - Args: - collection_name (str): Name of the collection to create. - persist_directory (Optional[str]): Directory to persist the collection. - ids (Optional[List[str]]): List of document IDs. Defaults to None. - documents (List[Document]): List of documents to add to the vectorstore. - embedding (Optional[Embeddings]): Embedding function. Defaults to None. - client_settings (Optional[chromadb.config.Settings]): Chroma client settings - collection_metadata (Optional[Dict]): Collection configurations. - Defaults to None. - - Returns: - Chroma: Chroma vectorstore. 
- """ - texts = [doc.page_content for doc in documents] - metadatas = [doc.metadata for doc in documents] - return cls.from_texts( - texts=texts, - embedding=embedding, - metadatas=metadatas, - ids=ids, - collection_name=collection_name, - persist_directory=persist_directory, - client_settings=client_settings, - client=client, - collection_metadata=collection_metadata, - **kwargs, - ) - - def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None: - """Delete by vector IDs. - - Args: - ids: List of ids to delete. - """ - self._collection.delete(ids=ids)