[FEAT][Interactive Groupchat] + [DOCS][Agents as tools]

pull/869/head
Kye Gomez 3 weeks ago
parent 5911611628
commit cb8e023a33

@ -0,0 +1,449 @@
import json
import requests
from swarms import Agent
def create_python_file(code: str, filename: str) -> str:
"""Create a Python file with the given code and execute it using Python 3.12.
This function takes a string containing Python code, writes it to a file, and executes it
using Python 3.12 via subprocess. The file will be created in the current working directory.
If a file with the same name already exists, it will be overwritten.
Args:
code (str): The Python code to write to the file. This should be valid Python 3.12 code.
filename (str): The name of the file to create and execute.
Returns:
str: A detailed message indicating the file was created and the execution result.
Raises:
IOError: If there are any issues writing to the file.
subprocess.SubprocessError: If there are any issues executing the file.
Example:
>>> code = "print('Hello, World!')"
>>> result = create_python_file(code, "test.py")
>>> print(result)
'Python file created successfully. Execution result: Hello, World!'
"""
import subprocess
import os
import datetime
# Get current timestamp for logging
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Write the code to file
with open(filename, "w") as f:
f.write(code)
# Get file size and permissions
file_stats = os.stat(filename)
file_size = file_stats.st_size
file_permissions = oct(file_stats.st_mode)[-3:]
# Execute the file using Python 3.12 and capture output
try:
result = subprocess.run(
["python3.12", filename],
capture_output=True,
text=True,
check=True,
)
# Create detailed response
response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {result.returncode} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
return response
except subprocess.CalledProcessError as e:
error_response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
return error_response
def update_python_file(code: str, filename: str) -> str:
"""Update an existing Python file with new code and execute it using Python 3.12.
This function takes a string containing Python code and updates an existing Python file.
If the file doesn't exist, it will be created. The file will be executed using Python 3.12.
Args:
code (str): The Python code to write to the file. This should be valid Python 3.12 code.
filename (str): The name of the file to update and execute.
Returns:
str: A detailed message indicating the file was updated and the execution result.
Raises:
IOError: If there are any issues writing to the file.
subprocess.SubprocessError: If there are any issues executing the file.
Example:
>>> code = "print('Updated code!')"
>>> result = update_python_file(code, "my_script.py")
>>> print(result)
'Python file updated successfully. Execution result: Updated code!'
"""
import subprocess
import os
import datetime
# Get current timestamp for logging
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Check if file exists and get its stats
file_exists = os.path.exists(filename)
if file_exists:
old_stats = os.stat(filename)
old_size = old_stats.st_size
old_permissions = oct(old_stats.st_mode)[-3:]
# Write the code to file
with open(filename, "w") as f:
f.write(code)
# Get new file stats
new_stats = os.stat(filename)
new_size = new_stats.st_size
new_permissions = oct(new_stats.st_mode)[-3:]
# Execute the file using Python 3.12 and capture output
try:
result = subprocess.run(
["python3.12", filename],
capture_output=True,
text=True,
check=True,
)
# Create detailed response
response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {result.returncode} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
return response
except subprocess.CalledProcessError as e:
error_response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
return error_response
def run_quant_trading_agent(task: str) -> str:
"""Run a quantitative trading agent to analyze and execute trading strategies.
This function initializes and runs a specialized quantitative trading agent that can:
- Develop and backtest trading strategies
- Analyze market data for alpha opportunities
- Implement risk management frameworks
- Optimize portfolio allocations
- Conduct quantitative research
- Monitor market microstructure
- Evaluate trading system performance
Args:
task (str): The specific trading task or analysis to perform
Returns:
str: The agent's response or analysis results
Example:
>>> result = run_quant_trading_agent("Analyze SPY ETF for mean reversion opportunities")
>>> print(result)
"""
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=2,
model_name="claude-3-5-sonnet-20240620",
tools=[
create_python_file,
update_python_file,
backtest_summary,
],
)
out = agent.run(task)
return out
def backtest_summary(report: str) -> str:
"""Generate a summary of a backtest report, but only if the backtest was profitable.
This function should only be used when the backtest results show a positive return.
Using this function for unprofitable backtests may lead to misleading conclusions.
Args:
report (str): The backtest report containing performance metrics
Returns:
str: A formatted summary of the backtest report
Example:
>>> result = backtest_summary("Total Return: +15.2%, Sharpe: 1.8")
>>> print(result)
'The backtest report is: Total Return: +15.2%, Sharpe: 1.8'
"""
return f"The backtest report is: {report}"
def get_coin_price(coin_id: str, vs_currency: str) -> str:
"""
Get the current price of a specific cryptocurrency.
Args:
coin_id (str): The CoinGecko ID of the cryptocurrency (e.g., 'bitcoin', 'ethereum')
vs_currency (str, optional): The target currency. Defaults to "usd".
Returns:
str: JSON formatted string containing the coin's current price and market data
Raises:
requests.RequestException: If the API request fails
Example:
>>> result = get_coin_price("bitcoin")
>>> print(result)
{"bitcoin": {"usd": 45000, "usd_market_cap": 850000000000, ...}}
"""
try:
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": coin_id,
"vs_currencies": vs_currency,
"include_market_cap": True,
"include_24hr_vol": True,
"include_24hr_change": True,
"include_last_updated_at": True,
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
return json.dumps(data, indent=2)
except requests.RequestException as e:
return json.dumps(
{
"error": f"Failed to fetch price for {coin_id}: {str(e)}"
}
)
except Exception as e:
return json.dumps({"error": f"Unexpected error: {str(e)}"})
def run_crypto_quant_agent(task: str) -> str:
"""
Run a crypto quantitative trading agent with specialized tools for cryptocurrency market analysis.
This function initializes and runs a quantitative trading agent specifically designed for
cryptocurrency markets. The agent is equipped with tools for price fetching and can perform
various quantitative analyses including algorithmic trading strategy development, risk management,
and market microstructure analysis.
Args:
task (str): The task or query to be processed by the crypto quant agent.
Returns:
str: The agent's response to the given task.
Example:
>>> response = run_crypto_quant_agent("Analyze the current market conditions for Bitcoin")
>>> print(response)
"Based on current market analysis..."
"""
# Initialize the agent with expanded tools
quant_agent = Agent(
agent_name="Crypto-Quant-Agent",
agent_description="Advanced quantitative trading agent specializing in cryptocurrency markets with algorithmic analysis capabilities",
system_prompt="""You are an expert quantitative trading agent specializing in cryptocurrency markets. Your capabilities include:
- Algorithmic trading strategy development and backtesting
- Statistical arbitrage and market making for crypto assets
- Risk management and portfolio optimization for digital assets
- High-frequency trading system design for crypto markets
- Market microstructure analysis of crypto exchanges
- Quantitative research methodologies for crypto assets
- Financial mathematics and stochastic processes
- Machine learning applications in crypto trading
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1,
max_tokens=4096,
model_name="gpt-4.1-mini",
dynamic_temperature_enabled=True,
output_type="final",
tools=[
get_coin_price,
],
)
return quant_agent.run(task)
# Initialize the agent
agent = Agent(
agent_name="Director-Agent",
agent_description="Strategic director and project management agent",
system_prompt="""You are an expert Director Agent with comprehensive capabilities in:
- Strategic planning and decision making
- Project management and coordination
- Resource allocation and optimization
- Team leadership and delegation
- Risk assessment and mitigation
- Stakeholder management
- Process optimization
- Quality assurance
Your core responsibilities include:
1. Developing and executing strategic initiatives
2. Coordinating cross-functional projects
3. Managing resource allocation
4. Setting and tracking KPIs
5. Ensuring project deliverables
6. Risk management and mitigation
7. Stakeholder communication
You maintain strict adherence to:
- Best practices in project management
- Data-driven decision making
- Clear communication protocols
- Quality standards
- Timeline management
- Budget constraints
- Regulatory compliance
You communicate with clarity and authority while maintaining professionalism and ensuring all stakeholders are aligned.""",
max_loops=1,
model_name="gpt-4o-mini",
output_type="final",
interactive=False,
tools=[run_quant_trading_agent],
)
out = agent.run(
"""
Please call the quantitative trading agent to generate Python code for an Bitcoin backtest using the CoinGecko API.
Provide a comprehensive description of the backtest methodology and trading strategy.
Consider the API limitations of CoinGecko and utilize only free, open-source libraries that don't require API keys. Use the requests library to fetch the data. Create a specialized strategy for the backtest focused on the orderbook and other data for price action.
The goal is to create a backtest that can predict the price action of the coin based on the orderbook and other data.
Maximize the profit of the backtest. Please use the OKX price API for the orderbook and other data. Be very explicit in your implementation.
Be very precise with the instructions you give to the agent and tell it to a 400 lines of good code.
"""
)
print(out)

@ -251,6 +251,9 @@ nav:
- Auto Agent Builder: "swarms/structs/auto_agent_builder.md"
- Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md"
- Multi-Agent Multi-Modal Structures:
- ImageAgentBatchProcessor: "swarms/structs/image_batch_agent.md"
- Workflows:
@ -302,12 +305,13 @@ nav:
- Swarms 5.9.2: "swarms/changelog/changelog_new.md"
- Examples:
- Agent Examples:
- Customizing Agents:
- Basic Agent: "swarms/examples/basic_agent.md"
- Agents with Callable Tools: "swarms/examples/agent_with_tools.md"
# - Agent With MCP Integration: "swarms/examples/agent_with_mcp.md"
- Agent Output Types: "swarms/examples/agent_output_types.md"
- Agent with Structured Outputs: "swarms/examples/agent_structured_outputs.md"
- Agents with Vision: "swarms/examples/vision_processing.md"
- Various Model Providers:
- OpenAI: "swarms/examples/openai_example.md"
- Anthropic: "swarms/examples/claude.md"
@ -339,6 +343,7 @@ nav:
- ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md"
- MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
- Unique Swarms: "swarms/examples/unique_swarms.md"
- Agents as Tools: "swarms/examples/agents_as_tools.md"
- Applications:
- Swarms DAO: "swarms/examples/swarms_dao.md"
- Swarms of Browser Agents: "swarms/examples/swarms_of_browser_agents.md"

@ -1,11 +1,12 @@
# Agent Output Types Examples
# Agent Output Types Examples with Vision Capabilities
This example demonstrates how to use different output types when working with Swarms agents. Each output type formats the agent's response in a specific way, making it easier to integrate with different parts of your application.
This example demonstrates how to use different output types when working with Swarms agents, including vision-enabled agents that can analyze images. Each output type formats the agent's response in a specific way, making it easier to integrate with different parts of your application.
## Prerequisites
- Python 3.7+
- OpenAI API key
- Anthropic API key (optional, for Claude models)
- Swarms library
## Installation
@ -18,171 +19,61 @@ pip3 install -U swarms
```plaintext
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY=""
ANTHROPIC_API_KEY=""
OPENAI_API_KEY="" # Required for GPT-4V vision capabilities
ANTHROPIC_API_KEY="" # Optional, for Claude models
```
## Available Output Types
The following output types are supported:
| Output Type | Description |
|------------|-------------|
| `"list"` | Returns response as a JSON string containing a list |
| `"dict"` or `"dictionary"` | Returns response as a Python dictionary |
| `"string"` or `"str"` | Returns response as a plain string |
| `"final"` or `"last"` | Returns only the final response |
| `"json"` | Returns response as a JSON string |
| `"all"` | Returns all responses in the conversation |
| `"yaml"` | Returns response formatted as YAML |
| `"xml"` | Returns response formatted as XML |
| `"dict-all-except-first"` | Returns all responses except the first as a dictionary |
| `"str-all-except-first"` | Returns all responses except the first as a string |
| `"basemodel"` | Returns response as a Pydantic BaseModel |
## Examples
### 1. String Output (Default)
```python
from swarms import Agent
# Initialize agent with string output
agent = Agent(
agent_name="String-Output-Agent",
agent_description="Demonstrates string output format",
system_prompt="You are a helpful assistant that provides clear text responses.",
output_type="str", # or "string"
)
response = agent.run("What is the capital of France?")
```
### 2. JSON Output
```python
# Initialize agent with JSON output
agent = Agent(
agent_name="JSON-Output-Agent",
agent_description="Demonstrates JSON output format",
system_prompt="You are an assistant that provides structured data responses.",
output_type="json"
)
response = agent.run("List the top 3 programming languages.")
```
### 3. List Output
```python
# Initialize agent with list output
agent = Agent(
agent_name="List-Output-Agent",
agent_description="Demonstrates list output format",
system_prompt="You are an assistant that provides list-based responses.",
output_type="list"
)
response = agent.run("Name three primary colors.")
```
### 4. Dictionary Output
### Vision-Enabled Quality Control Agent
```python
# Initialize agent with dictionary output
agent = Agent(
agent_name="Dict-Output-Agent",
agent_description="Demonstrates dictionary output format",
system_prompt="You are an assistant that provides dictionary-based responses.",
output_type="dict" # or "dictionary"
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
response = agent.run("Provide information about a book.")
```
# Image for analysis
factory_image = "image.jpg"
### 5. YAML Output
```python
# Initialize agent with YAML output
agent = Agent(
agent_name="YAML-Output-Agent",
agent_description="Demonstrates YAML output format",
system_prompt="You are an assistant that provides YAML-formatted responses.",
output_type="yaml"
)
response = agent.run("Describe a recipe.")
```
### 6. XML Output
```python
# Initialize agent with XML output
agent = Agent(
agent_name="XML-Output-Agent",
agent_description="Demonstrates XML output format",
system_prompt="You are an assistant that provides XML-formatted responses.",
output_type="xml"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=2,
output_type="str-all-except-first",
)
response = agent.run("Provide user information.")
```
### 7. All Responses
```python
# Initialize agent to get all responses
agent = Agent(
agent_name="All-Output-Agent",
agent_description="Demonstrates getting all responses",
system_prompt="You are an assistant that provides multiple responses.",
output_type="all"
response = quality_control_agent.run(
task="what is in the image?",
img=factory_image,
)
response = agent.run("Tell me about climate change.")
```
### 8. Final Response Only
print(response)
```python
# Initialize agent to get only final response
agent = Agent(
agent_name="Final-Output-Agent",
agent_description="Demonstrates getting only final response",
system_prompt="You are an assistant that provides concise final answers.",
output_type="final" # or "last"
)
response = agent.run("What's the meaning of life?")
```
### Supported Image Formats
## Best Practices
1. Choose the output type based on your application's needs:
| Output Type | Use Case |
|------------|----------|
| `"str"` | Simple text responses |
| `"json"` or `"dict"` | Structured data |
| `"list"` | Array-like data |
| `"yaml"` | Configuration-like data |
| `"xml"` | XML-based integrations |
| `"basemodel"` | Type-safe data handling |
2. Handle the output appropriately in your application:
- Parse JSON/YAML responses when needed
- Validate structured data
- Handle potential formatting errors
The vision-enabled agents support various image formats including:
3. Consider using `try-except` blocks when working with structured output types to handle potential parsing errors.
| Format | Description |
|--------|-------------|
| JPEG/JPG | Standard image format with lossy compression |
| PNG | Lossless format supporting transparency |
| GIF | Animated format (only first frame used) |
| WebP | Modern format with both lossy and lossless compression |
### Best Practices for Vision Tasks
This comprehensive guide shows how to use all available output types in the Swarms framework, making it easier to integrate agent responses into your applications in the most suitable format for your needs.
| Best Practice | Description |
|--------------|-------------|
| Image Quality | Ensure images are clear and well-lit for optimal analysis |
| Image Size | Keep images under 20MB and in supported formats |
| Task Specificity | Provide clear, specific instructions for image analysis |
| Model Selection | Use vision-capable models (e.g., GPT-4V) for image tasks |

@ -0,0 +1,586 @@
# Agents as Tools Tutorial
This tutorial demonstrates how to create a powerful multi-agent system where agents can delegate tasks to specialized sub-agents. This pattern is particularly useful for complex tasks that require different types of expertise or capabilities.
## Overview
The Agents as Tools pattern allows you to:
- Create specialized agents with specific capabilities
- Have agents delegate tasks to other agents
- Chain multiple agents together for complex workflows
- Maintain separation of concerns between different agent roles
## Prerequisites
- Python 3.8 or higher
- Basic understanding of Python programming
- Familiarity with async/await concepts (optional)
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set up your environment variables:
```python
WORKSPACE_DIR="agent_workspace"
ANTHROPIC_API_KEY=""
```
## Step-by-Step Guide
1. **Define Your Tools**
- Create functions that will serve as tools for your agents
- Add proper type hints and detailed docstrings
- Include error handling and logging
- Example:
```python
def my_tool(param: str) -> str:
"""Detailed description of what the tool does.
Args:
param: Description of the parameter
Returns:
Description of the return value
"""
# Tool implementation
return result
```
2. **Create Specialized Agents**
- Define agents with specific roles and capabilities
- Configure each agent with appropriate settings
- Assign relevant tools to each agent
```python
specialized_agent = Agent(
agent_name="Specialist",
agent_description="Expert in specific domain",
system_prompt="Detailed instructions for the agent",
tools=[tool1, tool2]
)
```
3. **Set Up the Director Agent**
- Create a high-level agent that coordinates other agents
- Give it access to specialized agents as tools
- Define clear delegation rules
```python
director = Agent(
agent_name="Director",
agent_description="Coordinates other agents",
tools=[specialized_agent.run]
)
```
4. **Execute Multi-Agent Workflows**
- Start with the director agent
- Let it delegate tasks as needed
- Handle responses and chain results
```python
result = director.run("Your high-level task description")
```
## Code
```python
import json
import requests
from swarms import Agent
def create_python_file(code: str, filename: str) -> str:
"""Create a Python file with the given code and execute it using Python 3.12.
This function takes a string containing Python code, writes it to a file, and executes it
using Python 3.12 via subprocess. The file will be created in the current working directory.
If a file with the same name already exists, it will be overwritten.
Args:
code (str): The Python code to write to the file. This should be valid Python 3.12 code.
filename (str): The name of the file to create and execute.
Returns:
str: A detailed message indicating the file was created and the execution result.
Raises:
IOError: If there are any issues writing to the file.
subprocess.SubprocessError: If there are any issues executing the file.
Example:
>>> code = "print('Hello, World!')"
>>> result = create_python_file(code, "test.py")
>>> print(result)
'Python file created successfully. Execution result: Hello, World!'
"""
import subprocess
import os
import datetime
# Get current timestamp for logging
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Write the code to file
with open(filename, "w") as f:
f.write(code)
# Get file size and permissions
file_stats = os.stat(filename)
file_size = file_stats.st_size
file_permissions = oct(file_stats.st_mode)[-3:]
# Execute the file using Python 3.12 and capture output
try:
result = subprocess.run(
["python3.12", filename],
capture_output=True,
text=True,
check=True
)
# Create detailed response
response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {result.returncode} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
return response
except subprocess.CalledProcessError as e:
error_response = f"""
File Creation Details:
----------------------
Timestamp: {timestamp}
Filename: {filename}
File Size: {file_size} bytes
File Permissions: {file_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
return error_response
def update_python_file(code: str, filename: str) -> str:
"""Update an existing Python file with new code and execute it using Python 3.12.
This function takes a string containing Python code and updates an existing Python file.
If the file doesn't exist, it will be created. The file will be executed using Python 3.12.
Args:
code (str): The Python code to write to the file. This should be valid Python 3.12 code.
filename (str): The name of the file to update and execute.
Returns:
str: A detailed message indicating the file was updated and the execution result.
Raises:
IOError: If there are any issues writing to the file.
subprocess.SubprocessError: If there are any issues executing the file.
Example:
>>> code = "print('Updated code!')"
>>> result = update_python_file(code, "my_script.py")
>>> print(result)
'Python file updated successfully. Execution result: Updated code!'
"""
import subprocess
import os
import datetime
# Get current timestamp for logging
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Check if file exists and get its stats
file_exists = os.path.exists(filename)
if file_exists:
old_stats = os.stat(filename)
old_size = old_stats.st_size
old_permissions = oct(old_stats.st_mode)[-3:]
# Write the code to file
with open(filename, "w") as f:
f.write(code)
# Get new file stats
new_stats = os.stat(filename)
new_size = new_stats.st_size
new_permissions = oct(new_stats.st_mode)[-3:]
# Execute the file using Python 3.12 and capture output
try:
result = subprocess.run(
["python3.12", filename],
capture_output=True,
text=True,
check=True
)
# Create detailed response
response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Details:
-----------------
Exit Code: {result.returncode}
Execution Time: {result.returncode} seconds
Output:
-------
{result.stdout}
Error Output (if any):
--------------------
{result.stderr}
"""
return response
except subprocess.CalledProcessError as e:
error_response = f"""
File Update Details:
-------------------
Timestamp: {timestamp}
Filename: {filename}
Previous Status: {'Existed' if file_exists else 'Did not exist'}
Previous Size: {old_size if file_exists else 'N/A'} bytes
Previous Permissions: {old_permissions if file_exists else 'N/A'}
New Size: {new_size} bytes
New Permissions: {new_permissions}
Location: {os.path.abspath(filename)}
Execution Error:
---------------
Exit Code: {e.returncode}
Error Message: {e.stderr}
Command Output:
-------------
{e.stdout}
"""
return error_response
def run_quant_trading_agent(task: str) -> str:
"""Run a quantitative trading agent to analyze and execute trading strategies.
This function initializes and runs a specialized quantitative trading agent that can:
- Develop and backtest trading strategies
- Analyze market data for alpha opportunities
- Implement risk management frameworks
- Optimize portfolio allocations
- Conduct quantitative research
- Monitor market microstructure
- Evaluate trading system performance
Args:
task (str): The specific trading task or analysis to perform
Returns:
str: The agent's response or analysis results
Example:
>>> result = run_quant_trading_agent("Analyze SPY ETF for mean reversion opportunities")
>>> print(result)
"""
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=2,
model_name="claude-3-5-sonnet-20240620",
tools=[create_python_file, update_python_file, backtest_summary],
)
out = agent.run(task)
return out
def backtest_summary(report: str) -> str:
"""Generate a summary of a backtest report, but only if the backtest was profitable.
This function should only be used when the backtest results show a positive return.
Using this function for unprofitable backtests may lead to misleading conclusions.
Args:
report (str): The backtest report containing performance metrics
Returns:
str: A formatted summary of the backtest report
Example:
>>> result = backtest_summary("Total Return: +15.2%, Sharpe: 1.8")
>>> print(result)
'The backtest report is: Total Return: +15.2%, Sharpe: 1.8'
"""
return f"The backtest report is: {report}"
def get_coin_price(coin_id: str, vs_currency: str) -> str:
"""
Get the current price of a specific cryptocurrency.
Args:
coin_id (str): The CoinGecko ID of the cryptocurrency (e.g., 'bitcoin', 'ethereum')
vs_currency (str, optional): The target currency. Defaults to "usd".
Returns:
str: JSON formatted string containing the coin's current price and market data
Raises:
requests.RequestException: If the API request fails
Example:
>>> result = get_coin_price("bitcoin")
>>> print(result)
{"bitcoin": {"usd": 45000, "usd_market_cap": 850000000000, ...}}
"""
try:
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": coin_id,
"vs_currencies": vs_currency,
"include_market_cap": True,
"include_24hr_vol": True,
"include_24hr_change": True,
"include_last_updated_at": True,
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
return json.dumps(data, indent=2)
except requests.RequestException as e:
return json.dumps(
{
"error": f"Failed to fetch price for {coin_id}: {str(e)}"
}
)
except Exception as e:
return json.dumps({"error": f"Unexpected error: {str(e)}"})
def run_crypto_quant_agent(task: str) -> str:
"""
Run a crypto quantitative trading agent with specialized tools for cryptocurrency market analysis.
This function initializes and runs a quantitative trading agent specifically designed for
cryptocurrency markets. The agent is equipped with tools for price fetching and can perform
various quantitative analyses including algorithmic trading strategy development, risk management,
and market microstructure analysis.
Args:
task (str): The task or query to be processed by the crypto quant agent.
Returns:
str: The agent's response to the given task.
Example:
>>> response = run_crypto_quant_agent("Analyze the current market conditions for Bitcoin")
>>> print(response)
"Based on current market analysis..."
"""
# Initialize the agent with expanded tools
quant_agent = Agent(
agent_name="Crypto-Quant-Agent",
agent_description="Advanced quantitative trading agent specializing in cryptocurrency markets with algorithmic analysis capabilities",
system_prompt="""You are an expert quantitative trading agent specializing in cryptocurrency markets. Your capabilities include:
- Algorithmic trading strategy development and backtesting
- Statistical arbitrage and market making for crypto assets
- Risk management and portfolio optimization for digital assets
- High-frequency trading system design for crypto markets
- Market microstructure analysis of crypto exchanges
- Quantitative research methodologies for crypto assets
- Financial mathematics and stochastic processes
- Machine learning applications in crypto trading
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1,
max_tokens=4096,
model_name="gpt-4.1-mini",
dynamic_temperature_enabled=True,
output_type="final",
tools=[
get_coin_price,
],
)
return quant_agent.run(task)
# Initialize the agent
agent = Agent(
agent_name="Director-Agent",
agent_description="Strategic director and project management agent",
system_prompt="""You are an expert Director Agent with comprehensive capabilities in:
- Strategic planning and decision making
- Project management and coordination
- Resource allocation and optimization
- Team leadership and delegation
- Risk assessment and mitigation
- Stakeholder management
- Process optimization
- Quality assurance
Your core responsibilities include:
1. Developing and executing strategic initiatives
2. Coordinating cross-functional projects
3. Managing resource allocation
4. Setting and tracking KPIs
5. Ensuring project deliverables
6. Risk management and mitigation
7. Stakeholder communication
You maintain strict adherence to:
- Best practices in project management
- Data-driven decision making
- Clear communication protocols
- Quality standards
- Timeline management
- Budget constraints
- Regulatory compliance
You communicate with clarity and authority while maintaining professionalism and ensuring all stakeholders are aligned.""",
max_loops=1,
model_name="gpt-4o-mini",
output_type="final",
interactive=False,
tools=[run_quant_trading_agent],
)
out = agent.run("""
Please call the quantitative trading agent to generate Python code for an Bitcoin backtest using the CoinGecko API.
Provide a comprehensive description of the backtest methodology and trading strategy.
Consider the API limitations of CoinGecko and utilize only free, open-source libraries that don't require API keys. Use the requests library to fetch the data. Create a specialized strategy for the backtest focused on the orderbook and other data for price action.
The goal is to create a backtest that can predict the price action of the coin based on the orderbook and other data.
Maximize the profit of the backtest. Please use the OKX price API for the orderbook and other data. Be very explicit in your implementation.
Be very precise with the instructions you give to the agent and tell it to a 400 lines of good code.
""")
print(out)
```
## Best Practices
| Category | Best Practice | Description |
|----------|---------------|-------------|
| **Tool Design** | Single Purpose | Keep tools focused and single-purpose |
| | Clear Naming | Use clear, descriptive names |
| | Error Handling | Include comprehensive error handling |
| | Documentation | Add detailed documentation |
| **Agent Configuration** | Clear Role | Give each agent a clear, specific role |
| | System Prompts | Provide detailed system prompts |
| | Model Parameters | Configure appropriate model and parameters |
| | Resource Limits | Set reasonable limits on iterations and tokens |
| **Error Handling** | Multi-level | Implement proper error handling at each level |
| | Logging | Include logging for debugging |
| | API Management | Handle API rate limits and timeouts |
| | Fallbacks | Provide fallback options when possible |
| **Performance Optimization** | Async Operations | Use async operations where appropriate |
| | Caching | Implement caching when possible |
| | Token Usage | Monitor and optimize token usage |
| | Batch Processing | Consider batch operations for efficiency |

@ -0,0 +1,150 @@
# Vision Processing Examples
This example demonstrates how to use vision-enabled agents in Swarms to analyze images and process visual information. You'll learn how to work with both OpenAI and Anthropic vision models for various use cases.
## Prerequisites
- Python 3.7+
- OpenAI API key (for GPT-4V)
- Anthropic API key (for Claude 3)
- Swarms library
## Installation
```bash
pip3 install -U swarms
```
## Environment Variables
```plaintext
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY="" # Required for GPT-4V
ANTHROPIC_API_KEY="" # Required for Claude 3
```
## Working with Images
### Supported Image Formats
Vision-enabled agents support various image formats:
| Format | Description |
|--------|-------------|
| JPEG/JPG | Standard image format with lossy compression |
| PNG | Lossless format supporting transparency |
| GIF | Animated format (only first frame used) |
| WebP | Modern format with both lossy and lossless compression |
### Image Guidelines
- Maximum file size: 20MB
- Recommended resolution: At least 512x512 pixels
- Image should be clear and well-lit
- Avoid heavily compressed or blurry images
## Examples
### 1. Quality Control with GPT-4V
```python
from swarms.structs import Agent
from swarms.prompts.logistics import Quality_Control_Agent_Prompt
# Load your image
factory_image = "path/to/your/image.jpg" # Local file path
# Or use a URL
# factory_image = "https://example.com/image.jpg"
# Initialize quality control agent with GPT-4V
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides detailed quality reports.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1
)
# Run the analysis
response = quality_control_agent.run(
task="Analyze this image and provide a detailed quality control report",
img=factory_image
)
print(response)
```
### 2. Visual Analysis with Claude 3
```python
from swarms.structs import Agent
from swarms.prompts.logistics import Visual_Analysis_Prompt
# Load your image
product_image = "path/to/your/product.jpg"
# Initialize visual analysis agent with Claude 3
visual_analyst = Agent(
agent_name="Visual Analyst",
agent_description="An agent that performs detailed visual analysis of products and scenes.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Visual_Analysis_Prompt,
multi_modal=True,
max_loops=1
)
# Run the analysis
response = visual_analyst.run(
task="Provide a comprehensive analysis of this product image",
img=product_image
)
print(response)
```
### 3. Image Batch Processing
```python
from swarms.structs import Agent
import os
def process_image_batch(image_folder, agent):
"""Process multiple images in a folder"""
results = []
for image_file in os.listdir(image_folder):
if image_file.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
image_path = os.path.join(image_folder, image_file)
response = agent.run(
task="Analyze this image",
img=image_path
)
results.append((image_file, response))
return results
# Example usage
image_folder = "path/to/image/folder"
batch_results = process_image_batch(image_folder, visual_analyst)
```
## Best Practices
| Category | Best Practice | Description |
|----------|---------------|-------------|
| Image Preparation | Format Support | Ensure images are in supported formats (JPEG, PNG, GIF, WebP) |
| | Size & Quality | Optimize image size and quality for better processing |
| | Image Quality | Use clear, well-lit images for accurate analysis |
| Model Selection | GPT-4V Usage | Use for general vision tasks and detailed analysis |
| | Claude 3 Usage | Use for complex reasoning and longer outputs |
| | Batch Processing | Consider batch processing for multiple images |
| Error Handling | Path Validation | Always validate image paths before processing |
| | API Error Handling | Implement proper error handling for API calls |
| | Rate Monitoring | Monitor API rate limits and token usage |
| Performance Optimization | Result Caching | Cache results when processing the same images |
| | Batch Processing | Use batch processing for multiple images |
| | Parallel Processing | Implement parallel processing for large datasets |

@ -0,0 +1,271 @@
# ImageAgentBatchProcessor Documentation
## Overview
The `ImageAgentBatchProcessor` is a high-performance parallel image processing system designed for running AI agents on multiple images concurrently. It provides robust error handling, logging, and flexible configuration options.
## Installation
```bash
pip install swarms
```
## Class Arguments
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| agents | Union[Agent, List[Agent], Callable, List[Callable]] | Required | Single agent or list of agents to process images |
| max_workers | int | None | Maximum number of parallel workers (defaults to 95% of CPU cores) |
| supported_formats | List[str] | ['.jpg', '.jpeg', '.png'] | List of supported image file extensions |
## Methods
### run()
**Description**: Main method for processing multiple images in parallel with configured agents. Can handle single images, multiple images, or entire directories.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_paths | Union[str, List[str], Path] | Yes | Single image path, list of paths, or directory path |
| tasks | Union[str, List[str]] | Yes | Single task or list of tasks to perform on each image |
**Returns**: List[Dict[str, Any]] - List of processing results for each image
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
from pathlib import Path
# Initialize agent and processor
agent = Agent(api_key="your-api-key", model="gpt-4-vision")
processor = ImageAgentBatchProcessor(agents=agent)
# Example 1: Process single image
results = processor.run(
image_paths="path/to/image.jpg",
tasks="Describe this image"
)
# Example 2: Process multiple images
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks=["Describe objects", "Identify colors"]
)
# Example 3: Process directory
results = processor.run(
image_paths=Path("./images"),
tasks="Analyze image content"
)
```
### _validate_image_path()
**Description**: Internal method that validates if an image path exists and has a supported format.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_path | Union[str, Path] | Yes | Path to the image file to validate |
**Returns**: Path - Validated Path object
**Example**:
```python
from swarms.structs import ImageAgentBatchProcessor, ImageProcessingError
from pathlib import Path
processor = ImageAgentBatchProcessor(agents=agent)
try:
validated_path = processor._validate_image_path("image.jpg")
print(f"Valid image path: {validated_path}")
except ImageProcessingError as e:
print(f"Invalid image path: {e}")
```
### _process_single_image()
**Description**: Internal method that processes a single image with one agent and one or more tasks.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| image_path | Path | Yes | Path to the image to process |
| tasks | Union[str, List[str]] | Yes | Tasks to perform on the image |
| agent | Agent | Yes | Agent to use for processing |
**Returns**: Dict[str, Any] - Processing results for the image
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
from pathlib import Path
agent = Agent(api_key="your-api-key", model="gpt-4-vision")
processor = ImageAgentBatchProcessor(agents=agent)
try:
result = processor._process_single_image(
image_path=Path("image.jpg"),
tasks=["Describe image", "Identify objects"],
agent=agent
)
print(f"Processing results: {result}")
except Exception as e:
print(f"Processing failed: {e}")
```
### __call__()
**Description**: Makes the ImageAgentBatchProcessor callable like a function. Redirects to the run() method.
**Arguments**:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| *args | Any | No | Variable length argument list passed to run() |
| **kwargs | Any | No | Keyword arguments passed to run() |
**Returns**: List[Dict[str, Any]] - Same as run() method
**Example**:
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
# Initialize
agent = Agent(api_key="your-api-key", model="gpt-4-vision")
processor = ImageAgentBatchProcessor(agents=agent)
# Using __call__
results = processor(
image_paths=["image1.jpg", "image2.jpg"],
tasks="Describe the image"
)
# This is equivalent to:
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks="Describe the image"
)
```
## Return Format
The processor returns a list of dictionaries with the following structure:
```python
{
"image_path": str, # Path to the processed image
"results": { # Results for each task
"task_name": result, # Task-specific results
},
"processing_time": float # Processing time in seconds
}
```
## Complete Usage Examples
### 1. Basic Usage with Single Agent
```python
from swarms import Agent
from swarms.structs import ImageAgentBatchProcessor
# Initialize an agent
agent = Agent(
api_key="your-api-key",
model="gpt-4-vision"
)
# Create processor
processor = ImageAgentBatchProcessor(agents=agent)
# Process single image
results = processor.run(
image_paths="path/to/image.jpg",
tasks="Describe this image in detail"
)
```
### 2. Processing Multiple Images with Multiple Tasks
```python
# Initialize with multiple agents
agent1 = Agent(api_key="key1", model="gpt-4-vision")
agent2 = Agent(api_key="key2", model="claude-3")
processor = ImageAgentBatchProcessor(
agents=[agent1, agent2],
supported_formats=['.jpg', '.png', '.webp']
)
# Define multiple tasks
tasks = [
"Describe the main objects in the image",
"What is the dominant color?",
"Identify any text in the image"
]
# Process a directory of images
results = processor.run(
image_paths="path/to/image/directory",
tasks=tasks
)
# Process results
for result in results:
print(f"Image: {result['image_path']}")
for task, output in result['results'].items():
print(f"Task: {task}")
print(f"Result: {output}")
print(f"Processing time: {result['processing_time']:.2f} seconds")
```
### 3. Custom Error Handling
```python
from swarms.structs import ImageAgentBatchProcessor, ImageProcessingError
try:
processor = ImageAgentBatchProcessor(agents=agent)
results = processor.run(
image_paths=["image1.jpg", "image2.png", "invalid.txt"],
tasks="Analyze the image"
)
except ImageProcessingError as e:
print(f"Image processing failed: {e}")
except InvalidAgentError as e:
print(f"Agent configuration error: {e}")
```
## Best Practices
| Best Practice | Description |
|--------------|-------------|
| Resource Management | • The processor automatically uses 95% of available CPU cores<br>• For memory-intensive operations, consider reducing `max_workers` |
| Error Handling | • Always wrap processor calls in try-except blocks<br>• Check the results for any error keys |
| Task Design | • Keep tasks focused and specific<br>• Group related tasks together for efficiency |
| Performance Optimization | • Process images in batches for better throughput<br>• Use multiple agents for different types of analysis |
## Limitations
| Limitation | Description |
|------------|-------------|
| File Format Support | Only supports image file formats specified in `supported_formats` |
| Agent Requirements | Requires valid agent configurations |
| Resource Scaling | Memory usage scales with number of concurrent processes |
This documentation provides a comprehensive guide to using the `ImageAgentBatchProcessor`. The class is designed to be both powerful and flexible, allowing for various use cases from simple image analysis to complex multi-agent processing pipelines.

