diff --git a/docs/developer_guides/web_scraper.md b/docs/developer_guides/web_scraper.md new file mode 100644 index 00000000..a835004d --- /dev/null +++ b/docs/developer_guides/web_scraper.md @@ -0,0 +1,84 @@ +# Web Scraper Agents + +Web scraper agents are specialized AI agents that automatically extract and process information from websites. They combine the power of large language models with web scraping tools to intelligently gather, analyze, and structure data from the web. + +These agents can: + +| Capability | Description | +|---|---| +| **Automatically navigate websites** | Visit pages and extract the relevant information from each one | +| **Parse and structure data** | Convert HTML content into readable, structured formats | +| **Handle dynamic content** | Process JavaScript-rendered pages and dynamic website elements | +| **Provide intelligent summaries** | Summarize and analyze the scraped content | +| **Scale to multiple websites** | Scrape and process data from several websites in one batched run for comprehensive research | + + +## Install + +```bash +pip3 install -U swarms swarms-tools +``` + +## Environment Setup + +Export your OpenAI API key so the agent's underlying model client can read it: + +```bash +export OPENAI_API_KEY="your_openai_api_key_here" +``` + +## Basic Usage + +Here's a simple example of how to create a web scraper agent: + +```python +from swarms import Agent +from swarms_tools import scrape_and_format_sync + +agent = Agent( + agent_name="Web Scraper Agent", + model_name="gpt-4o-mini", + tools=[scrape_and_format_sync], + dynamic_context_window=True, + dynamic_temperature_enabled=True, + max_loops=1, + system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full", +) + +out = agent.run( + "Scrape swarms.ai website and provide a full report of what the company does. The format type should be full." +) +print(out) +``` + +## Scraping Multiple Sites + +For comprehensive research, you can scrape multiple websites in a single batched run: + +```python +from swarms.structs.multi_agent_exec import batched_grid_agent_execution +from swarms_tools import scrape_and_format_sync +from swarms import Agent + +agent = Agent( + agent_name="Web Scraper Agent", + model_name="gpt-4o-mini", + tools=[scrape_and_format_sync], + dynamic_context_window=True, + dynamic_temperature_enabled=True, + max_loops=1, + system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full", +) + +out = batched_grid_agent_execution( + agents=[agent, agent], + tasks=[ + "Scrape swarms.ai website and provide a full report of the company's mission, products, and team. The format type should be full.", + "Scrape langchain.com website and provide a full report of the company's mission, products, and team. The format type should be full.", + ], +) + +print(out) +``` + +## Conclusion + +Web scraper agents combine AI with advanced automation to efficiently gather and process web data at scale. As you master the basics, explore features like batch processing and custom tools (see the sketch below) to unlock the full power of AI-driven web scraping. 
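## Custom Tools

The built-in `scrape_and_format_sync` tool covers most cases, but you can also hand the agent your own Python callables, just as the examples above pass a function through the `tools` parameter. The sketch below is illustrative rather than part of the library: it assumes the `requests` package is installed, and the `fetch_raw_html` function name is hypothetical.

```python
import requests

from swarms import Agent


def fetch_raw_html(url: str) -> str:
    """Fetch the raw HTML of a web page.

    Args:
        url: Full URL of the page to download.

    Returns:
        The page body as text, or an error message if the request fails.
    """
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        # Return the error as text so the agent can report it instead of crashing.
        return f"Failed to fetch {url}: {e}"


# The custom function is supplied exactly like scrape_and_format_sync above.
agent = Agent(
    agent_name="Custom Web Scraper Agent",
    model_name="gpt-4o-mini",
    tools=[fetch_raw_html],
    max_loops=1,
    system_prompt="You are a web scraper agent. Fetch the requested page and summarize its contents.",
)

print(agent.run("Fetch https://example.com and summarize what the page is about."))
```

Type hints and a docstring on the tool function help the framework describe the tool to the model; beyond that, the agent is configured the same way as in the earlier examples.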
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0e2cfa4c..1effe52d 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -435,6 +435,7 @@ nav: - RAG with Qdrant: "swarms/RAG/qdrant_rag.md" - Apps: + - Web Scraper Agents: "developer_guides/web_scraper.md" - Smart Database: "examples/smart_database.md" diff --git a/examples/guides/web_scraper_agents/batched_scraper_agent.py b/examples/guides/web_scraper_agents/batched_scraper_agent.py new file mode 100644 index 00000000..6253ce05 --- /dev/null +++ b/examples/guides/web_scraper_agents/batched_scraper_agent.py @@ -0,0 +1,25 @@ +from swarms.structs.multi_agent_exec import ( + batched_grid_agent_execution, +) +from swarms_tools import scrape_and_format_sync +from swarms import Agent + +agent = Agent( + agent_name="Web Scraper Agent", + model_name="gpt-4o-mini", + tools=[scrape_and_format_sync], + dynamic_context_window=True, + dynamic_temperature_enabled=True, + max_loops=1, + system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full", +) + +out = batched_grid_agent_execution( + agents=[agent, agent], + tasks=[ + "Scrape swarms.ai website and provide a full report of the company's mission, products, and team. The format type should be full.", + "Scrape langchain.com website and provide a full report of the company's mission, products, and team. The format type should be full.", + ], +) + +print(out) diff --git a/examples/guides/web_scraper_agents/web_scraper_agent.py b/examples/guides/web_scraper_agents/web_scraper_agent.py new file mode 100644 index 00000000..8b67321b --- /dev/null +++ b/examples/guides/web_scraper_agents/web_scraper_agent.py @@ -0,0 +1,17 @@ +from swarms import Agent +from swarms_tools import scrape_and_format_sync + +agent = Agent( + agent_name="Web Scraper Agent", + model_name="gpt-4o-mini", + tools=[scrape_and_format_sync], + dynamic_context_window=True, + dynamic_temperature_enabled=True, + max_loops=1, + system_prompt="You are a web scraper agent. You are given a URL and you need to scrape the website and return the data in a structured format. The format type should be full", +) + +out = agent.run( + "Scrape swarms.ai website and provide a full report of what the company does. The format type should be full." +) +print(out) diff --git a/examples/mcp/mcp_agent_tool.py b/examples/mcp/mcp_agent_tool.py index 5a07f7fd..a0489cb8 100644 --- a/examples/mcp/mcp_agent_tool.py +++ b/examples/mcp/mcp_agent_tool.py @@ -4,11 +4,14 @@ from swarms import Agent mcp = FastMCP("MCPAgentTool") + @mcp.tool( name="create_agent", description="Create an agent with the specified name, system prompt, and model, then run a task.", ) -def create_agent(agent_name: str, system_prompt: str, model_name: str, task: str) -> str: +def create_agent( + agent_name: str, system_prompt: str, model_name: str, task: str +) -> str: """ Create an agent with the given parameters and execute the specified task. 
@@ -30,4 +33,4 @@ def create_agent(agent_name: str, system_prompt: str, model_name: str, task: str if __name__ == "__main__": - mcp.run() \ No newline at end of file + mcp.run() diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index 7f918274..5e840d03 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -51,6 +51,7 @@ from swarms.prompts.reasoning_prompt import INTERNAL_MONOLGUE_PROMPT from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import list_all_agents +from swarms.structs.omni_agent_types import AgentListType from swarms.tools.base_tool import BaseTool from swarms.utils.history_output_formatter import ( history_output_formatter, @@ -672,7 +673,7 @@ class HierarchicalSwarm: name: str = "HierarchicalAgentSwarm", description: str = "Distributed task swarm", director: Optional[Union[Agent, Callable, Any]] = None, - agents: List[Union[Agent, Callable, Any]] = None, + agents: AgentListType = None, max_loops: int = 1, output_type: OutputType = "dict-all-except-first", feedback_director_model_name: str = "gpt-4o-mini", @@ -822,7 +823,7 @@ class HierarchicalSwarm: # Initialize logger only if verbose is enabled if self.verbose: logger.info( - f"šŸš€ Initializing HierarchicalSwarm: {self.name}" + f"[INIT] Initializing HierarchicalSwarm: {self.name}" ) self.conversation = Conversation(time_enabled=False) @@ -847,7 +848,7 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… HierarchicalSwarm: {self.name} initialized successfully." + f"[SUCCESS] HierarchicalSwarm: {self.name} initialized successfully." ) if self.multi_agent_prompt_improvements: @@ -867,7 +868,7 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info("šŸ“ Adding agent context to director") + logger.info("[INFO] Adding agent context to director") list_all_agents( agents=self.agents, @@ -878,15 +879,15 @@ class HierarchicalSwarm: if self.verbose: logger.success( - "āœ… Agent context added to director successfully" + "[SUCCESS] Agent context added to director successfully" ) except Exception as e: error_msg = ( - f"āŒ Failed to add context to director: {str(e)}" + f"[ERROR] Failed to add context to director: {str(e)}" ) logger.error( - f"{error_msg}\nšŸ” Traceback: {traceback.format_exc()}" + f"{error_msg}\n[TRACE] Traceback: {traceback.format_exc()}" ) def setup_director(self): @@ -904,12 +905,12 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info("šŸŽÆ Setting up director agent") + logger.info("[SETUP] Setting up director agent") schema = BaseTool().base_model_to_dict(SwarmSpec) if self.verbose: - logger.debug(f"šŸ“‹ Director schema: {schema}") + logger.debug(f"[SCHEMA] Director schema: {schema}") return Agent( agent_name=self.director_name, @@ -923,7 +924,7 @@ class HierarchicalSwarm: ) except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def reliability_checks(self): @@ -963,7 +964,7 @@ class HierarchicalSwarm: ) except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, 
please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def agents_no_print(self): @@ -995,7 +996,9 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info(f"šŸŽÆ Running director with task: {task}") + logger.info( + f"[RUN] Running director with task: {task}" + ) if self.planning_director_agent is not None: plan = self.planning_director_agent.run( @@ -1022,15 +1025,17 @@ class HierarchicalSwarm: ) if self.verbose: - logger.success("āœ… Director execution completed") + logger.success( + "[SUCCESS] Director execution completed" + ) logger.debug( - f"šŸ“‹ Director output type: {type(function_call)}" + f"[OUTPUT] Director output type: {type(function_call)}" ) return function_call except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) raise e @@ -1059,7 +1064,7 @@ class HierarchicalSwarm: try: if self.verbose: logger.info( - f"šŸ‘£ Executing single step for task: {task}" + f"[STEP] Executing single step for task: {task}" ) # Update dashboard for director execution @@ -1073,7 +1078,7 @@ class HierarchicalSwarm: if self.verbose: logger.info( - f"šŸ“‹ Parsed plan and {len(orders)} orders" + f"[PARSE] Parsed plan and {len(orders)} orders" ) # Update dashboard with plan and orders information @@ -1094,7 +1099,7 @@ class HierarchicalSwarm: outputs = self.execute_orders(orders) if self.verbose: - logger.info(f"⚔ Executed {len(outputs)} orders") + logger.info(f"[EXEC] Executed {len(outputs)} orders") if self.director_feedback_on is True: feedback = self.feedback_director(outputs) @@ -1102,12 +1107,14 @@ class HierarchicalSwarm: feedback = outputs if self.verbose: - logger.success("āœ… Step completed successfully") + logger.success( + "[SUCCESS] Step completed successfully" + ) return feedback except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def run( @@ -1146,7 +1153,6 @@ class HierarchicalSwarm: if task is None and self.interactive: task = self._get_interactive_task() - current_loop = 0 last_output = None @@ -1157,16 +1163,16 @@ class HierarchicalSwarm: if self.verbose: logger.info( - f"šŸš€ Starting hierarchical swarm run: {self.name}" + f"[START] Starting hierarchical swarm run: {self.name}" ) logger.info( - f"šŸ“Š Configuration - Max loops: {self.max_loops}" + f"[CONFIG] Configuration - Max loops: {self.max_loops}" ) while current_loop < self.max_loops: if self.verbose: logger.info( - f"šŸ”„ Loop {current_loop + 1}/{self.max_loops} - Processing task" + f"[LOOP] Loop {current_loop + 1}/{self.max_loops} - Processing task" ) # Update dashboard loop counter @@ -1196,11 +1202,11 @@ class HierarchicalSwarm: if 
self.verbose: logger.success( - f"āœ… Loop {current_loop + 1} completed successfully" + f"[SUCCESS] Loop {current_loop + 1} completed successfully" ) except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) current_loop += 1 @@ -1218,10 +1224,10 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"šŸŽ‰ Hierarchical swarm run completed: {self.name}" + f"[COMPLETE] Hierarchical swarm run completed: {self.name}" ) logger.info( - f"šŸ“Š Total loops executed: {current_loop}" + f"[STATS] Total loops executed: {current_loop}" ) return history_output_formatter( @@ -1234,7 +1240,7 @@ class HierarchicalSwarm: self.dashboard.update_director_status("ERROR") self.dashboard.stop() - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def _get_interactive_task(self) -> str: @@ -1275,7 +1281,7 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info("šŸ“ Generating director feedback") + logger.info("[FEEDBACK] Generating director feedback") task = f"History: {self.conversation.get_str()} \n\n" @@ -1302,13 +1308,13 @@ class HierarchicalSwarm: if self.verbose: logger.success( - "āœ… Director feedback generated successfully" + "[SUCCESS] Director feedback generated successfully" ) return output except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def call_single_agent( @@ -1336,7 +1342,7 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info(f"šŸ“ž Calling agent: {agent_name}") + logger.info(f"[CALL] Calling agent: {agent_name}") # Find agent by name agent = None @@ -1373,7 +1379,7 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… Agent {agent_name} completed task successfully" + f"[SUCCESS] Agent {agent_name} completed task successfully" ) return output @@ -1385,7 +1391,7 @@ class HierarchicalSwarm: agent_name, "ERROR", task, f"Error: {str(e)}" ) - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def parse_orders(self, output): @@ -1409,8 +1415,8 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info("šŸ“‹ Parsing director orders") - logger.debug(f"šŸ“Š Output type: 
{type(output)}") + logger.info("[PARSE] Parsing director orders") + logger.debug(f"[TYPE] Output type: {type(output)}") import json @@ -1454,7 +1460,7 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… Successfully parsed plan and {len(orders)} orders" + f"[SUCCESS] Successfully parsed plan and {len(orders)} orders" ) return plan, orders @@ -1463,7 +1469,7 @@ class HierarchicalSwarm: ) as json_err: if self.verbose: logger.warning( - f"āš ļø JSON decode error: {json_err}" + f"[WARN] JSON decode error: {json_err}" ) pass # Check if it's a direct function call format @@ -1488,7 +1494,7 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… Successfully parsed plan and {len(orders)} orders" + f"[SUCCESS] Successfully parsed plan and {len(orders)} orders" ) return plan, orders @@ -1497,7 +1503,7 @@ class HierarchicalSwarm: ) as json_err: if self.verbose: logger.warning( - f"āš ļø JSON decode error: {json_err}" + f"[WARN] JSON decode error: {json_err}" ) pass # If no function call found, raise error @@ -1515,7 +1521,7 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… Successfully parsed plan and {len(orders)} orders" + f"[SUCCESS] Successfully parsed plan and {len(orders)} orders" ) return plan, orders @@ -1529,7 +1535,7 @@ class HierarchicalSwarm: ) except Exception as e: - error_msg = f"āŒ Failed to parse orders: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to parse orders: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) raise e @@ -1552,13 +1558,13 @@ class HierarchicalSwarm: """ try: if self.verbose: - logger.info(f"⚔ Executing {len(orders)} orders") + logger.info(f"[EXEC] Executing {len(orders)} orders") outputs = [] for i, order in enumerate(orders): if self.verbose: logger.info( - f"šŸ“‹ Executing order {i+1}/{len(orders)}: {order.agent_name}" + f"[ORDER] Executing order {i+1}/{len(orders)}: {order.agent_name}" ) # Update dashboard for agent execution @@ -1590,13 +1596,13 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"āœ… All {len(orders)} orders executed successfully" + f"[SUCCESS] All {len(orders)} orders executed successfully" ) return outputs except Exception as e: - error_msg = f"āŒ Failed to setup director: {str(e)}\nšŸ” Traceback: {traceback.format_exc()}\nšŸ› If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" + error_msg = f"[ERROR] Failed to setup director: {str(e)}\n[TRACE] Traceback: {traceback.format_exc()}\n[BUG] If this issue persists, please report it at: https://github.com/kyegomez/swarms/issues" logger.error(error_msg) def batched_run( @@ -1624,10 +1630,10 @@ class HierarchicalSwarm: try: if self.verbose: logger.info( - f"šŸš€ Starting batched hierarchical swarm run: {self.name}" + f"[START] Starting batched hierarchical swarm run: {self.name}" ) logger.info( - f"šŸ“Š Configuration - Max loops: {self.max_loops}" + f"[CONFIG] Configuration - Max loops: {self.max_loops}" ) # Initialize a list to store the results @@ -1640,18 +1646,18 @@ class HierarchicalSwarm: if self.verbose: logger.success( - f"šŸŽ‰ Batched hierarchical swarm run completed: {self.name}" + f"[COMPLETE] Batched hierarchical swarm run completed: {self.name}" + ) + logger.info( + f"[STATS] Total tasks processed: {len(tasks)}" ) - 
logger.info(f"šŸ“Š Total tasks processed: {len(tasks)}") return results except Exception as e: - error_msg = ( - f"āŒ Batched hierarchical swarm run failed: {str(e)}" - ) + error_msg = f"[ERROR] Batched hierarchical swarm run failed: {str(e)}" if self.verbose: logger.error(error_msg) logger.error( - f"šŸ” Traceback: {traceback.format_exc()}" + f"[TRACE] Traceback: {traceback.format_exc()}" ) diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 7c764b36..d4119a7a 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -144,7 +144,6 @@ def run_agents_concurrently_multiprocess( def batched_grid_agent_execution( agents: List[AgentType], tasks: List[str], - max_workers: int = None, ) -> List[Any]: """ Run multiple agents with different tasks concurrently. @@ -158,9 +157,6 @@ def batched_grid_agent_execution( "The number of agents must match the number of tasks." ) - if max_workers is None: - max_workers = os.cpu_count() - results = [] for agent, task in zip(agents, tasks): diff --git a/swarms/structs/multi_agent_router.py b/swarms/structs/multi_agent_router.py index c8a71b16..ca719603 100644 --- a/swarms/structs/multi_agent_router.py +++ b/swarms/structs/multi_agent_router.py @@ -1,20 +1,9 @@ -""" -Todo: - -- Add multi-agent selection for a task and then run them automatically -- Add shared memory for large instances of agents - - - -""" - import os from typing import List, Optional from loguru import logger from pydantic import BaseModel, Field from swarms.utils.function_caller_model import OpenAIFunctionCaller -from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.utils.output_types import OutputType from swarms.utils.any_to_str import any_to_str @@ -22,7 +11,7 @@ from swarms.utils.history_output_formatter import ( history_output_formatter, ) from swarms.utils.formatter import formatter -from typing import Callable, Union +from swarms.structs.omni_agent_types import AgentListType class AgentResponse(BaseModel): @@ -60,7 +49,7 @@ class MultiAgentRouter: self, name: str = "swarm-router", description: str = "Routes tasks to specialized agents based on their capabilities", - agents: List[Union[Agent, Callable]] = [], + agents: AgentListType = None, model: str = "gpt-4o-mini", temperature: float = 0.1, shared_memory_system: callable = None, @@ -86,7 +75,6 @@ class MultiAgentRouter: self.model = model self.temperature = temperature self.if_print = if_print - # Initialize Agents self.agents = {agent.name: agent for agent in agents} self.conversation = Conversation() diff --git a/tests/structs/test_support_mcp.py b/tests/structs/test_support_mcp.py index 4d46baff..084be573 100644 --- a/tests/structs/test_support_mcp.py +++ b/tests/structs/test_support_mcp.py @@ -19,7 +19,7 @@ from swarms.tools.mcp_client_call import ( execute_multiple_tools_on_multiple_mcp_servers_sync, execute_tool_call_simple, get_mcp_tools_sync, - get_tools_for_multiple_mcp_servers, + get_tools_for_multiple_mcp_servers, transform_mcp_tool_to_openai_tool, transform_openai_tool_call_request_to_mcp_tool_call_request, ) @@ -31,23 +31,26 @@ logger.add("test_results.log", rotation="10 MB", level="DEBUG") TEST_CONFIG = { "server_url": "http://localhost:8080/mcp", "transport": "streamable_http", - "timeout": 10 + "timeout": 10, } # Test results storage test_results = [] -def log_test_result(test_name: str, status: str, message: str = "", error: str = ""): + +def log_test_result( + test_name: str, status: str, 
message: str = "", error: str = "" +): """Log test result and add to results list""" result = { "test_name": test_name, "status": status, "message": message, "error": error, - "timestamp": datetime.now().isoformat() + "timestamp": datetime.now().isoformat(), } test_results.append(result) - + if status == "PASS": logger.success(f"āœ“ {test_name}: {message}") elif status == "FAIL": @@ -55,10 +58,11 @@ def log_test_result(test_name: str, status: str, message: str = "", error: str = else: logger.info(f"~ {test_name}: {message}") + def test_transform_mcp_tool_to_openai_tool(): """Test MCP tool to OpenAI tool transformation""" test_name = "test_transform_mcp_tool_to_openai_tool" - + try: # Create mock MCP tool class MockMCPTool: @@ -66,403 +70,577 @@ def test_transform_mcp_tool_to_openai_tool(): self.name = name self.description = description self.inputSchema = input_schema - + mock_tool = MockMCPTool( name="test_function", description="Test function description", - input_schema={"type": "object", "properties": {"param1": {"type": "string"}}} + input_schema={ + "type": "object", + "properties": {"param1": {"type": "string"}}, + }, ) - + result = transform_mcp_tool_to_openai_tool(mock_tool) - + # Validate result structure assert result["type"] == "function" assert result["function"]["name"] == "test_function" - assert result["function"]["description"] == "Test function description" + assert ( + result["function"]["description"] + == "Test function description" + ) assert result["function"]["parameters"]["type"] == "object" - - log_test_result(test_name, "PASS", "Successfully transformed MCP tool to OpenAI format") - + + log_test_result( + test_name, + "PASS", + "Successfully transformed MCP tool to OpenAI format", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to transform tool: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to transform tool: {str(e)}", + ) + def test_get_function_arguments(): """Test function argument extraction""" test_name = "test_get_function_arguments" - + try: # Test with dict arguments - function_def = {"arguments": {"param1": "value1", "param2": "value2"}} + function_def = { + "arguments": {"param1": "value1", "param2": "value2"} + } result = _get_function_arguments(function_def) assert isinstance(result, dict) assert result["param1"] == "value1" - + # Test with string arguments - function_def_str = {"arguments": '{"param1": "value1", "param2": "value2"}'} + function_def_str = { + "arguments": '{"param1": "value1", "param2": "value2"}' + } result_str = _get_function_arguments(function_def_str) assert isinstance(result_str, dict) assert result_str["param1"] == "value1" - + # Test with empty arguments function_def_empty = {} result_empty = _get_function_arguments(function_def_empty) assert result_empty == {} - - log_test_result(test_name, "PASS", "Successfully extracted function arguments in all formats") - + + log_test_result( + test_name, + "PASS", + "Successfully extracted function arguments in all formats", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to extract arguments: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to extract arguments: {str(e)}", + ) + def test_transform_openai_tool_call_request_to_mcp_tool_call_request(): """Test OpenAI tool call to MCP tool call transformation""" test_name = "test_transform_openai_tool_call_request_to_mcp_tool_call_request" - + try: openai_tool = { "function": { "name": "test_function", - "arguments": {"param1": 
"value1", "param2": "value2"} + "arguments": {"param1": "value1", "param2": "value2"}, } } - - result = transform_openai_tool_call_request_to_mcp_tool_call_request(openai_tool) - + + result = transform_openai_tool_call_request_to_mcp_tool_call_request( + openai_tool + ) + assert result.name == "test_function" assert result.arguments["param1"] == "value1" assert result.arguments["param2"] == "value2" - - log_test_result(test_name, "PASS", "Successfully transformed OpenAI tool call to MCP format") - + + log_test_result( + test_name, + "PASS", + "Successfully transformed OpenAI tool call to MCP format", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to transform tool call: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to transform tool call: {str(e)}", + ) + def test_auto_detect_transport(): """Test transport auto-detection""" test_name = "test_auto_detect_transport" - + try: # Test HTTP URL http_url = "http://localhost:8080/mcp" transport = auto_detect_transport(http_url) assert transport == "streamable_http" - + # Test HTTPS URL https_url = "https://example.com/mcp" transport = auto_detect_transport(https_url) assert transport == "streamable_http" - + # Test WebSocket URL ws_url = "ws://localhost:8080/mcp" transport = auto_detect_transport(ws_url) assert transport == "sse" - + # Test stdio stdio_url = "stdio://local" transport = auto_detect_transport(stdio_url) assert transport == "stdio" - + # Test unknown scheme unknown_url = "unknown://test" transport = auto_detect_transport(unknown_url) assert transport == "sse" # Default - - log_test_result(test_name, "PASS", "Successfully auto-detected all transport types") - + + log_test_result( + test_name, + "PASS", + "Successfully auto-detected all transport types", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to auto-detect transport: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to auto-detect transport: {str(e)}", + ) + def test_connect_to_mcp_server(): """Test MCP server connection configuration""" test_name = "test_connect_to_mcp_server" - + try: from swarms.schemas.mcp_schemas import MCPConnection - + # Create connection object connection = MCPConnection( url="http://localhost:8080/mcp", transport="streamable_http", timeout=10, headers={"Content-Type": "application/json"}, - authorization_token="test_token" + authorization_token="test_token", + ) + + headers, timeout, transport, url = connect_to_mcp_server( + connection ) - - headers, timeout, transport, url = connect_to_mcp_server(connection) - + assert url == "http://localhost:8080/mcp" assert transport == "streamable_http" assert timeout == 10 assert "Authorization" in headers assert headers["Authorization"] == "Bearer test_token" - - log_test_result(test_name, "PASS", "Successfully configured MCP server connection") - + + log_test_result( + test_name, + "PASS", + "Successfully configured MCP server connection", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to configure connection: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to configure connection: {str(e)}", + ) + async def test_aget_mcp_tools(): """Test async MCP tools fetching""" test_name = "test_aget_mcp_tools" - + try: # This will attempt to connect to the actual server tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + assert 
isinstance(tools, list) - log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools from server") - + log_test_result( + test_name, + "PASS", + f"Successfully fetched {len(tools)} tools from server", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to fetch tools: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to fetch tools: {str(e)}", + ) + def test_get_mcp_tools_sync(): """Test synchronous MCP tools fetching""" test_name = "test_get_mcp_tools_sync" - + try: tools = get_mcp_tools_sync( server_path=TEST_CONFIG["server_url"], format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + assert isinstance(tools, list) - log_test_result(test_name, "PASS", f"Successfully fetched {len(tools)} tools synchronously") - + log_test_result( + test_name, + "PASS", + f"Successfully fetched {len(tools)} tools synchronously", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to fetch tools sync: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to fetch tools sync: {str(e)}", + ) + def test_fetch_tools_for_server(): """Test fetching tools for a single server""" test_name = "test_fetch_tools_for_server" - + try: tools = _fetch_tools_for_server( url=TEST_CONFIG["server_url"], format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + assert isinstance(tools, list) - log_test_result(test_name, "PASS", f"Successfully fetched tools for single server: {len(tools)} tools") - + log_test_result( + test_name, + "PASS", + f"Successfully fetched tools for single server: {len(tools)} tools", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to fetch tools for server: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to fetch tools for server: {str(e)}", + ) + def test_get_tools_for_multiple_mcp_servers(): """Test fetching tools from multiple servers""" test_name = "test_get_tools_for_multiple_mcp_servers" - + try: - urls = [TEST_CONFIG["server_url"]] # Using single server for testing - + urls = [ + TEST_CONFIG["server_url"] + ] # Using single server for testing + tools = get_tools_for_multiple_mcp_servers( urls=urls, format="openai", transport=TEST_CONFIG["transport"], - max_workers=2 + max_workers=2, ) - + assert isinstance(tools, list) - log_test_result(test_name, "PASS", f"Successfully fetched tools from multiple servers: {len(tools)} tools") - + log_test_result( + test_name, + "PASS", + f"Successfully fetched tools from multiple servers: {len(tools)} tools", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to fetch tools from multiple servers: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to fetch 
tools from multiple servers: {str(e)}", + ) + async def test_execute_tool_call_simple(): """Test simple tool execution""" test_name = "test_execute_tool_call_simple" - + try: # First try to get available tools try: tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + if not tools: - log_test_result(test_name, "SKIP", "No tools available for testing") + log_test_result( + test_name, + "SKIP", + "No tools available for testing", + ) return - + # Use the first available tool for testing first_tool = tools[0] tool_name = first_tool["function"]["name"] - + # Create a basic tool call request tool_call_request = { "function": { "name": tool_name, - "arguments": {} # Basic empty arguments + "arguments": {}, # Basic empty arguments } } - + result = await execute_tool_call_simple( response=tool_call_request, server_path=TEST_CONFIG["server_url"], transport=TEST_CONFIG["transport"], - output_type="str" + output_type="str", ) - + assert result is not None - log_test_result(test_name, "PASS", f"Successfully executed tool call for {tool_name}") - + log_test_result( + test_name, + "PASS", + f"Successfully executed tool call for {tool_name}", + ) + except MCPConnectionError: - log_test_result(test_name, "SKIP", "Server not available for tool execution test") - + log_test_result( + test_name, + "SKIP", + "Server not available for tool execution test", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to execute tool call: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to execute tool call: {str(e)}", + ) + async def test_create_server_tool_mapping(): """Test server tool mapping creation""" test_name = "test_create_server_tool_mapping" - + try: urls = [TEST_CONFIG["server_url"]] - + mapping = await _create_server_tool_mapping_async( urls=urls, format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + assert isinstance(mapping, dict) - log_test_result(test_name, "PASS", f"Successfully created server tool mapping with {len(mapping)} functions") - + log_test_result( + test_name, + "PASS", + f"Successfully created server tool mapping with {len(mapping)} functions", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to create server tool mapping: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to create server tool mapping: {str(e)}", + ) + async def test_execute_multiple_tools_on_multiple_servers(): """Test executing multiple tools across servers""" test_name = "test_execute_multiple_tools_on_multiple_servers" - + try: urls = [TEST_CONFIG["server_url"]] - + # First get available tools try: tools = await aget_mcp_tools( server_path=TEST_CONFIG["server_url"], format="openai", - transport=TEST_CONFIG["transport"] + transport=TEST_CONFIG["transport"], ) - + if not tools: - log_test_result(test_name, "SKIP", "No tools available for testing") + log_test_result( + test_name, + "SKIP", + "No tools available for testing", + ) return - + # Create test requests using available tools responses = [] for tool in tools[:2]: # Test with first 2 tools tool_call = { "function": { "name": tool["function"]["name"], - "arguments": {} + "arguments": {}, } } responses.append(tool_call) - + if not 
responses: - log_test_result(test_name, "SKIP", "No suitable tools found for testing") + log_test_result( + test_name, + "SKIP", + "No suitable tools found for testing", + ) return - - results = await execute_multiple_tools_on_multiple_mcp_servers( - responses=responses, - urls=urls, - transport=TEST_CONFIG["transport"], - max_concurrent=2 + + results = ( + await execute_multiple_tools_on_multiple_mcp_servers( + responses=responses, + urls=urls, + transport=TEST_CONFIG["transport"], + max_concurrent=2, + ) ) - + assert isinstance(results, list) - log_test_result(test_name, "PASS", f"Successfully executed {len(results)} tool calls") - + log_test_result( + test_name, + "PASS", + f"Successfully executed {len(results)} tool calls", + ) + except MCPConnectionError: - log_test_result(test_name, "SKIP", "Server not available for multiple tool execution test") - + log_test_result( + test_name, + "SKIP", + "Server not available for multiple tool execution test", + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed to execute multiple tools: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed to execute multiple tools: {str(e)}", + ) + def test_execute_multiple_tools_sync(): """Test synchronous multiple tool execution""" test_name = "test_execute_multiple_tools_sync" - + try: urls = [TEST_CONFIG["server_url"]] - + # Create minimal test requests responses = [ { "function": { "name": "test_function", # This will likely fail but tests the sync wrapper - "arguments": {} + "arguments": {}, } } ] - + results = execute_multiple_tools_on_multiple_mcp_servers_sync( responses=responses, urls=urls, transport=TEST_CONFIG["transport"], - max_concurrent=1 + max_concurrent=1, ) - + assert isinstance(results, list) - log_test_result(test_name, "PASS", f"Successfully ran sync multiple tools execution (got {len(results)} results)") - + log_test_result( + test_name, + "PASS", + f"Successfully ran sync multiple tools execution (got {len(results)} results)", + ) + except MCPConnectionError as e: - log_test_result(test_name, "SKIP", f"Server not available: {str(e)}") + log_test_result( + test_name, "SKIP", f"Server not available: {str(e)}" + ) except Exception as e: - log_test_result(test_name, "FAIL", error=f"Failed sync multiple tools execution: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Failed sync multiple tools execution: {str(e)}", + ) + def test_error_handling(): """Test error handling for various scenarios""" test_name = "test_error_handling" - + try: # Test invalid server URL try: get_mcp_tools_sync( server_path="http://invalid-url:99999/mcp", - transport="streamable_http" + transport="streamable_http", ) assert False, "Should have raised an exception" except MCPConnectionError: pass # Expected - + # Test invalid connection object try: connect_to_mcp_server("invalid_connection") assert False, "Should have raised an exception" except MCPValidationError: pass # Expected - + # Test invalid transport detection transport = auto_detect_transport("") assert transport == "sse" # Should default to sse - - log_test_result(test_name, "PASS", "All error handling tests passed") - + + log_test_result( + test_name, "PASS", "All error handling tests passed" + ) + except Exception as e: - log_test_result(test_name, "FAIL", error=f"Error handling test failed: {str(e)}") + log_test_result( + test_name, + "FAIL", + error=f"Error handling test failed: {str(e)}", + ) + async def run_all_tests(): """Run all test functions""" logger.info("Starting MCP unit tests...") - + 
# Synchronous tests test_transform_mcp_tool_to_openai_tool() test_get_function_arguments() @@ -474,22 +652,25 @@ async def run_all_tests(): test_get_tools_for_multiple_mcp_servers() test_execute_multiple_tools_sync() test_error_handling() - + # Asynchronous tests await test_aget_mcp_tools() await test_execute_tool_call_simple() await test_create_server_tool_mapping() await test_execute_multiple_tools_on_multiple_servers() - - logger.info(f"Completed all tests. Total tests run: {len(test_results)}") + + logger.info( + f"Completed all tests. Total tests run: {len(test_results)}" + ) + def generate_markdown_report(): """Generate markdown report of test results""" - + passed_tests = [r for r in test_results if r["status"] == "PASS"] failed_tests = [r for r in test_results if r["status"] == "FAIL"] skipped_tests = [r for r in test_results if r["status"] == "SKIP"] - + markdown_content = f"""# MCP Unit Test Results ## Summary @@ -508,32 +689,44 @@ def generate_markdown_report(): ### āœ… Passed Tests ({len(passed_tests)}) """ - + for test in passed_tests: - markdown_content += f"- **{test['test_name']}**: {test['message']}\n" - + markdown_content += ( + f"- **{test['test_name']}**: {test['message']}\n" + ) + if failed_tests: - markdown_content += f"\n### āŒ Failed Tests ({len(failed_tests)})\n" + markdown_content += ( + f"\n### āŒ Failed Tests ({len(failed_tests)})\n" + ) for test in failed_tests: - markdown_content += f"- **{test['test_name']}**: {test['error']}\n" - + markdown_content += ( + f"- **{test['test_name']}**: {test['error']}\n" + ) + if skipped_tests: - markdown_content += f"\n### ā­ļø Skipped Tests ({len(skipped_tests)})\n" + markdown_content += ( + f"\n### ā­ļø Skipped Tests ({len(skipped_tests)})\n" + ) for test in skipped_tests: - markdown_content += f"- **{test['test_name']}**: {test['message']}\n" - + markdown_content += ( + f"- **{test['test_name']}**: {test['message']}\n" + ) + markdown_content += """ ## Detailed Results | Test Name | Status | Message/Error | Timestamp | |-----------|---------|---------------|-----------| """ - + for test in test_results: - status_emoji = {"PASS": "āœ…", "FAIL": "āŒ", "SKIP": "ā­ļø"}.get(test["status"], "ā“") + status_emoji = {"PASS": "āœ…", "FAIL": "āŒ", "SKIP": "ā­ļø"}.get( + test["status"], "ā“" + ) message = test.get("message") or test.get("error", "") markdown_content += f"| {test['test_name']} | {status_emoji} {test['status']} | {message} | {test['timestamp']} |\n" - + markdown_content += f""" ## Notes - Tests marked as SKIP typically indicate the MCP server was not available at {TEST_CONFIG["server_url"]} @@ -542,27 +735,34 @@ def generate_markdown_report(): Generated at: {datetime.now().isoformat()} """ - + return markdown_content + async def main(): """Main test runner""" try: await run_all_tests() - + # Generate and save markdown report markdown_report = generate_markdown_report() - + with open("mcp_test_results.md", "w") as f: f.write(markdown_report) - + logger.info("Test results saved to mcp_test_results.md") - + # Print summary - passed = len([r for r in test_results if r["status"] == "PASS"]) - failed = len([r for r in test_results if r["status"] == "FAIL"]) - skipped = len([r for r in test_results if r["status"] == "SKIP"]) - + passed = len( + [r for r in test_results if r["status"] == "PASS"] + ) + failed = len( + [r for r in test_results if r["status"] == "FAIL"] + ) + skipped = len( + [r for r in test_results if r["status"] == "SKIP"] + ) + print(f"\n{'='*50}") print("TEST SUMMARY") print(f"{'='*50}") @@ 
-572,10 +772,11 @@ async def main(): print(f"Skipped: {skipped}") print(f"Success Rate: {(passed/len(test_results)*100):.1f}%") print(f"{'='*50}") - + except Exception as e: logger.error(f"Error running tests: {str(e)}") logger.error(f"Traceback: {traceback.format_exc()}") + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main())