Merge branch 'master' into improve/conversation_integration

pull/866/head
harshalmore31 3 weeks ago committed by GitHub
commit 2103d6a256

.gitignore

@@ -14,6 +14,7 @@ Cargo.lock
.pytest_cache
static/generated
conversations/
next_swarms_update.txt
runs
Financial-Analysis-Agent_state.json
conversations/

@@ -0,0 +1,449 @@
import json
import requests
from swarms import Agent


def create_python_file(code: str, filename: str) -> str:
    """Create a Python file with the given code and execute it using Python 3.12.
    This function takes a string containing Python code, writes it to a file, and executes it
    using Python 3.12 via subprocess. The file will be created in the current working directory.
    If a file with the same name already exists, it will be overwritten.
    Args:
        code (str): The Python code to write to the file. This should be valid Python 3.12 code.
        filename (str): The name of the file to create and execute.
    Returns:
        str: A detailed message indicating the file was created and the execution result.
    Raises:
        IOError: If there are any issues writing to the file.
        subprocess.SubprocessError: If there are any issues executing the file.
    Example:
        >>> code = "print('Hello, World!')"
        >>> result = create_python_file(code, "test.py")
        >>> print(result)
        'Python file created successfully. Execution result: Hello, World!'
    """
    import subprocess
    import os
    import datetime

    # Get current timestamp for logging
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Write the code to file
    with open(filename, "w") as f:
        f.write(code)

    # Get file size and permissions
    file_stats = os.stat(filename)
    file_size = file_stats.st_size
    file_permissions = oct(file_stats.st_mode)[-3:]

    # Execute the file using Python 3.12 and capture output
    try:
        # Time the run so the report shows a real duration
        start_time = datetime.datetime.now()
        result = subprocess.run(
            ["python3.12", filename],
            capture_output=True,
            text=True,
            check=True,
        )
        execution_time = (datetime.datetime.now() - start_time).total_seconds()
        # Create detailed response
        response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {execution_time} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
        return response
    except subprocess.CalledProcessError as e:
        error_response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
        return error_response
def update_python_file(code: str, filename: str) -> str:
    """Update an existing Python file with new code and execute it using Python 3.12.
    This function takes a string containing Python code and updates an existing Python file.
    If the file doesn't exist, it will be created. The file will be executed using Python 3.12.
    Args:
        code (str): The Python code to write to the file. This should be valid Python 3.12 code.
        filename (str): The name of the file to update and execute.
    Returns:
        str: A detailed message indicating the file was updated and the execution result.
    Raises:
        IOError: If there are any issues writing to the file.
        subprocess.SubprocessError: If there are any issues executing the file.
    Example:
        >>> code = "print('Updated code!')"
        >>> result = update_python_file(code, "my_script.py")
        >>> print(result)
        'Python file updated successfully. Execution result: Updated code!'
    """
    import subprocess
    import os
    import datetime

    # Get current timestamp for logging
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Check if file exists and get its stats
    file_exists = os.path.exists(filename)
    if file_exists:
        old_stats = os.stat(filename)
        old_size = old_stats.st_size
        old_permissions = oct(old_stats.st_mode)[-3:]

    # Write the code to file
    with open(filename, "w") as f:
        f.write(code)

    # Get new file stats
    new_stats = os.stat(filename)
    new_size = new_stats.st_size
    new_permissions = oct(new_stats.st_mode)[-3:]

    # Execute the file using Python 3.12 and capture output
    try:
        # Time the run so the report shows a real duration
        start_time = datetime.datetime.now()
        result = subprocess.run(
            ["python3.12", filename],
            capture_output=True,
            text=True,
            check=True,
        )
        execution_time = (datetime.datetime.now() - start_time).total_seconds()
        # Create detailed response
        response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {execution_time} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
        return response
    except subprocess.CalledProcessError as e:
        error_response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
        return error_response
def run_quant_trading_agent(task: str) -> str:
    """Run a quantitative trading agent to analyze and execute trading strategies.
    This function initializes and runs a specialized quantitative trading agent that can:
    - Develop and backtest trading strategies
    - Analyze market data for alpha opportunities
    - Implement risk management frameworks
    - Optimize portfolio allocations
    - Conduct quantitative research
    - Monitor market microstructure
    - Evaluate trading system performance
    Args:
        task (str): The specific trading task or analysis to perform
    Returns:
        str: The agent's response or analysis results
    Example:
        >>> result = run_quant_trading_agent("Analyze SPY ETF for mean reversion opportunities")
        >>> print(result)
    """
    # Initialize the agent
    agent = Agent(
        agent_name="Quantitative-Trading-Agent",
        agent_description="Advanced quantitative trading and algorithmic analysis agent",
        system_prompt="""You are an expert quantitative trading agent with deep expertise in:
        - Algorithmic trading strategies and implementation
        - Statistical arbitrage and market making
        - Risk management and portfolio optimization
        - High-frequency trading systems
        - Market microstructure analysis
        - Quantitative research methodologies
        - Financial mathematics and stochastic processes
        - Machine learning applications in trading
        Your core responsibilities include:
        1. Developing and backtesting trading strategies
        2. Analyzing market data and identifying alpha opportunities
        3. Implementing risk management frameworks
        4. Optimizing portfolio allocations
        5. Conducting quantitative research
        6. Monitoring market microstructure
        7. Evaluating trading system performance
        You maintain strict adherence to:
        - Mathematical rigor in all analyses
        - Statistical significance in strategy development
        - Risk-adjusted return optimization
        - Market impact minimization
        - Regulatory compliance
        - Transaction cost analysis
        - Performance attribution
        You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
        max_loops=2,
        model_name="claude-3-5-sonnet-20240620",
        tools=[
            create_python_file,
            update_python_file,
            backtest_summary,
        ],
    )
    out = agent.run(task)
    return out
def backtest_summary(report: str) -> str:
    """Generate a summary of a backtest report, but only if the backtest was profitable.
    This function should only be used when the backtest results show a positive return.
    Using this function for unprofitable backtests may lead to misleading conclusions.
    Args:
        report (str): The backtest report containing performance metrics
    Returns:
        str: A formatted summary of the backtest report
    Example:
        >>> result = backtest_summary("Total Return: +15.2%, Sharpe: 1.8")
        >>> print(result)
        'The backtest report is: Total Return: +15.2%, Sharpe: 1.8'
    """
    return f"The backtest report is: {report}"
def get_coin_price(coin_id: str, vs_currency: str = "usd") -> str:
    """
    Get the current price of a specific cryptocurrency.
    Args:
        coin_id (str): The CoinGecko ID of the cryptocurrency (e.g., 'bitcoin', 'ethereum')
        vs_currency (str, optional): The target currency. Defaults to "usd".
    Returns:
        str: JSON formatted string containing the coin's current price and market data
    Raises:
        requests.RequestException: If the API request fails
    Example:
        >>> result = get_coin_price("bitcoin")
        >>> print(result)
        {"bitcoin": {"usd": 45000, "usd_market_cap": 850000000000, ...}}
    """
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            "ids": coin_id,
            "vs_currencies": vs_currency,
            "include_market_cap": True,
            "include_24hr_vol": True,
            "include_24hr_change": True,
            "include_last_updated_at": True,
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        return json.dumps(data, indent=2)
    except requests.RequestException as e:
        return json.dumps(
            {
                "error": f"Failed to fetch price for {coin_id}: {str(e)}"
            }
        )
    except Exception as e:
        return json.dumps({"error": f"Unexpected error: {str(e)}"})
def run_crypto_quant_agent(task: str) -> str:
    """
    Run a crypto quantitative trading agent with specialized tools for cryptocurrency market analysis.
    This function initializes and runs a quantitative trading agent specifically designed for
    cryptocurrency markets. The agent is equipped with tools for price fetching and can perform
    various quantitative analyses including algorithmic trading strategy development, risk management,
    and market microstructure analysis.
    Args:
        task (str): The task or query to be processed by the crypto quant agent.
    Returns:
        str: The agent's response to the given task.
    Example:
        >>> response = run_crypto_quant_agent("Analyze the current market conditions for Bitcoin")
        >>> print(response)
        "Based on current market analysis..."
    """
    # Initialize the agent with expanded tools
    quant_agent = Agent(
        agent_name="Crypto-Quant-Agent",
        agent_description="Advanced quantitative trading agent specializing in cryptocurrency markets with algorithmic analysis capabilities",
        system_prompt="""You are an expert quantitative trading agent specializing in cryptocurrency markets. Your capabilities include:
        - Algorithmic trading strategy development and backtesting
        - Statistical arbitrage and market making for crypto assets
        - Risk management and portfolio optimization for digital assets
        - High-frequency trading system design for crypto markets
        - Market microstructure analysis of crypto exchanges
        - Quantitative research methodologies for crypto assets
        - Financial mathematics and stochastic processes
        - Machine learning applications in crypto trading
        You maintain strict adherence to:
        - Mathematical rigor in all analyses
        - Statistical significance in strategy development
        - Risk-adjusted return optimization
        - Market impact minimization
        - Regulatory compliance
        - Transaction cost analysis
        - Performance attribution
        You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
        max_loops=1,
        max_tokens=4096,
        model_name="gpt-4.1-mini",
        dynamic_temperature_enabled=True,
        output_type="final",
        tools=[
            get_coin_price,
        ],
    )
    return quant_agent.run(task)
# Initialize the agent
agent = Agent(
    agent_name="Director-Agent",
    agent_description="Strategic director and project management agent",
    system_prompt="""You are an expert Director Agent with comprehensive capabilities in:
    - Strategic planning and decision making
    - Project management and coordination
    - Resource allocation and optimization
    - Team leadership and delegation
    - Risk assessment and mitigation
    - Stakeholder management
    - Process optimization
    - Quality assurance
    Your core responsibilities include:
    1. Developing and executing strategic initiatives
    2. Coordinating cross-functional projects
    3. Managing resource allocation
    4. Setting and tracking KPIs
    5. Ensuring project deliverables
    6. Risk management and mitigation
    7. Stakeholder communication
    You maintain strict adherence to:
    - Best practices in project management
    - Data-driven decision making
    - Clear communication protocols
    - Quality standards
    - Timeline management
    - Budget constraints
    - Regulatory compliance
    You communicate with clarity and authority while maintaining professionalism and ensuring all stakeholders are aligned.""",
    max_loops=1,
    model_name="gpt-4o-mini",
    output_type="final",
    interactive=False,
    tools=[run_quant_trading_agent],
)

out = agent.run(
    """
    Please call the quantitative trading agent to generate Python code for a Bitcoin backtest using the CoinGecko API.
    Provide a comprehensive description of the backtest methodology and trading strategy.
    Consider the API limitations of CoinGecko and utilize only free, open-source libraries that don't require API keys. Use the requests library to fetch the data. Create a specialized strategy for the backtest focused on the orderbook and other data for price action.
    The goal is to create a backtest that can predict the price action of the coin based on the orderbook and other data.
    Maximize the profit of the backtest. Please use the OKX price API for the orderbook and other data. Be very explicit in your implementation.
    Be very precise with the instructions you give to the agent and tell it to write 400 lines of good code.
    """
)

print(out)

@@ -0,0 +1,58 @@
# Swarms Cookbook Examples Index
This index provides a categorized list of examples and tutorials for using the Swarms Framework across different industries. Each example demonstrates practical applications and implementations using the framework.
## Finance & Trading
| Name | Description | Link |
|------|-------------|------|
| Tickr-Agent | Financial analysis agent for stock market data using multithreaded processing and AI integration | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/finance/multi_agent/Swarms_Cookbook_Tickr_Agent.ipynb) |
| CryptoAgent | Real-time cryptocurrency data analysis and insights using CoinGecko integration | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/finance/multi_agent/Swarms_Cookbook_CryptoAgent.ipynb) |
| 10-K Analysis (Custom) | Detailed analysis of SEC 10-K reports using specialized agents | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/finance/multi_agent/swarms_finance_10k_analysis_custom.ipynb) |
| 10-K Analysis (AgentRearrange) | Mixed sequential and parallel analysis of 10-K reports | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/finance/multi_agent/swarms_finance_10k_analysis_agentrearrange.ipynb) |
## Healthcare & Medical
| Name | Description | Link |
|------|-------------|------|
| MedInsight Pro | Medical research summarization and analysis using AI-driven agents | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/medical/physical_therapy/Swarms_Cookbook_MedInsight_Pro.ipynb) |
| Athletics Diagnosis | Diagnosis and treatment system for extreme athletics using AgentRearrange | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/medical/physical_therapy/swarms_diagnosis_treatment_extreme_athletics.ipynb) |
## Marketing & Content
| Name | Description | Link |
|------|-------------|------|
| NewsAgent | Real-time news aggregation and summarization for business intelligence | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/marketing/news/Swarms_Cookbook_NewsAgent.ipynb) |
| Social Media Marketing | Spreadsheet-based content generation for multi-platform marketing | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/marketing/content_generation/swarms_spreadsheet_analysis_walkthrough.ipynb) |
## Accounting & Finance Operations
| Name | Description | Link |
|------|-------------|------|
| Accounting Agents | Multi-agent system for financial projections and risk assessment | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/enterprise/accounting/multi_agent/accounting_agents_for_moa.ipynb) |
## Workshops & Tutorials
| Name | Description | Link |
|------|-------------|------|
| GPTuesday Event | Example of creating promotional content for tech events | [View Example](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/cookbook/workshops/sep_6_workshop/gptuesday_swarm.py) |
## Additional Resources
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |
## Contributing
We welcome contributions! If you have an example or tutorial you'd like to add, please check our [contribution guidelines](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/CONTRIBUTING.md).
## License
This project is licensed under the MIT License - see the [LICENSE](https://github.com/The-Swarm-Corporation/Cookbook/blob/main/LICENSE) file for details.

@@ -0,0 +1,108 @@
# Swarms Examples Index
A comprehensive index of examples from the [Swarms Examples Repository](https://github.com/The-Swarm-Corporation/swarms-examples).
Additionally, we have more comprehensive examples available in [The Swarms Cookbook](https://github.com/The-Swarm-Corporation/Cookbook).
## Single Agent Examples
### Core Agents
| Category | Example | Description |
|----------|---------|-------------|
| Basic | [Easy Example](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/easy_example.py) | Basic agent implementation demonstrating core functionality and setup |
| Settings | [Agent Settings](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/agent_settings.py) | Comprehensive configuration options for customizing agent behavior and capabilities |
| YAML | [Agents from YAML](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/agents_from_yaml_example.py) | Creating and configuring agents using YAML configuration files for easy deployment |
| Memory | [Agent with Long-term Memory](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/memory/agents_and_memory/agent_with_longterm_memory.py) | Implementation of persistent memory capabilities for maintaining context across sessions |
### Model Integrations
| Category | Example | Description |
|----------|---------|-------------|
| Azure | [Azure OpenAI Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/settings/various_models/basic_agent_with_azure_openai.py) | Integration with Azure OpenAI services for enterprise-grade AI capabilities |
| Groq | [Groq Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/settings/various_models/groq_agent.py) | High-performance inference using Groq's accelerated computing platform |
| Custom | [Custom Model Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/settings/various_models/custom_model_with_agent.py) | Framework for integrating custom ML models into the agent architecture |
### Tools and Function Calling
| Category | Example | Description |
|----------|---------|-------------|
| Basic Tools | [Tool Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/tool_agent.py) | Basic tool-using agent demonstrating external tool integration capabilities |
| Advanced Tools | [Agent with Many Tools](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/agent_with_many_tools.py) | Advanced agent utilizing multiple tools for complex task execution |
| OpenAI Functions | [OpenAI Function Caller](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/function_calling/openai_function_caller_example.py) | Integration with OpenAI's function calling API for structured outputs |
| Command Line | [Command Tool Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/tool_agent/command_r_tool_agent.py) | Command-line interface tool integration |
| Jamba | [Jamba Tool Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/tool_agent/jamba_tool_agent.py) | Integration with Jamba framework for enhanced tool capabilities |
| Pydantic | [Pydantic Tool Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/tools/tool_agent/tool_agent_pydantic.py) | Tool validation and schema enforcement using Pydantic |
### Third-Party Integrations
| Category | Example | Description |
|----------|---------|-------------|
| Microsoft | [AutoGen Integration](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/3rd_party_agents/auto_gen.py) | Integration with Microsoft's AutoGen framework for autonomous agents |
| LangChain | [LangChain Integration](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/3rd_party_agents/langchain.py) | Combining LangChain's capabilities with Swarms for enhanced functionality |
| Browser | [Multion Integration](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/3rd_party_agents/multion_agent.py) | Web automation and browsing capabilities using Multion |
| Team AI | [Crew AI](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/3rd_party_agents/crew_ai.py) | Team-based AI collaboration using Crew AI framework |
| Development | [Griptape](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/3rd_party_agents/griptape.py) | Integration with Griptape for structured AI application development |
### Industry-Specific Agents
| Category | Example | Description |
|----------|---------|-------------|
| Finance | [401k Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/finance/401k_agent.py) | Retirement planning assistant with investment strategy recommendations |
| Finance | [Estate Planning](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/finance/estate_planning_agent.py) | Comprehensive estate planning and wealth management assistant |
| Security | [Perimeter Defense](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/security/perimeter_defense_agent.py) | Security monitoring and threat detection system |
| Research | [Perplexity Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/research/perplexity_agent.py) | Advanced research automation using Perplexity AI integration |
| Legal | [Alberto Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/law/alberto_agent.py) | Legal research and document analysis assistant |
| Healthcare | [Pharma Agent](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/agents/use_cases/pharma/pharma_agent_two.py) | Pharmaceutical research and drug interaction analysis |
## Multi-Agent Examples
### Core Architectures
| Category | Example | Description |
|----------|---------|-------------|
| Basic | [Build a Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/base_swarm/build_a_swarm.py) | Foundation for creating custom swarm architectures with multiple agents |
| Auto Swarm | [Auto Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/auto_swarm/auto_swarm_example.py) | Self-organizing swarm with automatic task distribution and management |
| Concurrent | [Concurrent Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/concurrent_swarm/concurrent_swarm_example.py) | Parallel execution of tasks across multiple agents for improved performance |
| Star | [Star Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/different_architectures/star_swarm.py) | Centralized architecture with a hub agent coordinating peripheral agents |
| Circular | [Circular Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/different_architectures/circular_swarm.py) | Ring topology for cyclic information flow between agents |
### Experimental Architectures
| Category | Example | Description |
|----------|---------|-------------|
| Monte Carlo | [Monte Carlo Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/monte_carlo_swarm.py) | Probabilistic decision-making using Monte Carlo simulation across agents |
| Federated | [Federated Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/federated_swarm.py) | Distributed learning system with privacy-preserving agent collaboration |
| Ant Colony | [Ant Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/ant_swarm.py) | Bio-inspired optimization using ant colony algorithms for agent coordination |
| Matrix | [Agent Matrix](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/agent_matrix.py) | Grid-based agent organization for complex problem-solving |
| DFS | [DFS Search Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/dfs_search_swarm.py) | Depth-first search swarm for complex problem exploration |
| Pulsar | [Pulsar Swarm](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/experimental/pulsar_swarm.py) | Pulsar-based coordination for synchronized agent behavior |
### Collaboration Patterns
| Category | Example | Description |
|----------|---------|-------------|
| Delegation | [Agent Delegation](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/multi_agent_collaboration/agent_delegation.py) | Task delegation and management system |
| Communication | [Message Pool](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/multi_agent_collaboration/message_pool.py) | Shared communication system for efficient agent interaction |
| Scheduling | [Round Robin](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/multi_agent_collaboration/round_robin_example.py) | Round-robin task scheduling and execution |
| Load Balancing | [Load Balancer](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/multi_agent_collaboration/load_balancer_example.py) | Dynamic task distribution system for optimal resource utilization |
| Consensus | [Majority Voting](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/structs/swarms/multi_agent_collaboration/majority_voting.py) | Consensus-building system using democratic voting among agents |
### Industry Applications
| Category | Example | Description |
|----------|---------|-------------|
| Finance | [Accountant Team](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/accountant_team/account_team2_example.py) | Multi-agent system for financial analysis, bookkeeping, and tax planning |
| Marketing | [Ad Generation](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/ad_gen/ad_gen_example.py) | Collaborative ad creation with copywriting and design agents |
| Aerospace | [Space Traffic Control](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/agentic_space_traffic_control/game.py) | Complex simulation of space traffic management with multiple coordinating agents |
| Agriculture | [Plant Biology](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/plant_biologist_swarm/agricultural_swarm.py) | Agricultural analysis and optimization using specialized biology agents |
| Urban Dev | [Urban Planning](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/urban_planning/urban_planning_example.py) | City development planning with multiple specialized urban development agents |
| Education | [Education System](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/education/education_example.py) | Personalized learning system with multiple teaching and assessment agents |
| Security | [Email Phishing Detection](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/email_phiser/email_swarm.py) | Multi-agent security analysis and threat detection |
| Fashion | [Personal Stylist](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/personal_stylist/personal_stylist_example.py) | Fashion recommendation system with style analysis and matching agents |
| Healthcare | [Healthcare Assistant](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/positive_med/positive_med_example.py) | Medical diagnosis and treatment planning with specialist consultation agents |
| Security Ops | [Security Team](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/security_team/security_team_example.py) | Comprehensive security operations with threat detection and response agents |
| Medical | [X-Ray Analysis](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/demos/xray/xray_example.py) | Multi-agent medical imaging analysis and diagnosis |
| Business | [Business Strategy](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/business_strategy/business_strategy_graph/growth_agent.py) | Strategic planning and business development swarm |
| Research | [Astronomy Research](https://github.com/The-Swarm-Corporation/swarms-examples/blob/main/examples/applications/astronomy/multiversal_detection/test.py) | Collaborative space research and astronomical analysis |
## Additional Resources
- [GitHub](https://github.com/kyegomez/swarms)
- [Discord](https://t.co/zlLe07AqUX)
- [Telegram](https://t.co/dSRy143zQv)
- [X Community](https://x.com/i/communities/1875452887414804745)

@@ -242,6 +242,7 @@ nav:
- MatrixSwarm: "swarms/structs/matrix_swarm.md"
- ModelRouter: "swarms/structs/model_router.md"
- MALT: "swarms/structs/malt.md"
- Interactive Group Chat: "swarms/structs/interactive_groupchat.md"
- Various Execution Methods: "swarms/structs/various_execution_methods.md"
- Deep Research Swarm: "swarms/structs/deep_research_swarm.md"
- Swarm Matcher: "swarms/structs/swarm_matcher.md"
@@ -252,6 +253,9 @@ nav:
- Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md"
- Multi-Agent Multi-Modal Structures:
- ImageAgentBatchProcessor: "swarms/structs/image_batch_agent.md"
- Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
@@ -302,12 +306,15 @@ nav:
- Swarms 5.9.2: "swarms/changelog/changelog_new.md"
- Examples:
- Agent Examples:
- Overview: "examples/index.md"
- CookBook Index: "examples/cookbook_index.md"
- Customizing Agents:
- Basic Agent: "swarms/examples/basic_agent.md"
- Agents with Callable Tools: "swarms/examples/agent_with_tools.md"
# - Agent With MCP Integration: "swarms/examples/agent_with_mcp.md"
- Agent Output Types: "swarms/examples/agent_output_types.md"
- Agent with Structured Outputs: "swarms/examples/agent_structured_outputs.md"
- Agents with Vision: "swarms/examples/vision_processing.md"
- Various Model Providers:
- OpenAI: "swarms/examples/openai_example.md"
- Anthropic: "swarms/examples/claude.md"
@@ -339,6 +346,7 @@ nav:
- ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md"
- MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
- Unique Swarms: "swarms/examples/unique_swarms.md"
- Agents as Tools: "swarms/examples/agents_as_tools.md"
- Applications:
- Swarms DAO: "swarms/examples/swarms_dao.md"
- Swarms of Browser Agents: "swarms/examples/swarms_of_browser_agents.md"

@@ -1,11 +1,12 @@
# Agent Output Types Examples
# Agent Output Types Examples with Vision Capabilities
This example demonstrates how to use different output types when working with Swarms agents. Each output type formats the agent's response in a specific way, making it easier to integrate with different parts of your application.
This example demonstrates how to use different output types when working with Swarms agents, including vision-enabled agents that can analyze images. Each output type formats the agent's response in a specific way, making it easier to integrate with different parts of your application.
## Prerequisites
- Python 3.7+
- OpenAI API key
- Anthropic API key (optional, for Claude models)
- Swarms library
## Installation
@@ -18,171 +19,61 @@ pip3 install -U swarms
```plaintext
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY=""
ANTHROPIC_API_KEY=""
OPENAI_API_KEY="" # Required for GPT-4V vision capabilities
ANTHROPIC_API_KEY="" # Optional, for Claude models
```
## Available Output Types
The following output types are supported:
| Output Type | Description |
|------------|-------------|
| `"list"` | Returns response as a JSON string containing a list |
| `"dict"` or `"dictionary"` | Returns response as a Python dictionary |
| `"string"` or `"str"` | Returns response as a plain string |
| `"final"` or `"last"` | Returns only the final response |
| `"json"` | Returns response as a JSON string |
| `"all"` | Returns all responses in the conversation |
| `"yaml"` | Returns response formatted as YAML |
| `"xml"` | Returns response formatted as XML |
| `"dict-all-except-first"` | Returns all responses except the first as a dictionary |
| `"str-all-except-first"` | Returns all responses except the first as a string |
| `"basemodel"` | Returns response as a Pydantic BaseModel |
## Examples
### 1. String Output (Default)
```python
from swarms import Agent
# Initialize agent with string output
agent = Agent(
agent_name="String-Output-Agent",
agent_description="Demonstrates string output format",
system_prompt="You are a helpful assistant that provides clear text responses.",
output_type="str", # or "string"
)
response = agent.run("What is the capital of France?")
```
### 2. JSON Output
### Vision-Enabled Quality Control Agent
```python
# Initialize agent with JSON output
agent = Agent(
agent_name="JSON-Output-Agent",
agent_description="Demonstrates JSON output format",
system_prompt="You are an assistant that provides structured data responses.",
output_type="json"
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
response = agent.run("List the top 3 programming languages.")
```
# Image for analysis
factory_image = "image.jpg"
### 3. List Output
```python
# Initialize agent with list output
agent = Agent(
agent_name="List-Output-Agent",
agent_description="Demonstrates list output format",
system_prompt="You are an assistant that provides list-based responses.",
output_type="list"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=2,
output_type="str-all-except-first",
)
response = agent.run("Name three primary colors.")
```
### 4. Dictionary Output
```python
# Initialize agent with dictionary output
agent = Agent(
agent_name="Dict-Output-Agent",
agent_description="Demonstrates dictionary output format",
system_prompt="You are an assistant that provides dictionary-based responses.",
output_type="dict" # or "dictionary"
response = quality_control_agent.run(
task="what is in the image?",
img=factory_image,
)
response = agent.run("Provide information about a book.")
```
### 5. YAML Output
```python
# Initialize agent with YAML output
agent = Agent(
agent_name="YAML-Output-Agent",
agent_description="Demonstrates YAML output format",
system_prompt="You are an assistant that provides YAML-formatted responses.",
output_type="yaml"
)
print(response)
response = agent.run("Describe a recipe.")
```
### 6. XML Output
```python
# Initialize agent with XML output
agent = Agent(
agent_name="XML-Output-Agent",
agent_description="Demonstrates XML output format",
system_prompt="You are an assistant that provides XML-formatted responses.",
output_type="xml"
)
response = agent.run("Provide user information.")
```
### 7. All Responses
```python
# Initialize agent to get all responses
agent = Agent(
agent_name="All-Output-Agent",
agent_description="Demonstrates getting all responses",
system_prompt="You are an assistant that provides multiple responses.",
output_type="all"
)
response = agent.run("Tell me about climate change.")
```
### 8. Final Response Only
```python
# Initialize agent to get only final response
agent = Agent(
agent_name="Final-Output-Agent",
agent_description="Demonstrates getting only final response",
system_prompt="You are an assistant that provides concise final answers.",
output_type="final" # or "last"
)
response = agent.run("What's the meaning of life?")
```
## Best Practices
1. Choose the output type based on your application's needs:
| Output Type | Use Case |
|------------|----------|
| `"str"` | Simple text responses |
| `"json"` or `"dict"` | Structured data |
| `"list"` | Array-like data |
| `"yaml"` | Configuration-like data |
| `"xml"` | XML-based integrations |
| `"basemodel"` | Type-safe data handling |
2. Handle the output appropriately in your application:
- Parse JSON/YAML responses when needed
- Validate structured data
### Supported Image Formats
- Handle potential formatting errors
The vision-enabled agents support various image formats including:
3. Consider using `try-except` blocks when working with structured output types to handle potential parsing errors.
| Format | Description |
|--------|-------------|
| JPEG/JPG | Standard image format with lossy compression |
| PNG | Lossless format supporting transparency |
| GIF | Animated format (only first frame used) |
| WebP | Modern format with both lossy and lossless compression |
### Best Practices for Vision Tasks
This comprehensive guide shows how to use all available output types in the Swarms framework, making it easier to integrate agent responses into your applications in the most suitable format for your needs.
| Best Practice | Description |
|--------------|-------------|
| Image Quality | Ensure images are clear and well-lit for optimal analysis |
| Image Size | Keep images under 20MB and in supported formats |
| Task Specificity | Provide clear, specific instructions for image analysis |
| Model Selection | Use vision-capable models (e.g., GPT-4V) for image tasks |

@@ -0,0 +1,586 @@
# Agents as Tools Tutorial
This tutorial demonstrates how to create a powerful multi-agent system where agents can delegate tasks to specialized sub-agents. This pattern is particularly useful for complex tasks that require different types of expertise or capabilities.
## Overview
The Agents as Tools pattern allows you to:
- Create specialized agents with specific capabilities
- Have agents delegate tasks to other agents
- Chain multiple agents together for complex workflows
- Maintain separation of concerns between different agent roles
## Prerequisites
- Python 3.8 or higher
- Basic understanding of Python programming
- Familiarity with async/await concepts (optional)
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set up your environment variables:
```plaintext
WORKSPACE_DIR="agent_workspace"
ANTHROPIC_API_KEY=""
```
## Step-by-Step Guide
1. **Define Your Tools**
- Create functions that will serve as tools for your agents
- Add proper type hints and detailed docstrings
- Include error handling and logging
- Example:
```python
def my_tool(param: str) -> str:
    """Detailed description of what the tool does.

    Args:
        param: Description of the parameter

    Returns:
        Description of the return value
    """
    # Tool implementation goes here; return a string the agent can read
    result = f"Processed: {param}"
    return result
```
2. **Create Specialized Agents**
- Define agents with specific roles and capabilities
- Configure each agent with appropriate settings
- Assign relevant tools to each agent
```python
specialized_agent = Agent(
agent_name="Specialist",
agent_description="Expert in specific domain",
system_prompt="Detailed instructions for the agent",
tools=[tool1, tool2]
)
```
3. **Set Up the Director Agent**
- Create a high-level agent that coordinates other agents
- Give it access to specialized agents as tools
- Define clear delegation rules
```python
director = Agent(
agent_name="Director",
agent_description="Coordinates other agents",
tools=[specialized_agent.run]
)
```
4. **Execute Multi-Agent Workflows**
- Start with the director agent
- Let it delegate tasks as needed
- Handle responses and chain results
```python
result = director.run("Your high-level task description")
```
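
Putting the four steps together, here is a minimal end-to-end sketch of the pattern before the full example below. The `get_weather` tool, the agent names, and the `gpt-4o-mini` model choice are illustrative placeholders rather than part of the original example; the director exposes the specialist's `run` method as a tool, following Step 3:

```python
from swarms import Agent


def get_weather(city: str) -> str:
    """Return a short weather summary for a city.

    Args:
        city: Name of the city to look up.

    Returns:
        A plain-text weather summary.
    """
    # Placeholder implementation; swap in a real weather API call.
    return f"The weather in {city} is sunny and 22C."


# Specialized agent that owns the weather tool
weather_agent = Agent(
    agent_name="Weather-Agent",
    agent_description="Answers weather questions using the get_weather tool",
    system_prompt="You answer weather questions by calling the available tools.",
    model_name="gpt-4o-mini",
    max_loops=1,
    tools=[get_weather],
)

# Director agent that treats the specialist's run method as a tool
director = Agent(
    agent_name="Director-Agent",
    agent_description="Delegates tasks to specialized agents",
    system_prompt="Delegate each task to the most suitable specialist agent.",
    model_name="gpt-4o-mini",
    max_loops=1,
    tools=[weather_agent.run],
)

print(director.run("Ask the weather specialist what the weather is in Paris."))
```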
## Code
```python
import json
import requests
from swarms import Agent


def create_python_file(code: str, filename: str) -> str:
    """Create a Python file with the given code and execute it using Python 3.12.
    This function takes a string containing Python code, writes it to a file, and executes it
    using Python 3.12 via subprocess. The file will be created in the current working directory.
    If a file with the same name already exists, it will be overwritten.
    Args:
        code (str): The Python code to write to the file. This should be valid Python 3.12 code.
        filename (str): The name of the file to create and execute.
    Returns:
        str: A detailed message indicating the file was created and the execution result.
    Raises:
        IOError: If there are any issues writing to the file.
        subprocess.SubprocessError: If there are any issues executing the file.
    Example:
        >>> code = "print('Hello, World!')"
        >>> result = create_python_file(code, "test.py")
        >>> print(result)
        'Python file created successfully. Execution result: Hello, World!'
    """
    import subprocess
    import os
    import datetime

    # Get current timestamp for logging
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Write the code to file
    with open(filename, "w") as f:
        f.write(code)

    # Get file size and permissions
    file_stats = os.stat(filename)
    file_size = file_stats.st_size
    file_permissions = oct(file_stats.st_mode)[-3:]

    # Execute the file using Python 3.12 and capture output
    try:
        # Time the run so the report shows a real duration
        start_time = datetime.datetime.now()
        result = subprocess.run(
            ["python3.12", filename],
            capture_output=True,
            text=True,
            check=True
        )
        execution_time = (datetime.datetime.now() - start_time).total_seconds()
        # Create detailed response
        response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {execution_time} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
        return response
    except subprocess.CalledProcessError as e:
        error_response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
        return error_response
def update_python_file(code: str, filename: str) -> str:
    """Update an existing Python file with new code and execute it using Python 3.12.
    This function takes a string containing Python code and updates an existing Python file.
    If the file doesn't exist, it will be created. The file will be executed using Python 3.12.
    Args:
        code (str): The Python code to write to the file. This should be valid Python 3.12 code.
        filename (str): The name of the file to update and execute.
    Returns:
        str: A detailed message indicating the file was updated and the execution result.
    Raises:
        IOError: If there are any issues writing to the file.
        subprocess.SubprocessError: If there are any issues executing the file.
    Example:
        >>> code = "print('Updated code!')"
        >>> result = update_python_file(code, "my_script.py")
        >>> print(result)
        'Python file updated successfully. Execution result: Updated code!'
    """
    import subprocess
    import os
    import datetime

    # Get current timestamp for logging
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Check if file exists and get its stats
    file_exists = os.path.exists(filename)
    if file_exists:
        old_stats = os.stat(filename)
        old_size = old_stats.st_size
        old_permissions = oct(old_stats.st_mode)[-3:]

    # Write the code to file
    with open(filename, "w") as f:
        f.write(code)

    # Get new file stats
    new_stats = os.stat(filename)
    new_size = new_stats.st_size
    new_permissions = oct(new_stats.st_mode)[-3:]

    # Execute the file using Python 3.12 and capture output
    try:
        # Time the run so the report shows a real duration
        start_time = datetime.datetime.now()
        result = subprocess.run(
            ["python3.12", filename],
            capture_output=True,
            text=True,
            check=True
        )
        execution_time = (datetime.datetime.now() - start_time).total_seconds()
        # Create detailed response
        response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {execution_time} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
        return response
    except subprocess.CalledProcessError as e:
        error_response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
        return error_response
def run_quant_trading_agent(task: str) -> str:
    """Run a quantitative trading agent to analyze and execute trading strategies.
    This function initializes and runs a specialized quantitative trading agent that can:
    - Develop and backtest trading strategies
    - Analyze market data for alpha opportunities
    - Implement risk management frameworks
    - Optimize portfolio allocations
    - Conduct quantitative research
    - Monitor market microstructure
    - Evaluate trading system performance
    Args:
        task (str): The specific trading task or analysis to perform
    Returns:
        str: The agent's response or analysis results
    Example:
        >>> result = run_quant_trading_agent("Analyze SPY ETF for mean reversion opportunities")
        >>> print(result)
    """
    # Initialize the agent
    agent = Agent(
        agent_name="Quantitative-Trading-Agent",
        agent_description="Advanced quantitative trading and algorithmic analysis agent",
        system_prompt="""You are an expert quantitative trading agent with deep expertise in:
        - Algorithmic trading strategies and implementation
        - Statistical arbitrage and market making
        - Risk management and portfolio optimization
        - High-frequency trading systems
        - Market microstructure analysis
        - Quantitative research methodologies
        - Financial mathematics and stochastic processes
        - Machine learning applications in trading
        Your core responsibilities include:
        1. Developing and backtesting trading strategies
        2. Analyzing market data and identifying alpha opportunities
        3. Implementing risk management frameworks
        4. Optimizing portfolio allocations
        5. Conducting quantitative research
        6. Monitoring market microstructure
        7. Evaluating trading system performance
        You maintain strict adherence to:
        - Mathematical rigor in all analyses
        - Statistical significance in strategy development
        - Risk-adjusted return optimization
        - Market impact minimization
        - Regulatory compliance
        - Transaction cost analysis
        - Performance attribution
        You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
        max_loops=2,
        model_name="claude-3-5-sonnet-20240620",
        tools=[create_python_file, update_python_file, backtest_summary],
    )
    out = agent.run(task)
    return out
def backtest_summary(report: str) -> str:
    """Generate a summary of a backtest report, but only if the backtest was profitable.
    This function should only be used when the backtest results show a positive return.
    Using this function for unprofitable backtests may lead to misleading conclusions.
    Args:
        report (str): The backtest report containing performance metrics
    Returns:
        str: A formatted summary of the backtest report
    Example:
        >>> result = backtest_summary("Total Return: +15.2%, Sharpe: 1.8")
        >>> print(result)
        'The backtest report is: Total Return: +15.2%, Sharpe: 1.8'
    """
    return f"The backtest report is: {report}"
def get_coin_price(coin_id: str, vs_currency: str = "usd") -> str:
    """
    Get the current price of a specific cryptocurrency.
    Args:
        coin_id (str): The CoinGecko ID of the cryptocurrency (e.g., 'bitcoin', 'ethereum')
        vs_currency (str, optional): The target currency. Defaults to "usd".
    Returns:
        str: JSON formatted string containing the coin's current price and market data
    Raises:
        requests.RequestException: If the API request fails
    Example:
        >>> result = get_coin_price("bitcoin")
        >>> print(result)
        {"bitcoin": {"usd": 45000, "usd_market_cap": 850000000000, ...}}
    """
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            "ids": coin_id,
            "vs_currencies": vs_currency,
            "include_market_cap": True,
            "include_24hr_vol": True,
            "include_24hr_change": True,
            "include_last_updated_at": True,
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        return json.dumps(data, indent=2)
    except requests.RequestException as e:
        return json.dumps(
            {
                "error": f"Failed to fetch price for {coin_id}: {str(e)}"
            }
        )
    except Exception as e:
        return json.dumps({"error": f"Unexpected error: {str(e)}"})
def run_crypto_quant_agent(task: str) -> str:
    """
    Run a crypto quantitative trading agent with specialized tools for cryptocurrency market analysis.
    This function initializes and runs a quantitative trading agent specifically designed for
    cryptocurrency markets. The agent is equipped with tools for price fetching and can perform
    various quantitative analyses including algorithmic trading strategy development, risk management,
    and market microstructure analysis.
    Args:
        task (str): The task or query to be processed by the crypto quant agent.
    Returns:
        str: The agent's response to the given task.
    Example:
        >>> response = run_crypto_quant_agent("Analyze the current market conditions for Bitcoin")
        >>> print(response)
        "Based on current market analysis..."
    """
    # Initialize the agent with expanded tools
    quant_agent = Agent(
        agent_name="Crypto-Quant-Agent",
        agent_description="Advanced quantitative trading agent specializing in cryptocurrency markets with algorithmic analysis capabilities",
        system_prompt="""You are an expert quantitative trading agent specializing in cryptocurrency markets. Your capabilities include:
        - Algorithmic trading strategy development and backtesting
        - Statistical arbitrage and market making for crypto assets
        - Risk management and portfolio optimization for digital assets
        - High-frequency trading system design for crypto markets
        - Market microstructure analysis of crypto exchanges
        - Quantitative research methodologies for crypto assets
        - Financial mathematics and stochastic processes
        - Machine learning applications in crypto trading
        You maintain strict adherence to:
        - Mathematical rigor in all analyses
        - Statistical significance in strategy development
        - Risk-adjusted return optimization
        - Market impact minimization
        - Regulatory compliance
        - Transaction cost analysis
        - Performance attribution
        You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
        max_loops=1,
        max_tokens=4096,
        model_name="gpt-4.1-mini",
        dynamic_temperature_enabled=True,
        output_type="final",
        tools=[
            get_coin_price,
        ],
    )
    return quant_agent.run(task)
# Initialize the agent
agent = Agent(
    agent_name="Director-Agent",
    agent_description="Strategic director and project management agent",
    system_prompt="""You are an expert Director Agent with comprehensive capabilities in:
    - Strategic planning and decision making
    - Project management and coordination
    - Resource allocation and optimization
    - Team leadership and delegation
    - Risk assessment and mitigation
    - Stakeholder management
    - Process optimization
    - Quality assurance
    Your core responsibilities include:
    1. Developing and executing strategic initiatives
    2. Coordinating cross-functional projects
    3. Managing resource allocation
    4. Setting and tracking KPIs
    5. Ensuring project deliverables
    6. Risk management and mitigation
    7. Stakeholder communication
    You maintain strict adherence to:
    - Best practices in project management
    - Data-driven decision making
    - Clear communication protocols
    - Quality standards
    - Timeline management
    - Budget constraints
    - Regulatory compliance
    You communicate with clarity and authority while maintaining professionalism and ensuring all stakeholders are aligned.""",
    max_loops=1,
    model_name="gpt-4o-mini",
    output_type="final",
    interactive=False,
    tools=[run_quant_trading_agent],
)

out = agent.run("""
    Please call the quantitative trading agent to generate Python code for a Bitcoin backtest using the CoinGecko API.
    Provide a comprehensive description of the backtest methodology and trading strategy.
    Consider the API limitations of CoinGecko and utilize only free, open-source libraries that don't require API keys. Use the requests library to fetch the data. Create a specialized strategy for the backtest focused on the orderbook and other data for price action.
    The goal is to create a backtest that can predict the price action of the coin based on the orderbook and other data.
    Maximize the profit of the backtest. Please use the OKX price API for the orderbook and other data. Be very explicit in your implementation.
    Be very precise with the instructions you give to the agent and tell it to write 400 lines of good code.
""")

print(out)
```
## Best Practices
| Category | Best Practice | Description |
|----------|---------------|-------------|
| **Tool Design** | Single Purpose | Keep tools focused and single-purpose |
| | Clear Naming | Use clear, descriptive names |
| | Error Handling | Include comprehensive error handling |
| | Documentation | Add detailed documentation |
| **Agent Configuration** | Clear Role | Give each agent a clear, specific role |
| | System Prompts | Provide detailed system prompts |
| | Model Parameters | Configure appropriate model and parameters |
| | Resource Limits | Set reasonable limits on iterations and tokens |
| **Error Handling** | Multi-level | Implement proper error handling at each level |
| | Logging | Include logging for debugging |
| | API Management | Handle API rate limits and timeouts |
| | Fallbacks | Provide fallback options when possible |
| **Performance Optimization** | Async Operations | Use async operations where appropriate |
| | Caching | Implement caching when possible |
| | Token Usage | Monitor and optimize token usage |
| | Batch Processing | Consider batch operations for efficiency |
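
To make the error-handling and API-management rows above concrete, here is a small sketch of a tool that applies a timeout, a few retries, and structured error reporting. The endpoint URL and function name are hypothetical; swap in the real service your tool wraps:

```python
import json
import time

import requests


def fetch_market_data(symbol: str, max_retries: int = 3) -> str:
    """Fetch market data for a symbol with basic retry and timeout handling.

    Args:
        symbol: Ticker or coin identifier to query.
        max_retries: How many attempts to make before giving up.

    Returns:
        A JSON string with either the fetched data or an error message.
    """
    url = "https://api.example.com/market-data"  # hypothetical endpoint
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, params={"symbol": symbol}, timeout=10)
            response.raise_for_status()
            return json.dumps(response.json(), indent=2)
        except requests.RequestException as e:
            if attempt == max_retries:
                # Surface the failure to the calling agent instead of raising
                return json.dumps({"error": f"Failed after {attempt} attempts: {e}"})
            # Simple linear backoff before the next attempt
            time.sleep(attempt)
```

Returning an error payload instead of raising keeps the agent loop running and lets the model decide how to react to the failure.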

@ -0,0 +1,150 @@
# Vision Processing Examples
This example demonstrates how to use vision-enabled agents in Swarms to analyze images and process visual information. You'll learn how to work with both OpenAI and Anthropic vision models for various use cases.
## Prerequisites
- Python 3.7+
- OpenAI API key (for GPT-4V)
- Anthropic API key (for Claude 3)
- Swarms library
## Installation
```bash
pip3 install -U swarms
```
## Environment Variables
```plaintext
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY="" # Required for GPT-4V
ANTHROPIC_API_KEY="" # Required for Claude 3
```
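If you keep these values in a local `.env` file, a small sketch like the following can load and sanity-check them before running the examples. It uses the third-party `python-dotenv` package, which is an assumption here and not required by the examples themselves.

```python
import os

from dotenv import load_dotenv  # python-dotenv, assumed installed separately

load_dotenv()  # reads WORKSPACE_DIR and the API keys from a local .env file

for key in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
    if not os.getenv(key):
        print(f"Warning: {key} is not set; the corresponding examples will fail.")
```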
## Working with Images
### Supported Image Formats
Vision-enabled agents support various image formats:
| Format | Description |
|--------|-------------|
| JPEG/JPG | Standard image format with lossy compression |
| PNG | Lossless format supporting transparency |
| GIF | Animated format (only first frame used) |
| WebP | Modern format with both lossy and lossless compression |
### Image Guidelines
- Maximum file size: 20MB
- Recommended resolution: At least 512x512 pixels
- Image should be clear and well-lit
- Avoid heavily compressed or blurry images
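As a rough local pre-check for these guidelines (not part of the Swarms API), something like the sketch below can filter out unsuitable files before they are sent to an agent; it assumes Pillow is installed for reading image dimensions.

```python
import os

from PIL import Image  # Pillow, assumed installed for the resolution check

SUPPORTED_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
MAX_BYTES = 20 * 1024 * 1024  # 20MB guideline


def meets_image_guidelines(path: str) -> bool:
    """Return True if the file matches the format, size, and resolution guidelines above."""
    ext = os.path.splitext(path)[1].lower()
    if ext not in SUPPORTED_EXTS or os.path.getsize(path) > MAX_BYTES:
        return False
    with Image.open(path) as img:
        width, height = img.size
    return width >= 512 and height >= 512
```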
## Examples
### 1. Quality Control with GPT-4V
```python
from swarms.structs import Agent
from swarms.prompts.logistics import Quality_Control_Agent_Prompt
# Load your image
factory_image = "path/to/your/image.jpg" # Local file path
# Or use a URL
# factory_image = "https://example.com/image.jpg"
# Initialize quality control agent with GPT-4V
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides detailed quality reports.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1
)
# Run the analysis
response = quality_control_agent.run(
task="Analyze this image and provide a detailed quality control report",
img=factory_image
)
print(response)
```
### 2. Visual Analysis with Claude 3
```python
from swarms.structs import Agent
from swarms.prompts.logistics import Visual_Analysis_Prompt
# Load your image
product_image = "path/to/your/product.jpg"
# Initialize visual analysis agent with Claude 3
visual_analyst = Agent(
agent_name="Visual Analyst",
agent_description="An agent that performs detailed visual analysis of products and scenes.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Visual_Analysis_Prompt,
multi_modal=True,
max_loops=1
)
# Run the analysis
response = visual_analyst.run(
task="Provide a comprehensive analysis of this product image",
img=product_image
)
print(response)
```
### 3. Image Batch Processing
```python
from swarms.structs import Agent
import os
def process_image_batch(image_folder, agent):
"""Process multiple images in a folder"""
results = []
for image_file in os.listdir(image_folder):
if image_file.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
image_path = os.path.join(image_folder, image_file)
response = agent.run(
task="Analyze this image",
img=image_path
)
results.append((image_file, response))
return results
# Example usage
image_folder = "path/to/image/folder"
batch_results = process_image_batch(image_folder, visual_analyst)
```
## Best Practices
| Category | Best Practice | Description |
|----------|---------------|-------------|
| Image Preparation | Format Support | Ensure images are in supported formats (JPEG, PNG, GIF, WebP) |
| | Size & Quality | Optimize image size and quality for better processing |
| | Image Quality | Use clear, well-lit images for accurate analysis |
| Model Selection | GPT-4V Usage | Use for general vision tasks and detailed analysis |
| | Claude 3 Usage | Use for complex reasoning and longer outputs |
| | Batch Processing | Consider batch processing for multiple images |
| Error Handling | Path Validation | Always validate image paths before processing |
| | API Error Handling | Implement proper error handling for API calls |
| | Rate Monitoring | Monitor API rate limits and token usage |
| Performance Optimization | Result Caching | Cache results when processing the same images |
| | Batch Processing | Use batch processing for multiple images |
| | Parallel Processing | Implement parallel processing for large datasets |

@ -0,0 +1,271 @@
# ImageAgentBatchProcessor Documentation
## Overview
The `ImageAgentBatchProcessor` is a high-performance parallel image processing system designed for running AI agents on multiple images concurrently. It provides robust error handling, logging, and flexible configuration options.
## Installation
```bash
pip install swarms
```
## Class Arguments
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| agents | Union[Agent, List[Agent], Callable, List[Callable]] | Required | Single agent or list of agents to process images |
| max_workers | int | None | Maximum number of parallel workers (defaults to 95% of CPU cores) |
| supported_formats | List[str] | ['.jpg', '.jpeg', '.png'] | List of supported image file extensions |
## Methods
### run()
**Description**: Main method for processing multiple images in parallel with configured agents. Can handle single images, multiple images, or entire directories.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_paths | Union[str, List[str], Path] | Yes | Single image path, list of paths, or directory path |
| tasks | Union[str, List[str]] | Yes | Single task or list of tasks to perform on each image |
**Returns**: List[Dict[str, Any]] - List of processing results for each image
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
from pathlib import Path
# Initialize agent and processor
agent = Agent(model_name="gpt-4.1-mini", max_loops=1)
processor = ImageAgentBatchProcessor(agents=agent)
# Example 1: Process single image
results = processor.run(
image_paths="path/to/image.jpg",
tasks="Describe this image"
)
# Example 2: Process multiple images
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks=["Describe objects", "Identify colors"]
)
# Example 3: Process directory
results = processor.run(
image_paths=Path("./images"),
tasks="Analyze image content"
)
```
### _validate_image_path()
**Description**: Internal method that validates if an image path exists and has a supported format.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_path | Union[str, Path] | Yes | Path to the image file to validate |
**Returns**: Path - Validated Path object
**Example**:
```python
from swarms.structs import ImageAgentBatchProcessor, ImageProcessingError
from pathlib import Path
processor = ImageAgentBatchProcessor(agents=agent)
try:
validated_path = processor._validate_image_path("image.jpg")
print(f"Valid image path: {validated_path}")
except ImageProcessingError as e:
print(f"Invalid image path: {e}")
```
### _process_single_image()
**Description**: Internal method that processes a single image with one agent and one or more tasks.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_path | Path | Yes | Path to the image to process |
| tasks | Union[str, List[str]] | Yes | Tasks to perform on the image |
| agent | Agent | Yes | Agent to use for processing |
**Returns**: Dict[str, Any] - Processing results for the image
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
from pathlib import Path
agent = Agent(model_name="gpt-4.1-mini", max_loops=1)
processor = ImageAgentBatchProcessor(agents=agent)
try:
result = processor._process_single_image(
image_path=Path("image.jpg"),
tasks=["Describe image", "Identify objects"],
agent=agent
)
print(f"Processing results: {result}")
except Exception as e:
print(f"Processing failed: {e}")
```
### __call__()
**Description**: Makes the ImageAgentBatchProcessor callable like a function. Redirects to the run() method.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| *args | Any | No | Variable length argument list passed to run() |
| **kwargs | Any | No | Keyword arguments passed to run() |
**Returns**: List[Dict[str, Any]] - Same as run() method
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
# Initialize
agent = Agent(model_name="gpt-4.1-mini", max_loops=1)
processor = ImageAgentBatchProcessor(agents=agent)
# Using __call__
results = processor(
image_paths=["image1.jpg", "image2.jpg"],
tasks="Describe the image"
)
# This is equivalent to:
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks="Describe the image"
)
```
## Return Format
The processor returns a list of dictionaries with the following structure:
```python
{
"image_path": str, # Path to the processed image
"results": { # Results for each task
"task_name": result, # Task-specific results
},
"processing_time": float # Processing time in seconds
}
```
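A defensive way to consume this structure is sketched below. Note that the `error` key checked here is an assumption based on the error-handling guidance later in this document, not a documented field.

```python
for item in results:
    if "error" in item:  # assumed failure marker, not a documented field
        print(f"{item['image_path']} failed: {item['error']}")
        continue
    print(f"{item['image_path']} finished in {item['processing_time']:.2f}s")
    for task_name, output in item["results"].items():
        print(f"  {task_name}: {output}")
```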
## Complete Usage Examples
### 1. Basic Usage with Single Agent
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
# Initialize an agent
agent = Agent(
    model_name="gpt-4.1-mini",
    max_loops=1,
)
# Create processor
processor = ImageAgentBatchProcessor(agents=agent)
# Process single image
results = processor.run(
image_paths="path/to/image.jpg",
tasks="Describe this image in detail"
)
```
### 2. Processing Multiple Images with Multiple Tasks
```python
# Initialize with multiple agents
agent1 = Agent(model_name="gpt-4.1-mini", max_loops=1)
agent2 = Agent(model_name="anthropic/claude-3-opus-20240229", max_loops=1)
processor = ImageAgentBatchProcessor(
agents=[agent1, agent2],
supported_formats=['.jpg', '.png', '.webp']
)
# Define multiple tasks
tasks = [
"Describe the main objects in the image",
"What is the dominant color?",
"Identify any text in the image"
]
# Process a directory of images
results = processor.run(
image_paths="path/to/image/directory",
tasks=tasks
)
# Process results
for result in results:
print(f"Image: {result['image_path']}")
for task, output in result['results'].items():
print(f"Task: {task}")
print(f"Result: {output}")
print(f"Processing time: {result['processing_time']:.2f} seconds")
```
### 3. Custom Error Handling
```python
from swarms.structs import ImageAgentBatchProcessor, ImageProcessingError, InvalidAgentError  # InvalidAgentError import path assumed alongside ImageProcessingError
try:
processor = ImageAgentBatchProcessor(agents=agent)
results = processor.run(
image_paths=["image1.jpg", "image2.png", "invalid.txt"],
tasks="Analyze the image"
)
except ImageProcessingError as e:
print(f"Image processing failed: {e}")
except InvalidAgentError as e:
print(f"Agent configuration error: {e}")
```
## Best Practices
| Best Practice | Description |
|--------------|-------------|
| Resource Management | • The processor automatically uses 95% of available CPU cores<br>• For memory-intensive operations, consider reducing `max_workers` |
| Error Handling | • Always wrap processor calls in try-except blocks<br>• Check the results for any error keys |
| Task Design | • Keep tasks focused and specific<br>• Group related tasks together for efficiency |
| Performance Optimization | • Process images in batches for better throughput<br>• Use multiple agents for different types of analysis |
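For memory-intensive workloads, the worker cap mentioned in the table above can be set explicitly; the half-of-cores value below is just an illustrative choice, not a recommendation from the library.

```python
import os

from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor

agent = Agent(model_name="gpt-4.1-mini", max_loops=1)

# Cap parallelism below the ~95%-of-cores default for memory-heavy analyses
processor = ImageAgentBatchProcessor(
    agents=agent,
    max_workers=max(1, (os.cpu_count() or 2) // 2),
)

results = processor.run(
    image_paths="path/to/image/folder",
    tasks="Analyze image content",
)
```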
## Limitations
| Limitation | Description |
|------------|-------------|
| File Format Support | Only supports image file formats specified in `supported_formats` |
| Agent Requirements | Requires valid agent configurations |
| Resource Scaling | Memory usage scales with number of concurrent processes |
This documentation provides a comprehensive guide to using the `ImageAgentBatchProcessor`. The class is designed to be both powerful and flexible, allowing for various use cases from simple image analysis to complex multi-agent processing pipelines.

@ -0,0 +1,270 @@
# InteractiveGroupChat Documentation
The InteractiveGroupChat is a sophisticated multi-agent system that enables interactive conversations between users and AI agents using @mentions. This system allows users to direct tasks to specific agents and facilitates collaborative responses when multiple agents are mentioned.
## Features
- **@mentions Support**: Direct tasks to specific agents using @agent_name syntax
- **Multi-Agent Collaboration**: Multiple mentioned agents can see and respond to each other's tasks
- **Callable Function Support**: Supports both Agent instances and callable functions as chat participants
- **Comprehensive Error Handling**: Custom error classes for different scenarios
- **Conversation History**: Maintains a complete history of the conversation
- **Flexible Output Formatting**: Configurable output format for conversation history
## Installation
```bash
pip install swarms
```
## Methods Reference
### Constructor (`__init__`)
**Description:**
Initializes a new InteractiveGroupChat instance with the specified configuration.
**Arguments:**
| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `id` | str | Unique identifier for the chat | auto-generated key |
| `name` | str | Name of the group chat | "InteractiveGroupChat" |
| `description` | str | Description of the chat's purpose | generic description |
| `agents` | List[Union[Agent, Callable]] | List of participating agents | empty list |
| `max_loops` | int | Maximum conversation turns | 1 |
| `output_type` | str | Type of output format | "string" |
| `interactive` | bool | Whether to enable interactive mode | False |
**Example:**
```python
from swarms import Agent, InteractiveGroupChat
# Create agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies.",
model_name="gpt-4"
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert providing tax-related guidance.",
model_name="gpt-4"
)
# Initialize group chat
chat = InteractiveGroupChat(
id="finance-chat-001",
name="Financial Advisory Team",
description="Expert financial guidance team",
agents=[financial_advisor, tax_expert],
max_loops=3,
output_type="string",
interactive=True
)
```
### Run Method (`run`)
**Description:**
Processes a task and gets responses from mentioned agents. This is the main method for sending tasks in non-interactive mode.
**Arguments:**
- `task` (str): The input task containing @mentions to agents
**Returns:**
- str: Formatted conversation history including agent responses
**Example:**
```python
# Single agent interaction
response = chat.run("@FinancialAdvisor what are the best ETFs for 2024?")
print(response)
# Multiple agent collaboration
response = chat.run("@FinancialAdvisor and @TaxExpert, how can I minimize taxes on my investments?")
print(response)
```
### Start Interactive Session (`start_interactive_session`)
**Description:**
Starts an interactive terminal session for real-time chat with agents. This creates a REPL (Read-Eval-Print Loop) interface.
**Arguments:**
None
**Example:**
```python
# Initialize chat with interactive mode
chat = InteractiveGroupChat(
agents=[financial_advisor, tax_expert],
interactive=True
)
# Start the interactive session
chat.start_interactive_session()
```
### Extract Mentions (`_extract_mentions`)
**Description:**
Internal method that extracts @mentions from a task. Used by the run method to identify which agents should respond.
**Arguments:**
- `task` (str): The input task to extract mentions from
**Returns:**
- List[str]: List of mentioned agent names
**Example:**
```python
# Internal usage example (not typically called directly)
chat = InteractiveGroupChat(agents=[financial_advisor, tax_expert])
mentions = chat._extract_mentions("@FinancialAdvisor and @TaxExpert, please help")
print(mentions) # ['FinancialAdvisor', 'TaxExpert']
```
### Validate Initialization (`_validate_initialization`)
**Description:**
Internal method that validates the group chat configuration during initialization.
**Arguments:**
None
**Example:**
```python
# Internal validation happens automatically during initialization
chat = InteractiveGroupChat(
agents=[financial_advisor], # Valid: at least one agent
max_loops=5 # Valid: positive number
)
```
### Setup Conversation Context (`_setup_conversation_context`)
**Description:**
Internal method that sets up the initial conversation context with group chat information.
**Arguments:**
None
**Example:**
```python
# Context is automatically set up during initialization
chat = InteractiveGroupChat(
name="Investment Team",
description="Expert investment advice",
agents=[financial_advisor, tax_expert]
)
# The conversation context now includes chat name, description, and agent info
```
### Update Agent Prompts (`_update_agent_prompts`)
**Description:**
Internal method that updates each agent's system prompt with information about other agents and the group chat.
**Arguments:**
None
**Example:**
```python
# Agent prompts are automatically updated during initialization
chat = InteractiveGroupChat(agents=[financial_advisor, tax_expert])
# Each agent now knows about the other participants in the chat
```
## Error Classes
### InteractiveGroupChatError
**Description:**
Base exception class for InteractiveGroupChat errors.
**Example:**
```python
try:
# Some operation that might fail
chat.run("@InvalidAgent hello")
except InteractiveGroupChatError as e:
print(f"Chat error occurred: {e}")
```
### AgentNotFoundError
**Description:**
Raised when a mentioned agent is not found in the group.
**Example:**
```python
try:
chat.run("@NonExistentAgent hello!")
except AgentNotFoundError as e:
print(f"Agent not found: {e}")
```
### NoMentionedAgentsError
**Description:**
Raised when no agents are mentioned in the task.
**Example:**
```python
try:
chat.run("Hello everyone!") # No @mentions
except NoMentionedAgentsError as e:
print(f"No agents mentioned: {e}")
```
### InvalidtaskFormatError
**Description:**
Raised when the task format is invalid.
**Example:**
```python
try:
chat.run("@Invalid@Format")
except InvalidtaskFormatError as e:
print(f"Invalid task format: {e}")
```
## Best Practices
| Best Practice | Description | Example |
|--------------|-------------|---------|
| Agent Naming | Use clear, unique names for agents to avoid confusion | `financial_advisor`, `tax_expert` |
| Task Format | Always use @mentions to direct tasks to specific agents | `@FinancialAdvisor What's your investment advice?` |
| Error Handling | Implement proper error handling for various scenarios | `try/except` blocks for `AgentNotFoundError` |
| Context Management | Be aware that agents can see the full conversation history | Monitor conversation length and relevance |
| Resource Management | Consider the number of agents and task length to optimize performance | Limit max_loops and task size |
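Putting the naming, task-format, and error-handling practices together, a compact sketch might look like the following. The import path for the error classes is an assumption; adjust it to wherever your Swarms version exports them.

```python
from swarms import Agent, InteractiveGroupChat

# Assumed import location for the error classes documented above
from swarms.structs.interactive_groupchat import (
    AgentNotFoundError,
    NoMentionedAgentsError,
)

financial_advisor = Agent(
    agent_name="FinancialAdvisor",
    system_prompt="You are a financial advisor specializing in investment strategies.",
    model_name="gpt-4o-mini",
)
tax_expert = Agent(
    agent_name="TaxExpert",
    system_prompt="You are a tax expert providing tax-related guidance.",
    model_name="gpt-4o-mini",
)

chat = InteractiveGroupChat(
    name="Financial Advisory Team",
    agents=[financial_advisor, tax_expert],
    max_loops=1,
)

try:
    print(chat.run("@FinancialAdvisor and @TaxExpert, how should I rebalance before year end?"))
except (AgentNotFoundError, NoMentionedAgentsError) as e:
    print(f"Task could not be routed: {e}")
```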
## Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our GitHub repository.
## License
This project is licensed under the Apache License - see the LICENSE file for details.

@ -1,6 +1,5 @@
import time
from swarms import Agent
from swarms.schemas.conversation_schema import ConversationSchema
# Initialize the agent
agent = Agent(
@ -38,12 +37,8 @@ agent = Agent(
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="json",
output_type="all",
safety_prompt_on=True,
conversation_schema=ConversationSchema(
time_enabled=True,
message_id_on=True,
),
)
out = agent.run("What are the best top 3 etfs for gold coverage?")

@ -0,0 +1,25 @@
from swarms import Agent
from swarms.communication.duckdb_wrap import DuckDBConversation
# Configure a DuckDB-backed conversation store
conversation_store = DuckDBConversation(
db_path="support_conversation.duckdb",
table_name="support_history",
enable_logging=True,
)
# Create an agent that uses this persistent memory
agent = Agent(
agent_name="HelpdeskAgent",
system_prompt="You are a helpful assistant.",
model_name="gpt-4o-mini",
long_term_memory=conversation_store,
max_loops=1,
autosave=False,
)
response = agent.run("What are your hours of operation?")
print(response)
# View the conversation as stored in DuckDB
print(conversation_store.to_dict())

@ -0,0 +1,25 @@
from swarms import Agent
from swarms.communication.pulsar_struct import PulsarConversation
# Configure a Pulsar-backed conversation store
conversation_store = PulsarConversation(
pulsar_host="pulsar://localhost:6650", # adjust to your broker
topic="support_conversation",
token_count=False,
)
# Create an agent that uses this persistent memory
agent = Agent(
agent_name="SupportAgent",
system_prompt="You are a helpful assistant.",
model_name="gpt-4o-mini",
long_term_memory=conversation_store,
max_loops=1,
autosave=False,
)
response = agent.run("What time is check-out?")
print(response)
# View the messages as stored in Pulsar
print(conversation_store.get_messages())

@ -0,0 +1,33 @@
"""Persist agent dialogue using SQLiteConversation.
Run `pip install -e .` in the repository root so the ``swarms`` package is
available before executing this script.
"""
from swarms import Agent
from swarms.communication.sqlite_wrap import SQLiteConversation
# Configure a conversation store backed by SQLite
conversation_store = SQLiteConversation(
db_path="agent_history.db",
table_name="messages",
enable_logging=True,
)
# Create an agent that leverages the SQLite-based memory
agent = Agent(
agent_name="SupportAgent",
system_prompt="You are a helpful assistant.",
model_name="gpt-4o-mini",
long_term_memory=conversation_store,
max_loops=1,
# Autosave attempts to call `save()` on the memory object which is not
# implemented in SQLiteConversation, so we disable it for this example.
autosave=False,
)
response = agent.run("How do I reset my password?")
print(response)
# Show the conversation as stored in SQLite
print(conversation_store.to_dict())

@ -0,0 +1,26 @@
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
# Image for analysis
factory_image = "image.jpg"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
)
response = quality_control_agent.run(
task="Create a comprehensive report on the factory image and it's status",
img=factory_image,
)
print(response)

Binary file not shown.


@ -0,0 +1,32 @@
from swarms import Agent
from swarms.structs.image_batch_processor import (
ImageAgentBatchProcessor,
)
from pathlib import Path
# Initialize agent and processor
# Quality control agent
agent = Agent(
model_name="gpt-4.1-mini",
max_loops=1,
)
# Create processor
processor = ImageAgentBatchProcessor(agents=agent)
# Example 1: Process single image
results = processor.run(
image_paths="path/to/image.jpg", tasks="Describe this image"
)
# Example 2: Process multiple images
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks=["Describe objects", "Identify colors"],
)
# Example 3: Process directory
results = processor.run(
image_paths=Path("./images"), tasks="Analyze image content"
)

@ -0,0 +1,67 @@
import json
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
from swarms import BaseTool
import litellm
litellm._turn_on_debug()
# Image for analysis
factory_image = "image.jpg"
def security_analysis(danger_level: str = None) -> str:
"""
Analyzes the security danger level and returns an appropriate response.
Args:
danger_level (str, optional): The level of danger to analyze.
Can be "low", "medium", "high", or None. Defaults to None.
Returns:
str: A string describing the danger level assessment.
- "No danger level provided" if danger_level is None
- "No danger" if danger_level is "low"
- "Medium danger" if danger_level is "medium"
- "High danger" if danger_level is "high"
- "Unknown danger level" for any other value
"""
if danger_level is None:
return "No danger level provided"
if danger_level == "low":
return "No danger"
if danger_level == "medium":
return "Medium danger"
if danger_level == "high":
return "High danger"
return "Unknown danger level"
schema = BaseTool().function_to_dict(security_analysis)
print(json.dumps(schema, indent=4))
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
tools_list_dictionary=[schema],
)
response = quality_control_agent.run(
task="what is in the image?",
# img=factory_image,
)
print(response)

@ -0,0 +1,27 @@
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
# Image for analysis
factory_image = "image.jpg"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
# multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
)
response = quality_control_agent.run(
task="Create a comprehensive report on the image",
img=factory_image,
)
print(response)

@ -423,7 +423,7 @@ agent = Agent(
system_prompt="You are an advanced financial advisor agent with access to real-time cryptocurrency data from multiple sources including CoinGecko, Jupiter Protocol, and HTX. You can help users analyze market trends, check prices, find trading opportunities, perform swaps, and get detailed market insights. Always provide accurate, up-to-date information and explain market data in an easy-to-understand way.",
max_loops=1,
max_tokens=4096,
model_name="gpt-4o-mini",
model_name="gpt-4.1-mini",
dynamic_temperature_enabled=True,
output_type="all",
tools=[
@ -442,5 +442,7 @@ agent = Agent(
)
# agent.run("Use defi stats to find the best defi project to invest in")
agent.run("Get the market sentiment for bitcoin")
agent.run(
"Get the market sentiment for bitcoin and fetch the price of ethereum"
)
# Automatically executes any number and combination of tools you have uploaded to the tools parameter!

@ -0,0 +1,51 @@
from swarms import Agent
from swarms.structs.interactive_groupchat import InteractiveGroupChat
if __name__ == "__main__":
# Initialize agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies and portfolio management.",
random_models_on=True,
output_type="final",
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert who provides guidance on tax optimization and compliance.",
random_models_on=True,
output_type="final",
)
investment_analyst = Agent(
agent_name="InvestmentAnalyst",
system_prompt="You are an investment analyst focusing on market trends and investment opportunities.",
random_models_on=True,
output_type="final",
)
# Create list of agents including both Agent instances and callable
agents = [
financial_advisor,
tax_expert,
investment_analyst,
]
# Initialize another chat instance in interactive mode
interactive_chat = InteractiveGroupChat(
name="Interactive Financial Advisory Team",
description="An interactive team of financial experts providing comprehensive financial advice",
agents=agents,
max_loops=1,
output_type="all",
interactive=True,
)
try:
# Start the interactive session
print("\nStarting interactive session...")
# interactive_chat.run("What is the best methodology to accumulate gold and silver commodities, what is the best long term strategy to accumulate them?")
interactive_chat.start_interactive_session()
except Exception as e:
print(f"An error occurred in interactive mode: {e}")

@ -240,10 +240,6 @@ class SupabaseConversation(BaseCommunication):
"""
# Try to create index as well
create_index_sql = f"""
CREATE INDEX IF NOT EXISTS idx_{self.table_name}_conversation_id
ON {self.table_name} (conversation_id);
"""
# Attempt to create table using RPC function
# Note: This requires a stored procedure to be created in Supabase
@ -339,7 +335,7 @@ class SupabaseConversation(BaseCommunication):
if hasattr(self.client, "postgrest") and hasattr(
self.client.postgrest, "rpc"
):
result = self.client.postgrest.rpc(
self.client.postgrest.rpc(
"exec_sql", {"query": admin_sql}
).execute()
if self.enable_logging:

@ -0,0 +1,7 @@
from pydantic import BaseModel
class AgentRAGConfig(BaseModel):
"""
Configuration for the AgentRAG class.
"""

@ -1,91 +1,109 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Union, Any, Literal
from litellm.types import (
ChatCompletionPredictionContentParam,
)
from typing import Optional
# from litellm.types import (
# ChatCompletionPredictionContentParam,
# )
class LLMCompletionRequest(BaseModel):
"""Schema for LLM completion request parameters."""
model: Optional[str] = Field(
default=None,
description="The name of the language model to use for text completion",
)
temperature: Optional[float] = Field(
default=0.5,
description="Controls randomness of the output (0.0 to 1.0)",
)
top_p: Optional[float] = Field(
default=None,
description="Controls diversity via nucleus sampling",
)
n: Optional[int] = Field(
default=None, description="Number of completions to generate"
)
stream: Optional[bool] = Field(
default=None, description="Whether to stream the response"
)
stream_options: Optional[dict] = Field(
default=None, description="Options for streaming response"
)
stop: Optional[Any] = Field(
default=None,
description="Up to 4 sequences where the API will stop generating",
)
max_completion_tokens: Optional[int] = Field(
default=None,
description="Maximum tokens for completion including reasoning",
)
max_tokens: Optional[int] = Field(
default=None,
description="Maximum tokens in generated completion",
)
prediction: Optional[ChatCompletionPredictionContentParam] = (
Field(
default=None,
description="Configuration for predicted output",
)
)
presence_penalty: Optional[float] = Field(
default=None,
description="Penalizes new tokens based on existence in text",
)
frequency_penalty: Optional[float] = Field(
default=None,
description="Penalizes new tokens based on frequency in text",
)
logit_bias: Optional[dict] = Field(
default=None,
description="Modifies probability of specific tokens",
)
reasoning_effort: Optional[Literal["low", "medium", "high"]] = (
Field(
default=None,
description="Level of reasoning effort for the model",
)
)
seed: Optional[int] = Field(
default=None, description="Random seed for reproducibility"
)
tools: Optional[List] = Field(
default=None,
description="List of tools available to the model",
)
tool_choice: Optional[Union[str, dict]] = Field(
default=None, description="Choice of tool to use"
)
logprobs: Optional[bool] = Field(
default=None,
description="Whether to return log probabilities",
)
top_logprobs: Optional[int] = Field(
# class LLMCompletionRequest(BaseModel):
# """Schema for LLM completion request parameters."""
# model: Optional[str] = Field(
# default=None,
# description="The name of the language model to use for text completion",
# )
# temperature: Optional[float] = Field(
# default=0.5,
# description="Controls randomness of the output (0.0 to 1.0)",
# )
# top_p: Optional[float] = Field(
# default=None,
# description="Controls diversity via nucleus sampling",
# )
# n: Optional[int] = Field(
# default=None, description="Number of completions to generate"
# )
# stream: Optional[bool] = Field(
# default=None, description="Whether to stream the response"
# )
# stream_options: Optional[dict] = Field(
# default=None, description="Options for streaming response"
# )
# stop: Optional[Any] = Field(
# default=None,
# description="Up to 4 sequences where the API will stop generating",
# )
# max_completion_tokens: Optional[int] = Field(
# default=None,
# description="Maximum tokens for completion including reasoning",
# )
# max_tokens: Optional[int] = Field(
# default=None,
# description="Maximum tokens in generated completion",
# )
# prediction: Optional[ChatCompletionPredictionContentParam] = (
# Field(
# default=None,
# description="Configuration for predicted output",
# )
# )
# presence_penalty: Optional[float] = Field(
# default=None,
# description="Penalizes new tokens based on existence in text",
# )
# frequency_penalty: Optional[float] = Field(
# default=None,
# description="Penalizes new tokens based on frequency in text",
# )
# logit_bias: Optional[dict] = Field(
# default=None,
# description="Modifies probability of specific tokens",
# )
# reasoning_effort: Optional[Literal["low", "medium", "high"]] = (
# Field(
# default=None,
# description="Level of reasoning effort for the model",
# )
# )
# seed: Optional[int] = Field(
# default=None, description="Random seed for reproducibility"
# )
# tools: Optional[List] = Field(
# default=None,
# description="List of tools available to the model",
# )
# tool_choice: Optional[Union[str, dict]] = Field(
# default=None, description="Choice of tool to use"
# )
# logprobs: Optional[bool] = Field(
# default=None,
# description="Whether to return log probabilities",
# )
# top_logprobs: Optional[int] = Field(
# default=None,
# description="Number of most likely tokens to return",
# )
# parallel_tool_calls: Optional[bool] = Field(
# default=None,
# description="Whether to allow parallel tool calls",
# )
# class Config:
# allow_arbitrary_types = True
class ModelConfigOrigin(BaseModel):
"""Schema for model configuration origin."""
model_url: Optional[str] = Field(
default=None,
description="Number of most likely tokens to return",
description="The URL of the model to use for text completion",
)
parallel_tool_calls: Optional[bool] = Field(
api_key: Optional[str] = Field(
default=None,
description="Whether to allow parallel tool calls",
description="The API key to use for the model",
)
class Config:

@ -84,6 +84,7 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.interactive_groupchat import InteractiveGroupChat
__all__ = [
"Agent",
@ -156,4 +157,5 @@ __all__ = [
"aggregate",
"find_agent_by_name",
"run_agent",
"InteractiveGroupChat",
]

@ -40,6 +40,11 @@ from swarms.schemas.base_schemas import (
ChatCompletionResponseChoice,
ChatMessageResponse,
)
from swarms.schemas.llm_agent_schema import ModelConfigOrigin
from swarms.structs.agent_rag_handler import (
RAGConfig,
AgentRAGHandler,
)
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
from swarms.structs.safe_loading import (
@ -316,7 +321,7 @@ class Agent:
pdf_path: Optional[str] = None,
list_of_pdf: Optional[str] = None,
tokenizer: Optional[Any] = None,
long_term_memory: Optional[Any] = None,
long_term_memory: Optional[Union[Callable, Any]] = None,
preset_stopping_token: Optional[bool] = False,
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
@ -407,6 +412,10 @@ class Agent:
mcp_config: Optional[MCPConnection] = None,
top_p: Optional[float] = 0.90,
conversation_schema: Optional[ConversationSchema] = None,
aditional_llm_config: Optional[ModelConfigOrigin] = None,
llm_base_url: Optional[str] = None,
llm_api_key: Optional[str] = None,
rag_config: Optional[RAGConfig] = None,
*args,
**kwargs,
):
@ -534,10 +543,10 @@ class Agent:
self.mcp_config = mcp_config
self.top_p = top_p
self.conversation_schema = conversation_schema
self._cached_llm = (
None # Add this line to cache the LLM instance
)
self.aditional_llm_config = aditional_llm_config
self.llm_base_url = llm_base_url
self.llm_api_key = llm_api_key
self.rag_config = rag_config
# self.short_memory = self.short_memory_init()
@ -547,6 +556,8 @@ class Agent:
# self.init_handling()
self.setup_config()
self.short_memory = self.short_memory_init()
if exists(self.docs_folder):
self.get_docs_from_doc_folders()
@ -564,8 +575,6 @@ class Agent:
if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT
self.short_memory = self.short_memory_init()
# Run sequential operations after all concurrent tasks are done
# self.agent_output = self.agent_output_model()
log_agent_data(self.to_dict())
@ -579,6 +588,17 @@ class Agent:
if self.random_models_on is True:
self.model_name = set_random_models_for_agents()
if self.long_term_memory is not None:
self.rag_handler = self.rag_setup_handling()
def rag_setup_handling(self):
return AgentRAGHandler(
long_term_memory=self.long_term_memory,
config=self.rag_config,
agent_name=self.agent_name,
verbose=self.verbose,
)
def tool_handling(self):
self.tool_struct = BaseTool(
@ -661,8 +681,8 @@ class Agent:
def llm_handling(self):
# Use cached instance if available
if self._cached_llm is not None:
return self._cached_llm
if self.llm is not None:
return self.llm
if self.model_name is None:
self.model_name = "gpt-4o-mini"
@ -682,11 +702,9 @@ class Agent:
}
if self.llm_args is not None:
self._cached_llm = LiteLLM(
**{**common_args, **self.llm_args}
)
self.llm = LiteLLM(**{**common_args, **self.llm_args})
elif self.tools_list_dictionary is not None:
self._cached_llm = LiteLLM(
self.llm = LiteLLM(
**common_args,
tools_list_dictionary=self.tools_list_dictionary,
tool_choice="auto",
@ -694,7 +712,7 @@ class Agent:
)
elif self.mcp_url is not None:
self._cached_llm = LiteLLM(
self.llm = LiteLLM(
**common_args,
tools_list_dictionary=self.add_mcp_tools_to_memory(),
tool_choice="auto",
@ -702,11 +720,14 @@ class Agent:
mcp_call=True,
)
else:
self._cached_llm = LiteLLM(
**common_args, stream=self.streaming_on
# common_args.update(self.aditional_llm_config.model_dump())
self.llm = LiteLLM(
**common_args,
stream=self.streaming_on,
)
return self._cached_llm
return self.llm
except AgentLLMInitializationError as e:
logger.error(
f"Error in llm_handling: {e} Your current configuration is not supported. Please check the configuration and parameters."
@ -789,7 +810,7 @@ class Agent:
"No agent details found. Using task as fallback for prompt generation."
)
self.system_prompt = auto_generate_prompt(
task=task, model=self._cached_llm
task=task, model=self.llm
)
else:
# Combine all available components
@ -1012,16 +1033,20 @@ class Agent:
)
self.memory_query(task_prompt)
# Generate response using LLM
response_args = (
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
# # Generate response using LLM
# response_args = (
# (task_prompt, *args)
# if img is None
# else (task_prompt, img, *args)
# )
# # Call the LLM
# response = self.call_llm(
# *response_args, **kwargs
# )
# Call the LLM
response = self.call_llm(
*response_args, **kwargs
task=task_prompt, img=img, *args, **kwargs
)
if exists(self.tools_list_dictionary):
@ -1045,35 +1070,6 @@ class Agent:
# Check and execute tools
if exists(self.tools):
# out = self.parse_and_execute_tools(
# response
# )
# self.short_memory.add(
# role="Tool Executor", content=out
# )
# if self.no_print is False:
# agent_print(
# f"{self.agent_name} - Tool Executor",
# out,
# loop_count,
# self.streaming_on,
# )
# out = self.call_llm(task=out)
# self.short_memory.add(
# role=self.agent_name, content=out
# )
# if self.no_print is False:
# agent_print(
# f"{self.agent_name} - Agent Analysis",
# out,
# loop_count,
# self.streaming_on,
# )
self.execute_tools(
response=response,
@ -1158,9 +1154,6 @@ class Agent:
log_agent_data(self.to_dict())
if self.autosave:
self.save()
# Output formatting based on output_type
return history_output_formatter(
self.short_memory, type=self.output_type
@ -2388,7 +2381,9 @@ class Agent:
return None
def call_llm(self, task: str, *args, **kwargs) -> str:
def call_llm(
self, task: str, img: str = None, *args, **kwargs
) -> str:
"""
Calls the appropriate method on the `llm` object based on the given task.
@ -2407,17 +2402,9 @@ class Agent:
TypeError: If task is not a string or llm object is None.
ValueError: If task is empty.
"""
# if not isinstance(task, str):
# task = any_to_str(task)
# if img is not None:
# kwargs['img'] = img
# if audio is not None:
# kwargs['audio'] = audio
try:
out = self.llm.run(task=task, *args, **kwargs)
out = self.llm.run(task=task, img=img, *args, **kwargs)
return out
except AgentLLMError as e:
@ -2764,13 +2751,7 @@ class Agent:
# Create a temporary LLM instance without tools for the follow-up call
try:
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
)
temp_llm = self.temp_llm_instance_for_tool_summary()
summary = temp_llm.run(
task=self.short_memory.get_str()
@ -2792,6 +2773,19 @@ class Agent:
logger.error(f"Error in MCP tool: {e}")
raise e
def temp_llm_instance_for_tool_summary(self):
return LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
base_url=self.llm_base_url,
api_key=self.llm_api_key,
)
def execute_tools(self, response: any, loop_count: int):
output = (
@ -2813,15 +2807,7 @@ class Agent:
# Now run the LLM again without tools - create a temporary LLM instance
# instead of modifying the cached one
# Create a temporary LLM instance without tools for the follow-up call
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
)
temp_llm = self.temp_llm_instance_for_tool_summary()
tool_response = temp_llm.run(
f"""

@ -0,0 +1,682 @@
import time
from typing import Any, Dict, List, Optional
from loguru import logger
from swarms.utils.litellm_tokenizer import count_tokens
from pydantic import BaseModel, Field, field_validator
class RAGConfig(BaseModel):
"""Configuration class for RAG operations"""
similarity_threshold: float = Field(
default=0.7,
ge=0.0,
le=1.0,
description="Similarity threshold for memory retrieval",
)
max_results: int = Field(
default=5,
gt=0,
description="Maximum number of results to return from memory",
)
context_window_tokens: int = Field(
default=2000,
gt=0,
description="Maximum number of tokens in the context window",
)
auto_save_to_memory: bool = Field(
default=True,
description="Whether to automatically save responses to memory",
)
save_every_n_loops: int = Field(
default=5, gt=0, description="Save to memory every N loops"
)
min_content_length: int = Field(
default=50,
gt=0,
description="Minimum content length to save to memory",
)
query_every_loop: bool = Field(
default=False,
description="Whether to query memory every loop",
)
enable_conversation_summaries: bool = Field(
default=True,
description="Whether to enable conversation summaries",
)
relevance_keywords: Optional[List[str]] = Field(
default=None, description="Keywords to check for relevance"
)
    @field_validator("relevance_keywords", mode="before")
    @classmethod
    def set_default_keywords(cls, v):
if v is None:
return [
"important",
"key",
"critical",
"summary",
"conclusion",
]
return v
class Config:
arbitrary_types_allowed = True
validate_assignment = True
json_schema_extra = {
"example": {
"similarity_threshold": 0.7,
"max_results": 5,
"context_window_tokens": 2000,
"auto_save_to_memory": True,
"save_every_n_loops": 5,
"min_content_length": 50,
"query_every_loop": False,
"enable_conversation_summaries": True,
"relevance_keywords": [
"important",
"key",
"critical",
"summary",
"conclusion",
],
}
}
class AgentRAGHandler:
"""
Handles all RAG (Retrieval-Augmented Generation) operations for agents.
Provides memory querying, storage, and context management capabilities.
"""
def __init__(
self,
long_term_memory: Optional[Any] = None,
config: Optional[RAGConfig] = None,
agent_name: str = "Unknown",
max_context_length: int = 158_000,
verbose: bool = False,
):
"""
Initialize the RAG handler.
Args:
long_term_memory: The long-term memory store (must implement add() and query() methods)
config: RAG configuration settings
agent_name: Name of the agent using this handler
verbose: Enable verbose logging
"""
self.long_term_memory = long_term_memory
self.config = config or RAGConfig()
self.agent_name = agent_name
self.verbose = verbose
self.max_context_length = max_context_length
self._loop_counter = 0
self._conversation_history = []
self._important_memories = []
# Validate memory interface
if (
self.long_term_memory
and not self._validate_memory_interface()
):
logger.warning(
"Long-term memory doesn't implement required interface"
)
def _validate_memory_interface(self) -> bool:
"""Validate that the memory object has required methods"""
required_methods = ["add", "query"]
for method in required_methods:
if not hasattr(self.long_term_memory, method):
logger.error(
f"Memory object missing required method: {method}"
)
return False
return True
def is_enabled(self) -> bool:
"""Check if RAG is enabled (has valid memory store)"""
return self.long_term_memory is not None
def query_memory(
self,
query: str,
context_type: str = "general",
loop_count: Optional[int] = None,
) -> str:
"""
Query the long-term memory and return formatted context.
Args:
query: The query string to search for
context_type: Type of context being queried (for logging)
loop_count: Current loop number (for logging)
Returns:
Formatted string of relevant memories, empty string if no results
"""
if not self.is_enabled():
return ""
try:
if self.verbose:
logger.info(
f"🔍 [{self.agent_name}] Querying RAG for {context_type}: {query[:100]}..."
)
# Query the memory store
results = self.long_term_memory.query(
query=query,
top_k=self.config.max_results,
similarity_threshold=self.config.similarity_threshold,
)
if not results:
if self.verbose:
logger.info(
f"No relevant memories found for query: {context_type}"
)
return ""
# Format results for context
formatted_context = self._format_memory_results(
results, context_type, loop_count
)
# Ensure context fits within token limits
if (
count_tokens(formatted_context)
> self.config.context_window_tokens
):
formatted_context = self._truncate_context(
formatted_context
)
if self.verbose:
logger.info(
f"✅ Retrieved {len(results)} relevant memories for {context_type}"
)
return formatted_context
except Exception as e:
logger.error(f"Error querying long-term memory: {e}")
return ""
def _format_memory_results(
self,
results: List[Any],
context_type: str,
loop_count: Optional[int] = None,
) -> str:
"""Format memory results into a structured context string"""
if not results:
return ""
loop_info = f" (Loop {loop_count})" if loop_count else ""
header = (
f"📚 Relevant Knowledge - {context_type.title()}{loop_info}:\n"
+ "=" * 50
+ "\n"
)
formatted_sections = [header]
for i, result in enumerate(results, 1):
content, score, source, metadata = (
self._extract_result_fields(result)
)
section = f"""
[Memory {i}] Relevance: {score} | Source: {source}
{'-' * 40}
{content}
{'-' * 40}
"""
formatted_sections.append(section)
formatted_sections.append(f"\n{'='*50}\n")
return "\n".join(formatted_sections)
def _extract_result_fields(self, result: Any) -> tuple:
"""Extract content, score, source, and metadata from a result object"""
if isinstance(result, dict):
content = result.get(
"content", result.get("text", str(result))
)
score = result.get(
"score", result.get("similarity", "N/A")
)
metadata = result.get("metadata", {})
source = metadata.get(
"source", result.get("source", "Unknown")
)
else:
content = str(result)
score = "N/A"
source = "Unknown"
metadata = {}
return content, score, source, metadata
def _truncate_context(self, content: str) -> str:
"""Truncate content to fit within token limits using smart truncation"""
max_chars = (
self.config.context_window_tokens * 3
) # Rough token-to-char ratio
if len(content) <= max_chars:
return content
# Try to cut at section boundaries first
sections = content.split("=" * 50)
if len(sections) > 2: # Header + sections + footer
truncated_sections = [sections[0]] # Keep header
current_length = len(sections[0])
for section in sections[1:-1]: # Skip footer
if current_length + len(section) > max_chars * 0.9:
break
truncated_sections.append(section)
current_length += len(section)
truncated_sections.append(
f"\n[... {len(sections) - len(truncated_sections)} more memories truncated for length ...]\n"
)
truncated_sections.append(sections[-1]) # Keep footer
return "=" * (50).join(truncated_sections)
# Fallback: simple truncation at sentence boundary
truncated = content[:max_chars]
last_period = truncated.rfind(".")
if last_period > max_chars * 0.8:
truncated = truncated[: last_period + 1]
return (
truncated + "\n\n[... content truncated for length ...]"
)
def should_save_response(
self,
response: str,
loop_count: int,
has_tool_usage: bool = False,
) -> bool:
"""
Determine if a response should be saved to long-term memory.
Args:
response: The response text to evaluate
loop_count: Current loop number
has_tool_usage: Whether tools were used in this response
Returns:
Boolean indicating whether to save the response
"""
if (
not self.is_enabled()
or not self.config.auto_save_to_memory
):
return False
# Content length check
if len(response.strip()) < self.config.min_content_length:
return False
save_conditions = [
# Substantial content
len(response) > 200,
# Contains important keywords
any(
keyword in response.lower()
for keyword in self.config.relevance_keywords
),
# Periodic saves
loop_count % self.config.save_every_n_loops == 0,
# Tool usage indicates potentially important information
has_tool_usage,
# Complex responses (multiple sentences)
response.count(".") >= 3,
# Contains structured data or lists
any(
marker in response
for marker in ["- ", "1. ", "2. ", "* ", "```"]
),
]
return any(save_conditions)
def save_to_memory(
self,
content: str,
metadata: Optional[Dict] = None,
content_type: str = "response",
) -> bool:
"""
Save content to long-term memory with metadata.
Args:
content: The content to save
metadata: Additional metadata to store
content_type: Type of content being saved
Returns:
Boolean indicating success
"""
if not self.is_enabled():
return False
if (
not content
or len(content.strip()) < self.config.min_content_length
):
return False
try:
# Create default metadata
default_metadata = {
"timestamp": time.time(),
"agent_name": self.agent_name,
"content_type": content_type,
"loop_count": self._loop_counter,
"saved_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
# Merge with provided metadata
if metadata:
default_metadata.update(metadata)
if self.verbose:
logger.info(
f"💾 [{self.agent_name}] Saving to long-term memory: {content[:100]}..."
)
success = self.long_term_memory.add(
content, metadata=default_metadata
)
if success and self.verbose:
logger.info(
f"✅ Successfully saved {content_type} to long-term memory"
)
# Track important memories
if content_type in [
"final_response",
"conversation_summary",
]:
self._important_memories.append(
{
"content": content[:200],
"timestamp": time.time(),
"type": content_type,
}
)
return success
except Exception as e:
logger.error(f"Error saving to long-term memory: {e}")
return False
def create_conversation_summary(
self,
task: str,
final_response: str,
total_loops: int,
tools_used: List[str] = None,
) -> str:
"""Create a comprehensive summary of the conversation"""
tools_info = (
f"Tools Used: {', '.join(tools_used)}"
if tools_used
else "Tools Used: None"
)
summary = f"""
CONVERSATION SUMMARY
====================
Agent: {self.agent_name}
Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}
ORIGINAL TASK:
{task}
FINAL RESPONSE:
{final_response}
EXECUTION DETAILS:
- Total Reasoning Loops: {total_loops}
- {tools_info}
- Memory Queries Made: {len(self._conversation_history)}
KEY INSIGHTS:
{self._extract_key_insights(final_response)}
====================
"""
return summary
def _extract_key_insights(self, response: str) -> str:
"""Extract key insights from the response for summary"""
# Simple keyword-based extraction
insights = []
sentences = response.split(".")
for sentence in sentences:
if any(
keyword in sentence.lower()
for keyword in self.config.relevance_keywords[:5]
):
insights.append(sentence.strip())
if insights:
return "\n- " + "\n- ".join(
insights[:3]
) # Top 3 insights
return "No specific insights extracted"
def handle_loop_memory_operations(
self,
task: str,
response: str,
loop_count: int,
conversation_context: str = "",
has_tool_usage: bool = False,
) -> str:
"""
Handle all memory operations for a single loop iteration.
Args:
task: Original task
response: Current response
loop_count: Current loop number
conversation_context: Current conversation context
has_tool_usage: Whether tools were used
Returns:
Retrieved context string (empty if no relevant memories)
"""
self._loop_counter = loop_count
retrieved_context = ""
# 1. Query memory if enabled for this loop
if self.config.query_every_loop and loop_count > 1:
query_context = f"Task: {task}\nCurrent Context: {conversation_context[-500:]}"
retrieved_context = self.query_memory(
query_context,
context_type=f"loop_{loop_count}",
loop_count=loop_count,
)
# 2. Save response if criteria met
if self.should_save_response(
response, loop_count, has_tool_usage
):
self.save_to_memory(
content=response,
metadata={
"task_preview": task[:200],
"loop_count": loop_count,
"has_tool_usage": has_tool_usage,
},
content_type="loop_response",
)
return retrieved_context
def handle_initial_memory_query(self, task: str) -> str:
"""Handle the initial memory query before reasoning loops begin"""
if not self.is_enabled():
return ""
return self.query_memory(task, context_type="initial_task")
def handle_final_memory_consolidation(
self,
task: str,
final_response: str,
total_loops: int,
tools_used: List[str] = None,
) -> bool:
"""Handle final memory consolidation after all loops complete"""
if (
not self.is_enabled()
or not self.config.enable_conversation_summaries
):
return False
# Create and save conversation summary
summary = self.create_conversation_summary(
task, final_response, total_loops, tools_used
)
return self.save_to_memory(
content=summary,
metadata={
"task": task[:200],
"total_loops": total_loops,
"tools_used": tools_used or [],
},
content_type="conversation_summary",
)
def search_memories(
self,
query: str,
top_k: int = None,
similarity_threshold: float = None,
) -> List[Dict]:
"""
Search long-term memory and return raw results.
Args:
query: Search query
top_k: Number of results to return (uses config default if None)
similarity_threshold: Similarity threshold (uses config default if None)
Returns:
List of memory results
"""
if not self.is_enabled():
return []
try:
results = self.long_term_memory.query(
query=query,
top_k=top_k or self.config.max_results,
similarity_threshold=similarity_threshold
or self.config.similarity_threshold,
)
return results if results else []
except Exception as e:
logger.error(f"Error searching memories: {e}")
return []
def get_memory_stats(self) -> Dict[str, Any]:
"""Get statistics about memory usage and operations"""
return {
"is_enabled": self.is_enabled(),
"config": self.config.__dict__,
"loops_processed": self._loop_counter,
"important_memories_count": len(self._important_memories),
"last_important_memories": (
self._important_memories[-3:]
if self._important_memories
else []
),
"memory_store_type": (
type(self.long_term_memory).__name__
if self.long_term_memory
else None
),
}
def clear_session_data(self):
"""Clear session-specific data (not the long-term memory store)"""
self._loop_counter = 0
self._conversation_history.clear()
self._important_memories.clear()
if self.verbose:
logger.info(f"[{self.agent_name}] Session data cleared")
def update_config(self, **kwargs):
"""Update RAG configuration parameters"""
for key, value in kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
if self.verbose:
logger.info(
f"Updated RAG config: {key} = {value}"
)
else:
logger.warning(f"Unknown config parameter: {key}")
# # Example memory interface that your RAG implementation should follow
# class ExampleMemoryInterface:
# """Example interface for long-term memory implementations"""
# def add(self, content: str, metadata: Dict = None) -> bool:
# """
# Add content to the memory store.
# Args:
# content: Text content to store
# metadata: Additional metadata dictionary
# Returns:
# Boolean indicating success
# """
# # Your vector database implementation here
# return True
# def query(
# self,
# query: str,
# top_k: int = 5,
# similarity_threshold: float = 0.7
# ) -> List[Dict]:
# """
# Query the memory store for relevant content.
# Args:
# query: Search query string
# top_k: Maximum number of results to return
# similarity_threshold: Minimum similarity score
# Returns:
# List of dictionaries with keys: 'content', 'score', 'metadata'
# """
# # Your vector database query implementation here
# return [
# {
# 'content': 'Example memory content',
# 'score': 0.85,
# 'metadata': {'source': 'example', 'timestamp': time.time()}
# }
# ]
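# A minimal, dependency-free sketch of a store that satisfies the add/query
# contract above, so the RAG handler can be exercised without a vector database.
# The keyword-overlap score is only a stand-in for real embedding similarity;
# pass a low similarity_threshold when trying it out. Names here are illustrative.
#
# import time
#
# class KeywordMemoryStore:
#     def __init__(self):
#         self._items = []
#
#     def add(self, content: str, metadata: Dict = None) -> bool:
#         self._items.append(
#             {"content": content, "metadata": metadata or {}, "ts": time.time()}
#         )
#         return True
#
#     def query(
#         self,
#         query: str,
#         top_k: int = 5,
#         similarity_threshold: float = 0.7,
#     ) -> List[Dict]:
#         terms = set(query.lower().split())
#         scored = []
#         for item in self._items:
#             words = set(item["content"].lower().split())
#             score = len(terms & words) / max(len(terms), 1)
#             if score >= similarity_threshold:
#                 scored.append(
#                     {
#                         "content": item["content"],
#                         "score": score,
#                         "metadata": item["metadata"],
#                     }
#                 )
#         return sorted(scored, key=lambda r: r["score"], reverse=True)[:top_k]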

@ -1,12 +1,17 @@
import asyncio
import concurrent.futures
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import aiohttp
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.agent import Agent
@ -116,22 +121,44 @@ async def _async_exa_search(
) -> Dict[str, Any]:
"""Asynchronous helper function for Exa.ai API requests"""
api_url = "https://api.exa.ai/search"
# Check if API key is available
api_key = os.getenv("EXA_API_KEY")
if not api_key:
return {"error": "EXA_API_KEY environment variable not set"}
headers = {
"x-api-key": os.getenv("EXA_API_KEY"),
"x-api-key": api_key,
"Content-Type": "application/json",
}
# Filter out None keys AND None values from kwargs
safe_kwargs = {
str(k): v
for k, v in kwargs.items()
if k is not None and v is not None and str(k) != "None"
}
payload = {
"query": query,
"useAutoprompt": True,
"numResults": kwargs.get("num_results", 10),
"numResults": safe_kwargs.get("num_results", 10),
"contents": {
"text": True,
"highlights": {"numSentences": 2},
},
**kwargs,
}
# Only add safe_kwargs if they don't conflict with existing keys
for key, value in safe_kwargs.items():
if key not in payload and key not in [
"query",
"useAutoprompt",
"numResults",
"contents",
]:
payload[key] = value
try:
async with aiohttp.ClientSession() as session:
async with session.post(
@ -370,24 +397,20 @@ class DeepResearchSwarm:
return []
def _process_query(self, query: str) -> Tuple[str, str]:
def _process_query(self, query: str) -> str:
"""
Process a single query with search and reasoning.
Process a single query with search only.
This function is designed to be run in a separate thread.
Args:
query (str): The query to process
Returns:
Tuple[str, str]: A tuple containing (search_results, reasoning_output)
str: Search results
"""
# Run the search
# Run the search only - no individual reasoning to avoid duplication
results = exa_search(query)
# Run the reasoning on the search results
reasoning_output = self.reasoning_duo.run(results)
return (results, reasoning_output)
return results
def step(self, query: str):
"""
@ -399,54 +422,100 @@ class DeepResearchSwarm:
Returns:
Formatted conversation history
"""
# Get all the queries to process
queries = self.get_queries(query)
try:
# Get all the queries to process
queries = self.get_queries(query)
# Submit all queries for concurrent processing
# Using a list instead of generator for clearer debugging
futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
if not queries:
error_msg = (
"No queries generated. Please check your input."
)
self.conversation.add(
role="System", content=error_msg
)
return history_output_formatter(
self.conversation, type=self.output_type
)
# Process results as they complete (no waiting for slower queries)
for q, future in futures:
# Submit all queries for concurrent processing
futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
# Process results as they complete
for q, future in futures:
try:
# Get search results only
results = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
except Exception as e:
# Handle any errors in the thread
error_msg = (
f"Error processing query '{q}': {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Generate final comprehensive analysis after all searches are complete
try:
# Get results (blocks until this specific future is done)
results, reasoning_output = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
final_summary = self.reasoning_duo.run(
f"Generate an extensive report of the following content: {self.conversation.get_str()}"
)
# Add reasoning output to conversation
self.conversation.add(
role=self.reasoning_duo.agent_name,
content=reasoning_output,
content=final_summary,
)
except Exception as e:
# Handle any errors in the thread
error_msg = (
f"Error generating final summary: {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=f"Error processing query '{q}': {str(e)}",
content=error_msg,
)
# Once all query processing is complete, generate the final summary
# This step runs after all queries to ensure it summarizes all results
final_summary = self.reasoning_duo.run(
f"Generate an extensive report of the following content: {self.conversation.get_str()}"
)
self.conversation.add(
role=self.reasoning_duo.agent_name,
content=final_summary,
)
# Return formatted output
result = history_output_formatter(
self.conversation, type=self.output_type
)
return history_output_formatter(
self.conversation, type=self.output_type
)
# If output type is JSON, ensure it's properly formatted
if self.output_type.lower() == "json":
try:
import json
if isinstance(result, str):
# Try to parse and reformat for pretty printing
parsed = json.loads(result)
return json.dumps(
parsed, indent=2, ensure_ascii=False
)
except (json.JSONDecodeError, TypeError):
# If parsing fails, return as-is
pass
return result
except Exception as e:
error_msg = f"Critical error in step execution: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return (
{"error": error_msg}
if self.output_type.lower() == "json"
else error_msg
)
def run(self, task: str):
return self.step(task)
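    # Illustrative note: the JSON branch in step() is a plain parse/re-serialize
    # round trip, e.g.
    #   json.dumps(json.loads('[{"role": "User", "content": "hi"}]'), indent=2, ensure_ascii=False)
    # pretty-prints valid JSON, while the except (json.JSONDecodeError, TypeError)
    # fallback returns non-JSON strings unchanged.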
@ -466,14 +535,159 @@ class DeepResearchSwarm:
future = self.executor.submit(self.step, task)
futures.append((task, future))
def parse_and_display_results(
self, json_result: str, export_markdown: bool = True
):
"""
Parse JSON results and display in rich format with optional markdown export.
Args:
json_result (str): JSON string containing conversation results
export_markdown (bool): Whether to export to markdown file
"""
try:
# Parse JSON
data = json.loads(json_result)
# Create rich display
console.print("\n" + "=" * 100, style="cyan")
console.print(
"🔬 DEEP RESEARCH RESULTS",
style="bold cyan",
justify="center",
)
console.print("=" * 100, style="cyan")
# Create conversation tree
tree = Tree("🗣️ Research Conversation", style="bold blue")
markdown_content = [
"# Deep Research Results\n",
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
]
for i, entry in enumerate(data, 1):
if isinstance(entry, dict):
role = entry.get("role", "Unknown")
content = entry.get("content", "")
timestamp = entry.get("timestamp", "")
# Get role info for display
role_info = self._get_role_display_info(role)
# Create tree branch
branch_text = f"{role_info['emoji']} {role}"
if timestamp:
time_part = (
timestamp.split()[-1]
if " " in timestamp
else timestamp[-8:]
)
branch_text += f" ({time_part})"
branch = tree.add(
branch_text, style=role_info["style"]
)
# Add content preview to tree
content_preview = (
content[:150] + "..."
if len(content) > 150
else content
)
content_preview = content_preview.replace(
"\n", " "
)
branch.add(content_preview, style="dim")
# Add to markdown
markdown_content.append(f"\n## {i}. {role}")
if timestamp:
markdown_content.append(
f"**Timestamp:** {timestamp}"
)
markdown_content.append(f"\n{content}\n")
# Display full content for important entries
if (
role.lower() in ["reasoning-agent-01"]
and len(content) > 300
):
console.print(
f"\n📋 {role} Full Response:",
style="bold green",
)
console.print(
Panel(
content,
border_style="green",
title=f"{role} Analysis",
)
)
# Display the tree
console.print(tree)
# Export to markdown if requested
if export_markdown:
# Create deepsearch_results directory
results_dir = Path("deepsearch_results")
results_dir.mkdir(exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = (
results_dir / f"research_results_{timestamp}.md"
)
# Write markdown file
with open(filename, "w", encoding="utf-8") as f:
f.write("\n".join(markdown_content))
console.print(
f"\n💾 Results exported to: {filename}",
style="bold green",
)
# # Example usage
console.print(
"\n✅ Research analysis complete!", style="bold cyan"
)
except json.JSONDecodeError as e:
console.print(f"❌ Error parsing JSON: {e}", style="red")
except Exception as e:
console.print(
f"❌ Error displaying results: {e}", style="red"
)
def _get_role_display_info(self, role: str) -> Dict[str, str]:
"""Get display information for different conversation roles."""
role_map = {
"user": {"emoji": "👤", "style": "cyan"},
"deep-research-agent": {"emoji": "🔍", "style": "blue"},
"reasoning-agent-01": {"emoji": "🧠", "style": "magenta"},
"system": {"emoji": "⚙️", "style": "yellow"},
}
role_lower = role.lower()
return role_map.get(
role_lower, {"emoji": "🤖", "style": "white"}
)
# Example usage
# if __name__ == "__main__":
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# print(
# swarm.step(
# "What is the active tarrif situation with mexico? Only create 2 queries"
# try:
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# result = swarm.step(
# "What is the active tariff situation with mexico? Only create 2 queries"
# )
# )
# # Parse and display results in rich format with markdown export
# swarm.parse_and_display_results(result, export_markdown=True)
# except Exception as e:
# print(f"Error running deep research swarm: {str(e)}")
# import traceback
# traceback.print_exc()

@ -0,0 +1,261 @@
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from loguru import logger
from swarms.structs import Agent
class ImageProcessingError(Exception):
"""Custom exception for image processing errors."""
pass
class InvalidAgentError(Exception):
"""Custom exception for invalid agent configurations."""
pass
class ImageAgentBatchProcessor:
"""
A class for processing multiple images in parallel using one or more agents.
This processor can:
- Handle multiple images from a directory
- Process images with single or multiple agents
- Execute tasks in parallel
- Provide detailed logging and error handling
Attributes:
agents (List[Agent]): List of agents to process images
max_workers (int): Maximum number of parallel workers
supported_formats (set): Set of supported image formats
"""
def __init__(
self,
agents: Union[Agent, List[Agent], Callable, List[Callable]],
max_workers: int = None,
supported_formats: Optional[List[str]] = None,
):
"""
Initialize the ImageAgentBatchProcessor.
Args:
agents: Single agent or list of agents to process images
max_workers: Maximum number of parallel workers (default: ~95% of available CPU cores)
supported_formats: List of supported image formats (default: ['.jpg', '.jpeg', '.png'])
Raises:
InvalidAgentError: If agents parameter is invalid
"""
        self.agents = (
            [agents] if isinstance(agents, Agent) else agents
        )
        if not self.agents:
            raise InvalidAgentError(
                "At least one agent must be provided"
            )
        # Honor an explicit worker count; otherwise default to ~95% of the available cores
        self.max_workers = max_workers or int(os.cpu_count() * 0.95)
        self.supported_formats = set(
            supported_formats or [".jpg", ".jpeg", ".png"]
        )
# Setup logging
logger.add(
"image_processor.log",
rotation="100 MB",
retention="10 days",
level="INFO",
)
def _validate_image_path(
self, image_path: Union[str, Path]
) -> Path:
"""
Validate if the image path exists and has supported format.
Args:
image_path: Path to the image file
Returns:
Path: Validated Path object
Raises:
ImageProcessingError: If path is invalid or format not supported
"""
path = Path(image_path)
if not path.exists():
raise ImageProcessingError(
f"Image path does not exist: {path}"
)
if path.suffix.lower() not in self.supported_formats:
raise ImageProcessingError(
f"Unsupported image format {path.suffix}. Supported formats: {self.supported_formats}"
)
return path
def _process_single_image(
self,
image_path: Path,
tasks: Union[str, List[str]],
agent: Agent,
) -> Dict[str, Any]:
"""
Process a single image with one agent and one or more tasks.
Args:
image_path: Path to the image
tasks: Single task or list of tasks to perform
agent: Agent to process the image
Returns:
Dict containing results for each task
"""
try:
tasks_list = [tasks] if isinstance(tasks, str) else tasks
results = {}
logger.info(
f"Processing image {image_path} with agent {agent.__class__.__name__}"
)
start_time = time.time()
for task in tasks_list:
try:
result = agent.run(task=task, img=str(image_path))
results[task] = result
except Exception as e:
logger.error(
f"Error processing task '{task}' for image {image_path}: {str(e)}"
)
results[task] = f"Error: {str(e)}"
processing_time = time.time() - start_time
logger.info(
f"Completed processing {image_path} in {processing_time:.2f} seconds"
)
return {
"image_path": str(image_path),
"results": results,
"processing_time": processing_time,
}
except Exception as e:
logger.error(
f"Failed to process image {image_path}: {str(e)}"
)
raise ImageProcessingError(
f"Failed to process image {image_path}: {str(e)}"
)
def run(
self,
image_paths: Union[str, List[str], Path],
tasks: Union[str, List[str]],
) -> List[Dict[str, Any]]:
"""
Process multiple images in parallel with the configured agents.
Args:
image_paths: Single image path or list of image paths or directory path
tasks: Single task or list of tasks to perform on each image
Returns:
List of dictionaries containing results for each image
Raises:
ImageProcessingError: If any image processing fails
"""
# Handle directory input
if (
isinstance(image_paths, (str, Path))
and Path(image_paths).is_dir()
):
image_paths = [
os.path.join(image_paths, f)
for f in os.listdir(image_paths)
if Path(os.path.join(image_paths, f)).suffix.lower()
in self.supported_formats
]
elif isinstance(image_paths, (str, Path)):
image_paths = [image_paths]
# Validate all paths
validated_paths = [
self._validate_image_path(path) for path in image_paths
]
if not validated_paths:
logger.warning("No valid images found to process")
return []
logger.info(
f"Starting batch processing of {len(validated_paths)} images"
)
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
future_to_path = {}
# Submit all tasks
for path in validated_paths:
for agent in self.agents:
future = executor.submit(
self._process_single_image, path, tasks, agent
)
future_to_path[future] = (path, agent)
# Collect results as they complete
for future in as_completed(future_to_path):
path, agent = future_to_path[future]
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(
f"Failed to process {path} with {agent.__class__.__name__}: {str(e)}"
)
results.append(
{
"image_path": str(path),
"error": str(e),
"agent": agent.__class__.__name__,
}
)
logger.info(
f"Completed batch processing of {len(validated_paths)} images"
)
return results
def __call__(self, *args, **kwargs):
"""
Make the ImageAgentBatchProcessor callable like a function.
This allows the processor to be used directly as a function, which will
call the run() method with the provided arguments.
Args:
*args: Variable length argument list to pass to run()
**kwargs: Arbitrary keyword arguments to pass to run()
Returns:
The result of calling run() with the provided arguments
"""
return self.run(*args, **kwargs)
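# Example usage (illustrative sketch; the agent settings, model name, and image
# directory below are placeholders):
#
# if __name__ == "__main__":
#     vision_agent = Agent(
#         agent_name="Vision-Agent",
#         model_name="gpt-4o",
#         max_loops=1,
#     )
#     processor = ImageAgentBatchProcessor(agents=vision_agent)
#     results = processor.run(
#         image_paths="images/",
#         tasks=["Describe this image", "List any visible text"],
#     )
#     for entry in results:
#         print(entry["image_path"], entry.get("results", entry.get("error")))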

@ -0,0 +1,356 @@
import re
from typing import Callable, List, Union
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
class InteractiveGroupChatError(Exception):
"""Base exception class for InteractiveGroupChat errors"""
pass
class AgentNotFoundError(InteractiveGroupChatError):
"""Raised when a mentioned agent is not found in the group"""
pass
class NoMentionedAgentsError(InteractiveGroupChatError):
"""Raised when no agents are mentioned in the task"""
pass
class InvalidTaskFormatError(InteractiveGroupChatError):
"""Raised when the task format is invalid"""
pass
class InteractiveGroupChat:
"""
An interactive group chat system that enables conversations with multiple agents using @mentions.
This class allows users to interact with multiple agents by mentioning them using @agent_name syntax.
When multiple agents are mentioned, they can see and respond to each other's tasks.
Attributes:
name (str): Name of the group chat
description (str): Description of the group chat's purpose
agents (List[Union[Agent, Callable]]): List of Agent instances or callable functions
max_loops (int): Maximum number of conversation turns
conversation (Conversation): Stores the chat history
agent_map (Dict[str, Union[Agent, Callable]]): Mapping of agent names to their instances
Args:
name (str, optional): Name of the group chat. Defaults to "InteractiveGroupChat".
description (str, optional): Description of the chat. Defaults to "An interactive group chat for multiple agents".
agents (List[Union[Agent, Callable]], optional): List of participating agents or callables. Defaults to empty list.
max_loops (int, optional): Maximum conversation turns. Defaults to 1.
output_type (str, optional): Type of output format. Defaults to "string".
interactive (bool, optional): Whether to enable interactive terminal mode. Defaults to False.
Raises:
ValueError: If invalid initialization parameters are provided
"""
def __init__(
self,
        id: str = None,
name: str = "InteractiveGroupChat",
description: str = "An interactive group chat for multiple agents",
agents: List[Union[Agent, Callable]] = [],
max_loops: int = 1,
output_type: str = "string",
interactive: bool = False,
):
        self.id = id or generate_api_key(prefix="swarms-")
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.interactive = interactive
# Initialize conversation history
self.conversation = Conversation(time_enabled=True)
# Create a mapping of agent names to agents for easy lookup
self.agent_map = {}
for agent in agents:
if isinstance(agent, Agent):
self.agent_map[agent.agent_name] = agent
elif callable(agent):
# For callable functions, use the function name as the agent name
self.agent_map[agent.__name__] = agent
self._validate_initialization()
self._setup_conversation_context()
self._update_agent_prompts()
def _validate_initialization(self) -> None:
"""
Validates the group chat configuration.
Raises:
ValueError: If any required components are missing or invalid
"""
if len(self.agents) < 1:
raise ValueError(
"At least one agent is required for the group chat"
)
if self.max_loops <= 0:
raise ValueError("Max loops must be greater than 0")
def _setup_conversation_context(self) -> None:
"""Sets up the initial conversation context with group chat information."""
agent_info = []
for agent in self.agents:
if isinstance(agent, Agent):
agent_info.append(
f"- {agent.agent_name}: {agent.system_prompt}"
)
elif callable(agent):
agent_info.append(
f"- {agent.__name__}: Custom callable function"
)
context = (
f"Group Chat Name: {self.name}\n"
f"Description: {self.description}\n"
f"Available Agents:\n" + "\n".join(agent_info)
)
self.conversation.add(role="System", content=context)
def _update_agent_prompts(self) -> None:
"""Updates each agent's system prompt with information about other agents and the group chat."""
agent_info = []
for agent in self.agents:
if isinstance(agent, Agent):
agent_info.append(
{
"name": agent.agent_name,
"description": agent.system_prompt,
}
)
elif callable(agent):
agent_info.append(
{
"name": agent.__name__,
"description": "Custom callable function",
}
)
group_context = (
f"\n\nYou are part of a group chat named '{self.name}' with the following description: {self.description}\n"
f"Other participants in this chat:\n"
)
for agent in self.agents:
if isinstance(agent, Agent):
# Create context excluding the current agent
other_agents = [
info
for info in agent_info
if info["name"] != agent.agent_name
]
agent_context = group_context
for other in other_agents:
agent_context += (
f"- {other['name']}: {other['description']}\n"
)
# Update the agent's system prompt
agent.system_prompt = (
agent.system_prompt + agent_context
)
logger.info(
f"Updated system prompt for agent: {agent.agent_name}"
)
def _extract_mentions(self, task: str) -> List[str]:
"""
Extracts @mentions from the task.
Args:
task (str): The input task
Returns:
List[str]: List of mentioned agent names
Raises:
InvalidTaskFormatError: If the task format is invalid
"""
try:
# Find all @mentions using regex
mentions = re.findall(r"@(\w+)", task)
return [
mention
for mention in mentions
if mention in self.agent_map
]
except Exception as e:
logger.error(f"Error extracting mentions: {e}")
raise InvalidTaskFormatError(f"Invalid task format: {e}")
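    # Illustration:
    #   _extract_mentions("@Researcher check this, then ping @Writer")
    #   -> ["Researcher", "Writer"] when both names exist in agent_map;
    #   unknown names are dropped here and surface later as
    #   NoMentionedAgentsError if no valid mention remains.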
def start_interactive_session(self):
"""
Start an interactive terminal session for chatting with agents.
This method creates a REPL (Read-Eval-Print Loop) that allows users to:
- Chat with agents using @mentions
- See available agents and their descriptions
- Exit the session using 'exit' or 'quit'
- Get help using 'help' or '?'
"""
if not self.interactive:
raise InteractiveGroupChatError(
"Interactive mode is not enabled. Initialize with interactive=True"
)
print(f"\nWelcome to {self.name}!")
print(f"Description: {self.description}")
print("\nAvailable agents:")
for name, agent in self.agent_map.items():
if isinstance(agent, Agent):
print(
f"- @{name}: {agent.system_prompt.split('\n')[0]}"
)
else:
print(f"- @{name}: Custom callable function")
print("\nCommands:")
print("- Type 'help' or '?' for help")
print("- Type 'exit' or 'quit' to end the session")
print("- Use @agent_name to mention agents")
print("\nStart chatting:")
while True:
try:
# Get user input
user_input = input("\nYou: ").strip()
# Handle special commands
if user_input.lower() in ["exit", "quit"]:
print("Goodbye!")
break
if user_input.lower() in ["help", "?"]:
print("\nHelp:")
print("1. Mention agents using @agent_name")
print(
"2. You can mention multiple agents in one task"
)
print("3. Available agents:")
for name in self.agent_map:
print(f" - @{name}")
print(
"4. Type 'exit' or 'quit' to end the session"
)
continue
if not user_input:
continue
# Process the task and get responses
try:
response = self.run(user_input)
print("\nChat:")
print(response)
except NoMentionedAgentsError:
print(
"\nError: Please mention at least one agent using @agent_name"
)
except AgentNotFoundError as e:
print(f"\nError: {str(e)}")
except Exception as e:
print(f"\nAn error occurred: {str(e)}")
except KeyboardInterrupt:
print("\nSession terminated by user. Goodbye!")
break
except Exception as e:
print(f"\nAn unexpected error occurred: {str(e)}")
print(
"The session will continue. You can type 'exit' to end it."
)
def run(self, task: str) -> str:
"""
Process a task and get responses from mentioned agents.
If interactive mode is enabled, this will be called by start_interactive_session().
Otherwise, it can be called directly for single task processing.
"""
try:
# Extract mentioned agents
mentioned_agents = self._extract_mentions(task)
if not mentioned_agents:
raise NoMentionedAgentsError(
"No valid agents mentioned in the task"
)
# Add user task to conversation
self.conversation.add(role="User", content=task)
# Get responses from mentioned agents
for agent_name in mentioned_agents:
agent = self.agent_map.get(agent_name)
if not agent:
raise AgentNotFoundError(
f"Agent '{agent_name}' not found"
)
try:
# Get the complete conversation history
context = (
self.conversation.return_history_as_string()
)
# Get response from agent
if isinstance(agent, Agent):
response = agent.run(
task=f"{context}\nPlease respond to the latest task as {agent_name}."
)
else:
# For callable functions
response = agent(context)
# Add response to conversation
if response and not response.isspace():
self.conversation.add(
role=agent_name, content=response
)
logger.info(f"Agent {agent_name} responded")
except Exception as e:
logger.error(
f"Error getting response from {agent_name}: {e}"
)
self.conversation.add(
role=agent_name,
content=f"Error: Unable to generate response - {str(e)}",
)
return history_output_formatter(
self.conversation, self.output_type
)
except InteractiveGroupChatError as e:
logger.error(f"GroupChat error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise InteractiveGroupChatError(
f"Unexpected error occurred: {str(e)}"
)
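# Example usage (illustrative sketch; agent names, prompts, and the model below
# are placeholders):
#
# if __name__ == "__main__":
#     researcher = Agent(
#         agent_name="Researcher",
#         system_prompt="You research topics thoroughly and cite sources.",
#         model_name="gpt-4o-mini",
#         max_loops=1,
#     )
#     writer = Agent(
#         agent_name="Writer",
#         system_prompt="You turn research notes into concise summaries.",
#         model_name="gpt-4o-mini",
#         max_loops=1,
#     )
#     chat = InteractiveGroupChat(agents=[researcher, writer])
#     print(
#         chat.run(
#             "@Researcher find recent work on RAG evaluation, then @Writer summarize it"
#         )
#     )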

@ -1,6 +1,8 @@
import traceback
from typing import Optional
import base64
import requests
from pathlib import Path
import asyncio
from typing import List
@ -9,11 +11,7 @@ from loguru import logger
import litellm
from pydantic import BaseModel
from litellm import completion, acompletion
litellm.set_verbose = True
litellm.ssl_verify = False
# litellm._turn_on_debug()
from litellm import completion, acompletion, supports_vision
class LiteLLMException(Exception):
@ -53,6 +51,35 @@ def get_audio_base64(audio_source: str) -> str:
return encoded_string
def get_image_base64(image_source: str) -> str:
"""
Convert image from a given source to a base64 encoded string.
Handles URLs, local file paths, and data URIs.
"""
# If already a data URI, return as is
if image_source.startswith("data:image"):
return image_source
# Handle URL
if image_source.startswith(("http://", "https://")):
response = requests.get(image_source)
response.raise_for_status()
image_data = response.content
# Handle local file
else:
with open(image_source, "rb") as file:
image_data = file.read()
# Get file extension for mime type
extension = Path(image_source).suffix.lower()
mime_type = (
f"image/{extension[1:]}" if extension else "image/jpeg"
)
encoded_string = base64.b64encode(image_data).decode("utf-8")
return f"data:{mime_type};base64,{encoded_string}"
class LiteLLM:
"""
This class represents a LiteLLM.
@ -72,12 +99,15 @@ class LiteLLM:
tool_choice: str = "auto",
parallel_tool_calls: bool = False,
audio: str = None,
retries: int = 3,
retries: int = 0,
verbose: bool = False,
caching: bool = False,
mcp_call: bool = False,
top_p: float = 1.0,
functions: List[dict] = None,
return_all: bool = False,
base_url: str = None,
api_key: str = None,
*args,
**kwargs,
):
@ -105,8 +135,11 @@ class LiteLLM:
self.mcp_call = mcp_call
self.top_p = top_p
self.functions = functions
self.audio = audio
self.return_all = return_all
self.base_url = base_url
self.api_key = api_key
self.modalities = []
self._cached_messages = {} # Cache for prepared messages
self.messages = [] # Initialize messages list
# Configure litellm settings
@ -135,7 +168,11 @@ class LiteLLM:
out = out.model_dump()
return out
def _prepare_messages(self, task: str) -> list:
def _prepare_messages(
self,
task: str,
img: str = None,
):
"""
Prepare the messages for the given task.
@ -145,91 +182,201 @@ class LiteLLM:
Returns:
list: A list of messages prepared for the task.
"""
# Check cache first
cache_key = f"{self.system_prompt}:{task}"
if cache_key in self._cached_messages:
return self._cached_messages[cache_key].copy()
self.check_if_model_supports_vision(img=img)
# Initialize messages
messages = []
if self.system_prompt:
# Add system prompt if present
if self.system_prompt is not None:
messages.append(
{"role": "system", "content": self.system_prompt}
)
messages.append({"role": "user", "content": task})
# Cache the prepared messages
self._cached_messages[cache_key] = messages.copy()
# Handle vision case
if img is not None:
messages = self.vision_processing(
task=task, image=img, messages=messages
)
else:
messages.append({"role": "user", "content": task})
return messages
def audio_processing(self, task: str, audio: str):
def anthropic_vision_processing(
self, task: str, image: str, messages: list
) -> list:
"""
Process the audio for the given task.
Args:
task (str): The task to be processed.
audio (str): The path or identifier for the audio file.
Process vision input specifically for Anthropic models.
Handles Anthropic's specific image format requirements.
"""
self.modalities.append("audio")
encoded_string = get_audio_base64(audio)
# Get base64 encoded image
image_url = get_image_base64(image)
# Extract mime type from the data URI or use default
mime_type = "image/jpeg" # default
if "data:" in image_url and ";base64," in image_url:
mime_type = image_url.split(";base64,")[0].split("data:")[
1
]
# Ensure mime type is one of the supported formats
supported_formats = [
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
]
if mime_type not in supported_formats:
mime_type = (
"image/jpeg" # fallback to jpeg if unsupported
)
# Append messages
self.messages.append(
# Construct Anthropic vision message
messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
{
"type": "input_audio",
"input_audio": {
"data": encoded_string,
"format": "wav",
"type": "image_url",
"image_url": {
"url": image_url,
"format": mime_type,
},
},
],
}
)
def vision_processing(self, task: str, image: str):
return messages
def openai_vision_processing(
self, task: str, image: str, messages: list
) -> list:
"""
Process vision input specifically for OpenAI models.
Handles OpenAI's specific image format requirements.
"""
# Get base64 encoded image with proper format
image_url = get_image_base64(image)
# Prepare vision message
vision_message = {
"type": "image_url",
"image_url": {"url": image_url},
}
# Add format for specific models
extension = Path(image).suffix.lower()
mime_type = (
f"image/{extension[1:]}" if extension else "image/jpeg"
)
vision_message["image_url"]["format"] = mime_type
# Append vision message
messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
vision_message,
],
}
)
return messages
def vision_processing(
self, task: str, image: str, messages: Optional[list] = None
):
"""
Process the image for the given task.
Handles different image formats and model requirements.
"""
# # # Handle Anthropic models separately
# # if "anthropic" in self.model_name.lower() or "claude" in self.model_name.lower():
# # messages = self.anthropic_vision_processing(task, image, messages)
# # return messages
# # Get base64 encoded image with proper format
# image_url = get_image_base64(image)
# # Prepare vision message
# vision_message = {
# "type": "image_url",
# "image_url": {"url": image_url},
# }
# # Add format for specific models
# extension = Path(image).suffix.lower()
# mime_type = f"image/{extension[1:]}" if extension else "image/jpeg"
# vision_message["image_url"]["format"] = mime_type
# # Append vision message
# messages.append(
# {
# "role": "user",
# "content": [
# {"type": "text", "text": task},
# vision_message,
# ],
# }
# )
# return messages
if (
"anthropic" in self.model_name.lower()
or "claude" in self.model_name.lower()
):
messages = self.anthropic_vision_processing(
task, image, messages
)
return messages
else:
messages = self.openai_vision_processing(
task, image, messages
)
return messages
def audio_processing(self, task: str, audio: str):
"""
self.modalities.append("vision")
Process the audio for the given task.
# Append messages
Args:
task (str): The task to be processed.
audio (str): The path or identifier for the audio file.
"""
encoded_string = get_audio_base64(audio)
# Append audio message
self.messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
{
"type": "image_url",
"image_url": {
"url": image,
# "detail": "high"
# "format": "image",
"type": "input_audio",
"input_audio": {
"data": encoded_string,
"format": "wav",
},
},
],
}
)
def handle_modalities(
self, task: str, audio: str = None, img: str = None
):
def check_if_model_supports_vision(self, img: str = None):
"""
Handle the modalities for the given task.
Check if the model supports vision.
"""
self.messages = [] # Reset messages
self.modalities.append("text")
if audio is not None:
self.audio_processing(task=task, audio=audio)
self.modalities.append("audio")
if img is not None:
self.vision_processing(task=task, image=img)
self.modalities.append("vision")
out = supports_vision(model=self.model_name)
if out is False:
raise ValueError(
f"Model {self.model_name} does not support vision"
)
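    # Illustrative sketch of how the vision path is driven through run() below,
    # assuming the constructor accepts model_name and run() accepts task/img as
    # used in this file; the model name and image path are placeholders:
    #
    #   llm = LiteLLM(model_name="claude-3-5-sonnet-20240620")
    #   print(llm.run(task="Describe the chart in this image.", img="chart.png"))
    #
    # check_if_model_supports_vision() raises ValueError before any request is
    # sent if litellm reports that the chosen model cannot accept images.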
def run(
self,
@ -256,13 +403,7 @@ class LiteLLM:
Exception: If there is an error in processing the request.
"""
try:
messages = self._prepare_messages(task)
if audio is not None or img is not None:
self.handle_modalities(
task=task, audio=audio, img=img
)
messages = self.messages
messages = self._prepare_messages(task=task, img=img)
# Base completion parameters
completion_params = {
@ -298,6 +439,9 @@ class LiteLLM:
{"functions": self.functions}
)
if self.base_url is not None:
completion_params["base_url"] = self.base_url
# Add modalities if needed
if self.modalities and len(self.modalities) >= 2:
completion_params["modalities"] = self.modalities
@ -308,12 +452,16 @@ class LiteLLM:
# Handle tool-based response
if self.tools_list_dictionary is not None:
return self.output_for_tools(response)
elif self.return_all is True:
return response.model_dump()
else:
# Return standard response content
return response.choices[0].message.content
except LiteLLMException as error:
logger.error(f"Error in LiteLLM run: {str(error)}")
logger.error(
f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}"
)
if "rate_limit" in str(error).lower():
logger.warning(
"Rate limit hit, retrying with exponential backoff..."

@ -294,7 +294,7 @@ def test_logging_configuration() -> bool:
try:
assert (
conversation_with_logging.enable_logging == True
conversation_with_logging.enable_logging is True
), "Logging should be enabled"
assert (
conversation_with_logging.logger is not None
@ -309,7 +309,7 @@ def test_logging_configuration() -> bool:
)
assert (
conversation_no_logging.enable_logging == False
conversation_no_logging.enable_logging is False
), "Logging should be disabled"
print("✓ Logging configuration test passed")
@ -629,7 +629,7 @@ def test_update_message_method() -> bool:
)
assert (
success == True
success is True
), "update_message should return True on success"
# Verify the update
@ -643,7 +643,7 @@ def test_update_message_method() -> bool:
updated_msg["metadata"]["version"] == 2
), "Metadata should be updated"
assert (
updated_msg["metadata"]["updated"] == True
updated_msg["metadata"]["updated"] is True
), "New metadata field should be added"
# Test update_message with non-existent ID
@ -651,7 +651,7 @@ def test_update_message_method() -> bool:
message_id=999999, content="This should fail"
)
assert (
failure == False
failure is False
), "update_message should return False for non-existent message"
print("✓ Update message method test passed")
@ -1106,7 +1106,7 @@ def test_enhanced_error_handling() -> bool:
# Test invalid credentials
try:
invalid_conversation = SupabaseConversation(
SupabaseConversation(
supabase_url="https://invalid-url.supabase.co",
supabase_key="invalid_key",
enable_logging=False,
@ -1139,7 +1139,7 @@ def test_enhanced_error_handling() -> bool:
"999999", "user", "content"
)
assert (
update_result == False
update_result is False
), "_update_flexible should return False for invalid ID"
# Test update_message with invalid ID
@ -1147,7 +1147,7 @@ def test_enhanced_error_handling() -> bool:
999999, "invalid content"
)
assert (
result == False
result is False
), "update_message should return False for invalid ID"
# Test search with empty query
@ -1174,7 +1174,7 @@ def test_enhanced_error_handling() -> bool:
"not_a_number", "user", "content"
)
assert (
invalid_update == False
invalid_update is False
), "Invalid ID should return False for update"
print("✓ Enhanced error handling test passed")

@ -0,0 +1,79 @@
# 'v0-1.0-md'
# https://api.v0.dev/v1/chat/completions
import time
from swarms import Agent
import os
from dotenv import load_dotenv
load_dotenv()
FRONT_END_DEVELOPMENT_PROMPT = """
You are an expert full-stack development agent with comprehensive expertise in:
Frontend Development:
- Modern React.js/Next.js architecture and best practices
- Advanced TypeScript implementation and type safety
- State-of-the-art UI/UX design patterns
- Responsive and accessible design principles
- Component-driven development with Storybook
- Modern CSS frameworks (Tailwind, Styled-Components)
- Performance optimization and lazy loading
Backend Development:
- Scalable microservices architecture
- RESTful and GraphQL API design
- Database optimization and schema design
- Authentication and authorization systems
- Serverless architecture and cloud services
- CI/CD pipeline implementation
- Security best practices and OWASP guidelines
Development Practices:
- Test-Driven Development (TDD)
- Clean Code principles
- Documentation (TSDoc/JSDoc)
- Git workflow and version control
- Performance monitoring and optimization
- Error handling and logging
- Code review best practices
Your core responsibilities include:
1. Developing production-grade TypeScript applications
2. Implementing modern, accessible UI components
3. Designing scalable backend architectures
4. Writing comprehensive documentation
5. Ensuring type safety across the stack
6. Optimizing application performance
7. Implementing security best practices
You maintain strict adherence to:
- TypeScript strict mode and proper typing
- SOLID principles and clean architecture
- Accessibility standards (WCAG 2.1)
- Performance budgets and metrics
- Security best practices
- Comprehensive test coverage
- Modern design system principles
"""
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt=FRONT_END_DEVELOPMENT_PROMPT,
max_loops=1,
model_name="v0-1.0-md",
dynamic_temperature_enabled=True,
output_type="all",
# safety_prompt_on=True,
llm_api_key=os.getenv("V0_API_KEY"),
llm_base_url="https://api.v0.dev/v1/chat/completions",
)
out = agent.run(
"Build a simple web app that allows users to upload a file and then download it."
)
time.sleep(10)
print(out)
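# Note: V0_API_KEY is read from a local .env file via load_dotenv(); a minimal
# .env would contain a single placeholder line such as:
#   V0_API_KEY=your-v0-api-key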