Merge branch 'kyegomez:master' into master

pull/307/head
pliny 1 year ago committed by GitHub
commit 936d94820c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,15 @@
# Automatically label pull requests based on the files they touch.
# The label-to-path mapping is read from .github/labeler.yml
# (the actions/labeler default configuration location).
name: "PR Labeler"

# pull_request_target runs with repo-scoped credentials so the action can
# write labels on PRs from forks; only metadata is touched, no PR code runs.
on:
  pull_request_target:
    types: ["opened", "reopened", "ready_for_review"]

jobs:
  triage:
    permissions:
      contents: read
      pull-requests: write
    runs-on: ubuntu-latest
    steps:
      # Skip draft PRs; label only once the PR is ready for review.
      - uses: actions/labeler@v4
        if: ${{ github.event.pull_request.draft == false }}

@ -0,0 +1,49 @@
# This workflow warns and then closes issues and PRs that have had no
# activity for a specified amount of time.
#
# You can adjust the behavior by modifying this file.
# For more information, see:
# https://github.com/actions/stale
name: Mark stale issues and pull requests

on:
  schedule:
    # Scheduled to run daily at 01:30 UTC.
    - cron: '30 1 * * *'

jobs:
  stale:
    runs-on: ubuntu-latest
    permissions:
      issues: write
      pull-requests: write
    steps:
      - uses: actions/stale@v5
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          # Issues: mark stale after 14 idle days, close 14 days later
          # (28 days of inactivity total — matching the close message).
          days-before-issue-stale: 14
          days-before-issue-close: 14
          stale-issue-label: "status:stale"
          close-issue-reason: not_planned
          # Only items carrying one of these labels are eligible to go stale.
          any-of-labels: "status:awaiting user response,status:more data needed"
          stale-issue-message: >
            Marking this issue as stale since it has been open for 14 days
            with no activity. This issue will be closed if no further
            activity occurs.
          close-issue-message: >
            This issue was closed because it has been inactive for 28 days.
            Please post a new issue if you need further assistance. Thanks!
          # PRs: same 14 + 14 day schedule as issues.
          days-before-pr-stale: 14
          days-before-pr-close: 14
          stale-pr-label: "status:stale"
          stale-pr-message: >
            Marking this pull request as stale since it has been open for
            14 days with no activity. This PR will be closed if no further
            activity occurs.
          close-pr-message: >
            This pull request was closed because it has been inactive for
            28 days. Please open a new pull request if you need further
            assistance. Thanks!
          # Label that can be assigned to issues to exclude them from being marked as stale
          exempt-issue-labels: 'override-stale'
          # Label that can be assigned to PRs to exclude them from being marked as stale
          exempt-pr-labels: "override-stale"

@ -0,0 +1,81 @@
# Notebook-related checks
name: Presubmit checks

on:
  # Relevant PRs
  pull_request:
    paths:
      - "swarms/**"
      - "tests/**"
  # Allow manual runs
  workflow_dispatch:

# NOTE(review): the three test jobs are identical apart from the Python
# version; a `strategy.matrix` would deduplicate them, but separate job IDs
# are kept here in case branch protection references them — confirm before
# consolidating. `pip install -q` is used uniformly to keep CI logs quiet
# (previously only the 3.10 job passed -q).
jobs:
  test3_11:
    name: Test Py3.11
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Run tests
        run: |
          python --version
          pip install -q .[dev]
          python -m pytest
  test3_10:
    name: Test Py3.10
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Run tests
        run: |
          python --version
          pip install -q .[dev]
          python -m pytest
  test3_9:
    name: Test Py3.9
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Run tests
        run: |
          python --version
          pip install -q .[dev]
          python -m pytest
  pytype3_10:
    name: pytype 3.10
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Run pytype
        run: |
          python --version
          pip install -q .[dev]
          pip install -q gspread ipython
          pytype
  format:
    name: Check format with black
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Check format
        run: |
          python --version
          pip install -q .
          pip install -q black
          black . --check

@ -1,4 +1,4 @@
# `PineconeVectorStoreStore` Documentation # `PineconDB` Documentation
## Table of Contents ## Table of Contents

