Merge branch 'master' into bugfix/Multi-Modal-+-Function-Calling-Agents

pull/923/head
harshalmore31 5 days ago committed by GitHub
commit 61fa99aa71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -61,19 +61,19 @@ extra:
provider: google
property: G-MPE9C65596
alternate:
- name: English
link: /
lang: en
- name: 简体中文
link: /zh/
lang: zh
- name: 日本語
link: /ja/
lang: ja
- name: 한국어
link: /ko/
lang: ko
# alternate:
# - name: English
# link: /
# lang: en
# - name: 简体中文
# link: /zh/
# lang: zh
# - name: 日本語
# link: /ja/
# lang: ja
# - name: 한국어
# link: /ko/
# lang: ko
theme:
name: material
@ -317,6 +317,7 @@ nav:
- Agent with Structured Outputs: "swarms/examples/agent_structured_outputs.md"
- Agents with Vision: "swarms/examples/vision_processing.md"
- Agent with Multiple Images: "swarms/examples/multiple_images.md"
- Agents with Vision and Tool Usage: "swarms/examples/vision_tools.md"
- Gradio Chat Interface: "swarms/ui/main.md"
- Various Model Providers:
- OpenAI: "swarms/examples/openai_example.md"
@ -348,7 +349,8 @@ nav:
- SwarmRouter Example: "swarms/examples/swarm_router.md"
- MultiAgentRouter Minimal Example: "swarms/examples/multi_agent_router_minimal.md"
- ConcurrentWorkflow Example: "swarms/examples/concurrent_workflow.md"
- MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
# - MixtureOfAgents Example: "swarms/examples/mixture_of_agents.md"
- Mixture of Agents Example: "swarms/examples/moa_example.md"
- Unique Swarms: "swarms/examples/unique_swarms.md"
- Agents as Tools: "swarms/examples/agents_as_tools.md"
- Aggregate Multi-Agent Responses: "swarms/examples/aggregate.md"

