From 273a823f35c75cca6f67f5e811a250d240fea945 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 21 Jul 2025 11:33:00 +0000 Subject: [PATCH 01/11] Bump codacy/codacy-analysis-cli-action from 4.4.5 to 4.4.7 Bumps [codacy/codacy-analysis-cli-action](https://github.com/codacy/codacy-analysis-cli-action) from 4.4.5 to 4.4.7. - [Release notes](https://github.com/codacy/codacy-analysis-cli-action/releases) - [Commits](https://github.com/codacy/codacy-analysis-cli-action/compare/97bf5df3c09e75f5bcd72695998f96ebd701846e...562ee3e92b8e92df8b67e0a5ff8aa8e261919c08) --- updated-dependencies: - dependency-name: codacy/codacy-analysis-cli-action dependency-version: 4.4.7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- .github/workflows/codacy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/codacy.yml b/.github/workflows/codacy.yml index 06d9d87e..23466431 100644 --- a/.github/workflows/codacy.yml +++ b/.github/workflows/codacy.yml @@ -24,7 +24,7 @@ jobs: uses: actions/checkout@v4 # Execute Codacy Analysis CLI and generate a SARIF output with the security issues identified during the analysis - name: Run Codacy Analysis CLI - uses: codacy/codacy-analysis-cli-action@97bf5df3c09e75f5bcd72695998f96ebd701846e + uses: codacy/codacy-analysis-cli-action@562ee3e92b8e92df8b67e0a5ff8aa8e261919c08 with: # Check https://github.com/codacy/codacy-analysis-cli#project-token to # get your project token from your Codacy repository From b3aabd0a84786df289044ea1789fd2d336213214 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 21 Jul 2025 11:37:04 +0000 Subject: [PATCH 02/11] Bump actions/first-interaction from 1.3.0 to 2.0.0 Bumps [actions/first-interaction](https://github.com/actions/first-interaction) from 1.3.0 to 2.0.0. 
- [Release notes](https://github.com/actions/first-interaction/releases) - [Commits](https://github.com/actions/first-interaction/compare/v1.3.0...v2.0.0) --- updated-dependencies: - dependency-name: actions/first-interaction dependency-version: 2.0.0 dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/welcome.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/welcome.yml b/.github/workflows/welcome.yml index dd16f9c3..9372a9dc 100644 --- a/.github/workflows/welcome.yml +++ b/.github/workflows/welcome.yml @@ -11,7 +11,7 @@ jobs: permissions: write-all runs-on: ubuntu-latest steps: - - uses: actions/first-interaction@v1.3.0 + - uses: actions/first-interaction@v2.0.0 with: repo-token: ${{ secrets.GITHUB_TOKEN }} issue-message: From 43a92cea711f7c4abf5d00c5555bfcad9c01931f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 23 Jul 2025 04:17:44 +0000 Subject: [PATCH 03/11] Update ruff requirement from >=0.5.1,<0.12.4 to >=0.5.1,<0.12.5 --- updated-dependencies: - dependency-name: ruff dependency-version: 0.12.4 dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6291af13..4847953c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,7 +86,7 @@ swarms = "swarms.cli.main:main" [tool.poetry.group.lint.dependencies] black = ">=23.1,<26.0" -ruff = ">=0.5.1,<0.12.4" +ruff = ">=0.5.1,<0.12.5" types-toml = "^0.10.8.1" types-pytz = ">=2023.3,<2026.0" types-chardet = "^5.0.4.6" From 68849e7b5db224e9959fdefc618aa5c8a470ff75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A5=A5=E5=AE=87?= <625024108@qq.com> Date: Wed, 23 Jul 2025 16:11:08 +0800 Subject: [PATCH 04/11] fix random_speaker --- swarms/structs/interactive_groupchat.py | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/swarms/structs/interactive_groupchat.py b/swarms/structs/interactive_groupchat.py index 5613e9ff..9c6a4964 100644 --- a/swarms/structs/interactive_groupchat.py +++ b/swarms/structs/interactive_groupchat.py @@ -947,6 +947,30 @@ Remember: You are part of a team. Your response should reflect that you've read, for agent_name in speaking_order: self._get_agent_response(agent_name, img, imgs) + def _process_random_speaker( + self, + mentioned_agents: List[str], + img: Optional[str], + imgs: Optional[List[str]], + ) -> None: + """ + Process responses using the random speaker function. + This function randomly selects a single agent from the mentioned agents + to respond to the user query. 
+ """ + # Filter out invalid agents + valid_agents = [name for name in mentioned_agents if name in self.agent_map] + + if not valid_agents: + raise AgentNotFoundError("No valid agents found in the conversation") + + # Randomly select exactly one agent to respond + random_agent = random.choice(valid_agents) + logger.info(f"Random speaker selected: {random_agent}") + + # Get response from the randomly selected agent + self._get_agent_response(random_agent, img, imgs) + def run( self, task: str, @@ -972,6 +996,11 @@ Remember: You are part of a team. Your response should reflect that you've read, self._process_dynamic_speakers( mentioned_agents, img, imgs ) + elif self.speaker_function == random_speaker: + # Use the specialized function for random_speaker + self._process_random_speaker( + mentioned_agents, img, imgs + ) else: self._process_static_speakers( mentioned_agents, img, imgs From 69aced2bf8b5c615555c8958b39f826e86dd048f Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Thu, 24 Jul 2025 20:23:09 +0530 Subject: [PATCH 05/11] fix streaming in ConcurrentWorkflow ! 
--- swarms/structs/agent.py | 26 +++++++++++++++++++++++++- swarms/structs/concurrent_workflow.py | 22 ++++++++++++++++++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 8ed7707a..ce57c65f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -996,6 +996,7 @@ class Agent: self, task: Optional[Union[str, Any]] = None, img: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -1077,6 +1078,7 @@ class Agent: task=task_prompt, img=img, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -1084,6 +1086,7 @@ class Agent: response = self.call_llm( task=task_prompt, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -2470,6 +2473,7 @@ class Agent: task: str, img: Optional[str] = None, current_loop: int = 0, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> str: @@ -2480,6 +2484,7 @@ class Agent: task (str): The task to be performed by the `llm` object. img (str, optional): Path or URL to an image file. audio (str, optional): Path or URL to an audio file. + streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. 
@@ -2515,8 +2520,24 @@ class Agent: if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): + # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration) + if streaming_callback is not None: + # Real-time callback streaming for dashboard integration + chunks = [] + for chunk in streaming_response: + if ( + hasattr(chunk, "choices") + and chunk.choices[0].delta.content + ): + content = chunk.choices[ + 0 + ].delta.content + chunks.append(content) + # Call the streaming callback with the new chunk + streaming_callback(content) + complete_response = "".join(chunks) # Check print_on parameter for different streaming behaviors - if self.print_on is False: + elif self.print_on is False: # Silent streaming - no printing, just collect chunks chunks = [] for chunk in streaming_response: @@ -2599,6 +2620,7 @@ class Agent: img: Optional[str] = None, imgs: Optional[List[str]] = None, correct_answer: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -2613,6 +2635,7 @@ class Agent: task (Optional[str], optional): The task to be executed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None. imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None. + streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None. *args: Additional positional arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method. 
@@ -2644,6 +2667,7 @@ class Agent: output = self._run( task=task, img=img, + streaming_callback=streaming_callback, *args, **kwargs, ) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index b60399a2..57ac2c97 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -1,5 +1,6 @@ import concurrent.futures import os +import time from typing import Callable, List, Optional, Union from swarms.structs.agent import Agent @@ -450,8 +451,25 @@ class ConcurrentWorkflow(BaseSwarm): if self.show_dashboard: self.display_agent_dashboard() - # Run the agent - output = agent.run(task=task, img=img, imgs=imgs) + # Create a streaming callback for this agent with throttling + last_update_time = [0] # Use list to allow modification in nested function + update_interval = 0.1 # Update dashboard every 100ms for smooth streaming + + def streaming_callback(chunk: str): + """Update dashboard with streaming content""" + if self.show_dashboard: + # Append the chunk to the agent's current output + current_output = self.agent_statuses[agent.agent_name]["output"] + self.agent_statuses[agent.agent_name]["output"] = current_output + chunk + + # Throttle dashboard updates for better performance + current_time = time.time() + if current_time - last_update_time[0] >= update_interval: + self.display_agent_dashboard() + last_update_time[0] = current_time + + # Run the agent with streaming callback + output = agent.run(task=task, img=img, imgs=imgs, streaming_callback=streaming_callback) # Update status to completed self.agent_statuses[agent.agent_name][ From dd7ac00dcb32bcbc239a578c0163745b562f6883 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Jul 2025 09:20:41 -0700 Subject: [PATCH 06/11] [CLEANUP][InteractiveGroupChat] --- .../client_example.py => client_example.py | 23 +- swarms/structs/concurrent_workflow.py | 2 +- swarms/structs/interactive_groupchat.py | 257 +++++++++--------- 3 files changed, 146 
insertions(+), 136 deletions(-) rename examples/api/client_example.py => client_example.py (70%) diff --git a/examples/api/client_example.py b/client_example.py similarity index 70% rename from examples/api/client_example.py rename to client_example.py index f0880586..ef3aba22 100644 --- a/examples/api/client_example.py +++ b/client_example.py @@ -1,4 +1,3 @@ -import json import os from swarms_client import SwarmsClient from swarms_client.types import AgentSpecParam @@ -12,7 +11,7 @@ agent_spec = AgentSpecParam( agent_name="doctor_agent", description="A virtual doctor agent that provides evidence-based, safe, and empathetic medical advice for common health questions. Always reminds users to consult a healthcare professional for diagnoses or prescriptions.", task="What is the best medicine for a cold?", - model_name="claude-3-5-sonnet-20241022", + model_name="claude-4-sonnet-20250514", system_prompt=( "You are a highly knowledgeable, ethical, and empathetic virtual doctor. " "Always provide evidence-based, safe, and practical medical advice. 
" @@ -26,15 +25,15 @@ agent_spec = AgentSpecParam( role="doctor", ) -# response = client.agent.run( -# agent_config=agent_spec, -# task="What is the best medicine for a cold?", -# ) +response = client.agent.run( + agent_config=agent_spec, + task="What is the best medicine for a cold?", +) -# print(response) +print(response) -print(json.dumps(client.models.list_available(), indent=4)) -print(json.dumps(client.health.check(), indent=4)) -print(json.dumps(client.swarms.get_logs(), indent=4)) -print(json.dumps(client.client.rate.get_limits(), indent=4)) -print(json.dumps(client.swarms.check_available(), indent=4)) +# print(json.dumps(client.models.list_available(), indent=4)) +# print(json.dumps(client.health.check(), indent=4)) +# print(json.dumps(client.swarms.get_logs(), indent=4)) +# print(json.dumps(client.client.rate.get_limits(), indent=4)) +# print(json.dumps(client.swarms.check_available(), indent=4)) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index b60399a2..7f0c0d65 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -138,7 +138,7 @@ class ConcurrentWorkflow(BaseSwarm): self, name: str = "ConcurrentWorkflow", description: str = "Execution of multiple agents concurrently", - agents: List[Union[Agent, Callable]] = [], + agents: List[Union[Agent, Callable]] = None, metadata_output_path: str = "agent_metadata.json", auto_save: bool = True, output_type: str = "dict-all-except-first", diff --git a/swarms/structs/interactive_groupchat.py b/swarms/structs/interactive_groupchat.py index 5613e9ff..8e88046a 100644 --- a/swarms/structs/interactive_groupchat.py +++ b/swarms/structs/interactive_groupchat.py @@ -1,6 +1,7 @@ -import re import random -from typing import Callable, List, Union, Optional +import re +import traceback +from typing import Callable, List, Optional, Union from loguru import logger @@ -183,60 +184,33 @@ speaker_functions = { class InteractiveGroupChat: """ - 
An interactive group chat system that enables conversations with multiple agents using @mentions. + InteractiveGroupChat enables collaborative conversations among multiple agents (or callables) + with flexible speaker selection strategies, conversation history, and interactive terminal sessions. - This class allows users to interact with multiple agents by mentioning them using @agent_name syntax. - When multiple agents are mentioned, they can see and respond to each other's tasks. + Features: + - Supports multiple agents (LLMs or callable functions) + - Customizable speaker selection (round robin, random, priority, dynamic, or custom) + - Maintains conversation history with time stamps + - Interactive REPL session for human-in-the-loop group chat + - Agents can @mention each other to request input or delegate tasks + - Automatic prompt augmentation to teach agents collaborative protocols - Attributes: + Args: + id (str): Unique identifier for the group chat (default: generated) name (str): Name of the group chat description (str): Description of the group chat's purpose - agents (List[Union[Agent, Callable]]): List of Agent instances or callable functions - max_loops (int): Maximum number of conversation turns - conversation (Conversation): Stores the chat history - agent_map (Dict[str, Union[Agent, Callable]]): Mapping of agent names to their instances - speaker_function (Callable): Function to determine speaking order - speaker_state (dict): State for speaker functions that need it - - Args: - name (str, optional): Name of the group chat. Defaults to "InteractiveGroupChat". - description (str, optional): Description of the chat. Defaults to "An interactive group chat for multiple agents". - agents (List[Union[Agent, Callable]], optional): List of participating agents or callables. Defaults to empty list. - max_loops (int, optional): Maximum conversation turns. Defaults to 1. - output_type (str, optional): Type of output format. Defaults to "string". 
- interactive (bool, optional): Whether to enable interactive terminal mode. Defaults to False. - speaker_function (Union[str, Callable], optional): Function to determine speaking order. Can be: - - A string name: "round-robin-speaker", "random-speaker", "priority-speaker", "random-dynamic-speaker" - - A custom callable function - - None (defaults to round_robin_speaker) - speaker_state (dict, optional): Initial state for speaker function. Defaults to empty dict. + agents (List[Union[Agent, Callable]]): List of agent objects or callables + max_loops (int): Maximum number of conversation loops per run + output_type (str): Output format for conversation history ("dict", "str", etc.) + interactive (bool): If True, enables interactive terminal session + speaker_function (Optional[Union[str, Callable]]): Speaker selection strategy + speaker_state (Optional[dict]): State/config for the speaker function Raises: - ValueError: If invalid initialization parameters are provided - InvalidSpeakerFunctionError: If the speaker function is invalid - - Examples: - # Initialize with string-based speaker function - group_chat = InteractiveGroupChat( - agents=[agent1, agent2, agent3], - speaker_function="random-speaker" - ) - - # Initialize with priority speaker function - group_chat = InteractiveGroupChat( - agents=[agent1, agent2, agent3], - speaker_function="priority-speaker", - speaker_state={"priorities": {"agent1": 3, "agent2": 2, "agent3": 1}} - ) - - # Initialize with dynamic speaker function (agents mention each other) - group_chat = InteractiveGroupChat( - agents=[agent1, agent2, agent3], - speaker_function="random-dynamic-speaker" - ) - - # Change speaker function during runtime - group_chat.set_speaker_function("round-robin-speaker") + ValueError: If required arguments are missing or invalid + InvalidSpeakerFunctionError: If an invalid speaker function is provided + InteractiveGroupChatError: For interactive session errors + AgentNotFoundError: If an agent is not found by 
name """ def __init__( @@ -246,11 +220,25 @@ class InteractiveGroupChat: description: str = "An interactive group chat for multiple agents", agents: List[Union[Agent, Callable]] = [], max_loops: int = 1, - output_type: str = "string", + output_type: str = "dict", interactive: bool = False, speaker_function: Optional[Union[str, Callable]] = None, speaker_state: Optional[dict] = None, ): + """ + Initialize the InteractiveGroupChat. + + Args: + id (str): Unique identifier for the group chat. + name (str): Name of the group chat. + description (str): Description of the group chat. + agents (List[Union[Agent, Callable]]): List of agent objects or callables. + max_loops (int): Maximum number of conversation loops per run. + output_type (str): Output format for conversation history. + interactive (bool): If True, enables interactive terminal session. + speaker_function (Optional[Union[str, Callable]]): Speaker selection strategy. + speaker_state (Optional[dict]): State/config for the speaker function. + """ self.id = id self.name = name self.description = description @@ -258,37 +246,49 @@ class InteractiveGroupChat: self.max_loops = max_loops self.output_type = output_type self.interactive = interactive + self.speaker_function = speaker_function + self.speaker_state = speaker_state + self.setup() + + def _setup_speaker_function(self): # Speaker function configuration - if speaker_function is None: + if self.speaker_function is None: self.speaker_function = round_robin_speaker - elif isinstance(speaker_function, str): - if speaker_function not in speaker_functions: + elif isinstance(self.speaker_function, str): + if self.speaker_function not in speaker_functions: available_functions = ", ".join( speaker_functions.keys() ) raise InvalidSpeakerFunctionError( - f"Invalid speaker function: '{speaker_function}'. " + f"Invalid speaker function: '{self.speaker_function}'. 
" f"Available functions: {available_functions}" ) self.speaker_function = speaker_functions[ - speaker_function + self.speaker_function ] - elif callable(speaker_function): - self.speaker_function = speaker_function + elif callable(self.speaker_function): + self.speaker_function = self.speaker_function else: raise InvalidSpeakerFunctionError( "Speaker function must be either a string, callable, or None" ) - self.speaker_state = speaker_state or {"current_index": 0} + self.speaker_state = self.speaker_state or { + "current_index": 0 + } - # Validate speaker function - self._validate_speaker_function() + def setup(self): + """ + Set up the group chat, including speaker function, conversation history, + agent mapping, and prompt augmentation. + """ # Initialize conversation history self.conversation = Conversation(time_enabled=True) + self._setup_speaker_function() + self.agent_map = create_agent_map(self.agents) self._validate_initialization() @@ -350,16 +350,6 @@ class InteractiveGroupChat: # Validate the speaker function self._validate_speaker_function() - def set_priorities(self, priorities: dict) -> None: - """ - Set agent priorities for priority-based speaking order. - - Args: - priorities: Dictionary mapping agent names to priority weights - """ - self.speaker_state["priorities"] = priorities - logger.info(f"Agent priorities set: {priorities}") - def get_available_speaker_functions(self) -> List[str]: """ Get a list of available speaker function names. @@ -519,32 +509,6 @@ class InteractiveGroupChat: "The session will continue. You can type 'exit' to end it." ) - def _validate_speaker_function(self) -> None: - """ - Validates the speaker function. 
- - Raises: - InvalidSpeakerFunctionError: If the speaker function is invalid - """ - if not callable(self.speaker_function): - raise InvalidSpeakerFunctionError( - "Speaker function must be callable" - ) - - # Test the speaker function with a dummy list - try: - test_result = self.speaker_function( - ["test_agent"], **self.speaker_state - ) - if not isinstance(test_result, str): - raise InvalidSpeakerFunctionError( - "Speaker function must return a string" - ) - except Exception as e: - raise InvalidSpeakerFunctionError( - f"Speaker function validation failed: {e}" - ) - def _validate_initialization(self) -> None: """ Validates the group chat configuration. @@ -561,7 +525,10 @@ class InteractiveGroupChat: raise ValueError("Max loops must be greater than 0") def _setup_conversation_context(self) -> None: - """Sets up the initial conversation context with group chat information.""" + """ + Sets up the initial conversation context with group chat information. + Adds a system message describing the group and its agents. + """ agent_info = [] for agent in self.agents: if isinstance(agent, Agent): @@ -581,7 +548,10 @@ class InteractiveGroupChat: self.conversation.add(role="System", content=context) def _update_agent_prompts(self) -> None: - """Updates each agent's system prompt with information about other agents and the group chat.""" + """ + Updates each agent's system prompt with information about other agents and the group chat. + This includes collaborative instructions and @mention usage guidelines. + """ agent_info = [] for agent in self.agents: if isinstance(agent, Agent): @@ -696,7 +666,7 @@ Remember: You are part of a team. Your response should reflect that you've read, List[str]: List of mentioned agent names or all agent names if no mentions Raises: - InvalidtaskFormatError: If the task format is invalid + InvalidTaskFormatError: If the task format is invalid """ try: # Find all @mentions using regex @@ -812,6 +782,14 @@ Remember: You are part of a team. 
Your response should reflect that you've read, ) -> None: """ Process responses using the dynamic speaker function. + + Args: + mentioned_agents (List[str]): List of agent names to consider for speaking. + img (Optional[str]): Optional image input for the agents. + imgs (Optional[List[str]]): Optional list of images for the agents. + + Returns: + None """ # Get strategy from speaker state (default to sequential) strategy = self.speaker_state.get("strategy", "sequential") @@ -882,6 +860,15 @@ Remember: You are part of a team. Your response should reflect that you've read, ) -> None: """ Process speakers sequentially. + + Args: + speakers (List[str]): List of agent names to process in order. + spoken_agents (set): Set of agent names that have already spoken. + img (Optional[str]): Optional image input for the agents. + imgs (Optional[List[str]]): Optional list of images for the agents. + + Returns: + None """ for next_speaker in speakers: if next_speaker in spoken_agents: @@ -903,6 +890,15 @@ Remember: You are part of a team. Your response should reflect that you've read, ) -> None: """ Process speakers in parallel. + + Args: + speakers (List[str]): List of agent names to process in parallel. + spoken_agents (set): Set of agent names that have already spoken. + img (Optional[str]): Optional image input for the agents. + imgs (Optional[List[str]]): Optional list of images for the agents. + + Returns: + None """ import concurrent.futures @@ -939,6 +935,14 @@ Remember: You are part of a team. Your response should reflect that you've read, ) -> None: """ Process responses using a static speaker function. + + Args: + mentioned_agents (List[str]): List of agent names to process. + img (Optional[str]): Optional image input for the agents. + imgs (Optional[List[str]]): Optional list of images for the agents. 
+ + Returns: + None """ speaking_order = self._get_speaking_order(mentioned_agents) logger.info(f"Speaking order determined: {speaking_order}") @@ -956,6 +960,17 @@ Remember: You are part of a team. Your response should reflect that you've read, """ Process a task and get responses from agents. If no agents are mentioned, randomly selects agents to participate. + + Args: + task (str): The user input or task to process. + img (Optional[str]): Optional image input for the agents. + imgs (Optional[List[str]]): Optional list of images for the agents. + + Returns: + str: The formatted conversation history (format depends on output_type). + + Raises: + InteractiveGroupChatError: If an unexpected error occurs. """ try: # Extract mentioned agents (or all agents if none mentioned) @@ -982,11 +997,21 @@ Remember: You are part of a team. Your response should reflect that you've read, ) except Exception as e: - logger.error(f"Unexpected error: {e}") + logger.error( + f"InteractiveGroupChat: Unexpected error: {e} Traceback: {traceback.format_exc()}" + ) raise InteractiveGroupChatError( - f"Unexpected error occurred: {str(e)}" + f"InteractiveGroupChat: Unexpected error occurred: {str(e)} Traceback: {traceback.format_exc()}" ) + def __call__( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): + return self.run(task=task, img=img, imgs=imgs) + def _get_agent_response( self, agent_name: str, @@ -997,12 +1022,15 @@ Remember: You are part of a team. Your response should reflect that you've read, Get response from a specific agent. Args: - agent_name: Name of the agent to get response from - img: Optional image for the task - imgs: Optional list of images for the task + agent_name (str): Name of the agent to get response from. + img (Optional[str]): Optional image for the task. + imgs (Optional[List[str]]): Optional list of images for the task. 
Returns: - The agent's response or None if error + Optional[str]: The agent's response or None if error. + + Raises: + AgentNotFoundError: If the agent is not found. """ agent = self.agent_map.get(agent_name) if not agent: @@ -1063,20 +1091,3 @@ Remember: You are part of a collaborative team. Your response should demonstrate return f"Error: Unable to generate response - {str(e)}" return None - - def set_dynamic_strategy(self, strategy: str) -> None: - """ - Set the strategy for the random-dynamic-speaker function. - - Args: - strategy: Either "sequential" or "parallel" - - "sequential": Process one agent at a time based on @mentions - - "parallel": Process all mentioned agents simultaneously - """ - if strategy not in ["sequential", "parallel"]: - raise ValueError( - "Strategy must be either 'sequential' or 'parallel'" - ) - - self.speaker_state["strategy"] = strategy - logger.info(f"Dynamic speaker strategy set to: {strategy}") From ee4bc9e37e0393a89dc4405947b2d7c041c6fa37 Mon Sep 17 00:00:00 2001 From: harshalmore31 Date: Thu, 24 Jul 2025 22:11:31 +0530 Subject: [PATCH 07/11] updated with an example ! --- .../streaming_concurrent_workflow.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py diff --git a/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py b/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py new file mode 100644 index 00000000..c8dc9366 --- /dev/null +++ b/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py @@ -0,0 +1,62 @@ +from swarms import Agent, ConcurrentWorkflow, SwarmRouter + +# Initialize market research agent +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt="""You are a market research specialist. Your tasks include: + 1. Analyzing market trends and patterns + 2. Identifying market opportunities and threats + 3. 
Evaluating competitor strategies + 4. Assessing customer needs and preferences + 5. Providing actionable market insights""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Initialize financial analyst agent +financial_analyst = Agent( + agent_name="Financial-Analyst", + system_prompt="""You are a financial analysis expert. Your responsibilities include: + 1. Analyzing financial statements + 2. Evaluating investment opportunities + 3. Assessing risk factors + 4. Providing financial forecasts + 5. Recommending financial strategies""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Initialize technical analyst agent +technical_analyst = Agent( + agent_name="Technical-Analyst", + system_prompt="""You are a technical analysis specialist. Your focus areas include: + 1. Analyzing price patterns and trends + 2. Evaluating technical indicators + 3. Identifying support and resistance levels + 4. Assessing market momentum + 5. 
Providing trading recommendations""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Create list of agents +agents = [market_researcher, financial_analyst, technical_analyst] + +# Initialize the concurrent workflow +workflow = ConcurrentWorkflow( + name="market-analysis-workflow", + agents=agents, + max_loops=1, + show_dashboard=True, +) + +# Run the workflow +result = workflow.run( + "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives" +) \ No newline at end of file From 254ded0b22a743c870cf45962f6ea51964db257d Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Jul 2025 10:40:08 -0700 Subject: [PATCH 08/11] [ENHANCEMENT][MCP Expanded Support with Streamable HTTP] [Protocol Section of Docs] --- docs/mkdocs.yml | 4 + docs/protocol/bc.md | 1 + docs/protocol/overview.md | 416 +++++++++++++++++++ docs/protocol/sip.md | 159 ++++++++ examples/mcp/client.py | 26 ++ examples/mcp/test.py | 28 ++ pyproject.toml | 1 + requirements.txt | 1 + swarms/schemas/mcp_schemas.py | 7 +- swarms/tools/mcp_client_call.py | 548 ++++++++++++++++---------- swarms/utils/function_caller_model.py | 14 +- 11 files changed, 975 insertions(+), 230 deletions(-) create mode 100644 docs/protocol/bc.md create mode 100644 docs/protocol/overview.md create mode 100644 docs/protocol/sip.md create mode 100644 examples/mcp/client.py create mode 100644 examples/mcp/test.py diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 4027d032..c06f7daa 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -315,6 +315,10 @@ nav: - AgentRegistry: "swarms/structs/agent_registry.md" - Communication Structure: "swarms/structs/conversation.md" + + - Protocol: + - Overview: "swarms/protocol/overview.md" + - SIPs: "swarms/protocol/sip.md" - Tools: - Overview: "swarms_tools/overview.md" diff --git a/docs/protocol/bc.md b/docs/protocol/bc.md new file mode 100644 index 00000000..be6c0035 --- /dev/null +++ b/docs/protocol/bc.md @@ -0,0 +1 @@ 
+# Backwards Compatibility \ No newline at end of file diff --git a/docs/protocol/overview.md b/docs/protocol/overview.md new file mode 100644 index 00000000..43cdec2a --- /dev/null +++ b/docs/protocol/overview.md @@ -0,0 +1,416 @@ +# Swarms Protocol Overview & Architecture + +This document provides a comprehensive overview of the Swarms protocol architecture, illustrating the flow from agent classes to multi-agent structures, and showcasing the main components and folders within the `swarms/` package. The Swarms framework is designed for extensibility, modularity, and production-readiness, enabling the orchestration of intelligent agents, tools, memory, and complex multi-agent systems. + +--- + +## Introduction + +Swarms is an enterprise-grade, production-ready multi-agent orchestration framework. It enables developers and organizations to build, deploy, and manage intelligent agents that can reason, collaborate, and solve complex tasks autonomously or in groups. The architecture is inspired by the principles of modularity, composability, and scalability, ensuring that each component can be extended or replaced as needed. + +The protocol is structured to support a wide range of use cases, from simple single-agent automations to sophisticated multi-agent workflows involving memory, tool use, and advanced reasoning. + +For a high-level introduction and installation instructions, see the [Swarms Docs Home](https://docs.swarms.world/en/latest/). + +--- + +## High-Level Architecture Flow + +The Swarms protocol is organized into several key layers, each responsible for a specific aspect of the system. The typical flow is as follows: + +1. **Agent Class (`swarms/agents`)** + + - The core building block of the framework. Agents encapsulate logic, state, and behavior. They can be simple (stateless) or complex + (stateful, with memory and reasoning capabilities). + + - Agents can be specialized for different tasks (e.g., reasoning agents, tool agents, judge agents, etc.). 
+ + + - Example: A `ReasoningAgent` that can analyze data and make decisions, or a `ToolAgent` that wraps external APIs. + + + - [Quickstart for Agents](https://docs.swarms.world/en/latest/swarms/agents/) + + + - [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/) + + +2. **Tools with Memory (`swarms/tools`, `swarms/utils`)** + - Tools are modular components that agents use to interact with the outside world, perform computations, or access resources (APIs, + databases, files, etc.). + + - Memory modules and utility functions allow agents to retain context, cache results, and manage state across interactions. + + - Example: A tool for calling an LLM API, a memory cache for conversation history, or a utility for parsing and formatting data. + + - [Tools Overview](https://docs.swarms.world/en/latest/swarms_tools/overview/) + + - [BaseTool Reference](https://docs.swarms.world/en/latest/swarms/tools/base_tool/) + + +3. **Reasoning & Specialized Agents (`swarms/agents`)** + - These agents build on the base agent class, adding advanced reasoning, self-consistency, and specialized logic for tasks like + planning, evaluation, or multi-step workflows. + + - Includes agents for self-reflection, iterative improvement, and domain-specific expertise. + + - Example: A `SelfConsistencyAgent` that aggregates multiple reasoning paths, or a `JudgeAgent` that evaluates outputs from other + agents. + + - [Reasoning Agents Overview](https://docs.swarms.world/en/latest/swarms/agents/reasoning_agents_overview/) + + - [Self Consistency Agent](https://docs.swarms.world/en/latest/swarms/agents/consistency_agent/) + + - [Agent Judge](https://docs.swarms.world/en/latest/swarms/agents/agent_judge/) + + +4. **Multi-Agent Structures (`swarms/structs`)** + - Agents are composed into higher-order structures for collaboration, voting, parallelism, and workflow orchestration. 
+ + - Includes swarms for majority voting, round-robin execution, hierarchical delegation, and more. + + - Example: A `MajorityVotingSwarm` that aggregates outputs from several agents, or a `HierarchicalSwarm` that delegates tasks to + sub-agents. + + - [Multi-Agent Architectures Overview](https://docs.swarms.world/en/latest/swarms/concept/swarm_architectures/) + + - [MajorityVotingSwarm](https://docs.swarms.world/en/latest/swarms/structs/majorityvoting/) + + - [HierarchicalSwarm](https://docs.swarms.world/en/latest/swarms/structs/hierarchical_swarm/) + + - [Sequential Workflow](https://docs.swarms.world/en/latest/swarms/structs/sequential_workflow/) + + - [Concurrent Workflow](https://docs.swarms.world/en/latest/swarms/structs/concurrentworkflow/) + + +5. **Supporting Components** + + - **Communication (`swarms/communication`)**: Provides wrappers for inter-agent communication, database access, message passing, and + integration with external systems (e.g., Redis, DuckDB, Pulsar). See [Communication Structure](https://docs.swarms.world/en/latest/swarms/structs/conversation/) + + - **Artifacts (`swarms/artifacts`)**: Manages the creation, storage, and retrieval of artifacts (outputs, files, logs) generated by + agents and swarms. + + - **Prompts (`swarms/prompts`)**: Houses prompt templates, system prompts, and agent-specific prompts for LLM-based agents. See + [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/) + + - **Telemetry (`swarms/telemetry`)**: Handles logging, monitoring, and bootup routines for observability and debugging. + + - **Schemas (`swarms/schemas`)**: Defines data schemas for agents, tools, completions, and communication protocols, ensuring type + safety and consistency. + + - **CLI (`swarms/cli`)**: Provides command-line utilities for agent creation, management, and orchestration. 
See + [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/) + +--- + +## Detailed Architecture Diagram + +The following Mermaid diagram visualizes the protocol flow and the relationship between the main folders in the `swarms/` package: + +```mermaid +flowchart TD + A["Agent Class
(swarms/agents)"] --> B["Tools with Memory
(swarms/tools, swarms/utils)"] + B --> C["Reasoning & Specialized Agents
(swarms/agents)"] + C --> D["Multi-Agent Structures
(swarms/structs)"] + D --> E["Communication, Artifacts, Prompts, Telemetry, Schemas, CLI"] + + subgraph Folders + A1["agents"] + A2["tools"] + A3["structs"] + A4["utils"] + A5["telemetry"] + A6["schemas"] + A7["prompts"] + A8["artifacts"] + A9["communication"] + A10["cli"] + end + + %% Folder showcase + subgraph "swarms/" + A1 + A2 + A3 + A4 + A5 + A6 + A7 + A8 + A9 + A10 + end + + %% Connect folder showcase to main flow + A1 -.-> A + A2 -.-> B + A3 -.-> D + A4 -.-> B + A5 -.-> E + A6 -.-> E + A7 -.-> E + A8 -.-> E + A9 -.-> E + A10 -.-> E +``` + +--- + +## Folder-by-Folder Breakdown + +### `agents/` + +**Purpose:** Defines all agent classes, including base agents, reasoning agents, tool agents, judge agents, and more. + +**Highlights:** + +- Modular agent design for extensibility. + +- Support for YAML-based agent creation and configuration. See +[YAML Agent Creation](https://docs.swarms.world/en/latest/swarms/agents/create_agents_yaml/) + +- Specialized agents for self-consistency, evaluation, and domain-specific tasks. + +- **Example:** + +- `ReasoningAgent`, `ToolAgent`, `JudgeAgent`, `ConsistencyAgent`, `OpenAIAssistant`, etc. + +- [Agents Overview](https://docs.swarms.world/en/latest/swarms/framework/agents_explained/) + + +### `tools/` + +**Purpose:** Houses all tool-related logic, including tool registry, function calling, tool schemas, and integration with external +APIs. + +**Highlights:** + +- Tools can be dynamically registered and called by agents. + +- Support for OpenAI function calling, Cohere, and custom tool schemas. + +- Utilities for parsing, formatting, and executing tool calls. + +- **Example:** + +- `base_tool.py`, `tool_registry.py`, `mcp_client_call.py`, `func_calling_utils.py`, etc. 
+ +- [Tools Reference](https://docs.swarms.world/en/latest/swarms/tools/tools_examples/) + +- [What are tools?](https://docs.swarms.world/en/latest/swarms/tools/build_tool/) + + +### `structs/` +**Purpose:** Implements multi-agent structures, workflows, routers, registries, and orchestration logic. + +**Highlights:** + +- Swarms for majority voting, round-robin, hierarchical delegation, spreadsheet processing, and more. + +- Workflow orchestration (sequential, concurrent, graph-based). + +- Utilities for agent matching, rearrangement, and evaluation. + +- **Example:** + +- `MajorityVotingSwarm`, `HierarchicalSwarm`, `SwarmRouter`, `SequentialWorkflow`, `ConcurrentWorkflow`, etc. + +- [Custom Multi Agent Architectures](https://docs.swarms.world/en/latest/swarms/structs/custom_swarm/) + +- [SwarmRouter](https://docs.swarms.world/en/latest/swarms/structs/swarm_router/) + +- [AgentRearrange](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/) + + +### `utils/` + +**Purpose:** Provides utility functions, memory management, caching, wrappers, and helpers used throughout the framework. + +**Highlights:** + +- Memory and caching for agents and tools. See +[Integrating RAG with Agents](https://docs.swarms.world/en/latest/swarms/memory/diy_memory/) + +- Wrappers for concurrency, logging, and data processing. + +- General-purpose utilities for string, file, and data manipulation. + +**Example:** + +- `agent_cache.py`, `concurrent_wrapper.py`, `file_processing.py`, `formatter.py`, etc. + + +### `telemetry/` + +**Purpose:** Handles telemetry, logging, monitoring, and bootup routines for the framework. + +**Highlights:** + +- Centralized logging and execution tracking. + +- Bootup routines for initializing the framework. + +- Utilities for monitoring agent and swarm performance. + +- **Example:** + +- `bootup.py`, `log_executions.py`, `main.py`. + + +### `schemas/` + +**Purpose:** Defines data schemas for agents, tools, completions, and communication protocols. 
+ +**Highlights:** + +- Ensures type safety and consistency across the framework. + +- Pydantic-based schemas for validation and serialization. + +- Schemas for agent classes, tool calls, completions, and more. + +**Example:** + +- `agent_class_schema.py`, `tool_schema_base_model.py`, `agent_completion_response.py`, etc. + + +### `prompts/` + +**Purpose:** Contains prompt templates, system prompts, and agent-specific prompts for LLM-based agents. + +**Highlights:** + +- Modular prompt design for easy customization. + +- Support for multi-modal, collaborative, and domain-specific prompts. + +- Templates for system, task, and conversational prompts. + +**Example:** + +- `prompt.py`, `reasoning_prompt.py`, `multi_agent_collab_prompt.py`, etc. + +- [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/) + + +### `artifacts/` + +**Purpose:** Manages the creation, storage, and retrieval of artifacts (outputs, files, logs) generated by agents and swarms. + +**Highlights:** + +- Artifact management for reproducibility and traceability. +- Support for various output types and formats. + +**Example:** + +- `main_artifact.py`. + + +### `communication/` + +**Purpose:** Provides wrappers for inter-agent communication, database access, message passing, and integration with external systems. + +**Highlights:** + +- Support for Redis, DuckDB, Pulsar, Supabase, and more. +- Abstractions for message passing and data exchange between agents. + +**Example:** + +- `redis_wrap.py`, `duckdb_wrap.py`, `base_communication.py`, etc. + +- [Communication Structure](https://docs.swarms.world/en/latest/swarms/structs/conversation/) + + +### `cli/` + +**Purpose:** Command-line utilities for agent creation, management, and orchestration. + +**Highlights:** + +- Scripts for onboarding, agent creation, and management. + +- CLI entry points for interacting with the framework. + +**Example:** + +- `main.py`, `create_agent.py`, `onboarding_process.py`. 
+ +- [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/) + + +--- + +## How the System Works Together + +The Swarms protocol is designed for composability. Agents can be created and configured independently, then composed into larger structures (swarms) for collaborative or competitive workflows. Tools and memory modules are injected into agents as needed, enabling them to perform complex tasks and retain context. Multi-agent structures orchestrate the flow of information and decision-making, while supporting components (communication, telemetry, artifacts, etc.) ensure robustness, observability, and extensibility. + +For example, a typical workflow might involve: + +- Creating a set of specialized agents (e.g., data analyst, summarizer, judge). + +- Registering tools (e.g., LLM API, database access, web search) and memory modules. + +- Composing agents into a `MajorityVotingSwarm` for collaborative decision-making. + +- Using communication wrappers to exchange data between agents and external systems. + +- Logging all actions and outputs for traceability and debugging. + + +For more advanced examples, see the [Examples Overview](https://docs.swarms.world/en/latest/examples/index/). + +--- + +## Swarms Framework Philosophy + +Swarms is built on the following principles: + +- **Modularity:** Every component (agent, tool, prompt, schema) is a module that can be extended or replaced. + +- **Composability:** Agents and tools can be composed into larger structures for complex workflows. + +- **Observability:** Telemetry and artifact management ensure that all actions are traceable and debuggable. + +- **Extensibility:** New agents, tools, and workflows can be added with minimal friction. + +- **Production-Readiness:** The framework is designed for reliability, scalability, and real-world deployment. 
+ + +For more on the philosophy and architecture, see [Development Philosophy & Principles](https://docs.swarms.world/en/latest/swarms/concept/philosophy/) and [Understanding Swarms Architecture](https://docs.swarms.world/en/latest/swarms/concept/framework_architecture/). + +--- + +## Further Reading & References + +- [Swarms Docs Home](https://docs.swarms.world/en/latest/) + +- [Quickstart for Agents](https://docs.swarms.world/en/latest/swarms/agents/) + +- [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/) + +- [Tools Overview](https://docs.swarms.world/en/latest/swarms_tools/overview/) + +- [BaseTool Reference](https://docs.swarms.world/en/latest/swarms/tools/base_tool/) + +- [Reasoning Agents Overview](https://docs.swarms.world/en/latest/swarms/agents/reasoning_agents_overview/) + +- [Multi-Agent Architectures Overview](https://docs.swarms.world/en/latest/swarms/concept/swarm_architectures/) + +- [Examples Overview](https://docs.swarms.world/en/latest/examples/index/) + +- [CLI Documentation](https://docs.swarms.world/en/latest/swarms/cli/main/) + +- [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/) + +- [Development Philosophy & Principles](https://docs.swarms.world/en/latest/swarms/concept/philosophy/) + +- [Understanding Swarms Architecture](https://docs.swarms.world/en/latest/swarms/concept/framework_architecture/) + + +# Conclusion + +The Swarms protocol provides a robust foundation for building intelligent, collaborative, and autonomous systems. By organizing the codebase into clear, modular folders and defining a logical flow from agents to multi-agent structures, Swarms enables rapid development and deployment of advanced AI solutions. Whether you are building a simple automation or a complex multi-agent application, the Swarms architecture provides the tools and abstractions you need to succeed. 
+ diff --git a/docs/protocol/sip.md b/docs/protocol/sip.md new file mode 100644 index 00000000..312d79bd --- /dev/null +++ b/docs/protocol/sip.md @@ -0,0 +1,159 @@ +# Swarms Improvement Proposal (SIP) Guidelines + +A simplified process for proposing new functionality and enhancements to the Swarms framework. + +## What is a SIP? + +A **Swarms Improvement Proposal (SIP)** is a design document that describes a new feature, enhancement, or change to the Swarms framework. SIPs serve as the primary mechanism for proposing significant changes, collecting community feedback, and documenting design decisions. + +The SIP author is responsible for building consensus within the community and documenting the proposal clearly and concisely. + +## When to Submit a SIP + +Consider submitting a SIP for: + +- **New Agent Types or Behaviors**: Adding new agent architectures, swarm patterns, or coordination mechanisms +- **Core Framework Changes**: Modifications to the Swarms API, core classes, or fundamental behaviors +- **New Integrations**: Adding support for new LLM providers, tools, or external services +- **Breaking Changes**: Any change that affects backward compatibility +- **Complex Features**: Multi-component features that require community discussion and design review + +For simple bug fixes, minor enhancements, or straightforward additions, use regular GitHub issues and pull requests instead. + +## SIP Types + +**Standard SIP**: Describes a new feature or change to the Swarms framework +**Process SIP**: Describes changes to development processes, governance, or community guidelines +**Informational SIP**: Provides information or guidelines to the community without proposing changes + +## Submitting a SIP + +1. **Discuss First**: Post your idea in [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) to gauge community interest +2. **Create Issue**: Submit your SIP as a GitHub Issue with the `SIP` and `proposal` labels +3. 
**Follow Format**: Use the SIP template format below +4. **Engage Community**: Respond to feedback and iterate on your proposal + +## SIP Format + +### Required Sections + +#### **SIP Header** +``` +Title: [Descriptive title] +Author: [Your name and contact] +Type: [Standard/Process/Informational] +Status: Proposal +Created: [Date] +``` + +#### **Abstract** (200 words max) +A brief summary of what you're proposing and why. + +#### **Motivation** +- What problem does this solve? +- Why can't the current framework handle this? +- What are the benefits to the Swarms ecosystem? + +#### **Specification** +- Detailed technical description +- API changes or new interfaces +- Code examples showing usage +- Integration points with existing framework + +#### **Implementation Plan** +- High-level implementation approach +- Breaking changes (if any) +- Migration path for existing users +- Testing strategy + +#### **Alternatives Considered** +- Other approaches you evaluated +- Why you chose this solution +- Trade-offs and limitations + +### Optional Sections + +#### **Reference Implementation** +Link to prototype code or proof-of-concept (can be added later) + +#### **Security Considerations** +Any security implications or requirements + +## SIP Workflow + +``` +Proposal → Draft → Review → Accepted/Rejected → Final +``` + +1. **Proposal**: Initial submission as GitHub Issue +2. **Draft**: Maintainer assigns SIP number and `draft` label +3. **Review**: Community and maintainer review period +4. **Decision**: Accepted, rejected, or needs revision +5. 
**Final**: Implementation completed and merged + +## SIP Status + +- **Proposal**: Newly submitted, awaiting initial review +- **Draft**: Under active discussion and refinement +- **Review**: Formal review by maintainers +- **Accepted**: Approved for implementation +- **Rejected**: Not accepted (with reasons) +- **Final**: Implementation completed and merged +- **Withdrawn**: Author withdrew the proposal + +## Review Process + +- SIPs are reviewed during regular maintainer meetings +- Community feedback is collected via GitHub comments +- Acceptance requires: + - Clear benefit to the Swarms ecosystem + - Technical feasibility + - Community support + - Working prototype (for complex features) + +## Getting Help + +- **Discussions**: Use [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) for questions +- **Documentation**: Check [docs.swarms.world](https://docs.swarms.world) for framework details +- **Examples**: Look at existing SIPs for reference + +## SIP Template + +When creating your SIP, copy this template: + +```markdown +# SIP-XXX: [Title] + +**Author**: [Your name] <[email]> +**Type**: Standard +**Status**: Proposal +**Created**: [Date] + +## Abstract + +[Brief 200-word summary] + +## Motivation + +[Why is this needed? What problem does it solve?] + +## Specification + +[Detailed technical description with code examples] + +## Implementation Plan + +[How will this be built? Any breaking changes?] + +## Alternatives Considered + +[Other approaches and why you chose this one] + +## Reference Implementation + +[Link to prototype code if available] +``` + +--- + +**Note**: This process is designed to be lightweight while ensuring important changes get proper community review. For questions about whether your idea needs a SIP, start a discussion in the GitHub Discussions forum. 
\ No newline at end of file diff --git a/examples/mcp/client.py b/examples/mcp/client.py new file mode 100644 index 00000000..cf256edd --- /dev/null +++ b/examples/mcp/client.py @@ -0,0 +1,26 @@ +from swarms.tools.mcp_client_call import ( + execute_tool_call_simple, + get_mcp_tools_sync, +) + + +async def main(): + # Prepare the tool call in OpenAI-compatible format + response = { + "function": {"name": "greet", "arguments": {"name": "Alice"}} + } + result = await execute_tool_call_simple( + server_path="http://localhost:8000/mcp", + response=response, + # transport="streamable_http", + ) + print("Tool call result:", result) + return result + + +if __name__ == "__main__": + print(get_mcp_tools_sync(server_path="http://localhost:8000/mcp")) + + import asyncio + + asyncio.run(main()) diff --git a/examples/mcp/test.py b/examples/mcp/test.py new file mode 100644 index 00000000..44eb2a69 --- /dev/null +++ b/examples/mcp/test.py @@ -0,0 +1,28 @@ +""" +Run from the repository root: + uv run examples/snippets/servers/streamable_config.py +""" + +from mcp.server.fastmcp import FastMCP + +# Stateful server (maintains session state) +mcp = FastMCP("StatefulServer", json_response=True) + +# Other configuration options: +# Stateless server (no session persistence) +# mcp = FastMCP("StatelessServer", stateless_http=True) + +# Stateless server (no session persistence, no sse stream with supported client) +# mcp = FastMCP("StatelessServer", stateless_http=True, json_response=True) + + +# Add a simple tool to demonstrate the server +@mcp.tool() +def greet(name: str = "World") -> str: + """Greet someone by name.""" + return f"Hello, {name}!" 
+ + +# Run server with streamable_http transport +if __name__ == "__main__": + mcp.run(transport="streamable-http") diff --git a/pyproject.toml b/pyproject.toml index 6291af13..43ca6488 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ litellm = "*" torch = "*" httpx = "*" mcp = "*" +openai = "*" aiohttp = "*" [tool.poetry.scripts] diff --git a/requirements.txt b/requirements.txt index 2f512d55..4f7ae7f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,4 @@ httpx aiohttp mcp numpy +openai \ No newline at end of file diff --git a/swarms/schemas/mcp_schemas.py b/swarms/schemas/mcp_schemas.py index 196ebd24..624d2416 100644 --- a/swarms/schemas/mcp_schemas.py +++ b/swarms/schemas/mcp_schemas.py @@ -8,7 +8,7 @@ class MCPConnection(BaseModel): description="The type of connection, defaults to 'mcp'", ) url: Optional[str] = Field( - default="localhost:8000/sse", + default="http://localhost:8000/mcp", description="The URL endpoint for the MCP server", ) tool_configurations: Optional[Dict[Any, Any]] = Field( @@ -20,18 +20,19 @@ class MCPConnection(BaseModel): description="Authentication token for accessing the MCP server", ) transport: Optional[str] = Field( - default="sse", + default="streamable_http", description="The transport protocol to use for the MCP server", ) headers: Optional[Dict[str, str]] = Field( default=None, description="Headers to send to the MCP server" ) timeout: Optional[int] = Field( - default=5, description="Timeout for the MCP server" + default=10, description="Timeout for the MCP server" ) class Config: arbitrary_types_allowed = True + extra = "allow" class MultipleMCPConnections(BaseModel): diff --git a/swarms/tools/mcp_client_call.py b/swarms/tools/mcp_client_call.py index 3fa3a9fa..9409b736 100644 --- a/swarms/tools/mcp_client_call.py +++ b/swarms/tools/mcp_client_call.py @@ -1,16 +1,24 @@ -import os import asyncio import contextlib import json +import os import random +from concurrent.futures import 
ThreadPoolExecutor, as_completed from functools import wraps from typing import Any, Dict, List, Literal, Optional, Union -from concurrent.futures import ThreadPoolExecutor, as_completed from litellm.types.utils import ChatCompletionMessageToolCall from loguru import logger from mcp import ClientSession from mcp.client.sse import sse_client + +try: + from mcp.client.streamable_http import streamablehttp_client +except ImportError: + logger.error( + "streamablehttp_client is not available. Please ensure the MCP SDK is up to date with pip3 install -U mcp" + ) + from mcp.types import ( CallToolRequestParams as MCPCallToolRequestParams, ) @@ -25,6 +33,7 @@ from swarms.schemas.mcp_schemas import ( MCPConnection, ) from swarms.utils.index import exists +from urllib.parse import urlparse class MCPError(Exception): @@ -63,7 +72,16 @@ class MCPExecutionError(MCPError): def transform_mcp_tool_to_openai_tool( mcp_tool: MCPTool, ) -> ChatCompletionToolParam: - """Convert an MCP tool to an OpenAI tool.""" + """ + Convert an MCP tool to an OpenAI tool. + Args: + mcp_tool (MCPTool): The MCP tool object. + Returns: + ChatCompletionToolParam: The OpenAI-compatible tool parameter. + """ + logger.info( + f"Transforming MCP tool '{mcp_tool.name}' to OpenAI tool format." + ) return ChatCompletionToolParam( type="function", function=FunctionDefinition( @@ -79,15 +97,14 @@ async def load_mcp_tools( session: ClientSession, format: Literal["mcp", "openai"] = "mcp" ) -> Union[List[MCPTool], List[ChatCompletionToolParam]]: """ - Load all available MCP tools - + Load all available MCP tools from the session. Args: - session: The MCP session to use - format: The format to convert the tools to - By default, the tools are returned in MCP format. - - If format is set to "openai", the tools are converted to OpenAI API compatible tools. + session (ClientSession): The MCP session to use. + format (Literal["mcp", "openai"]): The format to convert the tools to. 
+ Returns: + List of tools in the specified format. """ + logger.info(f"Loading MCP tools with format '{format}'.") tools = await session.list_tools() if format == "openai": return [ @@ -106,16 +123,28 @@ async def call_mcp_tool( session: ClientSession, call_tool_request_params: MCPCallToolRequestParams, ) -> MCPCallToolResult: - """Call an MCP tool.""" - tool_result = await session.call_tool( + """ + Call an MCP tool using the provided session and request parameters. + Args: + session (ClientSession): The MCP session to use. + call_tool_request_params (MCPCallToolRequestParams): The tool call request params. + Returns: + MCPCallToolResult: The result of the tool call. + """ + return await session.call_tool( name=call_tool_request_params.name, arguments=call_tool_request_params.arguments, ) - return tool_result def _get_function_arguments(function: FunctionDefinition) -> dict: - """Helper to safely get and parse function arguments.""" + """ + Helper to safely get and parse function arguments from a function definition. + Args: + function (FunctionDefinition): The function definition. + Returns: + dict: Parsed arguments as a dictionary. + """ arguments = function.get("arguments", {}) if isinstance(arguments, str): try: @@ -128,7 +157,13 @@ def _get_function_arguments(function: FunctionDefinition) -> dict: def transform_openai_tool_call_request_to_mcp_tool_call_request( openai_tool: Union[ChatCompletionMessageToolCall, Dict], ) -> MCPCallToolRequestParams: - """Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams.""" + """ + Convert an OpenAI ChatCompletionMessageToolCall to an MCP CallToolRequestParams. + Args: + openai_tool (Union[ChatCompletionMessageToolCall, Dict]): The OpenAI tool call request. + Returns: + MCPCallToolRequestParams: The MCP tool call request params. 
+ """ function = openai_tool["function"] return MCPCallToolRequestParams( name=function["name"], @@ -142,12 +177,11 @@ async def call_openai_tool( ) -> MCPCallToolResult: """ Call an OpenAI tool using MCP client. - Args: - session: The MCP session to use - openai_tool: The OpenAI tool to call. You can get this from the `choices[0].message.tool_calls[0]` of the response from the OpenAI API. + session (ClientSession): The MCP session to use. + openai_tool (dict): The OpenAI tool to call. Returns: - The result of the MCP tool call. + MCPCallToolResult: The result of the MCP tool call. """ mcp_tool_call_request_params = ( transform_openai_tool_call_request_to_mcp_tool_call_request( @@ -161,7 +195,14 @@ async def call_openai_tool( def retry_with_backoff(retries=3, backoff_in_seconds=1): - """Decorator for retrying functions with exponential backoff.""" + """ + Decorator for retrying async functions with exponential backoff. + Args: + retries (int): Number of retry attempts. + backoff_in_seconds (int): Initial backoff time in seconds. + Returns: + Decorated async function with retry logic. + """ def decorator(func): @wraps(func) @@ -193,13 +234,17 @@ def retry_with_backoff(retries=3, backoff_in_seconds=1): @contextlib.contextmanager def get_or_create_event_loop(): - """Context manager to handle event loop creation and cleanup.""" + """ + Context manager to handle event loop creation and cleanup. + Yields: + asyncio.AbstractEventLoop: The event loop to use. + Ensures the event loop is properly closed if created here. + """ try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - try: yield loop finally: @@ -210,27 +255,28 @@ def get_or_create_event_loop(): def connect_to_mcp_server(connection: MCPConnection = None): - """Connect to an MCP server. - + """ + Connect to an MCP server using the provided connection configuration. 
Args: - connection (MCPConnection): The connection configuration object - + connection (MCPConnection): The connection configuration object. Returns: - tuple: A tuple containing (headers, timeout, transport, url) - + tuple: (headers, timeout, transport, url) Raises: - MCPValidationError: If the connection object is invalid + MCPValidationError: If the connection object is invalid. """ + logger.info( + "Connecting to MCP server using MCPConnection object." + ) if not isinstance(connection, MCPConnection): + logger.error( + "Invalid connection type provided to connect_to_mcp_server." + ) raise MCPValidationError("Invalid connection type") - - # Direct attribute access is faster than property access headers = dict(connection.headers or {}) if connection.authorization_token: headers["Authorization"] = ( f"Bearer {connection.authorization_token}" ) - return ( headers, connection.timeout or 5, @@ -239,31 +285,104 @@ def connect_to_mcp_server(connection: MCPConnection = None): ) +def get_mcp_client(transport, url, headers=None, timeout=5, **kwargs): + """ + Helper to select the correct MCP client context manager based on transport. + Supports 'sse' (default) and 'streamable_http'. + Args: + transport (str): The transport type ('sse' or 'streamable_http'). + url (str): The server URL. + headers (dict): Optional headers. + timeout (int): Timeout in seconds. + **kwargs: Additional arguments. + Returns: + Context manager for the selected client. + Raises: + ImportError: If streamablehttp_client is not available when requested. + """ + logger.info( + f"Getting MCP client for transport '{transport}' and url '{url}'." + ) + if transport == "streamable_http": + if streamablehttp_client is None: + logger.error("streamablehttp_client is not available.") + raise ImportError( + "streamablehttp_client is not available. Please ensure the MCP SDK is up to date." 
+ ) + return streamablehttp_client( + url, headers=headers, timeout=timeout, **kwargs + ) + else: + return sse_client( + url, headers=headers, timeout=timeout, **kwargs + ) + + +def auto_detect_transport(url: str) -> str: + """ + Guess the MCP transport based on the URL scheme and path. + Does not make any network requests. + Returns one of: 'streamable_http', 'sse', or 'stdio'. + Args: + url (str): The server URL. + Returns: + str: The detected transport type. + """ + parsed = urlparse(url) + scheme = parsed.scheme.lower() + if scheme in ("http", "https"): + logger.info( + f"Automatically selected 'streamable_http' transport for {url}" + ) + return "streamable_http" + elif scheme in ("ws", "wss"): + logger.info( + f"Automatically selected 'sse' transport for {url}" + ) + return "sse" # or 'websocket' if you support it + elif "stdio" in url or scheme == "": + logger.info( + f"Automatically selected 'stdio' transport for {url}" + ) + return "stdio" + else: + logger.info(f"Defaulting to 'sse' transport for {url}") + return "sse" + + @retry_with_backoff(retries=3) async def aget_mcp_tools( server_path: Optional[str] = None, format: str = "openai", connection: Optional[MCPConnection] = None, + transport: Optional[str] = None, *args, **kwargs, ) -> List[Dict[str, Any]]: """ Fetch available MCP tools from the server with retry logic. - Args: - server_path (str): Path to the MCP server script - + server_path (str): Path to the MCP server script. + format (str): Format to return tools in ('openai' or 'mcp'). + connection (Optional[MCPConnection]): Optional connection object. + transport (Optional[str]): Transport type. If None, auto-detects. Returns: - List[Dict[str, Any]]: List of available MCP tools in OpenAI format - + List[Dict[str, Any]]: List of available MCP tools in OpenAI format. Raises: - MCPValidationError: If server_path is invalid - MCPConnectionError: If connection to server fails + MCPValidationError: If server_path is invalid. 
+ MCPConnectionError: If connection to server fails. """ + logger.info( + f"aget_mcp_tools called for server_path: {server_path}" + ) + if transport is None: + transport = auto_detect_transport(server_path) if exists(connection): - headers, timeout, transport, url = connect_to_mcp_server( - connection + headers, timeout, transport_from_conn, url = ( + connect_to_mcp_server(connection) ) + if transport_from_conn: + transport = transport_from_conn else: headers, timeout, _transport, _url = ( None, @@ -271,20 +390,23 @@ async def aget_mcp_tools( None, server_path, ) - - logger.info(f"Fetching MCP tools from server: {server_path}") - + url = server_path + logger.info( + f"Fetching MCP tools from server: {server_path} using transport: {transport}" + ) try: - async with sse_client( - url=server_path, + async with get_mcp_client( + transport, + url=url, headers=headers, timeout=timeout, *args, **kwargs, - ) as ( - read, - write, - ): + ) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx async with ClientSession(read, write) as session: await session.initialize() tools = await load_mcp_tools( @@ -305,23 +427,29 @@ def get_mcp_tools_sync( server_path: Optional[str] = None, format: str = "openai", connection: Optional[MCPConnection] = None, + transport: Optional[str] = None, *args, **kwargs, ) -> List[Dict[str, Any]]: """ Synchronous version of get_mcp_tools that handles event loop management. - Args: - server_path (str): Path to the MCP server script - + server_path (str): Path to the MCP server script. + format (str): Format to return tools in ('openai' or 'mcp'). + connection (Optional[MCPConnection]): Optional connection object. + transport (Optional[str]): Transport type. If None, auto-detects. Returns: - List[Dict[str, Any]]: List of available MCP tools in OpenAI format - + List[Dict[str, Any]]: List of available MCP tools in OpenAI format. 
Raises: - MCPValidationError: If server_path is invalid - MCPConnectionError: If connection to server fails - MCPExecutionError: If event loop management fails + MCPValidationError: If server_path is invalid. + MCPConnectionError: If connection to server fails. + MCPExecutionError: If event loop management fails. """ + logger.info( + f"get_mcp_tools_sync called for server_path: {server_path}" + ) + if transport is None: + transport = auto_detect_transport(server_path) with get_or_create_event_loop() as loop: try: return loop.run_until_complete( @@ -329,6 +457,7 @@ def get_mcp_tools_sync( server_path=server_path, format=format, connection=connection, + transport=transport, *args, **kwargs, ) @@ -344,12 +473,26 @@ def _fetch_tools_for_server( url: str, connection: Optional[MCPConnection] = None, format: str = "openai", + transport: Optional[str] = None, ) -> List[Dict[str, Any]]: - """Helper function to fetch tools for a single server.""" + """ + Helper function to fetch tools for a single server. + Args: + url (str): The server URL. + connection (Optional[MCPConnection]): Optional connection object. + format (str): Format to return tools in. + transport (Optional[str]): Transport type. If None, auto-detects. + Returns: + List[Dict[str, Any]]: List of available MCP tools. + """ + logger.info(f"_fetch_tools_for_server called for url: {url}") + if transport is None: + transport = auto_detect_transport(url) return get_mcp_tools_sync( server_path=url, connection=connection, format=format, + transport=transport, ) @@ -359,19 +502,23 @@ def get_tools_for_multiple_mcp_servers( format: str = "openai", output_type: Literal["json", "dict", "str"] = "str", max_workers: Optional[int] = None, + transport: Optional[str] = None, ) -> List[Dict[str, Any]]: - """Get tools for multiple MCP servers concurrently using ThreadPoolExecutor. - + """ + Get tools for multiple MCP servers concurrently using ThreadPoolExecutor. 
Args: - urls: List of server URLs to fetch tools from - connections: Optional list of MCPConnection objects corresponding to each URL - format: Format to return tools in (default: "openai") - output_type: Type of output format (default: "str") - max_workers: Maximum number of worker threads (default: None, uses min(32, os.cpu_count() + 4)) - + urls (List[str]): List of server URLs to fetch tools from. + connections (List[MCPConnection]): Optional list of MCPConnection objects. + format (str): Format to return tools in. + output_type (Literal): Output format type. + max_workers (Optional[int]): Max worker threads. + transport (Optional[str]): Transport type. If None, auto-detects per URL. Returns: - List[Dict[str, Any]]: Combined list of tools from all servers + List[Dict[str, Any]]: Combined list of tools from all servers. """ + logger.info( + f"get_tools_for_multiple_mcp_servers called for {len(urls)} urls." + ) tools = [] ( min(32, os.cpu_count() + 4) @@ -380,23 +527,27 @@ def get_tools_for_multiple_mcp_servers( ) with ThreadPoolExecutor(max_workers=max_workers) as executor: if exists(connections): - # Create future tasks for each URL-connection pair future_to_url = { executor.submit( - _fetch_tools_for_server, url, connection, format + _fetch_tools_for_server, + url, + connection, + format, + transport, ): url for url, connection in zip(urls, connections) } else: - # Create future tasks for each URL without connections future_to_url = { executor.submit( - _fetch_tools_for_server, url, None, format + _fetch_tools_for_server, + url, + None, + format, + transport, ): url for url in urls } - - # Process completed futures as they come in for future in as_completed(future_to_url): url = future_to_url[future] try: @@ -409,7 +560,6 @@ def get_tools_for_multiple_mcp_servers( raise MCPExecutionError( f"Failed to fetch tools from {url}: {str(e)}" ) - return tools @@ -418,14 +568,34 @@ async def _execute_tool_call_simple( server_path: str = None, connection: 
Optional[MCPConnection] = None, output_type: Literal["json", "dict", "str"] = "str", + transport: Optional[str] = None, *args, **kwargs, ): - """Execute a tool call using the MCP client.""" + """ + Execute a tool call using the MCP client, supporting both SSE and streamable HTTP. + Args: + response (any): The tool call request. + server_path (str): The server URL. + connection (Optional[MCPConnection]): Optional connection object. + output_type (Literal): Output format type. + transport (Optional[str]): Transport type. If None, auto-detects. + Returns: + The tool call result in the specified output format. + Raises: + MCPExecutionError, MCPConnectionError + """ + logger.info( + f"_execute_tool_call_simple called for server_path: {server_path}" + ) + if transport is None: + transport = auto_detect_transport(server_path) if exists(connection): - headers, timeout, transport, url = connect_to_mcp_server( - connection + headers, timeout, transport_from_conn, url = ( + connect_to_mcp_server(connection) ) + if transport_from_conn: + transport = transport_from_conn else: headers, timeout, _transport, url = ( None, @@ -433,23 +603,25 @@ async def _execute_tool_call_simple( "sse", server_path, ) - try: - async with sse_client( - url=url, headers=headers, timeout=timeout, *args, **kwargs - ) as ( - read, - write, - ): + async with get_mcp_client( + transport, + url=url, + headers=headers, + timeout=timeout, + *args, + **kwargs, + ) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx async with ClientSession(read, write) as session: try: await session.initialize() - call_result = await call_openai_tool( - session=session, - openai_tool=response, + session=session, openai_tool=response ) - if output_type == "json": out = call_result.model_dump_json(indent=4) elif output_type == "dict": @@ -470,19 +642,21 @@ async def _execute_tool_call_simple( f"{key}: {value}" ) out = "\n".join(formatted_lines) - + else: + out = call_result.model_dump() + 
logger.info( + f"Tool call executed successfully for {server_path}" + ) return out - except Exception as e: logger.error(f"Error in tool execution: {str(e)}") raise MCPExecutionError( - f"Tool execution failed: {str(e)}" + f"Tool execution failed for tool '{getattr(response, 'function', {}).get('name', 'unknown')}' on server '{url}': {str(e)}" ) - except Exception as e: - logger.error(f"Error in SSE client connection: {str(e)}") + logger.error(f"Error in MCP client connection: {str(e)}") raise MCPConnectionError( - f"Failed to connect to MCP server: {str(e)}" + f"Failed to connect to MCP server '{url}' using transport '{transport}': {str(e)}" ) @@ -491,17 +665,34 @@ async def execute_tool_call_simple( server_path: str = None, connection: Optional[MCPConnection] = None, output_type: Literal["json", "dict", "str", "formatted"] = "str", + transport: Optional[str] = None, *args, **kwargs, ) -> List[Dict[str, Any]]: + """ + High-level async function to execute a tool call on an MCP server. + Args: + response (any): The tool call request. + server_path (str): The server URL. + connection (Optional[MCPConnection]): Optional connection object. + output_type (Literal): Output format type. + transport (Optional[str]): Transport type. If None, auto-detects. + Returns: + The tool call result in the specified output format. 
+ """ + logger.info( + f"execute_tool_call_simple called for server_path: {server_path}" + ) + if transport is None: + transport = auto_detect_transport(server_path) if isinstance(response, str): response = json.loads(response) - return await _execute_tool_call_simple( response=response, server_path=server_path, connection=connection, output_type=output_type, + transport=transport, *args, **kwargs, ) @@ -511,36 +702,32 @@ def _create_server_tool_mapping( urls: List[str], connections: List[MCPConnection] = None, format: str = "openai", + transport: Optional[str] = None, ) -> Dict[str, Dict[str, Any]]: """ Create a mapping of function names to server information for all MCP servers. - Args: - urls: List of server URLs - connections: Optional list of MCPConnection objects - format: Format to fetch tools in - + urls (List[str]): List of server URLs. + connections (List[MCPConnection]): Optional list of MCPConnection objects. + format (str): Format to fetch tools in. + transport (Optional[str]): Transport type. If None, auto-detects per URL. Returns: - Dict mapping function names to server info (url, connection, tool) + Dict[str, Dict[str, Any]]: Mapping of function names to server info. 
""" server_tool_mapping = {} - for i, url in enumerate(urls): connection = ( connections[i] if connections and i < len(connections) else None ) - try: - # Get tools for this server tools = get_mcp_tools_sync( server_path=url, connection=connection, format=format, + transport=transport, ) - - # Create mapping for each tool for tool in tools: if isinstance(tool, dict) and "function" in tool: function_name = tool["function"]["name"] @@ -551,20 +738,17 @@ def _create_server_tool_mapping( "server_index": i, } elif hasattr(tool, "name"): - # Handle MCPTool objects server_tool_mapping[tool.name] = { "url": url, "connection": connection, "tool": tool, "server_index": i, } - except Exception as e: logger.warning( f"Failed to fetch tools from server {url}: {str(e)}" ) continue - return server_tool_mapping @@ -572,36 +756,32 @@ async def _create_server_tool_mapping_async( urls: List[str], connections: List[MCPConnection] = None, format: str = "openai", + transport: str = "sse", ) -> Dict[str, Dict[str, Any]]: """ Async version: Create a mapping of function names to server information for all MCP servers. - Args: - urls: List of server URLs - connections: Optional list of MCPConnection objects - format: Format to fetch tools in - + urls (List[str]): List of server URLs. + connections (List[MCPConnection]): Optional list of MCPConnection objects. + format (str): Format to fetch tools in. + transport (str): Transport type. Returns: - Dict mapping function names to server info (url, connection, tool) + Dict[str, Dict[str, Any]]: Mapping of function names to server info. 
""" server_tool_mapping = {} - for i, url in enumerate(urls): connection = ( connections[i] if connections and i < len(connections) else None ) - try: - # Get tools for this server using async function tools = await aget_mcp_tools( server_path=url, connection=connection, format=format, + transport=transport, ) - - # Create mapping for each tool for tool in tools: if isinstance(tool, dict) and "function" in tool: function_name = tool["function"]["name"] @@ -612,20 +792,17 @@ async def _create_server_tool_mapping_async( "server_index": i, } elif hasattr(tool, "name"): - # Handle MCPTool objects server_tool_mapping[tool.name] = { "url": url, "connection": connection, "tool": tool, "server_index": i, } - except Exception as e: logger.warning( f"Failed to fetch tools from server {url}: {str(e)}" ) continue - return server_tool_mapping @@ -633,17 +810,17 @@ async def _execute_tool_on_server( tool_call: Dict[str, Any], server_info: Dict[str, Any], output_type: Literal["json", "dict", "str", "formatted"] = "str", + transport: str = "sse", ) -> Dict[str, Any]: """ Execute a single tool call on a specific server. - Args: - tool_call: The tool call to execute - server_info: Server information from the mapping - output_type: Output format type - + tool_call (Dict[str, Any]): The tool call to execute. + server_info (Dict[str, Any]): Server information from the mapping. + output_type (Literal): Output format type. + transport (str): Transport type. Returns: - Execution result with server metadata + Dict[str, Any]: Execution result with server metadata. 
""" try: result = await _execute_tool_call_simple( @@ -651,8 +828,8 @@ async def _execute_tool_on_server( server_path=server_info["url"], connection=server_info["connection"], output_type=output_type, + transport=transport, ) - return { "server_url": server_info["url"], "server_index": server_info["server_index"], @@ -662,7 +839,6 @@ async def _execute_tool_on_server( "result": result, "status": "success", } - except Exception as e: logger.error( f"Failed to execute tool on server {server_info['url']}: {str(e)}" @@ -674,7 +850,7 @@ async def _execute_tool_on_server( "name", "unknown" ), "result": None, - "error": str(e), + "error": f"Custom error: Failed to execute tool '{tool_call.get('function', {}).get('name', 'unknown')}' on server '{server_info['url']}': {str(e)}", "status": "error", } @@ -685,79 +861,47 @@ async def execute_multiple_tools_on_multiple_mcp_servers( connections: List[MCPConnection] = None, output_type: Literal["json", "dict", "str", "formatted"] = "str", max_concurrent: Optional[int] = None, + transport: str = "sse", *args, **kwargs, ) -> List[Dict[str, Any]]: """ Execute multiple tool calls across multiple MCP servers. - - This function creates a mapping of function names to servers, then for each response - that contains tool calls, it finds the appropriate server for each function and - executes the calls concurrently. - Args: - responses: List of responses containing tool calls (OpenAI format) - urls: List of MCP server URLs - connections: Optional list of MCPConnection objects corresponding to each URL - output_type: Output format type for results - max_concurrent: Maximum number of concurrent executions (default: len(responses)) - + responses (List[Dict[str, Any]]): List of tool call requests. + urls (List[str]): List of server URLs. + connections (List[MCPConnection]): Optional list of MCPConnection objects. + output_type (Literal): Output format type. + max_concurrent (Optional[int]): Max concurrent tasks. 
+ transport (str): Transport type. Returns: - List of execution results with server metadata - - Example: - # Example responses format: - responses = [ - { - "function": { - "name": "search_web", - "arguments": {"query": "python programming"} - } - }, - { - "function": { - "name": "search_database", - "arguments": {"table": "users", "id": 123} - } - } - ] - - urls = ["http://server1:8000", "http://server2:8000"] - - results = await execute_multiple_tools_on_multiple_mcp_servers( - responses=responses, - urls=urls - ) + List[Dict[str, Any]]: List of execution results. """ if not responses: logger.warning("No responses provided for execution") return [] - if not urls: raise MCPValidationError("No server URLs provided") - - # Create mapping of function names to servers using async version - logger.info(f"Creating tool mapping for {len(urls)} servers") + logger.info( + f"Creating tool mapping for {len(urls)} servers using transport: {transport}" + ) server_tool_mapping = await _create_server_tool_mapping_async( - urls=urls, connections=connections, format="openai" + urls=urls, + connections=connections, + format="openai", + transport=transport, ) - if not server_tool_mapping: raise MCPExecutionError( "No tools found on any of the provided servers" ) - logger.info( f"Found {len(server_tool_mapping)} unique functions across all servers" ) - - # Extract all tool calls from responses all_tool_calls = [] logger.info( f"Processing {len(responses)} responses for tool call extraction" ) - - # Check if responses are individual characters that need to be reconstructed if len(responses) > 10 and all( isinstance(r, str) and len(r) == 1 for r in responses ): @@ -772,8 +916,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers( logger.debug( f"Reconstructed response: {reconstructed_response}" ) - - # Try to parse the reconstructed response to validate it try: json.loads(reconstructed_response) logger.info( @@ -789,19 +931,15 @@ async def 
execute_multiple_tools_on_multiple_mcp_servers( logger.debug( f"Last 100 chars: {reconstructed_response[-100:]}" ) - responses = [reconstructed_response] except Exception as e: logger.warning( f"Failed to reconstruct response from characters: {str(e)}" ) - for i, response in enumerate(responses): logger.debug( f"Processing response {i}: {type(response)} - {response}" ) - - # Handle JSON string responses if isinstance(response, str): try: response = json.loads(response) @@ -813,14 +951,11 @@ async def execute_multiple_tools_on_multiple_mcp_servers( f"Failed to parse JSON response at index {i}: {response}" ) continue - if isinstance(response, dict): - # Single tool call if "function" in response: logger.debug( f"Found single tool call in response {i}: {response['function']}" ) - # Parse arguments if they're a JSON string if isinstance( response["function"].get("arguments"), str ): @@ -837,15 +972,12 @@ async def execute_multiple_tools_on_multiple_mcp_servers( logger.warning( f"Failed to parse function arguments: {response['function']['arguments']}" ) - all_tool_calls.append((i, response)) - # Multiple tool calls elif "tool_calls" in response: logger.debug( f"Found multiple tool calls in response {i}: {len(response['tool_calls'])} calls" ) for tool_call in response["tool_calls"]: - # Parse arguments if they're a JSON string if isinstance( tool_call.get("function", {}).get( "arguments" @@ -865,14 +997,11 @@ async def execute_multiple_tools_on_multiple_mcp_servers( logger.warning( f"Failed to parse tool call arguments: {tool_call['function']['arguments']}" ) - all_tool_calls.append((i, tool_call)) - # Direct tool call elif "name" in response and "arguments" in response: logger.debug( f"Found direct tool call in response {i}: {response}" ) - # Parse arguments if they're a JSON string if isinstance(response.get("arguments"), str): try: response["arguments"] = json.loads( @@ -885,7 +1014,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers( logger.warning( 
f"Failed to parse direct tool call arguments: {response['arguments']}" ) - all_tool_calls.append((i, {"function": response})) else: logger.debug( @@ -896,14 +1024,10 @@ async def execute_multiple_tools_on_multiple_mcp_servers( f"Unsupported response type at index {i}: {type(response)}" ) continue - if not all_tool_calls: logger.warning("No tool calls found in responses") return [] - logger.info(f"Found {len(all_tool_calls)} tool calls to execute") - - # Execute tool calls concurrently max_concurrent = max_concurrent or len(all_tool_calls) semaphore = asyncio.Semaphore(max_concurrent) @@ -913,7 +1037,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers( function_name = tool_call.get("function", {}).get( "name", "unknown" ) - if function_name not in server_tool_mapping: logger.warning( f"Function '{function_name}' not found on any server" @@ -925,24 +1048,21 @@ async def execute_multiple_tools_on_multiple_mcp_servers( "error": f"Function '{function_name}' not available on any server", "status": "not_found", } - server_info = server_tool_mapping[function_name] result = await _execute_tool_on_server( tool_call=tool_call, server_info=server_info, output_type=output_type, + transport=transport, ) result["response_index"] = response_index return result - # Execute all tool calls concurrently tasks = [ execute_with_semaphore(tool_call_info) for tool_call_info in all_tool_calls ] results = await asyncio.gather(*tasks, return_exceptions=True) - - # Process results and handle exceptions processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): @@ -964,7 +1084,6 @@ async def execute_multiple_tools_on_multiple_mcp_servers( ) else: processed_results.append(result) - logger.info( f"Completed execution of {len(processed_results)} tool calls" ) @@ -977,21 +1096,21 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync( connections: List[MCPConnection] = None, output_type: Literal["json", "dict", "str", "formatted"] = "str", 
max_concurrent: Optional[int] = None, + transport: str = "sse", *args, **kwargs, ) -> List[Dict[str, Any]]: """ Synchronous version of execute_multiple_tools_on_multiple_mcp_servers. - Args: - responses: List of responses containing tool calls (OpenAI format) - urls: List of MCP server URLs - connections: Optional list of MCPConnection objects corresponding to each URL - output_type: Output format type for results - max_concurrent: Maximum number of concurrent executions - + responses (List[Dict[str, Any]]): List of tool call requests. + urls (List[str]): List of server URLs. + connections (List[MCPConnection]): Optional list of MCPConnection objects. + output_type (Literal): Output format type. + max_concurrent (Optional[int]): Max concurrent tasks. + transport (str): Transport type. Returns: - List of execution results with server metadata + List[Dict[str, Any]]: List of execution results. """ with get_or_create_event_loop() as loop: try: @@ -1002,6 +1121,7 @@ def execute_multiple_tools_on_multiple_mcp_servers_sync( connections=connections, output_type=output_type, max_concurrent=max_concurrent, + transport=transport, *args, **kwargs, ) diff --git a/swarms/utils/function_caller_model.py b/swarms/utils/function_caller_model.py index 36642308..fb9135fc 100644 --- a/swarms/utils/function_caller_model.py +++ b/swarms/utils/function_caller_model.py @@ -1,23 +1,11 @@ import os -import subprocess from concurrent.futures import ThreadPoolExecutor from typing import List -from loguru import logger from pydantic import BaseModel -try: - from openai import OpenAI -except ImportError: - logger.error( - "OpenAI library not found. 
Please install the OpenAI library by running 'pip install openai'" - ) - import sys - - subprocess.run([sys.executable, "-m", "pip", "install", "openai"]) - from openai import OpenAI - +from openai import OpenAI SUPPORTED_MODELS = [ "o3-mini-2025-1-31", From 9ed09f44b5d92a0838b42bbf74890b43cef05045 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Jul 2025 10:50:18 -0700 Subject: [PATCH 09/11] fix protocol links in docs --- docs/mkdocs.yml | 4 ++-- client_example.py => examples/api/client_example.py | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename client_example.py => examples/api/client_example.py (100%) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index c06f7daa..c39ee197 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -317,8 +317,8 @@ nav: - Communication Structure: "swarms/structs/conversation.md" - Protocol: - - Overview: "swarms/protocol/overview.md" - - SIPs: "swarms/protocol/sip.md" + - Overview: "protocol/overview.md" + - SIPs: "protocol/sip.md" - Tools: - Overview: "swarms_tools/overview.md" diff --git a/client_example.py b/examples/api/client_example.py similarity index 100% rename from client_example.py rename to examples/api/client_example.py From 8f3e2aef840b1b32b738a0906f9252712196b2e2 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Jul 2025 12:19:51 -0700 Subject: [PATCH 10/11] sips in protocol overview docs --- docs/protocol/overview.md | 57 +++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/docs/protocol/overview.md b/docs/protocol/overview.md index 43cdec2a..74445d17 100644 --- a/docs/protocol/overview.md +++ b/docs/protocol/overview.md @@ -24,30 +24,25 @@ The Swarms protocol is organized into several key layers, each responsible for a (stateful, with memory and reasoning capabilities). - Agents can be specialized for different tasks (e.g., reasoning agents, tool agents, judge agents, etc.). 
- - Example: A `ReasoningAgent` that can analyze data and make decisions, or a `ToolAgent` that wraps external APIs. - - [Quickstart for Agents](https://docs.swarms.world/en/latest/swarms/agents/) - - [Agent API Reference](https://docs.swarms.world/en/latest/swarms/structs/agent/) - 2. **Tools with Memory (`swarms/tools`, `swarms/utils`)** - Tools are modular components that agents use to interact with the outside world, perform computations, or access resources (APIs, databases, files, etc.). - Memory modules and utility functions allow agents to retain context, cache results, and manage state across interactions. - + - Example: A tool for calling an LLM API, a memory cache for conversation history, or a utility for parsing and formatting data. - + - [Tools Overview](https://docs.swarms.world/en/latest/swarms_tools/overview/) - + - [BaseTool Reference](https://docs.swarms.world/en/latest/swarms/tools/base_tool/) - 3. **Reasoning & Specialized Agents (`swarms/agents`)** - These agents build on the base agent class, adding advanced reasoning, self-consistency, and specialized logic for tasks like planning, evaluation, or multi-step workflows. @@ -58,12 +53,11 @@ The Swarms protocol is organized into several key layers, each responsible for a agents. - [Reasoning Agents Overview](https://docs.swarms.world/en/latest/swarms/agents/reasoning_agents_overview/) - + - [Self Consistency Agent](https://docs.swarms.world/en/latest/swarms/agents/consistency_agent/) - + - [Agent Judge](https://docs.swarms.world/en/latest/swarms/agents/agent_judge/) - 4. **Multi-Agent Structures (`swarms/structs`)** - Agents are composed into higher-order structures for collaboration, voting, parallelism, and workflow orchestration. @@ -73,16 +67,15 @@ The Swarms protocol is organized into several key layers, each responsible for a sub-agents. 
- [Multi-Agent Architectures Overview](https://docs.swarms.world/en/latest/swarms/concept/swarm_architectures/) - + - [MajorityVotingSwarm](https://docs.swarms.world/en/latest/swarms/structs/majorityvoting/) - + - [HierarchicalSwarm](https://docs.swarms.world/en/latest/swarms/structs/hierarchical_swarm/) - + - [Sequential Workflow](https://docs.swarms.world/en/latest/swarms/structs/sequential_workflow/) - + - [Concurrent Workflow](https://docs.swarms.world/en/latest/swarms/structs/concurrentworkflow/) - 5. **Supporting Components** - **Communication (`swarms/communication`)**: Provides wrappers for inter-agent communication, database access, message passing, and @@ -95,7 +88,7 @@ The Swarms protocol is organized into several key layers, each responsible for a [Prompts Management](https://docs.swarms.world/en/latest/swarms/prompts/main/) - **Telemetry (`swarms/telemetry`)**: Handles logging, monitoring, and bootup routines for observability and debugging. - + - **Schemas (`swarms/schemas`)**: Defines data schemas for agents, tools, completions, and communication protocols, ensuring type safety and consistency. @@ -104,6 +97,34 @@ The Swarms protocol is organized into several key layers, each responsible for a --- +## Proposing Large Improvements or Enhancements: Swarms Improvement Proposals (SIPs) + +For significant changes, new agent architectures, or radical new features, Swarms uses a formal process called **Swarms Improvement Proposals (SIPs)**. SIPs are design documents that describe new features, enhancements, or changes to the Swarms framework. They ensure that major changes are well-documented, discussed, and reviewed by the community before implementation. + +**When to use a SIP:** + +- Proposing new agent types, swarm patterns, or coordination mechanisms + +- Core framework changes or breaking changes + +- New integrations (LLM providers, tools, external services) + +- Any complex or multi-component feature + +**SIP Process Overview:** + +1. 
Discuss your idea in [GitHub Discussions](https://github.com/kyegomez/swarms/discussions) + +2. Submit a SIP as a GitHub Issue using the SIP template + +3. Engage with the community and iterate on your proposal + +4. Undergo review and, if accepted, proceed to implementation + +**Learn more:** See the full [SIP Guidelines and Template](https://docs.swarms.world/en/latest/protocol/sip/) + +--- + ## Detailed Architecture Diagram The following Mermaid diagram visualizes the protocol flow and the relationship between the main folders in the `swarms/` package: @@ -409,6 +430,8 @@ For more on the philosophy and architecture, see [Development Philosophy & Princ - [Understanding Swarms Architecture](https://docs.swarms.world/en/latest/swarms/concept/framework_architecture/) +- [SIP Guidelines and Template](https://docs.swarms.world/en/latest/protocol/sip/) + # Conclusion From f59704db44c6b78562ccd1ee429776a3e451fa5d Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Thu, 24 Jul 2025 16:31:00 -0700 Subject: [PATCH 11/11] fix docs protocols --- docs/mkdocs.yml | 24 ++++++++++++------------ swarms/structs/interactive_groupchat.py | 18 ++++++++++++------ 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index c39ee197..e48e92a7 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -225,7 +225,15 @@ nav: - Quickstart: "quickstart.md" - Agents: "swarms/agents/index.md" - Multi-Agent Architectures: "swarms/structs/index.md" + + - Protocol: + - Overview: "protocol/overview.md" + - SIPs: "protocol/sip.md" - Feature Set: "swarms/features.md" + - Swarms Ecosystem: "swarms/ecosystem.md" + - Technical Support: "swarms/support.md" + + - Agents: - Overview: "swarms/framework/agents_explained.md" - Quickstart: "swarms/agents/index.md" @@ -316,10 +324,6 @@ nav: - Communication Structure: "swarms/structs/conversation.md" - - Protocol: - - Overview: "protocol/overview.md" - - SIPs: "protocol/sip.md" - - Tools: - Overview: "swarms_tools/overview.md" 
- BaseTool Reference: "swarms/tools/base_tool.md" @@ -343,10 +347,6 @@ nav: - Deploy on Phala: "swarms_cloud/phala_deploy.md" # - Deploy on FastAPI: "swarms_cloud/fastapi_deploy.md" - - More About Us: - - Swarms Ecosystem: "swarms/ecosystem.md" - - Technical Support: "swarms/support.md" - - Examples: - Overview: "examples/index.md" @@ -488,7 +488,7 @@ nav: - Understanding Swarms Architecture: "swarms/concept/framework_architecture.md" - Development Philosophy & Principles: "swarms/concept/philosophy.md" - - About Swarms: - - Vision & Mission: "swarms/concept/vision.md" - - Swarm Ecosystem: "swarms/concept/swarm_ecosystem.md" - - Products: "swarms/products.md" + # - About Swarms: + # - Vision & Mission: "swarms/concept/vision.md" + # - Swarm Ecosystem: "swarms/concept/swarm_ecosystem.md" + # - Products: "swarms/products.md" diff --git a/swarms/structs/interactive_groupchat.py b/swarms/structs/interactive_groupchat.py index f87a2091..53e786ff 100644 --- a/swarms/structs/interactive_groupchat.py +++ b/swarms/structs/interactive_groupchat.py @@ -963,15 +963,21 @@ Remember: You are part of a team. Your response should reflect that you've read, to respond to the user query. """ # Filter out invalid agents - valid_agents = [name for name in mentioned_agents if name in self.agent_map] - + valid_agents = [ + name + for name in mentioned_agents + if name in self.agent_map + ] + if not valid_agents: - raise AgentNotFoundError("No valid agents found in the conversation") - + raise AgentNotFoundError( + "No valid agents found in the conversation" + ) + # Randomly select exactly one agent to respond random_agent = random.choice(valid_agents) logger.info(f"Random speaker selected: {random_agent}") - + # Get response from the randomly selected agent self._get_agent_response(random_agent, img, imgs) @@ -1015,7 +1021,7 @@ Remember: You are part of a team. 
Your response should reflect that you've read, # Use the specialized function for random_speaker self._process_random_speaker( mentioned_agents, img, imgs - ) + ) else: self._process_static_speakers( mentioned_agents, img, imgs