[FEAT][CouncilAsAJudge]

dependabot/pip/transformers-gte-4.39.0-and-lt-4.53.0
Kye Gomez 2 weeks ago
parent 3edf5553f7
commit 5d5ac53c9e

@@ -9,7 +9,7 @@ if __name__ == "__main__":
     base_agent = Agent(
         agent_name="Financial-Analysis-Agent",
         system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
-        model_name="gpt-4o-mini",
+        model_name="claude-opus-4-20250514",
         max_loops=1,
     )

@@ -0,0 +1,283 @@
# CouncilAsAJudge
The `CouncilAsAJudge` is a sophisticated evaluation system that employs multiple AI agents to assess model responses across various dimensions. It provides comprehensive, multi-dimensional analysis of AI model outputs through parallel evaluation and aggregation.
## Overview
The `CouncilAsAJudge` implements a council of specialized AI agents that evaluate different aspects of a model's response. Each agent focuses on a specific dimension of evaluation, and their findings are aggregated into a comprehensive report.
## Key Features
- Parallel evaluation across multiple dimensions
- Caching system for improved performance (see the sketch after this list)
- Dynamic model selection
- Comprehensive evaluation metrics
- Thread-safe execution
- Detailed technical analysis
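The document does not spell out the caching mechanism, but the module imports `functools.lru_cache` and exposes a `cache_size` parameter, so a memoized prompt builder is one plausible shape. A minimal sketch; `build_eval_prompt` is a hypothetical helper, not part of the library:
```python
from functools import lru_cache


@lru_cache(maxsize=128)  # 128 mirrors the default cache_size
def build_eval_prompt(dimension: str, task: str, response: str) -> str:
    # Hypothetical helper: identical (dimension, task, response) triples
    # reuse the cached prompt string instead of rebuilding it.
    return (
        f"Evaluate the following response along the '{dimension}' dimension.\n"
        f"Task: {task}\n"
        f"Response: {response}"
    )
```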
## Installation
```bash
pip install swarms
```
## Basic Usage
```python
from swarms import Agent, CouncilAsAJudge

# Create a base agent
base_agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
    model_name="claude-opus-4-20250514",
    max_loops=1,
)

# Run the base agent
user_query = "How can I establish a ROTH IRA to buy stocks and get a tax break?"
model_output = base_agent.run(user_query)

# Create and run the council
panel = CouncilAsAJudge()
results = panel.run(user_query, model_output)
print(results)
```
## Advanced Usage
### Custom Model Configuration
```python
from swarms import CouncilAsAJudge

# Initialize with custom model
council = CouncilAsAJudge(
    model_name="anthropic/claude-3-sonnet-20240229",
    output_type="all",
    cache_size=256,
    max_workers=4,
    random_model_name=False,
)
```
### Parallel Processing Configuration
```python
from swarms import CouncilAsAJudge

# Configure parallel processing
council = CouncilAsAJudge(
    max_workers=8,  # Custom number of worker threads
    random_model_name=True,  # Enable dynamic model selection
)
```
## Evaluation Dimensions
The council evaluates responses across six key dimensions (a sketch of running such dimension judges in parallel follows the list):
1. **Accuracy**

    - Factual correctness
    - Source credibility
    - Temporal consistency
    - Technical accuracy

2. **Helpfulness**

    - Problem-solving efficacy
    - Solution feasibility
    - Context inclusion
    - Proactive addressing of follow-ups

3. **Harmlessness**

    - Safety assessment
    - Ethical considerations
    - Age-appropriateness
    - Content sensitivity

4. **Coherence**

    - Structural integrity
    - Logical flow
    - Information hierarchy
    - Transition effectiveness

5. **Conciseness**

    - Communication efficiency
    - Information density
    - Redundancy elimination
    - Focus maintenance

6. **Instruction Adherence**

    - Requirement coverage
    - Constraint compliance
    - Format matching
    - Scope appropriateness
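The exact internals are not shown in this document, but the module's imports (in the diff further below) suggest one judge `Agent` per dimension fanned out over a `ThreadPoolExecutor`. A minimal sketch of that pattern; the agent names, prompts, and aggregation are illustrative, not the library's implementation:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from swarms import Agent

DIMENSIONS = [
    "accuracy", "helpfulness", "harmlessness",
    "coherence", "conciseness", "instruction adherence",
]


def judge(dimension: str, task: str, response: str) -> tuple:
    # One specialized judge per dimension (illustrative prompt)
    agent = Agent(
        agent_name=f"{dimension.title()}-Judge",
        system_prompt=f"You evaluate responses strictly for {dimension}.",
        model_name="gpt-4o-mini",
        max_loops=1,
    )
    verdict = agent.run(
        f"Task: {task}\nResponse: {response}\n"
        f"Assess the {dimension} of the response."
    )
    return dimension, verdict


def council_sketch(task: str, response: str) -> dict:
    # Fan out one judge per dimension, then collect the findings
    with ThreadPoolExecutor(max_workers=len(DIMENSIONS)) as executor:
        futures = [
            executor.submit(judge, d, task, response) for d in DIMENSIONS
        ]
        return dict(f.result() for f in as_completed(futures))
```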
## API Reference
### CouncilAsAJudge
```python
class CouncilAsAJudge:
    def __init__(
        self,
        id: str = swarm_id(),
        name: str = "CouncilAsAJudge",
        description: str = "Evaluates the model's response across multiple dimensions",
        model_name: str = "gpt-4o-mini",
        output_type: str = "all",
        cache_size: int = 128,
        max_workers: int = None,
        random_model_name: bool = True,
    )
```
#### Parameters
- `id` (str): Unique identifier for the council
- `name` (str): Display name of the council
- `description` (str): Description of the council's purpose
- `model_name` (str): Name of the model to use for evaluations
- `output_type` (str): Type of output to return
- `cache_size` (int): Size of the LRU cache for prompts
- `max_workers` (int): Maximum number of worker threads
- `random_model_name` (bool): Whether to use random model selection
### Methods
#### run
```python
def run(self, task: str, model_response: str)
```
Evaluates a model response across all dimensions.
##### Parameters
- `task` (str): Original user prompt
- `model_response` (str): Model's response to evaluate
##### Returns
- Aggregated evaluation report; the exact format depends on the council's `output_type` setting
## Examples
### Financial Analysis Example
```python
from swarms import Agent, CouncilAsAJudge

# Create financial analysis agent
financial_agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
    model_name="claude-opus-4-20250514",
    max_loops=1,
)

# Run analysis
query = "How can I establish a ROTH IRA to buy stocks and get a tax break?"
response = financial_agent.run(query)

# Evaluate response
council = CouncilAsAJudge()
evaluation = council.run(query, response)
print(evaluation)
```
### Technical Documentation Example
```python
from swarms import Agent, CouncilAsAJudge

# Create documentation agent
doc_agent = Agent(
    agent_name="Documentation-Agent",
    system_prompt="You are a technical documentation expert.",
    model_name="gpt-4",
    max_loops=1,
)

# Generate documentation
query = "Explain how to implement a REST API using FastAPI"
response = doc_agent.run(query)

# Evaluate documentation quality
council = CouncilAsAJudge(
    model_name="anthropic/claude-3-sonnet-20240229",
    output_type="all",
)
evaluation = council.run(query, response)
print(evaluation)
```
## Best Practices
### Model Selection
!!! tip "Model Selection Best Practices"

    - Choose appropriate models for your use case
    - Consider using random model selection for diverse evaluations
    - Match model capabilities to evaluation requirements
### Performance Optimization

!!! note "Performance Tips"

    - Adjust cache size based on memory constraints
    - Configure worker threads based on CPU cores (see the sketch below)
    - Monitor memory usage with large responses
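For example, a minimal sketch of sizing the pool to the host; the `min(8, ...)` cap is an illustrative heuristic, not a library default:
```python
import os

from swarms import CouncilAsAJudge

# One worker per CPU core, capped at 8 (heuristic; tune for your workload)
council = CouncilAsAJudge(max_workers=min(8, os.cpu_count() or 1))
```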
### Error Handling

!!! warning "Error Handling Guidelines"

    - Implement proper exception handling (see the sketch below)
    - Monitor evaluation failures
    - Log evaluation results for analysis
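A minimal sketch, assuming the `query` and `response` variables from the examples above; `loguru` is already used by the module:
```python
from loguru import logger

try:
    evaluation = council.run(query, response)
except Exception as exc:
    # Log the failure with a traceback instead of dropping it silently
    logger.exception(f"Council evaluation failed: {exc}")
    evaluation = None
```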
### Resource Management

!!! info "Resource Management"

    - Clean up resources after evaluation
    - Monitor thread pool usage
    - Implement proper shutdown procedures
## Troubleshooting

### Memory Issues

!!! danger "Memory Problems"

    If you encounter memory-related problems (a configuration sketch follows):

    - Reduce cache size
    - Decrease the number of worker threads
    - Process smaller chunks of text
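For example, a conservative configuration sketch (the values are illustrative starting points, not library recommendations):
```python
from swarms import CouncilAsAJudge

# A smaller prompt cache and fewer threads reduce peak memory usage
council = CouncilAsAJudge(cache_size=32, max_workers=2)
```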
### Performance Problems

!!! warning "Performance Issues"

    To improve performance:

    - Increase cache size
    - Adjust the worker thread count
    - Use more efficient models
### Evaluation Failures

!!! danger "Evaluation Issues"

    When evaluations fail:

    - Check model availability
    - Verify input format
    - Monitor error logs
## Contributing

!!! success "Contributing"

    Contributions are welcome! Please feel free to submit a Pull Request.

## License

!!! info "License"

    This project is licensed under the MIT License - see the LICENSE file for details.

@@ -1,92 +0,0 @@
from swarms.structs.agent import Agent
from te import run_concurrently_greenlets, with_retries
from typing import Callable, List, Tuple


# Define some example agent tasks
@with_retries(max_retries=2)
def financial_analysis_task(query: str) -> str:
    agent = Agent(
        agent_name="Financial-Analysis-Agent",
        agent_description="Personal finance advisor agent",
        system_prompt="You are a personal finance advisor agent",
        max_loops=2,
        model_name="gpt-4o-mini",
        dynamic_temperature_enabled=True,
        interactive=False,
        output_type="final",
        safety_prompt_on=True,
    )
    return agent.run(query)


@with_retries(max_retries=2)
def investment_advice_task(query: str) -> str:
    agent = Agent(
        agent_name="Investment-Advisor-Agent",
        agent_description="Investment strategy advisor agent",
        system_prompt="You are an investment strategy advisor agent",
        max_loops=2,
        model_name="gpt-4o-mini",
        dynamic_temperature_enabled=True,
        interactive=False,
        output_type="final",
        safety_prompt_on=True,
    )
    return agent.run(query)


async def market_analysis_task(query: str) -> str:
    agent = Agent(
        agent_name="Market-Analysis-Agent",
        agent_description="Market analysis agent",
        system_prompt="You are a market analysis agent",
        max_loops=2,
        model_name="gpt-4o-mini",
        dynamic_temperature_enabled=True,
        interactive=False,
        output_type="final",
        safety_prompt_on=True,
    )
    return agent.run(query)


def main():
    # Define the tasks to run concurrently
    tasks: List[Tuple[Callable, tuple, dict]] = [
        (
            financial_analysis_task,
            ("What are the best practices for saving money?",),
            {},
        ),
        (
            investment_advice_task,
            ("What are the current market trends?",),
            {},
        ),
        (
            market_analysis_task,
            ("Analyze the current market conditions",),
            {},
        ),
    ]

    # Run the tasks concurrently
    results = run_concurrently_greenlets(
        tasks,
        timeout=30,  # 30 seconds global timeout
        max_concurrency=3,  # Run 3 tasks concurrently
        max_retries=2,
        task_timeout=10,  # 10 seconds per task timeout
    )

    # Process and display results
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed with error: {result}")
        else:
            print(f"Task {i} succeeded with result: {result}")


if __name__ == "__main__":
    main()

@@ -78,6 +78,7 @@ from swarms.structs.swarming_architectures import (
     star_swarm,
 )
 from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
+from swarms.structs.council_judge import CouncilAsAJudge

 __all__ = [
     "Agent",
@@ -146,4 +147,5 @@ __all__ = [
     "get_agents_info",
     "get_swarms_info",
     "AutoSwarmBuilder",
+    "CouncilAsAJudge",
 ]

@ -1,11 +1,13 @@
from typing import Dict, Tuple
from functools import lru_cache
import multiprocessing import multiprocessing
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from typing import Dict, Tuple
from loguru import logger
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from loguru import logger
import uuid
from swarms.structs.ma_utils import set_random_models_for_agents from swarms.structs.ma_utils import set_random_models_for_agents
from swarms.utils.history_output_formatter import ( from swarms.utils.history_output_formatter import (
history_output_formatter, history_output_formatter,

te.py

@@ -1,245 +0,0 @@
import gevent
from gevent import monkey, pool
import asyncio
from functools import wraps
from typing import Callable, List, Tuple, Union, Optional, Any, Dict
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from loguru import logger

# Move monkey patching to the top and be more specific about what we patch
monkey.patch_all(thread=False, select=False, ssl=False)

@dataclass
class TaskMetrics:
    start_time: datetime
    end_time: Optional[datetime] = None
    success: bool = False
    error: Optional[Exception] = None
    retries: int = 0


class TaskExecutionError(Exception):
    """Custom exception for task execution errors"""

    def __init__(self, task_name: str, error: Exception):
        self.task_name = task_name
        self.original_error = error
        super().__init__(
            f"Task {task_name} failed with error: {str(error)}"
        )

@contextmanager
def task_timer(task_name: str):
    """Context manager for timing task execution"""
    start_time = datetime.now()
    try:
        yield
    finally:
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.debug(
            f"Task {task_name} completed in {duration:.2f} seconds"
        )

def with_retries(max_retries: int = 3, delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        time.sleep(
                            delay * (attempt + 1)
                        )  # Linear backoff: the delay grows with each attempt
                        logger.warning(
                            f"Retry {attempt + 1}/{max_retries} for {func.__name__}"
                        )
                    else:
                        logger.error(
                            f"All {max_retries} retries failed for {func.__name__}"
                        )
                        return last_exception  # Return the exception instead of raising it
            return last_exception

        return wrapper

    return decorator

def run_concurrently_greenlets(
    tasks: List[Union[Callable, Tuple[Callable, tuple, dict]]],
    timeout: Optional[float] = None,
    max_concurrency: int = 100,
    max_retries: int = 3,
    task_timeout: Optional[float] = None,
    metrics: Optional[Dict[str, TaskMetrics]] = None,
) -> List[Any]:
    """
    Execute multiple tasks concurrently using gevent greenlets.

    Args:
        tasks: List of tasks to execute. Each task can be a callable or a tuple of (callable, args, kwargs)
        timeout: Global timeout for all tasks in seconds
        max_concurrency: Maximum number of concurrent tasks
        max_retries: Maximum number of retries per task
        task_timeout: Individual task timeout in seconds
        metrics: Optional dictionary to store task execution metrics

    Returns:
        List of results from all tasks. Failed tasks will return their exception.
    """
    if metrics is None:
        metrics = {}

    pool_obj = pool.Pool(max_concurrency)
    jobs = []
    start_time = datetime.now()

    def wrapper(task_info):
        if isinstance(task_info, tuple):
            fn, args, kwargs = task_info
        else:
            fn, args, kwargs = task_info, (), {}

        task_name = (
            fn.__name__ if hasattr(fn, "__name__") else str(fn)
        )
        metrics[task_name] = TaskMetrics(start_time=datetime.now())

        with task_timer(task_name):
            try:
                if asyncio.iscoroutinefunction(fn):
                    # Handle async functions on a dedicated event loop
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    try:
                        if task_timeout:
                            # Run the coroutine to completion under a timeout
                            result = loop.run_until_complete(
                                asyncio.wait_for(
                                    fn(*args, **kwargs),
                                    timeout=task_timeout,
                                )
                            )
                        else:
                            result = loop.run_until_complete(
                                fn(*args, **kwargs)
                            )
                        metrics[task_name].success = True
                        return result
                    finally:
                        loop.close()
                else:
                    if task_timeout:
                        with gevent.Timeout(
                            task_timeout,
                            TimeoutError(
                                f"Task {task_name} timed out after {task_timeout} seconds"
                            ),
                        ):
                            result = fn(*args, **kwargs)
                    else:
                        result = fn(*args, **kwargs)

                if isinstance(result, Exception):
                    metrics[task_name].error = result
                    return result

                metrics[task_name].success = True
                return result
            except Exception as e:
                metrics[task_name].error = e
                logger.exception(
                    f"Task {task_name} failed with error: {str(e)}"
                )
                return TaskExecutionError(task_name, e)
            finally:
                metrics[task_name].end_time = datetime.now()

    try:
        for task in tasks:
            jobs.append(pool_obj.spawn(wrapper, task))

        gevent.joinall(jobs, timeout=timeout)

        results = []
        for job in jobs:
            if job.ready():
                results.append(job.value)
            else:
                timeout_error = TimeoutError("Task timed out")
                results.append(timeout_error)
                if hasattr(job, "value") and hasattr(
                    job.value, "__name__"
                ):
                    metrics[job.value.__name__].error = timeout_error
                    metrics[job.value.__name__].end_time = (
                        datetime.now()
                    )
        return results
    except Exception:
        logger.exception("Fatal error in task execution")
        raise
    finally:
        # Cleanup
        pool_obj.kill()
        execution_time = (datetime.now() - start_time).total_seconds()
        logger.info(
            f"Total execution time: {execution_time:.2f} seconds"
        )

        # Log metrics summary
        success_count = sum(1 for m in metrics.values() if m.success)
        failure_count = len(metrics) - success_count
        logger.info(
            f"Task execution summary: {success_count} succeeded, {failure_count} failed"
        )

# # Example tasks
# @with_retries(max_retries=3)
# def task_1(x: int, y: int):
#     import time
#
#     time.sleep(1)
#     return f"task 1 done with {x + y}"


# @with_retries(max_retries=3)
# def task_3():
#     import time
#
#     time.sleep(0.5)
#     return "task 3 done"


# async def async_task(x: int):
#     await asyncio.sleep(1)
#     return f"async task done with {x}"


# if __name__ == "__main__":
#     # Example usage with different types of tasks
#     tasks = [
#         (task_1, (1, 2), {}),  # Function with args
#         (task_3, (), {}),  # Function without args (explicit)
#         (async_task, (42,), {}),  # Async function
#     ]
#
#     results = run_concurrently_greenlets(
#         tasks, timeout=5, max_concurrency=10, max_retries=3
#     )
#
#     for i, result in enumerate(results):
#         if isinstance(result, Exception):
#             print(f"Task {i} failed with {result}")
#         else:
#             print(f"Task {i} succeeded with result: {result}")