[Improvement][AgentJudge][docs] [FEAT][CronJob][Callbacks for API deployments]

pull/1054/head
Kye Gomez 3 months ago
parent e43e4d8f73
commit 30b94c0d6b

@ -10,15 +10,14 @@ The AgentJudge is designed to evaluate and critique outputs from other AI agents
Key capabilities:
| Capability | Description |
|------------------------------|-----------------------------------------------------------------------------------------------|
| **Quality Assessment** | Evaluates correctness, clarity, and completeness of agent outputs |
| **Structured Feedback** | Provides detailed critiques with strengths, weaknesses, and suggestions |
| **Multimodal Support** | Can evaluate text outputs alongside images |
| **Context Building** | Maintains evaluation context across multiple iterations |
| **Custom Evaluation Criteria**| Supports weighted evaluation criteria for domain-specific assessments |
| **Batch Processing** | Efficiently processes multiple evaluations |
## Architecture
@ -50,7 +49,6 @@ graph TD
J --> O
J --> P
J --> Q
```
## Class Reference
@ -62,10 +60,12 @@ AgentJudge(
id: str = str(uuid.uuid4()),
agent_name: str = "Agent Judge",
description: str = "You're an expert AI agent judge...",
system_prompt: str = None,
model_name: str = "openai/o1",
max_loops: int = 1,
verbose: bool = False,
evaluation_criteria: Optional[Dict[str, float]] = None,
return_score: bool = False,
*args,
**kwargs
)
@ -78,10 +78,12 @@ AgentJudge(
| `id` | `str` | `str(uuid.uuid4())` | Unique identifier for the judge instance |
| `agent_name` | `str` | `"Agent Judge"` | Name of the agent judge |
| `description` | `str` | `"You're an expert AI agent judge..."` | Description of the agent's role |
| `system_prompt` | `str` | `None` | Custom system instructions (uses default if None) |
| `model_name` | `str` | `"openai/o1"` | LLM model for evaluation |
| `max_loops` | `int` | `1` | Maximum evaluation iterations |
| `verbose` | `bool` | `False` | Enable verbose logging |
| `evaluation_criteria` | `Optional[Dict[str, float]]` | `None` | Dictionary of evaluation criteria and weights |
| `return_score` | `bool` | `False` | Whether to return a numerical score instead of full conversation |
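
For instance, a judge configured with the two newer parameters (`evaluation_criteria` and `return_score`) might be constructed as follows; the model name and weights are illustrative:

```python
from swarms.agents.agent_judge import AgentJudge

# Illustrative configuration: weighted criteria plus scoring mode
judge = AgentJudge(
    model_name="gpt-4o-mini",  # any supported model name works here
    evaluation_criteria={"accuracy": 0.6, "clarity": 0.4},
    return_score=True,
)
```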
### Methods
@ -90,29 +92,28 @@ AgentJudge(
```python
step(
task: str = None,
img: Optional[str] = None
) -> str
```
Processes a single task and returns the agent's evaluation.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `str` | `None` | Single task/output to evaluate |
| `img` | `Optional[str]` | `None` | Path to image for multimodal evaluation |
**Returns:** `str` - Detailed evaluation response
**Raises:** `ValueError` - If no task is provided
#### run()
```python
run(
task: str = None,
img: Optional[str] = None
) -> Union[str, int]
```
Executes evaluation in multiple iterations with context building.
@ -120,86 +121,115 @@ Executes evaluation in multiple iterations with context building.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task` | `str` | `None` | Single task/output to evaluate |
| `img` | `Optional[str]` | `None` | Path to image for multimodal evaluation |
**Returns:**
- `str` - Full conversation context if `return_score=False` (default)
- `int` - Numerical reward score if `return_score=True`
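
A minimal sketch of the two return modes described above; the task string and model name are arbitrary:

```python
from swarms.agents.agent_judge import AgentJudge

# Default mode: returns the full conversation context as a string
judge = AgentJudge(model_name="gpt-4o-mini")
context = judge.run(task="The capital of France is Paris.")

# Scoring mode: returns 1 or 0 based on positive evaluation keywords
scoring_judge = AgentJudge(model_name="gpt-4o-mini", return_score=True)
score = scoring_judge.run(task="The capital of France is Paris.")
```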
#### run_batched()
```python
run_batched(
tasks: Optional[List[str]] = None
) -> List[Union[str, int]]
```
Executes batch evaluation of multiple tasks.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `tasks` | `Optional[List[str]]` | `None` | List of tasks/outputs to evaluate |
**Returns:** `List[Union[str, int]]` - Evaluation responses for each task
## Examples
### Basic Evaluation
```python
from swarms.agents.agent_judge import AgentJudge

# Initialize the agent judge
judge = AgentJudge(
    agent_name="quality-judge",
    model_name="gpt-4",
    max_loops=2
)

# Example agent output to evaluate
agent_output = "The capital of France is Paris. The city is known for its famous Eiffel Tower and delicious croissants. The population is approximately 2.1 million people."

# Run evaluation with context building
evaluations = judge.run(task=agent_output)
```
### Technical Evaluation with Custom Criteria
```python
from swarms.agents.agent_judge import AgentJudge

# Initialize the agent judge with custom evaluation criteria
judge = AgentJudge(
    agent_name="technical-judge",
    model_name="gpt-4",
    max_loops=1,
    evaluation_criteria={
        "accuracy": 0.4,
        "completeness": 0.3,
        "clarity": 0.2,
        "logic": 0.1,
    },
)

# Example technical agent output to evaluate
technical_output = "To solve the quadratic equation x² + 5x + 6 = 0, we can use the quadratic formula: x = (-b ± √(b² - 4ac)) / 2a. Here, a=1, b=5, c=6. Substituting: x = (-5 ± √(25 - 24)) / 2 = (-5 ± √1) / 2 = (-5 ± 1) / 2. So x = -2 or x = -3."

# Run evaluation with context building
evaluations = judge.run(task=technical_output)
```
### Creative Content Evaluation
```python
from swarms.agents.agent_judge import AgentJudge

# Initialize the agent judge for creative content evaluation
judge = AgentJudge(
    agent_name="creative-judge",
    model_name="gpt-4",
    max_loops=2,
    evaluation_criteria={
        "creativity": 0.4,
        "originality": 0.3,
        "engagement": 0.2,
        "coherence": 0.1,
    },
)

# Example creative agent output to evaluate
creative_output = "The moon hung like a silver coin in the velvet sky, casting shadows that danced with the wind. Ancient trees whispered secrets to the stars, while time itself seemed to pause in reverence of this magical moment. The world held its breath, waiting for the next chapter of the eternal story."

# Run evaluation with context building
evaluations = judge.run(task=creative_output)
```
### Single Task Evaluation
```python
from swarms.agents.agent_judge import AgentJudge
# Initialize with default settings
judge = AgentJudge()
# Single task evaluation
result = judge.step(task="The answer is 42.")
```
### Multimodal Evaluation
```python
from swarms.agents.agent_judge import AgentJudge
judge = AgentJudge()
@ -208,32 +238,42 @@ evaluation = judge.step(
task="Describe what you see in this image",
img="path/to/image.jpg"
)
print(evaluation)
```
### Batch Processing
```python
from swarms.agents.agent_judge import AgentJudge

judge = AgentJudge()

# Batch evaluation
tasks = [
    "The capital of France is Paris.",
    "2 + 2 = 4",
    "The Earth is flat."
]

# Each task evaluated independently
evaluations = judge.run_batched(tasks=tasks)
```
### Scoring Mode
```python
from swarms.agents.agent_judge import AgentJudge
# Initialize with scoring enabled
judge = AgentJudge(
agent_name="scoring-judge",
model_name="gpt-4",
max_loops=2,
return_score=True
)
# Get numerical score instead of full conversation
score = judge.run(task="This is a correct and well-explained answer.")
# Returns: 1 (if positive keywords found) or 0
```
## Reference

@ -1,496 +1,415 @@
# CronJob
A wrapper class that turns any callable (including Swarms agents) into a scheduled cron job using the schedule library with cron-style scheduling.

## Overview

Full path: `from swarms.structs.cron_job`

The CronJob class allows you to:

- Schedule any callable or Swarms Agent to run at specified intervals
- Use second-, minute-, and hour-based intervals
- Run tasks in a separate thread
- Handle graceful start/stop of scheduled jobs
- Manage multiple concurrent scheduled jobs

## Class Definition

```python
class CronJob:
    def __init__(
        self,
        agent: Optional[Union[Any, Callable]] = None,
        interval: Optional[str] = None,
        job_id: Optional[str] = None,
        callback: Optional[Callable[[Any, str, dict], Any]] = None,
    ) -> None
```

## Constructor Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `agent` | `Optional[Union[Any, Callable]]` | `None` | The Swarms Agent instance or callable to be scheduled |
| `interval` | `Optional[str]` | `None` | Interval string in the format `"Xunit"` (e.g., `"5seconds"`, `"10minutes"`, `"1hour"`) |
| `job_id` | `Optional[str]` | `None` | Unique identifier for the job. Auto-generated if not provided |
| `callback` | `Optional[Callable[[Any, str, dict], Any]]` | `None` | Function to customize output processing |

## Instance Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| `agent` | `Union[Any, Callable]` | The scheduled agent or callable |
| `interval` | `str` | The scheduling interval string |
| `job_id` | `str` | Unique job identifier |
| `is_running` | `bool` | Current execution status |
| `thread` | `Optional[threading.Thread]` | Background execution thread |
| `schedule` | `schedule.Scheduler` | Internal scheduler instance |
| `callback` | `Optional[Callable[[Any, str, dict], Any]]` | Output customization function |
| `execution_count` | `int` | Number of executions completed |
| `start_time` | `Optional[float]` | Job start timestamp |
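
A minimal construction sketch using the parameters above; the callable, interval, and callback shown here are illustrative:

```python
from swarms import CronJob

def report(task: str) -> str:
    # Illustrative callable; a Swarms Agent instance works the same way
    return f"report for: {task}"

cron_job = CronJob(
    agent=report,
    interval="10seconds",
    job_id="reporting-job",
    callback=lambda output, task, metadata: {
        "output": output,
        "run": metadata["execution_count"],
    },
)
```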
## Architecture

```mermaid
graph TD
    A[CronJob] --> B[Initialize]
    B --> C[Parse Interval]
    C --> D[Schedule Task]
    D --> E[Run Job]
    E --> F[Execute Task]
    F --> G{Is Agent?}
    G -->|Yes| H[Run Agent]
    G -->|No| I[Run Callable]
    H --> J[Handle Result]
    I --> J
    J --> K[Sleep]
    K --> E
```

## Methods

### `run(task: str, **kwargs) -> Any`

Schedules and starts the cron job execution.

**Parameters:**

- `task` (`str`): Task string to be executed by the agent
- `**kwargs` (`dict`): Additional parameters passed to the agent's run method

**Returns:**

- `Any`: Result of the cron job execution

**Raises:**

- `CronJobConfigError`: If agent or interval is not configured
- `CronJobExecutionError`: If task execution fails

### `__call__(task: str, **kwargs) -> Any`

Callable interface for the CronJob instance.

**Parameters:**

- `task` (`str`): Task string to be executed
- `**kwargs` (`dict`): Additional parameters passed to the agent's run method

**Returns:**

- `Any`: Result of the task execution

### `start() -> None`

Starts the scheduled job in a separate thread.

**Raises:**

- `CronJobExecutionError`: If the job fails to start

### `stop() -> None`

Stops the scheduled job gracefully.

**Raises:**

- `CronJobExecutionError`: If the job fails to stop properly

### `set_callback(callback: Callable[[Any, str, dict], Any]) -> None`

Sets or updates the callback function for output customization.

**Parameters:**

- `callback` (`Callable[[Any, str, dict], Any]`): Function to customize output processing

### `get_execution_stats() -> Dict[str, Any]`

Retrieves execution statistics for the cron job.

**Returns:**

- `Dict[str, Any]`: Dictionary containing:
    - `job_id` (`str`): Unique job identifier
    - `is_running` (`bool`): Current execution status
    - `execution_count` (`int`): Number of executions completed
    - `start_time` (`Optional[float]`): Job start timestamp
    - `uptime` (`float`): Seconds since job started
    - `interval` (`str`): Scheduled execution interval

## Callback Function Signature

```python
def callback_function(
    output: Any,      # Original output from the agent
    task: str,        # Task that was executed
    metadata: dict    # Job execution metadata
) -> Any:             # Customized output (any type)
    pass
```

### Callback Metadata Dictionary

| Key | Type | Description |
|-----|------|-------------|
| `job_id` | `str` | Unique job identifier |
| `timestamp` | `float` | Execution timestamp (Unix time) |
| `execution_count` | `int` | Sequential execution number |
| `task` | `str` | The task string that was executed |
| `kwargs` | `dict` | Additional parameters passed to the agent |
| `start_time` | `Optional[float]` | Job start timestamp |
| `is_running` | `bool` | Current job status |
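
For example, a callback that repackages the output using a few of the documented metadata keys might look like this; the key selection is illustrative:

```python
def audit_callback(output, task, metadata):
    # Repackage the result with a few of the metadata keys listed above
    return {
        "job_id": metadata["job_id"],
        "run_number": metadata["execution_count"],
        "ran_at": metadata["timestamp"],
        "task": task,
        "result": output,
    }
```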
## Interval Format

The `interval` parameter accepts strings in the format `"Xunit"`:

| Unit | Examples | Description |
|------|----------|-------------|
| `seconds` | `"5seconds"`, `"30seconds"` | Execute every X seconds |
| `minutes` | `"1minute"`, `"15minutes"` | Execute every X minutes |
| `hours` | `"1hour"`, `"6hours"` | Execute every X hours |
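
As a quick sketch, the same callable can be scheduled at different granularities by changing only the interval string; the callable below is a placeholder:

```python
from swarms import CronJob

def heartbeat(task: str) -> str:
    # Placeholder task used only to illustrate interval strings
    return f"heartbeat: {task}"

every_30s = CronJob(agent=heartbeat, interval="30seconds")
every_15m = CronJob(agent=heartbeat, interval="15minutes")
every_6h = CronJob(agent=heartbeat, interval="6hours")
```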
## Exceptions

### `CronJobError`

Base exception class for all CronJob errors.

### `CronJobConfigError`

Raised for configuration errors (invalid agent, interval format, etc.).

### `CronJobScheduleError`

Raised for scheduling-related errors.

### `CronJobExecutionError`

Raised for execution-related errors (start/stop failures, task execution failures).
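
A minimal error-handling sketch around the exception hierarchy above, assuming the exception classes can be imported from the same `swarms.structs.cron_job` module as `CronJob`:

```python
from swarms import Agent, CronJob
# Assumption: the exception classes live alongside CronJob
from swarms.structs.cron_job import CronJobConfigError, CronJobExecutionError

agent = Agent(agent_name="MyAgent", model_name="gpt-4o-mini", max_loops=1)

try:
    cron_job = CronJob(agent=agent, interval="10seconds")
    cron_job.run("Summarize today's updates")
except CronJobConfigError as e:
    print(f"Configuration problem (bad agent or interval): {e}")
except CronJobExecutionError as e:
    print(f"Execution failed: {e}")
```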
## Type Definitions

```python
from typing import Any, Callable, Dict, Optional, Union

# Agent type can be any callable or object with a run method
AgentType = Union[Any, Callable]

# Callback function signature
CallbackType = Callable[[Any, str, Dict[str, Any]], Any]

# Execution statistics return type
StatsType = Dict[str, Any]
```
# Cardano Specialist Agent
cardano_agent = Agent(
agent_name="Cardano-Analyst",
agent_description="Expert analyst specializing exclusively in Cardano (ADA) analysis and research-driven development",
system_prompt="""You are a Cardano specialist and expert analyst. Your expertise includes:
CARDANO SPECIALIZATION:
- Cardano's research-driven development approach
- Ouroboros proof-of-stake consensus protocol
- Smart contract capabilities via Plutus and Marlowe
- Cardano's three-layer architecture (settlement, computation, control)
- Academic partnerships and peer-reviewed research
- Cardano ecosystem projects and DApp development
- Native tokens and Cardano's UTXO model
- Sustainability and treasury funding mechanisms
ANALYSIS FOCUS:
- Analyze ONLY Cardano data from the provided dataset
- Focus on Cardano's methodical development approach
- Evaluate smart contract adoption and ecosystem growth
- Assess academic partnerships and research contributions
- Monitor native token ecosystem development
- Consider Cardano's long-term roadmap and milestones
DELIVERABLES:
- Cardano-specific analysis and insights
- Development progress and milestone achievements
- Smart contract ecosystem evaluation
- Academic research impact assessment
- Native token and DApp adoption metrics
- Technical and fundamental outlook for ADA
Extract Cardano data from the provided dataset and provide comprehensive Cardano-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
## Quick Start Examples
# Binance Coin Specialist Agent
bnb_agent = Agent(
agent_name="BNB-Analyst",
agent_description="Expert analyst specializing exclusively in BNB analysis and Binance ecosystem dynamics",
system_prompt="""You are a BNB specialist and expert analyst. Your expertise includes:
BNB SPECIALIZATION:
- BNB's utility within the Binance ecosystem
- Binance Smart Chain (BSC) development and adoption
- BNB token burns and deflationary mechanics
- Binance exchange volume and market leadership
- BSC DeFi ecosystem and yield farming
- Cross-chain bridges and multi-chain strategies
- Regulatory challenges facing Binance globally
- BNB's role in transaction fee discounts and platform benefits
ANALYSIS FOCUS:
- Analyze ONLY BNB data from the provided dataset
- Focus on BNB's utility value and exchange benefits
- Evaluate BSC ecosystem growth and competition with Ethereum
- Assess token burn impact on supply and price
- Monitor Binance platform developments and regulations
- Consider BNB's centralized vs decentralized aspects
DELIVERABLES:
- BNB-specific analysis and insights
- Utility value and ecosystem benefits assessment
- BSC adoption and DeFi growth evaluation
- Token economics and burn mechanism impact
- Regulatory risk and compliance analysis
- Technical and fundamental outlook for BNB
Extract BNB data from the provided dataset and provide comprehensive BNB-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
### Basic Usage
# XRP Specialist Agent
xrp_agent = Agent(
agent_name="XRP-Analyst",
agent_description="Expert analyst specializing exclusively in XRP analysis and cross-border payment solutions",
system_prompt="""You are an XRP specialist and expert analyst. Your expertise includes:
XRP SPECIALIZATION:
- XRP's role in cross-border payments and remittances
- RippleNet adoption by financial institutions
- Central Bank Digital Currency (CBDC) partnerships
- Regulatory landscape and SEC lawsuit implications
- XRP Ledger's consensus mechanism and energy efficiency
- On-Demand Liquidity (ODL) usage and growth
- Competition with SWIFT and traditional payment rails
- Ripple's partnerships with banks and payment providers
ANALYSIS FOCUS:
- Analyze ONLY XRP data from the provided dataset
- Focus on XRP's utility in payments and remittances
- Evaluate RippleNet adoption and institutional partnerships
- Assess regulatory developments and legal clarity
- Monitor ODL usage and transaction volumes
- Consider XRP's competitive position in payments
DELIVERABLES:
- XRP-specific analysis and insights
- Payment utility and adoption assessment
- Regulatory landscape and legal developments
- Institutional partnership impact evaluation
- Cross-border payment market analysis
- Technical and fundamental outlook for XRP
Extract XRP data from the provided dataset and provide comprehensive XRP-focused analysis.""",
model_name="groq/moonshotai/kimi-k2-instruct",
max_loops=1,
dynamic_temperature_enabled=True,
streaming_on=False,
tools=[coin_gecko_coin_api],
)
```python
from swarms import Agent, CronJob
# Simple agent cron job
agent = Agent(agent_name="MyAgent", ...)
cron_job = CronJob(agent=agent, interval="30seconds")
cron_job.run("Analyze market trends")
```
### With Custom Function
return [
bitcoin_agent,
ethereum_agent,
solana_agent,
cardano_agent,
bnb_agent,
xrp_agent,
]
```python
def my_task(task: str) -> str:
return f"Completed: {task}"
cron_job = CronJob(agent=my_task, interval="1minute")
cron_job.run("Process data")
```
def create_crypto_workflow() -> ConcurrentWorkflow:
"""
Creates a ConcurrentWorkflow with cryptocurrency-specific analysis agents.
### With Callback
Returns:
ConcurrentWorkflow: Configured workflow for crypto analysis
"""
agents = create_crypto_specific_agents()
```python
def callback(output, task, metadata):
return {"result": output, "count": metadata["execution_count"]}
workflow = ConcurrentWorkflow(
name="Crypto-Specific-Analysis-Workflow",
description="Concurrent execution of cryptocurrency-specific analysis agents",
agents=agents,
max_loops=1,
cron_job = CronJob(
agent=agent,
interval="30seconds",
callback=callback
)
```
return workflow
## Full Examples

### Complete Agent with Callback

```python
from swarms import Agent, CronJob
from datetime import datetime
import json

# Create agent
agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="You are a financial analyst. Analyze market data and provide insights.",
    model_name="gpt-4o-mini",
    max_loops=1
)

# Advanced callback with monitoring
class AdvancedCallback:
    def __init__(self):
        self.history = []
        self.error_count = 0

    def __call__(self, output, task, metadata):
        # Track execution
        execution_data = {
            "output": output,
            "execution_id": metadata["execution_count"],
            "timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(),
            "task": task,
            "job_id": metadata["job_id"],
            "success": bool(output and "error" not in str(output).lower())
        }

        if not execution_data["success"]:
            self.error_count += 1

        self.history.append(execution_data)

        # Keep only last 100 executions
        if len(self.history) > 100:
            self.history.pop(0)

        return execution_data

    def get_stats(self):
        return {
            "total_executions": len(self.history),
            "error_count": self.error_count,
            "success_rate": (len(self.history) - self.error_count) / len(self.history) if self.history else 0
        }

# Use advanced callback
callback = AdvancedCallback()
cron_job = CronJob(
    agent=agent,
    interval="2minutes",
    job_id="financial_analysis_job",
    callback=callback
)

# Run the cron job
try:
    cron_job.run(task="Analyze current market conditions and provide investment recommendations")
except KeyboardInterrupt:
    cron_job.stop()
    print("Final stats:", json.dumps(callback.get_stats(), indent=2))
```
### Multi-Agent Workflow with CronJob

You can also run cron jobs with multi-agent structures such as `SequentialWorkflow`, `ConcurrentWorkflow`, `HierarchicalSwarm`, and other swarm types:

- Pass the multi-agent structure as the `agent` parameter: `CronJob(agent=swarm)`
- Pass your task into the `.run(task: str)` method

```python
from swarms import Agent, CronJob, ConcurrentWorkflow

# Create specialized agents
bitcoin_agent = Agent(
    agent_name="Bitcoin-Analyst",
    system_prompt="You are a Bitcoin specialist. Focus only on Bitcoin analysis.",
    model_name="gpt-4o-mini",
    max_loops=1
)

ethereum_agent = Agent(
    agent_name="Ethereum-Analyst",
    system_prompt="You are an Ethereum specialist. Focus only on Ethereum analysis.",
    model_name="gpt-4o-mini",
    max_loops=1
)

# Create concurrent workflow
workflow = ConcurrentWorkflow(
    name="Crypto-Analysis-Workflow",
    agents=[bitcoin_agent, ethereum_agent],
    max_loops=1
)

# Workflow callback
def workflow_callback(output, task, metadata):
    """Process multi-agent workflow output."""
    return {
        "workflow_results": output,
        "execution_id": metadata["execution_count"],
        "timestamp": metadata["timestamp"],
        "agents_count": len(workflow.agents),
        "task": task,
        "metadata": {
            "job_id": metadata["job_id"],
            "uptime": metadata.get("uptime", 0)
        }
    }

# Create workflow cron job
workflow_cron = CronJob(
    agent=workflow,
    interval="5minutes",
    job_id="crypto_workflow_job",
    callback=workflow_callback
)

# Run workflow cron job
workflow_cron.run("Analyze your assigned cryptocurrency and provide market insights")
```
### API Integration Example

```python
import requests
from swarms import Agent, CronJob
import json

# Create agent
agent = Agent(
    agent_name="News-Analyst",
    system_prompt="Analyze news and provide summaries.",
    model_name="gpt-4o-mini",
    max_loops=1
)

# API webhook callback
def api_callback(output, task, metadata):
    """Send results to external API."""
    payload = {
        "data": output,
        "source": "swarms_cronjob",
        "job_id": metadata["job_id"],
        "execution_id": metadata["execution_count"],
        "timestamp": metadata["timestamp"],
        "task": task
    }

    try:
        # Send to webhook (replace with your URL)
        response = requests.post(
            "https://api.example.com/webhook",
            json=payload,
            timeout=30
        )

        return {
            "original_output": output,
            "api_status": "sent",
            "api_response_code": response.status_code,
            "execution_id": metadata["execution_count"]
        }
    except requests.RequestException as e:
        return {
            "original_output": output,
            "api_status": "failed",
            "error": str(e),
            "execution_id": metadata["execution_count"]
        }

# Database logging callback
def db_callback(output, task, metadata):
    """Log to database (pseudo-code)."""
    # db.execute(
    #     "INSERT INTO cron_results (job_id, output, timestamp) VALUES (?, ?, ?)",
    #     (metadata["job_id"], output, metadata["timestamp"])
    # )
    return {
        "output": output,
        "logged_to_db": True,
        "execution_id": metadata["execution_count"]
    }

# Create cron job with API integration
api_cron_job = CronJob(
    agent=agent,
    interval="10minutes",
    job_id="news_analysis_api_job",
    callback=api_callback
)

# Dynamic callback switching example
db_cron_job = CronJob(
    agent=agent,
    interval="1hour",
    job_id="news_analysis_db_job"
)

# Start with API callback
db_cron_job.set_callback(api_callback)

# Later switch to database callback
# db_cron_job.set_callback(db_callback)

# Get execution statistics
stats = db_cron_job.get_execution_stats()
print(f"Job statistics: {json.dumps(stats, indent=2)}")
```

## Conclusion

The CronJob class provides a powerful way to schedule and automate tasks using Swarms Agents or custom functions. Key benefits include:

- Easy integration with Swarms Agents
- Flexible interval scheduling
- Thread-safe execution
- Graceful error handling
- Simple API for task scheduling
- Support for both agent and callable-based tasks

@ -0,0 +1,12 @@
from swarms.agents.agent_judge import AgentJudge
# Initialize the agent judge
judge = AgentJudge(
agent_name="quality-judge", model_name="gpt-4", max_loops=2
)
# Example agent output to evaluate
agent_output = "The capital of France is Paris. The city is known for its famous Eiffel Tower and delicious croissants. The population is approximately 2.1 million people."
# Run evaluation with context building
evaluations = judge.run(task=agent_output)

@ -0,0 +1,21 @@
from swarms.agents.agent_judge import AgentJudge
# Initialize the agent judge with custom evaluation criteria
judge = AgentJudge(
agent_name="technical-judge",
model_name="gpt-4",
max_loops=1,
evaluation_criteria={
"accuracy": 0.4,
"completeness": 0.3,
"clarity": 0.2,
"logic": 0.1,
},
)
# Example technical agent output to evaluate
technical_output = "To solve the quadratic equation x² + 5x + 6 = 0, we can use the quadratic formula: x = (-b ± √(b² - 4ac)) / 2a. Here, a=1, b=5, c=6. Substituting: x = (-5 ± √(25 - 24)) / 2 = (-5 ± √1) / 2 = (-5 ± 1) / 2. So x = -2 or x = -3."
# Run evaluation with context building
evaluations = judge.run(task=technical_output)
print(evaluations)

@ -0,0 +1,20 @@
from swarms.agents.agent_judge import AgentJudge
# Initialize the agent judge for creative content evaluation
judge = AgentJudge(
agent_name="creative-judge",
model_name="gpt-4",
max_loops=2,
evaluation_criteria={
"creativity": 0.4,
"originality": 0.3,
"engagement": 0.2,
"coherence": 0.1,
},
)
# Example creative agent output to evaluate
creative_output = "The moon hung like a silver coin in the velvet sky, casting shadows that danced with the wind. Ancient trees whispered secrets to the stars, while time itself seemed to pause in reverence of this magical moment. The world held its breath, waiting for the next chapter of the eternal story."
# Run evaluation with context building
evaluations = judge.run(task=creative_output)

@ -0,0 +1,313 @@
"""
Callback CronJob Example
This example demonstrates how to use the new callback functionality in CronJob
to customize output processing while the cron job is still running.
"""
import json
import time
from datetime import datetime
from typing import Any, Dict
from loguru import logger
from swarms import Agent, CronJob
def create_sample_agent():
"""Create a sample agent for demonstration."""
return Agent(
agent_name="Sample-Analysis-Agent",
system_prompt="""You are a data analysis agent. Analyze the given data and provide insights.
Keep your responses concise and focused on key findings.""",
model_name="gpt-4o-mini",
max_loops=1,
print_on=False,
)
# Example 1: Simple output transformation callback
def transform_output_callback(output: Any, task: str, metadata: Dict) -> Dict:
"""Transform the agent output into a structured format.
Args:
output: The original output from the agent
task: The task that was executed
metadata: Job metadata including execution count, timestamp, etc.
Returns:
Dict: Transformed output with additional metadata
"""
return {
"original_output": output,
"transformed_at": datetime.fromtimestamp(metadata["timestamp"]).isoformat(),
"execution_number": metadata["execution_count"],
"task_executed": task,
"job_status": "running" if metadata["is_running"] else "stopped",
"uptime_seconds": metadata["uptime"] if metadata["start_time"] else 0
}
# Example 2: Output filtering and enhancement callback
def filter_and_enhance_callback(output: Any, task: str, metadata: Dict) -> Dict:
"""Filter and enhance the output based on execution count and content.
Args:
output: The original output from the agent
task: The task that was executed
metadata: Job metadata
Returns:
Dict: Filtered and enhanced output
"""
# Only include outputs that contain certain keywords
if isinstance(output, str):
if any(keyword in output.lower() for keyword in ["important", "key", "significant", "trend"]):
enhanced_output = {
"content": output,
"priority": "high",
"execution_id": metadata["execution_count"],
"timestamp": metadata["timestamp"],
"analysis_type": "priority_content"
}
else:
enhanced_output = {
"content": output,
"priority": "normal",
"execution_id": metadata["execution_count"],
"timestamp": metadata["timestamp"],
"analysis_type": "standard_content"
}
else:
enhanced_output = {
"content": str(output),
"priority": "unknown",
"execution_id": metadata["execution_count"],
"timestamp": metadata["timestamp"],
"analysis_type": "non_string_content"
}
return enhanced_output
# Example 3: Real-time monitoring callback
class MonitoringCallback:
"""Callback class that provides real-time monitoring capabilities."""
def __init__(self):
self.output_history = []
self.error_count = 0
self.success_count = 0
self.last_execution_time = None
def __call__(self, output: Any, task: str, metadata: Dict) -> Dict:
"""Monitor and track execution metrics.
Args:
output: The original output from the agent
task: The task that was executed
metadata: Job metadata
Returns:
Dict: Output with monitoring information
"""
current_time = time.time()
# Calculate execution time
if self.last_execution_time:
execution_time = current_time - self.last_execution_time
else:
execution_time = 0
self.last_execution_time = current_time
# Track success/error
if output and output != "Error":
self.success_count += 1
status = "success"
else:
self.error_count += 1
status = "error"
# Store in history (keep last 100)
monitoring_data = {
"output": output,
"status": status,
"execution_time": execution_time,
"execution_count": metadata["execution_count"],
"timestamp": metadata["timestamp"],
"task": task,
"metrics": {
"success_rate": self.success_count / (self.success_count + self.error_count),
"total_executions": self.success_count + self.error_count,
"error_count": self.error_count,
"success_count": self.success_count
}
}
self.output_history.append(monitoring_data)
if len(self.output_history) > 100:
self.output_history.pop(0)
return monitoring_data
def get_summary(self) -> Dict:
"""Get monitoring summary."""
return {
"total_executions": self.success_count + self.error_count,
"success_count": self.success_count,
"error_count": self.error_count,
"success_rate": self.success_count / (self.success_count + self.error_count) if (self.success_count + self.error_count) > 0 else 0,
"history_length": len(self.output_history),
"last_execution_time": self.last_execution_time
}
# Example 4: API integration callback
def api_webhook_callback(output: Any, task: str, metadata: Dict) -> Dict:
"""Callback that could send output to an external API.
Args:
output: The original output from the agent
task: The task that was executed
metadata: Job metadata
Returns:
Dict: Output with API integration metadata
"""
# In a real implementation, you would send this to your API
api_payload = {
"data": output,
"source": "cron_job",
"job_id": metadata["job_id"],
"execution_id": metadata["execution_count"],
"timestamp": metadata["timestamp"],
"task": task
}
# Simulate API call (replace with actual HTTP request)
logger.info(f"Would send to API: {json.dumps(api_payload, indent=2)}")
return {
"output": output,
"api_status": "sent",
"api_payload": api_payload,
"execution_id": metadata["execution_count"]
}
def main():
"""Demonstrate different callback usage patterns."""
logger.info("🚀 Starting Callback CronJob Examples")
# Create the agent
agent = create_sample_agent()
# Example 1: Simple transformation callback
logger.info("📝 Example 1: Simple Output Transformation")
transform_cron = CronJob(
agent=agent,
interval="15seconds",
job_id="transform-example",
callback=transform_output_callback
)
# Example 2: Filtering and enhancement callback
logger.info("🔍 Example 2: Output Filtering and Enhancement")
filter_cron = CronJob(
agent=agent,
interval="20seconds",
job_id="filter-example",
callback=filter_and_enhance_callback
)
# Example 3: Monitoring callback
logger.info("📊 Example 3: Real-time Monitoring")
monitoring_callback = MonitoringCallback()
monitoring_cron = CronJob(
agent=agent,
interval="25seconds",
job_id="monitoring-example",
callback=monitoring_callback
)
# Example 4: API integration callback
logger.info("🌐 Example 4: API Integration")
api_cron = CronJob(
agent=agent,
interval="30seconds",
job_id="api-example",
callback=api_webhook_callback
)
# Start all cron jobs
logger.info("▶️ Starting all cron jobs...")
# Start them in separate threads to run concurrently
import threading
def run_cron(cron_job, task):
try:
cron_job.run(task=task)
except KeyboardInterrupt:
cron_job.stop()
# Start each cron job in its own thread
threads = []
tasks = [
"Analyze the current market trends and provide key insights",
"What are the most important factors affecting today's economy?",
"Provide a summary of recent technological developments",
"Analyze the impact of current events on business operations"
]
for i, (cron_job, task) in enumerate([
(transform_cron, tasks[0]),
(filter_cron, tasks[1]),
(monitoring_cron, tasks[2]),
(api_cron, tasks[3])
]):
thread = threading.Thread(
target=run_cron,
args=(cron_job, task),
daemon=True,
name=f"cron-thread-{i}"
)
thread.start()
threads.append(thread)
logger.info("✅ All cron jobs started successfully!")
logger.info("📊 Press Ctrl+C to stop and see monitoring summary")
try:
# Let them run for a while
time.sleep(120) # Run for 2 minutes
# Show monitoring summary
logger.info("📈 Monitoring Summary:")
logger.info(json.dumps(monitoring_callback.get_summary(), indent=2))
# Show execution stats for each cron job
for cron_job, name in [
(transform_cron, "Transform"),
(filter_cron, "Filter"),
(monitoring_cron, "Monitoring"),
(api_cron, "API")
]:
stats = cron_job.get_execution_stats()
logger.info(f"{name} Cron Stats: {json.dumps(stats, indent=2)}")
except KeyboardInterrupt:
logger.info("⏹️ Stopping all cron jobs...")
# Stop all cron jobs
for cron_job in [transform_cron, filter_cron, monitoring_cron, api_cron]:
cron_job.stop()
# Show final monitoring summary
logger.info("📊 Final Monitoring Summary:")
logger.info(json.dumps(monitoring_callback.get_summary(), indent=2))
if __name__ == "__main__":
main()

@ -0,0 +1,81 @@
"""
Simple Callback CronJob Example
This example shows the basic usage of the new callback functionality
in CronJob to customize output while the job is running.
"""
import json
import time
from datetime import datetime
from loguru import logger
from swarms import Agent, CronJob
def create_simple_agent():
"""Create a simple agent for demonstration."""
return Agent(
agent_name="Simple-Analysis-Agent",
system_prompt="You are a simple analysis agent. Provide brief insights on the given topic.",
model_name="gpt-4o-mini",
max_loops=1,
print_on=False,
)
def simple_callback(output, task, metadata):
"""Simple callback that adds metadata to the output.
Args:
output: The original output from the agent
task: The task that was executed
metadata: Job metadata (execution count, timestamp, etc.)
Returns:
dict: Enhanced output with metadata
"""
return {
"agent_output": output,
"execution_number": metadata["execution_count"],
"timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(),
"task": task,
"job_id": metadata["job_id"]
}
def main():
"""Demonstrate basic callback usage."""
logger.info("🚀 Starting Simple Callback Example")
# Create agent and cron job with callback
agent = create_simple_agent()
cron_job = CronJob(
agent=agent,
interval="10seconds",
job_id="simple-callback-example",
callback=simple_callback
)
logger.info("▶️ Starting cron job with callback...")
logger.info("📝 The callback will enhance each output with metadata")
logger.info("⏹️ Press Ctrl+C to stop")
try:
# Start the cron job
cron_job.run(
task="What are the key trends in artificial intelligence today?"
)
except KeyboardInterrupt:
logger.info("⏹️ Stopping cron job...")
cron_job.stop()
# Show execution statistics
stats = cron_job.get_execution_stats()
logger.info("📊 Final Statistics:")
logger.info(json.dumps(stats, indent=2))
if __name__ == "__main__":
main()

@ -19,9 +19,10 @@ from swarms_memory import QdrantDB
# Option 3: Qdrant Cloud (recommended for production)
import os
client = QdrantClient(
url=os.getenv("QDRANT_URL", "https://your-cluster.qdrant.io"),
api_key=os.getenv("QDRANT_API_KEY", "your-api-key")
api_key=os.getenv("QDRANT_API_KEY", "your-api-key"),
)
# Create QdrantDB wrapper for RAG operations
@ -30,7 +31,7 @@ rag_db = QdrantDB(
embedding_model="text-embedding-3-small",
collection_name="knowledge_base",
distance=models.Distance.COSINE,
n_results=3,
)
# Add documents to the knowledge base
@ -38,7 +39,7 @@ documents = [
"Qdrant is a vector database optimized for similarity search and AI applications.",
"RAG combines retrieval and generation for more accurate AI responses.",
"Vector embeddings enable semantic search across documents.",
"The swarms framework supports multiple memory backends including Qdrant."
"The swarms framework supports multiple memory backends including Qdrant.",
]
# Method 1: Add documents individually
@ -89,7 +90,7 @@ agent = Agent(
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
long_term_memory=rag_db,
)
# Query with RAG

@ -1,34 +1,131 @@
import traceback
import uuid
from typing import Dict, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.any_to_str import any_to_str
# =============================================================================
# PROMPT FUNCTIONS FOR AGENT JUDGE
# =============================================================================
def get_reward(input: str) -> int:
    """
    Determines whether the input contains any positive evaluation keywords and returns a reward.

    This function checks if the input string contains any of the following words (case-insensitive):
    "correct", "good", "excellent", or "perfect". If any of these words are present, the function
    returns 1 as a reward, otherwise it returns 0.

    Args:
        input (str): The input string to evaluate.

    Returns:
        int: 1 if a positive evaluation keyword is found, 0 otherwise.

    Example:
        >>> get_reward("That is correct!")
        1
        >>> get_reward("Needs improvement.")
        0
    """
    words = [
        "correct",
        "good",
        "excellent",
        "perfect",
    ]

    if any(word in input.lower() for word in words):
        return 1
    else:
        return 0
def get_agent_judge_prompt() -> str:
    """
    Returns the main system prompt for the agent judge.

    Returns:
        str: The system prompt for the agent judge
    """
    return """# Adaptive Output Evaluator - Role and Protocol
Your role is to critically evaluate outputs across diverse domains by first understanding the context, then applying domain-appropriate evaluation criteria to provide a well-reasoned assessment.
## Core Responsibilities
1. **Context Assessment**
- Begin by identifying the domain and specific context of the evaluation (technical, creative, analytical, etc.)
- Determine the appropriate evaluation framework based on domain requirements
- Adjust evaluation criteria and standards to match domain-specific best practices
- If domain is unclear, request clarification with: DOMAIN CLARIFICATION NEEDED: *specific_question*
2. **Input Validation**
- Ensure all necessary information is present for a comprehensive evaluation
- Identify gaps in provided materials that would impact assessment quality
- Request additional context when needed with: ADDITIONAL CONTEXT NEEDED: *specific_information*
- Consider implicit domain knowledge that may influence proper evaluation
3. **Evidence-Based Analysis**
- Apply domain-specific criteria to evaluate accuracy, effectiveness, and appropriateness
- Distinguish between factual claims, reasoned arguments, and subjective opinions
- Flag assumptions or claims lacking sufficient support within domain standards
- Evaluate internal consistency and alignment with established principles in the field
- For technical domains, verify logical and methodological soundness
4. **Comparative Assessment**
- When multiple solutions or approaches are presented, compare relative strengths
- Identify trade-offs between different approaches within domain constraints
- Consider alternative interpretations or solutions not explicitly mentioned
- Balance competing priorities based on domain-specific values and standards
5. **Final Assessment Declaration**
- Present your final assessment with: **EVALUATION_COMPLETE \\boxed{_assessment_summary_}**
- Follow with a concise justification referencing domain-specific standards
- Include constructive feedback for improvement where appropriate
- When appropriate, suggest alternative approaches that align with domain best practices"""
def get_task_evaluation_prompt(outputs: str) -> str:
"""
Returns the task instruction prompt for evaluation.
Args:
outputs (str): The outputs to be evaluated
Returns:
str: The formatted task evaluation prompt
"""
return f"""You are an expert AI agent judge. Carefully review the following output(s) generated by another agent. Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance. Your feedback should address the following points:
1. Strengths: What did the agent do well? Highlight any correct reasoning, clarity, or effective problem-solving.
2. Weaknesses: Identify any errors, omissions, unclear reasoning, or areas where the output could be improved.
3. Suggestions: Offer specific, practical recommendations for how the agent can improve its next attempt. This may include advice on reasoning, structure, completeness, or style.
4. If relevant, point out any factual inaccuracies or logical inconsistencies.
Be thorough, objective, and professional. Your goal is to help the agent learn and produce better results in the future.
Output(s) to evaluate:
{outputs}"""
# =============================================================================
# EXCEPTION CLASSES
# =============================================================================
class AgentJudgeInitializationError(Exception):
"""
Exception raised when there is an error initializing the AgentJudge.
"""
pass
class AgentJudgeExecutionError(Exception):
    """
    Exception raised when there is an error executing the AgentJudge.
    """

    pass
@ -84,9 +181,9 @@ class AgentJudge:
```
Methods:
step(task: str = None, img: str = None) -> str:
Processes a single task and returns the agent's evaluation.
run(task: str = None, img: str = None) -> List[str]:
Executes evaluation in a loop with context building, collecting responses.
run_batched(tasks: List[str] = None, imgs: List[str] = None) -> List[str]:
@ -98,11 +195,12 @@ class AgentJudge:
id: str = str(uuid.uuid4()),
agent_name: str = "Agent Judge",
description: str = "You're an expert AI agent judge. Carefully review the following output(s) generated by another agent. Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance.",
system_prompt: str = None,
model_name: str = "openai/o1",
max_loops: int = 1,
verbose: bool = False,
evaluation_criteria: Optional[Dict[str, float]] = None,
return_score: bool = False,
*args,
**kwargs,
):
@ -113,98 +211,59 @@ class AgentJudge:
self.conversation = Conversation(time_enabled=False)
self.max_loops = max_loops
self.verbose = verbose
self.return_score = return_score
self.evaluation_criteria = evaluation_criteria or {}
# Enhance system prompt with evaluation criteria if provided
enhanced_prompt = system_prompt
if self.evaluation_criteria:
criteria_str = "\n\nEvaluation Criteria:\n"
for criterion, weight in self.evaluation_criteria.items():
criteria_str += f"- {criterion}: weight = {weight}\n"
enhanced_prompt += criteria_str
self.agent = Agent(
agent_name=agent_name,
agent_description=description,
system_prompt=enhanced_prompt,
system_prompt=self.enhanced_prompt(),
model_name=model_name,
max_loops=1,
*args,
**kwargs,
)
def feedback_cycle_step(
self,
agent: Union[Agent, callable],
task: str,
img: Optional[str] = None,
):
try:
# First run the main agent
agent_output = agent.run(task=task, img=img)
# Then run the judge agent
judge_output = self.run(task=agent_output, img=img)
# Run the main agent again with the judge's feedback, using a much improved prompt
improved_prompt = (
f"You have received the following detailed feedback from the expert agent judge ({self.agent_name}):\n\n"
f"--- FEEDBACK START ---\n{judge_output}\n--- FEEDBACK END ---\n\n"
f"Your task is to thoughtfully revise and enhance your previous output based on this critique. "
f"Carefully address all identified weaknesses, incorporate the suggestions, and strive to maximize the strengths noted. "
f"Be specific, accurate, and actionable in your improvements. "
f"Here is the original task for reference:\n\n"
f"--- TASK ---\n{task}\n--- END TASK ---\n\n"
f"Please provide your improved and fully revised output below."
)
self.reliability_check()
return agent.run(task=improved_prompt, img=img)
except Exception as e:
raise AgentJudgeFeedbackCycleError(
f"Error In Agent Judge Feedback Cycle: {e} Traceback: {traceback.format_exc()}"
def reliability_check(self):
if self.max_loops == 0 or self.max_loops is None:
raise ValueError(
f"AgentJudge: {self.agent_name} max_loops must be greater than 0"
)
def feedback_cycle(
self,
agent: Union[Agent, callable],
task: str,
img: Optional[str] = None,
loops: int = 1,
):
loop = 0
original_task = task # Preserve the original task
current_output = None # Track the current output
all_outputs = [] # Collect all outputs from each iteration
while loop < loops:
# First iteration: run the standard feedback cycle step
current_output = self.feedback_cycle_step(
agent, original_task, img
if self.model_name is None:
raise ValueError(
f"AgentJudge: {self.agent_name} model_name must be provided"
)
# Add the current output to our collection
all_outputs.append(current_output)
loop += 1
def enhanced_prompt(self):
# Enhance system prompt with evaluation criteria if provided
enhanced_prompt = (
self.system_prompt or get_agent_judge_prompt()
)
if self.evaluation_criteria:
criteria_str = "\n\nEvaluation Criteria:\n"
for criterion, weight in self.evaluation_criteria.items():
criteria_str += f"- {criterion}: weight = {weight}\n"
enhanced_prompt += criteria_str
return all_outputs
return enhanced_prompt
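# Illustrative note (hypothetical values): with
# evaluation_criteria={"correctness": 0.5, "clarity": 0.3, "completeness": 0.2},
# enhanced_prompt() appends roughly the following block to the base prompt:
#
#     Evaluation Criteria:
#     - correctness: weight = 0.5
#     - clarity: weight = 0.3
#     - completeness: weight = 0.2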
def step(
self,
task: str = None,
tasks: Optional[List[str]] = None,
img: Optional[str] = None,
) -> str:
"""
Processes a single task or list of tasks and returns the agent's evaluation.
Processes a single task and returns the agent's evaluation.
This method performs a one-shot evaluation of the provided content. It takes
either a single task string or a list of tasks and generates a comprehensive
evaluation with strengths, weaknesses, and improvement suggestions.
a single task string (response from another LLM or agent) and generates a
comprehensive evaluation with strengths, weaknesses, and improvement suggestions.
Args:
task (str, optional): A single task/output to be evaluated.
tasks (List[str], optional): A list of tasks/outputs to be evaluated.
task (str, optional): The response from another LLM or agent to be evaluated.
img (str, optional): Path to an image file for multimodal evaluation.
Returns:
@ -215,62 +274,38 @@ class AgentJudge:
- Factual accuracy assessment
Raises:
ValueError: If neither task nor tasks are provided.
ValueError: If no task is provided.
Example:
```python
# Single task evaluation
evaluation = judge.step(task="The answer is 42.")
# Multiple tasks evaluation
evaluation = judge.step(tasks=[
"Response 1: Paris is the capital of France",
"Response 2: 2 + 2 = 5" # Incorrect
])
# Multimodal evaluation
evaluation = judge.step(
task="Describe this image",
task="The agent described this image as a cat",
img="path/to/image.jpg"
)
```
"""
try:
prompt = ""
if tasks:
prompt = any_to_str(tasks)
elif task:
prompt = task
else:
raise ValueError("No tasks or task provided")
# Add evaluation criteria to the task description
task_instruction = "You are an expert AI agent judge. Carefully review the following output(s) generated by another agent. "
task_instruction += "Your job is to provide a detailed, constructive, and actionable critique that will help the agent improve its future performance. "
task_instruction += (
"Your feedback should address the following points:\n"
# Use the predefined task evaluation prompt
task_instruction = get_task_evaluation_prompt(
outputs=task
)
task_instruction += "1. Strengths: What did the agent do well? Highlight any correct reasoning, clarity, or effective problem-solving.\n"
task_instruction += "2. Weaknesses: Identify any errors, omissions, unclear reasoning, or areas where the output could be improved.\n"
task_instruction += "3. Suggestions: Offer specific, practical recommendations for how the agent can improve its next attempt. "
task_instruction += "This may include advice on reasoning, structure, completeness, or style.\n"
task_instruction += "4. If relevant, point out any factual inaccuracies or logical inconsistencies.\n"
# Add evaluation criteria to the task instruction
# Add evaluation criteria if provided
if self.evaluation_criteria:
list(self.evaluation_criteria.keys())
task_instruction += "\nPlease use these specific evaluation criteria with their respective weights:\n"
criteria_str = "\n\nPlease use these specific evaluation criteria with their respective weights:\n"
for (
criterion,
weight,
) in self.evaluation_criteria.items():
task_instruction += (
criteria_str += (
f"- {criterion}: weight = {weight}\n"
)
task_instruction += "Be thorough, objective, and professional. Your goal is to help the agent learn and produce better results in the future.\n\n"
task_instruction += f"Output(s) to evaluate:\n{prompt}\n"
task_instruction += criteria_str
response = self.agent.run(
task=task_instruction,
@ -279,166 +314,84 @@ class AgentJudge:
return response
except Exception as e:
error_message = (
f"AgentJudge encountered an error: {e}\n"
f"Traceback:\n{traceback.format_exc()}\n\n"
"If this issue persists, please:\n"
"- Open a GitHub issue: https://github.com/swarms-ai/swarms/issues\n"
"- Join our Discord for real-time support: swarms.ai\n"
"- Or book a call: https://cal.com/swarms\n"
)
error_message = f"AgentJudge: {self.agent_name} encountered an error: {e}\n Traceback: {traceback.format_exc()}"
raise AgentJudgeExecutionError(error_message)
def run(
self,
task: str = None,
tasks: Optional[List[str]] = None,
img: Optional[str] = None,
):
"""
Executes evaluation in multiple iterations with context building and refinement.
Executes evaluation in a loop with context building, collecting responses.
This method runs the evaluation process for the specified number of max_loops,
where each iteration builds upon the previous context. This allows for iterative
refinement of evaluations and deeper analysis over multiple passes.
This method runs the evaluation multiple times (up to max_loops) to build
context and provide iterative feedback. Each iteration uses the previous
response as context for the next evaluation.
Args:
task (str, optional): A single task/output to be evaluated.
tasks (List[str], optional): A list of tasks/outputs to be evaluated.
task (str, optional): The response from another LLM or agent to be evaluated.
img (str, optional): Path to an image file for multimodal evaluation.
Returns:
List[str]: A list of evaluation responses, one for each iteration.
Each subsequent evaluation includes context from previous iterations.
str: The full conversation transcript after all iterations, or the result of get_reward() on that transcript when return_score is True.
Example:
```python
# Single task with iterative refinement
judge = AgentJudge(max_loops=3)
evaluations = judge.run(task="Agent output to evaluate")
# Returns 3 evaluations, each building on the previous
# Multiple tasks with context building
evaluations = judge.run(tasks=[
"First agent response",
"Second agent response"
])
# With image analysis
evaluations = judge.run(
task="Analyze this chart",
img="chart.png"
# Evaluate a response with multiple iterations
responses = judge.run(task="The agent said: Paris is the capital of France")
# Multimodal evaluation with multiple iterations
responses = judge.run(
task="The agent described this image as a cat",
img="path/to/image.jpg"
)
```
Note:
- The first iteration evaluates the original task(s)
- Subsequent iterations include context from previous evaluations
- This enables deeper analysis and refinement of judgments
- Useful for complex evaluations requiring multiple perspectives
"""
try:
responses = []
context = ""
# Convert single task to list for consistent processing
if task and not tasks:
tasks = [task]
task = None # Clear to avoid confusion in step method
# The agent will run in a loop, remembering and updating the conversation context at each step.
self.conversation.add(role="user", content=task)
for _ in range(self.max_loops):
# Add context to the tasks if available
if context and tasks:
contextualized_tasks = [
f"Previous context: {context}\nTask: {t}"
for t in tasks
]
else:
contextualized_tasks = tasks
# Retrieve the full conversation context as a string
context = self.conversation.get_str()
# Build the contextualized task, always including the full conversation so far
contextualized_task = f"{context}\n"
# Get response for current iteration
current_response = self.step(
task=task,
tasks=contextualized_tasks,
task=contextualized_task,
img=img,
)
# Add the agent's response to the conversation history
self.conversation.add(
role=self.agent.agent_name,
content=current_response,
)
# The context will be updated automatically in the next loop iteration
responses.append(current_response)
# Update context for next iteration
context = current_response
return responses
# After all loops, return either the reward or the full conversation
if self.return_score:
return get_reward(self.conversation.get_str())
else:
return self.conversation.get_str()
except Exception as e:
error_message = (
f"AgentJudge encountered an error: {e}\n"
f"Traceback:\n{traceback.format_exc()}\n\n"
"If this issue persists, please:\n"
"- Open a GitHub issue: https://github.com/swarms-ai/swarms/issues\n"
"- Join our Discord for real-time support: swarms.ai\n"
"- Or book a call: https://cal.com/swarms\n"
)
error_message = f"AgentJudge: {self.agent_name} encountered an error: {e}\n Traceback: {traceback.format_exc()}"
raise AgentJudgeExecutionError(error_message)
def run_batched(
self,
tasks: Optional[List[str]] = None,
imgs: Optional[List[str]] = None,
):
"""
Executes batch evaluation of multiple tasks with corresponding images.
This method processes multiple task-image pairs independently, where each
task can be evaluated with its corresponding image. Unlike the run() method,
this doesn't build context between different tasks - each is evaluated
independently.
Runs the agent judge on a batch of tasks.
Args:
tasks (List[str], optional): A list of tasks/outputs to be evaluated.
imgs (List[str], optional): A list of image paths corresponding to each task.
Must be the same length as tasks if provided.
tasks (Optional[List[str]]): A list of tasks (strings) to be evaluated.
Returns:
List[List[str]]: A list of evaluation responses for each task. Each inner
list contains the responses from all iterations (max_loops)
for that particular task.
Example:
```python
# Batch evaluation with images
tasks = [
"Describe what you see in this image",
"What's wrong with this chart?",
"Analyze the trends shown"
]
images = [
"photo1.jpg",
"chart1.png",
"graph1.png"
]
evaluations = judge.run_batched(tasks=tasks, imgs=images)
# Returns evaluations for each task-image pair
# Batch evaluation without images
evaluations = judge.run_batched(tasks=[
"Agent response 1",
"Agent response 2",
"Agent response 3"
])
```
Note:
- Each task is processed independently
- If imgs is provided, it must have the same length as tasks
- Each task goes through max_loops iterations independently
- No context is shared between different tasks in the batch
List[str]: A list containing one evaluation result per task (the full conversation
transcript, or a reward score when return_score is True).
"""
responses = []
for task, img in zip(tasks, imgs):
response = self.run(task=task, img=img)
responses.append(response)
return responses
outputs = []
for task in tasks:
outputs.append(self.run(task=task))
return outputs
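# End-to-end usage sketch (illustrative, not part of this diff). The criteria
# weights, model name, and task strings are hypothetical; return_score=True routes
# the final conversation through get_reward() instead of returning the transcript.
#
# judge = AgentJudge(
#     agent_name="quality-judge",
#     model_name="openai/o1",
#     max_loops=2,
#     evaluation_criteria={"correctness": 0.6, "clarity": 0.4},
#     return_score=True,
# )
# score = judge.run(task="The agent said: Paris is the capital of France")
#
# transcripts = AgentJudge(model_name="openai/o1").run_batched(
#     tasks=["Agent response 1", "Agent response 2"]
# )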

@ -46,6 +46,7 @@ class CronJob:
job_id: Unique identifier for the job
is_running: Flag indicating if the job is currently running
thread: Thread object for running the job
callback: Optional callback function to customize output processing
"""
def __init__(
@ -53,6 +54,7 @@ class CronJob:
agent: Optional[Union[Any, Callable]] = None,
interval: Optional[str] = None,
job_id: Optional[str] = None,
callback: Optional[Callable[[Any, str, dict], Any]] = None,
):
"""Initialize the CronJob wrapper.
@ -60,6 +62,12 @@ class CronJob:
agent: The Swarms Agent instance or callable to be scheduled
interval: The interval string (e.g., "5seconds", "10minutes", "1hour")
job_id: Optional unique identifier for the job. If not provided, one will be generated.
callback: Optional callback function to customize output processing.
Signature: callback(output: Any, task: str, metadata: dict) -> Any
- output: The original output from the agent
- task: The task that was executed
- metadata: Dictionary containing job_id, timestamp, execution_count, etc.
Returns: The customized output
Raises:
CronJobConfigError: If the interval format is invalid
@ -70,6 +78,9 @@ class CronJob:
self.is_running = False
self.thread = None
self.schedule = schedule.Scheduler()
self.callback = callback
self.execution_count = 0
self.start_time = None
logger.info(f"Initializing CronJob with ID: {self.job_id}")
@ -242,17 +253,47 @@ class CronJob:
(e.g., img=image_path, streaming_callback=callback_func)
Returns:
Any: The result of the task execution
Any: The result of the task execution (original or customized by callback)
Raises:
CronJobExecutionError: If task execution fails
"""
try:
logger.debug(f"Executing task for job {self.job_id}")
# Execute the agent
if isinstance(self.agent, Callable):
return self.agent.run(task=task, **kwargs)
original_output = self.agent.run(task=task, **kwargs)
else:
return self.agent(task, **kwargs)
original_output = self.agent(task, **kwargs)
# Increment execution count
self.execution_count += 1
# Prepare metadata for callback
metadata = {
"job_id": self.job_id,
"timestamp": time.time(),
"execution_count": self.execution_count,
"task": task,
"kwargs": kwargs,
"start_time": self.start_time,
"is_running": self.is_running
}
# Apply callback if provided
if self.callback:
try:
customized_output = self.callback(original_output, task, metadata)
logger.debug(f"Callback applied to job {self.job_id}, execution {self.execution_count}")
return customized_output
except Exception as callback_error:
logger.warning(f"Callback failed for job {self.job_id}: {callback_error}")
# Return original output if callback fails
return original_output
return original_output
except Exception as e:
logger.error(
f"Task execution failed for job {self.job_id}: {str(e)}"
@ -300,6 +341,7 @@ class CronJob:
try:
if not self.is_running:
self.is_running = True
self.start_time = time.time()
self.thread = threading.Thread(
target=self._run_schedule,
daemon=True,
@ -362,6 +404,31 @@ class CronJob:
f"Schedule loop failed: {str(e)}"
)
def set_callback(self, callback: Callable[[Any, str, dict], Any]):
"""Set or update the callback function for output customization.
Args:
callback: Function to customize output processing.
Signature: callback(output: Any, task: str, metadata: dict) -> Any
"""
self.callback = callback
logger.info(f"Callback updated for job {self.job_id}")
def get_execution_stats(self) -> dict:
"""Get execution statistics for the cron job.
Returns:
dict: Statistics including execution count, start time, running status, etc.
"""
return {
"job_id": self.job_id,
"is_running": self.is_running,
"execution_count": self.execution_count,
"start_time": self.start_time,
"uptime": time.time() - self.start_time if self.start_time else 0,
"interval": self.interval
}
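# Callback usage sketch (illustrative, not part of this diff). The callback follows
# the documented signature callback(output, task, metadata) -> Any; the agent and
# interval values are hypothetical.
#
# def tag_output(output, task, metadata):
#     # Wrap the agent's raw output with a small amount of job metadata.
#     return {
#         "job_id": metadata["job_id"],
#         "execution": metadata["execution_count"],
#         "task": task,
#         "output": output,
#     }
#
# cron_job = CronJob(agent=my_agent, interval="10minutes", callback=tag_output)
# cron_job.set_callback(tag_output)  # the callback can also be swapped at runtime
# stats = cron_job.get_execution_stats()  # job_id, execution_count, uptime, ...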
# # Example usage
# if __name__ == "__main__":

@ -2,12 +2,13 @@
- Parameters: description, model_name, system_prompt, max_loops, swarm_type, num_samples, output_types, num_knowledge_items, memory_capacity, eval, random_models_on, majority_voting_prompt, reasoning_model_name
- Methods: select_swarm(), run (task: str, img: Optional[List[str]] = None, **kwargs), batched_run (tasks: List[str], imgs: Optional[List[List[str]]] = None, **kwargs)
"""
import time
from swarms.agents import ReasoningAgentRouter
from swarms.structs.agent import Agent
from datetime import datetime
class TestReport:
def __init__(self):
self.results = []
@ -42,13 +43,21 @@ class TestReport:
report_lines = []
report_lines.append("=" * 60)
report_lines.append("REASONING AGENT ROUTER TEST SUITE REPORT")
report_lines.append(
"REASONING AGENT ROUTER TEST SUITE REPORT"
)
report_lines.append("=" * 60)
if self.start_time:
report_lines.append(f"Test Run Started: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append(
f"Test Run Started: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}"
)
if self.end_time:
report_lines.append(f"Test Run Ended: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append(f"Duration: {duration:.2f} seconds")
report_lines.append(
f"Test Run Ended: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}"
)
report_lines.append(
f"Duration: {duration:.2f} seconds"
)
report_lines.append(f"Total Tests: {total_tests}")
report_lines.append(f"Passed: {passed_tests}")
report_lines.append(f"Failed: {failed_tests}")
@ -65,9 +74,13 @@ class TestReport:
return "\n".join(report_lines)
# Default parameters for ReasoningAgentRouter, can be overridden in each test
DEFAULT_AGENT_NAME = "reasoning-agent"
DEFAULT_DESCRIPTION = "A reasoning agent that can answer questions and help with tasks."
DEFAULT_DESCRIPTION = (
"A reasoning agent that can answer questions and help with tasks."
)
DEFAULT_MODEL_NAME = "gpt-4o-mini"
DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant that can answer questions and help with tasks."
DEFAULT_MAX_LOOPS = 1
@ -77,6 +90,7 @@ DEFAULT_EVAL = False
DEFAULT_RANDOM_MODELS_ON = False
DEFAULT_MAJORITY_VOTING_PROMPT = None
def test_agents_swarm(
agent_name=DEFAULT_AGENT_NAME,
description=DEFAULT_DESCRIPTION,
@ -112,19 +126,39 @@ def test_agents_swarm(
PARAMETERS TESTING
"""
def test_router_description(report):
"""Test ReasoningAgentRouter with custom description (only change description param)"""
start_time = time.time()
try:
result = test_agents_swarm(description="Test description for router")
result = test_agents_swarm(
description="Test description for router"
)
# Check if the description was set correctly
router = ReasoningAgentRouter(description="Test description for router")
router = ReasoningAgentRouter(
description="Test description for router"
)
if router.description == "Test description for router":
report.add_result("Parameter: description", True, duration=time.time() - start_time)
report.add_result(
"Parameter: description",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: description", False, message=f"Expected description 'Test description for router', got '{router.description}'", duration=time.time() - start_time)
report.add_result(
"Parameter: description",
False,
message=f"Expected description 'Test description for router', got '{router.description}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: description", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: description",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_model_name(report):
"""Test ReasoningAgentRouter with custom model_name (only change model_name param)"""
@ -133,24 +167,58 @@ def test_router_model_name(report):
result = test_agents_swarm(model_name="gpt-4")
router = ReasoningAgentRouter(model_name="gpt-4")
if router.model_name == "gpt-4":
report.add_result("Parameter: model_name", True, duration=time.time() - start_time)
report.add_result(
"Parameter: model_name",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: model_name", False, message=f"Expected model_name 'gpt-4', got '{router.model_name}'", duration=time.time() - start_time)
report.add_result(
"Parameter: model_name",
False,
message=f"Expected model_name 'gpt-4', got '{router.model_name}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: model_name", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: model_name",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_system_prompt(report):
"""Test ReasoningAgentRouter with custom system_prompt (only change system_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(system_prompt="You are a test router.")
router = ReasoningAgentRouter(system_prompt="You are a test router.")
result = test_agents_swarm(
system_prompt="You are a test router."
)
router = ReasoningAgentRouter(
system_prompt="You are a test router."
)
if router.system_prompt == "You are a test router.":
report.add_result("Parameter: system_prompt", True, duration=time.time() - start_time)
report.add_result(
"Parameter: system_prompt",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: system_prompt", False, message=f"Expected system_prompt 'You are a test router.', got '{router.system_prompt}'", duration=time.time() - start_time)
report.add_result(
"Parameter: system_prompt",
False,
message=f"Expected system_prompt 'You are a test router.', got '{router.system_prompt}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: system_prompt", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: system_prompt",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_max_loops(report):
"""Test ReasoningAgentRouter with custom max_loops (only change max_loops param)"""
@ -159,11 +227,26 @@ def test_router_max_loops(report):
result = test_agents_swarm(max_loops=5)
router = ReasoningAgentRouter(max_loops=5)
if router.max_loops == 5:
report.add_result("Parameter: max_loops", True, duration=time.time() - start_time)
report.add_result(
"Parameter: max_loops",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: max_loops", False, message=f"Expected max_loops 5, got {router.max_loops}", duration=time.time() - start_time)
report.add_result(
"Parameter: max_loops",
False,
message=f"Expected max_loops 5, got {router.max_loops}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: max_loops", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: max_loops",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_swarm_type(report):
"""Test ReasoningAgentRouter with custom swarm_type (only change swarm_type param)"""
@ -172,26 +255,54 @@ def test_router_swarm_type(report):
result = test_agents_swarm(swarm_type="reasoning-agent")
router = ReasoningAgentRouter(swarm_type="reasoning-agent")
if router.swarm_type == "reasoning-agent":
report.add_result("Parameter: swarm_type", True, duration=time.time() - start_time)
report.add_result(
"Parameter: swarm_type",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: swarm_type", False, message=f"Expected swarm_type 'reasoning-agent', got '{router.swarm_type}'", duration=time.time() - start_time)
report.add_result(
"Parameter: swarm_type",
False,
message=f"Expected swarm_type 'reasoning-agent', got '{router.swarm_type}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: swarm_type", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: swarm_type",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_num_samples(report):
"""Test ReasoningAgentRouter with custom num_samples (only change num_samples param)"""
start_time = time.time()
try:
router = ReasoningAgentRouter(
num_samples=3
)
router = ReasoningAgentRouter(num_samples=3)
output = router.run("How many samples do you use?")
if router.num_samples == 3:
report.add_result("Parameter: num_samples", True, duration=time.time() - start_time)
report.add_result(
"Parameter: num_samples",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: num_samples", False, message=f"Expected num_samples 3, got {router.num_samples}", duration=time.time() - start_time)
report.add_result(
"Parameter: num_samples",
False,
message=f"Expected num_samples 3, got {router.num_samples}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: num_samples", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: num_samples",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_output_types(report):
"""Test ReasoningAgentRouter with custom output_type (only change output_type param)"""
@ -199,11 +310,26 @@ def test_router_output_types(report):
try:
router = ReasoningAgentRouter(output_type=["text", "json"])
if getattr(router, "output_type", None) == ["text", "json"]:
report.add_result("Parameter: output_type", True, duration=time.time() - start_time)
report.add_result(
"Parameter: output_type",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: output_type", False, message=f"Expected output_type ['text', 'json'], got {getattr(router, 'output_type', None)}", duration=time.time() - start_time)
report.add_result(
"Parameter: output_type",
False,
message=f"Expected output_type ['text', 'json'], got {getattr(router, 'output_type', None)}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: output_type", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: output_type",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_num_knowledge_items(report):
"""Test ReasoningAgentRouter with custom num_knowledge_items (only change num_knowledge_items param)"""
@ -211,11 +337,26 @@ def test_router_num_knowledge_items(report):
try:
router = ReasoningAgentRouter(num_knowledge_items=7)
if router.num_knowledge_items == 7:
report.add_result("Parameter: num_knowledge_items", True, duration=time.time() - start_time)
report.add_result(
"Parameter: num_knowledge_items",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: num_knowledge_items", False, message=f"Expected num_knowledge_items 7, got {router.num_knowledge_items}", duration=time.time() - start_time)
report.add_result(
"Parameter: num_knowledge_items",
False,
message=f"Expected num_knowledge_items 7, got {router.num_knowledge_items}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: num_knowledge_items", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: num_knowledge_items",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_memory_capacity(report):
"""Test ReasoningAgentRouter with custom memory_capacity (only change memory_capacity param)"""
@ -223,11 +364,26 @@ def test_router_memory_capacity(report):
try:
router = ReasoningAgentRouter(memory_capacity=10)
if router.memory_capacity == 10:
report.add_result("Parameter: memory_capacity", True, duration=time.time() - start_time)
report.add_result(
"Parameter: memory_capacity",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: memory_capacity", False, message=f"Expected memory_capacity 10, got {router.memory_capacity}", duration=time.time() - start_time)
report.add_result(
"Parameter: memory_capacity",
False,
message=f"Expected memory_capacity 10, got {router.memory_capacity}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: memory_capacity", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: memory_capacity",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_eval(report):
"""Test ReasoningAgentRouter with eval enabled (only change eval param)"""
@ -236,11 +392,26 @@ def test_router_eval(report):
result = test_agents_swarm(eval=True)
router = ReasoningAgentRouter(eval=True)
if router.eval is True:
report.add_result("Parameter: eval", True, duration=time.time() - start_time)
report.add_result(
"Parameter: eval",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: eval", False, message=f"Expected eval True, got {router.eval}", duration=time.time() - start_time)
report.add_result(
"Parameter: eval",
False,
message=f"Expected eval True, got {router.eval}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: eval", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: eval",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_random_models_on(report):
"""Test ReasoningAgentRouter with random_models_on enabled (only change random_models_on param)"""
@ -249,24 +420,61 @@ def test_router_random_models_on(report):
result = test_agents_swarm(random_models_on=True)
router = ReasoningAgentRouter(random_models_on=True)
if router.random_models_on is True:
report.add_result("Parameter: random_models_on", True, duration=time.time() - start_time)
report.add_result(
"Parameter: random_models_on",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: random_models_on", False, message=f"Expected random_models_on True, got {router.random_models_on}", duration=time.time() - start_time)
report.add_result(
"Parameter: random_models_on",
False,
message=f"Expected random_models_on True, got {router.random_models_on}",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: random_models_on", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: random_models_on",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_majority_voting_prompt(report):
"""Test ReasoningAgentRouter with custom majority_voting_prompt (only change majority_voting_prompt param)"""
start_time = time.time()
try:
result = test_agents_swarm(majority_voting_prompt="Vote for the best answer.")
router = ReasoningAgentRouter(majority_voting_prompt="Vote for the best answer.")
if router.majority_voting_prompt == "Vote for the best answer.":
report.add_result("Parameter: majority_voting_prompt", True, duration=time.time() - start_time)
result = test_agents_swarm(
majority_voting_prompt="Vote for the best answer."
)
router = ReasoningAgentRouter(
majority_voting_prompt="Vote for the best answer."
)
if (
router.majority_voting_prompt
== "Vote for the best answer."
):
report.add_result(
"Parameter: majority_voting_prompt",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: majority_voting_prompt", False, message=f"Expected majority_voting_prompt 'Vote for the best answer.', got '{router.majority_voting_prompt}'", duration=time.time() - start_time)
report.add_result(
"Parameter: majority_voting_prompt",
False,
message=f"Expected majority_voting_prompt 'Vote for the best answer.', got '{router.majority_voting_prompt}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: majority_voting_prompt", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: majority_voting_prompt",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_reasoning_model_name(report):
"""Test ReasoningAgentRouter with custom reasoning_model_name (only change reasoning_model_name param)"""
@ -274,17 +482,32 @@ def test_router_reasoning_model_name(report):
try:
router = ReasoningAgentRouter(reasoning_model_name="gpt-3.5")
if router.reasoning_model_name == "gpt-3.5":
report.add_result("Parameter: reasoning_model_name", True, duration=time.time() - start_time)
report.add_result(
"Parameter: reasoning_model_name",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Parameter: reasoning_model_name", False, message=f"Expected reasoning_model_name 'gpt-3.5', got '{router.reasoning_model_name}'", duration=time.time() - start_time)
report.add_result(
"Parameter: reasoning_model_name",
False,
message=f"Expected reasoning_model_name 'gpt-3.5', got '{router.reasoning_model_name}'",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Parameter: reasoning_model_name", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Parameter: reasoning_model_name",
False,
message=str(e),
duration=time.time() - start_time,
)
"""
Methods Testing
"""
def test_router_select_swarm(report):
"""Test ReasoningAgentRouter's select_swarm() method using test_agents_swarm"""
start_time = time.time()
@ -305,9 +528,19 @@ def test_router_select_swarm(report):
# Run the method to test
result = router.select_swarm()
# Determine if the result is as expected (not raising error is enough for this test)
report.add_result("Method: select_swarm()", True, duration=time.time() - start_time)
report.add_result(
"Method: select_swarm()",
True,
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Method: select_swarm()", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Method: select_swarm()",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_run(report):
"""Test ReasoningAgentRouter's run() method using test_agents_swarm"""
@ -332,11 +565,26 @@ def test_router_run(report):
if not isinstance(output, str):
output = str(output)
if isinstance(output, str):
report.add_result("Method: run()", True, duration=time.time() - start_time)
report.add_result(
"Method: run()",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Method: run()", False, message="Output is not a string", duration=time.time() - start_time)
report.add_result(
"Method: run()",
False,
message="Output is not a string",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Method: run()", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Method: run()",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_router_batched_run(report):
"""Test ReasoningAgentRouter's batched_run() method using test_agents_swarm"""
@ -360,17 +608,34 @@ def test_router_batched_run(report):
outputs = router.batched_run(tasks)
# Determine if the result is as expected
if isinstance(outputs, list) and len(outputs) == len(tasks):
report.add_result("Method: batched_run()", True, duration=time.time() - start_time)
report.add_result(
"Method: batched_run()",
True,
duration=time.time() - start_time,
)
else:
report.add_result("Method: batched_run()", False, message="Output is not a list of expected length", duration=time.time() - start_time)
report.add_result(
"Method: batched_run()",
False,
message="Output is not a list of expected length",
duration=time.time() - start_time,
)
except Exception as e:
report.add_result("Method: batched_run()", False, message=str(e), duration=time.time() - start_time)
report.add_result(
"Method: batched_run()",
False,
message=str(e),
duration=time.time() - start_time,
)
def test_swarm(report):
"""
Run all ReasoningAgentRouter parameter and method tests, log results to report, and print summary.
"""
print("\n=== Starting ReasoningAgentRouter Parameter & Method Test Suite ===")
print(
"\n=== Starting ReasoningAgentRouter Parameter & Method Test Suite ==="
)
start_time = time.time()
tests = [
("Parameter: description", test_router_description),
@ -380,12 +645,21 @@ def test_swarm(report):
("Parameter: swarm_type", test_router_swarm_type),
("Parameter: num_samples", test_router_num_samples),
("Parameter: output_types", test_router_output_types),
("Parameter: num_knowledge_items", test_router_num_knowledge_items),
(
"Parameter: num_knowledge_items",
test_router_num_knowledge_items,
),
("Parameter: memory_capacity", test_router_memory_capacity),
("Parameter: eval", test_router_eval),
("Parameter: random_models_on", test_router_random_models_on),
("Parameter: majority_voting_prompt", test_router_majority_voting_prompt),
("Parameter: reasoning_model_name", test_router_reasoning_model_name),
(
"Parameter: majority_voting_prompt",
test_router_majority_voting_prompt,
),
(
"Parameter: reasoning_model_name",
test_router_reasoning_model_name,
),
("Method: select_swarm()", test_router_select_swarm),
("Method: run()", test_router_run),
("Method: batched_run()", test_router_batched_run),
@ -404,6 +678,7 @@ def test_swarm(report):
if __name__ == "__main__":
report = TestReport()
report.start()
