agent roles for routers

pull/781/head
Kye Gomez 4 months ago
parent 506d11601c
commit 311d5451b1

@@ -24,6 +24,7 @@ agent = Agent(
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
role="director",
)
agent.run(

@@ -1,36 +0,0 @@
from swarms.structs.agent import Agent
from swarms.structs.groupchat import GroupChat
if __name__ == "__main__":
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
model_name="gpt-4o",
max_loops=1,
dynamic_temperature_enabled=True,
)
agents = [agent1, agent2]
chat = GroupChat(
name="Investment Advisory",
description="Financial and tax analysis group",
agents=agents,
max_loops=1,
)
history = chat.run(
"How to optimize tax strategy for investments?"
)
print(history)

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.2.4"
version = "7.2.6"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@@ -1,156 +0,0 @@
import asyncio
from swarms import Agent
from dotenv import load_dotenv
from swarms_tools import coin_gecko_coin_api
load_dotenv()
CRYPTO_ANALYST_SYSTEM_PROMPT = """
You are an expert cryptocurrency financial analyst with deep expertise in:
1. Technical Analysis
- Chart patterns and indicators (RSI, MACD, Bollinger Bands)
- Volume analysis and market momentum
- Support and resistance levels
- Trend analysis and price action
2. Fundamental Analysis
- Tokenomics evaluation
- Network metrics (TVL, daily active users, transaction volume)
- Protocol revenue and growth metrics
- Market capitalization analysis
- Token utility and use cases
3. Market Analysis
- Market sentiment analysis
- Correlation with broader crypto market
- Impact of macro events
- Institutional adoption metrics
- DeFi and NFT market analysis
4. Risk Assessment
- Volatility metrics
- Liquidity analysis
- Smart contract risks
- Regulatory considerations
- Exchange exposure risks
5. Data Analysis Methods
- On-chain metrics analysis
- Whale wallet tracking
- Exchange inflow/outflow
- Mining/Staking statistics
- Network health indicators
When analyzing crypto assets, always:
1. Start with a comprehensive market overview
2. Examine both on-chain and off-chain metrics
3. Consider multiple timeframes (short, medium, long-term)
4. Evaluate risk-reward ratios
5. Assess market sentiment and momentum
6. Consider regulatory and security factors
7. Analyze correlations with BTC, ETH, and traditional markets
8. Examine liquidity and volume profiles
9. Review recent protocol developments and updates
10. Consider macroeconomic factors
Format your analysis with:
- Clear section headings
- Relevant metrics and data points
- Risk warnings and disclaimers
- Price action analysis
- Market sentiment summary
- Technical indicators
- Fundamental factors
- Clear recommendations with rationale
Remember to:
- Always provide data-driven insights
- Include both bullish and bearish scenarios
- Highlight key risk factors
- Consider market cycles and seasonality
- Maintain objectivity in analysis
- Cite sources for data and claims
- Update analysis based on new market conditions
"""
# Initialize multiple crypto analysis agents with different specialties
technical_analyst = Agent(
agent_name="Technical-Analyst",
agent_description="Expert in technical analysis and chart patterns",
system_prompt=CRYPTO_ANALYST_SYSTEM_PROMPT,
max_loops=1,
model_name="gpt-4o",
dynamic_temperature_enabled=True,
user_name="tech_analyst",
output_type="str",
)
# List of coins to analyze
coins = ["solana", "raydium", "aixbt", "jupiter"]
# Dictionary to store analyses
coin_analyses = {}
async def analyze_coin(coin, technical_analyst):
print(f"\n=== Technical Analysis for {coin.upper()} ===\n")
# Fetch market data
gecko_data = coin_gecko_coin_api(coin)
# Get technical analysis
analysis = await technical_analyst.arun(
f"""Analyze {coin}'s technical indicators and price action using this data:
CoinGecko Data: {gecko_data}
Focus on:
- Chart patterns and trends
- Support/resistance levels
- Momentum indicators
- Price targets and risk levels
- Overall technical strength rating (1-10)
End with a clear technical strength score out of 10.
"""
)
return coin, analysis
async def main():
# Create tasks for concurrent execution
tasks = [analyze_coin(coin, technical_analyst) for coin in coins]
# Execute all analyses concurrently
results = await asyncio.gather(*tasks)
# Store results in coin_analyses
for coin, analysis in results:
coin_analyses[coin] = analysis
# Have technical analyst compare and recommend best investment
consensus = await technical_analyst.arun(
f"""Based on your technical analysis of these coins:
Solana Analysis:
{coin_analyses['solana']}
Raydium Analysis:
{coin_analyses['raydium']}
Jupiter Analysis:
{coin_analyses['jupiter']}
AIXBT Analysis:
{coin_analyses['aixbt']}
Please:
1. Rank the coins from strongest to weakest technical setup
2. Identify which coin has the best risk/reward ratio
3. Make a clear recommendation on which coin is the best investment opportunity and why
4. Note any key risks or concerns with the recommended coin
"""
)
return consensus
# Run the async main function
consensus = asyncio.run(main())

@@ -54,6 +54,7 @@ from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.formatter import formatter
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.structs.agent_roles import agent_roles
# Utils
@@ -338,6 +339,7 @@ class Agent:
model_name: str = None,
llm_args: dict = None,
load_state_path: str = None,
role: agent_roles = "worker",
*args,
**kwargs,
):
@@ -454,6 +456,7 @@ class Agent:
self.model_name = model_name
self.llm_args = llm_args
self.load_state_path = load_state_path
self.role = role
# Initialize the short term memory
self.short_memory = Conversation(
@@ -2597,3 +2600,9 @@ class Agent:
outputs.append(output)
return outputs
def get_agent_role(self) -> str:
"""
Get the role of the agent.
"""
return self.role
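A minimal sketch of the new role plumbing added above, assuming the constructor signature in this hunk; the agent_name and model_name values here are illustrative, not from the commit:

from swarms import Agent

# role accepts any value from the agent_roles Literal (default "worker")
director = Agent(
    agent_name="Director-Agent",
    model_name="gpt-4o",
    max_loops=1,
    role="director",
)
assert director.get_agent_role() == "director"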

@@ -0,0 +1,38 @@
from typing import Literal
agent_roles = Literal[
"supervisor",
"director",
"boss",
"generator",
"evaluator",
"revisor",
"worker",
"manager",
"team-leader",
"team-manager",
"team-lead",
"researcher",
"analyst",
"architect",
"developer",
"tester",
"qa",
"designer",
"product-owner",
"scrum-master",
"coordinator",
"mentor",
"trainer",
"consultant",
"specialist",
"expert",
"strategist",
"planner",
"executor",
"reviewer",
"auditor",
]
# print(agent_roles)
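A small runtime-validation sketch against this Literal using only the standard library; validate_role is a hypothetical helper, not part of this commit:

from typing import get_args

def validate_role(role: str) -> str:
    # get_args(agent_roles) yields the tuple of allowed role strings
    if role not in get_args(agent_roles):
        raise ValueError(f"Unknown agent role: {role!r}")
    return role

validate_role("director")  # returns "director"
# validate_role("intern")  # would raise ValueError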

@@ -4,7 +4,8 @@ from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Union, Callable, Any
from swarms import Agent
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.lazy_loader import lazy_import_decorator
# from swarms.utils.lazy_loader import lazy_import_decorator
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)
@@ -13,7 +14,6 @@ from swarms.utils.auto_download_check_packages import (
logger = initialize_logger(log_folder="agent_router")
@lazy_import_decorator
class AgentRouter:
"""
Initialize the AgentRouter.

@@ -0,0 +1,580 @@
"""
TalkHier: A hierarchical multi-agent framework for content generation and refinement.
Implements structured communication and evaluation protocols.
"""
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from swarms import Agent
from swarms.structs.conversation import Conversation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AgentRole(Enum):
"""Defines the possible roles for agents in the system."""
SUPERVISOR = "supervisor"
GENERATOR = "generator"
EVALUATOR = "evaluator"
REVISOR = "revisor"
@dataclass
class CommunicationEvent:
"""Represents a structured communication event between agents."""
message: str
background: Optional[str] = None
intermediate_output: Optional[Dict[str, Any]] = None
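# Hypothetical construction sketch; CommunicationEvent defines the structured
# message format, though the flow below does not yet instantiate it:
# event = CommunicationEvent(
#     message="Re-evaluate the revised draft",
#     background="Previous evaluation scored 0.6 overall",
#     intermediate_output={"scores": {"overall": 0.6}},
# )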
class TalkHier:
"""
A hierarchical multi-agent system for content generation and refinement.
Implements the TalkHier framework with structured communication protocols
and hierarchical refinement processes.
Attributes:
max_iterations: Maximum number of refinement iterations
quality_threshold: Minimum score required for content acceptance
model_name: Name of the LLM model to use
base_path: Path for saving agent states
return_string: Whether run() returns the conversation history as a string
"""
def __init__(
self,
max_iterations: int = 3,
quality_threshold: float = 0.8,
model_name: str = "gpt-4",
base_path: Optional[str] = None,
return_string: bool = False,
):
"""Initialize the TalkHier system."""
self.max_iterations = max_iterations
self.quality_threshold = quality_threshold
self.model_name = model_name
self.return_string = return_string
self.base_path = (
Path(base_path) if base_path else Path("./agent_states")
)
self.base_path.mkdir(exist_ok=True)
# Initialize agents
self._init_agents()
# Create conversation
self.conversation = Conversation()
def _safely_parse_json(self, json_str: str) -> Dict[str, Any]:
"""
Safely parse JSON string, handling various formats and potential errors.
Args:
json_str: String containing JSON data
Returns:
Parsed dictionary
"""
try:
# Try direct JSON parsing
return json.loads(json_str)
except json.JSONDecodeError:
try:
# Try extracting JSON from potential text wrapper
import re
json_match = re.search(r"\{.*\}", json_str, re.DOTALL)
if json_match:
return json.loads(json_match.group())
# Try extracting from markdown code blocks
code_block_match = re.search(
r"```(?:json)?\s*(\{.*?\})\s*```",
json_str,
re.DOTALL,
)
if code_block_match:
return json.loads(code_block_match.group(1))
except Exception as e:
logger.warning(f"Failed to extract JSON: {str(e)}")
# Fallback: create structured dict from text
return {
"content": json_str,
"metadata": {
"parsed": False,
"timestamp": str(datetime.now()),
},
}
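# Illustrative inputs for each parsing path above (not from the commit):
#   '{"a": 1}'               -> {"a": 1}  via direct json.loads
#   'Result: {"a": 1} done'  -> {"a": 1}  via the brace-matching regex
#   fenced ```json blocks    -> covered by the code-block pattern, though the
#                               brace regex usually matches these first
#   'plain text'             -> fallback {"content": <text>, "metadata": {"parsed": False, ...}}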
def _init_agents(self) -> None:
"""Initialize all agents with their specific roles and prompts."""
# Main supervisor agent
self.main_supervisor = Agent(
agent_name="Main-Supervisor",
system_prompt=self._get_supervisor_prompt(),
model_name=self.model_name,
max_loops=1,
saved_state_path=str(
self.base_path / "main_supervisor.json"
),
verbose=True,
)
# Generator agent
self.generator = Agent(
agent_name="Content-Generator",
system_prompt=self._get_generator_prompt(),
model_name=self.model_name,
max_loops=1,
saved_state_path=str(self.base_path / "generator.json"),
verbose=True,
)
# Evaluators
self.evaluators = [
Agent(
agent_name=f"Evaluator-{i}",
system_prompt=self._get_evaluator_prompt(i),
model_name=self.model_name,
max_loops=1,
saved_state_path=str(
self.base_path / f"evaluator_{i}.json"
),
verbose=True,
)
for i in range(3)
]
# Revisor agent
self.revisor = Agent(
agent_name="Content-Revisor",
system_prompt=self._get_revisor_prompt(),
model_name=self.model_name,
max_loops=1,
saved_state_path=str(self.base_path / "revisor.json"),
verbose=True,
)
def _get_supervisor_prompt(self) -> str:
"""Get the prompt for the supervisor agent."""
return """You are a Supervisor agent responsible for orchestrating the content generation process. Your role is to analyze tasks, develop strategies, and coordinate other agents effectively.
You must carefully analyze each task to understand:
- The core objectives and requirements
- Target audience and their needs
- Complexity level and scope
- Any constraints or special considerations
Based on your analysis, develop a clear strategy that:
- Breaks down the task into manageable steps
- Identifies which agents are best suited for each step
- Anticipates potential challenges
- Sets clear success criteria
Output all responses in strict JSON format:
{
"thoughts": {
"task_analysis": "Detailed analysis of requirements, audience, scope, and constraints",
"strategy": "Step-by-step plan including agent allocation and success metrics",
"concerns": "Potential challenges, edge cases, and mitigation strategies"
},
"next_action": {
"agent": "Specific agent to engage (Generator, Evaluator, or Revisor)",
"instruction": "Detailed instructions including context, requirements, and expected output"
}
}"""
def _get_generator_prompt(self) -> str:
"""Get the prompt for the generator agent."""
return """You are a Generator agent responsible for creating high-quality, original content. Your role is to produce content that is engaging, informative, and tailored to the target audience.
When generating content:
- Thoroughly research and fact-check all information
- Structure content logically with clear flow
- Use appropriate tone and language for the target audience
- Include relevant examples and explanations
- Ensure content is original and plagiarism-free
- Consider SEO best practices where applicable
Output all responses in strict JSON format:
{
"content": {
"main_body": "The complete generated content with proper formatting and structure",
"metadata": {
"word_count": "Accurate word count of main body",
"target_audience": "Detailed audience description",
"key_points": ["List of main points covered"],
"sources": ["List of reference sources if applicable"],
"readability_level": "Estimated reading level",
"tone": "Description of content tone"
}
}
}"""
def _get_evaluator_prompt(self, evaluator_id: int) -> str:
"""Get the prompt for an evaluator agent."""
return f"""You are Evaluator {evaluator_id}, responsible for critically assessing content quality. Your evaluation must be thorough, objective, and constructive.
Evaluate content across multiple dimensions:
- Accuracy: factual correctness, source reliability
- Clarity: readability, organization, flow
- Coherence: logical consistency, argument structure
- Engagement: interest level, relevance
- Completeness: topic coverage, depth
- Technical quality: grammar, spelling, formatting
- Audience alignment: appropriate level and tone
Output all responses in strict JSON format:
{{
"scores": {{
"overall": "0.0-1.0 composite score",
"categories": {{
"accuracy": "0.0-1.0 score with evidence",
"clarity": "0.0-1.0 score with examples",
"coherence": "0.0-1.0 score with analysis",
"engagement": "0.0-1.0 score with justification",
"completeness": "0.0-1.0 score with gaps identified",
"technical_quality": "0.0-1.0 score with issues noted",
"audience_alignment": "0.0-1.0 score with reasoning"
}}
}},
"feedback": [
"Specific, actionable improvement suggestions",
"Examples of issues found",
"Recommendations for enhancement"
],
"strengths": ["Notable positive aspects"],
"weaknesses": ["Areas needing improvement"]
}}"""
def _get_revisor_prompt(self) -> str:
"""Get the prompt for the revisor agent."""
return """You are a Revisor agent responsible for improving content based on evaluator feedback. Your role is to enhance content while maintaining its core message and purpose.
When revising content:
- Address all evaluator feedback systematically
- Maintain consistency in tone and style
- Preserve accurate information
- Enhance clarity and flow
- Fix technical issues
- Optimize for target audience
- Track all changes made
Output all responses in strict JSON format:
{
"revised_content": {
"main_body": "Complete revised content incorporating all improvements",
"metadata": {
"word_count": "Updated word count",
"changes_made": [
"Detailed list of specific changes and improvements",
"Reasoning for each major revision",
"Feedback points addressed"
],
"improvement_summary": "Overview of main enhancements",
"preserved_elements": ["Key elements maintained from original"],
"revision_approach": "Strategy used for revisions"
}
}
}"""
def _evaluate_content(
self, content: Union[str, Dict]
) -> Dict[str, Any]:
"""
Coordinate the evaluation of content across multiple evaluators.
Args:
content: Content to evaluate (string or dict)
Returns:
Combined evaluation results
"""
try:
# Ensure content is in correct format
content_dict = (
self._safely_parse_json(content)
if isinstance(content, str)
else content
)
# Collect evaluations
evaluations = []
for evaluator in self.evaluators:
try:
eval_response = evaluator.run(
json.dumps(content_dict)
)
self.conversation.add(
role=evaluator.agent_name,
content=eval_response,
)
eval_data = self._safely_parse_json(eval_response)
evaluations.append(eval_data)
except Exception as e:
logger.warning(f"Evaluator error: {str(e)}")
evaluations.append(
self._get_fallback_evaluation()
)
# Aggregate results
return self._aggregate_evaluations(evaluations)
except Exception as e:
logger.error(f"Evaluation error: {str(e)}")
return self._get_fallback_evaluation()
def _get_fallback_evaluation(self) -> Dict[str, Any]:
"""Get a safe fallback evaluation result."""
return {
"scores": {
"overall": 0.5,
"categories": {
"accuracy": 0.5,
"clarity": 0.5,
"coherence": 0.5,
},
},
"feedback": ["Evaluation failed"],
"metadata": {
"timestamp": str(datetime.now()),
"is_fallback": True,
},
}
def _aggregate_evaluations(
self, evaluations: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Aggregate multiple evaluation results into a single evaluation.
Args:
evaluations: List of evaluation results
Returns:
Combined evaluation
"""
# Calculate average scores
overall_scores = []
accuracy_scores = []
clarity_scores = []
coherence_scores = []
all_feedback = []
for eval_data in evaluations:
try:
scores = eval_data.get("scores", {})
overall_scores.append(scores.get("overall", 0.5))
categories = scores.get("categories", {})
accuracy_scores.append(
categories.get("accuracy", 0.5)
)
clarity_scores.append(categories.get("clarity", 0.5))
coherence_scores.append(
categories.get("coherence", 0.5)
)
all_feedback.extend(eval_data.get("feedback", []))
except Exception as e:
logger.warning(
f"Error aggregating evaluation: {str(e)}"
)
def safe_mean(scores: List[float]) -> float:
return sum(scores) / len(scores) if scores else 0.5
return {
"scores": {
"overall": safe_mean(overall_scores),
"categories": {
"accuracy": safe_mean(accuracy_scores),
"clarity": safe_mean(clarity_scores),
"coherence": safe_mean(coherence_scores),
},
},
"feedback": list(set(all_feedback)), # Remove duplicates
"metadata": {
"evaluator_count": len(evaluations),
"timestamp": str(datetime.now()),
},
}
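# Worked example (illustrative): overall scores [0.8, 0.6] and accuracy
# scores [0.9, 0.7] aggregate to overall 0.7 and accuracy 0.8 via safe_mean;
# feedback lists are concatenated and de-duplicated with set(), so the
# merged order is not guaranteed to match the input order.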
def run(self, task: str) -> Union[str, Dict[str, Any]]:
"""
Generate and iteratively refine content based on the given task.
Args:
task: Content generation task description
Returns:
Dictionary containing final content and metadata, or the
conversation history as a string when return_string is True
"""
logger.info(f"Starting content generation for task: {task}")
try:
# Get initial direction from supervisor
supervisor_response = self.main_supervisor.run(task)
self.conversation.add(
role=self.main_supervisor.agent_name,
content=supervisor_response,
)
supervisor_data = self._safely_parse_json(
supervisor_response
)
# Generate initial content
generator_response = self.generator.run(
json.dumps(supervisor_data.get("next_action", {}))
)
self.conversation.add(
role=self.generator.agent_name,
content=generator_response,
)
current_content = self._safely_parse_json(
generator_response
)
for iteration in range(self.max_iterations):
logger.info(f"Starting iteration {iteration + 1}")
# Evaluate current content
evaluation = self._evaluate_content(current_content)
# Check if quality threshold is met
if (
evaluation["scores"]["overall"]
>= self.quality_threshold
):
logger.info(
"Quality threshold met, returning content"
)
# Also honor return_string here so run() has a consistent return type
if self.return_string:
    return self.conversation.return_history_as_string()
return {
"content": current_content.get(
"content", {}
).get("main_body", ""),
"final_score": evaluation["scores"][
"overall"
],
"iterations": iteration + 1,
"metadata": {
"content_metadata": current_content.get(
"content", {}
).get("metadata", {}),
"evaluation": evaluation,
},
}
# Revise content if needed
revision_input = {
"content": current_content,
"evaluation": evaluation,
}
revision_response = self.revisor.run(
json.dumps(revision_input)
)
current_content = self._safely_parse_json(
revision_response
)
self.conversation.add(
role=self.revisor.agent_name,
content=revision_response,
)
logger.warning(
"Max iterations reached without meeting quality threshold"
)
except Exception as e:
logger.error(f"Error in generate_and_refine: {str(e)}")
current_content = {
"content": {"main_body": f"Error: {str(e)}"}
}
evaluation = self._get_fallback_evaluation()
if self.return_string:
return self.conversation.return_history_as_string()
else:
return {
"content": current_content.get("content", {}).get(
"main_body", ""
),
"final_score": evaluation["scores"]["overall"],
"iterations": self.max_iterations,
"metadata": {
"content_metadata": current_content.get(
"content", {}
).get("metadata", {}),
"evaluation": evaluation,
"error": "Max iterations reached",
},
}
def save_state(self) -> None:
"""Save the current state of all agents."""
for agent in [
self.main_supervisor,
self.generator,
*self.evaluators,
self.revisor,
]:
try:
agent.save_state()
except Exception as e:
logger.error(
f"Error saving state for {agent.agent_name}: {str(e)}"
)
def load_state(self) -> None:
"""Load the saved state of all agents."""
for agent in [
self.main_supervisor,
self.generator,
*self.evaluators,
self.revisor,
]:
try:
agent.load_state()
except Exception as e:
logger.error(
f"Error loading state for {agent.agent_name}: {str(e)}"
)
if __name__ == "__main__":
# Example usage
try:
talkhier = TalkHier(
max_iterations=1,
quality_threshold=0.8,
model_name="gpt-4o",
return_string=True,
)
task = "Write a comprehensive explanation of quantum computing for beginners"
result = talkhier.run(task)
print(result)
# print(f"Final content: {result['content']}")
# print(f"Quality score: {result['final_score']}")
# print(f"Iterations: {result['iterations']}")
except Exception as e:
logger.error(f"Error in main execution: {str(e)}")