@ -0,0 +1,206 @@
# InteractiveGroupChat Documentation
The InteractiveGroupChat is a sophisticated multi-agent system that enables interactive conversations between users and AI agents using @mentions. This system allows users to direct messages to specific agents and facilitates collaborative responses when multiple agents are mentioned.
## Features
- **@mentions Support**: Direct messages to specific agents using @agent_name syntax
- **Multi-Agent Collaboration**: Multiple mentioned agents can see and respond to each other's messages
- **Callable Function Support**: Supports both Agent instances and callable functions as chat participants
- **Comprehensive Error Handling**: Custom error classes for different scenarios
- **Conversation History**: Maintains a complete history of the conversation
- **Flexible Output Formatting**: Configurable output format for conversation history
## Installation
```bash
pip install swarms
```
## Basic Usage
```python
from swarms import Agent, InteractiveGroupChat
# Initialize agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies.",
model_name="gpt-4o-mini"
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert providing tax-related guidance.",
model_name="gpt-4o-mini"
)
# Create the interactive group chat
chat = InteractiveGroupChat(
name="Financial Team",
description="Financial advisory team",
agents=[financial_advisor, tax_expert]
)
# Send a message to a single agent
response = chat.run("@FinancialAdvisor what are good investment strategies?")
# Send a message to multiple agents
response = chat.run("@FinancialAdvisor and @TaxExpert, how can I optimize my investment taxes?")
```
## Advanced Usage
### Using Callable Functions
```python
def custom_agent(context: str) -> str:
"""A custom callable function that can act as an agent"""
return "Custom response based on: " + context
# Add both Agent instances and callable functions
agents = [financial_advisor, tax_expert, custom_agent]
chat = InteractiveGroupChat(agents=agents)
# Interact with the callable function
response = chat.run("@custom_agent what do you think?")
```
## Configuration Options
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| name | str | "InteractiveGroupChat" | Name of the group chat |
| description | str | "An interactive group chat..." | Description of the chat's purpose |
| agents | List[Union[Agent, Callable]] | [] | List of agents or callable functions |
| max_loops | int | 1 | Maximum conversation turns |
| output_type | str | "string" | Output format type |
## Error Handling
The system includes several custom error classes:
- **InteractiveGroupChatError**: Base exception class
- **AgentNotFoundError**: Raised when a mentioned agent doesn't exist
- **NoMentionedAgentsError**: Raised when no agents are mentioned
- **InvalidMessageFormatError**: Raised for invalid message formats
Example error handling:
```python
try:
response = chat.run("@NonExistentAgent hello!")
except AgentNotFoundError as e:
print(f"Agent not found: {e}")
except NoMentionedAgentsError as e:
print(f"No agents mentioned: {e}")
```
## Best Practices
1. **Agent Naming**: Use clear, unique names for agents to avoid confusion
2. **Message Format**: Always use @mentions to direct messages to specific agents
3. **Error Handling**: Implement proper error handling for various scenarios
4. **Context Management**: Be aware that agents can see the full conversation history
5. **Resource Management**: Consider the number of agents and message length to optimize performance
## Logging
The system uses loguru for comprehensive logging:
```python
from loguru import logger
# Configure logging
logger.add("groupchat.log", rotation="500 MB")
# Logs will include:
# - Agent responses
# - Error messages
# - System events
```
## Examples
### Basic Interaction
```python
# Single agent interaction
response = chat.run("@FinancialAdvisor what are the best investment strategies for 2024?")
# Multiple agent collaboration
response = chat.run("@TaxExpert and @InvestmentAnalyst, how can we optimize investment taxes?")
```
### Error Handling
```python
try:
# Invalid agent mention
response = chat.run("@NonExistentAgent hello!")
except AgentNotFoundError as e:
print(f"Error: {e}")
try:
# No mentions
response = chat.run("Hello everyone!")
except NoMentionedAgentsError as e:
print(f"Error: {e}")
```
### Custom Callable Integration
```python
def market_analyzer(context: str) -> str:
"""Custom market analysis function"""
return "Market analysis based on: " + context
agents = [financial_advisor, tax_expert, market_analyzer]
chat = InteractiveGroupChat(agents=agents)
response = chat.run("@market_analyzer what's your analysis of the current market?")
```
## API Reference
### InteractiveGroupChat Class
```python
class InteractiveGroupChat:
def __init__(
self,
name: str = "InteractiveGroupChat",
description: str = "An interactive group chat for multiple agents",
agents: List[Union[Agent, Callable]] = [],
max_loops: int = 1,
output_type: str = "string",
):
"""Initialize the interactive group chat."""
def run(self, message: str) -> str:
"""Process a message and get responses from mentioned agents."""
```
### Custom Error Classes
```python
class InteractiveGroupChatError(Exception):
"""Base exception class for InteractiveGroupChat errors"""
class AgentNotFoundError(InteractiveGroupChatError):
"""Raised when a mentioned agent is not found"""
class NoMentionedAgentsError(InteractiveGroupChatError):
"""Raised when no agents are mentioned"""
class InvalidMessageFormatError(InteractiveGroupChatError):
"""Raised when the message format is invalid"""
```
## Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our GitHub repository.
## License
This project is licensed under the Apache License - see the LICENSE file for details.

