[CLEANUP] [New Tools System]

pull/475/head
Kye Gomez 8 months ago
parent 36a092f6e6
commit b4f923f1fb

.gitignore vendored

@ -14,6 +14,7 @@ runs
chroma
Unit Testing Agent_state.json
Devin_state.json
json_logs
Medical Image Diagnostic Agent_state.json
D_state.json
swarms/__pycache__

@ -1,22 +0,0 @@
mkdocs
mkdocs-material
mkdocs-glightbox
mkdocs-git-authors-plugin
mkdocs-git-revision-date-plugin
mkdocs-git-committers-plugin
mkdocstrings
mike
mkdocs-jupyter
mkdocs-git-committers-plugin-2
mkdocs-git-revision-date-localized-plugin
mkdocs-redirects
mkdocs-material-extensions
mkdocs-simple-hooks
mkdocs-awesome-pages-plugin
mkdocs-versioning
mkdocs-mermaid2-plugin
mkdocs-include-markdown-plugin
mkdocs-enumerate-headings-plugin
mkdocs-autolinks-plugin
mkdocs-minify-html-plugin
mkdocs-autolinks-plugin

@ -0,0 +1,77 @@
"""
Flow: a tool-decorated function [search_api] -> an agent parses the docs of the tool function
-> the docs are injected into the prompt -> the agent outputs JSON describing the tool call -> the agent's output is parsed -> the tool is executed
-> the terminal response can be returned to the agent for self-healing
"""
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, llama3Hosted
# Load the environment variables
load_dotenv()
# Define a tool
def search_api(query: str, description: str):
"""Search the web for the query.
Args:
query (str): The search query.
description (str): A short description of the search intent.
Returns:
str: The search results.
"""
return f"Search results for {query}"
def weather_api(
query: str,
):
"""Get the weather for a location.
Args:
query (str): The location to get the weather for.
"""
print(f"Getting the weather for {query}")
def rapid_api(query: str):
"""Query a Rapid API endpoint.
Args:
query (str): The query to send.
"""
print(f"Querying Rapid API for {query}")
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = llama3Hosted(
temperature=0.5,
)
## Initialize the workflow
agent = Agent(
agent_name="Research Agent",
llm=llm,
max_loops=3,
dashboard=True,
tools=[search_api, weather_api, rapid_api],
interactive=True,
execute_tool=True,
)
# Run the workflow on a task
out = agent.run("Use the weather tool in Miami")
print(out)
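To make the flow above concrete, here is a minimal sketch of the round trip (the JSON shape follows the tool_sop_prompt format added in this PR; the regex-based extraction is illustrative, not the library's exact parser):

```python
import json
import re

# Hypothetical agent output in the mandated markdown-wrapped JSON format
agent_output = """```json
{
    "type": "function",
    "function": {
        "name": "search_api",
        "parameters": {"query": "weather in Miami", "description": "weather lookup"}
    }
}
```"""

# Extract the JSON payload from the markdown fence and parse it
match = re.search(r"```json\n(.*?)\n```", agent_output, re.DOTALL)
call = json.loads(match.group(1))

# Dispatch to the matching Python tool, as the agent's executor does internally
function_map = {"search_api": search_api}
out = function_map[call["function"]["name"]](**call["function"]["parameters"])
print(out)  # Search results for weather in Miami
```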

@ -7,7 +7,7 @@ from playground.demos.agentic_space_traffic_control.prompts import (
WEATHER_ANALYST_SYSTEM_PROMPT,
SPACE_TRAFFIC_CONTROLLER_SYS_PROMPT,
)
from playground.demos.agentic_space_traffic_control.tools import (
from tools import (
fetch_weather_data,
)
from swarms.tools import get_openai_function_schema_from_func

@ -1,39 +0,0 @@
import json
import requests
from typing import Dict, Any
def fetch_weather_data(city: str) -> Dict[str, Any]:
"""
Fetch near real-time weather data for a city using wttr.in.
Args:
city (str): The name of the city (e.g., "Austin, Tx").
Returns:
Dict[str, Any]: Weather data for the specified city.
Raises:
Exception: If the request fails or the response is invalid.
"""
url = f"http://wttr.in/{city}"
params = {"format": "j1"} # JSON format
try:
response = requests.get(url, params=params)
response.raise_for_status()
response = json.dumps(response.json(), indent=2)
return response
except requests.RequestException as e:
raise Exception(f"Failed to fetch weather data: {e}")
except ValueError:
raise Exception("Invalid response format.")
# # Example usage
# city = "Huntsville, AL"
# try:
# weather_data = fetch_weather_data(city)
# print("Weather Data:", weather_data)
# except Exception as e:
# print(e)

@ -0,0 +1,100 @@
"""
5 agent swarm
image of plant -> diagnoser [what plant is it?]
-> disease detector [is it healthy?]
-> treatment recommender [what should I do?]
-> growth predictor [how will it grow?]
-> planter [where should I plant it?] / harvester [when should I harvest it?]
"""
def diagnoser_agent() -> str:
prompt = """
You are a Plant Diagnoser Agent. Your task is to accurately identify the plant species from the provided image.
You will receive an image of a plant, and you need to determine the specific type of plant it is.
Steps:
1. Analyze the given image.
2. Identify distinguishing features such as leaf shape, color, size, and any other relevant characteristics.
3. Use your plant identification database or model to match these features with a known plant species.
4. Provide a clear and specific identification of the plant species.
Output:
- Plant species identified with a high degree of accuracy.
- Provide any relevant information or characteristics that support your identification through a rigorous analysis of the image.
- Identify any potential challenges or ambiguities in the identification process and address them accordingly.
"""
return prompt
def disease_detector_agent() -> str:
prompt = """
You are the Disease Detector Agent.
Your task is to determine the health status of the identified plant.
You will receive an image of the plant and its identified species from the Diagnoser Agent.
Steps:
1. Analyze the given image with a focus on signs of disease or health issues.
2. Look for symptoms such as discoloration, spots, wilting, or any other abnormalities.
3. Cross-reference these symptoms with known diseases for the identified plant species.
4. Determine if the plant is healthy or diseased, and if diseased, identify the specific disease.
Output:
- Health status of the plant (Healthy or Diseased).
- If diseased, specify the disease and provide relevant confidence scores or supporting information.
- Provide a rigorous analysis of the image to support your diagnosis.
"""
return prompt
def treatment_recommender_agent() -> str:
prompt = """
You are the Treatment Recommender Agent.
Your task is to recommend appropriate treatments based on the plant's health status provided by the Disease Detector Agent.
You will receive the plant species, health status, and disease information.
Steps:
1. Analyze the health status and, if applicable, the specific disease affecting the plant.
2. Refer to your database or model of treatment options suitable for the identified plant species and its specific condition.
3. Determine the most effective treatment methods, considering factors such as severity of the disease, plant species, and environmental conditions.
4. Provide detailed treatment recommendations.
Output:
- Detailed treatment plan, including methods, materials, and steps.
- Any additional tips or considerations for optimal treatment.
"""
return prompt
def growth_predictor_agent() -> str:
prompt = """
You are the Growth Predictor Agent. Your task is to predict the future growth of the plant based on the current health status and treatment recommendations. You will receive the plant species, health status, and treatment plan.
Steps:
1. Analyze the current health status and the proposed treatment plan.
2. Use growth prediction models to forecast the plant's growth trajectory.
3. Consider factors such as plant species, health improvements from treatment, environmental conditions, and typical growth patterns.
4. Provide a growth prediction timeline.
Output:
- Growth prediction, including key milestones and timeframes.
- Any assumptions or conditions that may affect the growth prediction.
"""
return prompt
def harvester_agent() -> str:
prompt = """
You are the Harvester Agent.
Your task is to recommend the optimal harvesting time based on the plant's growth prediction.
You will receive the plant species and growth prediction timeline.
Steps:
1. Analyze the growth prediction and determine the optimal harvesting time.
2. Consider factors such as maturity, peak nutritional value, and market conditions.
3. Recommend the best time to harvest to ensure optimal quality and yield.
Output:
- Detailed harvesting time recommendation with justification.
"""
return prompt

@ -0,0 +1,117 @@
import os
from dotenv import load_dotenv
from playground.demos.plant_biologist_swarm.prompts import (
diagnoser_agent,
disease_detector_agent,
growth_predictor_agent,
harvester_agent,
treatment_recommender_agent,
)
from swarms import Agent, GPT4VisionAPI
# Load the OpenAI API key from the .env file
load_dotenv()
# Initialize the OpenAI API key
api_key = os.environ.get("OPENAI_API_KEY")
llm = GPT4VisionAPI(
max_tokens=4000,
)
# Initialize Diagnoser Agent
diagnoser_agent = Agent(
agent_name="Diagnoser Agent",
system_prompt=diagnoser_agent(),
llm=llm,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
# saved_state_path="diagnoser.json",
multi_modal=True,
autosave=True,
)
# Initialize Harvester Agent
harvester_agent = Agent(
agent_name="Harvester Agent",
system_prompt=harvester_agent(),
llm=llm,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
# saved_state_path="harvester.json",
multi_modal=True,
autosave=True,
)
# Initialize Growth Predictor Agent
growth_predictor_agent = Agent(
agent_name="Growth Predictor Agent",
system_prompt=growth_predictor_agent(),
llm=llm,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
# saved_state_path="growth_predictor.json",
multi_modal=True,
autosave=True,
)
# Initialize Treatment Recommender Agent
treatment_recommender_agent = Agent(
agent_name="Treatment Recommender Agent",
system_prompt=treatment_recommender_agent(),
llm=llm,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
# saved_state_path="treatment_recommender.json",
multi_modal=True,
autosave=True,
)
# Initialize Disease Detector Agent
disease_detector_agent = Agent(
agent_name="Disease Detector Agent",
system_prompt=disease_detector_agent(),
llm=llm,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
# saved_state_path="disease_detector.json",
multi_modal=True,
autosave=True,
)
agents = [
diagnoser_agent,
disease_detector_agent,
treatment_recommender_agent,
growth_predictor_agent,
harvester_agent,
]
task = "Conduct a diagnosis on the patient's symptoms."
img = "tomato.jpg"
# Chain the agents sequentially: the first agent receives the task and image,
# and each subsequent agent receives the previous agent's output.
for i in range(len(agents)):
if i == 0:
output = agents[i].run(task, img)
else:
output = agents[i].run(output, img)
# Add extensive logging for each agent
print(f"Agent {i+1} - {agents[i].agent_name}")
print("-----------------------------------")

Binary file not shown (new image added: 1.9 MiB)

@ -14,14 +14,12 @@ from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat
from swarms.tools.tool import tool
# Load the environment variables
load_dotenv()
# Define a tool
@tool
def search_api(query: str, description: str):
"""Search the web for the query
@ -34,7 +32,6 @@ def search_api(query: str, description: str):
return f"Search results for {query}"
@tool
def weather_api(
query: str,
):
@ -46,7 +43,6 @@ def weather_api(
print(f"Getting the weather for {query}")
@tool
def rapid_api(query: str):
"""_summary_

@ -5,8 +5,10 @@ from dotenv import load_dotenv
from swarms import Agent, OpenAIChat
from swarms.agents.multion_agent import MultiOnAgent
from playground.memory.chroma_db import ChromaDB
from swarms.tools.tool import tool
from swarms.tools.code_interpreter import SubprocessCodeInterpreter
from swarms import tool
from swarms.tools.prebuilt.code_interpreter import (
SubprocessCodeInterpreter,
)
# Load the environment variables
load_dotenv()

@ -0,0 +1,34 @@
from swarms import Agent, llama3Hosted
from swarms.structs.swarm_load_balancer import AgentLoadBalancer
# Initialize the language model agent (e.g., GPT-3)
llm = llama3Hosted()
# Initialize agents for individual tasks
agent1 = Agent(
agent_name="Blog generator",
system_prompt="Generate a blog post like stephen king",
llm=llm,
max_loops=1,
dashboard=False,
tools=[],
)
agent2 = Agent(
agent_name="Summarizer",
system_prompt="Sumamrize the blog post",
llm=llm,
max_loops=1,
dashboard=False,
tools=[],
)
# Create the Sequential workflow
workflow = AgentLoadBalancer(
agents=[agent1, agent2],
max_loops=1,
)
# Run the workflow
workflow.run(
"Generate a blog post on how swarms of agents can help businesses grow."
)

@ -6,7 +6,7 @@ from dotenv import load_dotenv
from swarms import Agent, OpenAIChat
from playground.memory.chroma_db import ChromaDB
from swarms.prompts.visual_cot import VISUAL_CHAIN_OF_THOUGHT
from swarms.tools.tool import tool
from swarms import tool
# Loading environment variables from .env file
load_dotenv()

@ -3,7 +3,7 @@ import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat
from swarms.tools.tool import tool
from swarms import tool
load_dotenv()

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "5.0.6"
version = "5.0.7"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -1,29 +0,0 @@
torch>=2.1.1,<3.0
transformers>=4.39.0,<5.0.0
asyncio>=3.4.3,<4.0
langchain-community==0.0.29
langchain-experimental==0.0.55
backoff==2.2.1
toml
pypdf==4.1.0
ratelimit==2.2.1
loguru==0.7.2
pydantic==2.7.1
tenacity==8.2.3
Pillow==10.3.0
psutil
sentry-sdk
python-dotenv
opencv-python-headless
PyYAML
docstring_parser==0.16
black>=23.1,<25.0
ruff>=0.0.249,<0.4.5
types-toml>=0.10.8.1
types-pytz>=2023.3,<2025.0
types-chardet>=5.0.4.6
mypy-protobuf>=3.0.0
pytest>=8.1.1
termcolor>=2.4.0
pandas>=2.2.2
fastapi>=0.110.1

@ -0,0 +1,25 @@
#!/bin/bash
# Find and delete all __pycache__ directories
find . -type d -name "__pycache__" -exec rm -r {} +
# Find and delete all .pyc files
find . -type f -name "*.pyc" -delete
# Find and delete all dist directories
find . -type d -name "dist" -exec rm -r {} +
# Find and delete all .ruff directories
find . -type d -name ".ruff" -exec rm -r {} +
# Find and delete all .egg-info directories
find . -type d -name "*.egg-info" -exec rm -r {} +
# Find and delete all .pyo files
find . -type f -name "*.pyo" -delete
# Find and delete all .pyd files
find . -type f -name "*.pyd" -delete
# Find and delete all .so files
find . -type f -name "*.so" -delete

@ -2,10 +2,10 @@ import os
import shutil
def cleanup_json_logs():
def cleanup_json_logs(name: str = None):
# Define the root directory and the target directory
root_dir = os.getcwd()
target_dir = os.path.join(root_dir, "artifacts5")
target_dir = os.path.join(root_dir, name)
# Create the target directory if it doesn't exist
if not os.path.exists(target_dir):
@ -31,4 +31,4 @@ def cleanup_json_logs():
# Call the function
cleanup_json_logs()
cleanup_json_logs("json_logs")

@ -1,7 +0,0 @@
#!/bin/bash
# Find and delete all __pycache__ directories
find . -type d -name "__pycache__" -exec rm -r {} +
# Find and delete all .pyc files
find . -type f -name "*.pyc" -delete

@ -127,3 +127,70 @@ def tools_prompt_prep(
"""
return PROMPT
def tool_sop_prompt() -> str:
return """
You have been granted access to tools to assist users; always provide outputs in JSON format for tool usage.
Whenever tool usage is required, you must output the JSON wrapped inside markdown for clarity.
Provide commentary on the tool usage and the user's request, and ensure that the JSON output adheres to the tool's schema.
Here are some rules:
Do not use tools that do not have JSON schemas attached to them.
Do not use tools that you have not been granted access to.
Do not use tools that are not relevant to the task at hand or the user's request.
Here are the guidelines you must follow:
1. **Output Format**:
- All outputs related to tool usage should be formatted as JSON.
- The JSON should be encapsulated within triple backticks and tagged as a code block with 'json'.
2. **Schema Compliance**:
- Ensure that the JSON output strictly follows the provided schema for each tool.
- Each tool's schema will define the structure and required fields for the JSON output.
3. **Schema Example**:
If a tool named `example_tool` with a schema requires `param1` and `param2`, your response should look like:
```json
{
"type": "function",
"function": {
"name": "example_tool",
"parameters": {
"param1": 123,
"param2": "example_value"
}
}
}
```
4. **Error Handling**:
- If there is an error or the information provided by the user is insufficient to generate a valid JSON, respond with an appropriate error message in JSON format, also encapsulated in markdown.
Remember, clarity and adherence to the schema are paramount. Your primary goal is to ensure the user receives well-structured JSON outputs that align with the tool's requirements.
---
Here is the format you should always follow for your responses involving tool usage:
```json
{
"type": "function",
"function": {
"name": "<tool_name>",
"parameters": {
"param1": "<value1>",
"param2": "<value2>"
}
}
}
```
Please proceed with your task accordingly.
"""

@ -0,0 +1,115 @@
from swarms.utils.loguru_logger import logger
import re
import json
from pydantic import BaseModel, Field
from typing import List
from swarms.structs.agent import Agent
class HaSAgentSchema(BaseModel):
name: str = Field(
...,
title="Name of the agent",
description="Name of the agent",
)
system_prompt: str = Field(
...,
title="System prompt for the agent",
description="System prompt for the agent",
)
rules: str = Field(
...,
title="Rules",
description="Rules for the agent",
)
class HassSchema(BaseModel):
agents: List[HaSAgentSchema] = Field(
...,
title="List of agents to use for the problem",
description="List of agents to use for the problem",
)
# import json
def parse_json_from_input(input_str: str = None):
"""
Parses a JSON string from the input and returns the parsed data.
Args:
input_str (str): The input string containing the JSON.
Returns:
tuple: On success, a one-element tuple containing the list of agents extracted from the JSON.
If the input string is None or empty, or if JSON decoding fails, returns (None, None, None).
"""
# Validate input is not None or empty
if not input_str:
logger.info("Error: Input string is None or empty.")
return None, None, None
# Attempt to extract JSON from markdown using regular expression
json_pattern = re.compile(r"```json\n(.*?)\n```", re.DOTALL)
match = json_pattern.search(input_str)
json_str = match.group(1).strip() if match else input_str.strip()
# Attempt to parse the JSON string
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
logger.info(f"Error: JSON decoding failed with message '{e}'")
return None, None, None
hass_schema = HassSchema(**data)
return (hass_schema.agents,)
## [Create the agents]
def create_worker_agents(
agents: List[HaSAgentSchema],
*args,
**kwargs,
) -> List[Agent]:
"""
Create and initialize agents based on the provided HaSAgentSchema objects.
Args:
agents (List[HaSAgentSchema]): A list of HaSAgentSchema objects containing agent information.
Returns:
List[Agent]: The initialized Agent objects.
"""
agent_list = []
for agent in agents:
name = agent.name
system_prompt = agent.system_prompt
logger.info(
f"Creating agent: {name} with system prompt:"
f" {system_prompt}"
)
out = Agent(
agent_name=name,
system_prompt=system_prompt,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
stopping_token="<DONE>",
*args,
**kwargs,
)
# Collect the initialized agent
agent_list.append(out)
return agent_list
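A minimal usage sketch of the two helpers above (the JSON sample and the forwarded kwargs are illustrative assumptions):

```python
boss_output = """```json
{
    "agents": [
        {
            "name": "Researcher",
            "system_prompt": "You research the topic in depth.",
            "rules": "Always cite sources."
        }
    ]
}
```"""

result = parse_json_from_input(boss_output)
if result != (None, None, None):
    (agents,) = result
    # kwargs such as llm=llm could be forwarded to each Agent
    workers = create_worker_agents(agents)
```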

@ -11,7 +11,6 @@ from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
from swarms.structs.groupchat import GroupChat, GroupChatManager
from swarms.structs.hiearchical_swarm import HiearchicalSwarm
from swarms.structs.majority_voting import (
MajorityVoting,
majority_voting,
@ -84,6 +83,10 @@ from swarms.structs.yaml_model import (
pydantic_type_to_yaml_schema,
)
# New Swarms
from swarms.structs.swarm_load_balancer import AgentLoadBalancer
from swarms.structs.hiearchical_swarm import HiearchicalSwarm
__all__ = [
"Agent",
"AgentJob",

@ -20,12 +20,13 @@ from swarms.prompts.aot_prompt import algorithm_of_thoughts_sop
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.prompts.worker_prompt import tool_usage_worker_prompt
from swarms.structs.conversation import Conversation
from swarms.structs.yaml_model import YamlModel
from swarms.telemetry.user_utils import get_user_device_data
from swarms.tools.base_tool import BaseTool
from swarms.tools.code_interpreter import SubprocessCodeInterpreter
from swarms.tools.prebuilt.code_interpreter import (
SubprocessCodeInterpreter,
)
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
@ -33,6 +34,12 @@ from swarms.tools.pydantic_to_json import (
from swarms.utils.data_to_text import data_to_text
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.tools.py_func_to_openai_func_str import (
get_openai_function_schema_from_func,
)
from swarms.tools.func_calling_executor import openai_tool_executor
from swarms.structs.base_structure import BaseStructure
from swarms.prompts.tools import tool_sop_prompt
# Utils
@ -76,7 +83,7 @@ ToolUsageType = Union[BaseModel, Dict[str, Any]]
# [FEAT][AGENT]
class Agent:
class Agent(BaseStructure):
"""
Agent is the backbone to connect LLMs with tools and long term memory. Agent also provides the ability to
ingest any type of docs like PDFs, Txts, Markdown, Json, and etc for the agent. Here is a list of features.
@ -182,7 +189,8 @@ class Agent:
agent_name: Optional[str] = "swarm-worker-01",
agent_description: Optional[str] = None,
system_prompt: Optional[str] = AGENT_SYSTEM_PROMPT_3,
tools: List[BaseTool] = None,
# TODO: Change to callable, then parse the callable to a string
tools: List[Callable] = None,
dynamic_temperature_enabled: Optional[bool] = False,
sop: Optional[str] = None,
sop_list: Optional[List[str]] = None,
@ -244,6 +252,7 @@ class Agent:
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.id = id
self.llm = llm
self.template = template
@ -376,31 +385,70 @@ class Agent:
# logger.setLevel(logging.INFO)
if tools is not None:
self.tool_executor = BaseTool(
verbose=True,
auto_execute_tool=execute_tool,
functions=tools,
# Add the tool prompt to the memory
self.short_memory.add(role="System", content=tool_sop_prompt())
# # BaseTool
# self.base_tool = BaseTool(
# functions=tools,
# verbose=verbose,
# auto_execute_tool=execute_tool,
# autocheck=True,
# base_models=list_base_models,
# )
# Print number of tools
logger.info(f"Number of tools: {len(tools)}")
logger.info(
"Tools provided, Automatically converting to OpenAI function"
)
# If tools are provided then set the tool prompt by adding to sop
if self.tools is not None:
if custom_tools_prompt is not None:
tools_prompt = custom_tools_prompt(tools=self.tools)
# Now the names of the tools
for tool in tools:
logger.info(f"Tool: {tool.__name__}")
# Append the tools prompt to the short_term_memory
self.short_memory.add(
role=self.agent_name, content=tools_prompt
# Transform the tools into an openai schema
for tool in tools:
# Transform the tool into a openai function calling schema
tool_schema_list = get_openai_function_schema_from_func(
tool,
name=tool.__name__,
description=tool.__doc__,
)
else:
# Default tool prompt
tools_prompt = tool_usage_worker_prompt(tools=self.tools)
# Transform the dictionary to a string
tool_schema_list = json.dumps(tool_schema_list, indent=4)
# print(tool_schema_list)
# Append the tools prompt to the short_term_memory
# Add the tool schema to the short memory
self.short_memory.add(
role=self.agent_name, content=tools_prompt
role="System", content=tool_schema_list
)
# Now create a function calling map for every tools
self.function_map = {tool.__name__: tool for tool in tools}
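# Illustrative note (an assumption, not from this diff): for a tool like
# def search_api(query: str) -> str, get_openai_function_schema_from_func is
# expected to yield roughly
# {"type": "function", "function": {"name": "search_api", "description": "...",
#  "parameters": {"type": "object", "properties": {"query": {"type": "string"}}}}}
# and the json.dumps() of that dict is what lands in short-term memory above.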
# # If tools are provided then set the tool prompt by adding to sop
# if self.tools is not None:
# if custom_tools_prompt is not None:
# tools_prompt = custom_tools_prompt(tools=self.tools)
# # Append the tools prompt to the short_term_memory
# self.short_memory.add(
# role=self.agent_name, content=tools_prompt
# )
# else:
# # Default tool prompt
# tools_prompt = tool_usage_worker_prompt(tools=self.tools)
# # Append the tools prompt to the short_term_memory
# self.short_memory.add(
# role=self.agent_name, content=tools_prompt
# )
# Set the logger handler
if logger_handler:
logger.add(
@ -468,6 +516,8 @@ class Agent:
if self.sop is not None:
self.short_memory.add(role=self.user_name, content=self.sop)
# If the device is not provided then get the device data
def set_system_prompt(self, system_prompt: str):
"""Set the system prompt"""
self.system_prompt = system_prompt
@ -756,7 +806,6 @@ class Agent:
self,
task: Optional[str] = None,
img: Optional[str] = None,
function_map: Dict[str, Callable] = None,
*args,
**kwargs,
):
@ -825,25 +874,20 @@ class Agent:
# Check if tools is not None
if self.tools is not None:
# Extract code from markdown
response = extract_code_from_markdown(response)
# Execute the tool by name [OLD VERISON]
# execute_tool_by_name(
# response,
# self.tools,
# stop_token=self.stopping_token,
# )
# Extract json from markdown
response = extract_code_from_markdown(response)
# Try executing the tool
if self.execute_tool is not False:
try:
logger.info("Executing tool...")
# Execute the tool
out = self.tool_executor.execute_tool(
response,
function_map,
# try to Execute the tool and return a string
out = openai_tool_executor(
tools=response,
function_map=self.function_map,
return_as_string=True,
)
print(f"Tool Output: {out}")
@ -947,9 +991,8 @@ class Agent:
break # Exit the loop if all retry attempts fail
# Check stopping conditions
if self.stopping_token is not None:
if self.stopping_token in response:
break
# if self.stopping_token in response:
# break
elif (
self.stopping_condition is not None
and self._check_stopping_condition(response)
@ -993,7 +1036,7 @@ class Agent:
# Prepare the output for the output model
if self.output_type is not None:
logger.info("Preparing output for output model.")
# logger.info("Preparing output for output model.")
response = self.prepare_output_for_output_model(response)
print(f"Response after output model: {response}")

@ -7,6 +7,7 @@ from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
from pydantic import BaseModel, Field
from swarms.structs.conversation import Conversation
class HiearchicalRequest(BaseModel):
@ -15,34 +16,87 @@ class HiearchicalRequest(BaseModel):
title="Task",
description="The task to send to the director agent.",
)
agents: Agent = Field(
agent_name: str = Field(
None,
title="Agents",
description="The list of agents in the hierarchical swarm.",
title="Agent Name",
description="The name of the agent to send the task to.",
)
rules: str = Field(
class HiearchicalRequestDict(BaseModel):
task: str = Field(
None,
title="Task",
description="The task to send to the director agent.",
)
agent_name: str = Field(
None,
title="Rules",
description="The rules for the hierarchical swarm.",
title="Agent Name",
description="The name of the agent to send the task to.",
)
class Config:
schema_extra = {
"example": {
"task": "task",
"agent_name": "agent_name",
}
}
"""
Boss -> json -> workers -> json -> Boss
Parse the JSON data and activate the selected agent.
parse -> execute
"""
class HiearchicalSwarm(BaseSwarm):
"""
A class representing a hierarchical swarm.
Attributes:
name (str): The name of the hierarchical swarm.
description (str): The description of the hierarchical swarm.
director (Agent): The director agent of the hierarchical swarm.
agents (List[Agent]): The list of agents in the hierarchical swarm.
max_loops (int): The maximum number of loops to run the swarm.
long_term_memory_system (BaseSwarm): The long term memory system of the swarm.
custom_parse_function (callable): A custom parse function for the swarm.
Methods:
swarm_initialization(*args, **kwargs): Initializes the hierarchical swarm.
find_agent_by_name(agent_name: str = None, *args, **kwargs): Finds an agent in the swarm by name.
parse_function_activate_agent(json_data: str = None, *args, **kwargs): Parses JSON data and activates the selected agent.
select_agent_and_send_task(name: str = None, task: str = None, *args, **kwargs): Selects an agent and sends a task to them.
run(task: str = None, *args, **kwargs): Runs the hierarchical swarm.
"""
@beartype
def __init__(
self,
name: str = None,
description: str = None,
director: Agent = None,
agents: List[Agent] = None,
max_loops: int = 1,
long_term_memory_system: BaseSwarm = None,
custom_parse_function: callable = None,
rules: str = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.name = name
self.description = description
self.director = director
self.agents = agents
self.max_loops = max_loops
self.long_term_memory_system = long_term_memory_system
self.custom_parse_function = custom_parse_function
# Ensure the director runs at most one loop
if self.director.max_loops > 1:
@ -53,7 +107,66 @@ class HiearchicalSwarm(BaseSwarm):
for agent in agents:
agent.long_term_memory = long_term_memory_system
# Set the max loops of every agent to max loops
# Initialize the swarm
self.swarm_initialization()
# Initialize the conversation message pool
self.swarm_history = Conversation(
time_enabled=True,
)
def swarm_initialization(self, *args, **kwargs):
"""
Initializes the hierarchical swarm.
Args:
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
None
"""
logger.info(f"Initializing the hierarchical swarm: {self.name}")
logger.info(f"Purpose of this swarm: {self.description}")
# Now log the number of agents and their names
logger.info(f"Number of agents: {len(self.agents)}")
logger.info(
f"Agent names: {[agent.name for agent in self.agents]}"
)
# Now see if agents is not empty
if len(self.agents) == 0:
logger.info("No agents found. Please add agents to the swarm.")
return None
# Now see if director is not empty
if self.director is None:
logger.info(
"No director found. Please add a director to the swarm."
)
return None
logger.info(
f"Initialization complete for the hierarchical swarm: {self.name}"
)
def find_agent_by_name(self, agent_name: str = None, *args, **kwargs):
"""
Finds an agent in the swarm by name.
Args:
agent_name (str): The name of the agent to find.
Returns:
Agent: The agent with the specified name, or None if not found.
"""
for agent in self.agents:
if agent.name == agent_name:
return agent
return None
def parse_function_activate_agent(
self, json_data: str = None, *args, **kwargs
@ -156,7 +269,13 @@ class HiearchicalSwarm(BaseSwarm):
# Run the director
response = self.director.run(task, *args, **kwargs)
# Log the director's response
self.swarm_history.add(self.director.agent_name, response)
# Run agents
if self.custom_parse_function is not None:
response = self.custom_parse_function(response)
else:
response = self.parse_function_activate_agent(response)
loop += 1
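A minimal end-to-end sketch, under stated assumptions (agent construction mirrors the other examples in this PR; the director is prompted elsewhere to emit JSON matching HiearchicalRequest, i.e. a task plus an agent_name):

```python
from swarms import Agent, llama3Hosted
from swarms.structs.hiearchical_swarm import HiearchicalSwarm

llm = llama3Hosted()

director = Agent(agent_name="Director", llm=llm, max_loops=1)
summarizer = Agent(agent_name="Summarizer", llm=llm, max_loops=1)

swarm = HiearchicalSwarm(
    name="Report swarm",
    description="The director delegates tasks to workers by name.",
    director=director,
    agents=[summarizer],
    max_loops=1,
)

swarm.run("Summarize the quarterly report.")
```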

@ -3,7 +3,7 @@ from threading import Lock
from time import sleep
from typing import Callable, List, Optional
from swarms import Agent
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
@ -57,6 +57,26 @@ class AgentLoadBalancer(BaseSwarm):
}
self.lock = Lock()
self.cooldown_time = cooldown_time
self.swarm_initialization()
def swarm_initialization(self):
logger.info(
"Initializing AgentLoadBalancer with the following agents:"
)
# Make sure all the agents exist
assert self.agents, "No agents provided to the Load Balancer"
# Assert that all agents are of type Agent
for agent in self.agents:
assert isinstance(
agent, Agent
), "All agents should be of type Agent"
for agent in self.agents:
logger.info(f"Agent Name: {agent.agent_name}")
logger.info("Load Balancer Initialized Successfully!")
def get_available_agent(self) -> Optional[Agent]:
"""
@ -72,6 +92,9 @@ class AgentLoadBalancer(BaseSwarm):
for agent in self.agents
if self.agent_status[agent.agent_name]
]
logger.info(
f"Available agents: {[agent.agent_name for agent in available_agents]}"
)
if not available_agents:
return None
return random.choice(available_agents)
@ -116,7 +139,7 @@ class AgentLoadBalancer(BaseSwarm):
for agent_name, stats in self.agent_performance.items():
logger.info(f"{agent_name}: {stats}")
def run_task(self, task: str, *args, **kwargs) -> str:
def run(self, task: str, *args, **kwargs) -> str:
"""
Run a single task using an available agent.
@ -174,7 +197,7 @@ class AgentLoadBalancer(BaseSwarm):
"""
results = []
for task in tasks:
result = self.run_task(task)
result = self.run(task)
results.append(result)
return results
@ -191,7 +214,7 @@ class AgentLoadBalancer(BaseSwarm):
"""
results = []
for _ in range(self.max_loops):
result = self.run_task(task)
result = self.run(task)
results.append(result)
return results
@ -207,7 +230,7 @@ class AgentLoadBalancer(BaseSwarm):
"""
try:
result = self.run_task(task)
result = self.run(task)
callback(result)
except Exception as e:
logger.error(f"Task failed: {e}")
@ -236,7 +259,7 @@ class AgentLoadBalancer(BaseSwarm):
def target():
try:
result[0] = self.run_task(task)
result[0] = self.run(task)
except Exception as e:
exception[0] = e

@ -0,0 +1,121 @@
import logging
import networkx as nx
import matplotlib.pyplot as plt
from typing import List, Tuple
from swarms import Agent
# Setup basic configuration for logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class AgentDFS:
"""
A DFS search class that uses a single Agent to generate and manually evaluate text states.
"""
def __init__(
self,
agent: Agent,
evaluator: Agent,
initial_prompt: str,
num_thoughts: int,
max_steps: int,
max_states: int,
pruning_threshold: float,
):
self.agent = agent
self.evaluator = evaluator
self.initial_prompt = initial_prompt
self.num_thoughts = num_thoughts
self.max_steps = max_steps
self.max_states = max_states
self.pruning_threshold = pruning_threshold
self.visited = {}
self.graph = nx.DiGraph()
def search(self) -> List[Tuple[str, float]]:
stack = [(self.initial_prompt, 0.0)]
self.graph.add_node(self.initial_prompt, score=0.0)
results = []
while stack and len(results) < self.max_steps:
current_prompt, _ = stack.pop()
logging.info(f"Generating from: {current_prompt}")
# Use agent to generate a response
out = self.agent.run(current_prompt)
# Split the generated text into scored thought segments
generated_texts = self.split_into_thoughts(
out, self.num_thoughts
)
for text, score in generated_texts:
if score >= self.pruning_threshold:
stack.append((text, score))
results.append((text, score))
self.graph.add_node(text, score=score)
self.graph.add_edge(current_prompt, text)
logging.info(f"Added node: {text} with score: {score}")
results.sort(key=lambda x: x[1], reverse=True)
results = results[: self.max_states]
logging.info("Search completed")
return results
def split_into_thoughts(
self, text: str, num_thoughts: int
) -> List[Tuple[str, float]]:
"""Simulate the split of text into thoughts and assign random scores."""
import random
# Simple split based on punctuation or predefined length
thoughts = text.split(".")[:num_thoughts]
return [
(thought.strip(), random.random())
for thought in thoughts
if thought.strip()
]
def visualize(self):
pos = nx.spring_layout(self.graph, seed=42)
labels = {
node: f"{node[:15]}...: {self.graph.nodes[node]['score']:.2f}"
for node in self.graph.nodes()
}
nx.draw(
self.graph,
pos,
with_labels=True,
labels=labels,
node_size=7000,
node_color="skyblue",
font_size=8,
font_weight="bold",
edge_color="gray",
)
plt.show()
# Example usage setup remains the same as before, simply instantiate two agents: one for generation and one for evaluation
# # Example usage
# if __name__ == "__main__":
# load_dotenv()
# api_key = os.environ.get("OPENAI_API_KEY")
# llm = llama3Hosted(max_tokens=400)
# agent = Agent(llm=llm, max_loops=1, autosave=True, dashboard=True)
# dfs_agent = AgentDFS(
# agent=agent,
# initial_prompt="Explore the benefits of regular exercise.",
# num_thoughts=5,
# max_steps=20,
# max_states=10,
# pruning_threshold=0.3,
# )
# results = dfs_agent.search()
# dfs_agent.visualize()
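The commented example above omits the evaluator argument the constructor requires; a corrected sketch (assumptions: the hosted llama3 model and default Agent settings):

```python
from swarms import Agent, llama3Hosted

llm = llama3Hosted(max_tokens=400)
generator = Agent(llm=llm, max_loops=1)
evaluator = Agent(llm=llm, max_loops=1)

dfs_agent = AgentDFS(
    agent=generator,
    evaluator=evaluator,
    initial_prompt="Explore the benefits of regular exercise.",
    num_thoughts=5,
    max_steps=20,
    max_states=10,
    pruning_threshold=0.3,
)
results = dfs_agent.search()
dfs_agent.visualize()
```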

@ -1,10 +1,3 @@
from swarms.tools.exec_tool import (
AgentAction,
AgentOutputParser,
BaseAgentOutputParser,
execute_tool_by_name,
preprocess_json_input,
)
from swarms.tools.tool_utils import (
execute_tools,
extract_tool_commands,
@ -12,14 +5,13 @@ from swarms.tools.tool_utils import (
scrape_tool_func_docs,
tool_find_by_name,
)
from swarms.tools.func_calling_executor import openai_tool_executor
from swarms.tools.pydantic_to_json import (
_remove_a_key,
base_model_to_openai_function,
multi_base_model_to_openai_function,
function_to_str,
functions_to_str,
)
from swarms.tools.openai_func_calling_schema import (
from swarms.tools.openai_func_calling_schema_pydantic import (
OpenAIFunctionCallSchema as OpenAIFunctionCallSchemaBaseModel,
)
from swarms.tools.py_func_to_openai_func_str import (
@ -33,31 +25,27 @@ from swarms.tools.py_func_to_openai_func_str import (
)
from swarms.tools.openai_tool_creator_decorator import tool
from swarms.tools.base_tool import BaseTool
from swarms.tools.prebuilt import * # noqa: F403
__all__ = [
"AgentAction",
"AgentOutputParser",
"BaseAgentOutputParser",
"execute_tool_by_name",
"preprocess_json_input",
"execute_tools",
"extract_tool_commands",
"parse_and_execute_tools",
"scrape_tool_func_docs",
"tool_find_by_name",
"_remove_a_key",
"base_model_to_openai_function",
"multi_base_model_to_openai_function",
"function_to_str",
"functions_to_str",
"OpenAIFunctionCallSchemaBaseModel",
"BaseTool",
"tool",
"Function",
"ToolFunction",
"get_openai_function_schema_from_func",
"load_basemodels_if_needed",
"get_load_param_if_needed_function",
"get_parameters",
"get_required_params",
"Function",
"ToolFunction",
"tool",
"BaseTool",
"OpenAIFunctionCallSchemaBaseModel",
"base_model_to_openai_function",
"multi_base_model_to_openai_function",
"_remove_a_key",
"openai_tool_executor",
"execute_tools",
"extract_tool_commands",
"parse_and_execute_tools",
"scrape_tool_func_docs",
"tool_find_by_name",
]

@ -5,14 +5,13 @@ from swarms.tools.py_func_to_openai_func_str import (
get_openai_function_schema_from_func,
load_basemodels_if_needed,
)
from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
from swarms.tools.func_calling_executor import openai_tool_executor
from typing import Callable, Optional, Any, Dict, List
from swarms.tools.pydantic_to_json import (
base_model_to_openai_function,
multi_base_model_to_openai_function,
function_to_str,
functions_to_str,
)
from swarms.tools.func_to_str import function_to_str, functions_to_str
from swarms.tools.function_util import process_tool_docs
from typing import Union
@ -331,6 +330,40 @@ class BaseTool(BaseModel):
# Execute the tool
return func(**tool_params)
def check_str_for_functions_valid(
self, output: str, function_map: Dict[str, Callable]
):
"""
Check if the output is a valid JSON string, and if the function name in the JSON matches any name in the function map.
Args:
output (str): The output to check.
function_map (dict): A dictionary mapping function names to functions.
Returns:
bool: True if the output is valid and the function name matches, False otherwise.
"""
try:
# Parse the output as JSON
data = json.loads(output)
# Check if the output matches the schema
if (
data.get("type") == "function"
and "function" in data
and "name" in data["function"]
):
# Check if the function name matches any name in the function map
function_name = data["function"]["name"]
if function_name in function_map:
return True
except json.JSONDecodeError:
pass
return False
# # Example function definitions and mappings
# def get_current_weather(location, unit='celsius'):
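A quick sketch of the check in use (the sample function and tool JSON are illustrative; this assumes a BaseTool instance can be created with defaults):

```python
import json

def get_current_weather(location: str, unit: str = "celsius") -> str:
    return f"Weather in {location} ({unit})"

function_map = {"get_current_weather": get_current_weather}

output = json.dumps(
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "parameters": {"location": "Austin"},
        },
    }
)

tool = BaseTool()  # assumes all BaseTool fields have defaults
print(tool.check_str_for_functions_valid(output, function_map))  # True
```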

@ -1,168 +0,0 @@
import json
import concurrent.futures
import re
from abc import abstractmethod
from typing import Dict, List, NamedTuple
from langchain.schema import BaseOutputParser
from pydantic import ValidationError
from swarms.tools.base_tool import BaseTool
from swarms.utils.loguru_logger import logger
class AgentAction(NamedTuple):
"""Action returned by AgentOutputParser."""
name: str
args: Dict
class BaseAgentOutputParser(BaseOutputParser):
"""Base Output parser for Agent."""
@abstractmethod
def parse(self, text: str) -> AgentAction:
"""Return AgentAction"""
def preprocess_json_input(input_str: str) -> str:
"""Preprocesses a string to be parsed as json.
Replace single backslashes with double backslashes,
while leaving already escaped ones intact.
Args:
input_str: String to be preprocessed
Returns:
Preprocessed string
"""
corrected_str = re.sub(
r'(?<!\\)\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})',
r"\\\\",
input_str,
)
return corrected_str
class AgentOutputParser(BaseAgentOutputParser):
"""Output parser for Agent."""
def parse(self, text: str) -> AgentAction:
try:
parsed = json.loads(text, strict=False)
except json.JSONDecodeError:
preprocessed_text = preprocess_json_input(text)
try:
parsed = json.loads(preprocessed_text, strict=False)
except Exception:
return AgentAction(
name="ERROR",
args={
"error": (f"Could not parse invalid json: {text}")
},
)
try:
return AgentAction(
name=parsed["command"]["name"],
args=parsed["command"]["args"],
)
except (KeyError, TypeError):
# If the command is null or incomplete, return an erroneous tool
return AgentAction(
name="ERROR",
args={"error": f"Incomplete command args: {parsed}"},
)
def execute_tool_by_name(
text: str,
tools: List[BaseTool],
stop_token: str = "finish",
):
"""
Executes a tool based on the given text command.
Args:
text (str): The text command to be executed.
tools (List[BaseTool]): A list of available tools.
stop_token (str, optional): The stop token to terminate the execution. Defaults to "finish".
Returns:
str: The result of the command execution.
"""
output_parser = AgentOutputParser()
# Get command name and arguments
action = output_parser.parse(text)
tools = {t.name: t for t in tools}
# logger.info(f"Tools available: {tools}")
if action.name == stop_token:
return action.args["response"]
if action.name in tools:
tool = tools[action.name]
try:
# Check if multiple tools are used
tool_names = [name for name in tools if name in text]
if len(tool_names) > 1:
# Execute tools concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool_name in tool_names:
logger.info(f"Executing tool: {tool_name}")
futures.append(
executor.submit(
tools[tool_name].run, action.args
)
)
# Wait for all futures to complete
concurrent.futures.wait(futures)
# Get results from completed futures
results = [
future.result()
for future in futures
if future.done()
]
# Process results
for result in results:
# Handle errors
if isinstance(result, Exception):
result = (
f"Error: {str(result)},"
f" {type(result).__name__}, args:"
f" {action.args}"
)
# Handle successful execution
else:
result = (
f"Command {tool.name} returned:"
f" {result}"
)
else:
observation = tool.run(action.args)
except ValidationError as e:
observation = (
f"Validation Error in args: {str(e)}, args:"
f" {action.args}"
)
except Exception as e:
observation = (
f"Error: {str(e)}, {type(e).__name__}, args:"
f" {action.args}"
)
result = f"Command {tool.name} returned: {observation}"
elif action.name == "ERROR":
result = f"Error: {action.args}. "
else:
result = (
f"Unknown command '{action.name}'. "
"Please refer to the 'COMMANDS' list for available "
"commands and only respond in the specified JSON format."
)
return result

@ -0,0 +1,176 @@
import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.utils.loguru_logger import logger
def openai_tool_executor(
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
verbose: bool = True,
return_as_string: bool = False,
*args,
**kwargs,
) -> Callable:
"""
Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
in a list of tool dictionaries, with extensive error handling and validation.
Args:
tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
verbose (bool): If True, enables verbose logging.
return_as_string (bool): If True, returns the results as a concatenated string.
Returns:
Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
Examples:
>>> def test_function(param1: int, param2: str) -> str:
... return f"Test function called with parameters: {param1}, {param2}"
>>> tool_executor = openai_tool_executor(
... tools=[
... {
... "type": "function",
... "function": {
... "name": "test_function",
... "parameters": {
... "param1": 1,
... "param2": "example"
... }
... }
... }
... ],
... function_map={
... "test_function": test_function
... },
... return_as_string=True
... )
>>> results = tool_executor()
>>> print(results)
"""
def tool_executor():
# Prepare tasks for concurrent execution
results = []
logger.info(f"Executing {len(tools)} tools concurrently.")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool in tools:
if tool.get("type") != "function":
continue # Skip non-function tool entries
function_info = tool.get("function", {})
func_name = function_info.get("name")
logger.info(f"Executing function: {func_name}")
# Check if the function name is mapped to an actual function
if func_name not in function_map:
error_message = f"Function '{func_name}' not found in function map."
logger.error(error_message)
results.append(error_message)
continue
# Validate parameters
params = function_info.get("parameters", {})
if not params:
error_message = f"No parameters specified for function '{func_name}'."
logger.error(error_message)
results.append(error_message)
continue
# Submit the function for execution
try:
future = executor.submit(
function_map[func_name], **params
)
futures.append((func_name, future))
except Exception as e:
error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
logger.error(error_message)
results.append(error_message)
# Gather results from all futures
for func_name, future in futures:
try:
result = future.result() # Collect result from future
results.append(f"{func_name}: {result}")
except Exception as e:
error_message = f"Error during execution of function '{func_name}': {e}"
logger.error(error_message)
results.append(error_message)
if return_as_string:
return "\n".join(results)
logger.info(f"Results: {results}")
return results
return tool_executor
# # Example
# @tool(
# name="test_function",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function(param1: int, param2: str) -> str:
# return f"Test function called with parameters: {param1}, {param2}"
# @tool(
# name="test_function2",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function2(param1: int, param2: str) -> str:
# return f"Test function 2 called with parameters: {param1}, {param2}"
# # Example execution
# out = openai_tool_executor(
# tools=[
# {
# "type": "function",
# "function": {
# "name": "test_function",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# {
# "type": "function",
# "function": {
# "name": "test_function2",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# ],
# function_map={
# "test_function": test_function,
# "test_function2": test_function2,
# },
# return_as_string=True,
# )
# print(out)

@ -0,0 +1,40 @@
from typing import Any
def function_to_str(function: dict[str, Any]) -> str:
"""
Convert a function dictionary to a string representation.
Args:
function (dict[str, Any]): The function dictionary to convert.
Returns:
str: The string representation of the function.
"""
function_str = f"Function: {function['name']}\n"
function_str += f"Description: {function['description']}\n"
function_str += "Parameters:\n"
for param, details in function["parameters"]["properties"].items():
function_str += f" {param} ({details['type']}): {details.get('description', '')}\n"
return function_str
def functions_to_str(functions: list[dict[str, Any]]) -> str:
"""
Convert a list of function dictionaries to a string representation.
Args:
functions (list[dict[str, Any]]): The list of function dictionaries to convert.
Returns:
str: The string representation of the functions.
"""
functions_str = ""
for function in functions:
functions_str += function_to_str(function) + "\n"
return functions_str
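For illustration, a sample schema dict (shape assumed, matching the keys the function reads) and its rendering:

```python
fn = {
    "name": "search_api",
    "description": "Search the web for the query",
    "parameters": {
        "properties": {
            "query": {
                "type": "string",
                "description": "The search query",
            }
        }
    },
}

print(function_to_str(fn))
# Function: search_api
# Description: Search the web for the query
# Parameters:
#   query (string): The search query
```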

@ -1,6 +1,4 @@
from functools import wraps
import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.tools.py_func_to_openai_func_str import (
get_openai_function_schema_from_func,
)
@ -52,8 +50,9 @@ def tool(
func(*args, **kwargs)
# Get the openai function schema
tool_name = name if name is not None else func.__name__
schema = get_openai_function_schema_from_func(
func, name=name, description=description
func, name=tool_name, description=description
)
# Return the schema
@ -80,314 +79,3 @@ def tool(
return wrapper
return decorator
def openai_tool_executor(
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
verbose: bool = True,
*args,
**kwargs,
) -> Callable:
"""
Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
in a list of tool dictionaries, with extensive error handling and validation.
Args:
tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
Returns:
Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
Examples:
>>> from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
>>> from swarms.tools.py_func_to_openai_func_str import get_openai_function_schema_from_func
>>> from swarms.utils.loguru_logger import logger
>>>
>>> def test_function(param1: int, param2: str) -> str:
... return f"Test function called with parameters: {param1}, {param2}"
...
>>> @openai_tool_executor(
... tools=[
... {
... "type": "function",
... "function": {
... "name": "test_function",
... "parameters": {
... "properties": {
... "param1": {
... "type": "int",
... "description": "An integer parameter."
... },
... "param2": {
... "type": "str",
... "description": "A string parameter."
... },
... }
... }
... }
... }
... ],
... function_map={
... "test_function": test_function
... }
... )
... def tool_executor():
... pass
...
>>> results = tool_executor()
>>> logger.info(results)
"""
def tool_executor():
# Prepare tasks for concurrent execution
results = []
logger.info(f"Executing {len(tools)} tools concurrently.")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool in tools:
if tool.get("type") != "function":
continue # Skip non-function tool entries
function_info = tool.get("function", {})
func_name = function_info.get("name")
logger.info(f"Executing function: {func_name}")
# Check if the function name is mapped to an actual function
if func_name not in function_map:
raise KeyError(
f"Function '{func_name}' not found in function map."
)
# Validate parameters
params = function_info.get("parameters", {}).get(
"properties", {}
)
if not params:
raise ValueError(
f"No parameters specified for function '{func_name}'."
)
# Submit the function for execution
try:
future = executor.submit(
function_map[func_name], **params
)
futures.append(future)
except Exception as e:
print(
f"Failed to submit the function '{func_name}' for execution: {e}"
)
# Gather results from all futures
for future in futures:
try:
result = future.result() # Collect result from future
results.append(result)
except Exception as e:
print(f"Error during execution of a function: {e}")
logger.info(f"Results: {results}")
return results
return tool_executor
# def openai_tool_executor(
# tools: List[Dict[str, Any]],
# function_map: Dict[str, Callable],
# verbose: bool = True,
# concurrent_execution: bool = True,
# retry_on_error: bool = False,
# retry_attempts: int = 3,
# max_loops: int = 1,
# max_workers: int = 10,
# *args,
# **kwargs,
# ) -> Callable:
# """
# Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
# in a list of tool dictionaries, with extensive error handling and validation.
# Args:
# tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
# function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
# Returns:
# Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
# Examples:
# >>> from swarms.tools.openai_tool_creator_decorator import openai_tool_executor
# >>> from swarms.tools.py_func_to_openai_func_str import get_openai_function_schema_from_func
# >>> from swarms.utils.loguru_logger import logger
# >>>
# >>> def test_function(param1: int, param2: str) -> str:
# ... return f"Test function called with parameters: {param1}, {param2}"
# ...
# >>> @openai_tool_executor(
# ... tools=[
# ... {
# ... "type": "function",
# ... "function": {
# ... "name": "test_function",
# ... "parameters": {
# ... "properties": {
# ... "param1": {
# ... "type": "int",
# ... "description": "An integer parameter."
# ... },
# ... "param2": {
# ... "type": "str",
# ... "description": "A string parameter."
# ... },
# ... }
# ... }
# ... }
# ... }
# ... ],
# ... function_map={
# ... "test_function": test_function
# ... }
# ... )
# ... def tool_executor():
# ... pass
# ...
# >>> results = tool_executor()
# >>> logger.info(results)
# """
# def tool_executor():
# logger.info(
# f"Starting execution of tools with {max_loops} loops and concurrency set to {concurrent_execution}."
# )
# results = []
# def execute_function(func_name, params):
# try:
# logger.debug(
# f"Executing function: {func_name} with params: {params}"
# )
# return function_map[func_name](**params)
# except Exception as e:
# logger.error(
# f"Error executing function {func_name}: {str(e)}"
# )
# if retry_on_error:
# for attempt in range(retry_attempts):
# try:
# logger.debug(
# f"Retrying function: {func_name}, attempt {attempt+1}"
# )
# return function_map[func_name](**params)
# except Exception as e:
# logger.error(
# f"Retry {attempt+1} for function {func_name} failed: {str(e)}"
# )
# raise
# else:
# raise
# for loop in range(max_loops):
# logger.info(f"Executing loop {loop + 1}/{max_loops}")
# with concurrent.futures.ThreadPoolExecutor(
# max_workers=max_workers
# ) as executor:
# future_to_function = {
# executor.submit(
# execute_function,
# tool["function"]["name"],
# tool["function"]["parameters"]["properties"],
# ): tool
# for tool in tools
# if tool.get("type") == "function"
# }
# for future in concurrent.futures.as_completed(
# future_to_function
# ):
# try:
# result = future.result()
# results.append(result)
# logger.debug(
# f"Function completed with result: {result}"
# )
# except Exception as e:
# logger.error(
# f"Execution failed with error: {str(e)}"
# )
# continue
# logger.info(f"All loops completed. Results: {results}")
# return results
# return tool_executor
# # Example
# @tool(
# name="test_function",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function(param1: int, param2: str) -> str:
# return f"Test function called with parameters: {param1}, {param2}"
# @tool(
# name="test_function2",
# description="A test function that takes two parameters and returns a string.",
# )
# def test_function2(param1: int, param2: str) -> str:
# return f"Test function 2 called with parameters: {param1}, {param2}"
# # Example execution
# out = openai_tool_executor(
# tools=[
# {
# "type": "function",
# "function": {
# "name": "test_function",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# {
# "type": "function",
# "function": {
# "name": "test_function2",
# "parameters": {
# "properties": {
# "param1": {
# "type": "int",
# "description": "An integer parameter.",
# },
# "param2": {
# "type": "str",
# "description": "A string parameter.",
# },
# }
# },
# },
# },
# ],
# function_map={"test_function": test_function, "test_function2": test_function2},
# )
# print(out)

@ -0,0 +1,9 @@
from swarms.tools.prebuilt.code_interpreter import (
SubprocessCodeInterpreter,
)
from swarms.tools.prebuilt.math_eval import math_eval
__all__ = [
"SubprocessCodeInterpreter",
"math_eval",
]

@ -110,42 +110,3 @@ def multi_base_model_to_openai_function(
"function_call": "auto",
"functions": functions,
}
def function_to_str(function: dict[str, Any]) -> str:
"""
Convert a function dictionary to a string representation.
Args:
function (dict[str, Any]): The function dictionary to convert.
Returns:
str: The string representation of the function.
"""
function_str = f"Function: {function['name']}\n"
function_str += f"Description: {function['description']}\n"
function_str += "Parameters:\n"
for param, details in function["parameters"]["properties"].items():
function_str += f" {param} ({details['type']}): {details.get('description', '')}\n"
return function_str
def functions_to_str(functions: list[dict[str, Any]]) -> str:
"""
Convert a list of function dictionaries to a string representation.
Args:
functions (list[dict[str, Any]]): The list of function dictionaries to convert.
Returns:
str: The string representation of the functions.
"""
functions_str = ""
for function in functions:
functions_str += function_to_str(function) + "\n"
return functions_str

@ -1,8 +0,0 @@
from langchain.tools import (
BaseTool,
Tool,
StructuredTool,
tool,
) # noqa F401
__all__ = ["BaseTool", "Tool", "StructuredTool", "tool"]

@ -180,3 +180,38 @@ def tools_prompt_prep(docs: str = None, scenarios: str = SCENARIOS):
"""
return PROMPT
def is_str_valid_func_output(
output: str = None, function_map: dict = None
):
"""
Check if the output is a valid JSON string, and if the function name in the JSON matches any name in the function map.
Args:
output (str): The output to check.
function_map (dict): A dictionary mapping function names to functions.
Returns:
bool: True if the output is valid and the function name matches, False otherwise.
"""
try:
# Parse the output as JSON
data = json.loads(output)
# Check if the output matches the schema
if (
data.get("type") == "function"
and "function" in data
and "name" in data["function"]
):
# Check if the function name matches any name in the function map
function_name = data["function"]["name"]
if function_name in function_map:
return True
except json.JSONDecodeError:
pass
return False

@ -1,5 +1,7 @@
from swarms.utils.class_args_wrapper import print_class_parameters
from swarms.tools.code_interpreter import SubprocessCodeInterpreter
from swarms.tools.prebuilt.code_interpreter import (
SubprocessCodeInterpreter,
)
from swarms.utils.data_to_text import (
csv_to_text,
data_to_text,
@ -19,7 +21,7 @@ from swarms.utils.find_img_path import find_image_path
from swarms.utils.json_output_parser import JsonOutputParser
from swarms.utils.llm_metrics_decorator import metrics_decorator
from swarms.utils.markdown_message import display_markdown_message
from swarms.tools.math_eval import math_eval
from swarms.tools.prebuilt.math_eval import math_eval
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.remove_json_whitespace import (

@ -4,7 +4,7 @@ import threading
import pytest
from swarms.tools.code_interpreter import ( # Adjust the import according to your project structure
from swarms.tools.prebuilt.code_interpreter import ( # Adjust the import according to your project structure
SubprocessCodeInterpreter,
)
