[CLEANUP][Agent][Docs][++Utils]

pull/480/head
Kye Gomez 8 months ago
parent 7a35e329f2
commit 25f3e5f61b

@ -118,16 +118,17 @@ nav:
- Overview: "swarms/index.md"
- DIY Build Your Own Agent: "diy_your_own_agent.md"
- Agents with Tools: "examples/tools_agent.md"
- Multi-Agent Orchestration: "swarms/structs/multi_agent_orchestration.md"
- swarms.models:
- How to Create A Custom Language Model: "swarms/models/custom_model.md"
- Deploying Azure OpenAI in Production A Comprehensive Guide: "swarms/models/azure_openai.md"
- Language:
- Language Models Available:
- BaseLLM: "swarms/models/base_llm.md"
- Overview: "swarms/models/index.md"
- HuggingFaceLLM: "swarms/models/huggingface.md"
- Anthropic: "swarms/models/anthropic.md"
- OpenAIChat: "swarms/models/openai.md"
- MultiModal:
- MultiModal Models Available:
- BaseMultiModalModel: "swarms/models/base_multimodal_model.md"
- Fuyu: "swarms/models/fuyu.md"
- Vilt: "swarms/models/vilt.md"
@ -142,10 +143,11 @@ nav:
- BaseStructure: "swarms/structs/basestructure.md"
- Task: "swarms/structs/task.md"
- YamlModel: "swarms/structs/yaml_model.md"
- BaseSwarm: "swarms/structs/base_swarm.md"
- BaseWorkflow: "swarms/structs/baseworkflow.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md"
- BaseWorkflow: "swarms/structs/baseworkflow.md"
- Multi Agent Architectures:
- Conversation: "swarms/structs/conversation.md"
- SwarmNetwork: "swarms/structs/swarmnetwork.md"
@ -156,8 +158,7 @@ nav:
- Building Custom Vector Memory Databases with the BaseVectorDatabase Class: "swarms/memory/diy_memory.md"
- ShortTermMemory: "swarms/memory/short_term_memory.md"
- swarms.tools:
- Tool Decorator: "swarms/tools/decorator.md"
- BaseTool: "swarms/tools/base_tool.md"
- The Swarms Tool System Functions, Pydantic BaseModels as Tools, and Radical Customization: "swarms/tools/main.md"
- Guides:
- Agents:
- Building Custom Vector Memory Databases with the BaseVectorDatabase Class: "swarms/memory/diy_memory.md"

