[TESTS][FIXES] [DOCS][Conversation]

pull/334/head
Kye 1 year ago
parent e9bb8dcbf4
commit aa4f3d1563

@ -311,6 +311,41 @@ efficiency_analysis = efficiency_agent.run(
"Analyze the efficiency of the factory's manufacturing process",
factory_image,
)
```
### Gemini
- Deploy Gemini from Google with utmost reliability with our visual chain of thought prompt that enables more reliable responses
```python
import os
from dotenv import load_dotenv
from swarms.models import Gemini
from swarms.prompts.visual_cot import VISUAL_CHAIN_OF_THOUGHT
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("GEMINI_API_KEY")
# Initialize the language model
llm = Gemini(
gemini_api_key=api_key,
temperature=0.5,
max_tokens=1000,
system_prompt=VISUAL_CHAIN_OF_THOUGHT,
)
# Initialize the task
task = "This is an eye test. What do you see?"
img = "playground/demos/multi_modal_chain_of_thought/eyetest.jpg"
# Run the workflow on a task
out = llm.run(task=task, img=img)
print(out)
```
---

@ -1,132 +1,265 @@
# Conversation Module Documentation
# Module/Class Name: Conversation
## Introduction
The `Conversation` class is a powerful tool for managing and structuring conversation data in a Python program. It enables you to create, manipulate, and analyze conversations easily. This documentation will provide you with a comprehensive understanding of the `Conversation` class, its attributes, methods, and how to effectively use it.
## Table of Contents
1. [Introduction](#introduction)
2. [Installation](#installation)
3. [Class: Conversation](#class-conversation)
- [Attributes](#attributes)
- [Methods](#methods)
4. [Usage Examples](#usage-examples)
- [Example 1: Creating a Conversation](#example-1-creating-a-conversation)
- [Example 2: Adding Messages](#example-2-adding-messages)
- [Example 3: Displaying and Exporting Conversation](#example-3-displaying-and-exporting-conversation)
- [Example 4: Counting Messages by Role](#example-4-counting-messages-by-role)
- [Example 5: Loading and Searching](#example-5-loading-and-searching)
5. [Additional Information](#additional-information)
6. [References](#references)
1. **Class Definition**
- Overview
- Attributes
2. **Methods**
- `__init__(self, time_enabled: bool = False, *args, **kwargs)`
- `add(self, role: str, content: str, *args, **kwargs)`
- `delete(self, index: str)`
- `update(self, index: str, role, content)`
- `query(self, index: str)`
- `search(self, keyword: str)`
- `display_conversation(self, detailed: bool = False)`
- `export_conversation(self, filename: str)`
- `import_conversation(self, filename: str)`
- `count_messages_by_role(self)`
- `return_history_as_string(self)`
- `save_as_json(self, filename: str)`
- `load_from_json(self, filename: str)`
- `search_keyword_in_conversation(self, keyword: str)`
- `pretty_print_conversation(self, messages)`
---
## 1. Introduction <a name="introduction"></a>
### 1. Class Definition
#### Overview
The `Conversation` class is designed to manage conversations by keeping track of messages and their attributes. It offers methods for adding, deleting, updating, querying, and displaying messages within the conversation. Additionally, it supports exporting and importing conversations, searching for specific keywords, and more.
#### Attributes
- `time_enabled (bool)`: A flag indicating whether to enable timestamp recording for messages.
- `conversation_history (list)`: A list that stores messages in the conversation.
### 2. Methods
#### `__init__(self, time_enabled: bool = False, *args, **kwargs)`
- **Description**: Initializes a new Conversation object.
- **Parameters**:
- `time_enabled (bool)`: If `True`, timestamps will be recorded for each message. Default is `False`.
#### `add(self, role: str, content: str, *args, **kwargs)`
- **Description**: Adds a message to the conversation history.
- **Parameters**:
- `role (str)`: The role of the speaker (e.g., "user," "assistant").
- `content (str)`: The content of the message.
#### `delete(self, index: str)`
- **Description**: Deletes a message from the conversation history.
- **Parameters**:
- `index (str)`: The index of the message to delete.
#### `update(self, index: str, role, content)`
- **Description**: Updates a message in the conversation history.
- **Parameters**:
- `index (str)`: The index of the message to update.
- `role (_type_)`: The new role of the speaker.
- `content (_type_)`: The new content of the message.
#### `query(self, index: str)`
- **Description**: Retrieves a message from the conversation history.
- **Parameters**:
- `index (str)`: The index of the message to query.
- **Returns**: The message as a string.
#### `search(self, keyword: str)`
- **Description**: Searches for messages containing a specific keyword in the conversation history.
- **Parameters**:
- `keyword (str)`: The keyword to search for.
- **Returns**: A list of messages that contain the keyword.
#### `display_conversation(self, detailed: bool = False)`
- **Description**: Displays the conversation history.
- **Parameters**:
- `detailed (bool, optional)`: If `True`, provides detailed information about each message. Default is `False`.
#### `export_conversation(self, filename: str)`
- **Description**: Exports the conversation history to a text file.
- **Parameters**:
- `filename (str)`: The name of the file to export to.
#### `import_conversation(self, filename: str)`
- **Description**: Imports a conversation history from a text file.
- **Parameters**:
- `filename (str)`: The name of the file to import from.
#### `count_messages_by_role(self)`
- **Description**: Counts the number of messages by role in the conversation.
- **Returns**: A dictionary containing the count of messages for each role.
#### `return_history_as_string(self)`
- **Description**: Returns the entire conversation history as a single string.
- **Returns**: The conversation history as a string.
#### `save_as_json(self, filename: str)`
- **Description**: Saves the conversation history as a JSON file.
- **Parameters**:
- `filename (str)`: The name of the JSON file to save.
The Conversation module provides a versatile and extensible structure for managing and analyzing text-based conversations. Whether you're developing a chatbot, analyzing customer support interactions, or conducting research on dialogues, this module simplifies the process of handling conversation data.
#### `load_from_json(self, filename: str)`
With the Conversation module, you can add, delete, update, query, and search for messages within a conversation. You can also display, export, and import conversation history, making it an essential tool for various applications.
- **Description**: Loads a conversation history from a JSON file.
- **Parameters**:
- `filename (str)`: The name of the JSON file to load.
## 2. Installation <a name="installation"></a>
#### `search_keyword_in_conversation(self, keyword: str)`
To use the Conversation module, you need to have Python installed on your system. Additionally, you can install the required dependencies using pip:
- **Description**: Searches for a keyword in the conversation history and returns matching messages.
- **Parameters**:
- `keyword (str)`: The keyword to search for.
- **Returns**: A list of messages containing the keyword.
```bash
pip install termcolor
#### `pretty_print_conversation(self, messages)`
- **Description**: Pretty prints a list of messages with colored role indicators.
- **Parameters**:
- `messages (list)`: A list of messages to print.
## Examples
Here are some usage examples of the `Conversation` class:
### Creating a Conversation
```python
from swarms.structs import Conversation
conv = Conversation()
```
Once you have the dependencies installed, you can import the Conversation module into your Python code.
### Adding Messages
```python
from swarms.structs.conversation import Conversation
conv.add("user", "Hello, world!")
conv.add("assistant", "Hello, user!")
```
## 3. Class: Conversation <a name="class-conversation"></a>
### Displaying the Conversation
The Conversation class is the core of this module. It allows you to create and manipulate conversation histories. Below are the attributes and methods provided by this class.
```python
conv.display_conversation()
```
### Attributes <a name="attributes"></a>
### Searching for Messages
- `time_enabled` (bool): Indicates whether timestamps are enabled for messages in the conversation.
- `conversation_history` (list): A list that stores the conversation history as a collection of messages.
```python
result = conv.search("Hello")
```
### Methods <a name="methods"></a>
### Exporting and Importing Conversations
The Conversation class provides the following methods:
```python
conv.export_conversation("conversation.txt")
conv.import_conversation("conversation.txt")
```
- `add(role: str, content: str, *args, **kwargs)`: Adds a message to the conversation history.
- `delete(index: str)`: Deletes a message from the conversation history.
- `update(index: str, role, content)`: Updates a message in the conversation history.
- `query(index: str)`: Queries a message in the conversation history.
- `search(keyword: str)`: Searches for messages containing a specific keyword.
- `display_conversation(detailed: bool = False)`: Displays the conversation history.
- `export_conversation(filename: str)`: Exports the conversation history to a file.
- `import_conversation(filename: str)`: Imports a conversation history from a file.
- `count_messages_by_role()`: Counts the number of messages by role.
- `return_history_as_string()`: Returns the conversation history as a string.
- `save_as_json(filename: str)`: Saves the conversation history as a JSON file.
- `load_from_json(filename: str)`: Loads the conversation history from a JSON file.
- `search_keyword_in_conversation(keyword: str)`: Searches for a keyword in the conversation history.
- `pretty_print_conversation(messages)`: Pretty prints the conversation history.
### Counting Messages by Role
## 4. Usage Examples <a name="usage-examples"></a>
```python
counts = conv.count_messages_by_role()
```
In this section, we'll provide practical examples of how to use the Conversation module to manage and analyze conversation data.
### Loading and Saving as JSON
### Example 1: Creating a Conversation <a name="example-1-creating-a-conversation"></a>
```python
conv.save_as_json("conversation.json")
conv.load_from_json("conversation.json")
```
Certainly! Let's continue with more examples and additional information about the `Conversation` class.
Let's start by creating a Conversation object and enabling timestamps for messages:
### Querying a Specific Message
You can retrieve a specific message from the conversation by its index:
```python
conversation = Conversation(time_enabled=True)
message = conv.query(0) # Retrieves the first message
```
### Example 2: Adding Messages <a name="example-2-adding-messages"></a>
### Updating a Message
You can add messages to the conversation using the `add` method. Here's how to add a user message and an assistant response:
You can update a message's content or role within the conversation:
```python
conversation.add("user", "Hello, how can I help you?")
conversation.add("assistant", "Hi there! I'm here to assist you.")
conv.update(0, "user", "Hi there!") # Updates the first message
```
### Example 3: Displaying and Exporting Conversation <a name="example-3-displaying-and-exporting-conversation"></a>
### Deleting a Message
You can display the conversation history and export it to a file. Let's see how to do this:
If you want to remove a message from the conversation, you can use the `delete` method:
```python
# Display the conversation
conversation.display_conversation()
conv.delete(0) # Deletes the first message
```
# Export the conversation to a file
conversation.export_conversation("conversation_history.txt")
### Counting Messages by Role
You can count the number of messages by role in the conversation:
```python
counts = conv.count_messages_by_role()
# Example result: {'user': 2, 'assistant': 2}
```
### Example 4: Counting Messages by Role <a name="example-4-counting-messages-by-role"></a>
### Exporting and Importing as Text
You can count the number of messages by role (e.g., user, assistant, system) using the `count_messages_by_role` method:
You can export the conversation to a text file and later import it:
```python
message_counts = conversation.count_messages_by_role()
print(message_counts)
conv.export_conversation("conversation.txt") # Export
conv.import_conversation("conversation.txt") # Import
```
### Example 5: Loading and Searching <a name="example-5-loading-and-searching"></a>
### Exporting and Importing as JSON
You can load a conversation from a file and search for messages containing a specific keyword:
Conversations can also be saved and loaded as JSON files:
```python
# Load conversation from a file
conversation.load_from_json("saved_conversation.json")
conv.save_as_json("conversation.json") # Save as JSON
conv.load_from_json("conversation.json") # Load from JSON
```
# Search for messages containing the keyword "help"
results = conversation.search("help")
print(results)
### Searching for a Keyword
You can search for messages containing a specific keyword within the conversation:
```python
results = conv.search_keyword_in_conversation("Hello")
```
## 5. Additional Information <a name="additional-information"></a>
### Pretty Printing
The `pretty_print_conversation` method provides a visually appealing way to display messages with colored role indicators:
```python
conv.pretty_print_conversation(conv.conversation_history)
```
- The Conversation module is designed to provide flexibility and ease of use for managing and analyzing text-based conversations.
- You can extend the module by adding custom functionality or integrating it into your chatbot or natural language processing applications.
These examples demonstrate the versatility of the `Conversation` class in managing and interacting with conversation data. Whether you're building a chatbot, conducting analysis, or simply organizing dialogues, this class offers a robust set of tools to help you accomplish your goals.
## 6. References <a name="references"></a>
## Conclusion
For more information on the Conversation module and its usage, refer to the official documentation and examples.
The `Conversation` class is a valuable utility for handling conversation data in Python. With its ability to add, update, delete, search, export, and import messages, you have the flexibility to work with conversations in various ways. Feel free to explore its features and adapt them to your specific projects and applications.
If you have any further questions or need additional assistance, please don't hesitate to ask!

