From 3dd18559076b3aa29412b55201b48cc80ea2ce97 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Apr 2025 13:46:41 -0700 Subject: [PATCH] swarms tools --- agent_tools_dict_example.py | 46 +-- benchmark_init.py | 167 ++++++++ docs/mkdocs.yml | 1 + docs/swarms_cloud/swarms_api_tools.md | 371 ++++++++++++++++++ mcp_exampler.py => examples/mcp_exampler.py | 0 .../deep_research_swarm_example.py | 0 .../scient_agents/paper_idea_agent.py | 0 .../scient_agents/paper_idea_profile.py | 0 .../sequential_swarm_example.py | 0 .../swarms_of_browser_agents.py | 0 .../mcp_example/mcp_test.py => mcp_test.py | 0 mcp_utils.py | 10 + swarms/prompts/agent_system_prompts.py | 17 +- swarms/prompts/prompt.py | 16 +- swarms/structs/agent.py | 263 ++++++++----- swarms/structs/aop.py | 9 +- swarms/structs/conversation.py | 30 +- swarms/telemetry/main.py | 8 +- swarms/tools/mcp_client.py | 253 +++++++++--- swarms/tools/mcp_integration.py | 54 +-- test_execute.py | 8 + 21 files changed, 988 insertions(+), 265 deletions(-) create mode 100644 benchmark_init.py create mode 100644 docs/swarms_cloud/swarms_api_tools.md rename mcp_exampler.py => examples/mcp_exampler.py (100%) rename deep_research_swarm_example.py => examples/scient_agents/deep_research_swarm_example.py (100%) rename paper_idea_agent.py => examples/scient_agents/paper_idea_agent.py (100%) rename paper_idea_profile.py => examples/scient_agents/paper_idea_profile.py (100%) rename sequential_swarm_example.py => examples/sequential_swarm_example.py (100%) rename swarms_of_browser_agents.py => examples/swarms_of_browser_agents.py (100%) rename examples/mcp_example/mcp_test.py => mcp_test.py (100%) create mode 100644 mcp_utils.py create mode 100644 test_execute.py diff --git a/agent_tools_dict_example.py b/agent_tools_dict_example.py index 9a85e7c4..0c702924 100644 --- a/agent_tools_dict_example.py +++ b/agent_tools_dict_example.py @@ -1,40 +1,42 @@ from dotenv import load_dotenv from swarms import Agent -from 
swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) - +from swarms.tools.mcp_integration import MCPServerSseParams load_dotenv() + +server = MCPServerSseParams( + url="http://localhost:8000/sse", + timeout=10, +) + tools = [ { "type": "function", "function": { - "name": "get_stock_price", - "description": "Retrieve the current stock price and related information for a specified company.", + "name": "add_numbers", + "description": "Add two numbers together and return the result.", "parameters": { "type": "object", "properties": { - "ticker": { + "name": { "type": "string", - "description": "The stock ticker symbol of the company, e.g. AAPL for Apple Inc.", + "description": "The name of the operation to perform.", }, - "include_history": { - "type": "boolean", - "description": "Indicates whether to include historical price data along with the current price.", + "a": { + "type": "integer", + "description": "The first number to add.", }, - "time": { - "type": "string", - "format": "date-time", - "description": "Optional parameter to specify the time for which the stock data is requested, in ISO 8601 format.", + "b": { + "type": "integer", + "description": "The second number to add.", }, }, "required": [ - "ticker", - "include_history", - "time", + "name", + "a", + "b", ], }, }, @@ -46,14 +48,14 @@ tools = [ agent = Agent( agent_name="Financial-Analysis-Agent", agent_description="Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, + max_loops=2, tools_list_dictionary=tools, output_type="final", + mcp_url="http://0.0.0.0:8000/sse", ) out = agent.run( - "What is the current stock price for Apple Inc. (AAPL)? Include historical price data.", + "Use the multiply tool to multiply 3 and 4 together. 
Look at the tools available to you.", ) -print(type(out)) +print(agent.short_memory.get_str()) diff --git a/benchmark_init.py b/benchmark_init.py new file mode 100644 index 00000000..36fc18df --- /dev/null +++ b/benchmark_init.py @@ -0,0 +1,167 @@ +from time import perf_counter_ns +import psutil +import os +from rich.panel import Panel +from rich.console import Console +from rich.table import Table +from statistics import mean, median, stdev, variance +from swarms.structs.agent import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) + + +def get_memory_stats(memory_readings): + """Calculate memory statistics""" + return { + "peak": max(memory_readings), + "min": min(memory_readings), + "mean": mean(memory_readings), + "median": median(memory_readings), + "stdev": ( + stdev(memory_readings) if len(memory_readings) > 1 else 0 + ), + "variance": ( + variance(memory_readings) + if len(memory_readings) > 1 + else 0 + ), + } + + +def get_time_stats(times): + """Calculate time statistics""" + return { + "total": sum(times), + "mean": mean(times), + "median": median(times), + "min": min(times), + "max": max(times), + "stdev": stdev(times) if len(times) > 1 else 0, + "variance": variance(times) if len(times) > 1 else 0, + } + + +def benchmark_multiple_agents(num_agents=100): + console = Console() + init_times = [] + memory_readings = [] + process = psutil.Process(os.getpid()) + + # Create benchmark tables + time_table = Table(title="Time Statistics") + time_table.add_column("Metric", style="cyan") + time_table.add_column("Value", style="green") + + memory_table = Table(title="Memory Statistics") + memory_table.add_column("Metric", style="cyan") + memory_table.add_column("Value", style="green") + + initial_memory = process.memory_info().rss / 1024 + start_total_time = perf_counter_ns() + + # Initialize agents and measure performance + for i in range(num_agents): + start_time = perf_counter_ns() + + Agent( + 
agent_name=f"Financial-Analysis-Agent-{i}", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=2, + model_name="gpt-4o-mini", + dynamic_temperature_enabled=True, + interactive=False, + ) + + init_time = (perf_counter_ns() - start_time) / 1_000_000 + init_times.append(init_time) + + current_memory = process.memory_info().rss / 1024 + memory_readings.append(current_memory - initial_memory) + + if (i + 1) % 10 == 0: + console.print( + f"Created {i + 1} agents...", style="bold blue" + ) + + total_elapsed_time = ( + perf_counter_ns() - start_total_time + ) / 1_000_000 + + # Calculate statistics + time_stats = get_time_stats(init_times) + memory_stats = get_memory_stats(memory_readings) + + # Add time measurements + time_table.add_row( + "Total Wall Time", f"{total_elapsed_time:.2f} ms" + ) + time_table.add_row( + "Total Init Time", f"{time_stats['total']:.2f} ms" + ) + time_table.add_row( + "Average Init Time", f"{time_stats['mean']:.2f} ms" + ) + time_table.add_row( + "Median Init Time", f"{time_stats['median']:.2f} ms" + ) + time_table.add_row("Fastest Init", f"{time_stats['min']:.2f} ms") + time_table.add_row("Slowest Init", f"{time_stats['max']:.2f} ms") + time_table.add_row( + "Std Deviation", f"{time_stats['stdev']:.2f} ms" + ) + time_table.add_row( + "Variance", f"{time_stats['variance']:.4f} ms²" + ) + time_table.add_row( + "Throughput", + f"{(num_agents/total_elapsed_time) * 1000:.2f} agents/second", + ) + + # Add memory measurements + memory_table.add_row( + "Peak Memory Usage", f"{memory_stats['peak']:.2f} KB" + ) + memory_table.add_row( + "Minimum Memory Usage", f"{memory_stats['min']:.2f} KB" + ) + memory_table.add_row( + "Average Memory Usage", f"{memory_stats['mean']:.2f} KB" + ) + memory_table.add_row( + "Median Memory Usage", f"{memory_stats['median']:.2f} KB" + ) + memory_table.add_row( + "Memory Std Deviation", f"{memory_stats['stdev']:.2f} KB" + ) + memory_table.add_row( + "Memory Variance", 
f"{memory_stats['variance']:.2f} KB²" + ) + memory_table.add_row( + "Avg Memory Per Agent", + f"{memory_stats['mean']/num_agents:.2f} KB", + ) + + # Create and display panels + time_panel = Panel( + time_table, + title="Time Benchmark Results", + border_style="blue", + padding=(1, 2), + ) + + memory_panel = Panel( + memory_table, + title="Memory Benchmark Results", + border_style="green", + padding=(1, 2), + ) + + console.print(time_panel) + console.print("\n") + console.print(memory_panel) + + +if __name__ == "__main__": + benchmark_multiple_agents(100) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index d382fe32..1fed842e 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -357,6 +357,7 @@ nav: - Swarms Cloud Subscription Tiers: "swarms_cloud/subscription_tiers.md" - Swarms API Best Practices: "swarms_cloud/best_practices.md" - Swarms API as MCP: "swarms_cloud/mcp.md" + - Swarms API Tools: "swarms_cloud/swarms_api_tools.md" - Swarm Ecosystem APIs: - MCS API: "swarms_cloud/mcs_api.md" # - CreateNow API: "swarms_cloud/create_api.md" diff --git a/docs/swarms_cloud/swarms_api_tools.md b/docs/swarms_cloud/swarms_api_tools.md new file mode 100644 index 00000000..f2cfa995 --- /dev/null +++ b/docs/swarms_cloud/swarms_api_tools.md @@ -0,0 +1,371 @@ +# Swarms API with Tools Guide + + +Swarms API allows you to create and manage AI agent swarms with optional tool integration. This guide will walk you through setting up and using the Swarms API with tools. + +## Prerequisites + +- Python 3.7+ +- Swarms API key +- Required Python packages: + - `requests` + - `python-dotenv` + +## Installation & Setup + +1. Install required packages: + +```bash +pip install requests python-dotenv +``` + +2. Create a `.env` file in your project root: + +```bash +SWARMS_API_KEY=your_api_key_here +``` + +3. 
Basic setup code: + +```python +import os +import requests +from dotenv import load_dotenv +import json + +load_dotenv() + +API_KEY = os.getenv("SWARMS_API_KEY") +BASE_URL = "https://api.swarms.world" + +headers = {"x-api-key": API_KEY, "Content-Type": "application/json"} +``` + +## Creating a Swarm with Tools + +### Step-by-Step Guide + +1. Define your tool dictionary: +```python +tool_dictionary = { + "type": "function", + "function": { + "name": "search_topic", + "description": "Conduct an in-depth search on a specified topic", + "parameters": { + "type": "object", + "properties": { + "depth": { + "type": "integer", + "description": "Search depth (1-3)" + }, + "detailed_queries": { + "type": "array", + "items": { + "type": "string", + "description": "Specific search queries" + } + } + }, + "required": ["depth", "detailed_queries"] + } + } +} +``` + +2. Create agent configurations: +```python +agent_config = { + "agent_name": "Market Analyst", + "description": "Analyzes market trends", + "system_prompt": "You are a financial analyst expert.", + "model_name": "openai/gpt-4", + "role": "worker", + "max_loops": 1, + "max_tokens": 8192, + "temperature": 0.5, + "auto_generate_prompt": False, + "tools_dictionary": [tool_dictionary] # Optional: Add tools if needed +} +``` + +3. Create the swarm payload: +```python +payload = { + "name": "Your Swarm Name", + "description": "Swarm description", + "agents": [agent_config], + "max_loops": 1, + "swarm_type": "ConcurrentWorkflow", + "task": "Your task description", + "output_type": "dict" +} +``` + +4. Make the API request: +```python +def run_swarm(payload): + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + return response.json() +``` + +## FAQ + +### Do all agents need tools? +No, tools are optional for each agent. You can choose which agents have tools based on your specific needs. Simply omit the `tools_dictionary` field for agents that don't require tools. 
+ +### What types of tools can I use? +Currently, the API supports function-type tools. Each tool must have: +- A unique name +- A clear description +- Well-defined parameters with types and descriptions + +### Can I mix agents with and without tools? +Yes, you can create swarms with a mix of tool-enabled and regular agents. This allows for flexible swarm architectures. + +### What's the recommended number of tools per agent? +While there's no strict limit, it's recommended to: +- Keep tools focused and specific +- Only include tools that the agent needs +- Consider the complexity of tool interactions + +## Example Implementation + +Here's a complete example of a financial analysis swarm: + +```python +def run_financial_analysis_swarm(): + payload = { + "name": "Financial Analysis Swarm", + "description": "Market analysis swarm", + "agents": [ + { + "agent_name": "Market Analyst", + "description": "Analyzes market trends", + "system_prompt": "You are a financial analyst expert.", + "model_name": "openai/gpt-4", + "role": "worker", + "max_loops": 1, + "max_tokens": 8192, + "temperature": 0.5, + "auto_generate_prompt": False, + "tools_dictionary": [ + { + "type": "function", + "function": { + "name": "search_topic", + "description": "Conduct market research", + "parameters": { + "type": "object", + "properties": { + "depth": { + "type": "integer", + "description": "Search depth (1-3)" + }, + "detailed_queries": { + "type": "array", + "items": {"type": "string"} + } + }, + "required": ["depth", "detailed_queries"] + } + } + } + ] + } + ], + "max_loops": 1, + "swarm_type": "ConcurrentWorkflow", + "task": "Analyze top performing tech ETFs", + "output_type": "dict" + } + + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + return response.json() +``` + +## Health Check + +Always verify the API status before running swarms: + +```python +def check_api_health(): + response = requests.get(f"{BASE_URL}/health", 
headers=headers) + return response.json() +``` + +## Best Practices + +1. **Error Handling**: Always implement proper error handling: +```python +def safe_run_swarm(payload): + try: + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + print(f"Error running swarm: {e}") + return None +``` + +2. **Environment Variables**: Never hardcode API keys +3. **Tool Design**: Keep tools simple and focused +4. **Testing**: Validate swarm configurations before production use + +## Troubleshooting + +Common issues and solutions: + +1. **API Key Issues** + - Verify key is correctly set in `.env` + - Check key permissions + +2. **Tool Execution Errors** + - Validate tool parameters + - Check tool function signatures + +3. **Response Timeout** + - Consider reducing max_tokens + - Simplify tool complexity + + + +```python +import os +import requests +from dotenv import load_dotenv +import json + +load_dotenv() + +API_KEY = os.getenv("SWARMS_API_KEY") +BASE_URL = "https://api.swarms.world" + +headers = {"x-api-key": API_KEY, "Content-Type": "application/json"} + + +def run_health_check(): + response = requests.get(f"{BASE_URL}/health", headers=headers) + return response.json() + + +def run_single_swarm(): + payload = { + "name": "Financial Analysis Swarm", + "description": "Market analysis swarm", + "agents": [ + { + "agent_name": "Market Analyst", + "description": "Analyzes market trends", + "system_prompt": "You are a financial analyst expert.", + "model_name": "openai/gpt-4o", + "role": "worker", + "max_loops": 1, + "max_tokens": 8192, + "temperature": 0.5, + "auto_generate_prompt": False, + "tools_dictionary": [ + { + "type": "function", + "function": { + "name": "search_topic", + "description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries 
tailored to the input parameters.", + "parameters": { + "type": "object", + "properties": { + "depth": { + "type": "integer", + "description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.", + }, + "detailed_queries": { + "type": "array", + "description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.", + "items": { + "type": "string", + "description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.", + }, + }, + }, + "required": ["depth", "detailed_queries"], + }, + }, + }, + ], + }, + { + "agent_name": "Economic Forecaster", + "description": "Predicts economic trends", + "system_prompt": "You are an expert in economic forecasting.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1, + "max_tokens": 8192, + "temperature": 0.5, + "auto_generate_prompt": False, + "tools_dictionary": [ + { + "type": "function", + "function": { + "name": "search_topic", + "description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.", + "parameters": { + "type": "object", + "properties": { + "depth": { + "type": "integer", + "description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.", + }, + "detailed_queries": { + "type": "array", + "description": "An array of highly specific search queries that are generated based on the input query and the specified depth. 
Each query should be designed to elicit detailed and relevant information from various sources.", + "items": { + "type": "string", + "description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.", + }, + }, + }, + "required": ["depth", "detailed_queries"], + }, + }, + }, + ], + }, + ], + "max_loops": 1, + "swarm_type": "ConcurrentWorkflow", + "task": "What are the best etfs and index funds for ai and tech?", + "output_type": "dict", + } + + response = requests.post( + f"{BASE_URL}/v1/swarm/completions", + headers=headers, + json=payload, + ) + + print(response) + print(response.status_code) + # return response.json() + output = response.json() + + return json.dumps(output, indent=4) + + +if __name__ == "__main__": + result = run_single_swarm() + print("Swarm Result:") + print(result) + +``` \ No newline at end of file diff --git a/mcp_exampler.py b/examples/mcp_exampler.py similarity index 100% rename from mcp_exampler.py rename to examples/mcp_exampler.py diff --git a/deep_research_swarm_example.py b/examples/scient_agents/deep_research_swarm_example.py similarity index 100% rename from deep_research_swarm_example.py rename to examples/scient_agents/deep_research_swarm_example.py diff --git a/paper_idea_agent.py b/examples/scient_agents/paper_idea_agent.py similarity index 100% rename from paper_idea_agent.py rename to examples/scient_agents/paper_idea_agent.py diff --git a/paper_idea_profile.py b/examples/scient_agents/paper_idea_profile.py similarity index 100% rename from paper_idea_profile.py rename to examples/scient_agents/paper_idea_profile.py diff --git a/sequential_swarm_example.py b/examples/sequential_swarm_example.py similarity index 100% rename from sequential_swarm_example.py rename to examples/sequential_swarm_example.py diff --git a/swarms_of_browser_agents.py b/examples/swarms_of_browser_agents.py similarity index 100% 
rename from swarms_of_browser_agents.py rename to examples/swarms_of_browser_agents.py diff --git a/examples/mcp_example/mcp_test.py b/mcp_test.py similarity index 100% rename from examples/mcp_example/mcp_test.py rename to mcp_test.py diff --git a/mcp_utils.py b/mcp_utils.py new file mode 100644 index 00000000..b7085cc3 --- /dev/null +++ b/mcp_utils.py @@ -0,0 +1,10 @@ +from swarms.tools.mcp_client import ( + list_tools_for_multiple_urls, +) + + +print( + list_tools_for_multiple_urls( + ["http://0.0.0.0:8000/sse"], output_type="json" + ) +) diff --git a/swarms/prompts/agent_system_prompts.py b/swarms/prompts/agent_system_prompts.py index 8872ad3b..3990feaa 100644 --- a/swarms/prompts/agent_system_prompts.py +++ b/swarms/prompts/agent_system_prompts.py @@ -127,9 +127,18 @@ def agent_system_prompt_2(name: str): AGENT_SYSTEM_PROMPT_3 = """ - You are a fully autonomous agent serving the user in automating tasks, workflows, and activities. - Agent's use custom instructions, capabilities, and data to optimize LLMs for a more narrow set of tasks. + You are an autonomous agent designed to serve users by automating complex tasks, workflows, and activities with precision and intelligence. + Agents leverage custom instructions, specialized capabilities, and curated data to optimize large language models for specific domains and use cases. - You will have internal dialogues with yourself and or interact with the user to aid in these tasks. - Your responses should be coherent, contextually relevant, and tailored to the task at hand. + You possess the ability to engage in both internal reasoning and external interactions to achieve optimal results. + Through self-reflection and user collaboration, you can break down complex problems, identify optimal solutions, and execute tasks with high efficiency. + + Your responses must demonstrate: + 1. Deep understanding of the task context and requirements + 2. Logical reasoning and systematic problem-solving + 3. 
Clear communication and coherent explanations + 4. Adaptability to user feedback and changing requirements + 5. Attention to detail and quality in execution + + Always aim to exceed expectations by delivering comprehensive, well-structured, and contextually appropriate solutions that address both the explicit and implicit needs of the task. """ diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index ca6ec625..9c4aeb5a 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -11,10 +11,6 @@ from pydantic import ( ) from pydantic.v1 import validator -from swarms.telemetry.main import ( - capture_system_data, - log_agent_data, -) from swarms.tools.base_tool import BaseTool from swarms.utils.loguru_logger import initialize_logger @@ -141,10 +137,10 @@ class Prompt(BaseModel): if self.autosave: self._autosave() - def log_telemetry(self): - system_data = capture_system_data() - merged_data = {**system_data, **self.model_dump()} - log_agent_data(merged_data) + # def log_telemetry(self): + # system_data = capture_system_data() + # merged_data = {**system_data, **self.model_dump()} + # log_agent_data(merged_data) def rollback(self, version: int) -> None: """ @@ -174,7 +170,7 @@ class Prompt(BaseModel): # f"Prompt {self.id} rolled back to version {version}. Current content: '{self.content}'" # ) - self.log_telemetry() + # self.log_telemetry() if self.autosave: self._autosave() @@ -190,7 +186,7 @@ class Prompt(BaseModel): str: The current prompt content. 
""" # logger.debug(f"Returning prompt {self.id} as a string.") - self.log_telemetry() + # self.log_telemetry() return self.content diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index a5410030..55be3f99 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -46,12 +46,7 @@ from swarms.structs.safe_loading import ( ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool - -# from swarms.tools.mcp_integration import ( -# MCPServerSseParams, -# batch_mcp_flow, -# mcp_flow_get_tool_schema, -# ) +from swarms.tools.mcp_integration import MCPServerSseParams from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str from swarms.utils.data_to_text import data_to_text @@ -64,6 +59,13 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.pdf_to_text import pdf_to_text +from swarms.utils.str_to_dict import str_to_dict +from swarms.tools.mcp_client import ( + execute_mcp_tool, + list_tools_for_multiple_urls, + list_all, + find_and_execute_tool, +) # Utils @@ -392,7 +394,9 @@ class Agent: role: agent_roles = "worker", no_print: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, - # mcp_servers: List[MCPServerSseParams] = [], + mcp_servers: MCPServerSseParams = None, + mcp_url: str = None, + mcp_urls: List[str] = None, *args, **kwargs, ): @@ -512,47 +516,51 @@ class Agent: self.role = role self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary - # self.mcp_servers = mcp_servers + self.mcp_servers = mcp_servers + self.mcp_url = mcp_url + self.mcp_urls = mcp_urls self._cached_llm = ( None # Add this line to cache the LLM instance ) - self._default_model = ( - "gpt-4o-mini" # Move default model name here + + self.short_memory = self.short_memory_init() + + # Initialize the feedback + 
self.feedback = [] + + # Initialize the executor + self.executor = ThreadPoolExecutor( + max_workers=executor_workers ) + self.init_handling() + + def short_memory_init(self): if ( self.agent_name is not None or self.agent_description is not None ): - prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {system_prompt}" + prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {self.system_prompt}" else: - prompt = system_prompt + prompt = self.system_prompt # Initialize the short term memory self.short_memory = Conversation( system_prompt=prompt, time_enabled=False, - user=user_name, - rules=rules, + user=self.user_name, + rules=self.rules, token_count=False, - *args, - **kwargs, ) - # Initialize the feedback - self.feedback = [] - - # Initialize the executor - self.executor = ThreadPoolExecutor( - max_workers=executor_workers - ) - - self.init_handling() + return self.short_memory def init_handling(self): # Define tasks as pairs of (function, condition) # Each task will only run if its condition is True + self.setup_config() + tasks = [ (self.setup_config, True), # Always run setup_config ( @@ -565,10 +573,10 @@ class Agent: exists(self.tool_schema) or exists(self.list_base_models), ), - ( - self.handle_sop_ops, - exists(self.sop) or exists(self.sop_list), - ), + # ( + # self.handle_sop_ops, + # exists(self.sop) or exists(self.sop_list), + # ), ] # Filter out tasks whose conditions are False @@ -577,9 +585,7 @@ class Agent: ] # Execute all tasks concurrently - with concurrent.futures.ThreadPoolExecutor( - max_workers=os.cpu_count() * 4 - ) as executor: + with self.executor as executor: # Map tasks to futures and collect results results = {} future_to_task = { @@ -611,6 +617,9 @@ class Agent: if self.llm is None: self.llm = self.llm_handling() + if self.mcp_url or self.mcp_servers is not None: + self.add_mcp_tools_to_memory() + def agent_output_model(self): # Many steps id = agent_id() @@ 
-637,10 +646,7 @@ class Agent: return self._cached_llm if self.model_name is None: - logger.warning( - f"Model name is not provided, using {self._default_model}. You can configure any model from litellm if desired." - ) - self.model_name = self._default_model + self.model_name = "gpt-4o-mini" try: # Simplify initialization logic @@ -719,68 +725,123 @@ class Agent: tool.__name__: tool for tool in self.tools } - # def mcp_execution_flow(self, response: any): - # """ - # Executes the MCP (Model Context Protocol) flow based on the provided response. - - # This method takes a response, converts it from a string to a dictionary format, - # and checks for the presence of a tool name or a name in the response. If either - # is found, it retrieves the tool name and proceeds to call the batch_mcp_flow - # function to execute the corresponding tool actions. - - # Args: - # response (any): The response to be processed, which can be in string format - # that represents a dictionary. - - # Returns: - # The output from the batch_mcp_flow function, which contains the results of - # the tool execution. If an error occurs during processing, it logs the error - # and returns None. - - # Raises: - # Exception: Logs any exceptions that occur during the execution flow. - # """ - # try: - # response = str_to_dict(response) - - # tool_output = batch_mcp_flow( - # self.mcp_servers, - # function_call=response, - # ) - - # return tool_output - # except Exception as e: - # logger.error(f"Error in mcp_execution_flow: {e}") - # return None - - # def mcp_tool_handling(self): - # """ - # Handles the retrieval of tool schemas from the MCP servers. - - # This method iterates over the list of MCP servers, retrieves the tool schema - # for each server using the mcp_flow_get_tool_schema function, and compiles - # these schemas into a list. The resulting list is stored in the - # tools_list_dictionary attribute. - - # Returns: - # list: A list of tool schemas retrieved from the MCP servers. 
If an error - # occurs during the retrieval process, it logs the error and returns None. - - # Raises: - # Exception: Logs any exceptions that occur during the tool handling process. - # """ - # try: - # self.tools_list_dictionary = [] - - # for mcp_server in self.mcp_servers: - # tool_schema = mcp_flow_get_tool_schema(mcp_server) - # self.tools_list_dictionary.append(tool_schema) - - # print(self.tools_list_dictionary) - # return self.tools_list_dictionary - # except Exception as e: - # logger.error(f"Error in mcp_tool_handling: {e}") - # return None + def add_mcp_tools_to_memory(self): + """ + Adds MCP tools to the agent's short-term memory. + + This function checks for either a single MCP URL or multiple MCP URLs and adds the available tools + to the agent's memory. The tools are listed in JSON format. + + Raises: + Exception: If there's an error accessing the MCP tools + """ + try: + if self.mcp_url is not None: + tools_available = list_all( + self.mcp_url, output_type="json" + ) + self.short_memory.add( + role="Tools Available", + content=f"\n{tools_available}", + ) + + elif ( + self.mcp_url is None + and self.mcp_urls is not None + and len(self.mcp_urls) > 1 + ): + tools_available = list_tools_for_multiple_urls( + urls=self.mcp_urls, + output_type="json", + ) + + self.short_memory.add( + role="Tools Available", + content=f"\n{tools_available}", + ) + except Exception as e: + logger.error(f"Error adding MCP tools to memory: {e}") + raise e + + def _single_mcp_tool_handling(self, response: any): + """ + Handles execution of a single MCP tool. 
+ + Args: + response (str): The tool response to process + + Raises: + Exception: If there's an error executing the tool + """ + try: + if isinstance(response, dict): + result = response + else: + result = str_to_dict(response) + + output = execute_mcp_tool( + url=self.mcp_url, + parameters=result, + ) + + print(output) + print(type(output)) + + self.short_memory.add( + role="Tool Executor", content=str(output) + ) + except Exception as e: + logger.error(f"Error in single MCP tool handling: {e}") + raise e + + def _multiple_mcp_tool_handling(self, response: any): + """ + Handles execution of multiple MCP tools. + + Args: + response (any): The tool response to process + + Raises: + Exception: If there's an error executing the tools + """ + try: + if isinstance(response, str): + response = str_to_dict(response) + + execution = find_and_execute_tool( + self.mcp_urls, + response["name"], + parameters=response, + ) + + self.short_memory.add( + role="Tool Executor", content=str(execution) + ) + except Exception as e: + logger.error(f"Error in multiple MCP tool handling: {e}") + raise e + + def mcp_tool_handling(self, response: any): + """ + Main handler for MCP tool execution. 
+ + Args: + response (any): The tool response to process + + Raises: + ValueError: If no MCP URL or MCP Servers are provided + Exception: If there's an error in tool handling + """ + try: + if self.mcp_url is not None: + self._single_mcp_tool_handling(response) + elif self.mcp_url is None and len(self.mcp_servers) > 1: + self._multiple_mcp_tool_handling(response) + else: + raise ValueError("No MCP URL or MCP Servers provided") + except Exception as e: + logger.error(f"Error in mcp_tool_handling: {e}") + raise e def setup_config(self): # The max_loops will be set dynamically if the dynamic_loop @@ -1125,6 +1186,12 @@ class Agent: role=self.agent_name, content=out ) + if ( + self.mcp_servers + and self.tools_list_dictionary is not None + ): + self.mcp_tool_handling(response) + self.sentiment_and_evaluator(response) success = True # Mark as successful to exit the retry loop diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index 31d62a72..79f2f5d9 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -27,6 +27,7 @@ class AOP: name: Optional[str] = None, description: Optional[str] = None, url: Optional[str] = "http://localhost:8000/sse", + urls: Optional[list[str]] = None, *args, **kwargs, ): @@ -44,7 +45,7 @@ class AOP: self.name = name self.description = description self.url = url - + self.urls = urls self.tools = {} self.swarms = {} @@ -527,6 +528,12 @@ class AOP: return tool return None + def list_tools_for_multiple_urls(self): + out = [] + for url in self.urls: + out.append(self.list_all(url)) + return out + def search_if_tool_exists(self, name: str): out = self.list_all() for tool in out: diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 2984b8ba..c19b8190 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -1,6 +1,6 @@ import datetime import json -from typing import Any, Optional, Union +from typing import Any, List, Optional, Union import yaml from 
def add_multiple_messages(
    self, roles: List[str], contents: List[Union[str, dict, list]]
):
    """Add several messages to the conversation in one call.

    Roles and contents are paired positionally via ``zip``; any extra
    items in the longer list are ignored.

    Args:
        roles (List[str]): Speaker role for each message.
        contents (List[Union[str, dict, list]]): Message payloads,
            one per role.
    """
    for pair in zip(roles, contents):
        self.add(*pair)
def log_agent_data(data_dict: dict):
    """Log agent data.

    NOTE(review): this is intentionally a no-op — the previous
    background-thread telemetry dispatch was removed in this change and
    the stub is kept only so existing call sites keep working.
    `_log_agent_data` is no longer invoked from here; confirm that
    disabling telemetry entirely was intended before relying on this.

    Args:
        data_dict (dict): Payload that would have been logged; ignored.
    """
    pass
def parse_agent_output(
    dictionary: Union[str, Dict[Any, Any]]
) -> tuple[str, Dict[Any, Any]]:
    """
    Parse agent output into a tool name and its parameters.

    Args:
        dictionary: Either a dict or a string that converts to one.
            Must contain a 'name' key identifying the tool; every
            other key/value pair is treated as a tool parameter.

    Returns:
        tuple[str, Dict[Any, Any]]: The tool name and its parameters.
        The input dict is not mutated.

    Raises:
        ValueError: If the input is invalid or missing the 'name' key.
    """
    try:
        if isinstance(dictionary, str):
            dictionary = str_to_dict(dictionary)
        elif not isinstance(dictionary, dict):
            raise ValueError("Invalid dictionary")

        if "name" not in dictionary:
            raise ValueError("Invalid function call format")

        # Copy everything except the tool name into the parameter dict.
        params = {
            key: value
            for key, value in dictionary.items()
            if key != "name"
        }
        return dictionary["name"], params
    except Exception as e:
        raise ValueError(f"Error parsing agent output: {str(e)}")
+ """ + try: + out = asyncio.run(_list_all(url)) + + outputs = [] + for tool in out: + outputs.append(tool.model_dump()) + + if output_type == "json": + return json.dumps(outputs, indent=4) + else: + return outputs + except Exception as e: + raise ValueError(f"Error in list_all: {str(e)}") + + +def list_tools_for_multiple_urls( + urls: List[str], output_type: Literal["str", "json"] = "json" +): + """ + List tools available across multiple MCP servers. + + Args: + urls: List of MCP server URLs to query. + output_type: Format of the output, either "json" (string) or "str" (list). + + Returns: + If output_type is "json": JSON string containing all tools with server URLs. + If output_type is "str": List of tools with server URLs. + + Raises: + ValueError: If there's an error querying any of the servers. + """ + try: + out = [] + for url in urls: + tools = list_all(url) + # Add server URL to each tool's data + for tool in tools: + tool["server_url"] = url + out.append(tools) + + if output_type == "json": + return json.dumps(out, indent=4) + else: + return out + except Exception as e: + raise ValueError( + f"Error listing tools for multiple URLs: {str(e)}" + ) async def _execute_mcp_tool( url: str, - method: Literal["stdio", "sse"] = "sse", parameters: Dict[Any, Any] = None, - output_type: Literal["str", "dict"] = "str", *args, **kwargs, ) -> Dict[Any, Any]: + """ + Asynchronously execute a tool on an MCP server. - if "sse" or "stdio" not in url: - raise ValueError("Invalid URL") + Args: + url: The URL of the MCP server. + parameters: Dictionary containing tool name and parameters. + *args: Additional positional arguments for the Client. + **kwargs: Additional keyword arguments for the Client. - url = f"{url}/{method}" + Returns: + Dictionary containing the tool execution results. - name, params = parse_agent_output(parameters) + Raises: + ValueError: If the URL is invalid or tool execution fails. 
+ """ + try: + + name, params = parse_agent_output(parameters) + + outputs = [] - if output_type == "str": - async with Client(url, *args, **kwargs) as client: - out = await client.call_tool( - name=name, - arguments=params, - ) - return any_to_str(out) - elif output_type == "dict": async with Client(url, *args, **kwargs) as client: out = await client.call_tool( name=name, arguments=params, ) - return out - else: - raise ValueError(f"Invalid output type: {output_type}") + + for output in out: + outputs.append(output.model_dump()) + + # convert outputs to string + return json.dumps(outputs, indent=4) + except Exception as e: + raise ValueError(f"Error executing MCP tool: {str(e)}") def execute_mcp_tool( url: str, - tool_name: str = None, - method: Literal["stdio", "sse"] = "sse", parameters: Dict[Any, Any] = None, - output_type: Literal["str", "dict"] = "str", ) -> Dict[Any, Any]: - return asyncio.run( - _execute_mcp_tool( - url=url, - tool_name=tool_name, - method=method, - parameters=parameters, - output_type=output_type, + """ + Synchronously execute a tool on an MCP server. + + Args: + url: The URL of the MCP server. + parameters: Dictionary containing tool name and parameters. + + Returns: + Dictionary containing the tool execution results. + + Raises: + ValueError: If tool execution fails. + """ + try: + return asyncio.run( + _execute_mcp_tool( + url=url, + parameters=parameters, + ) + ) + except Exception as e: + raise ValueError(f"Error in execute_mcp_tool: {str(e)}") + + +def find_and_execute_tool( + urls: List[str], tool_name: str, parameters: Dict[Any, Any] +) -> Dict[Any, Any]: + """ + Find a tool across multiple servers and execute it with the given parameters. + + Args: + urls: List of server URLs to search through. + tool_name: Name of the tool to find and execute. + parameters: Parameters to pass to the tool. + + Returns: + Dict containing the tool execution results. + + Raises: + ValueError: If tool is not found on any server or execution fails. 
+ """ + try: + # Search for tool across all servers + for url in urls: + try: + tools = list_all(url) + # Check if tool exists on this server + if any(tool["name"] == tool_name for tool in tools): + # Prepare parameters in correct format + tool_params = {"name": tool_name, **parameters} + # Execute tool on this server + return execute_mcp_tool( + url=url, parameters=tool_params + ) + except Exception: + # Skip servers that fail and continue searching + continue + + raise ValueError( + f"Tool '{tool_name}' not found on any provided servers" ) - ) + except Exception as e: + raise ValueError(f"Error in find_and_execute_tool: {str(e)}") diff --git a/swarms/tools/mcp_integration.py b/swarms/tools/mcp_integration.py index 7a74dbaa..acc02dd0 100644 --- a/swarms/tools/mcp_integration.py +++ b/swarms/tools/mcp_integration.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, List +from typing import Any from loguru import logger @@ -25,8 +25,6 @@ from mcp.client.sse import sse_client from mcp.types import CallToolResult, JSONRPCMessage from typing_extensions import NotRequired, TypedDict -from swarms.utils.any_to_str import any_to_str - class MCPServer(abc.ABC): """Base class for Model Context Protocol servers.""" @@ -340,53 +338,3 @@ class MCPServerSse(_MCPServerWithClientSession): def name(self) -> str: """A readable name for the server.""" return self._name - - -def mcp_flow_get_tool_schema( - params: MCPServerSseParams, -) -> MCPServer: - server = MCPServerSse(params, cache_tools_list=True) - - # Connect the server - asyncio.run(server.connect()) - - # Return the server - output = asyncio.run(server.list_tools()) - - # Cleanup the server - asyncio.run(server.cleanup()) - - return output.model_dump() - - -def mcp_flow( - params: MCPServerSseParams, - function_call: dict[str, Any], -) -> MCPServer: - server = MCPServerSse(params, cache_tools_list=True) - - # Connect the server - asyncio.run(server.connect()) - - # Return the server - output = 
# test_execute.py — manual smoke-test script, not a unit test.
# Executes the "add" tool (a=1, b=2) against an MCP SSE server that
# must already be running on 0.0.0.0:8000; prints the JSON result.
from swarms.tools.mcp_client import execute_mcp_tool

print(
    execute_mcp_tool(
        "http://0.0.0.0:8000/sse",
        parameters={"name": "add", "a": 1, "b": 2},
    )
)