@ -0,0 +1,387 @@
# The Swarms Tool System: Functions, Pydantic BaseModels as Tools, and Radical Customization
This guide provides an in-depth look at the Swarms Tool System, focusing on its functions, the use of Pydantic BaseModels as tools, and the extensive customization options available. Aimed at developers, this documentation highlights how the Swarms framework works and offers detailed examples of creating and customizing tools and agents, specifically for accounting tasks.
The Swarms Tool System is a flexible and extensible component of the Swarms framework that allows for the creation, registration, and utilization of various tools. These tools can perform a wide range of tasks and are integrated into agents to provide specific functionalities. The system supports multiple ways to define tools, including using Pydantic BaseModels, functions, and dictionaries.
### Architecture
The architecture of the Swarms Tool System is designed to be highly modular. It consists of the following main components:
1. **Agents:** The primary entities that execute tasks.
2. **Tools:** Functions or classes that perform specific operations.
3. **Schemas:** Definitions of input and output data formats using Pydantic BaseModels.
### Key Concepts
#### Tools
Tools are the core functional units within the Swarms framework. They can be defined in various ways:
- **Pydantic BaseModels**: Tools can be defined using Pydantic BaseModels to ensure data validation and serialization.
- **Functions**: Tools can be simple or complex functions.
- **Dictionaries**: Tools can be represented as dictionaries for flexibility.
#### Agents
Agents utilize tools to perform tasks. They are configured with a set of tools and schemas, and they execute the tools based on the input they receive.
## Detailed Documentation
### Tool Definition
#### Using Pydantic BaseModels
Pydantic BaseModels provide a structured way to define tool inputs and outputs. They ensure data validation and serialization, making them ideal for complex data handling.
**Example:**
Define Pydantic BaseModels for accounting tasks:
```python
from pydantic import BaseModel
class CalculateTax(BaseModel):
income: float
class GenerateInvoice(BaseModel):
client_name: str
amount: float
date: str
class SummarizeExpenses(BaseModel):
expenses: list[dict]
```
Define tool functions using these models:
```python
def calculate_tax(data: CalculateTax) -> dict:
tax_rate = 0.3 # Example tax rate
tax = data.income * tax_rate
return {"income": data.income, "tax": tax}
def generate_invoice(data: GenerateInvoice) -> dict:
invoice = {
"client_name": data.client_name,
"amount": data.amount,
"date": data.date,
"invoice_id": "INV12345"
}
return invoice
def summarize_expenses(data: SummarizeExpenses) -> dict:
total_expenses = sum(expense['amount'] for expense in data.expenses)
return {"total_expenses": total_expenses}
```
#### Using Functions Directly
Tools can also be defined directly as functions without using Pydantic models. This approach is suitable for simpler tasks where complex validation is not required.
**Example:**
```python
def basic_tax_calculation(income: float) -> dict:
tax_rate = 0.25
tax = income * tax_rate
return {"income": income, "tax": tax}
```
#### Using Dictionaries
Tools can be represented as dictionaries, providing maximum flexibility. This method is useful when the tool's functionality is more dynamic or when integrating with external systems.
**Example:**
```python
basic_tool_schema = {
"name": "basic_tax_tool",
"description": "A basic tax calculation tool",
"parameters": {
"type": "object",
"properties": {
"income": {"type": "number", "description": "Income amount"}
},
"required": ["income"]
}
}
def basic_tax_tool(income: float) -> dict:
tax_rate = 0.2
tax = income * tax_rate
return {"income": income, "tax": tax}
```
### Tool Registration
Tools need to be registered with the agent for it to utilize them. This can be done by specifying the tools in the `tools` parameter during agent initialization.
**Example:**
```python
from swarms import Agent
from llama_hosted import llama3Hosted
# Define Pydantic BaseModels for accounting tasks
class CalculateTax(BaseModel):
income: float
class GenerateInvoice(BaseModel):
client_name: str
amount: float
date: str
class SummarizeExpenses(BaseModel):
expenses: list[dict]
# Define tool functions using these models
def calculate_tax(data: CalculateTax) -> dict:
tax_rate = 0.3
tax = data.income * tax_rate
return {"income": data.income, "tax": tax}
def generate_invoice(data: GenerateInvoice) -> dict:
invoice = {
"client_name": data.client_name,
"amount": data.amount,
"date": data.date,
"invoice_id": "INV12345"
}
return invoice
def summarize_expenses(data: SummarizeExpenses) -> dict:
total_expenses = sum(expense['amount'] for expense in data.expenses)
return {"total_expenses": total_expenses}
# Function to generate a tool schema for demonstration purposes
def create_tool_schema():
return {
"name": "execute",
"description": "Executes code on the user's machine",
"parameters": {
"type": "object",
"properties": {
"language": {
"type": "string",
"description": "Programming language",
"enum": ["python", "java"]
},
"code": {"type": "string", "description": "Code to execute"}
},
"required": ["language", "code"]
}
}
# Initialize the agent with the tools
agent = Agent(
agent_name="Accounting Agent",
system_prompt="This agent assists with various accounting tasks.",
sop_list=["Provide accurate and timely accounting services."],
llm=llama3Hosted(),
max_loops="auto",
interactive=True,
verbose=True,
tool_schema=BaseModel,
list_base_models=[
CalculateTax,
GenerateInvoice,
SummarizeExpenses
],
output_type=str,
metadata_output_type="json",
function_calling_format_type="OpenAI",
function_calling_type="json",
tools=[
calculate_tax,
generate_invoice,
summarize_expenses
],
list_tool_schemas_json=create_tool_schema(),
)
```
### Running the Agent
The agent can execute tasks using the `run` method. This method takes a prompt and determines the appropriate tool to use based on the input.
**Example:**
```python
# Example task: Calculate tax for an income
result = agent.run("Calculate the tax for an income of $50,000.")
print(f"Result: {result}")
# Example task: Generate an invoice
invoice_data = agent.run("Generate an invoice for John Doe for $1500 on 2024-06-01.")
print(f"Invoice Data: {invoice_data}")
# Example task: Summarize expenses
expenses = [
{"amount": 200.0, "description": "Office supplies"},
{"amount": 1500.0, "description": "Software licenses"},
{"amount": 300.0, "description": "Travel expenses"}
]
summary = agent.run("Summarize these expenses: " + str(expenses))
print(f"Expenses Summary: {summary}")
```
### Customizing Tools
Custom tools can be created to extend the functionality of the Swarms framework. This can include integrating external APIs, performing complex calculations, or handling specialized data formats.
**Example: Custom Accounting Tool**
```python
from pydantic import BaseModel
class CustomAccountingTool(BaseModel):
data: dict
def custom_accounting_tool(data: CustomAccountingTool) -> dict:
# Custom logic for the accounting tool
result = {
"status": "success",
"data_processed": len(data.data)
}
return result
# Register the custom tool with the agent
agent = Agent(
agent_name="Accounting Agent",
system_prompt="This agent assists with various accounting tasks.",
sop_list=["Provide accurate and timely accounting services."],
llm=llama3Hosted(),
max_loops="auto",
interactive=True,
verbose=True,
tool_schema=BaseModel,
list_base_models=[
CalculateTax,
GenerateInvoice,
SummarizeExpenses,
CustomAccountingTool
],
output_type=str,
metadata_output_type="json",
function_calling_format_type="OpenAI",
function_calling_type="json",
tools=[
calculate_tax,
generate_invoice,
summarize_expenses,
custom_accounting_tool
],
list_tool_schemas_json=create_tool_schema(),
)
```
### Advanced Customization
Advanced customization involves modifying the core components of the Swarms framework. This includes extending existing classes, adding new methods, or integrating third-party libraries.
**Example: Extending the Agent Class**
```python
from swarms import Agent
class AdvancedAccountingAgent(Agent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def custom_behavior(self):
print("Executing custom behavior")
def another_custom_method(self):
print("Another
custom method")
# Initialize the advanced agent
advanced_agent = AdvancedAccountingAgent(
agent_name="Advanced Accounting Agent",
system_prompt="This agent performs advanced accounting tasks.",
sop_list=["Provide advanced accounting services."],
llm=llama3Hosted(),
max_loops="auto",
interactive=True,
verbose=True,
tool_schema=BaseModel,
list_base_models=[
CalculateTax,
GenerateInvoice,
SummarizeExpenses,
CustomAccountingTool
],
output_type=str,
metadata_output_type="json",
function_calling_format_type="OpenAI",
function_calling_type="json",
tools=[
calculate_tax,
generate_invoice,
summarize_expenses,
custom_accounting_tool
],
list_tool_schemas_json=create_tool_schema(),
)
# Call custom methods
advanced_agent.custom_behavior()
advanced_agent.another_custom_method()
```
### Integrating External Libraries
You can integrate external libraries to extend the functionality of your tools. This is useful for adding new capabilities or leveraging existing libraries for complex tasks.
**Example: Integrating Pandas for Data Processing**
```python
import pandas as pd
from pydantic import BaseModel
class DataFrameTool(BaseModel):
data: list[dict]
def process_data_frame(data: DataFrameTool) -> dict:
df = pd.DataFrame(data.data)
summary = df.describe().to_dict()
return {"summary": summary}
# Register the tool with the agent
agent = Agent(
agent_name="Data Processing Agent",
system_prompt="This agent processes data frames.",
sop_list=["Provide data processing services."],
llm=llama3Hosted(),
max_loops="auto",
interactive=True,
verbose=True,
tool_schema=BaseModel,
list_base_models=[DataFrameTool],
output_type=str,
metadata_output_type="json",
function_calling_format_type="OpenAI",
function_calling_type="json",
tools=[process_data_frame],
list_tool_schemas_json=create_tool_schema(),
)
# Example task: Process a data frame
data = [
{"col1": 1, "col2": 2},
{"col1": 3, "col2": 4},
{"col1": 5, "col2": 6}
]
result = agent.run("Process this data frame: " + str(data))
print(f"Data Frame Summary: {result}")
```
## Conclusion
The Swarms Tool System provides a robust and flexible framework for defining and utilizing tools within agents. By leveraging Pydantic BaseModels, functions, and dictionaries, developers can create highly customized tools to perform a wide range of tasks. The extensive customization options allow for the integration of external libraries and the extension of core components, making the Swarms framework suitable for diverse applications.
This guide has covered the fundamental concepts and provided detailed examples to help you get started with the Swarms Tool System. With this foundation, you can explore and implement advanced features to build powerful

