Merge branch 'master' into rag

pull/1067/head
harshalmore31 1 month ago committed by GitHub
commit c08d329fb8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -346,6 +346,7 @@ nav:
- Overview: "deployment_solutions/overview.md" - Overview: "deployment_solutions/overview.md"
- FastAPI + Uvicorn: "deployment_solutions/fastapi_agent_api.md" - FastAPI + Uvicorn: "deployment_solutions/fastapi_agent_api.md"
- CronJob: "swarms/structs/cron_job.md" - CronJob: "swarms/structs/cron_job.md"
- Agent on MCP: "examples/mcp_ds.md"
- Providers: - Providers:
- Deploy on Google Cloud Run: "swarms_cloud/cloud_run.md" - Deploy on Google Cloud Run: "swarms_cloud/cloud_run.md"
- Deploy on Phala: "swarms_cloud/phala_deploy.md" - Deploy on Phala: "swarms_cloud/phala_deploy.md"

@ -6,8 +6,6 @@ Inspired by X.AI's Grok 4 heavy implementation, HeavySwarm provides robust task
## Architecture ## Architecture
### System Design
The HeavySwarm follows a structured 5-phase workflow: The HeavySwarm follows a structured 5-phase workflow:
1. **Task Decomposition**: Complex tasks are broken down into specialized questions 1. **Task Decomposition**: Complex tasks are broken down into specialized questions
@ -18,11 +16,13 @@ The HeavySwarm follows a structured 5-phase workflow:
### Agent Specialization ### Agent Specialization
- **Research Agent**: Comprehensive information gathering and synthesis | Agent | Role/Function |
- **Analysis Agent**: Pattern recognition and statistical analysis |---------------------|--------------------------------------------------------------------|
- **Alternatives Agent**: Creative problem-solving and strategic options | **Research Agent** | Comprehensive information gathering and synthesis |
- **Verification Agent**: Validation, feasibility assessment, and quality assurance | **Analysis Agent** | Pattern recognition and statistical analysis |
- **Synthesis Agent**: Multi-perspective integration and executive reporting | **Alternatives Agent** | Creative problem-solving and strategic options |
| **Verification Agent** | Validation, feasibility assessment, and quality assurance |
| **Synthesis Agent** | Multi-perspective integration and executive reporting |
## Architecture Diagram ## Architecture Diagram
@ -81,21 +81,32 @@ pip install swarms
## Quick Start ## Quick Start
```python ```python
from swarms import HeavySwarm from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search
# Initialize the swarm
swarm = HeavySwarm( swarm = HeavySwarm(
name="MarketAnalysisSwarm", name="Gold ETF Research Team",
description="Financial market analysis swarm", description="A team of agents that research the best gold ETFs",
question_agent_model_name="gpt-4o-mini", worker_model_name="claude-sonnet-4-20250514",
worker_model_name="gpt-4o-mini",
show_dashboard=True, show_dashboard=True,
verbose=True question_agent_model_name="gpt-4.1",
loops_per_agent=1,
agent_prints_on=False,
worker_tools=[exa_search],
random_loops_per_agent=True,
) )
# Execute analysis prompt = (
result = swarm.run("Analyze the current cryptocurrency market trends and investment opportunities") "Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
print(result) "full name, current price, expense ratio, assets under management, and "
"a brief explanation of why it is considered among the best. Present the information "
"in a clear, structured format suitable for investors. Scrape the data from the web. "
)
out = swarm.run(prompt)
print(out)
``` ```
## API Reference ## API Reference
@ -107,17 +118,38 @@ print(result)
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `name` | `str` | `"HeavySwarm"` | Identifier name for the swarm instance | | `name` | `str` | `"HeavySwarm"` | Identifier name for the swarm instance |
| `description` | `str` | `"A swarm of agents..."` | Description of the swarm's purpose | | `description` | `str` | `"A swarm of agents that can analyze a task and generate specialized questions for each agent role"` | Description of the swarm's purpose and capabilities |
| `agents` | `List[Agent]` | `None` | Pre-configured agent list (unused - agents created internally) |
| `timeout` | `int` | `300` | Maximum execution time per agent in seconds | | `timeout` | `int` | `300` | Maximum execution time per agent in seconds |
| `aggregation_strategy` | `str` | `"synthesis"` | Strategy for result aggregation | | `aggregation_strategy` | `str` | `"synthesis"` | Strategy for result aggregation (currently only 'synthesis' is supported) |
| `loops_per_agent` | `int` | `1` | Number of execution loops per agent | | `loops_per_agent` | `int` | `1` | Number of execution loops each agent should perform |
| `question_agent_model_name` | `str` | `"gpt-4o-mini"` | Model for question generation | | `question_agent_model_name` | `str` | `"gpt-4o-mini"` | Language model for question generation |
| `worker_model_name` | `str` | `"gpt-4o-mini"` | Model for specialized worker agents | | `worker_model_name` | `str` | `"gpt-4o-mini"` | Language model for specialized worker agents |
| `verbose` | `bool` | `False` | Enable detailed logging output | | `verbose` | `bool` | `False` | Enable detailed logging and debug output |
| `max_workers` | `int` | `int(os.cpu_count() * 0.9)` | Maximum concurrent workers | | `max_workers` | `int` | `int(os.cpu_count() * 0.9)` | Maximum concurrent workers for parallel execution |
| `show_dashboard` | `bool` | `False` | Enable rich dashboard visualization | | `show_dashboard` | `bool` | `False` | Enable rich dashboard with progress visualization |
| `agent_prints_on` | `bool` | `False` | Enable individual agent output printing | | `agent_prints_on` | `bool` | `False` | Enable individual agent output printing |
| `output_type` | `str` | `"dict-all-except-first"` | Output formatting type for conversation history |
| `worker_tools` | `tool_type` | `None` | Tools available to worker agents for enhanced functionality |
| `random_loops_per_agent` | `bool` | `False` | Enable random number of loops per agent (1-10 range) |
**Constructor Example:**
```python
swarm = HeavySwarm(
name="AdvancedAnalysisSwarm",
description="Comprehensive multi-agent analysis system",
timeout=600,
loops_per_agent=2,
question_agent_model_name="gpt-4o",
worker_model_name="gpt-4o",
verbose=True,
max_workers=8,
show_dashboard=True,
agent_prints_on=True,
output_type="dict-all-except-first",
worker_tools=None,
random_loops_per_agent=False
)
```
#### Methods #### Methods
@ -127,9 +159,10 @@ Execute the complete HeavySwarm orchestration flow.
**Parameters:** **Parameters:**
- `task` (str): The main task to analyze and decompose | Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
- `img` (str, optional): Image input for visual analysis tasks | `task` | `str` | Yes | - | The main task to analyze and decompose |
| `img` | `str` | No | `None` | Image input for visual analysis tasks |
**Returns:** **Returns:**
- `str`: Comprehensive final analysis from synthesis agent - `str`: Comprehensive final analysis from synthesis agent
@ -137,186 +170,683 @@ Execute the complete HeavySwarm orchestration flow.
**Example:** **Example:**
```python ```python
result = swarm.run("Develop a go-to-market strategy for a new SaaS product") result = swarm.run("Develop a go-to-market strategy for a new SaaS product")
print(result)
``` ```
---
## Real-World Applications ##### `get_questions_only(task: str) -> Dict[str, str]`
### Financial Services Generate and extract only the specialized questions without metadata or execution.
**Parameters:**
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `task` | `str` | Yes | - | The main task or query to analyze and decompose into specialized questions |
**Returns:**
- `Dict[str, str]`: Clean dictionary containing only the questions:
- `research_question` (str): Question for comprehensive information gathering
- `analysis_question` (str): Question for pattern analysis and insights
- `alternatives_question` (str): Question for exploring creative solutions
- `verification_question` (str): Question for validation and feasibility
- `error` (str): Error message if question generation fails (on error only)
**Example:**
```python ```python
# Market Analysis questions = swarm.get_questions_only("Analyze market trends for EVs")
swarm = HeavySwarm( print(questions['research_question'])
name="FinanceSwarm", print(questions['analysis_question'])
worker_model_name="gpt-4o", ```
show_dashboard=True
) ---
result = swarm.run(""" ##### `get_questions_as_list(task: str) -> List[str]`
Analyze the impact of recent Federal Reserve policy changes on:
1. Bond markets and yield curves Generate specialized questions and return them as an ordered list.
2. Equity market valuations
3. Currency exchange rates **Parameters:**
4. Provide investment recommendations for institutional portfolios
""") | Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `task` | `str` | Yes | - | The main task or query to decompose into specialized questions |
**Returns:**
- `List[str]`: Ordered list of 4 specialized questions:
- `[0]` Research question for comprehensive information gathering
- `[1]` Analysis question for pattern analysis and insights
- `[2]` Alternatives question for exploring creative solutions
- `[3]` Verification question for validation and feasibility assessment
- Single-item list containing error message (on error)
**Example:**
```python
questions = swarm.get_questions_as_list("Optimize supply chain efficiency")
for i, question in enumerate(questions):
print(f"Agent {i+1}: {question}")
``` ```
### Use-cases ---
| Use Case | Description | ##### `show_swarm_info() -> None`
|---------------------------------------------|---------------------------------------------|
| Portfolio optimization and risk assessment | Optimize asset allocation and assess risks |
| Market trend analysis and forecasting | Analyze and predict market movements |
| Regulatory compliance evaluation | Evaluate adherence to financial regulations |
| Investment strategy development | Develop and refine investment strategies |
| Credit risk analysis and modeling | Analyze and model credit risk |
Display comprehensive swarm configuration information in a rich dashboard format.
------- **Parameters:**
- None
**Returns:**
- `None`: This method only displays output and has no return value.
**Example:**
```python
swarm.show_swarm_info() # Displays configuration dashboard
```
---
##### `reliability_check() -> None`
Perform comprehensive reliability and configuration validation checks.
**Parameters:**
- None
**Returns:**
- `None`: This method only performs validation and has no return value.
**Raises:**
- `ValueError`: If loops_per_agent is 0 or negative (agents won't execute)
- `ValueError`: If worker_model_name is None (agents can't be created)
- `ValueError`: If question_agent_model_name is None (questions can't be generated)
**Example:**
```python
try:
swarm.reliability_check() # Automatically called in __init__
print("All checks passed!")
except ValueError as e:
print(f"Configuration error: {e}")
```
---
##### `create_agents() -> Dict[str, Agent]`
Create and cache the 4 specialized agents with detailed role-specific prompts.
**Parameters:**
- None
**Returns:**
- `Dict[str, Agent]`: Dictionary containing all 5 specialized agents:
- `'research'`: Research Agent instance
- `'analysis'`: Analysis Agent instance
- `'alternatives'`: Alternatives Agent instance
- `'verification'`: Verification Agent instance
- `'synthesis'`: Synthesis Agent instance
**Example:**
```python
agents = swarm.create_agents()
research_agent = agents['research']
print(f"Research agent name: {research_agent.agent_name}")
```
## Comprehensive Full-Code Examples
### Healthcare & Life Sciences ### Example 1: Basic Gold ETF Research with Dashboard
```python ```python
# Clinical Research Analysis # heavy_swarm_example.py
"""
Complete example demonstrating HeavySwarm for financial research
with dashboard visualization and tool integration.
"""
from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search
# Initialize HeavySwarm with comprehensive configuration
swarm = HeavySwarm( swarm = HeavySwarm(
name="HealthcareSwarm", name="Gold ETF Research Team",
worker_model_name="gpt-4o", description="A team of agents that research the best gold ETFs",
timeout=600, worker_model_name="claude-sonnet-4-20250514",
loops_per_agent=2 question_agent_model_name="gpt-4o-mini",
show_dashboard=True,
loops_per_agent=1,
agent_prints_on=False,
worker_tools=[exa_search],
random_loops_per_agent=False,
timeout=300,
max_workers=4,
verbose=True
) )
result = swarm.run(""" # Define the research prompt
Evaluate the potential of AI-driven personalized medicine: prompt = (
1. Current technological capabilities and limitations "Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
2. Regulatory landscape and approval pathways "full name, current price, expense ratio, assets under management, and "
3. Market opportunities and competitive analysis "a brief explanation of why it is considered among the best. Present the information "
4. Implementation strategies for healthcare systems "in a clear, structured format suitable for investors. Scrape the data from the web."
""") )
# Execute the analysis
print("🚀 Starting HeavySwarm analysis...")
result = swarm.run(prompt)
print("\n📊 Analysis Complete!")
print("=" * 50)
print(result)
``` ```
---- ### Example 2: Advanced Market Analysis with Multiple Loops
**Use Cases:** ```python
# advanced_market_analysis.py
"""
Advanced example with multiple agent loops and comprehensive market analysis.
"""
| Use Case | Description | from swarms.structs.heavy_swarm import HeavySwarm
|----------------------------------------|---------------------------------------------| from swarms_tools import exa_search, calculator, web_scraper
| Drug discovery and development analysis| Analyze and accelerate drug R&D processes |
| Clinical trial optimization | Improve design and efficiency of trials |
| Healthcare policy evaluation | Assess and inform healthcare policies |
| Medical device market analysis | Evaluate trends and opportunities in devices|
| Patient outcome prediction modeling | Predict and model patient health outcomes |
--- # Advanced configuration for in-depth analysis
swarm = HeavySwarm(
name="AdvancedMarketAnalysis",
description="Comprehensive market analysis with multiple iterations",
worker_model_name="gpt-4o",
question_agent_model_name="gpt-4o",
show_dashboard=True,
loops_per_agent=3, # Multiple iterations for deeper analysis
agent_prints_on=True,
worker_tools=[exa_search, calculator, web_scraper],
random_loops_per_agent=False,
timeout=600, # Longer timeout for complex analysis
max_workers=8,
verbose=True
)
# Complex multi-dimensional analysis prompt
complex_prompt = """
Conduct a comprehensive analysis of the renewable energy sector:
1. Market Size & Growth:
- Current global market size
- Projected growth rates (5-year and 10-year)
- Key growth drivers and barriers
2. Technology Landscape:
- Dominant technologies (solar, wind, hydro, etc.)
- Emerging technologies and innovations
- Technology adoption rates by region
### Technology & Innovation 3. Investment Opportunities:
- Most promising investment sectors
- Risk assessment and mitigation strategies
- Return on investment projections
4. Regulatory Environment:
- Current policy frameworks
- Upcoming regulatory changes
- Impact on industry development
5. Competitive Analysis:
- Key players and market share
- Competitive advantages and disadvantages
- Market consolidation trends
Provide detailed analysis with data sources, assumptions, and confidence levels.
"""
print("🔬 Starting advanced market analysis...")
result = swarm.run(complex_prompt)
print(result)
```
### Example 3: Healthcare Policy Analysis with Random Loops
```python ```python
# Tech Strategy Analysis # healthcare_policy_analysis.py
"""
Healthcare policy analysis with random agent loops for diverse perspectives.
"""
from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search, data_analyzer, policy_tracker
# Healthcare-focused configuration with random loops
swarm = HeavySwarm( swarm = HeavySwarm(
name="TechSwarm", name="HealthcarePolicyAnalyzer",
description="Multi-perspective healthcare policy analysis",
worker_model_name="gpt-4o", worker_model_name="gpt-4o",
question_agent_model_name="gpt-4o-mini",
show_dashboard=True, show_dashboard=True,
loops_per_agent=1, # Base loops (will be randomized)
agent_prints_on=False,
worker_tools=[exa_search, data_analyzer, policy_tracker],
random_loops_per_agent=True, # Enable random loop variation
timeout=900,
max_workers=6,
verbose=True verbose=True
) )
result = swarm.run(""" # Healthcare policy analysis prompt
Assess the strategic implications of quantum computing adoption: healthcare_prompt = """
1. Technical readiness and hardware developments Analyze the impact of recent healthcare policy changes on:
2. Industry applications and use cases
3. Competitive landscape and key players 1. **Access to Care**:
4. Investment and implementation roadmap - Changes in healthcare accessibility
""") - Impact on underserved populations
- Telemedicine adoption and effectiveness
2. **Cost Implications**:
- Healthcare spending trends
- Insurance premium changes
- Out-of-pocket expense analysis
3. **Quality of Care**:
- Patient outcome metrics
- Provider satisfaction and retention
- Innovation in treatment protocols
4. **Healthcare Workforce**:
- Staffing shortages and surpluses
- Training and education requirements
- Compensation and benefits trends
5. **Technology Integration**:
- Electronic health records adoption
- AI and machine learning applications
- Cybersecurity and data privacy
Provide evidence-based analysis with statistical data and policy recommendations.
"""
print("🏥 Starting healthcare policy analysis...")
result = swarm.run(healthcare_prompt)
print(result)
``` ```
**Use Cases:** ### Example 4: Question Generation and Preview
| Use Case | Description | ```python
|------------------------------------|---------------------------------------------| # question_generation_example.py
| Technology roadmap development | Plan and prioritize technology initiatives | """
| Competitive intelligence gathering | Analyze competitors and market trends | Example demonstrating question generation capabilities without full execution.
| Innovation pipeline analysis | Evaluate and manage innovation projects | """
| Digital transformation strategy | Develop and implement digital strategies |
| Emerging technology assessment | Assess new and disruptive technologies |
### Manufacturing & Supply Chain from swarms.structs.heavy_swarm import HeavySwarm
# Configure swarm for question generation
swarm = HeavySwarm(
name="QuestionGenerator",
description="Specialized question generation for task analysis",
question_agent_model_name="gpt-4o",
worker_model_name="gpt-4o-mini",
show_dashboard=False, # No dashboard for clean output
verbose=False
)
# Sample task for analysis
task = "Develop a comprehensive digital transformation strategy for a traditional manufacturing company"
print("🔍 Generating specialized questions...")
print("=" * 60)
# Method 1: Get questions as dictionary
questions_dict = swarm.get_questions_only(task)
print("\n📋 Questions as Dictionary:")
for key, question in questions_dict.items():
if key != 'error':
print(f"\n{key.upper()}:")
print(f"{question}")
# Method 2: Get questions as ordered list
questions_list = swarm.get_questions_as_list(task)
print("\n\n📝 Questions as Ordered List:")
for i, question in enumerate(questions_list):
agent_names = ["Research Agent", "Analysis Agent", "Alternatives Agent", "Verification Agent"]
print(f"\n{i+1}. {agent_names[i]}:")
print(f" {question}")
print("\n" + "=" * 60)
print("💡 Use these questions to understand how HeavySwarm decomposes complex tasks!")
```
### Example 5: High-Performance Configuration for Enterprise Use
```python ```python
# Supply Chain Optimization # enterprise_configuration.py
"""
Enterprise-grade HeavySwarm configuration for high-performance analysis.
"""
from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search, data_analyzer, financial_modeler, risk_assessor
# Enterprise configuration with maximum performance
swarm = HeavySwarm( swarm = HeavySwarm(
name="ManufacturingSwarm", name="EnterpriseAnalysisSwarm",
worker_model_name="gpt-4o", description="High-performance enterprise analysis system",
max_workers=8 worker_model_name="gpt-4o", # Highest quality model
question_agent_model_name="gpt-4o", # Consistent high quality
show_dashboard=True,
loops_per_agent=5, # Maximum depth analysis
agent_prints_on=False, # Clean output for enterprise use
worker_tools=[
exa_search,
data_analyzer,
financial_modeler,
risk_assessor
],
random_loops_per_agent=False, # Consistent analysis depth
timeout=1800, # 30 minutes for complex analysis
max_workers=16, # Maximum parallelization
verbose=True # Full logging for enterprise monitoring
) )
result = swarm.run(""" # Enterprise-level analysis prompt
Optimize global supply chain resilience: enterprise_prompt = """
1. Risk assessment and vulnerability analysis Conduct an enterprise-grade analysis for a Fortune 500 company's strategic planning:
2. Alternative sourcing strategies
3. Technology integration opportunities EXECUTIVE SUMMARY REQUIREMENTS:
4. Cost-benefit analysis of proposed changes - Strategic positioning analysis
""") - Competitive landscape assessment
- Market opportunity evaluation
- Risk mitigation strategies
FINANCIAL ANALYSIS:
- Revenue optimization scenarios
- Cost structure analysis
- Investment requirement assessment
- ROI projections and sensitivity analysis
OPERATIONAL ASSESSMENT:
- Process optimization opportunities
- Technology integration roadmap
- Supply chain resilience evaluation
- Human capital strategy development
MARKET INTELLIGENCE:
- Customer segmentation analysis
- Competitive intelligence gathering
- Market trend forecasting
- Disruption risk assessment
STRATEGIC RECOMMENDATIONS:
- Short-term tactical initiatives (0-12 months)
- Medium-term strategic projects (1-3 years)
- Long-term transformation roadmap (3-5 years)
- Success metrics and KPIs
Provide analysis with executive-level presentation, detailed financial modeling,
and implementation roadmaps with measurable outcomes.
"""
print("🏢 Starting enterprise-grade analysis...")
print("⚡ This configuration uses maximum resources for comprehensive analysis")
result = swarm.run(enterprise_prompt)
print(result)
``` ```
**Use Cases:** ### Example 6: Error Handling and Reliability Testing
```python
# error_handling_example.py
"""
Example demonstrating error handling and reliability features.
"""
from swarms.structs.heavy_swarm import HeavySwarm
print("🛡️ Testing HeavySwarm Reliability Features")
print("=" * 50)
# Test 1: Valid configuration
print("\n✅ Test 1: Valid Configuration")
try:
swarm = HeavySwarm(
name="ReliabilityTest",
description="Testing valid configuration",
worker_model_name="gpt-4o-mini",
question_agent_model_name="gpt-4o-mini",
show_dashboard=False
)
print("✓ Swarm initialized successfully")
except Exception as e:
print(f"✗ Unexpected error: {e}")
# Test 2: Invalid loops_per_agent
print("\n❌ Test 2: Invalid loops_per_agent (should fail)")
try:
swarm = HeavySwarm(
name="InvalidTest",
worker_model_name="gpt-4o-mini",
question_agent_model_name="gpt-4o-mini",
loops_per_agent=0, # Invalid: must be > 0
show_dashboard=False
)
except ValueError as e:
print(f"✓ Expected error caught: {e}")
# Test 3: Missing worker_model_name
print("\n❌ Test 3: Missing worker_model_name (should fail)")
try:
swarm = HeavySwarm(
name="MissingModelTest",
question_agent_model_name="gpt-4o-mini",
worker_model_name=None, # Invalid: cannot be None
show_dashboard=False
)
except ValueError as e:
print(f"✓ Expected error caught: {e}")
# Test 4: Successful question generation with error handling
print("\n🔍 Test 4: Question Generation with Error Handling")
swarm = HeavySwarm(
name="ErrorHandlingTest",
worker_model_name="gpt-4o-mini",
question_agent_model_name="gpt-4o-mini",
show_dashboard=False
)
test_task = "Analyze the future of artificial intelligence in healthcare"
| Use Case | Description | try:
|----------------------------------|---------------------------------------------| questions = swarm.get_questions_only(test_task)
| Supply chain risk management | Identify and mitigate supply chain risks | if 'error' in questions:
| Manufacturing process optimization | Improve efficiency and productivity | print(f"✗ Question generation failed: {questions['error']}")
| Quality control system design | Develop systems to ensure product quality | else:
| Sustainability impact assessment | Evaluate environmental and social impacts | print("✓ Question generation successful")
| Logistics network optimization | Enhance logistics and distribution networks | print(f"✓ Generated {len(questions)} specialized questions")
except Exception as e:
print(f"✗ Unexpected error during question generation: {e}")
## Advanced Configuration print("\n" + "=" * 50)
print("🛡️ Reliability testing complete!")
```
### Custom Agent Configuration ### Example 7: Custom Tools Integration
```python ```python
# High-performance configuration # custom_tools_example.py
"""
Example demonstrating custom tools integration with HeavySwarm.
"""
from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search, calculator, web_scraper
# Custom tool functions (simplified examples)
def custom_data_analyzer(data):
"""Custom data analysis tool"""
return f"Analyzed data: {len(data)} records processed"
def custom_report_generator(topic):
"""Custom report generation tool"""
return f"Generated comprehensive report on: {topic}"
def custom_risk_assessor(scenario):
"""Custom risk assessment tool"""
return f"Risk assessment completed for: {scenario}"
# Create custom tool objects (simplified)
custom_tools = [
exa_search, # Built-in tool
calculator, # Built-in tool
web_scraper, # Built-in tool
# Note: In practice, you'd wrap custom functions in proper tool format
]
# Initialize swarm with custom tools
swarm = HeavySwarm( swarm = HeavySwarm(
name="HighPerformanceSwarm", name="CustomToolsSwarm",
question_agent_model_name="gpt-4o", description="HeavySwarm with custom tool integration",
worker_model_name="gpt-4o", worker_model_name="gpt-4o",
timeout=900, question_agent_model_name="gpt-4o-mini",
loops_per_agent=3,
max_workers=12,
show_dashboard=True, show_dashboard=True,
loops_per_agent=2,
agent_prints_on=True,
worker_tools=custom_tools,
random_loops_per_agent=False,
timeout=600,
max_workers=6,
verbose=True verbose=True
) )
```
# Analysis task that benefits from custom tools
analysis_prompt = """
Conduct a comprehensive competitive intelligence analysis:
## Troubleshooting 1. **Market Research**:
- Use web scraping to gather competitor data
- Analyze market positioning and strategies
- Identify competitive advantages and weaknesses
| Issue | Solution | 2. **Financial Analysis**:
|-------------------------|---------------------------------------------------------------| - Calculate market share percentages
| **Agent Timeout** | Increase timeout parameter or reduce task complexity | - Analyze revenue growth trends
| **Model Rate Limits** | Implement backoff strategies or use different models | - Assess profitability metrics
| **Memory Usage** | Monitor system resources with large-scale operations |
| **Dashboard Performance** | Disable dashboard for batch processing |
## Contributing 3. **Risk Assessment**:
- Evaluate competitive threats
- Analyze market entry barriers
- Assess disruption risks
HeavySwarm is part of the Swarms ecosystem. Contributions are welcome for: 4. **Strategic Recommendations**:
- Develop competitive response strategies
- Identify partnership opportunities
- Recommend market positioning adjustments
- New agent specializations Use available tools to gather and analyze real-time data for accurate insights.
"""
- Performance optimizations print("🔧 Starting analysis with custom tools integration...")
result = swarm.run(analysis_prompt)
print(result)
```
- Integration capabilities ### Example 8: Batch Processing Multiple Tasks
- Documentation improvements ```python
# batch_processing_example.py
"""
Example demonstrating batch processing of multiple tasks with HeavySwarm.
"""
from swarms.structs.heavy_swarm import HeavySwarm
from typing import List, Dict
import time
def batch_process_tasks(swarm: HeavySwarm, tasks: List[str]) -> Dict[str, str]:
"""
Process multiple tasks in batch with progress tracking.
Args:
swarm: Configured HeavySwarm instance
tasks: List of task descriptions to process
Returns:
Dictionary mapping task descriptions to results
"""
results = {}
print(f"🚀 Starting batch processing of {len(tasks)} tasks")
print("=" * 60)
for i, task in enumerate(tasks, 1):
print(f"\n📋 Task {i}/{len(tasks)}")
print(f"Query: {task[:100]}{'...' if len(task) > 100 else ''}")
print("-" * 40)
start_time = time.time()
try:
result = swarm.run(task)
processing_time = time.time() - start_time
results[task] = result
print(".2f"
except Exception as e:
results[task] = f"Error: {str(e)}"
print(f"❌ Failed after {time.time() - start_time:.2f}s: {e}")
return results
# Configure swarm for batch processing (minimal output for efficiency)
swarm = HeavySwarm(
name="BatchProcessor",
description="Efficient batch processing configuration",
worker_model_name="gpt-4o-mini", # Faster model for batch processing
question_agent_model_name="gpt-4o-mini",
show_dashboard=False, # Disable dashboard for batch processing
loops_per_agent=1, # Single loop for efficiency
agent_prints_on=False, # Clean output
random_loops_per_agent=False,
timeout=300,
max_workers=4,
verbose=False # Minimal logging
)
# Define batch tasks
batch_tasks = [
"Analyze the impact of remote work on commercial real estate",
"Evaluate the potential of electric vehicles in the taxi industry",
"Assess the cybersecurity risks of IoT devices in smart homes",
"Explore the applications of blockchain in supply chain management",
"Investigate the effects of social media on consumer behavior",
"Review the potential of quantum computing in drug discovery",
"Analyze the economic impact of climate change policies",
"Evaluate the future of autonomous delivery systems"
]
# Process batch
batch_results = batch_process_tasks(swarm, batch_tasks)
# Summary report
print("\n" + "=" * 60)
print("📊 BATCH PROCESSING SUMMARY")
print("=" * 60)
print(f"Total tasks processed: {len(batch_results)}")
successful_tasks = sum(1 for result in batch_results.values() if not result.startswith("Error:"))
print(f"Successful tasks: {successful_tasks}")
print(f"Failed tasks: {len(batch_results) - successful_tasks}")
# Display results summary
print("\n📝 RESULTS SUMMARY:")
for i, (task, result) in enumerate(batch_results.items(), 1):
status = "✅" if not result.startswith("Error:") else "❌"
print(f"{i}. {status} {task[:50]}{'...' if len(task) > 50 else ''}")
print("\n💡 Batch processing complete! Individual results available in batch_results dictionary.")
```
## Acknowledgments ## Acknowledgments
- Inspired by X.AI's Grok heavy implementation - Inspired by X.AI's Grok heavy implementation
- Built on the Swarms framework - Built on the Swarms framework
- Utilizes Rich for dashboard visualization - Utilizes Rich for dashboard visualization
- Powered by advanced language models - Powered by advanced language models

@ -9,6 +9,7 @@ agent = Agent(
max_loops=1, max_loops=1,
dynamic_context_window=True, dynamic_context_window=True,
streaming_on=False, streaming_on=False,
print_on=False,
) )
out = agent.run( out = agent.run(

@ -3,11 +3,14 @@ from swarms import Agent
mcp = FastMCP("MCPAgentTool") mcp = FastMCP("MCPAgentTool")
@mcp.tool( @mcp.tool(
name="create_agent", name="create_agent",
description="Create an agent with the specified name, system prompt, and model, then run a task.", description="Create an agent with the specified name, system prompt, and model, then run a task.",
) )
def create_agent(agent_name: str, system_prompt: str, model_name: str, task: str) -> str: def create_agent(
agent_name: str, system_prompt: str, model_name: str, task: str
) -> str:
""" """
Create an agent with the given parameters and execute the specified task. Create an agent with the given parameters and execute the specified task.
@ -27,5 +30,6 @@ def create_agent(agent_name: str, system_prompt: str, model_name: str, task: str
) )
return agent.run(task) return agent.run(task)
if __name__ == "__main__": if __name__ == "__main__":
mcp.run(transport="streamable-http") mcp.run(transport="streamable-http")

@ -5,7 +5,10 @@ from mcp.client.streamable_http import (
streamablehttp_client as http_client, streamablehttp_client as http_client,
) )
async def create_agent_via_mcp(session, agent_name, system_prompt, model_name, task):
async def create_agent_via_mcp(
session, agent_name, system_prompt, model_name, task
):
"""Create and use an agent through MCP using streamable HTTP.""" """Create and use an agent through MCP using streamable HTTP."""
print(f"🔧 Creating agent '{agent_name}' with task: {task}") print(f"🔧 Creating agent '{agent_name}' with task: {task}")
try: try:
@ -13,18 +16,17 @@ async def create_agent_via_mcp(session, agent_name, system_prompt, model_name, t
"agent_name": agent_name, "agent_name": agent_name,
"system_prompt": system_prompt, "system_prompt": system_prompt,
"model_name": model_name, "model_name": model_name,
"task": task "task": task,
} }
result = await session.call_tool( result = await session.call_tool(
name="create_agent", name="create_agent", arguments=arguments
arguments=arguments
) )
# Result Handling # Result Handling
output = None output = None
if hasattr(result, 'content') and result.content: if hasattr(result, "content") and result.content:
if isinstance(result.content, list): if isinstance(result.content, list):
for content_item in result.content: for content_item in result.content:
if hasattr(content_item, 'text'): if hasattr(content_item, "text"):
print(content_item.text) print(content_item.text)
output = content_item.text output = content_item.text
else: else:
@ -39,14 +41,20 @@ async def create_agent_via_mcp(session, agent_name, system_prompt, model_name, t
except Exception as e: except Exception as e:
print(f"Tool call failed: {e}") print(f"Tool call failed: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise raise
async def main(): async def main():
print("🔧 Starting MCP client connection...") print("🔧 Starting MCP client connection...")
try: try:
async with http_client("http://localhost:8000/mcp") as (read, write, _): async with http_client("http://localhost:8000/mcp") as (
read,
write,
_,
):
async with ClientSession(read, write) as session: async with ClientSession(read, write) as session:
try: try:
await session.initialize() await session.initialize()
@ -59,7 +67,9 @@ async def main():
print("Listing available tools...") print("Listing available tools...")
try: try:
tools = await session.list_tools() tools = await session.list_tools()
print(f"📋 Available tools: {[tool.name for tool in tools.tools]}") print(
f"📋 Available tools: {[tool.name for tool in tools.tools]}"
)
except Exception as e: except Exception as e:
print(f"Failed to list tools: {e}") print(f"Failed to list tools: {e}")
raise raise
@ -69,14 +79,16 @@ async def main():
agent1_name = "tech_expert" agent1_name = "tech_expert"
agent1_prompt = "You are a technology expert. Provide clear explanations." agent1_prompt = "You are a technology expert. Provide clear explanations."
agent1_model = "gpt-4" agent1_model = "gpt-4"
agent1_task = "Explain blockchain technology in simple terms" agent1_task = (
"Explain blockchain technology in simple terms"
)
agent1_output = await create_agent_via_mcp( agent1_output = await create_agent_via_mcp(
session, session,
agent1_name, agent1_name,
agent1_prompt, agent1_prompt,
agent1_model, agent1_model,
agent1_task agent1_task,
) )
# Agent 2: Legal Expert analyzes the explanation from Agent 1 # Agent 2: Legal Expert analyzes the explanation from Agent 1
@ -90,7 +102,7 @@ async def main():
agent2_name, agent2_name,
agent2_prompt, agent2_prompt,
agent2_model, agent2_model,
agent2_task agent2_task,
) )
# Agent 3: Educator simplifies the legal analysis for students # Agent 3: Educator simplifies the legal analysis for students
@ -104,7 +116,7 @@ async def main():
agent3_name, agent3_name,
agent3_prompt, agent3_prompt,
agent3_model, agent3_model,
agent3_task agent3_task,
) )
print("\n=== Final Output from Educator Agent ===") print("\n=== Final Output from Educator Agent ===")
@ -113,9 +125,11 @@ async def main():
except Exception as e: except Exception as e:
print(f"Connection failed: {e}") print(f"Connection failed: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise raise
# Run the client # Run the client
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

@ -13,7 +13,11 @@ async def create_agent_via_mcp():
# Connect to the MCP server using streamable HTTP # Connect to the MCP server using streamable HTTP
try: try:
async with http_client("http://localhost:8000/mcp") as (read, write, _): async with http_client("http://localhost:8000/mcp") as (
read,
write,
_,
):
async with ClientSession(read, write) as session: async with ClientSession(read, write) as session:
try: try:
@ -27,7 +31,9 @@ async def create_agent_via_mcp():
print("Listing available tools...") print("Listing available tools...")
try: try:
tools = await session.list_tools() tools = await session.list_tools()
print(f"📋 Available tools: {[tool.name for tool in tools.tools]}") print(
f"📋 Available tools: {[tool.name for tool in tools.tools]}"
)
except Exception as e: except Exception as e:
print(f"Failed to list tools: {e}") print(f"Failed to list tools: {e}")
@ -40,19 +46,18 @@ async def create_agent_via_mcp():
"agent_name": "tech_expert", "agent_name": "tech_expert",
"system_prompt": "You are a technology expert. Provide clear explanations.", "system_prompt": "You are a technology expert. Provide clear explanations.",
"model_name": "gpt-4", "model_name": "gpt-4",
"task": "Explain blockchain technology in simple terms" "task": "Explain blockchain technology in simple terms",
} }
result = await session.call_tool( result = await session.call_tool(
name="create_agent", name="create_agent", arguments=arguments
arguments=arguments
) )
# Result Handling # Result Handling
if hasattr(result, 'content') and result.content: if hasattr(result, "content") and result.content:
if isinstance(result.content, list): if isinstance(result.content, list):
for content_item in result.content: for content_item in result.content:
if hasattr(content_item, 'text'): if hasattr(content_item, "text"):
print(content_item.text) print(content_item.text)
else: else:
print(content_item) print(content_item)
@ -66,15 +71,18 @@ async def create_agent_via_mcp():
except Exception as e: except Exception as e:
print(f"Tool call failed: {e}") print(f"Tool call failed: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise raise
except Exception as e: except Exception as e:
print(f"Connection failed: {e}") print(f"Connection failed: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
raise raise
# Run the client # Run the client
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(create_agent_via_mcp()) asyncio.run(create_agent_via_mcp())

@ -117,4 +117,4 @@ def get_okx_crypto_volume(symbol: str) -> str:
if __name__ == "__main__": if __name__ == "__main__":
# Run the server on port 8000 (you can change this to any available port) # Run the server on port 8000 (you can change this to any available port)
mcp.run(transport="sse") mcp.run(transport="streamable-http")

@ -1,32 +0,0 @@
from swarms import HeavySwarm
def main():
"""
Run a HeavySwarm query to find the best 3 gold ETFs.
This function initializes a HeavySwarm instance and queries it to provide
the top 3 gold exchange-traded funds (ETFs), requesting clear, structured results.
"""
swarm = HeavySwarm(
name="Gold ETF Research Team",
description="A team of agents that research the best gold ETFs",
worker_model_name="claude-sonnet-4-latest",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
prompt = (
"Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
"full name, current price, expense ratio, assets under management, and "
"a brief explanation of why it is considered among the best. Present the information "
"in a clear, structured format suitable for investors."
)
out = swarm.run(prompt)
print(out)
if __name__ == "__main__":
main()

@ -0,0 +1,27 @@
# task -> research -> synethsis or debate or voting -> output
from swarms.structs.heavy_swarm import HeavySwarm
from swarms_tools import exa_search
swarm = HeavySwarm(
name="Gold ETF Research Team",
description="A team of agents that research the best gold ETFs",
worker_model_name="claude-sonnet-4-20250514",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
agent_prints_on=False,
worker_tools=[exa_search],
random_loops_per_agent=True,
)
prompt = (
"Find the best 3 gold ETFs. For each ETF, provide the ticker symbol, "
"full name, current price, expense ratio, assets under management, and "
"a brief explanation of why it is considered among the best. Present the information "
"in a clear, structured format suitable for investors. Scrape the data from the web. "
)
out = swarm.run(prompt)
print(out)

@ -1,6 +1,7 @@
import concurrent.futures import concurrent.futures
import json import json
import os import os
import random
import time import time
import traceback import traceback
from functools import lru_cache from functools import lru_cache
@ -24,6 +25,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter, history_output_formatter,
) )
from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.litellm_wrapper import LiteLLM
from swarms.tools.tool_type import tool_type
RESEARCH_AGENT_PROMPT = """ RESEARCH_AGENT_PROMPT = """
You are an expert Research Agent with exceptional capabilities in: You are an expert Research Agent with exceptional capabilities in:
@ -471,7 +473,6 @@ class HeavySwarm:
self, self,
name: str = "HeavySwarm", name: str = "HeavySwarm",
description: str = "A swarm of agents that can analyze a task and generate specialized questions for each agent role", description: str = "A swarm of agents that can analyze a task and generate specialized questions for each agent role",
agents: List[Agent] = None,
timeout: int = 300, timeout: int = 300,
aggregation_strategy: str = "synthesis", aggregation_strategy: str = "synthesis",
loops_per_agent: int = 1, loops_per_agent: int = 1,
@ -482,6 +483,8 @@ class HeavySwarm:
show_dashboard: bool = False, show_dashboard: bool = False,
agent_prints_on: bool = False, agent_prints_on: bool = False,
output_type: str = "dict-all-except-first", output_type: str = "dict-all-except-first",
worker_tools: tool_type = None,
random_loops_per_agent: bool = False,
): ):
""" """
Initialize the HeavySwarm with configuration parameters. Initialize the HeavySwarm with configuration parameters.
@ -519,7 +522,6 @@ class HeavySwarm:
""" """
self.name = name self.name = name
self.description = description self.description = description
self.agents = agents
self.timeout = timeout self.timeout = timeout
self.aggregation_strategy = aggregation_strategy self.aggregation_strategy = aggregation_strategy
self.loops_per_agent = loops_per_agent self.loops_per_agent = loops_per_agent
@ -530,15 +532,31 @@ class HeavySwarm:
self.show_dashboard = show_dashboard self.show_dashboard = show_dashboard
self.agent_prints_on = agent_prints_on self.agent_prints_on = agent_prints_on
self.output_type = output_type self.output_type = output_type
self.worker_tools = worker_tools
self.random_loops_per_agent = random_loops_per_agent
self.conversation = Conversation() self.conversation = Conversation()
self.console = Console() self.console = Console()
self.agents = self.create_agents()
if self.show_dashboard: if self.show_dashboard:
self.show_swarm_info() self.show_swarm_info()
self.reliability_check() self.reliability_check()
def handle_worker_agent_loops(self) -> int:
"""
Handle the loops per agent for the worker agents.
"""
loops = None
if self.random_loops_per_agent:
loops = random.randint(1, 10)
else:
loops = self.loops_per_agent
return loops
def show_swarm_info(self): def show_swarm_info(self):
""" """
Display comprehensive swarm configuration information in a rich dashboard format. Display comprehensive swarm configuration information in a rich dashboard format.
@ -831,10 +849,8 @@ class HeavySwarm:
) )
) )
agents = self.create_agents()
agent_results = self._execute_agents_parallel( agent_results = self._execute_agents_parallel(
questions=questions, agents=agents, img=img questions=questions, agents=self.agents, img=img
) )
# Synthesis with dashboard # Synthesis with dashboard
@ -962,17 +978,23 @@ class HeavySwarm:
if self.verbose: if self.verbose:
logger.info("🏗️ Creating specialized agents...") logger.info("🏗️ Creating specialized agents...")
tools = None
if self.worker_tools is not None:
tools = self.worker_tools
# Research Agent - Deep information gathering and data collection # Research Agent - Deep information gathering and data collection
research_agent = Agent( research_agent = Agent(
agent_name="Research-Agent", agent_name="Research-Agent",
agent_description="Expert research agent specializing in comprehensive information gathering and data collection", agent_description="Expert research agent specializing in comprehensive information gathering and data collection",
system_prompt=RESEARCH_AGENT_PROMPT, system_prompt=RESEARCH_AGENT_PROMPT,
max_loops=self.loops_per_agent, max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name, model_name=self.worker_model_name,
streaming_on=False, streaming_on=False,
verbose=False, verbose=False,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
print_on=self.agent_prints_on, print_on=self.agent_prints_on,
tools=tools,
) )
# Analysis Agent - Pattern recognition and deep analytical insights # Analysis Agent - Pattern recognition and deep analytical insights
@ -980,12 +1002,13 @@ class HeavySwarm:
agent_name="Analysis-Agent", agent_name="Analysis-Agent",
agent_description="Expert analytical agent specializing in pattern recognition, data analysis, and insight generation", agent_description="Expert analytical agent specializing in pattern recognition, data analysis, and insight generation",
system_prompt=ANALYSIS_AGENT_PROMPT, system_prompt=ANALYSIS_AGENT_PROMPT,
max_loops=self.loops_per_agent, max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name, model_name=self.worker_model_name,
streaming_on=False, streaming_on=False,
verbose=False, verbose=False,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
print_on=self.agent_prints_on, print_on=self.agent_prints_on,
tools=tools,
) )
# Alternatives Agent - Strategic options and creative solutions # Alternatives Agent - Strategic options and creative solutions
@ -993,12 +1016,13 @@ class HeavySwarm:
agent_name="Alternatives-Agent", agent_name="Alternatives-Agent",
agent_description="Expert strategic agent specializing in alternative approaches, creative solutions, and option generation", agent_description="Expert strategic agent specializing in alternative approaches, creative solutions, and option generation",
system_prompt=ALTERNATIVES_AGENT_PROMPT, system_prompt=ALTERNATIVES_AGENT_PROMPT,
max_loops=self.loops_per_agent, max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name, model_name=self.worker_model_name,
streaming_on=False, streaming_on=False,
verbose=False, verbose=False,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
print_on=self.agent_prints_on, print_on=self.agent_prints_on,
tools=tools,
) )
# Verification Agent - Validation, feasibility assessment, and quality assurance # Verification Agent - Validation, feasibility assessment, and quality assurance
@ -1006,12 +1030,13 @@ class HeavySwarm:
agent_name="Verification-Agent", agent_name="Verification-Agent",
agent_description="Expert verification agent specializing in validation, feasibility assessment, and quality assurance", agent_description="Expert verification agent specializing in validation, feasibility assessment, and quality assurance",
system_prompt=VERIFICATION_AGENT_PROMPT, system_prompt=VERIFICATION_AGENT_PROMPT,
max_loops=self.loops_per_agent, max_loops=self.handle_worker_agent_loops(),
model_name=self.worker_model_name, model_name=self.worker_model_name,
streaming_on=False, streaming_on=False,
verbose=False, verbose=False,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
print_on=self.agent_prints_on, print_on=self.agent_prints_on,
tools=tools,
) )
# Synthesis Agent - Integration and comprehensive analysis # Synthesis Agent - Integration and comprehensive analysis
@ -1024,6 +1049,8 @@ class HeavySwarm:
streaming_on=False, streaming_on=False,
verbose=False, verbose=False,
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
print_on=self.agent_prints_on,
tools=tools,
) )
agents = { agents = {

@ -11,7 +11,6 @@ from typing import Any, Dict, List, Literal, Optional, Union
from litellm.types.utils import ChatCompletionMessageToolCall from litellm.types.utils import ChatCompletionMessageToolCall
from loguru import logger from loguru import logger
from mcp import ClientSession from mcp import ClientSession
from mcp.client.sse import sse_client
try: try:
from mcp.client.streamable_http import streamablehttp_client from mcp.client.streamable_http import streamablehttp_client
@ -315,7 +314,7 @@ def get_mcp_client(transport, url, headers=None, timeout=5, **kwargs):
url, headers=headers, timeout=timeout, **kwargs url, headers=headers, timeout=timeout, **kwargs
) )
else: else:
return sse_client( return streamablehttp_client(
url, headers=headers, timeout=timeout, **kwargs url, headers=headers, timeout=timeout, **kwargs
) )