@ -0,0 +1,159 @@
from abc import ABC, abstractmethod
class AbstractDatabase(ABC):
"""
Abstract base class for a database.
This class defines the interface for interacting with a database.
Subclasses must implement the abstract methods to provide the
specific implementation details for connecting to a database,
executing queries, and performing CRUD operations.
"""
@abstractmethod
def connect(self):
"""
Connect to the database.
This method establishes a connection to the database.
"""
pass
@abstractmethod
def close(self):
"""
Close the database connection.
This method closes the connection to the database.
"""
pass
@abstractmethod
def execute_query(self, query):
"""
Execute a database query.
This method executes the given query on the database.
Parameters:
query (str): The query to be executed.
"""
pass
@abstractmethod
def fetch_all(self):
"""
Fetch all rows from the result set.
This method retrieves all rows from the result set of a query.
Returns:
list: A list of dictionaries representing the rows.
"""
pass
@abstractmethod
def fetch_one(self):
"""
Fetch one row from the result set.
This method retrieves one row from the result set of a query.
Returns:
dict: A dictionary representing the row.
"""
pass
@abstractmethod
def add(self, table, data):
"""
Add a new record to the database.
This method adds a new record to the specified table in the database.
Parameters:
table (str): The name of the table.
data (dict): A dictionary representing the data to be added.
"""
pass
@abstractmethod
def query(self, table, condition):
"""
Query the database.
This method queries the specified table in the database based on the given condition.
Parameters:
table (str): The name of the table.
condition (str): The condition to be applied in the query.
Returns:
list: A list of dictionaries representing the query results.
"""
pass
@abstractmethod
def get(self, table, id):
"""
Get a record from the database.
This method retrieves a record from the specified table in the database based on the given ID.
Parameters:
table (str): The name of the table.
id (int): The ID of the record to be retrieved.
Returns:
dict: A dictionary representing the retrieved record.
"""
pass
@abstractmethod
def update(self, table, id, data):
"""
Update a record in the database.
This method updates a record in the specified table in the database based on the given ID.
Parameters:
table (str): The name of the table.
id (int): The ID of the record to be updated.
data (dict): A dictionary representing the updated data.
"""
pass
@abstractmethod
def delete(self, table, id):
"""
Delete a record from the database.
This method deletes a record from the specified table in the database based on the given ID.
Parameters:
table (str): The name of the table.
id (int): The ID of the record to be deleted.
"""
pass

