swarms api as mcp

pull/818/head^2
Kye Gomez 3 days ago
parent da41a7e03f
commit cae0db7b4d

@ -355,6 +355,7 @@ nav:
- Swarm Types: "swarms_cloud/swarm_types.md"
- Swarms Cloud Subscription Tiers: "swarms_cloud/subscription_tiers.md"
- Swarms API Best Practices: "swarms_cloud/best_practices.md"
- Swarms API as MCP: "swarms_cloud/mcp.md"
- Swarm Ecosystem APIs:
- MCS API: "swarms_cloud/mcs_api.md"
# - CreateNow API: "swarms_cloud/create_api.md"

@ -0,0 +1,348 @@
# Swarms API as MCP
- Launch MCP server as a tool
- Put `SWARMS_API_KEY` in `.env`
- Client side code below
## Server Side
```python
# server.py
from datetime import datetime
import os
from typing import Any, Dict, List, Optional
import requests
import httpx
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from swarms import SwarmType
from dotenv import load_dotenv
load_dotenv()
class AgentSpec(BaseModel):
agent_name: Optional[str] = Field(
description="The unique name assigned to the agent, which identifies its role and functionality within the swarm.",
)
description: Optional[str] = Field(
description="A detailed explanation of the agent's purpose, capabilities, and any specific tasks it is designed to perform.",
)
system_prompt: Optional[str] = Field(
description="The initial instruction or context provided to the agent, guiding its behavior and responses during execution.",
)
model_name: Optional[str] = Field(
default="gpt-4o-mini",
description="The name of the AI model that the agent will utilize for processing tasks and generating outputs. For example: gpt-4o, gpt-4o-mini, openai/o3-mini",
)
auto_generate_prompt: Optional[bool] = Field(
default=False,
description="A flag indicating whether the agent should automatically create prompts based on the task requirements.",
)
max_tokens: Optional[int] = Field(
default=8192,
description="The maximum number of tokens that the agent is allowed to generate in its responses, limiting output length.",
)
temperature: Optional[float] = Field(
default=0.5,
description="A parameter that controls the randomness of the agent's output; lower values result in more deterministic responses.",
)
role: Optional[str] = Field(
default="worker",
description="The designated role of the agent within the swarm, which influences its behavior and interaction with other agents.",
)
max_loops: Optional[int] = Field(
default=1,
description="The maximum number of times the agent is allowed to repeat its task, enabling iterative processing if necessary.",
)
# New fields for RAG functionality
rag_collection: Optional[str] = Field(
None,
description="The Qdrant collection name for RAG functionality. If provided, this agent will perform RAG queries.",
)
rag_documents: Optional[List[str]] = Field(
None,
description="Documents to ingest into the Qdrant collection for RAG. (List of text strings)",
)
tools: Optional[List[Dict[str, Any]]] = Field(
None,
description="A dictionary of tools that the agent can use to complete its task.",
)
class AgentCompletion(BaseModel):
"""
Configuration for a single agent that works together as a swarm to accomplish tasks.
"""
agent: AgentSpec = Field(
...,
description="The agent to run.",
)
task: Optional[str] = Field(
...,
description="The task to run.",
)
img: Optional[str] = Field(
None,
description="An optional image URL that may be associated with the swarm's task or representation.",
)
output_type: Optional[str] = Field(
"list",
description="The type of output to return.",
)
class AgentCompletionResponse(BaseModel):
"""
Response from an agent completion.
"""
agent_id: str = Field(
...,
description="The unique identifier for the agent that completed the task.",
)
agent_name: str = Field(
...,
description="The name of the agent that completed the task.",
)
agent_description: str = Field(
...,
description="The description of the agent that completed the task.",
)
messages: Any = Field(
...,
description="The messages from the agent completion.",
)
cost: Dict[str, Any] = Field(
...,
description="The cost of the agent completion.",
)
class Agents(BaseModel):
"""Configuration for a collection of agents that work together as a swarm to accomplish tasks."""
agents: List[AgentSpec] = Field(
description="A list containing the specifications of each agent that will participate in the swarm, detailing their roles and functionalities."
)
class ScheduleSpec(BaseModel):
scheduled_time: datetime = Field(
...,
description="The exact date and time (in UTC) when the swarm is scheduled to execute its tasks.",
)
timezone: Optional[str] = Field(
"UTC",
description="The timezone in which the scheduled time is defined, allowing for proper scheduling across different regions.",
)
class SwarmSpec(BaseModel):
name: Optional[str] = Field(
None,
description="The name of the swarm, which serves as an identifier for the group of agents and their collective task.",
max_length=100,
)
description: Optional[str] = Field(
None,
description="A comprehensive description of the swarm's objectives, capabilities, and intended outcomes.",
)
agents: Optional[List[AgentSpec]] = Field(
None,
description="A list of agents or specifications that define the agents participating in the swarm.",
)
max_loops: Optional[int] = Field(
default=1,
description="The maximum number of execution loops allowed for the swarm, enabling repeated processing if needed.",
)
swarm_type: Optional[SwarmType] = Field(
None,
description="The classification of the swarm, indicating its operational style and methodology.",
)
rearrange_flow: Optional[str] = Field(
None,
description="Instructions on how to rearrange the flow of tasks among agents, if applicable.",
)
task: Optional[str] = Field(
None,
description="The specific task or objective that the swarm is designed to accomplish.",
)
img: Optional[str] = Field(
None,
description="An optional image URL that may be associated with the swarm's task or representation.",
)
return_history: Optional[bool] = Field(
True,
description="A flag indicating whether the swarm should return its execution history along with the final output.",
)
rules: Optional[str] = Field(
None,
description="Guidelines or constraints that govern the behavior and interactions of the agents within the swarm.",
)
schedule: Optional[ScheduleSpec] = Field(
None,
description="Details regarding the scheduling of the swarm's execution, including timing and timezone information.",
)
tasks: Optional[List[str]] = Field(
None,
description="A list of tasks that the swarm should complete.",
)
messages: Optional[List[Dict[str, Any]]] = Field(
None,
description="A list of messages that the swarm should complete.",
)
# rag_on: Optional[bool] = Field(
# None,
# description="A flag indicating whether the swarm should use RAG.",
# )
# collection_name: Optional[str] = Field(
# None,
# description="The name of the collection to use for RAG.",
# )
stream: Optional[bool] = Field(
False,
description="A flag indicating whether the swarm should stream its output.",
)
class SwarmCompletionResponse(BaseModel):
"""
Response from a swarm completion.
"""
status: str = Field(..., description="The status of the swarm completion.")
swarm_name: str = Field(..., description="The name of the swarm.")
description: str = Field(..., description="Description of the swarm.")
swarm_type: str = Field(..., description="The type of the swarm.")
task: str = Field(
..., description="The task that the swarm is designed to accomplish."
)
output: List[Dict[str, Any]] = Field(
..., description="The output generated by the swarm."
)
number_of_agents: int = Field(
..., description="The number of agents involved in the swarm."
)
# "input_config": Optional[Dict[str, Any]] = Field(None, description="The input configuration for the swarm.")
BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
# Create an MCP server
mcp = FastMCP("swarms-api")
# Add an addition tool
@mcp.tool(name="swarm_completion", description="Run a swarm completion.")
def swarm_completion(swarm: SwarmSpec) -> Dict[str, Any]:
api_key = os.getenv("SWARMS_API_KEY")
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
payload = swarm.model_dump()
response = requests.post(f"{BASE_URL}/v1/swarm/completions", json=payload, headers=headers)
return response.json()
@mcp.tool(name="swarms_available", description="Get the list of available swarms.")
async def swarms_available() -> Any:
"""
Get the list of available swarms.
"""
headers = {"Content-Type": "application/json"}
async with httpx.AsyncClient() as client:
response = await client.get(f"{BASE_URL}/v1/models/available", headers=headers)
response.raise_for_status() # Raise an error for bad responses
return response.json()
if __name__ == "__main__":
mcp.run(transport="sse")
```
## Client side
- Call the tool with it's name and the payload config
```python
import asyncio
from fastmcp import Client
swarm_config = {
"name": "Simple Financial Analysis",
"description": "A swarm to analyze financial data",
"agents": [
{
"agent_name": "Data Analyzer",
"description": "Looks at financial data",
"system_prompt": "Analyze the data.",
"model_name": "gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 1000,
"temperature": 0.5,
"auto_generate_prompt": False,
},
{
"agent_name": "Risk Analyst",
"description": "Checks risk levels",
"system_prompt": "Evaluate the risks.",
"model_name": "gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 1000,
"temperature": 0.5,
"auto_generate_prompt": False,
},
{
"agent_name": "Strategy Checker",
"description": "Validates strategies",
"system_prompt": "Review the strategy.",
"model_name": "gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 1000,
"temperature": 0.5,
"auto_generate_prompt": False,
},
],
"max_loops": 1,
"swarm_type": "SequentialWorkflow",
"task": "Analyze the financial data and provide insights.",
"return_history": False, # Added required field
"stream": False, # Added required field
"rules": None, # Added optional field
"img": None, # Added optional field
}
async def fetch_weather_and_resource():
"""Connect to a server over SSE and fetch available swarms."""
async with Client(
transport="http://localhost:8000/sse"
# SSETransport(
# url="http://localhost:8000/sse",
# headers={"x_api_key": os.getenv("SWARMS_API_KEY"), "Content-Type": "application/json"}
# )
) as client:
# Basic connectivity testing
# print("Ping check:", await client.ping())
# print("Available tools:", await client.list_tools())
# print("Swarms available:", await client.call_tool("swarms_available", None))
result = await client.call_tool("swarm_completion", {"swarm": swarm_config})
print("Swarm completion:", result)
# Execute the function
if __name__ == "__main__":
asyncio.run(fetch_weather_and_resource())
```