@ -1,6 +1,5 @@
import time
from swarms import Agent
from swarms.schemas.conversation_schema import ConversationSchema
# Initialize the agent
agent = Agent(
@ -38,12 +37,8 @@ agent = Agent(
max_loops=1,
model_name="gpt-4o-mini",
dynamic_temperature_enabled=True,
output_type="json",
output_type="all",
safety_prompt_on=True,
conversation_schema=ConversationSchema(
time_enabled=True,
message_id_on=True,
),
)
out = agent.run("What are the best top 3 etfs for gold coverage?")

@ -0,0 +1,26 @@
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
# Image for analysis
factory_image = "image.jpg"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
)
response = quality_control_agent.run(
task="Create a comprehensive report on the factory image and it's status",
img=factory_image,
)
print(response)

Binary file not shown.

After

Width:  |  Height:  |  Size: 232 KiB

@ -0,0 +1,32 @@
from swarms import Agent
from swarms.structs.image_batch_processor import (
ImageAgentBatchProcessor,
)
from pathlib import Path
# Initialize agent and processor
# Quality control agent
agent = Agent(
model_name="gpt-4.1-mini",
max_loops=1,
)
# Create processor
processor = ImageAgentBatchProcessor(agents=agent)
# Example 1: Process single image
results = processor.run(
image_paths="path/to/image.jpg", tasks="Describe this image"
)
# Example 2: Process multiple images
results = processor.run(
image_paths=["image1.jpg", "image2.jpg"],
tasks=["Describe objects", "Identify colors"],
)
# Example 3: Process directory
results = processor.run(
image_paths=Path("./images"), tasks="Analyze image content"
)

