diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 4027d032..c06f7daa 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -315,6 +315,10 @@ nav:
- AgentRegistry: "swarms/structs/agent_registry.md"
- Communication Structure: "swarms/structs/conversation.md"
+
+  - Protocol:
+    - Overview: "protocol/overview.md"
+    - SIPs: "protocol/sip.md"
- Tools:
- Overview: "swarms_tools/overview.md"
diff --git a/docs/protocol/bc.md b/docs/protocol/bc.md
new file mode 100644
index 00000000..be6c0035
--- /dev/null
+++ b/docs/protocol/bc.md
@@ -0,0 +1 @@
+# Backwards Compatibility
\ No newline at end of file
diff --git a/docs/protocol/overview.md b/docs/protocol/overview.md
new file mode 100644
index 00000000..43cdec2a
--- /dev/null
+++ b/docs/protocol/overview.md
@@ -0,0 +1,416 @@
+# Swarms Protocol Overview & Architecture
+
+This document provides a comprehensive overview of the Swarms protocol architecture, illustrating the flow from agent classes to multi-agent structures, and showcasing the main components and folders within the `swarms/` package. The Swarms framework is designed for extensibility, modularity, and production-readiness, enabling the orchestration of intelligent agents, tools, memory, and complex multi-agent systems.
+
+---
+
+## Introduction
+
+Swarms is an enterprise-grade, production-ready multi-agent orchestration framework. It enables developers and organizations to build, deploy, and manage intelligent agents that can reason, collaborate, and solve complex tasks autonomously or in groups. The architecture is inspired by the principles of modularity, composability, and scalability, ensuring that each component can be extended or replaced as needed.
+
+The protocol is structured to support a wide range of use cases, from simple single-agent automations to sophisticated multi-agent workflows involving memory, tool use, and advanced reasoning.
+
+For a high-level introduction and installation instructions, see the [Swarms Docs Home](https://docs.swarms.world/en/latest/).
+
+---
+
+## High-Level Architecture Flow
+
+The Swarms protocol is organized into several key layers, each responsible for a specific aspect of the system. The typical flow is as follows, with a minimal code sketch after the list:
+
+1. **Agent Class (`swarms/agents`)**
+
+ - The core building block of the framework. Agents encapsulate logic, state, and behavior. They can be simple (stateless) or complex
+ (stateful, with memory and reasoning capabilities).
+
+ - Agents can be specialized for different tasks (e.g., reasoning agents, tool agents, judge agents, etc.).
+
+
+ - Example: A `ReasoningAgent` that can analyze data and make decisions, or a `ToolAgent` that wraps external APIs.
+
+
+ - [Quickstart for Agents](https://docs.swarms.world/en/latest/swarms/agents/)
+
+
+ - [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/)
+
+
+2. **Tools with Memory (`swarms/tools`, `swarms/utils`)**
+ - Tools are modular components that agents use to interact with the outside world, perform computations, or access resources (APIs,
+ databases, files, etc.).
+
+ - Memory modules and utility functions allow agents to retain context, cache results, and manage state across interactions.
+
+ - Example: A tool for calling an LLM API, a memory cache for conversation history, or a utility for parsing and formatting data.
+
+ - [Tools Overview](https://docs.swarms.world/en/latest/swarms_tools/overview/)
+
+ - [BaseTool Reference](https://docs.swarms.world/en/latest/swarms/tools/base_tool/)
+
+
+3. **Reasoning & Specialized Agents (`swarms/agents`)**
+ - These agents build on the base agent class, adding advanced reasoning, self-consistency, and specialized logic for tasks like
+ planning, evaluation, or multi-step workflows.
+
+ - Includes agents for self-reflection, iterative improvement, and domain-specific expertise.
+
+ - Example: A `SelfConsistencyAgent` that aggregates multiple reasoning paths, or a `JudgeAgent` that evaluates outputs from other
+ agents.
+
+ - [Reasoning Agents Overview](https://docs.swarms.world/en/latest/swarms/agents/reasoning_agents_overview/)
+
+ - [Self Consistency Agent](https://docs.swarms.world/en/latest/swarms/agents/consistency_agent/)
+
+ - [Agent Judge](https://docs.swarms.world/en/latest/swarms/agents/agent_judge/)
+
+
+4. **Multi-Agent Structures (`swarms/structs`)**
+ - Agents are composed into higher-order structures for collaboration, voting, parallelism, and workflow orchestration.
+
+ - Includes swarms for majority voting, round-robin execution, hierarchical delegation, and more.
+
+ - Example: A `MajorityVotingSwarm` that aggregates outputs from several agents, or a `HierarchicalSwarm` that delegates tasks to
+ sub-agents.
+
+ - [Multi-Agent Architectures Overview](https://docs.swarms.world/en/latest/swarms/concept/swarm_architectures/)
+
+ - [MajorityVotingSwarm](https://docs.swarms.world/en/latest/swarms/structs/majorityvoting/)
+
+ - [HierarchicalSwarm](https://docs.swarms.world/en/latest/swarms/structs/hierarchical_swarm/)
+
+ - [Sequential Workflow](https://docs.swarms.world/en/latest/swarms/structs/sequential_workflow/)
+
+ - [Concurrent Workflow](https://docs.swarms.world/en/latest/swarms/structs/concurrentworkflow/)
+
+
+5. **Supporting Components**
+
+ - **Communication (`swarms/communication`)**: Provides wrappers for inter-agent communication, database access, message passing, and
+ integration with external systems (e.g., Redis, DuckDB, Pulsar). See [Communication Structure](https://docs.swarms.world/en/latest/swarms/structs/conversation/)
+
+ - **Artifacts (`swarms/artifacts`)**: Manages the creation, storage, and retrieval of artifacts (outputs, files, logs) generated by
+ agents and swarms.
+
+ - **Prompts (`swarms/prompts`)**: Houses prompt templates, system prompts, and agent-specific prompts for LLM-based agents. See
+ [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/)
+
+ - **Telemetry (`swarms/telemetry`)**: Handles logging, monitoring, and bootup routines for observability and debugging.
+
+ - **Schemas (`swarms/schemas`)**: Defines data schemas for agents, tools, completions, and communication protocols, ensuring type
+ safety and consistency.
+
+    - **CLI (`swarms/cli`)**: Provides command-line utilities for agent creation, management, and orchestration. See [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/)
+
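+The sketch below shows the first two layers in code: a single agent wired to a callable tool. It is a minimal sketch, assuming the `Agent` class exported from the top-level `swarms` package and a `tools` parameter that accepts plain Python callables, as described in the tools documentation linked above; exact constructor arguments may vary by version.
+
+```python
+from swarms import Agent
+
+
+def fetch_stock_price(ticker: str) -> str:
+    """Toy tool: return a canned price for a ticker symbol."""
+    return f"{ticker}: 101.52 USD"
+
+
+# Agent layer: one agent that can call the tool while reasoning
+agent = Agent(
+    agent_name="Market-Analyst",
+    system_prompt="You analyze equities using the tools available to you.",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+    tools=[fetch_stock_price],
+)
+
+print(agent.run("What is the current price of NVDA?"))
+```
+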
+---
+
+## Detailed Architecture Diagram
+
+The following Mermaid diagram visualizes the protocol flow and the relationship between the main folders in the `swarms/` package:
+
+```mermaid
+flowchart TD
+ A["Agent Class
(swarms/agents)"] --> B["Tools with Memory
(swarms/tools, swarms/utils)"]
+ B --> C["Reasoning & Specialized Agents
(swarms/agents)"]
+ C --> D["Multi-Agent Structures
(swarms/structs)"]
+ D --> E["Communication, Artifacts, Prompts, Telemetry, Schemas, CLI"]
+
+ subgraph Folders
+ A1["agents"]
+ A2["tools"]
+ A3["structs"]
+ A4["utils"]
+ A5["telemetry"]
+ A6["schemas"]
+ A7["prompts"]
+ A8["artifacts"]
+ A9["communication"]
+ A10["cli"]
+ end
+
+ %% Folder showcase
+ subgraph "swarms/"
+ A1
+ A2
+ A3
+ A4
+ A5
+ A6
+ A7
+ A8
+ A9
+ A10
+ end
+
+ %% Connect folder showcase to main flow
+ A1 -.-> A
+ A2 -.-> B
+ A3 -.-> D
+ A4 -.-> B
+ A5 -.-> E
+ A6 -.-> E
+ A7 -.-> E
+ A8 -.-> E
+ A9 -.-> E
+ A10 -.-> E
+```
+
+---
+
+## Folder-by-Folder Breakdown
+
+### `agents/`
+
+**Purpose:** Defines all agent classes, including base agents, reasoning agents, tool agents, judge agents, and more.
+
+**Highlights:**
+
+- Modular agent design for extensibility.
+
+- Support for YAML-based agent creation and configuration. See [YAML Agent Creation](https://docs.swarms.world/en/latest/swarms/agents/create_agents_yaml/)
+
+- Specialized agents for self-consistency, evaluation, and domain-specific tasks.
+
+**Example:**
+
+- `ReasoningAgent`, `ToolAgent`, `JudgeAgent`, `ConsistencyAgent`, `OpenAIAssistant`, etc.
+
+- [Agents Overview](https://docs.swarms.world/en/latest/swarms/framework/agents_explained/)
+
+
+### `tools/`
+
+**Purpose:** Houses all tool-related logic, including tool registry, function calling, tool schemas, and integration with external
+APIs.
+
+**Highlights:**
+
+- Tools can be dynamically registered and called by agents.
+
+- Support for OpenAI function calling, Cohere, and custom tool schemas.
+
+- Utilities for parsing, formatting, and executing tool calls.
+
+**Example:**
+
+- `base_tool.py`, `tool_registry.py`, `mcp_client_call.py`, `func_calling_utils.py`, etc.
+
+- [Tools Reference](https://docs.swarms.world/en/latest/swarms/tools/tools_examples/)
+
+- [What are tools?](https://docs.swarms.world/en/latest/swarms/tools/build_tool/)
+
+
+### `structs/`
+**Purpose:** Implements multi-agent structures, workflows, routers, registries, and orchestration logic.
+
+**Highlights:**
+
+- Swarms for majority voting, round-robin, hierarchical delegation, spreadsheet processing, and more.
+
+- Workflow orchestration (sequential, concurrent, graph-based).
+
+- Utilities for agent matching, rearrangement, and evaluation.
+
+**Example:**
+
+- `MajorityVotingSwarm`, `HierarchicalSwarm`, `SwarmRouter`, `SequentialWorkflow`, `ConcurrentWorkflow`, etc.
+
+- [Custom Multi Agent Architectures](https://docs.swarms.world/en/latest/swarms/structs/custom_swarm/)
+
+- [SwarmRouter](https://docs.swarms.world/en/latest/swarms/structs/swarm_router/)
+
+- [AgentRearrange](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/)
+
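+A minimal sketch of workflow orchestration, assuming the `SequentialWorkflow` interface referenced above (an `agents` list executed in order, each agent receiving the previous agent's output); argument names may differ by version:
+
+```python
+from swarms import Agent, SequentialWorkflow
+
+researcher = Agent(
+    agent_name="Researcher",
+    system_prompt="Gather the key facts on the given topic.",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+)
+writer = Agent(
+    agent_name="Writer",
+    system_prompt="Turn the gathered facts into a short report.",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+)
+
+# The researcher runs first; its output becomes the writer's input
+workflow = SequentialWorkflow(agents=[researcher, writer])
+print(workflow.run("The impact of MCP on agent interoperability"))
+```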
+
+### `utils/`
+
+**Purpose:** Provides utility functions, memory management, caching, wrappers, and helpers used throughout the framework.
+
+**Highlights:**
+
+- Memory and caching for agents and tools. See [Integrating RAG with Agents](https://docs.swarms.world/en/latest/swarms/memory/diy_memory/)
+
+- Wrappers for concurrency, logging, and data processing.
+
+- General-purpose utilities for string, file, and data manipulation.
+
+**Example:**
+
+- `agent_cache.py`, `concurrent_wrapper.py`, `file_processing.py`, `formatter.py`, etc.
+
+
+### `telemetry/`
+
+**Purpose:** Handles telemetry, logging, monitoring, and bootup routines for the framework.
+
+**Highlights:**
+
+- Centralized logging and execution tracking.
+
+- Bootup routines for initializing the framework.
+
+- Utilities for monitoring agent and swarm performance.
+
+**Example:**
+
+- `bootup.py`, `log_executions.py`, `main.py`.
+
+
+### `schemas/`
+
+**Purpose:** Defines data schemas for agents, tools, completions, and communication protocols.
+
+**Highlights:**
+
+- Ensures type safety and consistency across the framework.
+
+- Pydantic-based schemas for validation and serialization.
+
+- Schemas for agent classes, tool calls, completions, and more.
+
+**Example:**
+
+- `agent_class_schema.py`, `tool_schema_base_model.py`, `agent_completion_response.py`, etc.
+
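+The schemas follow standard Pydantic patterns. A hedged sketch of the kind of model these files define (the field names here are illustrative, not the actual schema):
+
+```python
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+
+class ToolCallSchema(BaseModel):
+    """Illustrative schema for a tool call exchanged between agents."""
+
+    name: str = Field(description="Name of the tool to invoke")
+    arguments: dict = Field(default_factory=dict)
+    timeout: Optional[int] = Field(default=None, ge=0)
+
+
+# Validation and serialization come for free
+call = ToolCallSchema(name="greet", arguments={"name": "Alice"})
+print(call.model_dump_json())
+```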
+
+### `prompts/`
+
+**Purpose:** Contains prompt templates, system prompts, and agent-specific prompts for LLM-based agents.
+
+**Highlights:**
+
+- Modular prompt design for easy customization.
+
+- Support for multi-modal, collaborative, and domain-specific prompts.
+
+- Templates for system, task, and conversational prompts.
+
+**Example:**
+
+- `prompt.py`, `reasoning_prompt.py`, `multi_agent_collab_prompt.py`, etc.
+
+- [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/)
+
+
+### `artifacts/`
+
+**Purpose:** Manages the creation, storage, and retrieval of artifacts (outputs, files, logs) generated by agents and swarms.
+
+**Highlights:**
+
+- Artifact management for reproducibility and traceability.
+- Support for various output types and formats.
+
+**Example:**
+
+- `main_artifact.py`.
+
+
+### `communication/`
+
+**Purpose:** Provides wrappers for inter-agent communication, database access, message passing, and integration with external systems.
+
+**Highlights:**
+
+- Support for Redis, DuckDB, Pulsar, Supabase, and more.
+- Abstractions for message passing and data exchange between agents.
+
+**Example:**
+
+- `redis_wrap.py`, `duckdb_wrap.py`, `base_communication.py`, etc.
+
+- [Communication Structure](https://docs.swarms.world/en/latest/swarms/structs/conversation/)
+
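+A minimal sketch of message passing with the conversation structure, assuming the `Conversation` class covered in the Communication Structure docs above exposes an `add(role, content)` API and a string export; method names may vary by version:
+
+```python
+from swarms.structs.conversation import Conversation
+
+# Shared conversation state that agents can read from and append to
+conversation = Conversation()
+conversation.add(role="Data-Analyst", content="Revenue grew 12% quarter over quarter.")
+conversation.add(role="Summarizer", content="Q3 summary: double-digit revenue growth.")
+
+# Export the exchange for logging or downstream agents
+print(conversation.return_history_as_string())
+```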
+
+### `cli/`
+
+**Purpose:** Command-line utilities for agent creation, management, and orchestration.
+
+**Highlights:**
+
+- Scripts for onboarding, agent creation, and management.
+
+- CLI entry points for interacting with the framework.
+
+**Example:**
+
+- `main.py`, `create_agent.py`, `onboarding_process.py`.
+
+- [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/)
+
+
+---
+
+## How the System Works Together
+
+The Swarms protocol is designed for composability. Agents can be created and configured independently, then composed into larger structures (swarms) for collaborative or competitive workflows. Tools and memory modules are injected into agents as needed, enabling them to perform complex tasks and retain context. Multi-agent structures orchestrate the flow of information and decision-making, while supporting components (communication, telemetry, artifacts, etc.) ensure robustness, observability, and extensibility.
+
+For example, a typical workflow might involve the following steps, sketched in code after the list:
+
+- Creating a set of specialized agents (e.g., data analyst, summarizer, judge).
+
+- Registering tools (e.g., LLM API, database access, web search) and memory modules.
+
+- Composing agents into a `MajorityVotingSwarm` for collaborative decision-making.
+
+- Using communication wrappers to exchange data between agents and external systems.
+
+- Logging all actions and outputs for traceability and debugging.
+
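+A hedged sketch of that workflow, assuming the `MajorityVotingSwarm` interface named above (a list of agents whose outputs are aggregated into a consensus answer); constructor details may differ by version:
+
+```python
+from swarms import Agent, MajorityVotingSwarm
+
+agents = [
+    Agent(
+        agent_name=f"Reviewer-{i}",
+        system_prompt="Independently assess the proposal and give a verdict.",
+        model_name="gpt-4o-mini",
+        max_loops=1,
+    )
+    for i in range(3)
+]
+
+# Aggregates the three independent verdicts into a single decision
+swarm = MajorityVotingSwarm(agents=agents)
+print(swarm.run("Should streamable HTTP be the default MCP transport?"))
+```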
+
+For more advanced examples, see the [Examples Overview](https://docs.swarms.world/en/latest/examples/index/).
+
+---
+
+## Swarms Framework Philosophy
+
+Swarms is built on the following principles:
+
+- **Modularity:** Every component (agent, tool, prompt, schema) is a module that can be extended or replaced.
+
+- **Composability:** Agents and tools can be composed into larger structures for complex workflows.
+
+- **Observability:** Telemetry and artifact management ensure that all actions are traceable and debuggable.
+
+- **Extensibility:** New agents, tools, and workflows can be added with minimal friction.
+
+- **Production-Readiness:** The framework is designed for reliability, scalability, and real-world deployment.
+
+
+For more on the philosophy and architecture, see [Development Philosophy & Principles](https://docs.swarms.world/en/latest/swarms/concept/philosophy/) and [Understanding Swarms Architecture](https://docs.swarms.world/en/latest/swarms/concept/framework_architecture/).
+
+---
+
+## Further Reading & References
+
+- [Swarms Docs Home](https://docs.swarms.world/en/latest/)
+
+- [Quickstart for Agents](https://docs.swarms.world/en/latest/swarms/agents/)
+
+- [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/)
+
+- [Tools Overview](https://docs.swarms.world/en/latest/swarms_tools/overview/)
+
+- [BaseTool Reference](https://docs.swarms.world/en/latest/swarms/tools/base_tool/)
+
+- [Reasoning Agents Overview](https://docs.swarms.world/en/latest/swarms/agents/reasoning_agents_overview/)
+
+- [Multi-Agent Architectures Overview](https://docs.swarms.world/en/latest/swarms/concept/swarm_architectures/)
+
+- [Examples Overview](https://docs.swarms.world/en/latest/examples/index/)
+
+- [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/)
+
+- [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/)
+
+- [Development Philosophy & Principles](https://docs.swarms.world/en/latest/swarms/concept/philosophy/)
+
+- [Understanding Swarms Architecture](https://docs.swarms.world/en/latest/swarms/concept/framework_architecture/)
+
+
+## Conclusion
+
+The Swarms protocol provides a robust foundation for building intelligent, collaborative, and autonomous systems. By organizing the codebase into clear, modular folders and defining a logical flow from agents to multi-agent structures, Swarms enables rapid development and deployment of advanced AI solutions. Whether you are building a simple automation or a complex multi-agent application, the Swarms architecture provides the tools and abstractions you need to succeed.
+
diff --git a/docs/protocol/sip.md b/docs/protocol/sip.md
new file mode 100644
index 00000000..312d79bd
--- /dev/null
+++ b/docs/protocol/sip.md
@@ -0,0 +1,159 @@
+# Swarms Improvement Proposal (SIP) Guidelines
+
+A simplified process for proposing new functionality and enhancements to the Swarms framework.
+
+## What is a SIP?
+
+A **Swarms Improvement Proposal (SIP)** is a design document that describes a new feature, enhancement, or change to the Swarms framework. SIPs serve as the primary mechanism for proposing significant changes, collecting community feedback, and documenting design decisions.
+
+The SIP author is responsible for building consensus within the community and documenting the proposal clearly and concisely.
+
+## When to Submit a SIP
+
+Consider submitting a SIP for:
+
+- **New Agent Types or Behaviors**: Adding new agent architectures, swarm patterns, or coordination mechanisms
+- **Core Framework Changes**: Modifications to the Swarms API, core classes, or fundamental behaviors
+- **New Integrations**: Adding support for new LLM providers, tools, or external services
+- **Breaking Changes**: Any change that affects backward compatibility
+- **Complex Features**: Multi-component features that require community discussion and design review
+
+For simple bug fixes, minor enhancements, or straightforward additions, use regular GitHub issues and pull requests instead.
+
+## SIP Types
+
+- **Standard SIP**: Describes a new feature or change to the Swarms framework
+- **Process SIP**: Describes changes to development processes, governance, or community guidelines
+- **Informational SIP**: Provides information or guidelines to the community without proposing changes
+
+## Submitting a SIP
+
+1. **Discuss First**: Post your idea in [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) to gauge community interest
+2. **Create Issue**: Submit your SIP as a GitHub Issue with the `SIP` and `proposal` labels
+3. **Follow Format**: Use the SIP template format below
+4. **Engage Community**: Respond to feedback and iterate on your proposal
+
+## SIP Format
+
+### Required Sections
+
+#### **SIP Header**
+```
+Title: [Descriptive title]
+Author: [Your name and contact]
+Type: [Standard/Process/Informational]
+Status: Proposal
+Created: [Date]
+```
+
+#### **Abstract** (200 words max)
+A brief summary of what you're proposing and why.
+
+#### **Motivation**
+- What problem does this solve?
+- Why can't the current framework handle this?
+- What are the benefits to the Swarms ecosystem?
+
+#### **Specification**
+- Detailed technical description
+- API changes or new interfaces
+- Code examples showing usage
+- Integration points with existing framework
+
+#### **Implementation Plan**
+- High-level implementation approach
+- Breaking changes (if any)
+- Migration path for existing users
+- Testing strategy
+
+#### **Alternatives Considered**
+- Other approaches you evaluated
+- Why you chose this solution
+- Trade-offs and limitations
+
+### Optional Sections
+
+#### **Reference Implementation**
+Link to prototype code or proof-of-concept (can be added later)
+
+#### **Security Considerations**
+Any security implications or requirements
+
+## SIP Workflow
+
+```
+Proposal → Draft → Review → Accepted/Rejected → Final
+```
+
+1. **Proposal**: Initial submission as GitHub Issue
+2. **Draft**: Maintainer assigns SIP number and `draft` label
+3. **Review**: Community and maintainer review period
+4. **Decision**: Accepted, rejected, or needs revision
+5. **Final**: Implementation completed and merged
+
+## SIP Status
+
+- **Proposal**: Newly submitted, awaiting initial review
+- **Draft**: Under active discussion and refinement
+- **Review**: Formal review by maintainers
+- **Accepted**: Approved for implementation
+- **Rejected**: Not accepted (with reasons)
+- **Final**: Implementation completed and merged
+- **Withdrawn**: Author withdrew the proposal
+
+## Review Process
+
+- SIPs are reviewed during regular maintainer meetings
+- Community feedback is collected via GitHub comments
+- Acceptance requires:
+ - Clear benefit to the Swarms ecosystem
+ - Technical feasibility
+ - Community support
+ - Working prototype (for complex features)
+
+## Getting Help
+
+- **Discussions**: Use [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) for questions
+- **Documentation**: Check [docs.swarms.world](https://docs.swarms.world) for framework details
+- **Examples**: Look at existing SIPs for reference
+
+## SIP Template
+
+When creating your SIP, copy this template:
+
+```markdown
+# SIP-XXX: [Title]
+
+**Author**: [Your name] <[email]>
+**Type**: Standard
+**Status**: Proposal
+**Created**: [Date]
+
+## Abstract
+
+[Brief 200-word summary]
+
+## Motivation
+
+[Why is this needed? What problem does it solve?]
+
+## Specification
+
+[Detailed technical description with code examples]
+
+## Implementation Plan
+
+[How will this be built? Any breaking changes?]
+
+## Alternatives Considered
+
+[Other approaches and why you chose this one]
+
+## Reference Implementation
+
+[Link to prototype code if available]
+```
+
+---
+
+**Note**: This process is designed to be lightweight while ensuring important changes get proper community review. For questions about whether your idea needs a SIP, start a discussion in the GitHub Discussions forum.
\ No newline at end of file
diff --git a/examples/mcp/client.py b/examples/mcp/client.py
new file mode 100644
index 00000000..cf256edd
--- /dev/null
+++ b/examples/mcp/client.py
@@ -0,0 +1,26 @@
+import asyncio
+
+from swarms.tools.mcp_client_call import (
+    execute_tool_call_simple,
+    get_mcp_tools_sync,
+)
+
+
+async def main():
+ # Prepare the tool call in OpenAI-compatible format
+ response = {
+ "function": {"name": "greet", "arguments": {"name": "Alice"}}
+ }
+ result = await execute_tool_call_simple(
+ server_path="http://localhost:8000/mcp",
+ response=response,
+ # transport="streamable_http",
+ )
+ print("Tool call result:", result)
+ return result
+
+
+if __name__ == "__main__":
+    # List the tools exposed by the MCP server (the server in examples/mcp/test.py must be running)
+    print(get_mcp_tools_sync(server_path="http://localhost:8000/mcp"))
+
+    # Execute the example tool call against the same server
+    asyncio.run(main())
diff --git a/examples/mcp/test.py b/examples/mcp/test.py
new file mode 100644
index 00000000..44eb2a69
--- /dev/null
+++ b/examples/mcp/test.py
@@ -0,0 +1,28 @@
+"""
+Run from the repository root:
+    uv run examples/mcp/test.py
+"""
+
+from mcp.server.fastmcp import FastMCP
+
+# Stateful server (maintains session state)
+mcp = FastMCP("StatefulServer", json_response=True)
+
+# Other configuration options:
+# Stateless server (no session persistence)
+# mcp = FastMCP("StatelessServer", stateless_http=True)
+
+# Stateless server (no session persistence, no SSE stream when the client supports JSON responses)
+# mcp = FastMCP("StatelessServer", stateless_http=True, json_response=True)
+
+
+# Add a simple tool to demonstrate the server
+@mcp.tool()
+def greet(name: str = "World") -> str:
+ """Greet someone by name."""
+ return f"Hello, {name}!"
+
+
+# Run server with streamable_http transport
+if __name__ == "__main__":
+ mcp.run(transport="streamable-http")
diff --git a/pyproject.toml b/pyproject.toml
index 6291af13..43ca6488 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -78,6 +78,7 @@ litellm = "*"
torch = "*"
httpx = "*"
mcp = "*"
+openai = "*"
aiohttp = "*"
[tool.poetry.scripts]
diff --git a/requirements.txt b/requirements.txt
index 2f512d55..4f7ae7f3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,4 @@ httpx
aiohttp
mcp
numpy
+openai
\ No newline at end of file
diff --git a/swarms/schemas/mcp_schemas.py b/swarms/schemas/mcp_schemas.py
index 196ebd24..624d2416 100644
--- a/swarms/schemas/mcp_schemas.py
+++ b/swarms/schemas/mcp_schemas.py
@@ -8,7 +8,7 @@ class MCPConnection(BaseModel):
description="The type of connection, defaults to 'mcp'",
)
url: Optional[str] = Field(
- default="localhost:8000/sse",
+ default="http://localhost:8000/mcp",
description="The URL endpoint for the MCP server",
)
tool_configurations: Optional[Dict[Any, Any]] = Field(
@@ -20,18 +20,19 @@ class MCPConnection(BaseModel):
description="Authentication token for accessing the MCP server",
)
transport: Optional[str] = Field(
- default="sse",
+ default="streamable_http",
description="The transport protocol to use for the MCP server",
)
headers: Optional[Dict[str, str]] = Field(
default=None, description="Headers to send to the MCP server"
)
timeout: Optional[int] = Field(
- default=5, description="Timeout for the MCP server"
+ default=10, description="Timeout for the MCP server"
)
class Config:
arbitrary_types_allowed = True
+ extra = "allow"
class MultipleMCPConnections(BaseModel):
diff --git a/swarms/tools/mcp_client_call.py b/swarms/tools/mcp_client_call.py
index 3fa3a9fa..9409b736 100644
--- a/swarms/tools/mcp_client_call.py
+++ b/swarms/tools/mcp_client_call.py
@@ -1,16 +1,24 @@
-import os
import asyncio
import contextlib
import json
+import os
import random
+from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Any, Dict, List, Literal, Optional, Union
-from concurrent.futures import ThreadPoolExecutor, as_completed
from litellm.types.utils import ChatCompletionMessageToolCall
from loguru import logger
from mcp import ClientSession
from mcp.client.sse import sse_client
+
+try:
+    from mcp.client.streamable_http import streamablehttp_client
+except ImportError:
+    # Define the name so later availability checks work instead of raising NameError
+    streamablehttp_client = None
+    logger.error(
+        "streamablehttp_client is not available. Please ensure the MCP SDK is up to date with pip3 install -U mcp"
+    )
+
from mcp.types import (
CallToolRequestParams as MCPCallToolRequestParams,
)
@@ -25,6 +33,7 @@ from swarms.schemas.mcp_schemas import (
MCPConnection,
)
from swarms.utils.index import exists
+from urllib.parse import urlparse
class MCPError(Exception):
@@ -63,7 +72,16 @@ class MCPExecutionError(MCPError):
def transform_mcp_tool_to_openai_tool(
mcp_tool: MCPTool,
) -> ChatCompletionToolParam:
- """Convert an MCP tool to an OpenAI tool."""
+ """
+ Convert an MCP tool to an OpenAI tool.
+ Args:
+ mcp_tool (MCPTool): The MCP tool object.
+ Returns:
+ ChatCompletionToolParam: The OpenAI-compatible tool parameter.
+ """
+ logger.info(
+ f"Transforming MCP tool '{mcp_tool.name}' to OpenAI tool format."
+ )
return ChatCompletionToolParam(
type="function",
function=FunctionDefinition(
@@ -79,15 +97,14 @@ async def load_mcp_tools(
session: ClientSession, format: Literal["mcp", "openai"] = "mcp"
) -> Union[List[MCPTool], List[ChatCompletionToolParam]]:
"""
- Load all available MCP tools
-
+ Load all available MCP tools from the session.
Args:
- session: The MCP session to use
- format: The format to convert the tools to
- By default, the tools are returned in MCP format.
-
- If format is set to "openai", the tools are converted to OpenAI API compatible tools.
+ session (ClientSession): The MCP session to use.
+ format (Literal["mcp", "openai"]): The format to convert the tools to.
+ Returns:
+ List of tools in the specified format.
"""
+ logger.info(f"Loading MCP tools with format '{format}'.")
tools = await session.list_tools()
if format == "openai":
return [
@@ -106,16 +123,28 @@ async def call_mcp_tool(
session: ClientSession,
call_tool_request_params: MCPCallToolRequestParams,
) -> MCPCallToolResult:
- """Call an MCP tool."""
- tool_result = await session.call_tool(
+ """
+ Call an MCP tool using the provided session and request parameters.
+ Args:
+ session (ClientSession): The MCP session to use.
+ call_tool_request_params (MCPCallToolRequestParams): The tool call request params.
+ Returns:
+ MCPCallToolResult: The result of the tool call.
+ """
+ return await session.call_tool(
name=call_tool_request_params.name,
arguments=call_tool_request_params.arguments,
)
- return tool_result
def _get_function_arguments(function: FunctionDefinition) -> dict:
- """Helper to safely get and parse function arguments."""
+ """
+ Helper to safely get and parse function arguments from a function definition.
+ Args:
+ function (FunctionDefinition): The function definition.
+ Returns:
+ dict: Parsed arguments as a dictionary.
+ """
arguments = function.get("arguments", {})
if isinstance(arguments, str):
try:
@@ -128,7 +157,13 @@ def _get_function_arguments(function: FunctionDefinition) -> dict:
def transform_openai_tool_call_request_to_mcp_tool_call_request(
openai_tool: Union[ChatCompletionMessageToolCall, Dict],
) -> MCPCallToolRequestParams:
- """Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams."""
+ """
+ Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams.
+ Args:
+ openai_tool (Union[ChatCompletionMessageToolCall, Dict]): The OpenAI tool call request.
+ Returns:
+ MCPCallToolRequestParams: The MCP tool call request params.
+ """
function = openai_tool["function"]
return MCPCallToolRequestParams(
name=function["name"],
@@ -142,12 +177,11 @@ async def call_openai_tool(
) -> MCPCallToolResult:
"""
Call an OpenAI tool using MCP client.
-
Args:
- session: The MCP session to use
- openai_tool: The OpenAI tool to call. You can get this from the `choices[0].message.tool_calls[0]` of the response from the OpenAI API.
+ session (ClientSession): The MCP session to use.
+ openai_tool (dict): The OpenAI tool to call.
Returns:
- The result of the MCP tool call.
+ MCPCallToolResult: The result of the MCP tool call.
"""
mcp_tool_call_request_params = (
transform_openai_tool_call_request_to_mcp_tool_call_request(
@@ -161,7 +195,14 @@ async def call_openai_tool(
def retry_with_backoff(retries=3, backoff_in_seconds=1):
- """Decorator for retrying functions with exponential backoff."""
+ """
+ Decorator for retrying async functions with exponential backoff.
+ Args:
+ retries (int): Number of retry attempts.
+ backoff_in_seconds (int): Initial backoff time in seconds.
+ Returns:
+ Decorated async function with retry logic.
+ """
def decorator(func):
@wraps(func)
@@ -193,13 +234,17 @@ def retry_with_backoff(retries=3, backoff_in_seconds=1):
@contextlib.contextmanager
def get_or_create_event_loop():
- """Context manager to handle event loop creation and cleanup."""
+ """
+ Context manager to handle event loop creation and cleanup.
+ Yields:
+ asyncio.AbstractEventLoop: The event loop to use.
+ Ensures the event loop is properly closed if created here.
+ """
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
-
try:
yield loop
finally:
@@ -210,27 +255,28 @@ def get_or_create_event_loop():
def connect_to_mcp_server(connection: MCPConnection = None):
- """Connect to an MCP server.
-
+ """
+ Connect to an MCP server using the provided connection configuration.
Args:
- connection (MCPConnection): The connection configuration object
-
+ connection (MCPConnection): The connection configuration object.
Returns:
- tuple: A tuple containing (headers, timeout, transport, url)
-
+ tuple: (headers, timeout, transport, url)
Raises:
- MCPValidationError: If the connection object is invalid
+ MCPValidationError: If the connection object is invalid.
"""
+ logger.info(
+ "Connecting to MCP server using MCPConnection object."
+ )
if not isinstance(connection, MCPConnection):
+ logger.error(
+ "Invalid connection type provided to connect_to_mcp_server."
+ )
raise MCPValidationError("Invalid connection type")
-
- # Direct attribute access is faster than property access
headers = dict(connection.headers or {})
if connection.authorization_token:
headers["Authorization"] = (
f"Bearer {connection.authorization_token}"
)
-
return (
headers,
connection.timeout or 5,
@@ -239,31 +285,104 @@ def connect_to_mcp_server(connection: MCPConnection = None):
)
+def get_mcp_client(transport, url, headers=None, timeout=5, **kwargs):
+ """
+ Helper to select the correct MCP client context manager based on transport.
+ Supports 'sse' (default) and 'streamable_http'.
+ Args:
+ transport (str): The transport type ('sse' or 'streamable_http').
+ url (str): The server URL.
+ headers (dict): Optional headers.
+ timeout (int): Timeout in seconds.
+ **kwargs: Additional arguments.
+ Returns:
+ Context manager for the selected client.
+ Raises:
+ ImportError: If streamablehttp_client is not available when requested.
+ """
+ logger.info(
+ f"Getting MCP client for transport '{transport}' and url '{url}'."
+ )
+ if transport == "streamable_http":
+ if streamablehttp_client is None:
+ logger.error("streamablehttp_client is not available.")
+ raise ImportError(
+ "streamablehttp_client is not available. Please ensure the MCP SDK is up to date."
+ )
+ return streamablehttp_client(
+ url, headers=headers, timeout=timeout, **kwargs
+ )
+ else:
+ return sse_client(
+ url, headers=headers, timeout=timeout, **kwargs
+ )
+
+
+def auto_detect_transport(url: str) -> str:
+ """
+ Guess the MCP transport based on the URL scheme and path.
+ Does not make any network requests.
+ Returns one of: 'streamable_http', 'sse', or 'stdio'.
+ Args:
+ url (str): The server URL.
+ Returns:
+ str: The detected transport type.
+ """
+    """
+    if not url:
+        # No URL to inspect (e.g. when only an MCPConnection is supplied); fall back to SSE
+        logger.info("No URL provided; defaulting to 'sse' transport")
+        return "sse"
+    parsed = urlparse(url)
+ if scheme in ("http", "https"):
+ logger.info(
+ f"Automatically selected 'streamable_http' transport for {url}"
+ )
+ return "streamable_http"
+ elif scheme in ("ws", "wss"):
+ logger.info(
+ f"Automatically selected 'sse' transport for {url}"
+ )
+        return "sse"  # fall back to SSE; a dedicated websocket transport is not implemented
+ elif "stdio" in url or scheme == "":
+ logger.info(
+ f"Automatically selected 'stdio' transport for {url}"
+ )
+ return "stdio"
+ else:
+ logger.info(f"Defaulting to 'sse' transport for {url}")
+ return "sse"
+
+
@retry_with_backoff(retries=3)
async def aget_mcp_tools(
server_path: Optional[str] = None,
format: str = "openai",
connection: Optional[MCPConnection] = None,
+ transport: Optional[str] = None,
*args,
**kwargs,
) -> List[Dict[str, Any]]:
"""
Fetch available MCP tools from the server with retry logic.
-
Args:
- server_path (str): Path to the MCP server script
-
+ server_path (str): Path to the MCP server script.
+ format (str): Format to return tools in ('openai' or 'mcp').
+ connection (Optional[MCPConnection]): Optional connection object.
+ transport (Optional[str]): Transport type. If None, auto-detects.
Returns:
- List[Dict[str, Any]]: List of available MCP tools in OpenAI format
-
+ List[Dict[str, Any]]: List of available MCP tools in OpenAI format.
Raises:
- MCPValidationError: If server_path is invalid
- MCPConnectionError: If connection to server fails
+ MCPValidationError: If server_path is invalid.
+ MCPConnectionError: If connection to server fails.
"""
+ logger.info(
+ f"aget_mcp_tools called for server_path: {server_path}"
+ )
+ if transport is None:
+ transport = auto_detect_transport(server_path)
if exists(connection):
- headers, timeout, transport, url = connect_to_mcp_server(
- connection
+ headers, timeout, transport_from_conn, url = (
+ connect_to_mcp_server(connection)
)
+ if transport_from_conn:
+ transport = transport_from_conn
else:
headers, timeout, _transport, _url = (
None,
@@ -271,20 +390,23 @@ async def aget_mcp_tools(
None,
server_path,
)
-
- logger.info(f"Fetching MCP tools from server: {server_path}")
-
+        url = server_path
+ logger.info(
+ f"Fetching MCP tools from server: {server_path} using transport: {transport}"
+ )
try:
- async with sse_client(
- url=server_path,
+ async with get_mcp_client(
+ transport,
+ url=url,
headers=headers,
timeout=timeout,
*args,
**kwargs,
- ) as (
- read,
- write,
- ):
+ ) as ctx:
+ if len(ctx) == 2:
+ read, write = ctx
+ else:
+ read, write, *_ = ctx
async with ClientSession(read, write) as session:
await session.initialize()
tools = await load_mcp_tools(
@@ -305,23 +427,29 @@ def get_mcp_tools_sync(
server_path: Optional[str] = None,
format: str = "openai",
connection: Optional[MCPConnection] = None,
+ transport: Optional[str] = None,
*args,
**kwargs,
) -> List[Dict[str, Any]]:
"""
Synchronous version of get_mcp_tools that handles event loop management.
-
Args:
- server_path (str): Path to the MCP server script
-
+ server_path (str): Path to the MCP server script.
+ format (str): Format to return tools in ('openai' or 'mcp').
+ connection (Optional[MCPConnection]): Optional connection object.
+ transport (Optional[str]): Transport type. If None, auto-detects.
Returns:
- List[Dict[str, Any]]: List of available MCP tools in OpenAI format
-
+ List[Dict[str, Any]]: List of available MCP tools in OpenAI format.
Raises:
- MCPValidationError: If server_path is invalid
- MCPConnectionError: If connection to server fails
- MCPExecutionError: If event loop management fails
+ MCPValidationError: If server_path is invalid.
+ MCPConnectionError: If connection to server fails.
+ MCPExecutionError: If event loop management fails.
"""
+ logger.info(
+ f"get_mcp_tools_sync called for server_path: {server_path}"
+ )
+ if transport is None:
+ transport = auto_detect_transport(server_path)
with get_or_create_event_loop() as loop:
try:
return loop.run_until_complete(
@@ -329,6 +457,7 @@ def get_mcp_tools_sync(
server_path=server_path,
format=format,
connection=connection,
+ transport=transport,
*args,
**kwargs,
)
@@ -344,12 +473,26 @@ def _fetch_tools_for_server(
url: str,
connection: Optional[MCPConnection] = None,
format: str = "openai",
+ transport: Optional[str] = None,
) -> List[Dict[str, Any]]:
- """Helper function to fetch tools for a single server."""
+ """
+ Helper function to fetch tools for a single server.
+ Args:
+ url (str): The server URL.
+ connection (Optional[MCPConnection]): Optional connection object.
+ format (str): Format to return tools in.
+ transport (Optional[str]): Transport type. If None, auto-detects.
+ Returns:
+ List[Dict[str, Any]]: List of available MCP tools.
+ """
+ logger.info(f"_fetch_tools_for_server called for url: {url}")
+ if transport is None:
+ transport = auto_detect_transport(url)
return get_mcp_tools_sync(
server_path=url,
connection=connection,
format=format,
+ transport=transport,
)
@@ -359,19 +502,23 @@ def get_tools_for_multiple_mcp_servers(
format: str = "openai",
output_type: Literal["json", "dict", "str"] = "str",
max_workers: Optional[int] = None,
+ transport: Optional[str] = None,
) -> List[Dict[str, Any]]:
- """Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
-
+ """
+ Get tools for multiple MCP servers concurrently using ThreadPoolExecutor.
Args:
- urls: List of server URLs to fetch tools from
- connections: Optional list of MCPConnection objects corresponding to each URL
- format: Format to return tools in (default: "openai")
- output_type: Type of output format (default: "str")
- max_workers: Maximum number of worker threads (default: None, uses min(32, os.cpu_count() + 4))
-
+ urls (List[str]): List of server URLs to fetch tools from.
+ connections (List[MCPConnection]): Optional list of MCPConnection objects.
+ format (str): Format to return tools in.
+ output_type (Literal): Output format type.
+ max_workers (Optional[int]): Max worker threads.
+ transport (Optional[str]): Transport type. If None, auto-detects per URL.
Returns:
- List[Dict[str, Any]]: Combined list of tools from all servers
+ List[Dict[str, Any]]: Combined list of tools from all servers.
"""
+ logger.info(
+ f"get_tools_for_multiple_mcp_servers called for {len(urls)} urls."
+ )
tools = []
-    (
-        min(32, os.cpu_count() + 4)
-        if max_workers is None
-        else max_workers
-    )
+    max_workers = (
+        min(32, os.cpu_count() + 4)
+        if max_workers is None
+        else max_workers
+    )
if exists(connections):
- # Create future tasks for each URL-connection pair
future_to_url = {
executor.submit(
- _fetch_tools_for_server, url, connection, format
+ _fetch_tools_for_server,
+ url,
+ connection,
+ format,
+ transport,
): url
for url, connection in zip(urls, connections)
}
else:
- # Create future tasks for each URL without connections
future_to_url = {
executor.submit(
- _fetch_tools_for_server, url, None, format
+ _fetch_tools_for_server,
+ url,
+ None,
+ format,
+ transport,
): url
for url in urls
}
-
- # Process completed futures as they come in
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
@@ -409,7 +560,6 @@ def get_tools_for_multiple_mcp_servers(
raise MCPExecutionError(
f"Failed to fetch tools from {url}: {str(e)}"
)
-
return tools
@@ -418,14 +568,34 @@ async def _execute_tool_call_simple(
server_path: str = None,
connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str"] = "str",
+ transport: Optional[str] = None,
*args,
**kwargs,
):
- """Execute a tool call using the MCP client."""
+ """
+ Execute a tool call using the MCP client, supporting both SSE and streamable HTTP.
+ Args:
+ response (any): The tool call request.
+ server_path (str): The server URL.
+ connection (Optional[MCPConnection]): Optional connection object.
+ output_type (Literal): Output format type.
+ transport (Optional[str]): Transport type. If None, auto-detects.
+ Returns:
+ The tool call result in the specified output format.
+ Raises:
+ MCPExecutionError, MCPConnectionError
+ """
+ logger.info(
+ f"_execute_tool_call_simple called for server_path: {server_path}"
+ )
+ if transport is None:
+ transport = auto_detect_transport(server_path)
if exists(connection):
- headers, timeout, transport, url = connect_to_mcp_server(
- connection
+ headers, timeout, transport_from_conn, url = (
+ connect_to_mcp_server(connection)
)
+ if transport_from_conn:
+ transport = transport_from_conn
else:
headers, timeout, _transport, url = (
None,
@@ -433,23 +603,25 @@ async def _execute_tool_call_simple(
"sse",
server_path,
)
-
try:
- async with sse_client(
- url=url, headers=headers, timeout=timeout, *args, **kwargs
- ) as (
- read,
- write,
- ):
+ async with get_mcp_client(
+ transport,
+ url=url,
+ headers=headers,
+ timeout=timeout,
+ *args,
+ **kwargs,
+ ) as ctx:
+ if len(ctx) == 2:
+ read, write = ctx
+ else:
+ read, write, *_ = ctx
async with ClientSession(read, write) as session:
try:
await session.initialize()
-
call_result = await call_openai_tool(
- session=session,
- openai_tool=response,
+ session=session, openai_tool=response
)
-
if output_type == "json":
out = call_result.model_dump_json(indent=4)
elif output_type == "dict":
@@ -470,19 +642,21 @@ async def _execute_tool_call_simple(
f"{key}: {value}"
)
out = "\n".join(formatted_lines)
-
+ else:
+ out = call_result.model_dump()
+ logger.info(
+ f"Tool call executed successfully for {server_path}"
+ )
return out
-
except Exception as e:
logger.error(f"Error in tool execution: {str(e)}")
raise MCPExecutionError(
- f"Tool execution failed: {str(e)}"
+ f"Tool execution failed for tool '{getattr(response, 'function', {}).get('name', 'unknown')}' on server '{url}': {str(e)}"
)
-
except Exception as e:
- logger.error(f"Error in SSE client connection: {str(e)}")
+ logger.error(f"Error in MCP client connection: {str(e)}")
raise MCPConnectionError(
- f"Failed to connect to MCP server: {str(e)}"
+ f"Failed to connect to MCP server '{url}' using transport '{transport}': {str(e)}"
)
@@ -491,17 +665,34 @@ async def execute_tool_call_simple(
server_path: str = None,
connection: Optional[MCPConnection] = None,
output_type: Literal["json", "dict", "str", "formatted"] = "str",
+ transport: Optional[str] = None,
*args,
**kwargs,
) -> List[Dict[str, Any]]:
+ """
+ High-level async function to execute a tool call on an MCP server.
+ Args:
+ response (any): The tool call request.
+ server_path (str): The server URL.
+ connection (Optional[MCPConnection]): Optional connection object.
+ output_type (Literal): Output format type.
+ transport (Optional[str]): Transport type. If None, auto-detects.
+ Returns:
+ The tool call result in the specified output format.
+ """
+ logger.info(
+ f"execute_tool_call_simple called for server_path: {server_path}"
+ )
+ if transport is None:
+ transport = auto_detect_transport(server_path)
if isinstance(response, str):
response = json.loads(response)
-
return await _execute_tool_call_simple(
response=response,
server_path=server_path,
connection=connection,
output_type=output_type,
+ transport=transport,
*args,
**kwargs,
)
@@ -511,36 +702,32 @@ def _create_server_tool_mapping(
urls: List[str],
connections: List[MCPConnection] = None,
format: str = "openai",
+ transport: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
"""
Create a mapping of function names to server information for all MCP servers.
-
Args:
- urls: List of server URLs
- connections: Optional list of MCPConnection objects
- format: Format to fetch tools in
-
+ urls (List[str]): List of server URLs.
+ connections (List[MCPConnection]): Optional list of MCPConnection objects.
+ format (str): Format to fetch tools in.
+ transport (Optional[str]): Transport type. If None, auto-detects per URL.
Returns:
- Dict mapping function names to server info (url, connection, tool)
+ Dict[str, Dict[str, Any]]: Mapping of function names to server info.
"""
server_tool_mapping = {}
-
for i, url in enumerate(urls):
connection = (
connections[i]
if connections and i < len(connections)
else None
)
-
try:
- # Get tools for this server
tools = get_mcp_tools_sync(
server_path=url,
connection=connection,
format=format,
+ transport=transport,
)
-
- # Create mapping for each tool
for tool in tools:
if isinstance(tool, dict) and "function" in tool:
function_name = tool["function"]["name"]
@@ -551,20 +738,17 @@ def _create_server_tool_mapping(
"server_index": i,
}
elif hasattr(tool, "name"):
- # Handle MCPTool objects
server_tool_mapping[tool.name] = {
"url": url,
"connection": connection,
"tool": tool,
"server_index": i,
}
-
except Exception as e:
logger.warning(
f"Failed to fetch tools from server {url}: {str(e)}"
)
continue
-
return server_tool_mapping
@@ -572,36 +756,32 @@ async def _create_server_tool_mapping_async(
urls: List[str],
connections: List[MCPConnection] = None,
format: str = "openai",
+ transport: str = "sse",
) -> Dict[str, Dict[str, Any]]:
"""
Async version: Create a mapping of function names to server information for all MCP servers.
-
Args:
- urls: List of server URLs
- connections: Optional list of MCPConnection objects
- format: Format to fetch tools in
-
+ urls (List[str]): List of server URLs.
+ connections (List[MCPConnection]): Optional list of MCPConnection objects.
+ format (str): Format to fetch tools in.
+ transport (str): Transport type.
Returns:
- Dict mapping function names to server info (url, connection, tool)
+ Dict[str, Dict[str, Any]]: Mapping of function names to server info.
"""
server_tool_mapping = {}
-
for i, url in enumerate(urls):
connection = (
connections[i]
if connections and i < len(connections)
else None
)
-
try:
- # Get tools for this server using async function
tools = await aget_mcp_tools(
server_path=url,
connection=connection,
format=format,
+ transport=transport,
)
-
- # Create mapping for each tool
for tool in tools:
if isinstance(tool, dict) and "function" in tool:
function_name = tool["function"]["name"]
@@ -612,20 +792,17 @@ async def _create_server_tool_mapping_async(
"server_index": i,
}
elif hasattr(tool, "name"):
- # Handle MCPTool objects
server_tool_mapping[tool.name] = {
"url": url,
"connection": connection,
"tool": tool,
"server_index": i,
}
-
except Exception as e:
logger.warning(
f"Failed to fetch tools from server {url}: {str(e)}"
)
continue
-
return server_tool_mapping
@@ -633,17 +810,17 @@ async def _execute_tool_on_server(
tool_call: Dict[str, Any],
server_info: Dict[str, Any],
output_type: Literal["json", "dict", "str", "formatted"] = "str",
+ transport: str = "sse",
) -> Dict[str, Any]:
"""
Execute a single tool call on a specific server.
-
Args:
- tool_call: The tool call to execute
- server_info: Server information from the mapping
- output_type: Output format type
-
+ tool_call (Dict[str, Any]): The tool call to execute.
+ server_info (Dict[str, Any]): Server information from the mapping.
+ output_type (Literal): Output format type.
+ transport (str): Transport type.
Returns:
- Execution result with server metadata
+ Dict[str, Any]: Execution result with server metadata.
"""
try:
result = await _execute_tool_call_simple(
@@ -651,8 +828,8 @@ async def _execute_tool_on_server(
server_path=server_info["url"],
connection=server_info["connection"],
output_type=output_type,
+ transport=transport,
)
-
return {
"server_url": server_info["url"],
"server_index": server_info["server_index"],
@@ -662,7 +839,6 @@ async def _execute_tool_on_server(
"result": result,
"status": "success",
}
-
except Exception as e:
logger.error(
f"Failed to execute tool on server {server_info['url']}: {str(e)}"
@@ -674,7 +850,7 @@ async def _execute_tool_on_server(
"name", "unknown"
),
"result": None,
- "error": str(e),
+ "error": f"Custom error: Failed to execute tool '{tool_call.get('function', {}).get('name', 'unknown')}' on server '{server_info['url']}': {str(e)}",
"status": "error",
}
@@ -685,79 +861,47 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
connections: List[MCPConnection] = None,
output_type: Literal["json", "dict", "str", "formatted"] = "str",
max_concurrent: Optional[int] = None,
+ transport: str = "sse",
*args,
**kwargs,
) -> List[Dict[str, Any]]:
"""
Execute multiple tool calls across multiple MCP servers.
-
- This function creates a mapping of function names to servers, then for each response
- that contains tool calls, it finds the appropriate server for each function and
- executes the calls concurrently.
-
Args:
- responses: List of responses containing tool calls (OpenAI format)
- urls: List of MCP server URLs
- connections: Optional list of MCPConnection objects corresponding to each URL
- output_type: Output format type for results
- max_concurrent: Maximum number of concurrent executions (default: len(responses))
-
+ responses (List[Dict[str, Any]]): List of tool call requests.
+ urls (List[str]): List of server URLs.
+ connections (List[MCPConnection]): Optional list of MCPConnection objects.
+ output_type (Literal): Output format type.
+ max_concurrent (Optional[int]): Max concurrent tasks.
+ transport (str): Transport type.
Returns:
- List of execution results with server metadata
-
- Example:
- # Example responses format:
- responses = [
- {
- "function": {
- "name": "search_web",
- "arguments": {"query": "python programming"}
- }
- },
- {
- "function": {
- "name": "search_database",
- "arguments": {"table": "users", "id": 123}
- }
- }
- ]
-
- urls = ["http://server1:8000", "http://server2:8000"]
-
- results = await execute_multiple_tools_on_multiple_mcp_servers(
- responses=responses,
- urls=urls
- )
+ List[Dict[str, Any]]: List of execution results.
"""
if not responses:
logger.warning("No responses provided for execution")
return []
-
if not urls:
raise MCPValidationError("No server URLs provided")
-
- # Create mapping of function names to servers using async version
- logger.info(f"Creating tool mapping for {len(urls)} servers")
+ logger.info(
+ f"Creating tool mapping for {len(urls)} servers using transport: {transport}"
+ )
server_tool_mapping = await _create_server_tool_mapping_async(
- urls=urls, connections=connections, format="openai"
+ urls=urls,
+ connections=connections,
+ format="openai",
+ transport=transport,
)
-
if not server_tool_mapping:
raise MCPExecutionError(
"No tools found on any of the provided servers"
)
-
logger.info(
f"Found {len(server_tool_mapping)} unique functions across all servers"
)
-
- # Extract all tool calls from responses
all_tool_calls = []
logger.info(
f"Processing {len(responses)} responses for tool call extraction"
)
-
- # Check if responses are individual characters that need to be reconstructed
if len(responses) > 10 and all(
isinstance(r, str) and len(r) == 1 for r in responses
):
@@ -772,8 +916,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
logger.debug(
f"Reconstructed response: {reconstructed_response}"
)
-
- # Try to parse the reconstructed response to validate it
try:
json.loads(reconstructed_response)
logger.info(
@@ -789,19 +931,15 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
logger.debug(
f"Last 100 chars: {reconstructed_response[-100:]}"
)
-
responses = [reconstructed_response]
except Exception as e:
logger.warning(
f"Failed to reconstruct response from characters: {str(e)}"
)
-
for i, response in enumerate(responses):
logger.debug(
f"Processing response {i}: {type(response)} - {response}"
)
-
- # Handle JSON string responses
if isinstance(response, str):
try:
response = json.loads(response)
@@ -813,14 +951,11 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
f"Failed to parse JSON response at index {i}: {response}"
)
continue
-
if isinstance(response, dict):
- # Single tool call
if "function" in response:
logger.debug(
f"Found single tool call in response {i}: {response['function']}"
)
- # Parse arguments if they're a JSON string
if isinstance(
response["function"].get("arguments"), str
):
@@ -837,15 +972,12 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
logger.warning(
f"Failed to parse function arguments: {response['function']['arguments']}"
)
-
all_tool_calls.append((i, response))
- # Multiple tool calls
elif "tool_calls" in response:
logger.debug(
f"Found multiple tool calls in response {i}: {len(response['tool_calls'])} calls"
)
for tool_call in response["tool_calls"]:
- # Parse arguments if they're a JSON string
if isinstance(
tool_call.get("function", {}).get(
"arguments"
@@ -865,14 +997,11 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
logger.warning(
f"Failed to parse tool call arguments: {tool_call['function']['arguments']}"
)
-
all_tool_calls.append((i, tool_call))
- # Direct tool call
elif "name" in response and "arguments" in response:
logger.debug(
f"Found direct tool call in response {i}: {response}"
)
- # Parse arguments if they're a JSON string
if isinstance(response.get("arguments"), str):
try:
response["arguments"] = json.loads(
@@ -885,7 +1014,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
logger.warning(
f"Failed to parse direct tool call arguments: {response['arguments']}"
)
-
all_tool_calls.append((i, {"function": response}))
else:
logger.debug(
@@ -896,14 +1024,10 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
f"Unsupported response type at index {i}: {type(response)}"
)
continue
-
if not all_tool_calls:
logger.warning("No tool calls found in responses")
return []
-
logger.info(f"Found {len(all_tool_calls)} tool calls to execute")
-
- # Execute tool calls concurrently
max_concurrent = max_concurrent or len(all_tool_calls)
semaphore = asyncio.Semaphore(max_concurrent)
@@ -913,7 +1037,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
function_name = tool_call.get("function", {}).get(
"name", "unknown"
)
-
if function_name not in server_tool_mapping:
logger.warning(
f"Function '{function_name}' not found on any server"
@@ -925,24 +1048,21 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
"error": f"Function '{function_name}' not available on any server",
"status": "not_found",
}
-
server_info = server_tool_mapping[function_name]
result = await _execute_tool_on_server(
tool_call=tool_call,
server_info=server_info,
output_type=output_type,
+ transport=transport,
)
result["response_index"] = response_index
return result
- # Execute all tool calls concurrently
tasks = [
execute_with_semaphore(tool_call_info)
for tool_call_info in all_tool_calls
]
results = await asyncio.gather(*tasks, return_exceptions=True)
-
- # Process results and handle exceptions
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
@@ -964,7 +1084,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers(
)
else:
processed_results.append(result)
-
logger.info(
f"Completed execution of {len(processed_results)} tool calls"
)
@@ -977,21 +1096,21 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
connections: List[MCPConnection] = None,
output_type: Literal["json", "dict", "str", "formatted"] = "str",
max_concurrent: Optional[int] = None,
+ transport: str = "sse",
*args,
**kwargs,
) -> List[Dict[str, Any]]:
"""
Synchronous version of execute_multiple_tools_on_multiple_mcp_servers.
-
Args:
- responses: List of responses containing tool calls (OpenAI format)
- urls: List of MCP server URLs
- connections: Optional list of MCPConnection objects corresponding to each URL
- output_type: Output format type for results
- max_concurrent: Maximum number of concurrent executions
-
+ responses (List[Dict[str, Any]]): List of tool call requests.
+ urls (List[str]): List of server URLs.
+ connections (List[MCPConnection]): Optional list of MCPConnection objects.
+ output_type (Literal): Output format type.
+ max_concurrent (Optional[int]): Max concurrent tasks.
+ transport (str): Transport type.
Returns:
- List of execution results with server metadata
+ List[Dict[str, Any]]: List of execution results.
"""
with get_or_create_event_loop() as loop:
try:
@@ -1002,6 +1121,7 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync(
connections=connections,
output_type=output_type,
max_concurrent=max_concurrent,
+ transport=transport,
*args,
**kwargs,
)
diff --git a/swarms/utils/function_caller_model.py b/swarms/utils/function_caller_model.py
index 36642308..fb9135fc 100644
--- a/swarms/utils/function_caller_model.py
+++ b/swarms/utils/function_caller_model.py
@@ -1,23 +1,11 @@
import os
-import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import List
-from loguru import logger
from pydantic import BaseModel
-try:
- from openai import OpenAI
-except ImportError:
- logger.error(
- "OpenAI library not found. Please install the OpenAI library by running 'pip install openai'"
- )
- import sys
-
- subprocess.run([sys.executable, "-m", "pip", "install", "openai"])
- from openai import OpenAI
-
+from openai import OpenAI
SUPPORTED_MODELS = [
"o3-mini-2025-1-31",