diff --git a/examples/multi_agent/asb/asb_research.py b/asb_research.py similarity index 69% rename from examples/multi_agent/asb/asb_research.py rename to asb_research.py index 9f09d1af..0afa08b6 100644 --- a/examples/multi_agent/asb/asb_research.py +++ b/asb_research.py @@ -1,16 +1,15 @@ import orjson -from dotenv import load_dotenv -from swarms.structs.auto_swarm_builder import AutoSwarmBuilder +from swarms import AutoSwarmBuilder -load_dotenv() swarm = AutoSwarmBuilder( name="My Swarm", description="My Swarm Description", verbose=True, max_loops=1, - return_agents=True, + execution_type="return-agents", + model_name="gpt-4.1", ) result = swarm.run( diff --git a/examples/aop_examples/README.md b/examples/aop_examples/README.md new file mode 100644 index 00000000..0dfcfe1e --- /dev/null +++ b/examples/aop_examples/README.md @@ -0,0 +1,66 @@ +# AOP Examples + +This directory contains runnable examples that demonstrate AOP (Agents over Protocol) patterns in Swarms: spinning up a simple MCP server, discovering available agents/tools, and invoking agent tools from client scripts. + +## What’s inside + +- **Top-level demos** + - [`example_new_agent_tools.py`](./example_new_agent_tools.py): End‑to‑end demo of agent discovery utilities (list/search agents, get details for one or many). Targets an MCP server at `http://localhost:5932/mcp`. + - [`list_agents_and_call_them.py`](./list_agents_and_call_them.py): Utility helpers to fetch tools from an MCP server and call an agent‑style tool with a task prompt. Defaults to `http://localhost:8000/mcp`. + - [`get_all_agents.py`](./get_all_agents.py): Minimal snippet to print all tools exposed by an MCP server as JSON. Defaults to `http://0.0.0.0:8000/mcp`. + +- **Server** + - [`server/server.py`](./server/server.py): Simple MCP server entrypoint you can run locally to expose tools/agents for the client examples. + +- **Client** + - [`client/aop_cluster_example.py`](./client/aop_cluster_example.py): Connect to an AOP cluster and interact with agents. + - [`client/aop_queue_example.py`](./client/aop_queue_example.py): Example of queue‑style task submission to agents. + - [`client/aop_raw_task_example.py`](./client/aop_raw_task_example.py): Shows how to send a raw task payload without additional wrappers. + - [`client/aop_raw_client_code.py`](./client/aop_raw_client_code.py): Minimal, low‑level client calls against the MCP endpoint. + +- **Discovery** + - [`discovery/example_agent_communication.py`](./discovery/example_agent_communication.py): Illustrates simple agent‑to‑agent or agent‑to‑service communication patterns. + - [`discovery/example_aop_discovery.py`](./discovery/example_aop_discovery.py): Demonstrates discovering available agents/tools via AOP. + - [`discovery/simple_discovery_example.py`](./discovery/simple_discovery_example.py): A pared‑down discovery walkthrough. + - [`discovery/test_aop_discovery.py`](./discovery/test_aop_discovery.py): Test‑style script validating discovery functionality. + +## Prerequisites + +- Python environment with project dependencies installed. +- An MCP server running locally (you can use the provided server example). + +## Quick start + +1. Start a local MCP server (in a separate terminal): + +```bash +python examples/aop_examples/server/server.py +``` + +1. Try discovery utilities (adjust the URL if your server uses a different port): + +```bash +# List exposed tools (defaults to http://0.0.0.0:8000/mcp) +python examples/aop_examples/get_all_agents.py + +# Fetch tools and call the first agent-like tool (defaults to http://localhost:8000/mcp) +python examples/aop_examples/list_agents_and_call_them.py + +# Rich demo of agent info utilities (expects http://localhost:5932/mcp by default) +python examples/aop_examples/example_new_agent_tools.py +``` + +1. Explore client variants: + +```bash +python examples/aop_examples/client/aop_cluster_example.py +python examples/aop_examples/client/aop_queue_example.py +python examples/aop_examples/client/aop_raw_task_example.py +python examples/aop_examples/client/aop_raw_client_code.py +``` + +## Tips + +- **Server URL/port**: Several examples assume `http://localhost:8000/mcp` or `http://localhost:5932/mcp`. If your server runs elsewhere, update the `server_path`/URL variables at the top of the scripts. +- **Troubleshooting**: If a script reports “No tools available”, ensure the MCP server is running and that the endpoint path (`/mcp`) and port match the script. +- **Next steps**: Use these scripts as templates—swap in your own tools/agents, change the search queries, or extend the client calls to fit your workflow. diff --git a/examples/aop_examples/aop_cluster_example.py b/examples/aop_examples/aop_cluster_example.py deleted file mode 100644 index a4ab85cc..00000000 --- a/examples/aop_examples/aop_cluster_example.py +++ /dev/null @@ -1,68 +0,0 @@ -import json -import asyncio - -from swarms.structs.aop import AOPCluster -from swarms.tools.mcp_client_tools import execute_tool_call_simple - - -async def discover_agents_example(): - """Example of how to call the discover_agents tool.""" - - # Create AOP cluster connection - aop_cluster = AOPCluster( - urls=["http://localhost:5932/mcp"], - transport="streamable-http", - ) - - # Check if discover_agents tool is available - discover_tool = aop_cluster.find_tool_by_server_name( - "discover_agents" - ) - if discover_tool: - try: - # Create the tool call request - tool_call_request = { - "type": "function", - "function": { - "name": "discover_agents", - "arguments": json.dumps( - {} - ), # No specific agent name = get all - }, - } - - # Execute the tool call - result = await execute_tool_call_simple( - response=tool_call_request, - server_path="http://localhost:5932/mcp", - output_type="dict", - verbose=False, - ) - - print(json.dumps(result, indent=2)) - - # Parse the result - if isinstance(result, list) and len(result) > 0: - discovery_data = result[0] - if discovery_data.get("success"): - agents = discovery_data.get("agents", []) - return agents - else: - return None - else: - return None - - except Exception: - return None - else: - return None - - -def main(): - """Main function to run the discovery example.""" - # Run the async function - return asyncio.run(discover_agents_example()) - - -if __name__ == "__main__": - main() diff --git a/examples/aop_examples/client/aop_cluster_example.py b/examples/aop_examples/client/aop_cluster_example.py new file mode 100644 index 00000000..13221604 --- /dev/null +++ b/examples/aop_examples/client/aop_cluster_example.py @@ -0,0 +1,47 @@ +import json +import asyncio + +from swarms.structs.aop import AOPCluster +from swarms.tools.mcp_client_tools import execute_tool_call_simple + + +async def discover_agents_example(): + """ + Discover all agents using the AOPCluster and print the result. + """ + aop_cluster = AOPCluster( + urls=["http://localhost:5932/mcp"], + transport="streamable-http", + ) + tool = aop_cluster.find_tool_by_server_name("discover_agents") + if not tool: + print("discover_agents tool not found.") + return None + + tool_call_request = { + "type": "function", + "function": { + "name": "discover_agents", + "arguments": "{}", + }, + } + + result = await execute_tool_call_simple( + response=tool_call_request, + server_path="http://localhost:5932/mcp", + output_type="dict", + verbose=False, + ) + print(json.dumps(result, indent=2)) + return result + + +def main(): + """ + Run the discover_agents_example coroutine. + """ + asyncio.run(discover_agents_example()) + + +if __name__ == "__main__": + main() diff --git a/examples/aop_examples/client/aop_queue_example.py b/examples/aop_examples/client/aop_queue_example.py new file mode 100644 index 00000000..a049af64 --- /dev/null +++ b/examples/aop_examples/client/aop_queue_example.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +""" +Example demonstrating the AOP queue system for agent execution. + +This example shows how to use the new queue-based execution system +in the AOP framework for improved performance and reliability. +""" + +import time +from swarms import Agent +from swarms.structs.aop import AOP + + +def main(): + """Demonstrate AOP queue functionality.""" + + # Create some sample agents + agent1 = Agent( + agent_name="Research Agent", + agent_description="Specialized in research tasks", + model_name="gpt-4", + max_loops=1, + ) + + agent2 = Agent( + agent_name="Writing Agent", + agent_description="Specialized in writing tasks", + model_name="gpt-4", + max_loops=1, + ) + + # Create AOP with queue enabled + aop = AOP( + server_name="Queue Demo Cluster", + description="A demonstration of queue-based agent execution", + queue_enabled=True, + max_workers_per_agent=2, # 2 workers per agent + max_queue_size_per_agent=100, # Max 100 tasks per queue + processing_timeout=60, # 60 second timeout + retry_delay=2.0, # 2 second delay between retries + verbose=True, + ) + + # Add agents to the cluster + print("Adding agents to cluster...") + aop.add_agent(agent1, tool_name="researcher") + aop.add_agent(agent2, tool_name="writer") + + # Get initial queue stats + print("\nInitial queue stats:") + stats = aop.get_queue_stats() + print(f"Stats: {stats}") + + # Add some tasks to the queues + print("\nAdding tasks to queues...") + + # Add high priority research task + research_task_id = aop.task_queues["researcher"].add_task( + task="Research the latest developments in quantum computing", + priority=10, # High priority + max_retries=2, + ) + print(f"Added research task: {research_task_id}") + + # Add medium priority writing task + writing_task_id = aop.task_queues["writer"].add_task( + task="Write a summary of AI trends in 2024", + priority=5, # Medium priority + max_retries=3, + ) + print(f"Added writing task: {writing_task_id}") + + # Add multiple low priority tasks + for i in range(3): + task_id = aop.task_queues["researcher"].add_task( + task=f"Research task {i+1}: Analyze market trends", + priority=1, # Low priority + max_retries=1, + ) + print(f"Added research task {i+1}: {task_id}") + + # Get updated queue stats + print("\nUpdated queue stats:") + stats = aop.get_queue_stats() + print(f"Stats: {stats}") + + # Monitor task progress + print("\nMonitoring task progress...") + for _ in range(10): # Monitor for 10 iterations + time.sleep(1) + + # Check research task status + research_status = aop.get_task_status( + "researcher", research_task_id + ) + print( + f"Research task status: {research_status['task']['status'] if research_status['success'] else 'Error'}" + ) + + # Check writing task status + writing_status = aop.get_task_status( + "writer", writing_task_id + ) + print( + f"Writing task status: {writing_status['task']['status'] if writing_status['success'] else 'Error'}" + ) + + # Get current queue stats + current_stats = aop.get_queue_stats() + if current_stats["success"]: + for agent_name, agent_stats in current_stats[ + "stats" + ].items(): + print( + f"{agent_name}: {agent_stats['pending_tasks']} pending, {agent_stats['processing_tasks']} processing, {agent_stats['completed_tasks']} completed" + ) + + print("---") + + # Demonstrate queue management + print("\nDemonstrating queue management...") + + # Pause the research agent queue + print("Pausing research agent queue...") + aop.pause_agent_queue("researcher") + + # Get queue status + research_queue_status = aop.task_queues["researcher"].get_status() + print(f"Research queue status: {research_queue_status.value}") + + # Resume the research agent queue + print("Resuming research agent queue...") + aop.resume_agent_queue("researcher") + + # Clear all queues + print("Clearing all queues...") + cleared = aop.clear_all_queues() + print(f"Cleared tasks: {cleared}") + + # Final stats + print("\nFinal queue stats:") + final_stats = aop.get_queue_stats() + print(f"Final stats: {final_stats}") + + print("\nQueue demonstration completed!") + + +if __name__ == "__main__": + main() diff --git a/examples/aop_examples/client/aop_raw_client_code.py b/examples/aop_examples/client/aop_raw_client_code.py new file mode 100644 index 00000000..19b1efd7 --- /dev/null +++ b/examples/aop_examples/client/aop_raw_client_code.py @@ -0,0 +1,88 @@ +import json +import asyncio + +from swarms.structs.aop import AOPCluster +from swarms.tools.mcp_client_tools import execute_tool_call_simple +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + + +async def discover_agents_example(): + """ + Discover all agents using the AOPCluster and print the result. + """ + aop_cluster = AOPCluster( + urls=["http://localhost:5932/mcp"], + transport="streamable-http", + ) + tool = aop_cluster.find_tool_by_server_name("discover_agents") + if not tool: + print("discover_agents tool not found.") + return None + + tool_call_request = { + "type": "function", + "function": { + "name": "discover_agents", + "arguments": "{}", + }, + } + + result = await execute_tool_call_simple( + response=tool_call_request, + server_path="http://localhost:5932/mcp", + output_type="dict", + verbose=False, + ) + print(json.dumps(result, indent=2)) + return result + + +async def raw_mcp_discover_agents_example(): + """ + Call the MCP server directly using the raw MCP client to execute the + built-in "discover_agents" tool and print the JSON result. + + This demonstrates how to: + - Initialize an MCP client over streamable HTTP + - List available tools (optional) + - Call a specific tool by name with arguments + """ + url = "http://localhost:5932/mcp" + + # Open a raw MCP client connection + async with streamablehttp_client(url, timeout=10) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx + + async with ClientSession(read, write) as session: + # Initialize the MCP session and optionally inspect tools + await session.initialize() + + # Optional: list tools (uncomment to print) + # tools = await session.list_tools() + # print(json.dumps(tools.model_dump(), indent=2)) + + # Call the built-in discovery tool with empty arguments + result = await session.call_tool( + name="discover_agents", + arguments={}, + ) + + # Convert to dict for pretty printing + print(json.dumps(result.model_dump(), indent=2)) + return result.model_dump() + + +def main(): + """ + Run the helper-based and raw MCP client discovery examples. + """ + asyncio.run(discover_agents_example()) + asyncio.run(raw_mcp_discover_agents_example()) + + +if __name__ == "__main__": + main() diff --git a/examples/aop_examples/client/aop_raw_task_example.py b/examples/aop_examples/client/aop_raw_task_example.py new file mode 100644 index 00000000..b1b0dba3 --- /dev/null +++ b/examples/aop_examples/client/aop_raw_task_example.py @@ -0,0 +1,107 @@ +import json +import asyncio + +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + + +async def call_agent_tool_raw( + url: str, + tool_name: str, + task: str, + img: str | None = None, + imgs: list[str] | None = None, + correct_answer: str | None = None, +) -> dict: + """ + Call a specific agent tool on an MCP server using the raw MCP client. + + Args: + url: MCP server URL (e.g., "http://localhost:5932/mcp"). + tool_name: Name of the tool/agent to invoke. + task: Task prompt to execute. + img: Optional single image path/URL. + imgs: Optional list of image paths/URLs. + correct_answer: Optional expected answer for validation. + + Returns: + A dict containing the tool's JSON response. + """ + # Open a raw MCP client connection over streamable HTTP + async with streamablehttp_client(url, timeout=30) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx + + async with ClientSession(read, write) as session: + # Initialize the MCP session + await session.initialize() + + # Prepare arguments in the canonical AOP tool format + arguments: dict = {"task": task} + if img is not None: + arguments["img"] = img + if imgs is not None: + arguments["imgs"] = imgs + if correct_answer is not None: + arguments["correct_answer"] = correct_answer + + # Invoke the tool by name + result = await session.call_tool( + name=tool_name, arguments=arguments + ) + + # Convert to dict for return/printing + return result.model_dump() + + +async def list_available_tools(url: str) -> dict: + """ + List tools from an MCP server using the raw client. + + Args: + url: MCP server URL (e.g., "http://localhost:5932/mcp"). + + Returns: + A dict representation of the tools listing. + """ + async with streamablehttp_client(url, timeout=30) as ctx: + if len(ctx) == 2: + read, write = ctx + else: + read, write, *_ = ctx + + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + return tools.model_dump() + + +def main() -> None: + """ + Demonstration entrypoint: list tools, then call a specified tool with a task. + """ + url = "http://localhost:5932/mcp" + tool_name = "Research-Agent" # Change to your agent tool name + task = "Summarize the latest advances in agent orchestration protocols." + + # List tools + tools_info = asyncio.run(list_available_tools(url)) + print("Available tools:") + print(json.dumps(tools_info, indent=2)) + + # Call the tool + print(f"\nCalling tool '{tool_name}' with task...\n") + result = asyncio.run( + call_agent_tool_raw( + url=url, + tool_name=tool_name, + task=task, + ) + ) + print(json.dumps(result, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/examples/aop_examples/server.py b/examples/aop_examples/server/server.py similarity index 100% rename from examples/aop_examples/server.py rename to examples/aop_examples/server/server.py diff --git a/uvloop_example.py b/examples/multi_agent/uvloop_example.py similarity index 100% rename from uvloop_example.py rename to examples/multi_agent/uvloop_example.py diff --git a/swarms/structs/aop.py b/swarms/structs/aop.py index d04b956e..8896678a 100644 --- a/swarms/structs/aop.py +++ b/swarms/structs/aop.py @@ -1,8 +1,13 @@ import asyncio import sys import traceback -from dataclasses import dataclass +import threading +import time +from collections import deque +from dataclasses import dataclass, field +from enum import Enum from typing import Any, Dict, List, Literal, Optional +from uuid import uuid4 from loguru import logger from mcp.server.fastmcp import FastMCP @@ -14,6 +19,507 @@ from swarms.tools.mcp_client_tools import ( ) +class TaskStatus(Enum): + """Status of a task in the queue.""" + + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class QueueStatus(Enum): + """Status of a task queue.""" + + RUNNING = "running" + PAUSED = "paused" + STOPPED = "stopped" + + +@dataclass +class Task: + """ + Represents a task to be executed by an agent. + + Attributes: + task_id: Unique identifier for the task + task: The task or prompt to execute + img: Optional image to be processed + imgs: Optional list of images to be processed + correct_answer: Optional correct answer for validation + priority: Task priority (higher number = higher priority) + created_at: Timestamp when task was created + status: Current status of the task + result: Result of task execution + error: Error message if task failed + retry_count: Number of times task has been retried + max_retries: Maximum number of retries allowed + """ + + task_id: str = field(default_factory=lambda: str(uuid4())) + task: str = "" + img: Optional[str] = None + imgs: Optional[List[str]] = None + correct_answer: Optional[str] = None + priority: int = 0 + created_at: float = field(default_factory=time.time) + status: TaskStatus = TaskStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + retry_count: int = 0 + max_retries: int = 3 + + +@dataclass +class QueueStats: + """ + Statistics for a task queue. + + Attributes: + total_tasks: Total number of tasks processed + completed_tasks: Number of successfully completed tasks + failed_tasks: Number of failed tasks + pending_tasks: Number of tasks currently pending + processing_tasks: Number of tasks currently being processed + average_processing_time: Average time to process a task + queue_size: Current size of the queue + """ + + total_tasks: int = 0 + completed_tasks: int = 0 + failed_tasks: int = 0 + pending_tasks: int = 0 + processing_tasks: int = 0 + average_processing_time: float = 0.0 + queue_size: int = 0 + + +class TaskQueue: + """ + A thread-safe task queue for managing agent tasks. + + This class provides functionality to: + 1. Add tasks to the queue with priority support + 2. Process tasks in background workers + 3. Handle task retries and error management + 4. Provide queue statistics and monitoring + """ + + def __init__( + self, + agent_name: str, + agent: AgentType, + max_workers: int = 1, + max_queue_size: int = 1000, + processing_timeout: int = 30, + retry_delay: float = 1.0, + verbose: bool = False, + ): + """ + Initialize the task queue. + + Args: + agent_name: Name of the agent this queue belongs to + agent: The agent instance to execute tasks + max_workers: Maximum number of worker threads + max_queue_size: Maximum number of tasks in queue + processing_timeout: Timeout for task processing in seconds + retry_delay: Delay between retries in seconds + verbose: Enable verbose logging + """ + self.agent_name = agent_name + self.agent = agent + self.max_workers = max_workers + self.max_queue_size = max_queue_size + self.processing_timeout = processing_timeout + self.retry_delay = retry_delay + self.verbose = verbose + + # Queue management + self._queue = deque() + self._lock = threading.RLock() + self._status = QueueStatus.STOPPED + self._workers = [] + self._stop_event = threading.Event() + + # Statistics + self._stats = QueueStats() + self._processing_times = deque( + maxlen=100 + ) # Keep last 100 processing times + + # Task tracking + self._tasks = {} # task_id -> Task + self._processing_tasks = ( + set() + ) # Currently processing task IDs + + logger.info( + f"Initialized TaskQueue for agent '{agent_name}' with {max_workers} workers" + ) + + def add_task( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + correct_answer: Optional[str] = None, + priority: int = 0, + max_retries: int = 3, + ) -> str: + """ + Add a task to the queue. + + Args: + task: The task or prompt to execute + img: Optional image to be processed + imgs: Optional list of images to be processed + correct_answer: Optional correct answer for validation + priority: Task priority (higher number = higher priority) + max_retries: Maximum number of retries allowed + + Returns: + str: Task ID + + Raises: + ValueError: If queue is full or task is invalid + """ + if not task: + raise ValueError("Task cannot be empty") + + with self._lock: + if len(self._queue) >= self.max_queue_size: + raise ValueError( + f"Queue is full (max size: {self.max_queue_size})" + ) + + task_obj = Task( + task=task, + img=img, + imgs=imgs, + correct_answer=correct_answer, + priority=priority, + max_retries=max_retries, + ) + + # Insert task based on priority (higher priority first) + inserted = False + for i, existing_task in enumerate(self._queue): + if task_obj.priority > existing_task.priority: + self._queue.insert(i, task_obj) + inserted = True + break + + if not inserted: + self._queue.append(task_obj) + + self._tasks[task_obj.task_id] = task_obj + self._stats.total_tasks += 1 + self._stats.pending_tasks += 1 + self._stats.queue_size = len(self._queue) + + if self.verbose: + logger.debug( + f"Added task '{task_obj.task_id}' to queue for agent '{self.agent_name}'" + ) + + return task_obj.task_id + + def get_task(self, task_id: str) -> Optional[Task]: + """ + Get a task by ID. + + Args: + task_id: The task ID + + Returns: + Task object or None if not found + """ + with self._lock: + return self._tasks.get(task_id) + + def cancel_task(self, task_id: str) -> bool: + """ + Cancel a task. + + Args: + task_id: The task ID to cancel + + Returns: + bool: True if task was cancelled, False if not found or already processed + """ + with self._lock: + if task_id not in self._tasks: + return False + + task = self._tasks[task_id] + if task.status in [ + TaskStatus.COMPLETED, + TaskStatus.FAILED, + TaskStatus.CANCELLED, + ]: + return False + + # Remove from queue if still pending + if task.status == TaskStatus.PENDING: + try: + self._queue.remove(task) + self._stats.pending_tasks -= 1 + self._stats.queue_size = len(self._queue) + except ValueError: + pass # Task not in queue + + # Mark as cancelled + task.status = TaskStatus.CANCELLED + self._processing_tasks.discard(task_id) + + if self.verbose: + logger.debug( + f"Cancelled task '{task_id}' for agent '{self.agent_name}'" + ) + + return True + + def start_workers(self) -> None: + """Start the background worker threads.""" + with self._lock: + if self._status != QueueStatus.STOPPED: + logger.warning( + f"Workers for agent '{self.agent_name}' are already running" + ) + return + + self._status = QueueStatus.RUNNING + self._stop_event.clear() + + for i in range(self.max_workers): + worker = threading.Thread( + target=self._worker_loop, + name=f"Worker-{self.agent_name}-{i}", + daemon=True, + ) + worker.start() + self._workers.append(worker) + + logger.info( + f"Started {self.max_workers} workers for agent '{self.agent_name}'" + ) + + def stop_workers(self) -> None: + """Stop the background worker threads.""" + with self._lock: + if self._status == QueueStatus.STOPPED: + return + + self._status = QueueStatus.STOPPED + self._stop_event.set() + + # Wait for workers to finish + for worker in self._workers: + worker.join(timeout=5.0) + + self._workers.clear() + logger.info( + f"Stopped workers for agent '{self.agent_name}'" + ) + + def pause_workers(self) -> None: + """Pause the workers (they will finish current tasks but not start new ones).""" + with self._lock: + if self._status == QueueStatus.RUNNING: + self._status = QueueStatus.PAUSED + logger.info( + f"Paused workers for agent '{self.agent_name}'" + ) + + def resume_workers(self) -> None: + """Resume the workers.""" + with self._lock: + if self._status == QueueStatus.PAUSED: + self._status = QueueStatus.RUNNING + logger.info( + f"Resumed workers for agent '{self.agent_name}'" + ) + + def clear_queue(self) -> int: + """ + Clear all pending tasks from the queue. + + Returns: + int: Number of tasks cleared + """ + with self._lock: + cleared_count = len(self._queue) + self._queue.clear() + self._stats.pending_tasks = 0 + self._stats.queue_size = 0 + + # Mark all pending tasks as cancelled + for task in self._tasks.values(): + if task.status == TaskStatus.PENDING: + task.status = TaskStatus.CANCELLED + + if self.verbose: + logger.debug( + f"Cleared {cleared_count} tasks from queue for agent '{self.agent_name}'" + ) + + return cleared_count + + def get_stats(self) -> QueueStats: + """Get current queue statistics.""" + with self._lock: + # Update current stats + self._stats.pending_tasks = len( + [ + t + for t in self._tasks.values() + if t.status == TaskStatus.PENDING + ] + ) + self._stats.processing_tasks = len(self._processing_tasks) + self._stats.queue_size = len(self._queue) + + # Calculate average processing time + if self._processing_times: + self._stats.average_processing_time = sum( + self._processing_times + ) / len(self._processing_times) + + return QueueStats( + total_tasks=self._stats.total_tasks, + completed_tasks=self._stats.completed_tasks, + failed_tasks=self._stats.failed_tasks, + pending_tasks=self._stats.pending_tasks, + processing_tasks=self._stats.processing_tasks, + average_processing_time=self._stats.average_processing_time, + queue_size=self._stats.queue_size, + ) + + def get_status(self) -> QueueStatus: + """Get current queue status.""" + return self._status + + def _worker_loop(self) -> None: + """Main worker loop for processing tasks.""" + while not self._stop_event.is_set(): + try: + # Check if we should process tasks + with self._lock: + if ( + self._status != QueueStatus.RUNNING + or not self._queue + ): + self._stop_event.wait(0.1) + continue + + # Get next task + task = self._queue.popleft() + self._processing_tasks.add(task.task_id) + task.status = TaskStatus.PROCESSING + self._stats.pending_tasks -= 1 + self._stats.processing_tasks += 1 + + # Process the task + self._process_task(task) + + except Exception as e: + logger.error( + f"Error in worker loop for agent '{self.agent_name}': {e}" + ) + if self.verbose: + logger.error(traceback.format_exc()) + time.sleep(0.1) + + def _process_task(self, task: Task) -> None: + """ + Process a single task. + + Args: + task: The task to process + """ + start_time = time.time() + + try: + if self.verbose: + logger.debug( + f"Processing task '{task.task_id}' for agent '{self.agent_name}'" + ) + + # Execute the agent + result = self.agent.run( + task=task.task, + img=task.img, + imgs=task.imgs, + correct_answer=task.correct_answer, + ) + + # Update task with result + task.result = result + task.status = TaskStatus.COMPLETED + + # Update statistics + processing_time = time.time() - start_time + self._processing_times.append(processing_time) + + with self._lock: + self._stats.completed_tasks += 1 + self._stats.processing_tasks -= 1 + self._processing_tasks.discard(task.task_id) + + if self.verbose: + logger.debug( + f"Completed task '{task.task_id}' in {processing_time:.2f}s" + ) + + except Exception as e: + error_msg = str(e) + task.error = error_msg + task.retry_count += 1 + + if self.verbose: + logger.error( + f"Error processing task '{task.task_id}': {error_msg}" + ) + logger.error(traceback.format_exc()) + + # Handle retries + if task.retry_count <= task.max_retries: + if self.verbose: + logger.debug( + f"Retrying task '{task.task_id}' (attempt {task.retry_count + 1})" + ) + + # Re-queue the task with a delay + time.sleep(self.retry_delay) + + with self._lock: + if self._status == QueueStatus.RUNNING: + task.status = TaskStatus.PENDING + self._queue.append( + task + ) # Add to end of queue + self._stats.pending_tasks += 1 + self._stats.queue_size = len(self._queue) + else: + task.status = TaskStatus.FAILED + self._stats.failed_tasks += 1 + else: + # Max retries exceeded + task.status = TaskStatus.FAILED + + with self._lock: + self._stats.failed_tasks += 1 + self._stats.processing_tasks -= 1 + self._processing_tasks.discard(task.task_id) + + if self.verbose: + logger.error( + f"Task '{task.task_id}' failed after {task.max_retries} retries" + ) + + @dataclass class AgentToolConfig: """ @@ -49,12 +555,15 @@ class AOP: 2. Deploy multiple agents as individual tools 3. Handle tool execution with proper error handling 4. Manage the MCP server lifecycle + 5. Queue-based task execution for improved performance and reliability Attributes: mcp_server: The FastMCP server instance agents: Dictionary mapping tool names to agent instances tool_configs: Dictionary mapping tool names to their configurations + task_queues: Dictionary mapping tool names to their task queues server_name: Name of the MCP server + queue_enabled: Whether queue-based execution is enabled """ def __init__( @@ -68,6 +577,11 @@ class AOP: traceback_enabled: bool = True, host: str = "localhost", log_level: str = "INFO", + queue_enabled: bool = True, + max_workers_per_agent: int = 1, + max_queue_size_per_agent: int = 1000, + processing_timeout: int = 30, + retry_delay: float = 1.0, *args, **kwargs, ): @@ -76,21 +590,36 @@ class AOP: Args: server_name: Name for the MCP server + description: Description of the AOP cluster agents: Optional list of agents to add initially port: Port for the MCP server transport: Transport type for the MCP server verbose: Enable verbose logging traceback_enabled: Enable traceback logging for errors + host: Host to bind the server to log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + queue_enabled: Enable queue-based task execution + max_workers_per_agent: Maximum number of workers per agent + max_queue_size_per_agent: Maximum queue size per agent + processing_timeout: Timeout for task processing in seconds + retry_delay: Delay between retries in seconds """ self.server_name = server_name + self.description = description self.verbose = verbose self.traceback_enabled = traceback_enabled self.log_level = log_level self.host = host self.port = port + self.queue_enabled = queue_enabled + self.max_workers_per_agent = max_workers_per_agent + self.max_queue_size_per_agent = max_queue_size_per_agent + self.processing_timeout = processing_timeout + self.retry_delay = retry_delay + self.agents: Dict[str, Agent] = {} self.tool_configs: Dict[str, AgentToolConfig] = {} + self.task_queues: Dict[str, TaskQueue] = {} self.transport = transport self.mcp_server = FastMCP( name=server_name, port=port, *args, **kwargs @@ -117,6 +646,10 @@ class AOP: # Register the agent discovery tool self._register_agent_discovery_tool() + # Register queue management tools if queue is enabled + if self.queue_enabled: + self._register_queue_management_tools() + def add_agent( self, agent: AgentType, @@ -242,6 +775,20 @@ class AOP: traceback_enabled=traceback_enabled, ) + # Create task queue if queue is enabled + if self.queue_enabled: + self.task_queues[tool_name] = TaskQueue( + agent_name=tool_name, + agent=agent, + max_workers=self.max_workers_per_agent, + max_queue_size=self.max_queue_size_per_agent, + processing_timeout=self.processing_timeout, + retry_delay=self.retry_delay, + verbose=verbose, + ) + # Start the queue workers + self.task_queues[tool_name].start_workers() + # Register the tool with the MCP server self._register_tool(tool_name, agent) @@ -249,7 +796,7 @@ class AOP: self._register_agent_discovery_tool() logger.info( - f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled})" + f"Added agent '{agent.agent_name}' as tool '{tool_name}' (verbose={verbose}, traceback={traceback_enabled}, queue_enabled={self.queue_enabled})" ) return tool_name @@ -378,6 +925,7 @@ class AOP: img: str = None, imgs: List[str] = None, correct_answer: str = None, + max_retries: int = None, ) -> Dict[str, Any]: """ Execute the agent with the provided parameters. @@ -387,7 +935,7 @@ class AOP: img: Optional image to be processed by the agent imgs: Optional list of images to be processed by the agent correct_answer: Optional correct answer for validation or comparison - **kwargs: Additional parameters passed to the agent + max_retries: Maximum number of retries (uses config default if None) Returns: Dict containing the agent's response and execution status @@ -426,31 +974,49 @@ class AOP: "error": error_msg, } - # Execute the agent with timeout and all parameters - result = self._execute_agent_with_timeout( - agent, - task, - config.timeout, - img, - imgs, - correct_answer, - ) - - if config.verbose and start_time: - execution_time = ( - asyncio.get_event_loop().time() - start_time - if asyncio.get_event_loop().is_running() - else 0 + # Use queue-based execution if enabled + if ( + self.queue_enabled + and tool_name in self.task_queues + ): + return self._execute_with_queue( + tool_name, + task, + img, + imgs, + correct_answer, + 0, + max_retries, + True, + config, ) - logger.debug( - f"Tool '{tool_name}' completed successfully in {execution_time:.2f}s" + else: + # Fallback to direct execution + result = self._execute_agent_with_timeout( + agent, + task, + config.timeout, + img, + imgs, + correct_answer, ) - return { - "result": str(result), - "success": True, - "error": None, - } + if config.verbose and start_time: + execution_time = ( + asyncio.get_event_loop().time() + - start_time + if asyncio.get_event_loop().is_running() + else 0 + ) + logger.debug( + f"Tool '{tool_name}' completed successfully in {execution_time:.2f}s" + ) + + return { + "result": str(result), + "success": True, + "error": None, + } except Exception as e: error_msg = str(e) @@ -478,6 +1044,133 @@ class AOP: "error": error_msg, } + def _execute_with_queue( + self, + tool_name: str, + task: str, + img: Optional[str], + imgs: Optional[List[str]], + correct_answer: Optional[str], + priority: int, + max_retries: Optional[int], + wait_for_completion: bool, + config: AgentToolConfig, + ) -> Dict[str, Any]: + """ + Execute a task using the queue system. + + Args: + tool_name: Name of the tool/agent + task: The task to execute + img: Optional image to process + imgs: Optional list of images to process + correct_answer: Optional correct answer for validation + priority: Task priority + max_retries: Maximum number of retries + wait_for_completion: Whether to wait for completion + config: Tool configuration + + Returns: + Dict containing the result or task information + """ + try: + # Use config max_retries if not specified + if max_retries is None: + max_retries = config.max_retries + + # Add task to queue + task_id = self.task_queues[tool_name].add_task( + task=task, + img=img, + imgs=imgs, + correct_answer=correct_answer, + priority=priority, + max_retries=max_retries, + ) + + if not wait_for_completion: + # Return task ID immediately + return { + "task_id": task_id, + "status": "queued", + "success": True, + "message": f"Task '{task_id}' queued for agent '{tool_name}'", + } + + # Wait for task completion + return self._wait_for_task_completion( + tool_name, task_id, config.timeout + ) + + except Exception as e: + error_msg = str(e) + logger.error( + f"Error adding task to queue for '{tool_name}': {error_msg}" + ) + return { + "result": "", + "success": False, + "error": error_msg, + } + + def _wait_for_task_completion( + self, tool_name: str, task_id: str, timeout: int + ) -> Dict[str, Any]: + """ + Wait for a task to complete. + + Args: + tool_name: Name of the tool/agent + task_id: ID of the task to wait for + timeout: Maximum time to wait in seconds + + Returns: + Dict containing the task result + """ + start_time = time.time() + + while time.time() - start_time < timeout: + task = self.task_queues[tool_name].get_task(task_id) + if not task: + return { + "result": "", + "success": False, + "error": f"Task '{task_id}' not found", + } + + if task.status == TaskStatus.COMPLETED: + return { + "result": task.result or "", + "success": True, + "error": None, + "task_id": task_id, + } + elif task.status == TaskStatus.FAILED: + return { + "result": "", + "success": False, + "error": task.error or "Task failed", + "task_id": task_id, + } + elif task.status == TaskStatus.CANCELLED: + return { + "result": "", + "success": False, + "error": "Task was cancelled", + "task_id": task_id, + } + + # Wait a bit before checking again + time.sleep(0.1) + + # Timeout reached + return { + "result": "", + "success": False, + "error": f"Task '{task_id}' timed out after {timeout} seconds", + "task_id": task_id, + } + def _execute_agent_with_timeout( self, agent: AgentType, @@ -545,6 +1238,11 @@ class AOP: bool: True if agent was removed, False if not found """ if tool_name in self.agents: + # Stop and remove task queue if it exists + if tool_name in self.task_queues: + self.task_queues[tool_name].stop_workers() + del self.task_queues[tool_name] + del self.agents[tool_name] del self.tool_configs[tool_name] logger.info(f"Removed agent tool '{tool_name}'") @@ -607,6 +1305,331 @@ class AOP: return info + def get_queue_stats( + self, tool_name: Optional[str] = None + ) -> Dict[str, Any]: + """ + Get queue statistics for agents. + + Args: + tool_name: Optional specific agent name. If None, returns stats for all agents. + + Returns: + Dict containing queue statistics + """ + if not self.queue_enabled: + return { + "success": False, + "error": "Queue system is not enabled", + "stats": {}, + } + + try: + if tool_name: + if tool_name not in self.task_queues: + return { + "success": False, + "error": f"Agent '{tool_name}' not found or has no queue", + "stats": {}, + } + + stats = self.task_queues[tool_name].get_stats() + return { + "success": True, + "agent_name": tool_name, + "stats": { + "total_tasks": stats.total_tasks, + "completed_tasks": stats.completed_tasks, + "failed_tasks": stats.failed_tasks, + "pending_tasks": stats.pending_tasks, + "processing_tasks": stats.processing_tasks, + "average_processing_time": stats.average_processing_time, + "queue_size": stats.queue_size, + "queue_status": self.task_queues[tool_name] + .get_status() + .value, + }, + } + else: + # Get stats for all agents + all_stats = {} + for name, queue in self.task_queues.items(): + stats = queue.get_stats() + all_stats[name] = { + "total_tasks": stats.total_tasks, + "completed_tasks": stats.completed_tasks, + "failed_tasks": stats.failed_tasks, + "pending_tasks": stats.pending_tasks, + "processing_tasks": stats.processing_tasks, + "average_processing_time": stats.average_processing_time, + "queue_size": stats.queue_size, + "queue_status": queue.get_status().value, + } + + return { + "success": True, + "stats": all_stats, + "total_agents": len(all_stats), + } + + except Exception as e: + error_msg = str(e) + logger.error(f"Error getting queue stats: {error_msg}") + return { + "success": False, + "error": error_msg, + "stats": {}, + } + + def pause_agent_queue(self, tool_name: str) -> bool: + """ + Pause the task queue for a specific agent. + + Args: + tool_name: Name of the agent tool + + Returns: + bool: True if paused successfully, False if not found + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return False + + if tool_name not in self.task_queues: + logger.warning( + f"Agent '{tool_name}' not found or has no queue" + ) + return False + + try: + self.task_queues[tool_name].pause_workers() + logger.info(f"Paused queue for agent '{tool_name}'") + return True + except Exception as e: + logger.error( + f"Error pausing queue for agent '{tool_name}': {e}" + ) + return False + + def resume_agent_queue(self, tool_name: str) -> bool: + """ + Resume the task queue for a specific agent. + + Args: + tool_name: Name of the agent tool + + Returns: + bool: True if resumed successfully, False if not found + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return False + + if tool_name not in self.task_queues: + logger.warning( + f"Agent '{tool_name}' not found or has no queue" + ) + return False + + try: + self.task_queues[tool_name].resume_workers() + logger.info(f"Resumed queue for agent '{tool_name}'") + return True + except Exception as e: + logger.error( + f"Error resuming queue for agent '{tool_name}': {e}" + ) + return False + + def clear_agent_queue(self, tool_name: str) -> int: + """ + Clear all pending tasks from an agent's queue. + + Args: + tool_name: Name of the agent tool + + Returns: + int: Number of tasks cleared, -1 if error + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return -1 + + if tool_name not in self.task_queues: + logger.warning( + f"Agent '{tool_name}' not found or has no queue" + ) + return -1 + + try: + cleared_count = self.task_queues[tool_name].clear_queue() + logger.info( + f"Cleared {cleared_count} tasks from queue for agent '{tool_name}'" + ) + return cleared_count + except Exception as e: + logger.error( + f"Error clearing queue for agent '{tool_name}': {e}" + ) + return -1 + + def get_task_status( + self, tool_name: str, task_id: str + ) -> Dict[str, Any]: + """ + Get the status of a specific task. + + Args: + tool_name: Name of the agent tool + task_id: ID of the task + + Returns: + Dict containing task status information + """ + if not self.queue_enabled: + return { + "success": False, + "error": "Queue system is not enabled", + "task": None, + } + + if tool_name not in self.task_queues: + return { + "success": False, + "error": f"Agent '{tool_name}' not found or has no queue", + "task": None, + } + + try: + task = self.task_queues[tool_name].get_task(task_id) + if not task: + return { + "success": False, + "error": f"Task '{task_id}' not found", + "task": None, + } + + return { + "success": True, + "task": { + "task_id": task.task_id, + "status": task.status.value, + "created_at": task.created_at, + "result": task.result, + "error": task.error, + "retry_count": task.retry_count, + "max_retries": task.max_retries, + "priority": task.priority, + }, + } + except Exception as e: + logger.error(f"Error getting task status: {e}") + return { + "success": False, + "error": str(e), + "task": None, + } + + def cancel_task(self, tool_name: str, task_id: str) -> bool: + """ + Cancel a specific task. + + Args: + tool_name: Name of the agent tool + task_id: ID of the task to cancel + + Returns: + bool: True if cancelled successfully, False otherwise + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return False + + if tool_name not in self.task_queues: + logger.warning( + f"Agent '{tool_name}' not found or has no queue" + ) + return False + + try: + success = self.task_queues[tool_name].cancel_task(task_id) + if success: + logger.info( + f"Cancelled task '{task_id}' for agent '{tool_name}'" + ) + else: + logger.warning( + f"Could not cancel task '{task_id}' for agent '{tool_name}'" + ) + return success + except Exception as e: + logger.error(f"Error cancelling task '{task_id}': {e}") + return False + + def pause_all_queues(self) -> Dict[str, bool]: + """ + Pause all agent queues. + + Returns: + Dict mapping agent names to success status + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return {} + + results = {} + for tool_name in self.task_queues.keys(): + results[tool_name] = self.pause_agent_queue(tool_name) + + logger.info( + f"Paused {sum(results.values())} out of {len(results)} agent queues" + ) + return results + + def resume_all_queues(self) -> Dict[str, bool]: + """ + Resume all agent queues. + + Returns: + Dict mapping agent names to success status + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return {} + + results = {} + for tool_name in self.task_queues.keys(): + results[tool_name] = self.resume_agent_queue(tool_name) + + logger.info( + f"Resumed {sum(results.values())} out of {len(results)} agent queues" + ) + return results + + def clear_all_queues(self) -> Dict[str, int]: + """ + Clear all agent queues. + + Returns: + Dict mapping agent names to number of tasks cleared + """ + if not self.queue_enabled: + logger.warning("Queue system is not enabled") + return {} + + results = {} + total_cleared = 0 + for tool_name in self.task_queues.keys(): + cleared = self.clear_agent_queue(tool_name) + results[tool_name] = cleared + if cleared > 0: + total_cleared += cleared + + logger.info( + f"Cleared {total_cleared} tasks from all agent queues" + ) + return results + def _register_agent_discovery_tool(self) -> None: """ Register the agent discovery tools that allow agents to learn about each other. @@ -911,6 +1934,192 @@ class AOP: "matching_agents": [], } + def _register_queue_management_tools(self) -> None: + """ + Register queue management tools for the MCP server. + """ + + @self.mcp_server.tool( + name="get_queue_stats", + description="Get queue statistics for agents including task counts, processing times, and queue status.", + ) + def get_queue_stats(agent_name: str = None) -> Dict[str, Any]: + """ + Get queue statistics for agents. + + Args: + agent_name: Optional specific agent name. If None, returns stats for all agents. + + Returns: + Dict containing queue statistics + """ + return self.get_queue_stats(agent_name) + + @self.mcp_server.tool( + name="pause_agent_queue", + description="Pause the task queue for a specific agent.", + ) + def pause_agent_queue(agent_name: str) -> Dict[str, Any]: + """ + Pause the task queue for a specific agent. + + Args: + agent_name: Name of the agent tool + + Returns: + Dict containing success status + """ + success = self.pause_agent_queue(agent_name) + return { + "success": success, + "message": f"Queue for agent '{agent_name}' {'paused' if success else 'not found or already paused'}", + } + + @self.mcp_server.tool( + name="resume_agent_queue", + description="Resume the task queue for a specific agent.", + ) + def resume_agent_queue(agent_name: str) -> Dict[str, Any]: + """ + Resume the task queue for a specific agent. + + Args: + agent_name: Name of the agent tool + + Returns: + Dict containing success status + """ + success = self.resume_agent_queue(agent_name) + return { + "success": success, + "message": f"Queue for agent '{agent_name}' {'resumed' if success else 'not found or already running'}", + } + + @self.mcp_server.tool( + name="clear_agent_queue", + description="Clear all pending tasks from an agent's queue.", + ) + def clear_agent_queue(agent_name: str) -> Dict[str, Any]: + """ + Clear all pending tasks from an agent's queue. + + Args: + agent_name: Name of the agent tool + + Returns: + Dict containing number of tasks cleared + """ + cleared_count = self.clear_agent_queue(agent_name) + return { + "success": cleared_count >= 0, + "cleared_tasks": cleared_count, + "message": ( + f"Cleared {cleared_count} tasks from queue for agent '{agent_name}'" + if cleared_count >= 0 + else f"Failed to clear queue for agent '{agent_name}'" + ), + } + + @self.mcp_server.tool( + name="get_task_status", + description="Get the status of a specific task by task ID.", + ) + def get_task_status( + agent_name: str, task_id: str + ) -> Dict[str, Any]: + """ + Get the status of a specific task. + + Args: + agent_name: Name of the agent tool + task_id: ID of the task + + Returns: + Dict containing task status information + """ + return self.get_task_status(agent_name, task_id) + + @self.mcp_server.tool( + name="cancel_task", + description="Cancel a specific task by task ID.", + ) + def cancel_task( + agent_name: str, task_id: str + ) -> Dict[str, Any]: + """ + Cancel a specific task. + + Args: + agent_name: Name of the agent tool + task_id: ID of the task to cancel + + Returns: + Dict containing success status + """ + success = self.cancel_task(agent_name, task_id) + return { + "success": success, + "message": f"Task '{task_id}' {'cancelled' if success else 'not found or already processed'}", + } + + @self.mcp_server.tool( + name="pause_all_queues", + description="Pause all agent queues.", + ) + def pause_all_queues() -> Dict[str, Any]: + """ + Pause all agent queues. + + Returns: + Dict containing results for each agent + """ + results = self.pause_all_queues() + return { + "success": True, + "results": results, + "total_agents": len(results), + "successful_pauses": sum(results.values()), + } + + @self.mcp_server.tool( + name="resume_all_queues", + description="Resume all agent queues.", + ) + def resume_all_queues() -> Dict[str, Any]: + """ + Resume all agent queues. + + Returns: + Dict containing results for each agent + """ + results = self.resume_all_queues() + return { + "success": True, + "results": results, + "total_agents": len(results), + "successful_resumes": sum(results.values()), + } + + @self.mcp_server.tool( + name="clear_all_queues", + description="Clear all agent queues.", + ) + def clear_all_queues() -> Dict[str, Any]: + """ + Clear all agent queues. + + Returns: + Dict containing results for each agent + """ + results = self.clear_all_queues() + total_cleared = sum(results.values()) + return { + "success": True, + "results": results, + "total_agents": len(results), + "total_cleared": total_cleared, + } + def _get_agent_discovery_info( self, tool_name: str ) -> Optional[Dict[str, Any]]: @@ -988,6 +2197,7 @@ class AOP: f"Log level: {self.log_level}\n" f"Verbose mode: {self.verbose}\n" f"Traceback enabled: {self.traceback_enabled}\n" + f"Queue enabled: {self.queue_enabled}\n" f"Available tools: {self.list_agents()}" ) @@ -998,6 +2208,7 @@ class AOP: f" - Host: {self.host}\n" f" - Port: {self.port}\n" f" - Transport: {self.transport}\n" + f" - Queue enabled: {self.queue_enabled}\n" f" - Total agents: {len(self.agents)}" ) for tool_name, config in self.tool_configs.items(): @@ -1005,7 +2216,36 @@ class AOP: f" - Tool '{tool_name}': timeout={config.timeout}s, verbose={config.verbose}, traceback={config.traceback_enabled}" ) - self.mcp_server.run(transport=self.transport) + if self.queue_enabled: + logger.debug( + f" - Max workers per agent: {self.max_workers_per_agent}" + ) + logger.debug( + f" - Max queue size per agent: {self.max_queue_size_per_agent}" + ) + logger.debug( + f" - Processing timeout: {self.processing_timeout}s" + ) + logger.debug(f" - Retry delay: {self.retry_delay}s") + + try: + self.mcp_server.run(transport=self.transport) + except KeyboardInterrupt: + logger.info("Server interrupted by user") + finally: + # Clean up queues when server stops + if self.queue_enabled: + logger.info("Stopping all agent queues...") + for tool_name in list(self.task_queues.keys()): + try: + self.task_queues[tool_name].stop_workers() + logger.debug( + f"Stopped queue for agent '{tool_name}'" + ) + except Exception as e: + logger.error( + f"Error stopping queue for agent '{tool_name}': {e}" + ) logger.info( f"MCP Server '{self.server_name}' is ready with {len(self.agents)} tools" @@ -1029,18 +2269,49 @@ class AOP: """ info = { "server_name": self.server_name, + "description": self.description, "total_tools": len(self.agents), "tools": self.list_agents(), "verbose": self.verbose, "traceback_enabled": self.traceback_enabled, "log_level": self.log_level, "transport": self.transport, + "queue_enabled": self.queue_enabled, "tool_details": { tool_name: self.get_agent_info(tool_name) for tool_name in self.agents.keys() }, } + # Add queue information if enabled + if self.queue_enabled: + info["queue_config"] = { + "max_workers_per_agent": self.max_workers_per_agent, + "max_queue_size_per_agent": self.max_queue_size_per_agent, + "processing_timeout": self.processing_timeout, + "retry_delay": self.retry_delay, + } + + # Add queue stats for each agent + queue_stats = {} + for tool_name in self.agents.keys(): + if tool_name in self.task_queues: + stats = self.task_queues[tool_name].get_stats() + queue_stats[tool_name] = { + "status": self.task_queues[tool_name] + .get_status() + .value, + "total_tasks": stats.total_tasks, + "completed_tasks": stats.completed_tasks, + "failed_tasks": stats.failed_tasks, + "pending_tasks": stats.pending_tasks, + "processing_tasks": stats.processing_tasks, + "average_processing_time": stats.average_processing_time, + "queue_size": stats.queue_size, + } + + info["queue_stats"] = queue_stats + if self.verbose: logger.debug(f"Retrieved server info: {info}") diff --git a/swarms/structs/ma_utils.py b/swarms/structs/ma_utils.py index 51980e35..3f0c8d6d 100644 --- a/swarms/structs/ma_utils.py +++ b/swarms/structs/ma_utils.py @@ -1,12 +1,13 @@ -from typing import Dict, List, Any, Optional, Union, Callable import random -from swarms.prompts.collaborative_prompts import ( - get_multi_agent_collaboration_prompt_one, -) from functools import lru_cache +from typing import Any, Callable, Dict, List, Optional, Union from loguru import logger +from swarms.prompts.collaborative_prompts import ( + get_multi_agent_collaboration_prompt_one, +) + def list_all_agents( agents: List[Union[Callable, Any]], @@ -131,11 +132,9 @@ def set_random_models_for_agents( return random.choice(model_names) if isinstance(agents, list): - return [ + for agent in agents: setattr(agent, "model_name", random.choice(model_names)) - or agent - for agent in agents - ] + return agents else: setattr(agents, "model_name", random.choice(model_names)) return agents