import asyncio
import concurrent.futures
import datetime
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil
from loguru import logger

from swarms.structs.agent import Agent


class AgentBenchmark:
    """Benchmark harness that times ``Agent.run()`` over a set of queries.

    For each query it runs ``num_iterations`` iterations, recording wall-clock
    execution time plus process-level CPU/memory snapshots before and after
    each run, then aggregates simple statistics and writes them to JSON.
    """

    def __init__(
        self,
        num_iterations: int = 5,
        output_dir: str = "benchmark_results",
    ):
        self.num_iterations = num_iterations
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # os.cpu_count() may return None on some platforms; fall back to 1
        # so the min() calls below cannot raise TypeError.
        cpu_count = os.cpu_count() or 1

        # Process pool for CPU-bound tasks.
        # NOTE(review): neither pool is currently submitted to by the
        # benchmark itself; they are kept because cleanup() shuts them down
        # and they are part of the object's public surface.
        self.process_pool = concurrent.futures.ProcessPoolExecutor(
            max_workers=min(cpu_count, 4)
        )

        # Thread pool for I/O-bound tasks.
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=min(cpu_count * 2, 8)
        )

        self.default_queries = [
            "Conduct an analysis of the best real undervalued ETFs",
            "What are the top performing tech stocks this quarter?",
            "Analyze current market trends in renewable energy sector",
            "Compare Bitcoin and Ethereum investment potential",
            "Evaluate the risk factors in emerging markets",
        ]

        self.agent = self._initialize_agent()
        self.process = psutil.Process()

        # Cache for storing repeated query results (keyed by query+iteration).
        self._query_cache: Dict[str, Dict[str, Any]] = {}

    def _initialize_agent(self) -> Agent:
        """Build the agent under test with a fixed, reproducible config."""
        return Agent(
            agent_name="Financial-Analysis-Agent",
            agent_description="Personal finance advisor agent",
            max_loops=1,
            model_name="gpt-4o-mini",
            dynamic_temperature_enabled=True,
            interactive=False,
        )

    def _get_system_metrics(self) -> Dict[str, float]:
        """Snapshot this process's CPU percentage and RSS in megabytes."""
        return {
            "cpu_percent": self.process.cpu_percent(),
            "memory_mb": self.process.memory_info().rss / 1024 / 1024,
        }

    def _calculate_statistics(
        self, values: List[float]
    ) -> Dict[str, float]:
        """Return mean/median/min/max (and std_dev when n > 1), rounded to 3 dp.

        Returns an empty dict for empty input. ``median`` is the upper
        middle element for even-length input (no interpolation), matching
        the original behavior.
        """
        if not values:
            return {}

        sorted_values = sorted(values)
        n = len(sorted_values)
        mean_val = sum(values) / n

        stats = {
            "mean": mean_val,
            "median": sorted_values[n // 2],
            "min": sorted_values[0],
            "max": sorted_values[-1],
        }

        # Population standard deviation; only meaningful with >1 sample.
        if n > 1:
            stats["std_dev"] = (
                sum((x - mean_val) ** 2 for x in values) / n
            ) ** 0.5

        return {k: round(v, 3) for k, v in stats.items()}

    async def process_iteration(
        self, query: str, iteration: int
    ) -> Dict[str, Any]:
        """Process a single iteration of a query.

        Returns a dict with execution time, success flag, and pre/post
        system metrics. Results are cached per (query, iteration) so a
        repeated run of the same instance reuses prior measurements.
        """
        try:
            # Check cache for repeated queries. The key includes the
            # iteration index, so within one benchmark run every call is
            # a cache miss by construction.
            cache_key = f"{query}_{iteration}"
            if cache_key in self._query_cache:
                return self._query_cache[cache_key]

            iteration_start = datetime.datetime.now()
            pre_metrics = self._get_system_metrics()

            # Run the agent. NOTE(review): Agent.run() is a blocking call
            # inside a coroutine, so gathered iterations execute
            # sequentially on the event loop.
            try:
                self.agent.run(query)
                success = True
            except Exception as e:
                # The original code discarded the error via a bare
                # ``str(e)``; log it so failures are diagnosable.
                logger.warning(
                    f"Agent run failed (iteration {iteration}): {e}"
                )
                success = False

            execution_time = (
                datetime.datetime.now() - iteration_start
            ).total_seconds()
            post_metrics = self._get_system_metrics()

            result = {
                "execution_time": execution_time,
                "success": success,
                "pre_metrics": pre_metrics,
                "post_metrics": post_metrics,
                "iteration_data": {
                    "iteration": iteration + 1,
                    "execution_time": round(execution_time, 3),
                    "success": success,
                    "system_metrics": {
                        "pre": pre_metrics,
                        "post": post_metrics,
                    },
                },
            }

            # Cache the result for potential repeated runs.
            self._query_cache[cache_key] = result
            return result

        except Exception as e:
            logger.error(f"Error in iteration {iteration}: {e}")
            raise

    async def run_benchmark(
        self, queries: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Run the benchmark asynchronously over ``queries``.

        Falls back to ``self.default_queries`` when no queries are given.
        Returns a dict with run metadata and per-query results/statistics.
        """
        queries = queries or self.default_queries
        benchmark_data: Dict[str, Any] = {
            "metadata": {
                "timestamp": datetime.datetime.now().isoformat(),
                "num_iterations": self.num_iterations,
                "agent_config": {
                    "model_name": self.agent.model_name,
                    "max_loops": self.agent.max_loops,
                },
            },
            "results": {},
        }

        async def process_query(query: str):
            """Run all iterations for one query and aggregate its stats."""
            query_results: Dict[str, Any] = {
                "execution_times": [],
                "system_metrics": [],
                "iterations": [],
            }

            # Launch all iterations for this query together.
            tasks = [
                self.process_iteration(query, i)
                for i in range(self.num_iterations)
            ]
            iteration_results = await asyncio.gather(*tasks)

            for result in iteration_results:
                query_results["execution_times"].append(
                    result["execution_time"]
                )
                query_results["system_metrics"].append(
                    result["post_metrics"]
                )
                query_results["iterations"].append(
                    result["iteration_data"]
                )

            # Aggregate summary statistics per metric.
            query_results["statistics"] = {
                "execution_time": self._calculate_statistics(
                    query_results["execution_times"]
                ),
                "memory_usage": self._calculate_statistics(
                    [
                        m["memory_mb"]
                        for m in query_results["system_metrics"]
                    ]
                ),
                "cpu_usage": self._calculate_statistics(
                    [
                        m["cpu_percent"]
                        for m in query_results["system_metrics"]
                    ]
                ),
            }

            return query, query_results

        # Execute all queries concurrently.
        query_tasks = [process_query(query) for query in queries]
        query_results = await asyncio.gather(*query_tasks)

        for query, results in query_results:
            benchmark_data["results"][query] = results

        return benchmark_data

    def save_results(self, benchmark_data: Dict[str, Any]) -> str:
        """Write benchmark results to a timestamped JSON file; return its path."""
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = (
            self.output_dir / f"benchmark_results_{timestamp}.json"
        )

        # Write results in a single operation.
        with open(filename, "w") as f:
            json.dump(benchmark_data, f, indent=2)

        # BUGFIX: the original f-string had no placeholder and logged a
        # literal "(unknown)" instead of the actual path.
        logger.info(f"Benchmark results saved to: {filename}")
        return str(filename)

    def print_summary(self, results: Dict[str, Any]):
        """Print a human-readable summary of the benchmark results."""
        print("\n=== Benchmark Summary ===")
        for query, data in results["results"].items():
            print(f"\nQuery: {query[:50]}...")
            stats = data["statistics"]["execution_time"]
            print(f"Average time: {stats['mean']:.2f}s")
            print(
                f"Memory usage (avg): {data['statistics']['memory_usage']['mean']:.1f}MB"
            )
            print(
                f"CPU usage (avg): {data['statistics']['cpu_usage']['mean']:.1f}%"
            )

    async def run_with_timeout(
        self, timeout: int = 300
    ) -> Dict[str, Any]:
        """Run the benchmark, raising asyncio.TimeoutError after ``timeout`` s."""
        try:
            return await asyncio.wait_for(
                self.run_benchmark(), timeout
            )
        except asyncio.TimeoutError:
            logger.error(
                f"Benchmark timed out after {timeout} seconds"
            )
            raise

    def cleanup(self):
        """Shut down executor pools and drop cached results."""
        self.process_pool.shutdown()
        self.thread_pool.shutdown()
        self._query_cache.clear()


async def main():
    """Entry point: run a one-iteration benchmark, save and summarize it."""
    # BUGFIX: initialize before the try block so the finally clause cannot
    # hit a NameError when AgentBenchmark() itself raises.
    benchmark = None
    try:
        # Create and run benchmark.
        benchmark = AgentBenchmark(num_iterations=1)

        # Run benchmark with timeout.
        results = await benchmark.run_with_timeout(timeout=300)

        # Save results.
        benchmark.save_results(results)

        # Print summary.
        benchmark.print_summary(results)

    except Exception as e:
        logger.error(f"Benchmark failed: {e}")
    finally:
        # Cleanup resources only if construction succeeded.
        if benchmark is not None:
            benchmark.cleanup()


if __name__ == "__main__":
    # Run the async main function.
    asyncio.run(main())
"cpu_percent": 27.1, + "memory_percent": 84.4, + "process_memory_mb": 175.5 + }, + { + "cpu_percent": 30.4, + "memory_percent": 84.8, + "process_memory_mb": 175.984375 + }, + { + "cpu_percent": 24.9, + "memory_percent": 84.8, + "process_memory_mb": 176.125 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 3.394, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 35.3, + "memory_percent": 84.2, + "process_memory_mb": 185.453125 + }, + "post": { + "cpu_percent": 27.1, + "memory_percent": 84.4, + "process_memory_mb": 175.5 + } + } + }, + { + "iteration": 2, + "execution_time": 0.289, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 84.4, + "process_memory_mb": 175.53125 + }, + "post": { + "cpu_percent": 30.4, + "memory_percent": 84.8, + "process_memory_mb": 175.984375 + } + } + }, + { + "iteration": 3, + "execution_time": 0.158, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 84.8, + "process_memory_mb": 175.984375 + }, + "post": { + "cpu_percent": 24.9, + "memory_percent": 84.8, + "process_memory_mb": 176.125 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 1.28, + "median": 0.289, + "min": 0.158, + "max": 3.394, + "std_dev": 1.832 + }, + "memory_usage": { + "mean": 175.87, + "median": 175.984, + "min": 175.5, + "max": 176.125, + "std_dev": 0.328 + }, + "cpu_usage": { + "mean": 27.467, + "median": 27.1, + "min": 24.9, + "max": 30.4, + "std_dev": 2.768 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 0.6481029987335205, + 0.22909188270568848, + 0.24907231330871582 + ], + "system_metrics": [ + { + "cpu_percent": 21.2, + "memory_percent": 84.7, + "process_memory_mb": 176.25 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.8, + "process_memory_mb": 176.40625 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.9, + "process_memory_mb": 176.765625 + } + ], + "iterations": [ 
+ { + "iteration": 1, + "execution_time": 0.648, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 50.0, + "memory_percent": 84.8, + "process_memory_mb": 176.125 + }, + "post": { + "cpu_percent": 21.2, + "memory_percent": 84.7, + "process_memory_mb": 176.25 + } + } + }, + { + "iteration": 2, + "execution_time": 0.229, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 84.7, + "process_memory_mb": 176.25 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.8, + "process_memory_mb": 176.40625 + } + } + }, + { + "iteration": 3, + "execution_time": 0.249, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.8, + "process_memory_mb": 176.40625 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.9, + "process_memory_mb": 176.765625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 0.375, + "median": 0.249, + "min": 0.229, + "max": 0.648, + "std_dev": 0.236 + }, + "memory_usage": { + "mean": 176.474, + "median": 176.406, + "min": 176.25, + "max": 176.766, + "std_dev": 0.264 + }, + "cpu_usage": { + "mean": 7.067, + "median": 0.0, + "min": 0.0, + "max": 21.2, + "std_dev": 12.24 + } + } + }, + "Analyze current market trends in renewable energy sector": { + "execution_times": [ + 2.0344760417938232, + 0.48967909812927246, + 0.08599114418029785 + ], + "system_metrics": [ + { + "cpu_percent": 22.9, + "memory_percent": 83.7, + "process_memory_mb": 168.40625 + }, + { + "cpu_percent": 21.9, + "memory_percent": 83.6, + "process_memory_mb": 168.3125 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 166.328125 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 2.034, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.9, + "process_memory_mb": 176.78125 + }, + "post": { + "cpu_percent": 22.9, + "memory_percent": 83.7, + "process_memory_mb": 168.40625 + 
} + } + }, + { + "iteration": 2, + "execution_time": 0.49, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 33.3, + "memory_percent": 83.7, + "process_memory_mb": 168.421875 + }, + "post": { + "cpu_percent": 21.9, + "memory_percent": 83.6, + "process_memory_mb": 168.3125 + } + } + }, + { + "iteration": 3, + "execution_time": 0.086, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 168.3125 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 166.328125 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 0.87, + "median": 0.49, + "min": 0.086, + "max": 2.034, + "std_dev": 1.028 + }, + "memory_usage": { + "mean": 167.682, + "median": 168.312, + "min": 166.328, + "max": 168.406, + "std_dev": 1.174 + }, + "cpu_usage": { + "mean": 14.933, + "median": 21.9, + "min": 0.0, + "max": 22.9, + "std_dev": 12.942 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 0.08068418502807617, + 0.08303999900817871, + 0.08367633819580078 + ], + "system_metrics": [ + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 159.078125 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 159.21875 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.015625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 0.081, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 166.4375 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 159.078125 + } + } + }, + { + "iteration": 2, + "execution_time": 0.083, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 159.078125 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 
159.21875 + } + } + }, + { + "iteration": 3, + "execution_time": 0.084, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 159.21875 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.015625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 0.082, + "median": 0.083, + "min": 0.081, + "max": 0.084, + "std_dev": 0.002 + }, + "memory_usage": { + "mean": 153.771, + "median": 159.078, + "min": 143.016, + "max": 159.219, + "std_dev": 9.315 + }, + "cpu_usage": { + "mean": 0.0, + "median": 0.0, + "min": 0.0, + "max": 0.0, + "std_dev": 0.0 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 0.08391690254211426, + 0.08319473266601562, + 0.08199191093444824 + ], + "system_metrics": [ + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.28125 + }, + { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 130.984375 + }, + { + "cpu_percent": 24.1, + "memory_percent": 83.5, + "process_memory_mb": 77.046875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 0.084, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.015625 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.28125 + } + } + }, + { + "iteration": 2, + "execution_time": 0.083, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 143.28125 + }, + "post": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 130.984375 + } + } + }, + { + "iteration": 3, + "execution_time": 0.082, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 83.6, + "process_memory_mb": 130.984375 + }, + "post": { + "cpu_percent": 24.1, + "memory_percent": 83.5, + 
"process_memory_mb": 77.046875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 0.083, + "median": 0.083, + "min": 0.082, + "max": 0.084, + "std_dev": 0.001 + }, + "memory_usage": { + "mean": 117.104, + "median": 130.984, + "min": 77.047, + "max": 143.281, + "std_dev": 35.231 + }, + "cpu_usage": { + "mean": 8.033, + "median": 0.0, + "min": 0.0, + "max": 24.1, + "std_dev": 13.914 + } + } + } + } +} \ No newline at end of file diff --git a/benchmark_results/benchmark_results_20250418_132655.json b/benchmark_results/benchmark_results_20250418_132655.json new file mode 100644 index 00000000..4120bc7a --- /dev/null +++ b/benchmark_results/benchmark_results_20250418_132655.json @@ -0,0 +1,267 @@ +{ + "metadata": { + "timestamp": "2025-04-18T13:25:33.415938", + "num_iterations": 1, + "agent_config": { + "model_name": "gpt-4o-mini", + "max_loops": 1 + } + }, + "results": { + "Conduct an analysis of the best real undervalued ETFs": { + "execution_times": [ + 14.138006687164307 + ], + "system_metrics": [ + { + "cpu_percent": 20.7, + "memory_percent": 84.4, + "process_memory_mb": 65.859375 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 14.138, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 37.6, + "memory_percent": 84.2, + "process_memory_mb": 239.515625 + }, + "post": { + "cpu_percent": 20.7, + "memory_percent": 84.4, + "process_memory_mb": 65.859375 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 14.138, + "median": 14.138, + "min": 14.138, + "max": 14.138 + }, + "memory_usage": { + "mean": 65.859, + "median": 65.859, + "min": 65.859, + "max": 65.859 + }, + "cpu_usage": { + "mean": 20.7, + "median": 20.7, + "min": 20.7, + "max": 20.7 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 17.769688844680786 + ], + "system_metrics": [ + { + "cpu_percent": 18.1, + "memory_percent": 83.0, + "process_memory_mb": 56.75 + } + ], + "iterations": [ + { + 
"iteration": 1, + "execution_time": 17.77, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 100.0, + "memory_percent": 84.4, + "process_memory_mb": 66.234375 + }, + "post": { + "cpu_percent": 18.1, + "memory_percent": 83.0, + "process_memory_mb": 56.75 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 17.77, + "median": 17.77, + "min": 17.77, + "max": 17.77 + }, + "memory_usage": { + "mean": 56.75, + "median": 56.75, + "min": 56.75, + "max": 56.75 + }, + "cpu_usage": { + "mean": 18.1, + "median": 18.1, + "min": 18.1, + "max": 18.1 + } + } + }, + "Analyze current market trends in renewable energy sector": { + "execution_times": [ + 14.471845149993896 + ], + "system_metrics": [ + { + "cpu_percent": 15.5, + "memory_percent": 82.3, + "process_memory_mb": 56.671875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 14.472, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 100.0, + "memory_percent": 83.0, + "process_memory_mb": 57.015625 + }, + "post": { + "cpu_percent": 15.5, + "memory_percent": 82.3, + "process_memory_mb": 56.671875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 14.472, + "median": 14.472, + "min": 14.472, + "max": 14.472 + }, + "memory_usage": { + "mean": 56.672, + "median": 56.672, + "min": 56.672, + "max": 56.672 + }, + "cpu_usage": { + "mean": 15.5, + "median": 15.5, + "min": 15.5, + "max": 15.5 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 15.340633869171143 + ], + "system_metrics": [ + { + "cpu_percent": 16.5, + "memory_percent": 81.8, + "process_memory_mb": 54.5625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 15.341, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 82.4, + "process_memory_mb": 56.953125 + }, + "post": { + "cpu_percent": 16.5, + "memory_percent": 81.8, + "process_memory_mb": 54.5625 + } + } + } + ], + "statistics": { + 
"execution_time": { + "mean": 15.341, + "median": 15.341, + "min": 15.341, + "max": 15.341 + }, + "memory_usage": { + "mean": 54.562, + "median": 54.562, + "min": 54.562, + "max": 54.562 + }, + "cpu_usage": { + "mean": 16.5, + "median": 16.5, + "min": 16.5, + "max": 16.5 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 19.98606514930725 + ], + "system_metrics": [ + { + "cpu_percent": 18.5, + "memory_percent": 82.2, + "process_memory_mb": 56.15625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 19.986, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_percent": 81.8, + "process_memory_mb": 55.046875 + }, + "post": { + "cpu_percent": 18.5, + "memory_percent": 82.2, + "process_memory_mb": 56.15625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 19.986, + "median": 19.986, + "min": 19.986, + "max": 19.986 + }, + "memory_usage": { + "mean": 56.156, + "median": 56.156, + "min": 56.156, + "max": 56.156 + }, + "cpu_usage": { + "mean": 18.5, + "median": 18.5, + "min": 18.5, + "max": 18.5 + } + } + } + } +} \ No newline at end of file diff --git a/benchmark_results/benchmark_results_20250418_133031.json b/benchmark_results/benchmark_results_20250418_133031.json new file mode 100644 index 00000000..f8397015 --- /dev/null +++ b/benchmark_results/benchmark_results_20250418_133031.json @@ -0,0 +1,467 @@ +{ + "metadata": { + "timestamp": "2025-04-18T13:30:16.543562", + "num_iterations": 3, + "agent_config": { + "model_name": "gpt-4o-mini", + "max_loops": 1 + } + }, + "results": { + "Conduct an analysis of the best real undervalued ETFs": { + "execution_times": [ + 14.789254, + 13.413338, + 13.084335 + ], + "system_metrics": [ + { + "cpu_percent": 5.1, + "memory_mb": 67.765625 + }, + { + "cpu_percent": 3.7, + "memory_mb": 199.875 + }, + { + "cpu_percent": 16.2, + "memory_mb": 203.453125 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 14.789, + 
"success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_mb": 243.046875 + }, + "post": { + "cpu_percent": 5.1, + "memory_mb": 67.765625 + } + } + }, + { + "iteration": 2, + "execution_time": 13.413, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 202.0, + "memory_mb": 243.109375 + }, + "post": { + "cpu_percent": 3.7, + "memory_mb": 199.875 + } + } + }, + { + "iteration": 3, + "execution_time": 13.084, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 107.1, + "memory_mb": 243.21875 + }, + "post": { + "cpu_percent": 16.2, + "memory_mb": 203.453125 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 13.762, + "median": 13.413, + "min": 13.084, + "max": 14.789, + "std_dev": 0.738 + }, + "memory_usage": { + "mean": 157.031, + "median": 199.875, + "min": 67.766, + "max": 203.453, + "std_dev": 63.137 + }, + "cpu_usage": { + "mean": 8.333, + "median": 5.1, + "min": 3.7, + "max": 16.2, + "std_dev": 5.592 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 14.40021, + 7.619928, + 9.870042 + ], + "system_metrics": [ + { + "cpu_percent": 2.5, + "memory_mb": 69.734375 + }, + { + "cpu_percent": 0.5, + "memory_mb": 204.46875 + }, + { + "cpu_percent": 0.8, + "memory_mb": 204.640625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 14.4, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 109.3, + "memory_mb": 243.1875 + }, + "post": { + "cpu_percent": 2.5, + "memory_mb": 69.734375 + } + } + }, + { + "iteration": 2, + "execution_time": 7.62, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 194.6, + "memory_mb": 243.328125 + }, + "post": { + "cpu_percent": 0.5, + "memory_mb": 204.46875 + } + } + }, + { + "iteration": 3, + "execution_time": 9.87, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 187.1, + "memory_mb": 243.734375 + }, + "post": { + "cpu_percent": 0.8, + "memory_mb": 
204.640625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 10.63, + "median": 9.87, + "min": 7.62, + "max": 14.4, + "std_dev": 2.82 + }, + "memory_usage": { + "mean": 159.615, + "median": 204.469, + "min": 69.734, + "max": 204.641, + "std_dev": 63.555 + }, + "cpu_usage": { + "mean": 1.267, + "median": 0.8, + "min": 0.5, + "max": 2.5, + "std_dev": 0.881 + } + } + }, + "Analyze current market trends in renewable energy sector": { + "execution_times": [ + 3.193721, + 11.01429, + 13.543417 + ], + "system_metrics": [ + { + "cpu_percent": 5.7, + "memory_mb": 223.109375 + }, + { + "cpu_percent": 1.4, + "memory_mb": 203.46875 + }, + { + "cpu_percent": 9.9, + "memory_mb": 199.1875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 3.194, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 223.0, + "memory_mb": 243.8125 + }, + "post": { + "cpu_percent": 5.7, + "memory_mb": 223.109375 + } + } + }, + { + "iteration": 2, + "execution_time": 11.014, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 270.1, + "memory_mb": 243.953125 + }, + "post": { + "cpu_percent": 1.4, + "memory_mb": 203.46875 + } + } + }, + { + "iteration": 3, + "execution_time": 13.543, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 340.8, + "memory_mb": 244.0625 + }, + "post": { + "cpu_percent": 9.9, + "memory_mb": 199.1875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 9.25, + "median": 11.014, + "min": 3.194, + "max": 13.543, + "std_dev": 4.405 + }, + "memory_usage": { + "mean": 208.589, + "median": 203.469, + "min": 199.188, + "max": 223.109, + "std_dev": 10.415 + }, + "cpu_usage": { + "mean": 5.667, + "median": 5.7, + "min": 1.4, + "max": 9.9, + "std_dev": 3.47 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 3.424122, + 12.162575, + 9.831582 + ], + "system_metrics": [ + { + "cpu_percent": 1.9, + "memory_mb": 217.640625 + }, + { + "cpu_percent": 
2.9, + "memory_mb": 203.171875 + }, + { + "cpu_percent": 31.2, + "memory_mb": 204.765625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 3.424, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 364.2, + "memory_mb": 245.671875 + }, + "post": { + "cpu_percent": 1.9, + "memory_mb": 217.640625 + } + } + }, + { + "iteration": 2, + "execution_time": 12.163, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 659.7, + "memory_mb": 245.734375 + }, + "post": { + "cpu_percent": 2.9, + "memory_mb": 203.171875 + } + } + }, + { + "iteration": 3, + "execution_time": 9.832, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 612.4, + "memory_mb": 245.953125 + }, + "post": { + "cpu_percent": 31.2, + "memory_mb": 204.765625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 8.473, + "median": 9.832, + "min": 3.424, + "max": 12.163, + "std_dev": 3.695 + }, + "memory_usage": { + "mean": 208.526, + "median": 204.766, + "min": 203.172, + "max": 217.641, + "std_dev": 6.478 + }, + "cpu_usage": { + "mean": 12.0, + "median": 2.9, + "min": 1.9, + "max": 31.2, + "std_dev": 13.583 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 2.948636, + 12.942413, + 11.361344 + ], + "system_metrics": [ + { + "cpu_percent": 402.2, + "memory_mb": 246.078125 + }, + { + "cpu_percent": 2.2, + "memory_mb": 203.34375 + }, + { + "cpu_percent": 4.5, + "memory_mb": 203.59375 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 2.949, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 1494.6, + "memory_mb": 246.140625 + }, + "post": { + "cpu_percent": 402.2, + "memory_mb": 246.078125 + } + } + }, + { + "iteration": 2, + "execution_time": 12.942, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 529.1, + "memory_mb": 246.265625 + }, + "post": { + "cpu_percent": 2.2, + "memory_mb": 203.34375 + } + } + }, + { + "iteration": 3, + 
"execution_time": 11.361, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 578.8, + "memory_mb": 246.65625 + }, + "post": { + "cpu_percent": 4.5, + "memory_mb": 203.59375 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 9.084, + "median": 11.361, + "min": 2.949, + "max": 12.942, + "std_dev": 4.386 + }, + "memory_usage": { + "mean": 217.672, + "median": 203.594, + "min": 203.344, + "max": 246.078, + "std_dev": 20.087 + }, + "cpu_usage": { + "mean": 136.3, + "median": 4.5, + "min": 2.2, + "max": 402.2, + "std_dev": 188.022 + } + } + } + } +} \ No newline at end of file diff --git a/benchmark_results/benchmark_results_20250418_133109.json b/benchmark_results/benchmark_results_20250418_133109.json new file mode 100644 index 00000000..84b2d948 --- /dev/null +++ b/benchmark_results/benchmark_results_20250418_133109.json @@ -0,0 +1,467 @@ +{ + "metadata": { + "timestamp": "2025-04-18T13:30:51.812685", + "num_iterations": 3, + "agent_config": { + "model_name": "gpt-4o-mini", + "max_loops": 1 + } + }, + "results": { + "Conduct an analysis of the best real undervalued ETFs": { + "execution_times": [ + 8.791961, + 15.974623, + 10.00903 + ], + "system_metrics": [ + { + "cpu_percent": 3.9, + "memory_mb": 73.96875 + }, + { + "cpu_percent": 4.6, + "memory_mb": 71.171875 + }, + { + "cpu_percent": 21.5, + "memory_mb": 76.015625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 8.792, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_mb": 133.765625 + }, + "post": { + "cpu_percent": 3.9, + "memory_mb": 73.96875 + } + } + }, + { + "iteration": 2, + "execution_time": 15.975, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 96.7, + "memory_mb": 133.8125 + }, + "post": { + "cpu_percent": 4.6, + "memory_mb": 71.171875 + } + } + }, + { + "iteration": 3, + "execution_time": 10.009, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 76.2, + "memory_mb": 137.15625 
+ }, + "post": { + "cpu_percent": 21.5, + "memory_mb": 76.015625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 11.592, + "median": 10.009, + "min": 8.792, + "max": 15.975, + "std_dev": 3.139 + }, + "memory_usage": { + "mean": 73.719, + "median": 73.969, + "min": 71.172, + "max": 76.016, + "std_dev": 1.985 + }, + "cpu_usage": { + "mean": 10.0, + "median": 4.6, + "min": 3.9, + "max": 21.5, + "std_dev": 8.137 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 10.980763, + 11.800057, + 18.108609 + ], + "system_metrics": [ + { + "cpu_percent": 2.8, + "memory_mb": 76.203125 + }, + { + "cpu_percent": 2.4, + "memory_mb": 76.65625 + }, + { + "cpu_percent": 0.4, + "memory_mb": 69.515625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 10.981, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 40.8, + "memory_mb": 137.40625 + }, + "post": { + "cpu_percent": 2.8, + "memory_mb": 76.203125 + } + } + }, + { + "iteration": 2, + "execution_time": 11.8, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 87.4, + "memory_mb": 137.4375 + }, + "post": { + "cpu_percent": 2.4, + "memory_mb": 76.65625 + } + } + }, + { + "iteration": 3, + "execution_time": 18.109, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 180.9, + "memory_mb": 137.640625 + }, + "post": { + "cpu_percent": 0.4, + "memory_mb": 69.515625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 13.63, + "median": 11.8, + "min": 10.981, + "max": 18.109, + "std_dev": 3.185 + }, + "memory_usage": { + "mean": 74.125, + "median": 76.203, + "min": 69.516, + "max": 76.656, + "std_dev": 3.265 + }, + "cpu_usage": { + "mean": 1.867, + "median": 2.4, + "min": 0.4, + "max": 2.8, + "std_dev": 1.05 + } + } + }, + "Analyze current market trends in renewable energy sector": { + "execution_times": [ + 15.015125, + 9.916293, + 6.958686 + ], + "system_metrics": [ + { + "cpu_percent": 1.3, 
+ "memory_mb": 69.953125 + }, + { + "cpu_percent": 14.6, + "memory_mb": 74.765625 + }, + { + "cpu_percent": 5.0, + "memory_mb": 72.90625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 15.015, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 169.6, + "memory_mb": 137.9375 + }, + "post": { + "cpu_percent": 1.3, + "memory_mb": 69.953125 + } + } + }, + { + "iteration": 2, + "execution_time": 9.916, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 212.3, + "memory_mb": 138.171875 + }, + "post": { + "cpu_percent": 14.6, + "memory_mb": 74.765625 + } + } + }, + { + "iteration": 3, + "execution_time": 6.959, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 149.0, + "memory_mb": 138.4375 + }, + "post": { + "cpu_percent": 5.0, + "memory_mb": 72.90625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 10.63, + "median": 9.916, + "min": 6.959, + "max": 15.015, + "std_dev": 3.328 + }, + "memory_usage": { + "mean": 72.542, + "median": 72.906, + "min": 69.953, + "max": 74.766, + "std_dev": 1.982 + }, + "cpu_usage": { + "mean": 6.967, + "median": 5.0, + "min": 1.3, + "max": 14.6, + "std_dev": 5.605 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 15.44115, + 13.797926, + 8.355462 + ], + "system_metrics": [ + { + "cpu_percent": 2.4, + "memory_mb": 70.59375 + }, + { + "cpu_percent": 1.0, + "memory_mb": 69.875 + }, + { + "cpu_percent": 1.1, + "memory_mb": 73.5 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 15.441, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 218.0, + "memory_mb": 138.515625 + }, + "post": { + "cpu_percent": 2.4, + "memory_mb": 70.59375 + } + } + }, + { + "iteration": 2, + "execution_time": 13.798, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 298.8, + "memory_mb": 138.59375 + }, + "post": { + "cpu_percent": 1.0, + "memory_mb": 69.875 + } + } + }, + { + 
"iteration": 3, + "execution_time": 8.355, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 226.1, + "memory_mb": 139.984375 + }, + "post": { + "cpu_percent": 1.1, + "memory_mb": 73.5 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 12.532, + "median": 13.798, + "min": 8.355, + "max": 15.441, + "std_dev": 3.028 + }, + "memory_usage": { + "mean": 71.323, + "median": 70.594, + "min": 69.875, + "max": 73.5, + "std_dev": 1.567 + }, + "cpu_usage": { + "mean": 1.5, + "median": 1.1, + "min": 1.0, + "max": 2.4, + "std_dev": 0.638 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 6.380516, + 9.943111, + 9.821866 + ], + "system_metrics": [ + { + "cpu_percent": 126.4, + "memory_mb": 118.28125 + }, + { + "cpu_percent": 18.0, + "memory_mb": 75.28125 + }, + { + "cpu_percent": 1.8, + "memory_mb": 74.1875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 6.381, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 83.4, + "memory_mb": 140.046875 + }, + "post": { + "cpu_percent": 126.4, + "memory_mb": 118.28125 + } + } + }, + { + "iteration": 2, + "execution_time": 9.943, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 197.8, + "memory_mb": 140.109375 + }, + "post": { + "cpu_percent": 18.0, + "memory_mb": 75.28125 + } + } + }, + { + "iteration": 3, + "execution_time": 9.822, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 181.7, + "memory_mb": 140.171875 + }, + "post": { + "cpu_percent": 1.8, + "memory_mb": 74.1875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 8.715, + "median": 9.822, + "min": 6.381, + "max": 9.943, + "std_dev": 1.652 + }, + "memory_usage": { + "mean": 89.25, + "median": 75.281, + "min": 74.188, + "max": 118.281, + "std_dev": 20.533 + }, + "cpu_usage": { + "mean": 48.733, + "median": 18.0, + "min": 1.8, + "max": 126.4, + "std_dev": 55.315 + } + } + } + } +} \ No newline at end of file diff 
--git a/benchmark_results/benchmark_results_20250418_133308.json b/benchmark_results/benchmark_results_20250418_133308.json new file mode 100644 index 00000000..6e1c1bb4 --- /dev/null +++ b/benchmark_results/benchmark_results_20250418_133308.json @@ -0,0 +1,252 @@ +{ + "metadata": { + "timestamp": "2025-04-18T13:31:55.055663", + "num_iterations": 1, + "agent_config": { + "model_name": "gpt-4o-mini", + "max_loops": 1 + } + }, + "results": { + "Conduct an analysis of the best real undervalued ETFs": { + "execution_times": [ + 11.214983 + ], + "system_metrics": [ + { + "cpu_percent": 1.2, + "memory_mb": 187.140625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 11.215, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_mb": 192.546875 + }, + "post": { + "cpu_percent": 1.2, + "memory_mb": 187.140625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 11.215, + "median": 11.215, + "min": 11.215, + "max": 11.215 + }, + "memory_usage": { + "mean": 187.141, + "median": 187.141, + "min": 187.141, + "max": 187.141 + }, + "cpu_usage": { + "mean": 1.2, + "median": 1.2, + "min": 1.2, + "max": 1.2 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 13.182044 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 57.671875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 13.182, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 96.3, + "memory_mb": 187.140625 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 57.671875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 13.182, + "median": 13.182, + "min": 13.182, + "max": 13.182 + }, + "memory_usage": { + "mean": 57.672, + "median": 57.672, + "min": 57.672, + "max": 57.672 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + "max": 0.3 + } + } + }, + "Analyze current market trends in renewable energy sector": { + 
"execution_times": [ + 11.858239 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 56.53125 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 11.858, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 98.1, + "memory_mb": 57.8125 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 56.53125 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 11.858, + "median": 11.858, + "min": 11.858, + "max": 11.858 + }, + "memory_usage": { + "mean": 56.531, + "median": 56.531, + "min": 56.531, + "max": 56.531 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + "max": 0.3 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 25.299971 + ], + "system_metrics": [ + { + "cpu_percent": 0.1, + "memory_mb": 55.734375 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 25.3, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 89.8, + "memory_mb": 56.671875 + }, + "post": { + "cpu_percent": 0.1, + "memory_mb": 55.734375 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 25.3, + "median": 25.3, + "min": 25.3, + "max": 25.3 + }, + "memory_usage": { + "mean": 55.734, + "median": 55.734, + "min": 55.734, + "max": 55.734 + }, + "cpu_usage": { + "mean": 0.1, + "median": 0.1, + "min": 0.1, + "max": 0.1 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 11.951775 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 55.5625 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 11.952, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 56.3, + "memory_mb": 55.890625 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 55.5625 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 11.952, + "median": 11.952, + "min": 11.952, + "max": 11.952 + }, + "memory_usage": { + "mean": 55.562, + "median": 55.562, + "min": 55.562, + 
"max": 55.562 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + "max": 0.3 + } + } + } + } +} \ No newline at end of file diff --git a/benchmark_results/benchmark_results_20250418_133511.json b/benchmark_results/benchmark_results_20250418_133511.json new file mode 100644 index 00000000..36ce2a72 --- /dev/null +++ b/benchmark_results/benchmark_results_20250418_133511.json @@ -0,0 +1,252 @@ +{ + "metadata": { + "timestamp": "2025-04-18T13:34:14.487430", + "num_iterations": 1, + "agent_config": { + "model_name": "gpt-4o-mini", + "max_loops": 1 + } + }, + "results": { + "Conduct an analysis of the best real undervalued ETFs": { + "execution_times": [ + 9.132072 + ], + "system_metrics": [ + { + "cpu_percent": 1.8, + "memory_mb": 66.5 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 9.132, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 0.0, + "memory_mb": 229.375 + }, + "post": { + "cpu_percent": 1.8, + "memory_mb": 66.5 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 9.132, + "median": 9.132, + "min": 9.132, + "max": 9.132 + }, + "memory_usage": { + "mean": 66.5, + "median": 66.5, + "min": 66.5, + "max": 66.5 + }, + "cpu_usage": { + "mean": 1.8, + "median": 1.8, + "min": 1.8, + "max": 1.8 + } + } + }, + "What are the top performing tech stocks this quarter?": { + "execution_times": [ + 8.669393 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 73.859375 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 8.669, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 86.7, + "memory_mb": 66.609375 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 73.859375 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 8.669, + "median": 8.669, + "min": 8.669, + "max": 8.669 + }, + "memory_usage": { + "mean": 73.859, + "median": 73.859, + "min": 73.859, + "max": 73.859 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + 
"max": 0.3 + } + } + }, + "Analyze current market trends in renewable energy sector": { + "execution_times": [ + 10.922691 + ], + "system_metrics": [ + { + "cpu_percent": 0.4, + "memory_mb": 55.6875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 10.923, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 108.3, + "memory_mb": 73.859375 + }, + "post": { + "cpu_percent": 0.4, + "memory_mb": 55.6875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 10.923, + "median": 10.923, + "min": 10.923, + "max": 10.923 + }, + "memory_usage": { + "mean": 55.688, + "median": 55.688, + "min": 55.688, + "max": 55.688 + }, + "cpu_usage": { + "mean": 0.4, + "median": 0.4, + "min": 0.4, + "max": 0.4 + } + } + }, + "Compare Bitcoin and Ethereum investment potential": { + "execution_times": [ + 13.72297 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 55.671875 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 13.723, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 87.7, + "memory_mb": 55.828125 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 55.671875 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 13.723, + "median": 13.723, + "min": 13.723, + "max": 13.723 + }, + "memory_usage": { + "mean": 55.672, + "median": 55.672, + "min": 55.672, + "max": 55.672 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + "max": 0.3 + } + } + }, + "Evaluate the risk factors in emerging markets": { + "execution_times": [ + 14.099555 + ], + "system_metrics": [ + { + "cpu_percent": 0.3, + "memory_mb": 56.0 + } + ], + "iterations": [ + { + "iteration": 1, + "execution_time": 14.1, + "success": true, + "system_metrics": { + "pre": { + "cpu_percent": 61.7, + "memory_mb": 55.8125 + }, + "post": { + "cpu_percent": 0.3, + "memory_mb": 56.0 + } + } + } + ], + "statistics": { + "execution_time": { + "mean": 14.1, + "median": 14.1, + "min": 14.1, + "max": 14.1 + 
}, + "memory_usage": { + "mean": 56.0, + "median": 56.0, + "min": 56.0, + "max": 56.0 + }, + "cpu_usage": { + "mean": 0.3, + "median": 0.3, + "min": 0.3, + "max": 0.3 + } + } + } + } +} \ No newline at end of file diff --git a/docs/examples/basic_usage.md b/docs/examples/basic_usage.md new file mode 100644 index 00000000..038764a4 --- /dev/null +++ b/docs/examples/basic_usage.md @@ -0,0 +1,24 @@ + +# Basic Usage Guide + +## Getting Started + +This guide demonstrates how to use the basic features of the Swarms framework. + +### Basic Agent Example + +```python +from swarms.structs.agent import Agent + +# Initialize agent +agent = Agent( + agent_name="Basic-Example-Agent", + agent_description="A simple example agent", + system_prompt="You are a helpful assistant.", + model_name="gpt-4", +) + +# Run the agent +response = agent.run("What is 2+2?") +print(f"Agent response: {response}") +``` diff --git a/docs/swarms/mcp_integration.md b/docs/swarms/mcp_integration.md new file mode 100644 index 00000000..239c2d3c --- /dev/null +++ b/docs/swarms/mcp_integration.md @@ -0,0 +1,63 @@ +# Basic Agent Setup with MCP + +## Overview + +This document shows how to set up a basic Swarms agent with MCP (Model Context Protocol) integration for client-side operations. 
+ +## Basic Agent Setup + +```python +from swarms import Agent +from swarms.tools.mcp_integration import MCPServerSseParams + +# Configure MCP server parameters +mcp_params = MCPServerSseParams( + url="http://localhost:8081/sse", # MCP server SSE endpoint + headers={"Accept": "text/event-stream"}, # Required for SSE + timeout=5.0 # Connection timeout in seconds +) + +# Initialize agent with MCP configuration +agent = Agent( + agent_name="basic_agent", # Name of your agent + system_prompt="You are a helpful assistant", # Agent's system prompt + mcp_servers=[mcp_params], # List of MCP server configurations + max_loops=5, # Maximum number of loops for task execution + verbose=True # Enable verbose output +) + +# Run the agent +result = agent.run("Your task here") +print(result) +``` + +## Required Parameters + +1. **MCP Server Parameters**: + - `url`: The SSE endpoint of your MCP server + - `headers`: Must include `Accept: text/event-stream` + - `timeout`: Connection timeout in seconds + +2. 
**Agent Parameters**: + - `agent_name`: Name of your agent + - `system_prompt`: Agent's system prompt + - `mcp_servers`: List of MCP server configurations + - `max_loops`: Maximum number of loops for task execution + - `verbose`: Enable verbose output for debugging + +## Example Usage + +```python +# Create agent +agent = Agent( + agent_name="math_agent", + system_prompt="You are a math assistant", + mcp_servers=[mcp_params], + max_loops=5, + verbose=True +) + +# Run a math task +result = agent.run("Add 5 and 3") +print(result) # Should return 8 +``` \ No newline at end of file diff --git a/examples/advanced_market_analysis/market_system.py b/examples/advanced_market_analysis/market_system.py new file mode 100644 index 00000000..a915deac --- /dev/null +++ b/examples/advanced_market_analysis/market_system.py @@ -0,0 +1,71 @@ + +from swarms.structs.agent import Agent +from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT + +# Technical Analysis Specialist +technical_analyst = Agent( + agent_name="Technical-Analysis-Expert", + agent_description="Advanced technical analysis specialist focusing on complex market patterns", + system_prompt="""You are an expert Technical Analyst specializing in: + 1. Advanced Pattern Recognition (Elliot Wave, Wyckoff Method) + 2. Multi-timeframe Analysis + 3. Volume Profile Analysis + 4. Market Structure Analysis + 5. Intermarket Analysis""", + max_loops=3, + model_name="gpt-4" +) + +# Fundamental Analysis Expert +fundamental_analyst = Agent( + agent_name="Fundamental-Analysis-Expert", + agent_description="Deep fundamental analysis specialist", + system_prompt="""You are a Fundamental Analysis expert focusing on: + 1. Advanced Financial Statement Analysis + 2. Economic Indicator Impact Assessment + 3. Industry Competitive Analysis + 4. Global Macro Trends + 5. 
Corporate Governance Evaluation""", + max_loops=3, + model_name="gpt-4" +) + +# Risk Management Specialist +risk_analyst = Agent( + agent_name="Risk-Management-Expert", + agent_description="Complex risk analysis and management specialist", + system_prompt="""You are a Risk Management expert specializing in: + 1. Portfolio Risk Assessment + 2. Value at Risk (VaR) Analysis + 3. Stress Testing Scenarios + 4. Correlation Analysis + 5. Risk-Adjusted Performance Metrics""", + max_loops=3, + model_name="gpt-4" +) + +class MarketAnalysisSystem: + def __init__(self): + self.agents = [technical_analyst, fundamental_analyst, risk_analyst] + + def comprehensive_analysis(self, asset_data): + analysis_results = [] + for agent in self.agents: + analysis = agent.run(f"Analyze this asset data: {asset_data}") + analysis_results.append({ + "analyst": agent.agent_name, + "analysis": analysis + }) + + # Synthesize results through risk analyst for final recommendation + final_analysis = risk_analyst.run( + f"Synthesize these analyses and provide a final recommendation: {analysis_results}" + ) + + return { + "detailed_analysis": analysis_results, + "final_recommendation": final_analysis + } + +# Usage +analysis_system = MarketAnalysisSystem() diff --git a/examples/basic_example.py b/examples/basic_example.py new file mode 100644 index 00000000..ca39aa89 --- /dev/null +++ b/examples/basic_example.py @@ -0,0 +1,18 @@ + +from swarms.structs.agent import Agent + +def main(): + # Initialize basic agent + agent = Agent( + agent_name="Basic-Example-Agent", + agent_description="A simple example agent", + system_prompt="You are a helpful assistant.", + model_name="gpt-4", + ) + + # Run the agent + response = agent.run("What is 2+2?") + print(f"Agent response: {response}") + +if __name__ == "__main__": + main() diff --git a/examples/document_processing/enterprise_doc_processor.py b/examples/document_processing/enterprise_doc_processor.py new file mode 100644 index 00000000..cb9b7dd5 --- /dev/null 
+++ b/examples/document_processing/enterprise_doc_processor.py @@ -0,0 +1,67 @@ + +from swarms.structs.agent import Agent +from swarms.utils.pdf_to_text import pdf_to_text +import asyncio + +class DocumentProcessingPipeline: + def __init__(self): + self.document_analyzer = Agent( + agent_name="Document-Analyzer", + agent_description="Enterprise document analysis specialist", + system_prompt="""You are an expert document analyzer specializing in: + 1. Complex Document Structure Analysis + 2. Key Information Extraction + 3. Compliance Verification + 4. Document Classification + 5. Content Validation""", + max_loops=2, + model_name="gpt-4" + ) + + self.legal_reviewer = Agent( + agent_name="Legal-Reviewer", + agent_description="Legal compliance and risk assessment specialist", + system_prompt="""You are a legal review expert focusing on: + 1. Regulatory Compliance Check + 2. Legal Risk Assessment + 3. Contractual Obligation Analysis + 4. Privacy Requirement Verification + 5. Legal Term Extraction""", + max_loops=2, + model_name="gpt-4" + ) + + self.data_extractor = Agent( + agent_name="Data-Extractor", + agent_description="Structured data extraction specialist", + system_prompt="""You are a data extraction expert specializing in: + 1. Named Entity Recognition + 2. Relationship Extraction + 3. Tabular Data Processing + 4. Metadata Extraction + 5. 
Data Standardization""", + max_loops=2, + model_name="gpt-4" + ) + + async def process_document(self, document_path): + # Convert document to text + document_text = pdf_to_text(document_path) + + # Parallel processing tasks + tasks = [ + self.document_analyzer.arun(f"Analyze this document: {document_text}"), + self.legal_reviewer.arun(f"Review legal aspects: {document_text}"), + self.data_extractor.arun(f"Extract structured data: {document_text}") + ] + + results = await asyncio.gather(*tasks) + + return { + "document_analysis": results[0], + "legal_review": results[1], + "extracted_data": results[2] + } + +# Usage +processor = DocumentProcessingPipeline() diff --git a/examples/healthcare/diagnostic_system.py b/examples/healthcare/diagnostic_system.py new file mode 100644 index 00000000..68ee214a --- /dev/null +++ b/examples/healthcare/diagnostic_system.py @@ -0,0 +1,69 @@ + +from swarms.structs.agent import Agent +from typing import Dict, List + +class HealthcareDiagnosticSystem: + def __init__(self): + self.primary_diagnostician = Agent( + agent_name="Primary-Diagnostician", + agent_description="Primary diagnostic analysis specialist", + system_prompt="""You are a primary diagnostician expert in: + 1. Initial Symptom Analysis + 2. Patient History Evaluation + 3. Preliminary Diagnosis Formation + 4. Risk Factor Assessment + 5. Treatment Priority Determination""", + max_loops=3, + model_name="gpt-4" + ) + + self.specialist_consultant = Agent( + agent_name="Specialist-Consultant", + agent_description="Specialized medical consultation expert", + system_prompt="""You are a medical specialist focusing on: + 1. Complex Case Analysis + 2. Specialized Treatment Planning + 3. Comorbidity Assessment + 4. Treatment Risk Evaluation + 5. 
Advanced Diagnostic Interpretation""", + max_loops=3, + model_name="gpt-4" + ) + + self.treatment_coordinator = Agent( + agent_name="Treatment-Coordinator", + agent_description="Treatment planning and coordination specialist", + system_prompt="""You are a treatment coordination expert specializing in: + 1. Treatment Plan Development + 2. Care Coordination + 3. Resource Allocation + 4. Recovery Timeline Planning + 5. Follow-up Protocol Design""", + max_loops=3, + model_name="gpt-4" + ) + + def process_case(self, patient_data: Dict) -> Dict: + # Initial diagnosis + primary_assessment = self.primary_diagnostician.run( + f"Perform initial diagnosis: {patient_data}" + ) + + # Specialist consultation + specialist_review = self.specialist_consultant.run( + f"Review case with initial assessment: {primary_assessment}" + ) + + # Treatment planning + treatment_plan = self.treatment_coordinator.run( + f"Develop treatment plan based on: Primary: {primary_assessment}, Specialist: {specialist_review}" + ) + + return { + "initial_assessment": primary_assessment, + "specialist_review": specialist_review, + "treatment_plan": treatment_plan + } + +# Usage +diagnostic_system = HealthcareDiagnosticSystem() diff --git a/examples/mcp_example/demo_presentation.py b/examples/mcp_example/demo_presentation.py new file mode 100644 index 00000000..57bdbc34 --- /dev/null +++ b/examples/mcp_example/demo_presentation.py @@ -0,0 +1,52 @@ + +""" +MCP Integration Demo Script +This script demonstrates the full MCP integration workflow +""" +import asyncio +import time +from swarms.tools.mcp_integration import MCPServerSseParams +from examples.mcp_example.mock_multi_agent import MultiAgentMathSystem + +def print_section(title): + print("\n" + "="*50) + print(title) + print("="*50 + "\n") + +async def run_demo(): + print_section("1. Initializing Multi-Agent MCP System") + system = MultiAgentMathSystem() + + print_section("2. 
Testing Basic Operations") + results = await system.process_task("What operations can you perform?") + for result in results: + print(f"\n[{result['agent']}]") + print(f"Response: {result['response']}") + + print_section("3. Testing Mathematical Operations") + test_operations = [ + "5 plus 3", + "10 times 4", + "20 divide by 5" + ] + + for operation in test_operations: + print(f"\nTesting: {operation}") + results = await system.process_task(operation) + for result in results: + if "error" not in result: + print(f"[{result['agent']}]: {result['response']}") + + print_section("4. Testing Error Handling") + results = await system.process_task("calculate square root of 16") + for result in results: + print(f"\n[{result['agent']}]") + if "error" in result: + print(f"Error handled: {result['error']}") + else: + print(f"Response: {result['response']}") + +if __name__ == "__main__": + print("\nMCP Integration Demonstration") + print("Running comprehensive demo of MCP functionality\n") + asyncio.run(run_demo()) diff --git a/examples/mcp_example/math_server.py b/examples/mcp_example/math_server.py new file mode 100644 index 00000000..af533c90 --- /dev/null +++ b/examples/mcp_example/math_server.py @@ -0,0 +1,25 @@ +from fastmcp import FastMCP +from typing import Dict, Any, Optional + +# Initialize MCP server +mcp = FastMCP("Math-Server") + +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers together""" + try: + return a + b + except Exception as e: + return {"error": str(e)} + +@mcp.tool() +def multiply(a: int, b: int) -> int: + """Multiply two numbers together""" + try: + return a * b + except Exception as e: + return {"error": str(e)} + +if __name__ == "__main__": + print("Starting Math Server...") + mcp.run(transport="sse") \ No newline at end of file diff --git a/examples/mcp_example/mcp_client.py b/examples/mcp_example/mcp_client.py new file mode 100644 index 00000000..1f209aae --- /dev/null +++ b/examples/mcp_example/mcp_client.py @@ -0,0 +1,62 @@ 
+import os +import sys +from loguru import logger +from swarms import Agent +from swarms.prompts.agent_prompts import MATH_AGENT_PROMPT +from swarms.tools.mcp_integration import MCPServerSseParams + +# Configure API key + +# Configure logging +logger.remove() +logger.add(sys.stdout, level="DEBUG", format="{time} | {level} | {message}") + +# Define a simpler prompt that focuses on math operations +SIMPLE_MATH_PROMPT = """ +You are a math calculator assistant that uses external tools. +When asked for calculations, extract the numbers and use the appropriate tool. +Available tools: +- add: For addition +- multiply: For multiplication +- divide: For division +Keep your responses concise and focused on the calculation. +""" + +def main(): + print("=== MINIMAL MCP AGENT INTEGRATION TEST ===") + + # Properly configured MCP parameters + mcp_params = MCPServerSseParams( + url="http://127.0.0.1:8000", + headers={ + "Content-Type": "application/json", + "Accept": "text/event-stream" + }, + timeout=30.0, # Increased timeout + sse_read_timeout=60.0 + ) + + agent = Agent( + agent_name="MCP Test Agent", + system_prompt=SIMPLE_MATH_PROMPT, # Using simpler prompt + mcp_servers=[mcp_params], + model_name="gpt-4o-mini", + max_loops=2, # Allow for retry + verbose=True + ) + + print("\nAgent created successfully! 
Type 'exit' to quit.") + while True: + query = input("\nMath query: ").strip() + if query.lower() == "exit": + break + + print(f"\nProcessing: {query}") + try: + result = agent.run(query) + print(f"\nResult: {result}") + except Exception as e: + print(f"\nError processing query: {str(e)}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/mcp_example/mock_integration_test.py b/examples/mcp_example/mock_integration_test.py new file mode 100644 index 00000000..43f1eb91 --- /dev/null +++ b/examples/mcp_example/mock_integration_test.py @@ -0,0 +1,38 @@ + +import pytest +import asyncio +from mock_multi_agent import MultiAgentMathSystem +import logging + +logging.basicConfig(level=logging.INFO) + +@pytest.mark.asyncio +async def test_multi_agent_math(): + """Test the multi-agent math system with various operations""" + system = MultiAgentMathSystem() + + test_cases = [ + "Add 5 and 3", + "Multiply 4 by 6", + "Divide 10 by 2" + ] + + for task in test_cases: + print(f"\nTesting: {task}") + results = await system.process_task(task) + + for result in results: + assert "error" not in result, f"Agent {result['agent']} encountered error" + assert result["response"] is not None + print(f"{result['agent']} response: {result['response']}") + +def test_interactive_system(): + """Test the interactive system manually""" + try: + system = MultiAgentMathSystem() + system.run_interactive() + except Exception as e: + pytest.fail(f"Interactive test failed: {e}") + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/mcp_example/mock_math_server.py b/examples/mcp_example/mock_math_server.py new file mode 100644 index 00000000..279ddff4 --- /dev/null +++ b/examples/mcp_example/mock_math_server.py @@ -0,0 +1,56 @@ +from fastmcp import FastMCP +from loguru import logger +import sys +import time + +# Configure detailed logging +logger.remove() +logger.add(sys.stdout, level="DEBUG", format="{time} | {level} | {message}") + +# 
Create MCP server with fixed configuration +mcp = FastMCP( + host="127.0.0.1", # Bind to localhost only + port=8000, + transport="sse", + require_session_id=False, + cors_allowed_origins=["*"], + debug=True +) + +# Define tools with proper return format +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers.""" + result = a + b + logger.info(f"Adding {a} + {b} = {result}") + return result # Let FastMCP handle the response formatting + +@mcp.tool() +def multiply(a: int, b: int) -> int: + """Multiply two numbers.""" + result = a * b + logger.info(f"Multiplying {a} * {b} = {result}") + return result + +@mcp.tool() +def divide(a: int, b: int) -> float: + """Divide the first number by the second.""" + if b == 0: + raise ValueError("Cannot divide by zero") + result = a / b + logger.info(f"Dividing {a} / {b} = {result}") + return result + +def main(): + try: + logger.info("Starting mock math server on http://127.0.0.1:8000") + print("Math MCP Server running on http://127.0.0.1:8000 (SSE)\n") + print("Available tools:\n - add\n - multiply\n - divide\n") + mcp.run() # This runs the server in a blocking mode + except Exception as e: + logger.error(f"Error starting server: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/mcp_example/mock_multi_agent.py b/examples/mcp_example/mock_multi_agent.py new file mode 100644 index 00000000..40fbbf7b --- /dev/null +++ b/examples/mcp_example/mock_multi_agent.py @@ -0,0 +1,203 @@ +import asyncio +from swarms import Agent +from swarms.tools.mcp_integration import MCPServerSseParams +import logging + + +class MathAgent: + + def __init__(self, name: str, server_url: str): + self.server = MCPServerSseParams( + url=server_url, headers={"Content-Type": "application/json"}) + + self.agent = Agent( + agent_name=name, + agent_description= + f"{'Calculator' if name == 'Calculator' else 'Stock Analyst'} agent specializing in {'mathematical 
calculations' if name == 'Calculator' else 'stock market analysis'}. For Calculator: use add, multiply, divide operations. For Stock Analyst: use moving averages and percentage change calculations.", + system_prompt= + f"You are {name}, a math processing agent. You have access to these mathematical operations ONLY: addition, multiplication, and division. Only suggest calculations using these available tools. Do not attempt to solve problems requiring other operations like percentages, square roots, or advanced math. When users ask about capabilities, list only the basic operations you can perform.", + max_loops=1, + mcp_servers=[self.server], + streaming_on=False, + model_name="gpt-4o-mini", + temperature=0.1, + max_tokens=1000) + + def is_relevant_task(self, task: str) -> bool: + task_lower = task.lower() + if self.agent.agent_name == "Calculator": + math_keywords = [ + 'add', 'plus', '+', 'multiply', 'times', '*', 'x', 'divide', + '/', 'by' + ] + return any(keyword in task_lower for keyword in math_keywords) + else: # StockAnalyst + stock_keywords = [ + 'moving average', 'stock', 'market', 'percentage', 'change' + ] + return any(keyword in task_lower for keyword in stock_keywords) + + async def process(self, task: str): + try: + # Check if asking about capabilities + if any(word in task.lower() + for word in ['what', 'how', 'can', 'capabilities', 'help']): + if self.agent.agent_name == "Calculator": + return { + "agent": + self.agent.agent_name, + "task": + task, + "response": + "[Calculator Agent] I can perform basic mathematical operations:\n" + + "- Addition (use '+' or 'plus')\n" + + "- Multiplication (use '*' or 'times')\n" + + "- Division (use '/' or 'divide by')\n" + + "Example: '5 plus 3' or '10 divide by 2'" + } + else: # StockAnalyst + return { + "agent": + self.agent.agent_name, + "task": + task, + "response": + "[Stock Analyst Agent] I can perform stock market analysis:\n" + + "- Calculate moving averages\n" + + "- Get current stock prices\n" + + 
"Example: 'calculate moving average of [10,20,30,40,50] over 3 periods' or 'get price of AAPL'" + } + + # Only process if task is relevant to this agent + if not self.is_relevant_task(task): + return { + "agent": self.agent.agent_name, + "task": task, + "response": + None # Indicate this agent should not handle this task + } + + # Check if input is stock-related (for StockAnalyst) + if self.agent.agent_name == "StockAnalyst" and "moving average" in task.lower( + ): + try: + import re + # Extract list of numbers and period + numbers = re.findall(r'\[([\d,\s]+)\]', task) + period = re.findall(r'over\s+(\d+)\s+periods', task) + + if numbers and period: + numbers = [float(n) for n in numbers[0].split(',')] + period = int(period[0]) + if len(numbers) >= period: + # Calculate moving average + averages = [] + for i in range(len(numbers) - period + 1): + avg = sum(numbers[i:i + period]) / period + averages.append(round(avg, 2)) + return { + "agent": self.agent.agent_name, + "task": task, + "response": f"Moving averages: {averages}" + } + except Exception as e: + return { + "agent": self.agent.agent_name, + "task": task, + "error": f"Error calculating moving average: {str(e)}" + } + + # Check if input is math-related (for Calculator) + if self.agent.agent_name == "Calculator": + math_keywords = [ + 'add', 'plus', '+', 'multiply', 'times', '*', 'x', + 'divide', '/', 'by' + ] + if not any(keyword in task.lower() + for keyword in math_keywords): + return { + "agent": + self.agent.agent_name, + "task": + task, + "response": + "Please provide a mathematical operation (add, multiply, or divide)" + } + + response = await self.agent.arun(task) + return { + "agent": self.agent.agent_name, + "task": task, + "response": str(response) + } + except Exception as e: + logging.error(f"Error in {self.agent.agent_name}: {str(e)}") + return { + "agent": self.agent.agent_name, + "task": task, + "error": str(e) + } + + +class MultiAgentMathSystem: + + def __init__(self): + math_url = 
"http://0.0.0.0:8000" + stock_url = "http://0.0.0.0:8001" + self.calculator = MathAgent("Calculator", math_url) + self.stock_analyst = MathAgent("StockAnalyst", stock_url) + + async def process_task(self, task: str): + # Process with both agents + results = await asyncio.gather(self.calculator.process(task), + self.stock_analyst.process(task)) + return results + + def run_interactive(self): + print("\nMulti-Agent Math System") + print("Enter 'exit' to quit") + + while True: + try: + user_input = input("\nEnter your query: ") + if user_input.lower() == 'exit': + break + + results = asyncio.run(self.process_task(user_input)) + + responses = [] + for result in results: + if result["response"] is not None: + if "error" in result: + responses.append(f"Error: {result['error']}") + else: + response = result["response"] + if isinstance(response, str): + # Clean up calculation results + if "=" in response: + calculation = response.split( + "=")[-1].strip() + responses.append(calculation) + else: + # Remove system/agent information + clean_response = response.split( + "System:")[0].strip() + clean_response = clean_response.split( + "Human:")[0].strip() + if clean_response: + responses.append(clean_response) + + if responses: + print("\nResult:") + print("-" * 30) + for response in responses: + print(response) + print("-" * 30) + + except Exception as e: + print(f"System error: {str(e)}") + + +if __name__ == "__main__": + system = MultiAgentMathSystem() + system.run_interactive() diff --git a/examples/mcp_example/mock_stock_server.py b/examples/mcp_example/mock_stock_server.py new file mode 100644 index 00000000..2ea64f93 --- /dev/null +++ b/examples/mcp_example/mock_stock_server.py @@ -0,0 +1,33 @@ + +from fastmcp import FastMCP +from typing import Dict, Union + +mcp = FastMCP("Stock-Mock-Server") + +@mcp.tool() +def get_stock_price(symbol: str) -> Dict[str, Union[float, str]]: + """Get the current price of a stock""" + prices = { + "AAPL": 150.0, + "GOOGL": 2800.0, + 
"MSFT": 300.0, + "AMZN": 3300.0 + } + if symbol not in prices: + return {"error": f"Stock {symbol} not found"} + return {"price": prices[symbol]} + +@mcp.tool() +def calculate_moving_average(prices: list[float], window: int) -> Dict[str, Union[list[float], str]]: + """Calculate moving average of stock prices""" + if len(prices) < window: + return {"error": "Not enough price points"} + avgs = [] + for i in range(len(prices) - window + 1): + avg = sum(prices[i:i+window]) / window + avgs.append(round(avg, 2)) + return {"averages": avgs} + +if __name__ == "__main__": + print("Starting Mock Stock Server on port 8001...") + mcp.run(transport="sse", host="0.0.0.0", port=8001) diff --git a/examples/mcp_example/presentation_script.md b/examples/mcp_example/presentation_script.md new file mode 100644 index 00000000..54041c43 --- /dev/null +++ b/examples/mcp_example/presentation_script.md @@ -0,0 +1,143 @@ + +# MCP Integration Demo Script + +## 1. Setup & Architecture Overview + +```bash +# Terminal 1: Start Stock Server +python examples/mcp_example/mock_stock_server.py + +# Terminal 2: Start Math Server +python examples/mcp_example/mock_math_server.py + +# Terminal 3: Start Multi-Agent System +python examples/mcp_example/mock_multi_agent.py +``` + +## 2. Key Components + +### Server-Side: +- FastMCP servers running on ports 8000 and 8001 +- Math Server provides: add, multiply, divide operations +- Stock Server provides: price lookup, moving average calculations + +### Client-Side: +- Multi-agent system with specialized agents +- MCPServerSseParams for server connections +- Automatic task routing based on agent specialization + +## 3. 
MCP Integration Details + +### Server Implementation: +```python +# Math Server Example +from fastmcp import FastMCP + +mcp = FastMCP("Math-Server") + +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers together""" + return a + b +``` + +### Client Integration: +```python +from swarms.tools.mcp_integration import MCPServerSseParams + +# Configure MCP server connection +server = MCPServerSseParams( + url="http://0.0.0.0:8000", + headers={"Content-Type": "application/json"} +) + +# Initialize agent with MCP capabilities +agent = Agent( + agent_name="Calculator", + mcp_servers=[server], + max_loops=1 +) +``` + +## 4. Demo Flow + +1. Math Operations: +``` +Enter a math problem: 5 plus 3 +Enter a math problem: 10 times 4 +``` + +2. Stock Analysis: +``` +Enter a math problem: get price of AAPL +Enter a math problem: calculate moving average of [10,20,30,40,50] over 3 periods +``` + +## 5. Integration Highlights + +1. Server Configuration: +- FastMCP initialization +- Tool registration using decorators +- SSE transport setup + +2. Client Integration: +- MCPServerSseParams configuration +- Agent specialization +- Task routing logic + +3. Communication Flow: +- Client request → Agent processing → MCP server → Response handling + +4. Error Handling: +- Graceful error management +- Automatic retry mechanisms +- Clear error reporting + +## 6. Code Architecture + +### Server Example (Math Server): +```python +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers together""" + return a + b +``` + +### Client Example (Multi-Agent): +```python +calculator = MathAgent("Calculator", "http://0.0.0.0:8000") +stock_analyst = MathAgent("StockAnalyst", "http://0.0.0.0:8001") +``` + +## 7. Key Benefits + +1. Modular Architecture +2. Specialized Agents +3. Clean API Integration +4. Scalable Design +5. Standardized Communication Protocol +6. Easy Tool Registration +7. Flexible Server Implementation + +## 8. Testing & Validation + +1. 
Basic Connectivity: +```python +def test_server_connection(): + params = {"url": "http://0.0.0.0:8000"} + server = MCPServerSse(params) + asyncio.run(server.connect()) + assert server.session is not None +``` + +2. Tool Execution: +```python +def test_tool_execution(): + params = {"url": "http://0.0.0.0:8000"} + function_call = { + "tool_name": "add", + "arguments": {"a": 5, "b": 3} + } + result = mcp_flow(params, function_call) + assert result is not None +``` diff --git a/examples/mcp_example/test_integration.py b/examples/mcp_example/test_integration.py new file mode 100644 index 00000000..5429c2ea --- /dev/null +++ b/examples/mcp_example/test_integration.py @@ -0,0 +1,52 @@ + +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT +from swarms.tools.mcp_integration import MCPServerSseParams +import logging + +def main(): + # Configure MCP server connection + server = MCPServerSseParams( + url="http://0.0.0.0:6274", + headers={"Content-Type": "application/json"}, + timeout=10.0, + sse_read_timeout=300.0 + ) + + # Initialize agent with MCP capabilities + agent = Agent( + agent_name="Math-Agent", + agent_description="Agent that performs math operations", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + mcp_servers=[server], + streaming_on=True + ) + + try: + # First get available tools from server + print("\nDiscovering available tools from MCP server...") + tools = agent.mcp_tool_handling() + print("\nAvailable tools:", tools) + + while True: + # Get user input + user_input = input("\nEnter a math operation (or 'exit' to quit): ") + + if user_input.lower() == 'exit': + break + + # Process user input through agent + try: + result = agent.run(user_input) + print("\nResult:", result) + except Exception as e: + print(f"Error processing request: {e}") + + except Exception as e: + logging.error(f"Test failed: {e}") + raise + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main() diff 
--git a/examples/mcp_example/test_mcp_connection.py b/examples/mcp_example/test_mcp_connection.py new file mode 100644 index 00000000..7bc18778 --- /dev/null +++ b/examples/mcp_example/test_mcp_connection.py @@ -0,0 +1,25 @@ +import requests +import time +import sys +from loguru import logger + +# Configure logger +logger.remove() +logger.add(sys.stdout, level="DEBUG") + +def test_server_connection(): + """Simple test to see if server responds at all.""" + url = "http://localhost:8000" + + try: + logger.debug(f"Testing connection to {url}") + response = requests.get(url) + logger.debug(f"Response status: {response.status_code}") + logger.debug(f"Response content: {response.text[:100]}...") + return True + except Exception as e: + logger.error(f"Connection failed: {str(e)}") + return False + +if __name__ == "__main__": + test_server_connection() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b776160c..644c0ea7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" + version = "7.7.2" description = "Swarms - TGSC" license = "MIT" @@ -77,6 +78,9 @@ numpy = "*" litellm = "*" torch = "*" httpx = "*" +fastmcp = {version = ">=2.0", extras = ["sse"]} +mcp = ">=0.3.0" +typing-extensions = "^4.13.2" [tool.poetry.scripts] swarms = "swarms.cli.main:main" diff --git a/replit.nix b/replit.nix new file mode 100644 index 00000000..da9faae4 --- /dev/null +++ b/replit.nix @@ -0,0 +1,5 @@ +{pkgs}: { + deps = [ + pkgs.libxcrypt + ]; +} diff --git a/scripts/cleanup_and_publish.sh b/scripts/cleanup_and_publish.sh old mode 100755 new mode 100644 diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index a5410030..022f2e23 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -29,8 +29,7 @@ from swarms.agents.ape_agent import auto_generate_prompt from swarms.artifacts.main_artifact import Artifact from swarms.prompts.agent_system_prompts import 
AGENT_SYSTEM_PROMPT_3 from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( - MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, -) + MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, ) from swarms.prompts.tools import tool_sop_prompt from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.schemas.base_schemas import ( @@ -46,12 +45,7 @@ from swarms.structs.safe_loading import ( ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool - -# from swarms.tools.mcp_integration import ( -# MCPServerSseParams, -# batch_mcp_flow, -# mcp_flow_get_tool_schema, -# ) +from swarms.tools.mcp_integration import MCPServerSseParams, batch_mcp_flow, mcp_flow_get_tool_schema from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str from swarms.utils.data_to_text import data_to_text @@ -92,7 +86,6 @@ def exists(val): # Agent output types ToolUsageType = Union[BaseModel, Dict[str, Any]] - # Agent Exceptions @@ -260,7 +253,7 @@ class Agent: run_async_concurrent: Run the agent asynchronously and concurrently run_async_concurrent: Run the agent asynchronously and concurrently construct_dynamic_prompt: Construct the dynamic prompt - handle_artifacts: Handle artifacts + handle_artifacts Examples: @@ -327,8 +320,7 @@ class Agent: stopping_func: Optional[Callable] = None, custom_loop_condition: Optional[Callable] = None, sentiment_threshold: Optional[ - float - ] = None, # Evaluate on output using an external model + float] = None, # Evaluate on output using an external model custom_exit_command: Optional[str] = "exit", sentiment_analyzer: Optional[Callable] = None, limit_tokens_from_string: Optional[Callable] = None, @@ -367,9 +359,8 @@ class Agent: use_cases: Optional[List[Dict[str, str]]] = None, step_pool: List[Step] = [], print_every_step: Optional[bool] = False, - time_created: Optional[str] = time.strftime( - "%Y-%m-%d %H:%M:%S", time.localtime() - ), + time_created: Optional[str] 
= time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime()), agent_output: ManySteps = None, executor_workers: int = os.cpu_count(), data_memory: Optional[Callable] = None, @@ -392,7 +383,7 @@ class Agent: role: agent_roles = "worker", no_print: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, - # mcp_servers: List[MCPServerSseParams] = [], + mcp_servers: Optional[list] = None, # list[MCPServerSseParams] *args, **kwargs, ): @@ -456,9 +447,7 @@ class Agent: self.output_type = output_type self.function_calling_type = function_calling_type self.output_cleaner = output_cleaner - self.function_calling_format_type = ( - function_calling_format_type - ) + self.function_calling_format_type = (function_calling_format_type) self.list_base_models = list_base_models self.metadata_output_type = metadata_output_type self.state_save_file_type = state_save_file_type @@ -512,7 +501,8 @@ class Agent: self.role = role self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary - # self.mcp_servers = mcp_servers + self.mcp_servers = mcp_servers or [ + ] # Initialize mcp_servers to an empty list if None self._cached_llm = ( None # Add this line to cache the LLM instance @@ -521,10 +511,7 @@ class Agent: "gpt-4o-mini" # Move default model name here ) - if ( - self.agent_name is not None - or self.agent_description is not None - ): + if (self.agent_name is not None or self.agent_description is not None): prompt = f"Your Name: {self.agent_name} \n\n Your Description: {self.agent_description} \n\n {system_prompt}" else: prompt = system_prompt @@ -544,9 +531,7 @@ class Agent: self.feedback = [] # Initialize the executor - self.executor = ThreadPoolExecutor( - max_workers=executor_workers - ) + self.executor = ThreadPoolExecutor(max_workers=executor_workers) self.init_handling() @@ -562,8 +547,7 @@ class Agent: (self.handle_tool_init, True), # Always run tool init ( self.handle_tool_schema_ops, - exists(self.tool_schema) - or 
exists(self.list_base_models), + exists(self.tool_schema) or exists(self.list_base_models), ), ( self.handle_sop_ops, @@ -572,14 +556,11 @@ class Agent: ] # Filter out tasks whose conditions are False - filtered_tasks = [ - task for task, condition in tasks if condition - ] + filtered_tasks = [task for task, condition in tasks if condition] # Execute all tasks concurrently - with concurrent.futures.ThreadPoolExecutor( - max_workers=os.cpu_count() * 4 - ) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count() * + 4) as executor: # Map tasks to futures and collect results results = {} future_to_task = { @@ -588,21 +569,15 @@ class Agent: } # Wait for each future to complete and collect results/exceptions - for future in concurrent.futures.as_completed( - future_to_task - ): + for future in concurrent.futures.as_completed(future_to_task): task_name = future_to_task[future] try: result = future.result() results[task_name] = result - logging.info( - f"Task {task_name} completed successfully" - ) + logging.info(f"Task {task_name} completed successfully") except Exception as e: results[task_name] = None - logging.error( - f"Task {task_name} failed with error: {e}" - ) + logging.error(f"Task {task_name} failed with error: {e}") # Run sequential operations after all concurrent tasks are done self.agent_output = self.agent_output_model() @@ -623,9 +598,7 @@ class Agent: max_loops=self.max_loops, steps=self.short_memory.to_dict(), full_history=self.short_memory.get_str(), - total_tokens=count_tokens( - text=self.short_memory.get_str() - ), + total_tokens=count_tokens(text=self.short_memory.get_str()), stopping_token=self.stopping_token, interactive=self.interactive, dynamic_temperature_enabled=self.dynamic_temperature_enabled, @@ -652,23 +625,17 @@ class Agent: } if self.llm_args is not None: - self._cached_llm = LiteLLM( - **{**common_args, **self.llm_args} - ) + self._cached_llm = LiteLLM(**{**common_args, **self.llm_args}) elif 
self.tools_list_dictionary is not None: self._cached_llm = LiteLLM( **common_args, tools_list_dictionary=self.tools_list_dictionary, tool_choice="auto", - parallel_tool_calls=len( - self.tools_list_dictionary - ) - > 1, + parallel_tool_calls=len(self.tools_list_dictionary) > 1, ) else: - self._cached_llm = LiteLLM( - **common_args, stream=self.streaming_on - ) + self._cached_llm = LiteLLM(**common_args, + stream=self.streaming_on) return self._cached_llm except AgentLLMInitializationError as e: @@ -679,11 +646,8 @@ class Agent: def handle_tool_init(self): # Initialize the tool struct - if ( - exists(self.tools) - or exists(self.list_base_models) - or exists(self.tool_schema) - ): + if (exists(self.tools) or exists(self.list_base_models) + or exists(self.tool_schema) or exists(self.mcp_servers)): self.tool_struct = BaseTool( tools=self.tools, @@ -696,91 +660,21 @@ class Agent: "Tools provided make sure the functions have documentation ++ type hints, otherwise tool execution won't be reliable." ) # Add the tool prompt to the memory - self.short_memory.add( - role="system", content=self.tool_system_prompt - ) + self.short_memory.add(role="system", + content=self.tool_system_prompt) # Log the tools - logger.info( - f"Tools provided: Accessing {len(self.tools)} tools" - ) + logger.info(f"Tools provided: Accessing {len(self.tools)} tools") # Transform the tools into an openai schema # self.convert_tool_into_openai_schema() # Transform the tools into an openai schema - tool_dict = ( - self.tool_struct.convert_tool_into_openai_schema() - ) + tool_dict = (self.tool_struct.convert_tool_into_openai_schema()) self.short_memory.add(role="system", content=tool_dict) # Now create a function calling map for every tools - self.function_map = { - tool.__name__: tool for tool in self.tools - } - - # def mcp_execution_flow(self, response: any): - # """ - # Executes the MCP (Model Context Protocol) flow based on the provided response. 
- - # This method takes a response, converts it from a string to a dictionary format, - # and checks for the presence of a tool name or a name in the response. If either - # is found, it retrieves the tool name and proceeds to call the batch_mcp_flow - # function to execute the corresponding tool actions. - - # Args: - # response (any): The response to be processed, which can be in string format - # that represents a dictionary. - - # Returns: - # The output from the batch_mcp_flow function, which contains the results of - # the tool execution. If an error occurs during processing, it logs the error - # and returns None. - - # Raises: - # Exception: Logs any exceptions that occur during the execution flow. - # """ - # try: - # response = str_to_dict(response) - - # tool_output = batch_mcp_flow( - # self.mcp_servers, - # function_call=response, - # ) - - # return tool_output - # except Exception as e: - # logger.error(f"Error in mcp_execution_flow: {e}") - # return None - - # def mcp_tool_handling(self): - # """ - # Handles the retrieval of tool schemas from the MCP servers. - - # This method iterates over the list of MCP servers, retrieves the tool schema - # for each server using the mcp_flow_get_tool_schema function, and compiles - # these schemas into a list. The resulting list is stored in the - # tools_list_dictionary attribute. - - # Returns: - # list: A list of tool schemas retrieved from the MCP servers. If an error - # occurs during the retrieval process, it logs the error and returns None. - - # Raises: - # Exception: Logs any exceptions that occur during the tool handling process. 
- # """ - # try: - # self.tools_list_dictionary = [] - - # for mcp_server in self.mcp_servers: - # tool_schema = mcp_flow_get_tool_schema(mcp_server) - # self.tools_list_dictionary.append(tool_schema) - - # print(self.tools_list_dictionary) - # return self.tools_list_dictionary - # except Exception as e: - # logger.error(f"Error in mcp_tool_handling: {e}") - # return None + self.function_map = {tool.__name__: tool for tool in self.tools} def setup_config(self): # The max_loops will be set dynamically if the dynamic_loop @@ -825,23 +719,18 @@ class Agent: # If no components available, fall back to task if not components and task: logger.warning( - "No agent details found. Using task as fallback for prompt generation." - ) - self.system_prompt = auto_generate_prompt( - task, self.llm + "No agent details found. Usingtask as fallback for promptgeneration." ) + self.system_prompt = auto_generate_prompt(task, self.llm) else: # Combine all available components combined_prompt = " ".join(components) logger.info( - f"Auto-generating prompt from: {', '.join(components)}" - ) + f"Auto-generating prompt from: {', '.join(components)}") self.system_prompt = auto_generate_prompt( - combined_prompt, self.llm - ) - self.short_memory.add( - role="system", content=self.system_prompt - ) + combined_prompt, self.llm) + self.short_memory.add(role="system", + content=self.system_prompt) logger.info("Auto-generated prompt successfully.") @@ -856,13 +745,9 @@ class Agent: def agent_initialization(self): try: - logger.info( - f"Initializing Autonomous Agent {self.agent_name}..." - ) + logger.info(f"Initializing Autonomous Agent {self.agent_name}...") self.check_parameters() - logger.info( - f"{self.agent_name} Initialized Successfully." - ) + logger.info(f"{self.agent_name} Initialized Successfully.") logger.info( f"Autonomous Agent {self.agent_name} Activated, all systems operational. Executing task..." 
) @@ -881,9 +766,7 @@ class Agent: return self.stopping_condition(response) return False except Exception as error: - logger.error( - f"Error checking stopping condition: {error}" - ) + logger.error(f"Error checking stopping condition: {error}") def dynamic_temperature(self): """ @@ -900,15 +783,11 @@ class Agent: # Use a default temperature self.llm.temperature = 0.5 except Exception as error: - logger.error( - f"Error dynamically changing temperature: {error}" - ) + logger.error(f"Error dynamically changing temperature: {error}") def print_dashboard(self): """Print dashboard""" - formatter.print_panel( - f"Initializing Agent: {self.agent_name}" - ) + formatter.print_panel(f"Initializing Agent: {self.agent_name}") data = self.to_dict() @@ -928,8 +807,7 @@ class Agent: Configuration: {data} ---------------------------------------- - """, - ) + """, ) # Check parameters def check_parameters(self): @@ -981,21 +859,14 @@ class Agent: try: # 1. Batch process initial setup setup_tasks = [ - lambda: self.check_if_no_prompt_then_autogenerate( - task - ), - lambda: self.short_memory.add( - role=self.user_name, content=task - ), - lambda: ( - self.plan(task) if self.plan_enabled else None - ), + lambda: self.check_if_no_prompt_then_autogenerate(task), + lambda: self.short_memory.add(role=self.user_name, + content=task), + lambda: (self.plan(task) if self.plan_enabled else None), ] # Execute setup tasks concurrently - with ThreadPoolExecutor( - max_workers=len(setup_tasks) - ) as executor: + with ThreadPoolExecutor(max_workers=len(setup_tasks)) as executor: executor.map(lambda f: f(), setup_tasks) # Set the loop count @@ -1020,10 +891,7 @@ class Agent: f"Task Request for {self.agent_name}", ) - while ( - self.max_loops == "auto" - or loop_count < self.max_loops - ): + while (self.max_loops == "auto" or loop_count < self.max_loops): loop_count += 1 # self.short_memory.add( @@ -1036,35 +904,25 @@ class Agent: self.dynamic_temperature() # Task prompt - task_prompt = ( - 
self.short_memory.return_history_as_string() - ) + task_prompt = (self.short_memory.return_history_as_string()) # Parameters attempt = 0 success = False while attempt < self.retry_attempts and not success: try: - if ( - self.long_term_memory is not None - and self.rag_every_loop is True - ): - logger.info( - "Querying RAG database for context..." - ) + if (self.long_term_memory is not None + and self.rag_every_loop is True): + logger.info("Querying RAG database for context...") self.memory_query(task_prompt) # Generate response using LLM - response_args = ( - (task_prompt, *args) - if img is None - else (task_prompt, img, *args) - ) + response_args = ((task_prompt, + *args) if img is None else + (task_prompt, img, *args)) # Call the LLM - response = self.call_llm( - *response_args, **kwargs - ) + response = self.call_llm(*response_args, **kwargs) # Convert to a str if the response is not a str response = self.parse_llm_output(response) @@ -1081,29 +939,27 @@ class Agent: # 9. Batch memory updates and prints update_tasks = [ - lambda: self.short_memory.add( - role=self.agent_name, content=response - ), - lambda: self.pretty_print( - response, loop_count - ), + lambda: self.short_memory.add(role=self.agent_name, + content=response), + lambda: self.pretty_print(response, loop_count), lambda: self.output_cleaner_op(response), ] with ThreadPoolExecutor( - max_workers=len(update_tasks) - ) as executor: + max_workers=len(update_tasks)) as executor: executor.map(lambda f: f(), update_tasks) - # Check and execute tools - if self.tools is not None: - out = self.parse_and_execute_tools( - response - ) + # Check and execute tools (including MCP) + if self.tools is not None or hasattr( + self, 'mcp_servers'): + if self.tools: + out = self.parse_and_execute_tools(response) + if hasattr(self, + 'mcp_servers') and self.mcp_servers: + out = self.mcp_execution_flow(response) - self.short_memory.add( - role="Tool Executor", content=out - ) + self.short_memory.add(role="Tool 
Executor", + content=out) agent_print( f"{self.agent_name} - Tool Executor", @@ -1121,9 +977,8 @@ class Agent: self.streaming_on, ) - self.short_memory.add( - role=self.agent_name, content=out - ) + self.short_memory.add(role=self.agent_name, + content=out) self.sentiment_and_evaluator(response) @@ -1136,10 +991,8 @@ class Agent: if self.autosave is True: self.save() - logger.error( - f"Attempt {attempt+1}: Error generating" - f" response: {e}" - ) + logger.error(f"Attempt {attempt+1}: Error generating" + f" response: {e}") attempt += 1 if not success: @@ -1149,23 +1002,17 @@ class Agent: if self.autosave is True: self.save() - logger.error( - "Failed to generate a valid response after" - " retry attempts." - ) + logger.error("Failed to generate a valid response after" + " retry attempts.") break # Exit the loop if all retry attempts fail # Check stopping conditions - if ( - self.stopping_condition is not None - and self._check_stopping_condition(response) - ): + if (self.stopping_condition is not None + and self._check_stopping_condition(response)): logger.info("Stopping condition met.") break - elif ( - self.stopping_func is not None - and self.stopping_func(response) - ): + elif (self.stopping_func is not None + and self.stopping_func(response)): logger.info("Stopping function met.") break @@ -1174,21 +1021,15 @@ class Agent: user_input = input("You: ") # User-defined exit command - if ( - user_input.lower() - == self.custom_exit_command.lower() - ): + if (user_input.lower() == self.custom_exit_command.lower() + ): print("Exiting as per user request.") break - self.short_memory.add( - role="User", content=user_input - ) + self.short_memory.add(role="User", content=user_input) if self.loop_interval: - logger.info( - f"Sleeping for {self.loop_interval} seconds" - ) + logger.info(f"Sleeping for {self.loop_interval} seconds") time.sleep(self.loop_interval) if self.autosave is True: @@ -1207,14 +1048,11 @@ class Agent: lambda: self.save() if self.autosave else None, 
] - with ThreadPoolExecutor( - max_workers=len(final_tasks) - ) as executor: + with ThreadPoolExecutor(max_workers=len(final_tasks)) as executor: executor.map(lambda f: f(), final_tasks) - return history_output_formatter( - self.short_memory, type=self.output_type - ) + return history_output_formatter(self.short_memory, + type=self.output_type) except Exception as error: self._handle_run_error(error) @@ -1236,6 +1074,7 @@ class Agent: def _handle_run_error(self, error: any): process_thread = threading.Thread( target=self.__handle_run_error, + args=(error,), daemon=True, ) @@ -1285,8 +1124,7 @@ class Agent: ) except Exception as error: await self._handle_run_error( - error - ) # Ensure this is also async if needed + error) # Ensure this is also async if needed def __call__( self, @@ -1321,12 +1159,8 @@ class Agent: except Exception as error: self._handle_run_error(error) - def receive_message( - self, agent_name: str, task: str, *args, **kwargs - ): - return self.run( - task=f"From {agent_name}: {task}", *args, **kwargs - ) + def receive_message(self, agent_name: str, task: str, *args, **kwargs): + return self.run(task=f"From {agent_name}: {task}", *args, **kwargs) def dict_to_csv(self, data: dict) -> str: """ @@ -1377,8 +1211,7 @@ class Agent: except Exception as error: retries += 1 logger.error( - f"Attempt {retries}: Error executing tool: {error}" - ) + f"Attempt {retries}: Error executing tool: {error}") if retries == max_retries: raise error time.sleep(1) # Wait for a bit before retrying @@ -1394,9 +1227,7 @@ class Agent: """ logger.info(f"Adding memory: {message}") - return self.short_memory.add( - role=self.agent_name, content=message - ) + return self.short_memory.add(role=self.agent_name, content=message) def plan(self, task: str, *args, **kwargs) -> None: """ @@ -1413,9 +1244,7 @@ class Agent: logger.info(f"Plan: {plan}") # Add the plan to the memory - self.short_memory.add( - role=self.agent_name, content=str(plan) - ) + 
self.short_memory.add(role=self.agent_name, content=str(plan)) return None except Exception as error: @@ -1431,16 +1260,13 @@ class Agent: """ try: logger.info(f"Running concurrent task: {task}") - future = self.executor.submit( - self.run, task, *args, **kwargs - ) + future = self.executor.submit(self.run, task, *args, **kwargs) result = await asyncio.wrap_future(future) logger.info(f"Completed task: {result}") return result except Exception as error: logger.error( - f"Error running agent: {error} while running concurrently" - ) + f"Error running agent: {error} while running concurrently") def run_concurrent_tasks(self, tasks: List[str], *args, **kwargs): """ @@ -1452,9 +1278,7 @@ class Agent: try: logger.info(f"Running concurrent tasks: {tasks}") futures = [ - self.executor.submit( - self.run, task=task, *args, **kwargs - ) + self.executor.submit(self.run, task=task, *args, **kwargs) for task in tasks ] results = [future.result() for future in futures] @@ -1492,8 +1316,7 @@ class Agent: try: # Create a list of coroutines for each task coroutines = [ - self.arun(task=task, *args, **kwargs) - for task in tasks + self.arun(task=task, *args, **kwargs) for task in tasks ] # Use asyncio.gather to run them concurrently results = await asyncio.gather(*coroutines) @@ -1517,20 +1340,15 @@ class Agent: """ try: # Determine the save path - resolved_path = ( - file_path - or self.saved_state_path - or f"{self.agent_name}_state.json" - ) + resolved_path = (file_path or self.saved_state_path + or f"{self.agent_name}_state.json") # Ensure path has .json extension if not resolved_path.endswith(".json"): resolved_path += ".json" # Create full path including workspace directory - full_path = os.path.join( - self.workspace_dir, resolved_path - ) + full_path = os.path.join(self.workspace_dir, resolved_path) backup_path = full_path + ".backup" temp_path = full_path + ".temp" @@ -1555,25 +1373,19 @@ class Agent: try: os.remove(backup_path) except Exception as e: - logger.warning( - 
f"Could not remove backup file: {e}" - ) + logger.warning(f"Could not remove backup file: {e}") # Log saved state information if verbose if self.verbose: self._log_saved_state_info(full_path) - logger.info( - f"Successfully saved agent state to: {full_path}" - ) + logger.info(f"Successfully saved agent state to: {full_path}") # Handle additional component saves self._save_additional_components(full_path) except OSError as e: - logger.error( - f"Filesystem error while saving agent state: {e}" - ) + logger.error(f"Filesystem error while saving agent state: {e}") raise except Exception as e: logger.error(f"Unexpected error saving agent state: {e}") @@ -1583,40 +1395,25 @@ class Agent: """Save additional agent components like memory.""" try: # Save long term memory if it exists - if ( - hasattr(self, "long_term_memory") - and self.long_term_memory is not None - ): - memory_path = ( - f"{os.path.splitext(base_path)[0]}_memory.json" - ) + if (hasattr(self, "long_term_memory") + and self.long_term_memory is not None): + memory_path = (f"{os.path.splitext(base_path)[0]}_memory.json") try: self.long_term_memory.save(memory_path) - logger.info( - f"Saved long-term memory to: {memory_path}" - ) + logger.info(f"Saved long-term memory to: {memory_path}") except Exception as e: - logger.warning( - f"Could not save long-term memory: {e}" - ) + logger.warning(f"Could not save long-term memory: {e}") # Save memory manager if it exists - if ( - hasattr(self, "memory_manager") - and self.memory_manager is not None - ): + if (hasattr(self, "memory_manager") + and self.memory_manager is not None): manager_path = f"{os.path.splitext(base_path)[0]}_memory_manager.json" try: - self.memory_manager.save_memory_snapshot( - manager_path - ) + self.memory_manager.save_memory_snapshot(manager_path) logger.info( - f"Saved memory manager state to: {manager_path}" - ) + f"Saved memory manager state to: {manager_path}") except Exception as e: - logger.warning( - f"Could not save memory manager: 
{e}" - ) + logger.warning(f"Could not save memory manager: {e}") except Exception as e: logger.warning(f"Error saving additional components: {e}") @@ -1635,8 +1432,7 @@ class Agent: self.save() if self.verbose: logger.debug( - f"Autosaved agent state (interval: {interval}s)" - ) + f"Autosaved agent state (interval: {interval}s)") except Exception as e: logger.error(f"Autosave failed: {e}") time.sleep(interval) @@ -1663,9 +1459,7 @@ class Agent: """Cleanup method to be called on exit. Ensures final state is saved.""" try: if getattr(self, "autosave", False): - logger.info( - "Performing final autosave before exit..." - ) + logger.info("Performing final autosave before exit...") self.disable_autosave() self.save() except Exception as e: @@ -1687,22 +1481,11 @@ class Agent: try: # Resolve load path conditionally with a check for self.load_state_path resolved_path = ( - file_path - or self.load_state_path - or ( - f"{self.saved_state_path}.json" - if self.saved_state_path - else ( - f"{self.agent_name}.json" - if self.agent_name - else ( - f"{self.workspace_dir}/{self.agent_name}_state.json" - if self.workspace_dir and self.agent_name - else None - ) - ) - ) - ) + file_path or self.load_state_path or + (f"{self.saved_state_path}.json" if self.saved_state_path else + (f"{self.agent_name}.json" if self.agent_name else + (f"{self.workspace_dir}/{self.agent_name}_state.json" + if self.workspace_dir and self.agent_name else None)))) # Load state using SafeStateManager SafeStateManager.load_state(self, resolved_path) @@ -1727,10 +1510,8 @@ class Agent: """ try: # Reinitialize conversation if needed - if ( - not hasattr(self, "short_memory") - or self.short_memory is None - ): + if (not hasattr(self, "short_memory") + or self.short_memory is None): self.short_memory = Conversation( system_prompt=self.system_prompt, time_enabled=False, @@ -1740,9 +1521,7 @@ class Agent: # Reinitialize executor if needed if not hasattr(self, "executor") or self.executor is None: - self.executor 
= ThreadPoolExecutor( - max_workers=os.cpu_count() - ) + self.executor = ThreadPoolExecutor(max_workers=os.cpu_count()) # # Reinitialize tool structure if needed # if hasattr(self, 'tools') and (self.tools or getattr(self, 'list_base_models', None)): @@ -1763,19 +1542,13 @@ class Agent: preserved = SafeLoaderUtils.preserve_instances(self) logger.info(f"Saved agent state to: {file_path}") - logger.debug( - f"Saved {len(state_dict)} configuration values" - ) - logger.debug( - f"Preserved {len(preserved)} class instances" - ) + logger.debug(f"Saved {len(state_dict)} configuration values") + logger.debug(f"Preserved {len(preserved)} class instances") if self.verbose: logger.debug("Preserved instances:") for name, instance in preserved.items(): - logger.debug( - f" - {name}: {type(instance).__name__}" - ) + logger.debug(f" - {name}: {type(instance).__name__}") except Exception as e: logger.error(f"Error logging state info: {e}") @@ -1786,19 +1559,13 @@ class Agent: preserved = SafeLoaderUtils.preserve_instances(self) logger.info(f"Loaded agent state from: {file_path}") - logger.debug( - f"Loaded {len(state_dict)} configuration values" - ) - logger.debug( - f"Preserved {len(preserved)} class instances" - ) + logger.debug(f"Loaded {len(state_dict)} configuration values") + logger.debug(f"Preserved {len(preserved)} class instances") if self.verbose: logger.debug("Current class instances:") for name, instance in preserved.items(): - logger.debug( - f" - {name}: {type(instance).__name__}" - ) + logger.debug(f" - {name}: {type(instance).__name__}") except Exception as e: logger.error(f"Error logging state info: {e}") @@ -1862,25 +1629,22 @@ class Agent: # Response Filtering def add_response_filter(self, filter_word: str) -> None: """ - Add a response filter to filter out certain words from the response - - Example: - agent.add_response_filter("Trump") - agent.run("Generate a report on Trump") - - + Add a response filter to filter out certain words from the response. 
""" logger.info(f"Adding response filter: {filter_word}") - self.reponse_filters.append(filter_word) + self.response_filters.append(filter_word) - def apply_reponse_filters(self, response: str) -> str: + def apply_response_filters(self, response: str) -> str: """ - Apply the response filters to the response + Apply the response filters to the response. + + Args: + response (str): The response to filter + Returns: + str: The filtered response """ - logger.info( - f"Applying response filters to response: {response}" - ) + logger.info(f"Applying response filters to response: {response}") for word in self.response_filters: response = response.replace(word, "[FILTERED]") return response @@ -1951,9 +1715,7 @@ class Agent: for doc in docs: data = data_to_text(doc) - return self.short_memory.add( - role=self.user_name, content=data - ) + return self.short_memory.add(role=self.user_name, content=data) except Exception as error: logger.info(f"Error ingesting docs: {error}", "red") @@ -1966,9 +1728,7 @@ class Agent: try: logger.info(f"Ingesting pdf: {pdf}") text = pdf_to_text(pdf) - return self.short_memory.add( - role=self.user_name, content=text - ) + return self.short_memory.add(role=self.user_name, content=text) except Exception as error: logger.info(f"Error ingesting pdf: {error}", "red") @@ -1981,9 +1741,8 @@ class Agent: logger.info(f"Error receiving message: {error}") raise error - def send_agent_message( - self, agent_name: str, message: str, *args, **kwargs - ): + def send_agent_message(self, agent_name: str, message: str, *args, + **kwargs): """Send a message to the agent""" try: logger.info(f"Sending agent message: {message}") @@ -1994,19 +1753,18 @@ class Agent: raise error def add_tool(self, tool: Callable): - """Add a single tool to the agent's tools list. + """Add a single tool to the agents tools list. 
Args: tool (Callable): The tool function to add Returns: - The result of appending the tool to the tools list - """ + The result of appending the tool to the tools list""" logger.info(f"Adding tool: {tool.__name__}") return self.tools.append(tool) def add_tools(self, tools: List[Callable]): - """Add multiple tools to the agent's tools list. + """Add multiple tools to the agents tools list. Args: tools (List[Callable]): List of tool functions to add @@ -2018,7 +1776,7 @@ class Agent: return self.tools.extend(tools) def remove_tool(self, tool: Callable): - """Remove a single tool from the agent's tools list. + """Remove a single tool from the agents tools list. Args: tool (Callable): The tool function to remove @@ -2030,7 +1788,7 @@ class Agent: return self.tools.remove(tool) def remove_tools(self, tools: List[Callable]): - """Remove multiple tools from the agent's tools list. + """Remove multiple tools from the agents tools list. Args: tools (List[Callable]): List of tool functions to remove @@ -2055,13 +1813,9 @@ class Agent: all_text += f"\nContent from {file}:\n{text}\n" # Add the combined content to memory - return self.short_memory.add( - role=self.user_name, content=all_text - ) + return self.short_memory.add(role=self.user_name, content=all_text) except Exception as error: - logger.error( - f"Error getting docs from doc folders: {error}" - ) + logger.error(f"Error getting docs from doc folders: {error}") raise error def memory_query(self, task: str = None, *args, **kwargs) -> None: @@ -2071,12 +1825,10 @@ class Agent: formatter.print_panel(f"Querying RAG for: {task}") memory_retrieval = self.long_term_memory.query( - task, *args, **kwargs - ) + task, *args, **kwargs) memory_retrieval = ( - f"Documents Available: {str(memory_retrieval)}" - ) + f"Documents Available: {str(memory_retrieval)}") # # Count the tokens # memory_token_count = count_tokens( @@ -2115,17 +1867,13 @@ class Agent: print(f"Sentiment: {sentiment}") if sentiment > self.sentiment_threshold: - 
print( - f"Sentiment: {sentiment} is above" - " threshold:" - f" {self.sentiment_threshold}" - ) + print(f"Sentiment: {sentiment} is above" + " threshold:" + f" {self.sentiment_threshold}") elif sentiment < self.sentiment_threshold: - print( - f"Sentiment: {sentiment} is below" - " threshold:" - f" {self.sentiment_threshold}" - ) + print(f"Sentiment: {sentiment} is below" + " threshold:" + f" {self.sentiment_threshold}") self.short_memory.add( role=self.agent_name, @@ -2134,9 +1882,7 @@ class Agent: except Exception as e: print(f"Error occurred during sentiment analysis: {e}") - def stream_response( - self, response: str, delay: float = 0.001 - ) -> None: + def stream_response(self, response: str, delay: float = 0.001) -> None: """ Streams the response token by token. @@ -2169,19 +1915,16 @@ class Agent: # Log the amount of tokens left in the memory and in the task if self.tokenizer is not None: tokens_used = count_tokens( - self.short_memory.return_history_as_string() - ) + self.short_memory.return_history_as_string()) logger.info( - f"Tokens available: {self.context_length - tokens_used}" - ) + f"Tokens available: {self.context_length - tokens_used}") return tokens_used def tokens_checks(self): # Check the tokens available tokens_used = count_tokens( - self.short_memory.return_history_as_string() - ) + self.short_memory.return_history_as_string()) out = self.check_available_tokens() logger.info( @@ -2190,9 +1933,7 @@ class Agent: return out - def log_step_metadata( - self, loop: int, task: str, response: str - ) -> Step: + def log_step_metadata(self, loop: int, task: str, response: str) -> Step: """Log metadata for each step of agent execution.""" # Generate unique step ID step_id = f"step_{loop}_{uuid.uuid4().hex}" @@ -2202,7 +1943,7 @@ class Agent: # prompt_tokens = count_tokens(full_memory) # completion_tokens = count_tokens(response) # total_tokens = prompt_tokens + completion_tokens - total_tokens = (count_tokens(task) + count_tokens(response),) + 
total_tokens = (count_tokens(task) + count_tokens(response), ) # # Get memory responses # memory_responses = { @@ -2301,18 +2042,14 @@ class Agent: """Update tool usage information for a specific step.""" for step in self.agent_output.steps: if step.step_id == step_id: - step.response.tool_calls.append( - { - "tool": tool_name, - "arguments": tool_args, - "response": str(tool_response), - } - ) + step.response.tool_calls.append({ + "tool": tool_name, + "arguments": tool_args, + "response": str(tool_response), + }) break - def _serialize_callable( - self, attr_value: Callable - ) -> Dict[str, Any]: + def _serialize_callable(self, attr_value: Callable) -> Dict[str, Any]: """ Serializes callable attributes by extracting their name and docstring. @@ -2323,9 +2060,8 @@ class Agent: Dict[str, Any]: Dictionary with name and docstring of the callable. """ return { - "name": getattr( - attr_value, "__name__", type(attr_value).__name__ - ), + "name": getattr(attr_value, "__name__", + type(attr_value).__name__), "doc": getattr(attr_value, "__doc__", None), } @@ -2344,9 +2080,8 @@ class Agent: if callable(attr_value): return self._serialize_callable(attr_value) elif hasattr(attr_value, "to_dict"): - return ( - attr_value.to_dict() - ) # Recursive serialization for nested objects + return (attr_value.to_dict() + ) # Recursive serialization for nested objects else: json.dumps( attr_value @@ -2369,14 +2104,10 @@ class Agent: } def to_json(self, indent: int = 4, *args, **kwargs): - return json.dumps( - self.to_dict(), indent=indent, *args, **kwargs - ) + return json.dumps(self.to_dict(), indent=indent, *args, **kwargs) def to_yaml(self, indent: int = 4, *args, **kwargs): - return yaml.dump( - self.to_dict(), indent=indent, *args, **kwargs - ) + return yaml.dump(self.to_dict(), indent=indent, *args, **kwargs) def to_toml(self, *args, **kwargs): return toml.dumps(self.to_dict(), *args, **kwargs) @@ -2411,14 +2142,11 @@ class Agent: if exists(self.tool_schema): logger.info(f"Tool 
schema provided: {self.tool_schema}") - output = self.tool_struct.base_model_to_dict( - self.tool_schema, output_str=True - ) + output = self.tool_struct.base_model_to_dict(self.tool_schema, + output_str=True) # Add the tool schema to the short memory - self.short_memory.add( - role=self.agent_name, content=output - ) + self.short_memory.add(role=self.agent_name, content=output) # If multiple base models, then conver them. if exists(self.list_base_models): @@ -2427,13 +2155,10 @@ class Agent: ) schemas = self.tool_struct.multi_base_models_to_dict( - output_str=True - ) + output_str=True) # If the output is a string then add it to the memory - self.short_memory.add( - role=self.agent_name, content=schemas - ) + self.short_memory.add(role=self.agent_name, content=schemas) return None @@ -2479,14 +2204,10 @@ class Agent: # If the user inputs a list of strings for the sop then join them and set the sop if exists(self.sop_list): self.sop = "\n".join(self.sop_list) - self.short_memory.add( - role=self.user_name, content=self.sop - ) + self.short_memory.add(role=self.user_name, content=self.sop) if exists(self.sop): - self.short_memory.add( - role=self.user_name, content=self.sop - ) + self.short_memory.add(role=self.user_name, content=self.sop) logger.info("SOP Uploaded into the memory") @@ -2504,9 +2225,9 @@ class Agent: **kwargs, ) -> Any: """ - Executes the agent's run method on a specified device, with optional scheduling. + Executes the agents run method on a specified device, with optional scheduling. - This method attempts to execute the agent's run method on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`. + This method attempts to execute the agents run method on a specified device, either CPU or GPU. 
It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`. If a `scheduled_date` is provided, the method will wait until that date and time before executing the task. @@ -2534,9 +2255,7 @@ class Agent: if scheduled_run_date: while datetime.now() < scheduled_run_date: - time.sleep( - 1 - ) # Sleep for a short period to avoid busy waiting + time.sleep(1) # Sleep for a short period to avoid busy waiting try: # If cluster ops disabled, run directly @@ -2557,9 +2276,8 @@ class Agent: except ValueError as e: self._handle_run_error(e) - def handle_artifacts( - self, text: str, file_output_path: str, file_extension: str - ) -> None: + def handle_artifacts(self, text: str, file_output_path: str, + file_extension: str) -> None: """Handle creating and saving artifacts with error handling.""" try: # Ensure file_extension starts with a dot @@ -2586,26 +2304,18 @@ class Agent: edit_count=0, ) - logger.info( - f"Saving artifact with extension: {file_extension}" - ) + logger.info(f"Saving artifact with extension: {file_extension}") artifact.save_as(file_extension) - logger.success( - f"Successfully saved artifact to {full_path}" - ) + logger.success(f"Successfully saved artifact to {full_path}") except ValueError as e: - logger.error( - f"Invalid input values for artifact: {str(e)}" - ) + logger.error(f"Invalid input values for artifact: {str(e)}") raise except IOError as e: logger.error(f"Error saving artifact to file: {str(e)}") raise except Exception as e: - logger.error( - f"Unexpected error handling artifact: {str(e)}" - ) + logger.error(f"Unexpected error handling artifact: {str(e)}") raise def showcase_config(self): @@ -2615,32 +2325,29 @@ class Agent: for key, value in config_dict.items(): if isinstance(value, list): # Format list as a comma-separated string - config_dict[key] = ", ".join( 
- str(item) for item in value - ) + config_dict[key] = ", ".join(str(item) for item in value) elif isinstance(value, dict): # Format dict as key-value pairs in a single string - config_dict[key] = ", ".join( - f"{k}: {v}" for k, v in value.items() - ) + config_dict[key] = ", ".join(f"{k}: {v}" + for k, v in value.items()) else: # Ensure any non-iterable value is a string config_dict[key] = str(value) - return formatter.print_table( - f"Agent: {self.agent_name} Configuration", config_dict - ) + return formatter.print_table(f"Agent: {self.agent_name} Configuration", + config_dict) - def talk_to( - self, agent: Any, task: str, img: str = None, *args, **kwargs - ) -> Any: + def talk_to(self, + agent: Any, + task: str, + img: str = None, + *args, + **kwargs) -> Any: """ Talk to another agent. """ # return agent.run(f"{agent.agent_name}: {task}", img, *args, **kwargs) - output = self.run( - f"{self.agent_name}: {task}", img, *args, **kwargs - ) + output = self.run(f"{self.agent_name}: {task}", img, *args, **kwargs) return agent.run( task=f"From {self.agent_name}: Message: {output}", @@ -2663,9 +2370,7 @@ class Agent: with ThreadPoolExecutor() as executor: # Create futures for each agent conversation futures = [ - executor.submit( - self.talk_to, agent, task, *args, **kwargs - ) + executor.submit(self.talk_to, agent, task, *args, **kwargs) for agent in agents ] @@ -2677,9 +2382,7 @@ class Agent: outputs.append(result) except Exception as e: logger.error(f"Error in agent communication: {e}") - outputs.append( - None - ) # or handle error case as needed + outputs.append(None) # or handle error case as needed return outputs @@ -2695,7 +2398,8 @@ class Agent: # self.stream_response(response) formatter.print_panel_token_by_token( f"{self.agent_name}: {response}", - title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", + title= + f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]", ) else: # logger.info(f"Response: {response}") @@ -2705,52 +2409,23 @@ class 
Agent: ) def parse_llm_output(self, response: Any) -> str: - """Parse and standardize the output from the LLM. - - Args: - response (Any): The response from the LLM in any format - - Returns: - str: Standardized string output - - Raises: - ValueError: If the response format is unexpected and can't be handled - """ - try: - # Handle dictionary responses - if isinstance(response, dict): - if "choices" in response: - return response["choices"][0]["message"][ - "content" - ] - return json.dumps( - response - ) # Convert other dicts to string - - # Handle string responses - elif isinstance(response, str): - return response - - # Handle list responses (from check_llm_outputs) - elif isinstance(response, list): - return "\n".join(response) - - # Handle any other type by converting to string - else: - return str(response) - - except Exception as e: - logger.error(f"Error parsing LLM output: {e}") - raise ValueError( - f"Failed to parse LLM output: {type(response)}" - ) + """Parse the LLM output to a string.""" + if isinstance(response, str): + return response + elif isinstance(response, dict): + return json.dumps(response) + elif isinstance(response, list): + return json.dumps(response) + else: + return str(response) def sentiment_and_evaluator(self, response: str): if self.evaluator: logger.info("Evaluating response...") evaluated_response = self.evaluator(response) - print("Evaluated Response:" f" {evaluated_response}") + print("Evaluated Response:" + f" {evaluated_response}") self.short_memory.add( role="Evaluator", content=evaluated_response, @@ -2774,3 +2449,133 @@ class Agent: role="Output Cleaner", content=response, ) + + async def amcp_execution_flow(self, response: str) -> str: + """Async implementation of MCP execution flow. + + Args: + response (str): The response from the LLM containing tool calls or natural language. + + Returns: + str: The result of executing the tool calls with preserved formatting. 
+ """ + try: + # Try to parse as JSON first + try: + tool_calls = json.loads(response) + is_json = True + logger.debug( + f"Successfully parsed response as JSON: {tool_calls}") + except json.JSONDecodeError: + # If not JSON, treat as natural language + tool_calls = [response] + is_json = False + logger.debug( + f"Could not parse response as JSON, treating as natural language" + ) + + # Execute tool calls against MCP servers + results = [] + errors = [] + + # Handle both single tool call and array of tool calls + if isinstance(tool_calls, dict): + tool_calls = [tool_calls] + + logger.debug( + f"Executing {len(tool_calls)} tool calls against {len(self.mcp_servers)} MCP servers" + ) + + for tool_call in tool_calls: + try: + # Import here to avoid circular imports + from swarms.tools.mcp_integration import abatch_mcp_flow + + logger.debug(f"Executing tool call: {tool_call}") + # Execute the tool call against all MCP servers + result = await abatch_mcp_flow(self.mcp_servers, tool_call) + + if result: + logger.debug(f"Got result from MCP servers: {result}") + results.extend(result) + # Add successful result to memory with context + self.short_memory.add( + role="assistant", + content=f"Tool execution result: {result}") + else: + error_msg = "No result from tool execution" + errors.append(error_msg) + logger.debug(error_msg) + self.short_memory.add(role="error", content=error_msg) + + except Exception as e: + error_msg = f"Error executing tool call: {str(e)}" + errors.append(error_msg) + logger.error(error_msg) + self.short_memory.add(role="error", content=error_msg) + + # Format the final response + if results: + if len(results) == 1: + # For single results, return as is to preserve formatting + return results[0] + else: + # For multiple results, combine with context + formatted_results = [] + for i, result in enumerate(results, 1): + formatted_results.append(f"Result {i}: {result}") + return "\n".join(formatted_results) + elif errors: + if len(errors) == 1: + return 
errors[0] + else: + return "Multiple errors occurred:\n" + "\n".join( + f"- {err}" for err in errors) + else: + return "No results or errors returned" + + except Exception as e: + error_msg = f"Error in MCP execution flow: {str(e)}" + logger.error(error_msg) + self.short_memory.add(role="error", content=error_msg) + return error_msg + + def mcp_execution_flow(self, response: str) -> str: + """Synchronous wrapper for MCP execution flow. + + This method creates a new event loop if needed or uses the existing one + to run the async MCP execution flow. + + Args: + response (str): The response from the LLM containing tool calls or natural language. + + Returns: + str: The result of executing the tool calls with preserved formatting. + """ + try: + # Check if we're already in an event loop + try: + loop = asyncio.get_event_loop() + except RuntimeError: + # No event loop exists, create one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + if loop.is_running(): + # We're in an async context, use run_coroutine_threadsafe + logger.debug( + "Using run_coroutine_threadsafe to execute MCP flow") + future = asyncio.run_coroutine_threadsafe( + self.amcp_execution_flow(response), loop) + return future.result( + timeout=30) # Adding timeout to prevent hanging + else: + # We're not in an async context, use loop.run_until_complete + logger.debug("Using run_until_complete to execute MCP flow") + return loop.run_until_complete( + self.amcp_execution_flow(response)) + + except Exception as e: + error_msg = f"Error in MCP execution flow wrapper: {str(e)}" + logger.error(error_msg) + return error_msg diff --git a/swarms/tools/mcp_client.py b/swarms/tools/mcp_client.py index 9a9d2b37..e6032921 100644 --- a/swarms/tools/mcp_client.py +++ b/swarms/tools/mcp_client.py @@ -43,6 +43,8 @@ async def _execute_mcp_tool( method: Literal["stdio", "sse"] = "sse", parameters: Dict[Any, Any] = None, output_type: Literal["str", "dict"] = "str", + timeout: float = 30.0, + *args, 
**kwargs, ) -> Dict[Any, Any]: @@ -72,13 +74,15 @@ async def _execute_mcp_tool( raise ValueError(f"Invalid output type: {output_type}") -def execute_mcp_tool( +async def execute_mcp_tool( + url: str, tool_name: str = None, method: Literal["stdio", "sse"] = "sse", parameters: Dict[Any, Any] = None, output_type: Literal["str", "dict"] = "str", ) -> Dict[Any, Any]: + return asyncio.run( _execute_mcp_tool( url=url, diff --git a/swarms/tools/mcp_integration.py b/swarms/tools/mcp_integration.py index 7a74dbaa..6dd25e91 100644 --- a/swarms/tools/mcp_integration.py +++ b/swarms/tools/mcp_integration.py @@ -1,29 +1,17 @@ from __future__ import annotations -from typing import Any, List - - -from loguru import logger - import abc import asyncio from contextlib import AbstractAsyncContextManager, AsyncExitStack from pathlib import Path -from typing import Literal - -from anyio.streams.memory import ( - MemoryObjectReceiveStream, - MemoryObjectSendStream, -) -from mcp import ( - ClientSession, - StdioServerParameters, - Tool as MCPTool, - stdio_client, -) +from typing import Any, Dict, List, Optional, Literal, Union +from typing_extensions import NotRequired, TypedDict + +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from loguru import logger +from mcp import ClientSession, StdioServerParameters, Tool as MCPTool, stdio_client from mcp.client.sse import sse_client from mcp.types import CallToolResult, JSONRPCMessage -from typing_extensions import NotRequired, TypedDict from swarms.utils.any_to_str import any_to_str @@ -32,94 +20,69 @@ class MCPServer(abc.ABC): """Base class for Model Context Protocol servers.""" @abc.abstractmethod - async def connect(self): - """Connect to the server. For example, this might mean spawning a subprocess or - opening a network connection. The server is expected to remain connected until - `cleanup()` is called. 
- """ + async def connect(self) -> None: + """Establish connection to the MCP server.""" pass @property @abc.abstractmethod def name(self) -> str: - """A readable name for the server.""" + """Human-readable server name.""" pass @abc.abstractmethod - async def cleanup(self): - """Cleanup the server. For example, this might mean closing a subprocess or - closing a network connection. - """ + async def cleanup(self) -> None: + """Clean up resources and close connection.""" pass @abc.abstractmethod - async def list_tools(self) -> list[MCPTool]: - """List the tools available on the server.""" + async def list_tools(self) -> List[MCPTool]: + """List available MCP tools on the server.""" pass @abc.abstractmethod - async def call_tool( - self, tool_name: str, arguments: dict[str, Any] | None - ) -> CallToolResult: - """Invoke a tool on the server.""" + async def call_tool(self, tool_name: str, + arguments: Dict[str, Any] | None) -> CallToolResult: + """Invoke a tool by name with provided arguments.""" pass class _MCPServerWithClientSession(MCPServer, abc.ABC): - """Base class for MCP servers that use a `ClientSession` to communicate with the server.""" - - def __init__(self, cache_tools_list: bool): - """ - Args: - cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be - cached and only fetched from the server once. If `False`, the tools list will be - fetched from the server on each call to `list_tools()`. The cache can be invalidated - by calling `invalidate_tools_cache()`. You should set this to `True` if you know the - server will not change its tools list, because it can drastically improve latency - (by avoiding a round-trip to the server every time). 
- """ - self.session: ClientSession | None = None + """Mixin providing ClientSession-based MCP communication.""" + + def __init__(self, cache_tools_list: bool = False): + self.session: Optional[ClientSession] = None self.exit_stack: AsyncExitStack = AsyncExitStack() - self._cleanup_lock: asyncio.Lock = asyncio.Lock() + self._cleanup_lock = asyncio.Lock() self.cache_tools_list = cache_tools_list - - # The cache is always dirty at startup, so that we fetch tools at least once self._cache_dirty = True - self._tools_list: list[MCPTool] | None = None + self._tools_list: Optional[List[MCPTool]] = None @abc.abstractmethod def create_streams( - self, - ) -> AbstractAsyncContextManager[ - tuple[ + self + ) -> AbstractAsyncContextManager[tuple[ MemoryObjectReceiveStream[JSONRPCMessage | Exception], MemoryObjectSendStream[JSONRPCMessage], - ] - ]: - """Create the streams for the server.""" + ]]: + """Supply the read/write streams for the MCP transport.""" pass - async def __aenter__(self): + async def __aenter__(self) -> MCPServer: await self.connect() - return self + return self # type: ignore - async def __aexit__(self, exc_type, exc_value, traceback): + async def __aexit__(self, exc_type, exc_value, tb) -> None: await self.cleanup() - def invalidate_tools_cache(self): - """Invalidate the tools cache.""" - self._cache_dirty = True - - async def connect(self): - """Connect to the server.""" + async def connect(self) -> None: + """Initialize transport and ClientSession.""" try: transport = await self.exit_stack.enter_async_context( - self.create_streams() - ) + self.create_streams()) read, write = transport session = await self.exit_stack.enter_async_context( - ClientSession(read, write) - ) + ClientSession(read, write)) await session.initialize() self.session = session except Exception as e: @@ -127,266 +90,222 @@ class _MCPServerWithClientSession(MCPServer, abc.ABC): await self.cleanup() raise - async def list_tools(self) -> list[MCPTool]: - """List the tools available on 
the server.""" + async def cleanup(self) -> None: + """Close session and transport.""" + async with self._cleanup_lock: + try: + await self.exit_stack.aclose() + except Exception as e: + logger.error(f"Error during cleanup: {e}") + finally: + self.session = None + + async def list_tools(self) -> List[MCPTool]: if not self.session: - raise Exception( - "Server not initialized. Make sure you call `connect()` first." - ) - - # Return from cache if caching is enabled, we have tools, and the cache is not dirty - if ( - self.cache_tools_list - and not self._cache_dirty - and self._tools_list - ): + raise RuntimeError("Server not connected. Call connect() first.") + if self.cache_tools_list and not self._cache_dirty and self._tools_list: return self._tools_list - - # Reset the cache dirty to False self._cache_dirty = False - - # Fetch the tools from the server self._tools_list = (await self.session.list_tools()).tools - return self._tools_list + return self._tools_list # type: ignore async def call_tool( - self, arguments: dict[str, Any] | None - ) -> CallToolResult: - """Invoke a tool on the server.""" - tool_name = arguments.get("tool_name") or arguments.get( - "name" - ) - - if not tool_name: - raise Exception("No tool name found in arguments") - + self, + tool_name: str | None = None, + arguments: Dict[str, Any] | None = None) -> CallToolResult: + if not arguments: + raise ValueError("Arguments dict is required to call a tool") + name = tool_name or arguments.get("tool_name") or arguments.get("name") + if not name: + raise ValueError("Tool name missing in arguments") if not self.session: - raise Exception( - "Server not initialized. Make sure you call `connect()` first." 
- ) - - return await self.session.call_tool(tool_name, arguments) - - async def cleanup(self): - """Cleanup the server.""" - async with self._cleanup_lock: - try: - await self.exit_stack.aclose() - self.session = None - except Exception as e: - logger.error(f"Error cleaning up server: {e}") + raise RuntimeError("Server not connected. Call connect() first.") + return await self.session.call_tool(name, arguments) class MCPServerStdioParams(TypedDict): - """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another - import. - """ - + """Configuration for stdio transport.""" command: str - """The executable to run to start the server. For example, `python` or `node`.""" - - args: NotRequired[list[str]] - """Command line args to pass to the `command` executable. For example, `['foo.py']` or - `['server.js', '--port', '8080']`.""" - - env: NotRequired[dict[str, str]] - """The environment variables to set for the server. .""" - + args: NotRequired[List[str]] + env: NotRequired[Dict[str, str]] cwd: NotRequired[str | Path] - """The working directory to use when spawning the process.""" - encoding: NotRequired[str] - """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`.""" - - encoding_error_handler: NotRequired[ - Literal["strict", "ignore", "replace"] - ] - """The text encoding error handler. Defaults to `strict`. - - See https://docs.python.org/3/library/codecs.html#codec-base-classes for - explanations of possible values. - """ + encoding_error_handler: NotRequired[Literal["strict", "ignore", "replace"]] class MCPServerStdio(_MCPServerWithClientSession): - """MCP server implementation that uses the stdio transport. See the [spec] - (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for - details. 
class MCPServerStdio(_MCPServerWithClientSession):
    """MCP server over stdio transport."""

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: Optional[str] = None,
    ):
        """Configure a stdio-based MCP server.

        Args:
            params: Command, args, env, cwd and encoding settings used to
                launch the server process.
            cache_tools_list: When True, ``list_tools()`` results are cached
                until ``invalidate_tools_cache()`` is called.
            name: Readable server name; derived from the command if omitted.
        """
        super().__init__(cache_tools_list)
        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get(
                "encoding_error_handler", "strict"
            ),
        )
        self._name = name or f"stdio:{self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]]:
        """Open the receive/send message streams via the stdio client."""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """Readable name of this server."""
        return self._name


class MCPServerSseParams(TypedDict):
    """Configuration for the HTTP+SSE transport.

    Mirrors the parameters of ``mcp.client.sse.sse_client``.
    """

    url: str  # URL of the server
    headers: NotRequired[Dict[str, str]]  # extra HTTP headers to send
    timeout: NotRequired[float]  # HTTP request timeout (seconds)
    sse_read_timeout: NotRequired[float]  # SSE read timeout (seconds)
class MCPServerSse(_MCPServerWithClientSession):
    """MCP server over the HTTP-with-SSE transport."""

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: Optional[str] = None,
    ):
        """Configure an SSE-based MCP server.

        Args:
            params: URL, headers and timeout settings for the connection.
            cache_tools_list: When True, ``list_tools()`` results are cached
                until ``invalidate_tools_cache()`` is called.
            name: Readable server name; derived from the URL if omitted.
        """
        super().__init__(cache_tools_list)
        self.params = params
        self._name = name or f"sse:{params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]]:
        """Open the receive/send message streams via the SSE client."""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers"),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 300),
        )

    @property
    def name(self) -> str:
        """Readable name of this server."""
        return self._name


def _payload_to_arguments(payload: Dict[str, Any] | str) -> Dict[str, Any]:
    """Normalize a call payload into the arguments dict ``call_tool`` expects.

    BUG FIX: string payloads were previously converted to ``arguments=None``
    and therefore always raised inside ``call_tool``; a bare string is now
    interpreted as the tool name with no arguments.
    """
    if isinstance(payload, dict):
        return payload
    return {"tool_name": payload}


async def call_tool_fast(
    server: MCPServerSse, payload: Dict[str, Any] | str
) -> Any:
    """Connect, invoke one tool on ``server``, and always clean up.

    Args:
        server: The (not-yet-connected) server to use.
        payload: Arguments dict, or a bare tool name as a string.

    Returns:
        The tool call result.
    """
    try:
        await server.connect()
        return await server.call_tool(arguments=_payload_to_arguments(payload))
    finally:
        await server.cleanup()


async def mcp_flow_get_tool_schema(params: MCPServerSseParams) -> Any:
    """Fetch the tool list/schema from the MCP server described by ``params``."""
    async with MCPServerSse(params) as server:
        return await server.list_tools()


async def mcp_flow(
    params: MCPServerSseParams,
    function_call: Dict[str, Any] | str,
) -> Any:
    """Call a single tool on the server described by ``params``.

    Delegates to ``_call_one_server``; the previous version wrapped the
    server in ``async with`` *and* connected/cleaned up inside
    ``call_tool_fast``, double-managing the same connection.
    """
    return await _call_one_server(params, function_call)


async def _call_one_server(
    params: MCPServerSseParams, payload: Dict[str, Any] | str
) -> Any:
    """Connect to one server, call the tool described by ``payload``, clean up."""
    server = MCPServerSse(params)
    try:
        await server.connect()
        return await server.call_tool(arguments=_payload_to_arguments(payload))
    finally:
        await server.cleanup()
async def abatch_mcp_flow(
    params: List[MCPServerSseParams], payload: Dict[str, Any] | str
) -> List[Any]:
    """Execute a batch of MCP calls concurrently.

    Args:
        params: List of MCP server configurations.
        payload: The payload sent to every server.

    Returns:
        Results from all MCP servers, one entry per server; on failure a
        single-element list with the error message (best-effort contract
        preserved from the original).
    """
    if not params:
        logger.warning("No MCP servers provided for batch operation")
        return []
    try:
        return await asyncio.gather(
            *[_call_one_server(p, payload) for p in params]
        )
    except Exception as e:
        logger.error(f"Error in abatch_mcp_flow: {e}")
        return [f"Error in batch operation: {str(e)}"]


def batch_mcp_flow(
    params: List[MCPServerSseParams], payload: Dict[str, Any] | str
) -> List[Any]:
    """Sync wrapper for :func:`abatch_mcp_flow`.

    Safe to call whether or not an event loop is already running in the
    calling thread.

    Args:
        params: List of MCP server configurations.
        payload: The payload sent to every server.

    Returns:
        Results from all MCP servers (see :func:`abatch_mcp_flow`).
    """
    if not params:
        logger.warning("No MCP servers provided for batch operation")
        return []
    try:
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop running in this thread: asyncio.run manages its own.
            return asyncio.run(abatch_mcp_flow(params, payload))
        # BUG FIX: a loop is already running in this thread. The previous
        # implementation called run_coroutine_threadsafe(...) against that
        # same loop and then blocked on .result(), which deadlocks — the
        # blocked thread is the one that must drive the coroutine. Run the
        # batch on a private loop in a worker thread instead.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(asyncio.run, abatch_mcp_flow(params, payload))
            return future.result(timeout=30)  # bound the wait to avoid hangs
    except Exception as e:
        logger.error(f"Error in batch_mcp_flow: {e}")
        return [f"Error in batch operation: {str(e)}"]


# --- tests/test_basic_example.py (separate file in the diff) ----------------

class TestBasicExample(unittest.TestCase):
    """Smoke tests for basic Agent construction and execution."""

    def setUp(self):
        # Fresh agent per test; run() goes through the configured model backend.
        self.agent = Agent(
            agent_name="Test-Agent",
            agent_description="A test agent",
            system_prompt="You are a helpful assistant.",
            model_name="gpt-4",
        )

    def test_agent_initialization(self):
        """Constructor stores the provided identity fields."""
        self.assertEqual(self.agent.agent_name, "Test-Agent")
        self.assertEqual(self.agent.agent_description, "A test agent")

    def test_agent_run(self):
        """run() returns a non-None response."""
        self.assertIsNotNone(self.agent.run("What is 2+2?"))


# --- tests/tools/test_mcp_integration.py (separate file in the diff) --------

def test_server_connection():
    """A user can connect to, and disconnect from, the MCP server."""
    params = {"url": "http://localhost:8000"}
    server = MCPServerSse(params, cache_tools_list=True)

    async def scenario():
        # BUG FIX: connect and cleanup must run on ONE loop. Each
        # asyncio.run() creates a fresh event loop, and the session's
        # streams are bound to the loop that created them, so splitting
        # the steps across asyncio.run() calls breaks the session.
        await server.connect()
        assert server.session is not None
        await server.cleanup()
        assert server.session is None

    asyncio.run(scenario())
def test_list_tools():
    """A user can retrieve available tools from the server."""
    params = {"url": "http://localhost:8000"}
    server = MCPServerSse(params)

    async def scenario():
        # Single event loop for connect / list / cleanup — the session is
        # bound to the loop that created it.
        await server.connect()
        try:
            return await server.list_tools()
        finally:
            await server.cleanup()

    tools = asyncio.run(scenario())
    assert isinstance(tools, list)
    assert len(tools) > 0


def test_tool_execution():
    """A user can execute a tool successfully."""
    params = {"url": "http://localhost:8000"}
    function_call = {"tool_name": "add", "arguments": {"a": 5, "b": 3}}

    # BUG FIX: mcp_flow is a coroutine function — without asyncio.run the
    # bare coroutine object made the assertion pass vacuously.
    result = asyncio.run(mcp_flow(params, function_call))
    assert result is not None


def test_batch_execution():
    """A user can execute multiple tool calls in batch."""
    params_list = [
        {"url": "http://localhost:8000"},
        {"url": "http://localhost:8000"},
    ]
    function_calls = [
        {"tool_name": "add", "arguments": {"a": 1, "b": 2}},
        {"tool_name": "subtract", "arguments": {"a": 5, "b": 3}},
    ]

    # BUG FIX: batch_mcp_flow broadcasts ONE payload to every server; the
    # original passed the whole list of calls as the payload. Issue one
    # batch per distinct call instead.
    results = [
        batch_mcp_flow([server_params], call)[0]
        for server_params, call in zip(params_list, function_calls)
    ]
    assert len(results) == 2
    assert all(result is not None for result in results)


def test_error_handling():
    """Users receive proper exceptions for invalid operations."""
    params = {"url": "http://localhost:8000"}
    invalid_function = {"tool_name": "nonexistent_tool", "arguments": {}}

    # BUG FIX: without asyncio.run the call merely built a coroutine and
    # pytest.raises reported DID-NOT-RAISE.
    with pytest.raises(Exception):
        asyncio.run(mcp_flow(params, invalid_function))


def test_get_tool_schema():
    """Users can retrieve tool schemas correctly."""
    params = {"url": "http://localhost:8000"}

    # BUG FIX: mcp_flow_get_tool_schema is async and now returns the
    # server's tool list, not a dict with "tools"/"functions" keys.
    schema = asyncio.run(mcp_flow_get_tool_schema(params))
    assert isinstance(schema, list)


def test_server_reconnection():
    """Users can reconnect to the server after disconnection."""
    params = {"url": "http://localhost:8000"}
    server = MCPServerSse(params)

    async def scenario():
        # First connection, then a clean reconnect — all on one loop.
        await server.connect()
        await server.cleanup()
        await server.connect()
        assert server.session is not None
        await server.cleanup()

    asyncio.run(scenario())
def test_cache_behavior():
    """Tool caching returns the same result on repeated list_tools() calls."""
    params = {"url": "http://localhost:8000"}
    server = MCPServerSse(params, cache_tools_list=True)

    async def scenario():
        # BUG FIX: the original drove connect/list/cleanup through separate
        # asyncio.run() calls, each of which creates a fresh event loop;
        # the session's streams are bound to the loop that created them,
        # so cross-loop use fails. Run the whole scenario on one loop.
        await server.connect()
        try:
            first = await server.list_tools()   # first call populates cache
            second = await server.list_tools()  # second call should hit cache
            return first, second
        finally:
            await server.cleanup()

    tools1, tools2 = asyncio.run(scenario())
    assert tools1 == tools2