[FEAT][ChromaDB] [FEAT][tool_func_doc_scraper] [FEAT][BaseVectorStore] [FEAT][memory -> short_memory] [FEAT][memory: BaseVectorDB]

pull/239/head
Kye 1 year ago
parent 61976acb6b
commit 60d777080d

@ -20,7 +20,7 @@ llm = OpenAIChat(
## Initialize the workflow ## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, dashboard=True) agent = Agent(llm=llm, max_loops=1, dashboard=True, autosave=True)
# Run the workflow on a task # Run the workflow on a task
out = agent.run("Generate a 10,000 word blog on health and wellness.") out = agent.run("Generate a 10,000 word blog on health and wellness.")

@ -1,15 +0,0 @@
import concurrent.futures
import time
import random
from swarms.utils.futures import execute_futures_dict
def f(x):
time.sleep(random.random())
return x
with concurrent.futures.ThreadPoolExecutor() as executor:
"""Create a dictionary of futures."""
fs_dict = {str(i): executor.submit(f, i) for i in range(10)}
print(execute_futures_dict(fs_dict))

@ -10,43 +10,62 @@ openai_api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABILITY_API_KEY") stability_api_key = os.getenv("STABILITY_API_KEY")
# Initialize the language model and image generation model # Initialize the language model and image generation model
llm = OpenAIChat(openai_api_key=openai_api_key, temperature=0.5, max_tokens=3000) llm = OpenAIChat(
openai_api_key=openai_api_key, temperature=0.5, max_tokens=3000
)
sd_api = StableDiffusion(api_key=stability_api_key) sd_api = StableDiffusion(api_key=stability_api_key)
# Creative Concept Generator for Product Ads # Creative Concept Generator for Product Ads
class ProductAdConceptGenerator: class ProductAdConceptGenerator:
def __init__(self, product_name): def __init__(self, product_name):
self.product_name = product_name self.product_name = product_name
self.themes = [ self.themes = [
"futuristic", "rustic", "luxurious", "minimalistic", "vibrant", "elegant", "futuristic",
"retro", "urban", "ethereal", "surreal", "artistic", "tech-savvy", "rustic",
"vintage", "natural", "sophisticated", "playful", "dynamic", "serene", "lasers," "lightning" "luxurious",
"minimalistic",
"vibrant",
"elegant",
"retro",
"urban",
"ethereal",
"surreal",
"artistic",
"tech-savvy",
"vintage",
"natural",
"sophisticated",
"playful",
"dynamic",
"serene",
"lasers,lightning",
] ]
self.contexts = [ self.contexts = [
"in an everyday setting", "in a rave setting", "in an abstract environment", "in an everyday setting",
"in an adventurous context", "surrounded by nature", "in a high-tech setting", "in a rave setting",
"in a historical context", "in a busy urban scene", "in a tranquil and peaceful setting", "in an abstract environment",
"against a backdrop of city lights", "in a surreal dreamscape", "in a festive atmosphere", "in an adventurous context",
"in a luxurious setting", "in a playful and colorful background", "in an ice cave setting", "surrounded by nature",
"in a serene and calm landscape" "in a high-tech setting",
"in a historical context",
"in a busy urban scene",
"in a tranquil and peaceful setting",
"against a backdrop of city lights",
"in a surreal dreamscape",
"in a festive atmosphere",
"in a luxurious setting",
"in a playful and colorful background",
"in an ice cave setting",
"in a serene and calm landscape",
] ]
<<<<<<< HEAD
=======
self.contexts = [ self.contexts = [
"high realism product ad (extremely creative)" "high realism product ad (extremely creative)"
] ]
>>>>>>> 831147e ([CODE QUALITY])
def generate_concept(self): def generate_concept(self):
theme = random.choice(self.themes) theme = random.choice(self.themes)
context = random.choice(self.contexts) context = random.choice(self.contexts)
<<<<<<< HEAD
return f"An ad for {self.product_name} that embodies a {theme} theme {context}"
# User input
product_name = input("Enter a product name for ad creation (e.g., 'PS5', 'AirPods', 'Kirkland Vodka'): ")
social_media_platform = input("Enter a social media platform (e.g., 'Facebook', 'Twitter', 'Instagram'): ")
=======
return ( return (
f"{theme} inside a {style} {self.product_name}, {context}" f"{theme} inside a {style} {self.product_name}, {context}"
) )
@ -57,7 +76,6 @@ product_name = input(
"Enter a product name for ad creation (e.g., 'PS5', 'AirPods'," "Enter a product name for ad creation (e.g., 'PS5', 'AirPods',"
" 'Kirkland Vodka'): " " 'Kirkland Vodka'): "
) )
>>>>>>> 831147e ([CODE QUALITY])
# Generate creative concept # Generate creative concept
concept_generator = ProductAdConceptGenerator(product_name) concept_generator = ProductAdConceptGenerator(product_name)
@ -68,15 +86,13 @@ image_paths = sd_api.run(creative_concept)
# Generate ad copy # Generate ad copy
ad_copy_agent = Agent(llm=llm, max_loops=1) ad_copy_agent = Agent(llm=llm, max_loops=1)
ad_copy_prompt = f"Write a compelling {social_media_platform} ad copy for a product photo showing {product_name} {creative_concept}." ad_copy_prompt = (
f"Write a compelling {social_media_platform} ad copy for a"
f" product photo showing {product_name} {creative_concept}."
)
ad_copy = ad_copy_agent.run(task=ad_copy_prompt) ad_copy = ad_copy_agent.run(task=ad_copy_prompt)
# Output the results # Output the results
<<<<<<< HEAD
print("Creative Concept:", creative_concept)
print("Image Path:", image_paths[0] if image_paths else "No image generated")
print("Ad Copy:", ad_copy)
=======
print("Creative Concept:", concept_result) print("Creative Concept:", concept_result)
print("Design Ideas:", design_result) print("Design Ideas:", design_result)
print("Ad Copy:", copywriting_result) print("Ad Copy:", copywriting_result)
@ -84,4 +100,3 @@ print(
"Image Path:", "Image Path:",
image_paths[0] if image_paths else "No image generated", image_paths[0] if image_paths else "No image generated",
) )
>>>>>>> 831147e ([CODE QUALITY])