@ -0,0 +1,132 @@
# Mixture of Agents Example
The Mixture of Agents (MoA) is a sophisticated multi-agent architecture that implements parallel processing with iterative refinement. This approach processes multiple specialized agents simultaneously, concatenates their outputs, and then performs multiple parallel runs to achieve consensus or enhanced results.
## How It Works
1. **Parallel Processing**: Multiple agents work simultaneously on the same input
2. **Output Concatenation**: Results from all agents are combined into a unified response
3. **Iterative Refinement**: The process repeats for `n` layers/iterations to improve quality
4. **Consensus Building**: Multiple runs help achieve more reliable and comprehensive outputs
This architecture is particularly effective for complex tasks that benefit from diverse perspectives and iterative improvement, such as financial analysis, risk assessment, and multi-faceted problem solving.
![Mixture of Agents](https://files.readme.io/ddb138e-moa-3layer.png)
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set up your environment variables:
```python
WORKSPACE_DIR="agent_workspace"
ANTHROPIC_API_KEY=""
```
## Code
```python
from swarms import Agent, MixtureOfAgents
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = MixtureOfAgents(
agents=[
risk_metrics_agent,
portfolio_risk_agent,
market_risk_agent,
],
layers=1,
max_loops=1,
output_type="final",
)
out = swarm.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
)
print(out)
```
## Support and Community
If you're facing issues or want to learn more, check out the following resources to join our Discord, stay updated on Twitter, and watch tutorials on YouTube!
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |

@ -0,0 +1,138 @@
# Agents with Vision and Tool Usage
This tutorial demonstrates how to create intelligent agents that can analyze images and use custom tools to perform specific actions based on their visual observations. You'll learn to build a quality control agent that can process images, identify potential security concerns, and automatically trigger appropriate responses using function calling capabilities.
## What You'll Learn
- How to configure an agent with multi-modal capabilities for image analysis
- How to integrate custom tools and functions with vision-enabled agents
- How to implement automated security analysis based on visual observations
- How to use function calling to trigger specific actions from image analysis results
- Best practices for building production-ready vision agents with tool integration
## Use Cases
This approach is perfect for:
- **Quality Control Systems**: Automated inspection of manufacturing processes
- **Security Monitoring**: Real-time threat detection and response
- **Object Detection**: Identifying and categorizing items in images
- **Compliance Checking**: Ensuring standards are met in various environments
- **Automated Reporting**: Generating detailed analysis reports from visual data
## Installation
Install the swarms package using pip:
```bash
pip install -U swarms
```
## Basic Setup
1. First, set up your environment variables:
```python
WORKSPACE_DIR="agent_workspace"
OPENAI_API_KEY=""
```
## Code
- Create tools for your agent as a function with types and documentation
- Pass tools to your agent `Agent(tools=[list_of_callables])`
- Add your image path to the run method like: `Agent().run(task=task, img=img)`
```python
from swarms.structs import Agent
from swarms.prompts.logistics import (
Quality_Control_Agent_Prompt,
)
# Image for analysis
factory_image = "image.jpg"
def security_analysis(danger_level: str) -> str:
"""
Analyzes the security danger level and returns an appropriate response.
Args:
danger_level (str, optional): The level of danger to analyze.
Can be "low", "medium", "high", or None. Defaults to None.
Returns:
str: A string describing the danger level assessment.
- "No danger level provided" if danger_level is None
- "No danger" if danger_level is "low"
- "Medium danger" if danger_level is "medium"
- "High danger" if danger_level is "high"
- "Unknown danger level" for any other value
"""
if danger_level is None:
return "No danger level provided"
if danger_level == "low":
return "No danger"
if danger_level == "medium":
return "Medium danger"
if danger_level == "high":
return "High danger"
return "Unknown danger level"
custom_system_prompt = f"""
{Quality_Control_Agent_Prompt}
You have access to tools that can help you with your analysis. When you need to perform a security analysis, you MUST use the security_analysis function with an appropriate danger level (low, medium, or high) based on your observations.
Always use the available tools when they are relevant to the task. If you determine there is any level of danger or security concern, call the security_analysis function with the appropriate danger level.
"""
# Quality control agent
quality_control_agent = Agent(
agent_name="Quality Control Agent",
agent_description="A quality control agent that analyzes images and provides a detailed report on the quality of the product in the image.",
# model_name="anthropic/claude-3-opus-20240229",
model_name="gpt-4o-mini",
system_prompt=custom_system_prompt,
multi_modal=True,
max_loops=1,
output_type="str-all-except-first",
# tools_list_dictionary=[schema],
tools=[security_analysis],
)
response = quality_control_agent.run(
task="Analyze the image and then perform a security analysis. Based on what you see in the image, determine if there is a low, medium, or high danger level and call the security_analysis function with that danger level",
img=factory_image,
)
```
## Support and Community
If you're facing issues or want to learn more, check out the following resources to join our Discord, stay updated on Twitter, and watch tutorials on YouTube!
| Platform | Link | Description |
|----------|------|-------------|
| 📚 Documentation | [docs.swarms.world](https://docs.swarms.world) | Official documentation and guides |
| 📝 Blog | [Medium](https://medium.com/@kyeg) | Latest updates and technical articles |
| 💬 Discord | [Join Discord](https://discord.gg/jM3Z6M9uMq) | Live chat and community support |
| 🐦 Twitter | [@kyegomez](https://twitter.com/kyegomez) | Latest news and announcements |
| 👥 LinkedIn | [The Swarm Corporation](https://www.linkedin.com/company/the-swarm-corporation) | Professional network and updates |
| 📺 YouTube | [Swarms Channel](https://www.youtube.com/channel/UC9yXyitkbU_WSy7bd_41SqQ) | Tutorials and demos |
| 🎫 Events | [Sign up here](https://lu.ma/5p2jnc2v) | Join our community events |

@ -0,0 +1,80 @@
from swarms import Agent, MixtureOfAgents
# Agent 1: Risk Metrics Calculator
risk_metrics_agent = Agent(
agent_name="Risk-Metrics-Calculator",
agent_description="Calculates key risk metrics like VaR, Sharpe ratio, and volatility",
system_prompt="""You are a risk metrics specialist. Calculate and explain:
- Value at Risk (VaR)
- Sharpe ratio
- Volatility
- Maximum drawdown
- Beta coefficient
Provide clear, numerical results with brief explanations.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 2: Portfolio Risk Analyzer
portfolio_risk_agent = Agent(
agent_name="Portfolio-Risk-Analyzer",
agent_description="Analyzes portfolio diversification and concentration risk",
system_prompt="""You are a portfolio risk analyst. Focus on:
- Portfolio diversification analysis
- Concentration risk assessment
- Correlation analysis
- Sector/asset allocation risk
- Liquidity risk evaluation
Provide actionable insights for risk reduction.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
# Agent 3: Market Risk Monitor
market_risk_agent = Agent(
agent_name="Market-Risk-Monitor",
agent_description="Monitors market conditions and identifies risk factors",
system_prompt="""You are a market risk monitor. Identify and assess:
- Market volatility trends
- Economic risk factors
- Geopolitical risks
- Interest rate risks
- Currency risks
Provide current risk alerts and trends.""",
max_loops=1,
# model_name="gpt-4o-mini",
random_model_enabled=True,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_tokens=4096,
)
swarm = MixtureOfAgents(
agents=[
risk_metrics_agent,
portfolio_risk_agent,
market_risk_agent,
],
layers=1,
max_loops=1,
output_type="final",
)
out = swarm.run(
"Calculate VaR and Sharpe ratio for a portfolio with 15% annual return and 20% volatility"
)
print(out)

@ -5,8 +5,9 @@ import random
def list_all_agents(
agents: List[Union[Callable, Any]],
conversation: Optional[Any] = None,
name: str = "",
add_to_conversation: bool = False,
name: Optional[str] = None,
description: Optional[str] = None,
add_to_conversation: Optional[bool] = False,
) -> str:
"""Lists all agents in a swarm and optionally adds them to a conversation.
@ -27,6 +28,7 @@ def list_all_agents(
>>> conversation = Conversation()
>>> agent_info = list_all_agents(agents, conversation, "MySwarm")
>>> print(agent_info)
Swarm: MySwarm
Total Agents: 2
Agent: Agent1
@ -39,8 +41,15 @@ def list_all_agents(
# Compile information about all agents
total_agents = len(agents)
all_agents = f"Total Agents: {total_agents}\n\n" + "\n\n".join(
f"Agent: {agent.agent_name} \n\n Description: {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)}"
all_agents = f"Team Name: {name}\n" if name else ""
all_agents += (
f"Team Description: {description}\n" if description else ""
)
all_agents += f"Total Agents: {total_agents}\n\n"
all_agents += "| Agent | Description |\n"
all_agents += "|-------|-------------|\n"
all_agents += "\n".join(
f"| {agent.agent_name} | {agent.description or (agent.system_prompt[:50] + '...' if len(agent.system_prompt) > 50 else agent.system_prompt)} |"
for agent in agents
)
@ -48,7 +57,7 @@ def list_all_agents(
# Add the agent information to the conversation
conversation.add(
role="System",
content=f"All Agents Available in the Swarm {name}:\n\n{all_agents}",
content=all_agents,
)
return all_agents

@ -1,17 +1,18 @@
import asyncio
import os
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.prompts.ag_prompt import aggregator_system_prompt_main
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
import concurrent.futures
from swarms.utils.output_types import OutputType
from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
logger = initialize_logger(log_folder="mixture_of_agents")
@ -25,13 +26,13 @@ class MixtureOfAgents:
self,
name: str = "MixtureOfAgents",
description: str = "A class to run a mixture of agents and aggregate their responses.",
agents: List[Agent] = [],
agents: List[Agent] = None,
aggregator_agent: Agent = None,
aggregator_system_prompt: str = aggregator_system_prompt_main,
layers: int = 3,
max_loops: int = 1,
return_str_on: bool = False,
output_type: OutputType = "dict",
output_type: OutputType = "final",
aggregator_model_name: str = "claude-3-5-sonnet-20240620",
) -> None:
"""
Initialize the Mixture of Agents class with agents and configuration.
@ -48,16 +49,36 @@ class MixtureOfAgents:
self.description = description
self.agents = agents
self.aggregator_agent = aggregator_agent
self.aggregator_system_prompt = aggregator_system_prompt_main
self.aggregator_system_prompt = aggregator_system_prompt
self.layers = layers
self.max_loops = max_loops
self.return_str_on = return_str_on
self.output_type = output_type
self.aggregator_model_name = aggregator_model_name
self.aggregator_agent = self.aggregator_agent_setup()
self.reliability_check()
self.conversation = Conversation()
list_all_agents(
agents=self.agents,
conversation=self.conversation,
description=self.description,
name=self.name,
add_to_conversation=True,
)
def aggregator_agent_setup(self):
return Agent(
agent_name="Aggregator Agent",
description="An agent that aggregates the responses of the other agents.",
system_prompt=aggregator_system_prompt_main,
model_name=self.aggregator_model_name,
temperature=0.5,
max_loops=1,
output_type="str-all-except-first",
)
def reliability_check(self) -> None:
"""
Performs a reliability check on the Mixture of Agents class.
@ -66,8 +87,8 @@ class MixtureOfAgents:
"Checking the reliability of the Mixture of Agents class."
)
if not self.agents:
raise ValueError("No reference agents provided.")
if len(self.agents) == 0:
raise ValueError("No agents provided.")
if not self.aggregator_agent:
raise ValueError("No aggregator agent provided.")
@ -78,129 +99,83 @@ class MixtureOfAgents:
if not self.layers:
raise ValueError("No layers provided.")
if self.layers < 1:
raise ValueError("Layers must be greater than 0.")
logger.info("Reliability check passed.")
logger.info("Mixture of Agents class is ready for use.")
def _get_final_system_prompt(
self, system_prompt: str, results: List[str]
) -> str:
"""
Constructs a system prompt for subsequent layers that includes previous responses.
def save_to_markdown_file(self, file_path: str = "moa.md"):
with open(file_path, "w") as f:
f.write(self.conversation.get_str())
Args:
system_prompt (str): The initial system prompt.
results (List[str]): A list of previous responses.
Returns:
str: The final system prompt including previous responses.
"""
return (
system_prompt
+ "\n"
+ "\n".join(
[
f"{i+1}. {str(element)}"
for i, element in enumerate(results)
]
)
)
async def _run_agent_async(
def step(
self,
agent: Agent,
task: str,
prev_responses: Optional[List[str]] = None,
) -> str:
"""
Asynchronous method to run a single agent.
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
# self.conversation.add(role="User", content=task)
Args:
agent (Agent): The agent to be run.
task (str): The task for the agent.
prev_responses (Optional[List[str]], optional): A list of previous responses. Defaults to None.
# Run agents concurrently
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
# Submit all agent tasks and store with their index
future_to_agent = {
executor.submit(
agent.run, task=task, img=img, imgs=imgs
): agent
for agent in self.agents
}
Returns:
str: The response from the agent.
"""
# If there are previous responses, update the agent's system prompt
if prev_responses:
system_prompt_with_responses = (
self._get_final_system_prompt(
self.aggregator_system_prompt, prev_responses
)
)
agent.system_prompt = system_prompt_with_responses
# Collect results and add to conversation in completion order
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
output = future.result()
self.conversation.add(role=agent.name, content=output)
# Run the agent asynchronously
response = await asyncio.to_thread(agent.run, task)
return self.conversation.get_str()
self.conversation.add(agent.agent_name, response)
def _run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
# Log the agent's response
print(f"Agent {agent.agent_name} response: {response}")
return response
self.conversation.add(role="User", content=task)
async def _run_async(self, task: str) -> None:
"""
Asynchronous method to run the Mixture of Agents process.
for i in range(self.layers):
out = self.step(
task=self.conversation.get_str(), img=img, imgs=imgs
)
task = out
Args:
task (str): The task for the mixture of agents.
"""
# Gather initial responses from reference agents
results: List[str] = await asyncio.gather(
*[
self._run_agent_async(agent, task)
for agent in self.agents
]
out = self.aggregator_agent.run(
task=self.conversation.get_str()
)
# Process additional layers, if applicable
for _ in range(1, self.layers - 1):
results = await asyncio.gather(
*[
self._run_agent_async(
agent, task, prev_responses=results
)
for agent in self.agents
]
)
# Perform final aggregation using the aggregator agent
final_result = await self._run_agent_async(
self.aggregator_agent, task, prev_responses=results
self.conversation.add(
role=self.aggregator_agent.agent_name, content=out
)
print(f"Final Aggregated Response: {final_result}")
out = history_output_formatter(
conversation=self.conversation, type=self.output_type
)
def run(self, task: str) -> None:
"""
Synchronous wrapper to run the async process.
return out
Args:
task (str): The task for the mixture of agents.
"""
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
try:
self.conversation.add("user", task)
for _ in range(self.max_loops):
# Add previous context to task if available
prompt = f"History: {self.conversation.get_str()}\n\nTask: {task}"
# Run async process
asyncio.run(self._run_async(prompt))
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
return self._run(task=task, img=img, imgs=imgs)
except Exception as e:
logger.error(f"Error running mixture of agents: {str(e)}")
raise e
logger.error(f"Error running Mixture of Agents: {e}")
return f"Error: {e}"
def run_batched(self, tasks: List[str]) -> List[str]:
"""

Loading…
Cancel
Save