swarms tools

pull/814/merge
Kye Gomez 2 days ago
parent 8c9d9c673b
commit 3dd1855907

@ -1,40 +1,42 @@
from dotenv import load_dotenv
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.tools.mcp_integration import MCPServerSseParams
load_dotenv()
server = MCPServerSseParams(
url="http://localhost:8000/sse",
timeout=10,
)
tools = [
{
"type": "function",
"function": {
"name": "get_stock_price",
"description": "Retrieve the current stock price and related information for a specified company.",
"name": "add_numbers",
"description": "Add two numbers together and return the result.",
"parameters": {
"type": "object",
"properties": {
"ticker": {
"name": {
"type": "string",
"description": "The stock ticker symbol of the company, e.g. AAPL for Apple Inc.",
"description": "The name of the operation to perform.",
},
"include_history": {
"type": "boolean",
"description": "Indicates whether to include historical price data along with the current price.",
"a": {
"type": "integer",
"description": "The first number to add.",
},
"time": {
"type": "string",
"format": "date-time",
"description": "Optional parameter to specify the time for which the stock data is requested, in ISO 8601 format.",
"b": {
"type": "integer",
"description": "The second number to add.",
},
},
"required": [
"ticker",
"include_history",
"time",
"name",
"a",
"b",
],
},
},
@ -46,14 +48,14 @@ tools = [
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
max_loops=2,
tools_list_dictionary=tools,
output_type="final",
mcp_url="http://0.0.0.0:8000/sse",
)
out = agent.run(
"What is the current stock price for Apple Inc. (AAPL)? Include historical price data.",
"Use the multiply tool to multiply 3 and 4 together. Look at the tools available to you.",
)
print(type(out))
print(agent.short_memory.get_str())

@ -0,0 +1,167 @@
from time import perf_counter_ns
import psutil
import os
from rich.panel import Panel
from rich.console import Console
from rich.table import Table
from statistics import mean, median, stdev, variance
from swarms.structs.agent import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
def get_memory_stats(memory_readings):
    """Summarize a sequence of memory readings.

    Args:
        memory_readings: Non-empty sequence of numeric memory samples.

    Returns:
        dict: peak, min, mean, median, stdev and variance of the samples.
        stdev and variance fall back to 0 when fewer than two samples
        are available (they are undefined for a single reading).
    """
    multiple_samples = len(memory_readings) > 1
    return {
        "peak": max(memory_readings),
        "min": min(memory_readings),
        "mean": mean(memory_readings),
        "median": median(memory_readings),
        "stdev": stdev(memory_readings) if multiple_samples else 0,
        "variance": variance(memory_readings) if multiple_samples else 0,
    }
def get_time_stats(times):
    """Summarize a sequence of timing measurements.

    Args:
        times: Non-empty sequence of numeric durations.

    Returns:
        dict: total, mean, median, min, max, stdev and variance.
        stdev and variance fall back to 0 when fewer than two samples
        are available (they are undefined for a single measurement).
    """
    multiple_samples = len(times) > 1
    return {
        "total": sum(times),
        "mean": mean(times),
        "median": median(times),
        "min": min(times),
        "max": max(times),
        "stdev": stdev(times) if multiple_samples else 0,
        "variance": variance(times) if multiple_samples else 0,
    }
def benchmark_multiple_agents(num_agents=100):
    """Benchmark Agent construction time and process memory growth.

    Creates ``num_agents`` Agent instances one at a time, recording the
    per-instance initialization time (milliseconds) and the cumulative
    RSS growth relative to the starting baseline (KB), then renders the
    aggregated statistics as two rich tables inside panels.

    Args:
        num_agents: Number of Agent instances to construct (default 100).
    """
    console = Console()
    init_times = []  # per-agent init durations, in milliseconds
    memory_readings = []  # RSS growth vs. the pre-loop baseline, in KB
    process = psutil.Process(os.getpid())

    # Create benchmark tables
    time_table = Table(title="Time Statistics")
    time_table.add_column("Metric", style="cyan")
    time_table.add_column("Value", style="green")

    memory_table = Table(title="Memory Statistics")
    memory_table.add_column("Metric", style="cyan")
    memory_table.add_column("Value", style="green")

    # Baseline RSS in KB; all later readings are reported relative to it.
    initial_memory = process.memory_info().rss / 1024
    start_total_time = perf_counter_ns()

    # Initialize agents and measure performance
    for i in range(num_agents):
        start_time = perf_counter_ns()

        # The instance is intentionally discarded: only construction
        # cost and its memory footprint are being measured.
        Agent(
            agent_name=f"Financial-Analysis-Agent-{i}",
            agent_description="Personal finance advisor agent",
            system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
            max_loops=2,
            model_name="gpt-4o-mini",
            dynamic_temperature_enabled=True,
            interactive=False,
        )

        # perf_counter_ns() returns nanoseconds; divide to get ms.
        init_time = (perf_counter_ns() - start_time) / 1_000_000
        init_times.append(init_time)

        current_memory = process.memory_info().rss / 1024
        memory_readings.append(current_memory - initial_memory)

        # Progress heartbeat every 10 agents.
        if (i + 1) % 10 == 0:
            console.print(
                f"Created {i + 1} agents...", style="bold blue"
            )

    total_elapsed_time = (
        perf_counter_ns() - start_total_time
    ) / 1_000_000

    # Calculate statistics
    time_stats = get_time_stats(init_times)
    memory_stats = get_memory_stats(memory_readings)

    # Add time measurements
    time_table.add_row(
        "Total Wall Time", f"{total_elapsed_time:.2f} ms"
    )
    time_table.add_row(
        "Total Init Time", f"{time_stats['total']:.2f} ms"
    )
    time_table.add_row(
        "Average Init Time", f"{time_stats['mean']:.2f} ms"
    )
    time_table.add_row(
        "Median Init Time", f"{time_stats['median']:.2f} ms"
    )
    time_table.add_row("Fastest Init", f"{time_stats['min']:.2f} ms")
    time_table.add_row("Slowest Init", f"{time_stats['max']:.2f} ms")
    time_table.add_row(
        "Std Deviation", f"{time_stats['stdev']:.2f} ms"
    )
    time_table.add_row(
        "Variance", f"{time_stats['variance']:.4f} ms²"
    )
    # Throughput uses wall time (ms), hence the * 1000 to get per-second.
    time_table.add_row(
        "Throughput",
        f"{(num_agents/total_elapsed_time) * 1000:.2f} agents/second",
    )

    # Add memory measurements
    memory_table.add_row(
        "Peak Memory Usage", f"{memory_stats['peak']:.2f} KB"
    )
    memory_table.add_row(
        "Minimum Memory Usage", f"{memory_stats['min']:.2f} KB"
    )
    memory_table.add_row(
        "Average Memory Usage", f"{memory_stats['mean']:.2f} KB"
    )
    memory_table.add_row(
        "Median Memory Usage", f"{memory_stats['median']:.2f} KB"
    )
    memory_table.add_row(
        "Memory Std Deviation", f"{memory_stats['stdev']:.2f} KB"
    )
    memory_table.add_row(
        "Memory Variance", f"{memory_stats['variance']:.2f} KB²"
    )
    memory_table.add_row(
        "Avg Memory Per Agent",
        f"{memory_stats['mean']/num_agents:.2f} KB",
    )

    # Create and display panels
    time_panel = Panel(
        time_table,
        title="Time Benchmark Results",
        border_style="blue",
        padding=(1, 2),
    )
    memory_panel = Panel(
        memory_table,
        title="Memory Benchmark Results",
        border_style="green",
        padding=(1, 2),
    )

    console.print(time_panel)
    console.print("\n")
    console.print(memory_panel)


if __name__ == "__main__":
    benchmark_multiple_agents(100)

@ -357,6 +357,7 @@ nav:
- Swarms Cloud Subscription Tiers: "swarms_cloud/subscription_tiers.md"
- Swarms API Best Practices: "swarms_cloud/best_practices.md"
- Swarms API as MCP: "swarms_cloud/mcp.md"
- Swarms API Tools: "swarms_cloud/swarms_api_tools.md"
- Swarm Ecosystem APIs:
- MCS API: "swarms_cloud/mcs_api.md"
# - CreateNow API: "swarms_cloud/create_api.md"

@ -0,0 +1,371 @@
# Swarms API with Tools Guide
Swarms API allows you to create and manage AI agent swarms with optional tool integration. This guide will walk you through setting up and using the Swarms API with tools.
## Prerequisites
- Python 3.7+
- Swarms API key
- Required Python packages:
- `requests`
- `python-dotenv`
## Installation & Setup
1. Install required packages:
```bash
pip install requests python-dotenv
```
2. Create a `.env` file in your project root:
```bash
SWARMS_API_KEY=your_api_key_here
```
3. Basic setup code:
```python
import os
import requests
from dotenv import load_dotenv
import json
load_dotenv()
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
```
## Creating a Swarm with Tools
### Step-by-Step Guide
1. Define your tool dictionary:
```python
tool_dictionary = {
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct an in-depth search on a specified topic",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Search depth (1-3)"
},
"detailed_queries": {
"type": "array",
"items": {
"type": "string",
"description": "Specific search queries"
}
}
},
"required": ["depth", "detailed_queries"]
}
}
}
```
2. Create agent configurations:
```python
agent_config = {
"agent_name": "Market Analyst",
"description": "Analyzes market trends",
"system_prompt": "You are a financial analyst expert.",
"model_name": "openai/gpt-4",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
"temperature": 0.5,
"auto_generate_prompt": False,
"tools_dictionary": [tool_dictionary] # Optional: Add tools if needed
}
```
3. Create the swarm payload:
```python
payload = {
"name": "Your Swarm Name",
"description": "Swarm description",
"agents": [agent_config],
"max_loops": 1,
"swarm_type": "ConcurrentWorkflow",
"task": "Your task description",
"output_type": "dict"
}
```
4. Make the API request:
```python
def run_swarm(payload):
response = requests.post(
f"{BASE_URL}/v1/swarm/completions",
headers=headers,
json=payload
)
return response.json()
```
## FAQ
### Do all agents need tools?
No, tools are optional for each agent. You can choose which agents have tools based on your specific needs. Simply omit the `tools_dictionary` field for agents that don't require tools.
### What types of tools can I use?
Currently, the API supports function-type tools. Each tool must have:
- A unique name
- A clear description
- Well-defined parameters with types and descriptions
### Can I mix agents with and without tools?
Yes, you can create swarms with a mix of tool-enabled and regular agents. This allows for flexible swarm architectures.
### What's the recommended number of tools per agent?
While there's no strict limit, it's recommended to:
- Keep tools focused and specific
- Only include tools that the agent needs
- Consider the complexity of tool interactions
## Example Implementation
Here's a complete example of a financial analysis swarm:
```python
def run_financial_analysis_swarm():
payload = {
"name": "Financial Analysis Swarm",
"description": "Market analysis swarm",
"agents": [
{
"agent_name": "Market Analyst",
"description": "Analyzes market trends",
"system_prompt": "You are a financial analyst expert.",
"model_name": "openai/gpt-4",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
"temperature": 0.5,
"auto_generate_prompt": False,
"tools_dictionary": [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct market research",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Search depth (1-3)"
},
"detailed_queries": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["depth", "detailed_queries"]
}
}
}
]
}
],
"max_loops": 1,
"swarm_type": "ConcurrentWorkflow",
"task": "Analyze top performing tech ETFs",
"output_type": "dict"
}
response = requests.post(
f"{BASE_URL}/v1/swarm/completions",
headers=headers,
json=payload
)
return response.json()
```
## Health Check
Always verify the API status before running swarms:
```python
def check_api_health():
response = requests.get(f"{BASE_URL}/health", headers=headers)
return response.json()
```
## Best Practices
1. **Error Handling**: Always implement proper error handling:
```python
def safe_run_swarm(payload):
try:
response = requests.post(
f"{BASE_URL}/v1/swarm/completions",
headers=headers,
json=payload
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error running swarm: {e}")
return None
```
2. **Environment Variables**: Never hardcode API keys
3. **Tool Design**: Keep tools simple and focused
4. **Testing**: Validate swarm configurations before production use
## Troubleshooting
Common issues and solutions:
1. **API Key Issues**
- Verify key is correctly set in `.env`
- Check key permissions
2. **Tool Execution Errors**
- Validate tool parameters
- Check tool function signatures
3. **Response Timeout**
- Consider reducing max_tokens
- Simplify tool complexity
## Complete Working Example

```python
import os
import requests
from dotenv import load_dotenv
import json
load_dotenv()
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
def run_health_check():
response = requests.get(f"{BASE_URL}/health", headers=headers)
return response.json()
def run_single_swarm():
payload = {
"name": "Financial Analysis Swarm",
"description": "Market analysis swarm",
"agents": [
{
"agent_name": "Market Analyst",
"description": "Analyzes market trends",
"system_prompt": "You are a financial analyst expert.",
"model_name": "openai/gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
"temperature": 0.5,
"auto_generate_prompt": False,
"tools_dictionary": [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": ["depth", "detailed_queries"],
},
},
},
],
},
{
"agent_name": "Economic Forecaster",
"description": "Predicts economic trends",
"system_prompt": "You are an expert in economic forecasting.",
"model_name": "gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
"temperature": 0.5,
"auto_generate_prompt": False,
"tools_dictionary": [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": ["depth", "detailed_queries"],
},
},
},
],
},
],
"max_loops": 1,
"swarm_type": "ConcurrentWorkflow",
"task": "What are the best etfs and index funds for ai and tech?",
"output_type": "dict",
}
response = requests.post(
f"{BASE_URL}/v1/swarm/completions",
headers=headers,
json=payload,
)
print(response)
print(response.status_code)
# return response.json()
output = response.json()
return json.dumps(output, indent=4)
if __name__ == "__main__":
result = run_single_swarm()
print("Swarm Result:")
print(result)
```

@ -0,0 +1,10 @@
# Smoke test: print, as a JSON string, the tools exposed by a locally
# running MCP SSE server. Requires the server to be listening on
# 0.0.0.0:8000 before this script is run.
from swarms.tools.mcp_client import (
    list_tools_for_multiple_urls,
)

print(
    list_tools_for_multiple_urls(
        ["http://0.0.0.0:8000/sse"], output_type="json"
    )
)

@ -127,9 +127,18 @@ def agent_system_prompt_2(name: str):
AGENT_SYSTEM_PROMPT_3 = """
You are a fully autonomous agent serving the user in automating tasks, workflows, and activities.
Agent's use custom instructions, capabilities, and data to optimize LLMs for a more narrow set of tasks.
You are an autonomous agent designed to serve users by automating complex tasks, workflows, and activities with precision and intelligence.
Agents leverage custom instructions, specialized capabilities, and curated data to optimize large language models for specific domains and use cases.
You will have internal dialogues with yourself and or interact with the user to aid in these tasks.
Your responses should be coherent, contextually relevant, and tailored to the task at hand.
You possess the ability to engage in both internal reasoning and external interactions to achieve optimal results.
Through self-reflection and user collaboration, you can break down complex problems, identify optimal solutions, and execute tasks with high efficiency.
Your responses must demonstrate:
1. Deep understanding of the task context and requirements
2. Logical reasoning and systematic problem-solving
3. Clear communication and coherent explanations
4. Adaptability to user feedback and changing requirements
5. Attention to detail and quality in execution
Always aim to exceed expectations by delivering comprehensive, well-structured, and contextually appropriate solutions that address both the explicit and implicit needs of the task.
"""

@ -11,10 +11,6 @@ from pydantic import (
)
from pydantic.v1 import validator
from swarms.telemetry.main import (
capture_system_data,
log_agent_data,
)
from swarms.tools.base_tool import BaseTool
from swarms.utils.loguru_logger import initialize_logger
@ -141,10 +137,10 @@ class Prompt(BaseModel):
if self.autosave:
self._autosave()
def log_telemetry(self):
system_data = capture_system_data()
merged_data = {**system_data, **self.model_dump()}
log_agent_data(merged_data)
# def log_telemetry(self):
# system_data = capture_system_data()
# merged_data = {**system_data, **self.model_dump()}
# log_agent_data(merged_data)
def rollback(self, version: int) -> None:
"""
@ -174,7 +170,7 @@ class Prompt(BaseModel):
# f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'"
# )
self.log_telemetry()
# self.log_telemetry()
if self.autosave:
self._autosave()
@ -190,7 +186,7 @@ class Prompt(BaseModel):
str: The current prompt content.
"""
# logger.debug(f"Returning prompt {self.id} as a string.")
self.log_telemetry()
# self.log_telemetry()
return self.content

@ -46,12 +46,7 @@ from swarms.structs.safe_loading import (
)
from swarms.telemetry.main import log_agent_data
from swarms.tools.base_tool import BaseTool
# from swarms.tools.mcp_integration import (
# MCPServerSseParams,
# batch_mcp_flow,
# mcp_flow_get_tool_schema,
# )
from swarms.tools.mcp_integration import MCPServerSseParams
from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.any_to_str import any_to_str
from swarms.utils.data_to_text import data_to_text
@ -64,6 +59,13 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.str_to_dict import str_to_dict
from swarms.tools.mcp_client import (
execute_mcp_tool,
list_tools_for_multiple_urls,
list_all,
find_and_execute_tool,
)
# Utils
@ -392,7 +394,9 @@ class Agent:
role: agent_roles = "worker",
no_print: bool = False,
tools_list_dictionary: Optional[List[Dict[str, Any]]] = None,
# mcp_servers: List[MCPServerSseParams] = [],
mcp_servers: MCPServerSseParams = None,
mcp_url: str = None,
mcp_urls: List[str] = None,
*args,
**kwargs,
):
@ -512,47 +516,51 @@ class Agent:
self.role = role
self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
# self.mcp_servers = mcp_servers
self.mcp_servers = mcp_servers
self.mcp_url = mcp_url
self.mcp_urls = mcp_urls
self._cached_llm = (
None # Add this line to cache the LLM instance
)
self._default_model = (
"gpt-4o-mini" # Move default model name here
self.short_memory = self.short_memory_init()
# Initialize the feedback
self.feedback = []
# Initialize the executor
self.executor = ThreadPoolExecutor(
max_workers=executor_workers
)
self.init_handling()
def short_memory_init(self):
if (
self.agent_name is not None
or self.agent_description is not None
):
prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {system_prompt}"
prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {self.system_prompt}"
else:
prompt = system_prompt
prompt = self.system_prompt
# Initialize the short term memory
self.short_memory = Conversation(
system_prompt=prompt,
time_enabled=False,
user=user_name,
rules=rules,
user=self.user_name,
rules=self.rules,
token_count=False,
*args,
**kwargs,
)
# Initialize the feedback
self.feedback = []
# Initialize the executor
self.executor = ThreadPoolExecutor(
max_workers=executor_workers
)
self.init_handling()
return self.short_memory
def init_handling(self):
# Define tasks as pairs of (function, condition)
# Each task will only run if its condition is True
self.setup_config()
tasks = [
(self.setup_config, True), # Always run setup_config
(
@ -565,10 +573,10 @@ class Agent:
exists(self.tool_schema)
or exists(self.list_base_models),
),
(
self.handle_sop_ops,
exists(self.sop) or exists(self.sop_list),
),
# (
# self.handle_sop_ops,
# exists(self.sop) or exists(self.sop_list),
# ),
]
# Filter out tasks whose conditions are False
@ -577,9 +585,7 @@ class Agent:
]
# Execute all tasks concurrently
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count() * 4
) as executor:
with self.executor as executor:
# Map tasks to futures and collect results
results = {}
future_to_task = {
@ -611,6 +617,9 @@ class Agent:
if self.llm is None:
self.llm = self.llm_handling()
if self.mcp_url or self.mcp_servers is not None:
self.add_mcp_tools_to_memory()
def agent_output_model(self):
# Many steps
id = agent_id()
@ -637,10 +646,7 @@ class Agent:
return self._cached_llm
if self.model_name is None:
logger.warning(
f"Model name is not provided, using {self._default_model}. You can configure any model from litellm if desired."
)
self.model_name = self._default_model
self.model_name = "gpt-4o-mini"
try:
# Simplify initialization logic
@ -719,68 +725,123 @@ class Agent:
tool.__name__: tool for tool in self.tools
}
# def mcp_execution_flow(self, response: any):
# """
# Executes the MCP (Model Context Protocol) flow based on the provided response.
# This method takes a response, converts it from a string to a dictionary format,
# and checks for the presence of a tool name or a name in the response. If either
# is found, it retrieves the tool name and proceeds to call the batch_mcp_flow
# function to execute the corresponding tool actions.
# Args:
# response (any): The response to be processed, which can be in string format
# that represents a dictionary.
# Returns:
# The output from the batch_mcp_flow function, which contains the results of
# the tool execution. If an error occurs during processing, it logs the error
# and returns None.
# Raises:
# Exception: Logs any exceptions that occur during the execution flow.
# """
# try:
# response = str_to_dict(response)
# tool_output = batch_mcp_flow(
# self.mcp_servers,
# function_call=response,
# )
# return tool_output
# except Exception as e:
# logger.error(f"Error in mcp_execution_flow: {e}")
# return None
# def mcp_tool_handling(self):
# """
# Handles the retrieval of tool schemas from the MCP servers.
# This method iterates over the list of MCP servers, retrieves the tool schema
# for each server using the mcp_flow_get_tool_schema function, and compiles
# these schemas into a list. The resulting list is stored in the
# tools_list_dictionary attribute.
# Returns:
# list: A list of tool schemas retrieved from the MCP servers. If an error
# occurs during the retrieval process, it logs the error and returns None.
# Raises:
# Exception: Logs any exceptions that occur during the tool handling process.
# """
# try:
# self.tools_list_dictionary = []
# for mcp_server in self.mcp_servers:
# tool_schema = mcp_flow_get_tool_schema(mcp_server)
# self.tools_list_dictionary.append(tool_schema)
# print(self.tools_list_dictionary)
# return self.tools_list_dictionary
# except Exception as e:
# logger.error(f"Error in mcp_tool_handling: {e}")
# return None
def add_mcp_tools_to_memory(self):
    """
    Adds MCP tools to the agent's short-term memory.

    Checks for either a single MCP URL (``self.mcp_url``) or a list of
    MCP URLs (``self.mcp_urls``) and records the available tools, in
    JSON format, in the agent's conversation memory under the
    "Tools Available" role.

    Raises:
        Exception: If there's an error accessing the MCP tools.
    """
    try:
        if self.mcp_url is not None:
            tools_available = list_all(
                self.mcp_url, output_type="json"
            )
            self.short_memory.add(
                role="Tools Available",
                content=f"\n{tools_available}",
            )
        # Fix: the previous condition required len(self.mcp_urls) > 1,
        # which silently skipped tool discovery when exactly one URL was
        # supplied via mcp_urls. Any non-empty list is now handled.
        elif self.mcp_urls is not None and len(self.mcp_urls) > 0:
            tools_available = list_tools_for_multiple_urls(
                urls=self.mcp_urls,
                output_type="json",
            )
            self.short_memory.add(
                role="Tools Available",
                content=f"\n{tools_available}",
            )
    except Exception as e:
        logger.error(f"Error adding MCP tools to memory: {e}")
        raise e
def _single_mcp_tool_handling(self, response: any):
    """
    Handles execution of a single MCP tool against ``self.mcp_url``.

    Args:
        response: The tool call emitted by the model, either a dict or
            a string parseable into one.

    Raises:
        Exception: If there's an error executing the tool.
    """
    try:
        # Normalize the model output to a dict before dispatching.
        if isinstance(response, dict):
            result = response
        else:
            result = str_to_dict(response)

        output = execute_mcp_tool(
            url=self.mcp_url,
            parameters=result,
        )

        # Fix: replaced leftover debug print(output)/print(type(output))
        # statements with a debug-level log entry.
        logger.debug(f"MCP tool output ({type(output)}): {output}")

        self.short_memory.add(
            role="Tool Executor", content=str(output)
        )
    except Exception as e:
        logger.error(f"Error in single MCP tool handling: {e}")
        raise e
def _multiple_mcp_tool_handling(self, response: any):
    """
    Handles execution of multiple MCP tools.

    Looks up the tool named in the response across ``self.mcp_urls``
    and records the execution result in short-term memory.

    Args:
        response (any): The tool response to process; a dict or a
            string parseable into one, expected to carry a "name" key.

    Raises:
        Exception: If there's an error executing the tools, including
            KeyError when the parsed response lacks a "name" key.
    """
    try:
        # Model output may arrive as a string; normalize to a dict first.
        if isinstance(response, str):
            response = str_to_dict(response)
        # NOTE(review): the whole response dict (still containing its
        # "name" key) is forwarded as `parameters` — confirm that
        # find_and_execute_tool strips or tolerates the extra key.
        execution = find_and_execute_tool(
            self.mcp_urls,
            response["name"],
            parameters=response,
        )
        self.short_memory.add(
            role="Tool Executor", content=str(execution)
        )
    except Exception as e:
        logger.error(f"Error in multiple MCP tool handling: {e}")
        raise e
def mcp_tool_handling(self, response: any):
    """
    Main handler for MCP tool execution.

    Routes the model's tool call to single-URL handling when
    ``self.mcp_url`` is set, otherwise to multi-URL handling when
    ``self.mcp_urls`` is a non-empty list.

    Args:
        response (any): The tool response to process.

    Raises:
        ValueError: If neither an MCP URL nor MCP URLs are provided.
        Exception: If there's an error in tool handling.
    """
    try:
        if self.mcp_url is not None:
            self._single_mcp_tool_handling(response)
        # Fix: previously this branch checked len(self.mcp_servers) > 1,
        # which raises TypeError when mcp_servers is None (its default)
        # and ignored mcp_urls — the attribute that
        # _multiple_mcp_tool_handling actually uses.
        elif self.mcp_urls:
            self._multiple_mcp_tool_handling(response)
        else:
            raise ValueError("No MCP URL or MCP Servers provided")
    except Exception as e:
        logger.error(f"Error in mcp_tool_handling: {e}")
        raise e
def setup_config(self):
# The max_loops will be set dynamically if the dynamic_loop
@ -1125,6 +1186,12 @@ class Agent:
role=self.agent_name, content=out
)
if (
self.mcp_servers
and self.tools_list_dictionary is not None
):
self.mcp_tool_handling(response)
self.sentiment_and_evaluator(response)
success = True # Mark as successful to exit the retry loop

@ -27,6 +27,7 @@ class AOP:
name: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = "http://localhost:8000/sse",
urls: Optional[list[str]] = None,
*args,
**kwargs,
):
@ -44,7 +45,7 @@ class AOP:
self.name = name
self.description = description
self.url = url
self.urls = urls
self.tools = {}
self.swarms = {}
@ -527,6 +528,12 @@ class AOP:
return tool
return None
def list_tools_for_multiple_urls(self):
    """Return the tool listings for every URL in ``self.urls``.

    Returns:
        list: One ``list_all`` result per configured URL, in order.
    """
    return [self.list_all(url) for url in self.urls]
def search_if_tool_exists(self, name: str):
out = self.list_all()
for tool in out:

@ -1,6 +1,6 @@
import datetime
import json
from typing import Any, Optional, Union
from typing import Any, List, Optional, Union
import yaml
from swarms.structs.base_structure import BaseStructure
@ -105,7 +105,7 @@ class Conversation(BaseStructure):
if tokenizer is not None:
self.truncate_memory_with_tokenizer()
def _add(
def add(
self,
role: str,
content: Union[str, dict, list],
@ -137,8 +137,19 @@ class Conversation(BaseStructure):
# Add the message to history immediately without waiting for token count
self.conversation_history.append(message)
if self.token_count is True:
self._count_tokens(content, message)
def add_multiple_messages(
    self, roles: List[str], contents: List[Union[str, dict, list]]
):
    """Append several messages to the conversation at once.

    Args:
        roles: Speaker role for each message (e.g. 'User', 'System').
        contents: Message payloads, paired positionally with ``roles``.
            Extra items in the longer list are ignored, as with ``zip``.
    """
    for pair in zip(roles, contents):
        self.add(*pair)
def _count_tokens(self, content: str, message: dict):
# If token counting is enabled, do it in a separate thread
if self.token_count is True:
# Define a function to count tokens and update the message
def count_tokens_thread():
tokens = count_tokens(any_to_str(content))
@ -158,21 +169,6 @@ class Conversation(BaseStructure):
)
token_thread.start()
def add(self, role: str, content: Union[str, dict, list]):
"""Add a message to the conversation history.
Args:
role (str): The role of the speaker (e.g., 'User', 'System').
content (Union[str, dict, list]): The content of the message to be added.
"""
process_thread = threading.Thread(
target=self._add,
args=(role, content),
daemon=True,
)
process_thread.start()
# process_thread.join()
def delete(self, index: str):
"""Delete a message from the conversation history.

@ -1,4 +1,3 @@
import threading
import asyncio
@ -394,9 +393,4 @@ def _log_agent_data(data_dict: dict):
def log_agent_data(data_dict: dict):
"""Log agent data"""
process_thread = threading.Thread(
target=_log_agent_data,
args=(data_dict,),
daemon=True,
)
process_thread.start()
pass

@ -1,90 +1,237 @@
import asyncio
from typing import Literal, Dict, Any, Union
import json
from typing import List, Literal, Dict, Any, Union
from fastmcp import Client
from swarms.utils.any_to_str import any_to_str
from swarms.utils.str_to_dict import str_to_dict
def parse_agent_output(
dictionary: Union[str, Dict[Any, Any]]
) -> tuple[str, Dict[Any, Any]]:
if isinstance(dictionary, str):
dictionary = str_to_dict(dictionary)
"""
Parse agent output into tool name and parameters.
elif not isinstance(dictionary, dict):
raise ValueError("Invalid dictionary")
Args:
dictionary: Either a string or dictionary containing tool information.
If string, it will be converted to a dictionary.
Must contain a 'name' key for the tool name.
# Handle OpenAI function call format
if "function_call" in dictionary:
name = dictionary["function_call"]["name"]
# arguments is a JSON string, so we need to parse it
params = str_to_dict(dictionary["function_call"]["arguments"])
return name, params
Returns:
tuple[str, Dict[Any, Any]]: A tuple containing the tool name and its parameters.
# Handle OpenAI tool calls format
if "tool_calls" in dictionary:
# Get the first tool call (or you could handle multiple if needed)
tool_call = dictionary["tool_calls"][0]
name = tool_call["function"]["name"]
params = str_to_dict(tool_call["function"]["arguments"])
return name, params
Raises:
ValueError: If the input is invalid or missing required 'name' key.
"""
try:
if isinstance(dictionary, str):
dictionary = str_to_dict(dictionary)
# Handle regular dictionary format
if "name" in dictionary:
name = dictionary["name"]
params = dictionary.get("arguments", {})
return name, params
elif not isinstance(dictionary, dict):
raise ValueError("Invalid dictionary")
raise ValueError("Invalid function call format")
# Handle regular dictionary format
if "name" in dictionary:
name = dictionary["name"]
# Remove the name key and use remaining key-value pairs as parameters
params = dict(dictionary)
params.pop("name")
return name, params
raise ValueError("Invalid function call format")
except Exception as e:
raise ValueError(f"Error parsing agent output: {str(e)}")
async def _list_all(url: str):
    """
    Asynchronously fetch the tool listing from one MCP server.

    Args:
        url: The URL of the MCP server to query.

    Returns:
        The list of available tools reported by the server.

    Raises:
        ValueError: If connecting to or querying the server fails.
    """
    try:
        async with Client(url) as session:
            return await session.list_tools()
    except Exception as exc:
        raise ValueError(f"Error listing tools: {str(exc)}")
def list_all(url: str, output_type: Literal["str", "json"] = "json"):
    """
    Synchronously list all tools available on a given MCP server.

    Args:
        url: The URL of the MCP server to query.
        output_type: "json" (default) to receive a JSON-encoded string,
            "str" to receive the raw list of tool dictionaries.

    Returns:
        str | list: A JSON string when ``output_type`` is "json",
        otherwise a list of dictionaries containing tool information.
        (Fix: the previous docstring omitted ``output_type`` and claimed
        a list was always returned.)

    Raises:
        ValueError: If there's an error connecting to or querying the server.
    """
    try:
        tools = asyncio.run(_list_all(url))
        outputs = [tool.model_dump() for tool in tools]
        if output_type == "json":
            return json.dumps(outputs, indent=4)
        return outputs
    except Exception as e:
        raise ValueError(f"Error in list_all: {str(e)}")
def list_tools_for_multiple_urls(
    urls: List[str], output_type: Literal["str", "json"] = "json"
):
    """
    List tools available across multiple MCP servers.

    Args:
        urls: List of MCP server URLs to query.
        output_type: Format of the output, either "json" (string) or "str" (list).

    Returns:
        If output_type is "json": JSON string containing all tools with server URLs.
        If output_type is "str": List (one entry per server) of tool dicts
        tagged with their server URL.

    Raises:
        ValueError: If there's an error querying any of the servers.
    """
    try:
        out = []
        for url in urls:
            # BUG FIX: request the list form explicitly. The default
            # ("json") returns a JSON *string*, and tagging its items
            # below would raise a TypeError.
            tools = list_all(url, output_type="str")
            # Add server URL to each tool's data
            for tool in tools:
                tool["server_url"] = url
            out.append(tools)
        if output_type == "json":
            return json.dumps(out, indent=4)
        return out
    except Exception as e:
        raise ValueError(
            f"Error listing tools for multiple URLs: {str(e)}"
        )
async def _execute_mcp_tool(
    url: str,
    method: Literal["stdio", "sse"] = "sse",
    parameters: Dict[Any, Any] = None,
    output_type: Literal["str", "dict"] = "str",
    *args,
    **kwargs,
) -> Dict[Any, Any]:
    """
    Asynchronously execute a tool on an MCP server.

    Args:
        url: The URL of the MCP server.
        method: Transport to use ("stdio" or "sse"). Currently informational;
            the transport is inferred by the Client from the URL.
        parameters: Dictionary (or JSON string) containing the tool name and
            its arguments, parsed by parse_agent_output.
        output_type: "str" returns a stringified result; "dict" returns the
            raw call result.
        *args: Additional positional arguments for the Client.
        **kwargs: Additional keyword arguments for the Client.

    Returns:
        The tool execution result, stringified or raw per output_type.

    Raises:
        ValueError: If the output type is invalid or tool execution fails.
    """
    try:
        # Validate before doing any work so an invalid output_type does not
        # trigger a tool call as a side effect.
        if output_type not in ("str", "dict"):
            raise ValueError(f"Invalid output type: {output_type}")
        name, params = parse_agent_output(parameters)
        # Single client session; the result is formatted after the call.
        async with Client(url, *args, **kwargs) as client:
            out = await client.call_tool(
                name=name,
                arguments=params,
            )
        if output_type == "str":
            return any_to_str(out)
        return out
    except Exception as e:
        raise ValueError(f"Error executing MCP tool: {str(e)}")
def execute_mcp_tool(
    url: str,
    tool_name: str = None,
    method: Literal["stdio", "sse"] = "sse",
    parameters: Dict[Any, Any] = None,
    output_type: Literal["str", "dict"] = "str",
) -> Dict[Any, Any]:
    """
    Synchronously execute a tool on an MCP server.

    Args:
        url: The URL of the MCP server.
        tool_name: Retained for backward compatibility; the tool name is
            actually taken from parameters["name"] (or "tool_calls") by the
            async implementation, so this argument is not forwarded.
        method: Transport to use ("stdio" or "sse").
        parameters: Dictionary containing tool name and parameters.
        output_type: "str" or "dict" result formatting.

    Returns:
        The tool execution results.

    Raises:
        ValueError: If tool execution fails.
    """
    try:
        # NOTE: _execute_mcp_tool takes no tool_name parameter; forwarding
        # it would raise a TypeError.
        return asyncio.run(
            _execute_mcp_tool(
                url=url,
                method=method,
                parameters=parameters,
                output_type=output_type,
            )
        )
    except Exception as e:
        raise ValueError(f"Error in execute_mcp_tool: {str(e)}")
def find_and_execute_tool(
    urls: List[str], tool_name: str, parameters: Dict[Any, Any]
) -> Dict[Any, Any]:
    """
    Find a tool across multiple servers and execute it with the given parameters.

    Args:
        urls: List of server URLs to search through.
        tool_name: Name of the tool to find and execute.
        parameters: Parameters to pass to the tool.

    Returns:
        Dict containing the tool execution results.

    Raises:
        ValueError: If tool is not found on any server or execution fails.
    """
    try:
        # Search for tool across all servers
        for url in urls:
            try:
                # BUG FIX: request the list form; the default ("json") is a
                # JSON string, which made the membership check below throw
                # and every server get silently skipped.
                tools = list_all(url, output_type="str")
                # Check if tool exists on this server
                if any(tool["name"] == tool_name for tool in tools):
                    # Prepare parameters in correct format
                    tool_params = {"name": tool_name, **parameters}
                    # Execute tool on this server
                    return execute_mcp_tool(
                        url=url, parameters=tool_params
                    )
            except Exception:
                # Skip servers that fail and continue searching
                continue
        raise ValueError(
            f"Tool '{tool_name}' not found on any provided servers"
        )
    except Exception as e:
        raise ValueError(f"Error in find_and_execute_tool: {str(e)}")

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Any, List
from typing import Any
from loguru import logger
@ -25,8 +25,6 @@ from mcp.client.sse import sse_client
from mcp.types import CallToolResult, JSONRPCMessage
from typing_extensions import NotRequired, TypedDict
from swarms.utils.any_to_str import any_to_str
class MCPServer(abc.ABC):
"""Base class for Model Context Protocol servers."""
@ -340,53 +338,3 @@ class MCPServerSse(_MCPServerWithClientSession):
def name(self) -> str:
"""A readable name for the server."""
return self._name
def mcp_flow_get_tool_schema(
    params: MCPServerSseParams,
) -> MCPServer:
    """
    Fetch the tool schema listing from an MCP SSE server.

    Args:
        params: Connection parameters for the SSE server.

    Returns:
        The tool listing as a plain dict (model_dump of the response).

    Note:
        connect/list_tools/cleanup must share one event loop: the SSE
        session is bound to the loop that created it, so the original
        pattern of separate asyncio.run() calls per step breaks cleanup.
    """
    async def _run():
        server = MCPServerSse(params, cache_tools_list=True)
        await server.connect()
        try:
            return await server.list_tools()
        finally:
            # Always release the SSE session, even if listing fails.
            await server.cleanup()

    output = asyncio.run(_run())
    return output.model_dump()
def mcp_flow(
    params: MCPServerSseParams,
    function_call: dict[str, Any],
) -> MCPServer:
    """
    Execute a single tool call against an MCP SSE server.

    Args:
        params: Connection parameters for the SSE server.
        function_call: The tool call payload to execute.

    Returns:
        The call result, model_dump'ed and converted to a string.

    Note:
        connect/call_tool/cleanup run in one event loop: the SSE session is
        bound to the loop that created it, so separate asyncio.run() calls
        per step (as before) break cleanup.
    """
    async def _run():
        server = MCPServerSse(params, cache_tools_list=True)
        await server.connect()
        try:
            return await server.call_tool(function_call)
        finally:
            # Always release the SSE session, even if the call fails.
            await server.cleanup()

    output = asyncio.run(_run()).model_dump()
    return any_to_str(output)
def batch_mcp_flow(
    params: List[MCPServerSseParams],
    function_call: List[dict[str, Any]] = None,
) -> MCPServer:
    """
    Execute the same function call against multiple MCP servers sequentially.

    Args:
        params: Connection parameters, one per server.
        function_call: The call payload forwarded to each server. Defaults
            to an empty list.

    Returns:
        List of per-server results from mcp_flow.
    """
    # BUG FIX: avoid a mutable default argument ([]), which is shared
    # across calls.
    if function_call is None:
        function_call = []
    return [mcp_flow(param, function_call) for param in params]

@ -0,0 +1,8 @@
from swarms.tools.mcp_client import execute_mcp_tool

# Smoke-test script: invoke the "add" tool on a locally running MCP SSE
# server and print the result.
# NOTE(review): requires a server listening on 0.0.0.0:8000 that exposes
# an "add" tool accepting arguments "a" and "b" — confirm before running.
print(
    execute_mcp_tool(
        "http://0.0.0.0:8000/sse",
        parameters={"name": "add", "a": 1, "b": 2},
    )
)
Loading…
Cancel
Save