Merge branch 'kyegomez:master' into master

pull/525/head
everettVT 1 year ago committed by GitHub
commit 12416156fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -22,4 +22,4 @@ jobs:
- run: ruff format .
- run: ruff check --fix .
- uses: autofix-ci/action@d3e591514b99d0fca6779455ff8338516663f7cc
- uses: autofix-ci/action@dd55f44df8f7cdb7a6bf74c78677eb8acd40cd0a

@ -153,7 +153,3 @@ Please replace `/path/to/directory` with the actual path where the `code-quality
If you're asking for a specific content or functionality inside `code-quality.sh` related to YAPF or other code quality tools, you would need to edit the `code-quality.sh` script to include the desired commands, such as running YAPF on a directory. The contents of `code-quality.sh` would dictate exactly what happens when you run it.
## 📄 license
By contributing, you agree that your contributions will be licensed under an [MIT license](https://github.com/kyegomez/swarms/blob/develop/LICENSE.md).

@ -26,7 +26,7 @@
[![GitHub issues](https://img.shields.io/github/issues/kyegomez/swarms)](https://github.com/kyegomez/swarms/issues) [![GitHub forks](https://img.shields.io/github/forks/kyegomez/swarms)](https://github.com/kyegomez/swarms/network) [![GitHub stars](https://img.shields.io/github/stars/kyegomez/swarms)](https://github.com/kyegomez/swarms/stargazers) [![GitHub license](https://img.shields.io/github/license/kyegomez/swarms)](https://github.com/kyegomez/swarms/blob/main/LICENSE)[![GitHub star chart](https://img.shields.io/github/stars/kyegomez/swarms?style=social)](https://star-history.com/#kyegomez/swarms)[![Dependency Status](https://img.shields.io/librariesio/github/kyegomez/swarms)](https://libraries.io/github/kyegomez/swarms) [![Downloads](https://static.pepy.tech/badge/swarms/month)](https://pepy.tech/project/swarms)
[![Join the Agora discord](https://img.shields.io/discord/1110910277110743103?label=Discord&logo=discord&logoColor=white&style=plastic&color=d7b023)![Share on Twitter](https://img.shields.io/twitter/url/https/twitter.com/cloudposse.svg?style=social&label=Share%20%40kyegomez/swarms)](https://twitter.com/intent/tweet?text=Check%20out%20this%20amazing%20AI%20project:%20&url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms) [![Share on Facebook](https://img.shields.io/badge/Share-%20facebook-blue)](https://www.facebook.com/sharer/sharer.php?u=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms) [![Share on LinkedIn](https://img.shields.io/badge/Share-%20linkedin-blue)](https://www.linkedin.com/shareArticle?mini=true&url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms&title=&summary=&source=)
[![Share on Twitter](https://img.shields.io/twitter/url/https/twitter.com/cloudposse.svg?style=social&label=Share%20%40kyegomez/swarms)](https://twitter.com/intent/tweet?text=Check%20out%20this%20amazing%20AI%20project:%20&url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms) [![Share on Facebook](https://img.shields.io/badge/Share-%20facebook-blue)](https://www.facebook.com/sharer/sharer.php?u=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms) [![Share on LinkedIn](https://img.shields.io/badge/Share-%20linkedin-blue)](https://www.linkedin.com/shareArticle?mini=true&url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms&title=&summary=&source=)
[![Share on Reddit](https://img.shields.io/badge/-Share%20on%20Reddit-orange)](https://www.reddit.com/submit?url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms&title=Swarms%20-%20the%20future%20of%20AI) [![Share on Hacker News](https://img.shields.io/badge/-Share%20on%20Hacker%20News-orange)](https://news.ycombinator.com/submitlink?u=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms&t=Swarms%20-%20the%20future%20of%20AI) [![Share on Pinterest](https://img.shields.io/badge/-Share%20on%20Pinterest-red)](https://pinterest.com/pin/create/button/?url=https%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms&media=https%3A%2F%2Fexample.com%2Fimage.jpg&description=Swarms%20-%20the%20future%20of%20AI) [![Share on WhatsApp](https://img.shields.io/badge/-Share%20on%20WhatsApp-green)](https://api.whatsapp.com/send?text=Check%20out%20Swarms%20-%20the%20future%20of%20AI%20%23swarms%20%23AI%0A%0Ahttps%3A%2F%2Fgithub.com%2Fkyegomez%2Fswarms)
@ -39,21 +39,12 @@
Swarms is an enterprise grade and production ready multi-agent collaboration framework that enables you to orchestrate many agents to work collaboratively at scale to automate real-world activities.
| **Feature** | **Description** | **Performance Impact** | **Documentation Link** |
|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------|-------------------------------|
| Models | Pre-trained models that can be utilized for various tasks within the swarm framework. | ⭐⭐⭐ | [Documentation](https://docs.swarms.world/en/latest/swarms/models/) |
| Models APIs | APIs to interact with and utilize the models effectively, providing interfaces for inference, training, and fine-tuning. | ⭐⭐⭐ | [Documentation](https://docs.swarms.world/en/latest/swarms/models/) |
| Agents with Tools | Agents equipped with specialized tools to perform specific tasks more efficiently, such as data processing, analysis, or interaction with external systems. | ⭐⭐⭐⭐ | [Documentation](https://medium.com/@kyeg/the-swarms-tool-system-functions-pydantic-basemodels-as-tools-and-radical-customization-c2a2e227b8ca) |
| Agents with Memory | Mechanisms for agents to store and recall past interactions, improving learning and adaptability over time. | ⭐⭐⭐⭐ | [Documentation](https://github.com/kyegomez/swarms/blob/master/playground/structs/agent/agent_with_longterm_memory.py) |
| Multi-Agent Orchestration | Coordination of multiple agents to work together seamlessly on complex tasks, leveraging their individual strengths to achieve higher overall performance. | ⭐⭐⭐⭐⭐ | [Documentation]() |
The performance impact is rated on a scale from one to five stars, with multi-agent orchestration being the highest due to its ability to combine the strengths of multiple agents and optimize task execution.
----
## Requirements
- `python3.10` or above!
- `.env` file with API keys from your providers like `OpenAI`, `Anthropic`
- `.env` file with API keys from your providers like `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`
- `$ pip install -U swarms` And, don't forget to install swarms!
## Install 💻
@ -65,7 +56,6 @@ $ pip3 install -U swarms
# Usage Examples 🤖
### Google Collab Example
Run example in Collab: <a target="_blank" href="https://colab.research.google.com/github/kyegomez/swarms/blob/master/playground/swarms_example.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
@ -85,31 +75,57 @@ Features:
```python
import os
from swarms import Agent, Anthropic
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5, openai_api_key=api_key, max_tokens=4000
# Initialize the agent
agent = Agent(
agent_name="Accounting Assistant",
system_prompt="You're the accounting agent, your purpose is to generate a profit report for a company!",
agent_description="Generate a profit report for a company!",
llm=Anthropic(
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
),
max_loops="auto",
autosave=True,
# dynamic_temperature_enabled=True,
dashboard=False,
verbose=True,
streaming_on=True,
# interactive=True, # Set to False to disable interactive mode
saved_state_path="accounting_agent.json",
# tools=[
# # calculate_profit,
# # generate_report,
# # search_knowledge_base,
# # write_memory_to_rag,
# # search_knowledge_base,
# # generate_speech,
# ],
stopping_token="Stop!",
interactive=True,
# docs_folder="docs",
# pdf_path="docs/accounting_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
# user_name="User",
# # docs=
# # docs_folder="docs",
# retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=1000,
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
)
agent.run(
"Search the knowledge base for the swarms github framework and how it works"
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, autosave=True, dashboard=True)
# Run the workflow on a task
agent.run("Generate a 10,000 word blog on health and wellness.")
```
-----
### `Agent` + Long Term Memory
`Agent` equipped with quasi-infinite long term memory. Great for long document understanding, analysis, and retrieval.
@ -118,179 +134,7 @@ agent.run("Generate a 10,000 word blog on health and wellness.")
import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB
import logging
import os
import uuid
from typing import Optional
import chromadb
from swarms.utils.data_to_text import data_to_text
from swarms.utils.markdown_message import display_markdown_message
from swarms.memory.base_vectordb import BaseVectorDatabase
# Load environment variables
load_dotenv()
# Results storage using local ChromaDB
class ChromaDB(BaseVectorDatabase):
"""
ChromaDB database
Args:
metric (str): The similarity metric to use.
output (str): The name of the collection to store the results in.
limit_tokens (int, optional): The maximum number of tokens to use for the query. Defaults to 1000.
n_results (int, optional): The number of results to retrieve. Defaults to 2.
Methods:
add: _description_
query: _description_
Examples:
>>> chromadb = ChromaDB(
>>> metric="cosine",
>>> output="results",
>>> llm="gpt3",
>>> openai_api_key=OPENAI_API_KEY,
>>> )
>>> chromadb.add(task, result, result_id)
"""
def __init__(
self,
metric: str = "cosine",
output_dir: str = "swarms",
limit_tokens: Optional[int] = 1000,
n_results: int = 3,
docs_folder: str = None,
verbose: bool = False,
*args,
**kwargs,
):
self.metric = metric
self.output_dir = output_dir
self.limit_tokens = limit_tokens
self.n_results = n_results
self.docs_folder = docs_folder
self.verbose = verbose
# Disable ChromaDB logging
if verbose:
logging.getLogger("chromadb").setLevel(logging.INFO)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.PersistentClient(
settings=chromadb.config.Settings(
persist_directory=chroma_persist_dir,
),
*args,
**kwargs,
)
# Create ChromaDB client
self.client = chromadb.Client()
# Create Chroma collection
self.collection = chroma_client.get_or_create_collection(
name=output_dir,
metadata={"hnsw:space": metric},
*args,
**kwargs,
)
display_markdown_message(
"ChromaDB collection created:"
f" {self.collection.name} with metric: {self.metric} and"
f" output directory: {self.output_dir}"
)
# If docs
if docs_folder:
display_markdown_message(
f"Traversing directory: {docs_folder}"
)
self.traverse_directory()
def add(
self,
document: str,
*args,
**kwargs,
):
"""
Add a document to the ChromaDB collection.
Args:
document (str): The document to be added.
condition (bool, optional): The condition to check before adding the document. Defaults to True.
Returns:
str: The ID of the added document.
"""
try:
doc_id = str(uuid.uuid4())
self.collection.add(
ids=[doc_id],
documents=[document],
*args,
**kwargs,
)
print("-----------------")
print("Document added successfully")
print("-----------------")
return doc_id
except Exception as e:
raise Exception(f"Failed to add document: {str(e)}")
def query(
self,
query_text: str,
*args,
**kwargs,
):
"""
Query documents from the ChromaDB collection.
Args:
query (str): The query string.
n_docs (int, optional): The number of documents to retrieve. Defaults to 1.
Returns:
dict: The retrieved documents.
"""
try:
docs = self.collection.query(
query_texts=[query_text],
n_results=self.n_results,
*args,
**kwargs,
)["documents"]
return docs[0]
except Exception as e:
raise Exception(f"Failed to query documents: {str(e)}")
def traverse_directory(self):
"""
Traverse through every file in the given directory and its subdirectories,
and return the paths of all files.
Parameters:
- directory_name (str): The name of the directory to traverse.
Returns:
- list: A list of paths to each file in the directory and its subdirectories.
"""
added_to_db = False
for root, dirs, files in os.walk(self.docs_folder):
for file in files:
file_path = os.path.join(root, file) # Change this line
_, ext = os.path.splitext(file_path)
data = data_to_text(file_path)
added_to_db = self.add(str(data))
print(f"{file_path} added to Database")
return added_to_db
from swarms_memory import ChromaDB
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
@ -313,6 +157,7 @@ llm = OpenAIChat(
## Initialize the workflow
agent = Agent(
llm=llm,
agent_name: str = "WellNess Agent",
name = "Health and Wellness Blog",
system_prompt="Generate a 10,000 word blog on health and wellness.",
max_loops=4,
@ -327,229 +172,120 @@ agent.run("Generate a 10,000 word blog on health and wellness.")
```
-----
### `Agent` ++ Long Term Memory ++ Tools!
An LLM equipped with long term memory and tools, a full stack agent capable of automating all and any digital tasks given a good prompt.
```python
import logging
import os
import uuid
from typing import Optional
import chromadb
from dotenv import load_dotenv
from swarms.utils.data_to_text import data_to_text
from swarms.utils.markdown_message import display_markdown_message
from swarms.memory.base_vectordb import BaseVectorDatabase
from swarms import Agent, OpenAIChat
from swarms_memory import ChromaDB
import subprocess
# Making an instance of the ChromaDB class
memory = ChromaDB(
metric="cosine",
n_results=3,
output_dir="results",
docs_folder="docs",
)
# Load environment variables
load_dotenv()
# Results storage using local ChromaDB
class ChromaDB(BaseVectorDatabase):
# Tools
def terminal(
code: str,
):
"""
ChromaDB database
Run code in the terminal.
Args:
metric (str): The similarity metric to use.
output (str): The name of the collection to store the results in.
limit_tokens (int, optional): The maximum number of tokens to use for the query. Defaults to 1000.
n_results (int, optional): The number of results to retrieve. Defaults to 2.
Methods:
add: _description_
query: _description_
Examples:
>>> chromadb = ChromaDB(
>>> metric="cosine",
>>> output="results",
>>> llm="gpt3",
>>> openai_api_key=OPENAI_API_KEY,
>>> )
>>> chromadb.add(task, result, result_id)
code (str): The code to run in the terminal.
Returns:
str: The output of the code.
"""
out = subprocess.run(
code, shell=True, capture_output=True, text=True
).stdout
return str(out)
def __init__(
self,
metric: str = "cosine",
output_dir: str = "swarms",
limit_tokens: Optional[int] = 1000,
n_results: int = 3,
docs_folder: str = None,
verbose: bool = False,
*args,
**kwargs,
):
self.metric = metric
self.output_dir = output_dir
self.limit_tokens = limit_tokens
self.n_results = n_results
self.docs_folder = docs_folder
self.verbose = verbose
# Disable ChromaDB logging
if verbose:
logging.getLogger("chromadb").setLevel(logging.INFO)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.PersistentClient(
settings=chromadb.config.Settings(
persist_directory=chroma_persist_dir,
),
*args,
**kwargs,
)
def browser(query: str):
"""
Search the query in the browser with the `browser` tool.
# Create ChromaDB client
self.client = chromadb.Client()
Args:
query (str): The query to search in the browser.
# Create Chroma collection
self.collection = chroma_client.get_or_create_collection(
name=output_dir,
metadata={"hnsw:space": metric},
*args,
**kwargs,
)
display_markdown_message(
"ChromaDB collection created:"
f" {self.collection.name} with metric: {self.metric} and"
f" output directory: {self.output_dir}"
)
Returns:
str: The search results.
"""
import webbrowser
# If docs
if docs_folder:
display_markdown_message(
f"Traversing directory: {docs_folder}"
)
self.traverse_directory()
url = f"https://www.google.com/search?q={query}"
webbrowser.open(url)
return f"Searching for {query} in the browser."
def add(
self,
document: str,
*args,
**kwargs,
):
def create_file(file_path: str, content: str):
"""
Add a document to the ChromaDB collection.
Create a file using the file editor tool.
Args:
document (str): The document to be added.
condition (bool, optional): The condition to check before adding the document. Defaults to True.
file_path (str): The path to the file.
content (str): The content to write to the file.
Returns:
str: The ID of the added document.
str: The result of the file creation operation.
"""
try:
doc_id = str(uuid.uuid4())
self.collection.add(
ids=[doc_id],
documents=[document],
*args,
**kwargs,
)
print("-----------------")
print("Document added successfully")
print("-----------------")
return doc_id
except Exception as e:
raise Exception(f"Failed to add document: {str(e)}")
def query(
self,
query_text: str,
*args,
**kwargs,
):
with open(file_path, "w") as file:
file.write(content)
return f"File {file_path} created successfully."
def file_editor(file_path: str, mode: str, content: str):
"""
Query documents from the ChromaDB collection.
Edit a file using the file editor tool.
Args:
query (str): The query string.
n_docs (int, optional): The number of documents to retrieve. Defaults to 1.
file_path (str): The path to the file.
mode (str): The mode to open the file in.
content (str): The content to write to the file.
Returns:
dict: The retrieved documents.
"""
try:
docs = self.collection.query(
query_texts=[query_text],
n_results=self.n_results,
*args,
**kwargs,
)["documents"]
return docs[0]
except Exception as e:
raise Exception(f"Failed to query documents: {str(e)}")
def traverse_directory(self):
"""
Traverse through every file in the given directory and its subdirectories,
and return the paths of all files.
Parameters:
- directory_name (str): The name of the directory to traverse.
Returns:
- list: A list of paths to each file in the directory and its subdirectories.
str: The result of the file editing operation.
"""
added_to_db = False
for root, dirs, files in os.walk(self.docs_folder):
for file in files:
file_path = os.path.join(root, file) # Change this line
_, ext = os.path.splitext(file_path)
data = data_to_text(file_path)
added_to_db = self.add(str(data))
print(f"{file_path} added to Database")
return added_to_db
# Making an instance of the ChromaDB class
memory = ChromaDB(
metric="cosine",
n_results=3,
output_dir="results",
docs_folder="docs",
)
with open(file_path, mode) as file:
file.write(content)
return f"File {file_path} edited successfully."
# Initialize a tool
def search_api(query: str):
# Add your logic here
return query
# Initializing the agent with the Gemini instance and other parameters
# Agent
agent = Agent(
agent_name="Covid-19-Chat",
agent_description=(
"This agent provides information about COVID-19 symptoms."
agent_name="Devin",
system_prompt=(
"Autonomous agent that can interact with humans and other"
" agents. Be Helpful and Kind. Use the tools provided to"
" assist the user. Return all code in markdown format."
),
llm=OpenAIChat(),
llm=llm,
max_loops="auto",
autosave=True,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
interactive=True,
tools=[terminal, browser, file_editor, create_file],
code_interpreter=True,
# streaming=True,
long_term_memory=memory,
stopping_condition="finish",
tools=[search_api],
)
# Defining the task and image path
task = ("What are the symptoms of COVID-19?",)
# Running the agent with the specified task and image
out = agent.run(task)
# Run the agent
out = agent("Create a new file for a plan to take over the world.")
print(out)
```
----
### Devin
Implementation of Devin in less than 90 lines of code with several tools:
@ -655,7 +391,7 @@ agent = Agent(
out = agent("Create a new file for a plan to take over the world.")
print(out)
```
---------------
### `Agent`with Pydantic BaseModel as Output Type
The following is an example of an agent that intakes a pydantic basemodel and outputs it at the same time:
@ -718,6 +454,7 @@ print(f"Generated data: {generated_data}")
```
-----
### Multi Modal Autonomous Agent
Run the agent with multiple modalities useful for various real-world tasks in manufacturing, logistics, and health.
@ -818,7 +555,7 @@ generated_data = agent.run(task)
print(f"Generated data: {generated_data}")
```
----------------
### `Task`
For deeper control of your agent stack, `Task` is a simple structure for task execution with the `Agent`. Imagine zapier like LLM-based workflow automation.
@ -911,14 +648,7 @@ In traditional swarm theory, there are many types of swarms usually for very spe
### `SequentialWorkflow`
Sequential Workflow enables you to sequentially execute tasks with `Agent` and then pass the output into the next agent and onwards until you have specified your max loops. `SequentialWorkflow` is wonderful for real-world business tasks like sending emails, summarizing documents, and analyzing data.
✅ Save and Restore Workflow states!
✅ Multi-Modal Support for Visual Chaining
✅ Utilizes Agent class
Sequential Workflow enables you to sequentially execute tasks with `Agent` and then pass the output into the next agent and onwards until you have specified your max loops.
```python
from swarms import Agent, SequentialWorkflow, Anthropic
@ -957,259 +687,13 @@ workflow.run(
```
### `ConcurrentWorkflow`
`ConcurrentWorkflow` runs all the tasks all at the same time with the inputs you give it!
```python
import os
from dotenv import load_dotenv
from swarms import Agent, ConcurrentWorkflow, OpenAIChat, Task
# Load environment variables from .env file
load_dotenv()
# Load environment variables
llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))
agent = Agent(llm=llm, max_loops=1)
# Create a workflow
workflow = ConcurrentWorkflow(max_workers=5)
# Create tasks
task1 = Task(agent, "What's the weather in miami")
task2 = Task(agent, "What's the weather in new york")
task3 = Task(agent, "What's the weather in london")
# Add tasks to the workflow
workflow.add(tasks=[task1, task2, task3])
# Run the workflow
workflow.run()
```
### `SwarmNetwork`
`SwarmNetwork` provides the infrasturcture for building extremely dense and complex multi-agent applications that span across various types of agents.
✅ Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion.
✅ Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents.
✅ Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure.
```python
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat, SwarmNetwork
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager")
agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3],
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# Run all the agents in the swarm network on a task
out = swarmnet.run_many_agents("Generate a 10,000 word blog on health and wellness.")
print(out)
```
### Majority Voting
Multiple-agents will evaluate an idea based off of an parsing or evaluation function. From papers like "[More agents is all you need](https://arxiv.org/pdf/2402.05120.pdf)
```python
from swarms import Agent, MajorityVoting, ChromaDB, Anthropic
# Initialize the llm
llm = Anthropic()
# Agents
agent1 = Agent(
llm = llm,
system_prompt="You are the leader of the Progressive Party. What is your stance on healthcare?",
agent_name="Progressive Leader",
agent_description="Leader of the Progressive Party",
long_term_memory=ChromaDB(),
max_steps=1,
)
agent2 = Agent(
llm=llm,
agent_name="Conservative Leader",
agent_description="Leader of the Conservative Party",
long_term_memory=ChromaDB(),
max_steps=1,
)
agent3 = Agent(
llm=llm,
agent_name="Libertarian Leader",
agent_description="Leader of the Libertarian Party",
long_term_memory=ChromaDB(),
max_steps=1,
)
# Initialize the majority voting
mv = MajorityVoting(
agents=[agent1, agent2, agent3],
output_parser=llm.majority_voting,
autosave=False,
verbose=True,
)
# Start the majority voting
mv.run("What is your stance on healthcare?")
```
## Build your own LLMs, Agents, and Swarms!
### Swarms Compliant Model Interface
```python
from swarms import BaseLLM
class vLLMLM(BaseLLM):
def __init__(self, model_name='default_model', tensor_parallel_size=1, *args, **kwargs):
super().__init__(*args, **kwargs)
self.model_name = model_name
self.tensor_parallel_size = tensor_parallel_size
# Add any additional initialization here
def run(self, task: str):
pass
# Example
model = vLLMLM("mistral")
# Run the model
out = model("Analyze these financial documents and summarize of them")
print(out)
```
### Swarms Compliant Agent Interface
```python
from swarms import Agent
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
    def custom_method(self, *args, **kwargs):
        # Implement custom logic here
        pass
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Additional custom logic
        return response`
# Model
agent = MyCustomAgent()
# Run the agent
out = agent("Analyze and summarize these financial documents: ")
print(out)
```
### Compliant Interface for Multi-Agent Collaboration
```python
from swarms import AutoSwarm, AutoSwarmRouter, BaseSwarm
# Build your own Swarm
class MySwarm(BaseSwarm):
def __init__(self, name="kyegomez/myswarm", *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
def run(self, task: str, *args, **kwargs):
# Add your multi-agent logic here
# agent 1
# agent 2
# agent 3
return "output of the swarm"
# Add your custom swarm to the AutoSwarmRouter
router = AutoSwarmRouter(
swarms=[MySwarm]
)
# Create an AutoSwarm instance
autoswarm = AutoSwarm(
name="kyegomez/myswarm",
description="A simple API to build and run swarms",
verbose=True,
router=router,
)
# Run the AutoSwarm
autoswarm.run("Analyze these financial data and give me a summary")
```
------
## `AgentRearrange`
Inspired by Einops and einsum, this orchestration techniques enables you to map out the relationships between various agents. For example you specify linear and sequential relationships like `a -> a1 -> a2 -> a3` or concurrent relationships where the first agent will send a message to 3 agents all at once: `a -> a1, a2, a3`. You can customize your workflow to mix sequential and concurrent relationships. [Docs Available:](https://swarms.apac.ai/en/latest/swarms/structs/agent_rearrange/)
```python
from swarms import Agent, AgentRearrange, rearrange, Anthropic
from swarms import Agent, AgentRearrange, Anthropic
# Initialize the director agent
@ -1272,26 +756,11 @@ output = agent_system.run(
)
print(output)
# Using rearrange function
output = rearrange(
agents,
flow,
"Create a format to express and communicate swarms of llms in a structured manner for youtube",
)
print(output)
```
## `HierarhicalSwarm`
Coming soon...
## `AgentLoadBalancer`
Coming soon...
## `GraphSwarm`
Coming soon...
@ -1361,12 +830,12 @@ out = swarm.run("Prepare financial statements and audit financial records")
print(out)
```
----------
## Onboarding Session
Get onboarded now with the creator and lead maintainer of Swarms, Kye Gomez, who will show you how to get started with the installation, usage examples, and starting to build your custom use case! [CLICK HERE](https://cal.com/swarms/swarms-onboarding-session)
---
## Documentation
@ -1374,6 +843,13 @@ Documentation is located here at: [docs.swarms.world](https://docs.swarms.world)
----
## Docker Instructions
- [Learn More Here About Deployments In Docker](https://swarms.apac.ai/en/latest/docker_setup/)
-----
## Folder Structure
The swarms package has been meticlously crafted for extreme use-ability and understanding, the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The 3 most important are `structs`, `models`, and `agents`.
@ -1406,19 +882,12 @@ Swarms is an open-source project, and contributions are VERY welcome. If you wan
----
## Community
Join our growing community around the world, for real-time support, ideas, and discussions on Swarms 😊
- View our official [Blog](https://swarms.apac.ai)
- Chat live with us on [Discord](https://discord.gg/kS3rwKs3ZC)
- Follow us on [Twitter](https://twitter.com/kyegomez)
- Connect with us on [LinkedIn](https://www.linkedin.com/company/the-swarm-corporation)
- Visit us on [YouTube](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ)
- [Join the Swarms community on Discord!](https://discord.gg/AJazBmhKnr)
- Join our Swarms Community Gathering every Thursday at 1pm NYC Time to unlock the potential of autonomous agents in automating your daily tasks [Sign up here](https://lu.ma/5p2jnc2v)
## Swarm Newsletter 🤖 🤖 🤖 📧
Sign up to the Swarm newsletter to receive updates on the latest Autonomous agent research papers, step by step guides on creating multi-agent app, and much more Swarmie goodiness 😊
---
[CLICK HERE TO SIGNUP](https://docs.google.com/forms/d/e/1FAIpQLSfqxI2ktPR9jkcIwzvHL0VY6tEIuVPd-P2fOWKnd6skT9j1EQ/viewform?usp=sf_link)
## Discovery Call
Book a discovery call to learn how Swarms can lower your operating costs by 40% with swarms of autonomous agents in lightspeed. [Click here to book a time that works for you!](https://calendly.com/swarm-corp/30min?month=2023-11)
@ -1429,15 +898,19 @@ Accelerate Bugs, Features, and Demos to implement by supporting us here:
<a href="https://polar.sh/kyegomez"><img src="https://polar.sh/embed/fund-our-backlog.svg?org=kyegomez" /></a>
## Community
## Docker Instructions
- [Learn More Here About Deployments In Docker](https://swarms.apac.ai/en/latest/docker_setup/)
Join our growing community around the world, for real-time support, ideas, and discussions on Swarms 😊
## Swarm Newsletter 🤖 🤖 🤖 📧
Sign up to the Swarm newsletter to receive updates on the latest Autonomous agent research papers, step by step guides on creating multi-agent app, and much more Swarmie goodiness 😊
- View our official [Blog](https://swarms.apac.ai)
- Chat live with us on [Discord](https://discord.gg/kS3rwKs3ZC)
- Follow us on [Twitter](https://twitter.com/kyegomez)
- Connect with us on [LinkedIn](https://www.linkedin.com/company/the-swarm-corporation)
- Visit us on [YouTube](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ)
- [Join the Swarms community on Discord!](https://discord.gg/AJazBmhKnr)
- Join our Swarms Community Gathering every Thursday at 1pm NYC Time to unlock the potential of autonomous agents in automating your daily tasks [Sign up here](https://lu.ma/5p2jnc2v)
[CLICK HERE TO SIGNUP](https://docs.google.com/forms/d/e/1FAIpQLSfqxI2ktPR9jkcIwzvHL0VY6tEIuVPd-P2fOWKnd6skT9j1EQ/viewform?usp=sf_link)
---
# License
Apache License

@ -1,6 +1,6 @@
## Building Analyst Agents with Swarms to write Business Reports
> Jupyter Notebook accompanying this post is accessible at: [Business Analyst Agent Notebook](https://github.com/kyegomez/swarms/blob/master/playground/business-analyst-agent.ipynb)
> Jupyter Notebook accompanying this post is accessible at: [Business Analyst Agent Notebook](https://github.com/kyegomez/swarms/blob/master/playground/demos/business_analysis_swarm/business-analyst-agent.ipynb)
Solving a business problem often involves preparing a Business Case Report. This report comprehensively analyzes the problem, evaluates potential solutions, and provides evidence-based recommendations and an implementation plan to effectively address the issue and drive business value. While the process of preparing one requires an experienced business analyst, the workflow can be augmented using AI agents. Two candidates stick out as areas to work on:

@ -167,6 +167,14 @@ nav:
- Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md"
- Enterprise Guide to High-Performance Multi-Agent LLM Deployments: "swarms_cloud/production_deployment.md"
- Under The Hood The Swarm Cloud Serving Infrastructure: "swarms_cloud/architecture.md"
- Swarms Memory:
- Overview: "swarms_memory/index.md"
- Memory Systems:
- ChromaDB: "swarms_memory/chromadb.md"
- Pinecone: "swarms_memory/pinecone.md"
# - Redis: "swarms_memory/redis.md"
- Faiss: "swarms_memory/faiss.md"
# - HNSW: "swarms_memory/hnsw.md"
- References:
- Agent Glossary: "swarms/glossary.md"
- List of The Best Multi-Agent Papers: "swarms/papers.md"

@ -143,6 +143,90 @@ Finds an agent by its name.
agent = registry.find_agent_by_name("Agent1")
```
### Full Example
```python
from swarms.structs.agent_registry import AgentRegistry
from swarms import Agent, OpenAIChat, Anthropic
# Initialize the agents
growth_agent1 = Agent(
agent_name="Marketing Specialist",
system_prompt="You're the marketing specialist, your purpose is to help companies grow by improving their marketing strategies!",
agent_description="Improve a company's marketing strategies!",
llm=OpenAIChat(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="marketing_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent2 = Agent(
agent_name="Sales Specialist",
system_prompt="You're the sales specialist, your purpose is to help companies grow by improving their sales strategies!",
agent_description="Improve a company's sales strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="sales_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent3 = Agent(
agent_name="Product Development Specialist",
system_prompt="You're the product development specialist, your purpose is to help companies grow by improving their product development strategies!",
agent_description="Improve a company's product development strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="product_development_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent4 = Agent(
agent_name="Customer Service Specialist",
system_prompt="You're the customer service specialist, your purpose is to help companies grow by improving their customer service strategies!",
agent_description="Improve a company's customer service strategies!",
llm=OpenAIChat(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="customer_service_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
# Register the agents\
registry = AgentRegistry()
# Register the agents
registry.add("Marketing Specialist", growth_agent1)
registry.add("Sales Specialist", growth_agent2)
registry.add("Product Development Specialist", growth_agent3)
registry.add("Customer Service Specialist", growth_agent4)
```
## Logging and Error Handling
Each method in the `AgentRegistry` class includes logging to track the execution flow and captures errors to provide detailed information in case of failures. This is crucial for debugging and ensuring smooth operation of the registry. The `report_error` function is used for reporting exceptions that occur during method execution.

@ -72,7 +72,7 @@ agent.run("Generate a 10,000 word blog on health and wellness.")
```python
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB # Copy and paste the code and put it in your own local directory.
from swarms_memory import ChromaDB # Copy and paste the code and put it in your own local directory.
# Making an instance of the ChromaDB class
memory = ChromaDB(

@ -0,0 +1,141 @@
# ChromaDB Documentation
ChromaDB is a specialized module designed to facilitate the storage and retrieval of documents using the ChromaDB system. It offers functionalities for adding documents to a local ChromaDB collection and querying this collection based on provided query texts. This module integrates with the ChromaDB client to create and manage collections, leveraging various configurations for optimizing the storage and retrieval processes.
#### Parameters
| Parameter | Type | Default | Description |
|----------------|-------------------|----------|-------------------------------------------------------------|
| `metric` | `str` | `"cosine"`| The similarity metric to use for the collection. |
| `output_dir` | `str` | `"swarms"`| The name of the collection to store the results in. |
| `limit_tokens` | `Optional[int]` | `1000` | The maximum number of tokens to use for the query. |
| `n_results` | `int` | `1` | The number of results to retrieve. |
| `docs_folder` | `Optional[str]` | `None` | The folder containing documents to be added to the collection.|
| `verbose` | `bool` | `False` | Flag to enable verbose logging for debugging. |
| `*args` | `tuple` | `()` | Additional positional arguments. |
| `**kwargs` | `dict` | `{}` | Additional keyword arguments. |
#### Methods
| Method | Description |
|-----------------------|----------------------------------------------------------|
| `__init__` | Initializes the ChromaDB instance with specified parameters. |
| `add` | Adds a document to the ChromaDB collection. |
| `query` | Queries documents from the ChromaDB collection based on the query text. |
| `traverse_directory` | Traverses the specified directory to add documents to the collection. |
## Usage
```python
from swarms_memory import ChromaDB
chromadb = ChromaDB(
metric="cosine",
output_dir="results",
limit_tokens=1000,
n_results=2,
docs_folder="path/to/docs",
verbose=True,
)
```
### Adding Documents
The `add` method allows you to add a document to the ChromaDB collection. It generates a unique ID for each document and adds it to the collection.
#### Parameters
| Parameter | Type | Default | Description |
|---------------|--------|---------|---------------------------------------------|
| `document` | `str` | - | The document to be added to the collection. |
| `*args` | `tuple`| `()` | Additional positional arguments. |
| `**kwargs` | `dict` | `{}` | Additional keyword arguments. |
#### Returns
| Type | Description |
|-------|--------------------------------------|
| `str` | The ID of the added document. |
#### Example
```python
task = "example_task"
result = "example_result"
result_id = chromadb.add(document="This is a sample document.")
print(f"Document ID: {result_id}")
```
### Querying Documents
The `query` method allows you to retrieve documents from the ChromaDB collection based on the provided query text.
#### Parameters
| Parameter | Type | Default | Description |
|-------------|--------|---------|----------------------------------------|
| `query_text`| `str` | - | The query string to search for. |
| `*args` | `tuple`| `()` | Additional positional arguments. |
| `**kwargs` | `dict` | `{}` | Additional keyword arguments. |
#### Returns
| Type | Description |
|-------|--------------------------------------|
| `str` | The retrieved documents as a string. |
#### Example
```python
query_text = "search term"
results = chromadb.query(query_text=query_text)
print(f"Retrieved Documents: {results}")
```
### Traversing Directory
The `traverse_directory` method traverses through every file in the specified directory and its subdirectories, adding the contents of each file to the ChromaDB collection.
#### Example
```python
chromadb.traverse_directory()
```
## Additional Information and Tips
### Verbose Logging
Enable the `verbose` flag during initialization to get detailed logs of the operations, which is useful for debugging.
```python
chromadb = ChromaDB(verbose=True)
```
### Handling Large Documents
When dealing with large documents, consider using the `limit_tokens` parameter to restrict the number of tokens processed in a single query.
```python
chromadb = ChromaDB(limit_tokens=500)
```
### Optimizing Query Performance
Use the appropriate similarity metric (`metric` parameter) that suits your use case for optimal query performance.
```python
chromadb = ChromaDB(metric="euclidean")
```
## References and Resources
- [ChromaDB Documentation](https://chromadb.io/docs)
- [Python UUID Module](https://docs.python.org/3/library/uuid.html)
- [Python os Module](https://docs.python.org/3/library/os.html)
- [Python logging Module](https://docs.python.org/3/library/logging.html)
- [dotenv Package](https://pypi.org/project/python-dotenv/)
By following this documentation, users can effectively utilize the ChromaDB module for managing document storage and retrieval in their applications.

@ -0,0 +1,172 @@
# Announcing the Release of Swarms-Memory Package: Your Gateway to Efficient RAG Systems
We are thrilled to announce the release of the Swarms-Memory package, a powerful and easy-to-use toolkit designed to facilitate the implementation of Retrieval-Augmented Generation (RAG) systems. Whether you're a seasoned AI practitioner or just starting out, Swarms-Memory provides the tools you need to integrate high-performance, reliable RAG systems into your applications seamlessly.
In this blog post, we'll walk you through getting started with the Swarms-Memory package, covering installation, usage examples, and a detailed overview of supported RAG systems like Pinecone and ChromaDB. Let's dive in!
## What is Swarms-Memory?
Swarms-Memory is a Python package that simplifies the integration of advanced RAG systems into your projects. It supports multiple databases optimized for AI tasks, providing you with the flexibility to choose the best system for your needs. With Swarms-Memory, you can effortlessly handle large-scale AI tasks, vector searches, and more.
### Key Features
- **Easy Integration**: Quickly set up and start using powerful RAG systems.
- **Customizable**: Define custom embedding, preprocessing, and postprocessing functions.
- **Flexible**: Supports multiple RAG systems like ChromaDB and Pinecone, with more coming soon.
- **Scalable**: Designed to handle large-scale AI tasks efficiently.
## Supported RAG Systems
Here's an overview of the RAG systems currently supported by Swarms-Memory:
| RAG System | Status | Description | Documentation | Website |
|------------|--------------|------------------------------------------------------------------------------------------|---------------------------|-----------------|
| ChromaDB | Available | A high-performance, distributed database optimized for handling large-scale AI tasks. | [ChromaDB Documentation](https://chromadb.com/docs) | [ChromaDB](https://chromadb.com) |
| Pinecone | Available | A fully managed vector database for adding vector search to your applications. | [Pinecone Documentation](https://pinecone.io/docs) | [Pinecone](https://pinecone.io) |
| Redis | Coming Soon | An open-source, in-memory data structure store, used as a database, cache, and broker. | [Redis Documentation](https://redis.io/documentation) | [Redis](https://redis.io) |
| Faiss | Coming Soon | A library for efficient similarity search and clustering of dense vectors by Facebook AI. | [Faiss Documentation](https://faiss.ai) | [Faiss](https://faiss.ai) |
| HNSW | Coming Soon | A graph-based algorithm for approximate nearest neighbor search, known for speed. | [HNSW Documentation](https://hnswlib.github.io/hnswlib) | [HNSW](https://hnswlib.github.io/hnswlib) |
## Getting Started
### Requirements
Before you begin, ensure you have the following:
- Python 3.10
- `.env` file with your respective API keys (e.g., `PINECONE_API_KEY`)
### Installation
You can install the Swarms-Memory package using pip:
```bash
$ pip install swarms-memory
```
### Usage Examples
#### Pinecone
Here's a step-by-step guide on how to use Pinecone with Swarms-Memory:
1. **Import Required Libraries**:
```python
from typing import List, Dict, Any
from swarms_memory import PineconeMemory
```
2. **Define Custom Functions**:
```python
from transformers import AutoTokenizer, AutoModel
import torch
# Custom embedding function using a HuggingFace model
def custom_embedding_function(text: str) -> List[float]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
with torch.no_grad():
outputs = model(**inputs)
embeddings = outputs.last_hidden_state.mean(dim=1).squeeze().tolist()
return embeddings
# Custom preprocessing function
def custom_preprocess(text: str) -> str:
return text.lower().strip()
# Custom postprocessing function
def custom_postprocess(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
for result in results:
result["custom_score"] = result["score"] * 2 # Example modification
return results
```
3. **Initialize the Wrapper with Custom Functions**:
```python
wrapper = PineconeMemory(
api_key="your-api-key",
environment="your-environment",
index_name="your-index-name",
embedding_function=custom_embedding_function,
preprocess_function=custom_preprocess,
postprocess_function=custom_postprocess,
logger_config={
"handlers": [
{"sink": "custom_rag_wrapper.log", "rotation": "1 GB"},
{"sink": lambda msg: print(f"Custom log: {msg}", end="")},
],
},
)
```
4. **Add Documents and Query**:
```python
# Adding documents
wrapper.add("This is a sample document about artificial intelligence.", {"category": "AI"})
wrapper.add("Python is a popular programming language for data science.", {"category": "Programming"})
# Querying
results = wrapper.query("What is AI?", filter={"category": "AI"})
for result in results:
print(f"Score: {result['score']}, Custom Score: {result['custom_score']}, Text: {result['metadata']['text']}")
```
#### ChromaDB
Using ChromaDB with Swarms-Memory is straightforward. Heres how:
1. **Import ChromaDB**:
```python
from swarms_memory import ChromaDB
```
2. **Initialize ChromaDB**:
```python
chromadb = ChromaDB(
metric="cosine",
output_dir="results",
limit_tokens=1000,
n_results=2,
docs_folder="path/to/docs",
verbose=True,
)
```
3. **Add and Query Documents**:
```python
# Add a document
doc_id = chromadb.add("This is a test document.")
# Query the document
result = chromadb.query("This is a test query.")
# Traverse a directory
chromadb.traverse_directory()
# Display the result
print(result)
```
## Join the Community
We're excited to see how you leverage Swarms-Memory in your projects! Join our community on Discord to share your experiences, ask questions, and stay updated on the latest developments.
- **🐦 Twitter**: [Follow us on Twitter](https://twitter.com/swarms_platform)
- **📢 Discord**: [Join the Agora Discord](https://discord.gg/agora)
- **Swarms Platform**: [Visit our website](https://swarms.ai)
- **📙 Documentation**: [Read the Docs](https://docs.swarms.ai)
## Conclusion
The Swarms-Memory package brings a new level of ease and efficiency to building and managing RAG systems. With support for leading databases like ChromaDB and Pinecone, it's never been easier to integrate powerful, scalable AI solutions into your projects. We can't wait to see what you'll create with Swarms-Memory!
For more detailed usage examples and documentation, visit our [GitHub repository](https://github.com/swarms-ai/swarms-memory) and start exploring today!

@ -0,0 +1,179 @@
# PineconeMemory Documentation
The `PineconeMemory` class provides a robust interface for integrating Pinecone-based Retrieval-Augmented Generation (RAG) systems. It allows for adding documents to a Pinecone index and querying the index for similar documents. The class supports custom embedding models, preprocessing functions, and other customizations to suit different use cases.
#### Parameters
| Parameter | Type | Default | Description |
|----------------------|-----------------------------------------------|-----------------------------------|------------------------------------------------------------------------------------------------------|
| `api_key` | `str` | - | Pinecone API key. |
| `environment` | `str` | - | Pinecone environment. |
| `index_name` | `str` | - | Name of the Pinecone index to use. |
| `dimension` | `int` | `768` | Dimension of the document embeddings. |
| `embedding_model` | `Optional[Any]` | `None` | Custom embedding model. Defaults to `SentenceTransformer('all-MiniLM-L6-v2')`. |
| `embedding_function` | `Optional[Callable[[str], List[float]]]` | `None` | Custom embedding function. Defaults to `_default_embedding_function`. |
| `preprocess_function`| `Optional[Callable[[str], str]]` | `None` | Custom preprocessing function. Defaults to `_default_preprocess_function`. |
| `postprocess_function`| `Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]]`| `None` | Custom postprocessing function. Defaults to `_default_postprocess_function`. |
| `metric` | `str` | `'cosine'` | Distance metric for Pinecone index. |
| `pod_type` | `str` | `'p1'` | Pinecone pod type. |
| `namespace` | `str` | `''` | Pinecone namespace. |
| `logger_config` | `Optional[Dict[str, Any]]` | `None` | Configuration for the logger. Defaults to logging to `rag_wrapper.log` and console output. |
### Methods
#### `_setup_logger`
```python
def _setup_logger(self, config: Optional[Dict[str, Any]] = None)
```
Sets up the logger with the given configuration.
#### `_default_embedding_function`
```python
def _default_embedding_function(self, text: str) -> List[float]
```
Generates embeddings using the default SentenceTransformer model.
#### `_default_preprocess_function`
```python
def _default_preprocess_function(self, text: str) -> str
```
Preprocesses the input text by stripping whitespace.
#### `_default_postprocess_function`
```python
def _default_postprocess_function(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]
```
Postprocesses the query results.
#### `add`
Adds a document to the Pinecone index.
| Parameter | Type | Default | Description |
|-----------|-----------------------|---------|-----------------------------------------------|
| `doc` | `str` | - | The document to be added. |
| `metadata`| `Optional[Dict[str, Any]]` | `None` | Additional metadata for the document. |
#### `query`
Queries the Pinecone index for similar documents.
| Parameter | Type | Default | Description |
|-----------|-------------------------|---------|-----------------------------------------------|
| `query` | `str` | - | The query string. |
| `top_k` | `int` | `5` | The number of top results to return. |
| `filter` | `Optional[Dict[str, Any]]` | `None` | Metadata filter for the query. |
## Usage
The `PineconeMemory` class is initialized with the necessary parameters to configure Pinecone and the embedding model. It supports a variety of custom configurations to suit different needs.
#### Example
```python
from swarms_memory import PineconeMemory
# Initialize PineconeMemory
memory = PineconeMemory(
api_key="your-api-key",
environment="us-west1-gcp",
index_name="example-index",
dimension=768
)
```
### Adding Documents
Documents can be added to the Pinecone index using the `add` method. The method accepts a document string and optional metadata.
#### Example
```python
doc = "This is a sample document to be added to the Pinecone index."
metadata = {"author": "John Doe", "date": "2024-07-08"}
memory.add(doc, metadata)
```
### Querying Documents
The `query` method allows for querying the Pinecone index for similar documents based on a query string. It returns the top `k` most similar documents.
#### Example
```python
query = "Sample query to find similar documents."
results = memory.query(query, top_k=5)
for result in results:
print(result)
```
## Additional Information and Tips
### Custom Embedding and Preprocessing Functions
Custom embedding and preprocessing functions can be provided during initialization to tailor the document processing to specific requirements.
#### Example
```python
def custom_embedding_function(text: str) -> List[float]:
# Custom embedding logic
return [0.1, 0.2, 0.3]
def custom_preprocess_function(text: str) -> str:
# Custom preprocessing logic
return text.lower()
memory = PineconeMemory(
api_key="your-api-key",
environment="us-west1-gcp",
index_name="example-index",
embedding_function=custom_embedding_function,
preprocess_function=custom_preprocess_function
)
```
### Logger Configuration
The logger can be configured to suit different logging needs. The default configuration logs to a file and the console.
#### Example
```python
logger_config = {
"handlers": [
{"sink": "custom_log.log", "rotation": "1 MB"},
{"sink": lambda msg: print(msg, end="")},
]
}
memory = PineconeMemory(
api_key="your-api-key",
environment="us-west1-gcp",
index_name="example-index",
logger_config=logger_config
)
```
## References and Resources
- [Pinecone Documentation](https://docs.pinecone.io/)
- [SentenceTransformers Documentation](https://www.sbert.net/)
- [Loguru Documentation](https://loguru.readthedocs.io/en/stable/)
For further exploration and examples, refer to the official documentation and resources provided by Pinecone, SentenceTransformers, and Loguru.
This concludes the detailed documentation for the `PineconeMemory` class. The class offers a flexible and powerful interface for leveraging Pinecone's capabilities in retrieval-augmented generation systems. By supporting custom embeddings, preprocessing, and postprocessing functions, it can be tailored to a wide range of applications.

@ -1,62 +1,4 @@
from swarms import Agent
from langchain_community.llms.anthropic import Anthropic
def calculate_profit(revenue: float, expenses: float):
"""
Calculates the profit by subtracting expenses from revenue.
Args:
revenue (float): The total revenue.
expenses (float): The total expenses.
Returns:
float: The calculated profit.
"""
return revenue - expenses
def generate_report(company_name: str, profit: float):
"""
Generates a report for a company's profit.
Args:
company_name (str): The name of the company.
profit (float): The calculated profit.
Returns:
str: The report for the company's profit.
"""
return f"The profit for {company_name} is ${profit}."
EMAIL_DETECT_APPOINT = """
if the user gives you an email address, then call the appointment function to schedule a meeting with the user.
SCHEMA OF THE FUNCTION:
"""
def write_memory_to_rag(memory_name: str, memory: str):
"""
Writes the memory to the RAG model for fine-tuning.
Args:
memory_name (str): The name of the memory.
memory (str): The memory to be written to the RAG model.
"""
# Write the memory to the RAG model for fine-tuning
from playground.memory.chromadb_example import ChromaDB
db = ChromaDB(output_dir=memory_name)
db.add(memory)
return None
from swarms import Agent, Anthropic
# Initialize the agent
agent = Agent(
@ -66,7 +8,6 @@ agent = Agent(
llm=Anthropic(),
max_loops="auto",
autosave=True,
sop_list=[EMAIL_DETECT_APPOINT],
# dynamic_temperature_enabled=True,
dashboard=False,
verbose=True,

@ -1,5 +1,5 @@
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
from swarms.models.tiktoken_wrapper import TikTokenizer
# Initialize the agent

@ -4,7 +4,7 @@ from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
# Load the environment variables
load_dotenv()

@ -2,7 +2,7 @@ import os
from dotenv import load_dotenv
from swarms.structs import Agent, OpenAIChat, Task
from swarms import Agent, Task, OpenAIChat
# Load the environment variables
load_dotenv()

@ -1,6 +1,6 @@
from swarms import Agent
from swarms.models.llama3_hosted import llama3Hosted
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
from swarms.tools.prebuilt.bing_api import fetch_web_articles_bing_api
# Define the research system prompt

@ -10,7 +10,7 @@ $ pip install swarms
"""
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
from swarms.tools.prebuilt.bing_api import fetch_web_articles_bing_api
import os
from dotenv import load_dotenv

@ -1,80 +0,0 @@
from swarms import Agent, Anthropic, tool
# Model
llm = Anthropic(
temperature=0.1,
)
# Tools
@tool
def text_to_video(task: str):
"""
Converts a given text task into an animated video.
Args:
task (str): The text task to be converted into a video.
Returns:
str: The path to the exported GIF file.
"""
import torch
from diffusers import (
AnimateDiffPipeline,
MotionAdapter,
EulerDiscreteScheduler,
)
from diffusers.utils import export_to_gif
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
device = "cuda"
dtype = torch.float16
step = 4 # Options: [1,2,4,8]
repo = "ByteDance/AnimateDiff-Lightning"
ckpt = f"animatediff_lightning_{step}step_diffusers.safetensors"
base = "emilianJR/epiCRealism" # Choose to your favorite base model.
adapter = MotionAdapter().to(device, dtype)
adapter.load_state_dict(
load_file(hf_hub_download(repo, ckpt), device=device)
)
pipe = AnimateDiffPipeline.from_pretrained(
base, motion_adapter=adapter, torch_dtype=dtype
).to(device)
pipe.scheduler = EulerDiscreteScheduler.from_config(
pipe.scheduler.config,
timestep_spacing="trailing",
beta_schedule="linear",
)
output = pipe(
prompt=task, guidance_scale=1.0, num_inference_steps=step
)
out = export_to_gif(output.frames[0], "animation.gif")
return out
# Agent
agent = Agent(
agent_name="Devin",
system_prompt=(
"Autonomous agent that can interact with humans and other"
" agents. Be Helpful and Kind. Use the tools provided to"
" assist the user. Return all code in markdown format."
),
llm=llm,
max_loops="auto",
autosave=True,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
interactive=True,
tools=[text_to_video],
)
# Run the agent
out = agent("Create a vide of a girl coding AI wearing hijab")
print(out)

@ -9,9 +9,9 @@ from swarms.prompts.ai_research_team import (
)
from swarms.structs import Agent
from swarms.utils.pdf_to_text import pdf_to_text
from swarms import rearrange
# Base llms
# Environment variables
load_dotenv()
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
@ -30,6 +30,7 @@ llm2 = Anthropic(
# Agents
paper_summarizer_agent = Agent(
agent_name="paper_summarizer_agent",
llm=llm2,
sop=PAPER_SUMMARY_ANALYZER,
max_loops=1,
@ -38,6 +39,7 @@ paper_summarizer_agent = Agent(
)
paper_implementor_agent = Agent(
agent_name="paper_implementor_agent",
llm=llm1,
sop=PAPER_IMPLEMENTOR_AGENT_PROMPT,
max_loops=1,
@ -46,9 +48,31 @@ paper_implementor_agent = Agent(
code_interpreter=False,
)
paper = pdf_to_text(PDF_PATH)
algorithmic_psuedocode_agent = paper_summarizer_agent.run(
"Focus on creating the algorithmic pseudocode for the novel"
f" method in this paper: {paper}"
pytorch_pseudocode_agent = Agent(
agent_name="pytorch_pseudocode_agent",
llm=llm1,
sop=PAPER_IMPLEMENTOR_AGENT_PROMPT,
max_loops=1,
autosave=True,
saved_state_path="pytorch_pseudocode_agent.json",
code_interpreter=False,
)
pytorch_code = paper_implementor_agent.run(algorithmic_psuedocode_agent)
paper = pdf_to_text(PDF_PATH)
task = f"""
Focus on creating the algorithmic pseudocode for the novel
f" method in this paper: {paper}
"""
agents = [
paper_summarizer_agent,
paper_implementor_agent,
pytorch_pseudocode_agent,
]
flow = "paper_summarizer_agent -> paper_implementor_agent -> pytorch_pseudocode_agent"
swarm = rearrange(agents, flow, task)
print(swarm)

@ -1,12 +1,10 @@
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from swarms import Agent
from swarms.models import OpenAIChat
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.structs.rearrange import AgentRearrange
from typing import Optional, List, Dict, Any
# Load the environment variables
load_dotenv()
@ -74,69 +72,6 @@ def TREATMENT_PLAN_SYSTEM_PROMPT() -> str:
"""
class LLMConfig(BaseModel):
model_name: str
max_tokens: int
class AgentConfig(BaseModel):
agent_name: str
system_prompt: str
llm: LLMConfig
max_loops: int
autosave: bool
dashboard: bool
class AgentRearrangeConfig(BaseModel):
agents: List[AgentConfig]
flow: str
max_loops: int
verbose: bool
class AgentRunResult(BaseModel):
agent_name: str
output: Dict[str, Any]
tokens_generated: int
class RunAgentsResponse(BaseModel):
results: List[AgentRunResult]
total_tokens_generated: int
class AgentRearrangeResponse(BaseModel):
results: List[AgentRunResult]
total_tokens_generated: int
class RunConfig(BaseModel):
task: str = Field(..., title="The task to run")
flow: str = "D -> T"
image: Optional[str] = None # Optional image path as a string
max_loops: Optional[int] = 1
# @app.get("/v1/health")
# async def health_check():
# return JSONResponse(content={"status": "healthy"})
# @app.get("/v1/models_available")
# async def models_available():
# available_models = {
# "models": [
# {"name": "gpt-4-1106-vision-preview", "type": "vision"},
# {"name": "openai-chat", "type": "text"},
# ]
# }
# return JSONResponse(content=available_models)
# @app.get("/v1/swarm/completions")
# async def run_agents(run_config: RunConfig):
# Diagnoser agent
diagnoser = Agent(
# agent_name="Medical Image Diagnostic Agent",
agent_name="D",
@ -167,4 +102,6 @@ rearranger = AgentRearrange(
)
# Run the agents
results = rearranger.run("")
results = rearranger.run(
"Analyze the medical image and provide a treatment plan."
)

@ -1,6 +1,6 @@
from swarms import Agent, OpenAIChat
from typing import List
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
memory = ChromaDB(
metric="cosine",

@ -12,18 +12,6 @@ Example:
from swarms import Agent, OpenAIChat
# # Memory
# memory = ChromaDB(
# output_dir="social_media_marketing",
# docs_folder="docs",
# )
# Memory for instagram
# memory = ChromaDB(
# output_dir="social_media_marketing",
# docs_folder="docs",
# )
llm = OpenAIChat(max_tokens=4000)

@ -0,0 +1,392 @@
"""
Problem: We're creating specialized agents for various social medias
List of agents:
- Facebook agent
- Twitter agent
- Instagram agent
- LinkedIn agent
- TikTok agent
- Reddit agent
- Pinterest agent
- Snapchat agent
- YouTube agent
- WhatsApp agent
"""
from swarms import Agent, OpenAIChat, MixtureOfAgents
import os
import requests
# Model
model = OpenAIChat(max_tokens=4000, temperature=0.8)
# Content Variables
facebook_content = "Here is the content for Facebook"
twitter_content = "Here is the content for Twitter"
instagram_content = "Here is the content for Instagram"
linkedin_content = "Here is the content for LinkedIn"
tiktok_content = "Here is the content for TikTok"
reddit_content = "Here is the content for Reddit"
pinterest_content = "Here is the content for Pinterest"
snapchat_content = "Here is the content for Snapchat"
youtube_content = "Here is the content for YouTube"
whatsapp_content = "Here is the content for WhatsApp"
# Prompt Variables
facebook_prompt = f"""
You are a Facebook social media agent. Your task is to create a post that maximizes engagement on Facebook. Use rich media, personal stories, and interactive content. Ensure the post is compelling and includes a call-to-action. Here is the content to work with: {facebook_content}
"""
twitter_prompt = f"""
You are a Twitter social media agent. Your task is to create a tweet that is short, concise, and uses trending hashtags. The tweet should be engaging and include relevant media such as images, GIFs, or short videos. Here is the content to work with: {twitter_content}
"""
instagram_prompt = f"""
You are an Instagram social media agent. Your task is to create a visually appealing post that includes high-quality images and engaging captions. Consider using stories and reels to maximize reach. Here is the content to work with: {instagram_content}
"""
linkedin_prompt = f"""
You are a LinkedIn social media agent. Your task is to create a professional and insightful post related to industry trends or personal achievements. The post should include relevant media such as articles, professional photos, or videos. Here is the content to work with: {linkedin_content}
"""
tiktok_prompt = f"""
You are a TikTok social media agent. Your task is to create a short, entertaining video that aligns with trending challenges and music. The video should be engaging and encourage viewers to interact. Here is the content to work with: {tiktok_content}
"""
reddit_prompt = f"""
You are a Reddit social media agent. Your task is to create an engaging post for relevant subreddits. The post should spark in-depth discussions and include relevant media such as images or links. Here is the content to work with: {reddit_content}
"""
pinterest_prompt = f"""
You are a Pinterest social media agent. Your task is to create high-quality, visually appealing pins. Focus on popular categories such as DIY, fashion, and lifestyle. Here is the content to work with: {pinterest_content}
"""
snapchat_prompt = f"""
You are a Snapchat social media agent. Your task is to create engaging and timely snaps and stories. Include personal touches and use filters or AR lenses to enhance the content. Here is the content to work with: {snapchat_content}
"""
youtube_prompt = f"""
You are a YouTube social media agent. Your task is to create high-quality videos with engaging thumbnails. Ensure a consistent posting schedule and encourage viewer interaction. Here is the content to work with: {youtube_content}
"""
whatsapp_prompt = f"""
You are a WhatsApp social media agent. Your task is to send personalized messages and updates. Use broadcast lists and ensure the messages are engaging and relevant. Here is the content to work with: {whatsapp_content}
"""
def post_to_twitter(content: str) -> None:
"""
Posts content to Twitter.
Args:
content (str): The content to post on Twitter.
Raises:
ValueError: If the content is empty or exceeds the character limit.
requests.exceptions.RequestException: If there is an error with the request.
"""
try:
if not content:
raise ValueError("Content cannot be empty.")
if len(content) > 280:
raise ValueError(
"Content exceeds Twitter's 280 character limit."
)
# Retrieve the access token from environment variables
access_token = os.getenv("TWITTER_ACCESS_TOKEN")
if not access_token:
raise EnvironmentError(
"Twitter access token not found in environment variables."
)
# Mock API endpoint for example purposes
api_url = "https://api.twitter.com/2/tweets"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
data = {"text": content}
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
print("Content posted to Twitter successfully.")
except ValueError as e:
print(f"Error: {e}")
raise
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
raise
def post_to_instagram(content: str) -> None:
"""
Posts content to Instagram.
Args:
content (str): The content to post on Instagram.
Raises:
ValueError: If the content is empty or exceeds the character limit.
requests.exceptions.RequestException: If there is an error with the request.
"""
try:
if not content:
raise ValueError("Content cannot be empty.")
if len(content) > 2200:
raise ValueError(
"Content exceeds Instagram's 2200 character limit."
)
# Retrieve the access token from environment variables
access_token = os.getenv("INSTAGRAM_ACCESS_TOKEN")
user_id = os.getenv("INSTAGRAM_USER_ID")
if not access_token or not user_id:
raise EnvironmentError(
"Instagram access token or user ID not found in environment variables."
)
# Mock API endpoint for example purposes
api_url = f"https://graph.instagram.com/v10.0/{user_id}/media"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
data = {
"caption": content,
"image_url": "URL_OF_THE_IMAGE_TO_POST", # Replace with actual image URL if needed
}
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
print("Content posted to Instagram successfully.")
except ValueError as e:
print(f"Error: {e}")
raise
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
raise
def post_to_facebook(content: str) -> None:
"""
Posts content to Facebook.
Args:
content (str): The content to post on Facebook.
Raises:
ValueError: If the content is empty.
requests.exceptions.RequestException: If there is an error with the request.
"""
try:
if not content:
raise ValueError("Content cannot be empty.")
# Retrieve the access token from environment variables
access_token = os.getenv("FACEBOOK_ACCESS_TOKEN")
if not access_token:
raise EnvironmentError(
"Facebook access token not found in environment variables."
)
# Mock API endpoint for example purposes
api_url = "https://graph.facebook.com/v10.0/me/feed"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
data = {"message": content}
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
print("Content posted to Facebook successfully.")
except ValueError as e:
print(f"Error: {e}")
raise
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
raise
# Prompts
prompts = [
facebook_prompt,
twitter_prompt,
instagram_prompt,
linkedin_prompt,
tiktok_prompt,
reddit_prompt,
pinterest_prompt,
snapchat_prompt,
youtube_prompt,
whatsapp_prompt,
]
# For every prompt, we're going to create a list of agents
for prompt in prompts:
agents = [
Agent(
agent_name="Facebook Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="facebook_agent.json",
),
Agent(
agent_name="Twitter Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
tools=[post_to_twitter],
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="twitter_agent.json",
),
Agent(
agent_name="Instagram Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
tools=[post_to_instagram],
state_save_file_type="json",
saved_state_path="instagram_agent.json",
),
Agent(
agent_name="LinkedIn Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="linkedin_agent.json",
),
Agent(
agent_name="TikTok Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="tiktok_agent.json",
),
Agent(
agent_name="Reddit Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="reddit_agent.json",
),
Agent(
agent_name="Pinterest Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="pinterest_agent.json",
),
Agent(
agent_name="Snapchat Agent",
system_prompt=prompt,
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="snapchat_agent.json",
),
]
# Final agent
final_agent = Agent(
agent_name="Final Agent",
system_prompt="Ensure the content is optimized for all social media platforms.",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
dynamic_temperature_enabled=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="final_agent.json",
)
# Create a mixture of agents
swarm = MixtureOfAgents(
agents=agents,
final_agent=final_agent,
layers=1,
verbose=True,
)
# parallel_swarm = AgentRearrange(
# agents=agents,
# flow=f"{agents[0].agent_name} -> {agents[1].agent_name}, {agents[2].agent_name}, {agents[3].agent_name}, {agents[4].agent_name}, {agents[5].agent_name}",
# max_loops=1,
# verbose=True,
# )
# Run the swarm
swarm.run(
"""
[Workshop Today][Unlocking The Secrets of Multi-Agent Collaboration]
[Location][https://lu.ma/tfn0fp37]
[Time][Today 2:30pm PST -> 4PM PST] [Circa 5 hours]
Sign up and invite your friends we're going to dive into various multi-agent orchestration workflows in swarms:
https://github.com/kyegomez/swarms
And, the swarms docs:
https://docs.swarms.world/en/latest/
"""
)

@ -15,7 +15,7 @@ task -> Understanding Agent [understands the problem better] -> Summarize of the
from swarms import Agent, llama3Hosted, AgentRearrange
from pydantic import BaseModel
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
# Initialize the language model agent (e.g., GPT-3)
llm = llama3Hosted(max_tokens=3000)

@ -11,7 +11,7 @@ Todo [Improvements]
from swarms import Agent
from swarms.models.llama3_hosted import llama3Hosted
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
# Model

Binary file not shown.

Before

Width:  |  Height:  |  Size: 113 KiB

@ -1,14 +0,0 @@
"""from swarms.models import Dalle3
# Create an instance of the Dalle3 class with high quality
dalle3 = Dalle3(quality="high")
# Define a text prompt
task = "A high-quality image of a sunset"
# Generate a high-quality image from the text prompt
image_url = dalle3(task)
# Print the generated image URL
print(image_url)
"""

@ -1,36 +0,0 @@
from swarms.models import HuggingfaceLLM
import torch
try:
inference = HuggingfaceLLM(
model_id="gpt2",
quantize=False,
verbose=True,
)
device = "cuda" if torch.cuda.is_available() else "cpu"
inference.model.to(device)
prompt_text = (
"Create a list of known biggest risks of structural collapse"
" with references"
)
inputs = inference.tokenizer(prompt_text, return_tensors="pt").to(
device
)
generated_ids = inference.model.generate(
**inputs,
max_new_tokens=1000, # Adjust the length of the generation
temperature=0.7, # Adjust creativity
top_k=50, # Limits the vocabulary considered at each step
pad_token_id=inference.tokenizer.eos_token_id,
do_sample=True, # Enable sampling to utilize temperature
)
generated_text = inference.tokenizer.decode(
generated_ids[0], skip_special_tokens=True
)
print(generated_text)
except Exception as e:
print(f"An error occurred: {e}")

@ -1,10 +0,0 @@
from swarms.models import Mixtral
# Initialize the Mixtral model with 4 bit and flash attention!
mixtral = Mixtral(load_in_4bit=True, use_flash_attention_2=True)
# Generate text for a simple task
generated_text = mixtral.run("Generate a creative story.")
# Print the generated text
print(generated_text)

@ -1,45 +0,0 @@
import os
from dotenv import load_dotenv
from swarms import (
OpenAIChat,
Conversation,
)
conv = Conversation(
time_enabled=True,
)
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(openai_api_key=api_key, model_name="gpt-4")
# Run the language model in a loop
def interactive_conversation(llm):
conv = Conversation()
while True:
user_input = input("User: ")
conv.add("user", user_input)
if user_input.lower() == "quit":
break
task = (
conv.return_history_as_string()
) # Get the conversation history
out = llm(task)
conv.add("assistant", out)
print(
f"Assistant: {out}",
)
conv.display_conversation()
conv.export_conversation("conversation.txt")
# Replace with your LLM instance
interactive_conversation(llm)

@ -1,35 +0,0 @@
# Importing necessary modules
import os
from dotenv import load_dotenv
from swarms import Worker, OpenAIChat, tool
# Loading environment variables from .env file
load_dotenv()
# Retrieving the OpenAI API key from environment variables
api_key = os.getenv("OPENAI_API_KEY")
# Create a tool
@tool
def search_api(query: str):
pass
# Creating a Worker instance
worker = Worker(
name="My Worker",
role="Worker",
human_in_the_loop=False,
tools=[search_api],
temperature=0.5,
llm=OpenAIChat(openai_api_key=api_key),
)
# Running the worker with a prompt
out = worker.run(
"Hello, how are you? Create an image of how your are doing!"
)
# Printing the output
print(out)

@ -1,12 +0,0 @@
# Import the model
from swarms import ZeroscopeTTV
# Initialize the model
zeroscope = ZeroscopeTTV()
# Specify the task
task = "A person is walking on the street."
# Generate the video!
video_path = zeroscope(task)
print(video_path)

@ -1,186 +0,0 @@
import logging
import os
import uuid
from typing import Optional
import chromadb
from dotenv import load_dotenv
from swarms.utils.data_to_text import data_to_text
from swarms.utils.markdown_message import display_markdown_message
from swarms.memory.base_vectordb import BaseVectorDatabase
# Load environment variables
load_dotenv()
# Results storage using local ChromaDB
class ChromaDB(BaseVectorDatabase):
"""
ChromaDB database
Args:
metric (str): The similarity metric to use.
output (str): The name of the collection to store the results in.
limit_tokens (int, optional): The maximum number of tokens to use for the query. Defaults to 1000.
n_results (int, optional): The number of results to retrieve. Defaults to 2.
Methods:
add: _description_
query: _description_
Examples:
>>> chromadb = ChromaDB(
>>> metric="cosine",
>>> output="results",
>>> llm="gpt3",
>>> openai_api_key=OPENAI_API_KEY,
>>> )
>>> chromadb.add(task, result, result_id)
"""
def __init__(
self,
metric: str = "cosine",
output_dir: str = "swarms",
limit_tokens: Optional[int] = 1000,
n_results: int = 1,
docs_folder: str = None,
verbose: bool = False,
*args,
**kwargs,
):
self.metric = metric
self.output_dir = output_dir
self.limit_tokens = limit_tokens
self.n_results = n_results
self.docs_folder = docs_folder
self.verbose = verbose
# Disable ChromaDB logging
if verbose:
logging.getLogger("chromadb").setLevel(logging.INFO)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.PersistentClient(
settings=chromadb.config.Settings(
persist_directory=chroma_persist_dir,
),
*args,
**kwargs,
)
# Create ChromaDB client
self.client = chromadb.Client()
# Create Chroma collection
self.collection = chroma_client.get_or_create_collection(
name=output_dir,
metadata={"hnsw:space": metric},
*args,
**kwargs,
)
display_markdown_message(
"ChromaDB collection created:"
f" {self.collection.name} with metric: {self.metric} and"
f" output directory: {self.output_dir}"
)
# If docs
if docs_folder:
display_markdown_message(
f"Traversing directory: {docs_folder}"
)
self.traverse_directory()
def add(
self,
document: str,
*args,
**kwargs,
):
"""
Add a document to the ChromaDB collection.
Args:
document (str): The document to be added.
condition (bool, optional): The condition to check before adding the document. Defaults to True.
Returns:
str: The ID of the added document.
"""
try:
doc_id = str(uuid.uuid4())
self.collection.add(
ids=[doc_id],
documents=[document],
*args,
**kwargs,
)
print("-----------------")
print("Document added successfully")
print("-----------------")
return doc_id
except Exception as e:
raise Exception(f"Failed to add document: {str(e)}")
def query(
self,
query_text: str,
*args,
**kwargs,
) -> str:
"""
Query documents from the ChromaDB collection.
Args:
query (str): The query string.
n_docs (int, optional): The number of documents to retrieve. Defaults to 1.
Returns:
dict: The retrieved documents.
"""
try:
logging.info(f"Querying documents for: {query_text}")
docs = self.collection.query(
query_texts=[query_text],
n_results=self.n_results,
*args,
**kwargs,
)["documents"]
# Convert into a string
out = ""
for doc in docs:
out += f"{doc}\n"
# Display the retrieved document
display_markdown_message(f"Query: {query_text}")
display_markdown_message(f"Retrieved Document: {out}")
return out
except Exception as e:
raise Exception(f"Failed to query documents: {str(e)}")
def traverse_directory(self):
"""
Traverse through every file in the given directory and its subdirectories,
and return the paths of all files.
Parameters:
- directory_name (str): The name of the directory to traverse.
Returns:
- list: A list of paths to each file in the directory and its subdirectories.
"""
added_to_db = False
for root, dirs, files in os.walk(self.docs_folder):
for file in files:
file_path = os.path.join(root, file) # Change this line
_, ext = os.path.splitext(file_path)
data = data_to_text(file_path)
added_to_db = self.add(str(data))
print(f"{file_path} added to Database")
return added_to_db

@ -1,14 +0,0 @@
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
uri = "mongodb+srv://kye:Kgx7d2FeLN7AyGNh@cluster0.ndu3b6d.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi("1"))
# Send a ping to confirm a successful connection
try:
client.admin.command("ping")
print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
print(e)

@ -4,7 +4,7 @@ import pinecone
from attr import define, field
from swarms.memory.base_vectordb import BaseVectorDatabase
from swarms.utils.hash import str_to_hash
from swarms.utils import str_to_hash
@define

@ -1,25 +0,0 @@
from langchain.document_loaders import CSVLoader
from swarms.memory import qdrant
loader = CSVLoader(
file_path="../document_parsing/aipg/aipg.csv",
encoding="utf-8-sig",
)
docs = loader.load()
# Initialize the Qdrant instance
# See qdrant documentation on how to run locally
qdrant_client = qdrant.Qdrant(
host="https://697ea26c-2881-4e17-8af4-817fcb5862e8.europe-west3-0.gcp.cloud.qdrant.io",
collection_name="qdrant",
)
qdrant_client.add_vectors(docs)
# Perform a search
search_query = "Who is jojo"
search_results = qdrant_client.search_vectors(search_query)
print("Search Results:")
for result in search_results:
print(result)

@ -1,180 +0,0 @@
"""
Weaviate API Client
"""
from typing import Any, Dict, List, Optional
from swarms.memory.base_vectordb import BaseVectorDatabase
try:
import weaviate
except ImportError:
print("pip install weaviate-client")
class WeaviateDB(BaseVectorDatabase):
"""
Weaviate API Client
Interface to Weaviate, a vector database with a GraphQL API.
Args:
http_host (str): The HTTP host of the Weaviate server.
http_port (str): The HTTP port of the Weaviate server.
http_secure (bool): Whether to use HTTPS.
grpc_host (Optional[str]): The gRPC host of the Weaviate server.
grpc_port (Optional[str]): The gRPC port of the Weaviate server.
grpc_secure (Optional[bool]): Whether to use gRPC over TLS.
auth_client_secret (Optional[Any]): The authentication client secret.
additional_headers (Optional[Dict[str, str]]): Additional headers to send with requests.
additional_config (Optional[weaviate.AdditionalConfig]): Additional configuration for the client.
Methods:
create_collection: Create a new collection in Weaviate.
add: Add an object to a specified collection.
query: Query objects from a specified collection.
update: Update an object in a specified collection.
delete: Delete an object from a specified collection.
Examples:
>>> from swarms.memory import WeaviateDB
"""
def __init__(
self,
http_host: str,
http_port: str,
http_secure: bool,
grpc_host: Optional[str] = None,
grpc_port: Optional[str] = None,
grpc_secure: Optional[bool] = None,
auth_client_secret: Optional[Any] = None,
additional_headers: Optional[Dict[str, str]] = None,
additional_config: Optional[Any] = None,
connection_params: Dict[str, Any] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.http_host = http_host
self.http_port = http_port
self.http_secure = http_secure
self.grpc_host = grpc_host
self.grpc_port = grpc_port
self.grpc_secure = grpc_secure
self.auth_client_secret = auth_client_secret
self.additional_headers = additional_headers
self.additional_config = additional_config
self.connection_params = connection_params
# If connection_params are provided, use them to initialize the client.
connection_params = weaviate.ConnectionParams.from_params(
http_host=http_host,
http_port=http_port,
http_secure=http_secure,
grpc_host=grpc_host,
grpc_port=grpc_port,
grpc_secure=grpc_secure,
)
# If additional headers are provided, add them to the connection params.
self.client = weaviate.WeaviateDB(
connection_params=connection_params,
auth_client_secret=auth_client_secret,
additional_headers=additional_headers,
additional_config=additional_config,
)
def create_collection(
self,
name: str,
properties: List[Dict[str, Any]],
vectorizer_config: Any = None,
):
"""Create a new collection in Weaviate.
Args:
name (str): _description_
properties (List[Dict[str, Any]]): _description_
vectorizer_config (Any, optional): _description_. Defaults to None.
"""
try:
out = self.client.collections.create(
name=name,
vectorizer_config=vectorizer_config,
properties=properties,
)
print(out)
except Exception as error:
print(f"Error creating collection: {error}")
raise
def add(self, collection_name: str, properties: Dict[str, Any]):
"""Add an object to a specified collection.
Args:
collection_name (str): _description_
properties (Dict[str, Any]): _description_
Returns:
_type_: _description_
"""
try:
collection = self.client.collections.get(collection_name)
return collection.data.insert(properties)
except Exception as error:
print(f"Error adding object: {error}")
raise
def query(self, collection_name: str, query: str, limit: int = 10):
"""Query objects from a specified collection.
Args:
collection_name (str): _description_
query (str): _description_
limit (int, optional): _description_. Defaults to 10.
Returns:
_type_: _description_
"""
try:
collection = self.client.collections.get(collection_name)
response = collection.query.bm25(query=query, limit=limit)
return [o.properties for o in response.objects]
except Exception as error:
print(f"Error querying objects: {error}")
raise
def update(
self,
collection_name: str,
object_id: str,
properties: Dict[str, Any],
):
"""UPdate an object in a specified collection.
Args:
collection_name (str): _description_
object_id (str): _description_
properties (Dict[str, Any]): _description_
"""
try:
collection = self.client.collections.get(collection_name)
collection.data.update(object_id, properties)
except Exception as error:
print(f"Error updating object: {error}")
raise
def delete(self, collection_name: str, object_id: str):
"""Delete an object from a specified collection.
Args:
collection_name (str): _description_
object_id (str): _description_
"""
try:
collection = self.client.collections.get(collection_name)
collection.data.delete_by_id(object_id)
except Exception as error:
print(f"Error deleting object: {error}")
raise

@ -1,6 +1,8 @@
import os
from swarms.models import Anthropic
model = Anthropic(anthropic_api_key="")
model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"))
task = "What is quantum field theory? What are 3 books on the field?"

@ -1,13 +0,0 @@
import asyncio
from swarms.models.distilled_whisperx import DistilWhisperModel
model_wrapper = DistilWhisperModel()
# Download mp3 of voice and place the path here
transcription = model_wrapper("path/to/audio.mp3")
# For async usage
transcription = asyncio.run(
model_wrapper.async_transcribe("path/to/audio.mp3")
)

@ -1,12 +0,0 @@
from swarms import Mistral
# Initialize the model
model = Mistral(
model_name="miqudev/miqu-1-70b",
max_length=500,
use_flash_attention=True,
load_in_4bit=True,
)
# Run the model
result = model.run("What is the meaning of life?")

@ -1,7 +0,0 @@
from swarms.models import Mistral
model = Mistral(device="cuda", use_flash_attention=True)
prompt = "My favourite condiment is"
result = model.run(prompt)
print(result)

@ -1,9 +0,0 @@
from swarms.models.mpt import MPT
mpt_instance = MPT(
"mosaicml/mpt-7b-storywriter",
"EleutherAI/gpt-neox-20b",
max_tokens=150,
)
mpt_instance.generate("Once upon a time in a land far, far away...")

@ -1,7 +0,0 @@
from swarms.models.openai_chat import OpenAIChat
model = OpenAIChat()
out = model("Hello, how are you?")
print(out)

@ -1,6 +1,10 @@
from swarms.models.openai_models import OpenAIChat
import os
from swarms.models import OpenAIChat
openai = OpenAIChat(openai_api_key="", verbose=False)
# Load doten
openai = OpenAIChat(
openai_api_key=os.getenv("OPENAI_API_KEY"), verbose=False
)
chat = openai("What are quantum fields?")
print(chat)

@ -0,0 +1,88 @@
from swarms.structs.agent_registry import AgentRegistry
from swarms import Agent
from swarms.models import Anthropic
# Initialize the agents
growth_agent1 = Agent(
agent_name="Marketing Specialist",
system_prompt="You're the marketing specialist, your purpose is to help companies grow by improving their marketing strategies!",
agent_description="Improve a company's marketing strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="marketing_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent2 = Agent(
agent_name="Sales Specialist",
system_prompt="You're the sales specialist, your purpose is to help companies grow by improving their sales strategies!",
agent_description="Improve a company's sales strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="sales_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent3 = Agent(
agent_name="Product Development Specialist",
system_prompt="You're the product development specialist, your purpose is to help companies grow by improving their product development strategies!",
agent_description="Improve a company's product development strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="product_development_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
growth_agent4 = Agent(
agent_name="Customer Service Specialist",
system_prompt="You're the customer service specialist, your purpose is to help companies grow by improving their customer service strategies!",
agent_description="Improve a company's customer service strategies!",
llm=Anthropic(),
max_loops="auto",
autosave=True,
dashboard=False,
verbose=True,
streaming_on=True,
saved_state_path="customer_service_specialist.json",
stopping_token="Stop!",
interactive=True,
context_length=1000,
)
# Register the agents\
registry = AgentRegistry()
# Register the agents
registry.add("Marketing Specialist", growth_agent1)
registry.add("Sales Specialist", growth_agent2)
registry.add("Product Development Specialist", growth_agent3)
registry.add("Customer Service Specialist", growth_agent4)
# Query the agents
registry.get("Marketing Specialist")
registry.get("Sales Specialist")
registry.get("Product Development Specialist")
# Get all the agents
registry.list_agents()

@ -1,45 +0,0 @@
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.autoscaler import AutoScaler
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, dashboard=True)
# Load the autoscaler
autoscaler = AutoScaler(
initial_agents=2,
scale_up_factor=1,
idle_threshold=0.2,
busy_threshold=0.7,
agents=[agent],
autoscale=True,
min_agents=1,
max_agents=5,
custom_scale_strategy=None,
)
print(autoscaler)
# Run the workflow on a task
out = autoscaler.run(
agent.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)

@ -1,6 +1,6 @@
from swarms import Agent, OpenAIChat
from swarms.structs.mixture_of_agents import MixtureOfAgents
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
SEC_DATA = """

@ -1,6 +1,6 @@
from swarms import Agent, OpenAIChat
from swarms.structs.mixture_of_agents import MixtureOfAgents
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
SEC_DATA = """

@ -6,7 +6,7 @@ from swarms import (
OpenAIChat,
TogetherLLM,
)
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
from dotenv import load_dotenv
# load the environment variables

@ -10,7 +10,7 @@ $ pip install swarms
"""
from swarms import Agent, OpenAIChat
from playground.memory.chromadb_example import ChromaDB
from swarms_memory import ChromaDB
from swarms.tools.prebuilt.bing_api import fetch_web_articles_bing_api
import os
from dotenv import load_dotenv

@ -96,7 +96,7 @@
"outputs": [],
"source": [
"from swarms import Agent, OpenAIChat\n",
"from playground.memory.chromadb_example import ChromaDB\n",
"from swarms_memory import ChromaDB\n",
"\n",
"# Making an instance of the ChromaDB class\n",
"memory = ChromaDB(\n",
@ -142,7 +142,9 @@
"metadata": {},
"outputs": [],
"source": [
"from swarms import Agent, ChromaDB, OpenAIChat, tool\n",
"# !pip install swarms-memory\n",
"from swarms import Agent, OpenAIChat, tool\n",
"from swarms_memory import ChromaDB\n",
"\n",
"# Making an instance of the ChromaDB class\n",
"memory = ChromaDB(\n",

@ -1,14 +0,0 @@
import pandas as pd
from swarms import dataframe_to_text
# # Example usage:
df = pd.DataFrame(
{
"A": [1, 2, 3],
"B": [4, 5, 6],
"C": [7, 8, 9],
}
)
print(dataframe_to_text(df))

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "5.3.2"
version = "5.3.4"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -20,6 +20,11 @@ keywords = [
"Prompt Engineering",
"swarms",
"agents",
"llms",
"transformers",
"multi-agent",
"swarms of agents",
"chicken nuggets",
]
classifiers = [
"Development Status :: 4 - Beta",
@ -42,8 +47,8 @@ toml = "*"
pypdf = "4.1.0"
ratelimit = "2.2.1"
loguru = "0.7.2"
pydantic = "2.7.4"
tenacity = "8.3.0"
pydantic = "2.8.2"
tenacity = "8.4.2"
Pillow = "10.3.0"
psutil = "*"
sentry-sdk = "*"
@ -55,13 +60,14 @@ openai = ">=1.30.1,<2.0"
termcolor = "*"
tiktoken = "*"
networkx = "*"
swarms-memory = "*"
[tool.poetry.group.lint.dependencies]
black = ">=23.1,<25.0"
ruff = ">=0.0.249,<0.4.10"
ruff = ">=0.5.1,<0.5.2"
types-toml = "^0.10.8.1"
types-pytz = ">=2023.3,<2025.0"
types-chardet = "^5.0.4.6"

@ -29,3 +29,4 @@ termcolor>=2.4.0
pandas>=2.2.2
fastapi>=0.110.1
networkx
swarms-memory

@ -110,7 +110,8 @@ def TEST_WRITER_SOP_PROMPT(
TESTS_PROMPT = f"""
Create 5,000 lines of extensive and thorough tests for the code below using the guide, do not worry about your limits you do not have any
just write the best tests possible, the module is {module}, the file path is {path}
just write the best tests possible, the module is {module}, the file path is {path} return all of the code in one file, make sure to test all the functions and methods in the code.
######### TESTING GUIDE #############

@ -0,0 +1,7 @@
"""
A script that runs ruff, black, autopep8, and all other formatters in one python script on a cron job.
- Perhaps make a github workflow as well
"""

@ -58,4 +58,4 @@ def cleanup_json_logs(name: str = None):
# Call the function
cleanup_json_logs("sequential_workflow_agents")
cleanup_json_logs("social_media_swarm")

@ -41,6 +41,7 @@ from swarms.models.types import ( # noqa: E402
VideoModality,
)
from swarms.models.vilt import Vilt # noqa: E402
from swarms.models.popular_llms import FireWorksAI
__all__ = [
"BaseEmbeddingModel",
@ -77,4 +78,5 @@ __all__ = [
"OpenAIEmbeddings",
"llama3Hosted",
"GPT4o",
"FireWorksAI",
]

@ -80,3 +80,11 @@ class OctoAIChat(OctoAIEndpoint):
def run(self, *args, **kwargs):
return self.invoke(*args, **kwargs)
class FireWorksAI(Fireworks):
def __call__(self, *args, **kwargs):
return self.invoke(*args, **kwargs)
def run(self, *args, **kwargs):
return self.invoke(*args, **kwargs)

@ -50,6 +50,7 @@ class AgentRearrange(BaseSwarm):
self.human_in_the_loop = human_in_the_loop
self.custom_human_in_the_loop = custom_human_in_the_loop
self.swarm_history = {agent.agent_name: [] for agent in agents}
self.sub_swarm = {}
# Verbose is True
if verbose is True:
@ -66,6 +67,14 @@ class AgentRearrange(BaseSwarm):
)
)
def add_sub_swarm(self, name: str, flow: str):
self.sub_swarm[name] = flow
logger.info(f"Sub-swarm {name} added with flow: {flow}")
def set_custom_flow(self, flow: str):
self.flow = flow
logger.info(f"Custom flow set: {flow}")
def add_agent(self, agent: Agent):
"""
Adds an agent to the swarm.
@ -251,6 +260,77 @@ class AgentRearrange(BaseSwarm):
logger.error(f"An error occurred: {e}")
return e
def process_agent_or_swarm(
self, name: str, task: str, img: str, *args, **kwargs
):
"""
process_agent_or_swarm: Processes the agent or sub-swarm based on the given name.
Args:
name (str): The name of the agent or sub-swarm to process.
task (str): The task to be executed.
img (str): The image to be processed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
str: The result of the last executed task.
"""
if name.startswith("Human"):
return self.human_intervention(task)
elif name in self.sub_swarm:
return self.run_sub_swarm(task, name, img, *args, **kwargs)
else:
agent = self.agents[name]
return agent.run(task, *args, **kwargs)
def human_intervention(self, task: str) -> str:
if self.human_in_the_loop and self.custom_human_in_the_loop:
return self.custom_human_in_the_loop(task)
else:
return input(
"Human intervention required. Enter your response: "
)
def run_sub_swarm(
self, swarm_name: str, task: str, img: str, *args, **kwargs
):
"""
Runs a sub-swarm by executing a sequence of tasks on a set of agents.
Args:
swarm_name (str): The name of the sub-swarm to run.
task (str): The initial task to be executed.
img (str): The image to be processed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
str: The result of the last executed task.
"""
sub_flow = self.sub_swarm[swarm_name]
sub_tasks = sub_flow.split("->")
current_task = task
for sub_task in sub_tasks:
agent_names = [name.strip() for name in sub_task.split(",")]
if len(agent_names) > 1:
results = []
for agent_name in agent_names:
result = self.process_agent_or_swarm(
agent_name, current_task, img, *args, **kwargs
)
results.append(result)
current_task = "; ".join(results)
else:
current_task = self.process_agent_or_swarm(
agent_names[0], current_task, img, *args, **kwargs
)
return current_task
def rearrange(
agents: List[Agent] = None,

@ -1,5 +1,5 @@
from typing import List
from swarms import Agent
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
from swarms.structs.rearrange import AgentRearrange

@ -4,7 +4,7 @@ from unittest.mock import Mock, patch
import pytest
from dotenv import load_dotenv
from swarms.models import BaseCohere, Cohere
from swarms import Cohere
# Load the environment variables
load_dotenv()
@ -154,7 +154,7 @@ def test_base_cohere_validate_environment():
"cohere_api_key": "my-api-key",
"user_agent": "langchain",
}
validated_values = BaseCohere.validate_environment(values)
validated_values = Cohere.validate_environment(values)
assert "client" in validated_values
assert "async_client" in validated_values
@ -166,7 +166,7 @@ def test_base_cohere_validate_environment_without_cohere():
}
with patch.dict("sys.modules", {"cohere": None}):
with pytest.raises(ImportError):
BaseCohere.validate_environment(values)
Cohere.validate_environment(values)
# Test cases for benchmarking generations with various models

Loading…
Cancel
Save