@ -0,0 +1,67 @@
import json
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
from swarms import BaseTool
import litellm
litellm._turn_on_debug()
# Image for analysis
factory_image = "image.jpg"
def security_analysis(danger_level: str = None) -> str:
"""
Analyzes the security danger level and returns an appropriate response.
Args:
danger_level (str, optional): The level of danger to analyze.
Can be "low", "medium", "high", or None. Defaults to None.
Returns:
str: A string describing the danger level assessment.
- "No danger level provided" if danger_level is None
- "No danger" if danger_level is "low"
- "Medium danger" if danger_level is "medium"
- "High danger" if danger_level is "high"
- "Unknown danger level" for any other value
"""
if danger_level is None:
return "No danger level provided"
if danger_level == "low":
return "No danger"
if danger_level == "medium":
return "Medium danger"
if danger_level == "high":
return "High danger"
return "Unknown danger level"
schema = BaseTool().function_to_dict(security_analysis)
print(json.dumps(schema, indent=4))
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="anthropic/claude-3-opus-20240229",
system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
tools_list_dictionary=[schema],
)
response = quality_control_agent.run(
task="what is in the image?",
# img=factory_image,
)
print(response)

@ -0,0 +1,27 @@
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
# Image for analysis
factory_image = "image.jpg"
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="gpt-4.1-mini",
system_prompt=Quality_Control_Agent_Prompt,
# multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
)
response = quality_control_agent.run(
task="Create a comprehensive report on the image",
img=factory_image,
)
print(response)