@ -0,0 +1,248 @@
# Short-Term Memory Module Documentation
## Introduction
The Short-Term Memory module is a component of the SWARMS framework designed for managing short-term and medium-term memory in a multi-agent system. This documentation provides a detailed explanation of the Short-Term Memory module, its purpose, functions, and usage.
### Purpose
The Short-Term Memory module serves the following purposes:
1. To store and manage messages in short-term memory.
2. To provide functions for retrieving, updating, and clearing memory.
3. To facilitate searching for specific terms within the memory.
4. To enable saving and loading memory data to/from a file.
### Class Definition
```python
class ShortTermMemory(BaseStructure):
def __init__(
self,
return_str: bool = True,
autosave: bool = True,
*args,
**kwargs,
):
...
```
#### Parameters
| Parameter | Type | Default Value | Description |
|---------------------|----------|---------------|------------------------------------------------------------------------------------------------------------------|
| `return_str` | bool | True | If True, returns memory as a string. |
| `autosave` | bool | True | If True, enables automatic saving of memory data to a file. |
| `*args`, `**kwargs` | | | Additional arguments and keyword arguments (not used in the constructor but allowed for flexibility). |
### Functions
#### 1. `add`
```python
def add(self, role: str = None, message: str = None, *args, **kwargs):
```
- Adds a message to the short-term memory.
- Parameters:
- `role` (str, optional): Role associated with the message.
- `message` (str, optional): The message to be added.
- Returns: The added memory.
##### Example 1: Adding a Message to Short-Term Memory
```python
memory.add(role="Agent 1", message="Received task assignment.")
```
##### Example 2: Adding Multiple Messages to Short-Term Memory
```python
messages = [("Agent 1", "Received task assignment."), ("Agent 2", "Task completed.")]
for role, message in messages:
memory.add(role=role, message=message)
```
#### 2. `get_short_term`
```python
def get_short_term(self):
```
- Retrieves the short-term memory.
- Returns: The contents of the short-term memory.
##### Example: Retrieving Short-Term Memory
```python
short_term_memory = memory.get_short_term()
for entry in short_term_memory:
print(entry["role"], ":", entry["message"])
```
#### 3. `get_medium_term`
```python
def get_medium_term(self):
```
- Retrieves the medium-term memory.
- Returns: The contents of the medium-term memory.
##### Example: Retrieving Medium-Term Memory
```python
medium_term_memory = memory.get_medium_term()
for entry in medium_term_memory:
print(entry["role"], ":", entry["message"])
```
#### 4. `clear_medium_term`
```python
def clear_medium_term(self):
```
- Clears the medium-term memory.
##### Example: Clearing Medium-Term Memory
```python
memory.clear_medium_term()
```
#### 5. `get_short_term_memory_str`
```python
def get_short_term_memory_str(self, *args, **kwargs):
```
- Retrieves the short-term memory as a string.
- Returns: A string representation of the short-term memory.
##### Example: Getting Short-Term Memory as a String
```python
short_term_memory_str = memory.get_short_term_memory_str()
print(short_term_memory_str)
```
#### 6. `update_short_term`
```python
def update_short_term(self, index, role: str, message: str, *args, **kwargs):
```
- Updates a message in the short-term memory.
- Parameters:
- `index` (int): The index of the message to update.
- `role` (str): New role for the message.
- `message` (str): New message content.
- Returns: None.
##### Example: Updating a Message in Short-Term Memory
```python
memory.update_short_term(index=0, role="Updated Role", message="Updated message content.")
```
#### 7. `clear`
```python
def clear(self):
```
- Clears the short-term memory.
##### Example: Clearing Short-Term Memory
```python
memory.clear()
```
#### 8. `search_memory`
```python
def search_memory(self, term):
```
- Searches the memory for a specific term.
- Parameters:
- `term` (str): The term to search for.
- Returns: A dictionary containing search results for short-term and medium-term memory.
##### Example: Searching Memory for a Term
```python
search_results = memory.search_memory("task")
print("Short-Term Memory Results:", search_results["short_term"])
print("Medium-Term Memory Results:", search_results["medium_term"])
```
#### 9. `return_shortmemory_as_str`
```python
def return_shortmemory_as_str(self):
```
- Returns the memory as a string.
##### Example: Returning Short-Term Memory as a String
```python
short_term_memory_str = memory.return_shortmemory_as_str()
print(short_term_memory_str)
```
#### 10. `move_to_medium_term`
```python
def move_to_medium_term(self, index):
```
- Moves a message from the short-term memory to the medium-term memory.
- Parameters:
- `index` (int): The index of the message to move.
##### Example: Moving a Message to Medium-Term Memory
```python
memory.move_to_medium_term(index=0)
```
#### 11. `return_medium_memory_as_str`
```python
def return_medium_memory_as_str(self):
```
- Returns the medium-term memory as a string.
##### Example: Returning Medium-Term Memory as a String
```python
medium_term_memory_str = memory.return_medium_memory_as_str()
print(medium_term_memory_str)
```
#### 12. `save_to_file`
```python
def save_to_file(self, filename: str):
```
- Saves the memory data to a file.
- Parameters:
- `filename` (str): The name of the file to save the data to.
##### Example: Saving Memory Data to a File
```python
memory.save_to_file("memory_data.json")
```
#### 13. `load_from_file`
```python
def load_from_file(self, filename: str, *args, **kwargs):
```
- Loads memory data from a file.
- Parameters:
- `filename` (str): The name of the file to load data from.
##### Example: Loading Memory Data from a File
```python
memory.load_from_file("memory_data.json")
```
### Additional Information and Tips
- To use the Short-Term Memory module effectively, consider the following tips:
- Use the `add` function to store messages in short-term memory.
  - Retrieve memory contents using `get_short_term` and `get_medium_term` functions.