@ -0,0 +1,21 @@
from swarms.tools.tool import tool
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
# Define a tool by decorating a function with the tool decorator and providing a docstring
@tool(return_direct=True)
def search_api(query: str):
"""Search the web for the query
Args:
query (str): _description_
Returns:
_type_: _description_
"""
return f"Search results for {query}"
# Scrape the tool func docs to prepare for injection into the agent prompt
out = scrape_tool_func_docs(search_api)
print(out)

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "2.5.0" version = "2.5.2"
description = "Swarms - Pytorch" description = "Swarms - Pytorch"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,60 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
class VectorDatabase(ABC):
@abstractmethod
def add(
self, vector: Dict[str, Any], metadata: Dict[str, Any]
) -> None:
"""
add a vector into the database.
Args:
vector (Dict[str, Any]): The vector to add.
metadata (Dict[str, Any]): Metadata associated with the vector.
"""
pass
@abstractmethod
def query(
self, vector: Dict[str, Any], num_results: int
) -> Dict[str, Any]:
"""
Query the database for vectors similar to the given vector.
Args:
vector (Dict[str, Any]): The vector to compare against.
num_results (int): The number of similar vectors to return.
Returns:
Dict[str, Any]: The most similar vectors and their associated metadata.
"""
pass
@abstractmethod
def delete(self, vector_id: str) -> None:
"""
Delete a vector from the database.
Args:
vector_id (str): The ID of the vector to delete.
"""
pass
@abstractmethod
def update(
self,
vector_id: str,
vector: Dict[str, Any],
metadata: Dict[str, Any],
) -> None:
"""
Update a vector in the database.
Args:
vector_id (str): The ID of the vector to update.
vector (Dict[str, Any]): The new vector.
metadata (Dict[str, Any]): The new metadata.
"""
pass