@ -0,0 +1,51 @@
from swarms import Agent
from swarms.structs.interactive_groupchat import InteractiveGroupChat
if __name__ == "__main__":
# Initialize agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies and portfolio management.",
random_models_on=True,
output_type="final",
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert who provides guidance on tax optimization and compliance.",
random_models_on=True,
output_type="final",
)
investment_analyst = Agent(
agent_name="InvestmentAnalyst",
system_prompt="You are an investment analyst focusing on market trends and investment opportunities.",
random_models_on=True,
output_type="final",
)
# Create list of agents including both Agent instances and callable
agents = [
financial_advisor,
tax_expert,
investment_analyst,
]
# Initialize another chat instance in interactive mode
interactive_chat = InteractiveGroupChat(
name="Interactive Financial Advisory Team",
description="An interactive team of financial experts providing comprehensive financial advice",
agents=agents,
max_loops=1,
output_type="all",
interactive=True,
)
try:
# Start the interactive session
print("\nStarting interactive session...")
# interactive_chat.run("What is the best methodology to accumulate gold and silver commodities, what is the best long term strategy to accumulate them?")
interactive_chat.start_interactive_session()
except Exception as e:
print(f"An error occurred in interactive mode: {e}")

@ -423,7 +423,7 @@ agent = Agent(
system_prompt="You are an advanced financial advisor agent with access to real-time cryptocurrency data from multiple sources including CoinGecko, Jupiter Protocol, and HTX. You can help users analyze market trends, check prices, find trading opportunities, perform swaps, and get detailed market insights. Always provide accurate, up-to-date information and explain market data in an easy-to-understand way.",
max_loops=1,
max_tokens=4096,
model_name="gpt-4o-mini",
model_name="gpt-4.1-mini",
dynamic_temperature_enabled=True,
output_type="all",
tools=[
@ -442,5 +442,7 @@ agent = Agent(
)
# agent.run("Use defi stats to find the best defi project to invest in")
agent.run("Get the market sentiment for bitcoin")
agent.run(
"Get the market sentiment for bitcoin and fetch the price of ethereum"
)
# Automatically executes any number and combination of tools you have uploaded to the tools parameter!

@ -223,10 +223,6 @@ class SupabaseConversation(BaseCommunication):
"""
# Try to create index as well
create_index_sql = f"""
CREATE INDEX IF NOT EXISTS idx_{self.table_name}_conversation_id
ON {self.table_name} (conversation_id);
"""
# Attempt to create table using RPC function
# Note: This requires a stored procedure to be created in Supabase
@ -322,7 +318,7 @@ class SupabaseConversation(BaseCommunication):
if hasattr(self.client, "postgrest") and hasattr(
self.client.postgrest, "rpc"
):
result = self.client.postgrest.rpc(
self.client.postgrest.rpc(
"exec_sql", {"query": admin_sql}
).execute()
if self.enable_logging:

@ -1,91 +1,109 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Union, Any, Literal
from litellm.types import (
ChatCompletionPredictionContentParam,
)
from typing import Optional
# from litellm.types import (
# ChatCompletionPredictionContentParam,
# )
class LLMCompletionRequest(BaseModel):
"""Schema for LLM completion request parameters."""
model: Optional[str] = Field(
default=None,
description="The name of the language model to use for text completion",
)
temperature: Optional[float] = Field(
default=0.5,
description="Controls randomness of the output (0.0 to 1.0)",
)
top_p: Optional[float] = Field(
default=None,
description="Controls diversity via nucleus sampling",
)
n: Optional[int] = Field(
default=None, description="Number of completions to generate"
)
stream: Optional[bool] = Field(
default=None, description="Whether to stream the response"
)
stream_options: Optional[dict] = Field(
default=None, description="Options for streaming response"
)
stop: Optional[Any] = Field(
default=None,
description="Up to 4 sequences where the API will stop generating",
)
max_completion_tokens: Optional[int] = Field(
default=None,
description="Maximum tokens for completion including reasoning",
)
max_tokens: Optional[int] = Field(
default=None,
description="Maximum tokens in generated completion",
)
prediction: Optional[ChatCompletionPredictionContentParam] = (
Field(
default=None,
description="Configuration for predicted output",
)
)
presence_penalty: Optional[float] = Field(
default=None,
description="Penalizes new tokens based on existence in text",
)
frequency_penalty: Optional[float] = Field(
default=None,
description="Penalizes new tokens based on frequency in text",
)
logit_bias: Optional[dict] = Field(
default=None,
description="Modifies probability of specific tokens",
)
reasoning_effort: Optional[Literal["low", "medium", "high"]] = (
Field(
default=None,
description="Level of reasoning effort for the model",
)
)
seed: Optional[int] = Field(
default=None, description="Random seed for reproducibility"
)
tools: Optional[List] = Field(
default=None,
description="List of tools available to the model",
)
tool_choice: Optional[Union[str, dict]] = Field(
default=None, description="Choice of tool to use"
)
logprobs: Optional[bool] = Field(
default=None,
description="Whether to return log probabilities",
)
top_logprobs: Optional[int] = Field(
# class LLMCompletionRequest(BaseModel):
# """Schema for LLM completion request parameters."""
# model: Optional[str] = Field(
# default=None,
# description="The name of the language model to use for text completion",
# )
# temperature: Optional[float] = Field(
# default=0.5,
# description="Controls randomness of the output (0.0 to 1.0)",
# )
# top_p: Optional[float] = Field(
# default=None,
# description="Controls diversity via nucleus sampling",
# )
# n: Optional[int] = Field(
# default=None, description="Number of completions to generate"
# )
# stream: Optional[bool] = Field(
# default=None, description="Whether to stream the response"
# )
# stream_options: Optional[dict] = Field(
# default=None, description="Options for streaming response"
# )
# stop: Optional[Any] = Field(
# default=None,
# description="Up to 4 sequences where the API will stop generating",
# )
# max_completion_tokens: Optional[int] = Field(
# default=None,
# description="Maximum tokens for completion including reasoning",
# )
# max_tokens: Optional[int] = Field(
# default=None,
# description="Maximum tokens in generated completion",
# )
# prediction: Optional[ChatCompletionPredictionContentParam] = (
# Field(
# default=None,
# description="Configuration for predicted output",
# )
# )
# presence_penalty: Optional[float] = Field(
# default=None,
# description="Penalizes new tokens based on existence in text",
# )
# frequency_penalty: Optional[float] = Field(
# default=None,
# description="Penalizes new tokens based on frequency in text",
# )
# logit_bias: Optional[dict] = Field(
# default=None,
# description="Modifies probability of specific tokens",
# )
# reasoning_effort: Optional[Literal["low", "medium", "high"]] = (
# Field(
# default=None,
# description="Level of reasoning effort for the model",
# )
# )
# seed: Optional[int] = Field(
# default=None, description="Random seed for reproducibility"
# )
# tools: Optional[List] = Field(
# default=None,
# description="List of tools available to the model",
# )
# tool_choice: Optional[Union[str, dict]] = Field(
# default=None, description="Choice of tool to use"
# )
# logprobs: Optional[bool] = Field(
# default=None,
# description="Whether to return log probabilities",
# )
# top_logprobs: Optional[int] = Field(
# default=None,
# description="Number of most likely tokens to return",
# )
# parallel_tool_calls: Optional[bool] = Field(
# default=None,
# description="Whether to allow parallel tool calls",
# )
# class Config:
# allow_arbitrary_types = True
class ModelConfigOrigin(BaseModel):
"""Schema for model configuration origin."""
model_url: Optional[str] = Field(
default=None,
description="Number of most likely tokens to return",
description="The URL of the model to use for text completion",
)
parallel_tool_calls: Optional[bool] = Field(
api_key: Optional[str] = Field(
default=None,
description="Whether to allow parallel tool calls",
description="The API key to use for the model",
)
class Config:

@ -40,6 +40,7 @@ from swarms.schemas.base_schemas import (
ChatCompletionResponseChoice,
ChatMessageResponse,
)
from swarms.schemas.llm_agent_schema import ModelConfigOrigin
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
from swarms.structs.safe_loading import (
@ -407,6 +408,9 @@ class Agent:
mcp_config: Optional[MCPConnection] = None,
top_p: Optional[float] = 0.90,
conversation_schema: Optional[ConversationSchema] = None,
aditional_llm_config: Optional[ModelConfigOrigin] = None,
llm_base_url: Optional[str] = None,
llm_api_key: Optional[str] = None,
*args,
**kwargs,
):
@ -534,10 +538,9 @@ class Agent:
self.mcp_config = mcp_config
self.top_p = top_p
self.conversation_schema = conversation_schema
self._cached_llm = (
None # Add this line to cache the LLM instance
)
self.aditional_llm_config = aditional_llm_config
self.llm_base_url = llm_base_url
self.llm_api_key = llm_api_key
# self.short_memory = self.short_memory_init()
@ -547,6 +550,8 @@ class Agent:
# self.init_handling()
self.setup_config()
self.short_memory = self.short_memory_init()
if exists(self.docs_folder):
self.get_docs_from_doc_folders()
@ -564,8 +569,6 @@ class Agent:
if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT
self.short_memory = self.short_memory_init()
# Run sequential operations after all concurrent tasks are done
# self.agent_output = self.agent_output_model()
log_agent_data(self.to_dict())
@ -661,8 +664,8 @@ class Agent:
def llm_handling(self):
# Use cached instance if available
if self._cached_llm is not None:
return self._cached_llm
if self.llm is not None:
return self.llm
if self.model_name is None:
self.model_name = "gpt-4o-mini"
@ -682,11 +685,9 @@ class Agent:
}
if self.llm_args is not None:
self._cached_llm = LiteLLM(
**{**common_args, **self.llm_args}
)
self.llm = LiteLLM(**{**common_args, **self.llm_args})
elif self.tools_list_dictionary is not None:
self._cached_llm = LiteLLM(
self.llm = LiteLLM(
**common_args,
tools_list_dictionary=self.tools_list_dictionary,
tool_choice="auto",
@ -694,7 +695,7 @@ class Agent:
)
elif self.mcp_url is not None:
self._cached_llm = LiteLLM(
self.llm = LiteLLM(
**common_args,
tools_list_dictionary=self.add_mcp_tools_to_memory(),
tool_choice="auto",
@ -702,11 +703,14 @@ class Agent:
mcp_call=True,
)
else:
self._cached_llm = LiteLLM(
**common_args, stream=self.streaming_on
# common_args.update(self.aditional_llm_config.model_dump())
self.llm = LiteLLM(
**common_args,
stream=self.streaming_on,
)
return self._cached_llm
return self.llm
except AgentLLMInitializationError as e:
logger.error(
f"Error in llm_handling: {e} Your current configuration is not supported. Please check the configuration and parameters."
@ -789,7 +793,7 @@ class Agent:
"No agent details found. Using task as fallback for prompt generation."
)
self.system_prompt = auto_generate_prompt(
task=task, model=self._cached_llm
task=task, model=self.llm
)
else:
# Combine all available components
@ -1012,16 +1016,20 @@ class Agent:
)
self.memory_query(task_prompt)
# Generate response using LLM
response_args = (
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
# # Generate response using LLM
# response_args = (
# (task_prompt, *args)
# if img is None
# else (task_prompt, img, *args)
# )
# # Call the LLM
# response = self.call_llm(
# *response_args, **kwargs
# )
# Call the LLM
response = self.call_llm(
*response_args, **kwargs
task=task_prompt, img=img, *args, **kwargs
)
if exists(self.tools_list_dictionary):
@ -2388,7 +2396,9 @@ class Agent:
return None
def call_llm(self, task: str, *args, **kwargs) -> str:
def call_llm(
self, task: str, img: str = None, *args, **kwargs
) -> str:
"""
Calls the appropriate method on the `llm` object based on the given task.
@ -2407,17 +2417,9 @@ class Agent:
TypeError: If task is not a string or llm object is None.
ValueError: If task is empty.
"""
# if not isinstance(task, str):
# task = any_to_str(task)
# if img is not None:
# kwargs['img'] = img
# if audio is not None:
# kwargs['audio'] = audio
try:
out = self.llm.run(task=task, *args, **kwargs)
out = self.llm.run(task=task, img=img, *args, **kwargs)
return out
except AgentLLMError as e:
@ -2764,13 +2766,7 @@ class Agent:
# Create a temporary LLM instance without tools for the follow-up call
try:
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
)
temp_llm = self.temp_llm_instance_for_tool_summary()
summary = temp_llm.run(
task=self.short_memory.get_str()
@ -2792,6 +2788,19 @@ class Agent:
logger.error(f"Error in MCP tool: {e}")
raise e
def temp_llm_instance_for_tool_summary(self):
return LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
base_url=self.llm_base_url,
api_key=self.llm_api_key,
)
def execute_tools(self, response: any, loop_count: int):
output = (
@ -2813,15 +2822,7 @@ class Agent:
# Now run the LLM again without tools - create a temporary LLM instance
# instead of modifying the cached one
# Create a temporary LLM instance without tools for the follow-up call
temp_llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
stream=self.streaming_on,
tools_list_dictionary=None,
parallel_tool_calls=False,
)
temp_llm = self.temp_llm_instance_for_tool_summary()
tool_response = temp_llm.run(
f"""

@ -0,0 +1,261 @@
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from loguru import logger
from swarms.structs import Agent
class ImageProcessingError(Exception):
"""Custom exception for image processing errors."""
pass
class InvalidAgentError(Exception):
"""Custom exception for invalid agent configurations."""
pass
class ImageAgentBatchProcessor:
"""
A class for processing multiple images in parallel using one or more agents.
This processor can:
- Handle multiple images from a directory
- Process images with single or multiple agents
- Execute tasks in parallel
- Provide detailed logging and error handling
Attributes:
agents (List[Agent]): List of agents to process images
max_workers (int): Maximum number of parallel workers
supported_formats (set): Set of supported image formats
"""
def __init__(
self,
agents: Union[Agent, List[Agent], Callable, List[Callable]],
max_workers: int = None,
supported_formats: Optional[List[str]] = None,
):
"""
Initialize the ImageBatchProcessor.
Args:
agents: Single agent or list of agents to process images
max_workers: Maximum number of parallel workers (default: 4)
supported_formats: List of supported image formats (default: ['.jpg', '.jpeg', '.png'])
Raises:
InvalidAgentError: If agents parameter is invalid
"""
self.agents = agents
self.max_workers = max_workers
self.supported_formats = supported_formats
self.agents = (
[agents] if isinstance(agents, Agent) else agents
)
if not self.agents:
raise InvalidAgentError(
"At least one agent must be provided"
)
# Get 95% of the total number of cores
self.max_workers = int(os.cpu_count() * 0.95)
self.supported_formats = set(
supported_formats or [".jpg", ".jpeg", ".png"]
)
# Setup logging
logger.add(
"image_processor.log",
rotation="100 MB",
retention="10 days",
level="INFO",
)
def _validate_image_path(
self, image_path: Union[str, Path]
) -> Path:
"""
Validate if the image path exists and has supported format.
Args:
image_path: Path to the image file
Returns:
Path: Validated Path object
Raises:
ImageProcessingError: If path is invalid or format not supported
"""
path = Path(image_path)
if not path.exists():
raise ImageProcessingError(
f"Image path does not exist: {path}"
)
if path.suffix.lower() not in self.supported_formats:
raise ImageProcessingError(
f"Unsupported image format {path.suffix}. Supported formats: {self.supported_formats}"
)
return path
def _process_single_image(
self,
image_path: Path,
tasks: Union[str, List[str]],
agent: Agent,
) -> Dict[str, Any]:
"""
Process a single image with one agent and one or more tasks.
Args:
image_path: Path to the image
tasks: Single task or list of tasks to perform
agent: Agent to process the image
Returns:
Dict containing results for each task
"""
try:
tasks_list = [tasks] if isinstance(tasks, str) else tasks
results = {}
logger.info(
f"Processing image {image_path} with agent {agent.__class__.__name__}"
)
start_time = time.time()
for task in tasks_list:
try:
result = agent.run(task=task, img=str(image_path))
results[task] = result
except Exception as e:
logger.error(
f"Error processing task '{task}' for image {image_path}: {str(e)}"
)
results[task] = f"Error: {str(e)}"
processing_time = time.time() - start_time
logger.info(
f"Completed processing {image_path} in {processing_time:.2f} seconds"
)
return {
"image_path": str(image_path),
"results": results,
"processing_time": processing_time,
}
except Exception as e:
logger.error(
f"Failed to process image {image_path}: {str(e)}"
)
raise ImageProcessingError(
f"Failed to process image {image_path}: {str(e)}"
)
def run(
self,
image_paths: Union[str, List[str], Path],
tasks: Union[str, List[str]],
) -> List[Dict[str, Any]]:
"""
Process multiple images in parallel with the configured agents.
Args:
image_paths: Single image path or list of image paths or directory path
tasks: Single task or list of tasks to perform on each image
Returns:
List of dictionaries containing results for each image
Raises:
ImageProcessingError: If any image processing fails
"""
# Handle directory input
if (
isinstance(image_paths, (str, Path))
and Path(image_paths).is_dir()
):
image_paths = [
os.path.join(image_paths, f)
for f in os.listdir(image_paths)
if Path(os.path.join(image_paths, f)).suffix.lower()
in self.supported_formats
]
elif isinstance(image_paths, (str, Path)):
image_paths = [image_paths]
# Validate all paths
validated_paths = [
self._validate_image_path(path) for path in image_paths
]
if not validated_paths:
logger.warning("No valid images found to process")
return []
logger.info(
f"Starting batch processing of {len(validated_paths)} images"
)
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
future_to_path = {}
# Submit all tasks
for path in validated_paths:
for agent in self.agents:
future = executor.submit(
self._process_single_image, path, tasks, agent
)
future_to_path[future] = (path, agent)
# Collect results as they complete
for future in as_completed(future_to_path):
path, agent = future_to_path[future]
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(
f"Failed to process {path} with {agent.__class__.__name__}: {str(e)}"
)
results.append(
{
"image_path": str(path),
"error": str(e),
"agent": agent.__class__.__name__,
}
)
logger.info(
f"Completed batch processing of {len(validated_paths)} images"
)
return results
def __call__(self, *args, **kwargs):
"""
Make the ImageAgentBatchProcessor callable like a function.
This allows the processor to be used directly as a function, which will
call the run() method with the provided arguments.
Args:
*args: Variable length argument list to pass to run()
**kwargs: Arbitrary keyword arguments to pass to run()
Returns:
The result of calling run() with the provided arguments
"""
return self.run(*args, **kwargs)

@ -0,0 +1,356 @@
from typing import List, Union, Callable
import re
from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.generate_keys import generate_api_key
class InteractiveGroupChatError(Exception):
"""Base exception class for InteractiveGroupChat errors"""
pass
class AgentNotFoundError(InteractiveGroupChatError):
"""Raised when a mentioned agent is not found in the group"""
pass
class NoMentionedAgentsError(InteractiveGroupChatError):
"""Raised when no agents are mentioned in the message"""
pass
class InvalidMessageFormatError(InteractiveGroupChatError):
"""Raised when the message format is invalid"""
pass
class InteractiveGroupChat:
"""
An interactive group chat system that enables conversations with multiple agents using @mentions.
This class allows users to interact with multiple agents by mentioning them using @agent_name syntax.
When multiple agents are mentioned, they can see and respond to each other's messages.
Attributes:
name (str): Name of the group chat
description (str): Description of the group chat's purpose
agents (List[Union[Agent, Callable]]): List of Agent instances or callable functions
max_loops (int): Maximum number of conversation turns
conversation (Conversation): Stores the chat history
agent_map (Dict[str, Union[Agent, Callable]]): Mapping of agent names to their instances
Args:
name (str, optional): Name of the group chat. Defaults to "InteractiveGroupChat".
description (str, optional): Description of the chat. Defaults to "An interactive group chat for multiple agents".
agents (List[Union[Agent, Callable]], optional): List of participating agents or callables. Defaults to empty list.
max_loops (int, optional): Maximum conversation turns. Defaults to 1.
output_type (str, optional): Type of output format. Defaults to "string".
interactive (bool, optional): Whether to enable interactive terminal mode. Defaults to False.
Raises:
ValueError: If invalid initialization parameters are provided
"""
def __init__(
self,
id: str = generate_api_key(prefix="swarms-"),
name: str = "InteractiveGroupChat",
description: str = "An interactive group chat for multiple agents",
agents: List[Union[Agent, Callable]] = [],
max_loops: int = 1,
output_type: str = "string",
interactive: bool = False,
):
self.id = id
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.interactive = interactive
# Initialize conversation history
self.conversation = Conversation(time_enabled=True)
# Create a mapping of agent names to agents for easy lookup
self.agent_map = {}
for agent in agents:
if isinstance(agent, Agent):
self.agent_map[agent.agent_name] = agent
elif callable(agent):
# For callable functions, use the function name as the agent name
self.agent_map[agent.__name__] = agent
self._validate_initialization()
self._setup_conversation_context()
self._update_agent_prompts()
def _validate_initialization(self) -> None:
"""
Validates the group chat configuration.
Raises:
ValueError: If any required components are missing or invalid
"""
if len(self.agents) < 1:
raise ValueError(
"At least one agent is required for the group chat"
)
if self.max_loops <= 0:
raise ValueError("Max loops must be greater than 0")
def _setup_conversation_context(self) -> None:
"""Sets up the initial conversation context with group chat information."""
agent_info = []
for agent in self.agents:
if isinstance(agent, Agent):
agent_info.append(
f"- {agent.agent_name}: {agent.system_prompt}"
)
elif callable(agent):
agent_info.append(
f"- {agent.__name__}: Custom callable function"
)
context = (
f"Group Chat Name: {self.name}\n"
f"Description: {self.description}\n"
f"Available Agents:\n" + "\n".join(agent_info)
)
self.conversation.add(role="System", content=context)
def _update_agent_prompts(self) -> None:
"""Updates each agent's system prompt with information about other agents and the group chat."""
agent_info = []
for agent in self.agents:
if isinstance(agent, Agent):
agent_info.append(
{
"name": agent.agent_name,
"description": agent.system_prompt,
}
)
elif callable(agent):
agent_info.append(
{
"name": agent.__name__,
"description": "Custom callable function",
}
)
group_context = (
f"\n\nYou are part of a group chat named '{self.name}' with the following description: {self.description}\n"
f"Other participants in this chat:\n"
)
for agent in self.agents:
if isinstance(agent, Agent):
# Create context excluding the current agent
other_agents = [
info
for info in agent_info
if info["name"] != agent.agent_name
]
agent_context = group_context
for other in other_agents:
agent_context += (
f"- {other['name']}: {other['description']}\n"
)
# Update the agent's system prompt
agent.system_prompt = (
agent.system_prompt + agent_context
)
logger.info(
f"Updated system prompt for agent: {agent.agent_name}"
)
def _extract_mentions(self, message: str) -> List[str]:
"""
Extracts @mentions from the message.
Args:
message (str): The input message
Returns:
List[str]: List of mentioned agent names
Raises:
InvalidMessageFormatError: If the message format is invalid
"""
try:
# Find all @mentions using regex
mentions = re.findall(r"@(\w+)", message)
return [
mention
for mention in mentions
if mention in self.agent_map
]
except Exception as e:
logger.error(f"Error extracting mentions: {e}")
raise InvalidMessageFormatError(
f"Invalid message format: {e}"
)
def start_interactive_session(self):
"""
Start an interactive terminal session for chatting with agents.
This method creates a REPL (Read-Eval-Print Loop) that allows users to:
- Chat with agents using @mentions
- See available agents and their descriptions
- Exit the session using 'exit' or 'quit'
- Get help using 'help' or '?'
"""
if not self.interactive:
raise InteractiveGroupChatError(
"Interactive mode is not enabled. Initialize with interactive=True"
)
print(f"\nWelcome to {self.name}!")
print(f"Description: {self.description}")
print("\nAvailable agents:")
for name, agent in self.agent_map.items():
if isinstance(agent, Agent):
print(
f"- @{name}: {agent.system_prompt.split('\n')[0]}"
)
else:
print(f"- @{name}: Custom callable function")
print("\nCommands:")
print("- Type 'help' or '?' for help")
print("- Type 'exit' or 'quit' to end the session")
print("- Use @agent_name to mention agents")
print("\nStart chatting:")
while True:
try:
# Get user input
user_input = input("\nYou: ").strip()
# Handle special commands
if user_input.lower() in ["exit", "quit"]:
print("Goodbye!")
break
if user_input.lower() in ["help", "?"]:
print("\nHelp:")
print("1. Mention agents using @agent_name")
print(
"2. You can mention multiple agents in one message"
)
print("3. Available agents:")
for name in self.agent_map:
print(f" - @{name}")
print(
"4. Type 'exit' or 'quit' to end the session"
)
continue
if not user_input:
continue
# Process the message and get responses
try:
response = self.run(user_input)
print("\nChat:")
print(response)
except NoMentionedAgentsError:
print(
"\nError: Please mention at least one agent using @agent_name"
)
except AgentNotFoundError as e:
print(f"\nError: {str(e)}")
except Exception as e:
print(f"\nAn error occurred: {str(e)}")
except KeyboardInterrupt:
print("\nSession terminated by user. Goodbye!")
break
except Exception as e:
print(f"\nAn unexpected error occurred: {str(e)}")
print(
"The session will continue. You can type 'exit' to end it."
)
def run(self, message: str) -> str:
"""
Process a message and get responses from mentioned agents.
If interactive mode is enabled, this will be called by start_interactive_session().
Otherwise, it can be called directly for single message processing.
"""
try:
# Extract mentioned agents
mentioned_agents = self._extract_mentions(message)
if not mentioned_agents:
raise NoMentionedAgentsError(
"No valid agents mentioned in the message"
)
# Add user message to conversation
self.conversation.add(role="User", content=message)
# Get responses from mentioned agents
for agent_name in mentioned_agents:
agent = self.agent_map.get(agent_name)
if not agent:
raise AgentNotFoundError(
f"Agent '{agent_name}' not found"
)
try:
# Get the complete conversation history
context = (
self.conversation.return_history_as_string()
)
# Get response from agent
if isinstance(agent, Agent):
response = agent.run(
task=f"{context}\nPlease respond to the latest message as {agent_name}."
)
else:
# For callable functions
response = agent(context)
# Add response to conversation
if response and not response.isspace():
self.conversation.add(
role=agent_name, content=response
)
logger.info(f"Agent {agent_name} responded")
except Exception as e:
logger.error(
f"Error getting response from {agent_name}: {e}"
)
self.conversation.add(
role=agent_name,
content=f"Error: Unable to generate response - {str(e)}",
)
return history_output_formatter(
self.conversation, self.output_type
)
except InteractiveGroupChatError as e:
logger.error(f"GroupChat error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise InteractiveGroupChatError(
f"Unexpected error occurred: {str(e)}"
)

@ -1,6 +1,8 @@
import traceback
from typing import Optional
import base64
import requests
from pathlib import Path
import asyncio
from typing import List
@ -9,11 +11,7 @@ from loguru import logger
import litellm
from pydantic import BaseModel
from litellm import completion, acompletion
litellm.set_verbose = True
litellm.ssl_verify = False
# litellm._turn_on_debug()
from litellm import completion, acompletion, supports_vision
class LiteLLMException(Exception):
@ -53,6 +51,35 @@ def get_audio_base64(audio_source: str) -> str:
return encoded_string
def get_image_base64(image_source: str) -> str:
"""
Convert image from a given source to a base64 encoded string.
Handles URLs, local file paths, and data URIs.
"""
# If already a data URI, return as is
if image_source.startswith("data:image"):
return image_source
# Handle URL
if image_source.startswith(("http://", "https://")):
response = requests.get(image_source)
response.raise_for_status()
image_data = response.content
# Handle local file
else:
with open(image_source, "rb") as file:
image_data = file.read()
# Get file extension for mime type
extension = Path(image_source).suffix.lower()
mime_type = (
f"image/{extension[1:]}" if extension else "image/jpeg"
)
encoded_string = base64.b64encode(image_data).decode("utf-8")
return f"data:{mime_type};base64,{encoded_string}"
class LiteLLM:
"""
This class represents a LiteLLM.
@ -72,12 +99,15 @@ class LiteLLM:
tool_choice: str = "auto",
parallel_tool_calls: bool = False,
audio: str = None,
retries: int = 3,
retries: int = 0,
verbose: bool = False,
caching: bool = False,
mcp_call: bool = False,
top_p: float = 1.0,
functions: List[dict] = None,
return_all: bool = False,
base_url: str = None,
api_key: str = None,
*args,
**kwargs,
):
@ -105,8 +135,11 @@ class LiteLLM:
self.mcp_call = mcp_call
self.top_p = top_p
self.functions = functions
self.audio = audio
self.return_all = return_all
self.base_url = base_url
self.api_key = api_key
self.modalities = []
self._cached_messages = {} # Cache for prepared messages
self.messages = [] # Initialize messages list
# Configure litellm settings
@ -135,7 +168,11 @@ class LiteLLM:
out = out.model_dump()
return out
def _prepare_messages(self, task: str) -> list:
def _prepare_messages(
self,
task: str,
img: str = None,
):
"""
Prepare the messages for the given task.
@ -145,91 +182,201 @@ class LiteLLM:
Returns:
list: A list of messages prepared for the task.
"""
# Check cache first
cache_key = f"{self.system_prompt}:{task}"
if cache_key in self._cached_messages:
return self._cached_messages[cache_key].copy()
self.check_if_model_supports_vision(img=img)
# Initialize messages
messages = []
if self.system_prompt:
# Add system prompt if present
if self.system_prompt is not None:
messages.append(
{"role": "system", "content": self.system_prompt}
)
messages.append({"role": "user", "content": task})
# Cache the prepared messages
self._cached_messages[cache_key] = messages.copy()
# Handle vision case
if img is not None:
messages = self.vision_processing(
task=task, image=img, messages=messages
)
else:
messages.append({"role": "user", "content": task})
return messages
def audio_processing(self, task: str, audio: str):
def anthropic_vision_processing(
self, task: str, image: str, messages: list
) -> list:
"""
Process the audio for the given task.
Args:
task (str): The task to be processed.
audio (str): The path or identifier for the audio file.
Process vision input specifically for Anthropic models.
Handles Anthropic's specific image format requirements.
"""
self.modalities.append("audio")
encoded_string = get_audio_base64(audio)
# Get base64 encoded image
image_url = get_image_base64(image)
# Extract mime type from the data URI or use default
mime_type = "image/jpeg" # default
if "data:" in image_url and ";base64," in image_url:
mime_type = image_url.split(";base64,")[0].split("data:")[
1
]
# Ensure mime type is one of the supported formats
supported_formats = [
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
]
if mime_type not in supported_formats:
mime_type = (
"image/jpeg" # fallback to jpeg if unsupported
)
# Append messages
self.messages.append(
# Construct Anthropic vision message
messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
{
"type": "input_audio",
"input_audio": {
"data": encoded_string,
"format": "wav",
"type": "image_url",
"image_url": {
"url": image_url,
"format": mime_type,
},
},
],
}
)
def vision_processing(self, task: str, image: str):
return messages
def openai_vision_processing(
self, task: str, image: str, messages: list
) -> list:
"""
Process vision input specifically for OpenAI models.
Handles OpenAI's specific image format requirements.
"""
# Get base64 encoded image with proper format
image_url = get_image_base64(image)
# Prepare vision message
vision_message = {
"type": "image_url",
"image_url": {"url": image_url},
}
# Add format for specific models
extension = Path(image).suffix.lower()
mime_type = (
f"image/{extension[1:]}" if extension else "image/jpeg"
)
vision_message["image_url"]["format"] = mime_type
# Append vision message
messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
vision_message,
],
}
)
return messages
def vision_processing(
self, task: str, image: str, messages: Optional[list] = None
):
"""
Process the image for the given task.
Handles different image formats and model requirements.
"""
# # # Handle Anthropic models separately
# # if "anthropic" in self.model_name.lower() or "claude" in self.model_name.lower():
# # messages = self.anthropic_vision_processing(task, image, messages)
# # return messages
# # Get base64 encoded image with proper format
# image_url = get_image_base64(image)
# # Prepare vision message
# vision_message = {
# "type": "image_url",
# "image_url": {"url": image_url},
# }
# # Add format for specific models
# extension = Path(image).suffix.lower()
# mime_type = f"image/{extension[1:]}" if extension else "image/jpeg"
# vision_message["image_url"]["format"] = mime_type
# # Append vision message
# messages.append(
# {
# "role": "user",
# "content": [
# {"type": "text", "text": task},
# vision_message,
# ],
# }
# )
# return messages
if (
"anthropic" in self.model_name.lower()
or "claude" in self.model_name.lower()
):
messages = self.anthropic_vision_processing(
task, image, messages
)
return messages
else:
messages = self.openai_vision_processing(
task, image, messages
)
return messages
def audio_processing(self, task: str, audio: str):
"""
self.modalities.append("vision")
Process the audio for the given task.
# Append messages
Args:
task (str): The task to be processed.
audio (str): The path or identifier for the audio file.
"""
encoded_string = get_audio_base64(audio)
# Append audio message
self.messages.append(
{
"role": "user",
"content": [
{"type": "text", "text": task},
{
"type": "image_url",
"image_url": {
"url": image,
# "detail": "high"
# "format": "image",
"type": "input_audio",
"input_audio": {
"data": encoded_string,
"format": "wav",
},
},
],
}
)
def handle_modalities(
self, task: str, audio: str = None, img: str = None
):
def check_if_model_supports_vision(self, img: str = None):
"""
Handle the modalities for the given task.
Check if the model supports vision.
"""
self.messages = [] # Reset messages
self.modalities.append("text")
if audio is not None:
self.audio_processing(task=task, audio=audio)
self.modalities.append("audio")
if img is not None:
self.vision_processing(task=task, image=img)
self.modalities.append("vision")
out = supports_vision(model=self.model_name)
if out is False:
raise ValueError(
f"Model {self.model_name} does not support vision"
)
def run(
self,
@ -256,13 +403,7 @@ class LiteLLM:
Exception: If there is an error in processing the request.
"""
try:
messages = self._prepare_messages(task)
if audio is not None or img is not None:
self.handle_modalities(
task=task, audio=audio, img=img
)
messages = self.messages
messages = self._prepare_messages(task=task, img=img)
# Base completion parameters
completion_params = {
@ -298,6 +439,9 @@ class LiteLLM:
{"functions": self.functions}
)
if self.base_url is not None:
completion_params["base_url"] = self.base_url
# Add modalities if needed
if self.modalities and len(self.modalities) >= 2:
completion_params["modalities"] = self.modalities
@ -308,12 +452,16 @@ class LiteLLM:
# Handle tool-based response
if self.tools_list_dictionary is not None:
return self.output_for_tools(response)
elif self.return_all is True:
return response.model_dump()
else:
# Return standard response content
return response.choices[0].message.content
except LiteLLMException as error:
logger.error(f"Error in LiteLLM run: {str(error)}")
logger.error(
f"Error in LiteLLM run: {str(error)} Traceback: {traceback.format_exc()}"
)
if "rate_limit" in str(error).lower():
logger.warning(
"Rate limit hit, retrying with exponential backoff..."

@ -294,7 +294,7 @@ def test_logging_configuration() -> bool:
try:
assert (
conversation_with_logging.enable_logging == True
conversation_with_logging.enable_logging is True
), "Logging should be enabled"
assert (
conversation_with_logging.logger is not None
@ -309,7 +309,7 @@ def test_logging_configuration() -> bool:
)
assert (
conversation_no_logging.enable_logging == False
conversation_no_logging.enable_logging is False
), "Logging should be disabled"
print("✓ Logging configuration test passed")
@ -629,7 +629,7 @@ def test_update_message_method() -> bool:
)
assert (
success == True
success is True
), "update_message should return True on success"
# Verify the update
@ -643,7 +643,7 @@ def test_update_message_method() -> bool:
updated_msg["metadata"]["version"] == 2
), "Metadata should be updated"
assert (
updated_msg["metadata"]["updated"] == True
updated_msg["metadata"]["updated"] is True
), "New metadata field should be added"
# Test update_message with non-existent ID
@ -651,7 +651,7 @@ def test_update_message_method() -> bool:
message_id=999999, content="This should fail"
)
assert (
failure == False
failure is False
), "update_message should return False for non-existent message"
print("✓ Update message method test passed")
@ -1106,7 +1106,7 @@ def test_enhanced_error_handling() -> bool:
# Test invalid credentials
try:
invalid_conversation = SupabaseConversation(
SupabaseConversation(
supabase_url="https://invalid-url.supabase.co",
supabase_key="invalid_key",
enable_logging=False,
@ -1139,7 +1139,7 @@ def test_enhanced_error_handling() -> bool:
"999999", "user", "content"
)
assert (
update_result == False
update_result is False
), "_update_flexible should return False for invalid ID"
# Test update_message with invalid ID
@ -1147,7 +1147,7 @@ def test_enhanced_error_handling() -> bool:
999999, "invalid content"
)
assert (
result == False
result is False
), "update_message should return False for invalid ID"
# Test search with empty query
@ -1174,7 +1174,7 @@ def test_enhanced_error_handling() -> bool:
"not_a_number", "user", "content"
)
assert (
invalid_update == False
invalid_update is False
), "Invalid ID should return False for update"
print("✓ Enhanced error handling test passed")