@ -84,7 +84,7 @@ def perplexity_agent(task: str = None, *args, **kwargs):
"""
out = fetch_web_articles_bing_api(
task,
subscription_key=None,
subscription_key="940fe346f0a149ea9f34d9969359aed7",
)
# Sources
@ -96,5 +96,7 @@ def perplexity_agent(task: str = None, *args, **kwargs):
return agent_response
out = perplexity_agent("What are the best ways to hold a cat?")
out = perplexity_agent(
"What are the biggest GPU chips alternatives for transformer modelsm, look up Etched"
)
print(out)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "5.0.8"
version = "5.0.9"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -23,12 +23,10 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
from swarms.structs.conversation import Conversation
from swarms.structs.yaml_model import YamlModel
from swarms.telemetry.user_utils import get_user_device_data
from swarms.tools.base_tool import BaseTool
from swarms.tools.prebuilt.code_interpreter import (
SubprocessCodeInterpreter,
)
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
)
from swarms.utils.data_to_text import data_to_text
@ -40,6 +38,10 @@ from swarms.tools.py_func_to_openai_func_str import (
from swarms.tools.func_calling_executor import openai_tool_executor
from swarms.structs.base_structure import BaseStructure
from swarms.prompts.tools import tool_sop_prompt
from swarms.tools.func_calling_utils import (
pydantic_model_to_json_str,
prepare_output_for_output_model,
)
# Utils
@ -72,6 +74,10 @@ def task_id():
return str(uuid.uuid4())
def exists(val):
return val is not None
# Step ID generator
def step_id():
return str(uuid.uuid1())
@ -249,6 +255,7 @@ class Agent(BaseStructure):
planning: Optional[str] = False,
planning_prompt: Optional[str] = None,
device: str = None,
custom_planning_prompt: str = None,
*args,
**kwargs,
):
@ -347,12 +354,6 @@ class Agent(BaseStructure):
# Memory
self.feedback = []
# Initialize the code executor
if self.code_interpreter is not False:
self.code_executor = SubprocessCodeInterpreter(
debug_mode=True,
)
# If the preset stopping token is enabled then set the stopping token to the preset stopping token
if preset_stopping_token is not None:
self.stopping_token = "<DONE>"
@ -369,7 +370,7 @@ class Agent(BaseStructure):
)
# If the docs exist then ingest the docs
if self.docs:
if self.docs is not None:
self.ingest_docs(self.docs)
# If docs folder exists then get the docs from docs folder
@ -381,7 +382,7 @@ class Agent(BaseStructure):
# self.truncate_history()
# If verbose is enabled then set the logger level to info
# if verbose:
# if verbose is not False:
# logger.setLevel(logging.INFO)
if tools is not None:
@ -389,15 +390,6 @@ class Agent(BaseStructure):
# Add the tool prompt to the memory
self.short_memory.add(role="System", content=tool_sop_prompt())
# # BaseTool
# self.base_tool = BaseTool(
# functions=tools,
# verbose=verbose,
# auto_execute_tool=execute_tool,
# autocheck=True,
# base_models=list_base_models,
# )
# Print number of tools
logger.info(f"Number of tools: {len(tools)}")
logger.info(
@ -430,27 +422,8 @@ class Agent(BaseStructure):
# Now create a function calling map for every tools
self.function_map = {tool.__name__: tool for tool in tools}
# # If tools are provided then set the tool prompt by adding to sop
# if self.tools is not None:
# if custom_tools_prompt is not None:
# tools_prompt = custom_tools_prompt(tools=self.tools)
# # Append the tools prompt to the short_term_memory
# self.short_memory.add(
# role=self.agent_name, content=tools_prompt
# )
# else:
# # Default tool prompt
# tools_prompt = tool_usage_worker_prompt(tools=self.tools)
# # Append the tools prompt to the short_term_memory
# self.short_memory.add(
# role=self.agent_name, content=tools_prompt
# )
# Set the logger handler
if logger_handler:
if exists(logger_handler):
logger.add(
f"{self.agent_name}.log",
level="INFO",
@ -460,15 +433,13 @@ class Agent(BaseStructure):
diagnose=True,
)
# logger.info("Creating Agent {}".format(self.agent_name))
# If the tool types are provided
if self.tool_schema is not None:
# Log the tool schema
logger.info(
"Tool schema provided, Automatically converting to OpenAI function"
)
tool_schema_str = self.pydantic_model_to_json_str(
tool_schema_str = pydantic_model_to_json_str(
self.tool_schema, indent=4
)
logger.info(f"Tool Schema: {tool_schema_str}")
@ -478,7 +449,7 @@ class Agent(BaseStructure):
)
# If a list of tool schemas is provided
if self.list_base_models is not None:
if exists(self.list_base_models):
logger.info(
"List of tool schemas provided, Automatically converting to OpenAI function"
)
@ -509,11 +480,11 @@ class Agent(BaseStructure):
logger.info(f"End of Agent {self.agent_name} History")
# If the user inputs a list of strings for the sop then join them and set the sop
if self.sop_list:
if exists(self.sop_list):
self.sop = "\n".join(self.sop_list)
self.short_memory.add(role=self.user_name, content=self.sop)
if self.sop is not None:
if exists(self.sop):
self.short_memory.add(role=self.user_name, content=self.sop)
# If the device is not provided then get the device data
@ -615,11 +586,13 @@ class Agent(BaseStructure):
# ############## TOKENIZER FUNCTIONS ##############
def add_message_to_memory(self, message: str):
def add_message_to_memory(self, message: str, *args, **kwargs):
"""Add the message to the memory"""
try:
logger.info(f"Adding message to memory: {message}")
self.short_memory.add(role=self.agent_name, content=message)
self.short_memory.add(
role=self.agent_name, content=message, *args, **kwargs
)
except Exception as error:
print(
colored(f"Error adding message to memory: {error}", "red")
@ -725,83 +698,6 @@ class Agent(BaseStructure):
########################## FUNCTION CALLING ##########################
def json_str_to_json(self, json_str: str):
"""Convert a JSON string to a JSON object"""
return json.loads(json_str)
def json_str_to_pydantic_model(self, json_str: str, model: BaseModel):
"""Convert a JSON string to a Pydantic model"""
return model.model_validate_json(json_str)
def json_str_to_dict(self, json_str: str):
"""Convert a JSON string to a dictionary"""
return json.loads(json_str)
def pydantic_model_to_json_str(
self, model: BaseModel, indent, *args, **kwargs
):
return json.dumps(
base_model_to_openai_function(model),
indent=indent,
*args,
**kwargs,
)
def dict_to_json_str(self, dictionary: dict):
"""Convert a dictionary to a JSON string"""
return json.dumps(dictionary)
def dict_to_pydantic_model(self, dictionary: dict, model: BaseModel):
"""Convert a dictionary to a Pydantic model"""
return model.model_validate_json(dictionary)
# def prep_pydantic_model_for_str(self, model: BaseModel):
# # Convert to Function
# out = self.pydantic_model_to_json_str(model)
# # return function_to_str(out)
def tool_schema_to_str(
self, tool_schema: BaseModel = None, *args, **kwargs
):
"""Convert a tool schema to a string"""
out = base_model_to_openai_function(tool_schema)
return str(out)
def tool_schemas_to_str(
self, tool_schemas: List[BaseModel] = None, *args, **kwargs
):
"""Convert a list of tool schemas to a string"""
out = multi_base_model_to_openai_function(tool_schemas)
return str(out)
def str_to_pydantic_model(self, string: str, model: BaseModel):
"""Convert a string to a Pydantic model"""
return model.model_validate_json(string)
def list_str_to_pydantic_model(
self, list_str: List[str], model: BaseModel
):
"""Convert a list of strings to a Pydantic model"""
# return model.model_validate_json(list_str)
for string in list_str:
return model.model_validate_json(string)
def prepare_output_for_output_model(
self, output: agent_output_type = None
):
"""Prepare the output for the output model"""
if self.output_type == BaseModel:
return self.str_to_pydantic_model(output, self.output_type)
elif self.output_type == dict:
return self.dict_to_json_str(output)
elif self.output_type == str:
return output
else:
return output
########################## FUNCTION CALLING ##########################
def run(
self,
task: Optional[str] = None,
@ -815,27 +711,22 @@ class Agent(BaseStructure):
try:
self.activate_autonomous_agent()
# Check if the task is not None
if task is not None:
# Add task to memory
self.short_memory.add(role=self.user_name, content=task)
# Set the loop count
loop_count = 0
# Clear the short memory
# self.short_memory.clear()
response = None
# response = None
while (
self.max_loops == "auto"
or loop_count < self.max_loops
# or self.custom_loop_condition()
):
while self.max_loops == "auto" or loop_count < self.max_loops:
loop_count += 1
self.loop_count_print(loop_count, self.max_loops)
print("\n")
# Dynamic temperature
if self.dynamic_temperature_enabled:
if self.dynamic_temperature_enabled is True:
self.dynamic_temperature()
# Task prompt
@ -845,17 +736,6 @@ class Agent(BaseStructure):
success = False
while attempt < self.retry_attempts and not success:
try:
if self.planning is not False:
plan = self.llm(self.planning_prompt)
# Add the plan to the memory
self.short_memory.add(
role=self.agent_name, content=plan
)
task_prompt = (
self.short_memory.return_history_as_string()
)
response_args = (
(task_prompt, *args)
@ -874,68 +754,10 @@ class Agent(BaseStructure):
# Check if tools is not None
if self.tools is not None:
self.parse_and_execute_tools(response)
# Extract json from markdown
response = extract_code_from_markdown(response)
# Try executing the tool
if self.execute_tool is not False:
try:
logger.info("Executing tool...")
# try to Execute the tool and return a string
out = openai_tool_executor(
tools=response,
function_map=self.function_map,
return_as_string=True,
)
print(f"Tool Output: {out}")
# Add the output to the memory
self.short_memory.add(
role=self.agent_name,
content=out,
)
except Exception as error:
logger.error(
f"Error executing tool: {error}"
)
print(
colored(
f"Error executing tool: {error}",
"red",
)
)
if self.code_interpreter:
# Extract code from markdown
extracted_code = extract_code_from_markdown(
response
)
# Execute the code
execution = SubprocessCodeInterpreter(
debug_mode=True
).run(extracted_code)
# Add the execution to the memory
self.short_memory.add(
role=self.agent_name,
content=execution,
)
# Run the llm again
response = self.llm(
self.short_memory.return_history_as_string(),
*args,
**kwargs,
)
print(
f"Response after code interpretation: {response}"
)
if exists(self.code_interpreter):
self.code_interpreter_execution(response)
if self.evaluator:
evaluated_response = self.evaluator(response)
@ -1037,7 +859,7 @@ class Agent(BaseStructure):
# Prepare the output for the output model
if self.output_type is not None:
# logger.info("Preparing output for output model.")
response = self.prepare_output_for_output_model(response)
response = prepare_output_for_output_model(response)
print(f"Response after output model: {response}")
# print(response)
@ -1060,6 +882,40 @@ class Agent(BaseStructure):
logger.error(f"Error calling agent: {error}")
raise error
def parse_and_execute_tools(self, response: str, *args, **kwargs):
# Extract json from markdown
response = extract_code_from_markdown(response)
# Try executing the tool
if self.execute_tool is not False:
try:
logger.info("Executing tool...")
# try to Execute the tool and return a string
out = openai_tool_executor(
tools=response,
function_map=self.function_map,
*args,
**kwargs,
)
print(f"Tool Output: {out}")
# Add the output to the memory
self.short_memory.add(
role=self.agent_name,
content=out,
)
except Exception as error:
logger.error(f"Error executing tool: {error}")
print(
colored(
f"Error executing tool: {error}",
"red",
)
)
def long_term_memory_prompt(self, query: str, *args, **kwargs):
"""
Generate the agent long term memory prompt
@ -1090,6 +946,27 @@ class Agent(BaseStructure):
logger.info(f"Adding memory: {message}")
return self.short_memory.add(role=self.agent_name, content=message)
def plan(self, task: str, *args, **kwargs):
"""
Plan the task
Args:
task (str): The task to plan
"""
try:
if exists(self.planning_prompt):
# Join the plan and the task
planning_prompt = f"{self.planning_prompt} {task}"
plan = self.llm(planning_prompt)
# Add the plan to the memory
self.short_memory.add(role=self.agent_name, content=plan)
return None
except Exception as error:
logger.error(f"Error planning task: {error}")
raise error
async def run_concurrent(self, task: str, *args, **kwargs):
"""
Run a task concurrently.
@ -1282,6 +1159,34 @@ class Agent(BaseStructure):
logger.info(f"Adding response filter: {filter_word}")
self.reponse_filters.append(filter_word)
def code_interpreter_execution(
self, code: str, *args, **kwargs
) -> str:
# Extract code from markdown
extracted_code = extract_code_from_markdown(code)
# Execute the code
execution = SubprocessCodeInterpreter(debug_mode=True).run(
extracted_code
)
# Add the execution to the memory
self.short_memory.add(
role=self.agent_name,
content=execution,
)
# Run the llm again
response = self.llm(
self.short_memory.return_history_as_string(),
*args,
**kwargs,
)
print(f"Response after code interpretation: {response}")
return response
def apply_reponse_filters(self, response: str) -> str:
"""
Apply the response filters to the response
@ -1639,12 +1544,19 @@ class Agent(BaseStructure):
if len(self.short_memory) > count:
self.short_memory = self.short_memory[:count]
def add_tool(self, tool: BaseTool):
def add_tool(self, tool: Callable):
return self.tools.append(tool)
def add_tools(self, tools: List[BaseTool]):
def add_tools(self, tools: List[Callable]):
return self.tools.extend(tools)
def remove_tool(self, tool: Callable):
return self.tools.remove(tool)
def remove_tools(self, tools: List[Callable]):
for tool in tools:
self.tools.remove(tool)
def get_docs_from_doc_folders(self):
"""Get the docs from the files"""
try:

@ -26,7 +26,10 @@ from swarms.tools.py_func_to_openai_func_str import (
from swarms.tools.openai_tool_creator_decorator import tool
from swarms.tools.base_tool import BaseTool
from swarms.tools.prebuilt import * # noqa: F403
from swarms.tools.cohere_func_call_schema import (
CohereFuncSchema,
ParameterDefinition,
)
__all__ = [
"BaseTool",
@ -48,4 +51,6 @@ __all__ = [
"parse_and_execute_tools",
"scrape_tool_func_docs",
"tool_find_by_name",
"CohereFuncSchema",
"ParameterDefinition",
]

@ -0,0 +1,16 @@
from pydantic import BaseModel, Field
from typing import Dict
class ParameterDefinition(BaseModel):
description: str = Field(..., title="Description of the parameter")
type: str = Field(..., title="Type of the parameter")
required: bool = Field(..., title="Is the parameter required?")
class CohereFuncSchema(BaseModel):
name: str = Field(..., title="Name of the tool")
description: str = Field(..., title="Description of the tool")
parameter_definitions: Dict[str, ParameterDefinition] = Field(
..., title="Parameter definitions for the tool"
)

@ -3,6 +3,113 @@ from typing import Callable, Any, Dict, List
from swarms.utils.loguru_logger import logger
# def openai_tool_executor(
# tools: List[Dict[str, Any]],
# function_map: Dict[str, Callable],
# verbose: bool = True,
# return_as_string: bool = False,
# *args,
# **kwargs,
# ) -> Callable:
# """
# Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
# in a list of tool dictionaries, with extensive error handling and validation.
# Args:
# tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
# function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
# verbose (bool): If True, enables verbose logging.
# return_as_string (bool): If True, returns the results as a concatenated string.
# Returns:
# Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
# Examples:
# >>> def test_function(param1: int, param2: str) -> str:
# ... return f"Test function called with parameters: {param1}, {param2}"
# >>> tool_executor = openai_tool_executor(
# ... tools=[
# ... {
# ... "type": "function",
# ... "function": {
# ... "name": "test_function",
# ... "parameters": {
# ... "param1": 1,
# ... "param2": "example"
# ... }
# ... }
# ... }
# ... ],
# ... function_map={
# ... "test_function": test_function
# ... },
# ... return_as_string=True
# ... )
# >>> results = tool_executor()
# >>> print(results)
# """
# def tool_executor():
# # Prepare tasks for concurrent execution
# results = []
# logger.info(f"Executing {len(tools)} tools concurrently.")
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = []
# for tool in tools:
# if tool.get("type") != "function":
# continue # Skip non-function tool entries
# function_info = tool.get("function", {})
# func_name = function_info.get("name")
# logger.info(f"Executing function: {func_name}")
# # Check if the function name is mapped to an actual function
# if func_name not in function_map:
# error_message = f"Function '{func_name}' not found in function map."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Validate parameters
# params = function_info.get("parameters", {})
# if not params:
# error_message = f"No parameters specified for function '{func_name}'."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Submit the function for execution
# try:
# future = executor.submit(
# function_map[func_name], **params
# )
# futures.append((func_name, future))
# except Exception as e:
# error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
# logger.error(error_message)
# results.append(error_message)
# # Gather results from all futures
# for func_name, future in futures:
# try:
# result = future.result() # Collect result from future
# results.append(f"{func_name}: {result}")
# except Exception as e:
# error_message = f"Error during execution of function '{func_name}': {e}"
# logger.error(error_message)
# results.append(error_message)
# if return_as_string:
# return "\n".join(results)
# logger.info(f"Results: {results}")
# return results
# return tool_executor
def openai_tool_executor(
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
@ -11,67 +118,25 @@ def openai_tool_executor(
*args,
**kwargs,
) -> Callable:
"""
Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
in a list of tool dictionaries, with extensive error handling and validation.
Args:
tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
verbose (bool): If True, enables verbose logging.
return_as_string (bool): If True, returns the results as a concatenated string.
Returns:
Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
Examples:
>>> def test_function(param1: int, param2: str) -> str:
... return f"Test function called with parameters: {param1}, {param2}"
>>> tool_executor = openai_tool_executor(
... tools=[
... {
... "type": "function",
... "function": {
... "name": "test_function",
... "parameters": {
... "param1": 1,
... "param2": "example"
... }
... }
... }
... ],
... function_map={
... "test_function": test_function
... },
... return_as_string=True
... )
>>> results = tool_executor()
>>> print(results)
"""
def tool_executor():
# Prepare tasks for concurrent execution
results = []
logger.info(f"Executing {len(tools)} tools concurrently.")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool in tools:
if tool.get("type") != "function":
continue # Skip non-function tool entries
continue
function_info = tool.get("function", {})
func_name = function_info.get("name")
logger.info(f"Executing function: {func_name}")
# Check if the function name is mapped to an actual function
if func_name not in function_map:
error_message = f"Function '{func_name}' not found in function map."
logger.error(error_message)
results.append(error_message)
continue
# Validate parameters
params = function_info.get("parameters", {})
if not params:
error_message = f"No parameters specified for function '{func_name}'."
@ -79,7 +144,16 @@ def openai_tool_executor(
results.append(error_message)
continue
# Submit the function for execution
if "name" in params and params["name"] in function_map:
try:
result = function_map[params["name"]](**params)
results.append(f"{params['name']}: {result}")
except Exception as e:
error_message = f"Failed to execute the function '{params['name']}': {e}"
logger.error(error_message)
results.append(error_message)
continue
try:
future = executor.submit(
function_map[func_name], **params
@ -90,10 +164,9 @@ def openai_tool_executor(
logger.error(error_message)
results.append(error_message)
# Gather results from all futures
for func_name, future in futures:
try:
result = future.result() # Collect result from future
result = future.result()
results.append(f"{func_name}: {result}")
except Exception as e:
error_message = f"Error during execution of function '{func_name}': {e}"
@ -110,66 +183,49 @@ def openai_tool_executor(
return tool_executor
# # Example
# @tool(
# name="test_function",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function(param1: int, param2: str) -> str:
# return f"Test function called with parameters: {param1}, {param2}"
# @tool(
# name="test_function2",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function2(param1: int, param2: str) -> str:
# return f"Test function 2 called with parameters: {param1}, {param2}"
# # Example execution
# out = openai_tool_executor(
# tools=[
# {
# "type": "function",
# "function": {
# "name": "test_function",
# function_schema = {
# "name": "execute",
# "description": "Executes code on the user's machine **in the users local environment** and returns the output",
# "parameters": {
# "type": "object",
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# "language": {
# "type": "string",
# "description": "The programming language (required parameter to the `execute` function)",
# "enum": [
# # This will be filled dynamically with the languages OI has access to.
# ],
# },
# "code": {
# "type": "string",
# "description": "The code to execute (required)",
# },
# {
# "type": "function",
# "function": {
# "name": "test_function2",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# "required": ["language", "code"],
# },
# }
# },
# },
# },
# ],
# def execute(language: str, code: str):
# """
# Executes code on the user's machine **in the users local environment** and returns the output
# Args:
# language (str): The programming language (required parameter to the `execute` function)
# code (str): The code to execute (required)
# Returns:
# str: The output of the code execution
# """
# # This function will be implemented by the user
# return "Code execution not implemented yet"
# # Example execution
# out = openai_tool_executor(
# tools=[function_schema],
# function_map={
# "test_function": test_function,
# "test_function2": test_function2,
# "execute": execute,
# },
# return_as_string=True,
# )

@ -0,0 +1,101 @@
import json
from typing import List, Union, Dict
from pydantic import BaseModel
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
)
def json_str_to_json(json_str: str) -> dict:
"""Convert a JSON string to a JSON object"""
return json.loads(json_str)
def json_str_to_pydantic_model(
json_str: str, model: BaseModel
) -> BaseModel:
"""Convert a JSON string to a Pydantic model"""
return model.model_validate_json(json_str)
def json_str_to_dict(json_str: str) -> dict:
"""Convert a JSON string to a dictionary"""
return json.loads(json_str)
def pydantic_model_to_json_str(
model: BaseModel, indent: int, *args, **kwargs
) -> str:
return json.dumps(
base_model_to_openai_function(model),
indent=indent,
*args,
**kwargs,
)
def dict_to_json_str(dictionary: dict) -> str:
"""Convert a dictionary to a JSON string"""
return json.dumps(dictionary)
def dict_to_pydantic_model(
dictionary: dict, model: BaseModel
) -> BaseModel:
"""Convert a dictionary to a Pydantic model"""
return model.model_validate_json(dictionary)
# def prep_pydantic_model_for_str(model: BaseModel):
# # Convert to Function
# out = pydantic_model_to_json_str(model)
# # return function_to_str(out)
def tool_schema_to_str(
tool_schema: BaseModel = None, *args, **kwargs
) -> str:
"""Convert a tool schema to a string"""
out = base_model_to_openai_function(tool_schema)
return str(out)
def tool_schemas_to_str(
tool_schemas: List[BaseModel] = None, *args, **kwargs
) -> str:
"""Convert a list of tool schemas to a string"""
out = multi_base_model_to_openai_function(tool_schemas)
return str(out)
def str_to_pydantic_model(string: str, model: BaseModel) -> BaseModel:
"""Convert a string to a Pydantic model"""
return model.model_validate_json(string)
def list_str_to_pydantic_model(
list_str: List[str], model: BaseModel
) -> BaseModel:
"""Convert a list of strings to a Pydantic model"""
# return model.model_validate_json(list_str)
for string in list_str:
return model.model_validate_json(string)
def prepare_output_for_output_model(
output_type: Union[str, Dict, BaseModel],
output: Union[str, Dict, BaseModel] = None,
) -> Union[BaseModel, str]:
"""Prepare the output for the output model"""
if output_type == BaseModel:
return str_to_pydantic_model(output, output_type)
elif output_type == dict:
return dict_to_json_str(output)
elif output_type == str:
return output
else:
return output

@ -3,6 +3,7 @@ import subprocess
import threading
import time
import traceback
from swarms.utils.loguru_logger import logger
class SubprocessCodeInterpreter:
@ -24,8 +25,18 @@ class SubprocessCodeInterpreter:
self,
start_cmd: str = "python3",
debug_mode: bool = False,
max_retries: int = 3,
verbose: bool = False,
retry_count: int = 0,
*args,
**kwargs,
):
self.process = None
self.start_cmd = start_cmd
self.debug_mode = debug_mode
self.max_retries = max_retries
self.verbose = verbose
self.retry_count = retry_count
self.output_queue = queue.Queue()
self.done = threading.Event()
@ -80,6 +91,7 @@ class SubprocessCodeInterpreter:
if self.process:
self.terminate()
logger.info(f"Starting subprocess with command: {self.start_cmd}")
self.process = subprocess.Popen(
self.start_cmd.split(),
stdin=subprocess.PIPE,
@ -100,6 +112,8 @@ class SubprocessCodeInterpreter:
daemon=True,
).start()
return self.process
def run(self, code: str):
"""Run the code in the subprocess
@ -109,10 +123,9 @@ class SubprocessCodeInterpreter:
Yields:
_type_: _description_
"""
retry_count = 0
max_retries = 3
# Setup
logger.info("Running code in subprocess")
try:
code = self.preprocess_code(code)
if not self.process:
@ -121,7 +134,7 @@ class SubprocessCodeInterpreter:
yield {"output": traceback.format_exc()}
return
while retry_count <= max_retries:
while self.retry_count <= self.max_retries:
if self.debug_mode:
print(f"Running code:\n{code}\n---")
@ -132,22 +145,23 @@ class SubprocessCodeInterpreter:
self.process.stdin.flush()
break
except BaseException:
if retry_count != 0:
if self.retry_count != 0:
# For UX, I like to hide this if it happens once. Obviously feels better to not see errors
# Most of the time it doesn't matter, but we should figure out why it happens frequently with:
# applescript
yield {"output": traceback.format_exc()}
yield {
"output": (
"Retrying..." f" ({retry_count}/{max_retries})"
"Retrying..."
f" ({self.retry_count}/{self.max_retries})"
)
}
yield {"output": "Restarting process."}
self.start_process()
retry_count += 1
if retry_count > max_retries:
self.retry_count += 1
if self.retry_count > self.max_retries:
yield {
"output": (
"Maximum retries reached. Could not"
@ -209,8 +223,8 @@ class SubprocessCodeInterpreter:
# interpreter = SubprocessCodeInterpreter()
# interpreter.start_cmd = "python3"
# for output in interpreter.run("""
# out = interpreter.run("""
# print("hello")
# print("world")
# """):
# print(output)
# """)
# print(out)

Loading…
Cancel
Save