abstract swarms class documentation

pull/64/head
Kye 1 year ago
parent 0239398bec
commit 015099aa68

@ -0,0 +1,514 @@
# `AbstractSwarm` Documentation
## Table of Contents
1. [Introduction](#introduction)
2. [Class Definition](#class-definition)
3. [Methods](#methods)
- [communicate()](#communicate)
- [run()](#run)
- [arun()](#arun)
- [add_worker(worker)](#add_worker)
- [remove_worker(worker)](#remove_worker)
- [broadcast(message, sender)](#broadcast)
- [reset()](#reset)
- [plan(task)](#plan)
- [direct_message(message, sender, recipient)](#direct_message)
- [autoscaler(num_workers, worker)](#autoscaler)
- [get_worker_by_id(id)](#get_worker_by_id)
- [get_worker_by_name(name)](#get_worker_by_name)
- [assign_task(worker, task)](#assign_task)
- [get_all_tasks(worker, task)](#get_all_tasks)
- [get_finished_tasks()](#get_finished_tasks)
- [get_pending_tasks()](#get_pending_tasks)
- [pause_worker(worker, worker_id)](#pause_worker)
- [resume_worker(worker, worker_id)](#resume_worker)
- [stop_worker(worker, worker_id)](#stop_worker)
- [restart_worker(worker)](#restart_worker)
- [scale_up(num_worker)](#scale_up)
- [scale_down(num_worker)](#scale_down)
- [scale_to(num_worker)](#scale_to)
- [get_all_workers()](#get_all_workers)
- [get_swarm_size()](#get_swarm_size)
- [get_swarm_status()](#get_swarm_status)
- [save_swarm_state()](#save_swarm_state)
---
## 1. Introduction <a name="introduction"></a>
The Swarms library is designed to provide a framework for swarm simulation architectures. Swarms are collections of autonomous agents or workers that collaborate to perform tasks and achieve common goals. This documentation will guide you through the functionality and usage of the Swarms library, explaining the purpose and implementation details of the provided classes and methods.
## 2. Class Definition <a name="class-definition"></a>
### `AbstractSwarm` Class
The `AbstractSwarm` class is an abstract base class that serves as the foundation for swarm simulation architectures. It defines the core functionality and methods required to manage and interact with a swarm of workers.
```python
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from swarms.swarms.base import AbstractWorker
class AbstractSwarm(ABC):
"""
Abstract class for swarm simulation architectures
Methods:
---------
...
"""
# The class definition and constructor are provided here.
@abstractmethod
def __init__(self, workers: List["AbstractWorker"]):
"""Initialize the swarm with workers"""
pass
# Other abstract methods are listed here.
```
## 3. Methods <a name="methods"></a>
### `communicate()` <a name="communicate"></a>
The `communicate()` method allows the swarm to exchange information through the orchestrator, protocols, and the universal communication layer.
**Usage Example 1:**
```python
swarm = YourSwarmClass(workers)
swarm.communicate()
```
**Usage Example 2:**
```python
# Another example of using the communicate method
swarm = YourSwarmClass(workers)
swarm.communicate()
```
### `run()` <a name="run"></a>
The `run()` method executes the swarm, initiating its activities.
**Usage Example 1:**
```python
swarm = YourSwarmClass(workers)
swarm.run()
```
**Usage Example 2:**
```python
# Another example of running the swarm
swarm = YourSwarmClass(workers)
swarm.run()
```
### `arun()` <a name="arun"></a>
The `arun()` method runs the swarm asynchronously, allowing for parallel execution of tasks.
**Usage Example 1:**
```python
swarm = YourSwarmClass(workers)
swarm.arun()
```
**Usage Example 2:**
```python
# Another example of running the swarm asynchronously
swarm = YourSwarmClass(workers)
swarm.arun()
```
### `add_worker(worker: "AbstractWorker")` <a name="add_worker"></a>
The `add_worker()` method adds a worker to the swarm.
**Parameters:**
- `worker` (AbstractWorker): The worker to be added to the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass([])
worker = YourWorkerClass()
swarm.add_worker(worker)
```
### `remove_worker(worker: "AbstractWorker")` <a name="remove_worker"></a>
The `remove_worker()` method removes a worker from the swarm.
**Parameters:**
- `worker` (AbstractWorker): The worker to be removed from the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_id("worker_id")
swarm.remove_worker(worker)
```
### `broadcast(message: str, sender: Optional["AbstractWorker"] = None)` <a name="broadcast"></a>
The `broadcast()` method sends a message to all workers in the swarm.
**Parameters:**
- `message` (str): The message to be broadcasted.
- `sender` (Optional[AbstractWorker]): The sender of the message (optional).
**Usage Example 1:**
```python
swarm = YourSwarmClass(workers)
message = "Hello, everyone!"
swarm.broadcast(message)
```
**Usage Example 2:**
```python
# Another example of broadcasting a message
swarm = YourSwarmClass(workers)
message = "Important announcement!"
sender = swarm.get_worker_by_name("Supervisor")
swarm.broadcast(message, sender)
```
### `reset()` <a name="reset"></a>
The `reset()` method resets the swarm to its initial state.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm.reset()
```
### `plan(task: str)` <a name="plan"></a>
The `plan()` method instructs workers to individually plan using a workflow or pipeline for a specified task.
**Parameters:**
- `task` (str): The task for which workers should plan.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
task = "Perform data analysis"
swarm.plan(task)
```
### `direct_message(message: str, sender: "AbstractWorker", recipient: "AbstractWorker")` <a name="direct_message"></a>
The `direct_message()` method sends a direct message from one worker to another.
**Parameters:**
- `message` (str): The message to be sent.
- `sender` (AbstractWorker): The sender of the message.
- `recipient` (AbstractWorker): The recipient of the message.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
sender = swarm.get_worker_by_name("Worker1")
recipient = swarm.get_worker_by_name("Worker2")
message = "Hello
, Worker2!"
swarm.direct_message(message, sender, recipient)
```
### `autoscaler(num_workers: int, worker: List["AbstractWorker"])` <a name="autoscaler"></a>
The `autoscaler()` method acts as an autoscaler, dynamically adjusting the number of workers based on system load or other criteria.
**Parameters:**
- `num_workers` (int): The desired number of workers.
- `worker` (List[AbstractWorker]): A list of workers to be managed by the autoscaler.
**Usage Example:**
```python
swarm = YourSwarmClass([])
workers = [YourWorkerClass() for _ in range(10)]
swarm.autoscaler(5, workers)
```
### `get_worker_by_id(id: str) -> "AbstractWorker"` <a name="get_worker_by_id"></a>
The `get_worker_by_id()` method locates a worker in the swarm by their ID.
**Parameters:**
- `id` (str): The ID of the worker to locate.
**Returns:**
- `AbstractWorker`: The worker with the specified ID.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker_id = "worker_123"
worker = swarm.get_worker_by_id(worker_id)
```
### `get_worker_by_name(name: str) -> "AbstractWorker"` <a name="get_worker_by_name"></a>
The `get_worker_by_name()` method locates a worker in the swarm by their name.
**Parameters:**
- `name` (str): The name of the worker to locate.
**Returns:**
- `AbstractWorker`: The worker with the specified name.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker_name = "Alice"
worker = swarm.get_worker_by_name(worker_name)
```
### `assign_task(worker: "AbstractWorker", task: Any) -> Dict` <a name="assign_task"></a>
The `assign_task()` method assigns a task to a specific worker.
**Parameters:**
- `worker` (AbstractWorker): The worker to whom the task should be assigned.
- `task` (Any): The task to be assigned.
**Returns:**
- `Dict`: A dictionary indicating the status of the task assignment.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
task = "Perform data analysis"
result = swarm.assign_task(worker, task)
```
### `get_all_tasks(worker: "AbstractWorker", task: Any)` <a name="get_all_tasks"></a>
The `get_all_tasks()` method retrieves all tasks assigned to a specific worker.
**Parameters:**
- `worker` (AbstractWorker): The worker for whom tasks should be retrieved.
- `task` (Any): The task to be retrieved.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
tasks = swarm.get_all_tasks(worker, "data analysis")
```
### `get_finished_tasks() -> List[Dict]` <a name="get_finished_tasks"></a>
The `get_finished_tasks()` method retrieves all tasks that have been completed by the workers in the swarm.
**Returns:**
- `List[Dict]`: A list of dictionaries representing finished tasks.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
finished_tasks = swarm.get_finished_tasks()
```
### `get_pending_tasks() -> List[Dict]` <a name="get_pending_tasks"></a>
The `get_pending_tasks()` method retrieves all tasks that are pending or yet to be completed by the workers in the swarm.
**Returns:**
- `List[Dict]`: A list of dictionaries representing pending tasks.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
pending_tasks = swarm.get_pending_tasks()
```
### `pause_worker(worker: "AbstractWorker", worker_id: str)` <a name="pause_worker"></a>
The `pause_worker()` method pauses a specific worker, temporarily suspending their activities.
**Parameters:**
- `worker` (AbstractWorker): The worker to be paused.
- `worker_id` (str): The ID of the worker to be paused.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
worker_id = "worker_123"
swarm.pause_worker(worker, worker_id)
```
### `resume_worker(worker: "AbstractWorker", worker_id: str)` <a name="resume_worker"></a>
The `resume_worker()` method resumes a paused worker, allowing them to continue their activities.
**Parameters:**
- `worker` (AbstractWorker): The worker to be resumed.
- `worker_id` (str): The ID of the worker to be resumed.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
worker_id = "worker_123"
swarm.resume_worker(worker, worker_id)
```
### `stop_worker(worker: "AbstractWorker", worker_id: str)` <a name="stop_worker"></a>
The `stop_worker()` method stops a specific worker, terminating their activities.
**Parameters:**
- `worker` (AbstractWorker): The worker to be stopped.
- `worker_id` (str): The ID of the worker to be stopped.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
worker_id = "worker_123"
swarm.stop_worker(worker, worker_id)
```
### `restart_worker(worker: "AbstractWorker")` <a name="restart_worker"></a>
The `restart_worker()` method restarts a worker, resetting them to their initial state.
**Parameters:**
- `worker` (AbstractWorker): The worker to be restarted.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
worker = swarm.get_worker_by_name("Worker1")
swarm.restart_worker(worker)
```
### `scale_up(num_worker: int)` <a name="scale_up"></a>
The `scale_up()` method increases the number of workers in the swarm.
**Parameters:**
- `num_worker` (int): The number of workers to add to the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm.scale_up(5)
```
### `scale_down(num_worker: int)` <a name="scale_down"></a>
The `scale_down()` method decreases the number of workers in the swarm.
**Parameters:**
- `num_worker` (int): The number of workers to remove from the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm.scale_down(3)
```
### `scale_to(num_worker: int)` <a name="scale_to"></a>
The `scale_to()` method scales the swarm to a specific number of workers.
**Parameters:**
- `num_worker` (int): The desired number of workers.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm.scale_to(10)
```
### `get
_all_workers() -> List["AbstractWorker"]` <a name="get_all_workers"></a>
The `get_all_workers()` method retrieves a list of all workers in the swarm.
**Returns:**
- `List[AbstractWorker]`: A list of all workers in the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
all_workers = swarm.get_all_workers()
```
### `get_swarm_size() -> int` <a name="get_swarm_size"></a>
The `get_swarm_size()` method returns the size of the swarm, which is the total number of workers.
**Returns:**
- `int`: The size of the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm_size = swarm.get_swarm_size()
```
### `get_swarm_status() -> Dict` <a name="get_swarm_status"></a>
The `get_swarm_status()` method provides information about the current status of the swarm.
**Returns:**
- `Dict`: A dictionary containing various status indicators for the swarm.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm_status = swarm.get_swarm_status()
```
### `save_swarm_state()` <a name="save_swarm_state"></a>
The `save_swarm_state()` method allows you to save the current state of the swarm, including worker configurations and task assignments.
**Usage Example:**
```python
swarm = YourSwarmClass(workers)
swarm.save_swarm_state()
```
---
This comprehensive documentation covers the Swarms library, including the `AbstractSwarm` class and its methods. You can use this documentation as a guide to understanding and effectively utilizing the Swarms framework for swarm simulation architectures. Feel free to explore further and adapt the library to your specific use cases.

@ -78,6 +78,7 @@ nav:
- Swarms: - Swarms:
- Overview: "swarms/index.md" - Overview: "swarms/index.md"
- swarms.swarms: - swarms.swarms:
- AbstractSwarm: "swarms/swarms/abstractswarm.md"
- AutoScaler: "swarms/swarms/autoscaler.md" - AutoScaler: "swarms/swarms/autoscaler.md"
- swarms.workers: - swarms.workers:
- Overview: "swarms/workers/index.md" - Overview: "swarms/workers/index.md"

@ -1,593 +0,0 @@
"""Wrapper around ChromaDB embeddings platform."""
from __future__ import annotations
import logging
import uuid
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
)
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils import xor_args
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
import chromadb
import chromadb.config
from chromadb.api.types import ID, OneOrMany, Where, WhereDocument
logger = logging.getLogger()
DEFAULT_K = 4 # Number of Documents to return.
def _results_to_docs(results: Any) -> List[Document]:
return [doc for doc, _ in _results_to_docs_and_scores(results)]
def _results_to_docs_and_scores(results: Any) -> List[Tuple[Document, float]]:
return [
# TODO: Chroma can do batch querying,
# we shouldn't hard code to the 1st result
(Document(page_content=result[0], metadata=result[1] or {}), result[2])
for result in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
)
]
class Chroma(VectorStore):
"""Wrapper around ChromaDB embeddings platform.
To use, you should have the ``chromadb`` python package installed.
Example:
.. code-block:: python
from langchain.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma("langchain_store", embeddings)
"""
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"
def __init__(
self,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
embedding_function: Optional[Embeddings] = None,
persist_directory: Optional[str] = None,
client_settings: Optional[chromadb.config.Settings] = None,
collection_metadata: Optional[Dict] = None,
client: Optional[chromadb.Client] = None,
relevance_score_fn: Optional[Callable[[float], float]] = None,
) -> None:
"""Initialize with Chroma client."""
try:
import chromadb
import chromadb.config
except ImportError:
raise ValueError(
"Could not import chromadb python package. "
"Please install it with `pip install chromadb`."
)
if client is not None:
self._client_settings = client_settings
self._client = client
self._persist_directory = persist_directory
else:
if client_settings:
_client_settings = client_settings
elif persist_directory:
# Maintain backwards compatibility with chromadb < 0.4.0
major, minor, _ = chromadb.__version__.split(".")
if int(major) == 0 and int(minor) < 4:
_client_settings = chromadb.config.Settings(
chroma_db_impl="duckdb+parquet",
)
else:
_client_settings = chromadb.config.Settings(is_persistent=True)
_client_settings.persist_directory = persist_directory
else:
_client_settings = chromadb.config.Settings()
self._client_settings = _client_settings
self._client = chromadb.Client(_client_settings)
self._persist_directory = (
_client_settings.persist_directory or persist_directory
)
self._embedding_function = embedding_function
self._collection = self._client.get_or_create_collection(
name=collection_name,
embedding_function=self._embedding_function.embed_documents
if self._embedding_function is not None
else None,
metadata=collection_metadata,
)
self.override_relevance_score_fn = relevance_score_fn
@xor_args(("query_texts", "query_embeddings"))
def __query_collection(
self,
query_texts: Optional[List[str]] = None,
query_embeddings: Optional[List[List[float]]] = None,
n_results: int = 4,
where: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""Query the chroma collection."""
try:
import chromadb # noqa: F401
except ImportError:
raise ValueError(
"Could not import chromadb python package. "
"Please install it with `pip install chromadb`."
)
return self._collection.query(
query_texts=query_texts,
query_embeddings=query_embeddings,
n_results=n_results,
where=where,
**kwargs,
)
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts (Iterable[str]): Texts to add to the vectorstore.
metadatas (Optional[List[dict]], optional): Optional list of metadatas.
ids (Optional[List[str]], optional): Optional list of IDs.
Returns:
List[str]: List of IDs of the added texts.
"""
# TODO: Handle the case where the user doesn't provide ids on the Collection
if ids is None:
ids = [str(uuid.uuid1()) for _ in texts]
embeddings = None
if self._embedding_function is not None:
embeddings = self._embedding_function.embed_documents(list(texts))
if metadatas:
texts = list(texts)
empty = []
non_empty = []
for i, m in enumerate(metadatas):
if m:
non_empty.append(i)
else:
empty.append(i)
if non_empty:
metadatas = [metadatas[i] for i in non_empty]
texts_with_metadatas = [texts[i] for i in non_empty]
embeddings_with_metadatas = (
[embeddings[i] for i in non_empty] if embeddings else None
)
ids_with_metadata = [ids[i] for i in non_empty]
self._collection.upsert(
metadatas=metadatas,
embeddings=embeddings_with_metadatas,
documents=texts_with_metadatas,
ids=ids_with_metadata,
)
texts = [texts[j] for j in empty]
embeddings = [embeddings[j] for j in empty] if embeddings else None
ids = [ids[j] for j in empty]
if texts:
self._collection.upsert(embeddings=embeddings, documents=texts, ids=ids)
return ids
def similarity_search(
self,
query: str,
k: int = DEFAULT_K,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""Run similarity search with Chroma.
Args:
query (str): Query text to search for.
k (int): Number of results to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List[Document]: List of documents most similar to the query text.
"""
docs_and_scores = self.similarity_search_with_score(query, k, filter=filter)
return [doc for doc, _ in docs_and_scores]
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_K,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to embedding vector.
Args:
embedding (List[float]): Embedding to look up documents similar to.
k (int): Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query vector.
"""
results = self.__query_collection(
query_embeddings=embedding, n_results=k, where=filter
)
return _results_to_docs(results)
def similarity_search_by_vector_with_relevance_scores(
self,
embedding: List[float],
k: int = DEFAULT_K,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Return docs most similar to embedding vector and similarity score.
Args:
embedding (List[float]): Embedding to look up documents similar to.
k (int): Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List[Tuple[Document, float]]: List of documents most similar to
the query text and cosine distance in float for each.
Lower score represents more similarity.
"""
results = self.__query_collection(
query_embeddings=embedding, n_results=k, where=filter
)
return _results_to_docs_and_scores(results)
def similarity_search_with_score(
self,
query: str,
k: int = DEFAULT_K,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Run similarity search with Chroma with distance.
Args:
query (str): Query text to search for.
k (int): Number of results to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List[Tuple[Document, float]]: List of documents most similar to
the query text and cosine distance in float for each.
Lower score represents more similarity.
"""
if self._embedding_function is None:
results = self.__query_collection(
query_texts=[query], n_results=k, where=filter
)
else:
query_embedding = self._embedding_function.embed_query(query)
results = self.__query_collection(
query_embeddings=[query_embedding], n_results=k, where=filter
)
return _results_to_docs_and_scores(results)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function
may differ depending on a few things, including:
- the distance / similarity metric used by the VectorStore
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
- embedding dimensionality
- etc.
"""
if self.override_relevance_score_fn:
return self.override_relevance_score_fn
distance = "l2"
distance_key = "hnsw:space"
metadata = self._collection.metadata
if metadata and distance_key in metadata:
distance = metadata[distance_key]
if distance == "cosine":
return self._cosine_relevance_score_fn
elif distance == "l2":
return self._euclidean_relevance_score_fn
elif distance == "ip":
return self._max_inner_product_relevance_score_fn
else:
raise ValueError(
"No supported normalization function"
f" for distance metric of type: {distance}."
"Consider providing relevance_score_fn to Chroma constructor."
)
def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = DEFAULT_K,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents selected by maximal marginal relevance.
"""
results = self.__query_collection(
query_embeddings=embedding,
n_results=fetch_k,
where=filter,
include=["metadatas", "documents", "distances", "embeddings"],
)
mmr_selected = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
results["embeddings"][0],
k=k,
lambda_mult=lambda_mult,
)
candidates = _results_to_docs(results)
selected_results = [r for i, r in enumerate(candidates) if i in mmr_selected]
return selected_results
def max_marginal_relevance_search(
self,
query: str,
k: int = DEFAULT_K,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity
among selected documents.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
fetch_k: Number of Documents to fetch to pass to MMR algorithm.
lambda_mult: Number between 0 and 1 that determines the degree
of diversity among the results with 0 corresponding
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if self._embedding_function is None:
raise ValueError(
"For MMR search, you must specify an embedding function on" "creation."
)
embedding = self._embedding_function.embed_query(query)
docs = self.max_marginal_relevance_search_by_vector(
embedding, k, fetch_k, lambda_mult=lambda_mult, filter=filter
)
return docs
def delete_collection(self) -> None:
"""Delete the collection."""
self._client.delete_collection(self._collection.name)
def get(
self,
ids: Optional[OneOrMany[ID]] = None,
where: Optional[Where] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
where_document: Optional[WhereDocument] = None,
include: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Gets the collection.
Args:
ids: The ids of the embeddings to get. Optional.
where: A Where type dict used to filter results by.
E.g. `{"color" : "red", "price": 4.20}`. Optional.
limit: The number of documents to return. Optional.
offset: The offset to start returning results from.
Useful for paging results with limit. Optional.
where_document: A WhereDocument type dict used to filter by the documents.
E.g. `{$contains: {"text": "hello"}}`. Optional.
include: A list of what to include in the results.
Can contain `"embeddings"`, `"metadatas"`, `"documents"`.
Ids are always included.
Defaults to `["metadatas", "documents"]`. Optional.
"""
kwargs = {
"ids": ids,
"where": where,
"limit": limit,
"offset": offset,
"where_document": where_document,
}
if include is not None:
kwargs["include"] = include
return self._collection.get(**kwargs)
def persist(self) -> None:
"""Persist the collection.
This can be used to explicitly persist the data to disk.
It will also be called automatically when the object is destroyed.
"""
if self._persist_directory is None:
raise ValueError(
"You must specify a persist_directory on"
"creation to persist the collection."
)
import chromadb
# Maintain backwards compatibility with chromadb < 0.4.0
major, minor, _ = chromadb.__version__.split(".")
if int(major) == 0 and int(minor) < 4:
self._client.persist()
def update_document(self, document_id: str, document: Document) -> None:
"""Update a document in the collection.
Args:
document_id (str): ID of the document to update.
document (Document): Document to update.
"""
text = document.page_content
metadata = document.metadata
if self._embedding_function is None:
raise ValueError(
"For update, you must specify an embedding function on creation."
)
embeddings = self._embedding_function.embed_documents([text])
self._collection.update(
ids=[document_id],
embeddings=embeddings,
documents=[text],
metadatas=[metadata],
)
@classmethod
def from_texts(
cls: Type[Chroma],
texts: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
persist_directory: Optional[str] = None,
client_settings: Optional[chromadb.config.Settings] = None,
client: Optional[chromadb.Client] = None,
collection_metadata: Optional[Dict] = None,
**kwargs: Any,
) -> Chroma:
"""Create a Chroma vectorstore from a raw documents.
If a persist_directory is specified, the collection will be persisted there.
Otherwise, the data will be ephemeral in-memory.
Args:
texts (List[str]): List of texts to add to the collection.
collection_name (str): Name of the collection to create.
persist_directory (Optional[str]): Directory to persist the collection.
embedding (Optional[Embeddings]): Embedding function. Defaults to None.
metadatas (Optional[List[dict]]): List of metadatas. Defaults to None.
ids (Optional[List[str]]): List of document IDs. Defaults to None.
client_settings (Optional[chromadb.config.Settings]): Chroma client settings
collection_metadata (Optional[Dict]): Collection configurations.
Defaults to None.
Returns:
Chroma: Chroma vectorstore.
"""
chroma_collection = cls(
collection_name=collection_name,
embedding_function=embedding,
persist_directory=persist_directory,
client_settings=client_settings,
client=client,
collection_metadata=collection_metadata,
**kwargs,
)
chroma_collection.add_texts(texts=texts, metadatas=metadatas, ids=ids)
return chroma_collection
@classmethod
def from_documents(
cls: Type[Chroma],
documents: List[Document],
embedding: Optional[Embeddings] = None,
ids: Optional[List[str]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
persist_directory: Optional[str] = None,
client_settings: Optional[chromadb.config.Settings] = None,
client: Optional[chromadb.Client] = None, # Add this line
collection_metadata: Optional[Dict] = None,
**kwargs: Any,
) -> Chroma:
"""Create a Chroma vectorstore from a list of documents.
If a persist_directory is specified, the collection will be persisted there.
Otherwise, the data will be ephemeral in-memory.
Args:
collection_name (str): Name of the collection to create.
persist_directory (Optional[str]): Directory to persist the collection.
ids (Optional[List[str]]): List of document IDs. Defaults to None.
documents (List[Document]): List of documents to add to the vectorstore.
embedding (Optional[Embeddings]): Embedding function. Defaults to None.
client_settings (Optional[chromadb.config.Settings]): Chroma client settings
collection_metadata (Optional[Dict]): Collection configurations.
Defaults to None.
Returns:
Chroma: Chroma vectorstore.
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return cls.from_texts(
texts=texts,
embedding=embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
persist_directory=persist_directory,
client_settings=client_settings,
client=client,
collection_metadata=collection_metadata,
**kwargs,
)
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
"""Delete by vector IDs.
Args:
ids: List of ids to delete.
"""
self._collection.delete(ids=ids)
Loading…
Cancel
Save