@ -1,302 +1,140 @@
import subprocess
import uuid
from typing import Optional
from attr import define, field, Factory
from dataclasses import dataclass
from swarms.memory.base import BaseVectorStore
from typing import Any, List, Optional
from sqlalchemy import JSON, Column, String, create_engine
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
try:
from sqlalchemy.engine import Engine
from sqlalchemy import create_engine, Column, String, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Session
except ImportError:
print(
"The PgVectorVectorStore requires sqlalchemy to be installed"
)
print("pip install sqlalchemy")
subprocess.run(["pip", "install", "sqlalchemy"])
try:
from pgvector.sqlalchemy import Vector
except ImportError:
print("The PgVectorVectorStore requires pgvector to be installed")
print("pip install pgvector")
subprocess.run(["pip", "install", "pgvector"])
class PostgresDB:
"""
A class representing a Postgres database.
@define
class PgVectorVectorStore(BaseVectorStore):
"""A vector store driver to Postgres using the PGVector extension.
Args:
connection_string (str): The connection string for the Postgres database.
table_name (str): The name of the table in the database.
Attributes:
connection_string: An optional string describing the target Postgres database instance.
create_engine_params: Additional configuration params passed when creating the database connection.
engine: An optional sqlalchemy Postgres engine to use.
table_name: Optionally specify the name of the table to used to store vectors.
Methods:
upsert_vector(vector: list[float], vector_id: Optional[str] = None, namespace: Optional[str] = None, meta: Optional[dict] = None, **kwargs) -> str:
Upserts a vector into the index.
load_entry(vector_id: str, namespace: Optional[str] = None) -> Optional[BaseVector.Entry]:
Loads a single vector from the index.
load_entries(namespace: Optional[str] = None) -> list[BaseVector.Entry]:
Loads all vectors from the index.
query(query: str, count: Optional[int] = None, namespace: Optional[str] = None, include_vectors: bool = False, include_metadata=True, **kwargs) -> list[BaseVector.QueryResult]:
Queries the index for vectors similar to the given query string.
setup(create_schema: bool = True, install_uuid_extension: bool = True, install_vector_extension: bool = True) -> None:
Provides a mechanism to initialize the database schema and extensions.
Usage:
>>> from swarms.memory.vector_stores.pgvector import PgVectorVectorStore
>>> from swarms.utils.embeddings import USEEmbedding
>>> from swarms.utils.hash import str_to_hash
>>> from swarms.utils.dataframe import dataframe_to_hash
>>> import pandas as pd
>>>
>>> # Create a new PgVectorVectorStore instance:
>>> pv = PgVectorVectorStore(
>>> connection_string="postgresql://postgres:password@localhost:5432/postgres",
>>> table_name="your-table-name"
>>> )
>>> # Create a new index:
>>> pv.setup()
>>> # Create a new USEEmbedding instance:
>>> use = USEEmbedding()
>>> # Create a new dataframe:
>>> df = pd.DataFrame({
>>> "text": [
>>> "This is a test",
>>> "This is another test",
>>> "This is a third test"
>>> ]
>>> })
>>> # Embed the dataframe:
>>> df["embedding"] = df["text"].apply(use.embed_string)
>>> # Upsert the dataframe into the index:
>>> pv.upsert_vector(
>>> vector=df["embedding"].tolist(),
>>> vector_id=dataframe_to_hash(df),
>>> namespace="your-namespace"
>>> )
>>> # Query the index:
>>> pv.query(
>>> query="This is a test",
>>> count=10,
>>> namespace="your-namespace"
>>> )
>>> # Load a single entry from the index:
>>> pv.load_entry(
>>> vector_id=dataframe_to_hash(df),
>>> namespace="your-namespace"
>>> )
>>> # Load all entries from the index:
>>> pv.load_entries(
>>> namespace="your-namespace"
>>> )
engine: The SQLAlchemy engine for connecting to the database.
table_name (str): The name of the table in the database.
VectorModel: The SQLAlchemy model representing the vector table.
"""
connection_string: Optional[str] = field(
default=None, kw_only=True
)
create_engine_params: dict = field(factory=dict, kw_only=True)
engine: Optional[Engine] = field(default=None, kw_only=True)
table_name: str = field(kw_only=True)
_model: any = field(
default=Factory(
lambda self: self.default_vector_model(), takes_self=True
)
)
@connection_string.validator
def validate_connection_string(
self, _, connection_string: Optional[str]
) -> None:
# If an engine is provided, the connection string is not used.
if self.engine is not None:
return
def __init__(
self, connection_string: str, table_name: str, *args, **kwargs
):
"""
Initializes a new instance of the PostgresDB class.
# If an engine is not provided, a connection string is required.
if connection_string is None:
raise ValueError(
"An engine or connection string is required"
)
Args:
connection_string (str): The connection string for the Postgres database.
table_name (str): The name of the table in the database.
if not connection_string.startswith("postgresql://"):
raise ValueError(
"The connection string must describe a Postgres"
" database connection"
"""
self.engine = create_engine(
connection_string, *args, **kwargs
)
self.table_name = table_name
self.VectorModel = self._create_vector_model()
@engine.validator
def validate_engine(self, _, engine: Optional[Engine]) -> None:
# If a connection string is provided, an engine does not need to be provided.
if self.connection_string is not None:
return
def _create_vector_model(self):
"""
Creates the SQLAlchemy model for the vector table.
# If a connection string is not provided, an engine is required.
if engine is None:
raise ValueError(
"An engine or connection string is required"
)
Returns:
The SQLAlchemy model representing the vector table.
def __attrs_post_init__(self) -> None:
"""If a an engine is provided, it will be used to connect to the database.
If not, a connection string is used to create a new database connection here.
"""
if self.engine is None:
self.engine = create_engine(
self.connection_string, **self.create_engine_params
)
Base = declarative_base()
def setup(
self,
create_schema: bool = True,
install_uuid_extension: bool = True,
install_vector_extension: bool = True,
) -> None:
"""Provides a mechanism to initialize the database schema and extensions."""
if install_uuid_extension:
self.engine.execute(
'CREATE EXTENSION IF NOT EXISTS "uuid-ossp";'
)
class VectorModel(Base):
__tablename__ = self.table_name
if install_vector_extension:
self.engine.execute(
'CREATE EXTENSION IF NOT EXISTS "vector";'
id = Column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
unique=True,
nullable=False,
)
vector = Column(
String
) # Assuming vector is stored as a string
namespace = Column(String)
meta = Column(JSON)
if create_schema:
self._model.metadata.create_all(self.engine)
return VectorModel
def upsert_vector(
def add_or_update_vector(
self,
vector: list[float],
vector: str,
vector_id: Optional[str] = None,
namespace: Optional[str] = None,
meta: Optional[dict] = None,
**kwargs,
) -> str:
"""Inserts or updates a vector in the collection."""
) -> None:
"""
Adds or updates a vector in the database.
Args:
vector (str): The vector to be added or updated.
vector_id (str, optional): The ID of the vector. If not provided, a new ID will be generated.
namespace (str, optional): The namespace of the vector.
meta (dict, optional): Additional metadata associated with the vector.
"""
try:
with Session(self.engine) as session:
obj = self._model(
obj = self.VectorModel(
id=vector_id,
vector=vector,
namespace=namespace,
meta=meta,
)
obj = session.merge(obj)
session.merge(obj)
session.commit()
except Exception as e:
print(f"Error adding or updating vector: {e}")
return str(obj.id)
def query_vectors(
self, query: Any, namespace: Optional[str] = None
) -> List[Any]:
"""
Queries vectors from the database based on the given query and namespace.
def load_entry(
self, vector_id: str, namespace: Optional[str] = None
) -> BaseVectorStore.Entry:
"""Retrieves a specific vector entry from the collection based on its identifier and optional namespace."""
with Session(self.engine) as session:
result = session.get(self._model, vector_id)
Args:
query (Any): The query or condition to filter the vectors.
namespace (str, optional): The namespace of the vectors to be queried.
return BaseVectorStore.Entry(
id=result.id,
vector=result.vector,
namespace=result.namespace,
meta=result.meta,
)
Returns:
List[Any]: A list of vectors that match the query and namespace.
def load_entries(
self, namespace: Optional[str] = None
) -> list[BaseVectorStore.Entry]:
"""Retrieves all vector entries from the collection, optionally filtering to only
those that match the provided namespace.
"""
try:
with Session(self.engine) as session:
query = session.query(self._model)
q = session.query(self.VectorModel)
if namespace:
query = query.filter_by(namespace=namespace)
results = query.all()
return [
BaseVectorStore.Entry(
id=str(result.id),
vector=result.vector,
namespace=result.namespace,
meta=result.meta,
)
for result in results
]
def query(
self,
query: str,
count: Optional[int] = BaseVectorStore.DEFAULT_QUERY_COUNT,
namespace: Optional[str] = None,
include_vectors: bool = False,
distance_metric: str = "cosine_distance",
**kwargs,
) -> list[BaseVectorStore.QueryResult]:
"""Performs a search on the collection to find vectors similar to the provided input vector,
optionally filtering to only those that match the provided namespace.
q = q.filter_by(namespace=namespace)
# Assuming 'query' is a condition or filter
q = q.filter(query)
return q.all()
except Exception as e:
print(f"Error querying vectors: {e}")
return []
def delete_vector(self, vector_id):
"""
distance_metrics = {
"cosine_distance": self._model.vector.cosine_distance,
"l2_distance": self._model.vector.l2_distance,
"inner_product": self._model.vector.max_inner_product,
}
if distance_metric not in distance_metrics:
raise ValueError("Invalid distance metric provided")
Deletes a vector from the database based on the given vector ID.
op = distance_metrics[distance_metric]
Args:
vector_id: The ID of the vector to be deleted.
"""
try:
with Session(self.engine) as session:
vector = self.embedding_driver.embed_string(query)
# The query should return both the vector and the distance metric score.
query = session.query(
self._model,
op(vector).label("score"),
).order_by(op(vector))
if namespace:
query = query.filter_by(namespace=namespace)
results = query.limit(count).all()
return [
BaseVectorStore.QueryResult(
id=str(result[0].id),
vector=(
result[0].vector if include_vectors else None
),
score=result[1],
meta=result[0].meta,
namespace=result[0].namespace,
)
for result in results
]
def default_vector_model(self) -> any:
Base = declarative_base()
@dataclass
class VectorModel(Base):
__tablename__ = self.table_name
id = Column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
unique=True,
nullable=False,
)
vector = Column(Vector())
namespace = Column(String)
meta = Column(JSON)
return VectorModel
obj = session.get(self.VectorModel, vector_id)
if obj:
session.delete(obj)
session.commit()
except Exception as e:
print(f"Error deleting vector: {e}")

@ -26,7 +26,18 @@ def maximal_marginal_relevance(
lambda_mult: float = 0.5,
k: int = 4,
) -> List[int]:
"""Calculate maximal marginal relevance."""
"""
Calculate maximal marginal relevance.
Args:
query_embedding (np.ndarray): The embedding of the query.
embedding_list (list): List of embeddings to select from.
lambda_mult (float, optional): The weight for query score. Defaults to 0.5.
k (int, optional): The number of embeddings to select. Defaults to 4.
Returns:
List[int]: List of indices of selected embeddings.
"""
if min(k, len(embedding_list)) <= 0:
return []
if query_embedding.ndim == 1:

@ -2,10 +2,20 @@ from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.autoscaler import AutoScaler
from swarms.structs.conversation import Conversation
from swarms.structs.schemas import (
TaskInput,
Artifact,
ArtifactUpload,
StepInput,
)
__all__ = [
"Agent",
"SequentialWorkflow",
"AutoScaler",
"Conversation",
"TaskInput",
"Artifact",
"ArtifactUpload",
"StepInput",
]

@ -1,8 +1,9 @@
import json
import datetime
import json
from termcolor import colored
from swarms.memory.base_db import AbstractDatabase
from swarms.structs.base import BaseStructure
@ -23,12 +24,22 @@ class Conversation(BaseStructure):
>>> conv.display_conversation()
user: Hello, world!
"""
def __init__(self, time_enabled: bool = False, *args, **kwargs):
def __init__(
self,
time_enabled: bool = False,
database: AbstractDatabase = None,
autosave: bool = True,
save_filepath: str = "/runs/conversation.json",
*args,
**kwargs,
):
super().__init__()
self.time_enabled = time_enabled
self.database = database
self.autosave = autosave
self.save_filepath = save_filepath
self.conversation_history = []
def add(self, role: str, content: str, *args, **kwargs):
@ -55,6 +66,9 @@ class Conversation(BaseStructure):
self.conversation_history.append(message)
if self.autosave:
self.save_as_json(self.save_filepath)
def delete(self, index: str):
"""Delete a message from the conversation history
@ -122,7 +136,7 @@ class Conversation(BaseStructure):
)
)
def export_conversation(self, filename: str):
def export_conversation(self, filename: str, *args, **kwargs):
"""Export the conversation history to a file
Args:
@ -259,3 +273,37 @@ class Conversation(BaseStructure):
role_to_color[message["role"]],
)
)
def add_to_database(self, *args, **kwargs):
"""Add the conversation history to the database"""
self.database.add("conversation", self.conversation_history)
def query_from_database(self, query, *args, **kwargs):
"""Query the conversation history from the database"""
return self.database.query("conversation", query)
def delete_from_database(self, *args, **kwargs):
"""Delete the conversation history from the database"""
self.database.delete("conversation")
def update_from_database(self, *args, **kwargs):
"""Update the conversation history from the database"""
self.database.update(
"conversation", self.conversation_history
)
def get_from_database(self, *args, **kwargs):
"""Get the conversation history from the database"""
return self.database.get("conversation")
def execute_query_from_database(self, query, *args, **kwargs):
"""Execute a query on the database"""
return self.database.execute_query(query)
def fetch_all_from_database(self, *args, **kwargs):
"""Fetch all from the database"""
return self.database.fetch_all()
def fetch_one_from_database(self, *args, **kwargs):
"""Fetch one from the database"""
return self.database.fetch_one()

@ -17,6 +17,15 @@ class TaskInput(BaseModel):
class Artifact(BaseModel):
"""
Represents an artifact.
Attributes:
artifact_id (str): Id of the artifact.
file_name (str): Filename of the artifact.
relative_path (str, optional): Relative path of the artifact in the agent's workspace.
"""
artifact_id: str = Field(
...,
description="Id of the artifact",

@ -1,55 +1,45 @@
import pytest
import os
from unittest.mock import patch
from swarms.memory.pg import PgVectorVectorStore
from dotenv import load_dotenv
import os
load_dotenv()
from swarms.memory.pg import PostgresDB
load_dotenv()
PSG_CONNECTION_STRING = os.getenv("PSG_CONNECTION_STRING")
def test_init():
with patch("sqlalchemy.create_engine") as MockEngine:
store = PgVectorVectorStore(
db = PostgresDB(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
MockEngine.assert_called_once()
assert store.engine == MockEngine.return_value
assert db.engine == MockEngine.return_value
def test_init_exception():
with pytest.raises(ValueError):
PgVectorVectorStore(
connection_string=(
"mysql://root:password@localhost:3306/test"
),
table_name="test",
)
def test_setup():
with patch("sqlalchemy.create_engine") as MockEngine:
store = PgVectorVectorStore(
def test_create_vector_model():
with patch("sqlalchemy.create_engine"):
db = PostgresDB(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
store.setup()
MockEngine.execute.assert_called()
model = db._create_vector_model()
assert model.__tablename__ == "test"
def test_upsert_vector():
def test_add_or_update_vector():
with patch("sqlalchemy.create_engine"), patch(
"sqlalchemy.orm.Session"
) as MockSession:
store = PgVectorVectorStore(
db = PostgresDB(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
store.upsert_vector(
[1.0, 2.0, 3.0],
db.add_or_update_vector(
"test_vector",
"test_id",
"test_namespace",
{"meta": "data"},
@ -59,45 +49,32 @@ def test_upsert_vector():
MockSession.return_value.commit.assert_called()
def test_load_entry():
def test_query_vectors():
with patch("sqlalchemy.create_engine"), patch(
"sqlalchemy.orm.Session"
) as MockSession:
store = PgVectorVectorStore(
db = PostgresDB(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
store.load_entry("test_id", "test_namespace")
MockSession.assert_called()
MockSession.return_value.get.assert_called()
def test_load_entries():
with patch("sqlalchemy.create_engine"), patch(
"sqlalchemy.orm.Session"
) as MockSession:
store = PgVectorVectorStore(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
store.load_entries("test_namespace")
db.query_vectors("test_query", "test_namespace")
MockSession.assert_called()
MockSession.return_value.query.assert_called()
MockSession.return_value.query.return_value.filter_by.assert_called()
MockSession.return_value.query.return_value.filter.assert_called()
MockSession.return_value.query.return_value.all.assert_called()
def test_query():
def test_delete_vector():
with patch("sqlalchemy.create_engine"), patch(
"sqlalchemy.orm.Session"
) as MockSession:
store = PgVectorVectorStore(
db = PostgresDB(
connection_string=PSG_CONNECTION_STRING,
table_name="test",
)
store.query("test_query", 10, "test_namespace")
db.delete_vector("test_id")
MockSession.assert_called()
MockSession.return_value.query.assert_called()
MockSession.return_value.query.return_value.filter_by.assert_called()
MockSession.return_value.query.return_value.limit.assert_called()
MockSession.return_value.query.return_value.all.assert_called()
MockSession.return_value.get.assert_called()
MockSession.return_value.delete.assert_called()
MockSession.return_value.commit.assert_called()
Loading…
Cancel
Save