From cae0db7b4d23d6e800a07ed62327f883a2bd33a0 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Tue, 15 Apr 2025 16:35:38 -0700 Subject: [PATCH 1/2] swarms api as mcp --- docs/mkdocs.yml | 1 + docs/swarms_cloud/mcp.md | 348 +++++++++++++++++++++++++++++ example.py | 4 +- paper_idea_agent.py | 72 ++++++ paper_idea_profile.py | 120 ++++++++++ sequential_swarm_example.py | 4 +- swarms/prompts/paper_idea_agent.py | 31 +++ swarms/structs/agent.py | 266 +++++++++++++--------- 8 files changed, 740 insertions(+), 106 deletions(-) create mode 100644 docs/swarms_cloud/mcp.md create mode 100644 paper_idea_agent.py create mode 100644 paper_idea_profile.py create mode 100644 swarms/prompts/paper_idea_agent.py diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index e60f3646..e6473822 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -355,6 +355,7 @@ nav: - Swarm Types: "swarms_cloud/swarm_types.md" - Swarms Cloud Subscription Tiers: "swarms_cloud/subscription_tiers.md" - Swarms API Best Practices: "swarms_cloud/best_practices.md" + - Swarms API as MCP: "swarms_cloud/mcp.md" - Swarm Ecosystem APIs: - MCS API: "swarms_cloud/mcs_api.md" # - CreateNow API: "swarms_cloud/create_api.md" diff --git a/docs/swarms_cloud/mcp.md b/docs/swarms_cloud/mcp.md new file mode 100644 index 00000000..848281f9 --- /dev/null +++ b/docs/swarms_cloud/mcp.md @@ -0,0 +1,348 @@ +# Swarms API as MCP + +- Launch MCP server as a tool +- Put `SWARMS_API_KEY` in `.env` +- Client side code below + + +## Server Side + +```python +# server.py +from datetime import datetime +import os +from typing import Any, Dict, List, Optional + +import requests +import httpx +from fastmcp import FastMCP +from pydantic import BaseModel, Field +from swarms import SwarmType +from dotenv import load_dotenv + +load_dotenv() + +class AgentSpec(BaseModel): + agent_name: Optional[str] = Field( + description="The unique name assigned to the agent, which identifies its role and functionality within the swarm.", + ) + description: Optional[str] = Field( + description="A detailed explanation of the agent's purpose, capabilities, and any specific tasks it is designed to perform.", + ) + system_prompt: Optional[str] = Field( + description="The initial instruction or context provided to the agent, guiding its behavior and responses during execution.", + ) + model_name: Optional[str] = Field( + default="gpt-4o-mini", + description="The name of the AI model that the agent will utilize for processing tasks and generating outputs. For example: gpt-4o, gpt-4o-mini, openai/o3-mini", + ) + auto_generate_prompt: Optional[bool] = Field( + default=False, + description="A flag indicating whether the agent should automatically create prompts based on the task requirements.", + ) + max_tokens: Optional[int] = Field( + default=8192, + description="The maximum number of tokens that the agent is allowed to generate in its responses, limiting output length.", + ) + temperature: Optional[float] = Field( + default=0.5, + description="A parameter that controls the randomness of the agent's output; lower values result in more deterministic responses.", + ) + role: Optional[str] = Field( + default="worker", + description="The designated role of the agent within the swarm, which influences its behavior and interaction with other agents.", + ) + max_loops: Optional[int] = Field( + default=1, + description="The maximum number of times the agent is allowed to repeat its task, enabling iterative processing if necessary.", + ) + # New fields for RAG functionality + rag_collection: Optional[str] = Field( + None, + description="The Qdrant collection name for RAG functionality. If provided, this agent will perform RAG queries.", + ) + rag_documents: Optional[List[str]] = Field( + None, + description="Documents to ingest into the Qdrant collection for RAG. (List of text strings)", + ) + tools: Optional[List[Dict[str, Any]]] = Field( + None, + description="A dictionary of tools that the agent can use to complete its task.", + ) + + +class AgentCompletion(BaseModel): + """ + Configuration for a single agent that works together as a swarm to accomplish tasks. + """ + + agent: AgentSpec = Field( + ..., + description="The agent to run.", + ) + task: Optional[str] = Field( + ..., + description="The task to run.", + ) + img: Optional[str] = Field( + None, + description="An optional image URL that may be associated with the swarm's task or representation.", + ) + output_type: Optional[str] = Field( + "list", + description="The type of output to return.", + ) + + +class AgentCompletionResponse(BaseModel): + """ + Response from an agent completion. + """ + + agent_id: str = Field( + ..., + description="The unique identifier for the agent that completed the task.", + ) + agent_name: str = Field( + ..., + description="The name of the agent that completed the task.", + ) + agent_description: str = Field( + ..., + description="The description of the agent that completed the task.", + ) + messages: Any = Field( + ..., + description="The messages from the agent completion.", + ) + + cost: Dict[str, Any] = Field( + ..., + description="The cost of the agent completion.", + ) + + +class Agents(BaseModel): + """Configuration for a collection of agents that work together as a swarm to accomplish tasks.""" + + agents: List[AgentSpec] = Field( + description="A list containing the specifications of each agent that will participate in the swarm, detailing their roles and functionalities." + ) + + +class ScheduleSpec(BaseModel): + scheduled_time: datetime = Field( + ..., + description="The exact date and time (in UTC) when the swarm is scheduled to execute its tasks.", + ) + timezone: Optional[str] = Field( + "UTC", + description="The timezone in which the scheduled time is defined, allowing for proper scheduling across different regions.", + ) + + +class SwarmSpec(BaseModel): + name: Optional[str] = Field( + None, + description="The name of the swarm, which serves as an identifier for the group of agents and their collective task.", + max_length=100, + ) + description: Optional[str] = Field( + None, + description="A comprehensive description of the swarm's objectives, capabilities, and intended outcomes.", + ) + agents: Optional[List[AgentSpec]] = Field( + None, + description="A list of agents or specifications that define the agents participating in the swarm.", + ) + max_loops: Optional[int] = Field( + default=1, + description="The maximum number of execution loops allowed for the swarm, enabling repeated processing if needed.", + ) + swarm_type: Optional[SwarmType] = Field( + None, + description="The classification of the swarm, indicating its operational style and methodology.", + ) + rearrange_flow: Optional[str] = Field( + None, + description="Instructions on how to rearrange the flow of tasks among agents, if applicable.", + ) + task: Optional[str] = Field( + None, + description="The specific task or objective that the swarm is designed to accomplish.", + ) + img: Optional[str] = Field( + None, + description="An optional image URL that may be associated with the swarm's task or representation.", + ) + return_history: Optional[bool] = Field( + True, + description="A flag indicating whether the swarm should return its execution history along with the final output.", + ) + rules: Optional[str] = Field( + None, + description="Guidelines or constraints that govern the behavior and interactions of the agents within the swarm.", + ) + schedule: Optional[ScheduleSpec] = Field( + None, + description="Details regarding the scheduling of the swarm's execution, including timing and timezone information.", + ) + tasks: Optional[List[str]] = Field( + None, + description="A list of tasks that the swarm should complete.", + ) + messages: Optional[List[Dict[str, Any]]] = Field( + None, + description="A list of messages that the swarm should complete.", + ) + # rag_on: Optional[bool] = Field( + # None, + # description="A flag indicating whether the swarm should use RAG.", + # ) + # collection_name: Optional[str] = Field( + # None, + # description="The name of the collection to use for RAG.", + # ) + stream: Optional[bool] = Field( + False, + description="A flag indicating whether the swarm should stream its output.", + ) + + +class SwarmCompletionResponse(BaseModel): + """ + Response from a swarm completion. + """ + + status: str = Field(..., description="The status of the swarm completion.") + swarm_name: str = Field(..., description="The name of the swarm.") + description: str = Field(..., description="Description of the swarm.") + swarm_type: str = Field(..., description="The type of the swarm.") + task: str = Field( + ..., description="The task that the swarm is designed to accomplish." + ) + output: List[Dict[str, Any]] = Field( + ..., description="The output generated by the swarm." + ) + number_of_agents: int = Field( + ..., description="The number of agents involved in the swarm." + ) + # "input_config": Optional[Dict[str, Any]] = Field(None, description="The input configuration for the swarm.") + + +BASE_URL = "https://swarms-api-285321057562.us-east1.run.app" + + +# Create an MCP server +mcp = FastMCP("swarms-api") + + +# Add an addition tool +@mcp.tool(name="swarm_completion", description="Run a swarm completion.") +def swarm_completion(swarm: SwarmSpec) -> Dict[str, Any]: + api_key = os.getenv("SWARMS_API_KEY") + headers = {"x-api-key": api_key, "Content-Type": "application/json"} + + payload = swarm.model_dump() + + response = requests.post(f"{BASE_URL}/v1/swarm/completions", json=payload, headers=headers) + + return response.json() + +@mcp.tool(name="swarms_available", description="Get the list of available swarms.") +async def swarms_available() -> Any: + """ + Get the list of available swarms. + """ + headers = {"Content-Type": "application/json"} + + async with httpx.AsyncClient() as client: + response = await client.get(f"{BASE_URL}/v1/models/available", headers=headers) + response.raise_for_status() # Raise an error for bad responses + return response.json() + + +if __name__ == "__main__": + mcp.run(transport="sse") +``` + +## Client side + +- Call the tool with it's name and the payload config + +```python +import asyncio +from fastmcp import Client + +swarm_config = { + "name": "Simple Financial Analysis", + "description": "A swarm to analyze financial data", + "agents": [ + { + "agent_name": "Data Analyzer", + "description": "Looks at financial data", + "system_prompt": "Analyze the data.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1, + "max_tokens": 1000, + "temperature": 0.5, + "auto_generate_prompt": False, + }, + { + "agent_name": "Risk Analyst", + "description": "Checks risk levels", + "system_prompt": "Evaluate the risks.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1, + "max_tokens": 1000, + "temperature": 0.5, + "auto_generate_prompt": False, + }, + { + "agent_name": "Strategy Checker", + "description": "Validates strategies", + "system_prompt": "Review the strategy.", + "model_name": "gpt-4o", + "role": "worker", + "max_loops": 1, + "max_tokens": 1000, + "temperature": 0.5, + "auto_generate_prompt": False, + }, + ], + "max_loops": 1, + "swarm_type": "SequentialWorkflow", + "task": "Analyze the financial data and provide insights.", + "return_history": False, # Added required field + "stream": False, # Added required field + "rules": None, # Added optional field + "img": None, # Added optional field +} + + +async def fetch_weather_and_resource(): + """Connect to a server over SSE and fetch available swarms.""" + + async with Client( + transport="http://localhost:8000/sse" + # SSETransport( + # url="http://localhost:8000/sse", + # headers={"x_api_key": os.getenv("SWARMS_API_KEY"), "Content-Type": "application/json"} + # ) + ) as client: + # Basic connectivity testing + # print("Ping check:", await client.ping()) + # print("Available tools:", await client.list_tools()) + # print("Swarms available:", await client.call_tool("swarms_available", None)) + result = await client.call_tool("swarm_completion", {"swarm": swarm_config}) + print("Swarm completion:", result) + + +# Execute the function +if __name__ == "__main__": + asyncio.run(fetch_weather_and_resource()) + + +``` \ No newline at end of file diff --git a/example.py b/example.py index b6205225..846d015c 100644 --- a/example.py +++ b/example.py @@ -12,7 +12,7 @@ agent = Agent( agent_name="Financial-Analysis-Agent", agent_description="Personal finance advisor agent", system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - max_loops=1, + max_loops="auto", model_name="gpt-4o-mini", dynamic_temperature_enabled=True, user_name="swarms_corp", @@ -23,7 +23,7 @@ agent = Agent( auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task max_tokens=4000, # max output tokens saved_state_path="agent_00.json", - interactive=False, + interactive=True, role="director", ) diff --git a/paper_idea_agent.py b/paper_idea_agent.py new file mode 100644 index 00000000..231d747f --- /dev/null +++ b/paper_idea_agent.py @@ -0,0 +1,72 @@ +from swarms.prompts.paper_idea_agent import ( + PAPER_IDEA_AGENT_SYSTEM_PROMPT, +) +from swarms import Agent +from swarms.utils.any_to_str import any_to_str + +tools = [ + { + "type": "function", + "function": { + "name": "generate_paper_idea", + "description": "Generate a structured academic paper idea with all required components.", + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Concise identifier for the paper idea", + }, + "title": { + "type": "string", + "description": "Academic paper title", + }, + "short_hypothesis": { + "type": "string", + "description": "Core hypothesis in 1-2 sentences", + }, + "related_work": { + "type": "string", + "description": "Key papers and how this differs from existing work", + }, + "abstract": { + "type": "string", + "description": "Complete paper abstract", + }, + "experiments": { + "type": "string", + "description": "Detailed experimental plan", + }, + "risk_factors": { + "type": "string", + "description": "Known challenges and constraints", + }, + }, + "required": [ + "name", + "title", + "short_hypothesis", + "related_work", + "abstract", + "experiments", + "risk_factors", + ], + }, + }, + } +] + +agent = Agent( + agent_name="Paper Idea Agent", + agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.", + system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT, + tools_list_dictionary=tools, + max_loops=1, + model_name="gpt-4o-mini", + output_type="final", +) + +out = agent.run( + "Generate a paper idea for collaborative foundation transformer models" +) +print(any_to_str(out)) diff --git a/paper_idea_profile.py b/paper_idea_profile.py new file mode 100644 index 00000000..84c84d61 --- /dev/null +++ b/paper_idea_profile.py @@ -0,0 +1,120 @@ +import cProfile +import time + +from swarms.prompts.paper_idea_agent import ( + PAPER_IDEA_AGENT_SYSTEM_PROMPT, +) +from swarms import Agent +from swarms.utils.any_to_str import any_to_str + +print("All imports completed...") + + +tools = [ + { + "type": "function", + "function": { + "name": "generate_paper_idea", + "description": "Generate a structured academic paper idea with all required components.", + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Concise identifier for the paper idea", + }, + "title": { + "type": "string", + "description": "Academic paper title", + }, + "short_hypothesis": { + "type": "string", + "description": "Core hypothesis in 1-2 sentences", + }, + "related_work": { + "type": "string", + "description": "Key papers and how this differs from existing work", + }, + "abstract": { + "type": "string", + "description": "Complete paper abstract", + }, + "experiments": { + "type": "string", + "description": "Detailed experimental plan", + }, + "risk_factors": { + "type": "string", + "description": "Known challenges and constraints", + }, + }, + "required": [ + "name", + "title", + "short_hypothesis", + "related_work", + "abstract", + "experiments", + "risk_factors", + ], + }, + }, + } +] + + +# agent = Agent( +# agent_name="Paper Idea Agent", +# agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.", +# system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT, +# tools_list_dictionary=tools, +# max_loops=1, +# model_name="gpt-4o-mini", +# output_type="final", +# ) +def generate_paper_idea(): + print("Starting generate_paper_idea function...") + try: + print("Creating agent...") + agent = Agent( + agent_name="Paper Idea Agent", + agent_role="You are an experienced AI researcher tasked with proposing high-impact research ideas.", + system_prompt=PAPER_IDEA_AGENT_SYSTEM_PROMPT, + tools_list_dictionary=tools, + max_loops=1, + model_name="gpt-4o-mini", + output_type="final", + ) + + print("Agent created, starting run...") + start_time = time.time() + out = agent.run( + "Generate a paper idea for collaborative foundation transformer models" + ) + end_time = time.time() + + print(f"Execution time: {end_time - start_time:.2f} seconds") + print("Output:", any_to_str(out)) + return out + except Exception as e: + print(f"Error occurred: {str(e)}") + raise + + +print("Defining main block...") +if __name__ == "__main__": + print("Entering main block...") + + # Basic timing first + print("\nRunning basic timing...") + generate_paper_idea() + + # Then with profiler + print("\nRunning with profiler...") + profiler = cProfile.Profile() + profiler.enable() + generate_paper_idea() + profiler.disable() + profiler.print_stats(sort="cumulative") + +print("Script completed.") diff --git a/sequential_swarm_example.py b/sequential_swarm_example.py index 2c73b2cb..8667e4a6 100644 --- a/sequential_swarm_example.py +++ b/sequential_swarm_example.py @@ -1,4 +1,4 @@ -from swarms import Agent, SequentialWorkflow +from swarms import Agent, ConcurrentWorkflow # Core Legal Agent Definitions with enhanced system prompts @@ -27,7 +27,7 @@ ip_agent = Agent( ) -swarm = SequentialWorkflow( +swarm = ConcurrentWorkflow( agents=[litigation_agent, corporate_agent, ip_agent], name="litigation-practice", description="Handle all aspects of litigation with a focus on thorough legal analysis and effective case management.", diff --git a/swarms/prompts/paper_idea_agent.py b/swarms/prompts/paper_idea_agent.py new file mode 100644 index 00000000..755c92a4 --- /dev/null +++ b/swarms/prompts/paper_idea_agent.py @@ -0,0 +1,31 @@ +# System Role Definition +PAPER_IDEA_AGENT_SYSTEM_PROMPT = """ +You are an experienced AI researcher tasked with proposing high-impact research ideas. Your ideas should: + +- Be novel and creative +- Think outside conventional boundaries +- Start from simple, elegant questions or observations +- Be distinguishable from existing literature +- Be feasible within academic lab resources +- Be publishable at top ML conferences +- Be implementable using the provided codebase + +Your responses must follow this strict format: + + +IDEA JSON Structure: + { + "Name": "Concise identifier", + "Title": "Academic paper title", + "Short Hypothesis": "Core hypothesis in 1-2 sentences", + "Related Work": "Key papers and how this differs", + "Abstract": "Complete paper abstract", + "Experiments": "Detailed experimental plan", + "Risk Factors and Limitations": "Known challenges and constraints" + } + +Important Guidelines: +- Perform at least one literature search before finalizing any idea +- Ensure JSON formatting is valid for automated parsing +- Keep proposals clear and implementable +""" diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index f9b0417f..c45d039b 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1,3 +1,4 @@ +import concurrent.futures import asyncio import json import logging @@ -524,6 +525,7 @@ class Agent: self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary # self.mcp_servers = mcp_servers + self._cached_llm = ( None # Add this line to cache the LLM instance ) @@ -558,72 +560,76 @@ class Agent: max_workers=executor_workers ) - # Initialize the tool struct - if ( - exists(tools) - or exists(list_base_models) - or exists(tool_schema) - ): - - self.tool_struct = BaseTool( - tools=tools, - base_models=list_base_models, - tool_system_prompt=tool_system_prompt, - ) - - # Some common configuration settings - threading.Thread( - target=self.setup_config, daemon=True - ).start() - - # If docs folder exists then get the docs from docs folder - if exists(self.docs_folder): - threading.Thread( - target=self.get_docs_from_doc_folders - ).start() - - if tools is not None: - logger.info( - "Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable." - ) - # Add the tool prompt to the memory - self.short_memory.add( - role="system", content=tool_system_prompt - ) - - # Log the tools - logger.info( - f"Tools provided: Accessing {len(tools)} tools" - ) - - # Transform the tools into an openai schema - # self.convert_tool_into_openai_schema() - - # Transform the tools into an openai schema - tool_dict = ( - self.tool_struct.convert_tool_into_openai_schema() - ) - self.short_memory.add(role="system", content=tool_dict) + self.init_handling() - # Now create a function calling map for every tools - self.function_map = { - tool.__name__: tool for tool in tools + def init_handling(self): + # Define tasks as pairs of (function, condition) + # Each task will only run if its condition is True + tasks = [ + (self.setup_config, True), # Always run setup_config + ( + self.get_docs_from_doc_folders, + exists(self.docs_folder), + ), + (self.handle_tool_init, True), # Always run tool init + ( + self.handle_tool_schema_ops, + exists(self.tool_schema) + or exists(self.list_base_models), + ), + ( + self.handle_sop_ops, + exists(self.sop) or exists(self.sop_list), + ), + ] + + # Filter out tasks whose conditions are False + filtered_tasks = [ + task for task, condition in tasks if condition + ] + + # Execute all tasks concurrently + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count() * 4 + ) as executor: + # Map tasks to futures and collect results + results = {} + future_to_task = { + executor.submit(task): task.__name__ + for task in filtered_tasks } - # If the tool schema exists or a list of base models exists then convert the tool schema into an openai schema - if exists(tool_schema) or exists(list_base_models): - threading.Thread( - target=self.handle_tool_schema_ops() - ).start() + # Wait for each future to complete and collect results/exceptions + for future in concurrent.futures.as_completed( + future_to_task + ): + task_name = future_to_task[future] + try: + result = future.result() + results[task_name] = result + logging.info( + f"Task {task_name} completed successfully" + ) + except Exception as e: + results[task_name] = None + logging.error( + f"Task {task_name} failed with error: {e}" + ) + + # Run sequential operations after all concurrent tasks are done + self.agent_output = self.agent_output_model() + log_agent_data(self.to_dict()) - # If the sop or sop_list exists then handle the sop ops - if exists(self.sop) or exists(self.sop_list): - threading.Thread(target=self.handle_sop_ops()).start() + if self.llm is None: + self.llm = self.llm_handling() + def agent_output_model(self): # Many steps - self.agent_output = ManySteps( - agent_id=agent_id, - agent_name=agent_name, + id = agent_id() + + return ManySteps( + agent_id=id, + agent_name=self.agent_name, # run_id=run_id, task="", max_loops=self.max_loops, @@ -637,18 +643,6 @@ class Agent: dynamic_temperature_enabled=self.dynamic_temperature_enabled, ) - # Telemetry Processor to log agent data - log_agent_data(self.to_dict()) - - if self.llm is None: - self.llm = self.llm_handling() - - # if ( - # self.tools_list_dictionary is None - # and self.mcp_servers is not None - # ): - # self.tools_list_dictionary = self.mcp_tool_handling() - def llm_handling(self): # Use cached instance if available if self._cached_llm is not None: @@ -695,6 +689,48 @@ class Agent: ) return None + def handle_tool_init(self): + # Initialize the tool struct + if ( + exists(self.tools) + or exists(self.list_base_models) + or exists(self.tool_schema) + ): + + self.tool_struct = BaseTool( + tools=self.tools, + base_models=self.list_base_models, + tool_system_prompt=self.tool_system_prompt, + ) + + if self.tools is not None: + logger.info( + "Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable." + ) + # Add the tool prompt to the memory + self.short_memory.add( + role="system", content=self.tool_system_prompt + ) + + # Log the tools + logger.info( + f"Tools provided: Accessing {len(self.tools)} tools" + ) + + # Transform the tools into an openai schema + # self.convert_tool_into_openai_schema() + + # Transform the tools into an openai schema + tool_dict = ( + self.tool_struct.convert_tool_into_openai_schema() + ) + self.short_memory.add(role="system", content=tool_dict) + + # Now create a function calling map for every tools + self.function_map = { + tool.__name__: tool for tool in self.tools + } + # def mcp_execution_flow(self, response: any): # """ # Executes the MCP (Model Context Protocol) flow based on the provided response. @@ -955,14 +991,24 @@ class Agent: agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) """ try: - self.check_if_no_prompt_then_autogenerate(task) - - # Add task to memory - self.short_memory.add(role=self.user_name, content=task) + # 1. Batch process initial setup + setup_tasks = [ + lambda: self.check_if_no_prompt_then_autogenerate( + task + ), + lambda: self.short_memory.add( + role=self.user_name, content=task + ), + lambda: ( + self.plan(task) if self.plan_enabled else None + ), + ] - # Plan - if self.plan_enabled is True: - self.plan(task) + # Execute setup tasks concurrently + with ThreadPoolExecutor( + max_workers=len(setup_tasks) + ) as executor: + executor.map(lambda f: f(), setup_tasks) # Set the loop count loop_count = 0 @@ -1035,15 +1081,31 @@ class Agent: # Convert to a str if the response is not a str response = self.parse_llm_output(response) - self.short_memory.add( - role=self.agent_name, content=response - ) + # self.short_memory.add( + # role=self.agent_name, content=response + # ) + + # # Print + # self.pretty_print(response, loop_count) + + # # Output Cleaner + # self.output_cleaner_op(response) - # Print - self.pretty_print(response, loop_count) + # 9. Batch memory updates and prints + update_tasks = [ + lambda: self.short_memory.add( + role=self.agent_name, content=response + ), + lambda: self.pretty_print( + response, loop_count + ), + lambda: self.output_cleaner_op(response), + ] - # Output Cleaner - self.output_cleaner_op(response) + with ThreadPoolExecutor( + max_workers=len(update_tasks) + ) as executor: + executor.map(lambda f: f(), update_tasks) # Check and execute tools if self.tools is not None: @@ -1120,7 +1182,7 @@ class Agent: break if self.interactive: - logger.info("Interactive mode enabled.") + # logger.info("Interactive mode enabled.") user_input = input("You: ") # User-defined exit command @@ -1147,10 +1209,21 @@ class Agent: if self.autosave is True: self.save() - log_agent_data(self.to_dict()) + # log_agent_data(self.to_dict()) - if self.autosave is True: - self.save() + # if self.autosave is True: + # self.save() + + # 14. Batch final operations + final_tasks = [ + lambda: log_agent_data(self.to_dict()), + lambda: self.save() if self.autosave else None, + ] + + with ThreadPoolExecutor( + max_workers=len(final_tasks) + ) as executor: + executor.map(lambda f: f(), final_tasks) return history_output_formatter( self.short_memory, type=self.output_type @@ -1212,12 +1285,6 @@ class Agent: self.run, task=task, img=img, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - do_not_use_cluster_ops=do_not_use_cluster_ops, - all_gpus=all_gpus, *args, **kwargs, ) @@ -1253,12 +1320,7 @@ class Agent: return self.run( task=task, img=img, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - do_not_use_cluster_ops=do_not_use_cluster_ops, - all_gpus=all_gpus * args, + *args, **kwargs, ) except Exception as error: From 175bd05ce59680a6f3add790d6506de9238111af Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Tue, 15 Apr 2025 16:40:31 -0700 Subject: [PATCH 2/2] cleanup --- docs/swarms_cloud/mcp.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/swarms_cloud/mcp.md b/docs/swarms_cloud/mcp.md index 848281f9..3098b82e 100644 --- a/docs/swarms_cloud/mcp.md +++ b/docs/swarms_cloud/mcp.md @@ -322,15 +322,11 @@ swarm_config = { } -async def fetch_weather_and_resource(): +async def swarm_completion(): """Connect to a server over SSE and fetch available swarms.""" async with Client( transport="http://localhost:8000/sse" - # SSETransport( - # url="http://localhost:8000/sse", - # headers={"x_api_key": os.getenv("SWARMS_API_KEY"), "Content-Type": "application/json"} - # ) ) as client: # Basic connectivity testing # print("Ping check:", await client.ping()) @@ -342,7 +338,7 @@ async def fetch_weather_and_resource(): # Execute the function if __name__ == "__main__": - asyncio.run(fetch_weather_and_resource()) + asyncio.run(swarm_completion()) ``` \ No newline at end of file