@ -12,7 +12,7 @@ agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
max_loops="auto",
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
@ -23,7 +23,7 @@ agent = Agent(
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
interactive=True,
role="director",
)

@ -0,0 +1,72 @@
from swarms.prompts.paper_idea_agent import (
PAPER_IDEA_AGENT_SYSTEM_PROMPT,
)
from swarms import Agent
from swarms.utils.any_to_str import any_to_str
tools = [
{
"type": "function",
"function": {
"name": "generate_paper_idea",
"description": "Generate a structured academic paper idea with all required components.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Concise identifier for the paper idea",
},
"title": {
"type": "string",
"description": "Academic paper title",
},
"short_hypothesis": {
"type": "string",
"description": "Core hypothesis in 1-2 sentences",
},
"related_work": {
"type": "string",
"description": "Key papers and how this differs from existing work",
},
"abstract": {
"type": "string",
"description": "Complete paper abstract",
},
"experiments": {
"type": "string",
"description": "Detailed experimental plan",
},
"risk_factors": {
"type": "string",
"description": "Known challenges and constraints",
},
},
"required": [
"name",
"title",
"short_hypothesis",
"related_work",
"abstract",
"experiments",
"risk_factors",
],
},
},
}
]
agent = Agent(
agent_name="Paper Idea Agent",
agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.",
system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT,
tools_list_dictionary=tools,
max_loops=1,
model_name="gpt-4o-mini",
output_type="final",
)
out = agent.run(
"Generate a paper idea for collaborative foundation transformer models"
)
print(any_to_str(out))

