diff --git a/docs/swarms/agents/agent_judge.md b/docs/swarms/agents/agent_judge.md index d5988a0e..2f5342ad 100644 --- a/docs/swarms/agents/agent_judge.md +++ b/docs/swarms/agents/agent_judge.md @@ -10,15 +10,14 @@ The AgentJudge is designed to evaluate and critique outputs from other AI agents Key capabilities: -- **Quality Assessment**: Evaluates correctness, clarity, and completeness of agent outputs - -- **Structured Feedback**: Provides detailed critiques with strengths, weaknesses, and suggestions - -- **Multimodal Support**: Can evaluate text outputs alongside images - -- **Context Building**: Maintains evaluation context across multiple iterations - -- **Batch Processing**: Efficiently processes multiple evaluations +| Capability | Description | +|------------------------------|-----------------------------------------------------------------------------------------------| +| **Quality Assessment** | Evaluates correctness, clarity, and completeness of agent outputs | +| **Structured Feedback** | Provides detailed critiques with strengths, weaknesses, and suggestions | +| **Multimodal Support** | Can evaluate text outputs alongside images | +| **Context Building** | Maintains evaluation context across multiple iterations | +| **Custom Evaluation Criteria**| Supports weighted evaluation criteria for domain-specific assessments | +| **Batch Processing** | Efficiently processes multiple evaluations | ## Architecture @@ -50,7 +49,6 @@ graph TD J --> O J --> P J --> Q - ``` ## Class Reference @@ -62,10 +60,12 @@ AgentJudge( id: str = str(uuid.uuid4()), agent_name: str = "Agent Judge", description: str = "You're an expert AI agent judge...", - system_prompt: str = AGENT_JUDGE_PROMPT, + system_prompt: str = None, model_name: str = "openai/o1", max_loops: int = 1, verbose: bool = False, + evaluation_criteria: Optional[Dict[str, float]] = None, + return_score: bool = False, *args, **kwargs ) @@ -78,10 +78,12 @@ AgentJudge( | `id` | `str` | `str(uuid.uuid4())` | Unique identifier for the judge instance | | `agent_name` | `str` | `"Agent Judge"` | Name of the agent judge | | `description` | `str` | `"You're an expert AI agent judge..."` | Description of the agent's role | -| `system_prompt` | `str` | `AGENT_JUDGE_PROMPT` | System instructions for evaluation | +| `system_prompt` | `str` | `None` | Custom system instructions (uses default if None) | | `model_name` | `str` | `"openai/o1"` | LLM model for evaluation | | `max_loops` | `int` | `1` | Maximum evaluation iterations | | `verbose` | `bool` | `False` | Enable verbose logging | +| `evaluation_criteria` | `Optional[Dict[str, float]]` | `None` | Dictionary of evaluation criteria and weights | +| `return_score` | `bool` | `False` | Whether to return a numerical score instead of full conversation | ### Methods @@ -90,29 +92,28 @@ AgentJudge( ```python step( task: str = None, - tasks: Optional[List[str]] = None, img: Optional[str] = None ) -> str ``` -Processes a single task or list of tasks and returns evaluation. +Processes a single task and returns the agent's evaluation. 
| Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `task` | `str` | `None` | Single task/output to evaluate | -| `tasks` | `List[str]` | `None` | List of tasks/outputs to evaluate | -| `img` | `str` | `None` | Path to image for multimodal evaluation | +| `img` | `Optional[str]` | `None` | Path to image for multimodal evaluation | **Returns:** `str` - Detailed evaluation response +**Raises:** `ValueError` - If no task is provided + #### run() ```python run( task: str = None, - tasks: Optional[List[str]] = None, img: Optional[str] = None -) -> List[str] +) -> Union[str, int] ``` Executes evaluation in multiple iterations with context building. @@ -120,86 +121,115 @@ Executes evaluation in multiple iterations with context building. | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `task` | `str` | `None` | Single task/output to evaluate | -| `tasks` | `List[str]` | `None` | List of tasks/outputs to evaluate | -| `img` | `str` | `None` | Path to image for multimodal evaluation | +| `img` | `Optional[str]` | `None` | Path to image for multimodal evaluation | -**Returns:** `List[str]` - List of evaluation responses from each iteration +**Returns:** +- `str` - Full conversation context if `return_score=False` (default) +- `int` - Numerical reward score if `return_score=True` #### run_batched() ```python run_batched( - tasks: Optional[List[str]] = None, - imgs: Optional[List[str]] = None -) -> List[List[str]] + tasks: Optional[List[str]] = None +) -> List[Union[str, int]] ``` -Executes batch evaluation of multiple tasks with corresponding images. +Executes batch evaluation of multiple tasks. | Parameter | Type | Default | Description | |-----------|------|---------|-------------| -| `tasks` | `List[str]` | `None` | List of tasks/outputs to evaluate | -| `imgs` | `List[str]` | `None` | List of image paths (same length as tasks) | +| `tasks` | `Optional[List[str]]` | `None` | List of tasks/outputs to evaluate | -**Returns:** `List[List[str]]` - Evaluation responses for each task +**Returns:** `List[Union[str, int]]` - Evaluation responses for each task ## Examples -### Basic Usage +### Basic Evaluation ```python -from swarms import AgentJudge +from swarms.agents.agent_judge import AgentJudge -# Initialize with default settings -judge = AgentJudge() +# Initialize the agent judge +judge = AgentJudge( + agent_name="quality-judge", + model_name="gpt-4", + max_loops=2 +) -# Single task evaluation -result = judge.step(task="The capital of France is Paris.") -print(result) +# Example agent output to evaluate +agent_output = "The capital of France is Paris. The city is known for its famous Eiffel Tower and delicious croissants. The population is approximately 2.1 million people." 
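+
+# With max_loops=2 the judge makes two evaluation passes, the second one
+# building on the context of the first; run() returns the accumulated
+# conversation as a string unless return_score=True is set on the judge.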
+ +# Run evaluation with context building +evaluations = judge.run(task=agent_output) ``` -### Custom Configuration +### Technical Evaluation with Custom Criteria ```python -from swarms import AgentJudge +from swarms.agents.agent_judge import AgentJudge -# Custom judge configuration +# Initialize the agent judge with custom evaluation criteria judge = AgentJudge( - agent_name="content-evaluator", + agent_name="technical-judge", model_name="gpt-4", - max_loops=3, - verbose=True + max_loops=1, + evaluation_criteria={ + "accuracy": 0.4, + "completeness": 0.3, + "clarity": 0.2, + "logic": 0.1, + }, ) -# Evaluate multiple outputs -outputs = [ - "Agent CalculusMaster: The integral of x^2 + 3x + 2 is (1/3)x^3 + (3/2)x^2 + 2x + C", - "Agent DerivativeDynamo: The derivative of sin(x) is cos(x)", - "Agent LimitWizard: The limit of sin(x)/x as x approaches 0 is 1" -] +# Example technical agent output to evaluate +technical_output = "To solve the quadratic equation x² + 5x + 6 = 0, we can use the quadratic formula: x = (-b ± √(b² - 4ac)) / 2a. Here, a=1, b=5, c=6. Substituting: x = (-5 ± √(25 - 24)) / 2 = (-5 ± √1) / 2 = (-5 ± 1) / 2. So x = -2 or x = -3." -evaluation = judge.step(tasks=outputs) -print(evaluation) +# Run evaluation with context building +evaluations = judge.run(task=technical_output) ``` -### Iterative Evaluation with Context +### Creative Content Evaluation ```python -from swarms import AgentJudge +from swarms.agents.agent_judge import AgentJudge -# Multiple iterations with context building -judge = AgentJudge(max_loops=3) +# Initialize the agent judge for creative content evaluation +judge = AgentJudge( + agent_name="creative-judge", + model_name="gpt-4", + max_loops=2, + evaluation_criteria={ + "creativity": 0.4, + "originality": 0.3, + "engagement": 0.2, + "coherence": 0.1, + }, +) -# Each iteration builds on previous context -evaluations = judge.run(task="Agent output: 2+2=5") -for i, eval_result in enumerate(evaluations): - print(f"Iteration {i+1}: {eval_result}\n") +# Example creative agent output to evaluate +creative_output = "The moon hung like a silver coin in the velvet sky, casting shadows that danced with the wind. Ancient trees whispered secrets to the stars, while time itself seemed to pause in reverence of this magical moment. The world held its breath, waiting for the next chapter of the eternal story." + +# Run evaluation with context building +evaluations = judge.run(task=creative_output) +``` + +### Single Task Evaluation + +```python +from swarms.agents.agent_judge import AgentJudge + +# Initialize with default settings +judge = AgentJudge() + +# Single task evaluation +result = judge.step(task="The answer is 42.") ``` ### Multimodal Evaluation ```python -from swarms import AgentJudge +from swarms.agents.agent_judge import AgentJudge judge = AgentJudge() @@ -208,32 +238,42 @@ evaluation = judge.step( task="Describe what you see in this image", img="path/to/image.jpg" ) -print(evaluation) ``` ### Batch Processing ```python -from swarms import AgentJudge +from swarms.agents.agent_judge import AgentJudge judge = AgentJudge() -# Batch evaluation with images +# Batch evaluation tasks = [ - "Describe this chart", - "What's the main trend?", - "Any anomalies?" -] -images = [ - "chart1.png", - "chart2.png", - "chart3.png" + "The capital of France is Paris.", + "2 + 2 = 4", + "The Earth is flat." 
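+    # The last task is factually wrong, so its evaluation should flag the error.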
] # Each task evaluated independently -evaluations = judge.run_batched(tasks=tasks, imgs=images) -for i, task_evals in enumerate(evaluations): - print(f"Task {i+1} evaluations: {task_evals}") +evaluations = judge.run_batched(tasks=tasks) +``` + +### Scoring Mode + +```python +from swarms.agents.agent_judge import AgentJudge + +# Initialize with scoring enabled +judge = AgentJudge( + agent_name="scoring-judge", + model_name="gpt-4", + max_loops=2, + return_score=True +) + +# Get numerical score instead of full conversation +score = judge.run(task="This is a correct and well-explained answer.") +# Returns: 1 (if positive keywords found) or 0 ``` ## Reference diff --git a/docs/swarms/structs/cron_job.md b/docs/swarms/structs/cron_job.md index c2ab0c24..f448a04c 100644 --- a/docs/swarms/structs/cron_job.md +++ b/docs/swarms/structs/cron_job.md @@ -1,496 +1,415 @@ # CronJob -A wrapper class that turns any callable (including Swarms agents) into a scheduled cron job. This class provides functionality to schedule and run tasks at specified intervals using the schedule library with cron-style scheduling. +A wrapper class that turns any callable (including Swarms agents) into a scheduled cron job using the schedule library with cron-style scheduling. -## Overview +Full Path `from swarms.structs.cron_job` -The CronJob class allows you to: +## Class Definition -- Schedule any callable or Swarms Agent to run at specified intervals +```python +class CronJob: + def __init__( + self, + agent: Optional[Union[Any, Callable]] = None, + interval: Optional[str] = None, + job_id: Optional[str] = None, + callback: Optional[Callable[[Any, str, dict], Any]] = None, + ) -> None +``` -- Support for seconds, minutes, and hours intervals +## Constructor Parameters -- Run tasks in a separate thread +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `agent` | `Optional[Union[Any, Callable]]` | `None` | The Swarms Agent instance or callable to be scheduled | +| `interval` | `Optional[str]` | `None` | Interval string in format "Xunit" (e.g., "5seconds", "10minutes", "1hour") | +| `job_id` | `Optional[str]` | `None` | Unique identifier for the job. Auto-generated if not provided | +| `callback` | `Optional[Callable[[Any, str, dict], Any]]` | `None` | Function to customize output processing | -- Handle graceful start/stop of scheduled jobs +## Instance Attributes -- Manage multiple concurrent scheduled jobs +| Attribute | Type | Description | +|-----------|------|-------------| +| `agent` | `Union[Any, Callable]` | The scheduled agent or callable | +| `interval` | `str` | The scheduling interval string | +| `job_id` | `str` | Unique job identifier | +| `is_running` | `bool` | Current execution status | +| `thread` | `Optional[threading.Thread]` | Background execution thread | +| `schedule` | `schedule.Scheduler` | Internal scheduler instance | +| `callback` | `Optional[Callable[[Any, str, dict], Any]]` | Output customization function | +| `execution_count` | `int` | Number of executions completed | +| `start_time` | `Optional[float]` | Job start timestamp | -## Architecture +## Methods -```mermaid -graph TD - A[CronJob] --> B[Initialize] - B --> C[Parse Interval] - C --> D[Schedule Task] - D --> E[Run Job] - E --> F[Execute Task] - F --> G{Is Agent?} - G -->|Yes| H[Run Agent] - G -->|No| I[Run Callable] - H --> J[Handle Result] - I --> J - J --> K[Sleep] - K --> E -``` +### `run(task: str, **kwargs) -> Any` -## Class Reference +Schedules and starts the cron job execution. 
-### Constructor +**Parameters:** +- `task` (`str`): Task string to be executed by the agent +- `**kwargs` (`dict`): Additional parameters passed to agent's run method -```python -def __init__( - agent: Optional[Union[Agent, Callable]] = None, - interval: Optional[str] = None, - job_id: Optional[str] = None -) -``` +**Returns:** +- `Any`: Result of the cron job execution + +**Raises:** +- `CronJobConfigError`: If agent or interval is not configured +- `CronJobExecutionError`: If task execution fails + +### `__call__(task: str, **kwargs) -> Any` + +Callable interface for the CronJob instance. + +**Parameters:** +- `task` (`str`): Task string to be executed +- `**kwargs` (`dict`): Additional parameters passed to agent's run method + +**Returns:** +- `Any`: Result of the task execution + +### `start() -> None` + +Starts the scheduled job in a separate thread. + +**Raises:** +- `CronJobExecutionError`: If the job fails to start + +### `stop() -> None` + +Stops the scheduled job gracefully. + +**Raises:** +- `CronJobExecutionError`: If the job fails to stop properly -| Parameter | Type | Description | Required | -|-----------|------|-------------|-----------| -| agent | Agent or Callable | The Swarms Agent instance or callable to be scheduled | No | -| interval | str | The interval string (e.g., "5seconds", "10minutes", "1hour") | No | -| job_id | str | Unique identifier for the job. If not provided, one will be generated | No | +### `set_callback(callback: Callable[[Any, str, dict], Any]) -> None` -### Methods +Sets or updates the callback function for output customization. -#### run +**Parameters:** +- `callback` (`Callable[[Any, str, dict], Any]`): Function to customize output processing + +### `get_execution_stats() -> Dict[str, Any]` + +Retrieves execution statistics for the cron job. 
+ +**Returns:** +- `Dict[str, Any]`: Dictionary containing: + - `job_id` (`str`): Unique job identifier + - `is_running` (`bool`): Current execution status + - `execution_count` (`int`): Number of executions completed + - `start_time` (`Optional[float]`): Job start timestamp + - `uptime` (`float`): Seconds since job started + - `interval` (`str`): Scheduled execution interval + +## Callback Function Signature ```python -def run(task: str, **kwargs) +def callback_function( + output: Any, # Original output from the agent + task: str, # Task that was executed + metadata: dict # Job execution metadata +) -> Any: # Customized output (any type) + pass ``` -| Parameter | Type | Description | Required | -|-----------|------|-------------|-----------| -| task | str | The task string to be executed by the agent | Yes | -| **kwargs | dict | Additional parameters to pass to the agent's run method | No | +### Callback Metadata Dictionary + +| Key | Type | Description | +|-----|------|-------------| +| `job_id` | `str` | Unique job identifier | +| `timestamp` | `float` | Execution timestamp (Unix time) | +| `execution_count` | `int` | Sequential execution number | +| `task` | `str` | The task string that was executed | +| `kwargs` | `dict` | Additional parameters passed to agent | +| `start_time` | `Optional[float]` | Job start timestamp | +| `is_running` | `bool` | Current job status | + +## Interval Format + +The `interval` parameter accepts strings in the format `"Xunit"`: + +| Unit | Examples | Description | +|------|----------|-------------| +| `seconds` | `"5seconds"`, `"30seconds"` | Execute every X seconds | +| `minutes` | `"1minute"`, `"15minutes"` | Execute every X minutes | +| `hours` | `"1hour"`, `"6hours"` | Execute every X hours | + +## Exceptions -#### __call__ +### `CronJobError` +Base exception class for all CronJob errors. + +### `CronJobConfigError` +Raised for configuration errors (invalid agent, interval format, etc.). + +### `CronJobScheduleError` +Raised for scheduling-related errors. + +### `CronJobExecutionError` +Raised for execution-related errors (start/stop failures, task execution failures). 
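+
+### Handling CronJob Exceptions
+
+A minimal sketch of catching these errors around job startup is shown below. It assumes the exception classes can be imported from `swarms.structs.cron_job` alongside `CronJob`; adjust the import to match your installation.
+
+```python
+from swarms import Agent, CronJob
+from swarms.structs.cron_job import (
+    CronJobConfigError,
+    CronJobExecutionError,
+)
+
+agent = Agent(
+    agent_name="Health-Check-Agent",
+    system_prompt="Summarize the task in one short sentence.",
+    model_name="gpt-4o-mini",
+    max_loops=1,
+)
+
+cron_job = CronJob(
+    agent=agent,
+    interval="30seconds",
+    job_id="health_check_job",
+)
+
+try:
+    cron_job.run("Check that the data pipeline is healthy")
+except CronJobConfigError as e:
+    # Raised when the agent or interval is missing or invalid
+    print(f"Configuration problem: {e}")
+except CronJobExecutionError as e:
+    # Raised when the job fails to start or a scheduled execution fails
+    print(f"Execution problem: {e}")
+except KeyboardInterrupt:
+    cron_job.stop()
+```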
+ +## Type Definitions ```python -def __call__(task: str, **kwargs) -``` +from typing import Any, Callable, Dict, Optional, Union + +# Agent type can be any callable or object with run method +AgentType = Union[Any, Callable] -| Parameter | Type | Description | Required | -|-----------|------|-------------|-----------| -| task | str | The task string to be executed | Yes | -| **kwargs | dict | Additional parameters to pass to the agent's run method | No | +# Callback function signature +CallbackType = Callable[[Any, str, Dict[str, Any]], Any] + +# Execution statistics return type +StatsType = Dict[str, Any] +``` -## Examples +## Quick Start Examples -### Basic Usage with Swarms Agent +### Basic Usage ```python from swarms import Agent, CronJob -from loguru import logger -# Initialize the agent -agent = Agent( - agent_name="Quantitative-Trading-Agent", - agent_description="Advanced quantitative trading and algorithmic analysis agent", - system_prompt="""You are an expert quantitative trading agent...""", - max_loops=1, - model_name="gpt-4.1", - dynamic_temperature_enabled=True, - output_type="str-all-except-first", - streaming_on=True, - print_on=True, - telemetry_enable=False, -) - -# Create and run a cron job every 10 seconds -logger.info("Starting example cron job") -cron_job = CronJob(agent=agent, interval="10seconds") -cron_job.run( - task="What are the best top 3 etfs for gold coverage?" -) +# Simple agent cron job +agent = Agent(agent_name="MyAgent", ...) +cron_job = CronJob(agent=agent, interval="30seconds") +cron_job.run("Analyze market trends") ``` -### Using with a Custom Function +### With Custom Function ```python -def custom_task(task: str): - print(f"Executing task: {task}") - return "Task completed" +def my_task(task: str) -> str: + return f"Completed: {task}" -# Create a cron job with a custom function -cron_job = CronJob( - agent=custom_task, - interval="5minutes", - job_id="custom_task_job" -) -cron_job.run("Perform analysis") +cron_job = CronJob(agent=my_task, interval="1minute") +cron_job.run("Process data") ``` +### With Callback -### Cron Jobs With Multi-Agent Structures +```python +def callback(output, task, metadata): + return {"result": output, "count": metadata["execution_count"]} -You can also run Cron Jobs with multi-agent structures like `SequentialWorkflow`, `ConcurrentWorkflow`, `HiearchicalSwarm`, and other methods. +cron_job = CronJob( + agent=agent, + interval="30seconds", + callback=callback +) +``` -- Just initialize the class as the agent parameter in the `CronJob(agent=swarm)` -- Input your arguments into the `.run(task: str)` method +## Full Examples +### Complete Agent with Callback ```python -""" -Cryptocurrency Concurrent Multi-Agent Cron Job Example +from swarms import Agent, CronJob +from datetime import datetime +import json -This example demonstrates how to use ConcurrentWorkflow with CronJob to create -a powerful cryptocurrency tracking system. Each specialized agent analyzes a -specific cryptocurrency concurrently every minute. +# Create agent +agent = Agent( + agent_name="Financial-Analyst", + system_prompt="You are a financial analyst. 
Analyze market data and provide insights.", + model_name="gpt-4o-mini", + max_loops=1 +) -Features: -- ConcurrentWorkflow for parallel agent execution -- CronJob scheduling for automated runs every 1 minute -- Each agent specializes in analyzing one specific cryptocurrency -- Real-time data fetching from CoinGecko API -- Concurrent analysis of multiple cryptocurrencies -- Structured output with professional formatting +# Advanced callback with monitoring +class AdvancedCallback: + def __init__(self): + self.history = [] + self.error_count = 0 + + def __call__(self, output, task, metadata): + # Track execution + execution_data = { + "output": output, + "execution_id": metadata["execution_count"], + "timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(), + "task": task, + "job_id": metadata["job_id"], + "success": bool(output and "error" not in str(output).lower()) + } + + if not execution_data["success"]: + self.error_count += 1 + + self.history.append(execution_data) + + # Keep only last 100 executions + if len(self.history) > 100: + self.history.pop(0) + + return execution_data + + def get_stats(self): + return { + "total_executions": len(self.history), + "error_count": self.error_count, + "success_rate": (len(self.history) - self.error_count) / len(self.history) if self.history else 0 + } + +# Use advanced callback +callback = AdvancedCallback() +cron_job = CronJob( + agent=agent, + interval="2minutes", + job_id="financial_analysis_job", + callback=callback +) -Architecture: -CronJob -> ConcurrentWorkflow -> [Bitcoin Agent, Ethereum Agent, Solana Agent, etc.] -> Parallel Analysis -""" +# Run the cron job +try: + cron_job.run("Analyze current market conditions and provide investment recommendations") +except KeyboardInterrupt: + cron_job.stop() + print("Final stats:", json.dumps(callback.get_stats(), indent=2)) +``` -from typing import List -from loguru import logger +### Multi-Agent Workflow with CronJob +```python from swarms import Agent, CronJob, ConcurrentWorkflow -from swarms_tools import coin_gecko_coin_api - - -def create_crypto_specific_agents() -> List[Agent]: - """ - Creates agents that each specialize in analyzing a specific cryptocurrency. - - Returns: - List[Agent]: List of cryptocurrency-specific Agent instances - """ - - # Bitcoin Specialist Agent - bitcoin_agent = Agent( - agent_name="Bitcoin-Analyst", - agent_description="Expert analyst specializing exclusively in Bitcoin (BTC) analysis and market dynamics", - system_prompt="""You are a Bitcoin specialist and expert analyst. 
Your expertise includes: - -BITCOIN SPECIALIZATION: -- Bitcoin's unique position as digital gold -- Bitcoin halving cycles and their market impact -- Bitcoin mining economics and hash rate analysis -- Lightning Network and Layer 2 developments -- Bitcoin adoption by institutions and countries -- Bitcoin's correlation with traditional markets -- Bitcoin technical analysis and on-chain metrics -- Bitcoin's role as a store of value and hedge against inflation - -ANALYSIS FOCUS: -- Analyze ONLY Bitcoin data from the provided dataset -- Focus on Bitcoin-specific metrics and trends -- Consider Bitcoin's unique market dynamics -- Evaluate Bitcoin's dominance and market leadership -- Assess institutional adoption trends -- Monitor on-chain activity and network health - -DELIVERABLES: -- Bitcoin-specific analysis and insights -- Price action assessment and predictions -- Market dominance analysis -- Institutional adoption impact -- Technical and fundamental outlook -- Risk factors specific to Bitcoin - -Extract Bitcoin data from the provided dataset and provide comprehensive Bitcoin-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - # Ethereum Specialist Agent - ethereum_agent = Agent( - agent_name="Ethereum-Analyst", - agent_description="Expert analyst specializing exclusively in Ethereum (ETH) analysis and ecosystem development", - system_prompt="""You are an Ethereum specialist and expert analyst. Your expertise includes: - -ETHEREUM SPECIALIZATION: -- Ethereum's smart contract platform and DeFi ecosystem -- Ethereum 2.0 transition and proof-of-stake mechanics -- Gas fees, network usage, and scalability solutions -- Layer 2 solutions (Arbitrum, Optimism, Polygon) -- DeFi protocols and TVL (Total Value Locked) analysis -- NFT markets and Ethereum's role in digital assets -- Developer activity and ecosystem growth -- EIP proposals and network upgrades - -ANALYSIS FOCUS: -- Analyze ONLY Ethereum data from the provided dataset -- Focus on Ethereum's platform utility and network effects -- Evaluate DeFi ecosystem health and growth -- Assess Layer 2 adoption and scalability solutions -- Monitor network usage and gas fee trends -- Consider Ethereum's competitive position vs other smart contract platforms - -DELIVERABLES: -- Ethereum-specific analysis and insights -- Platform utility and adoption metrics -- DeFi ecosystem impact assessment -- Network health and scalability evaluation -- Competitive positioning analysis -- Technical and fundamental outlook for ETH - -Extract Ethereum data from the provided dataset and provide comprehensive Ethereum-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - # Solana Specialist Agent - solana_agent = Agent( - agent_name="Solana-Analyst", - agent_description="Expert analyst specializing exclusively in Solana (SOL) analysis and ecosystem development", - system_prompt="""You are a Solana specialist and expert analyst. 
Your expertise includes: - -SOLANA SPECIALIZATION: -- Solana's high-performance blockchain architecture -- Proof-of-History consensus mechanism -- Solana's DeFi ecosystem and DEX platforms (Serum, Raydium) -- NFT marketplaces and creator economy on Solana -- Network outages and reliability concerns -- Developer ecosystem and Rust programming adoption -- Validator economics and network decentralization -- Cross-chain bridges and interoperability - -ANALYSIS FOCUS: -- Analyze ONLY Solana data from the provided dataset -- Focus on Solana's performance and scalability advantages -- Evaluate network stability and uptime improvements -- Assess ecosystem growth and developer adoption -- Monitor DeFi and NFT activity on Solana -- Consider Solana's competitive position vs Ethereum - -DELIVERABLES: -- Solana-specific analysis and insights -- Network performance and reliability assessment -- Ecosystem growth and adoption metrics -- DeFi and NFT market analysis -- Competitive advantages and challenges -- Technical and fundamental outlook for SOL - -Extract Solana data from the provided dataset and provide comprehensive Solana-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - # Cardano Specialist Agent - cardano_agent = Agent( - agent_name="Cardano-Analyst", - agent_description="Expert analyst specializing exclusively in Cardano (ADA) analysis and research-driven development", - system_prompt="""You are a Cardano specialist and expert analyst. Your expertise includes: - -CARDANO SPECIALIZATION: -- Cardano's research-driven development approach -- Ouroboros proof-of-stake consensus protocol -- Smart contract capabilities via Plutus and Marlowe -- Cardano's three-layer architecture (settlement, computation, control) -- Academic partnerships and peer-reviewed research -- Cardano ecosystem projects and DApp development -- Native tokens and Cardano's UTXO model -- Sustainability and treasury funding mechanisms - -ANALYSIS FOCUS: -- Analyze ONLY Cardano data from the provided dataset -- Focus on Cardano's methodical development approach -- Evaluate smart contract adoption and ecosystem growth -- Assess academic partnerships and research contributions -- Monitor native token ecosystem development -- Consider Cardano's long-term roadmap and milestones - -DELIVERABLES: -- Cardano-specific analysis and insights -- Development progress and milestone achievements -- Smart contract ecosystem evaluation -- Academic research impact assessment -- Native token and DApp adoption metrics -- Technical and fundamental outlook for ADA - -Extract Cardano data from the provided dataset and provide comprehensive Cardano-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - # Binance Coin Specialist Agent - bnb_agent = Agent( - agent_name="BNB-Analyst", - agent_description="Expert analyst specializing exclusively in BNB analysis and Binance ecosystem dynamics", - system_prompt="""You are a BNB specialist and expert analyst. 
Your expertise includes: - -BNB SPECIALIZATION: -- BNB's utility within the Binance ecosystem -- Binance Smart Chain (BSC) development and adoption -- BNB token burns and deflationary mechanics -- Binance exchange volume and market leadership -- BSC DeFi ecosystem and yield farming -- Cross-chain bridges and multi-chain strategies -- Regulatory challenges facing Binance globally -- BNB's role in transaction fee discounts and platform benefits - -ANALYSIS FOCUS: -- Analyze ONLY BNB data from the provided dataset -- Focus on BNB's utility value and exchange benefits -- Evaluate BSC ecosystem growth and competition with Ethereum -- Assess token burn impact on supply and price -- Monitor Binance platform developments and regulations -- Consider BNB's centralized vs decentralized aspects - -DELIVERABLES: -- BNB-specific analysis and insights -- Utility value and ecosystem benefits assessment -- BSC adoption and DeFi growth evaluation -- Token economics and burn mechanism impact -- Regulatory risk and compliance analysis -- Technical and fundamental outlook for BNB - -Extract BNB data from the provided dataset and provide comprehensive BNB-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - # XRP Specialist Agent - xrp_agent = Agent( - agent_name="XRP-Analyst", - agent_description="Expert analyst specializing exclusively in XRP analysis and cross-border payment solutions", - system_prompt="""You are an XRP specialist and expert analyst. Your expertise includes: - -XRP SPECIALIZATION: -- XRP's role in cross-border payments and remittances -- RippleNet adoption by financial institutions -- Central Bank Digital Currency (CBDC) partnerships -- Regulatory landscape and SEC lawsuit implications -- XRP Ledger's consensus mechanism and energy efficiency -- On-Demand Liquidity (ODL) usage and growth -- Competition with SWIFT and traditional payment rails -- Ripple's partnerships with banks and payment providers - -ANALYSIS FOCUS: -- Analyze ONLY XRP data from the provided dataset -- Focus on XRP's utility in payments and remittances -- Evaluate RippleNet adoption and institutional partnerships -- Assess regulatory developments and legal clarity -- Monitor ODL usage and transaction volumes -- Consider XRP's competitive position in payments - -DELIVERABLES: -- XRP-specific analysis and insights -- Payment utility and adoption assessment -- Regulatory landscape and legal developments -- Institutional partnership impact evaluation -- Cross-border payment market analysis -- Technical and fundamental outlook for XRP - -Extract XRP data from the provided dataset and provide comprehensive XRP-focused analysis.""", - model_name="groq/moonshotai/kimi-k2-instruct", - max_loops=1, - dynamic_temperature_enabled=True, - streaming_on=False, - tools=[coin_gecko_coin_api], - ) - - return [ - bitcoin_agent, - ethereum_agent, - solana_agent, - cardano_agent, - bnb_agent, - xrp_agent, - ] - - -def create_crypto_workflow() -> ConcurrentWorkflow: - """ - Creates a ConcurrentWorkflow with cryptocurrency-specific analysis agents. 
- - Returns: - ConcurrentWorkflow: Configured workflow for crypto analysis - """ - agents = create_crypto_specific_agents() - - workflow = ConcurrentWorkflow( - name="Crypto-Specific-Analysis-Workflow", - description="Concurrent execution of cryptocurrency-specific analysis agents", - agents=agents, - max_loops=1, - ) - - return workflow - - -def create_crypto_cron_job() -> CronJob: - """ - Creates a CronJob that runs cryptocurrency-specific analysis every minute using ConcurrentWorkflow. - - Returns: - CronJob: Configured cron job for automated crypto analysis - """ - # Create the concurrent workflow - workflow = create_crypto_workflow() - - # Create the cron job - cron_job = CronJob( - agent=workflow, # Use the workflow as the agent - interval="5seconds", # Run every 1 minute - ) - - return cron_job - - -def main(): - """ - Main function to run the cryptocurrency-specific concurrent analysis cron job. - """ - cron_job = create_crypto_cron_job() - - prompt = """ - - Conduct a comprehensive analysis of your assigned cryptocurrency. - - """ +import json + +# Create specialized agents +bitcoin_agent = Agent( + agent_name="Bitcoin-Analyst", + system_prompt="You are a Bitcoin specialist. Focus only on Bitcoin analysis.", + model_name="gpt-4o-mini", + max_loops=1 +) - # Start the cron job - logger.info("🔄 Starting automated analysis loop...") - logger.info("⏰ Press Ctrl+C to stop the cron job") +ethereum_agent = Agent( + agent_name="Ethereum-Analyst", + system_prompt="You are an Ethereum specialist. Focus only on Ethereum analysis.", + model_name="gpt-4o-mini", + max_loops=1 +) - output = cron_job.run(task=prompt) - print(output) +# Create concurrent workflow +workflow = ConcurrentWorkflow( + name="Crypto-Analysis-Workflow", + agents=[bitcoin_agent, ethereum_agent], + max_loops=1 +) +# Workflow callback +def workflow_callback(output, task, metadata): + """Process multi-agent workflow output.""" + return { + "workflow_results": output, + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "agents_count": len(workflow.agents), + "task": task, + "metadata": { + "job_id": metadata["job_id"], + "uptime": metadata.get("uptime", 0) + } + } + +# Create workflow cron job +workflow_cron = CronJob( + agent=workflow, + interval="5minutes", + job_id="crypto_workflow_job", + callback=workflow_callback +) -if __name__ == "__main__": - main() +# Run workflow cron job +workflow_cron.run("Analyze your assigned cryptocurrency and provide market insights") ``` -## Conclusion +### API Integration Example -The CronJob class provides a powerful way to schedule and automate tasks using Swarms Agents or custom functions. 
Key benefits include: +```python +import requests +from swarms import Agent, CronJob +import json -- Easy integration with Swarms Agents +# Create agent +agent = Agent( + agent_name="News-Analyst", + system_prompt="Analyze news and provide summaries.", + model_name="gpt-4o-mini", + max_loops=1 +) -- Flexible interval scheduling +# API webhook callback +def api_callback(output, task, metadata): + """Send results to external API.""" + payload = { + "data": output, + "source": "swarms_cronjob", + "job_id": metadata["job_id"], + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "task": task + } + + try: + # Send to webhook (replace with your URL) + response = requests.post( + "https://api.example.com/webhook", + json=payload, + timeout=30 + ) + + return { + "original_output": output, + "api_status": "sent", + "api_response_code": response.status_code, + "execution_id": metadata["execution_count"] + } + except requests.RequestException as e: + return { + "original_output": output, + "api_status": "failed", + "error": str(e), + "execution_id": metadata["execution_count"] + } + +# Database logging callback +def db_callback(output, task, metadata): + """Log to database (pseudo-code).""" + # db.execute( + # "INSERT INTO cron_results (job_id, output, timestamp) VALUES (?, ?, ?)", + # (metadata["job_id"], output, metadata["timestamp"]) + # ) + + return { + "output": output, + "logged_to_db": True, + "execution_id": metadata["execution_count"] + } + +# Create cron job with API integration +api_cron_job = CronJob( + agent=agent, + interval="10minutes", + job_id="news_analysis_api_job", + callback=api_callback +) -- Thread-safe execution +# Dynamic callback switching example +db_cron_job = CronJob( + agent=agent, + interval="1hour", + job_id="news_analysis_db_job" +) -- Graceful error handling +# Start with API callback +db_cron_job.set_callback(api_callback) -- Simple API for task scheduling +# Later switch to database callback +# db_cron_job.set_callback(db_callback) -- Support for both agent and callable-based tasks \ No newline at end of file +# Get execution statistics +stats = db_cron_job.get_execution_stats() +print(f"Job statistics: {json.dumps(stats, indent=2)}") +``` \ No newline at end of file diff --git a/examples/agent_judge_examples/example1_basic_evaluation.py b/examples/agent_judge_examples/example1_basic_evaluation.py new file mode 100644 index 00000000..e61ce6bc --- /dev/null +++ b/examples/agent_judge_examples/example1_basic_evaluation.py @@ -0,0 +1,12 @@ +from swarms.agents.agent_judge import AgentJudge + +# Initialize the agent judge +judge = AgentJudge( + agent_name="quality-judge", model_name="gpt-4", max_loops=2 +) + +# Example agent output to evaluate +agent_output = "The capital of France is Paris. The city is known for its famous Eiffel Tower and delicious croissants. The population is approximately 2.1 million people." 
+ +# Run evaluation with context building +evaluations = judge.run(task=agent_output) diff --git a/examples/agent_judge_examples/example2_technical_evaluation.py b/examples/agent_judge_examples/example2_technical_evaluation.py new file mode 100644 index 00000000..f8ddf33b --- /dev/null +++ b/examples/agent_judge_examples/example2_technical_evaluation.py @@ -0,0 +1,21 @@ +from swarms.agents.agent_judge import AgentJudge + +# Initialize the agent judge with custom evaluation criteria +judge = AgentJudge( + agent_name="technical-judge", + model_name="gpt-4", + max_loops=1, + evaluation_criteria={ + "accuracy": 0.4, + "completeness": 0.3, + "clarity": 0.2, + "logic": 0.1, + }, +) + +# Example technical agent output to evaluate +technical_output = "To solve the quadratic equation x² + 5x + 6 = 0, we can use the quadratic formula: x = (-b ± √(b² - 4ac)) / 2a. Here, a=1, b=5, c=6. Substituting: x = (-5 ± √(25 - 24)) / 2 = (-5 ± √1) / 2 = (-5 ± 1) / 2. So x = -2 or x = -3." + +# Run evaluation with context building +evaluations = judge.run(task=technical_output) +print(evaluations) diff --git a/examples/agent_judge_examples/example3_creative_evaluation.py b/examples/agent_judge_examples/example3_creative_evaluation.py new file mode 100644 index 00000000..5e379132 --- /dev/null +++ b/examples/agent_judge_examples/example3_creative_evaluation.py @@ -0,0 +1,20 @@ +from swarms.agents.agent_judge import AgentJudge + +# Initialize the agent judge for creative content evaluation +judge = AgentJudge( + agent_name="creative-judge", + model_name="gpt-4", + max_loops=2, + evaluation_criteria={ + "creativity": 0.4, + "originality": 0.3, + "engagement": 0.2, + "coherence": 0.1, + }, +) + +# Example creative agent output to evaluate +creative_output = "The moon hung like a silver coin in the velvet sky, casting shadows that danced with the wind. Ancient trees whispered secrets to the stars, while time itself seemed to pause in reverence of this magical moment. The world held its breath, waiting for the next chapter of the eternal story." + +# Run evaluation with context building +evaluations = judge.run(task=creative_output) diff --git a/examples/deployment/cron_job_examples/callback_cron_example.py b/examples/deployment/cron_job_examples/callback_cron_example.py new file mode 100644 index 00000000..b6293350 --- /dev/null +++ b/examples/deployment/cron_job_examples/callback_cron_example.py @@ -0,0 +1,313 @@ +""" +Callback CronJob Example + +This example demonstrates how to use the new callback functionality in CronJob +to customize output processing while the cron job is still running. +""" + +import json +import time +from datetime import datetime +from typing import Any, Dict +from loguru import logger + +from swarms import Agent, CronJob + + +def create_sample_agent(): + """Create a sample agent for demonstration.""" + return Agent( + agent_name="Sample-Analysis-Agent", + system_prompt="""You are a data analysis agent. Analyze the given data and provide insights. + Keep your responses concise and focused on key findings.""", + model_name="gpt-4o-mini", + max_loops=1, + print_on=False, + ) + + +# Example 1: Simple output transformation callback +def transform_output_callback(output: Any, task: str, metadata: Dict) -> Dict: + """Transform the agent output into a structured format. + + Args: + output: The original output from the agent + task: The task that was executed + metadata: Job metadata including execution count, timestamp, etc. 
+ + Returns: + Dict: Transformed output with additional metadata + """ + return { + "original_output": output, + "transformed_at": datetime.fromtimestamp(metadata["timestamp"]).isoformat(), + "execution_number": metadata["execution_count"], + "task_executed": task, + "job_status": "running" if metadata["is_running"] else "stopped", + "uptime_seconds": metadata["uptime"] if metadata["start_time"] else 0 + } + + +# Example 2: Output filtering and enhancement callback +def filter_and_enhance_callback(output: Any, task: str, metadata: Dict) -> Dict: + """Filter and enhance the output based on execution count and content. + + Args: + output: The original output from the agent + task: The task that was executed + metadata: Job metadata + + Returns: + Dict: Filtered and enhanced output + """ + # Only include outputs that contain certain keywords + if isinstance(output, str): + if any(keyword in output.lower() for keyword in ["important", "key", "significant", "trend"]): + enhanced_output = { + "content": output, + "priority": "high", + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "analysis_type": "priority_content" + } + else: + enhanced_output = { + "content": output, + "priority": "normal", + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "analysis_type": "standard_content" + } + else: + enhanced_output = { + "content": str(output), + "priority": "unknown", + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "analysis_type": "non_string_content" + } + + return enhanced_output + + +# Example 3: Real-time monitoring callback +class MonitoringCallback: + """Callback class that provides real-time monitoring capabilities.""" + + def __init__(self): + self.output_history = [] + self.error_count = 0 + self.success_count = 0 + self.last_execution_time = None + + def __call__(self, output: Any, task: str, metadata: Dict) -> Dict: + """Monitor and track execution metrics. 
+ + Args: + output: The original output from the agent + task: The task that was executed + metadata: Job metadata + + Returns: + Dict: Output with monitoring information + """ + current_time = time.time() + + # Calculate execution time + if self.last_execution_time: + execution_time = current_time - self.last_execution_time + else: + execution_time = 0 + + self.last_execution_time = current_time + + # Track success/error + if output and output != "Error": + self.success_count += 1 + status = "success" + else: + self.error_count += 1 + status = "error" + + # Store in history (keep last 100) + monitoring_data = { + "output": output, + "status": status, + "execution_time": execution_time, + "execution_count": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "task": task, + "metrics": { + "success_rate": self.success_count / (self.success_count + self.error_count), + "total_executions": self.success_count + self.error_count, + "error_count": self.error_count, + "success_count": self.success_count + } + } + + self.output_history.append(monitoring_data) + if len(self.output_history) > 100: + self.output_history.pop(0) + + return monitoring_data + + def get_summary(self) -> Dict: + """Get monitoring summary.""" + return { + "total_executions": self.success_count + self.error_count, + "success_count": self.success_count, + "error_count": self.error_count, + "success_rate": self.success_count / (self.success_count + self.error_count) if (self.success_count + self.error_count) > 0 else 0, + "history_length": len(self.output_history), + "last_execution_time": self.last_execution_time + } + + +# Example 4: API integration callback +def api_webhook_callback(output: Any, task: str, metadata: Dict) -> Dict: + """Callback that could send output to an external API. 
+ + Args: + output: The original output from the agent + task: The task that was executed + metadata: Job metadata + + Returns: + Dict: Output with API integration metadata + """ + # In a real implementation, you would send this to your API + api_payload = { + "data": output, + "source": "cron_job", + "job_id": metadata["job_id"], + "execution_id": metadata["execution_count"], + "timestamp": metadata["timestamp"], + "task": task + } + + # Simulate API call (replace with actual HTTP request) + logger.info(f"Would send to API: {json.dumps(api_payload, indent=2)}") + + return { + "output": output, + "api_status": "sent", + "api_payload": api_payload, + "execution_id": metadata["execution_count"] + } + + +def main(): + """Demonstrate different callback usage patterns.""" + logger.info("🚀 Starting Callback CronJob Examples") + + # Create the agent + agent = create_sample_agent() + + # Example 1: Simple transformation callback + logger.info("📝 Example 1: Simple Output Transformation") + transform_cron = CronJob( + agent=agent, + interval="15seconds", + job_id="transform-example", + callback=transform_output_callback + ) + + # Example 2: Filtering and enhancement callback + logger.info("🔍 Example 2: Output Filtering and Enhancement") + filter_cron = CronJob( + agent=agent, + interval="20seconds", + job_id="filter-example", + callback=filter_and_enhance_callback + ) + + # Example 3: Monitoring callback + logger.info("📊 Example 3: Real-time Monitoring") + monitoring_callback = MonitoringCallback() + monitoring_cron = CronJob( + agent=agent, + interval="25seconds", + job_id="monitoring-example", + callback=monitoring_callback + ) + + # Example 4: API integration callback + logger.info("🌐 Example 4: API Integration") + api_cron = CronJob( + agent=agent, + interval="30seconds", + job_id="api-example", + callback=api_webhook_callback + ) + + # Start all cron jobs + logger.info("▶️ Starting all cron jobs...") + + # Start them in separate threads to run concurrently + import threading + + def run_cron(cron_job, task): + try: + cron_job.run(task=task) + except KeyboardInterrupt: + cron_job.stop() + + # Start each cron job in its own thread + threads = [] + tasks = [ + "Analyze the current market trends and provide key insights", + "What are the most important factors affecting today's economy?", + "Provide a summary of recent technological developments", + "Analyze the impact of current events on business operations" + ] + + for i, (cron_job, task) in enumerate([ + (transform_cron, tasks[0]), + (filter_cron, tasks[1]), + (monitoring_cron, tasks[2]), + (api_cron, tasks[3]) + ]): + thread = threading.Thread( + target=run_cron, + args=(cron_job, task), + daemon=True, + name=f"cron-thread-{i}" + ) + thread.start() + threads.append(thread) + + logger.info("✅ All cron jobs started successfully!") + logger.info("📊 Press Ctrl+C to stop and see monitoring summary") + + try: + # Let them run for a while + time.sleep(120) # Run for 2 minutes + + # Show monitoring summary + logger.info("📈 Monitoring Summary:") + logger.info(json.dumps(monitoring_callback.get_summary(), indent=2)) + + # Show execution stats for each cron job + for cron_job, name in [ + (transform_cron, "Transform"), + (filter_cron, "Filter"), + (monitoring_cron, "Monitoring"), + (api_cron, "API") + ]: + stats = cron_job.get_execution_stats() + logger.info(f"{name} Cron Stats: {json.dumps(stats, indent=2)}") + + except KeyboardInterrupt: + logger.info("⏹️ Stopping all cron jobs...") + + # Stop all cron jobs + for cron_job in [transform_cron, 
filter_cron, monitoring_cron, api_cron]: + cron_job.stop() + + # Show final monitoring summary + logger.info("📊 Final Monitoring Summary:") + logger.info(json.dumps(monitoring_callback.get_summary(), indent=2)) + + +if __name__ == "__main__": + main() diff --git a/examples/deployment/cron_job_examples/simple_callback_example.py b/examples/deployment/cron_job_examples/simple_callback_example.py new file mode 100644 index 00000000..5ffe8c58 --- /dev/null +++ b/examples/deployment/cron_job_examples/simple_callback_example.py @@ -0,0 +1,81 @@ +""" +Simple Callback CronJob Example + +This example shows the basic usage of the new callback functionality +in CronJob to customize output while the job is running. +""" + +import json +import time +from datetime import datetime +from loguru import logger + +from swarms import Agent, CronJob + + +def create_simple_agent(): + """Create a simple agent for demonstration.""" + return Agent( + agent_name="Simple-Analysis-Agent", + system_prompt="You are a simple analysis agent. Provide brief insights on the given topic.", + model_name="gpt-4o-mini", + max_loops=1, + print_on=False, + ) + + +def simple_callback(output, task, metadata): + """Simple callback that adds metadata to the output. + + Args: + output: The original output from the agent + task: The task that was executed + metadata: Job metadata (execution count, timestamp, etc.) + + Returns: + dict: Enhanced output with metadata + """ + return { + "agent_output": output, + "execution_number": metadata["execution_count"], + "timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(), + "task": task, + "job_id": metadata["job_id"] + } + + +def main(): + """Demonstrate basic callback usage.""" + logger.info("🚀 Starting Simple Callback Example") + + # Create agent and cron job with callback + agent = create_simple_agent() + + cron_job = CronJob( + agent=agent, + interval="10seconds", + job_id="simple-callback-example", + callback=simple_callback + ) + + logger.info("▶️ Starting cron job with callback...") + logger.info("📝 The callback will enhance each output with metadata") + logger.info("⏹️ Press Ctrl+C to stop") + + try: + # Start the cron job + cron_job.run( + task="What are the key trends in artificial intelligence today?" 
+ ) + except KeyboardInterrupt: + logger.info("⏹️ Stopping cron job...") + cron_job.stop() + + # Show execution statistics + stats = cron_job.get_execution_stats() + logger.info("📊 Final Statistics:") + logger.info(json.dumps(stats, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/examples/rag/qdrant_rag_example.py b/examples/rag/qdrant_rag_example.py index 0277fd31..b39e0c1e 100644 --- a/examples/rag/qdrant_rag_example.py +++ b/examples/rag/qdrant_rag_example.py @@ -19,9 +19,10 @@ from swarms_memory import QdrantDB # Option 3: Qdrant Cloud (recommended for production) import os + client = QdrantClient( - url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"), - api_key=os.getenv("QDRANT_API_KEY", "your-api-key") + url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"), + api_key=os.getenv("QDRANT_API_KEY", "your-api-key"), ) # Create QdrantDB wrapper for RAG operations @@ -30,7 +31,7 @@ rag_db = QdrantDB( embedding_model="text-embedding-3-small", collection_name="knowledge_base", distance=models.Distance.COSINE, - n_results=3 + n_results=3, ) # Add documents to the knowledge base @@ -38,7 +39,7 @@ documents = [ "Qdrant is a vector database optimized for similarity search and AI applications.", "RAG combines retrieval and generation for more accurate AI responses.", "Vector embeddings enable semantic search across documents.", - "The swarms framework supports multiple memory backends including Qdrant." + "The swarms framework supports multiple memory backends including Qdrant.", ] # Method 1: Add documents individually @@ -54,7 +55,7 @@ for doc in documents: # "Computer vision allows machines to interpret visual information.", # "Reinforcement learning learns through interaction with an environment." # ] -# +# # metadata = [ # {"category": "AI", "difficulty": "beginner", "topic": "overview"}, # {"category": "ML", "difficulty": "intermediate", "topic": "neural_networks"}, @@ -62,18 +63,18 @@ for doc in documents: # {"category": "CV", "difficulty": "advanced", "topic": "vision"}, # {"category": "RL", "difficulty": "advanced", "topic": "learning"} # ] -# +# # # Batch add with metadata # doc_ids = rag_db.batch_add(documents_with_metadata, metadata=metadata, batch_size=3) # print(f"Added {len(doc_ids)} documents in batch") -# +# # # Query with metadata return # results_with_metadata = rag_db.query( -# "What is artificial intelligence?", -# n_results=3, +# "What is artificial intelligence?", +# n_results=3, # return_metadata=True # ) -# +# # for i, result in enumerate(results_with_metadata): # print(f"\nResult {i+1}:") # print(f" Document: {result['document']}") @@ -89,9 +90,9 @@ agent = Agent( model_name="gpt-4o", max_loops=1, dynamic_temperature_enabled=True, - long_term_memory=rag_db + long_term_memory=rag_db, ) # Query with RAG response = agent.run("What is Qdrant and how does it relate to RAG?") -print(response) \ No newline at end of file +print(response) diff --git a/swarms/agents/agent_judge.py b/swarms/agents/agent_judge.py index f1cdfdb2..27801012 100644 --- a/swarms/agents/agent_judge.py +++ b/swarms/agents/agent_judge.py @@ -1,34 +1,131 @@ import traceback - -from typing import List, Optional, Union, Dict - import uuid +from typing import Dict, List, Optional -from swarms.prompts.agent_judge_prompt import AGENT_JUDGE_PROMPT from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation -from swarms.utils.any_to_str import any_to_str +# ============================================================================= +# PROMPT 
FUNCTIONS FOR AGENT JUDGE +# ============================================================================= -class AgentJudgeInitializationError(Exception): + +def get_reward(input: str) -> int: """ - Exception raised when there is an error initializing the AgentJudge. + Determines whether the input contains any positive evaluation keywords and returns a reward. + + This function checks if the input string contains any of the following words (case-insensitive): + "correct", "good", "excellent", or "perfect". If any of these words are present, the function + returns 1 as a reward, otherwise it returns 0. + + Args: + input (str): The input string to evaluate. + + Returns: + int: 1 if a positive evaluation keyword is found, 0 otherwise. + + Example: + >>> get_reward("That is correct!") + 1 + >>> get_reward("Needs improvement.") + 0 + """ + words = [ + "correct", + "good", + "excellent", + "perfect", + ] + + if any(word in input.lower() for word in words): + return 1 + else: + return 0 + +def get_agent_judge_prompt() -> str: """ + Returns the main system prompt for the agent judge. - pass + Returns: + str: The system prompt for the agent judge + """ + return """# Adaptive Output Evaluator - Role and Protocol +Your role is to critically evaluate outputs across diverse domains by first understanding the context, then applying domain-appropriate evaluation criteria to provide a well-reasoned assessment. -class AgentJudgeExecutionError(Exception): +## Core Responsibilities + +1. **Context Assessment** + - Begin by identifying the domain and specific context of the evaluation (technical, creative, analytical, etc.) + - Determine the appropriate evaluation framework based on domain requirements + - Adjust evaluation criteria and standards to match domain-specific best practices + - If domain is unclear, request clarification with: DOMAIN CLARIFICATION NEEDED: *specific_question* + +2. **Input Validation** + - Ensure all necessary information is present for a comprehensive evaluation + - Identify gaps in provided materials that would impact assessment quality + - Request additional context when needed with: ADDITIONAL CONTEXT NEEDED: *specific_information* + - Consider implicit domain knowledge that may influence proper evaluation + +3. **Evidence-Based Analysis** + - Apply domain-specific criteria to evaluate accuracy, effectiveness, and appropriateness + - Distinguish between factual claims, reasoned arguments, and subjective opinions + - Flag assumptions or claims lacking sufficient support within domain standards + - Evaluate internal consistency and alignment with established principles in the field + - For technical domains, verify logical and methodological soundness + +4. **Comparative Assessment** + - When multiple solutions or approaches are presented, compare relative strengths + - Identify trade-offs between different approaches within domain constraints + - Consider alternative interpretations or solutions not explicitly mentioned + - Balance competing priorities based on domain-specific values and standards + +5. 
**Final Assessment Declaration** + - Present your final assessment with: **EVALUATION_COMPLETE \\boxed{_assessment_summary_}** + - Follow with a concise justification referencing domain-specific standards + - Include constructive feedback for improvement where appropriate + - When appropriate, suggest alternative approaches that align with domain best practices""" + + +def get_task_evaluation_prompt(outputs: str) -> str: """ - Exception raised when there is an error executing the AgentJudge. + Returns the task instruction prompt for evaluation. + + Args: + outputs (str): The outputs to be evaluated + + Returns: + str: The formatted task evaluation prompt + """ + return f"""You are an expert AI agent judge. Carefully review the following output(s) generated by another agent. Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance. Your feedback should address the following points: + +1. Strengths: What did the agent do well? Highlight any correct reasoning, clarity, or effective problem-solving. +2. Weaknesses: Identify any errors, omissions, unclear reasoning, or areas where the output could be improved. +3. Suggestions: Offer specific, practical recommendations for how the agent can improve its next attempt. This may include advice on reasoning, structure, completeness, or style. +4. If relevant, point out any factual inaccuracies or logical inconsistencies. + +Be thorough, objective, and professional. Your goal is to help the agent learn and produce better results in the future. + +Output(s) to evaluate: +{outputs}""" + + +# ============================================================================= +# EXCEPTION CLASSES +# ============================================================================= + + +class AgentJudgeInitializationError(Exception): + """ + Exception raised when there is an error initializing the AgentJudge. """ pass -class AgentJudgeFeedbackCycleError(Exception): +class AgentJudgeExecutionError(Exception): """ - Exception raised when there is an error in the feedback cycle. + Exception raised when there is an error executing the AgentJudge. """ pass @@ -84,9 +181,9 @@ class AgentJudge: ``` Methods: - step(task: str = None, tasks: List[str] = None, img: str = None) -> str: - Processes a single task or list of tasks and returns the agent's evaluation. - run(task: str = None, tasks: List[str] = None, img: str = None) -> List[str]: + step(task: str = None, img: str = None) -> str: + Processes a single task and returns the agent's evaluation. + run(task: str = None, img: str = None) -> List[str]: Executes evaluation in a loop with context building, collecting responses. run_batched(tasks: List[str] = None, imgs: List[str] = None) -> List[str]: @@ -98,11 +195,12 @@ class AgentJudge: id: str = str(uuid.uuid4()), agent_name: str = "Agent Judge", description: str = "You're an expert AI agent judge. Carefully review the following output(s) generated by another agent. 
Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance.", - system_prompt: str = AGENT_JUDGE_PROMPT, + system_prompt: str = None, model_name: str = "openai/o1", max_loops: int = 1, verbose: bool = False, evaluation_criteria: Optional[Dict[str, float]] = None, + return_score: bool = False, *args, **kwargs, ): @@ -113,98 +211,59 @@ class AgentJudge: self.conversation = Conversation(time_enabled=False) self.max_loops = max_loops self.verbose = verbose - + self.return_score = return_score self.evaluation_criteria = evaluation_criteria or {} - # Enhance system prompt with evaluation criteria if provided - enhanced_prompt = system_prompt - if self.evaluation_criteria: - criteria_str = "\n\nEvaluation Criteria:\n" - for criterion, weight in self.evaluation_criteria.items(): - criteria_str += f"- {criterion}: weight = {weight}\n" - enhanced_prompt += criteria_str - self.agent = Agent( agent_name=agent_name, agent_description=description, - system_prompt=enhanced_prompt, + system_prompt=self.enhanced_prompt(), model_name=model_name, max_loops=1, *args, **kwargs, ) - def feedback_cycle_step( - self, - agent: Union[Agent, callable], - task: str, - img: Optional[str] = None, - ): - try: - # First run the main agent - agent_output = agent.run(task=task, img=img) - - # Then run the judge agent - judge_output = self.run(task=agent_output, img=img) - - # Run the main agent again with the judge's feedback, using a much improved prompt - improved_prompt = ( - f"You have received the following detailed feedback from the expert agent judge ({self.agent_name}):\n\n" - f"--- FEEDBACK START ---\n{judge_output}\n--- FEEDBACK END ---\n\n" - f"Your task is to thoughtfully revise and enhance your previous output based on this critique. " - f"Carefully address all identified weaknesses, incorporate the suggestions, and strive to maximize the strengths noted. " - f"Be specific, accurate, and actionable in your improvements. " - f"Here is the original task for reference:\n\n" - f"--- TASK ---\n{task}\n--- END TASK ---\n\n" - f"Please provide your improved and fully revised output below." 
- ) + self.reliability_check() - return agent.run(task=improved_prompt, img=img) - except Exception as e: - raise AgentJudgeFeedbackCycleError( - f"Error In Agent Judge Feedback Cycle: {e} Traceback: {traceback.format_exc()}" + def reliability_check(self): + if self.max_loops == 0 or self.max_loops is None: + raise ValueError( + f"AgentJudge: {self.agent_name} max_loops must be greater than 0" ) - def feedback_cycle( - self, - agent: Union[Agent, callable], - task: str, - img: Optional[str] = None, - loops: int = 1, - ): - loop = 0 - original_task = task # Preserve the original task - current_output = None # Track the current output - all_outputs = [] # Collect all outputs from each iteration - - while loop < loops: - # First iteration: run the standard feedback cycle step - current_output = self.feedback_cycle_step( - agent, original_task, img + if self.model_name is None: + raise ValueError( + f"AgentJudge: {self.agent_name} model_name must be provided" ) - # Add the current output to our collection - all_outputs.append(current_output) - loop += 1 + def enhanced_prompt(self): + # Enhance system prompt with evaluation criteria if provided + enhanced_prompt = ( + self.system_prompt or get_agent_judge_prompt() + ) + if self.evaluation_criteria: + criteria_str = "\n\nEvaluation Criteria:\n" + for criterion, weight in self.evaluation_criteria.items(): + criteria_str += f"- {criterion}: weight = {weight}\n" + enhanced_prompt += criteria_str - return all_outputs + return enhanced_prompt def step( self, task: str = None, - tasks: Optional[List[str]] = None, img: Optional[str] = None, ) -> str: """ - Processes a single task or list of tasks and returns the agent's evaluation. + Processes a single task and returns the agent's evaluation. This method performs a one-shot evaluation of the provided content. It takes - either a single task string or a list of tasks and generates a comprehensive - evaluation with strengths, weaknesses, and improvement suggestions. + a single task string (response from another LLM or agent) and generates a + comprehensive evaluation with strengths, weaknesses, and improvement suggestions. Args: - task (str, optional): A single task/output to be evaluated. - tasks (List[str], optional): A list of tasks/outputs to be evaluated. + task (str, optional): The response from another LLM or agent to be evaluated. img (str, optional): Path to an image file for multimodal evaluation. Returns: @@ -215,62 +274,38 @@ class AgentJudge: - Factual accuracy assessment Raises: - ValueError: If neither task nor tasks are provided. + ValueError: If no task is provided. Example: ```python # Single task evaluation evaluation = judge.step(task="The answer is 42.") - - # Multiple tasks evaluation - evaluation = judge.step(tasks=[ - "Response 1: Paris is the capital of France", - "Response 2: 2 + 2 = 5" # Incorrect - ]) - # Multimodal evaluation evaluation = judge.step( - task="Describe this image", + task="The agent described this image as a cat", img="path/to/image.jpg" ) ``` """ try: - prompt = "" - if tasks: - prompt = any_to_str(tasks) - elif task: - prompt = task - else: - raise ValueError("No tasks or task provided") - # 添加评估标准到任务描述中 - task_instruction = "You are an expert AI agent judge. Carefully review the following output(s) generated by another agent. " - task_instruction += "Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance. 
" - task_instruction += ( - "Your feedback should address the following points:\n" + # Use the predefined task evaluation prompt + task_instruction = get_task_evaluation_prompt( + outputs=task ) - task_instruction += "1. Strengths: What did the agent do well? Highlight any correct reasoning, clarity, or effective problem-solving.\n" - task_instruction += "2. Weaknesses: Identify any errors, omissions, unclear reasoning, or areas where the output could be improved.\n" - task_instruction += "3. Suggestions: Offer specific, practical recommendations for how the agent can improve its next attempt. " - task_instruction += "This may include advice on reasoning, structure, completeness, or style.\n" - task_instruction += "4. If relevant, point out any factual inaccuracies or logical inconsistencies.\n" - # 在任务说明中添加评估标准 + # Add evaluation criteria if provided if self.evaluation_criteria: - list(self.evaluation_criteria.keys()) - task_instruction += "\nPlease use these specific evaluation criteria with their respective weights:\n" + criteria_str = "\n\nPlease use these specific evaluation criteria with their respective weights:\n" for ( criterion, weight, ) in self.evaluation_criteria.items(): - task_instruction += ( + criteria_str += ( f"- {criterion}: weight = {weight}\n" ) - - task_instruction += "Be thorough, objective, and professional. Your goal is to help the agent learn and produce better results in the future.\n\n" - task_instruction += f"Output(s) to evaluate:\n{prompt}\n" + task_instruction += criteria_str response = self.agent.run( task=task_instruction, @@ -279,166 +314,84 @@ class AgentJudge: return response except Exception as e: - error_message = ( - f"AgentJudge encountered an error: {e}\n" - f"Traceback:\n{traceback.format_exc()}\n\n" - "If this issue persists, please:\n" - "- Open a GitHub issue: https://github.com/swarms-ai/swarms/issues\n" - "- Join our Discord for real-time support: swarms.ai\n" - "- Or book a call: https://cal.com/swarms\n" - ) + error_message = f"AgentJudge: {self.agent_name} encountered an error: {e}\n Traceback: {traceback.format_exc()}" raise AgentJudgeExecutionError(error_message) def run( self, task: str = None, - tasks: Optional[List[str]] = None, img: Optional[str] = None, ): """ - Executes evaluation in multiple iterations with context building and refinement. + Executes evaluation in a loop with context building, collecting responses. - This method runs the evaluation process for the specified number of max_loops, - where each iteration builds upon the previous context. This allows for iterative - refinement of evaluations and deeper analysis over multiple passes. + This method runs the evaluation multiple times (up to max_loops) to build + context and provide iterative feedback. Each iteration uses the previous + response as context for the next evaluation. Args: - task (str, optional): A single task/output to be evaluated. - tasks (List[str], optional): A list of tasks/outputs to be evaluated. + task (str, optional): The response from another LLM or agent to be evaluated. img (str, optional): Path to an image file for multimodal evaluation. Returns: - List[str]: A list of evaluation responses, one for each iteration. - Each subsequent evaluation includes context from previous iterations. + List[str]: A list of evaluation responses from each iteration. 
Example: ```python - # Single task with iterative refinement - judge = AgentJudge(max_loops=3) - evaluations = judge.run(task="Agent output to evaluate") - # Returns 3 evaluations, each building on the previous - - # Multiple tasks with context building - evaluations = judge.run(tasks=[ - "First agent response", - "Second agent response" - ]) - - # With image analysis - evaluations = judge.run( - task="Analyze this chart", - img="chart.png" + # Evaluate a response with multiple iterations + responses = judge.run(task="The agent said: Paris is the capital of France") + + # Multimodal evaluation with multiple iterations + responses = judge.run( + task="The agent described this image as a cat", + img="path/to/image.jpg" ) ``` - - Note: - - The first iteration evaluates the original task(s) - - Subsequent iterations include context from previous evaluations - - This enables deeper analysis and refinement of judgments - - Useful for complex evaluations requiring multiple perspectives """ try: - responses = [] - context = "" - - # Convert single task to list for consistent processing - if task and not tasks: - tasks = [task] - task = None # Clear to avoid confusion in step method - + # The agent will run in a loop, remembering and updating the conversation context at each step. + self.conversation.add(role="user", content=task) for _ in range(self.max_loops): - # Add context to the tasks if available - if context and tasks: - contextualized_tasks = [ - f"Previous context: {context}\nTask: {t}" - for t in tasks - ] - else: - contextualized_tasks = tasks - + # Retrieve the full conversation context as a string + context = self.conversation.get_str() + # Build the contextualized task, always including the full conversation so far + contextualized_task = f"{context}\n" # Get response for current iteration current_response = self.step( - task=task, - tasks=contextualized_tasks, + task=contextualized_task, img=img, ) + # Add the agent's response to the conversation history + self.conversation.add( + role=self.agent.agent_name, + content=current_response, + ) + # The context will be updated automatically in the next loop iteration - responses.append(current_response) - - # Update context for next iteration - context = current_response - - return responses - except Exception as e: - error_message = ( - f"AgentJudge encountered an error: {e}\n" - f"Traceback:\n{traceback.format_exc()}\n\n" - "If this issue persists, please:\n" - "- Open a GitHub issue: https://github.com/swarms-ai/swarms/issues\n" - "- Join our Discord for real-time support: swarms.ai\n" - "- Or book a call: https://cal.com/swarms\n" - ) + # After all loops, return either the reward or the full conversation + if self.return_score: + return get_reward(self.conversation.get_str()) + else: + return self.conversation.get_str() + except Exception as e: + error_message = f"AgentJudge: {self.agent_name} encountered an error: {e}\n Traceback: {traceback.format_exc()}" raise AgentJudgeExecutionError(error_message) def run_batched( self, tasks: Optional[List[str]] = None, - imgs: Optional[List[str]] = None, ): """ - Executes batch evaluation of multiple tasks with corresponding images. - - This method processes multiple task-image pairs independently, where each - task can be evaluated with its corresponding image. Unlike the run() method, - this doesn't build context between different tasks - each is evaluated - independently. - + Runs the agent judge on a batch of tasks. 
Args: - tasks (List[str], optional): A list of tasks/outputs to be evaluated. - imgs (List[str], optional): A list of image paths corresponding to each task. - Must be the same length as tasks if provided. + tasks (Optional[List[str]]): A list of tasks (strings) to be evaluated. Returns: - List[List[str]]: A list of evaluation responses for each task. Each inner - list contains the responses from all iterations (max_loops) - for that particular task. - - - Example: - ```python - # Batch evaluation with images - tasks = [ - "Describe what you see in this image", - "What's wrong with this chart?", - "Analyze the trends shown" - ] - images = [ - "photo1.jpg", - "chart1.png", - "graph1.png" - ] - evaluations = judge.run_batched(tasks=tasks, imgs=images) - # Returns evaluations for each task-image pair - - # Batch evaluation without images - evaluations = judge.run_batched(tasks=[ - "Agent response 1", - "Agent response 2", - "Agent response 3" - ]) - ``` - - - Note: - - Each task is processed independently - - If imgs is provided, it must have the same length as tasks - - Each task goes through max_loops iterations independently - - No context is shared between different tasks in the batch + List[List[str]]: A list where each element is the list of evaluation responses + for the corresponding task. """ - responses = [] - for task, img in zip(tasks, imgs): - response = self.run(task=task, img=img) - responses.append(response) - - return responses + outputs = [] + for task in tasks: + outputs.append(self.run(task=task)) + return outputs diff --git a/swarms/structs/cron_job.py b/swarms/structs/cron_job.py index 6bdd0826..4af41010 100644 --- a/swarms/structs/cron_job.py +++ b/swarms/structs/cron_job.py @@ -46,6 +46,7 @@ class CronJob: job_id: Unique identifier for the job is_running: Flag indicating if the job is currently running thread: Thread object for running the job + callback: Optional callback function to customize output processing """ def __init__( @@ -53,6 +54,7 @@ class CronJob: agent: Optional[Union[Any, Callable]] = None, interval: Optional[str] = None, job_id: Optional[str] = None, + callback: Optional[Callable[[Any, str, dict], Any]] = None, ): """Initialize the CronJob wrapper. @@ -60,6 +62,12 @@ class CronJob: agent: The Swarms Agent instance or callable to be scheduled interval: The interval string (e.g., "5seconds", "10minutes", "1hour") job_id: Optional unique identifier for the job. If not provided, one will be generated. + callback: Optional callback function to customize output processing. + Signature: callback(output: Any, task: str, metadata: dict) -> Any + - output: The original output from the agent + - task: The task that was executed + - metadata: Dictionary containing job_id, timestamp, execution_count, etc. 
+ Returns: The customized output Raises: CronJobConfigError: If the interval format is invalid @@ -70,6 +78,9 @@ class CronJob: self.is_running = False self.thread = None self.schedule = schedule.Scheduler() + self.callback = callback + self.execution_count = 0 + self.start_time = None logger.info(f"Initializing CronJob with ID: {self.job_id}") @@ -242,17 +253,47 @@ class CronJob: (e.g., img=image_path, streaming_callback=callback_func) Returns: - Any: The result of the task execution + Any: The result of the task execution (original or customized by callback) Raises: CronJobExecutionError: If task execution fails """ try: logger.debug(f"Executing task for job {self.job_id}") + + # Execute the agent if isinstance(self.agent, Callable): - return self.agent.run(task=task, **kwargs) + original_output = self.agent.run(task=task, **kwargs) else: - return self.agent(task, **kwargs) + original_output = self.agent(task, **kwargs) + + # Increment execution count + self.execution_count += 1 + + # Prepare metadata for callback + metadata = { + "job_id": self.job_id, + "timestamp": time.time(), + "execution_count": self.execution_count, + "task": task, + "kwargs": kwargs, + "start_time": self.start_time, + "is_running": self.is_running + } + + # Apply callback if provided + if self.callback: + try: + customized_output = self.callback(original_output, task, metadata) + logger.debug(f"Callback applied to job {self.job_id}, execution {self.execution_count}") + return customized_output + except Exception as callback_error: + logger.warning(f"Callback failed for job {self.job_id}: {callback_error}") + # Return original output if callback fails + return original_output + + return original_output + except Exception as e: logger.error( f"Task execution failed for job {self.job_id}: {str(e)}" @@ -300,6 +341,7 @@ class CronJob: try: if not self.is_running: self.is_running = True + self.start_time = time.time() self.thread = threading.Thread( target=self._run_schedule, daemon=True, @@ -362,6 +404,31 @@ class CronJob: f"Schedule loop failed: {str(e)}" ) + def set_callback(self, callback: Callable[[Any, str, dict], Any]): + """Set or update the callback function for output customization. + + Args: + callback: Function to customize output processing. + Signature: callback(output: Any, task: str, metadata: dict) -> Any + """ + self.callback = callback + logger.info(f"Callback updated for job {self.job_id}") + + def get_execution_stats(self) -> dict: + """Get execution statistics for the cron job. + + Returns: + dict: Statistics including execution count, start time, running status, etc. 
+ """ + return { + "job_id": self.job_id, + "is_running": self.is_running, + "execution_count": self.execution_count, + "start_time": self.start_time, + "uptime": time.time() - self.start_time if self.start_time else 0, + "interval": self.interval + } + # # Example usage # if __name__ == "__main__": diff --git a/tests/structs/test_reasoning_agent_router_all.py b/tests/structs/test_reasoning_agent_router_all.py index 8a7d2bee..931f62e7 100644 --- a/tests/structs/test_reasoning_agent_router_all.py +++ b/tests/structs/test_reasoning_agent_router_all.py @@ -2,12 +2,13 @@ - Parameters: description, model_name, system_prompt, max_loops, swarm_type, num_samples, output_types, num_knowledge_items, memory_capacity, eval, random_models_on, majority_voting_prompt, reasoning_model_name - Methods: select_swarm(), run (task: str, img: Optional[List[str]] = None, **kwargs), batched_run (tasks: List[str], imgs: Optional[List[List[str]]] = None, **kwargs) """ + import time from swarms.agents import ReasoningAgentRouter -from swarms.structs.agent import Agent from datetime import datetime + class TestReport: def __init__(self): self.results = [] @@ -42,13 +43,21 @@ class TestReport: report_lines = [] report_lines.append("=" * 60) - report_lines.append("REASONING AGENT ROUTER TEST SUITE REPORT") + report_lines.append( + "REASONING AGENT ROUTER TEST SUITE REPORT" + ) report_lines.append("=" * 60) if self.start_time: - report_lines.append(f"Test Run Started: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + report_lines.append( + f"Test Run Started: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}" + ) if self.end_time: - report_lines.append(f"Test Run Ended: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}") - report_lines.append(f"Duration: {duration:.2f} seconds") + report_lines.append( + f"Test Run Ended: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}" + ) + report_lines.append( + f"Duration: {duration:.2f} seconds" + ) report_lines.append(f"Total Tests: {total_tests}") report_lines.append(f"Passed: {passed_tests}") report_lines.append(f"Failed: {failed_tests}") @@ -65,9 +74,13 @@ class TestReport: return "\n".join(report_lines) # INSERT_YOUR_CODE + + # Default parameters for ReasoningAgentRouter, can be overridden in each test DEFAULT_AGENT_NAME = "reasoning-agent" -DEFAULT_DESCRIPTION = "A reasoning agent that can answer questions and help with tasks." +DEFAULT_DESCRIPTION = ( + "A reasoning agent that can answer questions and help with tasks." +) DEFAULT_MODEL_NAME = "gpt-4o-mini" DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant that can answer questions and help with tasks." 
DEFAULT_MAX_LOOPS = 1 @@ -77,6 +90,7 @@ DEFAULT_EVAL = False DEFAULT_RANDOM_MODELS_ON = False DEFAULT_MAJORITY_VOTING_PROMPT = None + def test_agents_swarm( agent_name=DEFAULT_AGENT_NAME, description=DEFAULT_DESCRIPTION, @@ -112,19 +126,39 @@ def test_agents_swarm( PARAMETERS TESTING """ + def test_router_description(report): """Test ReasoningAgentRouter with custom description (only change description param)""" start_time = time.time() try: - result = test_agents_swarm(description="Test description for router") + result = test_agents_swarm( + description="Test description for router" + ) # Check if the description was set correctly - router = ReasoningAgentRouter(description="Test description for router") + router = ReasoningAgentRouter( + description="Test description for router" + ) if router.description == "Test description for router": - report.add_result("Parameter: description", True, duration=time.time() - start_time) + report.add_result( + "Parameter: description", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: description", False, message=f"Expected description 'Test description for router', got '{router.description}'", duration=time.time() - start_time) + report.add_result( + "Parameter: description", + False, + message=f"Expected description 'Test description for router', got '{router.description}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: description", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: description", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_model_name(report): """Test ReasoningAgentRouter with custom model_name (only change model_name param)""" @@ -133,24 +167,58 @@ def test_router_model_name(report): result = test_agents_swarm(model_name="gpt-4") router = ReasoningAgentRouter(model_name="gpt-4") if router.model_name == "gpt-4": - report.add_result("Parameter: model_name", True, duration=time.time() - start_time) + report.add_result( + "Parameter: model_name", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: model_name", False, message=f"Expected model_name 'gpt-4', got '{router.model_name}'", duration=time.time() - start_time) + report.add_result( + "Parameter: model_name", + False, + message=f"Expected model_name 'gpt-4', got '{router.model_name}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: model_name", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: model_name", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_system_prompt(report): """Test ReasoningAgentRouter with custom system_prompt (only change system_prompt param)""" start_time = time.time() try: - result = test_agents_swarm(system_prompt="You are a test router.") - router = ReasoningAgentRouter(system_prompt="You are a test router.") + result = test_agents_swarm( + system_prompt="You are a test router." + ) + router = ReasoningAgentRouter( + system_prompt="You are a test router." 
+ ) if router.system_prompt == "You are a test router.": - report.add_result("Parameter: system_prompt", True, duration=time.time() - start_time) + report.add_result( + "Parameter: system_prompt", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: system_prompt", False, message=f"Expected system_prompt 'You are a test router.', got '{router.system_prompt}'", duration=time.time() - start_time) + report.add_result( + "Parameter: system_prompt", + False, + message=f"Expected system_prompt 'You are a test router.', got '{router.system_prompt}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: system_prompt", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: system_prompt", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_max_loops(report): """Test ReasoningAgentRouter with custom max_loops (only change max_loops param)""" @@ -159,11 +227,26 @@ def test_router_max_loops(report): result = test_agents_swarm(max_loops=5) router = ReasoningAgentRouter(max_loops=5) if router.max_loops == 5: - report.add_result("Parameter: max_loops", True, duration=time.time() - start_time) + report.add_result( + "Parameter: max_loops", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: max_loops", False, message=f"Expected max_loops 5, got {router.max_loops}", duration=time.time() - start_time) + report.add_result( + "Parameter: max_loops", + False, + message=f"Expected max_loops 5, got {router.max_loops}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: max_loops", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: max_loops", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_swarm_type(report): """Test ReasoningAgentRouter with custom swarm_type (only change swarm_type param)""" @@ -172,26 +255,54 @@ def test_router_swarm_type(report): result = test_agents_swarm(swarm_type="reasoning-agent") router = ReasoningAgentRouter(swarm_type="reasoning-agent") if router.swarm_type == "reasoning-agent": - report.add_result("Parameter: swarm_type", True, duration=time.time() - start_time) + report.add_result( + "Parameter: swarm_type", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: swarm_type", False, message=f"Expected swarm_type 'reasoning-agent', got '{router.swarm_type}'", duration=time.time() - start_time) + report.add_result( + "Parameter: swarm_type", + False, + message=f"Expected swarm_type 'reasoning-agent', got '{router.swarm_type}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: swarm_type", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: swarm_type", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_num_samples(report): """Test ReasoningAgentRouter with custom num_samples (only change num_samples param)""" start_time = time.time() try: - router = ReasoningAgentRouter( - num_samples=3 - ) + router = ReasoningAgentRouter(num_samples=3) output = router.run("How many samples do you use?") if router.num_samples == 3: - report.add_result("Parameter: num_samples", True, duration=time.time() - start_time) + report.add_result( + "Parameter: num_samples", + True, + duration=time.time() - start_time, + ) else: - 
report.add_result("Parameter: num_samples", False, message=f"Expected num_samples 3, got {router.num_samples}", duration=time.time() - start_time) + report.add_result( + "Parameter: num_samples", + False, + message=f"Expected num_samples 3, got {router.num_samples}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: num_samples", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: num_samples", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_output_types(report): """Test ReasoningAgentRouter with custom output_type (only change output_type param)""" @@ -199,11 +310,26 @@ def test_router_output_types(report): try: router = ReasoningAgentRouter(output_type=["text", "json"]) if getattr(router, "output_type", None) == ["text", "json"]: - report.add_result("Parameter: output_type", True, duration=time.time() - start_time) + report.add_result( + "Parameter: output_type", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: output_type", False, message=f"Expected output_type ['text', 'json'], got {getattr(router, 'output_type', None)}", duration=time.time() - start_time) + report.add_result( + "Parameter: output_type", + False, + message=f"Expected output_type ['text', 'json'], got {getattr(router, 'output_type', None)}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: output_type", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: output_type", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_num_knowledge_items(report): """Test ReasoningAgentRouter with custom num_knowledge_items (only change num_knowledge_items param)""" @@ -211,11 +337,26 @@ def test_router_num_knowledge_items(report): try: router = ReasoningAgentRouter(num_knowledge_items=7) if router.num_knowledge_items == 7: - report.add_result("Parameter: num_knowledge_items", True, duration=time.time() - start_time) + report.add_result( + "Parameter: num_knowledge_items", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: num_knowledge_items", False, message=f"Expected num_knowledge_items 7, got {router.num_knowledge_items}", duration=time.time() - start_time) + report.add_result( + "Parameter: num_knowledge_items", + False, + message=f"Expected num_knowledge_items 7, got {router.num_knowledge_items}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: num_knowledge_items", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: num_knowledge_items", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_memory_capacity(report): """Test ReasoningAgentRouter with custom memory_capacity (only change memory_capacity param)""" @@ -223,11 +364,26 @@ def test_router_memory_capacity(report): try: router = ReasoningAgentRouter(memory_capacity=10) if router.memory_capacity == 10: - report.add_result("Parameter: memory_capacity", True, duration=time.time() - start_time) + report.add_result( + "Parameter: memory_capacity", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: memory_capacity", False, message=f"Expected memory_capacity 10, got {router.memory_capacity}", duration=time.time() - start_time) + report.add_result( + "Parameter: memory_capacity", + False, + message=f"Expected 
memory_capacity 10, got {router.memory_capacity}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: memory_capacity", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: memory_capacity", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_eval(report): """Test ReasoningAgentRouter with eval enabled (only change eval param)""" @@ -236,11 +392,26 @@ def test_router_eval(report): result = test_agents_swarm(eval=True) router = ReasoningAgentRouter(eval=True) if router.eval is True: - report.add_result("Parameter: eval", True, duration=time.time() - start_time) + report.add_result( + "Parameter: eval", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: eval", False, message=f"Expected eval True, got {router.eval}", duration=time.time() - start_time) + report.add_result( + "Parameter: eval", + False, + message=f"Expected eval True, got {router.eval}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: eval", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: eval", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_random_models_on(report): """Test ReasoningAgentRouter with random_models_on enabled (only change random_models_on param)""" @@ -249,24 +420,61 @@ def test_router_random_models_on(report): result = test_agents_swarm(random_models_on=True) router = ReasoningAgentRouter(random_models_on=True) if router.random_models_on is True: - report.add_result("Parameter: random_models_on", True, duration=time.time() - start_time) + report.add_result( + "Parameter: random_models_on", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: random_models_on", False, message=f"Expected random_models_on True, got {router.random_models_on}", duration=time.time() - start_time) + report.add_result( + "Parameter: random_models_on", + False, + message=f"Expected random_models_on True, got {router.random_models_on}", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: random_models_on", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: random_models_on", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_majority_voting_prompt(report): """Test ReasoningAgentRouter with custom majority_voting_prompt (only change majority_voting_prompt param)""" start_time = time.time() try: - result = test_agents_swarm(majority_voting_prompt="Vote for the best answer.") - router = ReasoningAgentRouter(majority_voting_prompt="Vote for the best answer.") - if router.majority_voting_prompt == "Vote for the best answer.": - report.add_result("Parameter: majority_voting_prompt", True, duration=time.time() - start_time) + result = test_agents_swarm( + majority_voting_prompt="Vote for the best answer." + ) + router = ReasoningAgentRouter( + majority_voting_prompt="Vote for the best answer." + ) + if ( + router.majority_voting_prompt + == "Vote for the best answer." 
+ ): + report.add_result( + "Parameter: majority_voting_prompt", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: majority_voting_prompt", False, message=f"Expected majority_voting_prompt 'Vote for the best answer.', got '{router.majority_voting_prompt}'", duration=time.time() - start_time) + report.add_result( + "Parameter: majority_voting_prompt", + False, + message=f"Expected majority_voting_prompt 'Vote for the best answer.', got '{router.majority_voting_prompt}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: majority_voting_prompt", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: majority_voting_prompt", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_reasoning_model_name(report): """Test ReasoningAgentRouter with custom reasoning_model_name (only change reasoning_model_name param)""" @@ -274,17 +482,32 @@ def test_router_reasoning_model_name(report): try: router = ReasoningAgentRouter(reasoning_model_name="gpt-3.5") if router.reasoning_model_name == "gpt-3.5": - report.add_result("Parameter: reasoning_model_name", True, duration=time.time() - start_time) + report.add_result( + "Parameter: reasoning_model_name", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Parameter: reasoning_model_name", False, message=f"Expected reasoning_model_name 'gpt-3.5', got '{router.reasoning_model_name}'", duration=time.time() - start_time) + report.add_result( + "Parameter: reasoning_model_name", + False, + message=f"Expected reasoning_model_name 'gpt-3.5', got '{router.reasoning_model_name}'", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Parameter: reasoning_model_name", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Parameter: reasoning_model_name", + False, + message=str(e), + duration=time.time() - start_time, + ) """ Methods Testing """ + def test_router_select_swarm(report): """Test ReasoningAgentRouter's select_swarm() method using test_agents_swarm""" start_time = time.time() @@ -305,9 +528,19 @@ def test_router_select_swarm(report): # Run the method to test result = router.select_swarm() # Determine if the result is as expected (not raising error is enough for this test) - report.add_result("Method: select_swarm()", True, duration=time.time() - start_time) + report.add_result( + "Method: select_swarm()", + True, + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Method: select_swarm()", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Method: select_swarm()", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_run(report): """Test ReasoningAgentRouter's run() method using test_agents_swarm""" @@ -332,11 +565,26 @@ def test_router_run(report): if not isinstance(output, str): output = str(output) if isinstance(output, str): - report.add_result("Method: run()", True, duration=time.time() - start_time) + report.add_result( + "Method: run()", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Method: run()", False, message="Output is not a string", duration=time.time() - start_time) + report.add_result( + "Method: run()", + False, + message="Output is not a string", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Method: run()", False, message=str(e), 
duration=time.time() - start_time) + report.add_result( + "Method: run()", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_router_batched_run(report): """Test ReasoningAgentRouter's batched_run() method using test_agents_swarm""" @@ -360,17 +608,34 @@ def test_router_batched_run(report): outputs = router.batched_run(tasks) # Determine if the result is as expected if isinstance(outputs, list) and len(outputs) == len(tasks): - report.add_result("Method: batched_run()", True, duration=time.time() - start_time) + report.add_result( + "Method: batched_run()", + True, + duration=time.time() - start_time, + ) else: - report.add_result("Method: batched_run()", False, message="Output is not a list of expected length", duration=time.time() - start_time) + report.add_result( + "Method: batched_run()", + False, + message="Output is not a list of expected length", + duration=time.time() - start_time, + ) except Exception as e: - report.add_result("Method: batched_run()", False, message=str(e), duration=time.time() - start_time) + report.add_result( + "Method: batched_run()", + False, + message=str(e), + duration=time.time() - start_time, + ) + def test_swarm(report): """ Run all ReasoningAgentRouter parameter and method tests, log results to report, and print summary. """ - print("\n=== Starting ReasoningAgentRouter Parameter & Method Test Suite ===") + print( + "\n=== Starting ReasoningAgentRouter Parameter & Method Test Suite ===" + ) start_time = time.time() tests = [ ("Parameter: description", test_router_description), @@ -380,12 +645,21 @@ def test_swarm(report): ("Parameter: swarm_type", test_router_swarm_type), ("Parameter: num_samples", test_router_num_samples), ("Parameter: output_types", test_router_output_types), - ("Parameter: num_knowledge_items", test_router_num_knowledge_items), + ( + "Parameter: num_knowledge_items", + test_router_num_knowledge_items, + ), ("Parameter: memory_capacity", test_router_memory_capacity), ("Parameter: eval", test_router_eval), ("Parameter: random_models_on", test_router_random_models_on), - ("Parameter: majority_voting_prompt", test_router_majority_voting_prompt), - ("Parameter: reasoning_model_name", test_router_reasoning_model_name), + ( + "Parameter: majority_voting_prompt", + test_router_majority_voting_prompt, + ), + ( + "Parameter: reasoning_model_name", + test_router_reasoning_model_name, + ), ("Method: select_swarm()", test_router_select_swarm), ("Method: run()", test_router_run), ("Method: batched_run()", test_router_batched_run), @@ -404,6 +678,7 @@ def test_swarm(report): # INSERT_YOUR_CODE + if __name__ == "__main__": report = TestReport() report.start()