@ -0,0 +1,172 @@
import os
from termcolor import colored
import logging
from typing import Dict, List, Optional
import chromadb
import tiktoken as tiktoken
from chromadb.config import Settings
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from dotenv import load_dotenv
from swarms.utils.token_count_tiktoken import limit_tokens_from_string
load_dotenv()
# ChromaDB settings
client = chromadb.Client(Settings(anonymized_telemetry=False))
# ChromaDB client
def get_chromadb_client():
return client
# OpenAI API key
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Results storage using local ChromaDB
class ChromaDB:
"""
ChromaDB database
Args:
metric (str): _description_
RESULTS_STORE_NAME (str): _description_
LLM_MODEL (str): _description_
openai_api_key (str): _description_
Methods:
add: _description_
query: _description_
Examples:
>>> chromadb = ChromaDB(
>>> metric="cosine",
>>> RESULTS_STORE_NAME="results",
>>> LLM_MODEL="gpt3",
>>> openai_api_key=OPENAI_API_KEY,
>>> )
>>> chromadb.add(task, result, result_id)
>>> chromadb.query(query, top_results_num)
"""
def __init__(
self,
metric: str,
RESULTS_STORE_NAME: str,
LLM_MODEL: str,
openai_api_key: str = OPENAI_API_KEY,
top_results_num: int = 3,
limit_tokens: Optional[int] = 1000,
):
self.metric = metric
self.RESULTS_STORE_NAME = RESULTS_STORE_NAME
self.LLM_MODEL = LLM_MODEL
self.openai_api_key = openai_api_key
self.top_results_num = top_results_num
self.limit_tokens = limit_tokens
# Disable ChromaDB logging
logging.getLogger("chromadb").setLevel(logging.ERROR)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.PersistentClient(
settings=chromadb.config.Settings(
persist_directory=chroma_persist_dir,
)
)
# Create embedding function
embedding_function = OpenAIEmbeddingFunction(
api_key=openai_api_key
)
# Create Chroma collection
self.collection = chroma_client.get_or_create_collection(
name=RESULTS_STORE_NAME,
metadata={"hnsw:space": metric},
embedding_function=embedding_function,
)
def add(self, task: Dict, result: str, result_id: str):
"""Adds a result to the ChromaDB collection
Args:
task (Dict): _description_
result (str): _description_
result_id (str): _description_
"""
try:
# Embed the result
embeddings = (
self.collection.embedding_function.embed([result])[0]
.tolist()
.copy()
)
# If the result is a list, flatten it
if (
len(
self.collection.get(ids=[result_id], include=[])[
"ids"
]
)
> 0
): # Check if the result already exists
self.collection.update(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={
"task": task["task_name"],
"result": result,
},
)
# If the result is not a list, add it
else:
self.collection.add(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={
"task": task["task_name"],
"result": result,
},
)
except Exception as error:
print(
colored(f"Error adding to ChromaDB: {error}", "red")
)
def query(
self,
query: str,
) -> List[dict]:
"""Queries the ChromaDB collection with a query for the top results
Args:
query (str): _description_
top_results_num (int): _description_
Returns:
List[dict]: _description_
"""
try:
count: int = self.collection.count()
if count == 0:
return []
results = self.collection.query(
query_texts=query,
n_results=min(self.top_results_num, count),
include=["metadatas"],
)
out = [item["task"] for item in results["metadatas"][0]]
out = limit_tokens_from_string(
out, "gpt-4", self.limit_tokens
)
return out
except Exception as error:
print(colored(f"Error querying ChromaDB: {error}", "red"))