@ -0,0 +1,120 @@
import cProfile
import time
from swarms.prompts.paper_idea_agent import (
PAPER_IDEA_AGENT_SYSTEM_PROMPT,
)
from swarms import Agent
from swarms.utils.any_to_str import any_to_str
print("All imports completed...")
tools = [
{
"type": "function",
"function": {
"name": "generate_paper_idea",
"description": "Generate a structured academic paper idea with all required components.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Concise identifier for the paper idea",
},
"title": {
"type": "string",
"description": "Academic paper title",
},
"short_hypothesis": {
"type": "string",
"description": "Core hypothesis in 1-2 sentences",
},
"related_work": {
"type": "string",
"description": "Key papers and how this differs from existing work",
},
"abstract": {
"type": "string",
"description": "Complete paper abstract",
},
"experiments": {
"type": "string",
"description": "Detailed experimental plan",
},
"risk_factors": {
"type": "string",
"description": "Known challenges and constraints",
},
},
"required": [
"name",
"title",
"short_hypothesis",
"related_work",
"abstract",
"experiments",
"risk_factors",
],
},
},
}
]
# agent = Agent(
# agent_name="Paper Idea Agent",
# agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.",
# system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT,
# tools_list_dictionary=tools,
# max_loops=1,
# model_name="gpt-4o-mini",
# output_type="final",
# )
def generate_paper_idea():
print("Starting generate_paper_idea function...")
try:
print("Creating agent...")
agent = Agent(
agent_name="Paper Idea Agent",
agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.",
system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT,
tools_list_dictionary=tools,
max_loops=1,
model_name="gpt-4o-mini",
output_type="final",
)
print("Agent created, starting run...")
start_time = time.time()
out = agent.run(
"Generate a paper idea for collaborative foundation transformer models"
)
end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")
print("Output:", any_to_str(out))
return out
except Exception as e:
print(f"Error occurred: {str(e)}")
raise
print("Defining main block...")
if __name__ == "__main__":
print("Entering main block...")
# Basic timing first
print("\nRunning basic timing...")
generate_paper_idea()
# Then with profiler
print("\nRunning with profiler...")
profiler = cProfile.Profile()
profiler.enable()
generate_paper_idea()
profiler.disable()
profiler.print_stats(sort="cumulative")
print("Script completed.")

