examples/multi-agent/council

pull/850/head^2
Kye Gomez 3 weeks ago
parent a6fcac5c4d
commit 42aa843dbd

@@ -4,12 +4,12 @@ from swarms.structs.agent import Agent
agent = Agent(
    agent_name="Clinical-Documentation-Agent",
    agent_description="Specialized agent for clinical documentation and "
    "medical record analysis",
    system_prompt="You are a clinical documentation specialist with expertise "
    "in medical terminology, SOAP notes, and healthcare "
    "documentation standards. You help analyze and improve "
    "clinical documentation for accuracy, completeness, and "
    "compliance.",
    max_loops=1,
    model_name="claude-opus-4-20250514",
    dynamic_temperature_enabled=True,

@@ -0,0 +1,369 @@
import json
import time
from pathlib import Path
from typing import Any, Dict, Optional

from datasets import load_dataset
from loguru import logger
from tqdm import tqdm

from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge

# Dataset configurations
DATASET_CONFIGS = {
    "gsm8k": "main",
    "squad": None,  # No specific config needed
    "winogrande": None,
    "commonsense_qa": None,
}
base_agent = Agent(
agent_name="General-Problem-Solver",
system_prompt="""You are an expert problem solver and analytical thinker with deep expertise across multiple domains. Your role is to break down complex problems, identify key patterns, and provide well-reasoned solutions.
Key Responsibilities:
1. Analyze problems systematically by breaking them into manageable components
2. Identify relevant patterns, relationships, and dependencies
3. Apply logical reasoning and critical thinking to evaluate solutions
4. Consider multiple perspectives and potential edge cases
5. Provide clear, step-by-step explanations of your reasoning
6. Validate solutions against given constraints and requirements
Problem-Solving Framework:
1. Problem Understanding
- Identify the core problem and key objectives
- Clarify constraints and requirements
- Define success criteria
2. Analysis
- Break down complex problems into components
- Identify relevant patterns and relationships
- Consider multiple perspectives and approaches
3. Solution Development
- Generate potential solutions
- Evaluate trade-offs and implications
- Select optimal approach based on criteria
4. Validation
- Test solution against requirements
- Consider edge cases and potential issues
- Verify logical consistency
5. Communication
- Present clear, structured reasoning
- Explain key decisions and trade-offs
- Provide actionable recommendations
Remember to maintain a systematic, analytical approach while being adaptable to different problem domains.""",
model_name="gpt-4o-mini",
max_loops=1,
max_tokens=16000,
)
class CouncilJudgeEvaluator:
    """
    Evaluates the Council of Judges using various datasets from Hugging Face.
    Checks if the council's output contains the correct answer from the dataset.
    """

    def __init__(
        self,
        base_agent: Optional[Agent] = base_agent,
        model_name: str = "gpt-4o-mini",
        output_dir: str = "evaluation_results",
    ):
        """
        Initialize the Council Judge Evaluator.

        Args:
            base_agent: Optional base agent to use for responses
            model_name: Model to use for evaluations
            output_dir: Directory to save evaluation results
        """
        self.council = CouncilAsAJudge(
            base_agent=base_agent,
            output_type="final",
        )

        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize or load existing results
        self.results_file = (
            self.output_dir / "evaluation_results.json"
        )
        self.results = self._load_or_create_results()

    def _load_or_create_results(self) -> Dict[str, Any]:
        """Load existing results or create new results structure."""
        if self.results_file.exists():
            try:
                with open(self.results_file, "r") as f:
                    return json.load(f)
            except json.JSONDecodeError:
                logger.warning(
                    "Existing results file is corrupted. Creating new one."
                )

        return {
            "datasets": {},
            "last_updated": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_evaluations": 0,
            "total_correct": 0,
        }

    def _save_results(self):
        """Save current results to file."""
        self.results["last_updated"] = time.strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        with open(self.results_file, "w") as f:
            json.dump(self.results, f, indent=2)
        logger.info(f"Results saved to {self.results_file}")
    def evaluate_dataset(
        self,
        dataset_name: str,
        split: str = "test",
        num_samples: Optional[int] = None,
        save_results: bool = True,
    ) -> Dict[str, Any]:
        """
        Evaluate the Council of Judges on a specific dataset.

        Args:
            dataset_name: Name of the Hugging Face dataset
            split: Dataset split to use
            num_samples: Number of samples to evaluate (None for all)
            save_results: Whether to save results to file

        Returns:
            Dictionary containing evaluation metrics and results
        """
        logger.info(
            f"Loading dataset {dataset_name} (split: {split})..."
        )

        # Get dataset config if needed
        config = DATASET_CONFIGS.get(dataset_name)
        if config:
            dataset = load_dataset(dataset_name, config, split=split)
        else:
            dataset = load_dataset(dataset_name, split=split)

        if num_samples:
            dataset = dataset.select(
                range(min(num_samples, len(dataset)))
            )

        # Initialize or get existing dataset results
        if dataset_name not in self.results["datasets"]:
            self.results["datasets"][dataset_name] = {
                "evaluations": [],
                "correct_answers": 0,
                "total_evaluated": 0,
                "accuracy": 0.0,
                "last_updated": time.strftime("%Y-%m-%d %H:%M:%S"),
            }

        start_time = time.time()

        for idx, example in enumerate(
            tqdm(dataset, desc="Evaluating samples")
        ):
            try:
                # Get the input text and correct answer based on dataset structure
                input_text = self._get_input_text(
                    example, dataset_name
                )
                correct_answer = self._get_correct_answer(
                    example, dataset_name
                )

                # Run evaluation through council
                evaluation = self.council.run(input_text)

                # Check if the evaluation contains the correct answer
                is_correct = self._check_answer(
                    evaluation, correct_answer, dataset_name
                )

                # Create sample result
                sample_result = {
                    "input": input_text,
                    "correct_answer": correct_answer,
                    "evaluation": evaluation,
                    "is_correct": is_correct,
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                }

                # Update dataset results
                self.results["datasets"][dataset_name][
                    "evaluations"
                ].append(sample_result)
                if is_correct:
                    self.results["datasets"][dataset_name][
                        "correct_answers"
                    ] += 1
                    self.results["total_correct"] += 1
                self.results["datasets"][dataset_name][
                    "total_evaluated"
                ] += 1
                self.results["total_evaluations"] += 1

                # Update accuracy
                self.results["datasets"][dataset_name]["accuracy"] = (
                    self.results["datasets"][dataset_name][
                        "correct_answers"
                    ]
                    / self.results["datasets"][dataset_name][
                        "total_evaluated"
                    ]
                )
                self.results["datasets"][dataset_name][
                    "last_updated"
                ] = time.strftime("%Y-%m-%d %H:%M:%S")

                # Save results after each evaluation
                if save_results:
                    self._save_results()

            except Exception as e:
                logger.error(
                    f"Error evaluating sample {idx}: {str(e)}"
                )
                continue

        # Calculate final metrics
        results = {
            "dataset": dataset_name,
            "split": split,
            "num_samples": len(dataset),
            "evaluations": self.results["datasets"][dataset_name][
                "evaluations"
            ],
            "correct_answers": self.results["datasets"][dataset_name][
                "correct_answers"
            ],
            "total_evaluated": self.results["datasets"][dataset_name][
                "total_evaluated"
            ],
            "accuracy": self.results["datasets"][dataset_name][
                "accuracy"
            ],
            "total_time": time.time() - start_time,
        }

        return results
    def _get_input_text(
        self, example: Dict, dataset_name: str
    ) -> str:
        """Extract input text based on dataset structure."""
        if dataset_name == "gsm8k":
            return example["question"]
        elif dataset_name == "squad":
            return example["question"]
        elif dataset_name == "winogrande":
            return example["sentence"]
        elif dataset_name == "commonsense_qa":
            return example["question"]
        else:
            # Default to first field that looks like text
            for key, value in example.items():
                if isinstance(value, str) and len(value) > 10:
                    return value
            raise ValueError(
                f"Could not find input text in example for dataset {dataset_name}"
            )

    def _get_correct_answer(
        self, example: Dict, dataset_name: str
    ) -> str:
        """Extract correct answer based on dataset structure."""
        if dataset_name == "gsm8k":
            return str(example["answer"])
        elif dataset_name == "squad":
            return (
                example["answers"]["text"][0]
                if isinstance(example["answers"], dict)
                else str(example["answers"])
            )
        elif dataset_name == "winogrande":
            return str(example["answer"])
        elif dataset_name == "commonsense_qa":
            return str(example["answerKey"])
        else:
            # Try to find an answer field
            for key in ["answer", "answers", "label", "target"]:
                if key in example:
                    return str(example[key])
            raise ValueError(
                f"Could not find correct answer in example for dataset {dataset_name}"
            )
    def _check_answer(
        self, evaluation: str, correct_answer: str, dataset_name: str
    ) -> bool:
        """Check if the evaluation contains the correct answer."""
        # Convert both to lowercase for case-insensitive comparison
        evaluation_lower = evaluation.lower()
        correct_answer_lower = correct_answer.lower()

        # For GSM8K, we need to extract the final numerical answer
        if dataset_name == "gsm8k":
            try:
                import re

                # GSM8K reference answers end with "#### <number>"; compare against
                # that final number rather than the full worked solution.
                reference_number = correct_answer_lower.split("####")[-1].strip()

                # Look for the final answer in the format "The answer is X" or "Answer: X"
                final_answer = re.search(
                    r"(?:the answer is|answer:)\s*(\d+)",
                    evaluation_lower,
                )
                if final_answer:
                    return (
                        final_answer.group(1) == reference_number
                    )
            except Exception:
                pass

        # For other datasets, check if the correct answer is contained in the evaluation
        return correct_answer_lower in evaluation_lower
def main():
    # Example usage
    evaluator = CouncilJudgeEvaluator()

    # Evaluate on multiple datasets
    datasets = ["gsm8k", "squad", "winogrande", "commonsense_qa"]
    for dataset in datasets:
        try:
            logger.info(f"\nEvaluating on {dataset}...")
            results = evaluator.evaluate_dataset(
                dataset_name=dataset,
                split="test",
                num_samples=10,  # Limit samples for testing
            )

            # Print summary
            print(f"\nResults for {dataset}:")
            print(f"Accuracy: {results['accuracy']:.3f}")
            print(
                f"Correct answers: {results['correct_answers']}/{results['total_evaluated']}"
            )
            print(f"Total time: {results['total_time']:.2f} seconds")

        except Exception as e:
            logger.error(f"Error evaluating {dataset}: {str(e)}")
            continue


if __name__ == "__main__":
    main()
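For reference, a minimal sketch of the GSM8K answer-matching step that _check_answer performs: extract the final number the council's evaluation announces, extract the number after "####" in the GSM8K reference, and compare. The evaluation text and reference answer below are invented for illustration.

import re

# Illustrative strings (not from a real run): a council evaluation and a
# GSM8K-style reference answer that ends with "#### <number>".
evaluation = "Each tray holds 24 cookies, so 3 trays hold 72 cookies. The answer is 72."
reference = "24 * 3 = 72\n#### 72"

final = re.search(r"(?:the answer is|answer:)\s*(\d+)", evaluation.lower())
reference_number = reference.lower().split("####")[-1].strip()

print(final.group(1) == reference_number)  # True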

@@ -1,7 +1,6 @@
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge

# ========== USAGE EXAMPLE ==========
if __name__ == "__main__":
    user_query = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"

@@ -11,11 +10,12 @@ if __name__ == "__main__":
        system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
        model_name="claude-opus-4-20250514",
        max_loops=1,
        max_tokens=16000,
    )

    model_output = base_agent.run(user_query)
    # model_output = base_agent.run(user_query)

    panel = CouncilAsAJudge()
    results = panel.run(user_query, model_output)
    panel = CouncilAsAJudge(base_agent=base_agent)
    results = panel.run(user_query)

    print(results)
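Because model_response becomes optional in this commit, the older two-step flow should still be possible by passing the model's output explicitly. A sketch, reusing the agent defined above:

# Sketch: generate the response yourself, then hand it to the council explicitly.
model_output = base_agent.run(user_query)

panel = CouncilAsAJudge(base_agent=base_agent)
results = panel.run(user_query, model_output)
print(results)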

@@ -0,0 +1,19 @@
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge

if __name__ == "__main__":
    user_query = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"

    base_agent = Agent(
        agent_name="Financial-Analysis-Agent",
        system_prompt="You are a financial expert helping users understand and establish ROTH IRAs.",
        model_name="claude-opus-4-20250514",
        max_loops=1,
        max_tokens=16000,
    )

    panel = CouncilAsAJudge(base_agent=base_agent)
    results = panel.run(user_query)
    print(results)
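The council constructor also exposes aggregation_model_name (added in this commit) alongside the existing output_type, as shown in the CouncilAsAJudge changes below. A variant of the example with illustrative values; the exact effect of each output_type value follows the library's history_output_formatter:

panel = CouncilAsAJudge(
    base_agent=base_agent,
    aggregation_model_name="gpt-4o-mini",  # model used by the aggregator agent
    output_type="all",  # keep the full judge conversation instead of only the final report
)
results = panel.run(user_query)
print(results)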

@@ -2,7 +2,7 @@ import multiprocessing
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from typing import Dict, Tuple
from typing import Dict, Optional, Tuple
from loguru import logger
@@ -140,29 +140,31 @@ def build_judge_prompt(
    )
    evaluation_focus = EVAL_DIMENSIONS[dimension_name]

    return f"""## Evaluation Dimension: {dimension_name.upper()}
    return f"""
## Evaluation Dimension: {dimension_name.upper()}

{evaluation_focus}

Your task is to provide a detailed, technical analysis of the model response focusing exclusively on the {dimension_name} dimension.

Guidelines:
1. Be specific and reference exact parts of the response
2. Explain the reasoning behind your observations
3. Provide concrete examples of both strengths and weaknesses
4. Suggest specific improvements where applicable
5. Maintain a technical, analytical tone

--- BEGIN USER PROMPT ---
{user_prompt}
--- END USER PROMPT ---

--- BEGIN MODEL RESPONSE ---
{model_response}
--- END MODEL RESPONSE ---

### Technical Analysis ({dimension_name.upper()} Dimension):
Provide a comprehensive analysis that would be valuable for model improvement."""
Provide a comprehensive analysis that would be valuable for model improvement.
"""
@lru_cache(maxsize=128)
@@ -250,7 +252,10 @@ class CouncilAsAJudge:
        output_type: str = "all",
        cache_size: int = 128,
        max_workers: int = None,
        base_agent: Optional[Agent] = None,
        random_model_name: bool = True,
        max_loops: int = 1,
        aggregation_model_name: str = "gpt-4o-mini",
    ):
        """
        Initialize the CouncilAsAJudge.
@@ -270,7 +275,10 @@ class CouncilAsAJudge:
        self.output_type = output_type
        self.cache_size = cache_size
        self.max_workers = max_workers
        self.base_agent = base_agent
        self.random_model_name = random_model_name
        self.max_loops = max_loops
        self.aggregation_model_name = aggregation_model_name

        self.reliability_check()
@@ -371,7 +379,7 @@ class CouncilAsAJudge:
        return Agent(
            agent_name="aggregator_agent",
            system_prompt=aggregator_system_prompt(),
            model_name="anthropic/claude-3-sonnet-20240229",
            model_name=self.aggregation_model_name,
            max_loops=1,
            dynamic_temperature_enabled=True,
            output_type="final",
@@ -407,7 +415,9 @@ class CouncilAsAJudge:
            prompt = build_judge_prompt(
                dim, user_prompt, model_response
            )
            result = agent.run(prompt)
            result = agent.run(
                f"{prompt} \n\n Evaluate the following agent {self.base_agent.agent_name} response for the {dim} dimension: {model_response}."
            )

            self.conversation.add(
                role=agent.agent_name,
@@ -420,7 +430,9 @@ class CouncilAsAJudge:
                f"Failed to evaluate dimension {dim}: {str(e)}"
            )

    def run(self, task: str, model_response: str) -> None:
    def run(
        self, task: str, model_response: Optional[str] = None
    ) -> None:
        """
        Run the evaluation process using ThreadPoolExecutor.
@@ -433,6 +445,16 @@ class CouncilAsAJudge:
        """
        try:
            # Run the base agent
            if self.base_agent and model_response is None:
                model_response = self.base_agent.run(task=task)

            self.conversation.add(
                role="User",
                content=task,
            )

            # Create tasks for all dimensions
            tasks = [
                (dim, agent, task, model_response)
@@ -483,6 +505,32 @@ class CouncilAsAJudge:
                content=final_report,
            )

            # Synthesize feedback and generate improved response
            feedback_prompt = f"""
Based on the comprehensive evaluations from our expert council of judges, please refine your response to the original task.

Original Task:
{task}

Council Feedback:
{aggregation_prompt}

Please:
1. Carefully consider all feedback points
2. Address any identified weaknesses
3. Maintain or enhance existing strengths
4. Provide a refined, improved response that incorporates the council's insights

Your refined response:
"""

            final_report = self.base_agent.run(task=feedback_prompt)

            self.conversation.add(
                role=self.base_agent.agent_name,
                content=final_report,
            )

            return history_output_formatter(
                conversation=self.conversation,
                type=self.output_type,
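Schematically, run() now generates the base agent's response, fans the per-dimension judge calls out over a ThreadPoolExecutor, and joins the critiques before aggregation and refinement. A simplified, self-contained sketch of that fan-out/aggregate pattern, with stand-in functions rather than the library's actual implementation:

from concurrent.futures import ThreadPoolExecutor, as_completed

def evaluate(dimension: str, response: str) -> str:
    # Stand-in for a judge agent call; returns a per-dimension critique.
    return f"[{dimension}] critique of: {response[:40]}"

dimensions = ["accuracy", "helpfulness", "coherence"]
response = "Model response to be judged."

with ThreadPoolExecutor(max_workers=len(dimensions)) as pool:
    futures = {pool.submit(evaluate, dim, response): dim for dim in dimensions}
    critiques = {futures[f]: f.result() for f in as_completed(futures)}

# Join the critiques in a stable dimension order before aggregation.
aggregated = "\n".join(critiques[dim] for dim in dimensions)
print(aggregated)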

@@ -394,6 +394,7 @@ class SwarmRouter:
                description=self.description,
                model_name=self.model_name,
                output_type=self.output_type,
                base_agent=self.agents[0] if self.agents else None,
            )

        elif self.swarm_type == "DeepResearchSwarm":
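For context, a rough sketch of how this router wiring might be exercised; the swarm_type string and any constructor arguments not visible in the hunk above are assumptions:

from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter

agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt="You are a financial expert.",
    model_name="gpt-4o-mini",
    max_loops=1,
)

# Assumed swarm_type value; with this change the first agent in `agents`
# is passed through as the council's base_agent.
router = SwarmRouter(
    name="council-router",
    description="Route a task through the council of judges",
    agents=[agent],
    swarm_type="CouncilAsAJudge",
)
print(router.run("Evaluate the tradeoffs of index funds vs. individual stocks."))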