@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored from termcolor import colored
from swarms.memory.base_vector_db import VectorDatabase
from swarms.prompts.agent_system_prompts import ( from swarms.prompts.agent_system_prompts import (
FLOW_SYSTEM_PROMPT, FLOW_SYSTEM_PROMPT,
agent_system_prompt_2, agent_system_prompt_2,
@ -26,6 +27,7 @@ from swarms.utils.parse_code import (
extract_code_in_backticks_in_string, extract_code_in_backticks_in_string,
) )
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.token_count_tiktoken import limit_tokens_from_string
# Utils # Utils
@ -35,11 +37,13 @@ def stop_when_repeats(response: str) -> bool:
return "Stop" in response.lower() return "Stop" in response.lower()
# Parse done token
def parse_done_token(response: str) -> bool: def parse_done_token(response: str) -> bool:
"""Parse the response to see if the done token is present""" """Parse the response to see if the done token is present"""
return "<DONE>" in response return "<DONE>" in response
# Agent ID generator
def agent_id(): def agent_id():
"""Generate an agent id""" """Generate an agent id"""
return str(uuid.uuid4()) return str(uuid.uuid4())
@ -58,16 +62,40 @@ class Agent:
* Ability to provide a loop interval * Ability to provide a loop interval
Args: Args:
id (str): The id of the agent
llm (Any): The language model to use llm (Any): The language model to use
max_loops (int): The maximum number of loops to run template (Optional[str]): The template to use
stopping_condition (Optional[Callable[[str], bool]]): A stopping condition max_loops (int): The maximum number of loops
loop_interval (int): The interval between loops stopping_condition (Optional[Callable[[str], bool]]): The stopping condition
retry_attempts (int): The number of retry attempts loop_interval (int): The loop interval
retry_interval (int): The interval between retry attempts retry_attempts (int): The retry attempts
interactive (bool): Whether or not to run in interactive mode retry_interval (int): The retry interval
dashboard (bool): Whether or not to print the dashboard return_history (bool): Return the history
dynamic_temperature_enabled(bool): Dynamical temperature handling stopping_token (str): The stopping token
**kwargs (Any): Any additional keyword arguments dynamic_loops (Optional[bool]): Dynamic loops
interactive (bool): Interactive mode
dashboard (bool): Dashboard mode
agent_name (str): The name of the agent
agent_description (str): The description of the agent
system_prompt (str): The system prompt
tools (List[BaseTool]): The tools
dynamic_temperature_enabled (Optional[bool]): Dynamic temperature enabled
sop (Optional[str]): The standard operating procedure
sop_list (Optional[List[str]]): The standard operating procedure list
saved_state_path (Optional[str]): The saved state path
autosave (Optional[bool]): Autosave
context_length (Optional[int]): The context length
user_name (str): The user name
self_healing_enabled (Optional[bool]): Self healing enabled
code_interpreter (Optional[bool]): Code interpreter
multi_modal (Optional[bool]): Multi modal
pdf_path (Optional[str]): The pdf path
list_of_pdf (Optional[str]): The list of pdf
tokenizer (Optional[Any]): The tokenizer
memory (Optional[VectorDatabase]): The memory
preset_stopping_token (Optional[bool]): Preset stopping token
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Methods: Methods:
run(task: str, **kwargs: Any): Run the agent on a task run(task: str, **kwargs: Any): Run the agent on a task
@ -143,15 +171,14 @@ class Agent:
dynamic_loops: Optional[bool] = False, dynamic_loops: Optional[bool] = False,
interactive: bool = False, interactive: bool = False,
dashboard: bool = False, dashboard: bool = False,
agent_name: str = "Autonomous Agent XYZ1B", agent_name: str = "Autonomous-Agent-XYZ1B",
agent_description: str = None, agent_description: str = None,
system_prompt: str = FLOW_SYSTEM_PROMPT, system_prompt: str = FLOW_SYSTEM_PROMPT,
tools: List[BaseTool] = None, tools: List[BaseTool] = None,
dynamic_temperature_enabled: Optional[bool] = False, dynamic_temperature_enabled: Optional[bool] = False,
sop: Optional[str] = None, sop: Optional[str] = None,
sop_list: Optional[List[str]] = None, sop_list: Optional[List[str]] = None,
# memory: Optional[Vectorstore] = None, saved_state_path: Optional[str] = None,
saved_state_path: Optional[str] = "flow_state.json",
autosave: Optional[bool] = False, autosave: Optional[bool] = False,
context_length: Optional[int] = 8192, context_length: Optional[int] = 8192,
user_name: str = "Human:", user_name: str = "Human:",
@ -161,6 +188,8 @@ class Agent:
pdf_path: Optional[str] = None, pdf_path: Optional[str] = None,
list_of_pdf: Optional[str] = None, list_of_pdf: Optional[str] = None,
tokenizer: Optional[Any] = None, tokenizer: Optional[Any] = None,
memory: Optional[VectorDatabase] = None,
preset_stopping_token: Optional[bool] = False,
*args, *args,
**kwargs: Any, **kwargs: Any,
): ):
@ -187,7 +216,7 @@ class Agent:
self.system_prompt = system_prompt self.system_prompt = system_prompt
self.agent_name = agent_name self.agent_name = agent_name
self.agent_description = agent_description self.agent_description = agent_description
self.saved_state_path = saved_state_path self.saved_state_path = f"{self.agent_name}_state.json"
self.autosave = autosave self.autosave = autosave
self.response_filters = [] self.response_filters = []
self.self_healing_enabled = self_healing_enabled self.self_healing_enabled = self_healing_enabled
@ -196,6 +225,8 @@ class Agent:
self.pdf_path = pdf_path self.pdf_path = pdf_path
self.list_of_pdf = list_of_pdf self.list_of_pdf = list_of_pdf
self.tokenizer = tokenizer self.tokenizer = tokenizer
self.memory = memory
self.preset_stopping_token = preset_stopping_token
# The max_loops will be set dynamically if the dynamic_loop # The max_loops will be set dynamically if the dynamic_loop
if self.dynamic_loops: if self.dynamic_loops:
@ -211,11 +242,15 @@ class Agent:
# Memory # Memory
self.feedback = [] self.feedback = []
self.memory = [] self.short_memory = []
# Initialize the code executor # Initialize the code executor
self.code_executor = SubprocessCodeInterpreter() self.code_executor = SubprocessCodeInterpreter()
# If the preset stopping token is enabled then set the stopping token to the preset stopping token
if preset_stopping_token:
self.stopping_token = "<DONE>"
def provide_feedback(self, feedback: str) -> None: def provide_feedback(self, feedback: str) -> None:
"""Allow users to provide feedback on the responses.""" """Allow users to provide feedback on the responses."""
self.feedback.append(feedback) self.feedback.append(feedback)
@ -349,20 +384,29 @@ class Agent:
""" """
Take the history and truncate it to fit into the model context length Take the history and truncate it to fit into the model context length
""" """
truncated_history = self.memory[-1][-self.context_length :] # truncated_history = self.short_memory[-1][-self.context_length :]
self.memory[-1] = truncated_history # self.short_memory[-1] = truncated_history
# out = limit_tokens_from_string(
# "\n".join(truncated_history), self.llm.model_name
# )
truncated_history = self.short_memory[-1][
-self.context_length :
]
text = "\n".join(truncated_history)
out = limit_tokens_from_string(text, "gpt-4")
return out
def add_task_to_memory(self, task: str): def add_task_to_memory(self, task: str):
"""Add the task to the memory""" """Add the task to the memory"""
self.memory.append([f"{self.user_name}: {task}"]) self.short_memory.append([f"{self.user_name}: {task}"])
def add_message_to_memory(self, message: str): def add_message_to_memory(self, message: str):
"""Add the message to the memory""" """Add the message to the memory"""
self.memory[-1].append(message) self.short_memory[-1].append(message)
def add_message_to_memory_and_truncate(self, message: str): def add_message_to_memory_and_truncate(self, message: str):
"""Add the message to the memory and truncate""" """Add the message to the memory and truncate"""
self.memory[-1].append(message) self.short_memory[-1].append(message)
self.truncate_history() self.truncate_history()
def print_dashboard(self, task: str): def print_dashboard(self, task: str):
@ -404,7 +448,36 @@ class Agent:
) )
) )
# print(dashboard) def add_message_to_memory_db(
self, message: Dict[str, Any], metadata: Dict[str, Any]
) -> None:
"""Add the message to the memory
Args:
message (Dict[str, Any]): _description_
metadata (Dict[str, Any]): _description_
"""
if self.memory is not None:
self.memory.add(message, metadata)
def query_memorydb(
self,
message: Dict[str, Any],
num_results: int = 100,
) -> Dict[str, Any]:
"""Query the memory database
Args:
message (Dict[str, Any]): _description_
num_results (int): _description_
Returns:
Dict[str, Any]: _description_
"""
if self.memory is not None:
return self.memory.query(message, num_results)
else:
return {}
def activate_autonomous_agent(self): def activate_autonomous_agent(self):
"""Print the autonomous agent activation message""" """Print the autonomous agent activation message"""
@ -588,18 +661,20 @@ class Agent:
time.sleep(self.loop_interval) time.sleep(self.loop_interval)
# Add the history to the memory # Add the history to the memory
self.memory.append(history) self.short_memory.append(history)
# If autosave is enabled then save the state # If autosave is enabled then save the state
if self.autosave: if self.autosave:
save_path = self.saved_state_path or "flow_state.json"
print( print(
colored( colored(
f"Autosaving agent state to {save_path}", (
"Autosaving agent state to"
f" {self.saved_state_path}"
),
"green", "green",
) )
) )
self.save_state(save_path) self.save_state(self.saved_state_path)
# If return history is enabled then return the response and history # If return history is enabled then return the response and history
if self.return_history: if self.return_history:
@ -685,13 +760,16 @@ class Agent:
self.memory.append(history) self.memory.append(history)
if self.autosave: if self.autosave:
save_path = self.saved_state_path or "flow_state.json"
print( print(
colored( colored(
f"Autosaving agent state to {save_path}", "green" (
"Autosaving agent state to"
f" {self.saved_state_path}"
),
"green",
) )
) )
self.save_state(save_path) self.save_state(self.saved_state_path)
if self.return_history: if self.return_history:
return response, history return response, history
@ -776,8 +854,13 @@ class Agent:
return Agent(llm=llm, template=template) return Agent(llm=llm, template=template)
def save(self, file_path) -> None: def save(self, file_path) -> None:
"""Save the agent history to a file.
Args:
file_path (_type_): _description_
"""
with open(file_path, "w") as f: with open(file_path, "w") as f:
json.dump(self.memory, f) json.dump(self.short_memory, f)
print(f"Saved agent history to {file_path}") print(f"Saved agent history to {file_path}")
def load(self, file_path: str): def load(self, file_path: str):
@ -788,7 +871,7 @@ class Agent:
file_path (str): The path to the file containing the saved agent history. file_path (str): The path to the file containing the saved agent history.
""" """
with open(file_path, "r") as f: with open(file_path, "r") as f:
self.memory = json.load(f) self.short_memory = json.load(f)
print(f"Loaded agent history from {file_path}") print(f"Loaded agent history from {file_path}")
def validate_response(self, response: str) -> bool: def validate_response(self, response: str) -> bool:
@ -813,7 +896,9 @@ class Agent:
"========================", "cyan", attrs=["bold"] "========================", "cyan", attrs=["bold"]
) )
) )
for loop_index, history in enumerate(self.memory, start=1): for loop_index, history in enumerate(
self.short_memory, start=1
):
print( print(
colored( colored(
f"\nLoop {loop_index}:", "yellow", attrs=["bold"] f"\nLoop {loop_index}:", "yellow", attrs=["bold"]
@ -856,10 +941,10 @@ class Agent:
# Update the agent's history with the new interaction # Update the agent's history with the new interaction
if self.interactive: if self.interactive:
self.memory.append(f"AI: {response}") self.short_memory.append(f"AI: {response}")
self.memory.append(f"Human: {task}") self.short_memory.append(f"Human: {task}")
else: else:
self.memory.append(f"AI: {response}") self.short_memory.append(f"AI: {response}")
return response return response
except Exception as error: except Exception as error:
@ -903,14 +988,14 @@ class Agent:
print(message) print(message)
""" """
if len(self.memory) < 2: if len(self.short_memory) < 2:
return None, None return None, None
# Remove the last response # Remove the last response
self.memory.pop() self.short_memory.pop()
# Get the previous state # Get the previous state
previous_state = self.memory[-1][-1] previous_state = self.short_memory[-1][-1]
return previous_state, f"Restored to {previous_state}" return previous_state, f"Restored to {previous_state}"
# Response Filtering # Response Filtering
@ -930,7 +1015,6 @@ class Agent:
""" """
Apply the response filters to the response Apply the response filters to the response
""" """
for word in self.response_filters: for word in self.response_filters:
response = response.replace(word, "[FILTERED]") response = response.replace(word, "[FILTERED]")
@ -1029,21 +1113,28 @@ class Agent:
>>> agent.save_state('saved_flow.json') >>> agent.save_state('saved_flow.json')
""" """
state = { state = {
"memory": self.memory, "agent_id": str(self.id),
# "llm_params": self.get_llm_params(), "agent_name": self.agent_name,
"agent_description": self.agent_description,
"system_prompt": self.system_prompt,
"sop": self.sop,
"memory": self.short_memory,
"loop_interval": self.loop_interval, "loop_interval": self.loop_interval,
"retry_attempts": self.retry_attempts, "retry_attempts": self.retry_attempts,
"retry_interval": self.retry_interval, "retry_interval": self.retry_interval,
"interactive": self.interactive, "interactive": self.interactive,
"dashboard": self.dashboard, "dashboard": self.dashboard,
"dynamic_temperature": self.dynamic_temperature_enabled, "dynamic_temperature": self.dynamic_temperature_enabled,
"autosave": self.autosave,
"saved_state_path": self.saved_state_path,
"max_loops": self.max_loops,
} }
with open(file_path, "w") as f: with open(file_path, "w") as f:
json.dump(state, f, indent=4) json.dump(state, f, indent=4)
saved = colored("Saved agent state to", "green") saved = colored(f"Saved agent state to: {file_path}", "green")
print(f"{saved} {file_path}") print(saved)
def load_state(self, file_path: str): def load_state(self, file_path: str):
""" """
@ -1060,7 +1151,16 @@ class Agent:
state = json.load(f) state = json.load(f)
# Restore other saved attributes # Restore other saved attributes
self.memory = state.get("memory", []) self.id = state.get("agent_id", self.id)
self.agent_name = state.get("agent_name", self.agent_name)
self.agent_description = state.get(
"agent_description", self.agent_description
)
self.system_prompt = state.get(
"system_prompt", self.system_prompt
)
self.sop = state.get("sop", self.sop)
self.short_memory = state.get("short_memory", [])
self.max_loops = state.get("max_loops", 5) self.max_loops = state.get("max_loops", 5)
self.loop_interval = state.get("loop_interval", 1) self.loop_interval = state.get("loop_interval", 1)
self.retry_attempts = state.get("retry_attempts", 3) self.retry_attempts = state.get("retry_attempts", 3)
@ -1120,7 +1220,7 @@ class Agent:
def reset(self): def reset(self):
"""Reset the agent""" """Reset the agent"""
self.memory = [] self.short_memory = []
def run_code(self, code: str): def run_code(self, code: str):
""" """
@ -1143,7 +1243,7 @@ class Agent:
text = pdf_to_text(pdf) text = pdf_to_text(pdf)
return text return text
def pdf_chunker(self, text: str = None): def pdf_chunker(self, text: str = None, num_limits: int = 1000):
"""Chunk the pdf into sentences """Chunk the pdf into sentences
Args: Args:
@ -1153,13 +1253,21 @@ class Agent:
_type_: _description_ _type_: _description_
""" """
text = text or self.pdf_connector() text = text or self.pdf_connector()
pass text = limit_tokens_from_string(text, num_limits)
return text
def tools_prompt_prep( def tools_prompt_prep(
self, docs: str = None, scenarios: str = None self, docs: str = None, scenarios: str = None
): ):
""" """
Prepare the tool prompt Tools prompt prep
Args:
docs (str, optional): _description_. Defaults to None.
scenarios (str, optional): _description_. Defaults to None.
Returns:
_type_: _description_
""" """
PROMPT = f""" PROMPT = f"""
# Task # Task
@ -1214,61 +1322,72 @@ class Agent:
""" """
# def self_healing(self, **kwargs): def self_healing(self, **kwargs):
# """ """
# Self healing by debugging errors and refactoring its own code Self healing by debugging errors and refactoring its own code
# Args: Args:
# **kwargs (Any): Any additional keyword arguments **kwargs (Any): Any additional keyword arguments
# """ """
# pass pass
# def refactor_code( def refactor_code(
# self, self, file: str, changes: List, confirm: bool = False
# file: str, ):
# changes: List, """_summary_
# confirm: bool = False
# ): Args:
# """ file (str): _description_
# Refactor the code changes (List): _description_
# """ confirm (bool, optional): _description_. Defaults to False.
# with open(file) as f: """
# original_file_lines = f.readlines() # with open(file) as f:
# original_file_lines = f.readlines()
# # Filter out the changes that are not confirmed
# operation_changes = [ # # Filter out the changes that are not confirmed
# change for change in changes if "operation" in change # operation_changes = [
# ] # change for change in changes if "operation" in change
# explanations = [ # ]
# change["explanation"] for change in changes if "explanation" in change # explanations = [
# ] # change["explanation"] for change in changes if "explanation" in change
# ]
# # Sort the changes in reverse line order
# # explanations.sort(key=lambda x: x["line", reverse=True]) # Sort the changes in reverse line order
# explanations.sort(key=lambda x: x["line", reverse=True])
# def error_prompt_inject( pass
# self,
# file_path: str, def error_prompt_inject(
# args: List, self,
# error: str, file_path: str,
# ): args: List,
# with open(file_path, "r") as f: error: str,
# file_lines = f.readlines() ):
"""
# file_with_lines = [] Error prompt injection
# for i, line in enumerate(file_lines):
# file_with_lines.append(str(i + 1) + "" + line) Args:
# file_with_lines = "".join(file_with_lines) file_path (str): _description_
args (List): _description_
# prompt = f""" error (str): _description_
# Here is the script that needs fixing:\n\n
# {file_with_lines}\n\n """
# Here are the arguments it was provided:\n\n # with open(file_path, "r") as f:
# {args}\n\n # file_lines = f.readlines()
# Here is the error message:\n\n
# {error}\n # file_with_lines = []
# "Please provide your suggested changes, and remember to stick to the " # for i, line in enumerate(file_lines):
# exact format as described above. # file_with_lines.append(str(i + 1) + "" + line)
# """ # file_with_lines = "".join(file_with_lines)
# print(prompt) # prompt = f"""
# Here is the script that needs fixing:\n\n
# {file_with_lines}\n\n
# Here are the arguments it was provided:\n\n
# {args}\n\n
# Here is the error message:\n\n
# {error}\n
# "Please provide your suggested changes, and remember to stick to the "
# exact format as described above.
# """
# print(prompt)
pass

@ -0,0 +1,3 @@
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
__all__ = ["scrape_tool_func_docs"]

@ -0,0 +1,36 @@
import inspect
from typing import Callable
from termcolor import colored
def scrape_tool_func_docs(fn: Callable) -> str:
"""
Scrape the docstrings and parameters of a function decorated with `tool` and return a formatted string.
Args:
fn (Callable): The function to scrape.
Returns:
str: A string containing the function's name, documentation string, and a list of its parameters. Each parameter is represented as a line containing the parameter's name, default value, and annotation.
"""
try:
# If the function is a tool, get the original function
if hasattr(fn, "func"):
fn = fn.func
signature = inspect.signature(fn)
parameters = []
for name, param in signature.parameters.items():
parameters.append(
f"Name: {name}, Type:"
f" {param.default if param.default is not param.empty else 'None'},"
" Annotation:"
f" {param.annotation if param.annotation is not param.empty else 'None'}"
)
parameters_str = "\n".join(parameters)
return (
f"Function: {fn.__name__}\nDocstring:"
f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}"
)
except Exception as error:
print(colored(f"Error scraping tool function docs {error} try optimizing your inputs with different variables and attempt once more.", "red"))

@ -0,0 +1,27 @@
import tiktoken
def limit_tokens_from_string(
string: str, model: str = "gpt-4", limit: int = 500
) -> str:
"""Limits the number of tokens in a string
Args:
string (str): _description_
model (str): _description_
limit (int): _description_
Returns:
str: _description_
"""
try:
encoding = tiktoken.encoding_for_model(model)
except Exception:
encoding = tiktoken.encoding_for_model(
"gpt2"
) # Fallback for others.
encoded = encoding.encode(string)
out = encoding.decode(encoded[:limit])
return out
Loading…
Cancel
Save