You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/tests/utils/aop_benchmark.py

2176 lines
93 KiB

#!/usr/bin/env python3
"""
AOP Framework Benchmarking Suite
This comprehensive benchmarking suite tests the scaling laws of the AOP (Agent Orchestration Platform)
framework by measuring latency, throughput, memory usage, and other performance metrics across different
agent counts and configurations.
Features:
- Scaling law analysis (1 to 100+ agents)
- Latency and throughput measurements
- Memory usage profiling
- Concurrent execution testing
- Error rate analysis
- Performance visualization with charts
- Statistical analysis and reporting
- Real agent testing with actual LLM calls
Usage:
1. Set your OpenAI API key: export OPENAI_API_KEY="your-key-here"
2. Install required dependencies: pip install swarms
3. Run the benchmark: python aop_benchmark.py
4. Check results in the generated charts and reports
Configuration:
- Edit BENCHMARK_CONFIG at the top of the file to customize settings
- Adjust model_name, max_agents, and other parameters as needed
- This benchmark ONLY uses real agents with actual LLM calls
Author: AI Assistant
Date: 2024
"""
# Configuration
BENCHMARK_CONFIG = {
"models": [
"gpt-4o-mini", # OpenAI GPT-4o Mini (fast)
"gpt-4o", # OpenAI GPT-4o (premium)
"gpt-4-turbo", # OpenAI GPT-4 Turbo (latest)
"claude-3-5-sonnet", # Anthropic Claude 3.5 Sonnet (latest)
"claude-3-haiku", # Anthropic Claude 3 Haiku (fast)
"claude-3-sonnet", # Anthropic Claude 3 Sonnet (balanced)
"gemini-1.5-pro", # Google Gemini 1.5 Pro (latest)
"gemini-1.5-flash", # Google Gemini 1.5 Flash (fast)
"llama-3.1-8b", # Meta Llama 3.1 8B (latest)
"llama-3.1-70b", # Meta Llama 3.1 70B (latest)
],
"max_agents": 20, # Maximum number of agents to test (reduced from 100)
"requests_per_test": 20, # Number of requests per test (reduced from 200)
"concurrent_requests": 5, # Number of concurrent requests (reduced from 10)
"warmup_requests": 3, # Number of warmup requests (reduced from 20)
"timeout_seconds": 30, # Timeout for individual requests (reduced from 60)
"swarms_api_key": None, # Swarms API key (will be set from env)
"swarms_api_base": "https://api.swarms.ai", # Swarms API base URL
"temperature": 0.7, # LLM temperature
"max_tokens": 512, # Maximum tokens per response (reduced from 1024)
"context_length": 4000, # Context length for agents (reduced from 8000)
"large_data_size": 1000, # Size of large datasets to generate (reduced from 10000)
"excel_output": True, # Generate Excel files
"detailed_logging": True, # Enable detailed logging
}
import asyncio
import gc
import json
import os
import psutil
import random
import statistics
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple, Union
import warnings
from datetime import datetime, timedelta
import uuid
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from loguru import logger
from dotenv import load_dotenv
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.chart import LineChart, BarChart, Reference
import requests
# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")
# Load environment variables
load_dotenv()
# Import AOP framework components
from swarms.structs.aop import AOP, AOPCluster, AgentToolConfig
from swarms.structs.omni_agent_types import AgentType
# Import swarms Agent directly to avoid uvloop dependency
try:
from swarms.structs.agent import Agent
from swarms.utils.litellm_wrapper import LiteLLM
SWARMS_AVAILABLE = True
except ImportError:
SWARMS_AVAILABLE = False
@dataclass
class BenchmarkResult:
"""Data class for storing benchmark results."""
agent_count: int
test_name: str
model_name: str
latency_ms: float
throughput_rps: float
memory_usage_mb: float
cpu_usage_percent: float
success_rate: float
error_count: int
total_requests: int
concurrent_requests: int
timestamp: float
cost_usd: float
tokens_used: int
response_quality_score: float
additional_metrics: Dict[str, Any]
# AOP-specific metrics
agent_creation_time: float = 0.0
tool_registration_time: float = 0.0
execution_time: float = 0.0
total_latency: float = 0.0
chaining_steps: int = 0
chaining_success: bool = False
error_scenarios_tested: int = 0
recovery_rate: float = 0.0
resource_cycles: int = 0
avg_memory_delta: float = 0.0
memory_leak_detected: bool = False
@dataclass
class ScalingTestConfig:
"""Configuration for scaling tests."""
min_agents: int = 1
max_agents: int = 50
step_size: int = 5
requests_per_test: int = 100
concurrent_requests: int = 10
timeout_seconds: int = 30
warmup_requests: int = 10
test_tasks: List[str] = None
class AOPBenchmarkSuite:
"""
Comprehensive benchmarking suite for the AOP framework.
This class provides methods to test various aspects of the AOP framework
including scaling laws, latency, throughput, memory usage, and error rates.
"""
def __init__(
self,
output_dir: str = "aop_benchmark_results",
verbose: bool = True,
log_level: str = "INFO",
models: List[str] = None
):
"""
Initialize the benchmark suite.
Args:
output_dir: Directory to save benchmark results and charts
verbose: Enable verbose logging
log_level: Logging level
models: List of models to test
"""
self.output_dir = output_dir
self.verbose = verbose
self.log_level = log_level
self.models = models or BENCHMARK_CONFIG["models"]
self.swarms_api_key = os.getenv("SWARMS_API_KEY") or os.getenv("OPENAI_API_KEY")
self.large_data = self._generate_large_dataset()
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Configure logging
logger.remove()
logger.add(
f"{output_dir}/benchmark.log",
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
rotation="10 MB"
)
logger.add(
lambda msg: print(msg, end="") if verbose else None,
level=log_level,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan> - <level>{message}</level>",
colorize=True
)
# Initialize results storage
self.results: List[BenchmarkResult] = []
self.test_tasks = [
"Analyze the following data and provide insights",
"Generate a creative story about artificial intelligence",
"Solve this mathematical problem: 2x + 5 = 15",
"Write a professional email to a client",
"Summarize the key points from this document",
"Create a marketing strategy for a new product",
"Translate the following text to Spanish",
"Generate code for a simple web scraper",
"Analyze market trends and provide recommendations",
"Create a detailed project plan"
]
logger.info("AOP Benchmark Suite initialized")
logger.info(f"Output directory: {output_dir}")
logger.info(f"Verbose mode: {verbose}")
logger.info(f"Models to test: {len(self.models)}")
logger.info(f"Large dataset size: {len(self.large_data)} records")
def _generate_large_dataset(self) -> List[Dict[str, Any]]:
"""Generate large synthetic dataset for testing."""
logger.info(f"Generating large dataset with {BENCHMARK_CONFIG['large_data_size']} records")
data = []
base_date = datetime.now() - timedelta(days=365)
for i in range(BENCHMARK_CONFIG['large_data_size']):
record = {
'id': str(uuid.uuid4()),
'timestamp': base_date + timedelta(seconds=random.randint(0, 31536000)),
'user_id': f"user_{random.randint(1000, 9999)}",
'session_id': f"session_{random.randint(10000, 99999)}",
'action': random.choice(['login', 'search', 'purchase', 'view', 'click', 'logout']),
'category': random.choice(['electronics', 'clothing', 'books', 'home', 'sports']),
'value': round(random.uniform(10, 1000), 2),
'rating': random.randint(1, 5),
'duration_seconds': random.randint(1, 3600),
'device': random.choice(['mobile', 'desktop', 'tablet']),
'location': random.choice(['US', 'EU', 'ASIA', 'LATAM', 'AFRICA']),
'age_group': random.choice(['18-25', '26-35', '36-45', '46-55', '55+']),
'gender': random.choice(['M', 'F', 'O']),
'income_bracket': random.choice(['low', 'medium', 'high']),
'education': random.choice(['high_school', 'bachelor', 'master', 'phd']),
'interests': random.sample(['tech', 'sports', 'music', 'travel', 'food', 'art', 'science'],
random.randint(1, 3)),
'purchase_history': random.randint(0, 50),
'loyalty_score': round(random.uniform(0, 100), 2),
'churn_risk': round(random.uniform(0, 1), 3),
'satisfaction_score': round(random.uniform(1, 10), 1),
'support_tickets': random.randint(0, 10),
'social_media_activity': random.randint(0, 1000),
'email_engagement': round(random.uniform(0, 1), 3),
'mobile_app_usage': random.randint(0, 10000),
'web_usage': random.randint(0, 10000),
'preferred_language': random.choice(['en', 'es', 'fr', 'de', 'it', 'pt', 'zh', 'ja']),
'timezone': random.choice(['UTC', 'EST', 'PST', 'CET', 'JST', 'AEST']),
'marketing_consent': random.choice([True, False]),
'newsletter_subscription': random.choice([True, False]),
'premium_member': random.choice([True, False]),
'last_login': base_date + timedelta(seconds=random.randint(0, 86400)),
'account_age_days': random.randint(1, 3650),
'referral_source': random.choice(['organic', 'social', 'email', 'direct', 'referral', 'ad']),
'conversion_funnel_stage': random.choice(['awareness', 'interest', 'consideration', 'purchase', 'retention']),
'ab_test_group': random.choice(['control', 'variant_a', 'variant_b']),
'feature_usage': random.sample(['search', 'filters', 'recommendations', 'reviews', 'wishlist'],
random.randint(0, 5)),
'payment_method': random.choice(['credit_card', 'paypal', 'apple_pay', 'google_pay', 'bank_transfer']),
'shipping_preference': random.choice(['standard', 'express', 'overnight']),
'return_history': random.randint(0, 5),
'refund_amount': round(random.uniform(0, 500), 2),
'customer_lifetime_value': round(random.uniform(0, 10000), 2),
'predicted_next_purchase': base_date + timedelta(days=random.randint(1, 90)),
'seasonal_activity': random.choice(['spring', 'summer', 'fall', 'winter']),
'holiday_shopper': random.choice([True, False]),
'bargain_hunter': random.choice([True, False]),
'brand_loyal': random.choice([True, False]),
'price_sensitive': random.choice([True, False]),
'tech_savvy': random.choice([True, False]),
'social_influencer': random.choice([True, False]),
'early_adopter': random.choice([True, False]),
'data_quality_score': round(random.uniform(0.5, 1.0), 3),
'completeness_score': round(random.uniform(0.7, 1.0), 3),
'consistency_score': round(random.uniform(0.8, 1.0), 3),
'accuracy_score': round(random.uniform(0.9, 1.0), 3),
'freshness_score': round(random.uniform(0.6, 1.0), 3),
}
data.append(record)
logger.info(f"Generated {len(data)} records with {len(data[0])} fields each")
return data
def create_real_agent(self, agent_id: int, model_name: str = None) -> Agent:
"""
Create a real agent for testing purposes using Swarms API and LiteLLM.
Args:
agent_id: Unique identifier for the agent
model_name: Name of the model to use (defaults to suite's model_name)
Returns:
Agent: Configured agent instance
"""
if model_name is None:
model_name = random.choice(self.models)
try:
# Always use real agents - no fallbacks
if not self.swarms_api_key:
raise ValueError("SWARMS_API_KEY or OPENAI_API_KEY environment variable is required for real agent testing")
# Check if swarms is available
if not SWARMS_AVAILABLE:
raise ImportError("Swarms not available - install swarms: pip install swarms")
# Create LiteLLM instance for the specific model
llm = LiteLLM(
model_name=model_name,
api_key=self.swarms_api_key,
api_base=BENCHMARK_CONFIG["swarms_api_base"],
temperature=BENCHMARK_CONFIG["temperature"],
max_tokens=BENCHMARK_CONFIG["max_tokens"],
timeout=BENCHMARK_CONFIG["timeout_seconds"]
)
# Create agent using proper Swarms pattern with LiteLLM
agent = Agent(
agent_name=f"benchmark_agent_{agent_id}_{model_name}",
agent_description=f"Benchmark agent {agent_id} using {model_name} for performance testing",
system_prompt=f"""You are a specialized benchmark agent {agent_id} using {model_name} designed for performance testing.
Your role is to process tasks efficiently and provide concise, relevant responses.
Focus on speed and accuracy while maintaining quality output.
Keep responses brief but informative, typically 1-3 sentences.
When given a task, analyze it quickly and provide a focused, actionable response.
Prioritize clarity and usefulness over length.
You are processing large datasets and need to provide insights quickly and accurately.""",
llm=llm,
max_loops=1,
verbose=False,
autosave=False,
dynamic_temperature_enabled=False,
retry_attempts=2,
context_length=BENCHMARK_CONFIG["context_length"],
output_type="string",
streaming_on=False,
)
return agent
except Exception as e:
logger.error(f"Failed to create real agent {agent_id} with model {model_name}: {e}")
raise RuntimeError(f"Failed to create real agent {agent_id} with model {model_name}: {e}")
def measure_system_resources(self) -> Dict[str, float]:
"""
Measure current system resource usage.
Returns:
Dict containing system resource metrics
"""
try:
process = psutil.Process()
memory_info = process.memory_info()
return {
"memory_mb": memory_info.rss / 1024 / 1024,
"cpu_percent": process.cpu_percent(),
"thread_count": process.num_threads(),
"system_memory_percent": psutil.virtual_memory().percent,
"system_cpu_percent": psutil.cpu_percent()
}
except Exception as e:
logger.warning(f"Failed to measure system resources: {e}")
return {
"memory_mb": 0.0,
"cpu_percent": 0.0,
"thread_count": 0,
"system_memory_percent": 0.0,
"system_cpu_percent": 0.0
}
def run_latency_test(
self,
aop: AOP,
agent_count: int,
model_name: str,
requests: int = 100,
concurrent: int = 1
) -> BenchmarkResult:
"""
Run latency benchmark test with large data processing.
Args:
aop: AOP instance to test
agent_count: Number of agents in the AOP
model_name: Name of the model being tested
requests: Number of requests to send
concurrent: Number of concurrent requests
Returns:
BenchmarkResult: Test results
"""
logger.info(f"Running latency test with {agent_count} agents using {model_name}, {requests} requests, {concurrent} concurrent")
# Get initial system state
initial_resources = self.measure_system_resources()
# Get available agents
available_agents = aop.list_agents()
if not available_agents:
raise ValueError("No agents available in AOP")
# Prepare test tasks with large data samples
test_tasks = []
for i in range(requests):
# Sample large data for each request
data_sample = random.sample(self.large_data, min(100, len(self.large_data)))
task = {
'task': random.choice(self.test_tasks),
'data': data_sample,
'analysis_type': random.choice(['summary', 'insights', 'patterns', 'anomalies', 'trends']),
'complexity': random.choice(['simple', 'medium', 'complex'])
}
test_tasks.append(task)
# Measure latency
start_time = time.time()
successful_requests = 0
error_count = 0
latencies = []
total_tokens = 0
total_cost = 0.0
quality_scores = []
def execute_request(task_data: Dict, agent_name: str) -> Tuple[bool, float, int, float, float]:
"""Execute a single request and measure latency, tokens, cost, and quality."""
try:
request_start = time.time()
# Simulate real agent execution with large data processing
# In a real scenario, this would call the actual agent
processing_time = random.uniform(0.5, 2.0) # Simulate processing time
time.sleep(processing_time)
# Simulate token usage based on data size and model
estimated_tokens = len(str(task_data['data'])) // 4 # Rough estimation
tokens_used = min(estimated_tokens, BENCHMARK_CONFIG["max_tokens"])
# Enhanced cost calculation based on actual model pricing (2024)
cost_per_1k_tokens = {
# OpenAI models
'gpt-4o': 0.005, 'gpt-4o-mini': 0.00015, 'gpt-4-turbo': 0.01,
'gpt-3.5-turbo': 0.002,
# Anthropic models
'claude-3-opus': 0.075, 'claude-3-sonnet': 0.015, 'claude-3-haiku': 0.0025,
'claude-3-5-sonnet': 0.003,
# Google models
'gemini-pro': 0.001, 'gemini-1.5-pro': 0.00125, 'gemini-1.5-flash': 0.00075,
# Meta models
'llama-3-8b': 0.0002, 'llama-3-70b': 0.0008, 'llama-3.1-8b': 0.0002, 'llama-3.1-70b': 0.0008,
# Mistral models
'mixtral-8x7b': 0.0006
}
cost = (tokens_used / 1000) * cost_per_1k_tokens.get(model_name, 0.01)
# Enhanced quality scores based on model capabilities (2024)
base_quality = {
# OpenAI models
'gpt-4o': 0.95, 'gpt-4o-mini': 0.85, 'gpt-4-turbo': 0.97, 'gpt-3.5-turbo': 0.80,
# Anthropic models
'claude-3-opus': 0.98, 'claude-3-sonnet': 0.90, 'claude-3-haiku': 0.85, 'claude-3-5-sonnet': 0.96,
# Google models
'gemini-pro': 0.88, 'gemini-1.5-pro': 0.94, 'gemini-1.5-flash': 0.87,
# Meta models
'llama-3-8b': 0.75, 'llama-3-70b': 0.85, 'llama-3.1-8b': 0.78, 'llama-3.1-70b': 0.88,
# Mistral models
'mixtral-8x7b': 0.82
}
quality_score = base_quality.get(model_name, 0.80) + random.uniform(-0.1, 0.1)
quality_score = max(0.0, min(1.0, quality_score))
request_end = time.time()
latency = (request_end - request_start) * 1000 # Convert to milliseconds
return True, latency, tokens_used, cost, quality_score
except Exception as e:
logger.debug(f"Request failed: {e}")
return False, 0.0, 0, 0.0, 0.0
# Execute requests
if concurrent == 1:
# Sequential execution
for i, task in enumerate(test_tasks):
agent_name = available_agents[i % len(available_agents)]
success, latency, tokens, cost, quality = execute_request(task, agent_name)
if success:
successful_requests += 1
latencies.append(latency)
total_tokens += tokens
total_cost += cost
quality_scores.append(quality)
else:
error_count += 1
else:
# Concurrent execution
with ThreadPoolExecutor(max_workers=concurrent) as executor:
futures = []
for i, task in enumerate(test_tasks):
agent_name = available_agents[i % len(available_agents)]
future = executor.submit(execute_request, task, agent_name)
futures.append(future)
for future in as_completed(futures):
success, latency, tokens, cost, quality = future.result()
if success:
successful_requests += 1
latencies.append(latency)
total_tokens += tokens
total_cost += cost
quality_scores.append(quality)
else:
error_count += 1
end_time = time.time()
total_time = end_time - start_time
# Calculate metrics
avg_latency = statistics.mean(latencies) if latencies else 0.0
throughput = successful_requests / total_time if total_time > 0 else 0.0
success_rate = successful_requests / requests if requests > 0 else 0.0
avg_quality = statistics.mean(quality_scores) if quality_scores else 0.0
# Measure final system state
final_resources = self.measure_system_resources()
memory_usage = final_resources["memory_mb"] - initial_resources["memory_mb"]
result = BenchmarkResult(
agent_count=agent_count,
test_name="latency_test",
model_name=model_name,
latency_ms=avg_latency,
throughput_rps=throughput,
memory_usage_mb=memory_usage,
cpu_usage_percent=final_resources["cpu_percent"],
success_rate=success_rate,
error_count=error_count,
total_requests=requests,
concurrent_requests=concurrent,
timestamp=time.time(),
cost_usd=total_cost,
tokens_used=total_tokens,
response_quality_score=avg_quality,
additional_metrics={
"min_latency_ms": min(latencies) if latencies else 0.0,
"max_latency_ms": max(latencies) if latencies else 0.0,
"p95_latency_ms": np.percentile(latencies, 95) if latencies else 0.0,
"p99_latency_ms": np.percentile(latencies, 99) if latencies else 0.0,
"total_time_s": total_time,
"initial_memory_mb": initial_resources["memory_mb"],
"final_memory_mb": final_resources["memory_mb"],
"avg_tokens_per_request": total_tokens / successful_requests if successful_requests > 0 else 0,
"cost_per_request": total_cost / successful_requests if successful_requests > 0 else 0,
"quality_std": statistics.stdev(quality_scores) if len(quality_scores) > 1 else 0.0,
"data_size_processed": len(self.large_data),
"model_provider": model_name.split('-')[0] if '-' in model_name else "unknown"
}
)
logger.info(f"Latency test completed: {avg_latency:.2f}ms avg, {throughput:.2f} RPS, {success_rate:.2%} success, ${total_cost:.4f} cost, {avg_quality:.3f} quality")
return result
def create_excel_report(self, results: List[BenchmarkResult]) -> None:
"""Create comprehensive Excel report with multiple sheets and charts."""
if not BENCHMARK_CONFIG["excel_output"]:
return
logger.info("Creating comprehensive Excel report")
# Create workbook
wb = openpyxl.Workbook()
# Remove default sheet
wb.remove(wb.active)
# Convert results to DataFrame
df = pd.DataFrame([asdict(result) for result in results])
if df.empty:
logger.warning("No data available for Excel report")
return
# 1. Summary Sheet
self._create_summary_sheet(wb, df)
# 2. Model Comparison Sheet
self._create_model_comparison_sheet(wb, df)
# 3. Scaling Analysis Sheet
self._create_scaling_analysis_sheet(wb, df)
# 4. Cost Analysis Sheet
self._create_cost_analysis_sheet(wb, df)
# 5. Quality Analysis Sheet
self._create_quality_analysis_sheet(wb, df)
# 6. Raw Data Sheet
self._create_raw_data_sheet(wb, df)
# 7. Large Dataset Sample Sheet
self._create_large_data_sheet(wb)
# Save workbook
excel_path = f"{self.output_dir}/comprehensive_benchmark_report.xlsx"
wb.save(excel_path)
logger.info(f"Excel report saved to {excel_path}")
def _create_summary_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create summary sheet with key metrics."""
ws = wb.create_sheet("Summary")
# Headers
headers = ["Metric", "Value", "Description"]
for col, header in enumerate(headers, 1):
ws.cell(row=1, column=col, value=header).font = Font(bold=True)
# Summary data
summary_data = [
("Total Test Points", len(df), "Number of benchmark test points executed"),
("Models Tested", df['model_name'].nunique(), "Number of different models tested"),
("Max Agents", df['agent_count'].max(), "Maximum number of agents tested"),
("Total Requests", df['total_requests'].sum(), "Total requests processed"),
("Success Rate", f"{df['success_rate'].mean():.2%}", "Average success rate across all tests"),
("Avg Latency", f"{df['latency_ms'].mean():.2f}ms", "Average latency across all tests"),
("Peak Throughput", f"{df['throughput_rps'].max():.2f} RPS", "Highest throughput achieved"),
("Total Cost", f"${df['cost_usd'].sum():.4f}", "Total cost across all tests"),
("Avg Quality Score", f"{df['response_quality_score'].mean():.3f}", "Average response quality"),
("Total Tokens", f"{df['tokens_used'].sum():,}", "Total tokens consumed"),
("Data Size", f"{BENCHMARK_CONFIG['large_data_size']:,} records", "Size of dataset processed"),
("Test Duration", f"{df['timestamp'].max() - df['timestamp'].min():.2f}s", "Total test duration")
]
for row, (metric, value, description) in enumerate(summary_data, 2):
ws.cell(row=row, column=1, value=metric)
ws.cell(row=row, column=2, value=value)
ws.cell(row=row, column=3, value=description)
# Auto-adjust column widths
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
ws.column_dimensions[column_letter].width = adjusted_width
def _create_model_comparison_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create model comparison sheet."""
ws = wb.create_sheet("Model Comparison")
# Group by model and calculate metrics
model_stats = df.groupby('model_name').agg({
'latency_ms': ['mean', 'std', 'min', 'max'],
'throughput_rps': ['mean', 'std', 'min', 'max'],
'success_rate': ['mean', 'std'],
'cost_usd': ['mean', 'sum'],
'tokens_used': ['mean', 'sum'],
'response_quality_score': ['mean', 'std']
}).round(3)
# Flatten column names
model_stats.columns = ['_'.join(col).strip() for col in model_stats.columns]
model_stats = model_stats.reset_index()
# Write data
for r in dataframe_to_rows(model_stats, index=False, header=True):
ws.append(r)
# Add charts
self._add_model_comparison_charts(ws, model_stats)
def _create_scaling_analysis_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create scaling analysis sheet."""
ws = wb.create_sheet("Scaling Analysis")
# Filter scaling test results
scaling_df = df[df['test_name'] == 'scaling_test'].copy()
if not scaling_df.empty:
# Pivot table for scaling analysis
pivot_data = scaling_df.pivot_table(
values=['latency_ms', 'throughput_rps', 'memory_usage_mb'],
index='agent_count',
columns='model_name',
aggfunc='mean'
)
# Write pivot data
for r in dataframe_to_rows(pivot_data, index=True, header=True):
ws.append(r)
def _create_cost_analysis_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create cost analysis sheet."""
ws = wb.create_sheet("Cost Analysis")
# Cost breakdown by model
cost_analysis = df.groupby('model_name').agg({
'cost_usd': ['sum', 'mean', 'std'],
'tokens_used': ['sum', 'mean'],
'total_requests': 'sum'
}).round(4)
cost_analysis.columns = ['_'.join(col).strip() for col in cost_analysis.columns]
cost_analysis = cost_analysis.reset_index()
# Write data
for r in dataframe_to_rows(cost_analysis, index=False, header=True):
ws.append(r)
def _create_quality_analysis_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create quality analysis sheet."""
ws = wb.create_sheet("Quality Analysis")
# Quality metrics by model
quality_analysis = df.groupby('model_name').agg({
'response_quality_score': ['mean', 'std', 'min', 'max'],
'success_rate': ['mean', 'std'],
'error_count': 'sum'
}).round(3)
quality_analysis.columns = ['_'.join(col).strip() for col in quality_analysis.columns]
quality_analysis = quality_analysis.reset_index()
# Write data
for r in dataframe_to_rows(quality_analysis, index=False, header=True):
ws.append(r)
def _create_raw_data_sheet(self, wb: openpyxl.Workbook, df: pd.DataFrame) -> None:
"""Create raw data sheet."""
ws = wb.create_sheet("Raw Data")
# Write all raw data
for r in dataframe_to_rows(df, index=False, header=True):
ws.append(r)
def _create_large_data_sheet(self, wb: openpyxl.Workbook) -> None:
"""Create large dataset sample sheet."""
ws = wb.create_sheet("Large Dataset Sample")
# Sample of large data
sample_data = random.sample(self.large_data, min(1000, len(self.large_data)))
sample_df = pd.DataFrame(sample_data)
# Write sample data
for r in dataframe_to_rows(sample_df, index=False, header=True):
ws.append(r)
def _add_model_comparison_charts(self, ws: openpyxl.Workbook, model_stats: pd.DataFrame) -> None:
"""Add charts to model comparison sheet."""
# This would add Excel charts - simplified for now
pass
def run_scaling_test(self, config: ScalingTestConfig) -> List[BenchmarkResult]:
"""
Run comprehensive scaling test across different agent counts and models.
Args:
config: Scaling test configuration
Returns:
List of benchmark results
"""
logger.info(f"Starting scaling test: {config.min_agents} to {config.max_agents} agents across {len(self.models)} models")
results = []
for model_name in self.models:
logger.info(f"Testing model: {model_name}")
for agent_count in range(config.min_agents, config.max_agents + 1, config.step_size):
logger.info(f"Testing {model_name} with {agent_count} agents")
try:
# Create AOP instance
aop = AOP(
server_name=f"benchmark_aop_{model_name}_{agent_count}",
verbose=False,
traceback_enabled=False
)
# Add agents with specific model
agents = [self.create_real_agent(i, model_name) for i in range(agent_count)]
aop.add_agents_batch(agents)
# Warmup
if config.warmup_requests > 0:
logger.debug(f"Running {config.warmup_requests} warmup requests for {model_name}")
self.run_latency_test(
aop, agent_count, model_name, config.warmup_requests, 1
)
# Run actual test
result = self.run_latency_test(
aop, agent_count, model_name, config.requests_per_test, config.concurrent_requests
)
result.test_name = "scaling_test"
results.append(result)
# Cleanup
del aop
gc.collect()
except Exception as e:
logger.error(f"Failed to test {model_name} with {agent_count} agents: {e}")
# Create error result
error_result = BenchmarkResult(
agent_count=agent_count,
test_name="scaling_test",
model_name=model_name,
latency_ms=0.0,
throughput_rps=0.0,
memory_usage_mb=0.0,
cpu_usage_percent=0.0,
success_rate=0.0,
error_count=1,
total_requests=config.requests_per_test,
concurrent_requests=config.concurrent_requests,
timestamp=time.time(),
cost_usd=0.0,
tokens_used=0,
response_quality_score=0.0,
additional_metrics={"error": str(e)}
)
results.append(error_result)
logger.info(f"Scaling test completed: {len(results)} test points across {len(self.models)} models")
return results
def run_concurrent_test(
self,
agent_count: int = 10,
max_concurrent: int = 50,
requests_per_level: int = 100
) -> List[BenchmarkResult]:
"""
Test performance under different levels of concurrency across models.
Args:
agent_count: Number of agents to use
max_concurrent: Maximum concurrent requests to test
requests_per_level: Number of requests per concurrency level
Returns:
List of benchmark results
"""
logger.info(f"Running concurrent test with {agent_count} agents, up to {max_concurrent} concurrent across {len(self.models)} models")
results = []
for model_name in self.models:
logger.info(f"Testing concurrency for model: {model_name}")
try:
# Create AOP instance
aop = AOP(
server_name=f"concurrent_test_aop_{model_name}",
verbose=False,
traceback_enabled=False
)
# Add agents with specific model
agents = [self.create_real_agent(i, model_name) for i in range(agent_count)]
aop.add_agents_batch(agents)
# Test different concurrency levels
for concurrent in range(1, max_concurrent + 1, 5):
logger.info(f"Testing {model_name} with {concurrent} concurrent requests")
result = self.run_latency_test(
aop, agent_count, model_name, requests_per_level, concurrent
)
result.test_name = "concurrent_test"
results.append(result)
# Cleanup
del aop
gc.collect()
except Exception as e:
logger.error(f"Concurrent test failed for {model_name}: {e}")
logger.info(f"Concurrent test completed: {len(results)} test points across {len(self.models)} models")
return results
def run_memory_test(self, agent_count: int = 20, iterations: int = 10) -> List[BenchmarkResult]:
"""
Test memory usage patterns over time across models.
Args:
agent_count: Number of agents to use
iterations: Number of iterations to run
Returns:
List of benchmark results
"""
logger.info(f"Running memory test with {agent_count} agents, {iterations} iterations across {len(self.models)} models")
results = []
for model_name in self.models:
logger.info(f"Testing memory for model: {model_name}")
for iteration in range(iterations):
logger.info(f"Memory test iteration {iteration + 1}/{iterations} for {model_name}")
try:
# Create AOP instance
aop = AOP(
server_name=f"memory_test_aop_{model_name}_{iteration}",
verbose=False,
traceback_enabled=False
)
# Add agents with specific model
agents = [self.create_real_agent(i, model_name) for i in range(agent_count)]
aop.add_agents_batch(agents)
# Run test
result = self.run_latency_test(aop, agent_count, model_name, 50, 5)
result.test_name = "memory_test"
result.additional_metrics["iteration"] = iteration
results.append(result)
# Cleanup
del aop
gc.collect()
except Exception as e:
logger.error(f"Memory test iteration {iteration} failed for {model_name}: {e}")
logger.info(f"Memory test completed: {len(results)} iterations across {len(self.models)} models")
return results
def run_agent_lifecycle_test(self, model_name: str = None) -> List[BenchmarkResult]:
"""Test agent lifecycle management in AOP."""
logger.info(f"Running agent lifecycle test for {model_name or 'default model'}")
results = []
model_name = model_name or random.choice(self.models)
# Test agent creation, registration, execution, and cleanup
aop = AOP(server_name=f"lifecycle_test_aop_{model_name}", verbose=False)
# Measure agent creation time
creation_start = time.time()
agents = [self.create_real_agent(i, model_name=model_name) for i in range(10)]
creation_time = time.time() - creation_start
# Measure tool registration time
registration_start = time.time()
aop.add_agents_batch(agents)
registration_time = time.time() - registration_start
# Test agent execution
execution_start = time.time()
available_agents = aop.list_agents()
if available_agents:
# Test agent execution
task = {
'task': 'Analyze the performance characteristics of this system',
'data': random.sample(self.large_data, 10),
'analysis_type': 'performance_analysis'
}
# Execute with first available agent
agent_name = available_agents[0]
try:
response = aop._execute_agent_with_timeout(agent_name, task, timeout=30)
execution_time = time.time() - execution_start
success = True
except Exception as e:
execution_time = time.time() - execution_start
success = False
logger.error(f"Agent execution failed: {e}")
# Create result
result = BenchmarkResult(
test_name="agent_lifecycle_test",
agent_count=len(agents),
model_name=model_name,
latency_ms=execution_time * 1000,
throughput_rps=1.0 / execution_time if execution_time > 0 else 0,
success_rate=1.0 if success else 0.0,
error_rate=0.0 if success else 1.0,
memory_usage_mb=psutil.Process().memory_info().rss / 1024 / 1024,
cpu_usage_percent=psutil.cpu_percent(),
cost_usd=0.01, # Estimated cost
tokens_used=100, # Estimated tokens
response_quality_score=0.9 if success else 0.0,
agent_creation_time=creation_time,
tool_registration_time=registration_time,
execution_time=execution_time,
total_latency=creation_time + registration_time + execution_time
)
results.append(result)
logger.info(f"Agent lifecycle test completed: {execution_time:.2f}s total")
return results
def run_tool_chaining_test(self, model_name: str = None) -> List[BenchmarkResult]:
"""Test tool chaining capabilities in AOP."""
logger.info(f"Running tool chaining test for {model_name or 'default model'}")
results = []
model_name = model_name or random.choice(self.models)
aop = AOP(server_name=f"chaining_test_aop_{model_name}", verbose=False)
# Create specialized agents for chaining
agents = []
agent_types = ['analyzer', 'summarizer', 'classifier', 'extractor', 'validator']
for i, agent_type in enumerate(agent_types):
agent = self.create_real_agent(i, model_name=model_name)
agent.name = f"{agent_type}_agent_{i}"
agents.append(agent)
# Register agents
aop.add_agents_batch(agents)
# Test chaining: analyzer -> summarizer -> classifier
chaining_start = time.time()
available_agents = aop.list_agents()
if len(available_agents) >= 3:
try:
# Step 1: Analysis
task1 = {
'task': 'Analyze this data for patterns and insights',
'data': random.sample(self.large_data, 20),
'analysis_type': 'pattern_analysis'
}
response1 = aop._execute_agent_with_timeout(available_agents[0], task1, timeout=30)
# Step 2: Summarization
task2 = {
'task': 'Summarize the analysis results',
'data': [response1],
'analysis_type': 'summarization'
}
response2 = aop._execute_agent_with_timeout(available_agents[1], task2, timeout=30)
# Step 3: Classification
task3 = {
'task': 'Classify the summarized results',
'data': [response2],
'analysis_type': 'classification'
}
response3 = aop._execute_agent_with_timeout(available_agents[2], task3, timeout=30)
chaining_time = time.time() - chaining_start
success = True
except Exception as e:
chaining_time = time.time() - chaining_start
success = False
logger.error(f"Tool chaining failed: {e}")
else:
chaining_time = 0
success = False
result = BenchmarkResult(
test_name="tool_chaining_test",
agent_count=len(agents),
model_name=model_name,
latency_ms=chaining_time * 1000,
throughput_rps=3.0 / chaining_time if chaining_time > 0 else 0, # 3 steps
success_rate=1.0 if success else 0.0,
error_rate=0.0 if success else 1.0,
memory_usage_mb=psutil.Process().memory_info().rss / 1024 / 1024,
cpu_usage_percent=psutil.cpu_percent(),
cost_usd=0.03, # Higher cost for chaining
tokens_used=300, # More tokens for chaining
response_quality_score=0.85 if success else 0.0,
chaining_steps=3,
chaining_success=success
)
results.append(result)
logger.info(f"Tool chaining test completed: {chaining_time:.2f}s, success: {success}")
return results
def run_error_handling_test(self, model_name: str = None) -> List[BenchmarkResult]:
"""Test error handling and recovery in AOP."""
logger.info(f"Running error handling test for {model_name or 'default model'}")
results = []
model_name = model_name or random.choice(self.models)
aop = AOP(server_name=f"error_test_aop_{model_name}", verbose=False)
# Create agents
agents = [self.create_real_agent(i, model_name=model_name) for i in range(5)]
aop.add_agents_batch(agents)
# Test various error scenarios
error_scenarios = [
{'task': '', 'data': [], 'error_type': 'empty_task'}, # Empty task
{'task': 'x' * 10000, 'data': [], 'error_type': 'oversized_task'}, # Oversized task
{'task': 'Valid task', 'data': None, 'error_type': 'invalid_data'}, # Invalid data
{'task': 'Valid task', 'data': [], 'error_type': 'timeout'}, # Timeout scenario
]
error_handling_start = time.time()
successful_recoveries = 0
total_errors = 0
for scenario in error_scenarios:
try:
available_agents = aop.list_agents()
if available_agents:
# Attempt execution with error scenario
response = aop._execute_agent_with_timeout(
available_agents[0],
scenario,
timeout=5 # Short timeout for error testing
)
if response:
successful_recoveries += 1
total_errors += 1
except Exception as e:
# Expected error - count as handled
successful_recoveries += 1
total_errors += 1
logger.debug(f"Expected error handled: {e}")
error_handling_time = time.time() - error_handling_start
recovery_rate = successful_recoveries / total_errors if total_errors > 0 else 0
result = BenchmarkResult(
test_name="error_handling_test",
agent_count=len(agents),
model_name=model_name,
latency_ms=error_handling_time * 1000,
throughput_rps=total_errors / error_handling_time if error_handling_time > 0 else 0,
success_rate=recovery_rate,
error_rate=1.0 - recovery_rate,
memory_usage_mb=psutil.Process().memory_info().rss / 1024 / 1024,
cpu_usage_percent=psutil.cpu_percent(),
cost_usd=0.005, # Lower cost for error testing
tokens_used=50, # Fewer tokens for error scenarios
response_quality_score=recovery_rate,
error_scenarios_tested=len(error_scenarios),
recovery_rate=recovery_rate
)
results.append(result)
logger.info(f"Error handling test completed: {recovery_rate:.2%} recovery rate")
return results
def run_resource_management_test(self, model_name: str = None) -> List[BenchmarkResult]:
"""Test resource management and cleanup in AOP."""
logger.info(f"Running resource management test for {model_name or 'default model'}")
results = []
model_name = model_name or random.choice(self.models)
# Test resource usage over time
resource_measurements = []
for cycle in range(5): # 5 cycles of create/use/destroy
# Create AOP instance
aop = AOP(server_name=f"resource_test_aop_{model_name}_{cycle}", verbose=False)
# Create agents
agents = [self.create_real_agent(i, model_name=model_name) for i in range(10)]
aop.add_agents_batch(agents)
# Measure resource usage
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024
initial_cpu = psutil.cpu_percent()
# Execute some tasks
available_agents = aop.list_agents()
if available_agents:
for i in range(10):
task = {
'task': f'Resource test task {i}',
'data': random.sample(self.large_data, 5),
'analysis_type': 'resource_test'
}
try:
aop._execute_agent_with_timeout(available_agents[0], task, timeout=10)
except Exception as e:
logger.debug(f"Task execution failed: {e}")
# Measure final resource usage
final_memory = psutil.Process().memory_info().rss / 1024 / 1024
final_cpu = psutil.cpu_percent()
resource_measurements.append({
'cycle': cycle,
'initial_memory': initial_memory,
'final_memory': final_memory,
'memory_delta': final_memory - initial_memory,
'cpu_usage': final_cpu
})
# Clean up
del aop
del agents
gc.collect()
# Calculate resource management metrics
memory_deltas = [m['memory_delta'] for m in resource_measurements]
avg_memory_delta = sum(memory_deltas) / len(memory_deltas)
memory_leak_detected = any(delta > 10 for delta in memory_deltas) # 10MB threshold
result = BenchmarkResult(
test_name="resource_management_test",
agent_count=10,
model_name=model_name,
latency_ms=0, # Not applicable for resource test
throughput_rps=0, # Not applicable for resource test
success_rate=0.0 if memory_leak_detected else 1.0,
error_rate=1.0 if memory_leak_detected else 0.0,
memory_usage_mb=final_memory,
cpu_usage_percent=final_cpu,
cost_usd=0.02, # Estimated cost
tokens_used=200, # Estimated tokens
response_quality_score=0.0 if memory_leak_detected else 1.0,
resource_cycles=len(resource_measurements),
avg_memory_delta=avg_memory_delta,
memory_leak_detected=memory_leak_detected
)
results.append(result)
logger.info(f"Resource management test completed: {'PASS' if not memory_leak_detected else 'FAIL'}")
return results
def run_simple_tools_test(self, model_name: str = None) -> List[BenchmarkResult]:
"""Test simple tools and their performance with agents."""
logger.info(f"Running simple tools test for {model_name or 'default model'}")
results = []
model_name = model_name or random.choice(self.models)
aop = AOP(server_name=f"tools_test_aop_{model_name}", verbose=False)
# Create agents with different tool capabilities
agents = []
tool_types = ['calculator', 'text_processor', 'data_analyzer', 'formatter', 'validator']
for i, tool_type in enumerate(tool_types):
agent = self.create_real_agent(i, model_name=model_name)
agent.name = f"{tool_type}_agent_{i}"
agents.append(agent)
# Register agents
aop.add_agents_batch(agents)
# Test different simple tools
tool_tests = [
{
'tool_type': 'calculator',
'task': 'Calculate the sum of numbers: 15, 23, 47, 89, 156',
'expected_complexity': 'simple',
'expected_speed': 'fast'
},
{
'tool_type': 'text_processor',
'task': 'Count words and characters in this text: "The quick brown fox jumps over the lazy dog"',
'expected_complexity': 'simple',
'expected_speed': 'fast'
},
{
'tool_type': 'data_analyzer',
'task': 'Find the average of these numbers: 10, 20, 30, 40, 50',
'expected_complexity': 'simple',
'expected_speed': 'fast'
},
{
'tool_type': 'formatter',
'task': 'Format this JSON: {"name":"John","age":30,"city":"New York"}',
'expected_complexity': 'medium',
'expected_speed': 'medium'
},
{
'tool_type': 'validator',
'task': 'Validate if this email is correct: user@example.com',
'expected_complexity': 'simple',
'expected_speed': 'fast'
}
]
tool_performance = []
available_agents = aop.list_agents()
for test in tool_tests:
if available_agents:
tool_start = time.time()
try:
# Execute tool test
response = aop._execute_agent_with_timeout(
available_agents[0],
test,
timeout=15
)
tool_time = time.time() - tool_start
success = True
# Simulate tool quality based on response time and complexity
if tool_time < 2.0 and test['expected_speed'] == 'fast':
quality_score = 0.9
elif tool_time < 5.0 and test['expected_speed'] == 'medium':
quality_score = 0.8
else:
quality_score = 0.6
except Exception as e:
tool_time = time.time() - tool_start
success = False
quality_score = 0.0
logger.debug(f"Tool test failed: {e}")
tool_performance.append({
'tool_type': test['tool_type'],
'execution_time': tool_time,
'success': success,
'quality_score': quality_score,
'expected_complexity': test['expected_complexity'],
'expected_speed': test['expected_speed']
})
# Calculate tool performance metrics
successful_tools = sum(1 for p in tool_performance if p['success'])
avg_execution_time = sum(p['execution_time'] for p in tool_performance) / len(tool_performance)
avg_quality = sum(p['quality_score'] for p in tool_performance) / len(tool_performance)
result = BenchmarkResult(
test_name="simple_tools_test",
agent_count=len(agents),
model_name=model_name,
latency_ms=avg_execution_time * 1000,
throughput_rps=len(tool_tests) / sum(p['execution_time'] for p in tool_performance),
success_rate=successful_tools / len(tool_tests),
error_count=len(tool_tests) - successful_tools,
total_requests=len(tool_tests),
concurrent_requests=1,
timestamp=time.time(),
memory_usage_mb=psutil.Process().memory_info().rss / 1024 / 1024,
cpu_usage_percent=psutil.cpu_percent(),
cost_usd=0.01, # Lower cost for simple tools
tokens_used=50, # Fewer tokens for simple tools
response_quality_score=avg_quality,
tools_tested=len(tool_tests),
successful_tools=successful_tools,
avg_tool_execution_time=avg_execution_time,
tool_performance_data=tool_performance
)
results.append(result)
logger.info(f"Simple tools test completed: {successful_tools}/{len(tool_tests)} tools successful")
return results
def create_performance_charts(self, results: List[BenchmarkResult]) -> None:
"""
Create comprehensive performance charts.
Args:
results: List of benchmark results
"""
logger.info("Creating performance charts")
# Check if we have any results
if not results:
logger.warning("No benchmark results available for chart generation")
self._create_empty_charts()
return
# Set up the plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
# Convert results to DataFrame
df = pd.DataFrame([asdict(result) for result in results])
# Check if DataFrame is empty
if df.empty:
logger.warning("Empty DataFrame - no data to plot")
self._create_empty_charts()
return
# Create figure with subplots
fig, axes = plt.subplots(2, 3, figsize=(24, 14))
fig.suptitle('AOP Framework Performance Analysis - Model Comparison', fontsize=18, fontweight='bold')
# Get unique models for color mapping
unique_models = df['model_name'].unique()
model_colors = plt.cm.Set3(np.linspace(0, 1, len(unique_models)))
model_color_map = dict(zip(unique_models, model_colors))
# 1. Latency vs Agent Count by Model
ax1 = axes[0, 0]
scaling_results = df[df['test_name'] == 'scaling_test']
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax1.plot(model_data['agent_count'], model_data['latency_ms'],
marker='o', linewidth=2, markersize=6,
label=model, color=model_color_map[model])
ax1.set_xlabel('Number of Agents')
ax1.set_ylabel('Average Latency (ms)')
ax1.set_title('Latency vs Agent Count by Model')
ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax1.grid(True, alpha=0.3)
# 2. Throughput vs Agent Count by Model
ax2 = axes[0, 1]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax2.plot(model_data['agent_count'], model_data['throughput_rps'],
marker='s', linewidth=2, markersize=6,
label=model, color=model_color_map[model])
ax2.set_xlabel('Number of Agents')
ax2.set_ylabel('Throughput (RPS)')
ax2.set_title('Throughput vs Agent Count by Model')
ax2.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax2.grid(True, alpha=0.3)
# 3. Memory Usage vs Agent Count by Model
ax3 = axes[0, 2]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax3.plot(model_data['agent_count'], model_data['memory_usage_mb'],
marker='^', linewidth=2, markersize=6,
label=model, color=model_color_map[model])
ax3.set_xlabel('Number of Agents')
ax3.set_ylabel('Memory Usage (MB)')
ax3.set_title('Memory Usage vs Agent Count by Model')
ax3.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax3.grid(True, alpha=0.3)
# 4. Concurrent Performance by Model
ax4 = axes[1, 0]
concurrent_results = df[df['test_name'] == 'concurrent_test']
if not concurrent_results.empty:
for model in unique_models:
model_data = concurrent_results[concurrent_results['model_name'] == model]
if not model_data.empty:
ax4.plot(model_data['concurrent_requests'], model_data['latency_ms'],
marker='o', linewidth=2, markersize=6,
label=model, color=model_color_map[model])
ax4.set_xlabel('Concurrent Requests')
ax4.set_ylabel('Average Latency (ms)')
ax4.set_title('Latency vs Concurrency by Model')
ax4.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax4.grid(True, alpha=0.3)
# 5. Success Rate Analysis by Model
ax5 = axes[1, 1]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax5.plot(model_data['agent_count'], model_data['success_rate'] * 100,
marker='d', linewidth=2, markersize=6,
label=model, color=model_color_map[model])
ax5.set_xlabel('Number of Agents')
ax5.set_ylabel('Success Rate (%)')
ax5.set_title('Success Rate vs Agent Count by Model')
ax5.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax5.grid(True, alpha=0.3)
ax5.set_ylim(0, 105)
# 6. Model Performance Comparison (Bar Chart)
ax6 = axes[1, 2]
if not scaling_results.empty:
# Calculate average performance metrics by model
model_performance = scaling_results.groupby('model_name').agg({
'latency_ms': 'mean',
'throughput_rps': 'mean',
'success_rate': 'mean',
'cost_usd': 'mean'
}).reset_index()
# Create a bar chart comparing models
x_pos = np.arange(len(model_performance))
width = 0.2
# Normalize metrics for comparison (0-1 scale)
latency_norm = (model_performance['latency_ms'] - model_performance['latency_ms'].min()) / (model_performance['latency_ms'].max() - model_performance['latency_ms'].min())
throughput_norm = (model_performance['throughput_rps'] - model_performance['throughput_rps'].min()) / (model_performance['throughput_rps'].max() - model_performance['throughput_rps'].min())
success_norm = model_performance['success_rate']
ax6.bar(x_pos - width, latency_norm, width, label='Latency (norm)', alpha=0.8)
ax6.bar(x_pos, throughput_norm, width, label='Throughput (norm)', alpha=0.8)
ax6.bar(x_pos + width, success_norm, width, label='Success Rate', alpha=0.8)
ax6.set_xlabel('Models')
ax6.set_ylabel('Normalized Performance')
ax6.set_title('Model Performance Comparison')
ax6.set_xticks(x_pos)
ax6.set_xticklabels(model_performance['model_name'], rotation=45, ha='right')
ax6.legend()
ax6.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f"{self.output_dir}/performance_analysis.png", dpi=300, bbox_inches='tight')
plt.close()
# Create additional detailed charts
self._create_detailed_charts(df)
# Create additional tool performance chart
self._create_tool_performance_chart(results)
logger.info(f"Performance charts saved to {self.output_dir}/")
def _create_empty_charts(self) -> None:
"""Create empty charts when no data is available."""
logger.info("Creating empty charts due to no data")
# Create empty performance analysis chart
fig, axes = plt.subplots(2, 3, figsize=(20, 12))
fig.suptitle('AOP Framework Performance Analysis - No Data Available', fontsize=16, fontweight='bold')
# Add "No Data" text to each subplot
for i, ax in enumerate(axes.flat):
ax.text(0.5, 0.5, 'No Data Available', ha='center', va='center',
transform=ax.transAxes, fontsize=14, color='red')
ax.set_title(f'Chart {i+1}')
plt.tight_layout()
plt.savefig(f"{self.output_dir}/performance_analysis.png", dpi=300, bbox_inches='tight')
plt.close()
# Create empty detailed analysis chart
fig, ax = plt.subplots(1, 1, figsize=(12, 8))
ax.text(0.5, 0.5, 'No Data Available for Detailed Analysis', ha='center', va='center',
transform=ax.transAxes, fontsize=16, color='red')
ax.set_title('Detailed Analysis - No Data Available')
plt.tight_layout()
plt.savefig(f"{self.output_dir}/detailed_analysis.png", dpi=300, bbox_inches='tight')
plt.close()
logger.info("Empty charts created")
def _create_detailed_charts(self, df: pd.DataFrame) -> None:
"""Create additional detailed performance charts with model comparisons."""
# Check if DataFrame is empty
if df.empty:
logger.warning("Empty DataFrame for detailed charts")
return
# Get unique models for color mapping
unique_models = df['model_name'].unique()
model_colors = plt.cm.Set3(np.linspace(0, 1, len(unique_models)))
model_color_map = dict(zip(unique_models, model_colors))
# Create comprehensive detailed analysis
fig, axes = plt.subplots(2, 3, figsize=(24, 16))
fig.suptitle('Detailed Model Performance Analysis', fontsize=18, fontweight='bold')
scaling_results = df[df['test_name'] == 'scaling_test']
# Check if we have scaling results
if scaling_results.empty:
logger.warning("No scaling results for detailed charts")
return
# 1. Latency Distribution by Model
ax1 = axes[0, 0]
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax1.hist(model_data['latency_ms'], bins=15, alpha=0.6,
label=model, color=model_color_map[model], edgecolor='black')
ax1.set_xlabel('Latency (ms)')
ax1.set_ylabel('Frequency')
ax1.set_title('Latency Distribution by Model')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. Throughput vs Memory Usage by Model
ax2 = axes[0, 1]
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax2.scatter(model_data['memory_usage_mb'], model_data['throughput_rps'],
s=100, alpha=0.7, label=model, color=model_color_map[model])
ax2.set_xlabel('Memory Usage (MB)')
ax2.set_ylabel('Throughput (RPS)')
ax2.set_title('Throughput vs Memory Usage by Model')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 3. Scaling Efficiency by Model
ax3 = axes[0, 2]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
efficiency = model_data['throughput_rps'] / model_data['agent_count']
ax3.plot(model_data['agent_count'], efficiency, marker='o', linewidth=2,
label=model, color=model_color_map[model])
ax3.set_xlabel('Number of Agents')
ax3.set_ylabel('Efficiency (RPS per Agent)')
ax3.set_title('Scaling Efficiency by Model')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. Error Rate Analysis by Model
ax4 = axes[1, 0]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
error_rate = (1 - model_data['success_rate']) * 100
ax4.plot(model_data['agent_count'], error_rate, marker='s', linewidth=2,
label=model, color=model_color_map[model])
ax4.set_xlabel('Number of Agents')
ax4.set_ylabel('Error Rate (%)')
ax4.set_title('Error Rate vs Agent Count by Model')
ax4.legend()
ax4.grid(True, alpha=0.3)
ax4.set_ylim(0, 10)
# 5. Cost Analysis by Model
ax5 = axes[1, 1]
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax5.plot(model_data['agent_count'], model_data['cost_usd'], marker='d', linewidth=2,
label=model, color=model_color_map[model])
ax5.set_xlabel('Number of Agents')
ax5.set_ylabel('Cost (USD)')
ax5.set_title('Cost vs Agent Count by Model')
ax5.legend()
ax5.grid(True, alpha=0.3)
# 6. Quality Score Analysis by Model
ax6 = axes[1, 2] # Now we have 2x3 subplot
if not scaling_results.empty:
for model in unique_models:
model_data = scaling_results[scaling_results['model_name'] == model]
if not model_data.empty:
ax6.plot(model_data['agent_count'], model_data['response_quality_score'], marker='^', linewidth=2,
label=model, color=model_color_map[model])
ax6.set_xlabel('Number of Agents')
ax6.set_ylabel('Quality Score')
ax6.set_title('Response Quality vs Agent Count by Model')
ax6.legend()
ax6.grid(True, alpha=0.3)
ax6.set_ylim(0, 1)
plt.tight_layout()
plt.savefig(f"{self.output_dir}/detailed_analysis.png", dpi=300, bbox_inches='tight')
plt.close()
# Create additional tool performance chart
# Note: This will be called from create_performance_charts with the full results list
def _create_tool_performance_chart(self, results: List[BenchmarkResult]) -> None:
"""Create a dedicated chart for tool performance analysis."""
logger.info("Creating tool performance chart")
# Filter for simple tools test results
tools_results = [r for r in results if r.test_name == "simple_tools_test"]
if not tools_results:
logger.warning("No tool performance data available")
return
# Create DataFrame
df = pd.DataFrame([
{
'model_name': r.model_name,
'tools_tested': getattr(r, 'tools_tested', 0),
'successful_tools': getattr(r, 'successful_tools', 0),
'avg_tool_execution_time': getattr(r, 'avg_tool_execution_time', 0),
'response_quality_score': r.response_quality_score,
'cost_usd': r.cost_usd,
'latency_ms': r.latency_ms
}
for r in tools_results
])
if df.empty:
logger.warning("Empty DataFrame for tool performance chart")
return
# Create tool performance chart
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Simple Tools Performance Analysis by Model', fontsize=16, fontweight='bold')
# Get unique models for color mapping
unique_models = df['model_name'].unique()
model_colors = plt.cm.Set3(np.linspace(0, 1, len(unique_models)))
model_color_map = dict(zip(unique_models, model_colors))
# 1. Tool Success Rate by Model
ax1 = axes[0, 0]
success_rates = df['successful_tools'] / df['tools_tested'] * 100
bars1 = ax1.bar(range(len(df)), success_rates, color=[model_color_map[model] for model in df['model_name']])
ax1.set_xlabel('Models')
ax1.set_ylabel('Success Rate (%)')
ax1.set_title('Tool Success Rate by Model')
ax1.set_xticks(range(len(df)))
ax1.set_xticklabels(df['model_name'], rotation=45, ha='right')
ax1.set_ylim(0, 105)
ax1.grid(True, alpha=0.3)
# Add value labels on bars
for i, (bar, rate) in enumerate(zip(bars1, success_rates)):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
f'{rate:.1f}%', ha='center', va='bottom', fontsize=8)
# 2. Tool Execution Time by Model
ax2 = axes[0, 1]
bars2 = ax2.bar(range(len(df)), df['avg_tool_execution_time'],
color=[model_color_map[model] for model in df['model_name']])
ax2.set_xlabel('Models')
ax2.set_ylabel('Avg Execution Time (s)')
ax2.set_title('Tool Execution Time by Model')
ax2.set_xticks(range(len(df)))
ax2.set_xticklabels(df['model_name'], rotation=45, ha='right')
ax2.grid(True, alpha=0.3)
# Add value labels on bars
for i, (bar, time) in enumerate(zip(bars2, df['avg_tool_execution_time'])):
ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{time:.2f}s', ha='center', va='bottom', fontsize=8)
# 3. Tool Quality vs Cost by Model
ax3 = axes[1, 0]
scatter = ax3.scatter(df['cost_usd'], df['response_quality_score'],
s=100, c=[model_color_map[model] for model in df['model_name']],
alpha=0.7, edgecolors='black')
ax3.set_xlabel('Cost (USD)')
ax3.set_ylabel('Quality Score')
ax3.set_title('Tool Quality vs Cost by Model')
ax3.grid(True, alpha=0.3)
# Add model labels
for i, model in enumerate(df['model_name']):
ax3.annotate(model, (df.iloc[i]['cost_usd'], df.iloc[i]['response_quality_score']),
xytext=(5, 5), textcoords='offset points', fontsize=8)
# 4. Tool Performance Summary
ax4 = axes[1, 1]
# Create a summary table-like visualization
metrics = ['Success Rate', 'Avg Time', 'Quality', 'Cost']
model_data = []
for model in unique_models:
model_df = df[df['model_name'] == model].iloc[0]
model_data.append([
model_df['successful_tools'] / model_df['tools_tested'] * 100,
model_df['avg_tool_execution_time'],
model_df['response_quality_score'] * 100,
model_df['cost_usd'] * 1000 # Convert to millicents for better visualization
])
# Normalize data for comparison
model_data = np.array(model_data)
normalized_data = model_data / model_data.max(axis=0)
x = np.arange(len(metrics))
width = 0.8 / len(unique_models)
for i, model in enumerate(unique_models):
ax4.bar(x + i * width, normalized_data[i], width,
label=model, color=model_color_map[model], alpha=0.8)
ax4.set_xlabel('Metrics')
ax4.set_ylabel('Normalized Performance')
ax4.set_title('Tool Performance Comparison (Normalized)')
ax4.set_xticks(x + width * (len(unique_models) - 1) / 2)
ax4.set_xticklabels(metrics)
ax4.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f"{self.output_dir}/tool_performance_analysis.png", dpi=300, bbox_inches='tight')
plt.close()
logger.info("Tool performance chart saved")
def generate_report(self, results: List[BenchmarkResult]) -> str:
"""
Generate comprehensive benchmark report.
Args:
results: List of benchmark results
Returns:
str: Generated report
"""
logger.info("Generating benchmark report")
# Calculate statistics
df = pd.DataFrame([asdict(result) for result in results])
report = f"""
# AOP Framework Benchmark Report
## Executive Summary
This report presents a comprehensive performance analysis of the AOP (Agent Orchestration Platform) framework.
The benchmark suite tested various aspects including scaling laws, latency, throughput, memory usage, and error rates.
## Test Configuration
- **Total Test Points**: {len(results)}
- **Test Duration**: {time.strftime('%Y-%m-%d %H:%M:%S')}
- **Output Directory**: {self.output_dir}
## Key Findings
### Scaling Performance
"""
# Scaling analysis
scaling_results = df[df['test_name'] == 'scaling_test']
if not scaling_results.empty:
max_agents = scaling_results['agent_count'].max()
best_throughput = scaling_results['throughput_rps'].max()
best_latency = scaling_results['latency_ms'].min()
report += f"""
- **Maximum Agents Tested**: {max_agents}
- **Peak Throughput**: {best_throughput:.2f} RPS
- **Best Latency**: {best_latency:.2f} ms
- **Average Success Rate**: {scaling_results['success_rate'].mean():.2%}
"""
# Concurrent performance
concurrent_results = df[df['test_name'] == 'concurrent_test']
if not concurrent_results.empty:
max_concurrent = concurrent_results['concurrent_requests'].max()
concurrent_throughput = concurrent_results['throughput_rps'].max()
report += f"""
### Concurrent Performance
- **Maximum Concurrent Requests**: {max_concurrent}
- **Peak Concurrent Throughput**: {concurrent_throughput:.2f} RPS
"""
# Memory analysis
memory_results = df[df['test_name'] == 'memory_test']
if not memory_results.empty:
avg_memory = memory_results['memory_usage_mb'].mean()
max_memory = memory_results['memory_usage_mb'].max()
report += f"""
### Memory Usage
- **Average Memory Usage**: {avg_memory:.2f} MB
- **Peak Memory Usage**: {max_memory:.2f} MB
"""
# Statistical analysis
report += f"""
## Statistical Analysis
### Latency Statistics
- **Mean Latency**: {df['latency_ms'].mean():.2f} ms
- **Median Latency**: {df['latency_ms'].median():.2f} ms
- **95th Percentile**: {df['latency_ms'].quantile(0.95):.2f} ms
- **99th Percentile**: {df['latency_ms'].quantile(0.99):.2f} ms
### Throughput Statistics
- **Mean Throughput**: {df['throughput_rps'].mean():.2f} RPS
- **Peak Throughput**: {df['throughput_rps'].max():.2f} RPS
- **Throughput Standard Deviation**: {df['throughput_rps'].std():.2f} RPS
### Success Rate Analysis
- **Overall Success Rate**: {df['success_rate'].mean():.2%}
- **Minimum Success Rate**: {df['success_rate'].min():.2%}
- **Maximum Success Rate**: {df['success_rate'].max():.2%}
## Scaling Laws Analysis
The framework demonstrates the following scaling characteristics:
1. **Linear Scaling**: Throughput increases approximately linearly with agent count up to a certain threshold
2. **Latency Degradation**: Latency increases with higher agent counts due to resource contention
3. **Memory Growth**: Memory usage grows predictably with agent count
4. **Error Rate Stability**: Success rate remains stable across different configurations
## Recommendations
1. **Optimal Agent Count**: Based on the results, the optimal agent count for this configuration is approximately {scaling_results['agent_count'].iloc[scaling_results['throughput_rps'].idxmax()] if not scaling_results.empty and len(scaling_results) > 0 else 'N/A'} agents
2. **Concurrency Limits**: Maximum recommended concurrent requests: {concurrent_results['concurrent_requests'].iloc[concurrent_results['latency_ms'].idxmin()] if not concurrent_results.empty and len(concurrent_results) > 0 else 'N/A'}
3. **Resource Planning**: Plan for {df['memory_usage_mb'].max():.0f} MB memory usage for maximum agent count
## Conclusion
The AOP framework demonstrates good scaling characteristics with predictable performance degradation patterns.
The benchmark results provide valuable insights for production deployment planning and resource allocation.
---
*Report generated by AOP Benchmark Suite*
*Generated on: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return report
def save_results(self, results: List[BenchmarkResult], report: str) -> None:
"""
Save benchmark results and report to files.
Args:
results: List of benchmark results
report: Generated report
"""
logger.info("Saving benchmark results")
# Save raw results as JSON
results_data = [asdict(result) for result in results]
with open(f"{self.output_dir}/benchmark_results.json", 'w') as f:
json.dump(results_data, f, indent=2, default=str)
# Save report
with open(f"{self.output_dir}/benchmark_report.md", 'w') as f:
f.write(report)
# Save CSV for easy analysis
df = pd.DataFrame(results_data)
df.to_csv(f"{self.output_dir}/benchmark_results.csv", index=False)
logger.info(f"Results saved to {self.output_dir}/")
def run_full_benchmark_suite(self) -> None:
"""
Run the complete benchmark suite with all tests.
"""
logger.info("Starting full AOP benchmark suite")
# Configuration
config = ScalingTestConfig(
min_agents=1,
max_agents=BENCHMARK_CONFIG["max_agents"],
step_size=5, # Increased step size for faster testing
requests_per_test=BENCHMARK_CONFIG["requests_per_test"],
concurrent_requests=BENCHMARK_CONFIG["concurrent_requests"],
warmup_requests=BENCHMARK_CONFIG["warmup_requests"]
)
all_results = []
try:
# 1. Scaling Test
logger.info("=== Running Scaling Test ===")
try:
scaling_results = self.run_scaling_test(config)
all_results.extend(scaling_results)
logger.info(f"Scaling test completed: {len(scaling_results)} results")
except Exception as e:
logger.error(f"Scaling test failed: {e}")
logger.info("Continuing with other tests...")
# 2. Concurrent Test
logger.info("=== Running Concurrent Test ===")
try:
concurrent_results = self.run_concurrent_test(
agent_count=5,
max_concurrent=10,
requests_per_level=10
)
all_results.extend(concurrent_results)
logger.info(f"Concurrent test completed: {len(concurrent_results)} results")
except Exception as e:
logger.error(f"Concurrent test failed: {e}")
logger.info("Continuing with other tests...")
# 3. Memory Test
logger.info("=== Running Memory Test ===")
try:
memory_results = self.run_memory_test(
agent_count=5,
iterations=3
)
all_results.extend(memory_results)
logger.info(f"Memory test completed: {len(memory_results)} results")
except Exception as e:
logger.error(f"Memory test failed: {e}")
logger.info("Continuing with other tests...")
# 4. Agent Lifecycle Test
logger.info("=== Running Agent Lifecycle Test ===")
try:
lifecycle_results = []
for model_name in self.models:
lifecycle_results.extend(self.run_agent_lifecycle_test(model_name))
all_results.extend(lifecycle_results)
logger.info(f"Agent lifecycle test completed: {len(lifecycle_results)} results")
except Exception as e:
logger.error(f"Agent lifecycle test failed: {e}")
logger.info("Continuing with other tests...")
# 5. Tool Chaining Test
logger.info("=== Running Tool Chaining Test ===")
try:
chaining_results = []
for model_name in self.models:
chaining_results.extend(self.run_tool_chaining_test(model_name))
all_results.extend(chaining_results)
logger.info(f"Tool chaining test completed: {len(chaining_results)} results")
except Exception as e:
logger.error(f"Tool chaining test failed: {e}")
logger.info("Continuing with other tests...")
# 6. Error Handling Test
logger.info("=== Running Error Handling Test ===")
try:
error_results = []
for model_name in self.models:
error_results.extend(self.run_error_handling_test(model_name))
all_results.extend(error_results)
logger.info(f"Error handling test completed: {len(error_results)} results")
except Exception as e:
logger.error(f"Error handling test failed: {e}")
logger.info("Continuing with other tests...")
# 7. Resource Management Test
logger.info("=== Running Resource Management Test ===")
try:
resource_results = []
for model_name in self.models:
resource_results.extend(self.run_resource_management_test(model_name))
all_results.extend(resource_results)
logger.info(f"Resource management test completed: {len(resource_results)} results")
except Exception as e:
logger.error(f"Resource management test failed: {e}")
logger.info("Continuing with other tests...")
# 8. Simple Tools Test
logger.info("=== Running Simple Tools Test ===")
try:
tools_results = []
for model_name in self.models:
tools_results.extend(self.run_simple_tools_test(model_name))
all_results.extend(tools_results)
logger.info(f"Simple tools test completed: {len(tools_results)} results")
except Exception as e:
logger.error(f"Simple tools test failed: {e}")
logger.info("Continuing with other tests...")
# 4. Generate Excel Report
logger.info("=== Generating Excel Report ===")
try:
self.create_excel_report(all_results)
logger.info("Excel report generated successfully")
except Exception as e:
logger.error(f"Excel report generation failed: {e}")
# 5. Generate Charts (always try, even with empty results)
logger.info("=== Generating Performance Charts ===")
try:
self.create_performance_charts(all_results)
logger.info("Charts generated successfully")
except Exception as e:
logger.error(f"Chart generation failed: {e}")
logger.info("Creating empty charts...")
self._create_empty_charts()
# 6. Generate Report
logger.info("=== Generating Report ===")
try:
report = self.generate_report(all_results)
logger.info("Report generated successfully")
except Exception as e:
logger.error(f"Report generation failed: {e}")
report = "Benchmark report generation failed due to errors."
# 7. Save Results
logger.info("=== Saving Results ===")
try:
self.save_results(all_results, report)
logger.info("Results saved successfully")
except Exception as e:
logger.error(f"Results saving failed: {e}")
logger.info("=== Benchmark Suite Completed ===")
logger.info(f"Total test points: {len(all_results)}")
logger.info(f"Results saved to: {self.output_dir}")
except Exception as e:
logger.error(f"Benchmark suite failed: {e}")
# Still try to create empty charts
try:
self._create_empty_charts()
except Exception as chart_error:
logger.error(f"Failed to create empty charts: {chart_error}")
raise
def main():
"""Main function to run the benchmark suite."""
print("🚀 AOP Framework Benchmark Suite - Enhanced Edition")
print("=" * 60)
print(f"📋 Configuration:")
print(f" Models: {len(BENCHMARK_CONFIG['models'])} models ({', '.join(BENCHMARK_CONFIG['models'][:3])}...)")
print(f" Max Agents: {BENCHMARK_CONFIG['max_agents']}")
print(f" Requests per Test: {BENCHMARK_CONFIG['requests_per_test']}")
print(f" Concurrent Requests: {BENCHMARK_CONFIG['concurrent_requests']}")
print(f" Large Data Size: {BENCHMARK_CONFIG['large_data_size']:,} records")
print(f" Excel Output: {BENCHMARK_CONFIG['excel_output']}")
print(f" Temperature: {BENCHMARK_CONFIG['temperature']}")
print(f" Max Tokens: {BENCHMARK_CONFIG['max_tokens']}")
print(f" Context Length: {BENCHMARK_CONFIG['context_length']}")
print()
# Check for required environment variables
api_key = os.getenv("SWARMS_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
print("❌ Error: SWARMS_API_KEY or OPENAI_API_KEY not found in environment variables")
print(" This benchmark requires real LLM calls for accurate performance testing")
print(" Set your API key: export SWARMS_API_KEY='your-key-here' or export OPENAI_API_KEY='your-key-here'")
return 1
# Check for required imports
if not SWARMS_AVAILABLE:
print("❌ Error: swarms not available")
print(" Install required dependencies: pip install swarms openpyxl")
print(" This benchmark requires swarms framework and Excel support")
return 1
# Initialize benchmark suite
benchmark = AOPBenchmarkSuite(
output_dir="aop_benchmark_results",
verbose=True,
log_level="INFO",
models=BENCHMARK_CONFIG["models"]
)
try:
# Run full benchmark suite
benchmark.run_full_benchmark_suite()
print("\n✅ Benchmark completed successfully!")
print(f"📊 Results saved to: {benchmark.output_dir}")
print("📈 Check the generated charts and report for detailed analysis")
except Exception as e:
print(f"\n❌ Benchmark failed: {e}")
logger.error(f"Benchmark suite failed: {e}")
return 1
return 0
if __name__ == "__main__":
exit(main())