@ -0,0 +1,79 @@
# 'v0-1.0-md'
# https://api.v0.dev/v1/chat/completions
import time
from swarms import Agent
import os
from dotenv import load_dotenv
load_dotenv()
FRONT_END_DEVELOPMENT_PROMPT = """
You are an expert full-stack development agent with comprehensive expertise in:
Frontend Development:
- Modern React.js/Next.js architecture and best practices
- Advanced TypeScript implementation and type safety
- State-of-the-art UI/UX design patterns
- Responsive and accessible design principles
- Component-driven development with Storybook
- Modern CSS frameworks (Tailwind, Styled-Components)
- Performance optimization and lazy loading
Backend Development:
- Scalable microservices architecture
- RESTful and GraphQL API design
- Database optimization and schema design
- Authentication and authorization systems
- Serverless architecture and cloud services
- CI/CD pipeline implementation
- Security best practices and OWASP guidelines
Development Practices:
- Test-Driven Development (TDD)
- Clean Code principles
- Documentation (TSDoc/JSDoc)
- Git workflow and version control
- Performance monitoring and optimization
- Error handling and logging
- Code review best practices
Your core responsibilities include:
1. Developing production-grade TypeScript applications
2. Implementing modern, accessible UI components
3. Designing scalable backend architectures
4. Writing comprehensive documentation
5. Ensuring type safety across the stack
6. Optimizing application performance
7. Implementing security best practices
You maintain strict adherence to:
- TypeScript strict mode and proper typing
- SOLID principles and clean architecture
- Accessibility standards (WCAG 2.1)
- Performance budgets and metrics
- Security best practices
- Comprehensive test coverage
- Modern design system principles
"""
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",
agent_description="Advanced quantitative trading and algorithmic analysis agent",
system_prompt=FRONT_END_DEVELOPMENT_PROMPT,
max_loops=1,
model_name="v0-1.0-md",
dynamic_temperature_enabled=True,
output_type="all",
# safety_prompt_on=True,
llm_api_key=os.getenv("V0_API_KEY"),
llm_base_url="https://api.v0.dev/v1/chat/completions",
)
out = agent.run(
"Build a simple web app that allows users to upload a file and then download it."
)
time.sleep(10)
print(out)
Loading…
Cancel
Save