Merge branch 'master' into feat/Integrate-AgentRAGHandler

pull/903/head
harshalmore31 1 month ago committed by GitHub
commit 7197a37f85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1381,6 +1381,7 @@ print(out)
------- -------
## SpreadSheetSwarm ## SpreadSheetSwarm
SpreadSheetSwarm manages thousands of agents concurrently for efficient task processing. It supports one-to-many task distribution, scalability, and autosaving results. Initialized with a name, description, agents, and settings, the run method executes tasks and returns a dictionary of agent outputs. SpreadSheetSwarm manages thousands of agents concurrently for efficient task processing. It supports one-to-many task distribution, scalability, and autosaving results. Initialized with a name, description, agents, and settings, the run method executes tasks and returns a dictionary of agent outputs.
[Learn more:](https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/) [Learn more:](https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/)
@ -2029,35 +2030,18 @@ if __name__ == "__main__":
---------- ----------
## Onboarding Session ## Onboarding Session
Get onboarded now with the creator and lead maintainer of Swarms, Kye Gomez, who will show you how to get started with the installation, usage examples, and starting to build your custom use case! [CLICK HERE](https://cal.com/swarms/swarms-onboarding-session) Get onboarded now with the creator and lead maintainer of Swarms, Kye Gomez, who will show you how to get started with the installation, usage examples, and starting to build your custom use case! [CLICK HERE](https://cal.com/swarms/swarms-onboarding-session)
--- ---
## Documentation ## Documentation
Documentation is located here at: [docs.swarms.world](https://docs.swarms.world) Documentation is located here at: [docs.swarms.world](https://docs.swarms.world)
----- -----
## Folder Structure
The swarms package has been meticulously crafted for extreme usability and understanding,the swarms package is split up into various modules such as `swarms.agents` that holds pre-built agents, `swarms.structs` that holds a vast array of structures like `Agent` and multi agent structures. The package is split into various modules, with the most important being `structs`, `tools`, and `agents`.
```sh
├── __init__.py
├── agents/
├── artifacts/
├── client/
├── cli/
├── prompts/
├── schemas/
├── structs/
├── telemetry/
├── tools/
└── utils/
```
----
## 🫶 Contributions: ## 🫶 Contributions:
The easiest way to contribute is to pick any issue with the `good first issue` tag 💪. Read the Contributing guidelines [here](/CONTRIBUTING.md). Bug Report? [File here](https://github.com/swarms/gateway/issues) | Feature Request? [File here](https://github.com/swarms/gateway/issues) The easiest way to contribute is to pick any issue with the `good first issue` tag 💪. Read the Contributing guidelines [here](/CONTRIBUTING.md). Bug Report? [File here](https://github.com/swarms/gateway/issues) | Feature Request? [File here](https://github.com/swarms/gateway/issues)
@ -2067,17 +2051,20 @@ Swarms is an open-source project, and contributions are VERY welcome. If you wan
---- ----
## Community ### Connect With Us
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |
Join our growing community around the world, for real-time support, ideas, and discussions on Swarms 😊
- View our official [Documents](https://docs.swarms.world)
- View our official [Blog](https://medium.com/@kyeg)
- Chat live with us on [Discord](https://discord.gg/jM3Z6M9uMq)
- Follow us on [Twitter](https://twitter.com/kyegomez)
- Connect with us on [LinkedIn](https://www.linkedin.com/company/the-swarm-corporation)
- Visit us on [YouTube](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ)
- Sign up for our events [Sign up here](https://lu.ma/5p2jnc2v)
## Citation ## Citation
If you use **swarms** in your research, please cite the project by referencing the metadata in [CITATION.cff](./CITATION.cff). If you use **swarms** in your research, please cite the project by referencing the metadata in [CITATION.cff](./CITATION.cff).

@ -187,9 +187,9 @@ nav:
- Agents: - Agents:
# - Overview: "swarms/structs/index.md" # - Overview: "swarms/structs/index.md"
- Concepts: - Concepts:
- Managing Prompts in Production: "swarms/prompts/main.md" # - Managing Prompts in Production: "swarms/prompts/main.md"
- Introduction into The Agent Architecture: "swarms/framework/agents_explained.md" - Introduction into The Agent Architecture: "swarms/framework/agents_explained.md"
# - Introduction to Tools: "swarms/tools/overview.md"
- Documentation: - Documentation:
- Agent Class Documentation: "swarms/structs/agent.md" - Agent Class Documentation: "swarms/structs/agent.md"
- Create and Run Agents from YAML: "swarms/agents/create_agents_yaml.md" - Create and Run Agents from YAML: "swarms/agents/create_agents_yaml.md"
@ -344,10 +344,13 @@ nav:
- Group Chat Example: "swarms/examples/groupchat_example.md" - Group Chat Example: "swarms/examples/groupchat_example.md"
- Sequential Workflow Example: "swarms/examples/sequential_example.md" - Sequential Workflow Example: "swarms/examples/sequential_example.md"
- SwarmRouter Example: "swarms/examples/swarm_router.md" - SwarmRouter Example: "swarms/examples/swarm_router.md"
- MultiAgentRouter Minimal Example: "swarms/examples/multi_agent_router_minimal.md"
- ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md" - ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md"
- MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md" - MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
- Unique Swarms: "swarms/examples/unique_swarms.md" - Unique Swarms: "swarms/examples/unique_swarms.md"
- Agents as Tools: "swarms/examples/agents_as_tools.md" - Agents as Tools: "swarms/examples/agents_as_tools.md"
- Aggregate Multi-Agent Responses: "swarms/examples/aggregate.md"
- Interactive GroupChat Example: "swarms/examples/interactive_groupchat_example.md"
- Applications: - Applications:
- Swarms DAO: "swarms/examples/swarms_dao.md" - Swarms DAO: "swarms/examples/swarms_dao.md"
- Swarms of Browser Agents: "swarms/examples/swarms_of_browser_agents.md" - Swarms of Browser Agents: "swarms/examples/swarms_of_browser_agents.md"

@ -156,3 +156,7 @@ logger.info("Task processing started")
- Processing time may vary based on task complexity - Processing time may vary based on task complexity
- Model response quality depends on input clarity - Model response quality depends on input clarity
- Resource usage scales with batch size - Resource usage scales with batch size
## Example Script
For a runnable demonstration, see the [reasoning_duo_batched.py](https://github.com/kyegomez/swarms/blob/master/examples/models/reasoning_duo_batched.py) example.

@ -34,6 +34,7 @@ Swarm architectures leverage these communication patterns to ensure that agents
| Spreadsheet Swarm | Manages tasks at scale, tracking agent outputs in a structured format like CSV files. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/) | Large-scale marketing analytics, financial audits | | Spreadsheet Swarm | Manages tasks at scale, tracking agent outputs in a structured format like CSV files. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/) | Large-scale marketing analytics, financial audits |
| Forest Swarm | A swarm structure that organizes agents in a tree-like hierarchy for complex decision-making processes. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/forest_swarm/) | Multi-stage workflows, hierarchical reinforcement learning | | Forest Swarm | A swarm structure that organizes agents in a tree-like hierarchy for complex decision-making processes. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/forest_swarm/) | Multi-stage workflows, hierarchical reinforcement learning |
| Swarm Router | Routes and chooses the swarm architecture based on the task requirements and available agents. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/swarm_router/) | Dynamic task routing, adaptive swarm architecture selection, optimized agent allocation | | Swarm Router | Routes and chooses the swarm architecture based on the task requirements and available agents. | [Code Link](https://docs.swarms.world/en/latest/swarms/structs/swarm_router/) | Dynamic task routing, adaptive swarm architecture selection, optimized agent allocation |
| MultiAgentRouter | Boss agent selects the best agent for each task. | [Minimal Example](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/mar/multi_agent_router_minimal.py) | Task-specific agent routing |

@ -0,0 +1,136 @@
# Aggregate Multi-Agent Responses
The `aggregate` function allows you to run multiple agents concurrently on the same task and then synthesize their responses using an intelligent aggregator agent. This is useful for getting diverse perspectives on a problem and then combining them into a comprehensive analysis.
## Installation
You can get started by first installing swarms with the following command, or [click here for more detailed installation instructions](https://docs.swarms.world/en/latest/swarms/install/install/):
```bash
pip3 install -U swarms
```
## Environment Variables
```txt
OPENAI_API_KEY=""
ANTHROPIC_API_KEY=""
```
## Function Parameters
### `workers: List[Callable]` (Required)
A list of Agent instances that will work on the task concurrently. Each agent should be a callable object (typically an Agent instance).
### `task: str` (Required)
The task or question that all agents will work on simultaneously. This should be a clear, specific prompt that allows for diverse perspectives.
### `type: HistoryOutputType` (Optional, Default: "all")
Controls the format of the returned conversation history. Available options:
| Type | Description |
|------|-------------|
| **"all"** | Returns the complete conversation including all agent responses and the final aggregation |
| **"list"** | Returns the conversation as a list format |
| **"dict"** or **"dictionary"** | Returns the conversation as a dictionary format |
| **"string"** or **"str"** | Returns only the final aggregated response as a string |
| **"final"** or **"last"** | Returns only the final aggregated response |
| **"json"** | Returns the conversation in JSON format |
| **"yaml"** | Returns the conversation in YAML format |
| **"xml"** | Returns the conversation in XML format |
| **"dict-all-except-first"** | Returns dictionary format excluding the first message |
| **"str-all-except-first"** | Returns string format excluding the first message |
| **"basemodel"** | Returns the conversation as a base model object |
| **"dict-final"** | Returns dictionary format with only the final response |
### `aggregator_model_name: str` (Optional, Default: "anthropic/claude-3-sonnet-20240229")
The model to use for the aggregator agent that synthesizes all the individual agent responses. This should be a model capable of understanding and summarizing complex multi-agent conversations.
## How It Works
1. **Concurrent Execution**: All agents in the `workers` list run the same task simultaneously
2. **Response Collection**: Individual agent responses are collected into a conversation
3. **Intelligent Aggregation**: A specialized aggregator agent analyzes all responses and creates a comprehensive synthesis
4. **Formatted Output**: The final result is returned in the specified format
## Code Example
```python
from swarms.structs.agent import Agent
from swarms.structs.ma_blocks import aggregate
# Create specialized agents for different perspectives
agents = [
Agent(
agent_name="Sector-Financial-Analyst",
agent_description="Senior financial analyst at BlackRock.",
system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
Agent(
agent_name="Sector-Risk-Analyst",
agent_description="Expert risk management analyst.",
system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
Agent(
agent_name="Tech-Sector-Analyst",
agent_description="Technology sector analyst.",
system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.",
max_loops=1,
model_name="gpt-4o-mini",
max_tokens=3000,
),
]
# Run the aggregate function
result = aggregate(
workers=agents,
task="What is the best sector to invest in?",
type="all", # Get complete conversation history
aggregator_model_name="anthropic/claude-3-sonnet-20240229"
)
print(result)
```
## Code Example
## Use Cases
| Use Case | Description |
|----------|-------------|
| **Investment Analysis** | Get multiple financial perspectives on investment decisions |
| **Research Synthesis** | Combine insights from different research agents |
| **Problem Solving** | Gather diverse approaches to complex problems |
| **Content Creation** | Generate comprehensive content from multiple specialized agents |
| **Decision Making** | Get balanced recommendations from different expert perspectives |
## Error Handling
The function includes validation for:
- Required parameters (`task` and `workers`)
- Proper data types (workers must be a list of callable objects)
- Agent compatibility
## Performance Considerations
- All agents run concurrently, so total execution time is limited by the slowest agent
- The aggregator agent processes all responses, so consider response length and complexity
- Memory usage scales with the number of agents and their response sizes

@ -0,0 +1,136 @@
# Interactive GroupChat Example
This is an example of the InteractiveGroupChat module in swarms. [Click here for full documentation](https://docs.swarms.world/en/latest/swarms/structs/interactive_groupchat/)
## Installation
You can get started by first installing swarms with the following command, or [click here for more detailed installation instructions](https://docs.swarms.world/en/latest/swarms/install/install/):
```bash
pip3 install -U swarms
```
## Environment Variables
```txt
OPENAI_API_KEY=""
ANTHROPIC_API_KEY=""
GROQ_API_KEY=""
```
# Code
## Interactive Session in Terminal
```python
from swarms import Agent
from swarms.structs.interactive_groupchat import InteractiveGroupChat
if __name__ == "__main__":
# Initialize agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies and portfolio management.",
random_models_on=True,
output_type="final",
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert who provides guidance on tax optimization and compliance.",
random_models_on=True,
output_type="final",
)
investment_analyst = Agent(
agent_name="InvestmentAnalyst",
system_prompt="You are an investment analyst focusing on market trends and investment opportunities.",
random_models_on=True,
output_type="final",
)
# Create a list of agents including both Agent instances and callables
agents = [
financial_advisor,
tax_expert,
investment_analyst,
]
# Initialize another chat instance in interactive mode
interactive_chat = InteractiveGroupChat(
name="Interactive Financial Advisory Team",
description="An interactive team of financial experts providing comprehensive financial advice",
agents=agents,
max_loops=1,
output_type="all",
interactive=True,
)
try:
# Start the interactive session
print("\nStarting interactive session...")
# interactive_chat.run("What is the best methodology to accumulate gold and silver commodities, and what is the best long-term strategy to accumulate them?")
interactive_chat.start_interactive_session()
except Exception as e:
print(f"An error occurred in interactive mode: {e}")
```
## Run Method // Manual Method
```python
from swarms import Agent
from swarms.structs.interactive_groupchat import InteractiveGroupChat
if __name__ == "__main__":
# Initialize agents
financial_advisor = Agent(
agent_name="FinancialAdvisor",
system_prompt="You are a financial advisor specializing in investment strategies and portfolio management.",
random_models_on=True,
output_type="final",
)
tax_expert = Agent(
agent_name="TaxExpert",
system_prompt="You are a tax expert who provides guidance on tax optimization and compliance.",
random_models_on=True,
output_type="final",
)
investment_analyst = Agent(
agent_name="InvestmentAnalyst",
system_prompt="You are an investment analyst focusing on market trends and investment opportunities.",
random_models_on=True,
output_type="final",
)
# Create a list of agents including both Agent instances and callables
agents = [
financial_advisor,
tax_expert,
investment_analyst,
]
# Initialize another chat instance in interactive mode
interactive_chat = InteractiveGroupChat(
name="Interactive Financial Advisory Team",
description="An interactive team of financial experts providing comprehensive financial advice",
agents=agents,
max_loops=1,
output_type="all",
interactive=False,
)
try:
# Start the interactive session
print("\nStarting interactive session...")
# interactive_chat.run("What is the best methodology to accumulate gold and silver commodities, and what is the best long-term strategy to accumulate them?")
interactive_chat.run('@TaxExpert how can I understand tax tactics for crypto payroll in solana?')
except Exception as e:
print(f"An error occurred in interactive mode: {e}")
```

@ -0,0 +1,33 @@
# MultiAgentRouter Minimal Example
This example shows how to route a task to the most suitable agent using `SwarmRouter` with `swarm_type="MultiAgentRouter"`.
```python
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter
agents = [
Agent(
agent_name="Researcher",
system_prompt="Answer questions briefly.",
model_name="gpt-4o-mini",
),
Agent(
agent_name="Coder",
system_prompt="Write small Python functions.",
model_name="gpt-4o-mini",
),
]
router = SwarmRouter(
name="multi-agent-router-demo",
description="Routes tasks to the most suitable agent",
agents=agents,
swarm_type="MultiAgentRouter"
)
result = router.run("Write a function that adds two numbers")
print(result)
```
View the source on [GitHub](https://github.com/kyegomez/swarms/blob/master/examples/multi_agent/mar/multi_agent_router_minimal.py).

@ -419,6 +419,8 @@ multi_agent_router = SwarmRouter(
result = multi_agent_router.run("Analyze the competitive landscape for our new product") result = multi_agent_router.run("Analyze the competitive landscape for our new product")
``` ```
See [MultiAgentRouter Minimal Example](../examples/multi_agent_router_minimal.md) for a lightweight demonstration.
### HierarchicalSwarm ### HierarchicalSwarm
Use Case: Creating a hierarchical structure of agents with a director. Use Case: Creating a hierarchical structure of agents with a director.

@ -1,8 +1,8 @@
### Swarms Tool Documentation # Swarms Tool Documentation
A tool is a Python function designed to perform specific tasks, with clear type annotations and comprehensive docstrings. Below are examples of tools to help you get started. A tool is a Python function designed to perform specific tasks with clear type annotations and comprehensive docstrings. Below are examples of financial tools to help you get started.
# Rules ## Rules
To create a tool in the Swarms environment, follow these rules: To create a tool in the Swarms environment, follow these rules:
@ -25,561 +25,515 @@ To create a tool in the Swarms environment, follow these rules:
- The function's input must be a string. - The function's input must be a string.
- The function's output must be a string. - The function's output must be a string.
## Example Financial Tools
### Example Tools ### Example 1: Fetch Stock Price from Yahoo Finance
### Examples and Anti-Examples
#### Example 1: Fetch Financial News
**Correct Implementation**
```python ```python
import requests import yfinance as yf
import os
def fetch_financial_news(query: str = "Nvidia news", num_articles: int = 5) -> str: def get_stock_price(symbol: str) -> str:
""" """
Fetches financial news from the Google News API and returns a formatted string of the top news. Fetches the current stock price from Yahoo Finance.
Args: Args:
query (str): The query term to search for news. Default is "Nvidia news". symbol (str): The stock symbol (e.g., "AAPL", "TSLA", "NVDA").
num_articles (int): The number of top articles to fetch. Default is 5.
Returns: Returns:
str: A formatted string of the top financial news articles. str: A formatted string containing the current stock price and basic information.
Raises: Raises:
ValueError: If the API response is invalid or there are no articles found. ValueError: If the stock symbol is invalid or data cannot be retrieved.
requests.exceptions.RequestException: If there is an error with the request. Exception: If there is an error with the API request.
""" """
url = "https://newsapi.org/v2/everything"
params = {
"q": query,
"apiKey": os.getenv("NEWSAPI_KEY"),
"pageSize": num_articles,
"sortBy": "relevancy",
}
try: try:
response = requests.get(url, params=params) # Remove any whitespace and convert to uppercase
response.raise_for_status() symbol = symbol.strip().upper()
data = response.json()
if not symbol:
if "articles" not in data or len(data["articles"]) == 0: raise ValueError("Stock symbol cannot be empty.")
raise ValueError("No articles found or invalid API response.")
# Fetch stock data
articles = data["articles"] stock = yf.Ticker(symbol)
formatted_articles = [] info = stock.info
for i, article in enumerate(articles, start=1): if not info or 'regularMarketPrice' not in info:
title = article.get("title", "No Title") raise ValueError(f"Unable to fetch data for symbol: {symbol}")
description = article.get("description", "No Description")
url = article.get("url", "No URL") current_price = info.get('regularMarketPrice', 'N/A')
formatted_articles.append( previous_close = info.get('regularMarketPreviousClose', 'N/A')
f"{i}. {title}\nDescription: {description}\nRead more: {url}\n" market_cap = info.get('marketCap', 'N/A')
) company_name = info.get('longName', symbol)
return "\n".join(formatted_articles) # Format market cap for readability
if isinstance(market_cap, (int, float)) and market_cap > 0:
except requests.exceptions.RequestException as e: if market_cap >= 1e12:
print(f"Request Error: {e}") market_cap_str = f"${market_cap/1e12:.2f}T"
raise elif market_cap >= 1e9:
market_cap_str = f"${market_cap/1e9:.2f}B"
elif market_cap >= 1e6:
market_cap_str = f"${market_cap/1e6:.2f}M"
else:
market_cap_str = f"${market_cap:,.0f}"
else:
market_cap_str = "N/A"
# Calculate price change
if isinstance(current_price, (int, float)) and isinstance(previous_close, (int, float)):
price_change = current_price - previous_close
price_change_percent = (price_change / previous_close) * 100
change_str = f"{price_change:+.2f} ({price_change_percent:+.2f}%)"
else:
change_str = "N/A"
result = f"""
Stock: {company_name} ({symbol})
Current Price: ${current_price}
Previous Close: ${previous_close}
Change: {change_str}
Market Cap: {market_cap_str}
""".strip()
return result
except ValueError as e: except ValueError as e:
print(f"Value Error: {e}") print(f"Value Error: {e}")
raise raise
except Exception as e:
print(f"Error fetching stock data: {e}")
raise
``` ```
**Incorrect Implementation** ### Example 2: Fetch Cryptocurrency Price from CoinGecko
```python ```python
import requests import requests
import os
def fetch_financial_news(query="Nvidia news", num_articles=5):
# Fetches financial news from the Google News API and returns a formatted string of the top news.
url = "https://newsapi.org/v2/everything"
params = {
"q": query,
"apiKey": os.getenv("NEWSAPI_KEY"),
"pageSize": num_articles,
"sortBy": "relevancy",
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if "articles" not in data or len(data["articles"]) == 0:
raise ValueError("No articles found or invalid API response.")
articles = data["articles"]
formatted_articles = []
for i, article in enumerate(articles, start=1):
title = article.get("title", "No Title")
description = article.get("description", "No Description")
url = article.get("url", "No URL")
formatted_articles.append(
f"{i}. {title}\nDescription: {description}\nRead more: {url}\n"
)
return "\n".join(formatted_articles)
```
**Issues with Incorrect Implementation:**
- No type annotations for arguments and return value.
- Missing comprehensive docstring.
#### Example 2: Convert Celsius to Fahrenheit
**Correct Implementation** def get_crypto_price(coin_id: str) -> str:
```python
def celsius_to_fahrenheit(celsius_str: str) -> str:
""" """
Converts a temperature from Celsius to Fahrenheit. Fetches the current cryptocurrency price from CoinGecko API.
Args: Args:
celsius_str (str): The temperature in Celsius as a string. coin_id (str): The cryptocurrency ID (e.g., "bitcoin", "ethereum", "cardano").
Returns: Returns:
str: The temperature converted to Fahrenheit as a formatted string. str: A formatted string containing the current crypto price and market data.
Raises: Raises:
ValueError: If the input cannot be converted to a float. ValueError: If the coin ID is invalid or data cannot be retrieved.
requests.exceptions.RequestException: If there is an error with the API request.
""" """
try: try:
celsius = float(celsius_str) # Remove any whitespace and convert to lowercase
fahrenheit = celsius * 9/5 + 32 coin_id = coin_id.strip().lower()
return f"{celsius}°C is {fahrenheit}°F"
if not coin_id:
raise ValueError("Coin ID cannot be empty.")
url = f"https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": coin_id,
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
"include_last_updated_at": "true"
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if coin_id not in data:
raise ValueError(f"Coin ID '{coin_id}' not found. Please check the spelling.")
coin_data = data[coin_id]
if not coin_data:
raise ValueError(f"No data available for coin ID: {coin_id}")
usd_price = coin_data.get('usd', 'N/A')
market_cap = coin_data.get('usd_market_cap', 'N/A')
volume_24h = coin_data.get('usd_24h_vol', 'N/A')
change_24h = coin_data.get('usd_24h_change', 'N/A')
last_updated = coin_data.get('last_updated_at', 'N/A')
# Format large numbers for readability
def format_number(value):
if isinstance(value, (int, float)) and value > 0:
if value >= 1e12:
return f"${value/1e12:.2f}T"
elif value >= 1e9:
return f"${value/1e9:.2f}B"
elif value >= 1e6:
return f"${value/1e6:.2f}M"
elif value >= 1e3:
return f"${value/1e3:.2f}K"
else:
return f"${value:,.2f}"
return "N/A"
# Format the result
result = f"""
Cryptocurrency: {coin_id.title()}
Current Price: ${usd_price:,.8f}" if isinstance(usd_price, (int, float)) else f"Current Price: {usd_price}
Market Cap: {format_number(market_cap)}
24h Volume: {format_number(volume_24h)}
24h Change: {change_24h:+.2f}%" if isinstance(change_24h, (int, float)) else f"24h Change: {change_24h}
Last Updated: {last_updated}
""".strip()
return result
except requests.exceptions.RequestException as e:
print(f"Request Error: {e}")
raise
except ValueError as e: except ValueError as e:
print(f"Value Error: {e}") print(f"Value Error: {e}")
raise raise
except Exception as e:
print(f"Error fetching crypto data: {e}")
raise
``` ```
**Incorrect Implementation** ### Example 3: Calculate Portfolio Performance
```python
def celsius_to_fahrenheit(celsius):
# Converts a temperature from Celsius to Fahrenheit.
celsius = float(celsius)
fahrenheit = celsius * 9/5 + 32
return f"{celsius}°C is {fahrenheit}°F"
```
**Issues with Incorrect Implementation:**
- No type annotations for arguments and return value.
- Missing comprehensive docstring.
- Input type is not enforced as string.
#### Example 3: Calculate Compound Interest
**Correct Implementation**
```python ```python
def calculate_compound_interest(principal_str: str, rate_str: str, time_str: str, n_str: str) -> str: def calculate_portfolio_performance(initial_investment_str: str, current_value_str: str, time_period_str: str) -> str:
""" """
Calculates compound interest. Calculates portfolio performance metrics including return percentage and annualized return.
Args: Args:
principal_str (str): The initial amount of money as a string. initial_investment_str (str): The initial investment amount as a string.
rate_str (str): The annual interest rate (decimal) as a string. current_value_str (str): The current portfolio value as a string.
time_str (str): The time the money is invested for in years as a string. time_period_str (str): The time period in years as a string.
n_str (str): The number of times that interest is compounded per year as a string.
Returns: Returns:
str: The amount of money accumulated after n years, including interest. str: A formatted string containing portfolio performance metrics.
Raises: Raises:
ValueError: If any of the inputs cannot be converted to the appropriate type or are negative. ValueError: If any of the inputs cannot be converted to the appropriate type or are negative.
""" """
try: try:
principal = float(principal_str) initial_investment = float(initial_investment_str)
rate = float(rate_str) current_value = float(current_value_str)
time = float(time_str) time_period = float(time_period_str)
n = int(n_str)
if principal < 0 or rate < 0 or time < 0 or n < 0: if initial_investment <= 0 or current_value < 0 or time_period <= 0:
raise ValueError("Inputs must be non-negative.") raise ValueError("Initial investment and time period must be positive, current value must be non-negative.")
# Calculate total return
total_return = current_value - initial_investment
total_return_percentage = (total_return / initial_investment) * 100
# Calculate annualized return
if time_period > 0:
annualized_return = ((current_value / initial_investment) ** (1 / time_period) - 1) * 100
else:
annualized_return = 0
# Determine performance status
if total_return > 0:
status = "Profitable"
elif total_return < 0:
status = "Loss"
else:
status = "Break-even"
result = f"""
Portfolio Performance Analysis:
Initial Investment: ${initial_investment:,.2f}
Current Value: ${current_value:,.2f}
Time Period: {time_period:.1f} years
Total Return: ${total_return:+,.2f} ({total_return_percentage:+.2f}%)
Annualized Return: {annualized_return:+.2f}%
Status: {status}
""".strip()
return result
amount = principal * (1 + rate / n) ** (n * time)
return f"The amount after {time} years is {amount:.2f}"
except ValueError as e: except ValueError as e:
print(f"Value Error: {e}") print(f"Value Error: {e}")
raise raise
``` except Exception as e:
print(f"Error calculating portfolio performance: {e}")
**Incorrect Implementation**
```python
def calculate_compound_interest(principal, rate, time, n):
# Calculates compound interest.
principal = float(principal)
rate = float(rate)
time = float(time)
n = int(n)
if principal < 0 or rate < 0 or time < 0 or n < 0:
raise ValueError("Inputs must be non-negative.")
amount = principal * (1 + rate / n) ** (n * time)
return f"The amount after {time} years is {amount:.2f}"
```
**Issues with Incorrect Implementation:**
- No type annotations for arguments and return value.
- Missing comprehensive docstring.
- Input types are not enforced as strings.
By following these rules and using the examples provided, you can create robust and well-documented tools in the Swarms environment. Ensure that all functions include proper type annotations, comprehensive docstrings, and that both input and output types are strings.
#### Example Tool 4: Reverse a String
**Functionality**: Reverses a given string.
```python
def reverse_string(s: str) -> str:
"""
Reverses a given string.
Args:
s (str): The string to reverse.
Returns:
str: The reversed string.
Raises:
TypeError: If the input is not a string.
"""
try:
if not isinstance(s, str):
raise TypeError("Input must be a string.")
return s[::-1]
except TypeError as e:
print(f"Type Error: {e}")
raise
```
#### Example Tool 5: Check Palindrome
**Functionality**: Checks if a given string is a palindrome.
```python
def is_palindrome(s: str) -> str:
"""
Checks if a given string is a palindrome.
Args:
s (str): The string to check.
Returns:
str: A message indicating whether the string is a palindrome or not.
Raises:
TypeError: If the input is not a string.
"""
try:
if not isinstance(s, str):
raise TypeError("Input must be a string.")
normalized_str = ''.join(filter(str.isalnum, s)).lower()
is_palindrome = normalized_str == normalized_str[::-1]
return f"The string '{s}' is {'a palindrome' if is_palindrome else 'not a palindrome'}."
except TypeError as e:
print(f"Type Error: {e}")
raise raise
``` ```
#### Example Tool 6: Fetch Current Weather ### Example 4: Calculate Compound Interest
**Functionality**: Fetches the current weather for a given city from the OpenWeatherMap API.
```python ```python
import requests def calculate_compound_interest(principal_str: str, rate_str: str, time_str: str, compounding_frequency_str: str) -> str:
import os
def fetch_current_weather(city: str) -> str:
""" """
Fetches the current weather for a given city from the OpenWeatherMap API. Calculates compound interest for investment planning.
Args: Args:
city (str): The name of the city to fetch the weather for. principal_str (str): The initial investment amount as a string.
rate_str (str): The annual interest rate (as decimal) as a string.
time_str (str): The investment time period in years as a string.
compounding_frequency_str (str): The number of times interest is compounded per year as a string.
Returns: Returns:
str: A formatted string of the current weather in the specified city. str: A formatted string containing the compound interest calculation results.
Raises: Raises:
ValueError: If the API response is invalid or the city is not found. ValueError: If any of the inputs cannot be converted to the appropriate type or are negative.
requests.exceptions.RequestException: If there is an error with the request.
""" """
url = "http://api.openweathermap.org/data/2.5/weather"
params = {
"q": city,
"appid": os.getenv("OPENWEATHERMAP_KEY"),
"units": "metric",
}
try: try:
response = requests.get(url, params=params) principal = float(principal_str)
response.raise_for_status() rate = float(rate_str)
data = response.json() time = float(time_str)
n = int(compounding_frequency_str)
if "weather" not in data or "main" not in data:
raise ValueError("Invalid API response or city not found.") if principal <= 0 or rate < 0 or time <= 0 or n <= 0:
raise ValueError("Principal, time, and compounding frequency must be positive. Rate must be non-negative.")
weather_description = data["weather"][0]["description"]
temperature = data["main"]["temp"] # Calculate compound interest
return f"The current weather in {city} is {weather_description} with a temperature of {temperature}°C." amount = principal * (1 + rate / n) ** (n * time)
interest_earned = amount - principal
except requests.exceptions.RequestException as e:
print(f"Request Error: {e}") # Calculate effective annual rate
raise effective_rate = ((1 + rate / n) ** n - 1) * 100
result = f"""
Compound Interest Calculation:
Principal: ${principal:,.2f}
Annual Rate: {rate*100:.2f}%
Time Period: {time:.1f} years
Compounding Frequency: {n} times per year
Final Amount: ${amount:,.2f}
Interest Earned: ${interest_earned:,.2f}
Effective Annual Rate: {effective_rate:.2f}%
""".strip()
return result
except ValueError as e: except ValueError as e:
print(f"Value Error: {e}") print(f"Value Error: {e}")
raise raise
except Exception as e:
print(f"Error calculating compound interest: {e}")
raise
``` ```
By following the examples provided, you can create your own tools to perform various tasks in the Swarms environment. Ensure each function includes type annotations, comprehensive docstrings, and appropriate error handling to make your tools robust and easy to use. ## Integrating Tools into an Agent
To integrate tools into an agent, simply pass callable functions with proper type annotations and documentation into the agent class.
## Integrate tools into Agent
To integrate tools into an agent, you'd simply just pass in a callable function with types and documentation into the agent class.
```python ```python
from swarms import Agent
# Initialize the financial analysis agent
from swarms import Agent, OpenAIChat # ChromaDB
import subprocess
# Model
llm = OpenAIChat(
temperature=0.1,
)
# Tools
def terminal(
code: str,
):
"""
Run code in the terminal.
Args:
code (str): The code to run in the terminal.
Returns:
str: The output of the code.
"""
out = subprocess.run(
code, shell=True, capture_output=True, text=True
).stdout
return str(out)
def browser(query: str):
"""
Search the query in the browser with the `browser` tool.
Args:
query (str): The query to search in the browser.
Returns:
str: The search results.
"""
import webbrowser
url = f"https://www.google.com/search?q={query}"
webbrowser.open(url)
return f"Searching for {query} in the browser."
def create_file(file_path: str, content: str):
"""
Create a file using the file editor tool.
Args:
file_path (str): The path to the file.
content (str): The content to write to the file.
Returns:
str: The result of the file creation operation.
"""
with open(file_path, "w") as file:
file.write(content)
return f"File {file_path} created successfully."
def file_editor(file_path: str, mode: str, content: str):
"""
Edit a file using the file editor tool.
Args:
file_path (str): The path to the file.
mode (str): The mode to open the file in.
content (str): The content to write to the file.
Returns:
str: The result of the file editing operation.
"""
with open(file_path, mode) as file:
file.write(content)
return f"File {file_path} edited successfully."
# Agent
agent = Agent( agent = Agent(
agent_name="Devin", agent_name="Financial-Analysis-Agent",
system_prompt=( system_prompt=(
"Autonomous agent that can interact with humans and other" "You are a professional financial analyst agent. Use the provided tools to "
" agents. Be Helpful and Kind. Use the tools provided to" "analyze stocks, cryptocurrencies, and investment performance. Provide "
" assist the user. Return all code in markdown format." "clear, accurate financial insights and recommendations. Always format "
"responses in markdown for better readability."
), ),
llm=llm, model_name="gpt-4o",
max_loops="auto", max_loops=3,
autosave=True, autosave=True,
dashboard=False, dashboard=False,
streaming_on=True,
verbose=True, verbose=True,
stopping_token="<DONE>", streaming_on=True,
interactive=True, dynamic_temperature_enabled=True,
tools=[terminal, browser, file_editor, create_file], saved_state_path="financial_agent.json",
# long_term_memory=chromadb, tools=[get_stock_price, get_crypto_price, calculate_portfolio_performance],
metadata_output_type="json", user_name="financial_analyst",
# List of schemas that the agent can handle retry_attempts=3,
# list_base_models=[tool_schema], context_length=200000,
function_calling_format_type="OpenAI",
function_calling_type="json", # or soon yaml
) )
# Run the agent # Run the agent
agent.run("Create a new file for a plan to take over the world.") response = agent("Analyze the current price of Apple stock and Bitcoin, then calculate the performance of a $10,000 investment in each over the past 2 years.")
print(response)
``` ```
## Complete Financial Analysis Example
## Example 2
```python ```python
import yfinance as yf
import os
import requests import requests
from swarms import Agent from swarms import Agent
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable def get_stock_price(symbol: str) -> str:
api_key = os.getenv("OPENAI_API_KEY") """
Fetches the current stock price from Yahoo Finance.
# Create an instance of the OpenAIChat class Args:
model = OpenAIChat( symbol (str): The stock symbol (e.g., "AAPL", "TSLA", "NVDA").
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
) Returns:
str: A formatted string containing the current stock price and basic information.
Raises:
ValueError: If the stock symbol is invalid or data cannot be retrieved.
Exception: If there is an error with the API request.
"""
try:
symbol = symbol.strip().upper()
if not symbol:
raise ValueError("Stock symbol cannot be empty.")
stock = yf.Ticker(symbol)
info = stock.info
if not info or 'regularMarketPrice' not in info:
raise ValueError(f"Unable to fetch data for symbol: {symbol}")
current_price = info.get('regularMarketPrice', 'N/A')
previous_close = info.get('regularMarketPreviousClose', 'N/A')
market_cap = info.get('marketCap', 'N/A')
company_name = info.get('longName', symbol)
if isinstance(market_cap, (int, float)) and market_cap > 0:
if market_cap >= 1e12:
market_cap_str = f"${market_cap/1e12:.2f}T"
elif market_cap >= 1e9:
market_cap_str = f"${market_cap/1e9:.2f}B"
elif market_cap >= 1e6:
market_cap_str = f"${market_cap/1e6:.2f}M"
else:
market_cap_str = f"${market_cap:,.0f}"
else:
market_cap_str = "N/A"
if isinstance(current_price, (int, float)) and isinstance(previous_close, (int, float)):
price_change = current_price - previous_close
price_change_percent = (price_change / previous_close) * 100
change_str = f"{price_change:+.2f} ({price_change_percent:+.2f}%)"
else:
change_str = "N/A"
result = f"""
Stock: {company_name} ({symbol})
Current Price: ${current_price}
Previous Close: ${previous_close}
Change: {change_str}
Market Cap: {market_cap_str}
""".strip()
return result
except ValueError as e:
print(f"Value Error: {e}")
raise
except Exception as e:
print(f"Error fetching stock data: {e}")
raise
def fetch_financial_news( def get_crypto_price(coin_id: str) -> str:
query: str = "Nvidia news", num_articles: int = 5
) -> str:
""" """
Fetches financial news from the Google News API and returns a formatted string of the top news. Fetches the current cryptocurrency price from CoinGecko API.
Args: Args:
api_key (str): Your Google News API key. coin_id (str): The cryptocurrency ID (e.g., "bitcoin", "ethereum", "cardano").
query (str): The query term to search for news. Default is "financial".
num_articles (int): The number of top articles to fetch. Default is 5.
Returns: Returns:
str: A formatted string of the top financial news articles. str: A formatted string containing the current crypto price and market data.
Raises: Raises:
ValueError: If the API response is invalid or there are no articles found. ValueError: If the coin ID is invalid or data cannot be retrieved.
requests.exceptions.RequestException: If there is an error with the request. requests.exceptions.RequestException: If there is an error with the API request.
""" """
url = "https://newsapi.org/v2/everything"
params = {
"q": query,
"apiKey": os.getenv("NEWSAPI_KEY"),
"pageSize": num_articles,
"sortBy": "relevancy",
}
try: try:
response = requests.get(url, params=params) coin_id = coin_id.strip().lower()
if not coin_id:
raise ValueError("Coin ID cannot be empty.")
url = f"https://api.coingecko.com/api/v3/simple/price"
params = {
"ids": coin_id,
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true",
"include_last_updated_at": "true"
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
if "articles" not in data or len(data["articles"]) == 0: if coin_id not in data:
raise ValueError("No articles found or invalid API response.") raise ValueError(f"Coin ID '{coin_id}' not found. Please check the spelling.")
articles = data["articles"] coin_data = data[coin_id]
formatted_articles = []
if not coin_data:
for i, article in enumerate(articles, start=1): raise ValueError(f"No data available for coin ID: {coin_id}")
title = article.get("title", "No Title")
description = article.get("description", "No Description") usd_price = coin_data.get('usd', 'N/A')
url = article.get("url", "No URL") market_cap = coin_data.get('usd_market_cap', 'N/A')
formatted_articles.append( volume_24h = coin_data.get('usd_24h_vol', 'N/A')
f"{i}. {title}\nDescription: {description}\nRead more: {url}\n" change_24h = coin_data.get('usd_24h_change', 'N/A')
) last_updated = coin_data.get('last_updated_at', 'N/A')
return "\n".join(formatted_articles) def format_number(value):
if isinstance(value, (int, float)) and value > 0:
if value >= 1e12:
return f"${value/1e12:.2f}T"
elif value >= 1e9:
return f"${value/1e9:.2f}B"
elif value >= 1e6:
return f"${value/1e6:.2f}M"
elif value >= 1e3:
return f"${value/1e3:.2f}K"
else:
return f"${value:,.2f}"
return "N/A"
result = f"""
Cryptocurrency: {coin_id.title()}
Current Price: ${usd_price:,.8f}" if isinstance(usd_price, (int, float)) else f"Current Price: {usd_price}
Market Cap: {format_number(market_cap)}
24h Volume: {format_number(volume_24h)}
24h Change: {change_24h:+.2f}%" if isinstance(change_24h, (int, float)) else f"24h Change: {change_24h}
Last Updated: {last_updated}
""".strip()
return result
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"Request Error: {e}") print(f"Request Error: {e}")
raise raise
except ValueError as e: except ValueError as e:
print(f"Value Error: {e}") print(f"Value Error: {e}")
raise raise
except Exception as e:
print(f"Error fetching crypto data: {e}")
raise
# Initialize the financial analysis agent
# # Example usage:
# api_key = "ceabc81a7d8f45febfedadb27177f3a3"
# print(fetch_financial_news(api_key))
# Initialize the agent
agent = Agent( agent = Agent(
agent_name="Financial-Analysis-Agent", agent_name="Financial-Analysis-Agent",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, system_prompt=(
llm=model, "You are a professional financial analyst agent specializing in stock and "
max_loops=2, "cryptocurrency analysis. Use the provided tools to fetch real-time market "
"data and provide comprehensive financial insights. Always present data "
"in a clear, professional format with actionable recommendations."
),
model_name="gpt-4o",
max_loops=3,
autosave=True, autosave=True,
# dynamic_temperature_enabled=True,
dashboard=False, dashboard=False,
verbose=True, verbose=True,
streaming_on=True, streaming_on=True,
# interactive=True, # Set to False to disable interactive mode
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
saved_state_path="finance_agent.json", saved_state_path="financial_agent.json",
tools=[fetch_financial_news], tools=[get_stock_price, get_crypto_price],
# stopping_token="Stop!", user_name="financial_analyst",
# interactive=True,
# docs_folder="docs", # Enter your folder name
# pdf_path="docs/finance_agent.pdf",
# sop="Calculate the profit for a company.",
# sop_list=["Calculate the profit for a company."],
user_name="swarms_corp",
# # docs=
# # docs_folder="docs",
retry_attempts=3, retry_attempts=3,
# context_length=1000,
# tool_schema = dict
context_length=200000, context_length=200000,
# tool_schema=
# tools
# agent_ops_on=True,
# long_term_memory=ChromaDB(docs_folder="artifacts"),
) )
# Run the agent # Run the agent
response = agent("What are the latest financial news on Nvidia?") response = agent("What are the current prices and market data for Apple stock and Bitcoin? Provide a brief analysis of their performance.")
print(response) print(response)
``` ```

@ -1,130 +1,128 @@
# Swarms Cloud Subscription Tiers / Swarms 云订阅等级 # Swarms Cloud Subscription Tiers
!!! abstract "Overview / 概述" !!! abstract "Overview"
Choose the perfect plan for your agent infrastructure needs. All plans include our core features with additional benefits as you scale up. Choose the perfect plan for your agent infrastructure needs. All plans include our core features with additional benefits as you scale up.
为您的智能体基础设施需求选择完美方案。所有计划都包含我们的核心功能,随着您的扩展提供额外优势。
## Pricing Plans / 定价方案 ## Pricing Plans
### Free Tier / 免费版 ### Free Tier
!!! example "Free / 免费" !!! example "Free"
**$0/year / 0美元/年** **$0/year**
Perfect for getting started with AI development. Perfect for getting started with AI development.
非常适合开始AI开发。
[Get Started / 开始使用](https://swarms.world/platform/account){ .md-button .md-button--primary } [Get Started](https://swarms.world/platform/account){ .md-button .md-button--primary }
**What's Included: / 包含内容:** **What's Included:**
- [x] Sign up Bonus! / 注册奖励! - [x] Sign up Bonus!
- [x] Basic Access / 基础访问权限 - [x] Basic Access
- [x] Pay-Per-Use Pricing / 按使用量付费 - [x] Pay-Per-Use Pricing
- [x] Community Support / 社区支持 - [x] Community Support
- [x] Standard Processing Speed / 标准处理速度 - [x] Standard Processing Speed
### Premium Tier / 高级版 ### Premium Tier
!!! success "Premium / 高级版" !!! success "Premium"
**Monthly $100/month / 每月100美元** **Monthly $100/month**
**Yearly $1,020/year / 每年1,020美元** (Save 15% on annual billing / 年付节省15%) **Yearly $1,020/year** (Save 15% on annual billing)
[Subscribe Now / 立即订阅](https://swarms.world/platform/account){ .md-button .md-button--primary } [Subscribe Now](https://swarms.world/platform/account){ .md-button .md-button--primary }
**Everything in Free, plus: / 包含免费版所有内容,外加:** **Everything in Free, plus:**
- [x] Full Access to Explorer and Agents / 完全访问Explorer和智能体 - [x] Full Access to Explorer and Agents
- [x] Access to Premium Multi-Modality Models / 访问高级多模态模型 - [x] Access to Premium Multi-Modality Models
- [x] Priority Access to Swarms / 优先访问Swarms - [x] Priority Access to Swarms
- [x] High-Performance Infrastructure / 高性能基础设施 - [x] High-Performance Infrastructure
- [x] Exclusive Webinars and Tutorials / 独家网络研讨会和教程 - [x] Exclusive Webinars and Tutorials
- [x] Priority Support / 优先支持 - [x] Priority Support
- [x] Enhanced Security Features / 增强的安全功能 - [x] Enhanced Security Features
- [x] Early Access to New Models and Features / 新模型和功能的早期访问 - [x] Early Access to New Models and Features
### Enterprise Tier / 企业版 ### Enterprise Tier
!!! tip "Enterprise / 企业版" !!! tip "Enterprise"
**Contact for more Information / 联系获取更多信息** **Contact for more Information**
[Book a Call / 预约通话](https://cal.com/swarms){ .md-button .md-button--primary } [Book a Call](https://cal.com/swarms){ .md-button .md-button--primary }
**Everything in Premium, plus: / 包含高级版所有内容,外加:** **Everything in Premium, plus:**
- [x] High-Performance Infrastructure / 高性能基础设施 - [x] High-Performance Infrastructure
- [x] Batch API / 批量API - [x] Batch API
- [x] Early Access to New Swarms / 新Swarms的早期访问 - [x] Early Access to New Swarms
- [x] Dedicated 24/7 Support / 专属24/7支持 - [x] Dedicated 24/7 Support
- [x] Custom Solutions Engineering / 定制解决方案工程 - [x] Custom Solutions Engineering
- [x] Advanced Security Features / 高级安全功能 - [x] Advanced Security Features
- [x] Onsite Training and Onboarding / 现场培训和入职 - [x] Onsite Training and Onboarding
- [x] Custom Model Training / 定制模型训练 - [x] Custom Model Training
- [x] Priority Support / 优先支持 - [x] Priority Support
- [x] Pay-Per-Use Pricing / 按使用量付费 - [x] Pay-Per-Use Pricing
- [x] Enterprise Telemetry Platform / 企业遥测平台 - [x] Enterprise Telemetry Platform
- [x] Regular Check-In Strategy Sessions / 定期战略会议 - [x] Regular Check-In Strategy Sessions
## Feature Comparison / 功能对比 ## Feature Comparison
| Feature / 功能 | Free / 免费版 | Premium / 高级版 | Enterprise / 企业版 | | Feature | Free | Premium | Enterprise |
|---------|------|---------|------------| |---------|------|---------|------------|
| Sign up Bonus / 注册奖励 | ✅ | ✅ | ✅ | | Sign up Bonus | ✅ | ✅ | ✅ |
| Basic Access / 基础访问权限 | ✅ | ✅ | ✅ | | Basic Access | ✅ | ✅ | ✅ |
| Pay-Per-Use Pricing / 按使用量付费 | ✅ | ✅ | ✅ | | Pay-Per-Use Pricing | ✅ | ✅ | ✅ |
| Community Support / 社区支持 | ✅ | ✅ | ✅ | | Community Support | ✅ | ✅ | ✅ |
| Standard Processing Speed / 标准处理速度 | ✅ | ✅ | ✅ | | Standard Processing Speed | ✅ | ✅ | ✅ |
| Full Access to Explorer and Agents / 完全访问Explorer和智能体 | ❌ | ✅ | ✅ | | Full Access to Explorer and Agents | ❌ | ✅ | ✅ |
| Premium Multi-Modality Models / 高级多模态模型 | ❌ | ✅ | ✅ | | Premium Multi-Modality Models | ❌ | ✅ | ✅ |
| Priority Access to Swarms / 优先访问Swarms | ❌ | ✅ | ✅ | | Priority Access to Swarms | ❌ | ✅ | ✅ |
| High-Performance GPUs / 高性能GPU | ❌ | ✅ | ✅ | | High-Performance GPUs | ❌ | ✅ | ✅ |
| Exclusive Webinars and Tutorials / 独家网络研讨会和教程 | ❌ | ✅ | ✅ | | Exclusive Webinars and Tutorials | ❌ | ✅ | ✅ |
| Priority Support / 优先支持 | ❌ | ✅ | ✅ | | Priority Support | ❌ | ✅ | ✅ |
| Enhanced Security Features / 增强的安全功能 | ❌ | ✅ | ✅ | | Enhanced Security Features | ❌ | ✅ | ✅ |
| Early Access to New Models / 新模型的早期访问 | ❌ | ✅ | ✅ | | Early Access to New Models | ❌ | ✅ | ✅ |
| Batch API / 批量API | ❌ | ❌ | ✅ | | Batch API | ❌ | ❌ | ✅ |
| Dedicated 24/7 Support / 专属24/7支持 | ❌ | ❌ | ✅ | | Dedicated 24/7 Support | ❌ | ❌ | ✅ |
| Custom Solutions Engineering / 定制解决方案工程 | ❌ | ❌ | ✅ | | Custom Solutions Engineering | ❌ | ❌ | ✅ |
| Onsite Training and Onboarding / 现场培训和入职 | ❌ | ❌ | ✅ | | Onsite Training and Onboarding | ❌ | ❌ | ✅ |
| Custom Model Training / 定制模型训练 | ❌ | ❌ | ✅ | | Custom Model Training | ❌ | ❌ | ✅ |
## Rate Limits / 速率限制 ## Rate Limits
!!! info "Rate Limit Increases / 速率限制提升" !!! info "Rate Limit Increases"
- **Premium / 高级版**: 100% increase in rate limits / 速率限制提升100% - **Premium**: 100% increase in rate limits
- **Enterprise / 企业版**: Custom rate limits based on your needs (contact us for details) / 根据您的需求定制速率限制(详情请联系我们) - **Enterprise**: Custom rate limits based on your needs (contact us for details)
## Getting Started / 开始使用 ## Getting Started
1. Choose your plan / 选择您的方案 1. Choose your plan
2. Create your account / 创建您的账户 2. Create your account
3. Start building with Swarms! / 开始使用Swarms构建 3. Start building with Swarms!
!!! success "Need Help? / 需要帮助?" !!! success "Need Help?"
- For general questions: [Contact Support](mailto:kye@swarms.world) / 一般问题:[联系支持](mailto:kye@swarms.world) - For general questions: [Contact Support](mailto:kye@swarms.world)
- For enterprise inquiries: [Book a Call](https://cal.com/swarms) / 企业咨询:[预约通话](https://cal.com/swarms) - For enterprise inquiries: [Book a Call](https://cal.com/swarms)
- Upgrade Your Membership: [Upgrade Now](https://swarms.world/platform/account) / 升级会员:[立即升级](https://swarms.world/platform/account) - Upgrade Your Membership: [Upgrade Now](https://swarms.world/platform/account)

@ -0,0 +1,19 @@
from swarms.agents.reasoning_duo import ReasoningDuo
if __name__ == "__main__":
# Initialize the ReasoningDuo with two lightweight models
duo = ReasoningDuo(
model_names=["gpt-4o-mini", "gpt-4o-mini"],
# max_loops=1, # Remove this line
)
# Batched tasks to process
tasks = [
"Summarize the benefits of solar energy.",
"List three uses of robotics in healthcare.",
]
# Run the batch once and print each result
results = duo.batched_run(tasks)
for task, output in zip(tasks, results):
print(f"Task: {task}\nResult: {output}\n")

@ -0,0 +1,25 @@
from swarms import Agent
from swarms.structs.swarm_router import SwarmRouter
agents = [
Agent(
agent_name="Researcher",
system_prompt="Answer questions briefly.",
model_name="gpt-4o-mini",
),
Agent(
agent_name="Coder",
system_prompt="Write small Python functions.",
model_name="gpt-4o-mini",
),
]
router = SwarmRouter(
name="multi-agent-router-demo",
description="Routes tasks to the most suitable agent",
agents=agents,
swarm_type="MultiAgentRouter"
)
result = router.run("Write a function that adds two numbers")
print(result)

@ -1,4 +1,4 @@
from typing import List, Literal from typing import List, Literal, Dict, Callable, Any
from swarms.agents.consistency_agent import SelfConsistencyAgent from swarms.agents.consistency_agent import SelfConsistencyAgent
from swarms.agents.flexion_agent import ReflexionAgent from swarms.agents.flexion_agent import ReflexionAgent
@ -22,7 +22,6 @@ agent_types = Literal[
"AgentJudge", "AgentJudge",
] ]
class ReasoningAgentRouter: class ReasoningAgentRouter:
""" """
A Reasoning Agent that can answer questions and assist with various tasks using different reasoning strategies. A Reasoning Agent that can answer questions and assist with various tasks using different reasoning strategies.
@ -61,14 +60,104 @@ class ReasoningAgentRouter:
self.output_type = output_type self.output_type = output_type
self.num_knowledge_items = num_knowledge_items self.num_knowledge_items = num_knowledge_items
self.memory_capacity = memory_capacity self.memory_capacity = memory_capacity
# Added: Initialize the factory mapping dictionary
self._initialize_agent_factories()
# Added: Factory method initialization function
def _initialize_agent_factories(self) -> None:
"""
Initialize the agent factory mapping dictionary, mapping various agent types to their respective creation functions.
This method replaces the original if-elif chain, making the code easier to maintain and extend.
"""
self.agent_factories: Dict[str, Callable[[], Any]] = {
# ReasoningDuo factory methods
"reasoning-duo": self._create_reasoning_duo,
"reasoning-agent": self._create_reasoning_duo,
# SelfConsistencyAgent factory methods
"self-consistency": self._create_consistency_agent,
"consistency-agent": self._create_consistency_agent,
# IREAgent factory methods
"ire": self._create_ire_agent,
"ire-agent": self._create_ire_agent,
# Other agent type factory methods
"AgentJudge": self._create_agent_judge,
"ReflexionAgent": self._create_reflexion_agent,
"GKPAgent": self._create_gkp_agent
}
# Added: Concrete factory methods for various agent types
def _create_reasoning_duo(self):
"""Creates an agent instance for ReasoningDuo type"""
return ReasoningDuo(
agent_name=self.agent_name,
agent_description=self.description,
model_name=[self.model_name, self.model_name],
system_prompt=self.system_prompt,
output_type=self.output_type,
)
def _create_consistency_agent(self):
"""Creates an agent instance for SelfConsistencyAgent type"""
return SelfConsistencyAgent(
agent_name=self.agent_name,
description=self.description,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
num_samples=self.num_samples,
output_type=self.output_type,
)
def _create_ire_agent(self):
"""Creates an agent instance for IREAgent type"""
return IREAgent(
agent_name=self.agent_name,
description=self.description,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
max_iterations=self.num_samples,
output_type=self.output_type,
)
def _create_agent_judge(self):
"""Creates an agent instance for AgentJudge type"""
return AgentJudge(
agent_name=self.agent_name,
model_name=self.model_name,
system_prompt=self.system_prompt,
max_loops=self.max_loops,
)
def _create_reflexion_agent(self):
"""Creates an agent instance for ReflexionAgent type"""
return ReflexionAgent(
agent_name=self.agent_name,
system_prompt=self.system_prompt,
model_name=self.model_name,
max_loops=self.max_loops,
)
def _create_gkp_agent(self):
"""Creates an agent instance for GKPAgent type"""
return GKPAgent(
agent_name=self.agent_name,
model_name=self.model_name,
num_knowledge_items=self.num_knowledge_items,
)
def select_swarm(self): def select_swarm(self):
""" """
Selects and initializes the appropriate reasoning swarm based on the specified swarm type. Selects and initializes the appropriate reasoning swarm based on the specified swarm type.
Returns: Returns:
An instance of the selected reasoning swarm. An instance of the selected reasoning swarm.
""" """
# Commented out original if-elif chain implementation
"""
if ( if (
self.swarm_type == "reasoning-duo" self.swarm_type == "reasoning-duo"
or self.swarm_type == "reasoning-agent" or self.swarm_type == "reasoning-agent"
@ -132,6 +221,15 @@ class ReasoningAgentRouter:
) )
else: else:
raise ValueError(f"Invalid swarm type: {self.swarm_type}") raise ValueError(f"Invalid swarm type: {self.swarm_type}")
"""
# Added: Implementation using factory pattern and dictionary mapping
try:
# Get the corresponding creation function from the factory dictionary and call it
return self.agent_factories[self.swarm_type]()
except KeyError:
# Maintain the same error handling as the original code
raise ValueError(f"Invalid swarm type: {self.swarm_type}")
def run(self, task: str, *args, **kwargs): def run(self, task: str, *args, **kwargs):
""" """
@ -159,4 +257,4 @@ class ReasoningAgentRouter:
results = [] results = []
for task in tasks: for task in tasks:
results.append(self.run(task, *args, **kwargs)) results.append(self.run(task, *args, **kwargs))
return results return results

@ -657,7 +657,6 @@ class PulsarConversation(BaseCommunication):
try: try:
import pulsar import pulsar
pulsar_available = True
except ImportError: except ImportError:
logger.error("Pulsar client library is not installed") logger.error("Pulsar client library is not installed")
return False return False

@ -0,0 +1,164 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Union, Literal
SwarmType = Literal[
"AgentRearrange",
"MixtureOfAgents",
"SpreadSheetSwarm",
"SequentialWorkflow",
"ConcurrentWorkflow",
"GroupChat",
"MultiAgentRouter",
"AutoSwarmBuilder",
"HiearchicalSwarm",
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
]
class AgentSpec(BaseModel):
agent_name: Optional[str] = Field(
# default=None,
description="The unique name assigned to the agent, which identifies its role and functionality within the swarm.",
)
description: Optional[str] = Field(
default=None,
description="A detailed explanation of the agent's purpose, capabilities, and any specific tasks it is designed to perform.",
)
system_prompt: Optional[str] = Field(
default=None,
description="The initial instruction or context provided to the agent, guiding its behavior and responses during execution.",
)
model_name: Optional[str] = Field(
default="gpt-4o-mini",
description="The name of the AI model that the agent will utilize for processing tasks and generating outputs. For example: gpt-4o, gpt-4o-mini, openai/o3-mini",
)
auto_generate_prompt: Optional[bool] = Field(
default=False,
description="A flag indicating whether the agent should automatically create prompts based on the task requirements.",
)
max_tokens: Optional[int] = Field(
default=8192,
description="The maximum number of tokens that the agent is allowed to generate in its responses, limiting output length.",
)
temperature: Optional[float] = Field(
default=0.5,
description="A parameter that controls the randomness of the agent's output; lower values result in more deterministic responses.",
)
role: Optional[str] = Field(
default="worker",
description="The designated role of the agent within the swarm, which influences its behavior and interaction with other agents.",
)
max_loops: Optional[int] = Field(
default=1,
description="The maximum number of times the agent is allowed to repeat its task, enabling iterative processing if necessary.",
)
tools_list_dictionary: Optional[List[Dict[Any, Any]]] = Field(
default=None,
description="A dictionary of tools that the agent can use to complete its task.",
)
mcp_url: Optional[str] = Field(
default=None,
description="The URL of the MCP server that the agent can use to complete its task.",
)
class Config:
arbitrary_types_allowed = True
class AgentCompletion(BaseModel):
agent_config: Optional[AgentSpec] = Field(
None,
description="The configuration of the agent to be completed.",
)
task: Optional[str] = Field(
None, description="The task to be completed by the agent."
)
history: Optional[Union[Dict[Any, Any], List[Dict[str, str]]]] = (
Field(
default=None,
description="The history of the agent's previous tasks and responses. Can be either a dictionary or a list of message objects.",
)
)
model_config = {
"arbitrary_types_allowed": True,
"populate_by_name": True,
}
class Agents(BaseModel):
"""Configuration for a collection of agents that work together as a swarm to accomplish tasks."""
agents: List[AgentSpec] = Field(
description="A list containing the specifications of each agent that will participate in the swarm, detailing their roles and functionalities."
)
class SwarmSpec(BaseModel):
name: Optional[str] = Field(
None,
description="The name of the swarm, which serves as an identifier for the group of agents and their collective task.",
max_length=100,
)
description: Optional[str] = Field(
None,
description="A comprehensive description of the swarm's objectives, capabilities, and intended outcomes.",
)
agents: Optional[List[AgentSpec]] = Field(
None,
description="A list of agents or specifications that define the agents participating in the swarm.",
)
max_loops: Optional[int] = Field(
default=1,
description="The maximum number of execution loops allowed for the swarm, enabling repeated processing if needed.",
)
swarm_type: Optional[SwarmType] = Field(
None,
description="The classification of the swarm, indicating its operational style and methodology.",
)
rearrange_flow: Optional[str] = Field(
None,
description="Instructions on how to rearrange the flow of tasks among agents, if applicable.",
)
task: Optional[str] = Field(
None,
description="The specific task or objective that the swarm is designed to accomplish.",
)
img: Optional[str] = Field(
None,
description="An optional image URL that may be associated with the swarm's task or representation.",
)
return_history: Optional[bool] = Field(
True,
description="A flag indicating whether the swarm should return its execution history along with the final output.",
)
rules: Optional[str] = Field(
None,
description="Guidelines or constraints that govern the behavior and interactions of the agents within the swarm.",
)
tasks: Optional[List[str]] = Field(
None,
description="A list of tasks that the swarm should complete.",
)
messages: Optional[
Union[List[Dict[Any, Any]], Dict[Any, Any]]
] = Field(
None,
description="A list of messages that the swarm should complete.",
)
stream: Optional[bool] = Field(
False,
description="A flag indicating whether the swarm should stream its output.",
)
service_tier: Optional[str] = Field(
"standard",
description="The service tier to use for processing. Options: 'standard' (default) or 'flex' for lower cost but slower processing.",
)
class Config:
arbitrary_types_allowed = True

@ -41,7 +41,6 @@ from swarms.structs.meme_agent_persona_generator import (
) )
from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.model_router import ModelRouter from swarms.structs.model_router import ModelRouter
from swarms.structs.multi_agent_collab import MultiAgentCollaboration
from swarms.structs.multi_agent_exec import ( from swarms.structs.multi_agent_exec import (
get_agents_info, get_agents_info,
get_swarms_info, get_swarms_info,
@ -99,7 +98,6 @@ __all__ = [
"majority_voting", "majority_voting",
"most_frequent", "most_frequent",
"parse_code_completion", "parse_code_completion",
"MultiAgentCollaboration",
"AgentRearrange", "AgentRearrange",
"rearrange", "rearrange",
"RoundRobinSwarm", "RoundRobinSwarm",

@ -3,12 +3,25 @@ from typing import (
Dict, Dict,
TypedDict, TypedDict,
Any, Any,
Union,
TypeVar,
) )
from dataclasses import dataclass from dataclasses import dataclass
import csv import csv
import json
import yaml
from pathlib import Path from pathlib import Path
from enum import Enum from enum import Enum
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.schemas.swarms_api_schemas import AgentSpec
from litellm import model_list
import concurrent.futures
from tqdm import tqdm
# Type variable for agent configuration
AgentConfigType = TypeVar(
"AgentConfigType", bound=Union[AgentSpec, Dict[str, Any]]
)
class ModelName(str, Enum): class ModelName(str, Enum):
@ -32,12 +45,20 @@ class ModelName(str, Enum):
return model_name in cls.get_model_names() return model_name in cls.get_model_names()
class FileType(str, Enum):
"""Supported file types for agent configuration"""
CSV = "csv"
JSON = "json"
YAML = "yaml"
class AgentConfigDict(TypedDict): class AgentConfigDict(TypedDict):
"""TypedDict for agent configuration""" """TypedDict for agent configuration"""
agent_name: str agent_name: str
system_prompt: str system_prompt: str
model_name: str # Using str instead of ModelName for flexibility model_name: str
max_loops: int max_loops: int
autosave: bool autosave: bool
dashboard: bool dashboard: bool
@ -68,15 +89,26 @@ class AgentValidator:
"""Validates agent configuration data""" """Validates agent configuration data"""
@staticmethod @staticmethod
def validate_config(config: Dict[str, Any]) -> AgentConfigDict: def validate_config(
"""Validate and convert agent configuration""" config: Union[AgentSpec, Dict[str, Any]],
) -> AgentConfigDict:
"""Validate and convert agent configuration from either AgentSpec or Dict"""
try: try:
# Validate model name # Convert AgentSpec to dict if needed
if isinstance(config, AgentSpec):
config = config.model_dump()
# Validate model name using litellm model list
model_name = str(config["model_name"]) model_name = str(config["model_name"])
if not ModelName.is_valid_model(model_name): if not any(
valid_models = ModelName.get_model_names() model_name in model["model_name"]
for model in model_list
):
valid_models = [
model["model_name"] for model in model_list
]
raise AgentValidationError( raise AgentValidationError(
f"Invalid model name. Must be one of: {', '.join(valid_models)}", "Invalid model name. Must be one of the supported litellm models",
"model_name", "model_name",
model_name, model_name,
) )
@ -138,38 +170,36 @@ class AgentValidator:
class AgentLoader: class AgentLoader:
"""Class to manage agents through CSV with type safety""" """Class to manage agents through various file formats with type safety and high performance"""
csv_path: Path def __init__(
self, file_path: Union[str, Path], max_workers: int = 10
def __post_init__(self) -> None: ):
"""Convert string path to Path object if necessary""" """Initialize the AgentLoader with file path and max workers for parallel processing"""
if isinstance(self.csv_path, str): self.file_path = (
self.csv_path = Path(self.csv_path) Path(file_path)
if isinstance(file_path, str)
else file_path
)
self.max_workers = max_workers
@property @property
def headers(self) -> List[str]: def file_type(self) -> FileType:
"""CSV headers for agent configuration""" """Determine the file type based on extension"""
return [ ext = self.file_path.suffix.lower()
"agent_name", if ext == ".csv":
"system_prompt", return FileType.CSV
"model_name", elif ext == ".json":
"max_loops", return FileType.JSON
"autosave", elif ext in [".yaml", ".yml"]:
"dashboard", return FileType.YAML
"verbose", else:
"dynamic_temperature", raise ValueError(f"Unsupported file type: {ext}")
"saved_state_path",
"user_name", def create_agent_file(
"retry_attempts", self, agents: List[Union[AgentSpec, Dict[str, Any]]]
"context_length", ) -> None:
"return_step_meta", """Create a file with validated agent configurations"""
"output_type",
"streaming",
]
def create_agent_csv(self, agents: List[Dict[str, Any]]) -> None:
"""Create a CSV file with validated agent configurations"""
validated_agents = [] validated_agents = []
for agent in agents: for agent in agents:
try: try:
@ -183,81 +213,71 @@ class AgentLoader:
) )
raise raise
with open(self.csv_path, "w", newline="") as f: if self.file_type == FileType.CSV:
writer = csv.DictWriter(f, fieldnames=self.headers) self._write_csv(validated_agents)
writer.writeheader() elif self.file_type == FileType.JSON:
writer.writerows(validated_agents) self._write_json(validated_agents)
elif self.file_type == FileType.YAML:
self._write_yaml(validated_agents)
print( print(
f"Created CSV with {len(validated_agents)} agents at {self.csv_path}" f"Created {self.file_type.value} file with {len(validated_agents)} agents at {self.file_path}"
) )
def load_agents(self, file_type: str = "csv") -> List[Agent]: def load_agents(self) -> List[Agent]:
"""Load and create agents from CSV or JSON with validation""" """Load and create agents from file with validation and parallel processing"""
if file_type == "csv": if not self.file_path.exists():
if not self.csv_path.exists():
raise FileNotFoundError(
f"CSV file not found at {self.csv_path}"
)
return self._load_agents_from_csv()
elif file_type == "json":
return self._load_agents_from_json()
else:
raise ValueError(
"Unsupported file type. Use 'csv' or 'json'."
)
def _load_agents_from_csv(self) -> List[Agent]:
"""Load agents from a CSV file"""
agents: List[Agent] = []
with open(self.csv_path, "r") as f:
reader = csv.DictReader(f)
for row in reader:
try:
validated_config = AgentValidator.validate_config(
row
)
agent = self._create_agent(validated_config)
agents.append(agent)
except AgentValidationError as e:
print(
f"Skipping invalid agent configuration: {e}"
)
continue
print(f"Loaded {len(agents)} agents from {self.csv_path}")
return agents
def _load_agents_from_json(self) -> List[Agent]:
"""Load agents from a JSON file"""
import json
if not self.csv_path.with_suffix(".json").exists():
raise FileNotFoundError( raise FileNotFoundError(
f"JSON file not found at {self.csv_path.with_suffix('.json')}" f"File not found at {self.file_path}"
) )
if self.file_type == FileType.CSV:
agents_data = self._read_csv()
elif self.file_type == FileType.JSON:
agents_data = self._read_json()
elif self.file_type == FileType.YAML:
agents_data = self._read_yaml()
# Process agents in parallel with progress bar
agents: List[Agent] = [] agents: List[Agent] = []
with open(self.csv_path.with_suffix(".json"), "r") as f: with concurrent.futures.ThreadPoolExecutor(
agents_data = json.load(f) max_workers=self.max_workers
for agent in agents_data: ) as executor:
futures = []
for agent_data in agents_data:
futures.append(
executor.submit(self._process_agent, agent_data)
)
# Use tqdm to show progress
for future in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Loading agents",
):
try: try:
validated_config = AgentValidator.validate_config( agent = future.result()
agent if agent:
) agents.append(agent)
agent = self._create_agent(validated_config) except Exception as e:
agents.append(agent) print(f"Error processing agent: {e}")
except AgentValidationError as e:
print(
f"Skipping invalid agent configuration: {e}"
)
continue
print( print(f"Loaded {len(agents)} agents from {self.file_path}")
f"Loaded {len(agents)} agents from {self.csv_path.with_suffix('.json')}"
)
return agents return agents
def _process_agent(
self, agent_data: Union[AgentSpec, Dict[str, Any]]
) -> Union[Agent, None]:
"""Process a single agent configuration"""
try:
validated_config = AgentValidator.validate_config(
agent_data
)
return self._create_agent(validated_config)
except AgentValidationError as e:
print(f"Skipping invalid agent configuration: {e}")
return None
def _create_agent( def _create_agent(
self, validated_config: AgentConfigDict self, validated_config: AgentConfigDict
) -> Agent: ) -> Agent:
@ -281,3 +301,36 @@ class AgentLoader:
output_type=validated_config["output_type"], output_type=validated_config["output_type"],
streaming_on=validated_config["streaming"], streaming_on=validated_config["streaming"],
) )
def _write_csv(self, agents: List[Dict[str, Any]]) -> None:
"""Write agents to CSV file"""
with open(self.file_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=agents[0].keys())
writer.writeheader()
writer.writerows(agents)
def _write_json(self, agents: List[Dict[str, Any]]) -> None:
"""Write agents to JSON file"""
with open(self.file_path, "w") as f:
json.dump(agents, f, indent=2)
def _write_yaml(self, agents: List[Dict[str, Any]]) -> None:
"""Write agents to YAML file"""
with open(self.file_path, "w") as f:
yaml.dump(agents, f, default_flow_style=False)
def _read_csv(self) -> List[Dict[str, Any]]:
"""Read agents from CSV file"""
with open(self.file_path, "r") as f:
reader = csv.DictReader(f)
return list(reader)
def _read_json(self) -> List[Dict[str, Any]]:
"""Read agents from JSON file"""
with open(self.file_path, "r") as f:
return json.load(f)
def _read_yaml(self) -> List[Dict[str, Any]]:
"""Read agents from YAML file"""
with open(self.file_path, "r") as f:
return yaml.safe_load(f)

@ -1,242 +0,0 @@
from typing import List, Callable
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
# def select_next_speaker_bid(
# step: int,
# agents: List[Agent],
# ) -> int:
# """Selects the next speaker."""
# bids = []
# for agent in agents:
# bid = ask_for_bid(agent)
# bids.append(bid)
# max_value = max(bids)
# max_indices = [i for i, x in enumerate(bids) if x == max_value]
# idx = random.choice(max_indices)
# return idx
def select_next_speaker_roundtable(
step: int, agents: List[Agent]
) -> int:
"""Selects the next speaker."""
return step % len(agents)
def select_next_speaker_director(
step: int, agents: List[Agent], director: Agent
) -> int:
# if the step if even => director
# => director selects next speaker
if step % 2 == 1:
idx = 0
else:
idx = director.select_next_speaker() + 1
return idx
def run_director(self, task: str):
"""Runs the multi-agent collaboration with a director."""
n = 0
self.reset()
self.inject("Debate Moderator", task)
print("(Debate Moderator): \n")
while n < self.max_loops:
name, message = self.step()
print(f"({name}): {message}\n")
n += 1
# [MAYBE]: Add type hints
class MultiAgentCollaboration(BaseSwarm):
"""
Multi-agent collaboration class.
Attributes:
agents (List[Agent]): The agents in the collaboration.
selection_function (callable): The function that selects the next speaker.
Defaults to select_next_speaker.
max_loops (int): The maximum number of iterations. Defaults to 10.
autosave (bool): Whether to autosave the state of all agents. Defaults to True.
saved_file_path_name (str): The path to the saved file. Defaults to
"multi_agent_collab.json".
stopping_token (str): The token that stops the collaboration. Defaults to
"<DONE>".
results (list): The results of the collaboration. Defaults to [].
logger (logging.Logger): The logger. Defaults to logger.
logging (bool): Whether to log the collaboration. Defaults to True.
Methods:
reset: Resets the state of all agents.
inject: Injects a message into the collaboration.
inject_agent: Injects an agent into the collaboration.
step: Steps through the collaboration.
ask_for_bid: Asks an agent for a bid.
select_next_speaker: Selects the next speaker.
run: Runs the collaboration.
format_results: Formats the results of the run method.
Usage:
>>> from swarm_models import OpenAIChat
>>> from swarms.structs import Agent
>>> from swarms.swarms.multi_agent_collab import MultiAgentCollaboration
>>>
>>> # Initialize the language model
>>> llm = OpenAIChat(
>>> temperature=0.5,
>>> )
>>>
>>>
>>> ## Initialize the workflow
>>> agent = Agent(llm=llm, max_loops=1, dashboard=True)
>>>
>>> # Run the workflow on a task
>>> out = agent.run("Generate a 10,000 word blog on health and wellness.")
>>>
>>> # Initialize the multi-agent collaboration
>>> swarm = MultiAgentCollaboration(
>>> agents=[agent],
>>> max_loops=4,
>>> )
>>>
>>> # Run the multi-agent collaboration
>>> swarm.run()
>>>
>>> # Format the results of the multi-agent collaboration
>>> swarm.format_results(swarm.results)
"""
def __init__(
self,
name: str = "MultiAgentCollaboration",
description: str = "A multi-agent collaboration.",
director: Agent = None,
agents: List[Agent] = None,
select_next_speaker: Callable = None,
max_loops: int = 10,
autosave: bool = True,
saved_file_path_name: str = "multi_agent_collab.json",
stopping_token: str = "<DONE>",
logging: bool = True,
*args,
**kwargs,
):
super().__init__(
name=name,
description=description,
agents=agents,
*args,
**kwargs,
)
self.name = name
self.description = description
self.director = director
self.agents = agents
self.select_next_speaker = select_next_speaker
self._step = 0
self.max_loops = max_loops
self.autosave = autosave
self.saved_file_path_name = saved_file_path_name
self.stopping_token = stopping_token
self.results = []
self.logger = logger
self.logging = logging
# Conversation
self.conversation = Conversation(
time_enabled=False, *args, **kwargs
)
def default_select_next_speaker(
self, step: int, agents: List[Agent]
) -> int:
"""Default speaker selection function."""
return step % len(agents)
def inject(self, name: str, message: str):
"""Injects a message into the multi-agent collaboration."""
for agent in self.agents:
self.conversation.add(name, message)
agent.run(self.conversation.return_history_as_string())
self._step += 1
def step(self) -> str:
"""Steps through the multi-agent collaboration."""
speaker_idx = self.select_next_speaker(
self._step, self.agents
)
speaker = self.agents[speaker_idx]
message = speaker.send()
for receiver in self.agents:
self.conversation.add(speaker.name, message)
receiver.run(self.conversation.return_history_as_string())
self._step += 1
if self.logging:
self.log_step(speaker, message)
return self.conversation.return_history_as_string()
def log_step(self, speaker: str, response: str):
"""Logs the step of the multi-agent collaboration."""
self.logger.info(f"{speaker.name}: {response}")
def run(self, task: str, *args, **kwargs):
"""Runs the multi-agent collaboration."""
for _ in range(self.max_loops):
result = self.step()
if self.autosave:
self.save_state()
if self.stopping_token in result:
break
return self.conversation.return_history_as_string()
# def format_results(self, results):
# """Formats the results of the run method"""
# formatted_results = "\n".join(
# [
# f"{result['agent']} responded: {result['response']}"
# for result in results
# ]
# )
# return formatted_results
# def save(self):
# """Saves the state of all agents."""
# state = {
# "step": self._step,
# "results": [
# {"agent": r["agent"].name, "response": r["response"]}
# for r in self.results
# ],
# }
# with open(self.saved_file_path_name, "w") as file:
# json.dump(state, file)
# def load(self):
# """Loads the state of all agents."""
# with open(self.saved_file_path_name) as file:
# state = json.load(file)
# self._step = state["step"]
# self.results = state["results"]
# return state
# def __repr__(self):
# return (
# f"MultiAgentCollaboration(agents={self.agents},"
# f" selection_function={self.select_next_speaker},"
# f" max_loops={self.max_loops}, autosave={self.autosave},"
# f" saved_file_path_name={self.saved_file_path_name})"
# )

@ -212,7 +212,7 @@ class RedisConversationTester:
all_messages = self.conversation.return_messages_as_list() all_messages = self.conversation.return_messages_as_list()
if len(all_messages) > 0: if len(all_messages) > 0:
self.conversation.update(0, "user", "updated message") self.conversation.update(0, "user", "updated message")
updated_message = self.conversation.query(0) self.conversation.query(0)
assert True, "Update method executed successfully" assert True, "Update method executed successfully"
def test_clear(self): def test_clear(self):

@ -6,7 +6,7 @@ import pytest
from swarms import Agent from swarms import Agent
from swarm_models import OpenAIChat from swarm_models import OpenAIChat
from swarms.structs.multi_agent_collab import MultiAgentCollaboration from experimental.multi_agent_collab import MultiAgentCollaboration
# Initialize the director agent # Initialize the director agent

@ -1,12 +1,8 @@
import json
from swarms.structs import Agent from swarms.structs import Agent
from swarms.prompts.logistics import ( from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt, Quality_Control_Agent_Prompt,
) )
from swarms import BaseTool
import litellm
litellm._turn_on_debug()
# Image for analysis # Image for analysis
factory_image = "image.jpg" factory_image = "image.jpg"
@ -43,19 +39,21 @@ def security_analysis(danger_level: str = None) -> str:
return "Unknown danger level" return "Unknown danger level"
schema = BaseTool().function_to_dict(security_analysis) # schema = BaseTool().function_to_dict(security_analysis)
print(json.dumps(schema, indent=4)) # print(json.dumps(schema, indent=4))
# Quality control agent # Quality control agent
quality_control_agent = Agent( quality_control_agent = Agent(
agent_name="Quality Control Agent", agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.", agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
model_name="anthropic/claude-3-opus-20240229", # model_name="anthropic/claude-3-opus-20240229",
model_name="gpt-4o-mini",
system_prompt=Quality_Control_Agent_Prompt, system_prompt=Quality_Control_Agent_Prompt,
multi_modal=True, multi_modal=True,
max_loops=1, max_loops=1,
output_type="str-all-except-first", output_type="str-all-except-first",
tools_list_dictionary=[schema], # tools_list_dictionary=[schema],
tools=[security_analysis],
) )
Loading…
Cancel
Save