@ -0,0 +1,7 @@
from typing import Dict, List, Callable, Any, Type, Union
ToolType = Union[
Dict[str, Any], List[Callable[..., Any]], Callable[..., Any], Any
]
tool_type = Type[ToolType]

@ -234,7 +234,7 @@ class LiteLLM:
reasoning_effort: str = None, reasoning_effort: str = None,
drop_params: bool = True, drop_params: bool = True,
thinking_tokens: int = None, thinking_tokens: int = None,
reasoning_enabled: bool = True, reasoning_enabled: bool = False,
*args, *args,
**kwargs, **kwargs,
): ):
@ -291,6 +291,7 @@ class LiteLLM:
self.reasoning_effort = reasoning_effort self.reasoning_effort = reasoning_effort
self.thinking_tokens = thinking_tokens self.thinking_tokens = thinking_tokens
self.reasoning_enabled = reasoning_enabled self.reasoning_enabled = reasoning_enabled
self.verbose = verbose
self.modalities = [] self.modalities = []
self.messages = [] # Initialize messages list self.messages = [] # Initialize messages list
@ -315,7 +316,8 @@ class LiteLLM:
self.init_args = args self.init_args = args
self.init_kwargs = kwargs self.init_kwargs = kwargs
self.reasoning_check() # if self.reasoning_enabled is True:
# self.reasoning_check()
def reasoning_check(self): def reasoning_check(self):
""" """
@ -324,11 +326,19 @@ class LiteLLM:
If reasoning is enabled and the model supports reasoning, set temperature to 1 for optimal reasoning. If reasoning is enabled and the model supports reasoning, set temperature to 1 for optimal reasoning.
Also logs information or warnings based on the model's reasoning support and configuration. Also logs information or warnings based on the model's reasoning support and configuration.
""" """
if ( """
self.reasoning_enabled is True Check if reasoning is enabled and supported by the model, and adjust temperature, thinking_tokens, and top_p accordingly.
and litellm.supports_reasoning(model=self.model_name)
is True This single-line version combines all previous checks and actions for reasoning-enabled models, including Anthropic-specific logic.
): """
if self.reasoning_enabled:
supports_reasoning = litellm.supports_reasoning(
model=self.model_name
)
uses_anthropic = self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
if supports_reasoning:
logger.info( logger.info(
f"Model {self.model_name} supports reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will be set to 1 for better reasoning as some models may not work with low temperature." f"Model {self.model_name} supports reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will be set to 1 for better reasoning as some models may not work with low temperature."
) )
@ -337,36 +347,15 @@ class LiteLLM:
logger.warning( logger.warning(
f"Model {self.model_name} does not support reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will not be set to 1." f"Model {self.model_name} does not support reasoning and reasoning enabled is set to {self.reasoning_enabled}. Temperature will not be set to 1."
) )
if (
self.reasoning_enabled is True
and litellm.supports_reasoning(model=self.model_name)
is False
):
logger.warning( logger.warning(
f"Model {self.model_name} may or may not support reasoning and reasoning enabled is set to {self.reasoning_enabled}" f"Model {self.model_name} may or may not support reasoning and reasoning enabled is set to {self.reasoning_enabled}"
) )
if uses_anthropic:
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
if self.thinking_tokens is None: if self.thinking_tokens is None:
logger.info( logger.info(
f"Model {self.model_name} is an Anthropic model and reasoning enabled is set to {self.reasoning_enabled}. Thinking tokens is mandatory for Anthropic models." f"Model {self.model_name} is an Anthropic model and reasoning enabled is set to {self.reasoning_enabled}. Thinking tokens is mandatory for Anthropic models."
) )
self.thinking_tokens = self.max_tokens / 4 self.thinking_tokens = self.max_tokens / 4
if (
self.reasoning_enabled is True
and self.check_if_model_name_uses_anthropic(
model_name=self.model_name
)
is True
):
logger.info( logger.info(
"top_p must be greater than 0.95 for Anthropic models with reasoning enabled" "top_p must be greater than 0.95 for Anthropic models with reasoning enabled"
) )
@ -884,10 +873,6 @@ class LiteLLM:
completion_params["reasoning_effort"] = ( completion_params["reasoning_effort"] = (
self.reasoning_effort self.reasoning_effort
) )
else:
logger.warning(
f"Model {self.model_name} may or may not support reasoning"
)
if ( if (
self.reasoning_enabled is True self.reasoning_enabled is True

Loading…
Cancel
Save