@ -1,4 +1,4 @@
from swarms import Agent, SequentialWorkflow
from swarms import Agent, ConcurrentWorkflow
# Core Legal Agent Definitions with enhanced system prompts
@ -27,7 +27,7 @@ ip_agent = Agent(
)
swarm = SequentialWorkflow(
swarm = ConcurrentWorkflow(
agents=[litigation_agent, corporate_agent, ip_agent],
name="litigation-practice",
description="Handle all aspects of litigation with a focus on thorough legal analysis and effective case management.",

@ -0,0 +1,31 @@
# System Role Definition
PAPER_IDEA_AGENT_SYSTEM_PROMPT = """
You are an experienced AI researcher tasked with proposing high-impact research ideas. Your ideas should:
- Be novel and creative
- Think outside conventional boundaries
- Start from simple, elegant questions or observations
- Be distinguishable from existing literature
- Be feasible within academic lab resources
- Be publishable at top ML conferences
- Be implementable using the provided codebase
Your responses must follow this strict format:
IDEA JSON Structure:
{
"Name": "Concise identifier",
"Title": "Academic paper title",
"Short Hypothesis": "Core hypothesis in 1-2 sentences",
"Related Work": "Key papers and how this differs",
"Abstract": "Complete paper abstract",
"Experiments": "Detailed experimental plan",
"Risk Factors and Limitations": "Known challenges and constraints"
}
Important Guidelines:
- Perform at least one literature search before finalizing any idea
- Ensure JSON formatting is valid for automated parsing
- Keep proposals clear and implementable
"""

@ -1,3 +1,4 @@
import concurrent.futures
import asyncio
import json
import logging
@ -524,6 +525,7 @@ class Agent:
self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
# self.mcp_servers = mcp_servers
self._cached_llm = (
None # Add this line to cache the LLM instance
)
@ -558,72 +560,76 @@ class Agent:
max_workers=executor_workers
)
# Initialize the tool struct
if (
exists(tools)
or exists(list_base_models)
or exists(tool_schema)
):
self.tool_struct = BaseTool(
tools=tools,
base_models=list_base_models,
tool_system_prompt=tool_system_prompt,
)
# Some common configuration settings
threading.Thread(
target=self.setup_config, daemon=True
).start()
# If docs folder exists then get the docs from docs folder
if exists(self.docs_folder):
threading.Thread(
target=self.get_docs_from_doc_folders
).start()
if tools is not None:
logger.info(
"Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable."
)
# Add the tool prompt to the memory
self.short_memory.add(
role="system", content=tool_system_prompt
)
# Log the tools
logger.info(
f"Tools provided: Accessing {len(tools)} tools"
)
# Transform the tools into an openai schema
# self.convert_tool_into_openai_schema()
# Transform the tools into an openai schema
tool_dict = (
self.tool_struct.convert_tool_into_openai_schema()
)
self.short_memory.add(role="system", content=tool_dict)
self.init_handling()
# Now create a function calling map for every tools
self.function_map = {
tool.__name__: tool for tool in tools
def init_handling(self):
# Define tasks as pairs of (function, condition)
# Each task will only run if its condition is True
tasks = [
(self.setup_config, True), # Always run setup_config
(
self.get_docs_from_doc_folders,
exists(self.docs_folder),
),
(self.handle_tool_init, True), # Always run tool init
(
self.handle_tool_schema_ops,
exists(self.tool_schema)
or exists(self.list_base_models),
),
(
self.handle_sop_ops,
exists(self.sop) or exists(self.sop_list),
),
]
# Filter out tasks whose conditions are False
filtered_tasks = [
task for task, condition in tasks if condition
]
# Execute all tasks concurrently
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count() * 4
) as executor:
# Map tasks to futures and collect results
results = {}
future_to_task = {
executor.submit(task): task.__name__
for task in filtered_tasks
}
# If the tool schema exists or a list of base models exists then convert the tool schema into an openai schema
if exists(tool_schema) or exists(list_base_models):
threading.Thread(
target=self.handle_tool_schema_ops()
).start()
# Wait for each future to complete and collect results/exceptions
for future in concurrent.futures.as_completed(
future_to_task
):
task_name = future_to_task[future]
try:
result = future.result()
results[task_name] = result
logging.info(
f"Task {task_name} completed successfully"
)
except Exception as e:
results[task_name] = None
logging.error(
f"Task {task_name} failed with error: {e}"
)
# Run sequential operations after all concurrent tasks are done
self.agent_output = self.agent_output_model()
log_agent_data(self.to_dict())
# If the sop or sop_list exists then handle the sop ops
if exists(self.sop) or exists(self.sop_list):
threading.Thread(target=self.handle_sop_ops()).start()
if self.llm is None:
self.llm = self.llm_handling()
def agent_output_model(self):
# Many steps
self.agent_output = ManySteps(
agent_id=agent_id,
agent_name=agent_name,
id = agent_id()
return ManySteps(
agent_id=id,
agent_name=self.agent_name,
# run_id=run_id,
task="",
max_loops=self.max_loops,
@ -637,18 +643,6 @@ class Agent:
dynamic_temperature_enabled=self.dynamic_temperature_enabled,
)
# Telemetry Processor to log agent data
log_agent_data(self.to_dict())
if self.llm is None:
self.llm = self.llm_handling()
# if (
# self.tools_list_dictionary is None
# and self.mcp_servers is not None
# ):
# self.tools_list_dictionary = self.mcp_tool_handling()
def llm_handling(self):
# Use cached instance if available
if self._cached_llm is not None:
@ -695,6 +689,48 @@ class Agent:
)
return None
def handle_tool_init(self):
# Initialize the tool struct
if (
exists(self.tools)
or exists(self.list_base_models)
or exists(self.tool_schema)
):
self.tool_struct = BaseTool(
tools=self.tools,
base_models=self.list_base_models,
tool_system_prompt=self.tool_system_prompt,
)
if self.tools is not None:
logger.info(
"Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable."
)
# Add the tool prompt to the memory
self.short_memory.add(
role="system", content=self.tool_system_prompt
)
# Log the tools
logger.info(
f"Tools provided: Accessing {len(self.tools)} tools"
)
# Transform the tools into an openai schema
# self.convert_tool_into_openai_schema()
# Transform the tools into an openai schema
tool_dict = (
self.tool_struct.convert_tool_into_openai_schema()
)
self.short_memory.add(role="system", content=tool_dict)
# Now create a function calling map for every tools
self.function_map = {
tool.__name__: tool for tool in self.tools
}
# def mcp_execution_flow(self, response: any):
# """
# Executes the MCP (Model Context Protocol) flow based on the provided response.
@ -955,14 +991,24 @@ class Agent:
agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True)
"""
try:
self.check_if_no_prompt_then_autogenerate(task)
# Add task to memory
self.short_memory.add(role=self.user_name, content=task)
# 1. Batch process initial setup
setup_tasks = [
lambda: self.check_if_no_prompt_then_autogenerate(
task
),
lambda: self.short_memory.add(
role=self.user_name, content=task
),
lambda: (
self.plan(task) if self.plan_enabled else None
),
]
# Plan
if self.plan_enabled is True:
self.plan(task)
# Execute setup tasks concurrently
with ThreadPoolExecutor(
max_workers=len(setup_tasks)
) as executor:
executor.map(lambda f: f(), setup_tasks)
# Set the loop count
loop_count = 0
@ -1035,15 +1081,31 @@ class Agent:
# Convert to a str if the response is not a str
response = self.parse_llm_output(response)
self.short_memory.add(
role=self.agent_name, content=response
)
# self.short_memory.add(
# role=self.agent_name, content=response
# )
# # Print
# self.pretty_print(response, loop_count)
# # Output Cleaner
# self.output_cleaner_op(response)
# Print
self.pretty_print(response, loop_count)
# 9. Batch memory updates and prints
update_tasks = [
lambda: self.short_memory.add(
role=self.agent_name, content=response
),
lambda: self.pretty_print(
response, loop_count
),
lambda: self.output_cleaner_op(response),
]
# Output Cleaner
self.output_cleaner_op(response)
with ThreadPoolExecutor(
max_workers=len(update_tasks)
) as executor:
executor.map(lambda f: f(), update_tasks)
# Check and execute tools
if self.tools is not None:
@ -1120,7 +1182,7 @@ class Agent:
break
if self.interactive:
logger.info("Interactive mode enabled.")
# logger.info("Interactive mode enabled.")
user_input = input("You: ")
# User-defined exit command
@ -1147,10 +1209,21 @@ class Agent:
if self.autosave is True:
self.save()
log_agent_data(self.to_dict())
# log_agent_data(self.to_dict())
if self.autosave is True:
self.save()
# if self.autosave is True:
# self.save()
# 14. Batch final operations
final_tasks = [
lambda: log_agent_data(self.to_dict()),
lambda: self.save() if self.autosave else None,
]
with ThreadPoolExecutor(
max_workers=len(final_tasks)
) as executor:
executor.map(lambda f: f(), final_tasks)
return history_output_formatter(
self.short_memory, type=self.output_type
@ -1212,12 +1285,6 @@ class Agent:
self.run,
task=task,
img=img,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
do_not_use_cluster_ops=do_not_use_cluster_ops,
all_gpus=all_gpus,
*args,
**kwargs,
)
@ -1253,12 +1320,7 @@ class Agent:
return self.run(
task=task,
img=img,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
do_not_use_cluster_ops=do_not_use_cluster_ops,
all_gpus=all_gpus * args,
*args,
**kwargs,
)
except Exception as error:

Loading…
Cancel
Save