diff --git a/council_judge_example.py b/council_judge_example.py
index d6b1ef40..491d5c83 100644
--- a/council_judge_example.py
+++ b/council_judge_example.py
@@ -9,7 +9,7 @@ if __name__ == "__main__":
     base_agent = Agent(
         agent_name="Financial-Analysis-Agent",
         system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
-        model_name="gpt-4o-mini",
+        model_name="claude-opus-4-20250514",
         max_loops=1,
     )
 
diff --git a/docs/swarms/structs/council_of_judges.md b/docs/swarms/structs/council_of_judges.md
new file mode 100644
index 00000000..7706d994
--- /dev/null
+++ b/docs/swarms/structs/council_of_judges.md
@@ -0,0 +1,283 @@
+# CouncilAsAJudge
+
+The `CouncilAsAJudge` is a sophisticated evaluation system that employs multiple AI agents to assess model responses across various dimensions. It provides comprehensive, multi-dimensional analysis of AI model outputs through parallel evaluation and aggregation.
+
+## Overview
+
+The `CouncilAsAJudge` implements a council of specialized AI agents that evaluate different aspects of a model's response. Each agent focuses on a specific dimension of evaluation, and their findings are aggregated into a comprehensive report.
+
+## Key Features
+
+- Parallel evaluation across multiple dimensions
+- Caching system for improved performance
+- Dynamic model selection
+- Comprehensive evaluation metrics
+- Thread-safe execution
+- Detailed technical analysis
+
+## Installation
+
+```bash
+pip install swarms
+```
+
+## Basic Usage
+
+```python
+from swarms import Agent, CouncilAsAJudge
+
+# Create a base agent
+base_agent = Agent(
+    agent_name="Financial-Analysis-Agent",
+    system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
+    model_name="claude-opus-4-20250514",
+    max_loops=1,
+)
+
+# Run the base agent
+user_query = "How can I establish a ROTH IRA to buy stocks and get a tax break?"
+model_output = base_agent.run(user_query)
+
+# Create and run the council
+panel = CouncilAsAJudge()
+results = panel.run(user_query, model_output)
+print(results)
+```
+
+## Advanced Usage
+
+### Custom Model Configuration
+
+```python
+from swarms import CouncilAsAJudge
+
+# Initialize with custom model
+council = CouncilAsAJudge(
+    model_name="anthropic/claude-3-sonnet-20240229",
+    output_type="all",
+    cache_size=256,
+    max_workers=4,
+    random_model_name=False
+)
+```
+
+### Parallel Processing Configuration
+
+```python
+from swarms import CouncilAsAJudge
+
+# Configure parallel processing
+council = CouncilAsAJudge(
+    max_workers=8,  # Custom number of worker threads
+    random_model_name=True  # Enable dynamic model selection
+)
+```
+
+## Evaluation Dimensions
+
+The council evaluates responses across six key dimensions:
+
+1. **Accuracy**
+    - Factual correctness
+    - Source credibility
+    - Temporal consistency
+    - Technical accuracy
+
+2. **Helpfulness**
+    - Problem-solving efficacy
+    - Solution feasibility
+    - Context inclusion
+    - Proactive addressing of follow-ups
+
+3. **Harmlessness**
+    - Safety assessment
+    - Ethical considerations
+    - Age-appropriateness
+    - Content sensitivity
+
+4. **Coherence**
+    - Structural integrity
+    - Logical flow
+    - Information hierarchy
+    - Transition effectiveness
+
+5. **Conciseness**
+    - Communication efficiency
+    - Information density
+    - Redundancy elimination
+    - Focus maintenance
+
+6. **Instruction Adherence**
+    - Requirement coverage
+    - Constraint compliance
+    - Format matching
+    - Scope appropriateness
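+
+Conceptually, the council fans the same `(task, response)` pair out to one judge agent per dimension in parallel, then aggregates the findings. The sketch below illustrates that fan-out pattern only — it is not the library's internal implementation, and `judge` is a hypothetical stand-in for a per-dimension agent:
+
+```python
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+DIMENSIONS = [
+    "accuracy", "helpfulness", "harmlessness",
+    "coherence", "conciseness", "instruction_adherence",
+]
+
+
+def judge(dimension: str, task: str, response: str) -> str:
+    # Hypothetical stand-in: in the real council, a dedicated agent
+    # with a dimension-specific prompt produces these findings.
+    return f"[{dimension}] findings for the evaluated response"
+
+
+def evaluate(task: str, response: str) -> dict:
+    findings = {}
+    # One worker per dimension, mirroring the parallel evaluation
+    # described above.
+    with ThreadPoolExecutor(max_workers=len(DIMENSIONS)) as pool:
+        futures = {
+            pool.submit(judge, dim, task, response): dim
+            for dim in DIMENSIONS
+        }
+        for future in as_completed(futures):
+            findings[futures[future]] = future.result()
+    return findings
+```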
+
+## API Reference
+
+### CouncilAsAJudge
+
+```python
+class CouncilAsAJudge:
+    def __init__(
+        self,
+        id: str = swarm_id(),
+        name: str = "CouncilAsAJudge",
+        description: str = "Evaluates the model's response across multiple dimensions",
+        model_name: str = "gpt-4o-mini",
+        output_type: str = "all",
+        cache_size: int = 128,
+        max_workers: int = None,
+        random_model_name: bool = True,
+    )
+```
+
+#### Parameters
+
+- `id` (str): Unique identifier for the council
+- `name` (str): Display name of the council
+- `description` (str): Description of the council's purpose
+- `model_name` (str): Name of the model to use for evaluations
+- `output_type` (str): Type of output to return
+- `cache_size` (int): Size of the LRU cache for prompts
+- `max_workers` (int): Maximum number of worker threads
+- `random_model_name` (bool): Whether to use random model selection
+
+### Methods
+
+#### run
+
+```python
+def run(self, task: str, model_response: str)
+```
+
+Evaluates a model response across all dimensions.
+
+##### Parameters
+
+- `task` (str): Original user prompt
+- `model_response` (str): Model's response to evaluate
+
+##### Returns
+
+- Comprehensive evaluation report
+
+## Examples
+
+### Financial Analysis Example
+
+```python
+from swarms import Agent, CouncilAsAJudge
+
+# Create financial analysis agent
+financial_agent = Agent(
+    agent_name="Financial-Analysis-Agent",
+    system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
+    model_name="claude-opus-4-20250514",
+    max_loops=1,
+)
+
+# Run analysis
+query = "How can I establish a ROTH IRA to buy stocks and get a tax break?"
+response = financial_agent.run(query)
+
+# Evaluate response
+council = CouncilAsAJudge()
+evaluation = council.run(query, response)
+print(evaluation)
+```
+
+### Technical Documentation Example
+
+```python
+from swarms import Agent, CouncilAsAJudge
+
+# Create documentation agent
+doc_agent = Agent(
+    agent_name="Documentation-Agent",
+    system_prompt="You are a technical documentation expert.",
+    model_name="gpt-4",
+    max_loops=1,
+)
+
+# Generate documentation
+query = "Explain how to implement a REST API using FastAPI"
+response = doc_agent.run(query)
+
+# Evaluate documentation quality
+council = CouncilAsAJudge(
+    model_name="anthropic/claude-3-sonnet-20240229",
+    output_type="all"
+)
+evaluation = council.run(query, response)
+print(evaluation)
+```
+
+## Best Practices
+
+### Model Selection
+
+!!! tip "Model Selection Best Practices"
+
+    - Choose appropriate models for your use case
+    - Consider using random model selection for diverse evaluations
+    - Match model capabilities to evaluation requirements
+
+### Performance Optimization
+
+!!! note "Performance Tips"
+
+    - Adjust cache size based on memory constraints
+    - Configure worker threads based on CPU cores
+    - Monitor memory usage with large responses
+
+### Error Handling
+
+!!! warning "Error Handling Guidelines"
+
+    - Implement proper exception handling
+    - Monitor evaluation failures
+    - Log evaluation results for analysis
+
+### Resource Management
+
+!!! info "Resource Management"
+
+    - Clean up resources after evaluation
+    - Monitor thread pool usage
+    - Implement proper shutdown procedures
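+
+As a minimal sketch of these tuning knobs — using only the constructor parameters documented above, with the concrete values as illustrative assumptions — the worker count can be matched to the host's cores and the cache shrunk on memory-constrained machines:
+
+```python
+import multiprocessing
+
+from swarms import CouncilAsAJudge
+
+# Match the thread pool to the available cores and use a smaller
+# LRU prompt cache where memory is tight.
+council = CouncilAsAJudge(
+    cache_size=64,
+    max_workers=multiprocessing.cpu_count(),
+)
+```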
danger "Memory Problems" + If you encounter memory-related problems: + + - Reduce cache size + - Decrease number of worker threads + - Process smaller chunks of text + +### Performance Problems + +!!! warning "Performance Issues" + To improve performance: + + - Increase cache size + - Adjust worker thread count + - Use more efficient models + +### Evaluation Failures + +!!! danger "Evaluation Issues" + When evaluations fail: + + - Check model availability + - Verify input format + - Monitor error logs + +## Contributing + +!!! success "Contributing" + Contributions are welcome! Please feel free to submit a Pull Request. + +## License + +!!! info "License" + This project is licensed under the MIT License - see the LICENSE file for details. \ No newline at end of file diff --git a/example_concurrent.py b/example_concurrent.py deleted file mode 100644 index fb9d9194..00000000 --- a/example_concurrent.py +++ /dev/null @@ -1,92 +0,0 @@ -from swarms.structs.agent import Agent -from te import run_concurrently_greenlets, with_retries -from typing import Callable, List, Tuple - - -# Define some example agent tasks -@with_retries(max_retries=2) -def financial_analysis_task(query: str) -> str: - agent = Agent( - agent_name="Financial-Analysis-Agent", - agent_description="Personal finance advisor agent", - system_prompt="You are a personal finance advisor agent", - max_loops=2, - model_name="gpt-4o-mini", - dynamic_temperature_enabled=True, - interactive=False, - output_type="final", - safety_prompt_on=True, - ) - return agent.run(query) - - -@with_retries(max_retries=2) -def investment_advice_task(query: str) -> str: - agent = Agent( - agent_name="Investment-Advisor-Agent", - agent_description="Investment strategy advisor agent", - system_prompt="You are an investment strategy advisor agent", - max_loops=2, - model_name="gpt-4o-mini", - dynamic_temperature_enabled=True, - interactive=False, - output_type="final", - safety_prompt_on=True, - ) - return agent.run(query) - - -async def market_analysis_task(query: str) -> str: - agent = Agent( - agent_name="Market-Analysis-Agent", - agent_description="Market analysis agent", - system_prompt="You are a market analysis agent", - max_loops=2, - model_name="gpt-4o-mini", - dynamic_temperature_enabled=True, - interactive=False, - output_type="final", - safety_prompt_on=True, - ) - return agent.run(query) - - -def main(): - # Define the tasks to run concurrently - tasks: List[Tuple[Callable, tuple, dict]] = [ - ( - financial_analysis_task, - ("What are the best practices for saving money?",), - {}, - ), - ( - investment_advice_task, - ("What are the current market trends?",), - {}, - ), - ( - market_analysis_task, - ("Analyze the current market conditions",), - {}, - ), - ] - - # Run the tasks concurrently - results = run_concurrently_greenlets( - tasks, - timeout=30, # 30 seconds global timeout - max_concurrency=3, # Run 3 tasks concurrently - max_retries=2, - task_timeout=10, # 10 seconds per task timeout - ) - - # Process and display results - for i, result in enumerate(results): - if isinstance(result, Exception): - print(f"Task {i} failed with error: {result}") - else: - print(f"Task {i} succeeded with result: {result}") - - -if __name__ == "__main__": - main() diff --git a/claude_4.py b/examples/models/claude_4.py similarity index 100% rename from claude_4.py rename to examples/models/claude_4.py diff --git a/claude_4_example.py b/examples/models/claude_4_example.py similarity index 100% rename from claude_4_example.py rename to 
+
+## Contributing
+
+!!! success "Contributing"
+    Contributions are welcome! Please feel free to submit a Pull Request.
+
+## License
+
+!!! info "License"
+    This project is licensed under the MIT License - see the LICENSE file for details.
\ No newline at end of file
diff --git a/example_concurrent.py b/example_concurrent.py
deleted file mode 100644
index fb9d9194..00000000
--- a/example_concurrent.py
+++ /dev/null
@@ -1,92 +0,0 @@
-from swarms.structs.agent import Agent
-from te import run_concurrently_greenlets, with_retries
-from typing import Callable, List, Tuple
-
-
-# Define some example agent tasks
-@with_retries(max_retries=2)
-def financial_analysis_task(query: str) -> str:
-    agent = Agent(
-        agent_name="Financial-Analysis-Agent",
-        agent_description="Personal finance advisor agent",
-        system_prompt="You are a personal finance advisor agent",
-        max_loops=2,
-        model_name="gpt-4o-mini",
-        dynamic_temperature_enabled=True,
-        interactive=False,
-        output_type="final",
-        safety_prompt_on=True,
-    )
-    return agent.run(query)
-
-
-@with_retries(max_retries=2)
-def investment_advice_task(query: str) -> str:
-    agent = Agent(
-        agent_name="Investment-Advisor-Agent",
-        agent_description="Investment strategy advisor agent",
-        system_prompt="You are an investment strategy advisor agent",
-        max_loops=2,
-        model_name="gpt-4o-mini",
-        dynamic_temperature_enabled=True,
-        interactive=False,
-        output_type="final",
-        safety_prompt_on=True,
-    )
-    return agent.run(query)
-
-
-async def market_analysis_task(query: str) -> str:
-    agent = Agent(
-        agent_name="Market-Analysis-Agent",
-        agent_description="Market analysis agent",
-        system_prompt="You are a market analysis agent",
-        max_loops=2,
-        model_name="gpt-4o-mini",
-        dynamic_temperature_enabled=True,
-        interactive=False,
-        output_type="final",
-        safety_prompt_on=True,
-    )
-    return agent.run(query)
-
-
-def main():
-    # Define the tasks to run concurrently
-    tasks: List[Tuple[Callable, tuple, dict]] = [
-        (
-            financial_analysis_task,
-            ("What are the best practices for saving money?",),
-            {},
-        ),
-        (
-            investment_advice_task,
-            ("What are the current market trends?",),
-            {},
-        ),
-        (
-            market_analysis_task,
-            ("Analyze the current market conditions",),
-            {},
-        ),
-    ]
-
-    # Run the tasks concurrently
-    results = run_concurrently_greenlets(
-        tasks,
-        timeout=30,  # 30 seconds global timeout
-        max_concurrency=3,  # Run 3 tasks concurrently
-        max_retries=2,
-        task_timeout=10,  # 10 seconds per task timeout
-    )
-
-    # Process and display results
-    for i, result in enumerate(results):
-        if isinstance(result, Exception):
-            print(f"Task {i} failed with error: {result}")
-        else:
-            print(f"Task {i} succeeded with result: {result}")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/claude_4.py b/examples/models/claude_4.py
similarity index 100%
rename from claude_4.py
rename to examples/models/claude_4.py
diff --git a/claude_4_example.py b/examples/models/claude_4_example.py
similarity index 100%
rename from claude_4_example.py
rename to examples/models/claude_4_example.py
diff --git a/redis_conversation.py b/examples/redis_conversation.py
similarity index 100%
rename from redis_conversation.py
rename to examples/redis_conversation.py
diff --git a/pinecone_example.py b/examples/single_agent/rag/pinecone_example.py
similarity index 100%
rename from pinecone_example.py
rename to examples/single_agent/rag/pinecone_example.py
diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py
index ca4ef653..3d6f002c 100644
--- a/swarms/structs/__init__.py
+++ b/swarms/structs/__init__.py
@@ -78,6 +78,7 @@ from swarms.structs.swarming_architectures import (
     star_swarm,
 )
 from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
+from swarms.structs.council_judge import CouncilAsAJudge
 
 __all__ = [
     "Agent",
@@ -146,4 +147,5 @@ __all__ = [
     "get_agents_info",
     "get_swarms_info",
     "AutoSwarmBuilder",
+    "CouncilAsAJudge",
 ]
diff --git a/swarms/structs/council_judge.py b/swarms/structs/council_judge.py
index 20917e51..063d0317 100644
--- a/swarms/structs/council_judge.py
+++ b/swarms/structs/council_judge.py
@@ -1,11 +1,13 @@
-from typing import Dict, Tuple
-from functools import lru_cache
 import multiprocessing
+import uuid
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from functools import lru_cache
+from typing import Dict, Tuple
+
+from loguru import logger
+
 from swarms.structs.agent import Agent
 from swarms.structs.conversation import Conversation
-from loguru import logger
-import uuid
 from swarms.structs.ma_utils import set_random_models_for_agents
 from swarms.utils.history_output_formatter import (
     history_output_formatter,
diff --git a/te.py b/te.py
deleted file mode 100644
index cf65154c..00000000
--- a/te.py
+++ /dev/null
@@ -1,245 +0,0 @@
-import gevent
-from gevent import monkey, pool
-import asyncio
-from functools import wraps
-from typing import Callable, List, Tuple, Union, Optional, Any, Dict
-import time
-from contextlib import contextmanager
-from dataclasses import dataclass
-from datetime import datetime
-from loguru import logger
-
-# Move monkey patching to the top and be more specific about what we patch
-monkey.patch_all(thread=False, select=False, ssl=False)
-
-
-@dataclass
-class TaskMetrics:
-    start_time: datetime
-    end_time: Optional[datetime] = None
-    success: bool = False
-    error: Optional[Exception] = None
-    retries: int = 0
-
-
-class TaskExecutionError(Exception):
-    """Custom exception for task execution errors"""
-
-    def __init__(self, task_name: str, error: Exception):
-        self.task_name = task_name
-        self.original_error = error
-        super().__init__(
-            f"Task {task_name} failed with error: {str(error)}"
-        )
-
-
-@contextmanager
-def task_timer(task_name: str):
-    """Context manager for timing task execution"""
-    start_time = datetime.now()
-    try:
-        yield
-    finally:
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        logger.debug(
-            f"Task {task_name} completed in {duration:.2f} seconds"
-        )
-
-
-def with_retries(max_retries: int = 3, delay: float = 1.0):
-    def decorator(func):
-        @wraps(func)
-        def wrapper(*args, **kwargs):
-            last_exception = None
-            for attempt in range(max_retries):
-                try:
-                    return func(*args, **kwargs)
-                except Exception as e:
-                    last_exception = e
-                    if attempt < max_retries - 1:
-                        time.sleep(
-                            delay * (attempt + 1)
-                        )  # Exponential backoff
-                        logger.warning(
-                            f"Retry {attempt + 1}/{max_retries} for {func.__name__}"
-                        )
-                    else:
-                        logger.error(
-                            f"All {max_retries} retries failed for {func.__name__}"
-                        )
-                        return last_exception  # Return the exception instead of raising it
-            return last_exception
-
-        return wrapper
-
-    return decorator
-
-
-def run_concurrently_greenlets(
-    tasks: List[Union[Callable, Tuple[Callable, tuple, dict]]],
-    timeout: Optional[float] = None,
-    max_concurrency: int = 100,
-    max_retries: int = 3,
-    task_timeout: Optional[float] = None,
-    metrics: Optional[Dict[str, TaskMetrics]] = None,
-) -> List[Any]:
-    """
-    Execute multiple tasks concurrently using gevent greenlets.
-
-    Args:
-        tasks: List of tasks to execute. Each task can be a callable or a tuple of (callable, args, kwargs)
-        timeout: Global timeout for all tasks in seconds
-        max_concurrency: Maximum number of concurrent tasks
-        max_retries: Maximum number of retries per task
-        task_timeout: Individual task timeout in seconds
-        metrics: Optional dictionary to store task execution metrics
-
-    Returns:
-        List of results from all tasks. Failed tasks will return their exception.
-    """
-    if metrics is None:
-        metrics = {}
-
-    pool_obj = pool.Pool(max_concurrency)
-    jobs = []
-    start_time = datetime.now()
-
-    def wrapper(task_info):
-        if isinstance(task_info, tuple):
-            fn, args, kwargs = task_info
-        else:
-            fn, args, kwargs = task_info, (), {}
-
-        task_name = (
-            fn.__name__ if hasattr(fn, "__name__") else str(fn)
-        )
-        metrics[task_name] = TaskMetrics(start_time=datetime.now())
-
-        with task_timer(task_name):
-            try:
-                if asyncio.iscoroutinefunction(fn):
-                    # Handle async functions
-                    loop = asyncio.new_event_loop()
-                    asyncio.set_event_loop(loop)
-                    try:
-                        if task_timeout:
-                            result = asyncio.wait_for(
-                                fn(*args, **kwargs),
-                                timeout=task_timeout,
-                            )
-                        else:
-                            result = loop.run_until_complete(
-                                fn(*args, **kwargs)
-                            )
-                        metrics[task_name].success = True
-                        return result
-                    finally:
-                        loop.close()
-                else:
-                    if task_timeout:
-                        with gevent.Timeout(
-                            task_timeout,
-                            TimeoutError(
-                                f"Task {task_name} timed out after {task_timeout} seconds"
-                            ),
-                        ):
-                            result = fn(*args, **kwargs)
-                    else:
-                        result = fn(*args, **kwargs)
-
-                    if isinstance(result, Exception):
-                        metrics[task_name].error = result
-                        return result
-
-                    metrics[task_name].success = True
-                    return result
-            except Exception as e:
-                metrics[task_name].error = e
-                logger.exception(
-                    f"Task {task_name} failed with error: {str(e)}"
-                )
-                return TaskExecutionError(task_name, e)
-            finally:
-                metrics[task_name].end_time = datetime.now()
-
-    try:
-        for task in tasks:
-            jobs.append(pool_obj.spawn(wrapper, task))
-
-        gevent.joinall(jobs, timeout=timeout)
-
-        results = []
-        for job in jobs:
-            if job.ready():
-                results.append(job.value)
-            else:
-                timeout_error = TimeoutError("Task timed out")
-                results.append(timeout_error)
-                if hasattr(job, "value") and hasattr(
-                    job.value, "__name__"
-                ):
-                    metrics[job.value.__name__].error = timeout_error
-                    metrics[job.value.__name__].end_time = (
-                        datetime.now()
-                    )
-
-        return results
-    except Exception:
-        logger.exception("Fatal error in task execution")
-        raise
-    finally:
-        # Cleanup
-        pool_obj.kill()
-        execution_time = (datetime.now() - start_time).total_seconds()
-        logger.info(
-            f"Total execution time: {execution_time:.2f} seconds"
-        )
-
-        # Log metrics summary
-        success_count = sum(1 for m in metrics.values() if m.success)
-        failure_count = len(metrics) - success_count
-        logger.info(
-            f"Task execution summary: {success_count} succeeded, {failure_count} failed"
-        )
-
-
-# # Example tasks
-# @with_retries(max_retries=3)
-# def task_1(x: int, y: int):
-#     import time
-
-#     time.sleep(1)
-#     return f"task 1 done with {x + y}"
-
-
-# @with_retries(max_retries=3)
-# def task_3():
-#     import time
-
-#     time.sleep(0.5)
-#     return "task 3 done"
-
-
-# async def async_task(x: int):
-#     await asyncio.sleep(1)
-#     return f"async task done with {x}"
-
-
-# if __name__ == "__main__":
-#     # Example usage with different types of tasks
-#     tasks = [
-#         (task_1, (1, 2), {}),  # Function with args
-#         (task_3, (), {}),  # Function without args (explicit)
-#         (async_task, (42,), {}),  # Async function
-#     ]
-
-#     results = run_concurrently_greenlets(
-#         tasks, timeout=5, max_concurrency=10, max_retries=3
-#     )
-
-#     for i, result in enumerate(results):
-#         if isinstance(result, Exception):
-#             print(f"Task {i} failed with {result}")
-#         else:
-#             print(f"Task {i} succeeded with result: {result}")