[FEAT][AgentRearrange] [DOCS][Swarms Cloud] [BUFG][Sequential Workflow]

pull/459/head^2
Kye 8 months ago
parent 035e5b81e6
commit ad58b388ea

@ -112,7 +112,9 @@ nav:
- Docker Setup: docker_setup.md
- Swarms Cloud API:
- Overview: "swarms_cloud/main.md"
- Available Models: "swarms_cloud/available_models.md"
- Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md"
- Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md"
- Swarms Framework [PY]:
- Overview: "swarms/index.md"
- DIY Build Your Own Agent: "diy_your_own_agent.md"

@ -0,0 +1,17 @@
# Available Models
```markdown
| Model Name | Description | Input Price | Output Price | Use Cases |
|-----------------------|---------------------------------------------------------------------------------------------------------|--------------|--------------|------------------------------------------------------------------------|
| **Llama3-70b** | Llama 3 is an auto-regressive language model that uses an optimized transformer architecture. | $0.80/1M Tokens | $1.60/1M Tokens | General natural language processing tasks. |
| **Llava-Internlm2-20b** | LLaVA model fine-tuned from InternLM2-Chat-20B and CLIP-ViT-Large-patch14-336. | Contact for pricing | Contact for pricing | Enhanced language understanding integrated with visual processing. |
| **Llama-3-Giraffe-70B** | Abacus.AI presents our longer-necked variant of Llama 3 70B! | $1/1M Tokens | $2/1M Tokens | Extensive natural language tasks with a focus on depth and efficiency. |
| **Qwen-vl** | Qwen VL for real-world multi-modal function calling. | $5/1M Tokens | $10/1M Tokens | Multi-modal interactions and function handling in complex environments.|
| **XComposer2-4khd-7b** | One of the highest performing VLMs (Video Language Models). | $4/1M Tokens | $8/1M Tokens | High-resolution video processing and understanding. |
| **Llava-Llama-3** | Llama3 with Multi-Modal Processing. | $5/1M Tokens | $10/1M Tokens | Advanced multi-modal scenarios involving language and image processing. |
| **cogvlm-chat-17b** | Groundbreaking multimodal model designed to understand and reason about visual elements in images. | $5/1M Tokens | $10/1M Tokens | Image-based chatbots and interactive systems. |
```
## What models should we add?
[Book a call with us to learn more about your needs:](https://calendly.com/swarm-corp/30min)

@ -0,0 +1,94 @@
# Getting Started with State-of-the-Art Vision Language Models (VLMs) Using the Swarms API
The intersection of vision and language tasks within the field of artificial intelligence has led to the emergence of highly sophisticated models known as Vision Language Models (VLMs). These models leverage the capabilities of both computer vision and natural language processing to provide a more nuanced understanding of multimodal inputs. In this blog post, we will guide you through the process of integrating state-of-the-art VLMs available through the Swarms API, focusing particularly on models like "internlm-xcomposer2-4khd", which represents a blend of high-performance language and visual understanding.
#### What Are Vision Language Models?
Vision Language Models are at the frontier of integrating visual data processing with text analysis. These models are trained on large datasets that include both images and their textual descriptions, learning to correlate visual elements with linguistic context. The result is a model that can not only recognize objects in an image but also generate descriptive, context-aware text, answer questions about the image, and even engage in a dialogue about its content.
#### Why Use Swarms API for VLMs?
Swarms API provides access to several cutting-edge VLMs including the "internlm-xcomposer2-4khd" model. This API is designed for developers looking to seamlessly integrate advanced multimodal capabilities into their applications without the need for extensive machine learning expertise or infrastructure. Swarms API is robust, scalable, and offers state-of-the-art models that are continuously updated to leverage the latest advancements in AI research.
#### Prerequisites
Before diving into the technical setup, ensure you have the following:
- An active account with Swarms API to obtain an API key.
- Python installed on your machine (Python 3.6 or later is recommended).
- An environment where you can install packages and run Python scripts (like Visual Studio Code, Jupyter Notebook, or simply your terminal).
#### Setting Up Your Environment
First, you'll need to install the `OpenAI` Python library if it's not already installed:
```bash
pip install openai
```
#### Integrating the Swarms API
Heres a basic guide on how to set up the Swarms API in your Python environment:
1. **API Key Configuration**:
Start by setting up your API key and base URL. Replace `"your_swarms_key"` with the actual API key you obtained from Swarms.
```python
from openai import OpenAI
openai_api_key = "your_swarms_key"
openai_api_base = "https://api.swarms.world/v1"
```
2. **Initialize Client**:
Initialize your OpenAI client with the provided API key and base URL.
```python
client = OpenAI(
api_key=openai_api_key,
base_url=openai_api_base,
)
```
3. **Creating a Chat Completion**:
To use the VLM, youll send a request to the API with a multimodal input consisting of both an image and a text query. The following example shows how to structure this request:
```python
chat_response = client.chat.completions.create(
model="internlm-xcomposer2-4khd",
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg",
},
},
{"type": "text", "text": "What's in this image?"},
]
}
],
)
print("Chat response:", chat_response)
```
This code sends a multimodal query to the model, which includes an image URL followed by a text question regarding the image.
#### Understanding the Response
The response from the API will include details generated by the model about the image based on the textual query. This could range from simple descriptions to complex narratives, depending on the models capabilities and the nature of the question.
#### Best Practices
- **Data Privacy**: Always ensure that the images and data you use comply with privacy laws and regulations.
- **Error Handling**: Implement robust error handling to manage potential issues during API calls.
- **Model Updates**: Keep track of updates to the Swarms API and model improvements to leverage new features and improved accuracies.
#### Conclusion
Integrating VLMs via the Swarms API opens up a plethora of opportunities for developers to create rich, interactive, and intelligent applications that understand and interpret the world not just through text but through visuals as well. Whether youre building an educational tool, a content management system, or an interactive chatbot, these models can significantly enhance the way users interact with your application.
As you embark on your journey to integrate these powerful models into your projects, remember that the key to successful implementation lies in understanding the capabilities and limitations of the technology, continually testing with diverse data, and iterating based on user feedback and technological advances.
Happy coding, and heres to building more intelligent, multimodal applications!

@ -6,7 +6,7 @@ The AI Chat Completion API processes text and image inputs to generate conversat
## API Endpoints
### Chat Completion
### Chat Completion URL
`https://api.swarms.world`

@ -1,14 +1,14 @@
from swarms import Agent, Anthropic
from swarms import Agent, OpenAIChat
# Initialize the agemt
# Initialize the agent
agent = Agent(
agent_name="Transcript Generator",
agent_description=(
"Generate a transcript for a youtube video on what swarms" " are!"
),
llm=Anthropic(),
max_loops=3,
llm=OpenAIChat(),
max_loops="auto",
autosave=True,
dashboard=False,
streaming_on=True,
@ -24,9 +24,3 @@ out = agent.run(
"Generate a transcript for a youtube video on what swarms are!"
)
print(out)
# Save the state
check = agent.save_state(
"transcript_generator.json",
"Generate a transcript for a youtube video on what swarms are!",
)

@ -0,0 +1,68 @@
from pydantic import BaseModel, Field
from swarms import Agent
from swarms.models.popular_llms import Anthropic
from swarms.tools.openai_tool_creator_decorator import tool
# Importing the search API tool
@tool
def search_api(query: str) -> str:
"""
This tool searches the web for information about COVID-19 symptoms.
"""
return f"Search API tool called with query: {query}"
print(search_api("COVID-19 symptoms"))
# Initialize the schema for the person's information
class Schema(BaseModel):
name: str = Field(..., title="Name of the person")
agent: int = Field(..., title="Age of the person")
is_student: bool = Field(..., title="Whether the person is a student")
courses: list[str] = Field(
..., title="List of courses the person is taking"
)
# Convert the schema to a JSON string
tool_schema = Schema(
name="Tool Name",
agent=1,
is_student=True,
courses=["Course1", "Course2"],
)
# Define the task to generate a person's information
task = "Generate a person's information based on the following schema:"
# Initialize the agent
agent = Agent(
agent_name="WeatherMan Agent",
# Set the tool schema to the JSON string -- this is the key difference
tool_schema=tool_schema,
llm=Anthropic(),
max_loops=3,
autosave=True,
dashboard=False,
streaming_on=True,
tools=[], # or list of tools
verbose=True,
interactive=True,
# Set the output type to the tool schema which is a BaseModel
output_type=tool_schema, # or dict, or str
metadata_output_type="json",
# List of schemas that the agent can handle
list_tool_schemas=[tool_schema],
function_calling_format_type="OpenAI",
function_calling_type="json", # or soon yaml
execute_tool=True,
)
# Run the agent to generate the person's information
generated_data = agent.run(task)
# Print the generated data
print(f"Generated data: {generated_data}")

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "4.8.8"
version = "4.9.2"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -2,8 +2,8 @@ import os
import shutil
# Create a new directory for the log files if it doesn't exist
if not os.path.exists("artifacts_two"):
os.makedirs("artifacts_two")
if not os.path.exists("artifacts_three"):
os.makedirs("artifacts_three")
# Walk through the current directory
for dirpath, dirnames, filenames in os.walk("."):
@ -12,10 +12,10 @@ for dirpath, dirnames, filenames in os.walk("."):
if filename.endswith(".log"):
# Construct the full file path
file_path = os.path.join(dirpath, filename)
# Move the log file to the 'artifacts_two' directory
shutil.move(file_path, "artifacts_two")
# Move the log file to the 'artifacts_three' directory
shutil.move(file_path, "artifacts_three")
print(
"Moved all log files into the 'artifacts_two' directory and"
"Moved all log files into the 'artifacts_three' directory and"
" deleted their original location."
)

@ -1,41 +1,33 @@
from swarms import Anthropic, Agent, SequentialWorkflow
from swarms import Agent, SequentialWorkflow, Anthropic
# Initialize the language model agent (e.g., GPT-3)
llm = Anthropic()
# Initialize agents for individual tasks
agent1 = Agent(
agent_name="Blog generator", llm=llm, max_loops=1, dashboard=False
agent_name="Blog generator",
system_prompt="Generate a blog post like stephen king",
llm=llm,
max_loops=1,
dashboard=False,
tools=[],
)
agent2 = Agent(
agent_name="summarizer", llm=llm, max_loops=1, dashboard=False
agent_name="summarizer",
system_prompt="Sumamrize the blog post",
llm=llm,
max_loops=1,
dashboard=False,
tools=[],
)
# Create the Sequential workflow
workflow = SequentialWorkflow(
max_loops=1, objective="Create a full blog and then summarize it"
agents=[agent1, agent2], max_loops=1, verbose=False
)
# Add tasks to the workflow
workflow.add(
"Generate a 10,000 word blog on health and wellness.", agent1
) # this task will be executed task,
workflow.add(
"Summarize the generated blog", agent2
) # then the next agent will accomplish this task
# Run the workflow
out = workflow.run()
print(f"{out}")
workflow.run(
"Generate a blog post on how swarms of agents can help businesses grow."
)

@ -3,7 +3,7 @@ from typing import List
from langchain_experimental.autonomous_agents import AutoGPT
from swarms.structs.agent import Agent
from swarms.tools.tool import BaseTool
from swarms.tools.base_tool import BaseTool
from swarms.utils.decorators import error_decorator, timing_decorator

@ -15,9 +15,7 @@ from swarms.models.mpt import MPT7B # noqa: E402
from swarms.models.nougat import Nougat # noqa: E402
from swarms.models.palm import GooglePalm as Palm # noqa: E402
from swarms.models.openai_tts import OpenAITTS # noqa: E402
from swarms.models.popular_llms import (
AnthropicChat as Anthropic,
)
from swarms.models.popular_llms import Anthropic as Anthropic
from swarms.models.popular_llms import (
AzureOpenAILLM as AzureOpenAI,
)

@ -12,7 +12,7 @@ from langchain_community.llms.octoai_endpoint import OctoAIEndpoint
from langchain.llms.replicate import Replicate
class AnthropicChat(Anthropic):
class Anthropic(Anthropic):
def __call__(self, *args, **kwargs):
return self.invoke(*args, **kwargs)

@ -7,7 +7,6 @@ from swarms.prompts.operations_agent_prompt import (
OPERATIONS_AGENT_PROMPT,
)
from swarms.prompts.product_agent_prompt import PRODUCT_AGENT_PROMPT
from swarms.prompts.schema_generator import SchemaGenerator
__all__ = [
"CODE_INTERPRETER",
@ -17,5 +16,4 @@ __all__ = [
"OPERATIONS_AGENT_PROMPT",
"PRODUCT_AGENT_PROMPT",
"DOCUMENTATION_WRITER_SOP",
"SchemaGenerator",
]

@ -4,7 +4,7 @@ from swarms.prompts.tools import (
)
# PROMPTS
FLOW_SYSTEM_PROMPT_v2 = """
AGENT_SYSTEM_PROMPT_V2 = """
You are an elite autonomous agent operating within an autonomous loop structure.
Your primary function is to reliably complete user's tasks.
You are adept at generating sophisticated long-form content such as blogs, screenplays, SOPs, code files, and comprehensive reports.
@ -18,7 +18,6 @@ You are programmed to follow these rules:
4. Ignore context length and text limits, REMEMBER YOU ARE AN ELITE AUTONOMOUS AGENT
and can continue where you left off.
5. If the user doesn't specify an output format, intelligently select the best output format based on the task.
Take a deep breath.
"""
@ -67,15 +66,13 @@ def agent_system_prompt_2_v2(name: str):
# ORIGINAL PROMPTS
FLOW_SYSTEM_PROMPT = """
AGENT_SYSTEM_PROMPT_V1 = """
You are an autonomous agent granted autonomy in a autonomous loop structure.
Your role is to engage in multi-step conversations with your self or the user,
generate long-form content like blogs, screenplays, or SOPs,
and accomplish tasks bestowed by the user.
generate long-form content like blogs, screenplays and accomplish tasks set by the user.
You can have internal dialogues with yourself or can interact with the user
to aid in these complex tasks. Your responses should be coherent, contextually relevant, and tailored to the task at hand.
"""

@ -1,214 +0,0 @@
import json
from typing import List
from swarms.tools.tool import BaseTool
FINISH_NAME = "finish"
class SchemaGenerator:
"""A class for generating custom prompt strings.
Does this based on constraints, commands, resources, and performance evaluations.
Attributes:
constraints (List[str]): A list of constraints.
commands (List[BaseTool]): A list of commands.
resources (List[str]): A list of resources.
performance_evaluation (List[str]): A list of performance evaluations.
response_format (dict): A dictionary of the response format.
Examples:
>>> schema_generator = SchemaGenerator()
>>> schema_generator.add_constraint("No user assistance")
>>> schema_generator.add_resource("Internet access for searches and information gathering.")
>>> schema_generator.add_performance_evaluation("Continuously review and analyze your actions to ensure you are performing to the best of your abilities.")
>>> prompt_string = schema_generator.generate_prompt_string()
>>> print(prompt_string)
"""
def __init__(self) -> None:
"""Initialize the SchemaGenerator object.
Starts with empty lists of constraints, commands, resources,
and performance evaluations.
"""
self.constraints: List[str] = []
self.commands: List[BaseTool] = []
self.resources: List[str] = []
self.performance_evaluation: List[str] = []
self.response_format = {
"thoughts": {
"text": "thought",
"reasoning": "reasoning",
"plan": (
"- short bulleted\n- list that conveys\n-"
" long-term plan"
),
"criticism": "constructive self-criticism",
"speak": "thoughts summary to say to user",
},
"command": {
"name": "command name",
"args": {"arg name": "value"},
},
}
def add_constraint(self, constraint: str) -> None:
"""
Add a constraint to the constraints list.
Args:
constraint (str): The constraint to be added.
"""
self.constraints.append(constraint)
def add_tool(self, tool: BaseTool) -> None:
self.commands.append(tool)
def _generate_command_string(self, tool: BaseTool) -> str:
output = f"{tool.name}: {tool.description}"
output += f", args json schema: {json.dumps(tool.args)}"
return output
def add_resource(self, resource: str) -> None:
"""
Add a resource to the resources list.
Args:
resource (str): The resource to be added.
"""
self.resources.append(resource)
def add_performance_evaluation(self, evaluation: str) -> None:
"""
Add a performance evaluation item to the performance_evaluation list.
Args:
evaluation (str): The evaluation item to be added.
"""
self.performance_evaluation.append(evaluation)
def _generate_numbered_list(
self, items: list, item_type: str = "list"
) -> str:
"""
Generate a numbered list from given items based on the item_type.
Args:
items (list): A list of items to be numbered.
item_type (str, optional): The type of items in the list.
Defaults to 'list'.
Returns:
str: The formatted numbered list.
"""
if item_type == "command":
command_strings = [
f"{i + 1}. {self._generate_command_string(item)}"
for i, item in enumerate(items)
]
finish_description = (
"use this to signal that you have finished all your"
" objectives"
)
finish_args = (
'"response": "final response to let '
'people know you have finished your objectives"'
)
finish_string = (
f"{len(items) + 1}. {FINISH_NAME}: "
f"{finish_description}, args: {finish_args}"
)
return "\n".join(command_strings + [finish_string])
else:
return "\n".join(
f"{i+1}. {item}" for i, item in enumerate(items)
)
def generate_prompt_string(self) -> str:
"""Generate a prompt string.
Returns:
str: The generated prompt string.
"""
formatted_response_format = json.dumps(
self.response_format, indent=4
)
prompt_string = (
f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\nCommands:\n{self._generate_numbered_list(self.commands, item_type='command')}\n\nResources:\n{self._generate_numbered_list(self.resources)}\n\nPerformance"
f" Evaluation:\n{self._generate_numbered_list(self.performance_evaluation)}\n\nYou"
" should only respond in JSON format as described below"
" \nResponse Format:"
f" \n{formatted_response_format} \nEnsure the response"
" can be parsed by Python json.loads"
)
return prompt_string
def get_prompt(tools: List[BaseTool]) -> str:
"""Generates a prompt string.
It includes various constraints, commands, resources, and performance evaluations.
Returns:
str: The generated prompt string.
"""
# Initialize the SchemaGenerator object
schema_generator = SchemaGenerator()
# Add constraints to the SchemaGenerator object
schema_generator.add_constraint(
"~4000 word limit for short term memory. "
"Your short term memory is short, "
"so immediately save important information to files."
)
schema_generator.add_constraint(
"If you are unsure how you previously did something "
"or want to recall past events, "
"thinking about similar events will help you remember."
)
schema_generator.add_constraint("No user assistance")
schema_generator.add_constraint(
"Exclusively use the commands listed in double quotes e.g."
' "command name"'
)
# Add commands to the SchemaGenerator object
for tool in tools:
schema_generator.add_tool(tool)
# Add resources to the SchemaGenerator object
schema_generator.add_resource(
"Internet access for searches and information gathering."
)
schema_generator.add_resource("Long Term memory management.")
schema_generator.add_resource(
"GPT-3.5 powered Agents for delegation of simple tasks."
)
schema_generator.add_resource("File output.")
# Add performance evaluations to the SchemaGenerator object
schema_generator.add_performance_evaluation(
"Continuously review and analyze your actions "
"to ensure you are performing to the best of your abilities."
)
schema_generator.add_performance_evaluation(
"Constructively self-criticize your big-picture behavior"
" constantly."
)
schema_generator.add_performance_evaluation(
"Reflect on past decisions and strategies to refine your"
" approach."
)
schema_generator.add_performance_evaluation(
"Every command has a cost, so be smart and efficient. "
"Aim to complete tasks in the least number of steps."
)
# Generate the prompt string
prompt_string = schema_generator.generate_prompt_string()
return prompt_string

@ -1,8 +1,9 @@
import datetime
from pydantic import BaseModel, Field
from swarms.tools.tool import BaseTool
from swarms.tools.base_tool import BaseTool
from swarms.tools.tool_utils import scrape_tool_func_docs
from typing import List
from swarms.tools.base_tool import BaseTool
time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@ -15,7 +16,7 @@ class Thoughts(BaseModel):
class Command(BaseModel):
name: str = Field(..., title="Command Name")
args: dict = Field({}, title="Command Arguments")
parameters: dict = Field({}, title="Command Arguments")
class ResponseFormat(BaseModel):
@ -30,14 +31,9 @@ tool_usage_browser = """
```json
{
"thoughts": {
"text": "To check the weather in Miami, I will use the browser tool to search for 'Miami weather'.",
"reasoning": "The browser tool allows me to search the web, so I can look up the current weather conditions in Miami.",
"plan": "Use the browser tool to search Google for 'Miami weather'. Parse the result to get the current temperature, conditions, etc. and format that into a readable weather report."
},
"command": {
"functions": {
"name": "browser",
"args": {
"parameters": {
"query": "Miami weather"
}
}
@ -50,14 +46,9 @@ tool_usage_terminal = """
```json
{
"thoughts": {
"text": "To check the weather in Miami, I will use the browser tool to search for 'Miami weather'.",
"reasoning": "The browser tool allows me to search the web, so I can look up the current weather conditions in Miami.",
"plan": "Use the browser tool to search Google for 'Miami weather'. Parse the result to get the current temperature, conditions, etc. and format that into a readable weather report."
},
"command": {
"functions": {
"name": "terminal",
"args": {
"parameters": {
"code": "uptime"
}
}
@ -69,22 +60,16 @@ tool_usage_terminal = """
browser_and_terminal_tool = """
```
{
"thoughts": {
"text": "To analyze the latest stock market trends, I need to fetch current stock data and then process it using a script.",
"reasoning": "Using the browser tool to retrieve stock data ensures I have the most recent information. Following this, the terminal tool can run a script that analyzes this data to identify trends.",
"plan": "First, use the browser to get the latest stock prices. Then, use the terminal to execute a data analysis script on the fetched data."
},
"commands": [
"functions": [
{
"name": "browser",
"args": {
"parameters": {
"query": "download latest stock data for NASDAQ"
}
},
{
"name": "terminal",
"args": {
"parameters": {
"cmd": "python analyze_stocks.py"
}
}
@ -98,27 +83,22 @@ browser_and_terminal_tool = """
browser_and_terminal_tool_two = """
```
{
"thoughts": {
"text": "To prepare a monthly budget report, I need current expenditure data, process it, and calculate the totals and averages.",
"reasoning": "The browser will fetch the latest expenditure data. The terminal will run a processing script to organize the data, and the calculator will be used to sum up expenses and compute averages.",
"plan": "Download the data using the browser, process it with a terminal command, and then calculate totals and averages using the calculator."
},
"commands": [
"functions": [
{
"name": "browser",
"args": {
"parameters": {
"query": "download monthly expenditure data"
}
},
{
"name": "terminal",
"args": {
"parameters": {
"cmd": "python process_expenditures.py"
}
},
{
"name": "calculator",
"args": {
"parameters": {
"operation": "sum",
"numbers": "[output_from_process_expenditures]"
}
@ -142,13 +122,17 @@ def parse_tools(tools: List[BaseTool] = []):
# Function to generate the worker prompt
def tool_usage_worker_prompt(
current_time=time, tools: List[BaseTool] = []
current_time=time, tools: List[callable] = []
):
tool_docs = parse_tools(tools)
tool_docs = BaseTool(verbose=True, functions=tools)
prompt = f"""
**Date and Time**: {current_time}
You have been assigned a task that requires the use of various tools to gather information and execute commands.
Follow the instructions provided to complete the task effectively. This SOP is designed to guide you through the structured and effective use of tools.
By adhering to this protocol, you will enhance your productivity and accuracy in task execution.
### Constraints
- Only use the tools as specified in the instructions.
- Follow the command format strictly to avoid errors and ensure consistency.
@ -167,23 +151,23 @@ def tool_usage_worker_prompt(
1. **Browser**
- **Purpose**: To retrieve information from the internet.
- **Usage**:
- `{{"name": "browser", "args": {{"query": "search query here"}}}}`
- `{{"name": "browser", "parameters": {{"query": "search query here"}}}}`
- Example: Fetch current weather in London.
- Command: `{{"name": "browser", "args": {{"query": "London weather"}}}}`
- Command: `{{"name": "browser", "parameters": {{"query": "London weather"}}}}`
2. **Terminal**
- **Purpose**: To execute system commands.
- **Usage**:
- `{{"name": "terminal", "args": {{"cmd": "system command here"}}}}`
- `{{"name": "terminal", "parameters": {{"cmd": "system command here"}}}}`
- Example: Check disk usage on a server.
- Command: `{{"name": "terminal", "args": {{"cmd": "df -h"}}}}`
- Command: `{{"name": "terminal", "parameters": {{"cmd": "df -h"}}}}`
3. **Custom Tool** (if applicable)
- **Purpose**: Describe specific functionality.
- **Usage**:
- `{{"name": "custom_tool", "args": {{"parameter": "value"}}}}`
- `{{"name": "custom_tool", "parameters": {{"parameter": "value"}}}}`
- Example: Custom analytics tool.
- Command: `{{"name": "custom_tool", "args": {{"data": "analyze this data"}}}}`
- Command: `{{"name": "custom_tool", "parameters": {{"data": "analyze this data"}}}}`
### Usage Examples
@ -221,8 +205,6 @@ def tool_usage_worker_prompt(
{tool_docs}
This SOP is designed to guide you through the structured and effective use of tools.
By adhering to this protocol, you will enhance your productivity and accuracy in task execution.
"""
return prompt

@ -75,7 +75,7 @@ from swarms.structs.utils import (
find_token_in_text,
parse_tasks,
)
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.rearrange import AgentRearrange, rearrange
from swarms.structs.yaml_model import (
get_type_name,
@ -109,7 +109,6 @@ __all__ = [
"MultiAgentCollaboration",
"MultiProcessWorkflow",
"MultiThreadedWorkflow",
"NonlinearWorkflow",
"Plan",
"RecursiveWorkflow",
"Artifact",
@ -156,4 +155,5 @@ __all__ = [
"pydantic_type_to_yaml_schema",
"YamlModel",
"MessagePool",
"rearrange"
]

@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import json
import logging
import os
@ -7,7 +8,6 @@ import sys
import time
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import yaml
from loguru import logger
from pydantic import BaseModel
@ -15,25 +15,23 @@ from termcolor import colored
from swarms.memory.base_vectordb import BaseVectorDatabase
from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
from swarms.prompts.aot_prompt import algorithm_of_thoughts_sop
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.prompts.worker_prompt import tool_usage_worker_prompt
from swarms.structs.conversation import Conversation
from swarms.structs.schemas import ManySteps, Step
from swarms.structs.yaml_model import YamlModel
from swarms.telemetry.user_utils import get_user_device_data
from swarms.tools.base_tool import BaseTool
from swarms.tools.code_interpreter import SubprocessCodeInterpreter
from swarms.tools.exec_tool import execute_tool_by_name
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
)
from swarms.tools.tool import BaseTool
from swarms.utils.data_to_text import data_to_text
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.prompts.aot_prompt import algorithm_of_thoughts_sop
# Utils
@ -183,7 +181,7 @@ class Agent:
agent_name: Optional[str] = "swarm-worker-01",
agent_description: Optional[str] = None,
system_prompt: Optional[str] = AGENT_SYSTEM_PROMPT_3,
tools: List[BaseTool] = [],
tools: List[BaseTool] = None,
dynamic_temperature_enabled: Optional[bool] = False,
sop: Optional[str] = None,
sop_list: Optional[List[str]] = None,
@ -213,27 +211,34 @@ class Agent:
logger_handler: Optional[Any] = sys.stderr,
search_algorithm: Optional[Callable] = None,
logs_to_filename: Optional[str] = None,
evaluator: Optional[Callable] = None,
evaluator: Optional[Callable] = None, # Custom LLM or agent
output_json: Optional[bool] = False,
stopping_func: Optional[Callable] = None,
custom_loop_condition: Optional[Callable] = None,
sentiment_threshold: Optional[float] = None,
sentiment_threshold: Optional[
float
] = None, # Evaluate on output using an external model
custom_exit_command: Optional[str] = "exit",
sentiment_analyzer: Optional[Callable] = None,
limit_tokens_from_string: Optional[Callable] = None,
# [Tools]
custom_tools_prompt: Optional[Callable] = None,
tool_schema: ToolUsageType = None,
output_type: agent_output_type = None,
function_calling_type: str = "json",
output_cleaner: Optional[Callable] = None,
function_calling_format_type: Optional[str] = "OpenAI",
list_tool_schemas: Optional[List[BaseModel]] = None,
list_base_models: Optional[List[BaseModel]] = None,
metadata_output_type: str = "json",
state_save_file_type: str = "json",
chain_of_thoughts: bool = False,
algorithm_of_thoughts: bool = False,
tree_of_thoughts: bool = False,
tool_choice: str = "auto",
execute_tool: bool = False,
rules: str = None,
planning: Optional[str] = False,
planning_prompt: Optional[str] = None,
*args,
**kwargs,
):
@ -299,13 +304,26 @@ class Agent:
self.function_calling_type = function_calling_type
self.output_cleaner = output_cleaner
self.function_calling_format_type = function_calling_format_type
self.list_tool_schemas = list_tool_schemas
self.list_base_models = list_base_models
self.metadata_output_type = metadata_output_type
self.state_save_file_type = state_save_file_type
self.chain_of_thoughts = chain_of_thoughts
self.algorithm_of_thoughts = algorithm_of_thoughts
self.tree_of_thoughts = tree_of_thoughts
self.tool_choice = tool_choice
self.execute_tool = execute_tool
self.planning = planning
self.planning_prompt = planning_prompt
# Name
self.name = agent_name
# Description
self.description = agent_description
# Agentic stuff
self.reply = ""
self.question = None
self.answer = ""
# The max_loops will be set dynamically if the dynamic_loop
if self.dynamic_loops:
@ -319,11 +337,16 @@ class Agent:
# If the user inputs a list of strings for the sop then join them and set the sop
if self.sop_list:
self.sop = "\n".join(self.sop_list)
self.short_memory.add(role=self.user_name, content=self.sop)
if self.sop is not None:
self.short_memory.add(role=self.user_name, content=self.sop)
# Memory
self.feedback = []
# Initialize the code executor
if self.code_interpreter is not False:
self.code_executor = SubprocessCodeInterpreter(
debug_mode=True,
)
@ -332,9 +355,15 @@ class Agent:
if preset_stopping_token is not None:
self.stopping_token = "<DONE>"
# If the stopping function is provided then set the stopping condition to the stopping function
# If the system prompt is provided then set the system prompt
# Initialize the short term memory
self.short_memory = Conversation(
system_prompt=system_prompt, time_enabled=True, *args, **kwargs
system_prompt=system_prompt,
time_enabled=True,
user=user_name,
rules=rules,
*args,
**kwargs,
)
# If the docs exist then ingest the docs
@ -353,16 +382,25 @@ class Agent:
# if verbose:
# logger.setLevel(logging.INFO)
if tools is not None:
self.tool_executor = BaseTool(
verbose=True,
auto_execute_tool=execute_tool,
functions=tools,
)
# If tools are provided then set the tool prompt by adding to sop
if self.tools:
if self.tools is not None:
if custom_tools_prompt is not None:
tools_prompt = custom_tools_prompt(tools=self.tools)
# Append the tools prompt to the short_term_memory
self.short_memory.add(
role=self.agent_name, content=tools_prompt
)
else:
# Default tool prompt
tools_prompt = tool_usage_worker_prompt(tools=self.tools)
# Append the tools prompt to the short_term_memory
@ -370,19 +408,6 @@ class Agent:
role=self.agent_name, content=tools_prompt
)
# If the long term memory is provided then set the long term memory prompt
# Agentic stuff
self.reply = ""
self.question = None
self.answer = ""
# Initialize the llm with the conditional variables
# self.llm = llm(*args, **kwargs)
# Step cache
self.step_cache = []
# Set the logger handler
if logger_handler:
logger.add(
@ -396,43 +421,52 @@ class Agent:
# logger.info("Creating Agent {}".format(self.agent_name))
# If the tool types
# If the tool types are provided
if self.tool_schema is not None:
logger.info("Tool schema provided")
tool_schema_str = self.tool_schema_to_str(self.tool_schema)
print(tool_schema_str)
# Add to the short memory
logger.info(f"Adding tool schema to memory: {tool_schema_str}")
# Log the tool schema
logger.info(
"Tool schema provided, Automatically converting to OpenAI function"
)
tool_schema_str = self.pydantic_model_to_json_str(
self.tool_schema, indent=4
)
logger.info(f"Tool Schema: {tool_schema_str}")
# Add the tool schema to the short memory
self.short_memory.add(
role=self.user_name, content=tool_schema_str
)
# If a list of tool schemas:
if self.list_tool_schemas is not None:
logger.info("Tool schema provided")
tool_schema_str = self.tool_schemas_to_str(list_tool_schemas)
# If a list of tool schemas is provided
if self.list_base_models is not None:
logger.info(
"List of tool schemas provided, Automatically converting to OpenAI function"
)
tool_schemas = multi_base_model_to_openai_function(
self.list_base_models
)
# Add to the short memory
logger.info(f"Adding tool schema to memory: {tool_schema_str}")
# Convert the tool schemas to a string
tool_schemas = json.dumps(tool_schemas, indent=4)
# Add the tool schema to the short memory
logger.info("Adding tool schema to short memory")
self.short_memory.add(
role=self.user_name, content=tool_schema_str
)
# Name
self.name = agent_name
# Description
self.description = agent_description
# If the algorithm of thoughts is enabled then set the sop to the algorithm of thoughts
if self.algorithm_of_thoughts is not None:
if self.algorithm_of_thoughts is not False:
self.short_memory.add(
role=self.agent_name,
content=algorithm_of_thoughts_sop(objective=self.task),
)
# Return the history
if return_history is True:
logger.info(f"Beginning of Agent {self.agent_name} History")
logger.info(self.short_memory.return_history_as_string())
logger.info(f"End of Agent {self.agent_name} History")
def set_system_prompt(self, system_prompt: str):
"""Set the system prompt"""
self.system_prompt = system_prompt
@ -630,13 +664,13 @@ class Agent:
print("\n")
def streaming(self, content: str = None):
"""prints each chunk of content as it is generated
"""Prints each letter of the content as it is generated.
Args:
content (str, optional): _description_. Defaults to None.
content (str, optional): The content to be streamed. Defaults to None.
"""
for chunk in content:
print(chunk, end="")
for letter in content:
print(letter, end="")
########################## FUNCTION CALLING ##########################
@ -652,8 +686,15 @@ class Agent:
"""Convert a JSON string to a dictionary"""
return json.loads(json_str)
def pydantic_model_to_json_str(self, model: BaseModel):
return str(base_model_to_openai_function(model))
def pydantic_model_to_json_str(
self, model: BaseModel, indent, *args, **kwargs
):
return json.dumps(
base_model_to_openai_function(model),
indent=indent,
*args,
**kwargs,
)
def dict_to_json_str(self, dictionary: dict):
"""Convert a dictionary to a JSON string"""
@ -710,37 +751,11 @@ class Agent:
########################## FUNCTION CALLING ##########################
def _history(self, user_name: str, task: str) -> str:
"""Generate the history for the history prompt
Args:
user_name (str): _description_
task (str): _description_
Returns:
str: _description_
"""
history = [f"{user_name}: {task}"]
return history
def _dynamic_prompt_setup(self, dynamic_prompt: str, task: str) -> str:
"""_dynamic_prompt_setup summary
Args:
dynamic_prompt (str): _description_
task (str): _description_
Returns:
str: _description_
"""
dynamic_prompt = dynamic_prompt or self.construct_dynamic_prompt()
combined_prompt = f"{dynamic_prompt}\n{task}"
return combined_prompt
def run(
self,
task: Optional[str] = None,
img: Optional[str] = None,
function_map: Dict[str, Callable] = None,
*args,
**kwargs,
):
@ -750,12 +765,15 @@ class Agent:
try:
self.activate_autonomous_agent()
if task:
# Check if the task is not None
if task is not None:
self.short_memory.add(role=self.user_name, content=task)
loop_count = 0
# Clear the short memory
# self.short_memory.clear()
response = None
step_pool = []
while (
self.max_loops == "auto"
@ -766,35 +784,84 @@ class Agent:
self.loop_count_print(loop_count, self.max_loops)
print("\n")
# Dynamic temperature
if self.dynamic_temperature_enabled:
self.dynamic_temperature()
# Task prompt
task_prompt = self.short_memory.return_history_as_string()
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
try:
if self.planning is not False:
plan = self.llm(self.planning_prompt)
# Add the plan to the memory
self.short_memory.add(
role=self.agent_name, content=plan
)
task_prompt = (
self.short_memory.return_history_as_string()
)
response_args = (
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
response = self.llm(*response_args, **kwargs)
# print(response)
# Print
print(response)
# Add the response to the memory
self.short_memory.add(
role=self.agent_name, content=response
)
if self.tools:
# Check if tools is not None
if self.tools is not None:
# Extract code from markdown
response = extract_code_from_markdown(response)
# Execute the tool by name
execute_tool_by_name(
# Execute the tool by name [OLD VERISON]
# execute_tool_by_name(
# response,
# self.tools,
# stop_token=self.stopping_token,
# )
# Try executing the tool
if self.execute_tool is not False:
try:
logger.info("Executing tool...")
# Execute the tool
out = self.tool_executor.execute_tool(
response,
self.tools,
stop_token=self.stopping_token,
function_map,
)
print(f"Tool Output: {out}")
# Add the output to the memory
self.short_memory.add(
role=self.agent_name,
content=out,
)
except Exception as error:
logger.error(
f"Error executing tool: {error}"
)
print(
colored(
f"Error executing tool: {error}",
"red",
)
)
if self.code_interpreter:
@ -821,6 +888,10 @@ class Agent:
**kwargs,
)
print(
f"Response after code interpretation: {response}"
)
if self.evaluator:
evaluated_response = self.evaluator(response)
print(
@ -856,10 +927,7 @@ class Agent:
content=sentiment,
)
if self.streaming:
self.streaming(response)
else:
print(response)
# print(response)
success = True # Mark as successful to exit the retry loop
@ -912,43 +980,30 @@ class Agent:
)
time.sleep(self.loop_interval)
# Save Step Metadata
active_step = Step(
task_id=task_id(),
step_id=loop_count,
name=task,
output=response,
max_loops=self.max_loops,
)
step_pool.append(active_step)
# Save the step pool
# self.step_cache = step_pool
if self.autosave:
logger.info("Autosaving agent state.")
self.save_state(self.saved_state_path, task)
# Apply the cleaner function to the response
if self.output_cleaner is not None:
logger.info("Applying output cleaner to response.")
response = self.output_cleaner(response)
logger.info(f"Response after output cleaner: {response}")
# Prepare the output for the output model
if self.output_type is not None:
logger.info("Preparing output for output model.")
response = self.prepare_output_for_output_model(response)
print(f"Response after output model: {response}")
# List of steps for this task
ManySteps(task_id=task_id(), steps=step_pool)
# Save Many steps
# print(response)
return response
except Exception as error:
print(f"Error running agent: {error}")
raise error
def __call__(self, task: str, img: str = None, *args, **kwargs):
def __call__(self, task: str = None, img: str = None, *args, **kwargs):
"""Call the agent
Args:
@ -956,45 +1011,10 @@ class Agent:
img (str, optional): _description_. Defaults to None.
"""
try:
self.run(task, img, *args, **kwargs)
return self.run(task, img, *args, **kwargs)
except Exception as error:
logger.error(f"Error calling agent: {error}")
raise
def agent_history_prompt(
self,
history: str = None,
):
"""
Generate the agent history prompt
Args:
system_prompt (str): The system prompt
history (List[str]): The history of the conversation
Returns:
str: The agent history prompt
"""
if self.sop:
system_prompt = self.system_prompt
agent_history_prompt = f"""
role: system
{system_prompt}
Follow this standard operating procedure (SOP) to complete tasks:
{self.sop}
{history}
"""
return agent_history_prompt
else:
system_prompt = self.system_prompt
agent_history_prompt = f"""
System : {system_prompt}
{history}
"""
return agent_history_prompt
raise error
def long_term_memory_prompt(self, query: str, *args, **kwargs):
"""
@ -1026,35 +1046,39 @@ class Agent:
logger.info(f"Adding memory: {message}")
return self.short_memory.add(role=self.agent_name, content=message)
async def run_concurrent(self, tasks: List[str], **kwargs):
async def run_concurrent(self, task: str, *args, **kwargs):
"""
Run a batch of tasks concurrently and handle an infinite level of task inputs.
Run a task concurrently.
Args:
tasks (List[str]): A list of tasks to run.
task (str): The task to run.
"""
try:
logger.info(f"Running concurrent tasks: {tasks}")
task_coroutines = [
self.run_async(task, **kwargs) for task in tasks
]
completed_tasks = await asyncio.gather(*task_coroutines)
logger.info(f"Completed tasks: {completed_tasks}")
return completed_tasks
logger.info(f"Running concurrent task: {task}")
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.run, task, *args, **kwargs)
result = await asyncio.wrap_future(future)
logger.info(f"Completed task: {result}")
return result
except Exception as error:
print(
colored(
(
f"Error running agent: {error} while running"
" concurrently"
),
"red",
)
logger.error(
f"Error running agent: {error} while running concurrently"
)
def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
"""
Generate responses for multiple input sets.
Args:
inputs (List[Dict[str, Any]]): A list of input dictionaries containing the necessary data for each run.
Returns:
List[str]: A list of response strings generated for each input set.
Raises:
Exception: If an error occurs while running the bulk tasks.
"""
try:
"""Generate responses for multiple input sets."""
logger.info(f"Running bulk tasks: {inputs}")
return [self.run(**input_data) for input_data in inputs]
except Exception as error:
@ -1116,7 +1140,7 @@ class Agent:
print(colored("------------------------", "cyan"))
print(colored("End of Agent History", "cyan", attrs=["bold"]))
def step(self, task: str, **kwargs):
def step(self, task: str, *args, **kwargs):
"""
Executes a single step in the agent interaction, generating a response
@ -1133,9 +1157,9 @@ class Agent:
"""
try:
logger.info(f"Running a single step: {task}")
logger.info(f"Running a step: {task}")
# Generate the response using lm
response = self.llm(task, **kwargs)
response = self.llm(task, *args, **kwargs)
# Update the agent's history with the new interaction
if self.interactive:
@ -1294,7 +1318,6 @@ class Agent:
"autosave": self.autosave,
"saved_state_path": self.saved_state_path,
"max_loops": self.max_loops,
"StepCache": self.step_cache,
"Task": task,
"Stopping Token": self.stopping_token,
"Dynamic Loops": self.dynamic_loops,
@ -1339,7 +1362,7 @@ class Agent:
"function_calling_type": self.function_calling_type,
"output_cleaner": self.output_cleaner,
"function_calling_format_type": self.function_calling_format_type,
"list_tool_schemas": self.list_tool_schemas,
"list_base_models": self.list_base_models,
"metadata_output_type": self.metadata_output_type,
"user_meta_data": get_user_device_data(),
}

@ -1,231 +0,0 @@
import logging
from collections import defaultdict
from typing import Callable, Sequence
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
# Assuming the existence of an appropriate Agent class and logger setup
class AgentRearrange(BaseSwarm):
def __init__(
self,
agents: Sequence[Agent] = None,
verbose: bool = False,
custom_prompt: str = None,
callbacks: Sequence[Callable] = None,
*args,
**kwargs,
):
super().__init__()
if not all(isinstance(agent, Agent) for agent in agents):
raise ValueError(
"All elements must be instances of the Agent class."
)
self.agents = agents
self.verbose = verbose
self.custom_prompt = custom_prompt
self.callbacks = callbacks if callbacks is not None else []
self.flows = defaultdict(list)
def parse_pattern(self, pattern: str):
"""
Parse the interaction pattern to set up task flows, supporting both sequential
and concurrent executions within the same pattern.
"""
try:
self.flows.clear() # Ensure flows are reset each time pattern is parsed
# Split pattern into potentially concurrent flows
concurrent_flows = pattern.split(",")
for flow in concurrent_flows:
# Trim whitespace and identify sequential parts within each concurrent flow
parts = [part.strip() for part in flow.split("->")]
if len(parts) > 1:
# Link each part sequentially to the next as source -> destination
for i in range(len(parts) - 1):
source = parts[i]
destination = parts[i + 1]
# Validate and add each sequential link
if source not in [
agent.agent_name for agent in self.agents
]:
logging.error(
f"Source agent {source} not found."
)
return False
if destination not in [
agent.agent_name for agent in self.agents
]:
logging.error(
f"Destination agent {destination} not"
" found."
)
return False
self.flows[source].append(destination)
else:
# Handle single agent case if needed
self.flows[parts[0]] = []
return True
except Exception as e:
logging.error(f"Error parsing pattern: {e}")
return False
def self_find_agent_by_name(self, name: str):
for agent in self.agents:
if agent.agent_name == name:
return agent
return None
def agent_exists(self, name: str):
for agent in self.agents:
if agent.agent_name == name:
return True
return False
def parse_concurrent_flow(
self,
flow: str,
):
sequential_agents = flow.split("->")
for i, source_name in enumerate(sequential_agents[:-1]):
destination_name = sequential_agents[i + 1].strip()
self.parse_sequential_flow(
source_name.strip(), destination_name
)
def parse_sequential_flow(
self,
source: str,
destination: str,
):
if not self.self_find_agent_by_name(
source
) or not self.self_find_agent_by_name(destination):
return False
self.flows[source].append(destination)
def execute_task(
self,
dest_agent_name: str,
source: str,
task: str,
specific_tasks: dict,
):
dest_agent = self.self_find_agent_by_name(dest_agent_name)
if not dest_agent:
return None
task_to_run = specific_tasks.get(dest_agent_name, task)
if self.custom_prompt:
out = dest_agent.run(f"{task_to_run} {self.custom_prompt}")
else:
out = dest_agent.run(f"{task_to_run} (from {source})")
return out
def process_flows(self, pattern, default_task, specific_tasks):
if not self.parse_pattern(pattern):
return None
results = []
for source, destinations in self.flows.items():
if not destinations:
task = specific_tasks.get(source, default_task)
source_agent = self.self_find_agent_by_name(source)
if source_agent:
result = source_agent.run(task)
results.append(result)
else:
for destination in destinations:
task = specific_tasks.get(destination, default_task)
destination_agent = self.self_find_agent_by_name(
destination
)
if destination_agent:
result = destination_agent.run(task)
results.append(result)
return results
def __call__(
self,
pattern: str = None,
default_task: str = None,
**specific_tasks,
):
self.flows.clear() # Reset previous flows
results = self.process_flows(pattern, default_task, specific_tasks)
return results
# ## Initialize the workflow
# agent = Agent(
# agent_name="t",
# agent_description=(
# "Generate a transcript for a youtube video on what swarms"
# " are!"
# ),
# system_prompt=(
# "Generate a transcript for a youtube video on what swarms"
# " are!"
# ),
# llm=Anthropic(),
# max_loops=1,
# autosave=True,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# )
# agent2 = Agent(
# agent_name="t1",
# agent_description=(
# "Generate a transcript for a youtube video on what swarms"
# " are!"
# ),
# llm=Anthropic(),
# max_loops=1,
# system_prompt="Summarize the transcript",
# autosave=True,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# )
# agent3 = Agent(
# agent_name="t2",
# agent_description=(
# "Generate a transcript for a youtube video on what swarms"
# " are!"
# ),
# llm=Anthropic(),
# max_loops=1,
# system_prompt="Finalize the transcript",
# autosave=True,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# )
# # Rearrange the agents
# rearrange = AgentRearrange(
# agents=[agent, agent2, agent3],
# verbose=True,
# # custom_prompt="Summarize the transcript",
# )
# # Run the workflow on a task
# results = rearrange(
# # pattern="t -> t1, t2 -> t2",
# pattern="t -> t1 -> t2",
# default_task=(
# "Generate a transcript for a YouTube video on what swarms"
# " are!"
# ),
# t="Generate a transcript for a YouTube video on what swarms are!",
# # t2="Summarize the transcript",
# # t3="Finalize the transcript",
# )
# # print(results)

@ -206,6 +206,33 @@ class BaseSwarm(ABC):
def plan(self, task: str):
"""agents must individually plan using a workflow or pipeline"""
def self_find_agent_by_name(self, name: str):
"""
Find an agent by its name.
Args:
name (str): The name of the agent to find.
Returns:
Agent: The Agent object if found, None otherwise.
"""
for agent in self.agents:
if agent.agent_name == name:
return agent
return None
def agent_exists(self, name: str):
"""
Check if an agent exists in the swarm.
Args:
name (str): The name of the agent to check.
Returns:
bool: True if the agent exists, False otherwise.
"""
return self.self_find_agent_by_name(name) is not None
def direct_message(
self,
message: str,

@ -10,15 +10,6 @@ from swarms.utils.loguru_logger import logger
class BaseWorkflow(BaseStructure):
"""
Base class for workflows.
Attributes:
task_pool (list): A list to store tasks.
"""
def __init__(
self,
agents: List[Agent] = None,

@ -1,148 +1,223 @@
import logging
from collections import defaultdict
from swarms import Agent
from typing import List
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
from swarms.structs.agent import Agent
from typing import Sequence, Callable
class AgentRearrange:
class AgentRearrange(BaseSwarm):
"""
A class representing a swarm of agents for rearranging tasks.
Attributes:
agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object.
flow (str): The flow pattern of the tasks.
Methods:
__init__(agents: List[Agent] = None, flow: str = None): Initializes the AgentRearrange object.
add_agent(agent: Agent): Adds an agent to the swarm.
remove_agent(agent_name: str): Removes an agent from the swarm.
add_agents(agents: List[Agent]): Adds multiple agents to the swarm.
validate_flow(): Validates the flow pattern.
run(task): Runs the swarm to rearrange the tasks.
"""
def __init__(
self,
agents: Sequence[Agent] = None,
verbose: bool = False,
custom_prompt: str = None,
callbacks: Sequence[Callable] = None,
*args,
**kwargs,
agents: List[Agent] = None,
flow: str = None,
max_loops: int = 1,
verbose: bool = True,
):
"""
Initialize the AgentRearrange class.
Initializes the AgentRearrange object.
Args:
agents (Sequence[Agent], optional): A sequence of Agent objects. Defaults to None.
verbose (bool, optional): Whether to enable verbose mode. Defaults to False.
custom_prompt (str, optional): A custom prompt string. Defaults to None.
callbacks (Sequence[Callable], optional): A sequence of callback functions. Defaults to None.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
if not all(isinstance(agent, Agent) for agent in agents):
raise ValueError(
"All elements must be instances of the Agent class."
)
self.agents = agents
agents (List[Agent], optional): A list of Agent objects. Defaults to None.
flow (str, optional): The flow pattern of the tasks. Defaults to None.
"""
self.agents = {agent.name: agent for agent in agents}
self.flow = flow
self.verbose = verbose
self.custom_prompt = custom_prompt
self.callbacks = callbacks if callbacks is not None else []
self.flows = defaultdict(list)
self.max_loops = max_loops
def parse_pattern(self, pattern: str):
if verbose is True:
logger.add("agent_rearrange.log")
def add_agent(self, agent: Agent):
"""
Parse the interaction pattern and setup task flows.
Adds an agent to the swarm.
Args:
pattern (str): The interaction pattern to parse.
agent (Agent): The agent to be added.
"""
logger.info(f"Adding agent {agent.name} to the swarm.")
self.agents[agent.name] = agent
def remove_agent(self, agent_name: str):
"""
Removes an agent from the swarm.
Args:
agent_name (str): The name of the agent to be removed.
"""
del self.agents[agent_name]
def add_agents(self, agents: List[Agent]):
"""
Adds multiple agents to the swarm.
Args:
agents (List[Agent]): A list of Agent objects.
"""
for agent in agents:
self.agents[agent.name] = agent
def validate_flow(self):
"""
Validates the flow pattern.
Raises:
ValueError: If the flow pattern is incorrectly formatted or contains duplicate agent names.
Returns:
bool: True if the pattern parsing is successful, False otherwise.
"""
try:
for flow in pattern.split(","):
parts = [part.strip() for part in flow.split("->")]
if len(parts) != 2:
logging.error(
f"Invalid flow pattern: {flow}. Each flow"
" must have exactly one '->'."
bool: True if the flow pattern is valid.
"""
if "->" not in self.flow:
raise ValueError(
"Flow must include '->' to denote the direction of the task."
)
return False
source_name, destinations_str = parts
source = self.find_agent_by_name(source_name)
if source is None:
logging.error(f"Source agent {source_name} not found.")
return False
destinations_names = destinations_str.split()
for dest_name in destinations_names:
dest = self.find_agent_by_name(dest_name)
if dest is None:
logging.error(
f"Destination agent {dest_name} not" " found."
agents_in_flow = []
tasks = self.flow.split("->")
for task in tasks:
agent_names = [name.strip() for name in task.split(",")]
for agent_name in agent_names:
if agent_name not in self.agents:
raise ValueError(
f"Agent '{agent_name}' is not registered."
)
agents_in_flow.append(agent_name)
if len(set(agents_in_flow)) != len(agents_in_flow):
raise ValueError(
"Duplicate agent names in the flow are not allowed."
)
return False
self.flows[source.agent_name].append(dest.agent_name)
print("Flow is valid.")
return True
except Exception as e:
logger.error(f"Error: {e}")
raise e
def self_find_agen_by_name(self, name: str):
def run(self, task: str, *args, **kwargs):
"""
Find an agent by its name.
Runs the swarm to rearrange the tasks.
Args:
name (str): The name of the agent to find.
task: The initial task to be processed.
Returns:
Agent: The Agent object if found, None otherwise.
str: The final processed task.
"""
for agent in self.agents:
if agent.agent_name == name:
return agent
return None
if not self.validate_flow():
return "Invalid flow configuration."
tasks = self.flow.split("->")
current_task = task
for task in tasks:
agent_names = [name.strip() for name in task.split(",")]
if len(agent_names) > 1:
# Parallel processing
logger.info(f"Running agents in parallel: {agent_names}")
results = []
for agent_name in agent_names:
agent = self.agents[agent_name]
result = agent.run(current_task, *args, **kwargs)
results.append(result)
current_task = "; ".join(results)
else:
# Sequential processing
logger.info(f"Running agents sequentially: {agent_names}")
agent = self.agents[agent_names[0]]
current_task = agent.run(current_task, *args, **kwargs)
def __call__(
self,
agents: Sequence[Agent] = None,
pattern: str = None,
task: str = None,
**tasks,
return current_task
def rearrange(
agents: List[Agent], flow: str, task: str = None, *args, **kwargs
):
"""
Execute the task based on the specified pattern.
Rearranges the given list of agents based on the specified flow.
Args:
agents (Sequence[Agent], optional): A sequence of Agent objects. Defaults to None.
pattern (str, optional): The interaction pattern to follow. Defaults to None.
task (str, optional): The task to execute. Defaults to None.
**tasks: Additional tasks specified as keyword arguments.
"""
try:
if agents:
self.flows.clear() # Reset previous flows
if not self.parse_pattern(pattern):
return # Pattern parsing failed
for source, destinations in self.flows.items():
for dest in destinations:
dest_agent = self.self_find_agen_by_name(dest)
task = tasks.get(dest, task)
if self.custom_prompt:
dest_agent.run(f"{task} {self.custom_prompt}")
else:
dest_agent.run(f"{task} (from {source})")
# else:
# raise ValueError(
# "No agents provided. Please provide agents to"
# " execute the task."
# )
except Exception as e:
logger.error(
f"Error: {e} try again by providing agents and" " pattern"
Parameters:
agents (List[Agent]): The list of agents to be rearranged.
flow (str): The flow used for rearranging the agents.
task (str, optional): The task to be performed during rearrangement. Defaults to None.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
The result of running the agent system with the specified task.
Example:
agents = [agent1, agent2, agent3]
flow = "agent1 -> agent2, agent3"
task = "Perform a task"
rearrange(agents, flow, task)
"""
agent_system = AgentRearrange(
agents=agents, flow=flow, *args, **kwargs
)
raise e
return agent_system.run(task, *args, **kwargs)
# # Initialize the director agent
# director = Agent(
# agent_name="Director",
# system_prompt="Directs the tasks for the workers",
# llm=Anthropic(),
# max_loops=1,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# state_save_file_type="json",
# saved_state_path="director.json",
# )
# # Initialize worker 1
# worker1 = Agent(
# agent_name="Worker1",
# system_prompt="Generates a transcript for a youtube video on what swarms are",
# llm=Anthropic(),
# max_loops=1,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# state_save_file_type="json",
# saved_state_path="worker1.json",
# )
# # Initialize worker 2
# worker2 = Agent(
# agent_name="Worker2",
# system_prompt="Summarizes the transcript generated by Worker1",
# llm=Anthropic(),
# max_loops=1,
# dashboard=False,
# streaming_on=True,
# verbose=True,
# stopping_token="<DONE>",
# state_save_file_type="json",
# saved_state_path="worker2.json",
# )
# # Example usage
# try:
# agents = [
# Agent(agent_name=f"b{i}") for i in range(1, 4)
# ] # Creating agents b1, b2, b3
# agents.append(Agent(agent_name="d")) # Adding agent d
# rearranger = Rearrange(agents)
# # Specifying a complex pattern for task execution
# rearranger.execute("d -> b1 b2 b3, b2 -> b3", "Analyze data")
# except ValueError as e:
# logging.error(e)
# flow = "Director -> Worker1 -> Worker2"
# agent_system = AgentRearrange(
# agents=[director, worker1, worker2], flow=flow
# )
# # Run the system
# output = agent_system.run(
# "Create a format to express and communicate swarms of llms in a structured manner for youtube"
# )

@ -1,107 +1,91 @@
from dataclasses import dataclass, field
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
import time
import json
from swarms.utils.loguru_logger import logger
from swarms.utils.try_except_wrapper import try_except_wrapper
from swarms.structs.base_workflow import BaseWorkflow
from pydantic import BaseModel, Field
from typing import List, Dict
from swarms.structs.agent import Agent
@dataclass
class SequentialWorkflow(BaseWorkflow):
name: str = "Sequential Workflow"
description: str = None
objective: str = None
max_loops: int = 1
autosave: bool = False
saved_state_filepath: Optional[str] = "sequential_workflow_state.json"
restore_state_filepath: Optional[str] = None
dashboard: bool = False
agent_pool: List[Agent] = field(default_factory=list)
# task_pool: List[str] = field(
# default_factory=list
# ) # List to store tasks
def __post_init__(self):
super().__init__()
self.conversation = Conversation(
time_enabled=True,
autosave=True,
class StepSequentialWorkflow(BaseModel):
agent_names: List[str] = Field(
..., description="List of agent names to include in the workflow."
)
# If objective exists then set it
if self.objective is not None:
self.conversation.system_prompt = self.objective
def workflow_bootup(self):
logger.info(f"{self.name} is activating...")
for agent in self.agent_pool:
logger.info(f"Agent {agent.agent_name} Activated")
@try_except_wrapper
def add(self, task: str, agent: Agent, *args, **kwargs):
self.agent_pool.append(agent)
# self.task_pool.append(
# task
# ) # Store tasks corresponding to each agent
return self.conversation.add(
role=agent.agent_name, content=task, *args, **kwargs
max_loops: int = Field(
1, description="Maximum number of loops to run the workflow."
)
def reset_workflow(self) -> None:
self.conversation = {}
@try_except_wrapper
def run(self):
if not self.agent_pool:
raise ValueError("No agents have been added to the workflow.")
self.workflow_bootup()
loops = 0
while loops < self.max_loops:
previous_output = None # Initialize to None; will hold the output of the previous agent
for i, agent in enumerate(self.agent_pool):
# Fetch the last task specific to this agent from the conversation history
tasks_for_agent = [
msg["content"]
for msg in self.conversation.conversation_history
if msg["role"] == agent.agent_name
]
task = tasks_for_agent[-1] if tasks_for_agent else None
if task is None and previous_output is not None:
# If no specific task for this agent, use the output from the previous agent
task = previous_output
if task is None:
# If no initial task is found, and there's no previous output, log error and skip this agent
logger.error(
f"No initial task found for agent {agent.agent_name}, and no previous output to use."
verbose: bool = Field(
False, description="Whether to log debug information."
)
continue
logger.info(
f" \n Agent {i+1} ({agent.agent_name}) is executing the task: {task} \n"
steps: Dict = Field(
...,
description="Dictionary of steps for the workflow with each agent and its parameters.",
)
time: str = Field(
time.strftime("%Y-%m-%d %H:%M:%S"),
description="Time of the workflow.",
)
# Space the log
output = agent.run(task)
if output is None:
logger.error(
f"Agent {agent.agent_name} returned None for task: {task}"
)
raise ValueError(
f"Agent {agent.agent_name} returned None."
# Define a class to handle the sequential workflow
class SequentialWorkflow(BaseWorkflow):
def __init__(
self,
agents: List[Agent] = None,
max_loops: int = 2,
verbose: bool = False,
*args,
**kwargs,
):
"""
Initializes a SequentialWorkflow with a list of agents.
:param agents: List of agents to include in the workflow.
"""
self.agents = agents
self.max_loops = max_loops
if verbose:
logger.add("sequential_workflow.log", level="DEBUG")
if not self.agents:
raise ValueError("No agents provided for workflow")
if not self.max_loops:
self.max_loops = 1
# Log all the agents in the workflow
logger.info(
f"Initialized SequentialWorkflow with agents: {json.dumps([str(agent.agent_name) for agent in self.agents])}"
)
# Update the conversation history with the new output using agent's role
self.conversation.add(
role=agent.agent_name, content=output
def run(self, task: str, *args, **kwargs):
"""
Run the workflow starting with an initial task.
:param task: The task to start the workflow.
"""
logger.info(f"Starting workflow with task: {task}")
current_output = task
for agent in self.agents:
count = 0
while count < self.max_loops:
try:
logger.info(f"Running agent {agent.agent_name}")
current_output = agent.run(
current_output, *args, **kwargs
)
previous_output = output # Update the previous_output to pass to the next agent
loops += 1
return self.conversation.return_history_as_string()
print(current_output)
count += 1
logger.debug(
f"Agent {agent.agent_name} completed loop {count} "
) # Log partial output for brevity
except Exception as e:
logger.error(
f"Error occurred while running agent {agent.agent_name}: {str(e)}"
)
raise
logger.info(f"Finished running agent {agent.agent_name}")
logger.info("Finished running workflow")
return current_output

@ -1,6 +1,6 @@
from typing import Dict, List, Sequence
from swarms.tools.tool import BaseTool
from swarms.tools.base_tool import BaseTool
from pydantic import BaseModel

@ -1,106 +0,0 @@
import json
from typing import List, Optional
from pydantic import model_validator, BaseModel, Field, Json
from swarms.structs.agent import Agent
from swarms.structs.task import Task
class Team(BaseModel):
"""
Class that represents a group of agents, how they should work together and
their tasks.
Attributes:
tasks (Optional[List[Task]]): List of tasks.
agents (Optional[List[Agent]]): List of agents in this Team.
architecture (str): Architecture that the Team will follow. Default is "sequential".
verbose (bool): Verbose mode for the Agent Execution. Default is False.
config (Optional[Json]): Configuration of the Team. Default is None.
"""
tasks: Optional[List[Task]] = Field(None, description="List of tasks")
agents: Optional[List[Agent]] = Field(
None, description="List of agents in this Team."
)
architecture = Field(
description="architecture that the Team will follow.",
default="sequential",
)
verbose: bool = Field(
description="Verbose mode for the Agent Execution",
default=False,
)
config: Optional[Json] = Field(
description="Configuration of the Team.", default=None
)
@model_validator(mode="before")
@classmethod
def check_config(_cls, values):
if not values.get("config") and (
not values.get("agents") and not values.get("tasks")
):
raise ValueError(
"Either agents and task need to be set or config."
)
if values.get("config"):
config = json.loads(values.get("config"))
if not config.get("agents") or not config.get("tasks"):
raise ValueError("Config should have agents and tasks.")
values["agents"] = [
Agent(**agent) for agent in config["agents"]
]
tasks = []
for task in config["tasks"]:
task_agent = [
agt
for agt in values["agents"]
if agt.role == task["agent"]
][0]
del task["agent"]
tasks.append(Task(**task, agent=task_agent))
values["tasks"] = tasks
return values
def run(self) -> str:
"""
Kickoff the Team to work on its tasks.
Returns:
output (List[str]): Output of the Team for each task.
"""
if self.architecture == "sequential":
return self.__sequential_loop()
def __sequential_loop(self) -> str:
"""
Loop that executes the sequential architecture.
Returns:
output (str): Output of the Team.
"""
task_outcome = None
for task in self.tasks:
# Add delegation tools to the task if the agent allows it
# if task.agent.allow_delegation:
# tools = AgentTools(agents=self.agents).tools()
# task.tools += tools
self.__log(f"\nWorking Agent: {task.agent.role}")
self.__log(f"Starting Task: {task.description} ...")
task_outcome = task.execute(task_outcome)
self.__log(f"Task output: {task_outcome}")
return task_outcome
def __log(self, message):
if self.verbose:
print(message)

@ -1,4 +1,3 @@
from swarms.tools.tool import BaseTool, Tool, StructuredTool, tool
from swarms.tools.exec_tool import (
AgentAction,
AgentOutputParser,
@ -32,14 +31,10 @@ from swarms.tools.py_func_to_openai_func_str import (
Function,
ToolFunction,
)
from swarms.tools.openai_tool_creator_decorator import create_openai_tool
from swarms.tools.openai_tool_creator_decorator import tool
from swarms.tools.base_tool import BaseTool
__all__ = [
"BaseTool",
"Tool",
"StructuredTool",
"tool",
"AgentAction",
"AgentOutputParser",
"BaseAgentOutputParser",
@ -63,5 +58,6 @@ __all__ = [
"get_required_params",
"Function",
"ToolFunction",
"create_openai_tool",
"tool",
"BaseTool",
]

@ -0,0 +1,379 @@
import json
from pydantic import BaseModel
from swarms.utils.loguru_logger import logger
from swarms.tools.py_func_to_openai_func_str import (
get_openai_function_schema_from_func,
load_basemodels_if_needed,
)
from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
from typing import Callable, Optional, Any, Dict, List
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
function_to_str,
functions_to_str,
)
from swarms.tools.function_util import process_tool_docs
from typing import Union
ToolType = Union[BaseModel, Dict[str, Any], Callable[..., Any]]
class BaseTool(BaseModel):
"""
Base class for tools in the swarms package.
Attributes:
verbose (bool): Flag indicating whether to enable verbose mode.
functions (List[Callable[..., Any]]): List of functions associated with the tool.
base_models (List[type[BaseModel]]): List of base models associated with the tool.
Methods:
func_to_dict(function: Callable[..., Any], name: Optional[str] = None, description: str) -> Dict[str, Any]:
Converts a function to a dictionary representation.
load_params_from_func_for_pybasemodel(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Callable[..., Any]:
Loads parameters from a function for a Pydantic BaseModel.
base_model_to_dict(pydantic_type: type[BaseModel], output_str: bool = False, *args: Any, **kwargs: Any) -> dict[str, Any]:
Converts a Pydantic BaseModel to a dictionary representation.
multi_base_models_to_dict(pydantic_types: List[type[BaseModel]], *args: Any, **kwargs: Any) -> dict[str, Any]:
Converts multiple Pydantic BaseModels to a dictionary representation.
dict_to_str(dict: dict[str, Any]) -> str:
Converts a dictionary to a string representation.
multi_dict_to_str(dicts: list[dict[str, Any]]) -> str:
Converts multiple dictionaries to a string representation.
get_docs_from_callable(item) -> Any:
Retrieves documentation from a callable item.
"""
verbose: bool = False
functions: List[Callable[..., Any]] = []
base_models: List[type[BaseModel]] = []
verbose: bool = False
autocheck: bool = False
auto_execute_tool: Optional[bool] = False
def func_to_dict(
function: Callable[..., Any],
*,
name: Optional[str] = None,
description: str,
) -> Dict[str, Any]:
try:
return get_openai_function_schema_from_func(
function=function,
name=name,
description=description,
)
except Exception as e:
logger.error(f"An error occurred in func_to_dict: {e}")
logger.error(
"Please check the function and ensure it is valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def load_params_from_func_for_pybasemodel(
func: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Callable[..., Any]:
try:
return load_basemodels_if_needed(func, *args, **kwargs)
except Exception as e:
logger.error(
f"An error occurred in load_params_from_func_for_pybasemodel: {e}"
)
logger.error(
"Please check the function and ensure it is valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def base_model_to_dict(
pydantic_type: type[BaseModel],
output_str: bool = False,
*args: Any,
**kwargs: Any,
) -> dict[str, Any]:
try:
return base_model_to_openai_function(
pydantic_type, output_str, *args, **kwargs
)
except Exception as e:
logger.error(f"An error occurred in base_model_to_dict: {e}")
logger.error(
"Please check the Pydantic type and ensure it is valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def multi_base_models_to_dict(
pydantic_types: List[type[BaseModel]],
*args: Any,
**kwargs: Any,
) -> dict[str, Any]:
try:
return multi_base_model_to_openai_function(
pydantic_types, *args, **kwargs
)
except Exception as e:
logger.error(
f"An error occurred in multi_base_models_to_dict: {e}"
)
logger.error(
"Please check the Pydantic types and ensure they are valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def dict_to_str(
dict: dict[str, Any],
) -> str:
try:
return function_to_str(dict)
except Exception as e:
logger.error(f"An error occurred in dict_to_str: {e}")
logger.error(
"Please check the dictionary and ensure it is valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def multi_dict_to_str(
dicts: list[dict[str, Any]],
) -> str:
try:
return functions_to_str(dicts)
except Exception as e:
logger.error(f"An error occurred in multi_dict_to_str: {e}")
logger.error(
"Please check the dictionaries and ensure they are valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def get_docs_from_callable(item):
try:
return process_tool_docs(item)
except Exception as e:
logger.error(f"An error occurred in get_docs: {e}")
logger.error("Please check the item and ensure it is valid.")
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def execute_tool(
self,
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
*args: Any,
**kwargs: Any,
) -> Callable:
try:
return openai_tool_executor(
tools, function_map, self.verbose, *args, **kwargs
)
except Exception as e:
logger.error(f"An error occurred in execute_tool: {e}")
logger.error(
"Please check the tools and function map and ensure they are valid."
)
logger.error(
"If the issue persists, please seek further assistance."
)
raise
def detect_tool_input_type(input):
if isinstance(input, BaseModel):
return "Pydantic"
elif isinstance(input, dict):
return "Dictionary"
elif callable(input):
return "Function"
else:
return "Unknown"
def dynamic_run(self, input) -> str:
"""
Executes the dynamic run based on the input type.
Args:
input: The input to be processed.
Returns:
str: The result of the dynamic run.
Raises:
None
"""
tool_input_type = self.detect_tool_input_type(input)
if tool_input_type == "Pydantic":
function_str = base_model_to_openai_function(input)
elif tool_input_type == "Dictionary":
function_str = function_to_str(input)
elif tool_input_type == "Function":
function_str = get_openai_function_schema_from_func(input)
else:
return "Unknown tool input type"
if self.auto_execute_tool:
if tool_input_type == "Function":
# Add the function to the functions list
self.functions.append(input)
# Create a function map from the functions list
function_map = {func.__name__: func for func in self.functions}
# Execute the tool
return self.execute_tool(
tools=[function_str], function_map=function_map
)
else:
return function_str
def execute_tool_by_name(
tools: List[Dict[str, Any]],
tool_name: str,
function_map: Dict[str, Callable],
) -> Any:
"""
Search for a tool by name and execute it.
Args:
tools (List[Dict[str, Any]]): A list of tools. Each tool is a dictionary that includes a 'name' key.
tool_name (str): The name of the tool to execute.
function_map (Dict[str, Callable]): A dictionary that maps tool names to functions.
Returns:
The result of executing the tool.
Raises:
ValueError: If the tool with the specified name is not found.
TypeError: If the tool name is not mapped to a function in the function map.
"""
# Search for the tool by name
tool = next(
(tool for tool in tools if tool.get("name") == tool_name), None
)
# If the tool is not found, raise an error
if tool is None:
raise ValueError(f"Tool '{tool_name}' not found")
# Get the function associated with the tool
func = function_map.get(tool_name)
# If the function is not found, raise an error
if func is None:
raise TypeError(
f"Tool '{tool_name}' is not mapped to a function"
)
# Execute the tool
return func(**tool.get("parameters", {}))
def execute_tool_from_text(
text: str = None, function_map: Dict[str, Callable] = None
) -> Any:
"""
Convert a JSON-formatted string into a tool dictionary and execute the tool.
Args:
text (str): A JSON-formatted string that represents a tool. The string should be convertible into a dictionary that includes a 'name' key and a 'parameters' key.
function_map (Dict[str, Callable]): A dictionary that maps tool names to functions.
Returns:
The result of executing the tool.
Raises:
ValueError: If the tool with the specified name is not found.
TypeError: If the tool name is not mapped to a function in the function map.
"""
# Convert the text into a dictionary
tool = json.loads(text)
# Get the tool name and parameters from the dictionary
tool_name = tool.get("name")
tool_params = tool.get("parameters", {})
# Get the function associated with the tool
func = function_map.get(tool_name)
# If the function is not found, raise an error
if func is None:
raise TypeError(
f"Tool '{tool_name}' is not mapped to a function"
)
# Execute the tool
return func(**tool_params)
# # Example function definitions and mappings
# def get_current_weather(location, unit='celsius'):
# return f"Weather in {location} is likely sunny and 75° {unit.title()}"
# def add(a, b):
# return a + b
# # Example tool configurations
# tools = [
# {
# "type": "function",
# "function": {
# "name": "get_current_weather",
# "parameters": {
# "properties": {
# "location": "San Francisco, CA",
# "unit": "fahrenheit",
# },
# },
# },
# },
# {
# "type": "function",
# "function": {
# "name": "add",
# "parameters": {
# "properties": {
# "a": 1,
# "b": 2,
# },
# },
# },
# }
# ]
# function_map = {
# "get_current_weather": get_current_weather,
# "add": add,
# }
# # Creating and executing the advanced executor
# tool_executor = BaseTool(verbose=True).execute_tool(tools, function_map)
# try:
# results = tool_executor()
# print(results) # Outputs results from both functions
# except Exception as e:
# print(f"Error: {e}")

@ -7,7 +7,7 @@ from typing import Dict, List, NamedTuple
from langchain.schema import BaseOutputParser
from pydantic import ValidationError
from swarms.tools.tool import BaseTool
from swarms.tools.base_tool import BaseTool
from swarms.utils.loguru_logger import logger

@ -1,12 +1,13 @@
from functools import wraps
import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.tools.py_func_to_openai_func_str import (
get_openai_function_schema_from_func,
)
from swarms.utils.loguru_logger import logger
def create_openai_tool(
def tool(
name: str = None,
description: str = None,
return_dict: bool = True,
@ -79,3 +80,314 @@ def create_openai_tool(
return wrapper
return decorator
def openai_tool_executor(
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
verbose: bool = True,
*args,
**kwargs,
) -> Callable:
"""
Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
in a list of tool dictionaries, with extensive error handling and validation.
Args:
tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
Returns:
Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
Examples:
>>> from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
>>> from swarms.tools.py_func_to_openai_func_str import get_openai_function_schema_from_func
>>> from swarms.utils.loguru_logger import logger
>>>
>>> def test_function(param1: int, param2: str) -> str:
... return f"Test function called with parameters: {param1}, {param2}"
...
>>> @openai_tool_executor(
... tools=[
... {
... "type": "function",
... "function": {
... "name": "test_function",
... "parameters": {
... "properties": {
... "param1": {
... "type": "int",
... "description": "An integer parameter."
... },
... "param2": {
... "type": "str",
... "description": "A string parameter."
... },
... }
... }
... }
... }
... ],
... function_map={
... "test_function": test_function
... }
... )
... def tool_executor():
... pass
...
>>> results = tool_executor()
>>> logger.info(results)
"""
def tool_executor():
# Prepare tasks for concurrent execution
results = []
logger.info(f"Executing {len(tools)} tools concurrently.")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool in tools:
if tool.get("type") != "function":
continue # Skip non-function tool entries
function_info = tool.get("function", {})
func_name = function_info.get("name")
logger.info(f"Executing function: {func_name}")
# Check if the function name is mapped to an actual function
if func_name not in function_map:
raise KeyError(
f"Function '{func_name}' not found in function map."
)
# Validate parameters
params = function_info.get("parameters", {}).get(
"properties", {}
)
if not params:
raise ValueError(
f"No parameters specified for function '{func_name}'."
)
# Submit the function for execution
try:
future = executor.submit(
function_map[func_name], **params
)
futures.append(future)
except Exception as e:
print(
f"Failed to submit the function '{func_name}' for execution: {e}"
)
# Gather results from all futures
for future in futures:
try:
result = future.result() # Collect result from future
results.append(result)
except Exception as e:
print(f"Error during execution of a function: {e}")
logger.info(f"Results: {results}")
return results
return tool_executor
# def openai_tool_executor(
# tools: List[Dict[str, Any]],
# function_map: Dict[str, Callable],
# verbose: bool = True,
# concurrent_execution: bool = True,
# retry_on_error: bool = False,
# retry_attempts: int = 3,
# max_loops: int = 1,
# max_workers: int = 10,
# *args,
# **kwargs,
# ) -> Callable:
# """
# Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
# in a list of tool dictionaries, with extensive error handling and validation.
# Args:
# tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
# function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
# Returns:
# Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
# Examples:
# >>> from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
# >>> from swarms.tools.py_func_to_openai_func_str import get_openai_function_schema_from_func
# >>> from swarms.utils.loguru_logger import logger
# >>>
# >>> def test_function(param1: int, param2: str) -> str:
# ... return f"Test function called with parameters: {param1}, {param2}"
# ...
# >>> @openai_tool_executor(
# ... tools=[
# ... {
# ... "type": "function",
# ... "function": {
# ... "name": "test_function",
# ... "parameters": {
# ... "properties": {
# ... "param1": {
# ... "type": "int",
# ... "description": "An integer parameter."
# ... },
# ... "param2": {
# ... "type": "str",
# ... "description": "A string parameter."
# ... },
# ... }
# ... }
# ... }
# ... }
# ... ],
# ... function_map={
# ... "test_function": test_function
# ... }
# ... )
# ... def tool_executor():
# ... pass
# ...
# >>> results = tool_executor()
# >>> logger.info(results)
# """
# def tool_executor():
# logger.info(
# f"Starting execution of tools with {max_loops} loops and concurrency set to {concurrent_execution}."
# )
# results = []
# def execute_function(func_name, params):
# try:
# logger.debug(
# f"Executing function: {func_name} with params: {params}"
# )
# return function_map[func_name](**params)
# except Exception as e:
# logger.error(
# f"Error executing function {func_name}: {str(e)}"
# )
# if retry_on_error:
# for attempt in range(retry_attempts):
# try:
# logger.debug(
# f"Retrying function: {func_name}, attempt {attempt+1}"
# )
# return function_map[func_name](**params)
# except Exception as e:
# logger.error(
# f"Retry {attempt+1} for function {func_name} failed: {str(e)}"
# )
# raise
# else:
# raise
# for loop in range(max_loops):
# logger.info(f"Executing loop {loop + 1}/{max_loops}")
# with concurrent.futures.ThreadPoolExecutor(
# max_workers=max_workers
# ) as executor:
# future_to_function = {
# executor.submit(
# execute_function,
# tool["function"]["name"],
# tool["function"]["parameters"]["properties"],
# ): tool
# for tool in tools
# if tool.get("type") == "function"
# }
# for future in concurrent.futures.as_completed(
# future_to_function
# ):
# try:
# result = future.result()
# results.append(result)
# logger.debug(
# f"Function completed with result: {result}"
# )
# except Exception as e:
# logger.error(
# f"Execution failed with error: {str(e)}"
# )
# continue
# logger.info(f"All loops completed. Results: {results}")
# return results
# return tool_executor
# # Example
# @tool(
# name="test_function",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function(param1: int, param2: str) -> str:
# return f"Test function called with parameters: {param1}, {param2}"
# @tool(
# name="test_function2",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function2(param1: int, param2: str) -> str:
# return f"Test function 2 called with parameters: {param1}, {param2}"
# # Example execution
# out = openai_tool_executor(
# tools=[
# {
# "type": "function",
# "function": {
# "name": "test_function",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# {
# "type": "function",
# "function": {
# "name": "test_function2",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# ],
# function_map={"test_function": test_function, "test_function2": test_function2},
# )
# print(out)

@ -51,7 +51,7 @@ def base_model_to_openai_function(
schema["description"] = docstring.short_description
else:
schema["description"] = (
f"Correctly extracted `{pydantic_type.__class__.__name__.lower()}` with all "
f"Correctly extracted `{pydantic_type.__name__}` with all "
f"the required parameters with correct types"
)
@ -61,11 +61,11 @@ def base_model_to_openai_function(
if output_str:
out = {
"function_call": {
"name": pydantic_type.__class__.__name__.lower(),
"name": pydantic_type.__name__,
},
"functions": [
{
"name": pydantic_type.__class__.__name__.lower(),
"name": pydantic_type.__name__,
"description": schema["description"],
"parameters": parameters,
},
@ -76,11 +76,11 @@ def base_model_to_openai_function(
else:
return {
"function_call": {
"name": pydantic_type.__class__.__name__.lower(),
"name": pydantic_type.__name__,
},
"functions": [
{
"name": pydantic_type.__class__.__name__.lower(),
"name": pydantic_type.__name__,
"description": schema["description"],
"parameters": parameters,
},

@ -3,7 +3,7 @@ import re
from typing import Any, List
from swarms.prompts.tools import SCENARIOS
from swarms.tools.tool import BaseTool
from swarms.tools.base_tool import BaseTool
import inspect
from typing import Callable

@ -0,0 +1,58 @@
import pytest
from agent_rearrange import AgentRearrange
# Mocking the Agent class
class MockAgent:
def __init__(self, agent_name):
self.agent_name = agent_name
def run(self, task):
return f"Running {task}"
# Test for AgentRearrange class
class TestAgentRearrange:
@pytest.fixture
def agent_rearrange(self):
agents = [MockAgent("agent1"), MockAgent("agent2")]
return AgentRearrange(agents=agents)
def test_parse_pattern(self, agent_rearrange):
assert agent_rearrange.parse_pattern("agent1->agent2") is True
assert agent_rearrange.parse_pattern("agent3->agent4") is False
def test_self_find_agent_by_name(self, agent_rearrange):
assert (
agent_rearrange.self_find_agent_by_name("agent1").agent_name
== "agent1"
)
assert agent_rearrange.self_find_agent_by_name("agent3") is None
def test_agent_exists(self, agent_rearrange):
assert agent_rearrange.agent_exists("agent1") is True
assert agent_rearrange.agent_exists("agent3") is False
def test_parse_concurrent_flow(self, agent_rearrange):
agent_rearrange.parse_concurrent_flow("agent1->agent2")
assert "agent2" in agent_rearrange.flows["agent1"]
def test_parse_sequential_flow(self, agent_rearrange):
agent_rearrange.parse_sequential_flow("agent1", "agent2")
assert "agent2" in agent_rearrange.flows["agent1"]
def test_execute_task(self, agent_rearrange):
assert (
agent_rearrange.execute_task("agent1", "agent2", "task1", {})
== "Running task1 (from agent2)"
)
def test_process_flows(self, agent_rearrange):
assert agent_rearrange.process_flows(
"agent1->agent2", "task1", {}
) == ["Running task1"]
def test_call(self, agent_rearrange):
assert agent_rearrange(
pattern="agent1->agent2", default_task="task1"
) == ["Running task1"]
Loading…
Cancel
Save