- Clear memory as needed using `clear` and `clear_medium_term` functions.
- Search for specific terms within the memory using the `search_memory` function.
- Save and load memory data to/from files using `save_to_file` and `load_from_file` functions.
- Ensure proper exception handling when using memory functions to handle potential errors gracefully.
- When using the `search_memory` function, iterate through the results dictionary to access search results for short-term and medium-term memory.
### References and Resources
- For more information on multi-agent systems and memory management, refer to the SWARMS framework documentation: [SWARMS Documentation](https://swarms.apac.ai/).
- For advanced memory management and customization, explore the SWARMS framework source code.

@ -105,8 +105,9 @@ nav:
- SequentialWorkflow: 'swarms/structs/sequential_workflow.md' - SequentialWorkflow: 'swarms/structs/sequential_workflow.md'
- swarms.memory: - swarms.memory:
- Weaviate: "swarms/memory/weaviate.md" - Weaviate: "swarms/memory/weaviate.md"
- PineconeVectorStoreStore: "swarms/memory/pinecone.md" - PineconDB: "swarms/memory/pinecone.md"
- PGVectorStore: "swarms/memory/pg.md" - PGVectorStore: "swarms/memory/pg.md"
- ShortTermMemory: "swarms/memory/short_term_memory.md"
- swarms.utils: - swarms.utils:
- phoenix_trace_decorator: "swarms/utils/phoenix_tracer.md" - phoenix_trace_decorator: "swarms/utils/phoenix_tracer.md"
- Guides: - Guides:

@ -7,3 +7,7 @@ from swarms.swarms import * # noqa: E402, F403
from swarms.structs import * # noqa: E402, F403 from swarms.structs import * # noqa: E402, F403
from swarms.models import * # noqa: E402, F403 from swarms.models import * # noqa: E402, F403
from swarms.telemetry import * # noqa: E402, F403 from swarms.telemetry import * # noqa: E402, F403
from swarms.utils import * # noqa: E402, F403
from swarms.prompts import * # noqa: E402, F403
# from swarms.cli import * # noqa: E402, F403

@ -1,5 +1,7 @@
from swarms.memory.base_vectordb import VectorDatabase from swarms.memory.base_vectordb import VectorDatabase
from swarms.memory.short_term_memory import ShortTermMemory
__all__ = [ __all__ = [
"VectorDatabase", "VectorDatabase",
"ShortTermMemory"
] ]

@ -61,6 +61,8 @@ class ChromaDB:
openai_api_key: str = OPENAI_API_KEY, openai_api_key: str = OPENAI_API_KEY,
top_results_num: int = 3, top_results_num: int = 3,
limit_tokens: Optional[int] = 1000, limit_tokens: Optional[int] = 1000,
*args,
**kwargs,
): ):
self.metric = metric self.metric = metric
self.RESULTS_STORE_NAME = RESULTS_STORE_NAME self.RESULTS_STORE_NAME = RESULTS_STORE_NAME
@ -91,7 +93,9 @@ class ChromaDB:
embedding_function=embedding_function, embedding_function=embedding_function,
) )
def add(self, task: Dict, result: str, result_id: str): def add(
self, task: Dict, result: str, result_id: str, *args, **kwargs
):
"""Adds a result to the ChromaDB collection """Adds a result to the ChromaDB collection
Args: Args:
@ -137,16 +141,15 @@ class ChromaDB:
"task": task["task_name"], "task": task["task_name"],
"result": result, "result": result,
}, },
*args,
**kwargs,
) )
except Exception as error: except Exception as error:
print( print(
colored(f"Error adding to ChromaDB: {error}", "red") colored(f"Error adding to ChromaDB: {error}", "red")
) )
def query( def query(self, query: str, *args, **kwargs) -> List[dict]:
self,
query: str,
) -> List[dict]:
"""Queries the ChromaDB collection with a query for the top results """Queries the ChromaDB collection with a query for the top results
Args: Args:
@ -164,6 +167,8 @@ class ChromaDB:
query_texts=query, query_texts=query,
n_results=min(self.top_results_num, count), n_results=min(self.top_results_num, count),
include=["metadatas"], include=["metadatas"],
*args,
**kwargs,
) )
out = [item["task"] for item in results["metadatas"][0]] out = [item["task"] for item in results["metadatas"][0]]
out = limit_tokens_from_string( out = limit_tokens_from_string(

@ -1,14 +1,14 @@
from typing import Optional from typing import Optional
from swarms.memory.base import BaseVectorStore from swarms.memory.base_vectordb import VectorDatabase
import pinecone import pinecone
from attr import define, field from attr import define, field
from swarms.utils.hash import str_to_hash from swarms.utils.hash import str_to_hash
@define @define
class PineconeVectorStoreStore(BaseVectorStore): class PineconDB(VectorDatabase):
""" """
PineconeVectorStore is a vector storage driver that uses Pinecone as the underlying storage engine. PineconDB is a vector storage driver that uses Pinecone as the underlying storage engine.
Pinecone is a vector database that allows you to store, search, and retrieve high-dimensional vectors with Pinecone is a vector database that allows you to store, search, and retrieve high-dimensional vectors with
blazing speed and low latency. It is a managed service that is easy to use and scales effortlessly, so you can blazing speed and low latency. It is a managed service that is easy to use and scales effortlessly, so you can
@ -34,14 +34,14 @@ class PineconeVectorStoreStore(BaseVectorStore):
Creates a new index. Creates a new index.
Usage: Usage:
>>> from swarms.memory.vector_stores.pinecone import PineconeVectorStore >>> from swarms.memory.vector_stores.pinecone import PineconDB
>>> from swarms.utils.embeddings import USEEmbedding >>> from swarms.utils.embeddings import USEEmbedding
>>> from swarms.utils.hash import str_to_hash >>> from swarms.utils.hash import str_to_hash
>>> from swarms.utils.dataframe import dataframe_to_hash >>> from swarms.utils.dataframe import dataframe_to_hash
>>> import pandas as pd >>> import pandas as pd
>>> >>>
>>> # Create a new PineconeVectorStore instance: >>> # Create a new PineconDB instance:
>>> pv = PineconeVectorStore( >>> pv = PineconDB(
>>> api_key="your-api-key", >>> api_key="your-api-key",
>>> index_name="your-index-name", >>> index_name="your-index-name",
>>> environment="us-west1-gcp", >>> environment="us-west1-gcp",
@ -102,7 +102,7 @@ class PineconeVectorStoreStore(BaseVectorStore):
self.index = pinecone.Index(self.index_name) self.index = pinecone.Index(self.index_name)
def upsert_vector( def add(
self, self,
vector: list[float], vector: list[float],
vector_id: Optional[str] = None, vector_id: Optional[str] = None,
@ -110,7 +110,17 @@ class PineconeVectorStoreStore(BaseVectorStore):
meta: Optional[dict] = None, meta: Optional[dict] = None,
**kwargs, **kwargs,
) -> str: ) -> str:
"""Upsert vector""" """Add a vector to the index.
Args:
vector (list[float]): _description_
vector_id (Optional[str], optional): _description_. Defaults to None.
namespace (Optional[str], optional): _description_. Defaults to None.
meta (Optional[dict], optional): _description_. Defaults to None.
Returns:
str: _description_
"""
vector_id = ( vector_id = (
vector_id if vector_id else str_to_hash(str(vector)) vector_id if vector_id else str_to_hash(str(vector))
) )
@ -121,31 +131,15 @@ class PineconeVectorStoreStore(BaseVectorStore):
return vector_id return vector_id
def load_entry( def load_entries(self, namespace: Optional[str] = None):
self, vector_id: str, namespace: Optional[str] = None """Load all entries from the index.
) -> Optional[BaseVectorStore.Entry]:
"""Load entry""" Args:
result = self.index.fetch( namespace (Optional[str], optional): _description_. Defaults to None.
ids=[vector_id], namespace=namespace
).to_dict() Returns:
vectors = list(result["vectors"].values()) _type_: _description_
"""
if len(vectors) > 0:
vector = vectors[0]
return BaseVectorStore.Entry(
id=vector["id"],
meta=vector["metadata"],
vector=vector["values"],
namespace=result["namespace"],
)
else:
return None
def load_entries(
self, namespace: Optional[str] = None
) -> list[BaseVectorStore.Entry]:
"""Load entries"""
# This is a hacky way to query up to 10,000 values from Pinecone. Waiting on an official API for fetching # This is a hacky way to query up to 10,000 values from Pinecone. Waiting on an official API for fetching
# all values from a namespace: # all values from a namespace:
# https://community.pinecone.io/t/is-there-a-way-to-query-all-the-vectors-and-or-metadata-from-a-namespace/797/5 # https://community.pinecone.io/t/is-there-a-way-to-query-all-the-vectors-and-or-metadata-from-a-namespace/797/5
@ -157,15 +151,14 @@ class PineconeVectorStoreStore(BaseVectorStore):
namespace=namespace, namespace=namespace,
) )
return [ for result in results["matches"]:
BaseVectorStore.Entry( entry = {
id=r["id"], "id": result["id"],
vector=r["values"], "vector": result["values"],
meta=r["metadata"], "meta": result["metadata"],
namespace=results["namespace"], "namespace": result["namespace"],
) }
for r in results["matches"] return entry
]
def query( def query(
self, self,
@ -173,19 +166,26 @@ class PineconeVectorStoreStore(BaseVectorStore):
count: Optional[int] = None, count: Optional[int] = None,
namespace: Optional[str] = None, namespace: Optional[str] = None,
include_vectors: bool = False, include_vectors: bool = False,
# PineconeVectorStoreStorageDriver-specific params: # PineconDBStorageDriver-specific params:
include_metadata=True, include_metadata=True,
**kwargs, **kwargs,
) -> list[BaseVectorStore.QueryResult]: ):
"""Query vectors""" """Query the index for vectors similar to the given query string.
Args:
query (str): _description_
count (Optional[int], optional): _description_. Defaults to None.
namespace (Optional[str], optional): _description_. Defaults to None.
include_vectors (bool, optional): _description_. Defaults to False.
include_metadata (bool, optional): _description_. Defaults to True.
Returns:
_type_: _description_
"""
vector = self.embedding_driver.embed_string(query) vector = self.embedding_driver.embed_string(query)
params = { params = {
"top_k": ( "top_k": count,
count
if count
else BaseVectorStore.DEFAULT_QUERY_COUNT
),
"namespace": namespace, "namespace": namespace,
"include_values": include_vectors, "include_values": include_vectors,
"include_metadata": include_metadata, "include_metadata": include_metadata,
@ -193,19 +193,22 @@ class PineconeVectorStoreStore(BaseVectorStore):
results = self.index.query(vector, **params) results = self.index.query(vector, **params)
return [ for r in results["matches"]:
BaseVectorStore.QueryResult( entry = {
id=r["id"], "id": results["id"],
vector=r["values"], "vector": results["values"],
score=r["score"], "score": results["scores"],
meta=r["metadata"], "meta": results["metadata"],
namespace=results["namespace"], "namespace": results["namespace"],
) }
for r in results["matches"] return entry
]
def create_index(self, name: str, **kwargs) -> None: def create_index(self, name: str, **kwargs) -> None:
"""Create index""" """Create a new index.
Args:
name (str): _description_
"""
params = { params = {
"name": name, "name": name,
"dimension": self.embedding_driver.dimensions, "dimension": self.embedding_driver.dimensions,

@ -1,5 +1,6 @@
import subprocess import subprocess
from typing import List from typing import List
from httpx import RequestError from httpx import RequestError
try: try:
@ -15,8 +16,8 @@ try:
from qdrant_client import QdrantClient from qdrant_client import QdrantClient
from qdrant_client.http.models import ( from qdrant_client.http.models import (
Distance, Distance,
VectorParams,
PointStruct, PointStruct,
VectorParams,
) )
except ImportError: except ImportError:
print("Please install the qdrant-client package") print("Please install the qdrant-client package")
@ -91,7 +92,7 @@ class Qdrant:
) )
print(f"Collection '{self.collection_name}' created.") print(f"Collection '{self.collection_name}' created.")
def add_vectors(self, docs: List[dict]): def add(self, docs: List[dict], *args, **kwargs):
""" """
Adds vector representations of documents to the Qdrant collection. Adds vector representations of documents to the Qdrant collection.
@ -128,13 +129,15 @@ class Qdrant:
collection_name=self.collection_name, collection_name=self.collection_name,
wait=True, wait=True,
points=points, points=points,
*args,
**kwargs,
) )
return operation_info return operation_info
except Exception as e: except Exception as e:
print(f"Error adding vectors: {e}") print(f"Error adding vectors: {e}")
return None return None
def search_vectors(self, query: str, limit: int = 3): def query(self, query: str, limit: int = 3, *args, **kwargs):
""" """
Searches the collection for vectors similar to the query vector. Searches the collection for vectors similar to the query vector.
@ -147,12 +150,14 @@ class Qdrant:
""" """
try: try:
query_vector = self.model.encode( query_vector = self.model.encode(
query, normalize_embeddings=True query, normalize_embeddings=True, *args, **kwargs
) )
search_result = self.client.search( search_result = self.client.search(
collection_name=self.collection_name, collection_name=self.collection_name,
query_vector=query_vector, query_vector=query_vector,
limit=limit, limit=limit,
*args,
**kwargs,
) )
return search_result return search_result
except Exception as e: except Exception as e:

@ -0,0 +1,166 @@
import logging
from swarms.structs.base import BaseStructure
import threading
import json
import os
class ShortTermMemory(BaseStructure):
    """Short-term and medium-term message memory for a multi-agent system.

    Messages are stored as ``{"role": ..., "message": ...}`` dicts in two
    in-memory lists. The class supports retrieval, in-place update, term
    search, promotion from short- to medium-term memory, and JSON file
    persistence guarded by a lock.

    Args:
        return_str: If True, prefer string representations.
            NOTE(review): currently unused by any method — confirm intent.
        autosave: If True, enable automatic saving.
            NOTE(review): currently unused by any method — confirm intent.
    """

    def __init__(
        self,
        return_str: bool = True,
        autosave: bool = True,
        *args,
        **kwargs,
    ):
        # NOTE(review): BaseStructure.__init__ is intentionally not called
        # (matching prior behavior); confirm whether it should be.
        self.return_str = return_str
        self.autosave = autosave
        self.short_term_memory = []
        self.medium_term_memory = []
        # Guards file I/O in save_to_file / load_from_file.
        self.lock = threading.Lock()

    def add(
        self, role: str = None, message: str = None, *args, **kwargs
    ):
        """Add a message to the short term memory.

        Args:
            role (str, optional): Role associated with the message.
            message (str, optional): The message content.

        Returns:
            dict: The entry that was stored.
        """
        try:
            entry = {"role": role, "message": message}
            self.short_term_memory.append(entry)
            # BUG FIX: previously returned list.append(...)'s result, which
            # is always None; return the stored entry as documented.
            return entry
        except Exception as error:
            print(f"Add to short term memory failed: {error}")
            raise error

    def get_short_term(self):
        """Get the short term memory.

        Returns:
            list: The short-term memory entries.
        """
        return self.short_term_memory

    def get_medium_term(self):
        """Get the medium term memory.

        Returns:
            list: The medium-term memory entries.
        """
        return self.medium_term_memory

    def clear_medium_term(self):
        """Clear the medium term memory."""
        self.medium_term_memory = []

    def get_short_term_memory_str(self, *args, **kwargs):
        """Get the short term memory as a string."""
        return str(self.short_term_memory)

    def update_short_term(
        self, index, role: str, message: str, *args, **kwargs
    ):
        """Replace the entry at ``index`` in short-term memory.

        Args:
            index (int): Index of the entry to replace.
            role (str): New role for the entry.
            message (str): New message content.

        Raises:
            IndexError: If ``index`` is out of range.
        """
        self.short_term_memory[index] = {
            "role": role,
            "message": message,
        }

    def clear(self):
        """Clear the short term memory."""
        self.short_term_memory = []

    def search_memory(self, term):
        """Search both memories for entries whose message contains ``term``.

        Args:
            term (str): Substring to search for.

        Returns:
            dict: ``{"short_term": [(index, entry), ...],
            "medium_term": [(index, entry), ...]}``.
        """
        results = {"short_term": [], "medium_term": []}
        for i, message in enumerate(self.short_term_memory):
            if term in message["message"]:
                results["short_term"].append((i, message))
        for i, message in enumerate(self.medium_term_memory):
            if term in message["message"]:
                results["medium_term"].append((i, message))
        return results

    def return_shortmemory_as_str(self):
        """Return the short-term memory as a string.

        Returns:
            str: String form of the short-term memory list.
        """
        return str(self.short_term_memory)

    def move_to_medium_term(self, index):
        """Move an entry from short-term memory to medium-term memory.

        Args:
            index (int): Index of the short-term entry to move.

        Raises:
            IndexError: If ``index`` is out of range.
        """
        message = self.short_term_memory.pop(index)
        self.medium_term_memory.append(message)

    def return_medium_memory_as_str(self):
        """Return the medium term memory as a string.

        Returns:
            str: String form of the medium-term memory list.
        """
        return str(self.medium_term_memory)

    def save_to_file(self, filename: str):
        """Save both memories to ``filename`` as JSON.

        Args:
            filename (str): Path of the file to write.
        """
        try:
            with self.lock:
                with open(filename, "w") as f:
                    json.dump(
                        {
                            "short_term_memory": (
                                self.short_term_memory
                            ),
                            "medium_term_memory": (
                                self.medium_term_memory
                            ),
                        },
                        f,
                    )
            # BUG FIX: log messages previously contained a placeholder
            # instead of the actual filename.
            logging.info(f"Saved memory to {filename}")
        except Exception as error:
            print(f"Error saving memory to {filename}: {error}")

    def load_from_file(self, filename: str, *args, **kwargs):
        """Load both memories from a JSON file written by ``save_to_file``.

        Missing keys default to empty lists.

        Args:
            filename (str): Path of the file to read.
        """
        try:
            with self.lock:
                with open(filename, "r") as f:
                    data = json.load(f)
                self.short_term_memory = data.get(
                    "short_term_memory", []
                )
                self.medium_term_memory = data.get(
                    "medium_term_memory", []
                )
            logging.info(f"Loaded memory from {filename}")
        except Exception as error:
            # BUG FIX: "Erorr" typo corrected.
            print(f"Error loading memory from {filename}: {error}")

@ -1,4 +0,0 @@
"""
Implement retreiever for vector store
"""

@ -31,7 +31,7 @@ from swarms.models.layoutlm_document_qa import (
) # noqa: E402 ) # noqa: E402
from swarms.models.gpt4_vision_api import GPT4VisionAPI # noqa: E402 from swarms.models.gpt4_vision_api import GPT4VisionAPI # noqa: E402
from swarms.models.openai_tts import OpenAITTS # noqa: E402 from swarms.models.openai_tts import OpenAITTS # noqa: E402
from swarms.models.gemini import Gemini # noqa: E402
# from swarms.models.gpt4v import GPT4Vision # from swarms.models.gpt4v import GPT4Vision
# from swarms.models.dalle3 import Dalle3 # from swarms.models.dalle3 import Dalle3
# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402 # from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402
@ -63,4 +63,5 @@ __all__ = [
"GPT4VisionAPI", "GPT4VisionAPI",
# "vLLM", # "vLLM",
"OpenAITTS", "OpenAITTS",
"Gemini",
] ]

@ -264,14 +264,6 @@ class Agent:
if preset_stopping_token: if preset_stopping_token:
self.stopping_token = "<DONE>" self.stopping_token = "<DONE>"
# If memory then add the json to the memory vector database
if memory:
# Add all of the state to the memory
self.add_message_to_memory_db(
{"message": self.state_to_str()},
{"agent_id": self.id},
)
# If tools exist then add the tool docs usage to the sop # If tools exist then add the tool docs usage to the sop
if self.tools: if self.tools:
self.sop_list.append( self.sop_list.append(

@ -79,7 +79,6 @@ class BaseStructure(ABC):
self.save_metadata_path = save_metadata_path self.save_metadata_path = save_metadata_path
self.save_error_path = save_error_path self.save_error_path = save_error_path
@abstractmethod
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
"""Run the structure.""" """Run the structure."""
pass pass

@ -6,74 +6,7 @@ from typing import Any, Callable, Dict, List, Optional, Union
from termcolor import colored from termcolor import colored
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.task import Task
# Define a generic Task that can handle different types of callable objects
@dataclass
class Task:
"""
Task class for running a task in a sequential workflow.
Args:
description (str): The description of the task.
agent (Union[Callable, Agent]): The model or agent to execute the task.
args (List[Any]): Additional arguments to pass to the task execution.
kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution.
result (Any): The result of the task execution.
history (List[Any]): The history of the task execution.
Methods:
execute: Execute the task.
Examples:
>>> from swarms.structs import Task, Agent
>>> from swarms.models import OpenAIChat
>>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in miami", agent=agent)
>>> task.execute()
>>> task.result
"""
description: str
agent: Union[Callable, Agent]
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, Any] = field(default_factory=dict)
result: Any = None
history: List[Any] = field(default_factory=list)
# logger = logging.getLogger(__name__)
def execute(self):
"""
Execute the task.
Raises:
ValueError: If a Agent instance is used as a task and the 'task' argument is not provided.
"""
if isinstance(self.agent, Agent):
# Add a prompt to notify the Agent of the sequential workflow
if "prompt" in self.kwargs:
self.kwargs["prompt"] += (
f"\n\nPrevious output: {self.result}"
if self.result
else ""
)
else:
self.kwargs["prompt"] = (
f"Main task: {self.description}"
+ (
f"\n\nPrevious output: {self.result}"
if self.result
else ""
)
)
self.result = self.agent.run(*self.args, **self.kwargs)
else:
self.result = self.agent(*self.args, **self.kwargs)
self.history.append(self.result)
# SequentialWorkflow class definition using dataclasses # SequentialWorkflow class definition using dataclasses
@ -361,7 +294,10 @@ class SequentialWorkflow:
) )
def workflow_bootup(self, **kwargs) -> None: def workflow_bootup(self, **kwargs) -> None:
"""Bootup the workflow.""" """
Workflow bootup.
"""
print( print(
colored( colored(
""" """

@ -1,132 +1,78 @@
from dataclass import dataclass, field from dataclasses import dataclass, field
from typing import (
Any,
Callable,
Dict,
List,
Union,
)
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from typing import Optional
from typing import List, Dict, Any, Sequence
# Define a generic Task that can handle different types of callable objects
@dataclass @dataclass
class Task: class Task:
""" """
Task is a unit of work that can be executed by a set of agents. Task class for running a task in a sequential workflow.
A task is defined by a task name and a set of agents that can execute the task.
The task can also have a set of dependencies, which are the names of other tasks
that must be executed before this task can be executed.
Args: Args:
id (str): The name of the task. description (str): The description of the task.
description (Optional[str]): A description of the task. agent (Union[Callable, Agent]): The model or agent to execute the task.
task (str): The name of the task. args (List[Any]): Additional arguments to pass to the task execution.
result (Any): The result of the task. kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution.
agents (Sequence[Agent]): A list of agents that can execute the task. result (Any): The result of the task execution.
dependencies (List[str], optional): A list of task names that must be executed before this task can be executed. Defaults to []. history (List[Any]): The history of the task execution.
args (List[Any], optional): A list of arguments to pass to the agents. Defaults to field(default_factory=list).
kwargs (List[Any], optional): A list of keyword arguments to pass to the agents. Defaults to field(default_factory=list).
Methods: Methods:
execute: Executes the task by passing the results of the parent tasks to the agents. execute: Execute the task.
Examples:
import os
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
from dotenv import load_dotenv
load_dotenv()
# Load the environment variables
api_key = os.getenv("OPENAI_API_KEY")
# Initialize the language agent
llm = OpenAIChat(
openai_api_key=api_key,
temperature=0.5,
max_tokens=3000,
)
# Initialize the agent with the language agent
agent1 = Agent(llm=llm, max_loops=1)
# Create another agent for a different task
agent2 = Agent(llm=llm, max_loops=1)
# Create the workflow
workflow = SequentialWorkflow(max_loops=1)
# Add tasks to the workflow
workflow.add(
agent1, "Generate a 10,000 word blog on health and wellness.",
)
# Suppose the next task takes the output of the first task as input Examples:
workflow.add( >>> from swarms.structs import Task, Agent
agent2, "Summarize the generated blog", >>> from swarms.models import OpenAIChat
) >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in miami", agent=agent)
# Run the workflow >>> task.execute()
workflow.run() >>> task.result
# Output the results
for task in workflow.tasks:
print(f"Task: {task.description}, Result: {task.result}")
""" """
def __init__( description: str
self, agent: Union[Callable, Agent]
id: str, args: List[Any] = field(default_factory=list)
description: Optional[str], kwargs: Dict[str, Any] = field(default_factory=dict)
task: str, result: Any = None
result: Any, history: List[Any] = field(default_factory=list)
agents: Sequence[Agent], # logger = logging.getLogger(__name__)
dependencies: List[str] = [],
args: List[Any] = field(default_factory=list),
kwargs: List[Any] = field(default_factory=list),
):
self.id = id
self.description = description
self.task = task
self.result = result
self.agents = agents
self.dependencies = dependencies
self.results = []
self.args = args
self.kwargs = kwargs
def execute(self, parent_results: Dict[str, Any]): def execute(self):
"""Executes the task by passing the results of the parent tasks to the agents. """
Execute the task.
Args:
parent_results (Dict[str, Any]): A dictionary of task names and their results.
Examples: Raises:
ValueError: If a Agent instance is used as a task and the 'task' argument is not provided.
""" """
args = [parent_results[dep] for dep in self.dependencies] if isinstance(self.agent, Agent):
for agent in self.agents: # Add a prompt to notify the Agent of the sequential workflow
if isinstance(agent, Agent): if "prompt" in self.kwargs:
if "prompt" in self.kwargs: self.kwargs["prompt"] += (
self.kwargs["prompt"] += ( f"\n\nPrevious output: {self.result}"
f"\n\nPrevious output: {self.results[-1]}" if self.result
if self.results else ""
)
else:
self.kwargs["prompt"] = (
f"Main task: {self.description}"
+ (
f"\n\nPrevious output: {self.result}"
if self.result
else "" else ""
) )
else:
self.kwargs["prompt"] = (
f"Main task: {self.description}"
+ (
f"\n\nPrevious output: {self.results[-1]}"
if self.results
else ""
)
)
result = agent.run(
self.description, *args, **self.kwargs
) )
else: self.result = self.agent.run(*self.args, **self.kwargs)
result = agent(self.description, *args, **self.kwargs) else:
self.results.append(result) self.result = self.agent(*self.args, **self.kwargs)
args = [result]
self.history.append(result) self.history.append(self.result)

@ -1,6 +1,6 @@
import os import os
from unittest.mock import patch from unittest.mock import patch
from swarms.memory.pinecone import PineconeVectorStore from swarms.memory.pinecone import PineconDB
api_key = os.getenv("PINECONE_API_KEY") or "" api_key = os.getenv("PINECONE_API_KEY") or ""
@ -9,7 +9,7 @@ def test_init():
with patch("pinecone.init") as MockInit, patch( with patch("pinecone.init") as MockInit, patch(
"pinecone.Index" "pinecone.Index"
) as MockIndex: ) as MockIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",
@ -21,7 +21,7 @@ def test_init():
def test_upsert_vector(): def test_upsert_vector():
with patch("pinecone.init"), patch("pinecone.Index") as MockIndex: with patch("pinecone.init"), patch("pinecone.Index") as MockIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",
@ -37,7 +37,7 @@ def test_upsert_vector():
def test_load_entry(): def test_load_entry():
with patch("pinecone.init"), patch("pinecone.Index") as MockIndex: with patch("pinecone.init"), patch("pinecone.Index") as MockIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",
@ -48,7 +48,7 @@ def test_load_entry():
def test_load_entries(): def test_load_entries():
with patch("pinecone.init"), patch("pinecone.Index") as MockIndex: with patch("pinecone.init"), patch("pinecone.Index") as MockIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",
@ -59,7 +59,7 @@ def test_load_entries():
def test_query(): def test_query():
with patch("pinecone.init"), patch("pinecone.Index") as MockIndex: with patch("pinecone.init"), patch("pinecone.Index") as MockIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",
@ -72,7 +72,7 @@ def test_create_index():
with patch("pinecone.init"), patch("pinecone.Index"), patch( with patch("pinecone.init"), patch("pinecone.Index"), patch(
"pinecone.create_index" "pinecone.create_index"
) as MockCreateIndex: ) as MockCreateIndex:
store = PineconeVectorStore( store = PineconDB(
api_key=api_key, api_key=api_key,
index_name="test_index", index_name="test_index",
environment="test_env", environment="test_env",

@ -6,7 +6,7 @@ from swarms.memory.qdrant import Qdrant
@pytest.fixture @pytest.fixture
def mock_qdrant_client(): def mock_qdrant_client():
with patch("your_module.QdrantClient") as MockQdrantClient: with patch("swarms.memory.Qdrant") as MockQdrantClient:
yield MockQdrantClient() yield MockQdrantClient()

@ -0,0 +1,128 @@
import pytest
from swarms.memory.short_term_memory import ShortTermMemory
import threading
def test_init():
    # A freshly constructed ShortTermMemory starts with both buffers empty.
    mem = ShortTermMemory()
    assert mem.short_term_memory == []
    assert mem.medium_term_memory == []
def test_add():
    # add() appends a {role, message} dict to the short-term buffer.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    expected = [{"role": "user", "message": "Hello, world!"}]
    assert mem.short_term_memory == expected
def test_get_short_term():
    # get_short_term() reflects what add() stored.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    expected = [{"role": "user", "message": "Hello, world!"}]
    assert mem.get_short_term() == expected
def test_get_medium_term():
    # An entry promoted from short-term shows up in medium-term memory.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.move_to_medium_term(0)
    expected = [{"role": "user", "message": "Hello, world!"}]
    assert mem.get_medium_term() == expected
def test_clear_medium_term():
    # clear_medium_term() empties the buffer even after a promotion.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.move_to_medium_term(0)
    mem.clear_medium_term()
    assert mem.get_medium_term() == []
def test_get_short_term_memory_str():
    # The string form is the repr of the underlying list of entry dicts.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    expected = "[{'role': 'user', 'message': 'Hello, world!'}]"
    assert mem.get_short_term_memory_str() == expected
def test_update_short_term():
    # update_short_term() replaces the role/message pair at the given index.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.update_short_term(0, "user", "Goodbye, world!")
    expected = [{"role": "user", "message": "Goodbye, world!"}]
    assert mem.get_short_term() == expected
def test_clear():
    # clear() wipes the short-term buffer.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.clear()
    assert mem.get_short_term() == []
def test_search_memory():
    # search_memory() returns (index, entry) hits per buffer;
    # nothing has been promoted, so medium_term is empty here.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    hits = mem.search_memory("Hello")
    assert hits == {
        "short_term": [
            (0, {"role": "user", "message": "Hello, world!"})
        ],
        "medium_term": [],
    }
def test_return_shortmemory_as_str():
    # Renders short-term memory as the repr of its list of entries.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    expected = "[{'role': 'user', 'message': 'Hello, world!'}]"
    assert mem.return_shortmemory_as_str() == expected
def test_move_to_medium_term():
    # Promotion MOVES the entry (not a copy): short-term is left empty.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.move_to_medium_term(0)
    expected = [{"role": "user", "message": "Hello, world!"}]
    assert mem.get_medium_term() == expected
    assert mem.get_short_term() == []
def test_return_medium_memory_as_str():
    # Renders medium-term memory as the repr of its list of entries.
    mem = ShortTermMemory()
    mem.add("user", "Hello, world!")
    mem.move_to_medium_term(0)
    expected = "[{'role': 'user', 'message': 'Hello, world!'}]"
    assert mem.return_medium_memory_as_str() == expected
def test_thread_safety():
    # 10 threads x 1000 adds each: all 10000 entries must land in
    # short-term memory with no lost updates.
    mem = ShortTermMemory()

    def worker():
        for _ in range(1000):
            mem.add("user", "Hello, world!")

    workers = [threading.Thread(target=worker) for _ in range(10)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    assert len(mem.get_short_term()) == 10000
def test_save_and_load():
    """Round-trip persistence: save one memory to disk, load it into a
    second instance, and verify both buffers match.

    Fix: the original left ``memory.json`` behind in the working
    directory, polluting subsequent runs; clean it up in a finally block.
    """
    import os

    path = "memory.json"
    try:
        memory1 = ShortTermMemory()
        memory1.add("user", "Hello, world!")
        memory1.save_to_file(path)

        memory2 = ShortTermMemory()
        memory2.load_from_file(path)

        assert memory1.get_short_term() == memory2.get_short_term()
        assert memory1.get_medium_term() == memory2.get_medium_term()
    finally:
        # Remove the artifact so repeated test runs start from a clean slate.
        if os.path.exists(path):
            os.remove(path)

@ -108,3 +108,59 @@ def test_task_execute_with_mocked_agents(task, mocker):
parent_results = {} parent_results = {}
task.execute(parent_results) task.execute(parent_results)
assert len(task.results) == 5 assert len(task.results) == 5
def test_task_creation():
    # Construction stores id, task name, result and agent list verbatim.
    worker = Agent()
    created = Task(id="1", task="Task1", result=None, agents=[worker])
    assert created.id == "1"
    assert created.task == "Task1"
    assert created.result is None
    assert created.agents == [worker]
def test_task_with_dependencies():
    # A dependency list passed at construction is stored unchanged.
    worker = Agent()
    dependent = Task(
        id="2",
        task="Task2",
        result=None,
        agents=[worker],
        dependencies=["Task1"],
    )
    assert dependent.dependencies == ["Task1"]
def test_task_with_args():
    # Positional args passed at construction are stored unchanged.
    worker = Agent()
    with_args = Task(
        id="3",
        task="Task3",
        result=None,
        agents=[worker],
        args=["arg1", "arg2"],
    )
    assert with_args.args == ["arg1", "arg2"]
def test_task_with_kwargs():
    # Keyword args passed at construction are stored unchanged.
    worker = Agent()
    with_kwargs = Task(
        id="4",
        task="Task4",
        result=None,
        agents=[worker],
        kwargs={"kwarg1": "value1"},
    )
    assert with_kwargs.kwargs == {"kwarg1": "value1"}
# ... continue creating tests for different scenarios
# Test execute method
def test_execute():
    """Task.execute() should report success (exactly True) for a
    single-agent task.

    Fix: ``== True`` is the PEP 8 E712 anti-pattern; ``is True`` pins the
    documented exact return value without the redundant comparison.
    """
    agent = Agent()
    task = Task(id="5", task="Task5", result=None, agents=[agent])
    # Assuming execute method returns True on successful execution.
    assert task.execute() is True

Loading…
Cancel
Save